Skip to content

[refactor][broker] Decouple delayed delivery trackers from dispatcher#25384

Open
Denovo1998 wants to merge 8 commits intoapache:masterfrom
Denovo1998:bucket_delivery_tracker_prep
Open

[refactor][broker] Decouple delayed delivery trackers from dispatcher#25384
Denovo1998 wants to merge 8 commits intoapache:masterfrom
Denovo1998:bucket_delivery_tracker_prep

Conversation

@Denovo1998
Copy link
Copy Markdown
Contributor

@Denovo1998 Denovo1998 commented Mar 22, 2026

Main Issue: #24600

Related PR: #24739

Motivation

PR #24739 currently mixes correctness and threading changes with benchmark and testability refactors, which makes the review difficult to scope. This preparatory PR extracts the delayed delivery tracker context that is needed to construct trackers without a real dispatcher, so the benchmark and test scaffolding can be reviewed separately. The goal is to keep the existing production behavior intact while making the tracker setup reusable from microbench code.

Modifications

  • introduce DelayedDeliveryContext plus dispatcher-backed and no-op implementations for delayed delivery trackers
  • add context-based and name+cursor-based constructors to the delayed delivery trackers, while keeping the existing dispatcher-based production entry points intact
  • update the delayed delivery tracker factories to expose benchmark/test construction helpers without changing the production factory flow
  • make MockManagedCursor public for microbench usage and switch the JMH benchmark setup to the new context-based tracker construction
  • add MockBucketSnapshotStorage, add the new BucketDelayedDeliveryTrackerBenchmark, and remove the old simple benchmark

Verifying this change

  • Make sure that the change passes the CI checks.

(Please pick either of the following options)

This change is a trivial rework / code cleanup without any test coverage.

(or)

This change is already covered by existing tests, such as (please describe tests).

(or)

This change added tests and can be verified as follows:

(example:)

  • Added integration tests for end-to-end deployment with large payloads (10MB)
  • Extended integration test for recovery after broker failure

Documentation

  • doc
  • doc-required
  • doc-not-needed
  • doc-complete

Matching PR in forked repository

PR in forked repository: Denovo1998#24

@github-actions
Copy link
Copy Markdown

@Denovo1998 Please add the following content to your PR description and select a checkbox:

- [ ] `doc` <!-- Your PR contains doc changes -->
- [ ] `doc-required` <!-- Your PR changes impact docs and you will update later -->
- [ ] `doc-not-needed` <!-- Your PR changes do not impact docs -->
- [ ] `doc-complete` <!-- Docs have been already added -->

@Denovo1998
Copy link
Copy Markdown
Contributor Author

Copy link
Copy Markdown
Contributor

Copilot AI left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pull request overview

Refactors delayed delivery trackers to depend on a lightweight DelayedDeliveryContext abstraction instead of directly depending on the dispatcher, enabling tracker construction in benchmarks/tests without a real dispatcher while keeping the production entry points intact.

Changes:

  • Introduce DelayedDeliveryContext plus dispatcher-backed (DispatcherDelayedDeliveryContext) and no-op (NoopDelayedDeliveryContext) implementations.
  • Add context-based (and name+cursor-based) constructors/creation helpers for InMemoryDelayedDeliveryTracker and BucketDelayedDeliveryTracker, while preserving existing dispatcher-based construction.
  • Update microbench scaffolding: remove the old simple benchmark, add a new BucketDelayedDeliveryTrackerBenchmark, add MockBucketSnapshotStorage, and make MockManagedCursor public for reuse.

Reviewed changes

Copilot reviewed 13 out of 13 changed files in this pull request and generated 5 comments.

Show a summary per file
File Description
pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/DelayedDeliveryContext.java New context interface to decouple trackers from dispatcher.
pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/DispatcherDelayedDeliveryContext.java Production context backed by dispatcher operations.
pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/NoopDelayedDeliveryContext.java Test/bench-friendly context implementation.
pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/AbstractDelayedDeliveryTracker.java Base tracker now uses DelayedDeliveryContext and a context-provided trigger lock.
pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/InMemoryDelayedDeliveryTracker.java Adds context/name+cursor constructors; replaces dispatcher references with context.
pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/InMemoryDelayedDeliveryTrackerFactory.java Uses dispatcher-backed context internally; adds test/bench helper constructor path.
pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/BucketDelayedDeliveryTrackerFactory.java Uses dispatcher-backed context internally; adds test/bench helper constructor path.
pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/BucketDelayedDeliveryTracker.java Adds context/name+cursor constructors; replaces dispatcher references with context.
microbench/src/main/java/org/apache/pulsar/broker/package-info.java Adds package-level Javadoc (currently targets a higher-level package).
microbench/src/main/java/org/apache/pulsar/broker/delayed/bucket/MockBucketSnapshotStorage.java Adds a microbench snapshot storage stub.
microbench/src/main/java/org/apache/pulsar/broker/delayed/bucket/BucketDelayedDeliveryTrackerBenchmark.java New benchmark using context-based tracker construction.
microbench/src/main/java/org/apache/pulsar/broker/delayed/bucket/BucketDelayedDeliveryTrackerSimpleBenchmark.java Removes old simplified benchmark.
microbench/src/main/java/org/apache/bookkeeper/mledger/impl/MockManagedCursor.java Makes cursor mock public and factory method public for benchmark reuse.

# Conflicts:
#	microbench/src/main/java/org/apache/pulsar/broker/delayed/bucket/BucketDelayedDeliveryTrackerSimpleBenchmark.java
#	pulsar-broker/src/main/java/org/apache/pulsar/broker/delayed/bucket/BucketDelayedDeliveryTracker.java
@Denovo1998
Copy link
Copy Markdown
Contributor Author

@nodece
All reviews have been processed.

@Denovo1998
Copy link
Copy Markdown
Contributor Author

@nodece All done.

@Denovo1998 Denovo1998 requested a review from nodece April 3, 2026 09:18
Copy link
Copy Markdown
Contributor

Copilot AI left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pull request overview

Copilot reviewed 11 out of 11 changed files in this pull request and generated 3 comments.

Comment on lines +158 to +163
@Benchmark
public boolean benchmarkMixedOperations() {
String[] parts = readWriteRatio.split("_");
int readPercentage = Integer.parseInt(parts[0]);

if (ThreadLocalRandom.current().nextInt(100) < readPercentage) {
Copy link

Copilot AI Apr 3, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

benchmarkMixedOperations() parses readWriteRatio (split + Integer.parseInt) on every benchmark invocation. That parsing overhead will be included in the measured throughput and can dominate at high rates. Consider parsing the ratio once in @Setup (e.g., store an int readPercentage field) and using that field in the benchmark methods.

Copilot uses AI. Check for mistakes.
Comment on lines +147 to +150
// Preload messages to create realistic test conditions
long baseTime = System.currentTimeMillis() + 10000; // Future delivery
for (int i = 1; i <= initialMessages; i++) {
tracker.addMessage(i, i, baseTime + i * 1000);
Copy link

Copilot AI Apr 3, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The benchmark uses System.currentTimeMillis()-based delivery timestamps (e.g., preloadMessages() and performWriteOperation()), which can cause the tracker's Netty timer to start firing during the measurement window (and introduce noisy, non-deterministic background work). Consider using delivery times far beyond the trial duration (or a fixed base timestamp) so timeouts don't trigger while measuring tracker operations.

Suggested change
// Preload messages to create realistic test conditions
long baseTime = System.currentTimeMillis() + 10000; // Future delivery
for (int i = 1; i <= initialMessages; i++) {
tracker.addMessage(i, i, baseTime + i * 1000);
// Preload messages to create realistic test conditions while keeping
// delivery timestamps far beyond the benchmark trial duration so the
// tracker's timer does not start firing during measurement.
long baseTime = 4102444800000L; // 2100-01-01T00:00:00Z
for (int i = 1; i <= initialMessages; i++) {
tracker.addMessage(i, i, baseTime + i * 1000L);

Copilot uses AI. Check for mistakes.
Comment on lines +216 to +219
private boolean performWriteOperation() {
long deliverAt = System.currentTimeMillis() + ThreadLocalRandom.current().nextLong(5000, 30000);
return addMessageSequential(deliverAt, 1000);
}
Copy link

Copilot AI Apr 3, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

performWriteOperation() calls System.currentTimeMillis() on the hot path. Besides adding unrelated overhead, it also makes the scheduled timeout firing time depend on wall-clock timing/jitter. Consider deriving deliverAt from a precomputed base timestamp (set up in @Setup) to reduce noise and improve benchmark repeatability.

Copilot uses AI. Check for mistakes.
Copy link
Copy Markdown
Member

@dao-jun dao-jun left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM

Comment on lines +125 to +161
this(new DispatcherDelayedDeliveryContext(dispatcher), timer, tickTimeMillis, Clock.systemUTC(),
isDelayedDeliveryDeliverAtTimeStrict, bucketSnapshotStorage, minIndexCountPerBucket,
timeStepPerBucketSnapshotSegmentInMillis, maxIndexesPerBucketSnapshotSegment, maxNumBuckets);
}

public BucketDelayedDeliveryTracker(AbstractPersistentDispatcherMultipleConsumers dispatcher,
Timer timer, long tickTimeMillis, Clock clock,
boolean isDelayedDeliveryDeliverAtTimeStrict,
BucketSnapshotStorage bucketSnapshotStorage,
long minIndexCountPerBucket, long timeStepPerBucketSnapshotSegmentInMillis,
int maxIndexesPerBucketSnapshotSegment, int maxNumBuckets)
throws RecoverDelayedDeliveryTrackerException {
this(new DispatcherDelayedDeliveryContext(dispatcher), timer, tickTimeMillis, clock,
isDelayedDeliveryDeliverAtTimeStrict, bucketSnapshotStorage, minIndexCountPerBucket,
timeStepPerBucketSnapshotSegmentInMillis, maxIndexesPerBucketSnapshotSegment, maxNumBuckets);
}

public BucketDelayedDeliveryTracker(String dispatcherName, ManagedCursor cursor,
Timer timer, long tickTimeMillis,
boolean isDelayedDeliveryDeliverAtTimeStrict,
BucketSnapshotStorage bucketSnapshotStorage,
long minIndexCountPerBucket, long timeStepPerBucketSnapshotSegmentInMillis,
int maxIndexesPerBucketSnapshotSegment, int maxNumBuckets)
throws RecoverDelayedDeliveryTrackerException {
this(new NoopDelayedDeliveryContext(dispatcherName, cursor), timer, tickTimeMillis, Clock.systemUTC(),
isDelayedDeliveryDeliverAtTimeStrict, bucketSnapshotStorage, minIndexCountPerBucket,
timeStepPerBucketSnapshotSegmentInMillis, maxIndexesPerBucketSnapshotSegment, maxNumBuckets);
}

public BucketDelayedDeliveryTracker(DelayedDeliveryContext context,
Timer timer, long tickTimeMillis,
boolean isDelayedDeliveryDeliverAtTimeStrict,
BucketSnapshotStorage bucketSnapshotStorage,
long minIndexCountPerBucket, long timeStepPerBucketSnapshotSegmentInMillis,
int maxIndexesPerBucketSnapshotSegment, int maxNumBuckets)
throws RecoverDelayedDeliveryTrackerException {
this(context, timer, tickTimeMillis, Clock.systemUTC(), isDelayedDeliveryDeliverAtTimeStrict,
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Too many constructors

@dao-jun dao-jun added ready-to-test type/refactor Code or documentation refactors. e.g. refactor code structure or methods to improve code readability release/4.2.1 release/4.1.4 release/4.0.10 labels Apr 3, 2026
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

area/broker doc-not-needed Your PR changes do not impact docs ready-to-test release/4.0.10 release/4.1.4 release/4.2.1 type/refactor Code or documentation refactors. e.g. refactor code structure or methods to improve code readability

Projects

None yet

Development

Successfully merging this pull request may close these issues.

4 participants