[kv] Support elect standby replicas for primary key table#2829
Conversation
5dd4398 to
4087a19
Compare
7159e5c to
fbbfdb8
Compare
There was a problem hiding this comment.
Pull request overview
Note
Copilot was unable to run its full agentic suite in this review.
Adds “standby replica” support for primary-key (KV) tables by extending LeaderAndIsr to carry standby replicas and wiring that through ZooKeeper JSON, RPC protos, election logic, and the related unit/integration tests.
Changes:
- Extend
LeaderAndIsr/IsrStateto includestandbyReplicasand propagate through coordinator/replica workflows. - Add standby replicas to ZK JSON serde (v2) and to RPC messages (
NotifyLeaderAndIsr,AdjustIsr). - Update leader-election implementations/tests to elect & promote a standby for PK tables.
Reviewed changes
Copilot reviewed 35 out of 35 changed files in this pull request and generated 6 comments.
Show a summary per file
| File | Description |
|---|---|
| fluss-server/src/test/java/org/apache/fluss/server/zk/data/LeaderAndIsrJsonSerdeTest.java | Updates serde test vectors to v2 + adds backward-compat test for v1. |
| fluss-server/src/test/java/org/apache/fluss/server/zk/ZooKeeperClientTest.java | Adjusts ZK client leader/isr tests to include standby replicas. |
| fluss-server/src/test/java/org/apache/fluss/server/testutils/FlussClusterExtension.java | Adds helper to fetch standby replica from LeaderAndIsr in tests. |
| fluss-server/src/test/java/org/apache/fluss/server/tablet/TabletServiceITCase.java | Propagates standby replicas through notify/transition test flows. |
| fluss-server/src/test/java/org/apache/fluss/server/tablet/TabletServerShutdownITCase.java | Adjusts shutdown/failover expectations for updated LeaderAndIsr. |
| fluss-server/src/test/java/org/apache/fluss/server/replica/fetcher/ReplicaFetcherThreadTest.java | Updates fetcher tests to pass standby replicas. |
| fluss-server/src/test/java/org/apache/fluss/server/replica/fetcher/ReplicaFetcherManagerTest.java | Updates fetcher manager tests to pass standby replicas. |
| fluss-server/src/test/java/org/apache/fluss/server/replica/fetcher/ReplicaFetcherITCase.java | Updates IT case to pass standby replicas in notifications. |
| fluss-server/src/test/java/org/apache/fluss/server/replica/ReplicaTestBase.java | Updates replica test scaffolding to include standby replicas. |
| fluss-server/src/test/java/org/apache/fluss/server/replica/ReplicaTest.java | Updates replica tests to include standby replicas in LeaderAndIsr. |
| fluss-server/src/test/java/org/apache/fluss/server/replica/ReplicaManagerTest.java | Updates replica manager tests for new LeaderAndIsr shape. |
| fluss-server/src/test/java/org/apache/fluss/server/replica/KvRecoverFromRemoteLogITCase.java | Updates recovery IT case to include standby replicas in updates. |
| fluss-server/src/test/java/org/apache/fluss/server/replica/HighWatermarkPersistenceTest.java | Updates HW persistence test to include standby replicas. |
| fluss-server/src/test/java/org/apache/fluss/server/replica/AdjustIsrTest.java | Updates ISR adjustment tests + expected message formatting. |
| fluss-server/src/test/java/org/apache/fluss/server/replica/AdjustIsrManagerTest.java | Updates adjust-ISR manager tests for standby replicas. |
| fluss-server/src/test/java/org/apache/fluss/server/replica/AdjustIsrITCase.java | Updates ISR expand/shrink IT to keep standby replicas. |
| fluss-server/src/test/java/org/apache/fluss/server/metadata/ZkBasedMetadataProviderTest.java | Updates metadata provider tests to include standby replicas. |
| fluss-server/src/test/java/org/apache/fluss/server/log/remote/RemoteLogTestBase.java | Updates remote log test base to create LeaderAndIsr with standby. |
| fluss-server/src/test/java/org/apache/fluss/server/coordinator/statemachine/TableBucketStateMachineTest.java | Adds PK-table init-election tests + ensures table info present in context. |
| fluss-server/src/test/java/org/apache/fluss/server/coordinator/statemachine/ReplicaStateMachineTest.java | Updates replica state machine tests for standby replicas. |
| fluss-server/src/test/java/org/apache/fluss/server/coordinator/statemachine/ReplicaLeaderElectionTest.java | Adds extensive PK standby election/promotion test coverage. |
| fluss-server/src/test/java/org/apache/fluss/server/coordinator/TestCoordinatorGateway.java | Ensures standby replicas are included in adjusted LeaderAndIsr responses. |
| fluss-server/src/test/java/org/apache/fluss/server/coordinator/CoordinatorHighAvailabilityITCase.java | Updates HA IT case to use new LeaderAndIsr constructor. |
| fluss-server/src/test/java/org/apache/fluss/server/coordinator/CoordinatorEventProcessorTest.java | Avoids PK leader-election semantics for a rebalance test by using LOG table. |
| fluss-server/src/main/java/org/apache/fluss/server/zk/data/LeaderAndIsrJsonSerde.java | Bumps JSON version to 2 and serializes/deserializes standby replicas. |
| fluss-server/src/main/java/org/apache/fluss/server/zk/data/LeaderAndIsr.java | Adds standbyReplicas field and updates equality/hash/toString/newLeaderAndIsr. |
| fluss-server/src/main/java/org/apache/fluss/server/utils/ServerRpcMessageUtils.java | Wires standby replicas through notify/adjust-isr request/response conversions. |
| fluss-server/src/main/java/org/apache/fluss/server/replica/Replica.java | Stores standby replicas in ISR state; adds standby flag handling on follower transition. |
| fluss-server/src/main/java/org/apache/fluss/server/replica/IsrState.java | Extends ISR state model to include standby replicas across committed/pending states. |
| fluss-server/src/main/java/org/apache/fluss/server/entity/NotifyLeaderAndIsrData.java | Exposes standby replicas/array for RPC serialization. |
| fluss-server/src/main/java/org/apache/fluss/server/coordinator/statemachine/TableBucketStateMachine.java | Threads PK-table flag into election paths + adds init-election standby selection. |
| fluss-server/src/main/java/org/apache/fluss/server/coordinator/statemachine/ReplicaStateMachine.java | Preserves standby replicas when creating new LeaderAndIsr with NO_LEADER. |
| fluss-server/src/main/java/org/apache/fluss/server/coordinator/statemachine/ReplicaLeaderElection.java | Adds common election path + PK standby promotion/selection logic. |
| fluss-server/src/main/java/org/apache/fluss/server/coordinator/CoordinatorEventProcessor.java | Ensures standby replicas are preserved when producing new LeaderAndIsr. |
| fluss-rpc/src/main/proto/FlussApi.proto | Adds standby_replicas to adjust-isr + notify leader/isr messages. |
Comments suppressed due to low confidence (1)
fluss-server/src/test/java/org/apache/fluss/server/testutils/FlussClusterExtension.java:1
waitAndGetStandbycan throwIndexOutOfBoundsExceptionwhenstandbyReplicasis empty. Since the method name implies waiting for readiness, it should wait untilstandbyReplicasbecomes non-empty (or fail with a clear assertion/timeout), instead of unconditionally doingget(0).
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
| boolean isNowStandby = (standbyReplica == localTabletServerId); | ||
| boolean wasLeader = isLeader(); | ||
| boolean wasStandby = this.isStandbyReplica; | ||
| if (isNowStandby) { | ||
| // Mark as standby immediately to ensure coordinator's state is consistent. | ||
| isStandbyReplica = true; | ||
| } else { | ||
| if (wasStandby || wasLeader) { | ||
| // standby -> follower or leader -> follower | ||
| if (wasStandby) { | ||
| isStandbyReplica = false; | ||
| } | ||
| } | ||
| // follower -> follower: do nothing. | ||
| } |
There was a problem hiding this comment.
big +1, the wasStandby and wasLeader flag are very confusing here and makes the code hard to understand.
There was a problem hiding this comment.
I'd prefer to keep isNowStandby, wasLeader, and wasStandby here. They will be useful for follow-up PRs, because the behavior differs depending on the specific transition — for example, leader→standby, leader→follower, and follower→standby each require different actions: some need to delete data, others need to download data. So we need to distinguish between these cases.
wuchong
left a comment
There was a problem hiding this comment.
@swuferhong I left some comments.
| boolean isNowStandby = (standbyReplica == localTabletServerId); | ||
| boolean wasLeader = isLeader(); | ||
| boolean wasStandby = this.isStandbyReplica; | ||
| if (isNowStandby) { | ||
| // Mark as standby immediately to ensure coordinator's state is consistent. | ||
| isStandbyReplica = true; | ||
| } else { | ||
| if (wasStandby || wasLeader) { | ||
| // standby -> follower or leader -> follower | ||
| if (wasStandby) { | ||
| isStandbyReplica = false; | ||
| } | ||
| } | ||
| // follower -> follower: do nothing. | ||
| } |
There was a problem hiding this comment.
big +1, the wasStandby and wasLeader flag are very confusing here and makes the code hard to understand.
| } | ||
|
|
||
| // it should be from leader to follower, we need to destroy the kv tablet | ||
| dropKv(); |
There was a problem hiding this comment.
dropKv() is called unconditionally in onBecomeNewFollower(), even when the replica is designated as a standby. IIUC, a standby replica should retain the local kv rather than dropped. Is this to be implemented in a follow-up PR, or missed in this PR?
There was a problem hiding this comment.
This logic will be implements in next pr: #2835
| localTabletServerId, | ||
| leaderEpoch, | ||
| isrToSend, | ||
| currentState.standbyReplicas(), |
There was a problem hiding this comment.
[P2] If a PK bucket currently has no standby and another follower later catches up, prepareIsrExpand() sends the enlarged ISR but reuses currentState.standbyReplicas() unchanged here. That persists the expanded ISR with an empty standby list, so these buckets never gain a standby on the normal replica-recovery path until some later leader election. If the standby replicas is updated, we should also send a notification to the replica to make it ready for standby.
There was a problem hiding this comment.
This is a bit complex and can be implemented in a separate PR. I'll add a TODO for now.
fbbfdb8 to
a7fd14b
Compare
a7fd14b to
bc33f2d
Compare
|
@wuchong comments addressed. |
|
@swuferhong I have merged this PR. However, to ensure backward compatibility, we should introduce a table configuration option to explicitly enable standby replicas for legacy tables. For newly created tables, standby replicas can be enabled by default. |
The linked issue: #3253 |
Purpose
Linked issue: close #2828
Brief change log
Tests
API and Format
Documentation