[Python] Add UnboundedSource SDF wrapper (#19137)#38723
Closed
Eliaaazzz wants to merge 4 commits into
Closed
Conversation
Add UnboundedSource, UnboundedReader, and CheckpointMark ABCs (Java semantics, Python names) plus a Splittable-DoFn wrapper so that `p | ReadFromUnboundedSource(MySource())` runs an unbounded source on the portable Fn API DirectRunner. Mirrors the in-tree periodicsequence SDF (`@DoFn.unbounded_per_element` + `restriction_tracker.defer_remainder` + ManualWatermarkEstimator) and touches no core files (no iobase.py or sdf_utils.py edits). Covers read, event-time timestamps, monotonic watermark, checkpoint-based pause/resume, bundle finalization, and the MAX-watermark done transition. Out of scope (later weeks): record-id dedup, backlog reporting, dynamic split fractions, and iobase.Read.expand() wiring. 14 deterministic tests pass, including an end-to-end DirectRunner read. GSoC 2026 Week-1 deliverable for issue apache#19137.
* iobase.Read.expand dispatches UnboundedSource through
ReadFromUnboundedSource (Impulse | Map | ParDo); function-local lazy
import breaks the iobase <-> unbounded_source cycle. The Read.__init__
docstring is updated to describe the new dispatch.
* iobase.Read.to_runner_api_parameter widens the source isinstance to
(BoundedSource, UnboundedSource) so Read(unbounded_source) graphs
round-trip as READ.urn + ReadPayload(is_bounded=UNBOUNDED). Runner-
side dispatch on the UNBOUNDED flag in bundle_processor stays W2.
* SDF tracker correctness:
- Data-path watermark now propagates reader.get_watermark() (Java
Read.java:594 parity); holder is (value, record_ts, source_wm).
- _UnboundedSourceRestriction adds finalization_checkpoint_mark so
a done primary can carry a commit hook independent of the
residual's RESUME checkpoint_mark. Coder is now a fixed 5-tuple.
- try_claim / try_split close the reader before re-raising on any
reader method failure; the DoFn finally is reduced to defense-
in-depth for yield / downstream-raise paths and logs a warning if
the private RestrictionTrackerView chain ever breaks.
- ReadFromUnboundedSource validates poll_interval_seconds > 0.
* Regression coverage:
- File-marker side-channel tests for EOF watermark advance to MAX
and reader-close on (i) reader.advance raise, (ii) reader.get_*
raise, (iii) downstream yield raise.
- try_split / EOF separation of finalize and resume channels.
- Circular import order in 3 subprocesses (clean module cache):
iobase first, unbounded_source first, lazy via Read.expand.
- Cloudpickle round-trip for the transform and source.
- to_runner_api / from_runner_api round-trip asserting
IsBounded.UNBOUNDED enum and source recovery.
unbounded_source_test 37/37, iobase_test 16/16.
Tracking apache#19137.
* default_output_coder wiring (HIGH): setting output.element_type alone let the registry's default coder for the type_hint silently shadow the source's declared coder. Now also register the source-declared coder against the element type via coders.registry.register_coder before assigning element_type, so the runner's coder lookup returns the source's coder. Registration failures (parameterised coders) are logged as a warning instead of crashing the pipeline build. * Provider.split validation (LOW): a non-UnboundedSource returned from source.split() is a source-contract violation, not a split-refusal. Move the isinstance check OUTSIDE the try/except around source.split so we fail loudly instead of silently running single-shard. * DoFn yield-raise close test (MEDIUM): the previous unit test used generator.throw(RuntimeError) which doesn't match Beam's SDK harness path (the harness raises in receiver.receive *outside* the user generator, then drops the generator). Switch to generator.close() which triggers GeneratorExit at the active yield -- the actual cleanup path Beam takes. Also add an integration test that runs a real pipeline with a downstream Map that raises, exercising common._OutputHandler.handle_process_outputs end-to-end. 42/42 unbounded_source_test, 16/16 iobase_test. Tracking apache#19137.
Contributor
Author
|
Withdrawing to fix CI failures (yapf formatting + isort import ordering on the 3 modified files). Will re-open after local lint clean and re-verification. |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Add this suggestion to a batch that can be applied as a single commit.This suggestion is invalid because no changes were made to the code.Suggestions cannot be applied while the pull request is closed.Suggestions cannot be applied while viewing a subset of changes.Only one suggestion per line can be applied in a batch.Add this suggestion to a batch that can be applied as a single commit.Applying suggestions on deleted lines is not supported.You must change the existing code in this line in order to create a valid suggestion.Outdated suggestions cannot be applied.This suggestion has been applied or marked resolved.Suggestions cannot be applied from pending reviews.Suggestions cannot be applied on multi-line comments.Suggestions cannot be applied while the pull request is queued to merge.Suggestion cannot be applied right now. Please check back later.
Adds a Splittable-DoFn wrapper that brings Java's
UnboundedSource/UnboundedReader/CheckpointMarkabstractions to the Python SDK andmakes them runnable on the portable Fn API (DirectRunner / FnApiRunner).
Wires the new source type into
iobase.Read.expand()sop | beam.io.Read(my_unbounded_source)dispatches alongside the existingBoundedSourcebranch. Loosely inspired by Java'sRead.UnboundedSourceAsSDFWrapperFn-- not a literal port. Thestreaming-SDF template followed for the process loop / watermark / defer
plumbing is
apache_beam.transforms.periodicsequence.Status: draft -- opening for early feedback from @yhu (mentor) while a
design-first thread goes to
dev@beam.apache.orgin parallel. The MVP isdeliberately small; see the "Out of scope" list below.
addresses #19137
What's in this PR
Two new files under
sdks/python/apache_beam/io/:unbounded_source.py(~745 lines incl. docstrings): public ABCs(
CheckpointMark,UnboundedReader,UnboundedSource,ReadFromUnboundedSource) + SDF wrapper internals(
_UnboundedSourceRestriction,_UnboundedSourceRestrictionCoder,_UnboundedSourceRestrictionTracker,_UnboundedSourceRestrictionProvider).unbounded_source_test.py(~1130 lines): unit + integration coverage.Plus a small change to
iobase.py(~26 lines):Read.expand()gains anUnboundedSourcebranch (lazy import to breakthe
iobase<->unbounded_sourcecycle) that delegates toReadFromUnboundedSource.Read.to_runner_api_parameterwidens the sourceisinstanceto(BoundedSource, UnboundedSource), writingREAD.urn+IsBounded.UNBOUNDED. Decoding rides the existingPICKLED_SOURCEURN registered on
SourceBase.Correctness highlights
reader.get_watermark()(JavaRead.java:594parity), not the per-record event time. Holder is(value, record_ts, source_wm); record event time labels the outputTimestampedValue, source watermark advances the estimator.checkpoint_mark)and commit hook (
finalization_checkpoint_mark) so a done primary'sfinalize callback cannot contaminate the residual's resume state.
inside the tracker;
try_claim/try_splitwrap reader-methodcalls and close before re-raising; the DoFn's
finallyprovidesdefense-in-depth for downstream yield exceptions via the SDF wrapper's
private chain with an
isinstanceguard and a warning log if thechain is ever refactored upstream.
MAX_TIMESTAMPsodownstream event-time windows can close (would otherwise hang at the
last reported watermark).
UnboundedSource.split(desired_num_splits=20, options)-- validates that returned sub-sources areUnboundedSourceinstances (raisesTypeErrorif not, outsidethe split-refusal
except); on split-refusal exceptions, falls backto single-restriction and logs WARNING.
default_output_coderwired viacoders.registry.register_coder+element_typeso a customsource-declared coder reaches the output PCollection through Beam's
standard registry lookup (parameterised coders still require explicit
user registration; logged as a warning).
poll_interval_secondsvalidated to be > 0 inReadFromUnboundedSource.__init__.Test coverage
58 tests, all green locally on Python 3.13 + Beam 2.71:
unbounded_source_test.py(42): ABC contracts; restriction coderround-trip; restriction tracker state machine (claim / split / EOF /
no-data / check_done / progress / is_bounded); finalize idempotency;
source-watermark vs. record-timestamp regression; finalize/resume
channel separation; tracker-internal exception close on
reader.advanceandreader.get_watermarkfailures; DoFngenerator close path (unit + integration with downstream raising
Map); cloudpickle round-trip for transform and source; circularimport in three orderings via subprocess + tempfile; e2e DirectRunner
pipeline (records in order + windowed GroupByKey).
iobase_test.py(+3):Read(UnboundedSource)dispatch through thenew
expandbranch;Read.to_runner_api / from_runner_apiround-trip with
IsBounded.UNBOUNDED; PCollectionis_boundedassertion.
Out of scope (deferred to W2+, tracked under #19137)
Listed exhaustively in the module docstring at
sdks/python/apache_beam/io/unbounded_source.py:ValueWithRecordId).restriction_sizeis constant 1;current_progressis binary 0.0 / 1.0).coder (today the coder always pickles checkpoint marks via
_MemoizingPickleCoderregardless of the source'sget_checkpoint_mark_coder).this PoC always rebuilds the reader from the checkpoint).
EmptyUnboundedSourceterminal-state marker (we use anis_doneflag on the restriction instead).
IsBounded.UNBOUNDEDdispatch inbundle_processor.IMPULSE_READ_TRANSFORM. Today the wire formatround-trips correctly but execution flows through the composite's
expanded sub-transforms (
Impulse | Map | SDF-ParDo), not the URNhandler.
addresses #19137.CHANGES.mdwith noteworthy changes -- intentionallydeferred until W1/W2 scope is finalised on
dev@; will add beforethe PR moves out of draft.
2026).
cc @yhu for mentor review.