stream: add gather operator and tighten coverage#2824
stream: add gather operator and tighten coverage#2824He-Pin wants to merge 2 commits intoapache:mainfrom
Conversation
Motivation: add the gather operator across the Scala and Java DSLs, document it, tighten its execution semantics, and broaden its statefulMap-equivalent coverage. Modification: implement the gather stage and DSL wiring, add Scala/Java/docs examples and tests, add JMH coverage, optimize the public gather hot path, and fix the one-to-one backpressure bug found in review. Result: gather is now documented and verified end-to-end with stronger semantics, broader regression coverage, and near-parity zipWithIndex performance against statefulMap. Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
Motivation: follow-up review and documentation work for the new gather operator. Modification: correct the new gather API @SInCE annotations to 2.0.0 and document how gather coverage aligns with statefulMap semantics while differing on internal null-state handling. Result: the published PR now reflects the intended release version and explains the remaining semantic differences more clearly. Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
He-Pin
left a comment
There was a problem hiding this comment.
This is a substantial PR adding the gather operator. The implementation looks thorough with good test coverage.
Positive:
- Excellent test coverage (764 lines of spec) including supervision, backpressure, and completion scenarios
- Both Scala and Java DSL support with proper documentation
- JMH benchmarks added to validate performance claims
OneToOneGathereroptimization for the common 1:1 case is a good design choice
Concerns:
-
docs/gather_zero_allocation_evaluation.md-- this file appears to be an AI-generated evaluation document in Chinese. It doesn't belong in the final PR and should be removed before merge. It's not user-facing documentation. -
The
gatheroperator overlaps significantly withstatefulMap. The docs acknowledge this but the distinction (avoiding per-element tuple allocation) may not justify the added API surface for most users. The benchmarks will help justify this, but I'd recommend making sure the performance advantage is clearly documented in the main docs, not just the JMH results. -
GatherCollector-- what happens ifpushis called after downstream has cancelled? The spec should cover this edge case explicitly. -
The
@since 2.0.0markers are noted as corrected -- good. Just verify this aligns with the actual release plan.
He-Pin
left a comment
There was a problem hiding this comment.
Deep CR: PR #2824 - stream: add gather operator and tighten coverage
Architecture Review
Concept: The gather operator introduces a mutable-state, direct-push model for stream transformations. Unlike statefulMap which requires returning (S, Out) tuples (causing per-element allocation), gather uses a Gatherer that mutates internal state and pushes to a GatherCollector.
API Design:
Gatherer[In, Out]withapply(in, collector)and optionalonComplete(collector)- clean and minimalGatherCollector[-Out]withpush(elem)- simple emission interfaceOneToOneGatherer[In, Out]- internal optimization for the common 1:1 mapping case, avoiding the collector indirection@DoNotInheritonGatherCollector- correct, users should not implement their own collector@FunctionalInterfaceonGatherer- correct, allows lambda usage in Java
Scala vs Java DSL alignment:
- Scala:
GatherCollectoris a trait withpush(elem: Out): Unit - Java:
GatherCollectorextendsfunction.Procedure[Out]withapplydelegating topush- this allows Java users to pass method references - Both DSLs share the same implementation via
viaScaladelegation
Binary Compatibility
New public API classes and methods are added to the stream module:
Gatherer,GatherCollector,OneToOneGatherer(new types)gather()method onSource,Flow,SubFlow,SubSource(both scaladsl and javadsl)
All changes are additive. Since this is Pekko 2.0 (major version), this is expected and acceptable. No MiMa exclusions needed for new APIs.
Important: @since 2.0.0 markers are correctly applied to all new public APIs.
Performance Analysis
The PR includes JMH benchmarks comparing:
statefulMap-based zipWithIndex (baseline)gather-based zipWithIndex (public API)OneToOneGatherer-based zipWithIndex (internal optimization)
The PR description claims "gather-based zipWithIndex is now near parity with statefulMap". This means the optimization goal is NOT to beat statefulMap performance, but to provide comparable performance while offering a zero-allocation API for users who need it.
The real value of gather is NOT in the 1:1 case (where statefulMap performs equally well), but in N:M transformations where statefulMapConcat would allocate an Iterable[Out] per element. The benchmarks should ideally compare gather against statefulMapConcat for fan-out scenarios, not just statefulMap.
OneToOneGatherer optimization: This is an internal type (private[stream]) used by the implementation when a gatherer is known to produce exactly one output per input. It avoids the collector.push indirection. However, looking at the API, users cannot explicitly opt into this optimization - it would need to be detected at runtime or provided through a separate factory method.
Test Coverage
FlowGatherSpec (764 lines) covers:
- Happy-path stateful mapping
- Delayed completion output (onComplete pushing elements)
- Restart supervision behavior
- Stop supervision behavior
- Backpressure-sensitive transformations
- Java parity tests (FlowTest, SourceTest)
Missing test scenarios:
- Null element handling: What happens if
collector.push(null)is called? The Java docs say "MUST NOT be null" but is this enforced? - Push after onComplete: What happens if a user calls
collector.push()after the gatherer has already completed? - Push after downstream cancellation: The docs say "ignored on downstream cancellation" - is this tested?
- Multiple pushes from a single apply: Tested indirectly via batching examples, but no explicit stress test.
- Backpressure with large buffers: Does the collector correctly respect downstream demand, or does it buffer unbounded?
Documentation
Positive:
- Comprehensive operator documentation with Scala and Java examples
- Reactive Streams semantics documented (emits, backpressures, completes, cancels)
- Index updated with new operator entry
Concern: docs/gather_zero_allocation_evaluation.md is an AI-generated internal evaluation document in Chinese. It should NOT be included in the final PR - it is not user-facing documentation and clutters the docs directory.
Code Quality
GatherCollector safety: The collector is documented as "only valid while the current Gatherer callback is running." This implies the implementation must invalidate the collector reference after the callback returns. If a user stores a reference to the collector and uses it asynchronously, this would cause undefined behavior. The implementation should guard against this.
Supervision strategy: The PR mentions adherence to ActorAttributes.SupervisionStrategy. On restart, a new Gatherer instance is created (via the factory), losing all state. On stop, the stream terminates. This matches statefulMap behavior, which is correct for consistency.
Suggestions
- Remove
docs/gather_zero_allocation_evaluation.md- it is an internal AI evaluation document, not user documentation. - Add null check in
collector.push()to fail fast with a clear error message when Java users accidentally push null. - Add collector lifecycle validation - detect and fail when
push()is called outside the active callback context. - Consider a factory method for creating
OneToOneGathererinstances, so users who know their transformation is 1:1 can opt into the optimization. - Add benchmark comparison between
gatherandstatefulMapConcatfor fan-out scenarios, not juststatefulMap.
Motivation
Add the
gatheroperator across the Scala and Java DSLs, tighten its execution semantics, and broaden its statefulMap-equivalent regression coverage.This PR also includes the hot-path and backpressure fixes found during review, plus benchmark and documentation support for the new operator.
Modification
gatherAPI support in the Scala and Java DSLsGatherstage and supportingGatherer/GatherCollectorAPIsFlowGatherSpecplus Java parity coveragezipWithIndex-style workloadsOneToOneGathererbackpressure issue found during review@sincemarkers to2.0.0Result
gatheris documented and available end-to-end in Scala and Java DSLs2.0.0) for the new APIssbt --no-colors 'scalafmtAll' 'stream-tests/testOnly org.apache.pekko.stream.scaladsl.FlowGatherSpec' 'stream-tests/testOnly org.apache.pekko.stream.javadsl.FlowTest' 'stream-tests/testOnly org.apache.pekko.stream.javadsl.SourceTest' 'docs/test:compile' 'bench-jmh/Jmh/compile'gather-basedzipWithIndexis now near parity withstatefulMapReferences
stream/src/main/scala/org/apache/pekko/stream/impl/fusing/Ops.scalastream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/FlowGatherSpec.scalastream/src/main/scala/org/apache/pekko/stream/scaladsl/Gather.scalastream/src/main/scala/org/apache/pekko/stream/javadsl/Gather.scaladocs/src/main/paradox/stream/operators/Source-or-Flow/gather.md