Skip to content

Commit c7f95ac

Browse files
timsaucerclaude
andcommitted
docs(pickle): user guide + Ray example + CI backstop + UDF.name
Companion to the pickle work in the previous commit. Ships the discoverable surface a user would actually reach for when they hit "how do I distribute these expressions": * `docs/source/user-guide/io/distributing_expressions.rst` — end-to-end user guide covering the recommended `Pool(initializer=...)` pattern, the worker context shape, what does and does not survive the round-trip (scalar UDFs yes, UDAF/UDWF/FFI by name), Python 3.14 start-method change, and the cloudpickle security note. * `examples/ray_pickle_expr.py` — runnable Ray actor demo using `set_worker_ctx` from an actor `__init__`. * `examples/README.md` — links to the Ray example. * `docs/source/user-guide/io/index.rst` — adds the new page to the IO TOC. * `.github/workflows/test.yml` — 30-minute `timeout-minutes` backstop on the test matrix so a hung multiprocessing worker (e.g. during a pickle regression) does not block CI indefinitely. * `python/datafusion/user_defined.py` — `ScalarUDF` / `AggregateUDF` / `WindowUDF` get a `.name` property surfacing the registered name. Useful for tests asserting an expression carries a specific UDF reference, and for users debugging worker registrations. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
1 parent cc1bff7 commit c7f95ac

6 files changed

Lines changed: 263 additions & 0 deletions

File tree

.github/workflows/test.yml

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,9 @@ env:
2929
jobs:
3030
test-matrix:
3131
runs-on: ubuntu-latest
32+
# Backstop: a hung multiprocessing worker (e.g. during a pickle regression)
33+
# should not block CI longer than this.
34+
timeout-minutes: 30
3235
strategy:
3336
fail-fast: false
3437
matrix:
Lines changed: 141 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,141 @@
1+
.. Licensed to the Apache Software Foundation (ASF) under one
2+
.. or more contributor license agreements. See the NOTICE file
3+
.. distributed with this work for additional information
4+
.. regarding copyright ownership. The ASF licenses this file
5+
.. to you under the Apache License, Version 2.0 (the
6+
.. "License"); you may not use this file except in compliance
7+
.. with the License. You may obtain a copy of the License at
8+
9+
.. http://www.apache.org/licenses/LICENSE-2.0
10+
11+
.. Unless required by applicable law or agreed to in writing,
12+
.. software distributed under the License is distributed on an
13+
.. "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14+
.. KIND, either express or implied. See the License for the
15+
.. specific language governing permissions and limitations
16+
.. under the License.
17+
18+
Distributing expressions across processes
19+
=========================================
20+
21+
DataFusion expressions (:py:class:`~datafusion.Expr`) can be serialized and
22+
shipped across process boundaries — useful for distributing work over a
23+
``multiprocessing.Pool``, a Ray actor pool, or any framework that supports a
24+
per-worker initialization hook.
25+
26+
Pickle support
27+
--------------
28+
29+
:py:class:`~datafusion.Expr` implements the pickle protocol directly. Call
30+
:py:func:`pickle.dumps` on an expression and ship the bytes; the receiver
31+
calls :py:func:`pickle.loads`. Python *scalar UDFs* are cloudpickled into the
32+
proto wire format by a Rust-side codec (``PythonUDFCodec``), so the blob is
33+
self-contained — the receiver does not need to pre-register the UDF.
34+
35+
.. code-block:: python
36+
37+
import multiprocessing as mp
38+
import pickle
39+
40+
import pyarrow as pa
41+
from datafusion import SessionContext, col, lit, udf
42+
43+
def init_worker():
44+
# Optional: install a worker context for aggregate / window UDFs,
45+
# table providers, or Rust-side function registrations. Not needed
46+
# for built-ins or Python scalar UDFs.
47+
pass
48+
49+
def evaluate(blob_and_batch):
50+
blob, batch = blob_and_batch
51+
expr = pickle.loads(blob)
52+
ctx = SessionContext()
53+
df = ctx.from_pydict({"a": batch})
54+
return df.with_column("result", expr).select("result").to_pydict()["result"]
55+
56+
if __name__ == "__main__":
57+
double = udf(
58+
lambda arr: pa.array([(v.as_py() or 0) * 2 for v in arr]),
59+
[pa.int64()], pa.int64(), volatility="immutable", name="double",
60+
)
61+
blob = pickle.dumps(double(col("a")))
62+
63+
mp_ctx = mp.get_context("forkserver")
64+
with mp_ctx.Pool(processes=4) as pool:
65+
results = pool.map(
66+
evaluate,
67+
[(blob, [1, 2, 3]), (blob, [10, 20, 30])],
68+
)
69+
print(results) # [[2, 4, 6], [20, 40, 60]]
70+
71+
Worker-scoped context
72+
---------------------
73+
74+
For references the codec cannot inline — aggregate UDFs, window UDFs, FFI
75+
capsule UDFs, or anything resolved through the
76+
:class:`SessionContext`'s function registry — set a worker-scoped context
77+
once per process using :py:func:`datafusion.ipc.set_worker_ctx`:
78+
79+
.. code-block:: python
80+
81+
from datafusion import SessionContext
82+
from datafusion.ipc import set_worker_ctx
83+
84+
def init_worker():
85+
ctx = SessionContext()
86+
ctx.register_udaf(my_aggregate) # if needed
87+
set_worker_ctx(ctx)
88+
89+
with mp.get_context("forkserver").Pool(
90+
processes=4, initializer=init_worker
91+
) as pool:
92+
...
93+
94+
Without a worker context, unpickling falls back to a fresh
95+
:py:class:`SessionContext`. Built-in functions resolve; Python scalar UDFs
96+
ride along inside the blob via the codec. References to aggregate / window
97+
UDFs or other registry-only entries raise an informative error if not
98+
registered on the worker.
99+
100+
Python 3.14 default change
101+
--------------------------
102+
103+
Python 3.14 changed the POSIX default start method for
104+
:py:mod:`multiprocessing` from ``fork`` to ``forkserver``. With ``fork``, a
105+
context set in the parent was visible in workers via copy-on-write; with
106+
``forkserver`` and ``spawn`` it is not. The codec + worker-init pattern works
107+
on every start method — prefer it over relying on inherited state.
108+
109+
Trade-offs of inline UDFs
110+
-------------------------
111+
112+
* **Blob size** — cloudpickled callables add bytes per blob. A trivial
113+
built-in expression is ~20 bytes; an expression referencing a Python scalar
114+
UDF is hundreds of bytes (the cloudpickled callable + signature). Pre-register
115+
shared UDFs on workers via :py:func:`~datafusion.ipc.set_worker_ctx` when
116+
the same UDF is shipped many times and you want to avoid the overhead.
117+
* **Closure capture** — cloudpickle captures closure state. Surprises are
118+
possible if the UDF closes over large objects, module-level mutable state,
119+
or non-portable file paths.
120+
* **FFI scalar UDFs cannot be inlined** — PyCapsule-backed UDFs have no
121+
Python callable to cloudpickle. The codec leaves their ``fun_definition``
122+
empty; the receiver must have a matching registration.
123+
* **Aggregate and window UDFs cannot be inlined yet** — their Python state
124+
is held inside opaque factory closures on the Rust side. Pre-register on
125+
the worker.
126+
127+
Security
128+
--------
129+
130+
.. warning::
131+
132+
Pickle blobs containing inlined UDFs deserialize via :py:mod:`cloudpickle`,
133+
which executes arbitrary code on the receiver. Only :py:func:`pickle.loads`
134+
blobs from trusted sources. For untrusted-source workflows, restrict the
135+
sender to built-in functions and pre-registered Rust-side UDFs.
136+
137+
See also
138+
--------
139+
140+
* :py:mod:`datafusion.ipc` — module-level API reference.
141+
* ``examples/ray_pickle_expr.py`` — runnable Ray actor example.

docs/source/user-guide/io/index.rst

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@ IO
2424
arrow
2525
avro
2626
csv
27+
distributing_expressions
2728
json
2829
parquet
2930
table_provider

examples/README.md

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,10 @@ Here is a direct link to the file used in the examples:
4444
- [Register a Python UDF with DataFusion](./python-udf.py)
4545
- [Register a Python UDAF with DataFusion](./python-udaf.py)
4646

47+
### Distributing DataFusion expressions
48+
49+
- [Pickle expressions and send them to Ray actors](./ray_pickle_expr.py)
50+
4751
### Substrait Support
4852

4953
- [Serialize query plans using Substrait](./substrait.py)

examples/ray_pickle_expr.py

Lines changed: 96 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,96 @@
1+
# Licensed to the Apache Software Foundation (ASF) under one
2+
# or more contributor license agreements. See the NOTICE file
3+
# distributed with this work for additional information
4+
# regarding copyright ownership. The ASF licenses this file
5+
# to you under the Apache License, Version 2.0 (the
6+
# "License"); you may not use this file except in compliance
7+
# with the License. You may obtain a copy of the License at
8+
#
9+
# http://www.apache.org/licenses/LICENSE-2.0
10+
#
11+
# Unless required by applicable law or agreed to in writing,
12+
# software distributed under the License is distributed on an
13+
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14+
# KIND, either express or implied. See the License for the
15+
# specific language governing permissions and limitations
16+
# under the License.
17+
18+
"""Distribute DataFusion expressions to Ray actors.
19+
20+
This example shows the worker-init pattern from the user guide adapted to
21+
Ray's actor model: each actor builds its own :class:`SessionContext`.
22+
Python scalar UDFs travel inside the pickle blob via the Rust-side
23+
``PythonUDFCodec`` — no actor-side pre-registration is required. The
24+
worker context (set via :func:`datafusion.ipc.set_worker_ctx`) is still
25+
useful for aggregate/window UDFs or other registry-only entries; we set
26+
one here to show the pattern.
27+
28+
Prerequisites:
29+
pip install ray
30+
31+
Run:
32+
python examples/ray_pickle_expr.py
33+
"""
34+
35+
import pickle
36+
37+
import pyarrow as pa
38+
import ray
39+
from datafusion import SessionContext, col, lit, udf
40+
from datafusion.ipc import set_worker_ctx
41+
42+
43+
def _build_double_udf():
44+
"""Return the demo UDF used by the actors."""
45+
return udf(
46+
lambda arr: pa.array([(v.as_py() or 0) * 2 for v in arr]),
47+
[pa.int64()],
48+
pa.int64(),
49+
volatility="immutable",
50+
name="double",
51+
)
52+
53+
54+
@ray.remote
55+
class DataFusionWorker:
56+
"""A Ray actor with a private :class:`SessionContext`."""
57+
58+
def __init__(self) -> None:
59+
ctx = SessionContext()
60+
ctx.register_udf(_build_double_udf())
61+
# The worker context is what Expr.__setstate__ consults when
62+
# pickled expressions arrive at this actor.
63+
set_worker_ctx(ctx)
64+
self._ctx = ctx
65+
66+
def evaluate(self, expr_blob: bytes, batch_pylist: list[int]) -> list[int]:
67+
"""Unpickle an Expr, run it over an in-memory batch, return results."""
68+
expr = pickle.loads(expr_blob)
69+
df = self._ctx.from_pydict({"a": batch_pylist})
70+
out = df.with_column("result", expr).select("result")
71+
return out.to_pydict()["result"]
72+
73+
74+
def main() -> None:
75+
ray.init(ignore_reinit_error=True)
76+
77+
sender = SessionContext()
78+
sender.register_udf(_build_double_udf())
79+
expr = _build_double_udf()(col("a")) + lit(1)
80+
blob = pickle.dumps(expr)
81+
print(f"pickled expression: {len(blob)} bytes")
82+
83+
workers = [DataFusionWorker.remote() for _ in range(2)]
84+
batches = [[1, 2, 3], [10, 20, 30], [100, 200, 300]]
85+
futures = [
86+
workers[i % len(workers)].evaluate.remote(blob, batch)
87+
for i, batch in enumerate(batches)
88+
]
89+
for batch, result in zip(batches, ray.get(futures), strict=True):
90+
print(f"input {batch} -> {result}")
91+
92+
ray.shutdown()
93+
94+
95+
if __name__ == "__main__":
96+
main()

python/datafusion/user_defined.py

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -132,6 +132,7 @@ def __init__(
132132
133133
See helper method :py:func:`udf` for argument details.
134134
"""
135+
self._name = name
135136
if hasattr(func, "__datafusion_scalar_udf__"):
136137
self._udf = df_internal.ScalarUDF.from_pycapsule(func)
137138
return
@@ -141,6 +142,11 @@ def __init__(
141142
name, func, input_fields, return_field, str(volatility)
142143
)
143144

145+
@property
146+
def name(self) -> str:
147+
"""Return the registered name of this UDF."""
148+
return self._name
149+
144150
def __repr__(self) -> str:
145151
"""Print a string representation of the Scalar UDF."""
146152
return self._udf.__repr__()
@@ -394,6 +400,7 @@ def __init__(
394400
See :py:func:`udaf` for a convenience function and argument
395401
descriptions.
396402
"""
403+
self._name = name
397404
if hasattr(accumulator, "__datafusion_aggregate_udf__"):
398405
self._udaf = df_internal.AggregateUDF.from_pycapsule(accumulator)
399406
return
@@ -418,6 +425,11 @@ def __init__(
418425
str(volatility),
419426
)
420427

428+
@property
429+
def name(self) -> str:
430+
"""Return the registered name of this UDAF."""
431+
return self._name
432+
421433
def __repr__(self) -> str:
422434
"""Print a string representation of the Aggregate UDF."""
423435
return self._udaf.__repr__()
@@ -821,13 +833,19 @@ def __init__(
821833
See :py:func:`udwf` for a convenience function and argument
822834
descriptions.
823835
"""
836+
self._name = name
824837
if hasattr(func, "__datafusion_window_udf__"):
825838
self._udwf = df_internal.WindowUDF.from_pycapsule(func)
826839
return
827840
self._udwf = df_internal.WindowUDF(
828841
name, func, input_types, return_type, str(volatility)
829842
)
830843

844+
@property
845+
def name(self) -> str:
846+
"""Return the registered name of this UDWF."""
847+
return self._name
848+
831849
def __repr__(self) -> str:
832850
"""Print a string representation of the Window UDF."""
833851
return self._udwf.__repr__()

0 commit comments

Comments
 (0)