
Conversation

@hemanth-19

Experimental feature — disabled by default (adaptivePublisherThrottlingEnabled=false).
A broker restart is required to enable it; all tuning parameters are dynamic config.

Focused review request — the areas most likely to have subtle bugs are:

  1. ConcurrencyAdaptivePublishRateLimiter: the volatile boolean active fast path on IO threads (see handlePublishThrottling), and the single-writer contract on the controller scheduler thread.
  2. LifecycleBrokerService.startAdaptivePublishThrottleController() and the close() path; ThrottleType.AdaptivePublishRate ordinal stability and the new states[] slot.
  3. Hysteresis — deactivation requires both memory and backlog pressure to drop below their low watermarks simultaneously; verify this is correct for your use case.

Summary

  • Adds an adaptive publish throttle controller that dynamically reduces producer publish rates when JVM heap usage or per-topic backlog size approaches configurable watermarks.
  • Disabled by default. Zero code path changes when adaptivePublisherThrottlingEnabled=false.
  • Observe-only mode (adaptivePublisherThrottlingObserveOnly=true, toggleable at runtime) computes and logs decisions but never applies throttling — safe for validating in production and usable as an emergency circuit-breaker.
  • Controller never dies silently: every cycle is wrapped in try-catch-finally; failures increment a dedicated OTel counter.
  • ThrottleType.AdaptivePublishRate is appended as the last enum constant to preserve existing ordinals 0–6. A guard test (ThrottleTypeEnumTest) fails immediately if the enum is accidentally mutated.
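For illustration, a guard test of the shape described above can be sketched like this. The enum below is a hypothetical stand-in with invented constant names, not the real ServerCnxThrottleTracker constants:

```java
import java.util.Arrays;

// Sketch of an ordinal-stability guard. The enum is a stand-in for
// illustration; the real constants live in ServerCnxThrottleTracker.
public class ThrottleTypeGuardSketch {
    enum ThrottleType {
        ConnectionLimit, PublishRateLimit, TopicPublishRate, // pre-existing (illustrative names)
        AdaptivePublishRate                                  // new constant, appended last
    }

    public static void main(String[] args) {
        ThrottleType[] values = ThrottleType.values();
        // The new constant must stay last so pre-existing ordinals never shift.
        if (values[values.length - 1] != ThrottleType.AdaptivePublishRate) {
            throw new AssertionError("AdaptivePublishRate must remain the last constant");
        }
        // A minimum count catches accidental removal of earlier constants.
        if (values.length < 4) {
            throw new AssertionError("existing constants must not be removed");
        }
        System.out.println(Arrays.toString(values));
    }
}
```

Because serialized state and metrics may be keyed by ordinal, a test of this shape turns an otherwise silent reordering into an immediate build failure.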

Diff navigation

Configuration (reviewers: verify defaults, dynamic flags, and conf entry comments)

  • ServiceConfiguration.java — 10 new @FieldContext fields (adaptivePublisherThrottling*)
  • conf/broker.conf — 10 new commented-out entries with inline docs and a recommended quick-start snippet
  • README.md — new "Experimental broker features" section with quick-start broker.conf snippet

Broker wiring (reviewers: focus on concurrency, lifecycle, and enum ordinal safety)

  • ServerCnxThrottleTracker.java — new ThrottleType.AdaptivePublishRate constant (appended last, reentrant); class-level Javadoc on ordinal stability
  • AbstractTopic.java — new AdaptivePublishRateLimiter field; handlePublishThrottling() delegation; uses ThrottleType.AdaptivePublishRate
  • BrokerService.java — startAdaptivePublishThrottleController() lifecycle; forEachPersistentTopic() helper; close() teardown
  • AdaptivePublishRateLimiter.java (new) — per-topic limiter: volatile boolean active fast path, asymmetric EWMA, activate()/deactivate()
  • AdaptivePublishThrottleController.java (new) — broker-level scheduler: pressure math, bounded-step rate changes, hysteresis, observeOnly guard, try-catch-finally safety
  • OpenTelemetryAdaptiveThrottleStats.java (new) — 6 broker-level OTel metrics (3 always-on + 3 health); 6 per-topic metrics (opt-in)

Tests (reviewers: check the observeOnly safety tests and the enum guard tests)

  • AdaptivePublishRateLimiterTest.java (new) — no-op guarantee, activate/deactivate, asymmetric EWMA, observeOnly never changes channel autoread
  • AdaptivePublishThrottleControllerTest.java (new) — linearPressure(), bounded-step computeTargetRate(), hysteresis, observeOnly never calls activate()
  • AdaptiveThrottleEndToEndTest.java (new) — full control-loop integration: pressure → throttle → drain → deactivate; observeOnly IO-thread safety
  • ThrottleTypeEnumTest.java (new) — minimum constant count, AdaptivePublishRate present and last, declared reentrant

Design FAQ

Q: Why introduce a new ThrottleType.AdaptivePublishRate instead of reusing TopicPublishRate?

ServerCnxThrottleTracker uses reference-counting via states[ThrottleType.ordinal()].
If the adaptive limiter shared TopicPublishRate with the static rate limiter, a single
unmarkThrottled() call from the adaptive side could decrement the count that the static
limiter had incremented — leaving the connection unthrottled even though the static limit is
still exceeded. A dedicated constant keeps the two signals fully independent and eliminates an
entire class of ordering bugs.
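To make the ordering hazard concrete, here is a minimal sketch of reference counting keyed by enum ordinal. This is a simplified stand-in, not the real tracker; type and method names are illustrative:

```java
// Simplified stand-in for a per-connection throttle tracker that keeps one
// reference count per ThrottleType slot (names are illustrative).
public class ThrottleTrackerSketch {
    enum Type { TopicPublishRate, AdaptivePublishRate } // illustrative subset

    private final int[] states = new int[Type.values().length];
    private int throttledTypes; // number of slots with a nonzero count

    void markThrottled(Type t) {
        if (states[t.ordinal()]++ == 0) {
            throttledTypes++;
        }
    }

    void unmarkThrottled(Type t) {
        if (--states[t.ordinal()] == 0) {
            throttledTypes--;
        }
    }

    boolean isThrottled() {
        return throttledTypes > 0;
    }

    public static void main(String[] args) {
        ThrottleTrackerSketch tracker = new ThrottleTrackerSketch();
        tracker.markThrottled(Type.TopicPublishRate);      // static limit exceeded
        tracker.markThrottled(Type.AdaptivePublishRate);   // adaptive limiter activates
        tracker.unmarkThrottled(Type.AdaptivePublishRate); // adaptive side backs off
        // With a dedicated slot the connection stays throttled; had both limiters
        // shared one slot, this unmark would have cleared the static throttle too.
        System.out.println("still throttled: " + tracker.isThrottled());
    }
}
```

Sharing a single slot would collapse two independent signals into one counter, which is exactly the decrement race described above.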

Q: Why doesn't the controller coordinate across brokers? (Each broker throttles independently.)

Adaptive throttling reacts to local signals — JVM heap on this JVM, backlog on topics owned
by this broker. Cross-broker coordination would require a consensus protocol, introduce network
latency on the hot publish path, and create a single point of failure. Local-only decisions
are simpler, faster, and fail-safe: if a network partition occurs, each broker still protects
itself independently. Cluster-wide load imbalance (e.g. one hot broker) is better addressed
by Pulsar's topic-migration and load-balancer machinery than by throttle coordination.

Q: What happens if the controller thread crashes?

The evaluation loop is wrapped in try-catch-finally. A caught exception:

  1. Increments evaluationFailureCount (surfaced as OTel counter pulsar.broker.adaptive.throttle.controller.evaluation.failure.count).
  2. Logs the full stack trace at ERROR level so it appears in broker logs immediately.
  3. Does not kill the ScheduledExecutorService — the scheduler reschedules the next cycle unconditionally.

If the failure is persistent, last.evaluation.timestamp stops advancing while evaluation.failure.count climbs — a clear alert signal. A full controller stall (only possible via an unchecked Error) requires a broker restart; the OTel staleness alert will fire within 3 × intervalMs.
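The cycle safety described above can be sketched as follows. Field and method names (runCycle, evaluateOnce, failureCount) are illustrative, not the PR's actual API:

```java
import java.util.concurrent.atomic.AtomicLong;

// Sketch of a crash-proof evaluation cycle. Names are illustrative.
public class SafeEvaluationCycleSketch {
    final AtomicLong failureCount = new AtomicLong();
    volatile long lastEvaluationTimestamp; // advances only on success

    void runCycle(Runnable evaluateOnce) {
        try {
            evaluateOnce.run();
            // Only a successful evaluation advances the staleness signal,
            // so persistent failures show up as a stalled timestamp.
            lastEvaluationTimestamp = System.currentTimeMillis();
        } catch (Exception e) {
            // Counted (an OTel counter in the real controller) and logged;
            // never rethrown, so the scheduler reschedules the next cycle.
            failureCount.incrementAndGet();
            System.err.println("evaluation failed: " + e);
        } finally {
            // The real controller also records cycle duration here.
        }
    }

    public static void main(String[] args) {
        SafeEvaluationCycleSketch c = new SafeEvaluationCycleSketch();
        c.runCycle(() -> { throw new RuntimeException("boom"); });
        c.runCycle(() -> { }); // healthy cycle
        System.out.println("failures=" + c.failureCount.get());
    }
}
```

Note that an unchecked Error escapes the catch (Exception) clause, which is the one case that can stall the scheduler and trip the staleness alert.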


Test plan

  • mvn test -pl pulsar-broker -am -Dtest="AdaptivePublishRateLimiterTest,AdaptivePublishThrottleControllerTest,AdaptiveThrottleEndToEndTest,ThrottleTypeEnumTest" -DfailIfNoTests=false --no-transfer-progress passes
  • mvn test -pl pulsar-broker -am -Dtest="PublishRateLimiterTest" -DfailIfNoTests=false --no-transfer-progress (regression: static rate limiter unaffected)
  • Broker starts with adaptivePublisherThrottlingEnabled=false (default): no controller thread, no AdaptivePublishRateLimiter allocated, no OTel instruments registered
  • observeOnly=true: logs show OBSERVE-ONLY would-activate, zero ACTIVATED lines, active.topic.count metric stays at 0
  • OTel scrape shows all 6 broker-level metrics when feature is enabled

🤖 Generated with Claude Code

@github-actions

@hemanth-19 Please add the following content to your PR description and select a checkbox:

- [ ] `doc` <!-- Your PR contains doc changes -->
- [ ] `doc-required` <!-- Your PR changes impact docs and you will update later -->
- [ ] `doc-not-needed` <!-- Your PR changes do not impact docs -->
- [ ] `doc-complete` <!-- Docs have been already added -->


Introduces an opt-in, broker-level adaptive publish throttle that
dynamically reduces producer publish rates when JVM heap usage or
per-topic backlog size approaches configurable watermarks.

Key design points
-----------------
- AdaptivePublishRateLimiter: per-topic PublishRateLimiter that is a
  complete no-op (zero overhead) when inactive. Asymmetric EWMA
  (α_up=0.30, α_down=0.05) tracks the producer's peak natural rate.
- AdaptivePublishThrottleController: single-threaded broker-level
  scheduler; bounded-step rate changes (≤ 25 % of natural rate per
  cycle) prevent oscillation; hysteresis (activate on pressure > 0,
  deactivate only when pressure == 0) prevents rapid toggling.
- observeOnly mode: compute + log + emit metrics without applying
  any throttling; togglable via dynamic config as an emergency
  circuit-breaker (no restart required).
- Controller never dies silently: every evaluation cycle is wrapped
  in try-catch-finally; failures are counted and logged at ERROR.
- ThrottleType.AdaptivePublishRate: dedicated reentrant enum constant
  appended at the end of ThrottleType to preserve existing ordinals.
- OpenTelemetry: 3 always-on broker metrics + 3 controller health
  metrics (last-eval timestamp, duration, failure count) + 6 optional
  per-topic metrics (disabled by default).
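The EWMA, bounded-step, and hysteresis points above can be sketched in one small class. The constants mirror the stated design (α_up=0.30, α_down=0.05, ≤ 25 % step per cycle); the class and method names are illustrative, not the PR's API:

```java
// Sketch of asymmetric EWMA, bounded-step rate changes, and hysteresis.
// Constants match the stated design; names are illustrative.
public class AdaptiveRateSketch {
    static final double ALPHA_UP = 0.30;   // react quickly when the rate rises
    static final double ALPHA_DOWN = 0.05; // decay slowly, remembering the peak
    static final double MAX_STEP_FRACTION = 0.25;

    double naturalRate; // EWMA estimate of the producer's natural publish rate
    boolean active;     // hysteresis state

    void observe(double sampleRate) {
        double alpha = sampleRate > naturalRate ? ALPHA_UP : ALPHA_DOWN;
        naturalRate = alpha * sampleRate + (1 - alpha) * naturalRate;
    }

    // Move toward the desired rate, but never by more than 25% of the
    // natural rate in a single cycle, to prevent oscillation.
    double computeTargetRate(double currentTarget, double desiredRate) {
        double maxStep = MAX_STEP_FRACTION * naturalRate;
        double delta = desiredRate - currentTarget;
        if (Math.abs(delta) > maxStep) {
            delta = Math.copySign(maxStep, delta);
        }
        return currentTarget + delta;
    }

    // Hysteresis: activate on any pressure, deactivate only at zero pressure.
    void updateActivation(double pressure) {
        if (!active && pressure > 0) {
            active = true;
        } else if (active && pressure == 0) {
            active = false;
        }
    }

    public static void main(String[] args) {
        AdaptiveRateSketch s = new AdaptiveRateSketch();
        s.observe(1000); // EWMA jumps up fast (0.3 * 1000 = 300)
        s.observe(0);    // EWMA decays slowly (0.95 * 300 = 285)
        System.out.println("naturalRate=" + s.naturalRate);
        System.out.println("nextTarget=" + s.computeTargetRate(1000, 100));
    }
}
```

The asymmetric alphas mean a quiet period does not erase the memory of the producer's peak rate, so throttling released later can return the producer toward its real capacity rather than a stale low estimate.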

Disabled by default: adaptivePublisherThrottlingEnabled=false.
A broker restart is required to enable it. All tuning parameters
(watermarks, rate factors, observeOnly flag) are dynamic.

Tests added
-----------
- AdaptivePublishRateLimiterTest  (13 unit tests)
- AdaptivePublishThrottleControllerTest  (30+ unit tests)
- AdaptiveThrottleEndToEndTest  (9 integration tests)
- ThrottleTypeEnumTest  (4 ordinal-stability guard tests)

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
@hemanth-19 force-pushed the feat/adaptive-publish-throttle branch from 1bb6e14 to 35b7d6a on February 11, 2026 08:32
@hemanth-19
Author

Local build notes (for reviewers — not a code issue)

tl;dr — the broker module compiles cleanly and Checkstyle is green. The failures below are all environment constraints on the dev machine, not problems with this PR.


What succeeded

  • mvn install -Pcore-modules,-main -DskipTests -rf :pulsar-client-messagecrypto-bc — BUILD SUCCESS, full core reactor installed
  • Checkstyle on pulsar-broker — 0 violations
  • All 14 new/changed files compile

A fix was needed mid-way: the initial parallel build (-T 4) had a pre-existing reactor race where pulsar-client-messagecrypto-bc started compiling against bouncy-castle-bc-4.2.0-SNAPSHOT-pkg.jar before the package phase finished writing it. This is a known fragility in the Pulsar reactor and is unrelated to this PR. Rebuilding sequentially (no -T flag) resolved it immediately.

One genuine bug caught by the local build: three casts in AdaptiveThrottleEndToEndTest.java were written as (long)(expr) instead of (long) (expr) (missing space after typecast). Fixed and force-pushed to the branch before the Checkstyle run.


Why the build stopped at pulsar-offloader-distribution

The mvn install -rf :pulsar-broker run continued downstream into pulsar-offloader-distribution, which needs tiered-storage-file-system and tiered-storage-jcloud — modules that weren't in the local cache because they sit outside the core-modules profile scope. This failure is completely unrelated to the throttling change; it would happen on any clean checkout without a prior full install.


Why the targeted tests did not finish locally

Surefire's default configuration for pulsar-broker forks 4 JVMs simultaneously, each with -Xmx1300M -XX:+UseZGC. On this dev machine that totals ~5.2 GB of heap, which exceeds available memory. The JVM exits with Error occurred during initialization of VM before running a single test — no test failure, purely a resource limit.

To reproduce the test run on any machine with ≥ 2 GB free heap:

mvn test -pl pulsar-broker \
  -Dtest="AdaptivePublishRateLimiterTest,AdaptivePublishThrottleControllerTest,\
AdaptiveThrottleEndToEndTest,ThrottleTypeEnumTest" \
  -DfailIfNoTests=false \
  -DforkCount=1 -DreuseForks=true \
  -DargLine="-Xmx512m -XX:+UseSerialGC" \
  --no-transfer-progress

CI (GitHub Actions) runs in a clean environment with sufficient memory and will validate the full suite there.

@hemanth-19
Author

Doc label: This PR adds an experimental, disabled-by-default broker feature. It includes:

  • Inline Javadoc on all new public classes and the new ThrottleType.AdaptivePublishRate constant
  • README.md updated with a quick-start snippet under a new "Experimental broker features" section
  • conf/broker.conf updated with all 10 new config keys, commented out with inline descriptions
  • ADAPTIVE_THROTTLE_PR.md at the repo root with the full config reference, metrics guide, and troubleshooting steps

When this feature graduates from experimental (i.e. adaptivePublisherThrottlingEnabled defaults to true), the Pulsar site docs (reference-configuration-broker.md and the "Manage topics" section) will need corresponding updates. Until then, the in-tree README and conf comments are the intended doc surface.

Label suggestion: doc-required (for the site update at graduation) or doc-not-needed if maintainers prefer to defer site docs until the feature is no longer experimental.

@lhotari
Member

lhotari commented Feb 11, 2026

@hemanth-19 Thanks for the contribution! In Apache Pulsar, we follow the PIP process to introduce new features. Please check https://github.com/apache/pulsar/tree/master/pip#pulsar-improvement-proposal-pip for more details.

Comment on lines +273 to +280
Runtime runtime = Runtime.getRuntime();
long maxMemory = runtime.maxMemory();
if (maxMemory == Long.MAX_VALUE) {
    // Unbounded heap — cannot compute pressure.
    return 0.0;
}
long usedMemory = maxMemory - runtime.freeMemory();
double usageFraction = (double) usedMemory / maxMemory;
Member

Pulsar uses direct memory besides heap memory.
Here's one example of heap + direct memory usage calculation:

// Get the system resource usage for this broker.
public static SystemResourceUsage getSystemResourceUsage(final BrokerHostUsage brokerHostUsage) {
SystemResourceUsage systemResourceUsage = brokerHostUsage.getBrokerHostUsage();
// Override System memory usage and limit with JVM heap usage and limit
double maxHeapMemoryInBytes = Runtime.getRuntime().maxMemory();
double memoryUsageInBytes = Runtime.getRuntime().totalMemory() - Runtime.getRuntime().freeMemory();
double memoryUsage = memoryUsageInBytes / MIBI;
double memoryLimit = maxHeapMemoryInBytes / MIBI;
systemResourceUsage.setMemory(new ResourceUsage(memoryUsage, memoryLimit));
// Collect JVM direct memory
systemResourceUsage.setDirectMemory(new ResourceUsage((double) (getJvmDirectMemoryUsed() / MIBI),
(double) (DirectMemoryUtils.jvmMaxDirectMemory() / MIBI)));
return systemResourceUsage;
}

However, that example doesn't take into account that Netty's allocator performs pooling, so there would be more free direct memory available than those stats show. There is an example with detailed Netty allocator stats; using those, it would be possible to calculate the exact allocation of direct memory by subtracting the free memory in directArenas->chunkLists->freeBytes.
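To sketch what incorporating direct memory could look like: the 0.70/0.90 watermarks, the linear ramp, and taking the max of the two signals are assumptions for illustration, not the PR's actual code:

```java
// Illustrative pressure calculation combining heap and direct memory,
// along the lines suggested in the review comment. Watermark values and
// the linear ramp are assumptions for this sketch.
public class MemoryPressureSketch {
    // Linear ramp: 0 below lowWatermark, 1 at/above highWatermark.
    static double linearPressure(double usage, double low, double high) {
        if (usage <= low) {
            return 0.0;
        }
        if (usage >= high) {
            return 1.0;
        }
        return (usage - low) / (high - low);
    }

    static double combinedPressure(long heapUsed, long heapMax,
                                   long directUsed, long directMax) {
        double heapFrac = heapMax > 0 ? (double) heapUsed / heapMax : 0.0;
        double directFrac = directMax > 0 ? (double) directUsed / directMax : 0.0;
        // Take the worse of the two signals: either pool can OOM the broker.
        double heapPressure = linearPressure(heapFrac, 0.70, 0.90);
        double directPressure = linearPressure(directFrac, 0.70, 0.90);
        return Math.max(heapPressure, directPressure);
    }

    public static void main(String[] args) {
        // Heap comfortable, direct memory near its limit: pressure is high.
        System.out.println("pressure=" + combinedPressure(400, 1000, 850, 1000));
    }
}
```

In a real broker the direct-memory figures would come from Netty (for example via its PlatformDependent accessors), with the pooling caveat above: pooled-but-free arena memory should ideally be subtracted before computing the usage fraction.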

@lhotari
Member

lhotari commented Feb 11, 2026

CI (GitHub Actions) runs in a clean environment with sufficient memory and will validate the full suite there.

@hemanth-19 Please setup "Personal CI" in your fork and run Pulsar CI in that environment.

It would be advisable to reduce the AI generated noise in PR description and comments so that you spend effort on your side validating what gets generated by AI. Thank you!
