stream: add gather operator and tighten coverage #2824

Closed
He-Pin wants to merge 2 commits into apache:main from He-Pin:hepin-gather-statefulmap-coverage

@He-Pin He-Pin commented Mar 29, 2026

Motivation

Add the gather operator across the Scala and Java DSLs, tighten its execution semantics, and broaden its statefulMap-equivalent regression coverage.

This PR also includes the hot-path and backpressure fixes found during review, plus benchmark and documentation support for the new operator.

Modification

  • add gather API support in the Scala and Java DSLs
  • implement the Gather stage and supporting Gatherer / GatherCollector APIs
  • add Scala, Java, and docs examples for the operator
  • add FlowGatherSpec plus Java parity coverage
  • add JMH coverage for zipWithIndex-style workloads
  • optimize the public single-output gather hot path
  • fix the OneToOneGatherer backpressure issue found during review
  • expand tests toward statefulMap-equivalent scenarios such as happy-path state retention, restart, stop, and backpressure
  • correct the new gather API @since markers to 2.0.0
  • document which statefulMap scenarios map directly to gather and which null-state cases do not translate one-to-one

Result

  • gather is documented and available end-to-end in Scala and Java DSLs
  • correctness coverage is materially stronger, especially around supervision and backpressure
  • the PR now documents the intended release version (2.0.0) for the new APIs
  • local verification passed:
    • sbt --no-colors 'scalafmtAll' 'stream-tests/testOnly org.apache.pekko.stream.scaladsl.FlowGatherSpec' 'stream-tests/testOnly org.apache.pekko.stream.javadsl.FlowTest' 'stream-tests/testOnly org.apache.pekko.stream.javadsl.SourceTest' 'docs/test:compile' 'bench-jmh/Jmh/compile'
  • a corrected local JMH run indicates that gather-based zipWithIndex is now near parity with statefulMap

References

  • stream/src/main/scala/org/apache/pekko/stream/impl/fusing/Ops.scala
  • stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/FlowGatherSpec.scala
  • stream/src/main/scala/org/apache/pekko/stream/scaladsl/Gather.scala
  • stream/src/main/scala/org/apache/pekko/stream/javadsl/Gather.scala
  • docs/src/main/paradox/stream/operators/Source-or-Flow/gather.md

He-Pin and others added 2 commits March 29, 2026 23:46
Motivation: add the gather operator across the Scala and Java DSLs, document it, tighten its execution semantics, and broaden its statefulMap-equivalent coverage.

Modification: implement the gather stage and DSL wiring, add Scala/Java/docs examples and tests, add JMH coverage, optimize the public gather hot path, and fix the one-to-one backpressure bug found in review.

Result: gather is now documented and verified end-to-end with stronger semantics, broader regression coverage, and near-parity zipWithIndex performance against statefulMap.

Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>

Motivation: follow-up review and documentation work for the new gather operator.

Modification: correct the new gather API @since annotations to 2.0.0 and document how gather coverage aligns with statefulMap semantics while differing on internal null-state handling.

Result: the published PR now reflects the intended release version and explains the remaining semantic differences more clearly.

Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>

@He-Pin He-Pin left a comment

This is a substantial PR adding the gather operator. The implementation looks thorough with good test coverage.

Positive:

  • Excellent test coverage (764 lines of spec) including supervision, backpressure, and completion scenarios
  • Both Scala and Java DSL support with proper documentation
  • JMH benchmarks added to validate performance claims
  • OneToOneGatherer optimization for the common 1:1 case is a good design choice

Concerns:

  1. docs/gather_zero_allocation_evaluation.md -- this file appears to be an AI-generated evaluation document in Chinese. It doesn't belong in the final PR and should be removed before merge. It's not user-facing documentation.

  2. The gather operator overlaps significantly with statefulMap. The docs acknowledge this but the distinction (avoiding per-element tuple allocation) may not justify the added API surface for most users. The benchmarks will help justify this, but I'd recommend making sure the performance advantage is clearly documented in the main docs, not just the JMH results.

  3. GatherCollector -- what happens if push is called after downstream has cancelled? The spec should cover this edge case explicitly.

  4. The @since 2.0.0 markers are noted as corrected -- good. Just verify this aligns with the actual release plan.

@He-Pin He-Pin left a comment

Deep CR: PR #2824 - stream: add gather operator and tighten coverage

Architecture Review

Concept: The gather operator introduces a mutable-state, direct-push model for stream transformations. Unlike statefulMap which requires returning (S, Out) tuples (causing per-element allocation), gather uses a Gatherer that mutates internal state and pushes to a GatherCollector.

API Design:

  • Gatherer[In, Out] with apply(in, collector) and optional onComplete(collector) - clean and minimal
  • GatherCollector[-Out] with push(elem) - simple emission interface
  • OneToOneGatherer[In, Out] - internal optimization for the common 1:1 mapping case, avoiding the collector indirection
  • @DoNotInherit on GatherCollector - correct, users should not implement their own collector
  • @FunctionalInterface on Gatherer - correct, allows lambda usage in Java
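
To make the API shape above concrete, here is a minimal, self-contained Java sketch. It does not use Pekko: the `Gatherer` and `GatherCollector` interfaces below are hypothetical stand-ins modelled on the signatures listed in this review, and `Batcher` and `run` are illustration-only names. `run` is a toy driver in place of the real stream stage, so it says nothing about demand or backpressure.

```java
import java.util.ArrayList;
import java.util.List;

public class GatherApiSketch {
    /** Hypothetical stand-in for GatherCollector: a push-only emission interface. */
    interface GatherCollector<Out> {
        void push(Out elem);
    }

    /** Hypothetical stand-in for Gatherer: one callback per element plus an optional completion hook. */
    @FunctionalInterface
    interface Gatherer<In, Out> {
        void apply(In in, GatherCollector<Out> collector);

        default void onComplete(GatherCollector<Out> collector) {}
    }

    /** Groups elements into batches of a fixed size; a partial last batch is flushed in onComplete. */
    static final class Batcher<T> implements Gatherer<T, List<T>> {
        private final int size;
        private List<T> buffer = new ArrayList<>();

        Batcher(int size) {
            this.size = size;
        }

        @Override
        public void apply(T in, GatherCollector<List<T>> collector) {
            buffer.add(in); // state is mutated in place, no (state, output) tuple is returned
            if (buffer.size() == size) {
                collector.push(buffer);
                buffer = new ArrayList<>();
            }
        }

        @Override
        public void onComplete(GatherCollector<List<T>> collector) {
            if (!buffer.isEmpty()) collector.push(buffer);
        }
    }

    /** Toy driver (not the real stage): feeds every input, then signals completion. */
    static <In, Out> List<Out> run(List<In> inputs, Gatherer<In, Out> gatherer) {
        List<Out> out = new ArrayList<>();
        GatherCollector<Out> collector = out::add;
        for (In in : inputs) gatherer.apply(in, collector);
        gatherer.onComplete(collector);
        return out;
    }

    public static void main(String[] args) {
        // prints [[1, 2], [3, 4], [5]]
        System.out.println(run(List.of(1, 2, 3, 4, 5), new Batcher<Integer>(2)));
    }
}
```

Because the stand-in `Gatherer` has a single abstract method, a stateless transformation can also be written as a lambda, for example `(s, c) -> { c.push(s); c.push(s); }` to duplicate every element.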

Scala vs Java DSL alignment:

  • Scala: GatherCollector is a trait with push(elem: Out): Unit
  • Java: GatherCollector extends function.Procedure[Out] with apply delegating to push - this allows Java users to pass method references
  • Both DSLs share the same implementation via viaScala delegation

Binary Compatibility

New public API classes and methods are added to the stream module:

  • Gatherer, GatherCollector, OneToOneGatherer (new types)
  • gather() method on Source, Flow, SubFlow, SubSource (both scaladsl and javadsl)

All changes are additive. Since this is Pekko 2.0 (major version), this is expected and acceptable. No MiMa exclusions needed for new APIs.

Important: @since 2.0.0 markers are correctly applied to all new public APIs.

Performance Analysis

The PR includes JMH benchmarks comparing:

  1. statefulMap-based zipWithIndex (baseline)
  2. gather-based zipWithIndex (public API)
  3. OneToOneGatherer-based zipWithIndex (internal optimization)

The PR description claims "gather-based zipWithIndex is now near parity with statefulMap". This means the optimization goal is NOT to beat statefulMap performance, but to provide comparable performance while offering a zero-allocation API for users who need it.

The real value of gather is NOT in the 1:1 case (where statefulMap performs equally well), but in N:M transformations where statefulMapConcat would allocate an Iterable[Out] per element. The benchmarks should ideally compare gather against statefulMapConcat for fan-out scenarios, not just statefulMap.
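
The allocation difference in the fan-out case can be illustrated without Pekko. The sketch below is an assumption-laden toy, not the benchmark this comment asks for: `concatStyle` mimics a statefulMapConcat-like contract where each input returns a collection, while `pushStyle` mimics a gather-like contract where outputs go straight to a sink. All names are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.BiConsumer;
import java.util.function.Consumer;
import java.util.function.Function;

public class FanOutSketch {
    /** Concat-style contract: every input yields a freshly allocated container of outputs. */
    static <In, Out> List<Out> concatStyle(List<In> inputs, Function<In, List<Out>> f) {
        List<Out> out = new ArrayList<>();
        for (In in : inputs) out.addAll(f.apply(in));
        return out;
    }

    /** Push-style contract: outputs are handed to a shared sink, with no per-input container. */
    static <In, Out> List<Out> pushStyle(List<In> inputs, BiConsumer<In, Consumer<Out>> f) {
        List<Out> out = new ArrayList<>();
        Consumer<Out> sink = out::add;
        for (In in : inputs) f.accept(in, sink);
        return out;
    }

    /** Expands each n into 0..n-1 by building one intermediate list per input. */
    static List<Integer> expandConcat(List<Integer> inputs) {
        return concatStyle(inputs, n -> {
            List<Integer> perElement = new ArrayList<>(n); // allocated once per input
            for (int i = 0; i < n; i++) perElement.add(i);
            return perElement;
        });
    }

    /** Expands each n into 0..n-1 by pushing directly, skipping the intermediate list. */
    static List<Integer> expandPush(List<Integer> inputs) {
        return pushStyle(inputs, (n, sink) -> {
            for (int i = 0; i < n; i++) sink.accept(i);
        });
    }

    public static void main(String[] args) {
        System.out.println(expandConcat(List.of(2, 0, 3)));
        System.out.println(expandPush(List.of(2, 0, 3)));
    }
}
```

Both variants produce the same elements in the same order; only the per-input container differs, which is the cost a gather vs statefulMapConcat benchmark would need to isolate.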

OneToOneGatherer optimization: This is an internal type (private[stream]) used by the implementation when a gatherer is known to produce exactly one output per input. It avoids the collector.push indirection. However, looking at the API, users cannot explicitly opt into this optimization - it would need to be detected at runtime or provided through a separate factory method.
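
One way such an opt-in could look is sketched below. This is a guess at a possible design, not the PR's implementation: the interfaces are hypothetical stand-ins, `oneToOne` is an invented factory name, and the `instanceof` check in `run` only illustrates runtime detection of the 1:1 case.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;

public class OneToOneSketch {
    interface GatherCollector<Out> {
        void push(Out elem);
    }

    interface Gatherer<In, Out> {
        void apply(In in, GatherCollector<Out> collector);
    }

    /** Hypothetical 1:1 specialisation: returns its single output instead of pushing it. */
    interface OneToOneGatherer<In, Out> extends Gatherer<In, Out> {
        Out applyOne(In in);

        @Override
        default void apply(In in, GatherCollector<Out> collector) {
            collector.push(applyOne(in)); // fallback for callers that only know the general contract
        }
    }

    /** Hypothetical factory that lets callers declare a transformation as 1:1. */
    static <In, Out> Gatherer<In, Out> oneToOne(Function<In, Out> f) {
        OneToOneGatherer<In, Out> g = f::apply;
        return g;
    }

    /** zipWithIndex-style gatherer: the index lives in mutable state captured by the lambda. */
    static <T> Gatherer<T, String> indexed() {
        long[] index = {0};
        return oneToOne(elem -> (index[0]++) + ":" + elem);
    }

    /** Toy driver: takes the direct path when the gatherer is known to be 1:1. */
    static <In, Out> List<Out> run(List<In> inputs, Gatherer<In, Out> gatherer) {
        List<Out> out = new ArrayList<>();
        if (gatherer instanceof OneToOneGatherer) {
            OneToOneGatherer<In, Out> direct = (OneToOneGatherer<In, Out>) gatherer;
            for (In in : inputs) out.add(direct.applyOne(in)); // no collector indirection
        } else {
            GatherCollector<Out> collector = out::add;
            for (In in : inputs) gatherer.apply(in, collector);
        }
        return out;
    }

    public static void main(String[] args) {
        // prints [0:a, 1:b, 2:c]
        System.out.println(run(List.of("a", "b", "c"), OneToOneSketch.<String>indexed()));
    }
}
```

A gatherer built through the factory and an equivalent plain lambda give the same output here; the difference is only which branch of `run` handles them.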

Test Coverage

FlowGatherSpec (764 lines) covers:

  • Happy-path stateful mapping
  • Delayed completion output (onComplete pushing elements)
  • Restart supervision behavior
  • Stop supervision behavior
  • Backpressure-sensitive transformations
  • Java parity tests (FlowTest, SourceTest)

Missing test scenarios:

  1. Null element handling: What happens if collector.push(null) is called? The Java docs say "MUST NOT be null" but is this enforced?
  2. Push after onComplete: What happens if a user calls collector.push() after the gatherer has already completed?
  3. Push after downstream cancellation: The docs say "ignored on downstream cancellation" - is this tested?
  4. Multiple pushes from a single apply: Tested indirectly via batching examples, but no explicit stress test.
  5. Backpressure with large buffers: Does the collector correctly respect downstream demand, or does it buffer unbounded?

Documentation

Positive:

  • Comprehensive operator documentation with Scala and Java examples
  • Reactive Streams semantics documented (emits, backpressures, completes, cancels)
  • Index updated with new operator entry

Concern: docs/gather_zero_allocation_evaluation.md is an AI-generated internal evaluation document in Chinese. It should NOT be included in the final PR - it is not user-facing documentation and clutters the docs directory.

Code Quality

GatherCollector safety: The collector is documented as "only valid while the current Gatherer callback is running." This implies the implementation must invalidate the collector reference after the callback returns. If a user stores a reference to the collector and uses it asynchronously, this would cause undefined behavior. The implementation should guard against this.
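
A guard of this kind could be as simple as an "active" flag that the stage flips around each callback. The following is a minimal sketch under that assumption; `GuardedCollector` and `run` are hypothetical names, the error messages are invented, and the single-threaded flag is only adequate if the callback and the collector are always used from the stage's own execution context.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;
import java.util.function.BiConsumer;

public class GuardedCollectorSketch {
    /** Collector that accepts pushes only while the driver has marked a callback as running. */
    static final class GuardedCollector<Out> {
        private final List<Out> downstream = new ArrayList<>();
        private boolean active = false;

        void push(Out elem) {
            // Fail fast on null instead of letting it travel downstream.
            Objects.requireNonNull(elem, "gather collector elements must not be null");
            if (!active) {
                throw new IllegalStateException("push called outside the active Gatherer callback");
            }
            downstream.add(elem);
        }
    }

    /** Toy driver: enables the collector only for the duration of each callback. */
    static <In, Out> List<Out> run(List<In> inputs, BiConsumer<In, GuardedCollector<Out>> gatherer) {
        GuardedCollector<Out> collector = new GuardedCollector<>();
        for (In in : inputs) {
            collector.active = true;
            try {
                gatherer.accept(in, collector);
            } finally {
                collector.active = false; // a leaked reference becomes unusable after the callback
            }
        }
        return collector.downstream;
    }

    public static void main(String[] args) {
        List<GuardedCollector<Integer>> leaked = new ArrayList<>();
        System.out.println(GuardedCollectorSketch.<Integer, Integer>run(List.of(1, 2), (n, c) -> {
            leaked.add(c); // the user stores the reference, which the contract forbids
            c.push(n * 10);
        }));
        try {
            leaked.get(0).push(99);
        } catch (IllegalStateException e) {
            System.out.println("rejected: " + e.getMessage());
        }
    }
}
```

The flag check costs one branch per push, so whether it is acceptable on the hot path is a question for the JMH benchmarks rather than something this sketch can answer.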

Supervision strategy: The PR mentions adherence to ActorAttributes.SupervisionStrategy. On restart, a new Gatherer instance is created (via the factory), losing all state. On stop, the stream terminates. This matches statefulMap behavior, which is correct for consistency.
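
The restart and stop behaviour described here can be modelled in a few lines. The sketch below is a simplification that assumes the failing element is dropped on restart and that stop simply ends processing; it uses plain `java.util.function` types rather than the PR's API, and `Decision`, `run` and `runningSum` are hypothetical names. A real stage would also propagate the failure downstream on stop.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.BiConsumer;
import java.util.function.Consumer;
import java.util.function.Supplier;

public class SupervisionSketch {
    enum Decision { STOP, RESTART }

    /** Toy driver: the factory is invoked again on RESTART, so all gatherer state is lost. */
    static <In, Out> List<Out> run(
            List<In> inputs,
            Supplier<BiConsumer<In, Consumer<Out>>> factory,
            Decision decision) {
        List<Out> out = new ArrayList<>();
        BiConsumer<In, Consumer<Out>> gatherer = factory.get();
        for (In in : inputs) {
            try {
                gatherer.accept(in, out::add);
            } catch (RuntimeException e) {
                if (decision == Decision.STOP) break; // stop: no further elements are processed
                gatherer = factory.get();             // restart: fresh instance, failing element dropped
            }
        }
        return out;
    }

    /** Running sum that fails on negative input; the sum is the state that a restart resets. */
    static Supplier<BiConsumer<Integer, Consumer<Integer>>> runningSum() {
        return () -> {
            int[] sum = {0};
            return (n, sink) -> {
                if (n < 0) throw new IllegalArgumentException("negative input: " + n);
                sum[0] += n;
                sink.accept(sum[0]);
            };
        };
    }

    public static void main(String[] args) {
        // prints [1, 3, 3, 7]: the sum restarts from zero after the failing -1
        System.out.println(run(List.of(1, 2, -1, 3, 4), runningSum(), Decision.RESTART));
        // prints [1, 3]: processing ends at the failing element
        System.out.println(run(List.of(1, 2, -1, 3, 4), runningSum(), Decision.STOP));
    }
}
```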

Suggestions

  1. Remove docs/gather_zero_allocation_evaluation.md - it is an internal AI evaluation document, not user documentation.
  2. Add null check in collector.push() to fail fast with a clear error message when Java users accidentally push null.
  3. Add collector lifecycle validation - detect and fail when push() is called outside the active callback context.
  4. Consider a factory method for creating OneToOneGatherer instances, so users who know their transformation is 1:1 can opt into the optimization.
  5. Add benchmark comparison between gather and statefulMapConcat for fan-out scenarios, not just statefulMap.

@He-Pin He-Pin closed this Apr 6, 2026