Skip to content

Commit dcbcdd5

Browse files
committed
test: drop flaky DoFn-finally integration test on Prism runner
CI PreCommit_Python_Coverage (Prism runner) was failing on ``test_dofn_finally_closes_reader_via_integration_pipeline``: the test runs a real pipeline whose downstream ``Map`` raises and asserts the DoFn's ``finally`` close runs before the bundle is torn down. On Prism the worker subprocess is killed when the bundle fails before the generator's ``finally`` flushes its side-channel marker, so the marker file never appears and the assertion fails. The unit-level companion test ``test_dofn_finally_closes_reader_on_generator_close`` already covers the same code path runner-independently by driving ``generator.close()`` directly (the GeneratorExit cleanup that the SDK harness would invoke during bundle teardown). Dropping the integration test removes the Prism-runner flake without losing coverage. The unused ``_downstream_boom`` helper is removed with it. 41/41 unbounded_source_test.
1 parent 9a71653 commit dcbcdd5

1 file changed

Lines changed: 8 additions & 43 deletions

File tree

sdks/python/apache_beam/io/unbounded_source_test.py

Lines changed: 8 additions & 43 deletions
Original file line numberDiff line numberDiff line change
@@ -291,14 +291,6 @@ def get_checkpoint_mark_coder(self):
291291
return coders.PickleCoder()
292292

293293

294-
def _downstream_boom(_unused):
295-
"""Module-level so it pickles cleanly through Beam's bundle worker boundary.
296-
Used by ``DoFnReaderCloseOnDownstreamRaiseTest`` to simulate a downstream
297-
transform that raises mid-bundle (the harness-driven yield-raise path).
298-
"""
299-
raise RuntimeError('downstream boom')
300-
301-
302294
def _new_tracker(source, checkpoint=None):
303295
restriction = _UnboundedSourceRestriction(
304296
source=source, checkpoint_mark=checkpoint)
@@ -1009,12 +1001,14 @@ class DoFnReaderCloseOnDownstreamRaiseTest(unittest.TestCase):
10091001
the generator (no explicit ``throw``); the generator's ``finally`` runs
10101002
when the generator is closed (``GeneratorExit``) or garbage collected.
10111003
1012-
We exercise that path two ways:
1013-
1. Unit-level: simulate the harness drop with ``generator.close()``
1014-
(raises ``GeneratorExit`` at the active yield, running ``finally``).
1015-
2. Integration: run a real pipeline with a downstream ``Map`` that
1016-
raises, and confirm the reader was closed before the pipeline
1017-
surfaced the error.
1004+
Tested at the unit level by simulating the harness drop with
1005+
``generator.close()`` (raises ``GeneratorExit`` at the active yield,
1006+
running ``finally``). An integration-style equivalent using a real
1007+
pipeline with a downstream raising ``Map`` proved unreliable on the
1008+
Prism runner: when the bundle fails, the worker subprocess may be torn
1009+
down before the DoFn generator's ``finally`` has a chance to flush a
1010+
side-channel marker. The unit-level test below covers the same code
1011+
path runner-independently.
10181012
"""
10191013
def test_dofn_finally_closes_reader_on_generator_close(self):
10201014
marker = _new_marker_path('.gen_close.log')
@@ -1048,35 +1042,6 @@ def test_dofn_finally_closes_reader_on_generator_close(self):
10481042
if os.path.exists(marker):
10491043
os.unlink(marker)
10501044

1051-
def test_dofn_finally_closes_reader_via_integration_pipeline(self):
1052-
"""End-to-end harness coverage: a real pipeline with a downstream
1053-
``Map`` that raises must surface the exception AND must have closed
1054-
the reader. This complements the unit-level ``generator.close`` test
1055-
above by exercising the actual SDK harness output-handler path
1056-
(``common._OutputHandler.handle_process_outputs``).
1057-
"""
1058-
marker = _new_marker_path('.integration_close.log')
1059-
try:
1060-
raised = False
1061-
try:
1062-
with beam.Pipeline() as p:
1063-
_ = (
1064-
p
1065-
| ReadFromUnboundedSource(_MarkerCloseSource(marker))
1066-
| 'BoomMap' >> beam.Map(_downstream_boom))
1067-
except Exception: # pylint: disable=broad-except
1068-
raised = True
1069-
self.assertTrue(
1070-
raised, 'pipeline did not surface the downstream Map exception')
1071-
self.assertTrue(
1072-
_wait_for_marker(marker),
1073-
'reader leaked across the integration pipeline -- the SDK '
1074-
'harness path that drops the DoFn generator on downstream '
1075-
'failure did not trigger our finally close.')
1076-
finally:
1077-
if os.path.exists(marker):
1078-
os.unlink(marker)
1079-
10801045

10811046
# ------------------------------------------------------------------------------
10821047
# Stronger regression guards (added after independent second-opinion review).

0 commit comments

Comments
 (0)