Skip to content

Commit 0fc78b7

Browse files
timsaucer and claude committed
test(pickle): remove multiprocessing CI debug instrumentation
The multiprocessing forkserver/spawn hang was diagnosed and fixed: workers could not import `tests._pickle_multiprocessing_helpers` because `pytest --import-mode=importlib` does not add the test parent dir to `sys.path`. The fix (appending the parent dir to `sys.path` so it is inherited by mp workers without shadowing the installed `datafusion` wheel) is retained.

This commit drops the diagnostic scaffolding that was added to identify the hang point:

- `_diag` + per-import / per-task log writes to /tmp
- `snapshot_processes` and the `threading.Timer` that captured worker state mid-hang
- `diag_init` Pool initializer
- "Dump multiprocessing diagnostic log" CI step

Pre-existing infrastructure is kept: per-test `@pytest.mark.timeout(120)` (backed by `pytest-timeout` dev dep) and the job-level `timeout-minutes: 30` backstop on the test matrix.

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
1 parent 050c7f2 commit 0fc78b7

3 files changed

Lines changed: 13 additions & 217 deletions

File tree

.github/workflows/test.yml

Lines changed: 0 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -102,20 +102,6 @@ jobs:
102102
git submodule update --init
103103
uv run --no-project pytest -v --import-mode=importlib
104104
105-
# Always dump the multiprocessing worker diagnostic log, even on
106-
# job timeout, so we can see where forkserver/spawn workers stalled.
107-
# See python/tests/_pickle_multiprocessing_helpers.py for what each
108-
# line means. Safe to remove once forkserver/spawn hang is resolved.
109-
- name: Dump multiprocessing diagnostic log
110-
if: always()
111-
run: |
112-
echo "=== /tmp/df_mp_worker_diag.log ==="
113-
if [ -f /tmp/df_mp_worker_diag.log ]; then
114-
cat /tmp/df_mp_worker_diag.log
115-
else
116-
echo "(no diagnostic log produced)"
117-
fi
118-
119105
- name: FFI unit tests
120106
run: |
121107
cd examples/datafusion-ffi-example

python/tests/_pickle_multiprocessing_helpers.py

Lines changed: 10 additions & 152 deletions
Original file line numberDiff line numberDiff line change
@@ -23,56 +23,9 @@
2323

2424
from __future__ import annotations
2525

26-
import os
27-
import tempfile
28-
import time
29-
import traceback
30-
from pathlib import Path
31-
32-
# Diagnostic log path for multiprocessing worker timing.
33-
# Workers write here so a CI-side `cat` after a job timeout can show
34-
# where each worker stalled (e.g. inside `import datafusion`). Lives in
35-
# the system temp dir so it persists across Pool worker exits and is
36-
# readable by a follow-up workflow step. Override via env var when
37-
# debugging locally.
38-
_DIAG_LOG = Path(
39-
os.environ.get(
40-
"DF_MP_DIAG_LOG",
41-
str(Path(tempfile.gettempdir()) / "df_mp_worker_diag.log"),
42-
)
43-
)
44-
45-
46-
def _diag(event: str) -> None:
47-
"""Append a diagnostic line: timestamp, pid, parent pid, event.
48-
49-
Opens / flushes / closes per call so a hang mid-import still leaves
50-
a partial trail on disk. Parent pid distinguishes forkserver-born
51-
workers (parent = forkserver) from spawn-born workers (parent =
52-
main pytest process).
53-
"""
54-
try:
55-
with _DIAG_LOG.open("a", encoding="utf-8") as fh:
56-
fh.write(
57-
f"{time.time():.3f} pid={os.getpid()} ppid={os.getppid()} {event}\n"
58-
)
59-
fh.flush()
60-
os.fsync(fh.fileno())
61-
except OSError:
62-
# Best-effort diagnostic; never let logging itself break a test.
63-
pass
64-
65-
66-
_diag("helpers module: starting imports")
67-
import pyarrow as pa # noqa: E402
68-
69-
_diag("helpers module: pyarrow imported")
70-
from datafusion import SessionContext, udf # noqa: E402
71-
72-
_diag("helpers module: datafusion imported")
73-
from datafusion.ipc import clear_worker_ctx, set_worker_ctx # noqa: E402
74-
75-
_diag("helpers module: all imports complete")
26+
import pyarrow as pa
27+
from datafusion import SessionContext, udf
28+
from datafusion.ipc import clear_worker_ctx, set_worker_ctx
7629

7730

7831
def make_double_udf():
@@ -112,96 +65,12 @@ def init_worker_clear():
11265
clear_worker_ctx()
11366

11467

115-
def diag_init():
116-
"""Pool initializer used by the diagnostic-instrumented tests.
117-
118-
Logs that a worker process is alive and has finished its module
119-
imports. If this line never appears for a given pid, the hang is
120-
inside import / Rust extension init (before any task runs).
121-
"""
122-
_diag("worker init: ready for tasks")
123-
124-
125-
def _read_text(path: str) -> str:
126-
"""Read a /proc file; return ``"<unreadable>"`` if not accessible."""
127-
try:
128-
return Path(path).read_text(encoding="utf-8", errors="replace").strip()
129-
except OSError:
130-
return "<unreadable>"
131-
132-
133-
def _descendants(root_pid: int) -> list[int]:
134-
"""Return ``root_pid`` plus all descendant pids via /proc/<pid>/task/.../children.
135-
136-
Linux-only; returns ``[root_pid]`` on platforms without ``/proc``.
137-
"""
138-
out: list[int] = [root_pid]
139-
if not Path("/proc").is_dir():
140-
return out
141-
queue = [root_pid]
142-
while queue:
143-
pid = queue.pop()
144-
try:
145-
task_dir = Path(f"/proc/{pid}/task")
146-
if not task_dir.is_dir():
147-
continue
148-
for tdir in task_dir.iterdir():
149-
children_file = tdir / "children"
150-
try:
151-
children = children_file.read_text(encoding="utf-8").split()
152-
except OSError:
153-
continue
154-
for child in children:
155-
try:
156-
cpid = int(child)
157-
except ValueError:
158-
continue
159-
out.append(cpid)
160-
queue.append(cpid)
161-
except OSError:
162-
continue
163-
return out
164-
165-
166-
def snapshot_processes(label: str, root_pid: int | None = None) -> None:
167-
"""Dump a process-state snapshot to the diagnostic log.
168-
169-
For each descendant of ``root_pid`` (default: current process), record
170-
cmdline, status (``R``/``S``/``D``), wchan (kernel function the task
171-
is blocked in), and kernel stack. Use this to localize a worker hang:
172-
a wchan of ``do_futex`` points at a lock; ``poll_schedule_timeout``
173-
points at a blocking I/O wait; ``do_select`` at multiprocessing's
174-
pipe read.
175-
"""
176-
pid = root_pid if root_pid is not None else os.getpid()
177-
_diag(f"snapshot[{label}] root_pid={pid}")
178-
for cpid in _descendants(pid):
179-
cmd = _read_text(f"/proc/{cpid}/cmdline").replace("\x00", " ").strip()
180-
stat = _read_text(f"/proc/{cpid}/status").splitlines()
181-
state_line = next((s for s in stat if s.startswith("State:")), "State: ?")
182-
wchan = _read_text(f"/proc/{cpid}/wchan")
183-
stack = _read_text(f"/proc/{cpid}/stack")
184-
_diag(f"snapshot[{label}] pid={cpid} {state_line} wchan={wchan} cmd={cmd!r}")
185-
if stack and stack != "<unreadable>":
186-
for line in stack.splitlines()[:10]:
187-
_diag(f"snapshot[{label}] pid={cpid} stack: {line}")
188-
189-
19068
def unpickle_and_describe(blob: bytes) -> str:
19169
"""Unpickle a proto-bytes blob and return its canonical name."""
19270
import pickle
19371

194-
_diag("unpickle_and_describe: enter")
195-
try:
196-
expr = pickle.loads(blob) # noqa: S301
197-
_diag("unpickle_and_describe: pickle.loads done")
198-
name = expr.canonical_name()
199-
except BaseException as exc:
200-
_diag(f"unpickle_and_describe: raised {type(exc).__name__}: {exc}")
201-
_diag(traceback.format_exc())
202-
raise
203-
_diag(f"unpickle_and_describe: returning name={name!r}")
204-
return name
72+
expr = pickle.loads(blob) # noqa: S301
73+
return expr.canonical_name()
20574

20675

20776
def unpickle_and_evaluate(blob: bytes, batch: list[int]) -> list[int]:
@@ -213,19 +82,8 @@ def unpickle_and_evaluate(blob: bytes, batch: list[int]) -> list[int]:
21382
"""
21483
import pickle
21584

216-
_diag(f"unpickle_and_evaluate: enter batch_len={len(batch)}")
217-
try:
218-
expr = pickle.loads(blob) # noqa: S301
219-
_diag("unpickle_and_evaluate: pickle.loads done")
220-
ctx = SessionContext()
221-
_diag("unpickle_and_evaluate: SessionContext built")
222-
df = ctx.from_pydict({"a": batch})
223-
out = df.with_column("result", expr).select("result")
224-
_diag("unpickle_and_evaluate: plan built, collecting")
225-
result = out.to_pydict()["result"]
226-
except BaseException as exc:
227-
_diag(f"unpickle_and_evaluate: raised {type(exc).__name__}: {exc}")
228-
_diag(traceback.format_exc())
229-
raise
230-
_diag(f"unpickle_and_evaluate: returning len={len(result)}")
231-
return result
85+
expr = pickle.loads(blob) # noqa: S301
86+
ctx = SessionContext()
87+
df = ctx.from_pydict({"a": batch})
88+
out = df.with_column("result", expr).select("result")
89+
return out.to_pydict()["result"]

python/tests/test_pickle_multiprocessing.py

Lines changed: 3 additions & 51 deletions
Original file line numberDiff line numberDiff line change
@@ -27,12 +27,10 @@
2727

2828
from __future__ import annotations
2929

30-
import contextlib
3130
import functools
3231
import multiprocessing as mp
3332
import pickle
3433
import sys
35-
import threading
3634
from pathlib import Path
3735

3836
import pytest
@@ -57,27 +55,6 @@
5755
sys.path.append(_TESTS_PARENT)
5856

5957

60-
@contextlib.contextmanager
61-
def _snapshot_on_hang(label: str, fire_after_seconds: float = 30.0):
62-
"""Schedule a process-state snapshot ``fire_after_seconds`` from now.
63-
64-
Cancelled if the ``with`` block exits before then. Used to capture
65-
worker state mid-hang — fork tests return in well under the delay,
66-
so the timer only fires when something is actually stuck.
67-
"""
68-
timer = threading.Timer(
69-
fire_after_seconds,
70-
helpers.snapshot_processes,
71-
args=(label,),
72-
)
73-
timer.daemon = True
74-
timer.start()
75-
try:
76-
yield
77-
finally:
78-
timer.cancel()
79-
80-
8158
@functools.cache
8259
def _multiprocessing_available() -> tuple[bool, str]:
8360
"""Return (available, reason). Some sandboxed environments deny semaphore
@@ -120,19 +97,12 @@ def _skip_if_multiprocessing_unavailable():
12097
@pytest.mark.timeout(120)
12198
def test_builtin_pickle_via_pool(start_method):
12299
"""Built-in expressions round-trip in every start method."""
123-
helpers._diag(f"test_builtin_pickle_via_pool[{start_method}]: enter")
124100
expr = col("a") + lit(1)
125101
blob = pickle.dumps(expr)
126102

127103
ctx = mp.get_context(start_method)
128-
helpers._diag(f"test_builtin_pickle_via_pool[{start_method}]: creating Pool")
129-
with (
130-
ctx.Pool(processes=2, initializer=helpers.diag_init) as pool,
131-
_snapshot_on_hang(f"builtin[{start_method}]"),
132-
):
133-
helpers._diag(f"test_builtin_pickle_via_pool[{start_method}]: pool ready, map")
104+
with ctx.Pool(processes=2) as pool:
134105
results = pool.map(helpers.unpickle_and_describe, [blob, blob, blob])
135-
helpers._diag(f"test_builtin_pickle_via_pool[{start_method}]: pool closed")
136106

137107
assert all(r == expr.canonical_name() for r in results)
138108

@@ -145,25 +115,16 @@ def test_udf_pickle_self_contained(start_method):
145115
Workers start with no UDF registered. The Rust-side ``PythonUDFCodec``
146116
reconstructs the UDF from bytes embedded in the pickle blob.
147117
"""
148-
helpers._diag(f"test_udf_pickle_self_contained[{start_method}]: enter")
149118
udf_obj = helpers.make_double_udf()
150119
expr = udf_obj(col("a"))
151120
blob = pickle.dumps(expr)
152121

153122
ctx = mp.get_context(start_method)
154-
helpers._diag(f"test_udf_pickle_self_contained[{start_method}]: creating Pool")
155-
with (
156-
ctx.Pool(processes=2, initializer=helpers.diag_init) as pool,
157-
_snapshot_on_hang(f"udf[{start_method}]"),
158-
):
159-
helpers._diag(
160-
f"test_udf_pickle_self_contained[{start_method}]: pool ready, starmap"
161-
)
123+
with ctx.Pool(processes=2) as pool:
162124
results = pool.starmap(
163125
helpers.unpickle_and_evaluate,
164126
[(blob, [1, 2, 3]), (blob, [10, 20, 30])],
165127
)
166-
helpers._diag(f"test_udf_pickle_self_contained[{start_method}]: pool closed")
167128

168129
assert results[0] == [2, 4, 6]
169130
assert results[1] == [20, 40, 60]
@@ -173,21 +134,12 @@ def test_udf_pickle_self_contained(start_method):
173134
@pytest.mark.timeout(120)
174135
def test_closure_capturing_udf_via_pool(start_method):
175136
"""Cloudpickle preserves closure state across the codec boundary."""
176-
helpers._diag(f"test_closure_capturing_udf_via_pool[{start_method}]: enter")
177137
udf_obj = helpers.make_times_seven_udf()
178138
expr = udf_obj(col("a"))
179139
blob = pickle.dumps(expr)
180140

181141
ctx = mp.get_context(start_method)
182-
helpers._diag(f"test_closure_capturing_udf_via_pool[{start_method}]: creating Pool")
183-
with (
184-
ctx.Pool(processes=2, initializer=helpers.diag_init) as pool,
185-
_snapshot_on_hang(f"closure[{start_method}]"),
186-
):
187-
helpers._diag(
188-
f"test_closure_capturing_udf_via_pool[{start_method}]: pool ready, apply"
189-
)
142+
with ctx.Pool(processes=2) as pool:
190143
result = pool.apply(helpers.unpickle_and_evaluate, (blob, [1, 2, 3]))
191-
helpers._diag(f"test_closure_capturing_udf_via_pool[{start_method}]: pool closed")
192144

193145
assert result == [7, 14, 21]

0 commit comments

Comments
 (0)