Skip to content

Commit ad0f956

Browse files
timsaucerclaude
andcommitted
docs(ray-example): drop unnecessary UDF registration
The actor was calling `ctx.register_udf(...)` and `set_worker_ctx(ctx)` to make the inbound expression's UDF resolvable on the worker. With Python scalar/aggregate/window UDFs now traveling inside the serialized expression, neither call is necessary — the actor just needs a `SessionContext` to evaluate against. Also drops the parallel `sender.register_udf(...)` in the driver; an expression built with a `udf(...)` callable carries its own reference and does not require the UDF to be registered on the driver session. Result: each actor is a few lines (one `SessionContext`, one `evaluate` method) — what the inline-UDF story is actually trying to demonstrate. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
1 parent d7a1ff4 commit ad0f956

1 file changed

Lines changed: 8 additions & 18 deletions

File tree

examples/ray_pickle_expr.py

Lines changed: 8 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -17,11 +17,9 @@
1717

1818
"""Distribute DataFusion expressions to Ray actors.
1919
20-
Build an expression in the driver, ship it to a pool of Ray actors, and have
21-
each actor evaluate it against its own slice of data. Each actor sets up
22-
its own :class:`SessionContext` once in `__init__` and registers any UDFs
23-
it needs to resolve by name. Python scalar UDFs travel with the shipped
24-
expression and need no actor-side pre-registration.
20+
Build an expression in the driver, ship it to a pool of Ray actors, and
21+
have each actor evaluate it against its own slice of data. Python UDFs
22+
travel with the shipped expression — no actor-side registration needed.
2523
2624
Prerequisites:
2725
pip install ray
@@ -33,11 +31,10 @@
3331
import pyarrow as pa
3432
import ray
3533
from datafusion import Expr, SessionContext, col, lit, udf
36-
from datafusion.ipc import set_worker_ctx
3734

3835

3936
def _build_double_udf():
40-
"""Return the demo UDF used by the actors."""
37+
"""Return the demo UDF used by the driver."""
4138
return udf(
4239
lambda arr: pa.array([(v.as_py() or 0) * 2 for v in arr]),
4340
[pa.int64()],
@@ -52,18 +49,13 @@ class DataFusionWorker:
5249
"""A Ray actor with a private :class:`SessionContext`."""
5350

5451
def __init__(self) -> None:
55-
ctx = SessionContext()
56-
ctx.register_udf(_build_double_udf())
57-
# Install the actor's SessionContext as its worker context;
58-
# expressions reconstructed in this actor will resolve their
59-
# by-name references against it.
60-
set_worker_ctx(ctx)
61-
self._ctx = ctx
52+
self._ctx = SessionContext()
6253

6354
def evaluate(self, expr: Expr, batch_pylist: list[int]) -> list[int]:
6455
"""Run the expression against an in-memory batch."""
65-
# `expr` arrived here via Ray's automatic argument serialization —
66-
# no manual pickle handling needed in user code.
56+
# `expr` arrived here via Ray's automatic argument serialization;
57+
# the Python UDF inside it was reconstructed from the bytes — no
58+
# pre-registration on this actor required.
6759
df = self._ctx.from_pydict({"a": batch_pylist})
6860
out = df.with_column("result", expr).select("result")
6961
return out.to_pydict()["result"]
@@ -72,8 +64,6 @@ def evaluate(self, expr: Expr, batch_pylist: list[int]) -> list[int]:
7264
def main() -> None:
7365
ray.init(ignore_reinit_error=True)
7466

75-
sender = SessionContext()
76-
sender.register_udf(_build_double_udf())
7767
expr = _build_double_udf()(col("a")) + lit(1)
7868

7969
workers = [DataFusionWorker.remote() for _ in range(2)]

0 commit comments

Comments
 (0)