gc, server: Add WatchGCStates API #10498
Conversation
Signed-off-by: MyonKeminta <MyonKeminta@users.noreply.github.com>
[APPROVALNOTIFIER] This PR is NOT APPROVED
This pull-request has been approved by:
The full list of commands accepted by this bot can be found here.
Details: Needs approval from an approver in each of these files. Approvers can indicate their approval by writing
Note: Reviews paused. It looks like this branch is under active development. To avoid overwhelming you with review comments due to an influx of new commits, CodeRabbit has automatically paused this review. You can configure this behavior by changing the corresponding setting. Use the following commands to manage reviews:
📝 Walkthrough
Implements streaming APIs for watching GC state changes across keyspaces.
Changes
Sequence Diagram(s)
sequenceDiagram
participant Client
participant GrpcServer
participant GCStateManager as GCStateManager<br/>(Leader)
participant Listener
Client->>GrpcServer: WatchGCStates(ctx, request)
alt Not Leader
GrpcServer-->>Client: Error(ErrNotLeader)
else Is Leader
GrpcServer->>GCStateManager: WatchGCStates(ctx, ...)
activate GCStateManager
GCStateManager->>Listener: Create & Register Listener
GCStateManager->>Client: Return GCStateWatcher
deactivate GCStateManager
loop On Each State Change
GCStateManager->>Listener: Compute Diff (unchanged/barrier/full)
alt Diff Indicates Update
Listener->>Listener: Send to Channel (buffered)
end
end
GrpcServer->>Listener: consumeGCStateWatch(ctx)
activate Listener
loop Receive Batches
Listener-->>GrpcServer: GCStateBatch (up to maxResponseBytes)
GrpcServer-->>Client: WatchGCStatesResponse (batched)
end
alt Context Cancelled
Listener->>GCStateManager: OnNodeBecomesFollower()
GCStateManager->>Listener: Close & Cancel All Listeners
Listener-->>Client: Close Stream
end
deactivate Listener
end
sequenceDiagram
participant Server as PD Server
participant GCStateManager
participant Listener as GC Listeners
rect rgba(100, 150, 200, 0.5)
Note over Server,Listener: Leadership Transition
end
Server->>GCStateManager: OnNodeBecomesLeader()
activate GCStateManager
Note over GCStateManager: Ready to accept watchers
GCStateManager->>Listener: Accept new watch registrations
deactivate GCStateManager
loop State Changes
GCStateManager->>Listener: Emit GCState updates
end
rect rgba(200, 100, 100, 0.5)
Note over Server,Listener: Leader to Follower Transition
end
Server->>GCStateManager: OnNodeBecomesFollower()
activate GCStateManager
GCStateManager->>Listener: Close all watchers & cancel contexts
deactivate GCStateManager
Listener-->>Client: Stream ends (codes.Unavailable)
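For orientation, here is a minimal Go sketch of the leader-side flow shown in the first diagram. The handler shape, `s.gcStateManager.WatchGCStates`, `watcher.Ch`, and the generated stream type `pdpb.PD_WatchGCStatesServer` are assumptions for illustration, not verified signatures from this PR; the watcher is assumed to yield ready-to-send responses.

```go
// Sketch only: reject non-leaders, register a watcher with the GC state manager,
// and forward batched updates until the watch channel is closed on leadership loss.
func (s *GrpcServer) WatchGCStates(req *pdpb.WatchGCStatesRequest, stream pdpb.PD_WatchGCStatesServer) error {
	if !s.member.IsLeader() {
		// The "Not Leader" branch in the diagram.
		return status.Error(codes.Unavailable, "not the PD leader")
	}
	ctx, cancel := context.WithCancel(stream.Context())
	defer cancel()

	watcher, err := s.gcStateManager.WatchGCStates(ctx, req.GetSkipLoadingInitial())
	if err != nil {
		return err
	}
	// Each batch is assumed to already be capped at maxResponseBytes by the listener side.
	for batch := range watcher.Ch {
		if err := stream.Send(batch); err != nil {
			return err
		}
	}
	// The channel is closed when the node steps down, so the stream ends as Unavailable.
	return status.Error(codes.Unavailable, "GC state watch closed")
}
```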
Estimated code review effort: 🎯 4 (Complex) | ⏱️ ~60 minutes
Suggested labels
Suggested reviewers
Poem
🚥 Pre-merge checks | ✅ 4 | ❌ 1
❌ Failed checks (1 warning)
✅ Passed checks (4 passed)
Actionable comments posted: 2
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In `@go.mod`:
- Around line 253-254: The root go.mod contains a replace directive for
github.com/pingcap/kvproto pointing to a fork (the replace line shown); update
client/go.mod and tools/go.mod to use the same kvproto version (or add the same
replace directive) so all three go.mod files are consistent, then run make tidy
to reconcile module versions and vendor files; when the upstream kvproto PR
(`#1440`) is merged, change the replace/fork references in all three go.mod files
to the new upstream version and run make tidy again.
In `@pkg/gc/gc_state_manager.go`:
- Around line 324-340: sendGCStateToListenerLocked currently performs a
potentially blocking send while holding m.mu which can cause contention; to fix,
either change the send to a non-blocking send (use select with default to drop
the update and call m.removeGCStateListenerLocked(...) only if necessary) or
refactor notification so the critical section only collects listener references
and their flags (using cloneGCStateForListener) under lock, then perform the
timed or blocking sends outside the lock and call m.removeGCStateListenerLocked
when a listener is detected slow; update references in
sendGCStateToListenerLocked, cloneGCStateForListener and calls to
m.removeGCStateListenerLocked accordingly.
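As a reference for the suggestion above, a rough sketch of the "collect under the lock, send outside the lock" shape with a non-blocking send for slow consumers. `GCState`, `cloneGCStateForListener`, and `removeGCStateListener` are stand-ins for this PR's internals and may not match the real signatures.

```go
// Sketch: copy listener references under m.mu, then notify without holding the lock.
func (m *GCStateManager) notifyGCStateListeners(state GCState) {
	m.mu.Lock()
	targets := make([]gcStateListener, len(m.gcStateListeners))
	copy(targets, m.gcStateListeners) // only the slice copy happens under the lock
	m.mu.Unlock()

	for _, l := range targets {
		update := cloneGCStateForListener(state, l) // hypothetical per-listener diff/clone
		select {
		case l.ch <- update:
		default:
			// Slow consumer: drop it rather than blocking every other watcher
			// (removal is assumed to re-acquire the lock internally).
			m.removeGCStateListener(l)
		}
	}
}
```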
ℹ️ Review info
⚙️ Run configuration
Configuration used: defaults
Review profile: CHILL
Plan: Pro
Run ID: 7799de62-d18a-40cc-8dd1-fc2d48288651
⛔ Files ignored due to path filters (1)
go.sum is excluded by !**/*.sum
📒 Files selected for processing (4)
go.mod, pkg/gc/gc_state_manager.go, server/gc_service.go, server/server.go
replace github.com/pingcap/kvproto => github.com/MyonKeminta/kvproto v0.0.0-20260326071414-bdabcb33ab4c
🧩 Analysis chain
🏁 Script executed:
#!/bin/bash
# Verify kvproto versions across all go.mod files
echo "=== Root go.mod kvproto ==="
grep -E 'kvproto' go.mod
echo ""
echo "=== client/go.mod kvproto ==="
grep -E 'kvproto' client/go.mod
echo ""
echo "=== tools/go.mod kvproto ==="
grep -E 'kvproto' tools/go.mod
Repository: tikv/pd
Length of output: 825
Ensure consistent kvproto versions across all go.mod files.
The root go.mod includes a replace directive for github.com/pingcap/kvproto pointing to a fork, but client/go.mod and tools/go.mod require the standard version without a corresponding replace directive. This inconsistency could cause build or behavior issues depending on how these modules are used.
When the upstream kvproto PR (#1440) is merged, update all three go.mod files to the new version and run make tidy to ensure consistency.
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@go.mod` around lines 253 - 254, The root go.mod contains a replace directive
for github.com/pingcap/kvproto pointing to a fork (the replace line shown);
update client/go.mod and tools/go.mod to use the same kvproto version (or add
the same replace directive) so all three go.mod files are consistent, then run
make tidy to reconcile module versions and vendor files; when the upstream
kvproto PR (`#1440`) is merged, change the replace/fork references in all three
go.mod files to the new upstream version and run make tidy again.
Signed-off-by: MyonKeminta <MyonKeminta@users.noreply.github.com>
Actionable comments posted: 2
🧹 Nitpick comments (2)
tests/server/gc/gc_test.go (1)
839-865: Make the WatchGCStates registration observable before resigning the leader.
With SkipLoadingInitial: true, this stream has no client-visible ack that the server has actually appended the watcher before ResignLeader() runs. On a slower run, the first Recv() can fail on startup timing instead of the close-on-transfer path this test is trying to pin down. Priming the stream with an initial batch or a guaranteed state change would make this assertion deterministic.
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@tests/server/gc/gc_test.go` around lines 839 - 865, The WatchGCStates watcher may not be registered before ResignLeader() because SkipLoadingInitial: true gives no client-visible ack; to make registration observable, change the WatchGCStates call (in this test surrounding gcStatesStream and gcStatesWatchCtx) to request an initial batch (set SkipLoadingInitial: false) or perform a deterministic state change (e.g., write a dummy GC state change or call the server path that emits a GC state update) and wait for that initial event via recvGCStateWithTimeout before calling ResignLeader(); ensure you reference the existing gcStatesStream, WatchGCStates request, and recvGCStateWithTimeout flow so the test waits for the watcher to be appended prior to leader resignation.
pkg/gc/gc_state_manager_test.go (1)
2201-2286: Add the overlapping leader/follower callback case here.
The new counter only matters when callbacks arrive out of order. This test exercises clean transitions, but not OnNodeBecomesLeader() twice before the older OnNodeBecomesFollower(). Without that sequence, the stale-callback bug in the watch lifecycle can still slip through.
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@pkg/gc/gc_state_manager_test.go` around lines 2201 - 2286, Add a subcase to TestWatchGCStates that simulates overlapping leader/follower callbacks: after starting a watcher via manager.WatchGCStates and verifying it receives events, call manager.OnNodeBecomesLeader() a second time (without calling OnNodeBecomesFollower() in between) to simulate stale/overlapping callbacks, then ensure the watcher channel is still valid and receives the next expected GC state update; finally call manager.OnNodeBecomesFollower() and verify the channel closes and listener count decrements. Use existing helpers (waitGCStateListenerCount, assertNextWatchedGCStateEqualsCurrent, assertGCStateChannelClosed) and the same manager/ctx/watch flow so this targets the stale-callback path in OnNodeBecomesLeader/OnNodeBecomesFollower and WatchGCStates.
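A rough sketch of the suggested subcase follows. The setup and helper signatures (`newTestGCStateManager`, `waitGCStateListenerCount`, `assertNextWatchedGCStateEqualsCurrent`, `assertGCStateChannelClosed`, and the `WatchGCStates` arguments) are assumptions based on the names in the nitpick, not the actual test code.

```go
// Sketch: a second OnNodeBecomesLeader arrives before the stale follower callback,
// so the watcher must survive the first OnNodeBecomesFollower and only close when
// the leadership counter actually drops to zero.
func TestWatchGCStatesOverlappingLeadership(t *testing.T) {
	re := require.New(t)
	manager, ctx := newTestGCStateManager(t) // hypothetical setup helper
	manager.OnNodeBecomesLeader()

	watch, err := manager.WatchGCStates(ctx, false)
	re.NoError(err)
	waitGCStateListenerCount(re, manager, 1)

	manager.OnNodeBecomesLeader()   // new lease becomes leader before the old lease's follower callback
	manager.OnNodeBecomesFollower() // stale callback from the old lease
	assertNextWatchedGCStateEqualsCurrent(re, manager, watch) // watcher is still alive

	manager.OnNodeBecomesFollower() // the current lease really steps down now
	assertGCStateChannelClosed(re, watch)
	waitGCStateListenerCount(re, manager, 0)
}
```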
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In `@pkg/gc/gc_state_manager.go`:
- Around line 121-125: OnNodeBecomesFollower currently closes all
gcStateListeners regardless of a newer leadership increment; change the logic in
both OnNodeBecomesFollower and the corresponding OnNodeBecomesLeader handling so
the listener teardown only runs when the leadership counter actually reaches
zero: in OnNodeBecomesLeader increment nodeLeadership (atomic.Int32) and set up
listeners as before; in OnNodeBecomesFollower decrement nodeLeadership using
nodeLeadership.Add(-1) and only close/cleanup gcStateListeners if the Add call
returns 0. Ensure you reference and update the methods named
OnNodeBecomesFollower and OnNodeBecomesLeader and the fields gcStateListeners
and nodeLeadership so listeners created by a later leader aren't torn down by an
earlier follower callback.
- Around line 149-152: runGCStateWatch currently only exits on ctx.Done(), which
leaks a goroutine when removeGCStateListenerLocked() or
cancelAllGCStateListenersLocked() close listener.ch; update runGCStateWatch to
monitor listener.ch as well (select on ctx.Done() and a receive from
listener.ch, using the comma-ok form to detect closure) and return when the
channel is closed so the goroutine terminates; keep channel closing logic in
removeGCStateListenerLocked()/cancelAllGCStateListenersLocked() and do not add
additional closes.
---
Nitpick comments:
In `@pkg/gc/gc_state_manager_test.go`:
- Around line 2201-2286: Add a subcase to TestWatchGCStates that simulates
overlapping leader/follower callbacks: after starting a watcher via
manager.WatchGCStates and verifying it receives events, call
manager.OnNodeBecomesLeader() a second time (without calling
OnNodeBecomesFollower() in between) to simulate stale/overlapping callbacks,
then ensure the watcher channel is still valid and receives the next expected GC
state update; finally call manager.OnNodeBecomesFollower() and verify the
channel closes and listener count decrements. Use existing helpers
(waitGCStateListenerCount, assertNextWatchedGCStateEqualsCurrent,
assertGCStateChannelClosed) and the same manager/ctx/watch flow so this targets
the stale-callback path in OnNodeBecomesLeader/OnNodeBecomesFollower and
WatchGCStates.
In `@tests/server/gc/gc_test.go`:
- Around line 839-865: The WatchGCStates watcher may not be registered before
ResignLeader() because SkipLoadingInitial: true gives no client-visible ack; to
make registration observable, change the WatchGCStates call (in this test
surrounding gcStatesStream and gcStatesWatchCtx) to request an initial batch
(set SkipLoadingInitial: false) or perform a deterministic state change (e.g.,
write a dummy GC state change or call the server path that emits a GC state
update) and wait for that initial event via recvGCStateWithTimeout before
calling ResignLeader(); ensure you reference the existing gcStatesStream,
WatchGCStates request, and recvGCStateWithTimeout flow so the test waits for the
watcher to be appended prior to leader resignation.
ℹ️ Review info
⚙️ Run configuration
Configuration used: defaults
Review profile: CHILL
Plan: Pro
Run ID: 834395d8-a28a-4181-8cf1-44901db9d0a9
📒 Files selected for processing (3)
pkg/gc/gc_state_manager.go, pkg/gc/gc_state_manager_test.go, tests/server/gc/gc_test.go
	gcStateListeners []gcStateListener
	// Note that nodeLeadership is a counter instead of a bool. Theoretically, it's possible that an
	// OnNodeBecomesFollower invocation of the previous lease is later than the OnNodeBecomesLeader call of the new
	// lease during PD leader changes. Making this a counter helps in guaranteeing the eventual consistency.
	nodeLeadership atomic.Int32
Only tear down listeners when the leadership count reaches zero.
OnNodeBecomesFollower() closes every watch even when a newer OnNodeBecomesLeader() has already run. With the callback pattern in server/server.go, a leader -> leader -> follower (previous) ordering is realistic, and this code will drop current-lease watchers even though nodeIsLeader() is still true.
Suggested fix
func (m *GCStateManager) OnNodeBecomesFollower() {
	m.mu.Lock()
	defer m.mu.Unlock()
-	m.nodeLeadership.Add(-1)
-	m.cancelAllGCStateListenersLocked()
+	remaining := m.nodeLeadership.Add(-1)
+	if remaining <= 0 {
+		if remaining < 0 {
+			m.nodeLeadership.Store(0)
+		}
+		m.cancelAllGCStateListenersLocked()
+	}
}
Also applies to: 180-187
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@pkg/gc/gc_state_manager.go` around lines 121 - 125, OnNodeBecomesFollower
currently closes all gcStateListeners regardless of a newer leadership
increment; change the logic in both OnNodeBecomesFollower and the corresponding
OnNodeBecomesLeader handling so the listener teardown only runs when the
leadership counter actually reaches zero: in OnNodeBecomesLeader increment
nodeLeadership (atomic.Int32) and set up listeners as before; in
OnNodeBecomesFollower decrement nodeLeadership using nodeLeadership.Add(-1) and
only close/cleanup gcStateListeners if the Add call returns 0. Ensure you
reference and update the methods named OnNodeBecomesFollower and
OnNodeBecomesLeader and the fields gcStateListeners and nodeLeadership so
listeners created by a later leader aren't torn down by an earlier follower
callback.
Signed-off-by: MyonKeminta <MyonKeminta@users.noreply.github.com>
Signed-off-by: MyonKeminta <MyonKeminta@users.noreply.github.com>
Signed-off-by: MyonKeminta <MyonKeminta@users.noreply.github.com>
Actionable comments posted: 2
🧹 Nitpick comments (3)
pkg/core/region.go (1)
256-261: Consider using the accessor method for consistency with surrounding code.
The adjacent lines use protobuf getter methods (GetApproximateKvSize(), GetApproximateColumnarKvSize(), GetReplicationStatus()), but this line directly accesses the field. While both are functionally equivalent here (since h is non-nil after the type assertion), using h.GetCpuUsage() would maintain stylistic consistency.
Suggested change for consistency
if h, ok := heartbeat.(*pdpb.RegionHeartbeatRequest); ok {
	region.approximateKvSize = int64(h.GetApproximateKvSize() / units.MiB)
	region.approximateColumnarKvSize = int64(h.GetApproximateColumnarKvSize() / units.MiB)
	region.replicationStatus = h.GetReplicationStatus()
-	region.cpuUsage = h.CpuUsage
+	region.cpuUsage = h.GetCpuUsage()
}
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@pkg/core/region.go` around lines 256 - 261, The code block handling heartbeat type assertion uses direct field access h.CpuUsage to set region.cpuUsage; change this to use the protobuf accessor h.GetCpuUsage() for consistency with the other getters (h.GetApproximateKvSize(), h.GetApproximateColumnarKvSize(), h.GetReplicationStatus()) so update the assignment to read region.cpuUsage = h.GetCpuUsage().
tests/server/gc/gc_test.go (2)
584-605: These batching tests never force a split.
With only 32 small states/events, the whole snapshot can still fit in one response, so maxBatchSize > 1 only proves coalescing. A regression in gcStateWatchMaxResponseBytes enforcement would still pass here. Consider injecting a smaller cap in tests (or inflating the payload) and asserting that multiple responses are emitted, as sketched below.
Also applies to: 614-635
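One way to make the split observable, as a rough sketch: `setGCBarrierForTest` and the oversized barrier ID are hypothetical, while `collectWatchGCStatesInitial`, `gcStatesStream`, and `expectedKeyspaceIDs` follow the names used around this test.

```go
// Sketch: inflate each keyspace's GC state so the initial snapshot cannot fit into a
// single response, then assert that the server actually split it.
longBarrierID := strings.Repeat("x", 64*1024) // assumed large enough to overflow the cap
for _, keyspaceID := range expectedKeyspaceIDs {
	setGCBarrierForTest(re, grpcPDClient, keyspaceID, longBarrierID) // hypothetical helper
}
responses := collectWatchGCStatesInitial(re, gcStatesStream)
re.Greater(len(responses), 1, "initial snapshot should be split across multiple responses")
```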
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@tests/server/gc/gc_test.go` around lines 584 - 605, The test currently never forces response splitting so it only verifies coalescing; to fix, make the WatchGCStates snapshot exceed gcStateWatchMaxResponseBytes by either temporarily lowering gcStateWatchMaxResponseBytes for the test or inflating per-keyspace payloads created by createGCWatchTestKeyspaces so multiple responses are emitted; update the test around createGCWatchTestKeyspaces / collectWatchGCStatesInitial / grpcPDClient.WatchGCStates to set a smaller gcStateWatchMaxResponseBytes (or add large metadata to each GC state) and then assert that collectWatchGCStatesInitial returns multiple batches (maxBatchSize > 1 and number of responses > 1) and still contains expectedKeyspaceIDs and constant.NullKeyspaceID.
48-67: Let the helper own its cleanup.
If a require fails after tests.NewTestCluster or testutil.MustNewGrpcClient, the caller never reaches its defers, so setup failures can leak the cluster/context/connection and cascade into goleak failures. Please register t.Cleanup inside setupGCWatchTestCluster instead of relying entirely on callers; see the sketch after this comment.
Based on learnings "Applies to **/*.go : Avoid new global state; keep concurrency safe; close resources properly".
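A sketch of the helper-owned cleanup; the constructor and accessor names follow this comment and common PD test helpers, and may not match the test exactly.

```go
// Sketch: register cleanup as each resource is created, so a failed require later
// in setup still releases the context, cluster, and gRPC client.
func setupGCWatchTestCluster(t *testing.T) (*tests.TestCluster, pdpb.PDClient) {
	re := require.New(t)
	ctx, cancel := context.WithCancel(context.Background())
	t.Cleanup(cancel)

	cluster, err := tests.NewTestCluster(ctx, 1)
	re.NoError(err)
	t.Cleanup(cluster.Destroy) // runs even if a later setup step fails

	re.NoError(cluster.RunInitialServers())
	re.NotEmpty(cluster.WaitLeader())
	grpcPDClient := testutil.MustNewGrpcClient(re, cluster.GetLeaderServer().GetAddr())
	return cluster, grpcPDClient
}
```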
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@tests/server/gc/gc_test.go` around lines 48 - 67, The setupGCWatchTestCluster helper currently returns resources but doesn't register cleanup, so failures during setup can leak cluster/context/connection; inside setupGCWatchTestCluster add t.Cleanup that calls cancel(), closes the TestCluster (e.g., cluster.Close() or cluster.Stop()—use the cluster's shutdown method), and closes the gRPC client resources (grpcPDClient.Close() and conn.Close()) to ensure the cluster, context and connections are always cleaned up on test exit even if require.* fails during setup.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In `@server/gc_service.go`:
- Around line 435-483: The current response-size accounting only adds
proto.Size(event) (eventSize) to currentSize and misses the repeated-field
envelope overhead (tag + length prefix), so many small events can make the
encoded WatchGCSafePointV2Response exceed gcStateWatchMaxResponseBytes; fix by
including the repeated-field envelope bytes when computing the delta for each
event (e.g., compute envelopeOverhead := protowire.SizeTag(fieldNumber) +
protowire.SizeVarint(uint64(eventSize)) or use protowire.SizeBytes(fieldNumber,
eventBytes) and add that to eventSize before comparing/adding to currentSize),
update the calculation in the loop that builds currentEvents (referencing
currentSize, eventSize, gcStateWatchMaxResponseBytes, flush, and
WatchGCSafePointV2Response) and make the same change in the analogous
WatchGCStatesResponse loop so flushing decisions account for the full encoded
size.
In `@tools/go.mod`:
- Line 245: Remove the temporary replace directive pointing to
github.com/MyonKeminta/kvproto and restore the upstream module
github.com/pingcap/kvproto once PR `#1440` (the WatchGCStates proto changes) is
merged; if you must merge before that upstream change lands, add a follow-up
task to revert the replace and update go.mod/go.sum prior to any release, and
ensure CI passes with a clean go.mod/go.sum after switching back to
github.com/pingcap/kvproto.
---
Nitpick comments:
In `@pkg/core/region.go`:
- Around line 256-261: The code block handling heartbeat type assertion uses
direct field access h.CpuUsage to set region.cpuUsage; change this to use the
protobuf accessor h.GetCpuUsage() for consistency with the other getters
(h.GetApproximateKvSize(), h.GetApproximateColumnarKvSize(),
h.GetReplicationStatus()) so update the assignment to read region.cpuUsage =
h.GetCpuUsage().
In `@tests/server/gc/gc_test.go`:
- Around line 584-605: The test currently never forces response splitting so it
only verifies coalescing; to fix, make the WatchGCStates snapshot exceed
gcStateWatchMaxResponseBytes by either temporarily lowering
gcStateWatchMaxResponseBytes for the test or inflating per-keyspace payloads
created by createGCWatchTestKeyspaces so multiple responses are emitted; update
the test around createGCWatchTestKeyspaces / collectWatchGCStatesInitial /
grpcPDClient.WatchGCStates to set a smaller gcStateWatchMaxResponseBytes (or add
large metadata to each GC state) and then assert that
collectWatchGCStatesInitial returns multiple batches (maxBatchSize > 1 and
number of responses > 1) and still contains expectedKeyspaceIDs and
constant.NullKeyspaceID.
- Around line 48-67: The setupGCWatchTestCluster helper currently returns
resources but doesn't register cleanup, so failures during setup can leak
cluster/context/connection; inside setupGCWatchTestCluster add t.Cleanup that
calls cancel(), closes the TestCluster (e.g., cluster.Close() or
cluster.Stop()—use the cluster's shutdown method), and closes the gRPC client
resources (grpcPDClient.Close() and conn.Close()) to ensure the cluster, context
and connections are always cleaned up on test exit even if require.* fails
during setup.
ℹ️ Review info
⚙️ Run configuration
Configuration used: defaults
Review profile: CHILL
Plan: Pro
Run ID: 3a044d62-b6fd-402e-8bdc-afc56049d373
⛔ Files ignored due to path filters (3)
client/go.sum is excluded by !**/*.sum
tests/integrations/go.sum is excluded by !**/*.sum
tools/go.sum is excluded by !**/*.sum
📒 Files selected for processing (7)
client/go.mod, pkg/core/region.go, pkg/gc/gc_state_manager_test.go, server/gc_service.go, tests/integrations/go.mod, tests/server/gc/gc_test.go, tools/go.mod
🚧 Files skipped from review as they are similar to previous changes (1)
- pkg/gc/gc_state_manager_test.go
	header := grpcutil.WrapHeader()
	baseResponse := &pdpb.WatchGCSafePointV2Response{Header: header}
	currentEvents := make([]*pdpb.SafePointEvent, 0, len(gcStates))
	currentSize := proto.Size(baseResponse)
	flush := func() error {
		if len(currentEvents) == 0 {
			return nil
		}
		if err := stream.Send(&pdpb.WatchGCSafePointV2Response{
			Header: header,
			Events: currentEvents,
		}); err != nil {
			return err
		}
		currentEvents = currentEvents[:0]
		currentSize = proto.Size(baseResponse)
		return nil
	}

	for _, gcState := range gcStates {
		if gcState.KeyspaceID == constant.NullKeyspaceID {
			continue
		}
		if previous, loaded := lastSafePoints[gcState.KeyspaceID]; loaded && previous == gcState.GCSafePoint {
			continue
		}
		lastSafePoints[gcState.KeyspaceID] = gcState.GCSafePoint

		event := &pdpb.SafePointEvent{
			KeyspaceId: gcState.KeyspaceID,
			SafePoint:  gcState.GCSafePoint,
			Type:       pdpb.EventType_PUT,
		}
		eventSize := proto.Size(event)
		if currentSize+eventSize > gcStateWatchMaxResponseBytes && len(currentEvents) > 0 {
			if err := flush(); err != nil {
				return err
			}
		}
		if currentSize+eventSize > gcStateWatchMaxResponseBytes {
			log.Warn(
				"single WatchGCSafePointV2 event exceeds response size limit",
				zap.Uint32("keyspace-id", gcState.KeyspaceID),
				zap.Int("response-size", currentSize+eventSize),
				zap.Int("size-limit", gcStateWatchMaxResponseBytes),
			)
		}
		currentEvents = append(currentEvents, event)
		currentSize += eventSize
The response-size accounting underestimates the encoded size.
Both batching loops add only proto.Size(event/state) to currentSize. For repeated embedded messages, that misses the enclosing field tag and length prefix, so the encoded WatchGCSafePointV2Response / WatchGCStatesResponse can exceed gcStateWatchMaxResponseBytes when many small items are packed together. Please size the full response (or include the repeated-field envelope) before deciding to flush.
Also applies to: 510-546
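For reference, a small self-contained helper showing the kind of accounting this comment asks for. The one-byte tag assumes the repeated field number is below 16, which is an assumption rather than something read from the pdpb definition.

```go
// encodedRepeatedMessageSize returns how many bytes a repeated embedded message of
// payloadSize bytes adds to the enclosing response: one tag byte (field number < 16),
// a varint length prefix, and the payload itself.
func encodedRepeatedMessageSize(payloadSize int) int {
	varintLen := 1
	for v := uint64(payloadSize); v >= 0x80; v >>= 7 {
		varintLen++
	}
	return 1 + varintLen + payloadSize
}

// Usage in the batching loop would then be, roughly:
//   eventSize := encodedRepeatedMessageSize(proto.Size(event))
// before comparing against gcStateWatchMaxResponseBytes.
```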
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@server/gc_service.go` around lines 435 - 483, The current response-size
accounting only adds proto.Size(event) (eventSize) to currentSize and misses the
repeated-field envelope overhead (tag + length prefix), so many small events can
make the encoded WatchGCSafePointV2Response exceed gcStateWatchMaxResponseBytes;
fix by including the repeated-field envelope bytes when computing the delta for
each event (e.g., compute envelopeOverhead := protowire.SizeTag(fieldNumber) +
protowire.SizeVarint(uint64(eventSize)) or use protowire.SizeBytes(fieldNumber,
eventBytes) and add that to eventSize before comparing/adding to currentSize),
update the calculation in the loop that builds currentEvents (referencing
currentSize, eventSize, gcStateWatchMaxResponseBytes, flush, and
WatchGCSafePointV2Response) and make the same change in the analogous
WatchGCStatesResponse loop so flushing decisions account for the full encoded
size.
// TODO: fix those flasky tests introduced by the behavior change of `Eventually` and `EventuallyWithT` assertions.
replace github.com/stretchr/testify => github.com/stretchr/testify v1.10.0

replace github.com/pingcap/kvproto => github.com/MyonKeminta/kvproto v0.0.0-20260326071414-bdabcb33ab4c
🧩 Analysis chain
🌐 Web query:
Has https://github.com/pingcap/kvproto/pull/1440 been merged? If yes, what is the first official github.com/pingcap/kvproto version/commit that contains the WatchGCStates-related proto changes?
💡 Result:
No, pingcap/kvproto#1440 has not been merged. It remains open as of the latest available data (opened Mar 26, 2026, and listed under open pull requests). Therefore, there is no official GitHub version or commit containing the WatchGCStates-related proto changes.
Citations:
- 1: https://github.com/pingcap/kvproto/pulls
- 2: https://github.com/pingcap/kvproto
- 3: https://github.com/pingcap/kvproto/blob/master/proto/kvrpcpb.proto
Replace github.com/MyonKeminta/kvproto with upstream before merging to master.
The personal-fork replace directive on line 245 introduces a supply-chain and reproducibility risk if merged to master. PR #1440 (which contains the required WatchGCStates proto changes) remains unmerged as of now. Once it lands, immediately update the replace to use the official github.com/pingcap/kvproto version. If merging before that happens, add an explicit removal or follow-up before any release cut. Ensure go.mod and go.sum remain clean for CI.
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@tools/go.mod` at line 245, Remove the temporary replace directive pointing to
github.com/MyonKeminta/kvproto and restore the upstream module
github.com/pingcap/kvproto once PR `#1440` (the WatchGCStates proto changes) is
merged; if you must merge before that upstream change lands, add a follow-up
task to revert the replace and update go.mod/go.sum prior to any release, and
ensure CI passes with a clean go.mod/go.sum after switching back to
github.com/pingcap/kvproto.
	defer m.mu.Unlock()

	m.nodeLeadership.Add(-1)
	m.cancelAllGCStateListenersLocked()
I don't think the leadership counter is buying us much yet. If lease A's follower callback runs after lease B has already called OnNodeBecomesLeader(), we still decrement here and then close every watcher unconditionally. That means a stale callback from the old lease can tear down watches that belong to the current lease. Should this only cancel listeners when the counter drops to 0?
I don't think the leadership counter is buying us much yet.
The counter is for solving the problem:
- Becomes leader of lease 1 (marked as leader)
- Becomes follower
- Becomes leader of lease 3 (marked as leader)
Receives the message of losing leadership of lease 1 (marked as follower, but it's actually the leader of lease 3)
And the incorrect state may be kept for a long time.
Should this only cancel listeners when the counter drops to 0?
This may cause streams that were created during the leadership of lease 1 to still be kept alive in lease 3. If there was any update during lease 2, in which the other PD node is the leader, the stream may lose that event and the client will never know it. Considering the possibility of a low GC run interval, a stale GC state may be held for a long time, causing ineffective GC.
That means a stale callback from the old lease can tear down watches that belong to the current lease.
To solve the problem, the proper solution is to distinguish listeners that were created in different leases. But I think it's questionable how likely this case is and how unacceptable the overhead of recreating these streams would be.
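Purely to illustrate the "distinguish listeners by lease" idea mentioned above (none of these names come from the PR): each listener records the leadership generation it was registered under, so a late follower callback from an older lease cannot tear down watchers that belong to the current lease.

```go
type leaseListener struct {
	gen int64 // leadership generation at registration time
	ch  chan struct{}
}

type watchRegistry struct {
	mu        sync.Mutex
	gen       int64 // bumped on every OnNodeBecomesLeader
	listeners []leaseListener
}

func (r *watchRegistry) onBecomeLeader() int64 {
	r.mu.Lock()
	defer r.mu.Unlock()
	r.gen++
	return r.gen // the lease remembers its own generation for the later follower callback
}

func (r *watchRegistry) onBecomeFollower(leaseGen int64) {
	r.mu.Lock()
	defer r.mu.Unlock()
	kept := r.listeners[:0]
	for _, l := range r.listeners {
		if l.gen > leaseGen {
			kept = append(kept, l) // registered under a newer lease; keep it alive
			continue
		}
		close(l.ch)
	}
	r.listeners = kept
}
```

Whether this extra bookkeeping is worth it over simply recreating the streams is exactly the trade-off discussed above.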
	// In the gRPC path, leadership loss closes listener.ch first. The consume loop then returns, the
	// handler's deferred cancel() fires, and this wait is released so the watch goroutine can exit and
	// run the deferred deregistration.
	<-ctx.Done()
One lifecycle gap here: closing listener.ch does not stop this goroutine. runGCStateWatch() only waits on ctx.Done(), while removeGCStateListenerLocked() and cancelAllGCStateListenersLocked() close the channel without canceling the context. The new gRPC path happens to defer cancel(), so it eventually unwinds there, but the GCStateManager API itself still leaks until the caller cancels. I think this loop should also exit when the listener is removed or the channel is closed.
consumeGCStateWatch should exit when the channel is closed, and the deferred cancel should be able to be called. Does that make sense?
I don't like writing it this way personally, but I just accepted codex's suggestion. It seems it's not robust enough to rely on the external cancellation.
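A minimal sketch of the consume-loop shape described here: it returns as soon as the listener channel is closed, which lets the handler's deferred cancel() fire and release the watch goroutine. The function shape and names (`GCStateBatch`, `listener.ch`, the send callback) are assumptions.

```go
func consumeGCStateWatch(ctx context.Context, listener gcStateListener, send func(GCStateBatch) error) error {
	for {
		select {
		case <-ctx.Done():
			return ctx.Err()
		case batch, ok := <-listener.ch:
			if !ok {
				// Closed by removeGCStateListenerLocked / cancelAllGCStateListenersLocked
				// on deregistration or leadership loss.
				return status.Error(codes.Unavailable, "GC state watch closed")
			}
			if err := send(batch); err != nil {
				return err
			}
		}
	}
}
```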
Signed-off-by: MyonKeminta <MyonKeminta@users.noreply.github.com>
…de gc state manager; distinguish not leader error
Signed-off-by: MyonKeminta <MyonKeminta@users.noreply.github.com>
Actionable comments posted: 1
♻️ Duplicate comments (1)
server/gc_service.go (1)
433-486: ⚠️ Potential issue | 🟠 Major
Batch size checks still undercount the encoded protobuf.
Both watch loops add only proto.Size(event/state) to currentSize, but each repeated item also carries its own field tag and length prefix in the enclosing response. That means many small updates can still push WatchGCSafePointV2Response or WatchGCStatesResponse past gcStateWatchMaxResponseBytes even when the guard passes.
Also applies to: 508-546
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@server/gc_service.go` around lines 433 - 486, The batch-size accounting currently only adds proto.Size(event) to currentSize and ignores the per-item protobuf overhead (the repeated field tag + varint length prefix), so large batches can still exceed gcStateWatchMaxResponseBytes; update the size calculation in consumeGCStateWatch's WatchGCSafePointV2 loop (symbols: lastSafePoints, WatchGCSafePointV2Response, SafePointEvent, currentSize, proto.Size, gcStateWatchMaxResponseBytes, flush) to include the encoded overhead for each repeated event (i.e. add the field tag byte(s) plus the varint length prefix for eventSize — compute length prefix via protobuf varint size of eventSize — when testing currentSize+eventTotalSize against the limit and when incrementing currentSize), and apply the same fix to the analogous WatchGCStatesResponse loop (the 508-546 region) so both watchers account for the repeated-field encoding overhead.
🧹 Nitpick comments (1)
tests/server/gc/gc_test.go (1)
763-764: Replace the fixed 2s sleep with a condition-based wait.
This makes the suite slower and still leaves the test timing-sensitive under loaded CI. Polling until the short-lived barrier disappears, or until the expected watch event arrives, would make this deterministic.
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@tests/server/gc/gc_test.go` around lines 763 - 764, Replace the fixed time.Sleep(2 * time.Second) before calling grpcPDClient.AdvanceTxnSafePoint with a condition-based wait: poll (with a short ticker and an overall timeout) until the short-lived barrier/watch event the test is waiting for has disappeared or the expected watch event has been observed, then proceed to call AdvanceTxnSafePoint(ctx, &pdpb.AdvanceTxnSafePointRequest{...}); use the existing context ctx and ensure the poll loop returns a clear failure if the timeout elapses so the test fails deterministically rather than being timing-sensitive.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In `@pkg/gc/gc_state_manager.go`:
- Around line 277-288: The race happens because sendGCStateToListenerLocked can
enqueue updates after the caller canceled the watch; to fix, make
sendGCStateToListenerLocked check the listener's context before sending and bail
out if listener.watch.Done() is closed or listener.watch.Err() != nil.
Specifically, update sendGCStateToListenerLocked to test listener.watch.Done()
(or listener.watch.Err()) at the top and again before enqueueing, returning
early if canceled, so runGCStateWatch can continue to deregister safely without
observing queued updates.
---
Duplicate comments:
In `@server/gc_service.go`:
- Around line 433-486: The batch-size accounting currently only adds
proto.Size(event) to currentSize and ignores the per-item protobuf overhead (the
repeated field tag + varint length prefix), so large batches can still exceed
gcStateWatchMaxResponseBytes; update the size calculation in
consumeGCStateWatch's WatchGCSafePointV2 loop (symbols: lastSafePoints,
WatchGCSafePointV2Response, SafePointEvent, currentSize, proto.Size,
gcStateWatchMaxResponseBytes, flush) to include the encoded overhead for each
repeated event (i.e. add the field tag byte(s) plus the varint length prefix for
eventSize — compute length prefix via protobuf varint size of eventSize — when
testing currentSize+eventTotalSize against the limit and when incrementing
currentSize), and apply the same fix to the analogous WatchGCStatesResponse loop
(the 508-546 region) so both watchers account for the repeated-field encoding
overhead.
---
Nitpick comments:
In `@tests/server/gc/gc_test.go`:
- Around line 763-764: Replace the fixed time.Sleep(2 * time.Second) before
calling grpcPDClient.AdvanceTxnSafePoint with a condition-based wait: poll (with
a short ticker and an overall timeout) until the short-lived barrier/watch event
the test is waiting for has disappeared or the expected watch event has been
observed, then proceed to call AdvanceTxnSafePoint(ctx,
&pdpb.AdvanceTxnSafePointRequest{...}); use the existing context ctx and ensure
the poll loop returns a clear failure if the timeout elapses so the test fails
deterministically rather than being timing-sensitive.
ℹ️ Review info
⚙️ Run configuration
Configuration used: defaults
Review profile: CHILL
Plan: Pro
Run ID: 43776c6d-9037-42f5-a8c5-c763219e51d3
📒 Files selected for processing (4)
pkg/gc/gc_state_manager.go, pkg/gc/gc_state_manager_test.go, server/gc_service.go, tests/server/gc/gc_test.go
	m.gcStateListeners = append(m.gcStateListeners, listener)
	m.mu.Unlock()

	go m.runGCStateWatch(listener, skipLoadingInitial)
Will the GC state fall back if the state changed before the watch started?
Signed-off-by: MyonKeminta <MyonKeminta@users.noreply.github.com>
	return watch, nil
}

func (m *GCStateManager) runGCStateWatch(listener gcStateListener, skipLoadingInitial bool) {
Do we need to log panics or use a wait group/context to manage the lifetime?
Sorry I didn't get it. What problem does this solve?
We need a log if a goroutine panics.
Added recover and a log here.
func (m *GCStateManager) runGCStateWatch(listener gcStateListener, skipLoadingInitial bool) {
	defer m.deregisterGCStateListener(listener, listener.watch.Err())

	if !skipLoadingInitial {
Please also add a comment for skipLoadingInitial about when it will become true and false.
Added to comments of WatchGCStates
Signed-off-by: MyonKeminta <MyonKeminta@users.noreply.github.com>
Signed-off-by: MyonKeminta <MyonKeminta@users.noreply.github.com>
Signed-off-by: MyonKeminta <MyonKeminta@users.noreply.github.com>
Signed-off-by: MyonKeminta <MyonKeminta@users.noreply.github.com>
Signed-off-by: MyonKeminta <MyonKeminta@users.noreply.github.com>
Signed-off-by: MyonKeminta <MyonKeminta@users.noreply.github.com>
	timer := time.NewTimer(gcStateListenerSendTimeout)
	defer timer.Stop()

	select {
If the request is canceled, do we need to wait for the timeout?
listener.watcher.ctx is missing in the select block. Added.
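The shape being discussed, roughly; field names such as `listener.watcher.Done()` follow the comments in this thread and are not verified.

```go
// Sketch: the send gives up on a slow consumer after gcStateListenerSendTimeout,
// but also returns immediately if the watcher itself was already canceled.
timer := time.NewTimer(gcStateListenerSendTimeout)
defer timer.Stop()
select {
case listener.ch <- update:
case <-timer.C:
	m.removeGCStateListenerLocked(listener) // slow consumer: drop it
case <-listener.watcher.Done():
	// The request was canceled; no need to sit out the full send timeout.
}
```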
		m.sendGCStateToListenerLocked(listener, gcState, false)
	})
	if err != nil {
		log.Warn("failed to load initial GC states for GC state watcher", zap.Error(err))
If an error happens, will it keep waiting until <-listener.watcher.Done()?
It's a mistake to ignore the error. Changed to closing the watcher.
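The fix described reads roughly like this; the deregistration call follows `deregisterGCStateListener` seen elsewhere in this PR, and its exact signature is assumed.

```go
// Sketch: on failure to load the initial states, tear the watch down with the error
// instead of silently waiting for the watcher's context to be done.
if err != nil {
	log.Warn("failed to load initial GC states for GC state watcher", zap.Error(err))
	m.deregisterGCStateListener(listener, err)
	return
}
```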
Signed-off-by: MyonKeminta <MyonKeminta@users.noreply.github.com>
Signed-off-by: MyonKeminta <MyonKeminta@users.noreply.github.com>
Signed-off-by: MyonKeminta <MyonKeminta@users.noreply.github.com>
Codecov Report
❌ Patch coverage is
Additional details and impacted files
@@ Coverage Diff @@
## master #10498 +/- ##
==========================================
- Coverage 78.96% 78.91% -0.06%
==========================================
Files 532 532
Lines 71883 72364 +481
==========================================
+ Hits 56766 57109 +343
- Misses 11093 11199 +106
- Partials 4024 4056 +32
Flags with carried forward coverage won't be shown.
Signed-off-by: MyonKeminta <MyonKeminta@users.noreply.github.com>
@MyonKeminta: The following tests failed, say
Full PR test history. Your PR dashboard.
Details: Instructions for interacting with me using PR comments are available here. If you have questions or suggestions related to my behavior, please file an issue against the kubernetes-sigs/prow repository. I understand the commands that are listed here.
CI failed
What problem does this PR solve?
Issue Number: Close #10497, Ref #8978
What is changed and how does it work?
Requires:
Check List
Tests
Code changes
Side effects
Related changes
pingcap/docs / pingcap/docs-cn:
pingcap/tiup:
Release note
Summary by CodeRabbit
New Features
Tests
Chores