|
75 | 75 | //! `inner`; the encoder/decoder hooks for each kind are added as the |
76 | 76 | //! corresponding Python-side type becomes serializable. |
77 | 77 |
|
78 | | -use std::sync::Arc; |
| 78 | +use std::sync::{Arc, OnceLock}; |
79 | 79 |
|
80 | 80 | use arrow::datatypes::{Field, Schema, SchemaRef}; |
81 | 81 | use arrow::ipc::reader::StreamReader; |
@@ -470,7 +470,7 @@ pub(crate) fn try_decode_python_scalar_udf(buf: &[u8]) -> Result<Option<Arc<Scal |
470 | 470 | /// `from_parts` immediately collapses incoming `Field`s back to |
471 | 471 | /// `DataType`s for the reconstructed `Signature`. |
472 | 472 | fn encode_python_scalar_udf(py: Python<'_>, udf: &PythonFunctionScalarUDF) -> PyResult<Vec<u8>> { |
473 | | - let cloudpickle = py.import("cloudpickle")?; |
| 473 | + let cloudpickle = cloudpickle(py)?; |
474 | 474 |
|
475 | 475 | let signature = udf.signature(); |
476 | 476 | let input_dtypes: Vec<arrow::datatypes::DataType> = match &signature.type_signature { |
@@ -513,7 +513,7 @@ fn encode_python_scalar_udf(py: Python<'_>, udf: &PythonFunctionScalarUDF) -> Py |
513 | 513 |
|
514 | 514 | /// Inverse of [`encode_python_scalar_udf`]. |
515 | 515 | fn decode_python_scalar_udf(py: Python<'_>, payload: &[u8]) -> PyResult<PythonFunctionScalarUDF> { |
516 | | - let cloudpickle = py.import("cloudpickle")?; |
| 516 | + let cloudpickle = cloudpickle(py)?; |
517 | 517 |
|
518 | 518 | let tuple = cloudpickle |
519 | 519 | .call_method1("loads", (PyBytes::new(py, payload),))? |
@@ -589,6 +589,33 @@ fn volatility_wire_str(v: Volatility) -> &'static str { |
589 | 589 | } |
590 | 590 | } |
591 | 591 |
|
| 592 | +/// Cached handle to the `cloudpickle` module. |
| 593 | +/// |
| 594 | +/// Six encode/decode helpers below would otherwise re-resolve the |
| 595 | +/// module on every call. `py.import` is backed by `sys.modules` and |
| 596 | +/// therefore cheap, but each call still walks a dict and re-binds the |
| 597 | +/// result; a plan with many Python UDFs pays that cost per UDF. The |
| 598 | +/// `OnceLock` collapses it to a single import per process while the |
| 599 | +/// `Py<PyAny>` lets us hand out a fresh `Bound` against the current |
| 600 | +/// GIL token without holding one in the static slot. |
| 601 | +fn cloudpickle<'py>(py: Python<'py>) -> PyResult<Bound<'py, PyAny>> { |
| 602 | + static CLOUDPICKLE: OnceLock<Py<PyAny>> = OnceLock::new(); |
| 603 | + if let Some(cached) = CLOUDPICKLE.get() { |
| 604 | + return Ok(cached.bind(py).clone()); |
| 605 | + } |
| 606 | + // Race: two threads can both miss and import. CPython's |
| 607 | + // `sys.modules` makes the second import essentially free, and |
| 608 | + // `set` losing the race still leaves the winning value in the |
| 609 | + // slot — both threads end up returning the same module. |
| 610 | + let module = py.import("cloudpickle")?; |
| 611 | + let _ = CLOUDPICKLE.set(module.clone().unbind().into_any()); |
| 612 | + Ok(CLOUDPICKLE |
| 613 | + .get() |
| 614 | + .expect("cloudpickle slot populated above") |
| 615 | + .bind(py) |
| 616 | + .clone()) |
| 617 | +} |
| 618 | + |
592 | 619 | // ============================================================================= |
593 | 620 | // Shared Python window UDF encode / decode helpers |
594 | 621 | // |
@@ -629,7 +656,7 @@ pub(crate) fn try_decode_python_window_udf(buf: &[u8]) -> Result<Option<Arc<Wind |
629 | 656 | } |
630 | 657 |
|
631 | 658 | fn encode_python_window_udf(py: Python<'_>, udf: &PythonFunctionWindowUDF) -> PyResult<Vec<u8>> { |
632 | | - let cloudpickle = py.import("cloudpickle")?; |
| 659 | + let cloudpickle = cloudpickle(py)?; |
633 | 660 |
|
634 | 661 | let signature = WindowUDFImpl::signature(udf); |
635 | 662 | let input_dtypes: Vec<arrow::datatypes::DataType> = match &signature.type_signature { |
@@ -671,7 +698,7 @@ fn encode_python_window_udf(py: Python<'_>, udf: &PythonFunctionWindowUDF) -> Py |
671 | 698 | } |
672 | 699 |
|
673 | 700 | fn decode_python_window_udf(py: Python<'_>, payload: &[u8]) -> PyResult<PythonFunctionWindowUDF> { |
674 | | - let cloudpickle = py.import("cloudpickle")?; |
| 701 | + let cloudpickle = cloudpickle(py)?; |
675 | 702 |
|
676 | 703 | let tuple = cloudpickle |
677 | 704 | .call_method1("loads", (PyBytes::new(py, payload),))? |
@@ -757,7 +784,7 @@ pub(crate) fn try_decode_python_agg_udf(buf: &[u8]) -> Result<Option<Arc<Aggrega |
757 | 784 | } |
758 | 785 |
|
759 | 786 | fn encode_python_agg_udf(py: Python<'_>, udf: &PythonFunctionAggregateUDF) -> PyResult<Vec<u8>> { |
760 | | - let cloudpickle = py.import("cloudpickle")?; |
| 787 | + let cloudpickle = cloudpickle(py)?; |
761 | 788 |
|
762 | 789 | let signature = AggregateUDFImpl::signature(udf); |
763 | 790 | let input_dtypes: Vec<arrow::datatypes::DataType> = match &signature.type_signature { |
@@ -807,7 +834,7 @@ fn encode_python_agg_udf(py: Python<'_>, udf: &PythonFunctionAggregateUDF) -> Py |
807 | 834 | } |
808 | 835 |
|
809 | 836 | fn decode_python_agg_udf(py: Python<'_>, payload: &[u8]) -> PyResult<PythonFunctionAggregateUDF> { |
810 | | - let cloudpickle = py.import("cloudpickle")?; |
| 837 | + let cloudpickle = cloudpickle(py)?; |
811 | 838 |
|
812 | 839 | let tuple = cloudpickle |
813 | 840 | .call_method1("loads", (PyBytes::new(py, payload),))? |
|
0 commit comments