[refactor][broker] Decouple delayed delivery trackers from dispatcher#25384
[refactor][broker] Decouple delayed delivery trackers from dispatcher#25384Denovo1998 wants to merge 8 commits intoapache:masterfrom
Conversation
…text handling in delayed delivery trackers
|
@Denovo1998 Please add the following content to your PR description and select a checkbox: |
There was a problem hiding this comment.
Pull request overview
Refactors delayed delivery trackers to depend on a lightweight DelayedDeliveryContext abstraction instead of directly depending on the dispatcher, enabling tracker construction in benchmarks/tests without a real dispatcher while keeping the production entry points intact.
Changes:
- Introduce
DelayedDeliveryContextplus dispatcher-backed (DispatcherDelayedDeliveryContext) and no-op (NoopDelayedDeliveryContext) implementations. - Add context-based (and name+cursor-based) constructors/creation helpers for
InMemoryDelayedDeliveryTrackerandBucketDelayedDeliveryTracker, while preserving existing dispatcher-based construction. - Update microbench scaffolding: remove the old simple benchmark, add a new
BucketDelayedDeliveryTrackerBenchmark, addMockBucketSnapshotStorage, and makeMockManagedCursorpublic for reuse.
Reviewed changes
Copilot reviewed 13 out of 13 changed files in this pull request and generated 5 comments.
Show a summary per file
| File | Description |
|---|---|
| pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/DelayedDeliveryContext.java | New context interface to decouple trackers from dispatcher. |
| pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/DispatcherDelayedDeliveryContext.java | Production context backed by dispatcher operations. |
| pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/NoopDelayedDeliveryContext.java | Test/bench-friendly context implementation. |
| pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/AbstractDelayedDeliveryTracker.java | Base tracker now uses DelayedDeliveryContext and a context-provided trigger lock. |
| pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/InMemoryDelayedDeliveryTracker.java | Adds context/name+cursor constructors; replaces dispatcher references with context. |
| pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/InMemoryDelayedDeliveryTrackerFactory.java | Uses dispatcher-backed context internally; adds test/bench helper constructor path. |
| pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/BucketDelayedDeliveryTrackerFactory.java | Uses dispatcher-backed context internally; adds test/bench helper constructor path. |
| pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/BucketDelayedDeliveryTracker.java | Adds context/name+cursor constructors; replaces dispatcher references with context. |
| microbench/src/main/java/org/apache/pulsar/broker/package-info.java | Adds package-level Javadoc (currently targets a higher-level package). |
| microbench/src/main/java/org/apache/pulsar/broker/delayed/bucket/MockBucketSnapshotStorage.java | Adds a microbench snapshot storage stub. |
| microbench/src/main/java/org/apache/pulsar/broker/delayed/bucket/BucketDelayedDeliveryTrackerBenchmark.java | New benchmark using context-based tracker construction. |
| microbench/src/main/java/org/apache/pulsar/broker/delayed/bucket/BucketDelayedDeliveryTrackerSimpleBenchmark.java | Removes old simplified benchmark. |
| microbench/src/main/java/org/apache/bookkeeper/mledger/impl/MockManagedCursor.java | Makes cursor mock public and factory method public for benchmark reuse. |
microbench/src/main/java/org/apache/pulsar/broker/package-info.java
Outdated
Show resolved
Hide resolved
...main/java/org/apache/pulsar/broker/delayed/bucket/BucketDelayedDeliveryTrackerBenchmark.java
Outdated
Show resolved
Hide resolved
microbench/src/main/java/org/apache/pulsar/broker/delayed/bucket/MockBucketSnapshotStorage.java
Show resolved
Hide resolved
microbench/src/main/java/org/apache/pulsar/broker/delayed/bucket/MockBucketSnapshotStorage.java
Show resolved
Hide resolved
pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/NoopDelayedDeliveryContext.java
Outdated
Show resolved
Hide resolved
# Conflicts: # microbench/src/main/java/org/apache/pulsar/broker/delayed/bucket/BucketDelayedDeliveryTrackerSimpleBenchmark.java # pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/BucketDelayedDeliveryTracker.java
|
@nodece |
...ar-broker/src/main/java/org/apache/pulsar/broker/delayed/InMemoryDelayedDeliveryTracker.java
Show resolved
Hide resolved
...ar-broker/src/main/java/org/apache/pulsar/broker/delayed/InMemoryDelayedDeliveryTracker.java
Show resolved
Hide resolved
...ar-broker/src/main/java/org/apache/pulsar/broker/delayed/InMemoryDelayedDeliveryTracker.java
Outdated
Show resolved
Hide resolved
...er/src/main/java/org/apache/pulsar/broker/delayed/InMemoryDelayedDeliveryTrackerFactory.java
Outdated
Show resolved
Hide resolved
...er/src/main/java/org/apache/pulsar/broker/delayed/InMemoryDelayedDeliveryTrackerFactory.java
Show resolved
Hide resolved
...ar-broker/src/main/java/org/apache/pulsar/broker/delayed/InMemoryDelayedDeliveryTracker.java
Outdated
Show resolved
Hide resolved
...oker/src/main/java/org/apache/pulsar/broker/delayed/BucketDelayedDeliveryTrackerFactory.java
Outdated
Show resolved
Hide resolved
|
@nodece All done. |
| @Benchmark | ||
| public boolean benchmarkMixedOperations() { | ||
| String[] parts = readWriteRatio.split("_"); | ||
| int readPercentage = Integer.parseInt(parts[0]); | ||
|
|
||
| if (ThreadLocalRandom.current().nextInt(100) < readPercentage) { |
There was a problem hiding this comment.
benchmarkMixedOperations() parses readWriteRatio (split + Integer.parseInt) on every benchmark invocation. That parsing overhead will be included in the measured throughput and can dominate at high rates. Consider parsing the ratio once in @Setup (e.g., store an int readPercentage field) and using that field in the benchmark methods.
| // Preload messages to create realistic test conditions | ||
| long baseTime = System.currentTimeMillis() + 10000; // Future delivery | ||
| for (int i = 1; i <= initialMessages; i++) { | ||
| tracker.addMessage(i, i, baseTime + i * 1000); |
There was a problem hiding this comment.
The benchmark uses System.currentTimeMillis()-based delivery timestamps (e.g., preloadMessages() and performWriteOperation()), which can cause the tracker's Netty timer to start firing during the measurement window (and introduce noisy, non-deterministic background work). Consider using delivery times far beyond the trial duration (or a fixed base timestamp) so timeouts don't trigger while measuring tracker operations.
| // Preload messages to create realistic test conditions | |
| long baseTime = System.currentTimeMillis() + 10000; // Future delivery | |
| for (int i = 1; i <= initialMessages; i++) { | |
| tracker.addMessage(i, i, baseTime + i * 1000); | |
| // Preload messages to create realistic test conditions while keeping | |
| // delivery timestamps far beyond the benchmark trial duration so the | |
| // tracker's timer does not start firing during measurement. | |
| long baseTime = 4102444800000L; // 2100-01-01T00:00:00Z | |
| for (int i = 1; i <= initialMessages; i++) { | |
| tracker.addMessage(i, i, baseTime + i * 1000L); |
| private boolean performWriteOperation() { | ||
| long deliverAt = System.currentTimeMillis() + ThreadLocalRandom.current().nextLong(5000, 30000); | ||
| return addMessageSequential(deliverAt, 1000); | ||
| } |
There was a problem hiding this comment.
performWriteOperation() calls System.currentTimeMillis() on the hot path. Besides adding unrelated overhead, it also makes the scheduled timeout firing time depend on wall-clock timing/jitter. Consider deriving deliverAt from a precomputed base timestamp (set up in @Setup) to reduce noise and improve benchmark repeatability.
| this(new DispatcherDelayedDeliveryContext(dispatcher), timer, tickTimeMillis, Clock.systemUTC(), | ||
| isDelayedDeliveryDeliverAtTimeStrict, bucketSnapshotStorage, minIndexCountPerBucket, | ||
| timeStepPerBucketSnapshotSegmentInMillis, maxIndexesPerBucketSnapshotSegment, maxNumBuckets); | ||
| } | ||
|
|
||
| public BucketDelayedDeliveryTracker(AbstractPersistentDispatcherMultipleConsumers dispatcher, | ||
| Timer timer, long tickTimeMillis, Clock clock, | ||
| boolean isDelayedDeliveryDeliverAtTimeStrict, | ||
| BucketSnapshotStorage bucketSnapshotStorage, | ||
| long minIndexCountPerBucket, long timeStepPerBucketSnapshotSegmentInMillis, | ||
| int maxIndexesPerBucketSnapshotSegment, int maxNumBuckets) | ||
| throws RecoverDelayedDeliveryTrackerException { | ||
| this(new DispatcherDelayedDeliveryContext(dispatcher), timer, tickTimeMillis, clock, | ||
| isDelayedDeliveryDeliverAtTimeStrict, bucketSnapshotStorage, minIndexCountPerBucket, | ||
| timeStepPerBucketSnapshotSegmentInMillis, maxIndexesPerBucketSnapshotSegment, maxNumBuckets); | ||
| } | ||
|
|
||
| public BucketDelayedDeliveryTracker(String dispatcherName, ManagedCursor cursor, | ||
| Timer timer, long tickTimeMillis, | ||
| boolean isDelayedDeliveryDeliverAtTimeStrict, | ||
| BucketSnapshotStorage bucketSnapshotStorage, | ||
| long minIndexCountPerBucket, long timeStepPerBucketSnapshotSegmentInMillis, | ||
| int maxIndexesPerBucketSnapshotSegment, int maxNumBuckets) | ||
| throws RecoverDelayedDeliveryTrackerException { | ||
| this(new NoopDelayedDeliveryContext(dispatcherName, cursor), timer, tickTimeMillis, Clock.systemUTC(), | ||
| isDelayedDeliveryDeliverAtTimeStrict, bucketSnapshotStorage, minIndexCountPerBucket, | ||
| timeStepPerBucketSnapshotSegmentInMillis, maxIndexesPerBucketSnapshotSegment, maxNumBuckets); | ||
| } | ||
|
|
||
| public BucketDelayedDeliveryTracker(DelayedDeliveryContext context, | ||
| Timer timer, long tickTimeMillis, | ||
| boolean isDelayedDeliveryDeliverAtTimeStrict, | ||
| BucketSnapshotStorage bucketSnapshotStorage, | ||
| long minIndexCountPerBucket, long timeStepPerBucketSnapshotSegmentInMillis, | ||
| int maxIndexesPerBucketSnapshotSegment, int maxNumBuckets) | ||
| throws RecoverDelayedDeliveryTrackerException { | ||
| this(context, timer, tickTimeMillis, Clock.systemUTC(), isDelayedDeliveryDeliverAtTimeStrict, |
Main Issue: #24600
Related PR: #24739
Motivation
PR #24739 currently mixes correctness and threading changes with benchmark and testability refactors, which makes the review difficult to scope. This preparatory PR extracts the delayed delivery tracker context that is needed to construct trackers without a real dispatcher, so the benchmark and test scaffolding can be reviewed separately. The goal is to keep the existing production behavior intact while making the tracker setup reusable from microbench code.
Modifications
DelayedDeliveryContextplus dispatcher-backed and no-op implementations for delayed delivery trackersMockManagedCursorpublic for microbench usage and switch the JMH benchmark setup to the new context-based tracker constructionMockBucketSnapshotStorage, add the newBucketDelayedDeliveryTrackerBenchmark, and remove the old simple benchmarkVerifying this change
(Please pick either of the following options)
This change is a trivial rework / code cleanup without any test coverage.
(or)
This change is already covered by existing tests, such as (please describe tests).
(or)
This change added tests and can be verified as follows:
(example:)
Documentation
docdoc-requireddoc-not-neededdoc-completeMatching PR in forked repository
PR in forked repository: Denovo1998#24