
Commit cc5ce7e

timsaucer and claude committed
docs: reframe distributed-expression docs around the user goal
User-facing docs throughout this PR led with "pickle support": filename-shaped headings, function docstrings describing how things get cloudpickled into a Rust-side codec, etc. That's the implementation pathway, not the user's goal. The user's goal is to build an expression in a driver process and ship it to worker processes for distributed evaluation. Pickle is the mechanism Python provides to make that work; we hook into it. End users typically don't care how the bytes are produced; they care which references survive the trip and what they have to register on each worker.

Reframe across user-facing surfaces:

* `docs/source/user-guide/io/distributing_expressions.rst`: leads with the worker-pool use case, drops `PythonUDFCodec` / cloudpickle vocabulary, presents "what travels with the expression" as the user contract.
* `datafusion.ipc` module docstring + `set_worker_ctx` / `clear_worker_ctx` / `get_worker_ctx`: describes what the user installs and why, not internal lookup details.
* `Expr.to_bytes` / `from_bytes` / `__reduce__`: describes what's shipped vs what travels by name; cross-references the user guide instead of repeating the codec story.
* `examples/ray_pickle_expr.py` header + comment + README entry: goal-first wording.
* Pickle test module docstrings: drop the dangling reference to `PythonUDFCodec` (also a stale name post-PR1).

Code behavior unchanged. 1088 tests still green.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
1 parent 0879309 commit cc5ce7e

7 files changed

Lines changed: 139 additions & 120 deletions


docs/source/user-guide/io/distributing_expressions.rst

Lines changed: 70 additions & 57 deletions
Original file line number | Diff line number | Diff line change
@@ -18,41 +18,36 @@
1818
Distributing expressions across processes
1919
=========================================
2020

21-
DataFusion expressions (:py:class:`~datafusion.Expr`) can be serialized and
22-
shipped across process boundaries — useful for distributing work over a
23-
``multiprocessing.Pool``, a Ray actor pool, or any framework that supports a
24-
per-worker initialization hook.
21+
A common pattern is to build a DataFusion expression
22+
(:py:class:`~datafusion.Expr`) in a driver process, hand it to a pool of
23+
worker processes (``multiprocessing.Pool``, a Ray actor pool, or any other
24+
framework with a per-worker initialization hook), and have each worker
25+
evaluate the expression against its own slice of data.
2526

26-
Pickle support
27-
--------------
27+
DataFusion expressions support this directly: they can be sent through
28+
:py:mod:`pickle` like any other Python object. Python scalar UDFs ride along
29+
inside the pickled bytes — the receiver does not need to pre-register them.
2830

29-
:py:class:`~datafusion.Expr` implements the pickle protocol directly. Call
30-
:py:func:`pickle.dumps` on an expression and ship the bytes; the receiver
31-
calls :py:func:`pickle.loads`. Python *scalar UDFs* are cloudpickled into the
32-
proto wire format by a Rust-side codec (``PythonUDFCodec``), so the blob is
33-
self-contained — the receiver does not need to pre-register the UDF.
31+
Basic worker-pool example
32+
-------------------------
3433

3534
.. code-block:: python
3635
3736
import multiprocessing as mp
3837
import pickle
3938
4039
import pyarrow as pa
41-
from datafusion import SessionContext, col, lit, udf
40+
from datafusion import SessionContext, col, udf
4241
43-
def init_worker():
44-
# Optional: install a worker context for aggregate / window UDFs,
45-
# table providers, or Rust-side function registrations. Not needed
46-
# for built-ins or Python scalar UDFs.
47-
pass
4842
4943
def evaluate(blob_and_batch):
5044
blob, batch = blob_and_batch
51-
expr = pickle.loads(blob)
45+
expr = pickle.loads(blob) # Python scalar UDFs ride along inline.
5246
ctx = SessionContext()
5347
df = ctx.from_pydict({"a": batch})
5448
return df.with_column("result", expr).select("result").to_pydict()["result"]
5549
50+
5651
if __name__ == "__main__":
5752
double = udf(
5853
lambda arr: pa.array([(v.as_py() or 0) * 2 for v in arr]),
@@ -68,74 +63,92 @@ self-contained — the receiver does not need to pre-register the UDF.
6863
)
6964
print(results) # [[2, 4, 6], [20, 40, 60]]
7065
71-
Worker-scoped context
72-
---------------------
7366
74-
For references the codec cannot inline — aggregate UDFs, window UDFs, FFI
75-
capsule UDFs, or anything resolved through the
76-
:class:`SessionContext`'s function registry — set a worker-scoped context
77-
once per process using :py:func:`datafusion.ipc.set_worker_ctx`:
67+
What travels with the expression
68+
--------------------------------
69+
70+
* **Built-in functions** (``abs``, ``length``, arithmetic, comparisons, etc.)
71+
— fully portable. Worker needs nothing pre-registered.
72+
* **Python scalar UDFs** (defined with :py:func:`datafusion.udf`) — fully
73+
portable. The callable and its signature travel inside the pickled bytes
74+
and are reconstructed on the worker automatically.
75+
* **Aggregate UDFs**, **window UDFs**, **UDFs imported via the FFI capsule
76+
protocol** — travel **by name only**. The worker must already have a
77+
matching registration on its :py:class:`SessionContext`. Without that
78+
registration, evaluation raises an error.
79+
80+
Registering shared UDFs on workers
81+
----------------------------------
82+
83+
When an expression references something that travels by name only (aggregate
84+
UDF, window UDF, FFI UDF), set up the worker's :py:class:`SessionContext`
85+
once per process and install it as the *worker context*:
7886

7987
.. code-block:: python
8088
8189
from datafusion import SessionContext
8290
from datafusion.ipc import set_worker_ctx
8391
92+
8493
def init_worker():
8594
ctx = SessionContext()
86-
ctx.register_udaf(my_aggregate) # if needed
95+
ctx.register_udaf(my_aggregate)
8796
set_worker_ctx(ctx)
8897
98+
8999
with mp.get_context("forkserver").Pool(
90100
processes=4, initializer=init_worker
91101
) as pool:
92102
...
93103
94-
Without a worker context, unpickling falls back to a fresh
95-
:py:class:`SessionContext`. Built-in functions resolve; Python scalar UDFs
96-
ride along inside the blob via the codec. References to aggregate / window
97-
UDFs or other registry-only entries raise an informative error if not
98-
registered on the worker.
104+
Inside a worker, expressions reconstructed by :py:func:`pickle.loads` resolve
105+
their by-name references against the installed worker context. If no worker
106+
context is installed, a fresh empty :py:class:`SessionContext` is used —
107+
fine for expressions that only reference built-ins and Python scalar UDFs,
108+
but anything by-name-only will fail to resolve.
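The lookup order described above can be sketched without DataFusion installed. This is only an illustration: `FakeContext` and `resolve_ctx_sketch` are hypothetical stand-ins, and the explicit-argument step is taken from the `from_bytes` docstring, not from the library's actual resolver.

```python
class FakeContext:
    """Hypothetical stand-in for SessionContext: a labelled set of names."""

    def __init__(self, label, registered=()):
        self.label = label
        self.registered = set(registered)


def resolve_ctx_sketch(explicit, worker):
    # Order described in the docs: an explicit ctx argument wins, then the
    # installed worker context, then a fresh empty context.
    if explicit is not None:
        return explicit
    if worker is not None:
        return worker
    return FakeContext("fresh")


worker_ctx = FakeContext("worker", registered={"my_aggregate"})

with_worker = resolve_ctx_sketch(None, worker_ctx)
without_worker = resolve_ctx_sketch(None, None)

print("my_aggregate" in with_worker.registered)     # True
print("my_aggregate" in without_worker.registered)  # False
```

The second lookup is the failure mode the paragraph warns about: the fallback context exists, but nothing that travels by name is registered on it.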
99109

100110
Python 3.14 default change
101111
--------------------------
102112

103113
Python 3.14 changed the POSIX default start method for
104-
:py:mod:`multiprocessing` from ``fork`` to ``forkserver``. With ``fork``, a
105-
context set in the parent was visible in workers via copy-on-write; with
106-
``forkserver`` and ``spawn`` it is not. The codec + worker-init pattern works
107-
on every start method — prefer it over relying on inherited state.
108-
109-
Trade-offs of inline UDFs
110-
-------------------------
111-
112-
* **Blob size** — cloudpickled callables add bytes per blob. A trivial
113-
built-in expression is ~20 bytes; an expression referencing a Python scalar
114-
UDF is hundreds of bytes (the cloudpickled callable + signature). Pre-register
115-
shared UDFs on workers via :py:func:`~datafusion.ipc.set_worker_ctx` when
116-
the same UDF is shipped many times and you want to avoid the overhead.
117-
* **Closure capture** — cloudpickle captures closure state. Surprises are
118-
possible if the UDF closes over large objects, module-level mutable state,
119-
or non-portable file paths.
120-
* **FFI scalar UDFs cannot be inlined** — PyCapsule-backed UDFs have no
121-
Python callable to cloudpickle. The codec leaves their ``fun_definition``
122-
empty; the receiver must have a matching registration.
123-
* **Aggregate and window UDFs cannot be inlined yet** — their Python state
124-
is held inside opaque factory closures on the Rust side. Pre-register on
125-
the worker.
114+
:py:mod:`multiprocessing` from ``fork`` to ``forkserver``. With ``fork``, any
115+
state set in the parent was visible in workers via copy-on-write; with
116+
``forkserver`` and ``spawn`` it is not. The
117+
:py:func:`~datafusion.ipc.set_worker_ctx` pattern works on every start
118+
method — prefer it over relying on inherited state.
119+
120+
Practical considerations
121+
------------------------
122+
123+
* **Pickled size scales with what travels inline.** A pickled expression of
124+
just built-ins is small (tens of bytes). An expression carrying a Python
125+
scalar UDF is hundreds of bytes (the callable and its signature). When the
126+
same UDF is shipped many times, pre-registering it on each worker via
127+
:py:func:`~datafusion.ipc.set_worker_ctx` and referring to it by name
128+
cuts the per-blob overhead.
129+
* **Closure capture.** When a Python scalar UDF closes over surrounding
130+
state — local variables, module-level objects, file paths — that state
131+
is captured at pickling time. Surprises are possible if the captured
132+
state is large, mutable, or not portable to the worker's environment.
133+
* **Aggregate and window UDFs always travel by name.** Their Python state
134+
is held inside opaque factory closures that cannot be reconstructed from
135+
bytes alone. Use :py:func:`~datafusion.ipc.set_worker_ctx` to register
136+
them on each worker.
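A rough way to see the size and closure-capture points is to pickle a callable that carries state. The sketch below uses only the standard library, with `functools.partial` standing in for a UDF that closes over an object; it is an analogy for the behavior described, not the mechanism DataFusion uses, and `lookup_table` is a hypothetical captured object.

```python
import functools
import operator
import pickle

# Stand-in for a callable with no captured state.
plain = operator.mul

# Stand-in for a UDF that closes over a large object: the bound argument
# is serialized together with the callable.
lookup_table = list(range(100_000))  # hypothetical captured state
carrying = functools.partial(operator.contains, lookup_table)

plain_size = len(pickle.dumps(plain))
carrying_size = len(pickle.dumps(carrying))
print(plain_size, carrying_size)  # the captured list dominates the payload

# The receiver gets its own copy, frozen at pickling time.
restored = pickle.loads(pickle.dumps(carrying))
lookup_table.append(-1)
print(carrying(-1))  # True
print(restored(-1))  # False
```

The last two lines show the kind of surprise the closure-capture bullet refers to: the sender's later mutation never reaches the copy that was shipped.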
126137

127138
Security
128139
--------
129140

130141
.. warning::
131142

132-
Pickle blobs containing inlined UDFs deserialize via :py:mod:`cloudpickle`,
133-
which executes arbitrary code on the receiver. Only :py:func:`pickle.loads`
134-
blobs from trusted sources. For untrusted-source workflows, restrict the
135-
sender to built-in functions and pre-registered Rust-side UDFs.
143+
Reconstructing an expression containing a Python scalar UDF executes
144+
arbitrary Python code on the receiver. Only :py:func:`pickle.loads`
145+
expressions from trusted sources. For untrusted-source workflows,
146+
restrict senders to built-in functions and pre-registered Rust-side
147+
UDFs, and never feed externally supplied bytes through
148+
:py:func:`pickle.loads`.
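The warning follows from how :py:mod:`pickle` works in general, independent of DataFusion. A minimal standard-library sketch, with a deliberately harmless payload (the `Innocuous` class is hypothetical):

```python
import pickle


class Innocuous:
    """Looks like plain data, but tells pickle what to call on load."""

    def __reduce__(self):
        # The sender picks the callable and its arguments. A harmless
        # builtin is used here; a hostile sender could pick anything.
        return (sorted, ([3, 1, 2],))


blob = pickle.dumps(Innocuous())

# The receiver never sees an Innocuous instance: loads() invokes the
# callable named in the bytes and returns its result.
result = pickle.loads(blob)
print(result)  # [1, 2, 3]
```

Because the call happens inside `pickle.loads` itself, inspecting the returned object afterwards is too late; the only safe control is where the bytes come from.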
136149

137150
See also
138151
--------
139152

140-
* :py:mod:`datafusion.ipc` — module-level API reference.
153+
* :py:mod:`datafusion.ipc` — worker context API.
141154
* ``examples/ray_pickle_expr.py`` — runnable Ray actor example.

examples/README.md

Lines changed: 1 addition & 1 deletion
@@ -46,7 +46,7 @@ Here is a direct link to the file used in the examples:
4646

4747
### Distributing DataFusion expressions
4848

49-
- [Pickle expressions and send them to Ray actors](./ray_pickle_expr.py)
49+
- [Distribute expression evaluation across Ray actors](./ray_pickle_expr.py)
5050

5151
### Substrait Support
5252

examples/ray_pickle_expr.py

Lines changed: 8 additions & 9 deletions
@@ -17,13 +17,11 @@
1717

1818
"""Distribute DataFusion expressions to Ray actors.
1919
20-
This example shows the worker-init pattern from the user guide adapted to
21-
Ray's actor model: each actor builds its own :class:`SessionContext`.
22-
Python scalar UDFs travel inside the pickle blob via the Rust-side
23-
``PythonUDFCodec`` — no actor-side pre-registration is required. The
24-
worker context (set via :func:`datafusion.ipc.set_worker_ctx`) is still
25-
useful for aggregate/window UDFs or other registry-only entries; we set
26-
one here to show the pattern.
20+
Build an expression in the driver, ship it to a pool of Ray actors, and have
21+
each actor evaluate it against its own slice of data. Each actor sets up
22+
its own :class:`SessionContext` once in ``__init__`` and registers any UDFs
23+
it needs to resolve by name. Python scalar UDFs travel with the shipped
24+
expression and need no actor-side pre-registration.
2725
2826
Prerequisites:
2927
pip install ray
@@ -58,8 +56,9 @@ class DataFusionWorker:
5856
def __init__(self) -> None:
5957
ctx = SessionContext()
6058
ctx.register_udf(_build_double_udf())
61-
# The worker context is what Expr.__setstate__ consults when
62-
# pickled expressions arrive at this actor.
59+
# Install the actor's SessionContext as its worker context;
60+
# expressions reconstructed in this actor will resolve their
61+
# by-name references against it.
6362
set_worker_ctx(ctx)
6463
self._ctx = ctx
6564

python/datafusion/expr.py

Lines changed: 24 additions & 24 deletions
@@ -434,30 +434,32 @@ def variant_name(self) -> str:
434434
return self.expr.variant_name()
435435

436436
def to_bytes(self, ctx: SessionContext | None = None) -> bytes:
437-
"""Serialize this expression to protobuf bytes.
437+
"""Serialize this expression to bytes for shipping to another process.
438438
439-
Python scalar UDFs are inlined into the returned bytes — the
440-
receiver does not need to pre-register them. Aggregate UDFs,
441-
window UDFs, and UDFs imported via the FFI capsule protocol
442-
are stored by name only; the receiver must have them
443-
registered.
439+
Use this — or :func:`pickle.dumps` — to send an expression to a
440+
worker process for distributed evaluation.
444441
445-
When ``ctx`` is supplied, encoding also routes through the
446-
session's installed codec stack.
442+
Built-in functions and Python scalar UDFs travel inside the
443+
returned bytes; the worker does not need to pre-register them.
444+
Aggregate UDFs, window UDFs, and UDFs imported via the FFI
445+
capsule protocol travel by name only and must be registered on
446+
the worker. See :doc:`/user-guide/io/distributing_expressions`.
447447
"""
448448
ctx_arg = ctx.ctx if ctx is not None else None
449449
return bytes(self.expr.to_bytes(ctx_arg))
450450

451451
@classmethod
452452
def from_bytes(cls, buf: bytes, ctx: SessionContext | None = None) -> Expr:
453-
"""Decode an expression from serialized protobuf bytes.
454-
455-
``ctx`` is the receiver :class:`SessionContext` used to resolve
456-
function references not inlined by the codec (aggregate UDFs,
457-
window UDFs, FFI UDFs). When ``ctx`` is ``None`` the worker
458-
context set via :func:`datafusion.ipc.set_worker_ctx` is
459-
consulted; if no worker context is set, a fresh
460-
:class:`SessionContext` is used.
453+
"""Reconstruct an expression from serialized bytes.
454+
455+
Accepts output of :meth:`to_bytes` or :func:`pickle.dumps`.
456+
``ctx`` is the :class:`SessionContext` used to resolve any
457+
function references that travel by name — aggregate UDFs, window
458+
UDFs, FFI UDFs. When ``ctx`` is ``None`` the worker context
459+
installed via :func:`datafusion.ipc.set_worker_ctx` is consulted;
460+
if no worker context is installed, a fresh
461+
:class:`SessionContext` is used (sufficient for built-ins and
462+
Python scalar UDFs).
461463
"""
462464
from datafusion.ipc import _resolve_ctx
463465

@@ -467,14 +469,12 @@ def from_bytes(cls, buf: bytes, ctx: SessionContext | None = None) -> Expr:
467469
def __reduce__(self) -> tuple:
468470
"""Pickle protocol hook.
469471
470-
Python scalar UDFs referenced by the expression are inlined
471-
into the pickle blob, so the receiver does not need to
472-
pre-register them. On unpickle the bytes are decoded against
473-
the worker context set via
474-
:func:`datafusion.ipc.set_worker_ctx` (or a fresh
475-
:class:`SessionContext` if none) for any registry-resolved
476-
references — aggregate UDFs, window UDFs, UDFs imported via
477-
the FFI capsule protocol.
472+
Lets expressions be shipped to worker processes via
473+
:func:`pickle.dumps` / :func:`pickle.loads`. The worker's
474+
:class:`SessionContext` for resolving by-name references is
475+
looked up via :func:`datafusion.ipc.set_worker_ctx`, falling
476+
back to a fresh empty :class:`SessionContext` if none has been
477+
installed on the worker.
478478
"""
479479
return (Expr._reconstruct, (self.to_bytes(),))
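For readers unfamiliar with the hook, the `(callable, args)` tuple returned above is all pickle needs: at load time it calls the callable with the stored arguments. The sketch below applies that contract by hand. `ToyExpr` is a hypothetical stand-in with a trivial text encoding, not the real `Expr`.

```python
class ToyExpr:
    """Hypothetical stand-in for an expression with a byte encoding."""

    def __init__(self, text: str) -> None:
        self.text = text

    def to_bytes(self) -> bytes:
        return self.text.encode("utf-8")

    @classmethod
    def _reconstruct(cls, buf: bytes) -> "ToyExpr":
        return cls(buf.decode("utf-8"))

    def __reduce__(self) -> tuple:
        # Same shape as the hook in the diff: (callable, args).
        return (ToyExpr._reconstruct, (self.to_bytes(),))


original = ToyExpr("a + 1")

# What pickle does at load time, done by hand: call the stored callable
# with the stored arguments.
rebuild, args = original.__reduce__()
clone = rebuild(*args)
print(args)        # (b'a + 1',)
print(clone.text)  # a + 1
```

Seen this way, the only thing that crosses the process boundary is the output of `to_bytes()`; everything the worker needs beyond those bytes has to come from its own context.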
480480

python/datafusion/ipc.py

Lines changed: 26 additions & 20 deletions
@@ -15,28 +15,29 @@
1515
# specific language governing permissions and limitations
1616
# under the License.
1717

18-
"""Inter-process communication helpers for distributing DataFusion expressions.
18+
"""Worker-side setup for distributing DataFusion expressions.
1919
20-
This module provides a worker-scoped :class:`SessionContext` slot that
21-
:meth:`Expr.__reduce__` consults when unpickling expressions across process
22-
boundaries. Set the worker context once per worker process (typically from a
23-
``multiprocessing.Pool`` initializer or a Ray actor ``__init__``):
20+
When a :class:`Expr` is shipped to a worker process (e.g. through
21+
:func:`multiprocessing.Pool` or a Ray actor), the worker reconstructs the
22+
expression against a :class:`SessionContext`. If the expression references
23+
aggregate UDFs, window UDFs, table providers, or UDFs imported via the FFI
24+
capsule protocol — anything the worker would otherwise resolve from its
25+
registered functions — install a configured :class:`SessionContext` once
26+
per worker:
2427
2528
>>> # doctest: +SKIP
2629
>>> from datafusion import SessionContext
2730
>>> from datafusion.ipc import set_worker_ctx
2831
>>>
2932
>>> def init_worker():
3033
... ctx = SessionContext()
31-
... # register Rust-backed UDFs / aggregates / window functions here
34+
... ctx.register_udaf(my_aggregate)
3235
... set_worker_ctx(ctx)
3336
34-
Python scalar UDFs do not need pre-registration: their definitions
35-
travel inside the pickled expression and are reconstructed on the
36-
receiver automatically. The worker context is only needed when the
37-
expression references aggregate UDFs, window UDFs, table providers,
38-
or UDFs imported via the FFI capsule protocol — anything the
39-
receiver would otherwise resolve from its registered functions.
37+
Built-in functions and Python scalar UDFs travel inside the shipped
38+
expression itself and do not need pre-registration on the worker.
39+
40+
See :doc:`/user-guide/io/distributing_expressions` for the full pattern.
4041
"""
4142

4243
from __future__ import annotations
@@ -59,25 +60,30 @@
5960

6061

6162
def set_worker_ctx(ctx: SessionContext) -> None:
62-
"""Register the receiver :class:`SessionContext` for this worker.
63-
64-
Call once per worker process — typically from a ``Pool`` initializer or a
65-
Ray actor ``__init__``. Idempotent: overwrites any previous value.
63+
"""Install this worker's :class:`SessionContext` for shipped expressions.
6664
67-
The worker context is stored in a thread-local slot, so each thread within
68-
a worker can install its own context independently.
65+
Call once per worker — typically from a ``multiprocessing.Pool``
66+
initializer or a Ray actor ``__init__``. Idempotent: overwrites any
67+
previous value. Stored in a thread-local slot, so each thread within a
68+
worker may install its own context independently.
6969
"""
7070
_local.ctx = ctx
7171

7272

7373
def clear_worker_ctx() -> None:
74-
"""Remove the worker context, restoring fresh-context fallback behavior."""
74+
"""Remove this worker's installed :class:`SessionContext`.
75+
76+
After clearing, expressions reconstructed in this worker fall back to a
77+
fresh empty :class:`SessionContext` — adequate for built-ins and Python
78+
scalar UDFs, but anything that travels by name only (aggregate UDFs,
79+
window UDFs, FFI UDFs) will fail to resolve.
80+
"""
7581
if hasattr(_local, "ctx"):
7682
del _local.ctx
7783

7884

7985
def get_worker_ctx() -> SessionContext | None:
80-
"""Return the worker context if set, else ``None``."""
86+
"""Return this worker's installed :class:`SessionContext`, or ``None``."""
8187
return getattr(_local, "ctx", None)
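The thread-local behavior mentioned in the `set_worker_ctx` docstring can be reproduced with the standard library alone. The sketch below copies the three function bodies from the diff and assumes `_local` is a `threading.local()` instance (its definition is not shown in this hunk); plain strings stand in for `SessionContext` objects.

```python
import threading

_local = threading.local()  # assumption: the module's slot is a threading.local


def set_worker_ctx(ctx):
    _local.ctx = ctx


def clear_worker_ctx():
    if hasattr(_local, "ctx"):
        del _local.ctx


def get_worker_ctx():
    return getattr(_local, "ctx", None)


set_worker_ctx("main-thread context")  # string stands in for a SessionContext

seen_in_thread = []


def worker():
    # A new thread starts with an empty slot, whatever the main thread set.
    seen_in_thread.append(get_worker_ctx())
    set_worker_ctx("thread context")
    seen_in_thread.append(get_worker_ctx())


t = threading.Thread(target=worker)
t.start()
t.join()

print(seen_in_thread)    # [None, 'thread context']
print(get_worker_ctx())  # main-thread context
clear_worker_ctx()
print(get_worker_ctx())  # None
```

If this assumption holds, a context installed from a pool initializer is visible only on the thread that ran the initializer; code that reconstructs expressions on additional threads inside a worker would need to install a context on each of them.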
8288

8389

python/tests/test_pickle_expr.py

Lines changed: 5 additions & 4 deletions
@@ -17,10 +17,11 @@
1717

1818
"""In-process pickle round-trip tests for :class:`Expr`.
1919
20-
The Rust-side ``PythonUDFCodec`` cloudpickles Python scalar UDF callables
21-
directly into the proto wire format, so pickle blobs are self-contained.
22-
The worker context (:mod:`datafusion.ipc`) is only needed for references
23-
the codec can't inline — aggregate UDFs, window UDFs, FFI capsule UDFs.
20+
Built-in functions and Python scalar UDFs travel with the pickled
21+
expression and do not need worker-side pre-registration. The worker
22+
context (:mod:`datafusion.ipc`) is only consulted for references that
23+
travel by name — aggregate UDFs, window UDFs, UDFs imported via the FFI
24+
capsule protocol.
2425
2526
Cross-process tests live in ``test_pickle_multiprocessing.py``.
2627
"""
