Commit 71760c5

timsaucer and claude committed
docs: extend the inline-UDF guarantee to aggregate + window UDFs
With aggregate UDFs and window UDFs now reconstructable from bytes alone,
the user-facing contract simplifies to:

* Built-in functions and **all** Python UDFs (scalar, aggregate, window)
  travel inside the shipped expression. No worker-side pre-registration.
* Only UDFs imported via the FFI capsule protocol travel by name and
  require pre-registration via `set_worker_ctx`.

Update each user-facing surface:

* `docs/source/user-guide/io/distributing_expressions.rst` — drop the
  "aggregate/window UDFs travel by name only" caveat; rename the
  practical-considerations entry that called out the limitation.
* `python/datafusion/ipc.py` module + `clear_worker_ctx` — explicitly list
  scalar, aggregate, and window as inline-portable.
* `python/datafusion/expr.py` — `to_bytes` and `__reduce__` docstrings
  updated.
* Test module docstrings updated.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
1 parent 4b51402 commit 71760c5

5 files changed

Lines changed: 59 additions & 57 deletions

docs/source/user-guide/io/distributing_expressions.rst

Lines changed: 31 additions & 31 deletions
@@ -69,20 +69,25 @@ What travels with the expression

 * **Built-in functions** (``abs``, ``length``, arithmetic, comparisons, etc.)
   — fully portable. Worker needs nothing pre-registered.
-* **Python scalar UDFs** (defined with :py:func:`datafusion.udf`) — fully
-  portable. The callable and its signature travel inside the pickled bytes
-  and are reconstructed on the worker automatically.
-* **Aggregate UDFs**, **window UDFs**, **UDFs imported via the FFI capsule
-  protocol** — travel **by name only**. The worker must already have a
-  matching registration on its :py:class:`SessionContext`. Without that
-  registration, evaluation raises an error.
+* **Python UDFs** — fully portable. The callable, its signature, and any
+  state captured in closures travel inside the pickled bytes and are
+  reconstructed on the worker automatically. Applies equally to:
+
+  * **scalar UDFs** (:py:func:`datafusion.udf`)
+  * **aggregate UDFs** (:py:func:`datafusion.udaf`)
+  * **window UDFs** (:py:func:`datafusion.udwf`)
+* **UDFs imported via the FFI capsule protocol** — travel **by name only**.
+  The worker must already have a matching registration on its
+  :py:class:`SessionContext`. Without that registration, evaluation raises
+  an error.

 Registering shared UDFs on workers
 ----------------------------------

-When an expression references something that travels by name only (aggregate
-UDF, window UDF, FFI UDF), set up the worker's :py:class:`SessionContext`
-once per process and install it as the *worker context*:
+When an expression references an FFI capsule UDF (or any UDF the worker
+must resolve from its registered functions), set up the worker's
+:py:class:`SessionContext` once per process and install it as the
+*worker context*:

 .. code-block:: python

@@ -92,7 +97,7 @@ once per process and install it as the *worker context*:

     def init_worker():
         ctx = SessionContext()
-        ctx.register_udaf(my_aggregate)
+        ctx.register_udaf(my_ffi_aggregate)
         set_worker_ctx(ctx)


@@ -104,8 +109,8 @@ once per process and install it as the *worker context*:
 Inside a worker, expressions reconstructed by :py:func:`pickle.loads` resolve
 their by-name references against the installed worker context. If no worker
 context is installed, a fresh empty :py:class:`SessionContext` is used —
-fine for expressions that only reference built-ins and Python scalar UDFs,
-but anything by-name-only will fail to resolve.
+fine for expressions that only reference built-ins and Python UDFs, but
+FFI-capsule-backed registrations will fail to resolve.

 Python 3.14 default change
 --------------------------
@@ -122,30 +127,25 @@ Practical considerations

 * **Pickled size scales with what travels inline.** A pickled expression of
   just built-ins is small (tens of bytes). An expression carrying a Python
-  scalar UDF is hundreds of bytes (the callable and its signature). When the
-  same UDF is shipped many times, pre-registering it on each worker via
-  :py:func:`~datafusion.ipc.set_worker_ctx` and referring to it by name
-  cuts the per-blob overhead.
-* **Closure capture.** When a Python scalar UDF closes over surrounding
-  state — local variables, module-level objects, file paths — that state
-  is captured at pickling time. Surprises are possible if the captured
-  state is large, mutable, or not portable to the worker's environment.
-* **Aggregate and window UDFs always travel by name.** Their Python state
-  is held inside opaque factory closures that cannot be reconstructed from
-  bytes alone. Use :py:func:`~datafusion.ipc.set_worker_ctx` to register
-  them on each worker.
+  UDF is hundreds of bytes (the callable and its signature). When the same
+  UDF is shipped many times, registering an equivalent FFI-capsule UDF on
+  each worker via :py:func:`~datafusion.ipc.set_worker_ctx` and referring
+  to it by name cuts the per-blob overhead.
+* **Closure capture.** When a Python UDF closes over surrounding state —
+  local variables, module-level objects, file paths — that state is
+  captured at pickling time. Surprises are possible if the captured state
+  is large, mutable, or not portable to the worker's environment.

 Security
 --------

 .. warning::

-   Reconstructing an expression containing a Python scalar UDF executes
-   arbitrary Python code on the receiver. Only :py:func:`pickle.loads`
-   expressions from trusted sources. For untrusted-source workflows,
-   restrict senders to built-in functions and pre-registered Rust-side
-   UDFs, and never feed externally supplied bytes through
-   :py:func:`pickle.loads`.
+   Reconstructing an expression containing a Python UDF executes arbitrary
+   Python code on the receiver. Only :py:func:`pickle.loads` expressions
+   from trusted sources. For untrusted-source workflows, restrict senders
+   to built-in functions and pre-registered Rust-side UDFs, and never feed
+   externally supplied bytes through :py:func:`pickle.loads`.

 See also
 --------
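
The closure-capture and pickled-size notes in the hunk above can be illustrated with the standard library alone. The sketch below is not datafusion code. It assumes `functools.partial` over `operator.contains` is an acceptable stand-in for a UDF carrying captured state, since plain `pickle` cannot serialize closures directly. The names `allowed`, `is_allowed`, `restored`, and `big` are hypothetical.

```python
import functools
import operator
import pickle

# Stand-in for a UDF with captured state: a partial that tests membership
# in a list. (Hypothetical example, not the datafusion API.)
allowed = [1, 2, 3]
is_allowed = functools.partial(operator.contains, allowed)

# The captured list is copied into the bytes when dumps() runs.
blob = pickle.dumps(is_allowed)

# Mutating the sender's state afterwards does not change the blob.
allowed.append(4)

# The receiver rebuilds the callable from the snapshot in the bytes.
restored = pickle.loads(blob)

print("sender sees 4:", is_allowed(4))
print("receiver sees 4:", restored(4))

# Larger captured state means a larger payload for every shipped blob.
big = functools.partial(operator.contains, list(range(10_000)))
print("small blob:", len(blob), "bytes; large blob:", len(pickle.dumps(big)), "bytes")
```

The sender and the receiver disagree about `4` because the receiver only ever sees the state as it was when `pickle.dumps` ran. This is the kind of surprise the closure-capture entry warns about when captured state is mutable.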

python/datafusion/expr.py

Lines changed: 9 additions & 7 deletions
@@ -439,11 +439,11 @@ def to_bytes(self, ctx: SessionContext | None = None) -> bytes:
         Use this — or :func:`pickle.dumps` — to send an expression to a
         worker process for distributed evaluation.

-        Built-in functions and Python scalar UDFs travel inside the
-        returned bytes; the worker does not need to pre-register them.
-        Aggregate UDFs, window UDFs, and UDFs imported via the FFI
-        capsule protocol travel by name only and must be registered on
-        the worker. See :doc:`/user-guide/io/distributing_expressions`.
+        Built-in functions and Python UDFs (scalar, aggregate, window)
+        travel inside the returned bytes; the worker does not need to
+        pre-register them. UDFs imported via the FFI capsule protocol
+        travel by name only and must be registered on the worker. See
+        :doc:`/user-guide/io/distributing_expressions`.
         """
         ctx_arg = ctx.ctx if ctx is not None else None
         return bytes(self.expr.to_bytes(ctx_arg))
@@ -470,8 +470,10 @@ def __reduce__(self) -> tuple:
         """Pickle protocol hook.

         Lets expressions be shipped to worker processes via
-        :func:`pickle.dumps` / :func:`pickle.loads`. The worker's
-        :class:`SessionContext` for resolving by-name references is
+        :func:`pickle.dumps` / :func:`pickle.loads`. Built-in functions
+        and Python UDFs travel inside the pickle bytes; only FFI-capsule
+        UDFs require pre-registration on the worker. The worker's
+        :class:`SessionContext` for resolving those references is
         looked up via :func:`datafusion.ipc.set_worker_ctx`, falling
         back to a fresh empty :class:`SessionContext` if none has been
         installed on the worker.

python/datafusion/ipc.py

Lines changed: 10 additions & 9 deletions
@@ -20,22 +20,23 @@
 When a :class:`Expr` is shipped to a worker process (e.g. through
 :func:`multiprocessing.Pool` or a Ray actor), the worker reconstructs the
 expression against a :class:`SessionContext`. If the expression references
-aggregate UDFs, window UDFs, table providers, or UDFs imported via the FFI
-capsule protocol — anything the worker would otherwise resolve from its
-registered functions — install a configured :class:`SessionContext` once
-per worker:
+UDFs imported via the FFI capsule protocol — or any UDF the worker would
+otherwise resolve from its registered functions rather than from inside
+the shipped expression — install a configured :class:`SessionContext`
+once per worker:

     >>> # doctest: +SKIP
     >>> from datafusion import SessionContext
     >>> from datafusion.ipc import set_worker_ctx
     >>>
     >>> def init_worker():
     ...     ctx = SessionContext()
-    ...     ctx.register_udaf(my_aggregate)
+    ...     ctx.register_udaf(my_ffi_aggregate)
     ...     set_worker_ctx(ctx)

-Built-in functions and Python scalar UDFs travel inside the shipped
-expression itself and do not need pre-registration on the worker.
+Built-in functions and Python UDFs (scalar, aggregate, window) travel
+inside the shipped expression itself and do not need pre-registration
+on the worker.

 See :doc:`/user-guide/io/distributing_expressions` for the full pattern.
 """
@@ -75,8 +76,8 @@ def clear_worker_ctx() -> None:

     After clearing, expressions reconstructed in this worker fall back to a
     fresh empty :class:`SessionContext` — adequate for built-ins and Python
-    scalar UDFs, but anything that travels by name only (aggregate UDFs,
-    window UDFs, FFI UDFs) will fail to resolve.
+    UDFs (scalar, aggregate, window), but anything imported via the FFI
+    capsule protocol will fail to resolve.
     """
     if hasattr(_local, "ctx"):
         del _local.ctx
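
The fallback described in the `clear_worker_ctx` docstring can be modelled in a few lines. This is a toy sketch, not the code in `python/datafusion/ipc.py`. `ToyContext` and `resolve` are hypothetical names. Storing the context in a `threading.local` is an assumption, suggested only by the `_local` attribute visible in the hunk above.

```python
import threading


class ToyContext:
    """Hypothetical stand-in for a session context: a name -> function registry."""

    def __init__(self):
        self.functions = {}

    def register(self, name, fn):
        self.functions[name] = fn


# Assumption: the worker context lives in per-thread storage.
_local = threading.local()


def set_worker_ctx(ctx):
    """Install ctx as the context used for by-name lookups in this thread."""
    _local.ctx = ctx


def clear_worker_ctx():
    """Remove the installed context, if any."""
    if hasattr(_local, "ctx"):
        del _local.ctx


def resolve(name):
    """Look up a by-name reference, falling back to a fresh empty context."""
    ctx = getattr(_local, "ctx", None)
    if ctx is None:
        ctx = ToyContext()  # nothing registered, so by-name lookups fail
    try:
        return ctx.functions[name]
    except KeyError:
        raise LookupError(f"no function named {name!r} on this worker") from None
```

With a context installed, `resolve("my_ffi_aggregate")` returns the registered function. After `clear_worker_ctx()`, the same call raises `LookupError`, mirroring the "will fail to resolve" behaviour the docstring describes for FFI-capsule UDFs.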

python/tests/test_pickle_expr.py

Lines changed: 4 additions & 5 deletions
@@ -17,11 +17,10 @@

 """In-process pickle round-trip tests for :class:`Expr`.

-Built-in functions and Python scalar UDFs travel with the pickled
-expression and do not need worker-side pre-registration. The worker
-context (:mod:`datafusion.ipc`) is only consulted for references that
-travel by name — aggregate UDFs, window UDFs, UDFs imported via the FFI
-capsule protocol.
+Built-in functions and Python UDFs (scalar, aggregate, window) travel
+with the pickled expression and do not need worker-side pre-registration.
+The worker context (:mod:`datafusion.ipc`) is only consulted for UDFs
+imported via the FFI capsule protocol.

 Cross-process tests live in ``test_pickle_multiprocessing.py``.
 """

python/tests/test_pickle_multiprocessing.py

Lines changed: 5 additions & 5 deletions
@@ -18,11 +18,11 @@
 """Cross-process pickle tests for :class:`Expr`.

 Workers run with each :mod:`multiprocessing` start method (``fork``,
-``forkserver``, ``spawn``). Python scalar UDFs travel with the pickled
-expression and need no worker-side pre-registration. Worker-side helpers
-live in ``_pickle_multiprocessing_helpers`` — the underscore prefix
-avoids pytest collection so the module imports under its real name in
-worker subprocesses.
+``forkserver``, ``spawn``). Python UDFs (scalar, aggregate, window) travel
+with the pickled expression and need no worker-side pre-registration.
+Worker-side helpers live in ``_pickle_multiprocessing_helpers`` — the
+underscore prefix avoids pytest collection so the module imports under
+its real name in worker subprocesses.
 """

 from __future__ import annotations
