eventservice: optimize scan window #4950
Conversation
Signed-off-by: dongmen <414110582@qq.com>
Skipping CI for Draft Pull Request.

[APPROVALNOTIFIER] This PR is NOT APPROVED.

This pull-request has been approved by:
The full list of commands accepted by this bot can be found here.

Details: Needs approval from an approver in each of these files.
Approvers can indicate their approval by writing
📝 Walkthrough

Replaces trend-based scan-interval tuning with an adaptive scan-window controller: reports normalized usage, maintains fast/slow EMAs and a pressure score, applies tiered reductions and critical/emergency braking with cooldowns, gates conservative recovery, and exposes per-changefeed Prometheus metrics; broker metric lifecycle and tests are updated.

Changes: Adaptive Scan Window Controller
🎯 Review effort: 4 (Complex) | ⏱️ ~45 minutes

Sequence Diagram(s)

sequenceDiagram
participant UpdateMemoryUsage
participant AdaptiveScanWindowController
participant EventServiceMetrics
participant ChangefeedStatus
UpdateMemoryUsage->>AdaptiveScanWindowController: OnCongestionReport(normalizedUsage, releaseCount)
AdaptiveScanWindowController-->>AdaptiveScanWindowController: update EMAs, compute pressure score, decide interval
AdaptiveScanWindowController->>EventServiceMetrics: emit usage/EMA/pressure/adjust metrics
AdaptiveScanWindowController->>ChangefeedStatus: decision(scanInterval)
ChangefeedStatus->>ChangefeedStatus: persist new scanInterval if changed
🚥 Pre-merge checks: ✅ 3 passed | ❌ 2 failed

❌ Failed checks (2 warnings)
✅ Passed checks (3 passed)
Code Review
This pull request replaces the existing scan interval adjustment logic with a new adaptiveScanWindowController that utilizes Exponential Moving Averages (EMAs) and a pressure score for more stable memory pressure management. The update includes comprehensive simulation tests and enhanced Prometheus metrics for monitoring controller decisions. Review feedback highlights a non-monotonic discontinuity in the emergency brake calculation, potential over-throttling caused by latching peak usage values, the presence of magic numbers, and the use of a redundant maxFloat64 helper that should be replaced by the built-in max function.
func scanWindowEmergencyBrakeInterval(current time.Duration) time.Duration {
	if current <= 6*defaultScanInterval {
		return max(current/2, defaultScanInterval)
	}
	return max(current/4, minScanInterval)
}
The logic in scanWindowEmergencyBrakeInterval has two significant issues:

- Discontinuity: There is a sharp jump at the boundary of 6*defaultScanInterval (30s). For an input of 30s, it returns 15s (current/2), but for 30.1s, it returns ~7.5s (current/4). This non-monotonic behavior can cause unstable oscillations in the scan interval.
- Unreachable Minimum: The function floors at defaultScanInterval (5s) for any current <= 30s. This makes the minScanInterval (1s) constant effectively unreachable during emergency pressure (98%+ usage) if the interval has already been reduced to a moderate level. If the goal is to allow the interval to drop to 1s under extreme congestion, the floor should be minScanInterval in both branches.
There was a problem hiding this comment.
🧹 Nitpick comments (4)
pkg/eventservice/scan_window.go (4)
425-549: 💤 Low value: Consider factoring the repeated scanWindowDecision{...} literal.

OnCongestionReport returns the same scanWindowDecision shape from six branches, each repeating usage, fastUsageEMA, slowUsageEMA, pressureScore. A small builder method on the controller would let the function body focus on policy and reduce the surface for accidental field drift if a new metric/field is added later.

♻️ Sketch

func (c *adaptiveScanWindowController) makeDecisionLocked(
	newInterval, maxInterval time.Duration,
	reason scanWindowDecisionReason,
	usage memoryUsageStats,
) scanWindowDecision {
	return scanWindowDecision{
		newInterval:   newInterval,
		maxInterval:   maxInterval,
		reason:        reason,
		usage:         usage,
		fastUsageEMA:  c.fastUsageEMA,
		slowUsageEMA:  c.slowUsageEMA,
		pressureScore: c.pressureScore,
	}
}

Each branch then becomes return c.makeDecisionLocked(newInterval, maxInterval, reason, usage).

🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In `@pkg/eventservice/scan_window.go` around lines 425 - 549, The OnCongestionReport function repeats identical scanWindowDecision literals across multiple return sites; add a helper method on adaptiveScanWindowController (e.g., makeDecisionLocked(newInterval time.Duration, maxInterval time.Duration, reason scanWindowDecisionReason, usage memoryUsageStats) scanWindowDecision) that constructs and returns the scanWindowDecision using c.fastUsageEMA, c.slowUsageEMA, c.pressureScore and the supplied usage, newInterval, maxInterval, reason; then replace each repeated literal in OnCongestionReport with calls to c.makeDecisionLocked(...) (keep current names: OnCongestionReport, scanWindowDecision, usage, fastUsageEMA, slowUsageEMA, pressureScore).
388-406: ⚡ Quick win: Race on band state can double-count target-band crossings.

observeScanWindowTargetBandMetrics reads state.Load(), compares to currentState, then state.Store(...). Because handleCongestionControl can be invoked concurrently from different from nodes for the same changefeed (each call iterates changefeedMap.Range), two goroutines can both observe the same previousState, both increment EventServiceScanWindowTargetBandCrossCount, and both store, over-counting transitions for the same actual crossing.

Use Swap to read-and-replace atomically so only one caller observes each prior state:

🔒 Proposed diff

- previousState := scanWindowBandState(state.Load())
- if previousState != scanWindowBandUnknown && previousState != currentState {
-     metrics.EventServiceScanWindowTargetBandCrossCount.WithLabelValues(changefeed, metricType).Inc()
- }
- state.Store(int32(currentState))
+ previousState := scanWindowBandState(state.Swap(int32(currentState)))
+ if previousState != scanWindowBandUnknown && previousState != currentState {
+     metrics.EventServiceScanWindowTargetBandCrossCount.WithLabelValues(changefeed, metricType).Inc()
+ }

🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In `@pkg/eventservice/scan_window.go` around lines 388 - 406, The observeScanWindowTargetBandMetrics function currently uses state.Load() and state.Store(), which allows concurrent callers (e.g., handleCongestionControl) to both see the same previous state and double-count transitions; replace the Load/Store pair with an atomic swap so the read-and-replace is atomic: call state.Swap(int32(currentState)) (convert the returned int32 to scanWindowBandState) to obtain the previousState, then, if previousState != scanWindowBandUnknown && previousState != currentState, increment EventServiceScanWindowTargetBandCrossCount; keep the existing gauge Set(1)/Set(0) behavior and only change how previousState is read/stored.
321-362: 💤 Low value: Minor: deleteScanWindowMetrics also clears EventServiceAvailableMemoryQuotaGaugeVec.

That metric is not part of the scan-window family (registered separately, set in handleCongestionControl), but its lifecycle is naturally tied to the changefeed. Functionally fine; just be aware the helper name slightly under-promises what it deletes. If you keep this coupling, consider a short comment explaining why the available-memory-quota label is wiped here.
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In `@pkg/eventservice/scan_window.go` around lines 321 - 362, deleteScanWindowMetrics currently deletes EventServiceAvailableMemoryQuotaGaugeVec even though that metric is not part of the scan-window family; either move that deletion to the metric's owner (e.g. where handleCongestionControl manages the metric) or keep it here but add a short explanatory comment. Update the function deleteScanWindowMetrics to either remove the line metrics.EventServiceAvailableMemoryQuotaGaugeVec.DeleteLabelValues(changefeed) and place it in the lifecycle code that sets/clears available memory quota, or add a one-line comment above that DeleteLabelValues call explaining why the available-memory-quota label is cleared here (tie to changefeed lifecycle), referencing deleteScanWindowMetrics and handleCongestionControl so future readers can find the rationale.
796-801: ⚡ Quick win: Drop maxFloat64 in favor of Go's built-in max.

This file already uses the Go 1.21+ built-in min/max for float64 elsewhere (e.g., min(c.pressureScore+2, scanWindowPressureScoreCeiling) at line 673), so maxFloat64 is inconsistent and unnecessary. Replacing all three call sites with the builtin removes one ad-hoc helper and matches the surrounding style.

♻️ Proposed diff

-func maxFloat64(a float64, b float64) float64 {
-	if a > b {
-		return a
-	}
-	return b
-}

Then at the call sites (lines 679, 681, 687):

- c.pressureScore = maxFloat64(0, c.pressureScore-1.5)
+ c.pressureScore = max(0, c.pressureScore-1.5)
...
- c.pressureScore = maxFloat64(0, c.pressureScore-0.5)
+ c.pressureScore = max(0, c.pressureScore-0.5)
...
- c.pressureScore = maxFloat64(0, c.pressureScore-relief)
+ c.pressureScore = max(0, c.pressureScore-relief)

🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In `@pkg/eventservice/scan_window.go` around lines 796 - 801, Remove the ad-hoc helper maxFloat64 and replace its call sites with the Go 1.21 built-in max: find all uses of maxFloat64(a, b) and change them to max(a, b), then delete the maxFloat64 function definition; no extra imports are needed—just remove the function maxFloat64 and update callers to use max directly.
ℹ️ Review info
⚙️ Run configuration
Configuration used: Organization UI
Review profile: CHILL
Plan: Pro
Run ID: 4190f76c-a953-4e4d-887a-59bfaeef4a9c
📒 Files selected for processing (6)
pkg/eventservice/dispatcher_stat.go
pkg/eventservice/event_broker.go
pkg/eventservice/event_broker_test.go
pkg/eventservice/scan_window.go
pkg/eventservice/scan_window_test.go
pkg/metrics/event_service.go
/test all
[FORMAT CHECKER NOTIFICATION] Notice: To remove the
📖 For more info, you can check the "Contribute Code" section in the development guide.
Actionable comments posted: 1
🧹 Nitpick comments (1)
pkg/eventservice/scan_window_test.go (1)
179-181: ⚡ Quick win: Replace the hardcoded 30 with a duration-derived bound.

Line 179 bakes in a window-size assumption. Deriving the loop count from memoryUsageWindowDuration makes this test resilient to future constant tuning.

Suggested change

- for i := 0; i <= 30; i++ {
+ for i := 0; i <= int(memoryUsageWindowDuration/time.Second); i++ {
      status.updateMemoryUsage(start.Add(time.Duration(i)*time.Second), 1, 0)
  }
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In `@pkg/eventservice/scan_window_test.go` around lines 179 - 181, The test hardcodes 30 iterations when calling status.updateMemoryUsage, which couples it to a specific window size; change the loop bound to derive from memoryUsageWindowDuration (e.g., compute n := int(memoryUsageWindowDuration / time.Second) and use i := 0; i <= n; i++) so the test scales with the actual memoryUsageWindowDuration constant; update the loop surrounding status.updateMemoryUsage to use that computed n instead of the literal 30.
ℹ️ Review info
⚙️ Run configuration
Configuration used: Organization UI
Review profile: CHILL
Plan: Pro
Run ID: de3ec83f-f0f9-438c-87dc-c06631609797
📒 Files selected for processing (2)
pkg/eventservice/scan_window.go
pkg/eventservice/scan_window_test.go
🚧 Files skipped from review as they are similar to previous changes (1)
- pkg/eventservice/scan_window.go
status := newChangefeedStatus(common.NewChangefeedID4Test("default", "test"), 10*time.Minute)
Use unique changefeed IDs in parallel tests to avoid shared metric state
These tests run with t.Parallel() and currently reuse "default", "test". Since updateMemoryUsage emits global metrics keyed by changefeed label, this can create hidden cross-test coupling and flakiness. Prefer t.Name() (as done in the metric tests) for isolation.
Suggested change
- status := newChangefeedStatus(common.NewChangefeedID4Test("default", "test"), 10*time.Minute)
+ status := newChangefeedStatus(common.NewChangefeedID4Test("default", t.Name()), 10*time.Minute)

As per coding guidelines, "Unit tests should use *_test.go file naming convention and favor deterministic tests using testify/require."
Also applies to: 89-90, 134-135, 157-158, 166-167, 175-176, 189-190
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
In `@pkg/eventservice/scan_window_test.go` around lines 74 - 75, Replace hardcoded
changefeed IDs created by
newChangefeedStatus(common.NewChangefeedID4Test("default", "test"), ...) with a
unique per-test ID using t.Name() (e.g.,
newChangefeedStatus(common.NewChangefeedID4Test("default", t.Name()), ...)) for
all occurrences (including the instances around lines 89-90, 134-135, 157-158,
166-167, 175-176, 189-190) so updateMemoryUsage emits metrics keyed to a
test-unique label; also make the tests deterministic by using testify/require
assertions where applicable instead of non-deterministic checks.
What problem does this PR solve?
Issue Number: close #xxx
The old scan window controller overreacted to memory feedback. A release pulse could reset the window, then later reports would shrink it again. Because it is a per-changefeed commit-ts span cap rather than a timer, this created sawtooth batching. Stale dispatchers could also pin the base ts and delay tables blocked by pending DDL.
What is changed and how it works?
This PR replaces the old policy with an adaptive controller while keeping scans event-driven. It combines a sliding usage window, EMAs, a pressure score, and cooldowns. High or critical pressure reduces the window in bounded steps. Low pressure recovers gradually and can leave the default floor faster. Release signals now relieve pressure instead of resetting the interval. The broker also skips stale dispatchers in the minSentTs refresh and allows local advance for pending DDL when the global window is pinned.

This smooths scan progress, reduces reset storms, and preserves DDL forward progress.
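As a rough illustration of the EMA-plus-pressure-score idea, the sketch below folds normalized usage samples into fast and slow EMAs and nudges a bounded pressure score. All constants and thresholds here are invented for the sketch and do not match the real scan_window.go tuning:

```go
package main

import "fmt"

// Illustrative constants; the real values live in pkg/eventservice/scan_window.go.
const (
	fastAlpha = 0.5 // fast EMA reacts quickly to usage spikes
	slowAlpha = 0.1 // slow EMA tracks the sustained trend
)

type controller struct {
	fastEMA, slowEMA, pressure float64
}

// report folds one normalized usage sample (0..1) into both EMAs, then moves
// the pressure score up when the fast EMA runs hot and down when it cools.
// Relief is gradual (a fixed decrement), never a hard reset of the interval.
func (c *controller) report(usage float64) {
	c.fastEMA = fastAlpha*usage + (1-fastAlpha)*c.fastEMA
	c.slowEMA = slowAlpha*usage + (1-slowAlpha)*c.slowEMA
	switch {
	case c.fastEMA > 0.9:
		c.pressure = min(c.pressure+2, 10) // ceiling keeps later recovery bounded
	case c.fastEMA > 0.7:
		c.pressure = min(c.pressure+1, 10)
	default:
		c.pressure = max(0, c.pressure-0.5) // release signals relieve, not reset
	}
}

func main() {
	c := &controller{}
	for i := 0; i < 5; i++ {
		c.report(0.95) // sustained high usage ramps pressure up
	}
	fmt.Printf("pressure after spikes: %.1f\n", c.pressure)
	for i := 0; i < 5; i++ {
		c.report(0.1) // usage drains; pressure decays step by step
	}
	fmt.Printf("pressure after relief: %.1f\n", c.pressure)
}
```

The actual controller maps this score to scan-window adjustments through tiers, cooldowns, and the braking paths described above; the sketch only shows why the score ramps and decays smoothly instead of sawtoothing.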
Before
After
Check List
Tests
Questions
Will it cause performance regression or break compatibility?
No compatibility break is expected. This changes only scan window control behavior and should stabilize throughput and memory pressure.
Do you need to update user documentation, design documentation or monitoring documentation?
Release note