[server] Throttle auto-partition drop to protect coordinator queue #3174
swuferhong wants to merge 1 commit into
Pull request overview
This PR addresses issue #3173 by ensuring auto-partition retention cleanup is evaluated against the actual current time (not the DAY creation jitter), and adds throttling/backpressure to prevent large bursts of partition-drop work from flooding the coordinator event queue during rotation boundaries.
Changes:
- Fix retention cleanup timing by using the real “now” instant for partition drops (decoupled from DAY creation jitter).
- Add adaptive drop throttling: a queue-size backpressure gate plus a per-round shared drop budget (with a starvation guard).
- Expose coordinator event queue size for backpressure, add tests for the new behavior, and document the new configuration options.
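
The interplay of the backpressure gate, the shared per-round budget, and the starvation guard can be sketched as below. This is a minimal illustration under assumptions, not the PR's code: the class `DropThrottleSketch`, the method `planDrops`, and all parameter names are hypothetical, and the sketch only plans how many partitions each table may drop in one round.

```java
import java.util.LinkedHashMap;
import java.util.Map;

/** Hypothetical sketch; class, method, and parameter names are not taken from the PR. */
final class DropThrottleSketch {

    /**
     * Plans how many expired partitions each table may drop in one round.
     *
     * @param queueSize current coordinator event queue size
     * @param queueThreshold backpressure threshold; non-positive disables the gate
     * @param maxBucketsPerRound shared bucket budget; non-positive disables the cap
     * @param expiredPerTable table name to number of expired partitions
     * @param bucketsPerTable table name to buckets per partition
     */
    static Map<String, Integer> planDrops(
            int queueSize,
            int queueThreshold,
            int maxBucketsPerRound,
            Map<String, Integer> expiredPerTable,
            Map<String, Integer> bucketsPerTable) {
        Map<String, Integer> plan = new LinkedHashMap<>();
        // Backpressure gate: skip the whole drop step while the queue is busy.
        if (queueThreshold > 0 && queueSize >= queueThreshold) {
            return plan;
        }
        boolean unlimited = maxBucketsPerRound <= 0;
        long remaining = unlimited ? Long.MAX_VALUE : maxBucketsPerRound;
        for (Map.Entry<String, Integer> e : expiredPerTable.entrySet()) {
            int buckets = bucketsPerTable.get(e.getKey());
            int drops = 0;
            for (int i = 0; i < e.getValue(); i++) {
                boolean fits = unlimited || remaining >= buckets;
                // Starvation guard: the first drop of each table is always allowed,
                // even if it overshoots the shared budget.
                if (drops > 0 && !fits) {
                    break;
                }
                drops++;
                if (!unlimited) {
                    remaining -= buckets;
                }
            }
            plan.put(e.getKey(), drops);
        }
        return plan;
    }
}
```

With a budget of 8 buckets, a table with 4 buckets per partition drops two partitions, and a following table with 10 buckets per partition still drops one because of the guard. Anything left over waits for the next round.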
Reviewed changes
Copilot reviewed 7 out of 7 changed files in this pull request and generated 6 comments.
| File | Description |
|---|---|
| website/docs/table-design/data-distribution/partitioning.md | Documents cluster-level auto-partition drop throttling/backpressure behavior and new config knobs. |
| website/docs/maintenance/configuration.md | Adds the new auto-partition drop throttling/backpressure configuration options to the config reference. |
| fluss-server/src/test/java/org/apache/fluss/server/coordinator/AutoPartitionManagerTest.java | Adds tests covering “drop not delayed by jitter” and multiple throttle/budget scenarios. |
| fluss-server/src/main/java/org/apache/fluss/server/coordinator/event/CoordinatorEventManager.java | Adds a queue size accessor used for queue-aware backpressure. |
| fluss-server/src/main/java/org/apache/fluss/server/coordinator/CoordinatorServer.java | Wires AutoPartitionManager with a late-bound coordinator queue size probe. |
| fluss-server/src/main/java/org/apache/fluss/server/coordinator/AutoPartitionManager.java | Implements queue-aware backpressure gate, per-round drop budget, and uses real-time “now” for drops. |
| fluss-common/src/main/java/org/apache/fluss/config/ConfigOptions.java | Introduces new config options controlling drop backpressure threshold and per-round drop budget. |
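
A late-bound queue size probe, as described for CoordinatorServer.java in the table above, could be wired roughly as follows. This is only a sketch under assumptions: `LateBoundQueueProbeSketch`, `EventQueue`, and `bind` are hypothetical names standing in for the real coordinator classes. The idea is that the consumer receives an `IntSupplier` before the event queue exists, and the probe reports 0 until the queue is bound.

```java
import java.util.function.IntSupplier;

/** Hypothetical sketch; the types here stand in for the real coordinator classes. */
final class LateBoundQueueProbeSketch {

    /** Stand-in for the event manager; only the size accessor matters here. */
    interface EventQueue {
        int queueSize();
    }

    // Assigned after construction, possibly from another thread.
    private volatile EventQueue queue;

    void bind(EventQueue queue) {
        this.queue = queue;
    }

    /** Returns a probe that is safe to call before the queue has been bound. */
    IntSupplier probe() {
        return () -> {
            // Read the field once so the null check and the call see the same reference.
            EventQueue q = queue;
            return q == null ? 0 : q.queueSize();
        };
    }
}
```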
Comments suppressed due to low confidence (1)
fluss-server/src/main/java/org/apache/fluss/server/coordinator/AutoPartitionManager.java:606
- In the PartitionNotExistException path, the code logs that the partition doesn't exist but then still proceeds as if it was deleted (iterator removal, incrementing dropped/bucketsConsumed, and logging "deleted"). This can incorrectly consume the per-round bucket budget and may stop further legitimate drops in the same round. Consider treating PartitionNotExistException as a no-op: remove it from the in-memory set/map but do not increment bucketsConsumed/dropped and do not log it as deleted.
try {
metadataManager.dropPartition(
tablePath,
ResolvedPartitionSpec.fromPartitionName(partitionKeys, partitionName),
false);
} catch (PartitionNotExistException e) {
LOG.info(
"Auto partitioning skip to delete partition {} for table [{}] as the partition is not exist.",
partitionName,
tablePath);
}
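
One way to follow the suggestion above is to remove a missing partition from the in-memory view without counting it against the budget. The sketch below is an assumption-laden illustration, not the PR's code: `DropAccountingSketch`, `PartitionMissingException`, and `Dropper` are hypothetical stand-ins for the real exception and metadata manager.

```java
import java.util.Iterator;
import java.util.List;

/** Hypothetical sketch of the no-op handling suggested in the review comment. */
final class DropAccountingSketch {

    /** Stand-in for the real "partition does not exist" exception. */
    static final class PartitionMissingException extends RuntimeException {
        PartitionMissingException(String name) {
            super(name);
        }
    }

    /** Stand-in for the metadata manager's drop call. */
    interface Dropper {
        void drop(String partitionName);
    }

    /** Drops every listed partition and returns the buckets actually consumed. */
    static int dropExpired(List<String> expired, Dropper dropper, int bucketsPerPartition) {
        int bucketsConsumed = 0;
        Iterator<String> it = expired.iterator();
        while (it.hasNext()) {
            String name = it.next();
            try {
                dropper.drop(name);
            } catch (PartitionMissingException e) {
                // Already gone: forget it locally, but do not charge the budget
                // and do not report it as deleted.
                it.remove();
                continue;
            }
            it.remove();
            bucketsConsumed += bucketsPerPartition;
        }
        return bucketsConsumed;
    }
}
```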
// only remove when zk success, this reflects to the partitionsByTable
dropIterator.remove();
dropped++;
bucketsConsumed += numBucketsPerPartition;
LOG.info(
"Auto partitioning deleted partition {} for table [{}].",
partitionName,
tablePath);

boolean unlimitedBudget = dropMaxBucketsPerRound <= 0;
int dropBudgetThisRound = unlimitedBudget ? Integer.MAX_VALUE : dropMaxBucketsPerRound;

for (Long tableId : tableIds) {

CoordinatorEventProcessor p = this.coordinatorEventProcessor;
return p == null ? 0 : p.getCoordinatorEventManager().queueSize();

"When the coordinator event queue size reaches this threshold, "
+ "auto partition will skip the drop step for the current round and retry "
+ "in the next AUTO_PARTITION_CHECK_INTERVAL. This prevents overwhelming "
+ "the coordinator at midnight day rotation when one partition drop fans "
+ "out into thousands of bucket/replica deletion events. Set to a "
+ "non-positive value to disable backpressure (always drop).");

+ "bucket count exceeds the budget, every table is allowed at "
+ "least one partition drop per round even if it temporarily "
+ "overshoots the budget. Remaining over-quota drops will be "
+ "processed in the next AUTO_PARTITION_CHECK_INTERVAL. Set to "
+ "a non-positive value to disable the cap (drop without "
+ "limit).");

int budgetForCall = unlimitedBudget ? Integer.MAX_VALUE : dropBudgetThisRound;
int bucketsConsumed =
dropPartitions(
tablePath,
tableInfo.getPartitionKeys(),
now,
tableInfo.getTableConfig().getAutoPartitionStrategy(),
currentPartitions,
budgetForCall,
tableInfo.getNumBuckets());
platinumhamburg left a comment
The designed mechanism seems somewhat complex and introduces quite a few configuration variables. Could we simplify the implementation so that doAutoPartition() maintains a PendingDeletePartition set, and an asynchronous deletion thread polls this set, following these rules:
- Delete only one partition at a time.
- Skip whenever the CoordinatorEventQueue is not empty.
- Poll the event queue every few seconds to look for a deletion opportunity.
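
The simpler design proposed in this comment could look roughly like the sketch below. It is an illustration under assumptions only: `PendingDeletePollerSketch`, `markPending`, and `pollOnce` are hypothetical names, the queue size is read through an `IntSupplier`, and the actual drop is delegated to a `Consumer<String>`.

```java
import java.util.Iterator;
import java.util.LinkedHashSet;
import java.util.Set;
import java.util.function.Consumer;
import java.util.function.IntSupplier;

/** Hypothetical sketch of a pending-delete set drained one partition at a time. */
final class PendingDeletePollerSketch {
    // Insertion-ordered set, so duplicates collapse and older entries go first.
    private final Set<String> pending = new LinkedHashSet<>();
    private final IntSupplier coordinatorQueueSize;
    private final Consumer<String> dropper;

    PendingDeletePollerSketch(IntSupplier coordinatorQueueSize, Consumer<String> dropper) {
        this.coordinatorQueueSize = coordinatorQueueSize;
        this.dropper = dropper;
    }

    /** Called by the auto-partition round for every expired partition it finds. */
    synchronized void markPending(String partitionName) {
        pending.add(partitionName);
    }

    /** Drops at most one partition, and only while the coordinator queue is empty. */
    synchronized boolean pollOnce() {
        if (coordinatorQueueSize.getAsInt() > 0) {
            return false;
        }
        Iterator<String> it = pending.iterator();
        if (!it.hasNext()) {
            return false;
        }
        String next = it.next();
        dropper.accept(next);
        it.remove();
        return true;
    }

    synchronized int pendingCount() {
        return pending.size();
    }
}
```

A background thread could call `pollOnce` every few seconds, for example through `ScheduledExecutorService.scheduleWithFixedDelay`. The trade-off against the PR's approach is fewer configuration options at the cost of slower cleanup when the coordinator queue is rarely empty.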
}
}

if (!skipDrop && (unlimitedBudget || dropBudgetThisRound > 0)) {
dropBudgetThisRound is always greater than 0.
// Use the actual current time for drop to ensure timely cleanup.
// The random delay is only meant for spreading partition creation load,
// not for delaying retention cleanup.
int budgetForCall = unlimitedBudget ? Integer.MAX_VALUE : dropBudgetThisRound;
dropBudgetThisRound appears to be a local variable. Why is it necessary to recompute a new variable based on it?
// (a=?,dt=20250506,b=?) (a=?,dt=20250507,b=?) will be retained.
Iterator<Map.Entry<String, Set<String>>> iterator =
currentPartitions.headMap(lastRetainPartitionTime).entrySet().iterator();
int dropped = 0;
Why is this variable needed?
Purpose
Linked issue: close #3173
Two related changes on the auto-partition path:
- Fix: dropPartitions previously used createPartitionInstant, which included the random jitter that is only meant to spread partition creation load. As a result, expired partitions were not always cleaned up on time. Use the actual current time for drops so retention is honored promptly.
- Throttle: removing the jitter from drops means every auto-partition table rotates its day partition at the same instant (typically midnight). One drop fans out into numBuckets * replicationFactor bucket-deletion events on the coordinator event queue, so a simultaneous burst across many tables floods the queue and starves normal coordinator work (leader election, metadata updates).
Add an adaptive throttle that decides per round how aggressively to drop, jointly based on coordinator queue pressure and pending drop volume:
- Queue-aware backpressure: skip drops when the coordinator event queue size crosses a configurable threshold, retry next round.
Pre-creation of new partitions is unaffected. Leftover expired partitions are picked up in the next check interval, which preserves timely cleanup without bursting the coordinator queue.
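
Both points above can be made concrete with a small calculation. The sketch below is illustrative only and rests on stated assumptions: `RetentionCutoffSketch` and its methods are hypothetical, days are computed in UTC, and the jittered instant is assumed to lag real time by the jitter. It shows how a lagging reference instant keeps an expired day partition alive past midnight, and how the number of deletion events grows with numBuckets * replicationFactor per dropped partition.

```java
import java.time.Instant;
import java.time.LocalDate;
import java.time.ZoneOffset;

/** Hypothetical sketch; not the PR's retention logic. */
final class RetentionCutoffSketch {

    /** Oldest day still retained; day partitions before it are eligible for drop. */
    static LocalDate oldestRetainedDay(Instant reference, int daysToRetain) {
        // Assumption: day boundaries are evaluated in UTC.
        return reference.atZone(ZoneOffset.UTC).toLocalDate().minusDays(daysToRetain);
    }

    /** Bucket-deletion events produced when each table drops one partition. */
    static long deletionEvents(int tables, int numBuckets, int replicationFactor) {
        return (long) tables * numBuckets * replicationFactor;
    }
}
```

For a real time of 2025-05-08T00:10Z and two retained days, the cutoff is 2025-05-06. A reference instant that lags by 30 minutes still falls on 2025-05-07, so the cutoff stays at 2025-05-05 and that partition is not dropped yet. As an example of the fan-out, 100 tables with 128 buckets and replication factor 3 would produce 38,400 events if each dropped one partition at once.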
Brief change log
Tests
API and Format
Documentation