Skip to content

Commit 20b3338

Browse files
timsaucerclaude
andcommitted
docs: user guide page + runnable examples for distributing expressions
Wraps up the Expr-pickle work with the user-facing material: * docs/source/user-guide/io/distributing_work.rst — new user guide page covering the multiprocessing, Ray, and datafusion-distributed patterns. Includes the Security section that is the canonical home for the cloudpickle / pickle.loads threat model. * docs/source/user-guide/io/index.rst — toctree entry. * examples/multiprocessing_pickle_expr.py — runnable example: a Pool.map of a closure-capturing UDF across processes, with worker context registration in the initializer. * examples/ray_pickle_expr.py — Ray actor analogue. * examples/datafusion-ffi-example/python/tests/_test_pickle_strict_ffi.py — exercises the strict-mode refusal end to end against an FFI capsule scalar UDF (kept under the FFI example crate because the test needs that crate's compiled artifacts). * examples/README.md — index entries for the new files. Also tightens three docstrings that previously duplicated the security warning so they point at the canonical Security section instead: * PythonLogicalCodec::with_python_udf_inlining (rustdoc): one-line summary plus a relative pointer to distributing_work.rst and the upstream Python pickle module security warning. * SessionContext.with_python_udf_inlining: one-sentence summary plus :doc: link to the user guide. * datafusion.ipc module docstring: cross-reference to the user guide for the full pattern. The crate-level codec.rs module rustdoc also updates "pure-Python scalar UDFs" to "scalar / aggregate / window UDFs" now that all three are covered. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
1 parent d680e12 commit 20b3338

7 files changed

Lines changed: 763 additions & 13 deletions

File tree

crates/core/src/codec.rs

Lines changed: 15 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -19,11 +19,11 @@
1919
//!
2020
//! Datafusion-python plans can carry references to Python-defined
2121
//! objects that the upstream protobuf codecs do not know how to
22-
//! serialize: pure-Python scalar UDFs, Python query-planning
23-
//! extensions, and so on. Their state lives inside `Py<PyAny>`
24-
//! callables and closures rather than being recoverable from a name
25-
//! in the receiver's function registry. To ship a plan across a
26-
//! process boundary (pickle, `multiprocessing`, Ray actor,
22+
//! serialize: pure-Python scalar / aggregate / window UDFs, Python
23+
//! query-planning extensions, and so on. Their state lives inside
24+
//! `Py<PyAny>` callables and closures rather than being recoverable
25+
//! from a name in the receiver's function registry. To ship a plan
26+
//! across a process boundary (pickle, `multiprocessing`, Ray actor,
2727
//! `datafusion-distributed`, etc.) those payloads have to be encoded
2828
//! into the proto wire format itself.
2929
//!
@@ -217,8 +217,10 @@ impl PythonLogicalCodec {
217217
/// `cloudpickle.loads` on the inline `DFPY*` payload. It does
218218
/// **not** make `pickle.loads(untrusted_bytes)` safe; treat every
219219
/// `pickle.loads` on untrusted input as unsafe regardless of this
220-
/// setting. See Python's [pickle module security warning][1] for
221-
/// why `pickle.loads` is unsafe in general.
220+
/// setting. See `docs/source/user-guide/io/distributing_work.rst`
221+
/// (Security section) for the full threat model, and Python's
222+
/// [pickle module security warning][1] for why `pickle.loads` is
223+
/// unsafe in general.
222224
///
223225
/// [1]: https://docs.python.org/3/library/pickle.html#module-pickle
224226
pub fn with_python_udf_inlining(mut self, enabled: bool) -> Self {
@@ -369,7 +371,7 @@ fn refuse_inline_payload(kind: &str, name: &str) -> datafusion::error::DataFusio
369371
/// encoding on this layer too — otherwise a plan with a Python UDF
370372
/// would round-trip at the logical level but break at the physical
371373
/// level. Both layers reuse the shared payload framing
372-
/// ([`PY_SCALAR_UDF_FAMILY`]) so the wire format is identical.
374+
/// ([`PY_SCALAR_UDF_FAMILY`] et al.) so the wire format is identical.
373375
#[derive(Debug)]
374376
pub struct PythonPhysicalCodec {
375377
inner: Arc<dyn PhysicalExtensionCodec>,
@@ -496,10 +498,10 @@ impl PhysicalExtensionCodec for PythonPhysicalCodec {
496498
// =============================================================================
497499

498500
/// Encode a Python scalar UDF inline if `node` is one. Returns
499-
/// `Ok(true)` when the payload (`DFPYUDF` family prefix, version byte,
500-
/// cloudpickled tuple) was written and the caller should skip its
501-
/// inner codec. Returns `Ok(false)` for any non-Python UDF, signalling
502-
/// the caller to delegate to its `inner`.
501+
/// `Ok(true)` when the payload (`DFPYUDF1` prefix + cloudpickled
502+
/// tuple) was written and the caller should skip its inner codec.
503+
/// Returns `Ok(false)` for any non-Python UDF, signalling the caller
504+
/// to delegate to its `inner`.
503505
pub(crate) fn try_encode_python_scalar_udf(node: &ScalarUDF, buf: &mut Vec<u8>) -> Result<bool> {
504506
let Some(py_udf) = node
505507
.inner()
@@ -698,7 +700,7 @@ fn volatility_wire_str(v: Volatility) -> &'static str {
698700

699701
/// Cached handle to the `cloudpickle` module.
700702
///
701-
/// The encode/decode helpers above would otherwise re-resolve the
703+
/// Six encode/decode helpers below would otherwise re-resolve the
702704
/// module on every call. `py.import` is backed by `sys.modules` and
703705
/// therefore cheap, but each call still walks a dict and re-binds the
704706
/// result; a plan with many Python UDFs pays that cost per UDF.

0 commit comments

Comments
 (0)