Skip to content

Commit f7c8c6c

Browse files
committed
Additional debugging for mp tests in CI
1 parent d00c619 commit f7c8c6c

2 files changed

Lines changed: 100 additions & 3 deletions

File tree

python/tests/_pickle_multiprocessing_helpers.py

Lines changed: 65 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -122,6 +122,71 @@ def diag_init():
122122
_diag("worker init: ready for tasks")
123123

124124

125+
def _read_text(path: str) -> str:
126+
"""Read a /proc file; return ``"<unreadable>"`` if not accessible."""
127+
try:
128+
return Path(path).read_text(encoding="utf-8", errors="replace").strip()
129+
except OSError:
130+
return "<unreadable>"
131+
132+
133+
def _descendants(root_pid: int) -> list[int]:
134+
"""Return ``root_pid`` plus all descendant pids via /proc/<pid>/task/.../children.
135+
136+
Linux-only; returns ``[root_pid]`` on platforms without ``/proc``.
137+
"""
138+
out: list[int] = [root_pid]
139+
if not Path("/proc").is_dir():
140+
return out
141+
queue = [root_pid]
142+
while queue:
143+
pid = queue.pop()
144+
try:
145+
task_dir = Path(f"/proc/{pid}/task")
146+
if not task_dir.is_dir():
147+
continue
148+
for tdir in task_dir.iterdir():
149+
children_file = tdir / "children"
150+
try:
151+
children = children_file.read_text(encoding="utf-8").split()
152+
except OSError:
153+
continue
154+
for child in children:
155+
try:
156+
cpid = int(child)
157+
except ValueError:
158+
continue
159+
out.append(cpid)
160+
queue.append(cpid)
161+
except OSError:
162+
continue
163+
return out
164+
165+
166+
def snapshot_processes(label: str, root_pid: int | None = None) -> None:
167+
"""Dump a process-state snapshot to the diagnostic log.
168+
169+
For each descendant of ``root_pid`` (default: current process), record
170+
cmdline, status (``R``/``S``/``D``), wchan (kernel function the task
171+
is blocked in), and kernel stack. Use this to localize a worker hang:
172+
a wchan of ``do_futex`` points at a lock; ``poll_schedule_timeout``
173+
points at a blocking I/O wait; ``do_select`` at multiprocessing's
174+
pipe read.
175+
"""
176+
pid = root_pid if root_pid is not None else os.getpid()
177+
_diag(f"snapshot[{label}] root_pid={pid}")
178+
for cpid in _descendants(pid):
179+
cmd = _read_text(f"/proc/{cpid}/cmdline").replace("\x00", " ").strip()
180+
stat = _read_text(f"/proc/{cpid}/status").splitlines()
181+
state_line = next((s for s in stat if s.startswith("State:")), "State: ?")
182+
wchan = _read_text(f"/proc/{cpid}/wchan")
183+
stack = _read_text(f"/proc/{cpid}/stack")
184+
_diag(f"snapshot[{label}] pid={cpid} {state_line} wchan={wchan} cmd={cmd!r}")
185+
if stack and stack != "<unreadable>":
186+
for line in stack.splitlines()[:10]:
187+
_diag(f"snapshot[{label}] pid={cpid} stack: {line}")
188+
189+
125190
def unpickle_and_describe(blob: bytes) -> str:
126191
"""Unpickle a proto-bytes blob and return its canonical name."""
127192
import pickle

python/tests/test_pickle_multiprocessing.py

Lines changed: 35 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -27,17 +27,40 @@
2727

2828
from __future__ import annotations
2929

30+
import contextlib
3031
import functools
3132
import multiprocessing as mp
3233
import pickle
3334
import sys
35+
import threading
3436

3537
import pytest
3638
from datafusion import col, lit
3739

3840
from . import _pickle_multiprocessing_helpers as helpers
3941

4042

43+
@contextlib.contextmanager
44+
def _snapshot_on_hang(label: str, fire_after_seconds: float = 30.0):
45+
"""Schedule a process-state snapshot ``fire_after_seconds`` from now.
46+
47+
Cancelled if the ``with`` block exits before then. Used to capture
48+
worker state mid-hang — fork tests return in well under the delay,
49+
so the timer only fires when something is actually stuck.
50+
"""
51+
timer = threading.Timer(
52+
fire_after_seconds,
53+
helpers.snapshot_processes,
54+
args=(label,),
55+
)
56+
timer.daemon = True
57+
timer.start()
58+
try:
59+
yield
60+
finally:
61+
timer.cancel()
62+
63+
4164
@functools.cache
4265
def _multiprocessing_available() -> tuple[bool, str]:
4366
"""Return (available, reason). Some sandboxed environments deny semaphore
@@ -86,7 +109,10 @@ def test_builtin_pickle_via_pool(start_method):
86109

87110
ctx = mp.get_context(start_method)
88111
helpers._diag(f"test_builtin_pickle_via_pool[{start_method}]: creating Pool")
89-
with ctx.Pool(processes=2, initializer=helpers.diag_init) as pool:
112+
with (
113+
ctx.Pool(processes=2, initializer=helpers.diag_init) as pool,
114+
_snapshot_on_hang(f"builtin[{start_method}]"),
115+
):
90116
helpers._diag(f"test_builtin_pickle_via_pool[{start_method}]: pool ready, map")
91117
results = pool.map(helpers.unpickle_and_describe, [blob, blob, blob])
92118
helpers._diag(f"test_builtin_pickle_via_pool[{start_method}]: pool closed")
@@ -109,7 +135,10 @@ def test_udf_pickle_self_contained(start_method):
109135

110136
ctx = mp.get_context(start_method)
111137
helpers._diag(f"test_udf_pickle_self_contained[{start_method}]: creating Pool")
112-
with ctx.Pool(processes=2, initializer=helpers.diag_init) as pool:
138+
with (
139+
ctx.Pool(processes=2, initializer=helpers.diag_init) as pool,
140+
_snapshot_on_hang(f"udf[{start_method}]"),
141+
):
113142
helpers._diag(
114143
f"test_udf_pickle_self_contained[{start_method}]: pool ready, starmap"
115144
)
@@ -134,7 +163,10 @@ def test_closure_capturing_udf_via_pool(start_method):
134163

135164
ctx = mp.get_context(start_method)
136165
helpers._diag(f"test_closure_capturing_udf_via_pool[{start_method}]: creating Pool")
137-
with ctx.Pool(processes=2, initializer=helpers.diag_init) as pool:
166+
with (
167+
ctx.Pool(processes=2, initializer=helpers.diag_init) as pool,
168+
_snapshot_on_hang(f"closure[{start_method}]"),
169+
):
138170
helpers._diag(
139171
f"test_closure_capturing_udf_via_pool[{start_method}]: pool ready, apply"
140172
)

0 commit comments

Comments
 (0)