Commit f6be2fb

timsaucer and claude committed
feat: per-session Python UDF inlining toggle + sender ctx + strict refusal
Adds a per-session toggle that turns inline Python UDF encoding on or off, plus the supporting plumbing to make it usable through pickle.dumps.

Codec layer:
* PythonLogicalCodec / PythonPhysicalCodec gain a python_udf_inlining bool (default true) and a with_python_udf_inlining(enabled) builder. Each try_encode_ud{f,af,wf} delegates straight to inner when the toggle is off; each try_decode_ud{f,af,wf} that recognizes a DFPY* magic prefix on a strict codec returns a clean Execution error instead of invoking cloudpickle.loads. The refusal message names the UDF and the wire family so an operator can see at a glance whether to re-encode the bytes or register the UDF on the receiver.

Session layer:
* PySessionContext::with_python_udf_inlining(enabled) returns a new session whose stacked logical + physical codecs both carry the toggle. The Arc<SessionState> is cloned (cheap) and only the codec pair is rebuilt, so registrations and config stay attached.
* SessionContext.with_python_udf_inlining(*, enabled) is the Python wrapper. enabled is keyword-only because positional booleans at the call site read as opaque.

Sender-side context:
* datafusion.ipc gains set_sender_ctx / get_sender_ctx / clear_sender_ctx thread-locals. Expr.__reduce__ now consults get_sender_ctx() to pick the codec for outbound pickles, which is the only path through which a strict session affects pickle.dumps (the protocol calls __reduce__ with no arguments). Without a sender context the default codec is used.

Tests:
* test_pickle_expr.py picks up TestPythonUdfInliningToggle (covers both directions of the toggle plus the explicit-ctx fast path), TestWorkerCtxLifecycle (set/clear/threading), and TestSenderCtxLifecycle.
* New test_pickle_multiprocessing.py + helpers exercise the full driver -> worker round-trip on a multiprocessing.Pool with set_*_ctx installed in the worker initializer.
* CI workflow gets a 30-minute timeout-minutes backstop so a hung pickle worker can't block the matrix indefinitely.

User-guide docs and the runnable examples land in PR4 of this series.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
1 parent dac9ec6 commit f6be2fb

9 files changed

Lines changed: 704 additions & 27 deletions

.github/workflows/test.yml

Lines changed: 3 additions & 0 deletions
@@ -29,6 +29,9 @@ env:
 jobs:
   test-matrix:
     runs-on: ubuntu-latest
+    # Backstop: a hung multiprocessing worker (e.g. during a pickle regression)
+    # should not block CI longer than this.
+    timeout-minutes: 30
     strategy:
       fail-fast: false
       matrix:

crates/core/src/codec.rs

Lines changed: 102 additions & 20 deletions
@@ -232,16 +232,44 @@ fn strip_wire_header<'a>(
 #[derive(Debug)]
 pub struct PythonLogicalCodec {
     inner: Arc<dyn LogicalExtensionCodec>,
+    python_udf_inlining: bool,
 }

 impl PythonLogicalCodec {
     pub fn new(inner: Arc<dyn LogicalExtensionCodec>) -> Self {
-        Self { inner }
+        Self {
+            inner,
+            python_udf_inlining: true,
+        }
     }

     pub fn inner(&self) -> &Arc<dyn LogicalExtensionCodec> {
         &self.inner
     }
+
+    /// Whether Python-defined UDFs are encoded inline (and decoded
+    /// from cloudpickle blobs). Defaults to `true`. Set to `false`
+    /// when the codec sits on a session that must produce
+    /// cross-language wire bytes, or reject `cloudpickle.loads` on
+    /// untrusted `from_bytes` input.
+    ///
+    /// Security scope: strict mode (`false`) narrows only the codec
+    /// layer — it stops `Expr::from_bytes` from invoking
+    /// `cloudpickle.loads` on the inline `DFPY*` payload. It does
+    /// **not** make `pickle.loads(untrusted_bytes)` safe; treat every
+    /// `pickle.loads` on untrusted input as unsafe regardless of this
+    /// setting. See Python's [pickle module security warning][1] for
+    /// why `pickle.loads` is unsafe in general.
+    ///
+    /// [1]: https://docs.python.org/3/library/pickle.html#module-pickle
+    pub fn with_python_udf_inlining(mut self, enabled: bool) -> Self {
+        self.python_udf_inlining = enabled;
+        self
+    }
+
+    pub fn python_udf_inlining(&self) -> bool {
+        self.python_udf_inlining
+    }
 }

 impl Default for PythonLogicalCodec {
@@ -301,48 +329,76 @@ impl LogicalExtensionCodec for PythonLogicalCodec {
     }

     fn try_encode_udf(&self, node: &ScalarUDF, buf: &mut Vec<u8>) -> Result<()> {
-        if try_encode_python_scalar_udf(node, buf)? {
+        if self.python_udf_inlining && try_encode_python_scalar_udf(node, buf)? {
             return Ok(());
         }
         self.inner.try_encode_udf(node, buf)
     }

     fn try_decode_udf(&self, name: &str, buf: &[u8]) -> Result<Arc<ScalarUDF>> {
-        if let Some(udf) = try_decode_python_scalar_udf(buf)? {
-            return Ok(udf);
+        if self.python_udf_inlining {
+            if let Some(udf) = try_decode_python_scalar_udf(buf)? {
+                return Ok(udf);
+            }
+        } else if buf.starts_with(PY_SCALAR_UDF_FAMILY) {
+            return Err(refuse_inline_payload("scalar UDF", name));
         }
         self.inner.try_decode_udf(name, buf)
     }

     fn try_encode_udaf(&self, node: &AggregateUDF, buf: &mut Vec<u8>) -> Result<()> {
-        if try_encode_python_udaf(node, buf)? {
+        if self.python_udf_inlining && try_encode_python_udaf(node, buf)? {
             return Ok(());
         }
         self.inner.try_encode_udaf(node, buf)
     }

     fn try_decode_udaf(&self, name: &str, buf: &[u8]) -> Result<Arc<AggregateUDF>> {
-        if let Some(udaf) = try_decode_python_udaf(buf)? {
-            return Ok(udaf);
+        if self.python_udf_inlining {
+            if let Some(udaf) = try_decode_python_udaf(buf)? {
+                return Ok(udaf);
+            }
+        } else if buf.starts_with(PY_AGG_UDF_FAMILY) {
+            return Err(refuse_inline_payload("aggregate UDF", name));
         }
         self.inner.try_decode_udaf(name, buf)
     }

     fn try_encode_udwf(&self, node: &WindowUDF, buf: &mut Vec<u8>) -> Result<()> {
-        if try_encode_python_udwf(node, buf)? {
+        if self.python_udf_inlining && try_encode_python_udwf(node, buf)? {
             return Ok(());
         }
         self.inner.try_encode_udwf(node, buf)
     }

     fn try_decode_udwf(&self, name: &str, buf: &[u8]) -> Result<Arc<WindowUDF>> {
-        if let Some(udwf) = try_decode_python_udwf(buf)? {
-            return Ok(udwf);
+        if self.python_udf_inlining {
+            if let Some(udwf) = try_decode_python_udwf(buf)? {
+                return Ok(udwf);
+            }
+        } else if buf.starts_with(PY_WINDOW_UDF_FAMILY) {
+            return Err(refuse_inline_payload("window UDF", name));
         }
         self.inner.try_decode_udwf(name, buf)
     }
 }

+/// Build the error returned by a strict codec when it receives an
+/// inline Python-UDF payload it has been told not to deserialize.
+fn refuse_inline_payload(kind: &str, name: &str) -> datafusion::error::DataFusionError {
+    // `Execution`, not `Plan`: this is a wire-format decode refusal at
+    // codec time, not a planner-stage failure. Downstream error
+    // classification keys off the variant — surfacing this as a planner
+    // error would mis-route it into "fix your SQL" buckets.
+    datafusion::error::DataFusionError::Execution(format!(
+        "Refusing to deserialize inline Python {kind} '{name}': Python UDF \
+         inlining is disabled on this session. Ask the sender to re-encode \
+         with inlining disabled (so the UDF travels by name), or register \
+         '{name}' on this receiver's session and enable inlining on both \
+         sides — receivers cannot re-encode bytes they did not produce."
+    ))
+}
+
 /// `PhysicalExtensionCodec` mirror of [`PythonLogicalCodec`] parked
 /// on the same `SessionContext`. Carries the Python-aware encoding
 /// hooks for physical-layer types (`ExecutionPlan`, `PhysicalExpr`)
@@ -358,16 +414,30 @@ impl LogicalExtensionCodec for PythonLogicalCodec {
 #[derive(Debug)]
 pub struct PythonPhysicalCodec {
     inner: Arc<dyn PhysicalExtensionCodec>,
+    python_udf_inlining: bool,
 }

 impl PythonPhysicalCodec {
     pub fn new(inner: Arc<dyn PhysicalExtensionCodec>) -> Self {
-        Self { inner }
+        Self {
+            inner,
+            python_udf_inlining: true,
+        }
     }

     pub fn inner(&self) -> &Arc<dyn PhysicalExtensionCodec> {
         &self.inner
     }
+
+    /// See [`PythonLogicalCodec::with_python_udf_inlining`].
+    pub fn with_python_udf_inlining(mut self, enabled: bool) -> Self {
+        self.python_udf_inlining = enabled;
+        self
+    }
+
+    pub fn python_udf_inlining(&self) -> bool {
+        self.python_udf_inlining
+    }
 }

 impl Default for PythonPhysicalCodec {
@@ -391,15 +461,19 @@ impl PhysicalExtensionCodec for PythonPhysicalCodec {
     }

     fn try_encode_udf(&self, node: &ScalarUDF, buf: &mut Vec<u8>) -> Result<()> {
-        if try_encode_python_scalar_udf(node, buf)? {
+        if self.python_udf_inlining && try_encode_python_scalar_udf(node, buf)? {
             return Ok(());
         }
         self.inner.try_encode_udf(node, buf)
     }

     fn try_decode_udf(&self, name: &str, buf: &[u8]) -> Result<Arc<ScalarUDF>> {
-        if let Some(udf) = try_decode_python_scalar_udf(buf)? {
-            return Ok(udf);
+        if self.python_udf_inlining {
+            if let Some(udf) = try_decode_python_scalar_udf(buf)? {
+                return Ok(udf);
+            }
+        } else if buf.starts_with(PY_SCALAR_UDF_FAMILY) {
+            return Err(refuse_inline_payload("scalar UDF", name));
         }
         self.inner.try_decode_udf(name, buf)
     }
@@ -417,29 +491,37 @@ impl PhysicalExtensionCodec for PythonPhysicalCodec {
     }

     fn try_encode_udaf(&self, node: &AggregateUDF, buf: &mut Vec<u8>) -> Result<()> {
-        if try_encode_python_udaf(node, buf)? {
+        if self.python_udf_inlining && try_encode_python_udaf(node, buf)? {
             return Ok(());
         }
         self.inner.try_encode_udaf(node, buf)
     }

     fn try_decode_udaf(&self, name: &str, buf: &[u8]) -> Result<Arc<AggregateUDF>> {
-        if let Some(udaf) = try_decode_python_udaf(buf)? {
-            return Ok(udaf);
+        if self.python_udf_inlining {
+            if let Some(udaf) = try_decode_python_udaf(buf)? {
+                return Ok(udaf);
+            }
+        } else if buf.starts_with(PY_AGG_UDF_FAMILY) {
+            return Err(refuse_inline_payload("aggregate UDF", name));
         }
         self.inner.try_decode_udaf(name, buf)
     }

     fn try_encode_udwf(&self, node: &WindowUDF, buf: &mut Vec<u8>) -> Result<()> {
-        if try_encode_python_udwf(node, buf)? {
+        if self.python_udf_inlining && try_encode_python_udwf(node, buf)? {
             return Ok(());
         }
         self.inner.try_encode_udwf(node, buf)
     }

     fn try_decode_udwf(&self, name: &str, buf: &[u8]) -> Result<Arc<WindowUDF>> {
-        if let Some(udwf) = try_decode_python_udwf(buf)? {
-            return Ok(udwf);
+        if self.python_udf_inlining {
+            if let Some(udwf) = try_decode_python_udwf(buf)? {
+                return Ok(udwf);
+            }
+        } else if buf.starts_with(PY_WINDOW_UDF_FAMILY) {
+            return Err(refuse_inline_payload("window UDF", name));
         }
         self.inner.try_decode_udwf(name, buf)
     }
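
The decode-side decision that the codec.rs hunks repeat for scalar, aggregate, and window UDFs can be modeled in a few lines of plain Python. This is a minimal sketch, not the project's code: the magic prefix value, `RefusedInlinePayload`, and the `decode_inline` / `inner` callables are hypothetical stand-ins for the `DFPY*` family constants, `DataFusionError::Execution`, the `try_decode_python_*` helpers, and the wrapped inner codec.

```python
from typing import Callable, Optional

# Hypothetical prefix: the real DFPY* family constants are not shown in this diff.
PY_SCALAR_UDF_FAMILY = b"DFPY1"


class RefusedInlinePayload(Exception):
    """Stand-in for the Execution error built by refuse_inline_payload."""


def try_decode_udf(
    name: str,
    buf: bytes,
    *,
    inlining: bool,
    decode_inline: Callable[[bytes], Optional[object]],
    inner: Callable[[str, bytes], object],
) -> object:
    """Mirror the three-way branch in the Rust try_decode_udf."""
    if inlining:
        # Permissive codec: try the inline payload first.
        udf = decode_inline(buf)
        if udf is not None:
            return udf
    elif buf.startswith(PY_SCALAR_UDF_FAMILY):
        # Strict codec: recognize the prefix but never unpickle the payload.
        raise RefusedInlinePayload(
            f"Refusing to deserialize inline Python scalar UDF '{name}'"
        )
    # Anything else (by-name references, foreign payloads) goes to the inner codec.
    return inner(name, buf)
```

The point of the `elif` is that a strict codec inspects only the prefix bytes; the inline decoder is never called on that path, while payloads without the prefix still reach the inner codec in both modes.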

crates/core/src/context.rs

Lines changed: 16 additions & 0 deletions
@@ -1404,6 +1404,22 @@ impl PySessionContext {
             physical_codec,
         })
     }
+
+    pub fn with_python_udf_inlining(&self, enabled: bool) -> Self {
+        let logical_codec = Arc::new(
+            PythonLogicalCodec::new(Arc::clone(self.logical_codec.inner()))
+                .with_python_udf_inlining(enabled),
+        );
+        let physical_codec = Arc::new(
+            PythonPhysicalCodec::new(Arc::clone(self.physical_codec.inner()))
+                .with_python_udf_inlining(enabled),
+        );
+        Self {
+            ctx: Arc::clone(&self.ctx),
+            logical_codec,
+            physical_codec,
+        }
+    }
 }

 impl PySessionContext {

python/datafusion/context.py

Lines changed: 43 additions & 0 deletions
@@ -1769,3 +1769,46 @@ def with_physical_extension_codec(self, codec: Any) -> SessionContext:
         new = SessionContext.__new__(SessionContext)
         new.ctx = new_internal
         return new
+
+    def with_python_udf_inlining(self, *, enabled: bool) -> SessionContext:
+        """Toggle inline encoding of Python-defined UDFs on this session.
+
+        ``enabled`` is keyword-only:
+        ``with_python_udf_inlining(enabled=False)`` reads at the call
+        site as the inverse of
+        ``with_python_udf_inlining(enabled=True)``, where a positional
+        ``True`` / ``False`` would not.
+
+        When ``True`` (the default), Python scalar, aggregate, and window
+        UDFs travel inside the serialized expression and are
+        reconstructed on the receiver without pre-registration.
+
+        Set ``False`` to:
+
+        * Produce serialized bytes that round-trip through a non-Python
+          decoder (cross-language portability). UDFs are stored by name
+          only; the receiver must have matching registrations.
+        * Refuse to reconstruct Python UDFs from
+          :meth:`Expr.from_bytes` input that may come from an untrusted
+          source — ``cloudpickle.loads`` will not be invoked.
+
+        The toggle applies directly to :meth:`Expr.to_bytes` /
+        :meth:`Expr.from_bytes` calls that pass this session as their
+        ``ctx`` argument. To make the toggle apply through
+        :func:`pickle.dumps` (which calls :meth:`Expr.to_bytes` with no
+        context), install this session as the driver's sender context
+        via :func:`datafusion.ipc.set_sender_ctx` — and install it as
+        the worker's context via
+        :func:`datafusion.ipc.set_worker_ctx` for the corresponding
+        :func:`pickle.loads`.
+
+        For the full security model, see
+        :doc:`/user-guide/io/distributing_work` (Security section). In
+        short: this toggle narrows only the :meth:`Expr.from_bytes`
+        surface; :func:`pickle.loads` on untrusted bytes remains
+        unsafe regardless of the toggle.
+        """
+        new_internal = self.ctx.with_python_udf_inlining(enabled)
+        new = SessionContext.__new__(SessionContext)
+        new.ctx = new_internal
+        return new
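
The session-layer behavior (return a new session, share the existing state, rebuild only the codec pair around the same inner codecs) can be sketched with frozen dataclasses. Everything here is a hypothetical model: `ToyCodec` and `ToySession` are illustration-only names, and the `state` dict merely stands in for the shared session state that the Rust side clones by reference.

```python
from dataclasses import dataclass, field


@dataclass(frozen=True)
class ToyCodec:
    """Hypothetical stand-in for PythonLogicalCodec / PythonPhysicalCodec."""

    inner: str = "default-inner"
    python_udf_inlining: bool = True


@dataclass(frozen=True)
class ToySession:
    """Hypothetical stand-in for a session holding shared state plus two codecs."""

    state: dict  # shared by reference, never copied
    logical: ToyCodec = field(default_factory=ToyCodec)
    physical: ToyCodec = field(default_factory=ToyCodec)

    def with_python_udf_inlining(self, *, enabled: bool) -> "ToySession":
        # Keyword-only flag, as in the Python wrapper above.
        # Rebuild both codecs around their existing inner codecs so a
        # user-installed extension codec survives the toggle.
        return ToySession(
            state=self.state,
            logical=ToyCodec(self.logical.inner, enabled),
            physical=ToyCodec(self.physical.inner, enabled),
        )


base = ToySession(state={"udfs": ["double_it"]}, logical=ToyCodec(inner="user-codec"))
strict = base.with_python_udf_inlining(enabled=False)
```

In this model `strict.state is base.state` holds, `base` keeps inlining on, and a positional call such as `base.with_python_udf_inlining(False)` raises `TypeError`, which is the call-site readability guarantee the docstring describes.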

python/datafusion/expr.py

Lines changed: 17 additions & 5 deletions
@@ -446,13 +446,16 @@ def to_bytes(self, ctx: SessionContext | None = None) -> bytes:
         worker process for distributed evaluation.

         When ``ctx`` is supplied, encoding routes through that session's
-        installed :class:`LogicalExtensionCodec`. When ``ctx`` is
-        ``None``, the default codec is used.
+        installed :class:`LogicalExtensionCodec` (so settings like
+        :meth:`SessionContext.with_python_udf_inlining` take effect).
+        When ``ctx`` is ``None``, the default codec is used (Python UDF
+        inlining on, no user-installed extension codec).

         Built-in functions and Python UDFs (scalar, aggregate, window)
         travel inside the returned bytes; the worker does not need to
         pre-register them. UDFs imported via the FFI capsule protocol
-        travel by name only and must be registered on the worker.
+        travel by name only and must be registered on the worker. See
+        :doc:`/user-guide/io/distributing_work`.

         .. warning:: Security
             Bytes returned here may embed a cloudpickled Python
@@ -522,7 +525,9 @@ def from_bytes(cls, buf: bytes, ctx: SessionContext | None = None) -> Expr:

         Accepts output of :meth:`to_bytes` or :func:`pickle.dumps`.
         ``ctx`` is the :class:`SessionContext` used to resolve any
-        function references that travel by name (e.g. FFI UDFs). When
+        function references that travel by name (e.g. FFI UDFs, or
+        Python UDFs sent with inlining disabled via
+        :meth:`SessionContext.with_python_udf_inlining`). When
         ``ctx`` is ``None`` the worker context installed via
         :func:`datafusion.ipc.set_worker_ctx` is consulted; if no worker
         context is installed, the global :class:`SessionContext` is used
@@ -586,8 +591,15 @@ def __reduce__(self) -> tuple[Callable[[bytes], Expr], tuple[bytes]]:
             >>> e = col("a") * lit(2)
             >>> pickle.loads(pickle.dumps(e)).canonical_name()
             'a * Int64(2)'
+
+        The encoding side honors a driver-side sender context installed
+        via :func:`datafusion.ipc.set_sender_ctx` — that is how
+        :meth:`SessionContext.with_python_udf_inlining` propagates
+        through ``pickle.dumps``.
         """
-        return (Expr._reconstruct, (self.to_bytes(),))
+        from datafusion.ipc import get_sender_ctx
+
+        return (Expr._reconstruct, (self.to_bytes(get_sender_ctx()),))

     @classmethod
     def _reconstruct(cls, proto_bytes: bytes) -> Expr:
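
Because the pickle protocol calls `__reduce__` with no arguments, the only way to hand it a session is ambient state. The sketch below models that with `threading.local`, assuming the sender-context helpers behave like simple per-thread get/set/clear accessors. The real `datafusion.ipc` implementation is not part of this diff, so the helper bodies and the `ToyExpr` class (including its `"strict"` marker and the `inline:` / `by-name:` byte prefixes) are hypothetical.

```python
import threading

_local = threading.local()  # one sender-context slot per thread


def set_sender_ctx(ctx):
    _local.sender_ctx = ctx


def get_sender_ctx():
    # Threads that never called set_sender_ctx see None.
    return getattr(_local, "sender_ctx", None)


def clear_sender_ctx():
    _local.sender_ctx = None


class ToyExpr:
    """Hypothetical expression whose encoding depends on the sender context."""

    def __init__(self, text):
        self.text = text

    def to_bytes(self, ctx=None):
        # "strict" stands in for a session with inlining disabled.
        mode = "by-name" if ctx == "strict" else "inline"
        return f"{mode}:{self.text}".encode()

    @classmethod
    def _reconstruct(cls, data):
        return cls(data.decode())

    def __reduce__(self):
        # pickle.dumps cannot pass arguments here, so the codec choice
        # comes from the thread-local sender context.
        return (ToyExpr._reconstruct, (self.to_bytes(get_sender_ctx()),))
```

Calling `ToyExpr("a * 2").__reduce__()` before and after `set_sender_ctx("strict")` switches the payload from the inline form to the by-name form, and a context set on one thread is not visible from another. That per-thread scoping is the design trade-off: a driver that pickles from several threads would need to install the sender context on each of them.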
