Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
98 changes: 59 additions & 39 deletions arrow-pyarrow/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -75,10 +75,10 @@ use arrow_data::ArrayData;
use arrow_schema::{ArrowError, DataType, Field, Schema, SchemaRef};
use pyo3::exceptions::{PyTypeError, PyValueError};
use pyo3::ffi::Py_uintptr_t;
use pyo3::import_exception;
use pyo3::prelude::*;
use pyo3::pybacked::PyBackedStr;
use pyo3::types::{PyCapsule, PyDict, PyList, PyTuple};
use pyo3::{import_exception, intern};
use pyo3::sync::PyOnceLock;
use pyo3::types::{PyCapsule, PyDict, PyList, PyTuple, PyType};

import_exception!(pyarrow, ArrowException);
/// Represents an exception raised by PyArrow.
Expand Down Expand Up @@ -118,17 +118,13 @@ impl<T: ToPyArrow> IntoPyArrow for T {
}
}

fn validate_class(expected: &str, value: &Bound<PyAny>) -> PyResult<()> {
let pyarrow = PyModule::import(value.py(), "pyarrow")?;
let class = pyarrow.getattr(expected)?;
if !value.is_instance(&class)? {
let expected_module = class.getattr("__module__")?.extract::<PyBackedStr>()?;
let expected_name = class.getattr("__name__")?.extract::<PyBackedStr>()?;
fn validate_class(expected: &Bound<PyType>, value: &Bound<PyAny>) -> PyResult<()> {
if !value.is_instance(expected)? {
let expected_module = expected.getattr("__module__")?;
let expected_name = expected.getattr("__name__")?;
let found_class = value.get_type();
let found_module = found_class
.getattr("__module__")?
.extract::<PyBackedStr>()?;
let found_name = found_class.getattr("__name__")?.extract::<PyBackedStr>()?;
let found_module = found_class.getattr("__module__")?;
let found_name = found_class.getattr("__name__")?;
return Err(PyTypeError::new_err(format!(
"Expected instance of {expected_module}.{expected_name}, got {found_module}.{found_name}",
)));
Expand Down Expand Up @@ -173,7 +169,7 @@ impl FromPyArrow for DataType {
}
}

validate_class("DataType", value)?;
validate_class(data_type_class(value.py())?, value)?;

let c_schema = FFI_ArrowSchema::empty();
let c_schema_ptr = &c_schema as *const FFI_ArrowSchema;
Expand All @@ -187,9 +183,8 @@ impl ToPyArrow for DataType {
fn to_pyarrow<'py>(&self, py: Python<'py>) -> PyResult<Bound<'py, PyAny>> {
let c_schema = FFI_ArrowSchema::try_from(self).map_err(to_py_err)?;
let c_schema_ptr = &c_schema as *const FFI_ArrowSchema;
let module = py.import("pyarrow")?;
let class = module.getattr("DataType")?;
let dtype = class.call_method1("_import_from_c", (c_schema_ptr as Py_uintptr_t,))?;
let dtype =
data_type_class(py)?.call_method1("_import_from_c", (c_schema_ptr as Py_uintptr_t,))?;
Ok(dtype)
}
}
Expand All @@ -213,7 +208,7 @@ impl FromPyArrow for Field {
}
}

validate_class("Field", value)?;
validate_class(field_class(value.py())?, value)?;

let c_schema = FFI_ArrowSchema::empty();
let c_schema_ptr = &c_schema as *const FFI_ArrowSchema;
Expand All @@ -227,9 +222,8 @@ impl ToPyArrow for Field {
fn to_pyarrow<'py>(&self, py: Python<'py>) -> PyResult<Bound<'py, PyAny>> {
let c_schema = FFI_ArrowSchema::try_from(self).map_err(to_py_err)?;
let c_schema_ptr = &c_schema as *const FFI_ArrowSchema;
let module = py.import("pyarrow")?;
let class = module.getattr("Field")?;
let dtype = class.call_method1("_import_from_c", (c_schema_ptr as Py_uintptr_t,))?;
let dtype =
field_class(py)?.call_method1("_import_from_c", (c_schema_ptr as Py_uintptr_t,))?;
Ok(dtype)
}
}
Expand All @@ -253,7 +247,7 @@ impl FromPyArrow for Schema {
}
}

validate_class("Schema", value)?;
validate_class(schema_class(value.py())?, value)?;

let c_schema = FFI_ArrowSchema::empty();
let c_schema_ptr = &c_schema as *const FFI_ArrowSchema;
Expand All @@ -267,9 +261,8 @@ impl ToPyArrow for Schema {
fn to_pyarrow<'py>(&self, py: Python<'py>) -> PyResult<Bound<'py, PyAny>> {
let c_schema = FFI_ArrowSchema::try_from(self).map_err(to_py_err)?;
let c_schema_ptr = &c_schema as *const FFI_ArrowSchema;
let module = py.import("pyarrow")?;
let class = module.getattr("Schema")?;
let schema = class.call_method1("_import_from_c", (c_schema_ptr as Py_uintptr_t,))?;
let schema =
schema_class(py)?.call_method1("_import_from_c", (c_schema_ptr as Py_uintptr_t,))?;
Ok(schema)
}
}
Expand Down Expand Up @@ -310,7 +303,7 @@ impl FromPyArrow for ArrayData {
return unsafe { ffi::from_ffi(array, schema_ptr.as_ref()) }.map_err(to_py_err);
}

validate_class("Array", value)?;
validate_class(array_class(value.py())?, value)?;

// prepare a pointer to receive the Array struct
let mut array = FFI_ArrowArray::empty();
Expand All @@ -336,9 +329,7 @@ impl ToPyArrow for ArrayData {
let array = FFI_ArrowArray::new(self);
let schema = FFI_ArrowSchema::try_from(self.data_type()).map_err(to_py_err)?;

let module = py.import("pyarrow")?;
let class = module.getattr("Array")?;
let array = class.call_method1(
let array = array_class(py)?.call_method1(
"_import_from_c",
(
addr_of!(array) as Py_uintptr_t,
Expand Down Expand Up @@ -423,7 +414,7 @@ impl FromPyArrow for RecordBatch {
return RecordBatch::try_new_with_options(schema, columns, &options).map_err(to_py_err);
}

validate_class("RecordBatch", value)?;
validate_class(record_batch_class(value.py())?, value)?;
// TODO(kszucs): implement the FFI conversions in arrow-rs for RecordBatches
let schema = value.getattr("schema")?;
let schema = Arc::new(Schema::from_pyarrow_bound(&schema)?);
Expand Down Expand Up @@ -483,7 +474,7 @@ impl FromPyArrow for ArrowArrayStreamReader {
return Ok(stream_reader);
}

validate_class("RecordBatchReader", value)?;
validate_class(record_batch_reader_class(value.py())?, value)?;

// prepare a pointer to receive the stream struct
let mut stream = FFI_ArrowArrayStream::empty();
Expand All @@ -510,10 +501,8 @@ impl IntoPyArrow for Box<dyn RecordBatchReader + Send> {
let mut stream = FFI_ArrowArrayStream::new(self);

let stream_ptr = (&mut stream) as *mut FFI_ArrowArrayStream;
let module = py.import("pyarrow")?;
let class = module.getattr("RecordBatchReader")?;
let args = PyTuple::new(py, [stream_ptr as Py_uintptr_t])?;
let reader = class.call_method1("_import_from_c", args)?;
let reader = record_batch_reader_class(py)?
.call_method1("_import_from_c", (stream_ptr as Py_uintptr_t,))?;

Ok(reader)
}
Expand Down Expand Up @@ -606,21 +595,52 @@ impl FromPyArrow for Table {
/// Convert a [`Table`] into `pyarrow.Table`.
impl IntoPyArrow for Table {
fn into_pyarrow(self, py: Python) -> PyResult<Bound<PyAny>> {
let module = py.import(intern!(py, "pyarrow"))?;
let class = module.getattr(intern!(py, "Table"))?;

let py_batches = PyList::new(py, self.record_batches.into_iter().map(PyArrowType))?;
let py_schema = PyArrowType(Arc::unwrap_or_clone(self.schema));

let kwargs = PyDict::new(py);
kwargs.set_item("schema", py_schema)?;

let reader = class.call_method("from_batches", (py_batches,), Some(&kwargs))?;
let reader = table_class(py)?.call_method("from_batches", (py_batches,), Some(&kwargs))?;

Ok(reader)
}
}

fn array_class(py: Python<'_>) -> PyResult<&Bound<'_, PyType>> {
static TYPE: PyOnceLock<Py<PyType>> = PyOnceLock::new();
TYPE.import(py, "pyarrow", "Array")
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

https://docs.rs/pyo3/0.28.2/pyo3/sync/struct.PyOnceLock.html#method.import

Looks like this is exactly the pattern it was designed for

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Codex also found similar uses in other well respected crates

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Interesting that those crates cache the imports. I thought that Python cached the import anyways on the C side, so it was unnecessary to do it on the pyo3 side

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I thought that Python cached the import anyways on the C side, so it was unnecessary to do it on the pyo3 side

Yes, cpython does not reinitialize the module each time (I guess this what you mean by "cache the import", sorry for the dumb answer if it's not the case). However, doing py.import(my_module)?.getattr(my_class)? requires at least two map lookups, one to fetch the module object from its path and one to fetch the class from the module. The cache allows to skip these lookups and directly use the type object. If the GIL is enabled there is no synchronization cost to do that (PyOnceLock use the GIL as lock).

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I see; so it's just saving two lookups into the CPython HashMap? I guess that's not nothing, but it's not the slow module reinitialization I was worried it was.

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'd be curious about a benchmark, but not required to merge

Copy link
Copy Markdown
Contributor Author

@Tpt Tpt Mar 18, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thank you! I just ran a benchmark by curiosity. Here is the result:

import_direct           time:   [272.33 ns 274.23 ns 276.52 ns]
Found 4 outliers among 100 measurements (4.00%)
  1 (1.00%) high mild
  3 (3.00%) high severe

import_intern           time:   [206.61 ns 207.60 ns 208.84 ns]
Found 8 outliers among 100 measurements (8.00%)
  6 (6.00%) high mild
  2 (2.00%) high severe

import_static           time:   [1.3524 ns 1.3578 ns 1.3648 ns]
Found 17 outliers among 100 measurements (17.00%)
  4 (4.00%) high mild
  13 (13.00%) high severe

the three benchmarks import uuid.UUID.

  • import_direct uses Python::import()?.getattr()
  • import_intern uses Python::import(intern!())?.getattr(intern!()) to avoid always allocating the strings "uuid" and "UUID"
  • import_static uses PyOnceLock::import

Code:

Details ```rust use std::hint::black_box;

use codspeed_criterion_compat::{criterion_group, criterion_main, Bencher, Criterion};

use pyo3::prelude::*;

use pyo3::intern;
use pyo3::sync::PyOnceLock;
use pyo3::types::PyType;

fn import_direct(b: &mut Bencher<'_>) {
Python::attach(|py| {
b.iter(|| black_box(black_box(&py.import("uuid").unwrap()).getattr("UUID")).unwrap());
});
}

fn import_intern(b: &mut Bencher<'_>) {
Python::attach(|py| {
b.iter(|| {
black_box(
black_box(&py.import(intern!(py, "uuid")).unwrap()).getattr(intern!(py, "UUID")),
)
.unwrap()
});
});
}

fn import_static(b: &mut Bencher<'_>) {
Python::attach(|py| {
static TYPE: PyOnceLock<Py> = PyOnceLock::new();
b.iter(|| {
black_box(TYPE.import(py, "uuid", "UUID")).unwrap();
});
});
}

fn criterion_benchmark(c: &mut Criterion) {
c.bench_function("import_direct", import_direct);
c.bench_function("import_intern", import_intern);
c.bench_function("import_static", import_static);
}

criterion_group!(benches, criterion_benchmark);
criterion_main!(benches);

</details>

}

fn record_batch_class(py: Python<'_>) -> PyResult<&Bound<'_, PyType>> {
static TYPE: PyOnceLock<Py<PyType>> = PyOnceLock::new();
TYPE.import(py, "pyarrow", "RecordBatch")
}

fn record_batch_reader_class(py: Python<'_>) -> PyResult<&Bound<'_, PyType>> {
static TYPE: PyOnceLock<Py<PyType>> = PyOnceLock::new();
TYPE.import(py, "pyarrow", "RecordBatchReader")
}
fn data_type_class(py: Python<'_>) -> PyResult<&Bound<'_, PyType>> {
static TYPE: PyOnceLock<Py<PyType>> = PyOnceLock::new();
TYPE.import(py, "pyarrow", "DataType")
}

fn field_class(py: Python<'_>) -> PyResult<&Bound<'_, PyType>> {
static TYPE: PyOnceLock<Py<PyType>> = PyOnceLock::new();
TYPE.import(py, "pyarrow", "Field")
}

fn schema_class(py: Python<'_>) -> PyResult<&Bound<'_, PyType>> {
static TYPE: PyOnceLock<Py<PyType>> = PyOnceLock::new();
TYPE.import(py, "pyarrow", "Schema")
}

fn table_class(py: Python<'_>) -> PyResult<&Bound<'_, PyType>> {
static TYPE: PyOnceLock<Py<PyType>> = PyOnceLock::new();
TYPE.import(py, "pyarrow", "Table")
}

/// A newtype wrapper for types implementing [`FromPyArrow`] or [`IntoPyArrow`].
///
/// When wrapped around a type `T: FromPyArrow`, it
Expand Down
Loading