Skip to content

logpuller: modify region request worker#5020

Merged
ti-chi-bot[bot] merged 2 commits into
pingcap:masterfrom
wk989898:puller-0511
May 12, 2026
Merged

logpuller: modify region request worker#5020
ti-chi-bot[bot] merged 2 commits into
pingcap:masterfrom
wk989898:puller-0511

Conversation

@wk989898
Copy link
Copy Markdown
Collaborator

@wk989898 wk989898 commented May 11, 2026

What problem does this PR solve?

Issue Number: ref #4238

What is changed and how it works?

Streamlined internal region event processing logic for improved code clarity.
Optimized queue initialization and worker creation with reduced lock contention in the log service infrastructure.

Check List

Tests

  • Unit test
  • Integration test
  • Manual test (add detailed scripts or steps below)
  • No code

Questions

Will it cause performance regression or break compatibility?
Do you need to update user documentation, design documentation or monitoring documentation?

Release note

Please refer to [Release Notes Language Style Guide](https://pingcap.github.io/tidb-dev-guide/contribute-to-tidb/release-notes-style-guide.html) to write a quality release note.

If you don't think this PR needs a release note then fill it with `None`.

Summary by CodeRabbit

  • Refactor
    • Improved dispatching of region events to downstream clients to avoid redundant computation and enhance reliability.
    • Optimized pending-request queue initialization and worker setup to reduce lock contention and improve throughput and predictability of log delivery.
    • Net effect: more consistent performance and lower latency under load.

Review Change Stack

Signed-off-by: wk989898 <nhsmwk@gmail.com>
@ti-chi-bot ti-chi-bot Bot added the release-note Denotes a PR that will be considered when it comes time to generate release notes. label May 11, 2026
@coderabbitai
Copy link
Copy Markdown
Contributor

coderabbitai Bot commented May 11, 2026

No actionable comments were generated in the recent review. 🎉

ℹ️ Recent review info
⚙️ Run configuration

Configuration used: Organization UI

Review profile: CHILL

Plan: Pro

Run ID: 193b6e5f-6d3d-4c12-b740-fa63ff781a9e

📥 Commits

Reviewing files that changed from the base of the PR and between 1358d62 and 31695d5.

📒 Files selected for processing (1)
  • logservice/logpuller/subscription_client.go
🚧 Files skipped from review as they are similar to previous changes (1)
  • logservice/logpuller/subscription_client.go

📝 Walkthrough

Walkthrough

Consolidates worker initialization by reading pending queue size once and preallocating workers, reuses a precomputed subscription ID when dispatching region events, and simplifies request fetching by calling requestCache.pop directly.

Changes

Region Request Handling Optimization

Layer / File(s) Summary
Worker Pool Initialization
logservice/logpuller/subscription_client.go
PendingRegionRequestQueueSize is read once from global config before getStore. Worker slice is preallocated, perWorkerQueueSize is computed and clamped (>=1) with a warning, and new regionRequestWorker instances are appended while holding the request-workers lock for the entire loop.
Event Dispatch Optimization
logservice/logpuller/region_request_worker.go
dispatchRegionChangeEvents passes the locally cached subscriptionID to pushRegionEventToDS instead of reconstructing it from event.RequestId.
Request Fetching Simplification
logservice/logpuller/region_request_worker.go
processRegionSendTask removes the fetchMoreReq() helper and calls s.requestCache.pop(ctx) directly in the send loop, keeping the same error-return behavior when pop fails.

Estimated code review effort

🎯 3 (Moderate) | ⏱️ ~20 minutes

Possibly related PRs

  • pingcap/ticdc#4557: Modifies processRegionSendTask and request-send logic in region_request_worker.go.
  • pingcap/ticdc#4267: Related edits to region dispatch and subscription worker initialization paths.

Suggested labels

lgtm, size/M, release-note-none

Suggested reviewers

  • lidezhu
  • asddongmen
  • hongyunyan

Poem

🐰 A tiny tweak, a tidy vein,
Queue size read once, no extra strain,
Subscription IDs reused with cheer,
Pop requests directly, the path is clear,
Hoppity code, lighter and lean.

🚥 Pre-merge checks | ✅ 3 | ❌ 2

❌ Failed checks (2 inconclusive)

Check name Status Explanation Resolution
Title check ❓ Inconclusive The title 'logpuller: modify region request worker' is vague and generic, using the term 'modify' without specifying what aspect of the region request worker was changed or why. Replace with a more specific title that describes the actual changes, such as 'logpuller: optimize region request worker queue initialization and reduce lock contention' or 'logpuller: simplify region event processing and improve worker allocation efficiency'.
Description check ❓ Inconclusive The description provides a high-level summary of changes and references issue #4238, but leaves critical sections incomplete: unanswered questions about performance regression and compatibility, missing documentation update assessment, and no actual release note provided (only template placeholder). Complete the unanswered questions about potential performance regression or compatibility issues, specify if documentation updates are needed, and provide a concrete release note (or explicitly state 'None') following the Release Notes Language Style Guide.
✅ Passed checks (3 passed)
Check name Status Explanation
Docstring Coverage ✅ Passed No functions found in the changed files to evaluate docstring coverage. Skipping docstring coverage check.
Linked Issues check ✅ Passed Check skipped because no linked issues were found for this pull request.
Out of Scope Changes check ✅ Passed Check skipped because no linked issues were found for this pull request.

✏️ Tip: You can configure your own custom pre-merge checks in the settings.

✨ Finishing Touches
🧪 Generate unit tests (beta)
  • Create PR with unit tests

Tip

💬 Introducing Slack Agent: The best way for teams to turn conversations into code.

Slack Agent is built on CodeRabbit's deep understanding of your code, so your team can collaborate across the entire SDLC without losing context.

  • Generate code and open pull requests
  • Plan features and break down work
  • Investigate incidents and troubleshoot customer tickets together
  • Automate recurring tasks and respond to alerts with triggers
  • Summarize progress and report instantly

Built for teams:

  • Shared memory across your entire org—no repeating context
  • Per-thread sandboxes to safely plan and execute work
  • Governance built-in—scoped access, auditability, and budget controls

One agent for your entire SDLC. Right inside Slack.

👉 Get started


Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out.

❤️ Share

Comment @coderabbitai help to get the list of available commands and usage tips.

@ti-chi-bot ti-chi-bot Bot added the size/S Denotes a PR that changes 10-29 lines, ignoring generated files. label May 11, 2026
Copy link
Copy Markdown

@gemini-code-assist gemini-code-assist Bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Code Review

This pull request refactors the region request worker and subscription client to simplify request handling and optimize store initialization. In region_request_worker.go, the code now directly accesses the request cache and uses a local variable for subscription IDs. In subscription_client.go, the initialization of requestedStore was modified to pre-allocate worker slices and consolidate locking. Feedback indicates a potential division-by-zero risk if worker counts are zero, a race condition caused by publishing the store before workers are initialized, and a suggestion to fetch configuration dynamically within the closure to support runtime changes.

Comment on lines 562 to 578
rs = &requestedStore{storeAddr: storeAddr}
rs.requestWorkers.s = make([]*regionRequestWorker, 0, s.config.RegionRequestWorkerPerStore)
s.stores.Store(storeAddr, rs)

config := config.GetGlobalServerConfig()
perWorkerQueueSize := config.Debug.Puller.PendingRegionRequestQueueSize / int(s.config.RegionRequestWorkerPerStore)
perWorkerQueueSize := pendingRegionRequestQueueSize / int(s.config.RegionRequestWorkerPerStore)
if perWorkerQueueSize <= 0 {
log.Warn("pending region request queue size is smaller than the number of workers, adjust per worker queue size to 1", zap.Int("pendingRegionRequestQueueSize", config.Debug.Puller.PendingRegionRequestQueueSize), zap.Uint("regionRequestWorkerPerStore", s.config.RegionRequestWorkerPerStore))
perWorkerQueueSize = 1
}

rs.requestWorkers.Lock()
for i := uint(0); i < s.config.RegionRequestWorkerPerStore; i++ {
requestWorker := newRegionRequestWorker(ctx, s, s.credential, eg, rs, perWorkerQueueSize)
rs.requestWorkers.Lock()
rs.requestWorkers.s = append(rs.requestWorkers.s, requestWorker)
rs.requestWorkers.Unlock()
}
rs.requestWorkers.Unlock()
return rs
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

high

This initialization logic has several issues:

  1. Division by zero risk: If RegionRequestWorkerPerStore is 0, the calculation of perWorkerQueueSize will panic.
  2. Publication order: The store is published to s.stores before its workers are initialized, allowing concurrent goroutines (like updateMetrics) to see an empty worker list.
  3. Unnecessary locking: Holding the lock while initializing workers is unnecessary if the store is not yet published to the global map.

Suggested improvement: Fetch configuration locally, handle the zero worker count case, and publish the store only after it is fully ready.

		config := config.GetGlobalServerConfig()
		pendingRegionRequestQueueSize := config.Debug.Puller.PendingRegionRequestQueueSize
		workerCount := int(s.config.RegionRequestWorkerPerStore)
		if workerCount <= 0 {
			log.Warn("RegionRequestWorkerPerStore is 0, adjust to 1", zap.String("storeAddr", storeAddr))
			workerCount = 1
		}
		rs = &requestedStore{storeAddr: storeAddr}
		rs.requestWorkers.s = make([]*regionRequestWorker, 0, workerCount)

		perWorkerQueueSize := pendingRegionRequestQueueSize / workerCount
		if perWorkerQueueSize <= 0 {
			log.Warn("pending region request queue size is smaller than the number of workers, adjust per worker queue size to 1",
				zap.Int("pendingRegionRequestQueueSize", pendingRegionRequestQueueSize),
				zap.Int("workerCount", workerCount))
			perWorkerQueueSize = 1
		}

		for i := 0; i < workerCount; i++ {
			requestWorker := newRegionRequestWorker(ctx, s, s.credential, eg, rs, perWorkerQueueSize)
			rs.requestWorkers.s = append(rs.requestWorkers.s, requestWorker)
		}
		s.stores.Store(storeAddr, rs)
		return rs

Comment on lines +553 to +554
config := config.GetGlobalServerConfig()
pendingRegionRequestQueueSize := config.Debug.Puller.PendingRegionRequestQueueSize
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

Capturing the global configuration here makes it static for all stores created during the lifetime of handleRegions. It is better to fetch the configuration inside the getStore closure so that new stores can pick up configuration changes made at runtime, which is particularly useful for debug settings.

Copy link
Copy Markdown
Contributor

@coderabbitai coderabbitai Bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🧹 Nitpick comments (1)
logservice/logpuller/subscription_client.go (1)

568-568: ⚡ Quick win

Use captured variable for consistency.

The log statement references config.Debug.Puller.PendingRegionRequestQueueSize directly, but should use the pendingRegionRequestQueueSize variable captured at line 554 for consistency and to avoid potential discrepancies.

Suggested fix
-		log.Warn("pending region request queue size is smaller than the number of workers, adjust per worker queue size to 1", zap.Int("pendingRegionRequestQueueSize", config.Debug.Puller.PendingRegionRequestQueueSize), zap.Uint("regionRequestWorkerPerStore", s.config.RegionRequestWorkerPerStore))
+		log.Warn("pending region request queue size is smaller than the number of workers, adjust per worker queue size to 1", zap.Int("pendingRegionRequestQueueSize", pendingRegionRequestQueueSize), zap.Uint("regionRequestWorkerPerStore", s.config.RegionRequestWorkerPerStore))
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@logservice/logpuller/subscription_client.go` at line 568, The log call uses
the global config value instead of the captured local variable; update the
log.Warn invocation to use the local pendingRegionRequestQueueSize variable (the
one declared at/around line where pendingRegionRequestQueueSize is set) instead
of config.Debug.Puller.PendingRegionRequestQueueSize so the message reflects the
captured value consistently; keep other fields like
s.config.RegionRequestWorkerPerStore unchanged and ensure the zap.Int key still
represents the pending region queue size.
🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

Nitpick comments:
In `@logservice/logpuller/subscription_client.go`:
- Line 568: The log call uses the global config value instead of the captured
local variable; update the log.Warn invocation to use the local
pendingRegionRequestQueueSize variable (the one declared at/around line where
pendingRegionRequestQueueSize is set) instead of
config.Debug.Puller.PendingRegionRequestQueueSize so the message reflects the
captured value consistently; keep other fields like
s.config.RegionRequestWorkerPerStore unchanged and ensure the zap.Int key still
represents the pending region queue size.

ℹ️ Review info
⚙️ Run configuration

Configuration used: Organization UI

Review profile: CHILL

Plan: Pro

Run ID: 6c1b4194-e5b4-4e4b-a2a2-7d574be89c80

📥 Commits

Reviewing files that changed from the base of the PR and between 810169e and 1358d62.

📒 Files selected for processing (2)
  • logservice/logpuller/region_request_worker.go
  • logservice/logpuller/subscription_client.go

@ti-chi-bot ti-chi-bot Bot added needs-1-more-lgtm Indicates a PR needs 1 more LGTM. approved labels May 11, 2026
// handleRegions receives regionInfo from regionTaskQueue and attach rpcCtx to them,
// then send them to corresponding requestedStore.
func (s *subscriptionClient) handleRegions(ctx context.Context, eg *errgroup.Group) error {
config := config.GetGlobalServerConfig()
Copy link
Copy Markdown
Collaborator

@asddongmen asddongmen May 12, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe cfg is a better var name than config, since config is same as the pkg name config.

Signed-off-by: wk989898 <nhsmwk@gmail.com>
@ti-chi-bot ti-chi-bot Bot added size/M Denotes a PR that changes 30-99 lines, ignoring generated files. and removed size/S Denotes a PR that changes 10-29 lines, ignoring generated files. labels May 12, 2026
@ti-chi-bot ti-chi-bot Bot added the lgtm label May 12, 2026
@ti-chi-bot
Copy link
Copy Markdown

ti-chi-bot Bot commented May 12, 2026

[APPROVALNOTIFIER] This PR is APPROVED

This pull-request has been approved by: asddongmen, lidezhu

The full list of commands accepted by this bot can be found here.

The pull request process is described here

Details Needs approval from an approver in each of these files:

Approvers can indicate their approval by writing /approve in a comment
Approvers can cancel approval by writing /approve cancel in a comment

@ti-chi-bot ti-chi-bot Bot removed the needs-1-more-lgtm Indicates a PR needs 1 more LGTM. label May 12, 2026
@ti-chi-bot
Copy link
Copy Markdown

ti-chi-bot Bot commented May 12, 2026

[LGTM Timeline notifier]

Timeline:

  • 2026-05-11 10:45:56.97377024 +0000 UTC m=+89725.506549559: ☑️ agreed by lidezhu.
  • 2026-05-12 08:07:19.543872434 +0000 UTC m=+166608.076651753: ☑️ agreed by asddongmen.

@wk989898
Copy link
Copy Markdown
Collaborator Author

/retest

@ti-chi-bot ti-chi-bot Bot merged commit 68be621 into pingcap:master May 12, 2026
29 of 30 checks passed
@ti-chi-bot
Copy link
Copy Markdown

ti-chi-bot Bot commented May 12, 2026

@wk989898: The following tests failed, say /retest to rerun all failed tests or /retest-required to rerun all mandatory failed tests:

Test name Commit Details Required Rerun command
pull-cdc-mysql-integration-light 31695d5 link unknown /test pull-cdc-mysql-integration-light
pull-unit-test 31695d5 link unknown /test pull-unit-test

Full PR test history. Your PR dashboard.

Details

Instructions for interacting with me using PR comments are available here. If you have questions or suggestions related to my behavior, please file an issue against the kubernetes-sigs/prow repository. I understand the commands that are listed here.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

approved lgtm release-note Denotes a PR that will be considered when it comes time to generate release notes. size/M Denotes a PR that changes 30-99 lines, ignoring generated files.

Projects

None yet

Development

Successfully merging this pull request may close these issues.

3 participants