Skip to content

Commit 0450713

Browse files
timsaucer and claude committed
refactor: drop dead state from PythonFunctionScalarUDF
`input_fields: Vec<Field>` and `volatility: Volatility` were added to the struct so the codec could read them on encode. Both were redundant:

* `Signature` already carries the `Vec<DataType>` (via `TypeSignature::Exact`) and `Volatility` — the constructor collapses the incoming `Vec<Field>` to `DataType`s on its way into the signature, so `Field`-level metadata (nullability, attached metadata) is never propagated anywhere on the local side.
* On decode, `from_parts` runs that same collapse again. Sender's `Signature` and receiver's `Signature` end up with the same `DataType`s and the same `Volatility`.

The reconstructed `PythonFunctionScalarUDF` is functionally equivalent to the original without preserving the input-side `Field`s.

Revert the struct to its original 4-field shape (`name`, `func`, `signature`, `return_field`). The codec now derives the input `DataType`s from `signature.type_signature` and reads volatility from `signature.volatility`. Input fields are still serialized into the cloudpickle payload (with synthesized `arg_i` names) so the wire format is unchanged.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
1 parent c7f95ac commit 0450713

2 files changed

Lines changed: 32 additions & 20 deletions

File tree

crates/core/src/codec.rs

Lines changed: 25 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -85,7 +85,7 @@ use datafusion::datasource::TableProvider;
8585
use datafusion::datasource::file_format::FileFormatFactory;
8686
use datafusion::execution::TaskContext;
8787
use datafusion::logical_expr::{
88-
AggregateUDF, Extension, LogicalPlan, ScalarUDF, ScalarUDFImpl, WindowUDF,
88+
AggregateUDF, Extension, LogicalPlan, ScalarUDF, ScalarUDFImpl, TypeSignature, WindowUDF,
8989
};
9090
use datafusion::physical_expr::PhysicalExpr;
9191
use datafusion::physical_plan::ExecutionPlan;
@@ -357,13 +357,32 @@ pub(crate) fn try_decode_python_scalar_udf(buf: &[u8]) -> Result<Option<Arc<Scal
357357
/// Build the cloudpickle payload for a `PythonFunctionScalarUDF`.
358358
///
359359
/// Layout: `cloudpickle.dumps((name, func, input_schema_bytes,
360-
/// return_field, volatility_str))`. Input fields ride along as an
361-
/// IPC-encoded pyarrow Schema so they round-trip without extra
362-
/// plumbing.
360+
/// return_field, volatility_str))`. Input `DataType`s are derived
361+
/// from the UDF's `Signature` (always `TypeSignature::Exact` for
362+
/// Python-defined UDFs) and packaged as a pyarrow `Schema` with
363+
/// synthesized field names — the local `PythonFunctionScalarUDF`
364+
/// does not retain field-level metadata, and reconstructing one on
365+
/// the receiver via `from_parts` immediately collapses any incoming
366+
/// `Field` info back to `DataType`, so the original sender-side
367+
/// fields and receiver-side fields are functionally equivalent.
363368
fn encode_python_scalar_udf(py: Python<'_>, udf: &PythonFunctionScalarUDF) -> PyResult<Vec<u8>> {
364369
let cloudpickle = py.import("cloudpickle")?;
365370

366-
let input_schema = Schema::new(udf.input_fields().to_vec());
371+
let signature = udf.signature();
372+
let input_dtypes: Vec<arrow::datatypes::DataType> = match &signature.type_signature {
373+
TypeSignature::Exact(types) => types.clone(),
374+
other => {
375+
return Err(pyo3::exceptions::PyValueError::new_err(format!(
376+
"PythonFunctionScalarUDF expected Signature::Exact, got {other:?}"
377+
)));
378+
}
379+
};
380+
let fields: Vec<Field> = input_dtypes
381+
.into_iter()
382+
.enumerate()
383+
.map(|(i, dt)| Field::new(format!("arg_{i}"), dt, true))
384+
.collect();
385+
let input_schema = Schema::new(fields);
367386
let pa_schema_obj = input_schema.to_pyarrow(py)?;
368387
let pa_schema = pa_schema_obj.into_bound();
369388
let schema_bytes: Vec<u8> = pa_schema
@@ -372,7 +391,7 @@ fn encode_python_scalar_udf(py: Python<'_>, udf: &PythonFunctionScalarUDF) -> Py
372391
.extract()?;
373392

374393
let return_field_obj = udf.return_field().as_ref().to_pyarrow(py)?;
375-
let volatility = format!("{:?}", udf.volatility()).to_lowercase();
394+
let volatility = format!("{:?}", signature.volatility).to_lowercase();
376395

377396
let payload = PyTuple::new(
378397
py,

crates/core/src/udf.rs

Lines changed: 7 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -46,10 +46,8 @@ use crate::expr::PyExpr;
4646
pub(crate) struct PythonFunctionScalarUDF {
4747
name: String,
4848
func: Py<PyAny>,
49-
input_fields: Vec<Field>,
50-
return_field: FieldRef,
5149
signature: Signature,
52-
volatility: Volatility,
50+
return_field: FieldRef,
5351
}
5452

5553
impl PythonFunctionScalarUDF {
@@ -65,10 +63,8 @@ impl PythonFunctionScalarUDF {
6563
Self {
6664
name,
6765
func,
68-
input_fields,
69-
return_field: Arc::new(return_field),
7066
signature,
71-
volatility,
67+
return_field: Arc::new(return_field),
7268
}
7369
}
7470

@@ -78,18 +74,15 @@ impl PythonFunctionScalarUDF {
7874
&self.func
7975
}
8076

81-
pub(crate) fn input_fields(&self) -> &[Field] {
82-
&self.input_fields
83-
}
84-
8577
pub(crate) fn return_field(&self) -> &FieldRef {
8678
&self.return_field
8779
}
8880

89-
pub(crate) fn volatility(&self) -> Volatility {
90-
self.volatility
91-
}
92-
81+
/// Reconstruct a `PythonFunctionScalarUDF` from the parts emitted
82+
/// by the codec. `input_fields` here only contributes `data_type`
83+
/// info (collapsed into `Signature::exact`); their names,
84+
/// nullability, and metadata are not retained, so the codec is
85+
/// free to fabricate them from `Vec<DataType>`.
9386
pub(crate) fn from_parts(
9487
name: String,
9588
func: Py<PyAny>,

0 commit comments

Comments
 (0)