Skip to content

Commit d00c619

Browse files
committed
Add testing for CI failure
1 parent e8413dd commit d00c619

3 files changed

Lines changed: 120 additions & 13 deletions

File tree

.github/workflows/test.yml

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -102,6 +102,20 @@ jobs:
102102
git submodule update --init
103103
uv run --no-project pytest -v --import-mode=importlib
104104
105+
# Always dump the multiprocessing worker diagnostic log, even on
# job timeout, so we can see where forkserver/spawn workers stalled.
# See python/tests/_pickle_multiprocessing_helpers.py for what each
# line means. Safe to remove once forkserver/spawn hang is resolved.
- name: Dump multiprocessing diagnostic log
  if: always()
  run: |
    # Resolve the log path the same way the Python helper does:
    # DF_MP_DIAG_LOG wins if set, otherwise the system temp dir
    # (tempfile.gettempdir() honours TMPDIR / TEMP / TMP before
    # falling back to /tmp). A hard-coded /tmp path misses the log
    # whenever the runner exports TMPDIR.
    tmp_dir="${TMPDIR:-${TEMP:-${TMP:-/tmp}}}"
    log_file="${DF_MP_DIAG_LOG:-${tmp_dir%/}/df_mp_worker_diag.log}"
    echo "=== ${log_file} ==="
    if [ -f "${log_file}" ]; then
      cat "${log_file}"
    else
      echo "(no diagnostic log produced)"
    fi
118+
105119
- name: FFI unit tests
106120
run: |
107121
cd examples/datafusion-ffi-example

python/tests/_pickle_multiprocessing_helpers.py

Lines changed: 87 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -23,9 +23,56 @@
2323

2424
from __future__ import annotations
2525

26-
import pyarrow as pa
27-
from datafusion import SessionContext, udf
28-
from datafusion.ipc import clear_worker_ctx, set_worker_ctx
26+
import os
27+
import tempfile
28+
import time
29+
import traceback
30+
from pathlib import Path
31+
32+
# Diagnostic log path for multiprocessing worker timing.
33+
# Workers write here so a CI-side `cat` after a job timeout can show
34+
# where each worker stalled (e.g. inside `import datafusion`). Lives in
35+
# the system temp dir so it persists across Pool worker exits and is
36+
# readable by a follow-up workflow step. Override via env var when
37+
# debugging locally.
38+
_DIAG_LOG = Path(
    os.environ.get(
        "DF_MP_DIAG_LOG",
        str(Path(tempfile.gettempdir()) / "df_mp_worker_diag.log"),
    )
)


def _diag(event: str) -> None:
    """Append a diagnostic line: timestamp, pid, parent pid, event.

    Opens / flushes / closes per call so a hang mid-import still leaves
    a partial trail on disk. Parent pid distinguishes forkserver-born
    workers (parent = forkserver) from spawn-born workers (parent =
    main pytest process).

    Args:
        event: Free-form text appended after the pid fields. It may span
            several lines (e.g. a formatted traceback); it is written
            verbatim followed by a single newline.

    Returns:
        None. Failures to write are swallowed on purpose.
    """
    try:
        # ``backslashreplace`` keeps un-encodable text (e.g. lone surrogates
        # in an exception message) from raising UnicodeEncodeError, which is
        # not an OSError and would otherwise escape this best-effort logger.
        with _DIAG_LOG.open("a", encoding="utf-8", errors="backslashreplace") as fh:
            fh.write(
                f"{time.time():.3f} pid={os.getpid()} ppid={os.getppid()} {event}\n"
            )
            fh.flush()
            # Force the line to disk so it survives a hard kill on timeout.
            os.fsync(fh.fileno())
    except OSError:
        # Best-effort diagnostic; never let logging itself break a test.
        pass
64+
65+
66+
_diag("helpers module: starting imports")
67+
import pyarrow as pa # noqa: E402
68+
69+
_diag("helpers module: pyarrow imported")
70+
from datafusion import SessionContext, udf # noqa: E402
71+
72+
_diag("helpers module: datafusion imported")
73+
from datafusion.ipc import clear_worker_ctx, set_worker_ctx # noqa: E402
74+
75+
_diag("helpers module: all imports complete")
2976

3077

3178
def make_double_udf():
@@ -65,12 +112,31 @@ def init_worker_clear():
65112
clear_worker_ctx()
66113

67114

115+
def diag_init():
    """Record in the diagnostic log that this Pool worker is up.

    Meant to be passed as the ``initializer`` of the pools created by the
    diagnostic-instrumented tests. The line is written once the worker
    process has finished importing this module, so a pid with no such
    entry stalled during import / Rust extension init, i.e. before any
    task was handed to it.
    """
    _diag("worker init: ready for tasks")
123+
124+
68125
def unpickle_and_describe(blob: bytes) -> str:
    """Rebuild an expression from pickled bytes and return its canonical name.

    Every stage (entry, after ``pickle.loads``, on failure, on return) is
    recorded through ``_diag`` so a stalled or crashing worker leaves a
    trail. Any exception is logged with its traceback and then re-raised
    unchanged.
    """
    import pickle

    _diag("unpickle_and_describe: enter")
    try:
        restored = pickle.loads(blob)  # noqa: S301
        _diag("unpickle_and_describe: pickle.loads done")
        canonical = restored.canonical_name()
    except BaseException as err:
        # BaseException on purpose: interrupts and exits are logged too,
        # then propagated untouched.
        _diag(f"unpickle_and_describe: raised {type(err).__name__}: {err}")
        _diag(traceback.format_exc())
        raise
    else:
        _diag(f"unpickle_and_describe: returning name={canonical!r}")
        return canonical
74140

75141

76142
def unpickle_and_evaluate(blob: bytes, batch: list[int]) -> list[int]:
@@ -82,8 +148,19 @@ def unpickle_and_evaluate(blob: bytes, batch: list[int]) -> list[int]:
82148
"""
83149
import pickle
84150

85-
expr = pickle.loads(blob) # noqa: S301
86-
ctx = SessionContext()
87-
df = ctx.from_pydict({"a": batch})
88-
out = df.with_column("result", expr).select("result")
89-
return out.to_pydict()["result"]
151+
_diag(f"unpickle_and_evaluate: enter batch_len={len(batch)}")
152+
try:
153+
expr = pickle.loads(blob) # noqa: S301
154+
_diag("unpickle_and_evaluate: pickle.loads done")
155+
ctx = SessionContext()
156+
_diag("unpickle_and_evaluate: SessionContext built")
157+
df = ctx.from_pydict({"a": batch})
158+
out = df.with_column("result", expr).select("result")
159+
_diag("unpickle_and_evaluate: plan built, collecting")
160+
result = out.to_pydict()["result"]
161+
except BaseException as exc:
162+
_diag(f"unpickle_and_evaluate: raised {type(exc).__name__}: {exc}")
163+
_diag(traceback.format_exc())
164+
raise
165+
_diag(f"unpickle_and_evaluate: returning len={len(result)}")
166+
return result

python/tests/test_pickle_multiprocessing.py

Lines changed: 19 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -80,12 +80,16 @@ def _skip_if_multiprocessing_unavailable():
8080
@pytest.mark.timeout(120)
8181
def test_builtin_pickle_via_pool(start_method):
8282
"""Built-in expressions round-trip in every start method."""
83+
helpers._diag(f"test_builtin_pickle_via_pool[{start_method}]: enter")
8384
expr = col("a") + lit(1)
8485
blob = pickle.dumps(expr)
8586

8687
ctx = mp.get_context(start_method)
87-
with ctx.Pool(processes=2) as pool:
88+
helpers._diag(f"test_builtin_pickle_via_pool[{start_method}]: creating Pool")
89+
with ctx.Pool(processes=2, initializer=helpers.diag_init) as pool:
90+
helpers._diag(f"test_builtin_pickle_via_pool[{start_method}]: pool ready, map")
8891
results = pool.map(helpers.unpickle_and_describe, [blob, blob, blob])
92+
helpers._diag(f"test_builtin_pickle_via_pool[{start_method}]: pool closed")
8993

9094
assert all(r == expr.canonical_name() for r in results)
9195

@@ -98,16 +102,22 @@ def test_udf_pickle_self_contained(start_method):
98102
Workers start with no UDF registered. The Rust-side ``PythonUDFCodec``
99103
reconstructs the UDF from bytes embedded in the pickle blob.
100104
"""
105+
helpers._diag(f"test_udf_pickle_self_contained[{start_method}]: enter")
101106
udf_obj = helpers.make_double_udf()
102107
expr = udf_obj(col("a"))
103108
blob = pickle.dumps(expr)
104109

105110
ctx = mp.get_context(start_method)
106-
with ctx.Pool(processes=2) as pool:
111+
helpers._diag(f"test_udf_pickle_self_contained[{start_method}]: creating Pool")
112+
with ctx.Pool(processes=2, initializer=helpers.diag_init) as pool:
113+
helpers._diag(
114+
f"test_udf_pickle_self_contained[{start_method}]: pool ready, starmap"
115+
)
107116
results = pool.starmap(
108117
helpers.unpickle_and_evaluate,
109118
[(blob, [1, 2, 3]), (blob, [10, 20, 30])],
110119
)
120+
helpers._diag(f"test_udf_pickle_self_contained[{start_method}]: pool closed")
111121

112122
assert results[0] == [2, 4, 6]
113123
assert results[1] == [20, 40, 60]
@@ -117,12 +127,18 @@ def test_udf_pickle_self_contained(start_method):
117127
@pytest.mark.timeout(120)
118128
def test_closure_capturing_udf_via_pool(start_method):
119129
"""Cloudpickle preserves closure state across the codec boundary."""
130+
helpers._diag(f"test_closure_capturing_udf_via_pool[{start_method}]: enter")
120131
udf_obj = helpers.make_times_seven_udf()
121132
expr = udf_obj(col("a"))
122133
blob = pickle.dumps(expr)
123134

124135
ctx = mp.get_context(start_method)
125-
with ctx.Pool(processes=2) as pool:
136+
helpers._diag(f"test_closure_capturing_udf_via_pool[{start_method}]: creating Pool")
137+
with ctx.Pool(processes=2, initializer=helpers.diag_init) as pool:
138+
helpers._diag(
139+
f"test_closure_capturing_udf_via_pool[{start_method}]: pool ready, apply"
140+
)
126141
result = pool.apply(helpers.unpickle_and_evaluate, (blob, [1, 2, 3]))
142+
helpers._diag(f"test_closure_capturing_udf_via_pool[{start_method}]: pool closed")
127143

128144
assert result == [7, 14, 21]

0 commit comments

Comments
 (0)