logpuller: modify region request worker #5020
📝 Walkthrough
Consolidates worker initialization by reading the pending queue size once and preallocating workers, reuses a precomputed subscription ID when dispatching region events, and simplifies request fetching by calling requestCache.pop directly.
Changes
Region Request Handling Optimization
Code Review
This pull request refactors the region request worker and subscription client to simplify request handling and optimize store initialization. In region_request_worker.go, the code now directly accesses the request cache and uses a local variable for subscription IDs. In subscription_client.go, the initialization of requestedStore was modified to pre-allocate worker slices and consolidate locking. Feedback indicates a potential division-by-zero risk if worker counts are zero, a race condition caused by publishing the store before workers are initialized, and a suggestion to fetch configuration dynamically within the closure to support runtime changes.
| rs = &requestedStore{storeAddr: storeAddr} | ||
| rs.requestWorkers.s = make([]*regionRequestWorker, 0, s.config.RegionRequestWorkerPerStore) | ||
| s.stores.Store(storeAddr, rs) | ||
|
|
||
| config := config.GetGlobalServerConfig() | ||
| perWorkerQueueSize := config.Debug.Puller.PendingRegionRequestQueueSize / int(s.config.RegionRequestWorkerPerStore) | ||
| perWorkerQueueSize := pendingRegionRequestQueueSize / int(s.config.RegionRequestWorkerPerStore) | ||
| if perWorkerQueueSize <= 0 { | ||
| log.Warn("pending region request queue size is smaller than the number of workers, adjust per worker queue size to 1", zap.Int("pendingRegionRequestQueueSize", config.Debug.Puller.PendingRegionRequestQueueSize), zap.Uint("regionRequestWorkerPerStore", s.config.RegionRequestWorkerPerStore)) | ||
| perWorkerQueueSize = 1 | ||
| } | ||
|
|
||
| rs.requestWorkers.Lock() | ||
| for i := uint(0); i < s.config.RegionRequestWorkerPerStore; i++ { | ||
| requestWorker := newRegionRequestWorker(ctx, s, s.credential, eg, rs, perWorkerQueueSize) | ||
| rs.requestWorkers.Lock() | ||
| rs.requestWorkers.s = append(rs.requestWorkers.s, requestWorker) | ||
| rs.requestWorkers.Unlock() | ||
| } | ||
| rs.requestWorkers.Unlock() | ||
| return rs |
This initialization logic has several issues:
- Division by zero risk: If RegionRequestWorkerPerStore is 0, the calculation of perWorkerQueueSize will panic.
- Publication order: The store is published to s.stores before its workers are initialized, allowing concurrent goroutines (like updateMetrics) to see an empty worker list.
- Unnecessary locking: Holding the lock while initializing workers is unnecessary if the store is not yet published to the global map.
Suggested improvement: Fetch configuration locally, handle the zero worker count case, and publish the store only after it is fully ready.
config := config.GetGlobalServerConfig()
pendingRegionRequestQueueSize := config.Debug.Puller.PendingRegionRequestQueueSize
workerCount := int(s.config.RegionRequestWorkerPerStore)
if workerCount <= 0 {
	log.Warn("RegionRequestWorkerPerStore is 0, adjust to 1", zap.String("storeAddr", storeAddr))
	workerCount = 1
}
rs = &requestedStore{storeAddr: storeAddr}
rs.requestWorkers.s = make([]*regionRequestWorker, 0, workerCount)
perWorkerQueueSize := pendingRegionRequestQueueSize / workerCount
if perWorkerQueueSize <= 0 {
	log.Warn("pending region request queue size is smaller than the number of workers, adjust per worker queue size to 1",
		zap.Int("pendingRegionRequestQueueSize", pendingRegionRequestQueueSize),
		zap.Int("workerCount", workerCount))
	perWorkerQueueSize = 1
}
for i := 0; i < workerCount; i++ {
	requestWorker := newRegionRequestWorker(ctx, s, s.credential, eg, rs, perWorkerQueueSize)
	rs.requestWorkers.s = append(rs.requestWorkers.s, requestWorker)
}
s.stores.Store(storeAddr, rs)
return rs
| config := config.GetGlobalServerConfig() | ||
| pendingRegionRequestQueueSize := config.Debug.Puller.PendingRegionRequestQueueSize |
Capturing the global configuration here makes it static for all stores created during the lifetime of handleRegions. It is better to fetch the configuration inside the getStore closure so that new stores can pick up configuration changes made at runtime, which is particularly useful for debug settings.
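The difference between the two capture points can be shown with a small sketch. It is illustrative only: `pullerConfig`, `globalConfig`, `getGlobalConfig`, `capturedOnce`, and `fetchedPerCall` are hypothetical names, and the global is modeled with `sync/atomic`'s `atomic.Value` rather than the project's real configuration package.

```go
package main

import (
	"fmt"
	"sync/atomic"
)

// pullerConfig is a hypothetical stand-in for the debug puller settings.
type pullerConfig struct{ PendingRegionRequestQueueSize int }

// globalConfig models a process-wide config that can be swapped at runtime.
var globalConfig atomic.Value

func getGlobalConfig() *pullerConfig { return globalConfig.Load().(*pullerConfig) }

// capturedOnce reads the config when the closure is built, so every later
// call sees the value that was current at construction time.
func capturedOnce() func() int {
	size := getGlobalConfig().PendingRegionRequestQueueSize
	return func() int { return size }
}

// fetchedPerCall reads the config inside the closure, so each call observes
// whatever value is current at that moment.
func fetchedPerCall() func() int {
	return func() int { return getGlobalConfig().PendingRegionRequestQueueSize }
}

func main() {
	globalConfig.Store(&pullerConfig{PendingRegionRequestQueueSize: 256})
	static, dynamic := capturedOnce(), fetchedPerCall()

	// Simulate a runtime configuration change.
	globalConfig.Store(&pullerConfig{PendingRegionRequestQueueSize: 1024})

	fmt.Println(static(), dynamic()) // prints "256 1024"
}
```

The trade-off is a config lookup per store creation versus a value that stays fixed for the lifetime of the enclosing function; which is preferable depends on whether runtime changes to this setting are expected to take effect.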
🧹 Nitpick comments (1)
logservice/logpuller/subscription_client.go (1)
568-568: ⚡ Quick win
Use captured variable for consistency.
The log statement references config.Debug.Puller.PendingRegionRequestQueueSize directly, but should use the pendingRegionRequestQueueSize variable captured at line 554 for consistency and to avoid potential discrepancies.
Suggested fix
- log.Warn("pending region request queue size is smaller than the number of workers, adjust per worker queue size to 1", zap.Int("pendingRegionRequestQueueSize", config.Debug.Puller.PendingRegionRequestQueueSize), zap.Uint("regionRequestWorkerPerStore", s.config.RegionRequestWorkerPerStore))
+ log.Warn("pending region request queue size is smaller than the number of workers, adjust per worker queue size to 1", zap.Int("pendingRegionRequestQueueSize", pendingRegionRequestQueueSize), zap.Uint("regionRequestWorkerPerStore", s.config.RegionRequestWorkerPerStore))
📒 Files selected for processing (2)
logservice/logpuller/region_request_worker.go
logservice/logpuller/subscription_client.go
| // handleRegions receives regionInfo from regionTaskQueue and attach rpcCtx to them, | ||
| // then send them to corresponding requestedStore. | ||
| func (s *subscriptionClient) handleRegions(ctx context.Context, eg *errgroup.Group) error { | ||
| config := config.GetGlobalServerConfig() |
Maybe cfg is a better variable name than config, since config is the same as the package name config and shadows it inside the function.
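The shadowing concern can be reproduced with any package. The sketch below uses the standard `strings` package purely as an illustration; `splitAndUpper` is a hypothetical helper and has nothing to do with the PR's code.

```go
package main

import (
	"fmt"
	"strings"
)

// splitAndUpper keeps the package name usable by calling the local variable
// "fields" instead of "strings".
func splitAndUpper(s string) []string {
	fields := strings.Fields(s)
	// Had the line above been `strings := strings.Fields(s)`, the call below
	// would not compile: within this scope "strings" would name a []string
	// and hide the package.
	for i, f := range fields {
		fields[i] = strings.ToUpper(f)
	}
	return fields
}

func main() {
	fmt.Println(splitAndUpper("region request worker"))
}
```

The same reasoning applies to `config := config.GetGlobalServerConfig()`: after that line, the identifier refers to the returned value for the rest of the scope, so any later use of the package there would need a different local name such as cfg.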
[APPROVALNOTIFIER] This PR is APPROVED
This pull-request has been approved by: asddongmen, lidezhu
/retest
@wk989898: The following tests failed, say
What problem does this PR solve?
Issue Number: ref #4238
What is changed and how it works?
Streamlined internal region event processing logic for improved code clarity.
Optimized queue initialization and worker creation with reduced lock contention in the log service infrastructure.
Check List
Tests
Questions
Will it cause performance regression or break compatibility?
Do you need to update user documentation, design documentation or monitoring documentation?
Release note
Summary by CodeRabbit