[server] Throttle auto-partition drop to protect coordinator queue #3174

Open
swuferhong wants to merge 1 commit into apache:main from swuferhong:remove-drop-part-jett

Contributor

@swuferhong swuferhong commented Apr 23, 2026

Purpose

Linked issue: close #3173

Two related changes on the auto-partition path:

  1. Fix: dropPartitions previously used createPartitionInstant, which included the random jitter that is only meant to spread partition creation load. As a result, expired partitions were not always cleaned up on time. Use the actual current time for drops so retention is honored promptly.

  2. Throttle: removing the jitter from drops means every auto-partition table rotates its day partition at the same instant (typically midnight). One drop fans out into numBuckets * replicationFactor bucket-deletion events on the coordinator event queue, so a simultaneous burst across many tables floods the queue and starves normal coordinator work (leader election, metadata updates).
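
To make the fan-out in item 2 concrete, here is a minimal sketch of the arithmetic. The class name and the table, bucket, and replication numbers are hypothetical and chosen only for illustration; they are not taken from the PR.

```java
/** Illustrative arithmetic only; the figures used in main are made up. */
final class DropFanOut {

    /** Bucket-deletion events produced by dropping a single partition. */
    static long eventsPerPartitionDrop(int numBuckets, int replicationFactor) {
        return (long) numBuckets * replicationFactor;
    }

    /** Events produced when every table drops one partition in the same round. */
    static long eventsForSimultaneousDrops(int tables, int numBuckets, int replicationFactor) {
        return tables * eventsPerPartitionDrop(numBuckets, replicationFactor);
    }

    public static void main(String[] args) {
        // 500 tables, 64 buckets per partition, replication factor 3
        System.out.println(eventsForSimultaneousDrops(500, 64, 3)); // prints 96000
    }
}
```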

Add an adaptive throttle that decides per round how aggressively to drop, jointly based on coordinator queue pressure and pending drop volume:

  • Queue-aware backpressure: skip the drop step when the coordinator event queue size reaches a configurable threshold, and retry in the next round.
  • Per-round bucket-deletion budget shared across all auto-partition tables. Accounting in buckets (not partitions) gives uniform protection regardless of each table's bucket count.
  • Starvation guard: tables whose single-partition bucket count exceeds the remaining budget can still drop one partition per round, so very large tables cannot be permanently blocked.
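
The three rules above can be sketched as a single per-round planning function. This is a simplified model under stated assumptions, not the PR's code: `DropThrottleSketch`, `planRound`, and every parameter name are hypothetical, and the real implementation performs the drops inline rather than returning a plan.

```java
import java.util.LinkedHashMap;
import java.util.Map;

/** Sketch only: all names and signatures here are hypothetical. */
final class DropThrottleSketch {

    /**
     * Decides how many expired partitions each table may drop in one round.
     * expiredByTable maps table name to its count of expired partitions;
     * bucketsByTable maps table name to buckets per partition.
     */
    static Map<String, Integer> planRound(
            int queueSize,
            int queueThreshold,
            int maxBucketsPerRound,
            Map<String, Integer> expiredByTable,
            Map<String, Integer> bucketsByTable) {
        Map<String, Integer> plan = new LinkedHashMap<>();
        // Queue-aware backpressure: skip the whole drop step for this round.
        // A non-positive threshold disables the gate.
        if (queueThreshold > 0 && queueSize >= queueThreshold) {
            return plan;
        }
        // A non-positive budget means "no cap".
        boolean unlimited = maxBucketsPerRound <= 0;
        long remaining = maxBucketsPerRound; // shared across all tables
        for (Map.Entry<String, Integer> table : expiredByTable.entrySet()) {
            int bucketsPerPartition = bucketsByTable.get(table.getKey());
            int dropped = 0;
            while (dropped < table.getValue()) {
                boolean fits = unlimited || bucketsPerPartition <= remaining;
                // Starvation guard: a table's first drop is allowed even if it
                // overshoots the budget; later drops must fit.
                if (!fits && dropped > 0) {
                    break;
                }
                dropped++;
                remaining -= bucketsPerPartition; // accounting is in buckets
            }
            plan.put(table.getKey(), dropped);
        }
        return plan;
    }
}
```

With a budget of 10 buckets, a table with 3 expired 4-bucket partitions drops 2 of them (8 buckets), and a following table with 10-bucket partitions still drops 1 through the starvation guard; the leftovers wait for the next round.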

Pre-creation of new partitions is unaffected. Leftover expired partitions are picked up in the next check interval, which preserves timely cleanup without bursting the coordinator queue.

Brief change log

Tests

API and Format

Documentation

@swuferhong swuferhong force-pushed the remove-drop-part-jett branch from 8369aee to af71f36 May 20, 2026 11:59
@swuferhong swuferhong changed the title from "[server] Fix auto partition drop delayed by DAY creation jitter" to "[server] Throttle auto-partition drop to protect coordinator queue" May 20, 2026
@luoyuxia luoyuxia requested a review from Copilot May 21, 2026 08:00
Contributor

Copilot AI left a comment

Pull request overview

This PR addresses issue #3173 by ensuring auto-partition retention cleanup is evaluated against the actual current time (not the DAY creation jitter), and adds throttling/backpressure to prevent large bursts of partition-drop work from flooding the coordinator event queue during rotation boundaries.

Changes:

  • Fix retention cleanup timing by using the real “now” instant for partition drops (decoupled from DAY creation jitter).
  • Add adaptive drop throttling: a queue-size backpressure gate plus a per-round shared drop budget (with a starvation guard).
  • Expose coordinator event queue size for backpressure, add tests for the new behavior, and document the new configuration options.
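
As a rough illustration of the first change, the sketch below evaluates a simplified DAY retention rule against two instants. It assumes the jitter effectively makes the evaluation instant lag real time, which is a modeling assumption rather than a description of how `createPartitionInstant` is computed; `RetentionClockSketch` and `isExpired` are hypothetical names.

```java
import java.time.Duration;
import java.time.Instant;
import java.time.LocalDate;
import java.time.ZoneOffset;

/** Sketch only: a simplified DAY retention check with hypothetical names. */
final class RetentionClockSketch {

    /** A day partition is expired once its date is older than (today - retainDays). */
    static boolean isExpired(LocalDate partitionDay, Instant evaluationTime, int retainDays) {
        LocalDate today = evaluationTime.atZone(ZoneOffset.UTC).toLocalDate();
        return partitionDay.isBefore(today.minusDays(retainDays));
    }

    public static void main(String[] args) {
        Instant now = Instant.parse("2025-05-08T00:05:00Z");
        // Assumed jitter model: the instant used for evaluation lags "now".
        Instant jittered = now.minus(Duration.ofMinutes(30));
        LocalDate partition = LocalDate.parse("2025-05-05");

        System.out.println(isExpired(partition, jittered, 2)); // false: still "yesterday"
        System.out.println(isExpired(partition, now, 2));      // true: retention honored
    }
}
```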

Reviewed changes

Copilot reviewed 7 out of 7 changed files in this pull request and generated 6 comments.

| File | Description |
| --- | --- |
| website/docs/table-design/data-distribution/partitioning.md | Documents cluster-level auto-partition drop throttling/backpressure behavior and new config knobs. |
| website/docs/maintenance/configuration.md | Adds the new auto-partition drop throttling/backpressure configuration options to the config reference. |
| fluss-server/src/test/java/org/apache/fluss/server/coordinator/AutoPartitionManagerTest.java | Adds tests covering “drop not delayed by jitter” and multiple throttle/budget scenarios. |
| fluss-server/src/main/java/org/apache/fluss/server/coordinator/event/CoordinatorEventManager.java | Adds a queue size accessor used for queue-aware backpressure. |
| fluss-server/src/main/java/org/apache/fluss/server/coordinator/CoordinatorServer.java | Wires AutoPartitionManager with a late-bound coordinator queue size probe. |
| fluss-server/src/main/java/org/apache/fluss/server/coordinator/AutoPartitionManager.java | Implements queue-aware backpressure gate, per-round drop budget, and uses real-time “now” for drops. |
| fluss-common/src/main/java/org/apache/fluss/config/ConfigOptions.java | Introduces new config options controlling drop backpressure threshold and per-round drop budget. |
Comments suppressed due to low confidence (1)

fluss-server/src/main/java/org/apache/fluss/server/coordinator/AutoPartitionManager.java:606

  • In the PartitionNotExistException path, the code logs that the partition doesn't exist but then still proceeds as if it was deleted (iterator removal, incrementing dropped/bucketsConsumed, and logging "deleted"). This can incorrectly consume the per-round bucket budget and may stop further legitimate drops in the same round. Consider treating PartitionNotExistException as a no-op: remove it from the in-memory set/map but do not increment bucketsConsumed/dropped and do not log it as deleted.
                try {
                    metadataManager.dropPartition(
                            tablePath,
                            ResolvedPartitionSpec.fromPartitionName(partitionKeys, partitionName),
                            false);
                } catch (PartitionNotExistException e) {
                    LOG.info(
                            "Auto partitioning skip to delete partition {} for table [{}] as the partition is not exist.",
                            partitionName,
                            tablePath);
                }
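
One way to read the suggestion above is sketched here. It is a self-contained model, not a patch: `DropAccountingSketch`, `PartitionDropper`, and the nested `PartitionNotExistException` are stand-ins defined only for this example and are not the Fluss classes of the same name.

```java
import java.util.Iterator;
import java.util.List;

/** Sketch of the suggested accounting; every type here is a stand-in. */
final class DropAccountingSketch {

    /** Stand-in for the real exception type. */
    static class PartitionNotExistException extends RuntimeException {}

    /** Stand-in for the metadata call that performs the drop. */
    interface PartitionDropper {
        void drop(String partitionName);
    }

    /** Returns the buckets consumed; partitions that were already gone cost nothing. */
    static int dropAll(List<String> expired, int numBucketsPerPartition, PartitionDropper dropper) {
        int bucketsConsumed = 0;
        Iterator<String> it = expired.iterator();
        while (it.hasNext()) {
            String name = it.next();
            try {
                dropper.drop(name);
                // Only a real deletion consumes the per-round budget.
                bucketsConsumed += numBucketsPerPartition;
            } catch (PartitionNotExistException e) {
                // No-op: nothing was deleted, so nothing is charged or logged as deleted.
            }
            // Either way the in-memory entry is stale and can be forgotten.
            it.remove();
        }
        return bucketsConsumed;
    }
}
```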

Comment on lines 608 to 615
// only remove when zk success, this reflects to the partitionsByTable
dropIterator.remove();
dropped++;
bucketsConsumed += numBucketsPerPartition;
LOG.info(
        "Auto partitioning deleted partition {} for table [{}].",
        partitionName,
        tablePath);

boolean unlimitedBudget = dropMaxBucketsPerRound <= 0;
int dropBudgetThisRound = unlimitedBudget ? Integer.MAX_VALUE : dropMaxBucketsPerRound;

for (Long tableId : tableIds) {
Comment on lines +315 to +316
CoordinatorEventProcessor p = this.coordinatorEventProcessor;
return p == null ? 0 : p.getCoordinatorEventManager().queueSize();
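
The late-bound probe pattern in the two lines above can be sketched in isolation as follows. `LateBoundProbeSketch` and `EventQueueOwner` are hypothetical stand-ins; the sketch only shows the idea of handing out a supplier before the queue owner exists and reporting 0 until it does.

```java
import java.util.Queue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.function.IntSupplier;

/** Sketch only: stand-in types illustrating a late-bound queue size probe. */
final class LateBoundProbeSketch {

    /** Stand-in for the component that owns the event queue and is created late. */
    static final class EventQueueOwner {
        private final Queue<String> queue = new LinkedBlockingQueue<>();

        int queueSize() {
            return queue.size();
        }
    }

    // Assigned during startup, possibly after the probe has been handed out.
    private volatile EventQueueOwner owner;

    /** The probe can be created before start(); it reports 0 until the owner exists. */
    IntSupplier queueSizeProbe() {
        return () -> {
            EventQueueOwner o = this.owner; // read the field once to avoid a racy re-read
            return o == null ? 0 : o.queueSize();
        };
    }

    void start() {
        owner = new EventQueueOwner();
    }

    /** Must be called after start(). */
    void enqueue(String event) {
        owner.queue.add(event);
    }
}
```

Reporting 0 before startup means the backpressure gate stays open until the queue owner is wired in, which is one possible trade-off; whether that is the desired behavior is a design decision for the PR.
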
Comment on lines +208 to +213
"When the coordinator event queue size reaches this threshold, "
+ "auto partition will skip the drop step for the current round and retry "
+ "in the next AUTO_PARTITION_CHECK_INTERVAL. This prevents overwhelming "
+ "the coordinator at midnight day rotation when one partition drop fans "
+ "out into thousands of bucket/replica deletion events. Set to a "
+ "non-positive value to disable backpressure (always drop).");
Comment on lines +226 to +231
+ "bucket count exceeds the budget, every table is allowed at "
+ "least one partition drop per round even if it temporarily "
+ "overshoots the budget. Remaining over-quota drops will be "
+ "processed in the next AUTO_PARTITION_CHECK_INTERVAL. Set to "
+ "a non-positive value to disable the cap (drop without "
+ "limit).");
Comment on lines +410 to +419
int budgetForCall = unlimitedBudget ? Integer.MAX_VALUE : dropBudgetThisRound;
int bucketsConsumed =
        dropPartitions(
                tablePath,
                tableInfo.getPartitionKeys(),
                now,
                tableInfo.getTableConfig().getAutoPartitionStrategy(),
                currentPartitions,
                budgetForCall,
                tableInfo.getNumBuckets());
Contributor

@platinumhamburg platinumhamburg left a comment

It seems that the designed mechanism is somewhat complex and has quite a few configuration variables. Could we simplify the implementation so that doAutoPartition() maintains a PendingDeletePartition set, and an asynchronous deletion thread polls this set, following these rules:

  • Only delete one partition at a time.
  • Skip whenever the CoordinatorEventQueue is not empty.
  • The thread can poll the event queue every few seconds to look for a deletion opportunity.
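
A minimal sketch of this alternative, under stated assumptions: `PendingDropWorkerSketch` and its members are hypothetical names, a FIFO queue stands in for the proposed PendingDeletePartition set, and the coordinator queue size is supplied through an `IntSupplier` rather than read from a real coordinator.

```java
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
import java.util.function.IntSupplier;

/** Sketch only: hypothetical names modeling the proposed asynchronous drop worker. */
final class PendingDropWorkerSketch {
    private final Queue<String> pending = new ConcurrentLinkedQueue<>();
    private final IntSupplier coordinatorQueueSize;
    private final Consumer<String> dropper;

    PendingDropWorkerSketch(IntSupplier coordinatorQueueSize, Consumer<String> dropper) {
        this.coordinatorQueueSize = coordinatorQueueSize;
        this.dropper = dropper;
    }

    /** Called by the auto-partition round to register an expired partition. */
    void enqueue(String partition) {
        pending.add(partition);
    }

    /** One polling tick: drop at most one partition, and only when the coordinator is idle. */
    boolean pollOnce() {
        if (coordinatorQueueSize.getAsInt() > 0) {
            return false; // coordinator is busy; wait for the next tick
        }
        String next = pending.poll();
        if (next == null) {
            return false; // nothing to delete
        }
        dropper.accept(next);
        return true;
    }

    /** Polls every few seconds on a dedicated thread; the caller shuts the executor down. */
    ScheduledExecutorService startPolling(long periodSeconds) {
        ScheduledExecutorService ses = Executors.newSingleThreadScheduledExecutor();
        ses.scheduleWithFixedDelay(this::pollOnce, periodSeconds, periodSeconds, TimeUnit.SECONDS);
        return ses;
    }
}
```

This trades the two configuration knobs for a fixed "one at a time, only when idle" policy; on a coordinator whose queue is rarely empty, drops could be deferred for a long time, so the strict emptiness check may need relaxing.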

}
}

if (!skipDrop && (unlimitedBudget || dropBudgetThisRound > 0)) {
Contributor

dropBudgetThisRound is always greater than 0.

// Use the actual current time for drop to ensure timely cleanup.
// The random delay is only meant for spreading partition creation load,
// not for delaying retention cleanup.
int budgetForCall = unlimitedBudget ? Integer.MAX_VALUE : dropBudgetThisRound;
Contributor

dropBudgetThisRound appears to be a local variable. Why is it necessary to recompute a new variable based on it?

// (a=?,dt=20250506,b=?) (a=?,dt=20250507,b=?) will be retained.
Iterator<Map.Entry<String, Set<String>>> iterator =
currentPartitions.headMap(lastRetainPartitionTime).entrySet().iterator();
int dropped = 0;
Contributor

Why is this variable needed?

Development

Successfully merging this pull request may close these issues.

[server] Auto partition retention cleanup delayed by DAY partition creation jitter

3 participants