fix(concurrent-cdk): enforce runtime cap on concurrent partition generators to prevent deadlock #1015
Conversation
…rators to prevent thread pool starvation deadlock

When multiple stream completions arrive back-to-back in the queue, both `on_partition_generation_completed` and `on_partition_complete_sentinel` each call `start_next_partition_generator` independently, causing the number of concurrent generator tasks to exceed the `initial_number_partitions_to_generate` bound, which is enforced only at construction time. When all worker slots are occupied by sleeping `generate_partitions` tasks, no partition readers can run, leaving the main thread blocked on `queue.get()` indefinitely.

The fix adds `max_concurrent_partition_generators` (`Optional[int]`) to `ConcurrentReadProcessor`. When set, `start_next_partition_generator` returns `None` immediately if the cap is already reached. Recovery is guaranteed: `on_partition_generation_completed` decrements `_streams_currently_generating_partitions` before calling `start_next_partition_generator`, so the guard always passes there.

The default of `None` preserves existing behaviour for callers that do not use `ConcurrentSource` (including `block_simultaneous_read` tests). `ConcurrentSource` passes `initial_number_partitions_to_generate` explicitly, tying the runtime cap to the same value as the construction-time assertion.

Root cause introduced in PR #870 (feat: Add `block_simultaneous_read`), which added a second call site to `start_next_partition_generator` inside `on_partition_complete_sentinel` without a corresponding runtime bound.

Observed in production: source-google-ads 4.2.5, job 82903962, all workers blocked in `partition_enqueuer.py:59` `time.sleep()`, main thread stuck on `concurrent_source.py` `queue.get()`.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
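The guard and recovery path described in this commit message can be sketched as follows. This is a minimal toy model with hypothetical names, not the real CDK code; the actual `ConcurrentReadProcessor` tracks more state and submits work to a thread pool.

```python
from typing import List, Optional


class ReadProcessorSketch:
    """Toy model of the capped generator-start logic (not the real class)."""

    def __init__(self, streams: List[str], max_concurrent_partition_generators: Optional[int] = None):
        self._stream_instances_to_start_partition_generation = list(streams)
        self._streams_currently_generating_partitions: List[str] = []
        self._max_concurrent_partition_generators = max_concurrent_partition_generators

    def start_next_partition_generator(self) -> Optional[str]:
        if not self._stream_instances_to_start_partition_generation:
            return None  # nothing left to generate
        # The fix: refuse to start another generator once the cap is reached.
        if (
            self._max_concurrent_partition_generators is not None
            and len(self._streams_currently_generating_partitions)
            >= self._max_concurrent_partition_generators
        ):
            return None
        stream = self._stream_instances_to_start_partition_generation.pop(0)
        self._streams_currently_generating_partitions.append(stream)
        return stream

    def on_partition_generation_completed(self, stream: str) -> Optional[str]:
        # The decrement happens before the next start attempt, so the guard
        # always passes here and the pipeline keeps moving forward.
        self._streams_currently_generating_partitions.remove(stream)
        return self.start_next_partition_generator()
```

With a cap of 1, a second start attempt is refused until the first generator completes, at which point the next stream starts immediately.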
👋 Greetings, Airbyte Team Member! Here are some helpful tips and reminders for your convenience.

**Testing This CDK Version**

You can test this version of the CDK using the following:

```shell
# Run the CLI from this branch:
uvx 'git+https://github.com/airbytehq/airbyte-python-cdk.git@tolik0/concurrent-source/fix-concurrent-generator-starvation-deadlock#egg=airbyte-python-cdk[dev]' --help

# Update a connector to use the CDK from this branch ref:
cd airbyte-integrations/connectors/source-example
poe use-cdk-branch tolik0/concurrent-source/fix-concurrent-generator-starvation-deadlock
```

**PR Slash Commands**

Airbyte Maintainers can execute the following slash commands on your PR:
Pull request overview
This PR mitigates a concurrency deadlock in the CDK’s concurrent reading pipeline by enforcing a runtime cap on how many partition-generator tasks can run at once, preventing thread pool starvation of partition readers.
Changes:
- Add `max_concurrent_partition_generators: Optional[int]` to `ConcurrentReadProcessor` and enforce it in `start_next_partition_generator()`.
- Pass the cap from `ConcurrentSource.read()` using `_initial_number_partitions_to_generate` to align runtime behavior with the existing construction-time constraint.
- Add unit tests to verify generator start is blocked at the cap and allowed below it.
Reviewed changes
Copilot reviewed 3 out of 3 changed files in this pull request and generated 1 comment.
| File | Description |
|---|---|
| `airbyte_cdk/sources/concurrent_source/concurrent_read_processor.py` | Adds and enforces a runtime cap on concurrent partition generators. |
| `airbyte_cdk/sources/concurrent_source/concurrent_source.py` | Wires the cap into `ConcurrentReadProcessor` construction in `read()`. |
| `unit_tests/sources/streams/concurrent/test_concurrent_read_processor.py` | Adds unit tests covering cap enforcement behavior. |
Comments suppressed due to low confidence (1)
airbyte_cdk/sources/concurrent_source/concurrent_read_processor.py:52
`max_concurrent_partition_generators` accepts any int, but `0` or negative values will cause `start_next_partition_generator()` to always return `None` (since `len(...) >= max` will always be true), effectively preventing any partition generation and potentially hanging the sync. Consider validating this argument in `__init__` (e.g., raise `ValueError` when the value is not `None` and is < 1).
```python
    max_concurrent_partition_generators: Optional[int] = None,
):
    """
    This class is responsible for handling items from a concurrent stream read process.
    :param stream_instances_to_read_from: List of streams to read from
```
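The validation Copilot suggests could look like this sketch. The helper name is hypothetical; in the real code the check would live inline in `__init__`.

```python
from typing import Optional


def validate_cap(max_concurrent_partition_generators: Optional[int]) -> Optional[int]:
    # Reject 0 and negatives up front: either would make the cap guard fire
    # on every call and silently hang the sync, as the review comment notes.
    if (
        max_concurrent_partition_generators is not None
        and max_concurrent_partition_generators < 1
    ):
        raise ValueError(
            "max_concurrent_partition_generators must be >= 1 when set, got "
            f"{max_concurrent_partition_generators}"
        )
    return max_concurrent_partition_generators
```

Failing fast here surfaces the misconfiguration at construction time instead of as an indefinite hang during `read()`.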
/autofix
ℹ️ Recent review info
⚙️ Run configuration: Repository UI · Review profile: CHILL · Plan: Pro
📒 Files selected for processing (3)
🚧 Files skipped from review as they are similar to previous changes (3)

📝 Walkthrough

This PR adds an optional `max_concurrent_partition_generators` parameter to `ConcurrentReadProcessor`, validates and stores it, enforces the cap in `start_next_partition_generator` (early return when cap reached), wires the parameter from `ConcurrentSource.read()`, and adds unit tests for enforcement and invalid inputs.

Changes: Concurrent Partition Generator Limit

Estimated code review effort: 🎯 3 (Moderate) | ⏱️ ~20 minutes

Would you like me to add a short inline comment in the constructor clarifying the relationship between this cap and the worker pool size, wdyt?

🚥 Pre-merge checks: ✅ 4 passed | ❌ 1 failed (1 warning)
🧹 Nitpick comments (2)
airbyte_cdk/sources/concurrent_source/concurrent_read_processor.py (1)
**264-272**: 💤 Low value. Consider adding a debug log when the cap guard fires.

The `block_simultaneous_read` deferral path logs at `INFO` when it skips a stream. When the cap guard fires here there's no log at all, which could make it harder to spot a misconfiguration in production. Would it make sense to add a `self._logger.debug(...)` here? Something like:

💡 Optional debug log suggestion

```diff
 if (
     self._max_concurrent_partition_generators is not None
     and len(self._streams_currently_generating_partitions)
     >= self._max_concurrent_partition_generators
 ):
+    self._logger.debug(
+        f"Concurrent partition generator cap ({self._max_concurrent_partition_generators}) reached "
+        f"({len(self._streams_currently_generating_partitions)} active). "
+        f"Deferring next generator start."
+    )
     return None
```

wdyt?
🤖 Prompt for AI Agents

```
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
In `@airbyte_cdk/sources/concurrent_source/concurrent_read_processor.py` around
lines 264-272: Add a debug log before returning None when the concurrent
partition generator cap guard fires: in the block that checks
self._max_concurrent_partition_generators and
len(self._streams_currently_generating_partitions) (the cap guard inside
concurrent_read_processor), call self._logger.debug(...) to record that
generation was skipped and include the current count and the max cap (e.g.,
current=len(self._streams_currently_generating_partitions),
max=self._max_concurrent_partition_generators) so operators can see when the
guard prevented a new generator before still returning None as before.
```

unit_tests/sources/streams/concurrent/test_concurrent_read_processor.py (1)
**866-872**: 💤 Low value. The list equality assertion on line 871 compares the object to itself.

In `ConcurrentReadProcessor.__init__`, `self._stream_instances_to_start_partition_generation = stream_instances_to_read_from` stores the same reference — no copy is made. So `handler._stream_instances_to_start_partition_generation == stream_instances_to_read_from` reduces to `a == a`, which is always `True` regardless of whether the guard dequeued or re-enqueued anything. The real guard is the `submit.assert_not_called()` below it. Would you consider tightening the assertion to check the list's actual contents? wdyt?

🔧 Proposed fix for the list assertion

```diff
-assert handler._stream_instances_to_start_partition_generation == stream_instances_to_read_from
+assert list(handler._stream_instances_to_start_partition_generation) == [self._stream]
```
🤖 Prompt for AI Agents

```
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
In `@unit_tests/sources/streams/concurrent/test_concurrent_read_processor.py`
around lines 866-872: The test currently compares
handler._stream_instances_to_start_partition_generation to
stream_instances_to_read_from by reference; instead capture the expected
contents before calling start_next_partition_generator (e.g. expected =
list(stream_instances_to_read_from)) and then assert
handler._stream_instances_to_start_partition_generation == expected (or compare
sorted/element-wise contents) so you verify actual list contents rather than
object identity; locate the symbols stream_instances_to_read_from and
handler._stream_instances_to_start_partition_generation in the test and replace
the equality assertion accordingly.
```
ℹ️ Review info
⚙️ Run configuration
Configuration used: Repository UI
Review profile: CHILL
Plan: Pro
Run ID: 70d6137d-23a6-4af9-a6ee-4e7e4246b3e0
📒 Files selected for processing (3)
- `airbyte_cdk/sources/concurrent_source/concurrent_read_processor.py`
- `airbyte_cdk/sources/concurrent_source/concurrent_source.py`
- `unit_tests/sources/streams/concurrent/test_concurrent_read_processor.py`
… on cap guard

Addresses reviewer feedback:
- Raise `ValueError` when `max_concurrent_partition_generators` is not `None` and < 1, preventing silent hangs from zero/negative values
- Log at `DEBUG` when the cap guard fires, so operators can observe deferral

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
/autofix
…le-threaded mode

The previous wording said the value "must be less than the number of workers", which is incorrect for the single-threaded case (`num_workers=1`, `initial_number_of_partitions_to_generate=1`) that `ConcurrentSource.create()` explicitly allows. Reword to "should be less than the number of workers in multi-worker mode" and note that `ConcurrentSource.create()` handles the distinction.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
/autofix
…urrentSource.create()

Without this check, passing 0 bypasses the existing `too_many_generator` assertion (`0 < num_workers` is always true), then propagates into `ConcurrentReadProcessor` as `max_concurrent_partition_generators=0` and raises `ValueError` at `read()` time instead of at construction time. Fail fast at `create()` so the error is raised consistently.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
/autofix
Pull request overview
Copilot reviewed 3 out of 3 changed files in this pull request and generated no new comments.
Comments suppressed due to low confidence (1)
airbyte_cdk/sources/concurrent_source/concurrent_source.py:59
`ConcurrentSource.create()` uses an `assert` to enforce `initial_number_of_partitions_to_generate < num_workers` (except in single-threaded mode). Because Python assertions can be disabled with `-O`, this safety check may be skipped in production, reintroducing the thread-pool starvation/deadlock risk this PR is trying to prevent. Prefer raising a regular exception (e.g., `ValueError`) instead of `assert` for this user-input validation.
```python
too_many_generator = (
    not is_single_threaded and initial_number_of_partitions_to_generate >= num_workers
)
assert not too_many_generator, (
    "It is required to have more workers than threads generating partitions"
)
```
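A hedged sketch of the `ValueError`-based check Copilot suggests. The helper name is hypothetical and the real `create()` takes more parameters; the point is only that the check survives `python -O`, unlike an `assert`.

```python
def check_generator_count(num_workers: int, initial_number_of_partitions_to_generate: int) -> None:
    # Single-threaded mode (1 worker, 1 generator) is explicitly allowed.
    is_single_threaded = num_workers == 1 and initial_number_of_partitions_to_generate == 1
    too_many_generators = (
        not is_single_threaded and initial_number_of_partitions_to_generate >= num_workers
    )
    # Raise instead of assert: this runs even when assertions are disabled.
    if too_many_generators:
        raise ValueError(
            "It is required to have more workers than threads generating partitions"
        )
```

Unlike the `assert`, this validation cannot be stripped out at runtime, so misconfigured callers fail at construction time rather than deadlocking mid-sync.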
Review

Solid fix. The root cause analysis is thorough and the recovery guarantee reasoning is convincing. A few observations:

Correctness

- The cap placement is correct — it sits after the "no streams left" early return but before the …
- The key invariant holds: …
- Thread safety is fine — all accesses to …
- The initial setup in …

Minor suggestions
LGTM

The fix is minimal, well-scoped, and the tests cover the two key scenarios (at-cap → blocked, below-cap → proceeds). No concerns on correctness or backward compatibility (…).
…s values

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
/prerelease
Summary

- Add `max_concurrent_partition_generators: Optional[int]` to `ConcurrentReadProcessor.__init__`. Default `None` = no cap (preserves all existing behaviour).
- `start_next_partition_generator` returns `None` immediately when the cap is reached, preventing generator tasks from consuming all worker slots and starving partition readers.
- `ConcurrentSource.read()` now passes `max_concurrent_partition_generators=self._initial_number_partitions_to_generate`, tying the runtime enforcement to the same value as the construction-time assertion in `ConcurrentSource.create()`.

Root cause

`ConcurrentSource.create()` asserts `initial_number_of_partitions_to_generate < num_workers` at construction time, but there was no runtime enforcement. PR #870 (feat: Add `block_simultaneous_read`) introduced a second call site: `on_partition_complete_sentinel` also calls `start_next_partition_generator`. When multiple stream completions land in the queue back-to-back, both `on_partition_generation_completed` and `on_partition_complete_sentinel` fire in sequence and each independently starts a new generator — accumulating more concurrent generators than workers can safely accommodate.

When all worker slots are occupied by sleeping `generate_partitions` tasks (see `partition_enqueuer.py:59`), no partition readers can run, leaving the main thread blocked on `queue.get()` indefinitely.

Observed in production: source-google-ads 4.2.5, job 82903962 — final freeze at 2026-05-07 06:07:30 with `msgs=2063, queue_size=0`. All workers sleeping at `partition_enqueuer.py:59 → time.sleep()`, main thread at `concurrent_source.py → queue.get() → not_empty.wait()`.

Recovery guarantee

`on_partition_generation_completed` always removes the stream from `_streams_currently_generating_partitions` before calling `start_next_partition_generator`, so the guard always passes there and the pipeline keeps moving forward. No stream is permanently stuck.

Relation to PR #977

PR #977 (merged) fixed a different deadlock: the main thread blocking on `queue.put()` when the queue was full. This PR fixes a separate root cause (generator starvation via thread pool exhaustion) and touches different files. Keeping them separate makes the git history bisectable.

Test plan

- `unit_tests/sources/streams/concurrent/test_concurrent_read_processor.py` — 41 tests pass (including 2 new + all `TestBlockSimultaneousRead` tests)
- `unit_tests/sources/concurrent_source/` + `unit_tests/sources/streams/concurrent/` — 244 tests pass

🤖 Generated with Claude Code
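The double-call-site mechanism in the root cause above can be modeled with a toy sketch. All names here are hypothetical stand-ins, not the real CDK code; the model only shows how two unbounded call sites let the active-generator count exceed the intended limit.

```python
from typing import List


class UnboundedProcessor:
    """Toy model of the pre-fix behaviour: no runtime cap on generator starts."""

    def __init__(self, streams: List[str]):
        self.pending = list(streams)  # streams waiting to generate partitions
        self.active = 0               # generator tasks currently occupying workers

    def start_next(self) -> None:
        if self.pending:
            self.pending.pop(0)
            self.active += 1

    def on_generation_completed(self) -> None:
        self.active -= 1
        self.start_next()  # first call site

    def on_partition_complete_sentinel(self) -> None:
        self.start_next()  # second call site (the one PR #870 added)


proc = UnboundedProcessor(["s1", "s2", "s3"])
proc.start_next()  # intended bound: 1 generator at a time

# Back-to-back queue items: a generation-completed message AND a
# partition-complete sentinel are processed in sequence, and each
# handler independently starts a generator.
proc.on_generation_completed()
proc.on_partition_complete_sentinel()
assert proc.active == 2  # exceeds the intended bound of 1
```

With the fix, the second `start_next` would hit the cap guard and return without starting a generator, keeping `active` at the configured bound.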
Summary by CodeRabbit
New Features
Tests