[Python] Add UnboundedSource SDF wrapper (#19137) #38723

Closed
Eliaaazzz wants to merge 4 commits into apache:master from Eliaaazzz:gsoc-week1-minimal-poc

Conversation

@Eliaaazzz
Contributor

Adds a Splittable-DoFn wrapper that brings Java's UnboundedSource /
UnboundedReader / CheckpointMark abstractions to the Python SDK and
makes them runnable on the portable Fn API (DirectRunner / FnApiRunner).
Wires the new source type into iobase.Read.expand() so
p | beam.io.Read(my_unbounded_source) dispatches alongside the existing
BoundedSource branch. Loosely inspired by Java's
Read.UnboundedSourceAsSDFWrapperFn -- not a literal port. The
streaming-SDF template followed for the process loop / watermark / defer
plumbing is apache_beam.transforms.periodicsequence.

Status: draft -- opening for early feedback from @yhu (mentor) while a
design-first thread goes to dev@beam.apache.org in parallel. The MVP is
deliberately small; see the "Out of scope" list below.

addresses #19137

What's in this PR

Two new files under sdks/python/apache_beam/io/:

  • unbounded_source.py (~745 lines incl. docstrings): public ABCs
    (CheckpointMark, UnboundedReader, UnboundedSource,
    ReadFromUnboundedSource) + SDF wrapper internals
    (_UnboundedSourceRestriction, _UnboundedSourceRestrictionCoder,
    _UnboundedSourceRestrictionTracker,
    _UnboundedSourceRestrictionProvider).
  • unbounded_source_test.py (~1130 lines): unit + integration coverage.

Plus a small change to iobase.py (~26 lines):

  • Read.expand() gains an UnboundedSource branch (lazy import to break
    the iobase <-> unbounded_source cycle) that delegates to
    ReadFromUnboundedSource.
  • Read.to_runner_api_parameter widens the source isinstance to
    (BoundedSource, UnboundedSource), writing READ.urn +
    IsBounded.UNBOUNDED. Decoding rides the existing PICKLED_SOURCE
    URN registered on SourceBase.

Correctness highlights

  • Watermark on data path uses reader.get_watermark() (Java
    Read.java:594 parity), not the per-record event time. Holder is
    (value, record_ts, source_wm); record event time labels the output
    TimestampedValue, source watermark advances the estimator.
  • Restriction has separate channels for resume (checkpoint_mark)
    and commit hook (finalization_checkpoint_mark) so a done primary's
    finalize callback cannot contaminate the residual's resume state.
  • Reader is closed on every exit path. EOF and split paths close
    inside the tracker; try_claim / try_split wrap reader-method
    calls and close before re-raising. The DoFn's finally block is
    defense-in-depth for exceptions raised downstream of a yield: it
    goes through the SDF wrapper's private chain, guarded by an
    isinstance check that logs a warning if the chain is ever
    refactored upstream.
  • EOF advances the watermark estimator to MAX_TIMESTAMP so
    downstream event-time windows can close (would otherwise hang at the
    last reported watermark).
  • Initial fan-out calls UnboundedSource.split() with
    desired_num_splits=20 and options, then validates that the returned
    sub-sources are UnboundedSource instances. The isinstance check sits
    outside the split-refusal except, so a violation raises TypeError; a
    split-refusal exception falls back to a single restriction and logs
    a WARNING.
  • default_output_coder wired via
    coders.registry.register_coder + element_type so a custom
    source-declared coder reaches the output PCollection through Beam's
    standard registry lookup (parameterised coders still require explicit
    user registration; logged as a warning).
  • poll_interval_seconds validated to be > 0 in
    ReadFromUnboundedSource.__init__.
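
The watermark bullets above can be illustrated with a minimal, Beam-free sketch. Everything in it is a hypothetical stand-in rather than code from this PR: `MonotonicWatermark` plays the role of the manual watermark estimator, `MAX_TIMESTAMP` is modelled as positive infinity, and the holder is assumed to be a plain `(value, record_ts, source_wm)` tuple.

```python
from typing import Any, Iterable, List, Tuple

# Assumption: Beam's MAX_TIMESTAMP is modelled here as +infinity.
MAX_TIMESTAMP = float("inf")


class MonotonicWatermark:
    """Hypothetical stand-in for a manually advanced watermark estimator."""

    def __init__(self) -> None:
        self.current = float("-inf")

    def advance(self, candidate: float) -> None:
        # Never move backwards, even if a stale value is reported.
        self.current = max(self.current, candidate)


def emit(holders: Iterable[Tuple[Any, float, float]],
         estimator: MonotonicWatermark) -> List[Tuple[Any, float]]:
    """Label each output with record_ts; advance the watermark with source_wm."""
    out = []
    for value, record_ts, source_wm in holders:
        out.append((value, record_ts))   # event time labels the output
        estimator.advance(source_wm)     # source watermark drives the estimator
    return out


def finish(estimator: MonotonicWatermark) -> None:
    """On EOF, jump to MAX_TIMESTAMP so downstream windows can close."""
    estimator.advance(MAX_TIMESTAMP)


estimator = MonotonicWatermark()
# Record timestamps run ahead of the source watermark.
holders = [("a", 12.0, 10.0), ("b", 11.0, 10.0), ("c", 15.0, 14.0)]
outputs = emit(holders, estimator)
watermark_before_eof = estimator.current
finish(estimator)
```

In this toy run the watermark sits at the last source-reported value until `finish()` is called, which is the hang the EOF bullet describes avoiding.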

Test coverage

58 tests, all green locally on Python 3.13 + Beam 2.71:

  • unbounded_source_test.py (42): ABC contracts; restriction coder
    round-trip; restriction tracker state machine (claim / split / EOF /
    no-data / check_done / progress / is_bounded); finalize idempotency;
    source-watermark vs. record-timestamp regression; finalize/resume
    channel separation; tracker-internal exception close on
    reader.advance and reader.get_watermark failures; DoFn
    generator close path (unit + integration with downstream raising
    Map); cloudpickle round-trip for transform and source; circular
    import in three orderings via subprocess + tempfile; e2e DirectRunner
    pipeline (records in order + windowed GroupByKey).
  • iobase_test.py (16, of which 3 are new): Read(UnboundedSource)
    dispatch through the new expand branch; Read.to_runner_api /
    from_runner_api round-trip with IsBounded.UNBOUNDED; PCollection
    is_bounded assertion.
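
The circular-import check listed above (three orderings via subprocess + tempfile) might look roughly like the sketch below. It does not use the PR's modules: `toy_iobase` and `toy_unbounded` are hypothetical two-file stand-ins, with the function-local import assumed to play the same role as the lazy import in Read.expand().

```python
import os
import subprocess
import sys
import tempfile
import textwrap
from pathlib import Path

# Hypothetical stand-in for iobase.py: the unbounded branch imports lazily.
TOY_IOBASE = textwrap.dedent('''
    class SourceBase:
        pass

    def expand(source):
        # Function-local import, resolved at call time rather than load time.
        from toy_unbounded import ToyUnboundedSource
        if isinstance(source, ToyUnboundedSource):
            return "unbounded"
        return "bounded"
''')

# Hypothetical stand-in for unbounded_source.py: imports the base at load time.
TOY_UNBOUNDED = textwrap.dedent('''
    from toy_iobase import SourceBase

    class ToyUnboundedSource(SourceBase):
        pass
''')

ORDERINGS = {
    "iobase first": "import toy_iobase, toy_unbounded",
    "unbounded_source first": "import toy_unbounded, toy_iobase",
    "lazy via expand": (
        "import toy_iobase; toy_iobase.expand(toy_iobase.SourceBase())"),
}

# Run after each ordering: dispatch must still recognise the subclass.
CHECK = textwrap.dedent('''
    import toy_iobase
    from toy_unbounded import ToyUnboundedSource
    print(toy_iobase.expand(ToyUnboundedSource()))
''')


def run_orderings():
    """Run each import ordering in a fresh interpreter (clean module cache)."""
    results = {}
    with tempfile.TemporaryDirectory() as tmp:
        Path(tmp, "toy_iobase.py").write_text(TOY_IOBASE)
        Path(tmp, "toy_unbounded.py").write_text(TOY_UNBOUNDED)
        env = dict(os.environ, PYTHONPATH=tmp)
        for name, first_imports in ORDERINGS.items():
            proc = subprocess.run(
                [sys.executable, "-c", first_imports + "\n" + CHECK],
                cwd=tmp, env=env, capture_output=True, text=True, check=True)
            results[name] = proc.stdout.strip()
    return results


results = run_orderings()
```

A subprocess per ordering matters because an in-process test would reuse whatever `sys.modules` already holds and could mask an ordering-dependent failure.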

Out of scope (deferred to W2+, tracked under #19137)

Listed exhaustively in the module docstring at
sdks/python/apache_beam/io/unbounded_source.py:

  • Record-id-based deduplication (Java's ValueWithRecordId).
  • Backlog-byte reporting (restriction_size is constant 1;
    current_progress is binary 0.0 / 1.0).
  • Dynamic split fractions / runner-initiated work stealing.
  • Source-specific checkpoint coders threaded through the SDF restriction
    coder (today the coder always pickles checkpoint marks via
    _MemoizingPickleCoder regardless of the source's
    get_checkpoint_mark_coder).
  • Reader caching across bundles (Java caches readers via a Guava cache;
    this PoC always rebuilds the reader from the checkpoint).
  • EmptyUnboundedSource terminal-state marker (we use an is_done
    flag on the restriction instead).
  • Runner-side IsBounded.UNBOUNDED dispatch in
    bundle_processor.IMPULSE_READ_TRANSFORM. Today the wire format
    round-trips correctly but execution flows through the composite's
    expanded sub-transforms (Impulse | Map | SDF-ParDo), not the URN
    handler.

  • Mention the appropriate issue in your description -- addresses #19137.
  • Update CHANGES.md with noteworthy changes -- intentionally
    deferred until W1/W2 scope is finalised on dev@; will add before
    the PR moves out of draft.
  • ICLA -- to be confirmed by mentor (large contribution under GSoC
    2026).

cc @yhu for mentor review.

Eliaaazzz added 4 commits May 26, 2026 00:45
Add UnboundedSource, UnboundedReader, and CheckpointMark ABCs (Java
semantics, Python names) plus a Splittable-DoFn wrapper so that
`p | ReadFromUnboundedSource(MySource())` runs an unbounded source on the
portable Fn API DirectRunner. Mirrors the in-tree periodicsequence SDF
(`@DoFn.unbounded_per_element` + `restriction_tracker.defer_remainder` +
ManualWatermarkEstimator) and touches no core files (no iobase.py or
sdf_utils.py edits).

Covers read, event-time timestamps, monotonic watermark, checkpoint-based
pause/resume, bundle finalization, and the MAX-watermark done transition.
Out of scope (later weeks): record-id dedup, backlog reporting, dynamic
split fractions, and iobase.Read.expand() wiring.

14 deterministic tests pass, including an end-to-end DirectRunner read.

GSoC 2026 Week-1 deliverable for issue apache#19137.
* iobase.Read.expand dispatches UnboundedSource through
  ReadFromUnboundedSource (Impulse | Map | ParDo); function-local lazy
  import breaks the iobase <-> unbounded_source cycle. The Read.__init__
  docstring is updated to describe the new dispatch.

* iobase.Read.to_runner_api_parameter widens the source isinstance to
  (BoundedSource, UnboundedSource) so Read(unbounded_source) graphs
  round-trip as READ.urn + ReadPayload(is_bounded=UNBOUNDED). Runner-
  side dispatch on the UNBOUNDED flag in bundle_processor stays W2.

* SDF tracker correctness:
  - Data-path watermark now propagates reader.get_watermark() (Java
    Read.java:594 parity); holder is (value, record_ts, source_wm).
  - _UnboundedSourceRestriction adds finalization_checkpoint_mark so
    a done primary can carry a commit hook independent of the
    residual's RESUME checkpoint_mark. Coder is now a fixed 5-tuple.
  - try_claim / try_split close the reader before re-raising on any
    reader method failure; the DoFn finally is reduced to defense-
    in-depth for yield / downstream-raise paths and logs a warning if
    the private RestrictionTrackerView chain ever breaks.
  - ReadFromUnboundedSource validates poll_interval_seconds > 0.

* Regression coverage:
  - File-marker side-channel tests for EOF watermark advance to MAX
    and reader-close on (i) reader.advance raise, (ii) reader.get_*
    raise, (iii) downstream yield raise.
  - try_split / EOF separation of finalize and resume channels.
  - Circular import order in 3 subprocesses (clean module cache):
    iobase first, unbounded_source first, lazy via Read.expand.
  - Cloudpickle round-trip for the transform and source.
  - to_runner_api / from_runner_api round-trip asserting
    IsBounded.UNBOUNDED enum and source recovery.

unbounded_source_test 37/37, iobase_test 16/16.

Tracking apache#19137.
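
As a rough illustration of the finalize/resume channel separation described in this commit, the sketch below splits a toy restriction at a checkpoint. The field layout is an assumption made for the example (the real coder is described only as a fixed 5-tuple), `ToyRestriction` and `split_at_checkpoint` are hypothetical names, and checkpoint marks are modelled as plain integers.

```python
from dataclasses import dataclass, replace
from typing import Optional, Tuple


@dataclass(frozen=True)
class ToyRestriction:
    """Hypothetical restriction; field names mirror the PR text only loosely."""
    source_id: str
    checkpoint_mark: Optional[int] = None               # resume channel
    finalization_checkpoint_mark: Optional[int] = None  # commit-hook channel
    is_done: bool = False


def split_at_checkpoint(
        restriction: ToyRestriction,
        mark: int) -> Tuple[ToyRestriction, ToyRestriction]:
    """Return (primary, residual) with the mark routed to separate channels."""
    # The primary is finished: it only needs the mark for its finalize callback.
    primary = replace(
        restriction,
        checkpoint_mark=None,
        finalization_checkpoint_mark=mark,
        is_done=True)
    # The residual resumes from the mark and carries no commit hook of its own.
    residual = replace(
        restriction,
        checkpoint_mark=mark,
        finalization_checkpoint_mark=None,
        is_done=False)
    return primary, residual


primary, residual = split_at_checkpoint(ToyRestriction("shard-0"), mark=7)
```

Because the two halves never share a field, finalizing the primary in this sketch cannot alter what the residual resumes from.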
* default_output_coder wiring (HIGH): setting output.element_type alone
  let the registry's default coder for the type_hint silently shadow
  the source's declared coder. Now also register the source-declared
  coder against the element type via coders.registry.register_coder
  before assigning element_type, so the runner's coder lookup returns
  the source's coder. Registration failures (parameterised coders) are
  logged as a warning instead of crashing the pipeline build.

* Provider.split validation (LOW): a non-UnboundedSource returned from
  source.split() is a source-contract violation, not a split-refusal.
  Move the isinstance check OUTSIDE the try/except around source.split
  so we fail loudly instead of silently running single-shard.

* DoFn yield-raise close test (MEDIUM): the previous unit test used
  generator.throw(RuntimeError) which doesn't match Beam's SDK harness
  path (the harness raises in receiver.receive *outside* the user
  generator, then drops the generator). Switch to generator.close()
  which triggers GeneratorExit at the active yield -- the actual
  cleanup path Beam takes. Also add an integration test that runs a
  real pipeline with a downstream Map that raises, exercising
  common._OutputHandler.handle_process_outputs end-to-end.
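
The difference between the two generator paths in the last bullet can be seen in plain Python, without Beam. `ToyReader` and `process` are hypothetical stand-ins for the reader and the DoFn's process generator; only the standard `generator.close()` and `generator.throw()` semantics are relied on.

```python
class ToyReader:
    """Hypothetical reader that only records whether close() was called."""

    def __init__(self):
        self.closed = False

    def close(self):
        self.closed = True


def process(reader, records, seen):
    """Generator standing in for a process body that yields records."""
    try:
        for record in records:
            yield record
    except BaseException as exc:
        # Record what was raised at the suspended yield, then propagate it.
        seen.append(type(exc).__name__)
        raise
    finally:
        reader.close()


# Path 1: the consumer drops the generator, which is what close() models.
close_reader, close_seen = ToyReader(), []
gen = process(close_reader, ["a", "b"], close_seen)
next(gen)
gen.close()  # raises GeneratorExit at the active yield

# Path 2: throw() injects the error into the generator itself.
throw_reader, throw_seen = ToyReader(), []
gen = process(throw_reader, ["a", "b"], throw_seen)
next(gen)
try:
    gen.throw(RuntimeError("boom"))
except RuntimeError:
    pass
```

Both paths run the `finally` block, but the generator observes a different exception in each, so a test built on `throw()` exercises a path the harness is described as not taking.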

42/42 unbounded_source_test, 16/16 iobase_test.

Tracking apache#19137.
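
The Provider.split validation from this commit can be sketched as follows. This is not the PR's code: `ToySource` and `initial_splits` are hypothetical, and the sketch assumes that any exception raised by `split()` counts as a split refusal, which the PR text does not specify.

```python
import logging

logger = logging.getLogger(__name__)


class ToySource:
    """Hypothetical source; subclasses override split()."""

    def split(self, desired_num_splits):
        return [ToySource() for _ in range(desired_num_splits)]


class RefusingSource(ToySource):
    def split(self, desired_num_splits):
        raise NotImplementedError("this source cannot be split")


class BrokenSource(ToySource):
    def split(self, desired_num_splits):
        return ["not a source"]  # contract violation


def initial_splits(source, desired_num_splits=20):
    """Fan out a source, falling back to a single shard on split refusal."""
    try:
        subs = list(source.split(desired_num_splits))
    except Exception as exc:  # assumption: any exception means "refused"
        logger.warning("split refused (%s); using a single restriction", exc)
        return [source]
    # Validation is outside the try/except so it cannot be swallowed
    # and mistaken for a refusal.
    for sub in subs:
        if not isinstance(sub, ToySource):
            raise TypeError(f"split() returned {type(sub).__name__}")
    return subs


good = initial_splits(ToySource())
refusing = RefusingSource()
fallback = initial_splits(refusing)
try:
    initial_splits(BrokenSource())
    violation_raised = False
except TypeError:
    violation_raised = True
```

Had the isinstance check lived inside the `try`, the `TypeError` would have been caught by the same handler and the pipeline would have run single-shard without surfacing the bug.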
@Eliaaazzz
Contributor Author

Withdrawing to fix CI failures (yapf formatting + isort import ordering on the 3 modified files). Will re-open after local lint clean and re-verification.

@Eliaaazzz Eliaaazzz closed this May 28, 2026