Commit 9e15f72

timsaucer and claude committed
feat(codec): split UDF wire-format magic into family prefix + version byte
The wire-format magics `DFPYUDF1` / `DFPYUDA1` / `DFPYUDW1` encoded the
version in the trailing byte, but the decoder did nothing with it: a
payload tagged for a future tuple shape would either skip the prefix
check (and fall through to `inner`) or, if the prefix still matched,
detonate inside `cloudpickle.loads` with an opaque tuple-unpack error.

Split the framing into a 7-byte family prefix plus a 1-byte version:

* Family magics: `DFPYUDF` (scalar), `DFPYUDA` (aggregate), `DFPYUDW`
  (window).
* `WIRE_VERSION_CURRENT: u8 = 1`: version emitted by this build.
* `WIRE_VERSION_MIN_SUPPORTED: u8 = 1`: oldest version this build
  decodes; raise when retiring a payload shape.
* `write_wire_header(buf, family)` and
  `strip_wire_header(buf, family, kind) -> Result<Option<&[u8]>>`
  centralize the framing.

The decoder returns `Ok(None)` when the family prefix is absent (fall
through to `inner`), `Ok(Some(payload))` inside the supported version
range, and an `Execution("Inline Python {kind} payload wire-format
version vN; this build supports vMIN..=vCURRENT. Align
datafusion-python versions on sender and receiver.")` outside it. A
payload that carries the family prefix but ends before the version byte
is rejected with a separate "missing wire-format version byte" error.

Wire format is unreleased, so the rename of the magic constants is
rename-grade, not BC-break-grade.

Six unit tests cover the helper: family absent → fall through;
truncated version byte; too-new version; too-old version (when
applicable); successful round-trip; family mismatch.

Module doc updated with the new framing layout and a version-bump
policy.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
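The framing described above can be tried outside the crate with a std-only sketch. This is not the code from `codec.rs`: the `FrameError` enum, the `frame` helper, and the shortened constant names are assumptions made so the example stands alone, where the commit itself returns `DataFusionError::Execution`.

```rust
// Standalone sketch of `<family: 7 bytes> <version: u8> <payload>` framing.
// Assumption: a small local error enum replaces DataFusionError.

const FAMILY_SCALAR: &[u8] = b"DFPYUDF";
const FAMILY_WINDOW: &[u8] = b"DFPYUDW";
const VERSION_CURRENT: u8 = 1;
const VERSION_MIN_SUPPORTED: u8 = 1;

#[derive(Debug, PartialEq)]
enum FrameError {
    /// The family matched but the buffer ended before the version byte.
    Truncated,
    /// The family matched but the version is outside the supported range.
    UnsupportedVersion(u8),
}

/// Append the family prefix and the current version byte.
/// The caller appends the payload afterwards.
fn write_header(buf: &mut Vec<u8>, family: &[u8]) {
    buf.extend_from_slice(family);
    buf.push(VERSION_CURRENT);
}

/// `Ok(None)`: the buffer does not carry `family`, so the caller falls through.
/// `Ok(Some(payload))`: the family matched at a supported version.
/// `Err(_)`: the family matched but the frame is truncated or out of range.
fn strip_header<'a>(buf: &'a [u8], family: &[u8]) -> Result<Option<&'a [u8]>, FrameError> {
    let Some(rest) = buf.strip_prefix(family) else {
        return Ok(None);
    };
    let Some((&version, payload)) = rest.split_first() else {
        return Err(FrameError::Truncated);
    };
    if !(VERSION_MIN_SUPPORTED..=VERSION_CURRENT).contains(&version) {
        return Err(FrameError::UnsupportedVersion(version));
    }
    Ok(Some(payload))
}

/// Hypothetical convenience helper: build a complete frame around `payload`.
fn frame(family: &[u8], payload: &[u8]) -> Vec<u8> {
    let mut buf = Vec::with_capacity(family.len() + 1 + payload.len());
    write_header(&mut buf, family);
    buf.extend_from_slice(payload);
    buf
}
```

Calling `strip_header(&frame(FAMILY_SCALAR, b"blob"), FAMILY_SCALAR)` yields the payload, while the same buffer checked against `FAMILY_WINDOW` yields `Ok(None)`. That is the fall-through the decoders rely on before delegating to `inner`.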
1 parent 9ef5d0a commit 9e15f72

1 file changed

Lines changed: 178 additions & 52 deletions

crates/core/src/codec.rs

@@ -48,29 +48,36 @@
 //! plans to survive a serialization round-trip. Both codecs share
 //! the same payload framing for that reason.
 //!
-//! Payloads emitted by these codecs are tagged with an 8-byte magic
-//! prefix so the decoder can distinguish them from arbitrary bytes
-//! (empty `fun_definition` from the default codec, user FFI payloads
-//! that picked a non-colliding prefix). Dispatch precedence on
-//! decode: **Python-inline payload (magic prefix match) → `inner`
-//! codec → caller's `FunctionRegistry` fallback.**
+//! Payloads emitted by these codecs are framed as
+//! `<family_magic: 7 bytes> <version: u8> <cloudpickle blob>`. The
+//! family magic identifies the UDF flavor; the version byte lets the
+//! decoder reject too-new or too-old payloads with a clean error
+//! instead of falling into an opaque `cloudpickle` tuple-unpack
+//! failure when the tuple shape changes. Dispatch precedence on
+//! decode: **family match + supported version → `inner` codec →
+//! caller's `FunctionRegistry` fallback.**
 //!
-//! ## Wire-format magic prefix registry
+//! ## Wire-format family registry
 //!
-//! | Layer + kind                  | Magic prefix |
-//! | ----------------------------- | ------------ |
-//! | `PythonLogicalCodec` scalar   | `DFPYUDF1`   |
-//! | `PythonLogicalCodec` agg      | `DFPYUDA1`   |
-//! | `PythonLogicalCodec` window   | `DFPYUDW1`   |
-//! | `PythonPhysicalCodec` scalar  | `DFPYUDF1`   |
-//! | `PythonPhysicalCodec` agg     | `DFPYUDA1`   |
-//! | `PythonPhysicalCodec` window  | `DFPYUDW1`   |
-//! | `PythonPhysicalCodec` expr    | `DFPYPE1`    |
-//! | User FFI extension codec      | user-chosen  |
-//! | Default codec                 | (none)       |
+//! | Layer + kind                  | Family prefix |
+//! | ----------------------------- | ------------- |
+//! | `PythonLogicalCodec` scalar   | `DFPYUDF`     |
+//! | `PythonLogicalCodec` agg      | `DFPYUDA`     |
+//! | `PythonLogicalCodec` window   | `DFPYUDW`     |
+//! | `PythonPhysicalCodec` scalar  | `DFPYUDF`     |
+//! | `PythonPhysicalCodec` agg     | `DFPYUDA`     |
+//! | `PythonPhysicalCodec` window  | `DFPYUDW`     |
+//! | User FFI extension codec      | user-chosen   |
+//! | Default codec                 | (none)        |
 //!
-//! Downstream FFI codecs should pick non-colliding prefixes (use a
-//! `DF` namespace plus a crate-specific suffix). The codec
+//! Current wire-format version is [`WIRE_VERSION_CURRENT`]; supported
+//! receive range is `WIRE_VERSION_MIN_SUPPORTED..=WIRE_VERSION_CURRENT`.
+//! Bump [`WIRE_VERSION_CURRENT`] whenever the cloudpickle tuple shape
+//! changes; raise [`WIRE_VERSION_MIN_SUPPORTED`] when dropping support
+//! for an older shape.
+//!
+//! Downstream FFI codecs should pick non-colliding family prefixes
+//! (use a `DF` namespace plus a crate-specific suffix). The codec
 //! implementations in this module currently delegate every method to
 //! `inner`; the encoder/decoder hooks for each kind are added as the
 //! corresponding Python-side type becomes serializable.
@@ -100,21 +107,76 @@ use crate::udaf::PythonFunctionAggregateUDF;
 use crate::udf::PythonFunctionScalarUDF;
 use crate::udwf::PythonFunctionWindowUDF;
 
-/// Wire-format prefix that tags a `fun_definition` payload as an
-/// inlined Python scalar UDF (cloudpickled tuple of name, callable,
-/// input schema, return field, volatility). Defined once here so
-/// the encoder and decoder cannot drift.
-pub(crate) const PY_SCALAR_UDF_MAGIC: &[u8] = b"DFPYUDF1";
+// Wire-format framing for inlined Python UDF payloads.
+//
+// Layout: `<family_magic: 7 bytes> <version: u8> <cloudpickle blob>`.
+// The family magic identifies the UDF flavor; the version byte lets
+// the decoder reject too-new or too-old payloads with a clean error
+// instead of falling into an opaque `cloudpickle` tuple-unpack failure
+// when the tuple shape changes. Bump [`WIRE_VERSION_CURRENT`] whenever
+// the tuple shape changes; raise [`WIRE_VERSION_MIN_SUPPORTED`] when
+// dropping support for an older shape.
+
+/// Family prefix for an inlined Python scalar UDF
+/// (cloudpickled tuple of name, callable, input schema, return field,
+/// volatility).
+pub(crate) const PY_SCALAR_UDF_FAMILY: &[u8] = b"DFPYUDF";
 
-/// Wire-format prefix for an inlined Python aggregate UDF
+/// Family prefix for an inlined Python aggregate UDF
 /// (cloudpickled tuple of name, accumulator factory, input schema,
 /// return type, state types schema, volatility).
-pub(crate) const PY_AGG_UDF_MAGIC: &[u8] = b"DFPYUDA1";
+pub(crate) const PY_AGG_UDF_FAMILY: &[u8] = b"DFPYUDA";
 
-/// Wire-format prefix for an inlined Python window UDF (cloudpickled
-/// tuple of name, evaluator factory, input schema, return type,
-/// volatility).
-pub(crate) const PY_WINDOW_UDF_MAGIC: &[u8] = b"DFPYUDW1";
+/// Family prefix for an inlined Python window UDF
+/// (cloudpickled tuple of name, evaluator factory, input schema,
+/// return type, volatility).
+pub(crate) const PY_WINDOW_UDF_FAMILY: &[u8] = b"DFPYUDW";
+
+/// Wire-format version this build emits.
+pub(crate) const WIRE_VERSION_CURRENT: u8 = 1;
+
+/// Oldest wire-format version this build still decodes. Bump when
+/// retiring support for an older payload shape.
+pub(crate) const WIRE_VERSION_MIN_SUPPORTED: u8 = 1;
+
+/// Tag `buf` with the framing header for `family` at the current
+/// wire-format version. Append-only — the caller writes the
+/// cloudpickle payload after.
+fn write_wire_header(buf: &mut Vec<u8>, family: &[u8]) {
+    buf.extend_from_slice(family);
+    buf.push(WIRE_VERSION_CURRENT);
+}
+
+/// Inspect the framing on `buf`.
+///
+/// * `Ok(None)` — `buf` does not carry `family`. The caller should
+///   delegate to its `inner` codec.
+/// * `Ok(Some(payload))` — `buf` carries `family` at a version this
+///   build accepts; `payload` is the cloudpickle blob.
+/// * `Err(_)` — `buf` carries `family` but at a version outside
+///   `WIRE_VERSION_MIN_SUPPORTED..=WIRE_VERSION_CURRENT`. The error
+///   names the version and the supported range so an operator can
+///   diagnose sender/receiver version drift instead of seeing an
+///   opaque cloudpickle tuple-unpack failure.
+fn strip_wire_header<'a>(buf: &'a [u8], family: &[u8], kind: &str) -> Result<Option<&'a [u8]>> {
+    if !buf.starts_with(family) {
+        return Ok(None);
+    }
+    let version_idx = family.len();
+    let Some(&version) = buf.get(version_idx) else {
+        return Err(datafusion::error::DataFusionError::Execution(format!(
+            "Truncated inline Python {kind} payload: missing wire-format version byte"
+        )));
+    };
+    if !(WIRE_VERSION_MIN_SUPPORTED..=WIRE_VERSION_CURRENT).contains(&version) {
+        return Err(datafusion::error::DataFusionError::Execution(format!(
+            "Inline Python {kind} payload wire-format version v{version}; \
+             this build supports v{WIRE_VERSION_MIN_SUPPORTED}..=v{WIRE_VERSION_CURRENT}. \
+             Align datafusion-python versions on sender and receiver."
+        )));
+    }
+    Ok(Some(&buf[version_idx + 1..]))
+}
 
 /// `LogicalExtensionCodec` parked on every `SessionContext`. Holds
 /// the Python-aware encoding hooks for logical-layer types
@@ -239,7 +301,7 @@ impl LogicalExtensionCodec for PythonLogicalCodec {
             if let Some(udf) = try_decode_python_scalar_udf(buf)? {
                 return Ok(udf);
             }
-        } else if buf.starts_with(PY_SCALAR_UDF_MAGIC) {
+        } else if buf.starts_with(PY_SCALAR_UDF_FAMILY) {
             return Err(refuse_inline_payload("scalar UDF", name));
         }
         self.inner.try_decode_udf(name, buf)
@@ -257,7 +319,7 @@ impl LogicalExtensionCodec for PythonLogicalCodec {
             if let Some(udaf) = try_decode_python_agg_udf(buf)? {
                 return Ok(udaf);
             }
-        } else if buf.starts_with(PY_AGG_UDF_MAGIC) {
+        } else if buf.starts_with(PY_AGG_UDF_FAMILY) {
             return Err(refuse_inline_payload("aggregate UDF", name));
         }
         self.inner.try_decode_udaf(name, buf)
@@ -275,7 +337,7 @@ impl LogicalExtensionCodec for PythonLogicalCodec {
             if let Some(udwf) = try_decode_python_window_udf(buf)? {
                 return Ok(udwf);
             }
-        } else if buf.starts_with(PY_WINDOW_UDF_MAGIC) {
+        } else if buf.starts_with(PY_WINDOW_UDF_FAMILY) {
             return Err(refuse_inline_payload("window UDF", name));
         }
         self.inner.try_decode_udwf(name, buf)
@@ -309,7 +371,7 @@ fn refuse_inline_payload(kind: &str, name: &str) -> datafusion::error::DataFusio
 /// encoding on this layer too — otherwise a plan with a Python UDF
 /// would round-trip at the logical level but break at the physical
 /// level. Both layers reuse the shared payload framing
-/// ([`PY_SCALAR_UDF_MAGIC`] et al.) so the wire format is identical.
+/// ([`PY_SCALAR_UDF_FAMILY`] et al.) so the wire format is identical.
 #[derive(Debug)]
 pub struct PythonPhysicalCodec {
     inner: Arc<dyn PhysicalExtensionCodec>,
@@ -371,7 +433,7 @@ impl PhysicalExtensionCodec for PythonPhysicalCodec {
             if let Some(udf) = try_decode_python_scalar_udf(buf)? {
                 return Ok(udf);
             }
-        } else if buf.starts_with(PY_SCALAR_UDF_MAGIC) {
+        } else if buf.starts_with(PY_SCALAR_UDF_FAMILY) {
             return Err(refuse_inline_payload("scalar UDF", name));
         }
         self.inner.try_decode_udf(name, buf)
@@ -401,7 +463,7 @@ impl PhysicalExtensionCodec for PythonPhysicalCodec {
             if let Some(udaf) = try_decode_python_agg_udf(buf)? {
                 return Ok(udaf);
             }
-        } else if buf.starts_with(PY_AGG_UDF_MAGIC) {
+        } else if buf.starts_with(PY_AGG_UDF_FAMILY) {
             return Err(refuse_inline_payload("aggregate UDF", name));
         }
         self.inner.try_decode_udaf(name, buf)
@@ -419,7 +481,7 @@ impl PhysicalExtensionCodec for PythonPhysicalCodec {
             if let Some(udwf) = try_decode_python_window_udf(buf)? {
                 return Ok(udwf);
             }
-        } else if buf.starts_with(PY_WINDOW_UDF_MAGIC) {
+        } else if buf.starts_with(PY_WINDOW_UDF_FAMILY) {
             return Err(refuse_inline_payload("window UDF", name));
         }
         self.inner.try_decode_udwf(name, buf)
@@ -452,21 +514,20 @@ pub(crate) fn try_encode_python_scalar_udf(node: &ScalarUDF, buf: &mut Vec<u8>)
     Python::attach(|py| -> Result<bool> {
         let bytes = encode_python_scalar_udf(py, py_udf)
             .map_err(|e| datafusion::error::DataFusionError::External(Box::new(e)))?;
-        buf.extend_from_slice(PY_SCALAR_UDF_MAGIC);
+        write_wire_header(buf, PY_SCALAR_UDF_FAMILY);
         buf.extend_from_slice(&bytes);
         Ok(true)
     })
 }
 
 /// Decode an inline Python scalar UDF payload. Returns `Ok(None)`
-/// when `buf` does not carry the `DFPYUDF1` prefix, signalling the
-/// caller to delegate to its `inner` codec (and eventually the
+/// when `buf` does not carry the `DFPYUDF` family prefix, signalling
+/// the caller to delegate to its `inner` codec (and eventually the
 /// `FunctionRegistry`).
 pub(crate) fn try_decode_python_scalar_udf(buf: &[u8]) -> Result<Option<Arc<ScalarUDF>>> {
-    if !buf.starts_with(PY_SCALAR_UDF_MAGIC) {
+    let Some(payload) = strip_wire_header(buf, PY_SCALAR_UDF_FAMILY, "scalar UDF")? else {
         return Ok(None);
-    }
-    let payload = &buf[PY_SCALAR_UDF_MAGIC.len()..];
+    };
 
     Python::attach(|py| -> Result<Option<Arc<ScalarUDF>>> {
         let udf = decode_python_scalar_udf(py, payload)
@@ -675,17 +736,16 @@ pub(crate) fn try_encode_python_window_udf(node: &WindowUDF, buf: &mut Vec<u8>)
     Python::attach(|py| -> Result<bool> {
         let bytes = encode_python_window_udf(py, py_udf)
             .map_err(|e| datafusion::error::DataFusionError::External(Box::new(e)))?;
-        buf.extend_from_slice(PY_WINDOW_UDF_MAGIC);
+        write_wire_header(buf, PY_WINDOW_UDF_FAMILY);
         buf.extend_from_slice(&bytes);
         Ok(true)
     })
 }
 
 pub(crate) fn try_decode_python_window_udf(buf: &[u8]) -> Result<Option<Arc<WindowUDF>>> {
-    if !buf.starts_with(PY_WINDOW_UDF_MAGIC) {
+    let Some(payload) = strip_wire_header(buf, PY_WINDOW_UDF_FAMILY, "window UDF")? else {
         return Ok(None);
-    }
-    let payload = &buf[PY_WINDOW_UDF_MAGIC.len()..];
+    };
 
     Python::attach(|py| -> Result<Option<Arc<WindowUDF>>> {
         let udf = decode_python_window_udf(py, payload)
@@ -765,17 +825,16 @@ pub(crate) fn try_encode_python_agg_udf(node: &AggregateUDF, buf: &mut Vec<u8>)
     Python::attach(|py| -> Result<bool> {
         let bytes = encode_python_agg_udf(py, py_udf)
             .map_err(|e| datafusion::error::DataFusionError::External(Box::new(e)))?;
-        buf.extend_from_slice(PY_AGG_UDF_MAGIC);
+        write_wire_header(buf, PY_AGG_UDF_FAMILY);
         buf.extend_from_slice(&bytes);
         Ok(true)
     })
 }
 
 pub(crate) fn try_decode_python_agg_udf(buf: &[u8]) -> Result<Option<Arc<AggregateUDF>>> {
-    if !buf.starts_with(PY_AGG_UDF_MAGIC) {
+    let Some(payload) = strip_wire_header(buf, PY_AGG_UDF_FAMILY, "aggregate UDF")? else {
         return Ok(None);
-    }
-    let payload = &buf[PY_AGG_UDF_MAGIC.len()..];
+    };
 
     Python::attach(|py| -> Result<Option<Arc<AggregateUDF>>> {
         let udf = decode_python_agg_udf(py, payload)
@@ -850,3 +909,70 @@ fn decode_python_agg_udf(py: Python<'_>, payload: &[u8]) -> PyResult<PythonFunct
         volatility,
     ))
 }
+
+#[cfg(test)]
+mod wire_header_tests {
+    use super::*;
+
+    #[test]
+    fn strip_returns_none_when_family_absent() {
+        let buf = b"OTHER_PAYLOAD";
+        assert!(matches!(
+            strip_wire_header(buf, PY_SCALAR_UDF_FAMILY, "scalar UDF"),
+            Ok(None)
+        ));
+    }
+
+    #[test]
+    fn strip_errors_on_truncated_version_byte() {
+        let buf = PY_SCALAR_UDF_FAMILY;
+        let err = strip_wire_header(buf, PY_SCALAR_UDF_FAMILY, "scalar UDF").unwrap_err();
+        assert!(format!("{err}").contains("missing wire-format version byte"));
+    }
+
+    #[test]
+    fn strip_errors_on_too_new_version() {
+        let mut buf = PY_SCALAR_UDF_FAMILY.to_vec();
+        buf.push(WIRE_VERSION_CURRENT.saturating_add(1));
+        buf.extend_from_slice(b"payload");
+        let err = strip_wire_header(&buf, PY_SCALAR_UDF_FAMILY, "scalar UDF").unwrap_err();
+        let msg = format!("{err}");
+        assert!(msg.contains("wire-format version v"));
+        assert!(msg.contains("supports"));
+        assert!(msg.contains("Align datafusion-python versions"));
+    }
+
+    #[test]
+    fn strip_errors_on_too_old_version() {
+        if WIRE_VERSION_MIN_SUPPORTED == 0 {
+            return;
+        }
+        let mut buf = PY_SCALAR_UDF_FAMILY.to_vec();
+        buf.push(WIRE_VERSION_MIN_SUPPORTED - 1);
+        buf.extend_from_slice(b"payload");
+        assert!(strip_wire_header(&buf, PY_SCALAR_UDF_FAMILY, "scalar UDF").is_err());
+    }
+
+    #[test]
+    fn write_then_strip_round_trips_payload() {
+        let mut buf = Vec::new();
+        write_wire_header(&mut buf, PY_AGG_UDF_FAMILY);
+        buf.extend_from_slice(b"agg-payload");
+
+        let payload = strip_wire_header(&buf, PY_AGG_UDF_FAMILY, "aggregate UDF")
+            .unwrap()
+            .unwrap();
+        assert_eq!(payload, b"agg-payload");
+    }
+
+    #[test]
+    fn strip_does_not_match_a_different_family() {
+        let mut buf = Vec::new();
+        write_wire_header(&mut buf, PY_SCALAR_UDF_FAMILY);
+        buf.extend_from_slice(b"payload");
+        assert!(matches!(
+            strip_wire_header(&buf, PY_WINDOW_UDF_FAMILY, "window UDF"),
+            Ok(None)
+        ));
+    }
+}
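
The version-bump policy in the module doc (bump `WIRE_VERSION_CURRENT` when the tuple shape changes, raise `WIRE_VERSION_MIN_SUPPORTED` when retiring a shape) implies that a later build may have to decode more than one shape. The sketch below shows one way that could look. Everything in it is hypothetical: the commit ships only version 1, its `strip_wire_header` does not return the version, and the names `Shape`, `strip_header_with_version`, and `select_shape` are invented for illustration.

```rust
// Hypothetical future build: emits v2 and still decodes v1.
const WIRE_VERSION_CURRENT: u8 = 2;
const WIRE_VERSION_MIN_SUPPORTED: u8 = 1;
const FAMILY: &[u8] = b"DFPYUDF";

/// Which tuple shape the payload should be unpacked as (illustrative only).
#[derive(Debug, PartialEq)]
enum Shape {
    V1,
    V2,
}

/// Variant of the header check that also reports the version byte, so the
/// caller can choose a tuple shape. Errors are plain strings in this sketch.
fn strip_header_with_version(buf: &[u8]) -> Result<Option<(u8, &[u8])>, String> {
    let Some(rest) = buf.strip_prefix(FAMILY) else {
        return Ok(None);
    };
    let Some((&version, payload)) = rest.split_first() else {
        return Err("missing wire-format version byte".to_string());
    };
    if !(WIRE_VERSION_MIN_SUPPORTED..=WIRE_VERSION_CURRENT).contains(&version) {
        return Err(format!(
            "wire-format version v{version}; this build supports \
             v{WIRE_VERSION_MIN_SUPPORTED}..=v{WIRE_VERSION_CURRENT}"
        ));
    }
    Ok(Some((version, payload)))
}

/// Pick the decoder shape for a framed buffer; `Ok(None)` means fall through.
fn select_shape(buf: &[u8]) -> Result<Option<Shape>, String> {
    let Some((version, _payload)) = strip_header_with_version(buf)? else {
        return Ok(None);
    };
    match version {
        1 => Ok(Some(Shape::V1)),
        2 => Ok(Some(Shape::V2)),
        // Only reachable if the supported range grows without a new arm here.
        other => Err(format!("no decoder registered for v{other}")),
    }
}
```

Keeping the range check in one helper and the per-version match next to the unpacking code means that retiring v1 would be a one-line change to `WIRE_VERSION_MIN_SUPPORTED` plus removing the `1 =>` arm.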
