[feat][broker] Add adaptive publish throttle controller (disabled by default) #25239
base: master
Conversation
@hemanth-19 Please add the following content to your PR description and select a checkbox:
[feat][broker] Add adaptive publish throttle controller (disabled by default)

Introduces an opt-in, broker-level adaptive publish throttle that dynamically reduces producer publish rates when JVM heap usage or per-topic backlog size approaches configurable watermarks.

Key design points
-----------------
- AdaptivePublishRateLimiter: per-topic PublishRateLimiter that is a complete no-op (zero overhead) when inactive. Asymmetric EWMA (α_up=0.30, α_down=0.05) tracks the producer's peak natural rate (see the sketch below).
- AdaptivePublishThrottleController: single-threaded broker-level scheduler; bounded-step rate changes (≤ 25 % of natural rate per cycle) prevent oscillation; hysteresis (activate on pressure > 0, deactivate only when pressure == 0) prevents rapid toggling.
- observeOnly mode: compute + log + emit metrics without applying any throttling; togglable via dynamic config as an emergency circuit-breaker (no restart required).
- Controller never dies silently: every evaluation cycle is wrapped in try-catch-finally; failures are counted and logged at ERROR.
- ThrottleType.AdaptivePublishRate: dedicated reentrant enum constant appended at the end of ThrottleType to preserve existing ordinals.
- OpenTelemetry: 3 always-on broker metrics + 3 controller health metrics (last-eval timestamp, duration, failure count) + 6 optional per-topic metrics (disabled by default).

Disabled by default: adaptivePublisherThrottlingEnabled=false. A broker restart is required to enable it. All tuning parameters (watermarks, rate factors, observeOnly flag) are dynamic.

Tests added
-----------
- AdaptivePublishRateLimiterTest (13 unit tests)
- AdaptivePublishThrottleControllerTest (30+ unit tests)
- AdaptiveThrottleEndToEndTest (9 integration tests)
- ThrottleTypeEnumTest (4 ordinal-stability guard tests)

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
Force-pushed: 1bb6e14 to 35b7d6a
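For orientation, here is a minimal sketch of the asymmetric EWMA and bounded-step behaviour described in the commit message above. It is illustrative only: the class and method names are hypothetical, not the PR's actual code; the α values and the 25 % step bound come from the description.

```java
// Illustrative sketch only; hypothetical names, not the PR's actual code.
// Shows the asymmetric EWMA (alpha_up=0.30, alpha_down=0.05) and the bounded-step
// rate adjustment (<= 25% of the natural rate per cycle) described above.
public class NaturalRateTrackerSketch {
    private static final double ALPHA_UP = 0.30;          // react quickly when throughput rises
    private static final double ALPHA_DOWN = 0.05;        // let the peak estimate decay slowly
    private static final double MAX_STEP_FRACTION = 0.25; // bound each cycle's rate change

    private double naturalRate; // EWMA estimate of the producer's peak natural rate (msgs/s)

    /** Feed one observed publish-rate sample into the EWMA. */
    public void observe(double sampledRatePerSec) {
        double alpha = sampledRatePerSec > naturalRate ? ALPHA_UP : ALPHA_DOWN;
        naturalRate = alpha * sampledRatePerSec + (1 - alpha) * naturalRate;
    }

    /** Move from the current limit toward the desired limit in bounded steps to avoid oscillation. */
    public double boundedTarget(double currentLimit, double desiredLimit) {
        double maxStep = MAX_STEP_FRACTION * naturalRate;
        double delta = desiredLimit - currentLimit;
        return Math.abs(delta) <= maxStep ? desiredLimit : currentLimit + Math.copySign(maxStep, delta);
    }
}
```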
Local build notes (for reviewers — not a code issue)

tl;dr — the broker module compiles cleanly and Checkstyle is green. The failures below are all environment constraints on the dev machine, not problems with this PR.

What succeeded

A fix was needed mid-way: the initial parallel build (…)

One genuine bug caught by the local build: three casts in …

Why the build stopped at …
Doc label: This PR adds an experimental, disabled-by-default broker feature. It includes: …

When this feature graduates from experimental (i.e. …)

Label suggestion: …
@hemanth-19 Thanks for the contribution! In Apache Pulsar, we follow the PIP process to introduce new features. Please check https://github.com/apache/pulsar/tree/master/pip#pulsar-improvement-proposal-pip for more details.
```java
Runtime runtime = Runtime.getRuntime();
long maxMemory = runtime.maxMemory();
if (maxMemory == Long.MAX_VALUE) {
    // Unbounded heap — cannot compute pressure.
    return 0.0;
}
long usedMemory = maxMemory - runtime.freeMemory();
double usageFraction = (double) usedMemory / maxMemory;
```
Pulsar uses direct memory besides heap memory.
Here's one example of heap + direct memory usage calculation:
pulsar/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/LoadManagerShared.java
Lines 300 to 316 in df51972
```java
// Get the system resource usage for this broker.
public static SystemResourceUsage getSystemResourceUsage(final BrokerHostUsage brokerHostUsage) {
    SystemResourceUsage systemResourceUsage = brokerHostUsage.getBrokerHostUsage();
    // Override System memory usage and limit with JVM heap usage and limit
    double maxHeapMemoryInBytes = Runtime.getRuntime().maxMemory();
    double memoryUsageInBytes = Runtime.getRuntime().totalMemory() - Runtime.getRuntime().freeMemory();
    double memoryUsage = memoryUsageInBytes / MIBI;
    double memoryLimit = maxHeapMemoryInBytes / MIBI;
    systemResourceUsage.setMemory(new ResourceUsage(memoryUsage, memoryLimit));
    // Collect JVM direct memory
    systemResourceUsage.setDirectMemory(new ResourceUsage((double) (getJvmDirectMemoryUsed() / MIBI),
            (double) (DirectMemoryUtils.jvmMaxDirectMemory() / MIBI)));
    return systemResourceUsage;
}
```
However, that example doesn't take into account that Netty's allocator performs pooling, so more direct memory may actually be free than those stats show.
pulsar/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/AllocatorStatsGenerator.java
Line 35 in 019ae9f
`public class AllocatorStatsGenerator {`
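For illustration only, a hedged sketch of what a combined heap + direct-memory check could look like, following the LoadManagerShared example quoted above. It assumes the same helpers shown there (getJvmDirectMemoryUsed(), DirectMemoryUtils.jvmMaxDirectMemory()) are reachable, and it deliberately ignores the Netty pooling caveat noted above.

```java
// Hedged sketch (not the PR's code): combine heap and direct-memory usage into one
// pressure fraction, mirroring the LoadManagerShared example quoted above.
// Assumes getJvmDirectMemoryUsed() and DirectMemoryUtils.jvmMaxDirectMemory() are available
// as in that example; pooled-but-free Netty buffers are still counted as used here.
static double memoryUsageFraction() {
    Runtime runtime = Runtime.getRuntime();
    long maxHeap = runtime.maxMemory();
    double heapFraction = 0.0;
    if (maxHeap != Long.MAX_VALUE) {                       // skip unbounded heaps
        long usedHeap = runtime.totalMemory() - runtime.freeMemory();
        heapFraction = (double) usedHeap / maxHeap;
    }

    long maxDirect = DirectMemoryUtils.jvmMaxDirectMemory();
    double directFraction = maxDirect > 0
            ? (double) getJvmDirectMemoryUsed() / maxDirect
            : 0.0;

    // Report whichever pool is closer to its limit.
    return Math.max(heapFraction, directFraction);
}
```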
@hemanth-19 Please set up "Personal CI" in your fork and run Pulsar CI in that environment. It would also be advisable to reduce the AI-generated noise in the PR description and comments, and to spend effort on your side validating what gets generated by AI. Thank you!
Summary
- Disabled by default: `adaptivePublisherThrottlingEnabled=false`.
- Observe-only mode (`adaptivePublisherThrottlingObserveOnly=true`, toggleable at runtime) computes and logs decisions but never applies throttling — safe for validating in production and usable as an emergency circuit-breaker.
- Every evaluation cycle is wrapped in `try-catch-finally`; failures increment a dedicated OTel counter.
- `ThrottleType.AdaptivePublishRate` is appended as the last enum constant to preserve existing ordinals 0–6. A guard test (`ThrottleTypeEnumTest`) fails immediately if the enum is accidentally mutated.

Diff navigation
Configuration (reviewers: verify defaults, dynamic flags, and conf entry comments)
- `ServiceConfiguration.java`: `@FieldContext` fields — `adaptivePublisherThrottling*` (see the sketch below)
- `conf/broker.conf`
- `README.md`: `broker.conf` snippet
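As a rough illustration of the configuration surface listed above, the fields in ServiceConfiguration.java might look like the sketch below. Field names follow the adaptivePublisherThrottling* prefix from the description; the doc strings, default shown for the observe-only flag, and which flags are marked dynamic are inferred from the PR description rather than copied from the diff.

```java
// Hedged sketch of the ServiceConfiguration fields implied by the PR description.
// Doc strings are paraphrased; only the enable flag is described as requiring a restart,
// while the tuning parameters (including observeOnly) are described as dynamic.
import org.apache.pulsar.common.configuration.FieldContext;

public class AdaptiveThrottleConfigSketch {

    @FieldContext(doc = "Enable the adaptive publish throttle controller."
            + " Disabled by default; a broker restart is required to enable it.")
    private boolean adaptivePublisherThrottlingEnabled = false;

    @FieldContext(dynamic = true,
            doc = "Observe-only mode: compute, log and emit metrics for throttling decisions"
                + " without applying them (emergency circuit-breaker).")
    private boolean adaptivePublisherThrottlingObserveOnly;
}
```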
Broker wiring (reviewers: focus on concurrency, lifecycle, and enum ordinal safety)

- `ServerCnxThrottleTracker.java`: `ThrottleType.AdaptivePublishRate` constant (appended last, reentrant); class-level Javadoc on ordinal stability
- `AbstractTopic.java`: `AdaptivePublishRateLimiter` field; `handlePublishThrottling()` delegation; uses `ThrottleType.AdaptivePublishRate`
- `BrokerService.java`: `startAdaptivePublishThrottleController()` lifecycle; `forEachPersistentTopic()` helper; `close()` teardown
- `AdaptivePublishRateLimiter.java` (new): `volatile boolean active` fast path (sketched below), asymmetric EWMA, `activate()`/`deactivate()`
- `AdaptivePublishThrottleController.java` (new): `observeOnly` guard, `try-catch-finally` safety
- `OpenTelemetryAdaptiveThrottleStats.java` (new)
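The "volatile boolean active fast path" item above carries the zero-overhead claim; a simplified sketch of the idea follows (hypothetical names, not the actual AdaptivePublishRateLimiter code):

```java
// Simplified sketch (hypothetical names): how a per-topic limiter stays a near-zero-cost
// no-op while inactive: the hot publish path pays only a single volatile read.
public class AdaptiveLimiterFastPathSketch {
    private volatile boolean active;           // flipped only by the controller thread
    private volatile double allowedRatePerSec; // meaningful only while active

    /** Hot publish path: returns immediately when the adaptive throttle is inactive. */
    public boolean shouldThrottle(double currentRatePerSec) {
        if (!active) {
            return false;                      // fast path: feature idle, no throttling
        }
        return currentRatePerSec > allowedRatePerSec;
    }

    public void activate(double ratePerSec) {
        allowedRatePerSec = ratePerSec;
        active = true;
    }

    public void deactivate() {
        active = false;
    }
}
```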
Tests (reviewers: check the observeOnly safety tests and the enum guard tests)

- `AdaptivePublishRateLimiterTest.java` (new): `observeOnly` never changes channel autoread
- `AdaptivePublishThrottleControllerTest.java` (new): `linearPressure()`, bounded-step `computeTargetRate()`, hysteresis, `observeOnly` never calls `activate()`
- `AdaptiveThrottleEndToEndTest.java` (new): `observeOnly`, IO-thread safety
- `ThrottleTypeEnumTest.java` (new): `AdaptivePublishRate` present and last, declared reentrant (see the guard-test sketch below)
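A hedged sketch of what the ordinal-stability guard could look like (the real ThrottleTypeEnumTest may assert differently; ThrottleType here refers to the enum used by ServerCnxThrottleTracker):

```java
// Hedged sketch of an ordinal-stability guard; the actual ThrottleTypeEnumTest may differ.
import static org.testng.Assert.assertEquals;
import org.testng.annotations.Test;

public class ThrottleTypeOrdinalGuardSketch {

    @Test
    public void adaptivePublishRateMustStayLast() {
        ThrottleType[] values = ThrottleType.values();
        // ServerCnxThrottleTracker indexes its state array by ordinal(), so new constants
        // must be appended last to keep the pre-existing ordinals 0-6 stable.
        assertEquals(ThrottleType.AdaptivePublishRate.ordinal(), values.length - 1);
    }
}
```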
Design FAQ

Q: Why introduce a new `ThrottleType.AdaptivePublishRate` instead of reusing `TopicPublishRate`?

`ServerCnxThrottleTracker` uses reference-counting via `states[ThrottleType.ordinal()]`. If the adaptive limiter shared `TopicPublishRate` with the static rate limiter, a single `unmarkThrottled()` call from the adaptive side could decrement the count that the static limiter had incremented — leaving the connection unthrottled even though the static limit is still exceeded. A dedicated constant keeps the two signals fully independent and eliminates an entire class of ordering bugs.
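To make the ordering hazard concrete, here is a hedged and heavily simplified sketch of the reference-counting scheme. It is not the actual ServerCnxThrottleTracker code; the enum below stands in for the real ThrottleType.

```java
// Hedged, simplified sketch of per-ThrottleType reference counting; not the actual
// ServerCnxThrottleTracker code. The enum here stands in for the real ThrottleType.
import java.util.Arrays;

class ThrottleTrackerSketch {
    enum ThrottleType { TopicPublishRate, AdaptivePublishRate /* the real enum has more constants */ }

    private final int[] states = new int[ThrottleType.values().length];
    private boolean autoRead = true;   // stand-in for the Netty channel's auto-read flag

    void markThrottled(ThrottleType type)   { states[type.ordinal()]++; refreshAutoRead(); }
    void unmarkThrottled(ThrottleType type) { states[type.ordinal()]--; refreshAutoRead(); }

    private void refreshAutoRead() {
        // The connection stays throttled while any slot holds a non-zero count.
        autoRead = Arrays.stream(states).noneMatch(count -> count > 0);
    }
}
// If both limiters shared TopicPublishRate, the static limiter's markThrottled() followed by
// the adaptive limiter's unmarkThrottled() would drive that single count back to 0 and
// re-enable auto-read while the static publish limit is still being exceeded.
```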
Q: Why does the controller not coordinate across brokers?

Each broker throttles independently.
Adaptive throttling reacts to local signals — JVM heap on this JVM, backlog on topics owned
by this broker. Cross-broker coordination would require a consensus protocol, introduce network
latency on the hot publish path, and create a single point of failure. Local-only decisions
are simpler, faster, and fail-safe: if a network partition occurs, each broker still protects
itself independently. Cluster-wide load imbalance (e.g. one hot broker) is better addressed
by Pulsar's topic-migration and load-balancer machinery than by throttle coordination.
Q: What happens if the controller thread crashes?

The evaluation loop is wrapped in `try-catch-finally`. A caught exception:
- increments `evaluationFailureCount` (surfaced as the OTel counter `pulsar.broker.adaptive.throttle.controller.evaluation.failure.count`).
- is logged at `ERROR` level so it appears in broker logs immediately.
- never escapes to the `ScheduledExecutorService` — the scheduler reschedules the next cycle unconditionally.

If the failure is persistent, `last.evaluation.timestamp` stops advancing while `evaluation.failure.count` climbs — a clear alert signal. A full controller stall (only possible via an unchecked `Error`) requires a broker restart; the OTel staleness alert will fire within `3 × intervalMs`.
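A hedged sketch of the safety net described above. Names and logging are illustrative; the actual controller exports these values through OpenTelemetry rather than plain fields.

```java
// Hedged sketch of the evaluation-cycle safety net described above; names are illustrative.
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

class ControllerLoopSketch {
    private static final Logger log = LoggerFactory.getLogger(ControllerLoopSketch.class);

    private final AtomicLong evaluationFailureCount = new AtomicLong();
    private volatile long lastEvaluationTimestampMs; // stops advancing on persistent failure
    private volatile long lastEvaluationDurationMs;  // health metric

    void schedule(ScheduledExecutorService scheduler, long intervalMs) {
        scheduler.scheduleAtFixedRate(() -> {
            long start = System.currentTimeMillis();
            try {
                evaluateOnce();                       // compute pressure; apply bounded changes unless observeOnly
                lastEvaluationTimestampMs = System.currentTimeMillis();
            } catch (Exception e) {
                evaluationFailureCount.incrementAndGet();                    // surfaced as an OTel counter
                log.error("Adaptive publish throttle evaluation failed", e); // visible in broker logs
            } finally {
                lastEvaluationDurationMs = System.currentTimeMillis() - start;
            }
            // Catching Exception keeps scheduleAtFixedRate alive; only an unchecked Error
            // would still escape and suppress future runs, matching the stall case above.
        }, intervalMs, intervalMs, TimeUnit.MILLISECONDS);
    }

    private void evaluateOnce() {
        // placeholder for the pressure evaluation and throttle adjustment
    }
}
```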
Test plan

- `mvn test -pl pulsar-broker -am -Dtest="AdaptivePublishRateLimiterTest,AdaptivePublishThrottleControllerTest,AdaptiveThrottleEndToEndTest,ThrottleTypeEnumTest" -DfailIfNoTests=false --no-transfer-progress` passes
- `mvn test -pl pulsar-broker -am -Dtest="PublishRateLimiterTest" -DfailIfNoTests=false --no-transfer-progress` (regression: static rate limiter unaffected)
- `adaptivePublisherThrottlingEnabled=false` (default): no controller thread, no `AdaptivePublishRateLimiter` allocated, no OTel instruments registered
- `observeOnly=true`: logs show `OBSERVE-ONLY would-activate`, zero `ACTIVATED` lines, `active.topic.count` metric stays at 0

🤖 Generated with Claude Code