
Commit ca77810

timsaucer and claude committed
docs: drop manual pickle.dumps/loads from worker examples
The Pool / Ray-actor examples called `pickle.dumps` on the sender and `pickle.loads` on the worker explicitly. That's not what real user code looks like: `multiprocessing.Pool.starmap`, Ray's `@ray.remote`, and similar frameworks serialize their function arguments automatically. Showing the manual wrapping makes the API look more involved than it is and obscures the point: users hand a DataFusion `Expr` to their distribution framework like any other Python object, and it Just Works.

Rewrites:

* User guide worker-pool example switches from `pool.map(evaluate, [(blob, batch), ...])` (where `blob = pickle.dumps(expr)`) to `pool.starmap(evaluate, [(expr, batch), ...])`. `evaluate(expr, batch)` receives the reconstructed expression directly.
* Ray example drops the `pickle.dumps(expr)` / `pickle.loads(blob)` pair; `evaluate(expr, batch)` takes a typed `Expr`. Drops the unused `pickle` import.
* Worker-context narrative updated: "expressions reconstructed by pickle.loads" -> "expressions arriving from the driver".
* Security warning reworded to mention pickle as the underlying mechanism while still framing the contract in user terms (only accept expressions from trusted sources).

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
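The calling-convention change described above (`map` with a packed tuple versus `starmap` with unpacked arguments) can be illustrated without DataFusion or worker processes. The sketch below is an assumption-laden stand-in: `functools.partial(operator.mul, 2)` plays the role of the expression, the hypothetical `ship` helper simulates the framework's pickle round trip in-process, and `itertools.starmap` stands in for `Pool.starmap`, which unpacks argument tuples the same way.

```python
import operator
import pickle
from functools import partial
from itertools import starmap


def ship(args):
    """Hypothetical helper: simulate the transport a pool performs for us
    (pickle on the sender, unpickle on the worker)."""
    return pickle.loads(pickle.dumps(args))


def evaluate(expr, batch):
    # `expr` is already a live object here; no manual unpickling in user code.
    return [expr(v) for v in batch]


# Stand-in for a picklable expression: "multiply by 2".
expr = partial(operator.mul, 2)
tasks = [(expr, [1, 2, 3]), (expr, [10, 20, 30])]

# starmap unpacks each tuple into positional arguments: evaluate(expr, batch).
results = list(starmap(evaluate, (ship(task) for task in tasks)))
print(results)
```

With a real pool, the `ship` step disappears from user code entirely, which is the point the commit makes.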
1 parent 9b4bbd6 commit ca77810

2 files changed

Lines changed: 27 additions & 27 deletions


docs/source/user-guide/io/distributing_expressions.rst

Lines changed: 21 additions & 18 deletions
@@ -24,27 +24,30 @@ worker processes (``multiprocessing.Pool``, a Ray actor pool, or any other
 framework with a per-worker initialization hook), and have each worker
 evaluate the expression against its own slice of data.
 
-DataFusion expressions support this directly: they can be sent through
-Python's standard `pickle <https://docs.python.org/3/library/pickle.html>`_
-module like any other Python object. Python UDFs — scalar, aggregate, and
-window — travel inside the pickled bytes; the receiver does not need to
-pre-register them.
+DataFusion expressions support this directly: pass one to a worker
+process and Python's standard
+`pickle <https://docs.python.org/3/library/pickle.html>`_ machinery
+serializes it transparently — the same machinery
+:py:meth:`multiprocessing.pool.Pool.map`, Ray's
+``@ray.remote``, and similar libraries already use to ship function
+arguments. Python UDFs — scalar, aggregate, and window — travel inside
+the serialized expression; the receiver does not need to pre-register
+them.
 
 Basic worker-pool example
 -------------------------
 
 .. code-block:: python
 
     import multiprocessing as mp
-    import pickle
 
     import pyarrow as pa
     from datafusion import SessionContext, col, udf
 
 
-    def evaluate(blob_and_batch):
-        blob, batch = blob_and_batch
-        expr = pickle.loads(blob)  # Python UDFs travel inside the bytes.
+    def evaluate(expr, batch):
+        # `expr` arrived here via the pool's automatic pickling —
+        # no manual serialization needed in user code.
         ctx = SessionContext()
         df = ctx.from_pydict({"a": batch})
         return df.with_column("result", expr).select("result").to_pydict()["result"]
@@ -55,13 +58,13 @@ Basic worker-pool example
             lambda arr: pa.array([(v.as_py() or 0) * 2 for v in arr]),
             [pa.int64()], pa.int64(), volatility="immutable", name="double",
         )
-        blob = pickle.dumps(double(col("a")))
+        expr = double(col("a"))
 
         mp_ctx = mp.get_context("forkserver")
         with mp_ctx.Pool(processes=4) as pool:
-            results = pool.map(
+            results = pool.starmap(
                 evaluate,
-                [(blob, [1, 2, 3]), (blob, [10, 20, 30])],
+                [(expr, [1, 2, 3]), (expr, [10, 20, 30])],
             )
         print(results)  # [[2, 4, 6], [20, 40, 60]]
 
@@ -108,8 +111,8 @@ must resolve from its registered functions), set up the worker's
     ) as pool:
         ...
 
-Inside a worker, expressions reconstructed by :py:func:`pickle.loads` resolve
-their by-name references against the installed worker context. If no worker
+Inside a worker, expressions arriving from the driver resolve their
+by-name references against the installed worker context. If no worker
 context is installed, a fresh empty :py:class:`SessionContext` is used —
 fine for expressions that only reference built-ins and Python UDFs, but
 FFI-capsule-backed registrations will fail to resolve.
@@ -144,10 +147,10 @@ Security
 .. warning::
 
     Reconstructing an expression containing a Python UDF executes arbitrary
-    Python code on the receiver. Only :py:func:`pickle.loads` expressions
-    from trusted sources. For untrusted-source workflows, restrict senders
-    to built-in functions and pre-registered Rust-side UDFs, and never feed
-    externally supplied bytes through :py:func:`pickle.loads`.
+    Python code on the receiver: pickle is doing the work under the hood
+    and pickle is unsafe on untrusted input. Only accept expressions from
+    trusted sources. For untrusted-source workflows, restrict senders to
+    built-in functions and pre-registered Rust-side UDFs.
 
 See also
 --------
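
The reworded security warning rests on a general property of pickle: the sender, not the receiver, decides which callable runs during `pickle.loads`. The following is a minimal, harmless sketch of that property using only the standard library. The `Payload` class is hypothetical and deliberately calls the benign built-in `sorted`; it says nothing about how DataFusion serializes expressions internally.

```python
import pickle


class Payload:
    """Hypothetical object whose pickled form names a callable to run on load."""

    def __reduce__(self):
        # pickle records "call sorted([3, 1, 2])" instead of this object's state.
        return (sorted, ([3, 1, 2],))


blob = pickle.dumps(Payload())

# The receiver never asked to sort anything, yet loading the bytes calls
# sorted(...) and hands back its return value.
result = pickle.loads(blob)
print(type(result).__name__, result)
```

A hostile sender could name a far less benign callable in the same position, which is why the warning restricts the contract to trusted sources.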

examples/ray_pickle_expr.py

Lines changed: 6 additions & 9 deletions
@@ -30,11 +30,9 @@
     python examples/ray_pickle_expr.py
 """
 
-import pickle
-
 import pyarrow as pa
 import ray
-from datafusion import SessionContext, col, lit, udf
+from datafusion import Expr, SessionContext, col, lit, udf
 from datafusion.ipc import set_worker_ctx
 
 
@@ -62,9 +60,10 @@ def __init__(self) -> None:
         set_worker_ctx(ctx)
         self._ctx = ctx
 
-    def evaluate(self, expr_blob: bytes, batch_pylist: list[int]) -> list[int]:
-        """Unpickle an Expr, run it over an in-memory batch, return results."""
-        expr = pickle.loads(expr_blob)
+    def evaluate(self, expr: Expr, batch_pylist: list[int]) -> list[int]:
+        """Run the expression against an in-memory batch."""
+        # `expr` arrived here via Ray's automatic argument serialization —
+        # no manual pickle handling needed in user code.
         df = self._ctx.from_pydict({"a": batch_pylist})
         out = df.with_column("result", expr).select("result")
         return out.to_pydict()["result"]
@@ -76,13 +75,11 @@ def main() -> None:
     sender = SessionContext()
     sender.register_udf(_build_double_udf())
     expr = _build_double_udf()(col("a")) + lit(1)
-    blob = pickle.dumps(expr)
-    print(f"pickled expression: {len(blob)} bytes")
 
     workers = [DataFusionWorker.remote() for _ in range(2)]
     batches = [[1, 2, 3], [10, 20, 30], [100, 200, 300]]
     futures = [
-        workers[i % len(workers)].evaluate.remote(blob, batch)
+        workers[i % len(workers)].evaluate.remote(expr, batch)
         for i, batch in enumerate(batches)
     ]
     for batch, result in zip(batches, ray.get(futures), strict=True):
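
The `workers[i % len(workers)]` indexing in `main()` spreads batches across actors round-robin. A Ray-free sketch of just that assignment logic follows; the string names are hypothetical stand-ins for actor handles, and no remote calls are made.

```python
# Hypothetical stand-ins for the two actor handles created in main().
workers = ["worker-0", "worker-1"]
batches = [[1, 2, 3], [10, 20, 30], [100, 200, 300]]

# Batch i goes to worker i mod N, so work wraps around once the
# worker list is exhausted.
assignment = [
    (workers[i % len(workers)], batch)
    for i, batch in enumerate(batches)
]
for name, batch in assignment:
    print(name, batch)
```

With two workers and three batches, the first worker receives the first and third batch and the second worker receives the middle one.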
