.. Licensed to the Apache Software Foundation (ASF) under one
.. or more contributor license agreements. See the NOTICE file
.. distributed with this work for additional information
.. regarding copyright ownership. The ASF licenses this file
.. to you under the Apache License, Version 2.0 (the
.. "License"); you may not use this file except in compliance
.. with the License. You may obtain a copy of the License at

.. http://www.apache.org/licenses/LICENSE-2.0

.. Unless required by applicable law or agreed to in writing,
.. software distributed under the License is distributed on an
.. "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
.. KIND, either express or implied. See the License for the
.. specific language governing permissions and limitations
.. under the License.

Distributing DataFusion work
============================

Splitting a DataFusion workload across multiple processes (for
throughput, isolation, or to use a worker pool) takes one of a few
shapes, depending on what is being split.

* **Expression-level distribution** ✅ *supported today*. The driver
  builds a DataFusion :py:class:`~datafusion.Expr`, sends it to
  worker processes, and each worker evaluates the expression against
  its own slice of data. This suits embarrassingly parallel workloads
  where the driver decides up front how to partition.
* **Query-level distribution via datafusion-distributed** 🚧 *work in
  progress upstream*. A single logical or physical plan is split into
  stages and run across worker nodes. The driver writes one SQL or
  DataFrame query; the runtime decides partitioning.
* **Query-level distribution via Apache Ballista** 🚧 *work in
  progress upstream*. A similar query-level model, with a more
  cluster-management-oriented runtime.

Only the first option is ready for use from datafusion-python today.
The other two are described below so that the whole picture is in
one place; integration details will be added here as those projects
become usable from datafusion-python.

Expression-level distribution
-----------------------------

DataFusion expressions support distribution directly: pass one to a
worker process and Python's standard
`pickle <https://docs.python.org/3/library/pickle.html>`_ machinery
serializes it transparently. This is the same machinery that
:py:meth:`multiprocessing.pool.Pool.map`, Ray's ``@ray.remote``, and
similar libraries already use to ship function arguments. Python UDFs
(scalar, aggregate, and window) travel inside the serialized
expression; the receiver does not need to pre-register them.

Basic worker-pool example
~~~~~~~~~~~~~~~~~~~~~~~~~

.. code-block:: python

    import multiprocessing as mp

    import pyarrow as pa
    from datafusion import SessionContext, col, udf


    def evaluate(expr, batch):
        # `expr` arrived here via the pool's automatic pickling;
        # no manual serialization is needed in user code.
        ctx = SessionContext()
        df = ctx.from_pydict({"a": batch})
        return df.with_column("result", expr).select("result").to_pydict()["result"]


    if __name__ == "__main__":
        # Scalar UDF that doubles each value, treating nulls as 0.
        double = udf(
            lambda arr: pa.array([(v.as_py() or 0) * 2 for v in arr]),
            [pa.int64()], pa.int64(), volatility="immutable", name="double",
        )
        expr = double(col("a"))

        # One task per data slice; every task carries the same expression.
        mp_ctx = mp.get_context("forkserver")
        with mp_ctx.Pool(processes=4) as pool:
            results = pool.starmap(
                evaluate,
                [(expr, [1, 2, 3]), (expr, [10, 20, 30])],
            )
        print(results)  # [[2, 4, 6], [20, 40, 60]]
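
The example above hardcodes two batches. When the driver starts from one
larger list, it first has to cut that list into per-worker slices. The
following is a minimal sketch of one way to do that; ``split_into_slices``
is a hypothetical helper written for this illustration and is not part of
datafusion.

```python
def split_into_slices(values, n_slices):
    """Split `values` into at most `n_slices` contiguous, near-equal slices."""
    if n_slices < 1:
        raise ValueError("n_slices must be at least 1")
    size, extra = divmod(len(values), n_slices)
    slices = []
    start = 0
    for i in range(n_slices):
        # The first `extra` slices take one additional element each.
        stop = start + size + (1 if i < extra else 0)
        if stop > start:  # skip empty slices when there are few values
            slices.append(values[start:stop])
        start = stop
    return slices


slices = split_into_slices(list(range(10)), 4)
print(slices)  # [[0, 1, 2], [3, 4, 5], [6, 7], [8, 9]]
```

Each slice can then be paired with the expression, for example
``[(expr, s) for s in slices]``, and passed to ``pool.starmap``.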

What travels with the expression
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~

* **Built-in functions** (``abs``, ``length``, arithmetic, comparisons,
  etc.): fully portable. The worker needs nothing pre-registered.
* **Python UDFs**: fully portable. The callable, its signature, and
  any state captured in closures travel inside the serialized
  expression and are reconstructed on the worker automatically.
  This applies equally to:

  * **scalar UDFs** (:py:func:`datafusion.udf`)
  * **aggregate UDFs** (:py:func:`datafusion.udaf`)
  * **window UDFs** (:py:func:`datafusion.udwf`)

* **UDFs imported via the FFI capsule protocol**: these travel **by
  name only**. The worker must already have a matching registration on
  its :py:class:`SessionContext`. Without that registration, evaluation
  raises an error.

Registering shared UDFs on workers
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~

When an expression references an FFI capsule UDF (or any UDF the
worker must resolve from its registered functions), set up the
worker's :py:class:`SessionContext` once per process and install it
as the *worker context*:

.. code-block:: python

    import multiprocessing as mp

    from datafusion import SessionContext
    from datafusion.ipc import set_worker_ctx


    def init_worker():
        # Runs once in each worker process before it accepts tasks.
        ctx = SessionContext()
        # `my_ffi_aggregate` is a placeholder for an FFI-capsule-backed
        # aggregate UDF that the worker is able to import.
        ctx.register_udaf(my_ffi_aggregate)
        set_worker_ctx(ctx)


    if __name__ == "__main__":
        with mp.get_context("forkserver").Pool(
            processes=4, initializer=init_worker
        ) as pool:
            ...

Inside a worker, expressions arriving from the driver resolve their
by-name references against the installed worker context. If no worker
context is installed, a fresh, empty :py:class:`SessionContext` is
used. That is fine for expressions that reference only built-ins and
Python UDFs, but FFI-capsule-backed registrations will fail to resolve.
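
The fallback behaviour described above can be pictured with a toy model.
The sketch below uses a plain dictionary in place of a
:py:class:`SessionContext`; ``install_registry`` and ``resolve`` are
hypothetical names chosen for illustration and say nothing about how
``datafusion.ipc`` is actually implemented.

```python
# Per-process slot. In a real pool it would be filled by the initializer.
_slot = {"registry": None}


def install_registry(registry):
    """Install the mapping that by-name references resolve against."""
    _slot["registry"] = registry


def resolve(name):
    """Look up `name`, falling back to an empty registry if none is installed."""
    registry = _slot["registry"] if _slot["registry"] is not None else {}
    if name not in registry:
        raise LookupError(f"no function named {name!r} is registered on this worker")
    return registry[name]


# Before installation the fallback registry is empty, so lookup fails.
try:
    resolve("my_sum")
    missing_before = False
except LookupError:
    missing_before = True

# After installation the same name resolves.
install_registry({"my_sum": sum})
total = resolve("my_sum")([1, 2, 3])
```

The point of the model is the ordering: the registry has to be installed in
each worker process before the first by-name reference is resolved, which is
what a pool ``initializer`` guarantees.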

Python 3.14 default change
~~~~~~~~~~~~~~~~~~~~~~~~~~

Python 3.14 changed the default start method for
:py:mod:`multiprocessing` on POSIX platforms other than macOS from
``fork`` to ``forkserver`` (macOS has defaulted to ``spawn`` since
Python 3.8). With ``fork``, any state set in the parent was visible
in workers via copy-on-write; with ``forkserver`` and ``spawn`` it is
not. The :py:func:`~datafusion.ipc.set_worker_ctx` pattern works with
every start method, so prefer it over relying on inherited state.
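
Because the default differs by platform and Python version, it can help to
choose the start method explicitly instead of relying on the default. A
minimal sketch using only the standard library follows;
``choose_start_method`` is a hypothetical helper, and the preference order
is an assumption made for this illustration.

```python
import multiprocessing as mp


def choose_start_method(preferred=("forkserver", "spawn")):
    """Return the first preferred start method available on this platform."""
    available = mp.get_all_start_methods()
    for method in preferred:
        if method in available:
            return method
    raise RuntimeError(f"none of {preferred} is available; have {available}")


method = choose_start_method()
# A context object pins the start method without changing the global default.
mp_ctx = mp.get_context(method)
```

``forkserver`` is not available on Windows, so the helper falls back to
``spawn`` there. Neither method inherits parent state, so both exercise the
initializer-based setup.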

Practical considerations
~~~~~~~~~~~~~~~~~~~~~~~~

* **Serialized size scales with what travels inline.** A serialized
  expression made up only of built-ins is small (tens of bytes). An
  expression carrying a Python UDF runs to hundreds of bytes (the
  callable and its signature). When the same UDF is shipped many times,
  registering an equivalent FFI-capsule UDF on each worker via
  :py:func:`~datafusion.ipc.set_worker_ctx` and referring to it by
  name cuts the per-trip overhead.
* **Closure capture.** When a Python UDF closes over surrounding
  state (local variables, module-level objects, file paths), that
  state is captured at serialization time. Surprises are possible if
  the captured state is large, mutable, or not portable to the
  worker's environment.
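
Both points can be illustrated with plain ``pickle`` on a dictionary that
stands in for captured state. This is a sketch of general pickle semantics
only; it does not exercise datafusion's own serialization path, and the
variable names are made up for the example.

```python
import pickle

# Stand-in for state that a UDF might close over.
captured = {"factor": 2, "lookup": list(range(1000))}

# Serialization takes a snapshot of the state as it is right now.
payload = pickle.dumps(captured)

# Later mutation on the driver does not reach the serialized copy.
captured["factor"] = 3
restored = pickle.loads(payload)

# Measuring the payload shows how much the large lookup table costs per trip.
payload_size = len(payload)
small_size = len(pickle.dumps({"factor": 2}))
print(restored["factor"], payload_size > small_size)
```

Checking ``len(pickle.dumps(...))`` on the state a UDF depends on is a quick
way to spot an unexpectedly heavy payload before it is shipped many times.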

Security
~~~~~~~~

.. warning::

   Reconstructing an expression that contains a Python UDF executes
   arbitrary Python code on the receiver: pickle does the work
   under the hood, and pickle is unsafe on untrusted input. Only
   accept expressions from trusted sources. For workflows with
   untrusted senders, restrict senders to built-in functions and
   pre-registered Rust-side UDFs.
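
The hazard comes from pickle itself and can be shown with the standard
library alone. The sketch below uses a harmless payload and follows the
"restricting globals" approach from the Python ``pickle`` documentation.
``Payload`` and ``NoGlobalsUnpickler`` are hypothetical names for this
illustration, and nothing here implies that datafusion applies such a
filter.

```python
import io
import pickle


class Payload:
    """Benign stand-in for a malicious object."""

    def __reduce__(self):
        # pickle records "call eval('6 * 7')" and performs the call on load.
        return (eval, ("6 * 7",))


class NoGlobalsUnpickler(pickle.Unpickler):
    """Refuses every global lookup, so no callable can be resolved."""

    def find_class(self, module, name):
        raise pickle.UnpicklingError(f"global {module}.{name} is forbidden")


blob = pickle.dumps(Payload())

# Plain loads() runs the embedded call: the result is 42, not a Payload.
loaded = pickle.loads(blob)

# The restricted unpickler rejects the same bytes before the call happens.
try:
    NoGlobalsUnpickler(io.BytesIO(blob)).load()
    rejected = False
except pickle.UnpicklingError:
    rejected = True
```

A filter this strict also rejects every legitimate Python UDF, which is why
the practical boundary is trust in the sender, not inspection of the bytes.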

Query-level distribution via datafusion-distributed
---------------------------------------------------

🚧 *Work in progress upstream; not yet usable from datafusion-python.*

`datafusion-distributed <https://github.com/apache/datafusion-distributed>`_
splits a single physical plan into stages and runs each stage on a
different worker node. The driver writes a SQL or DataFrame query
once; the runtime handles partitioning, shuffles, and reassembly.

A datafusion-python integration is in development. This section will
document the integration once it lands. In the meantime, the
expression-level approach above covers most use cases that do not
require automatic plan partitioning.

Query-level distribution via Apache Ballista
--------------------------------------------

🚧 *Work in progress upstream; not yet usable from datafusion-python.*

`Apache Ballista <https://github.com/apache/datafusion-ballista>`_
provides distributed query execution on top of DataFusion with a
scheduler / executor model better suited to long-lived cluster
deployments. A datafusion-python integration is on the roadmap; this
section will be filled in once the integration is usable.

See also
--------

* :py:mod:`datafusion.ipc`: worker context API.
* ``examples/ray_pickle_expr.py``: runnable Ray actor example.