Skip to content

Commit 977e88c

Browse files
timsaucerclaude
andcommitted
feat: per-session toggle for Python UDF inline encoding
Adds `SessionContext.with_python_udf_inlining(enabled)` for two related use cases: * **Cross-language portability.** With inlining disabled, the codec no longer emits `DFPYUDF1` / `DFPYUDA1` / `DFPYUDW1` cloudpickle blobs. Python UDFs travel by name only, the same way FFI-capsule UDFs do. Bytes round-trip through a non-Python decoder. * **Untrusted-source decode.** `Expr.from_bytes` on bytes from a misbehaving sender no longer invokes `cloudpickle.loads`. Inline payloads received by a strict decoder raise a clear error. `PythonLogicalCodec` and `PythonPhysicalCodec` gain a `python_udf_inlining: bool` field (default `true`) and a builder method `with_python_udf_inlining(enabled)`. The six UDF encode/decode dispatchers consult the flag before calling the inline helpers. Strict decoders that see a magic-prefix payload return a clear `Plan` error rather than silently falling through to the inner codec (which would otherwise produce "LogicalExtensionCodec is not provided" — accurate but unhelpful). `PySessionContext::with_python_udf_inlining(enabled)` rebuilds both codecs with the new setting; Python wrapper at `SessionContext.with_python_udf_inlining` mirrors it. Test coverage: encoder size delta, strict roundtrip via registry, clear-error-on-inline-payload-when-strict. `pickle.loads` on untrusted bytes remains unsafe regardless of this flag; the toggle only governs the `to_bytes` / `from_bytes` codec path. User guide documents both use cases plus the limitation. 1097 root tests pass (up from 1094 with 3 new strict-mode cases). Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
1 parent 44df444 commit 977e88c

5 files changed

Lines changed: 226 additions & 22 deletions

File tree

crates/core/src/codec.rs

Lines changed: 88 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -128,16 +128,34 @@ pub(crate) const PY_WINDOW_UDF_MAGIC: &[u8] = b"DFPYUDW1";
128128
#[derive(Debug)]
129129
pub struct PythonLogicalCodec {
130130
inner: Arc<dyn LogicalExtensionCodec>,
131+
python_udf_inlining: bool,
131132
}
132133

133134
impl PythonLogicalCodec {
134135
pub fn new(inner: Arc<dyn LogicalExtensionCodec>) -> Self {
135-
Self { inner }
136+
Self {
137+
inner,
138+
python_udf_inlining: true,
139+
}
136140
}
137141

138142
pub fn inner(&self) -> &Arc<dyn LogicalExtensionCodec> {
139143
&self.inner
140144
}
145+
146+
/// Whether Python-defined UDFs are encoded inline (and decoded
147+
/// from cloudpickle blobs). Defaults to `true`. Set to `false`
148+
/// when the codec sits on a session that must produce
149+
/// cross-language wire bytes, or reject `cloudpickle.loads` on
150+
/// untrusted `from_bytes` input.
151+
pub fn with_python_udf_inlining(mut self, enabled: bool) -> Self {
152+
self.python_udf_inlining = enabled;
153+
self
154+
}
155+
156+
/// Returns the current value of this codec's Python UDF inlining
/// flag (`true` after `new`, or whatever was last passed to
/// `with_python_udf_inlining`).
pub fn python_udf_inlining(&self) -> bool {
157+
self.python_udf_inlining
158+
}
141159
}
142160

143161
impl Default for PythonLogicalCodec {
@@ -197,48 +215,72 @@ impl LogicalExtensionCodec for PythonLogicalCodec {
197215
}
198216

199217
fn try_encode_udf(&self, node: &ScalarUDF, buf: &mut Vec<u8>) -> Result<()> {
200-
if try_encode_python_scalar_udf(node, buf)? {
218+
if self.python_udf_inlining && try_encode_python_scalar_udf(node, buf)? {
201219
return Ok(());
202220
}
203221
self.inner.try_encode_udf(node, buf)
204222
}
205223

206224
fn try_decode_udf(&self, name: &str, buf: &[u8]) -> Result<Arc<ScalarUDF>> {
207-
if let Some(udf) = try_decode_python_scalar_udf(buf)? {
208-
return Ok(udf);
225+
if self.python_udf_inlining {
226+
if let Some(udf) = try_decode_python_scalar_udf(buf)? {
227+
return Ok(udf);
228+
}
229+
} else if buf.starts_with(PY_SCALAR_UDF_MAGIC) {
230+
return Err(refuse_inline_payload("scalar UDF", name));
209231
}
210232
self.inner.try_decode_udf(name, buf)
211233
}
212234

213235
fn try_encode_udaf(&self, node: &AggregateUDF, buf: &mut Vec<u8>) -> Result<()> {
214-
if try_encode_python_agg_udf(node, buf)? {
236+
if self.python_udf_inlining && try_encode_python_agg_udf(node, buf)? {
215237
return Ok(());
216238
}
217239
self.inner.try_encode_udaf(node, buf)
218240
}
219241

220242
fn try_decode_udaf(&self, name: &str, buf: &[u8]) -> Result<Arc<AggregateUDF>> {
221-
if let Some(udaf) = try_decode_python_agg_udf(buf)? {
222-
return Ok(udaf);
243+
if self.python_udf_inlining {
244+
if let Some(udaf) = try_decode_python_agg_udf(buf)? {
245+
return Ok(udaf);
246+
}
247+
} else if buf.starts_with(PY_AGG_UDF_MAGIC) {
248+
return Err(refuse_inline_payload("aggregate UDF", name));
223249
}
224250
self.inner.try_decode_udaf(name, buf)
225251
}
226252

227253
fn try_encode_udwf(&self, node: &WindowUDF, buf: &mut Vec<u8>) -> Result<()> {
228-
if try_encode_python_window_udf(node, buf)? {
254+
if self.python_udf_inlining && try_encode_python_window_udf(node, buf)? {
229255
return Ok(());
230256
}
231257
self.inner.try_encode_udwf(node, buf)
232258
}
233259

234260
fn try_decode_udwf(&self, name: &str, buf: &[u8]) -> Result<Arc<WindowUDF>> {
235-
if let Some(udwf) = try_decode_python_window_udf(buf)? {
236-
return Ok(udwf);
261+
if self.python_udf_inlining {
262+
if let Some(udwf) = try_decode_python_window_udf(buf)? {
263+
return Ok(udwf);
264+
}
265+
} else if buf.starts_with(PY_WINDOW_UDF_MAGIC) {
266+
return Err(refuse_inline_payload("window UDF", name));
237267
}
238268
self.inner.try_decode_udwf(name, buf)
239269
}
240270
}
241271

272+
/// Build the error returned by a strict codec when it receives an
273+
/// inline Python-UDF payload it has been told not to deserialize.
274+
fn refuse_inline_payload(kind: &str, name: &str) -> datafusion::error::DataFusionError {
275+
datafusion::error::DataFusionError::Plan(format!(
276+
"Refusing to deserialize inline Python {kind} '{name}': Python UDF \
277+
inlining is disabled on this session. Re-encode the bytes with \
278+
inlining enabled, or register '{name}' on the sender's session \
279+
before encode (and on the receiver before decode) so the UDF \
280+
travels by name."
281+
))
282+
}
283+
242284
/// `PhysicalExtensionCodec` mirror of [`PythonLogicalCodec`] parked
243285
/// on the same `SessionContext`. Carries the Python-aware encoding
244286
/// hooks for physical-layer types (`ExecutionPlan`, `PhysicalExpr`)
@@ -254,16 +296,30 @@ impl LogicalExtensionCodec for PythonLogicalCodec {
254296
#[derive(Debug)]
255297
pub struct PythonPhysicalCodec {
256298
inner: Arc<dyn PhysicalExtensionCodec>,
299+
python_udf_inlining: bool,
257300
}
258301

259302
impl PythonPhysicalCodec {
260303
pub fn new(inner: Arc<dyn PhysicalExtensionCodec>) -> Self {
261-
Self { inner }
304+
Self {
305+
inner,
306+
python_udf_inlining: true,
307+
}
262308
}
263309

264310
pub fn inner(&self) -> &Arc<dyn PhysicalExtensionCodec> {
265311
&self.inner
266312
}
313+
314+
/// See [`PythonLogicalCodec::with_python_udf_inlining`].
315+
pub fn with_python_udf_inlining(mut self, enabled: bool) -> Self {
316+
self.python_udf_inlining = enabled;
317+
self
318+
}
319+
320+
/// Returns the current value of this codec's Python UDF inlining
/// flag (`true` after `new`, or whatever was last passed to
/// `with_python_udf_inlining`).
pub fn python_udf_inlining(&self) -> bool {
321+
self.python_udf_inlining
322+
}
267323
}
268324

269325
impl Default for PythonPhysicalCodec {
@@ -287,15 +343,19 @@ impl PhysicalExtensionCodec for PythonPhysicalCodec {
287343
}
288344

289345
fn try_encode_udf(&self, node: &ScalarUDF, buf: &mut Vec<u8>) -> Result<()> {
290-
if try_encode_python_scalar_udf(node, buf)? {
346+
if self.python_udf_inlining && try_encode_python_scalar_udf(node, buf)? {
291347
return Ok(());
292348
}
293349
self.inner.try_encode_udf(node, buf)
294350
}
295351

296352
fn try_decode_udf(&self, name: &str, buf: &[u8]) -> Result<Arc<ScalarUDF>> {
297-
if let Some(udf) = try_decode_python_scalar_udf(buf)? {
298-
return Ok(udf);
353+
if self.python_udf_inlining {
354+
if let Some(udf) = try_decode_python_scalar_udf(buf)? {
355+
return Ok(udf);
356+
}
357+
} else if buf.starts_with(PY_SCALAR_UDF_MAGIC) {
358+
return Err(refuse_inline_payload("scalar UDF", name));
299359
}
300360
self.inner.try_decode_udf(name, buf)
301361
}
@@ -313,29 +373,37 @@ impl PhysicalExtensionCodec for PythonPhysicalCodec {
313373
}
314374

315375
fn try_encode_udaf(&self, node: &AggregateUDF, buf: &mut Vec<u8>) -> Result<()> {
316-
if try_encode_python_agg_udf(node, buf)? {
376+
if self.python_udf_inlining && try_encode_python_agg_udf(node, buf)? {
317377
return Ok(());
318378
}
319379
self.inner.try_encode_udaf(node, buf)
320380
}
321381

322382
fn try_decode_udaf(&self, name: &str, buf: &[u8]) -> Result<Arc<AggregateUDF>> {
323-
if let Some(udaf) = try_decode_python_agg_udf(buf)? {
324-
return Ok(udaf);
383+
if self.python_udf_inlining {
384+
if let Some(udaf) = try_decode_python_agg_udf(buf)? {
385+
return Ok(udaf);
386+
}
387+
} else if buf.starts_with(PY_AGG_UDF_MAGIC) {
388+
return Err(refuse_inline_payload("aggregate UDF", name));
325389
}
326390
self.inner.try_decode_udaf(name, buf)
327391
}
328392

329393
fn try_encode_udwf(&self, node: &WindowUDF, buf: &mut Vec<u8>) -> Result<()> {
330-
if try_encode_python_window_udf(node, buf)? {
394+
if self.python_udf_inlining && try_encode_python_window_udf(node, buf)? {
331395
return Ok(());
332396
}
333397
self.inner.try_encode_udwf(node, buf)
334398
}
335399

336400
fn try_decode_udwf(&self, name: &str, buf: &[u8]) -> Result<Arc<WindowUDF>> {
337-
if let Some(udwf) = try_decode_python_window_udf(buf)? {
338-
return Ok(udwf);
401+
if self.python_udf_inlining {
402+
if let Some(udwf) = try_decode_python_window_udf(buf)? {
403+
return Ok(udwf);
404+
}
405+
} else if buf.starts_with(PY_WINDOW_UDF_MAGIC) {
406+
return Err(refuse_inline_payload("window UDF", name));
339407
}
340408
self.inner.try_decode_udwf(name, buf)
341409
}

crates/core/src/context.rs

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1407,6 +1407,28 @@ impl PySessionContext {
14071407
physical_codec,
14081408
})
14091409
}
1410+
1411+
/// Toggle inline encoding of Python-defined UDFs on this session's
1412+
/// codec stack. Disable when producing bytes that must round-trip
1413+
/// through a non-Python decoder, or when reconstructing bytes from
1414+
/// an untrusted source via `Expr.from_bytes` (cloudpickle.loads
1415+
/// will not be invoked on the receiver). Pickle remains unsafe on
1416+
/// untrusted input regardless of this flag.
1417+
pub fn with_python_udf_inlining(&self, enabled: bool) -> Self {
1418+
let logical_codec = Arc::new(
1419+
PythonLogicalCodec::new(Arc::clone(self.logical_codec.inner()))
1420+
.with_python_udf_inlining(enabled),
1421+
);
1422+
let physical_codec = Arc::new(
1423+
PythonPhysicalCodec::new(Arc::clone(self.physical_codec.inner()))
1424+
.with_python_udf_inlining(enabled),
1425+
);
1426+
Self {
1427+
ctx: Arc::clone(&self.ctx),
1428+
logical_codec,
1429+
physical_codec,
1430+
}
1431+
}
14101432
}
14111433

14121434
impl PySessionContext {

docs/source/user-guide/io/distributing_work.rst

Lines changed: 35 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -173,6 +173,37 @@ Practical considerations
173173
the captured state is large, mutable, or not portable to the
174174
worker's environment.
175175

176+
Disabling Python UDF inlining
177+
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
178+
179+
For a stricter wire format, call
180+
:py:meth:`SessionContext.with_python_udf_inlining(False)
181+
<datafusion.SessionContext.with_python_udf_inlining>` on the session
182+
producing or consuming the bytes. With inlining disabled, Python
183+
UDFs travel by name only — the same way FFI-capsule UDFs do — and
184+
the receiver must have a matching registration.
185+
186+
Two use cases:
187+
188+
* **Cross-language portability.** A non-Python decoder cannot
189+
reconstruct a cloudpickled payload. Senders aimed at Java, C++,
190+
or another Rust binary disable inlining and rely on the receiver
191+
having compatible UDF registrations.
192+
* **Untrusted-source decode.** With inlining disabled,
193+
:py:meth:`Expr.from_bytes` never calls ``cloudpickle.loads`` on
194+
the incoming bytes — an inline payload from a misbehaving sender
195+
raises a clear error instead of executing arbitrary Python code.
196+
197+
Mismatched configurations raise a descriptive error: an inline blob
198+
fed to a strict receiver fails fast rather than silently dropping
199+
into ``cloudpickle.loads``.
200+
201+
Note that :py:func:`pickle.loads` itself remains unsafe on untrusted
202+
input regardless of this setting — an attacker producing the outer
203+
pickle envelope can execute arbitrary code before the codec ever
204+
sees the bytes. The toggle only protects the
205+
:py:meth:`Expr.from_bytes` API surface.
206+
176207
Security
177208
~~~~~~~~
178209

@@ -182,8 +213,10 @@ Security
182213
arbitrary Python code on the receiver — pickle is doing the work
183214
under the hood and pickle is unsafe on untrusted input. Only
184215
accept expressions from trusted sources. For untrusted-source
185-
workflows, restrict senders to built-in functions and
186-
pre-registered Rust-side UDFs.
216+
workflows, disable Python UDF inlining (see above), restrict
217+
senders to built-in functions and pre-registered Rust-side UDFs,
218+
and avoid :py:func:`pickle.loads` on externally supplied bytes
219+
entirely.
187220

188221
Query-level distribution via datafusion-distributed
189222
---------------------------------------------------

python/datafusion/context.py

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1769,3 +1769,28 @@ def with_physical_extension_codec(self, codec: Any) -> SessionContext:
17691769
new = SessionContext.__new__(SessionContext)
17701770
new.ctx = new_internal
17711771
return new
1772+
1773+
def with_python_udf_inlining(self, enabled: bool) -> SessionContext:
    """Return a session with inline Python UDF encoding switched on or off.

    With ``enabled=True`` (the default behaviour), Python scalar,
    aggregate, and window UDFs are carried inside the serialized
    expression and rebuilt on the receiver without pre-registration.

    Pass ``False`` to:

    * Emit serialized bytes that can round-trip through a non-Python
      decoder (cross-language portability). UDFs are stored by name
      only, so the receiver must have matching registrations.
    * Refuse to rebuild Python UDFs from :meth:`Expr.from_bytes`
      input that may originate from an untrusted source —
      ``cloudpickle.loads`` will not be invoked.

    ``pickle.loads`` on untrusted bytes stays unsafe whatever this
    setting is; only the ``to_bytes`` / ``from_bytes`` API is
    affected.
    """
    toggled_internal = self.ctx.with_python_udf_inlining(enabled)
    # Wrap the new internal context without running ``__init__``.
    wrapper = SessionContext.__new__(SessionContext)
    wrapper.ctx = toggled_internal
    return wrapper

python/tests/test_pickle_expr.py

Lines changed: 56 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -229,6 +229,62 @@ def test_window_udf_decodes_via_pickle_with_no_worker_ctx(self):
229229
assert "count_up" in decoded.canonical_name()
230230

231231

232+
class TestPythonUdfInliningToggle:
    """Covers ``SessionContext.with_python_udf_inlining(False)``: the
    opt-out from inline Python UDF encoding on the encode and decode
    paths."""

    def _build_double_udf(self):
        # Immutable int64 -> int64 scalar UDF named "double"; nulls are
        # treated as 0 before doubling.
        return udf(
            lambda arr: pa.array([(v.as_py() or 0) * 2 for v in arr]),
            [pa.int64()],
            pa.int64(),
            volatility="immutable",
            name="double",
        )

    def test_strict_encoder_emits_smaller_blob(self):
        """A strict session does not cloudpickle the Python callable,
        so its encoded bytes come out far smaller than the inline
        encoding of the same expression."""
        inline_ctx = SessionContext()
        strict_ctx = inline_ctx.with_python_udf_inlining(False)
        double = self._build_double_udf()
        expr = double(col("a"))

        inline_size = len(expr.to_bytes(inline_ctx))
        strict_size = len(expr.to_bytes(strict_ctx))

        assert strict_size < inline_size // 4

    def test_strict_roundtrip_via_registry(self):
        """With inlining disabled on both ends, the UDF is sent by name
        only and the receiver looks it up among its registered
        functions."""
        from datafusion import Expr

        sender = SessionContext().with_python_udf_inlining(False)
        double = self._build_double_udf()
        payload = double(col("a")).to_bytes(sender)

        receiver = SessionContext().with_python_udf_inlining(False)
        receiver.register_udf(double)
        decoded = Expr.from_bytes(payload, ctx=receiver)
        assert "double" in decoded.canonical_name()

    def test_strict_decoder_refuses_inline_payload(self):
        """Feeding an inline-encoded blob to a strict receiver raises a
        clear error instead of silently calling cloudpickle.loads."""
        from datafusion import Expr

        inline_sender = SessionContext()
        double = self._build_double_udf()
        payload = double(col("a")).to_bytes(inline_sender)

        receiver = SessionContext().with_python_udf_inlining(False)
        receiver.register_udf(double)
        with pytest.raises(Exception, match="inlining is disabled"):
            Expr.from_bytes(payload, ctx=receiver)
286+
287+
232288
class TestWorkerCtxLifecycle:
233289
def test_set_and_clear(self):
234290
assert get_worker_ctx() is None

0 commit comments

Comments
 (0)