Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
292 changes: 292 additions & 0 deletions ADAPTIVE_THROTTLE_PR.md

Large diffs are not rendered by default.

31 changes: 31 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,37 @@ Learn more about Pulsar at https://pulsar.apache.org
- Transparent handling of partitioned topics
- Transparent batching of messages

## Experimental broker features

### Adaptive publish throttling (disabled by default)

The broker includes an opt-in adaptive publish-rate controller that dynamically
reduces producer publish rates when JVM heap usage or per-topic backlog size
approaches configurable watermarks. It is additive to existing static rate
limits and backlog quota enforcement and is safe to enable alongside them.

**Quick start** — add the following to `conf/broker.conf` (broker restart required
to enable; all tuning knobs can be changed dynamically at runtime):

```properties
# Enable the feature (requires restart)
adaptivePublisherThrottlingEnabled=true

# Recommended starting values — validate with observeOnly=true first
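# Dry-run: log but do not throttle. Keep comments on their own line; in a
# .properties file, text after the value on the same line becomes part of the value.
adaptivePublisherThrottlingObserveOnly=true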
adaptivePublisherThrottlingMemoryHighWatermarkPct=0.85
adaptivePublisherThrottlingMemoryLowWatermarkPct=0.70
adaptivePublisherThrottlingBacklogHighWatermarkPct=0.90
adaptivePublisherThrottlingBacklogLowWatermarkPct=0.75
# Floor: never below 10 % of natural rate
adaptivePublisherThrottlingMinRateFactor=0.10
# Max 25 % change per 1 s cycle
adaptivePublisherThrottlingMaxRateChangeFactor=0.25
```

Once the observe-only logs look correct, flip `adaptivePublisherThrottlingObserveOnly=false`
via the dynamic config admin API without restarting the broker. See
[`ADAPTIVE_THROTTLE_PR.md`](ADAPTIVE_THROTTLE_PR.md) for the full config reference,
metrics guide, and troubleshooting steps.

## Repositories

This repository is the main repository of Apache Pulsar. Pulsar PMC also maintains other repositories for
Expand Down
37 changes: 37 additions & 0 deletions conf/broker.conf
Original file line number Diff line number Diff line change
Expand Up @@ -2159,3 +2159,40 @@ brokerInterceptors=

# Enable or disable the broker interceptor, which is only used for testing for now
disableBrokerInterceptors=true

### --- Adaptive Publish Throttling ---
# Dynamically reduces publish rates under memory or backlog pressure.
# Disabled by default. A broker restart is required to enable/disable.
# Use adaptivePublisherThrottlingObserveOnly=true to evaluate impact safely first.

# Master switch — enable adaptive publish throttle controller.
#adaptivePublisherThrottlingEnabled=false

# Dry-run mode: compute and log throttle decisions but apply no actual throttling.
# Can be toggled at runtime via dynamic config as an emergency circuit-breaker.
#adaptivePublisherThrottlingObserveOnly=false

# Controller evaluation interval in milliseconds.
#adaptivePublisherThrottlingIntervalMs=1000

# JVM heap usage fraction at which memory pressure reaches its maximum. Pressure ramps
# linearly from zero at the low watermark up to this value.
#adaptivePublisherThrottlingMemoryHighWatermarkPct=0.85

# JVM heap usage fraction below which memory pressure is zero (hysteresis low watermark).
#adaptivePublisherThrottlingMemoryLowWatermarkPct=0.70

# Backlog/quota fraction at which backlog pressure reaches its maximum (linear ramp
# from the low watermark up to this value).
# Only topics with a configured backlog quota are affected.
#adaptivePublisherThrottlingBacklogHighWatermarkPct=0.90

# Backlog/quota fraction below which backlog pressure is zero (hysteresis low watermark).
#adaptivePublisherThrottlingBacklogLowWatermarkPct=0.75

# Minimum effective rate as a fraction of natural rate (floor). Must be in (0.0, 1.0).
#adaptivePublisherThrottlingMinRateFactor=0.10

# Maximum rate change per cycle as a fraction of natural rate (prevents sudden bursts).
#adaptivePublisherThrottlingMaxRateChangeFactor=0.25

# Enable per-topic OpenTelemetry metrics (6 time series per topic — evaluate cardinality first).
#adaptivePublisherThrottlingPerTopicMetricsEnabled=false
Original file line number Diff line number Diff line change
Expand Up @@ -1130,6 +1130,106 @@ The max allowed delay for delayed delivery (in milliseconds). If the broker rece
)
private long brokerPublisherThrottlingMaxByteRate = 0;

// --- Adaptive publish throttling ---

// Master switch for adaptive publish throttling. Declared non-dynamic and off by default;
// per the doc text below, changing it requires a broker restart.
@FieldContext(
category = CATEGORY_SERVER,
dynamic = false,
doc = "Enable adaptive publish rate throttling. When enabled, the broker dynamically reduces "
+ "publish rates for topics under memory or backlog pressure. This feature is additive to "
+ "static rate limits and backlog quota policies. A broker restart is required to enable or "
+ "disable. Use adaptivePublisherThrottlingObserveOnly=true to evaluate impact safely first."
)
private boolean adaptivePublisherThrottlingEnabled = false;

// Dry-run toggle. Dynamic, and false by default, so enabling the feature without also
// setting this to true means throttling is applied for real.
@FieldContext(
category = CATEGORY_SERVER,
dynamic = true,
doc = "When true, the adaptive throttle controller runs and emits metrics and logs, but never "
+ "applies any actual throttling. Use this mode to validate watermarks and observe which "
+ "topics would be throttled before committing to live throttling. Can be toggled at runtime "
+ "via dynamic config as an emergency rollback mechanism. Only effective when "
+ "adaptivePublisherThrottlingEnabled=true."
)
private boolean adaptivePublisherThrottlingObserveOnly = false;

// Controller evaluation period in milliseconds (default 1000, i.e. one cycle per second).
// NOTE(review): no lower bound is declared on this field — confirm that zero or negative
// values are rejected or clamped before they reach the scheduler.
@FieldContext(
category = CATEGORY_SERVER,
dynamic = true,
doc = "Interval in milliseconds between adaptive throttle controller evaluation cycles. "
+ "Only effective when adaptivePublisherThrottlingEnabled=true."
)
private int adaptivePublisherThrottlingIntervalMs = 1000;

// Upper end of the memory pressure ramp. Despite the "Pct" suffix the value is a
// 0.0-1.0 fraction (default 0.85), not a 0-100 percentage.
@FieldContext(
category = CATEGORY_SERVER,
dynamic = true,
doc = "JVM heap usage fraction (0.0-1.0) at which the memory pressure factor reaches 1.0. "
+ "The pressure factor increases linearly from 0 at "
+ "adaptivePublisherThrottlingMemoryLowWatermarkPct to 1.0 at this threshold. "
+ "Must be greater than adaptivePublisherThrottlingMemoryLowWatermarkPct. "
+ "Only effective when adaptivePublisherThrottlingEnabled=true."
)
private double adaptivePublisherThrottlingMemoryHighWatermarkPct = 0.85;

// Lower end of the memory pressure range. Despite the "Pct" suffix the value is a
// 0.0-1.0 fraction (default 0.70), not a 0-100 percentage.
// NOTE(review): the "must be less than the high watermark" rule is only stated in the doc
// text; no check is visible here — confirm it is validated where the values are consumed.
@FieldContext(
category = CATEGORY_SERVER,
dynamic = true,
doc = "JVM heap usage fraction (0.0-1.0) below which memory-based adaptive throttling is fully "
+ "released (hysteresis low watermark). Must be less than "
+ "adaptivePublisherThrottlingMemoryHighWatermarkPct. "
+ "Only effective when adaptivePublisherThrottlingEnabled=true."
)
private double adaptivePublisherThrottlingMemoryLowWatermarkPct = 0.70;

// Backlog high watermark, expressed as backlog size divided by the topic's backlog quota.
// Despite the "Pct" suffix the value is a 0.0-1.0 fraction (default 0.90).
// NOTE(review): the doc text says pressure "begins" above this value, while the low
// watermark is where it is "fully released" — confirm whether pressure actually ramps up
// from the low watermark and peaks here, and reword the doc text if so.
@FieldContext(
category = CATEGORY_SERVER,
dynamic = true,
doc = "Backlog size as a fraction of the configured backlog quota limit (0.0-1.0) above which "
+ "adaptive throttling begins applying backlog pressure. Only topics with a configured backlog "
+ "quota are affected. Only effective when adaptivePublisherThrottlingEnabled=true."
)
private double adaptivePublisherThrottlingBacklogHighWatermarkPct = 0.90;

// Backlog low watermark, expressed as backlog size divided by the topic's backlog quota.
// Despite the "Pct" suffix the value is a 0.0-1.0 fraction (default 0.75).
// NOTE(review): the ordering constraint against the high watermark is documented only;
// no enforcement is visible on this field — confirm where it is validated.
@FieldContext(
category = CATEGORY_SERVER,
dynamic = true,
doc = "Backlog size as a fraction of the configured backlog quota limit (0.0-1.0) below which "
+ "backlog-based adaptive throttling is fully released (hysteresis low watermark). Must be "
+ "less than adaptivePublisherThrottlingBacklogHighWatermarkPct. "
+ "Only effective when adaptivePublisherThrottlingEnabled=true."
)
private double adaptivePublisherThrottlingBacklogLowWatermarkPct = 0.75;

// Floor on the throttled rate, as a fraction of the natural rate (default 0.10, i.e. the
// effective rate is documented never to fall below 10 % of the natural rate).
// NOTE(review): the open interval (0.0, 1.0) is stated in the doc text only — confirm that
// out-of-range values are rejected or clamped by the consumer of this setting.
@FieldContext(
category = CATEGORY_SERVER,
dynamic = true,
doc = "Minimum effective publish rate as a fraction of the natural (unthrottled) rate. "
+ "When maximum pressure is reached, the effective rate will never drop below "
+ "naturalRate * minRateFactor. Value must be in (0.0, 1.0). "
+ "Only effective when adaptivePublisherThrottlingEnabled=true."
)
private double adaptivePublisherThrottlingMinRateFactor = 0.10;

// Per-cycle slew limit, as a fraction of the natural rate (default 0.25). Per the doc text
// it bounds movement in both directions, so it governs recovery speed as well as how fast
// throttling tightens.
@FieldContext(
category = CATEGORY_SERVER,
dynamic = true,
doc = "Maximum change in the effective publish rate per controller cycle, expressed as a fraction "
+ "of the natural rate. Limits how quickly the rate ramps up or down, preventing sudden "
+ "bursts after throttle release and preventing oscillation. "
+ "Only effective when adaptivePublisherThrottlingEnabled=true."
)
private double adaptivePublisherThrottlingMaxRateChangeFactor = 0.25;

// Opt-in switch for per-topic metrics; off by default because, per the doc text, each
// topic adds 6 time series.
@FieldContext(
category = CATEGORY_SERVER,
dynamic = true,
doc = "When true, emit per-topic OpenTelemetry metrics for adaptive throttle state (natural rate, "
+ "effective rate, pressure factors). This adds 6 time series per topic; evaluate cardinality "
+ "impact before enabling in large clusters. Only effective when "
+ "adaptivePublisherThrottlingEnabled=true."
)
private boolean adaptivePublisherThrottlingPerTopicMetricsEnabled = false;

@FieldContext(category = CATEGORY_SERVER, doc =
"The class name of the factory that creates DispatchRateLimiter implementations. Current options are "
+ "org.apache.pulsar.broker.service.persistent.DispatchRateLimiterFactoryAsyncTokenBucket "
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -139,6 +139,14 @@ public abstract class AbstractTopic implements Topic, TopicPolicyListener {
// Publish-rate limiters attached to this topic. updateActiveRateLimiters() adds
// resourceGroupPublishLimiter to the active list only while resource-group rate limiting
// is enabled. NOTE(review): both are volatile, presumably because they are reassigned at
// runtime from another thread — confirm against the policy-update paths.
protected volatile PublishRateLimiter topicPublishRateLimiter;
protected volatile ResourceGroupPublishLimiter resourceGroupPublishLimiter;

/**
 * Per-topic adaptive publish rate limiter. Assigned exactly once in the constructor:
 * a limiter instance when {@code adaptivePublisherThrottlingEnabled=true}, otherwise
 * {@code null}, so every use must be null-checked.
 *
 * <p>Because the field is {@code final}, the value set in the constructor is visible to
 * other threads without extra synchronization, provided {@code this} does not escape
 * before construction completes.
 */
private final AdaptivePublishRateLimiter adaptivePublishRateLimiter;

// Whether resource-group rate limiting is active for this topic.
// NOTE(review): the @Getter presumably generates isResourceGroupRateLimitingEnabled(),
// which updateActiveRateLimiters() calls — confirm.
@Getter
protected boolean resourceGroupRateLimitingEnabled;

Expand Down Expand Up @@ -202,6 +210,21 @@ public AbstractTopic(String topic, BrokerService brokerService) {
}, producer -> {
producer.getCnx().getThrottleTracker().unmarkThrottled(ThrottleType.TopicPublishRate);
});

// Adaptive throttle limiter: created only when the feature is enabled.
// Uses ThrottleType.AdaptivePublishRate (reentrant/reference-counted) so that
// adaptive and static throttle activations are tracked independently on the connection.
if (config.isAdaptivePublisherThrottlingEnabled()) {
adaptivePublishRateLimiter = new AdaptivePublishRateLimiter(
brokerService.getPulsar().getMonotonicClock(),
producer -> producer.getCnx().getThrottleTracker()
.markThrottled(ThrottleType.AdaptivePublishRate),
producer -> producer.getCnx().getThrottleTracker()
.unmarkThrottled(ThrottleType.AdaptivePublishRate));
} else {
adaptivePublishRateLimiter = null;
}

updateActiveRateLimiters();

additionalSystemCursorNames = brokerService.pulsar().getConfiguration().getAdditionalSystemCursorNames();
Expand Down Expand Up @@ -952,9 +975,23 @@ private void updateActiveRateLimiters() {
if (isResourceGroupRateLimitingEnabled()) {
updatedRateLimiters.add(resourceGroupPublishLimiter);
}
// Adaptive limiter is always in the list when the feature is enabled.
// It is a no-op when inactive (active == false), so it has zero overhead
// when no pressure is detected.
if (adaptivePublishRateLimiter != null) {
updatedRateLimiters.add(adaptivePublishRateLimiter);
}
activeRateLimiters = updatedRateLimiters.stream().filter(Objects::nonNull).toList();
}

/**
 * Accessor for this topic's adaptive publish rate limiter.
 *
 * @return the limiter, or {@code null} if adaptive throttling is disabled
 */
public AdaptivePublishRateLimiter getAdaptivePublishRateLimiter() {
    return this.adaptivePublishRateLimiter;
}

// Empty in this class. NOTE(review): presumably a hook that concrete topic
// implementations override to refresh their dispatch rate limiter — confirm.
public void updateDispatchRateLimiter() {
}

Expand Down
Loading