1717
1818//! Python-aware extension codecs.
1919//!
20- //! [`PythonLogicalCodec`] wraps a user-supplied (or default)
21- //! [`LogicalExtensionCodec`] and is the codec datafusion-python parks
22- //! on every `SessionContext`. [`PythonPhysicalCodec`] is the symmetric
23- //! wrapper around [`PhysicalExtensionCodec`].
20+ //! Datafusion-python plans can carry references to Python-defined
21+ //! objects that the upstream protobuf codecs do not know how to
22+ //! serialize: pure-Python scalar / aggregate / window UDFs, Python
23+ //! query-planning extensions, and so on. Their state lives inside
24+ //! `Py<PyAny>` callables and closures rather than being recoverable
25+ //! from a name in the receiver's function registry. To ship a plan
26+ //! across a process boundary (pickle, `multiprocessing`, Ray actor,
27+ //! `datafusion-distributed`, etc.) those payloads have to be encoded
28+ //! into the proto wire format itself.
2429//!
25- //! In PR1 both codecs delegate every call to their `inner` codec. The
26- //! types exist so that follow-up work (pickle support, Python scalar
27- //! UDF inline encoding) can add in-band Python payloads without
28- //! re-plumbing the session field.
30+ //! [`PythonLogicalCodec`] is the [`LogicalExtensionCodec`] that
31+ //! datafusion-python parks on every `SessionContext`. It wraps a
32+ //! user-supplied (or default) inner codec and adds Python-aware
33+ //! in-band encoding on top: when the encoder sees a Python-defined
34+ //! UDF, the codec cloudpickles the callable + signature into the
35+ //! `fun_definition` proto field; when the decoder sees a payload it
36+ //! produced, it reconstructs the UDF from the bytes alone — no
37+ //! pre-registration on the receiver. UDFs the codec does not
38+ //! recognise are delegated to `inner`, which is typically
39+ //! `DefaultLogicalExtensionCodec` but may be a downstream-supplied
40+ //! FFI codec installed via
41+ //! `SessionContext.with_logical_extension_codec(...)`.
2942//!
30- //! ## Wire-format magic prefix registry
43+ //! [`PythonPhysicalCodec`] is the symmetric wrapper around
44+ //! [`PhysicalExtensionCodec`]. Logical and physical layers each have
45+ //! a `try_encode_udf` / `try_decode_udf` pair, so a `ScalarUDF`
46+ //! referenced inside a `LogicalPlan`, an `ExecutionPlan`, or a
47+ //! `PhysicalExpr` must encode identically through either layer for
48+ //! plans to survive a serialization round-trip. Both codecs share
49+ //! the same payload framing for that reason.
50+ //!
51+ //! Payloads emitted by these codecs are tagged with an 8-byte magic
52+ //! prefix so the decoder can distinguish them from arbitrary bytes
53+ //! (empty `fun_definition` from the default codec, user FFI payloads
54+ //! that picked a non-colliding prefix). Dispatch precedence on
55+ //! decode: **Python-inline payload (magic prefix match) → `inner`
56+ //! codec → caller's `FunctionRegistry` fallback.**
3157//!
32- //! Future in-band Python payloads will be prefixed with an 8-byte
33- //! magic so the decoder can distinguish them from arbitrary
34- //! `fun_definition` bytes produced by the default codec or a user FFI
35- //! codec.
58+ //! ## Wire-format magic prefix registry
3659//!
37- //! | Layer + kind | Magic prefix | Status |
38- //! | ----------------------------- | ------------ | ------------- |
39- //! | `PythonLogicalCodec` scalar | `DFPYUDF1` | reserved (PR2)|
40- //! | `PythonLogicalCodec` agg | `DFPYUDA1` | reserved |
41- //! | `PythonLogicalCodec` window | `DFPYUDW1` | reserved |
42- //! | `PythonPhysicalCodec` scalar | `DFPYUDF1` | reserved (PR2)|
43- //! | `PythonPhysicalCodec` agg | `DFPYUDA1` | reserved |
44- //! | `PythonPhysicalCodec` window | `DFPYUDW1` | reserved |
45- //! | `PythonPhysicalCodec` expr | `DFPYPE1` | reserved |
46- //! | User FFI extension codec | user-chosen | downstream |
47- //! | Default codec | (none) | upstream |
60+ //! | Layer + kind | Magic prefix |
61+ //! | ----------------------------- | ------------ |
62+ //! | `PythonLogicalCodec` scalar | `DFPYUDF1` |
63+ //! | `PythonLogicalCodec` agg | `DFPYUDA1` |
64+ //! | `PythonLogicalCodec` window | `DFPYUDW1` |
65+ //! | `PythonPhysicalCodec` scalar | `DFPYUDF1` |
66+ //! | `PythonPhysicalCodec` agg | `DFPYUDA1` |
67+ //! | `PythonPhysicalCodec` window | `DFPYUDW1` |
68+ //! | `PythonPhysicalCodec` expr | `DFPYPE1` |
69+ //! | User FFI extension codec | user-chosen |
70+ //! | Default codec | (none) |
4871//!
49- //! Dispatch precedence once in-band payloads land: **Python-inline
50- //! payload (magic prefix match) → `inner` codec → caller's registry
51- //! fallback.** User FFI codecs should pick non-colliding prefixes
52- //! (recommend a `DF` namespace plus a crate-specific suffix).
72+ //! Downstream FFI codecs should pick non-colliding prefixes (use a
73+ //! `DF` namespace plus a crate-specific suffix). The codec
74+ //! implementations in this module currently delegate every method to
75+ //! `inner`; the encoder/decoder hooks for each kind are added as the
76+ //! corresponding Python-side type becomes serializable.
5377
5478use std:: sync:: Arc ;
5579
@@ -62,16 +86,23 @@ use datafusion::physical_plan::ExecutionPlan;
6286use datafusion_proto:: logical_plan:: { DefaultLogicalExtensionCodec , LogicalExtensionCodec } ;
6387use datafusion_proto:: physical_plan:: { DefaultPhysicalExtensionCodec , PhysicalExtensionCodec } ;
6488
65- /// Reserved magic prefix for an inlined Python scalar UDF payload.
66- /// Not produced or consumed by PR1; the constant is reserved here so
67- /// follow-up work has a single definition site.
89+ /// Wire-format prefix that tags a `fun_definition` payload as an
90+ /// inlined Python scalar UDF (cloudpickled tuple of name, callable,
91+ /// input schema, return field, volatility). Defined once here so
92+ /// the encoder and decoder cannot drift.
6893#[ allow( dead_code) ]
6994pub ( crate ) const PY_SCALAR_UDF_MAGIC : & [ u8 ] = b"DFPYUDF1" ;
7095
71- /// `LogicalExtensionCodec` parked on every `SessionContext`. Wraps a
72- /// composable `inner` codec; PR1 delegates every method straight
73- /// through. The wrapper exists so follow-up patches can add Python
74- /// in-band encoding without changing every serializer.
96+ /// `LogicalExtensionCodec` parked on every `SessionContext`. Holds
97+ /// the Python-aware encoding hooks for logical-layer types
98+ /// (`LogicalPlan`, `Expr`) and delegates everything it does not
99+ /// handle to the composable `inner` codec — typically
100+ /// `DefaultLogicalExtensionCodec`, or a downstream FFI codec
101+ /// installed via `SessionContext.with_logical_extension_codec(...)`.
102+ ///
103+ /// Sitting at the top of the session's logical codec stack means
104+ /// every serializer that reads `session.logical_codec()` automatically
105+ /// picks up Python-aware encoding for free.
75106#[ derive( Debug ) ]
76107pub struct PythonLogicalCodec {
77108 inner : Arc < dyn LogicalExtensionCodec > ,
@@ -136,9 +167,18 @@ impl LogicalExtensionCodec for PythonLogicalCodec {
136167 }
137168}
138169
139- /// `PhysicalExtensionCodec` mirror of [`PythonLogicalCodec`]. Same
140- /// motivation: a stable session field that follow-up patches can layer
141- /// Python in-band encoding onto.
170+ /// `PhysicalExtensionCodec` mirror of [`PythonLogicalCodec`] parked
171+ /// on the same `SessionContext`. Carries the Python-aware encoding
172+ /// hooks for physical-layer types (`ExecutionPlan`, `PhysicalExpr`)
173+ /// and delegates the rest to `inner`.
174+ ///
175+ /// The `PhysicalExtensionCodec` trait has its own `try_encode_udf`
176+ /// / `try_decode_udf` pair distinct from the logical one, so a
177+ /// `ScalarUDF` referenced inside a physical plan needs Python-aware
178+ /// encoding on this layer too — otherwise a plan with a Python UDF
179+ /// would round-trip at the logical level but break at the physical
180+ /// level. Both layers reuse the shared payload framing
181+ /// ([`PY_SCALAR_UDF_MAGIC`] et al.) so the wire format is identical.
142182#[ derive( Debug ) ]
143183pub struct PythonPhysicalCodec {
144184 inner : Arc < dyn PhysicalExtensionCodec > ,
0 commit comments