
fix(concurrent-cdk): enforce runtime cap on concurrent partition generators to prevent deadlock#1015

Open
Anatolii Yatsuk (tolik0) wants to merge 7 commits into main from tolik0/concurrent-source/fix-concurrent-generator-starvation-deadlock

Conversation


@tolik0 Anatolii Yatsuk (tolik0) commented May 7, 2026

Summary

  • Adds max_concurrent_partition_generators: Optional[int] to ConcurrentReadProcessor.__init__. Default None = no cap (preserves all existing behaviour).
  • In start_next_partition_generator, returns None immediately when the cap is reached, preventing generator tasks from consuming all worker slots and starving partition readers.
  • ConcurrentSource.read() now passes max_concurrent_partition_generators=self._initial_number_partitions_to_generate, tying the runtime enforcement to the same value as the construction-time assertion in ConcurrentSource.create().
  • Two new unit tests: one verifying the cap blocks a new generator when the limit is reached, one verifying a generator does start when below the limit.
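The guard described above can be sketched as a small standalone model (this is an illustration, not the actual CDK class; attribute names mirror the PR description):

```python
from typing import List, Optional


class ConcurrentReadProcessorSketch:
    """Toy model of the cap guard described in this PR (not the CDK class)."""

    def __init__(
        self,
        streams_to_start: List[str],
        max_concurrent_partition_generators: Optional[int] = None,
    ):
        # Validate the cap up front: 0 or negative would block all generation.
        if (
            max_concurrent_partition_generators is not None
            and max_concurrent_partition_generators < 1
        ):
            raise ValueError("max_concurrent_partition_generators must be >= 1 when set")
        self._stream_instances_to_start_partition_generation = streams_to_start
        self._streams_currently_generating_partitions: List[str] = []
        self._max_concurrent_partition_generators = max_concurrent_partition_generators

    def start_next_partition_generator(self) -> Optional[str]:
        # Runtime cap: refuse to start a new generator once the limit is reached.
        if (
            self._max_concurrent_partition_generators is not None
            and len(self._streams_currently_generating_partitions)
            >= self._max_concurrent_partition_generators
        ):
            return None
        if not self._stream_instances_to_start_partition_generation:
            return None
        stream = self._stream_instances_to_start_partition_generation.pop(0)
        self._streams_currently_generating_partitions.append(stream)
        return stream
```

With a cap of 1, a second start attempt returns None until the first stream is removed from the in-flight set, which is exactly the behavior the new unit tests exercise.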

Root cause

ConcurrentSource.create() asserts initial_number_of_partitions_to_generate < num_workers at construction time, but there was no runtime enforcement. PR #870 (feat: Add block_simultaneous_read) introduced a second call site: on_partition_complete_sentinel also calls start_next_partition_generator. When multiple stream completions land in the queue back-to-back, both on_partition_generation_completed and on_partition_complete_sentinel fire in sequence and each independently starts a new generator — accumulating more concurrent generators than workers can safely accommodate.

When all worker slots are occupied by sleeping generate_partitions tasks (see partition_enqueuer.py:59), no partition readers can run, leaving the main thread blocked on queue.get() indefinitely.

Observed in production: source-google-ads 4.2.5, job 82903962 — final freeze at 2026-05-07 06:07:30 with msgs=2063, queue_size=0. All workers sleeping at partition_enqueuer.py:59 → time.sleep(), main thread at concurrent_source.py → queue.get() → not_empty.wait().
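The starvation mechanism itself is easy to reproduce with a plain ThreadPoolExecutor (a toy illustration independent of the CDK; the sleeping generators stand in for the parked generate_partitions tasks):

```python
import threading
from concurrent.futures import ThreadPoolExecutor

pool = ThreadPoolExecutor(max_workers=2)
release = threading.Event()
reader_ran = threading.Event()


def generator():
    # Stands in for a generate_partitions task parked in time.sleep()
    release.wait()


def reader():
    reader_ran.set()


# Two generator tasks occupy both worker slots...
pool.submit(generator)
pool.submit(generator)
pool.submit(reader)

# ...so the queued reader cannot be scheduled. In the real pipeline the main
# thread would meanwhile be blocked on queue.get(), completing the deadlock.
assert not reader_ran.wait(timeout=0.5)

release.set()  # free the worker slots
assert reader_ran.wait(timeout=5.0)  # now the reader runs
pool.shutdown(wait=True)
```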

Recovery guarantee

on_partition_generation_completed always removes the stream from _streams_currently_generating_partitions before calling start_next_partition_generator, so the guard always passes there and the pipeline keeps moving forward. No stream is permanently stuck.
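A toy trace of that path (hypothetical simplified handler, assuming the remove-before-restart ordering described above):

```python
class HandlerSketch:
    """Toy model showing why the completion path is never blocked by the cap."""

    def __init__(self, cap: int):
        self._cap = cap
        self._generating = set()

    def start_next(self, stream: str) -> bool:
        if len(self._generating) >= self._cap:
            return False  # cap guard fires
        self._generating.add(stream)
        return True

    def on_partition_generation_completed(self, done: str, next_stream: str) -> bool:
        # The finished stream is removed *before* trying to start the next one,
        # so the subsequent cap check always sees a freed slot.
        self._generating.discard(done)
        return self.start_next(next_stream)
```

With a cap of 1, a direct second start is blocked, but the completion path always succeeds, so the pipeline keeps draining.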

Relation to PR #977

PR #977 (merged) fixed a different deadlock: the main thread blocking on queue.put() when the queue was full. This PR fixes a separate root cause (generator starvation via thread pool exhaustion) and touches different files. Keeping them separate makes the git history bisectable.

Test plan

  • unit_tests/sources/streams/concurrent/test_concurrent_read_processor.py — 41 tests pass (including 2 new + all TestBlockSimultaneousRead tests)
  • Full concurrent suite (unit_tests/sources/concurrent_source/ + unit_tests/sources/streams/concurrent/) — 244 tests pass

🤖 Generated with Claude Code

Summary by CodeRabbit

  • New Features

    • Added an optional cap on how many partition generators can run concurrently to better control resource usage during concurrent source operations.
    • The cap is enforced at runtime to prevent starting new generators once the limit is reached.
  • Tests

    • Added unit tests covering behavior when the concurrent-generator limit is reached and validating that invalid cap values raise an error.

…rators to prevent thread pool starvation deadlock

When multiple stream completions arrive back-to-back in the queue, both
on_partition_generation_completed and on_partition_complete_sentinel each
call start_next_partition_generator independently, causing the number of
concurrent generator tasks to exceed the initial_number_partitions_to_generate
bound enforced only at construction time. When all worker slots are occupied
by sleeping generate_partitions tasks no partition readers can run, leaving
the main thread blocked on queue.get() indefinitely.

The fix adds max_concurrent_partition_generators (Optional[int]) to
ConcurrentReadProcessor. When set, start_next_partition_generator returns
None immediately if the cap is already reached. Recovery is guaranteed:
on_partition_generation_completed decrements _streams_currently_generating_partitions
before calling start_next_partition_generator, so the guard always passes there.
The default of None preserves existing behaviour for callers that do not use
ConcurrentSource (including block_simultaneous_read tests). ConcurrentSource
passes initial_number_partitions_to_generate explicitly, tying the runtime cap
to the same value as the construction-time assertion.

Root cause introduced in PR #870 (feat: Add block_simultaneous_read), which
added a second call site to start_next_partition_generator inside
on_partition_complete_sentinel without a corresponding runtime bound.

Observed in production: source-google-ads 4.2.5, job 82903962, all workers
blocked in partition_enqueuer.py:59 time.sleep(), main thread stuck on
concurrent_source.py queue.get().

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
Copilot AI review requested due to automatic review settings May 7, 2026 09:01

github-actions Bot commented May 7, 2026

👋 Greetings, Airbyte Team Member!

Here are some helpful tips and reminders for your convenience.


Testing This CDK Version

You can test this version of the CDK using the following:

# Run the CLI from this branch:
uvx 'git+https://github.com/airbytehq/airbyte-python-cdk.git@tolik0/concurrent-source/fix-concurrent-generator-starvation-deadlock#egg=airbyte-python-cdk[dev]' --help

# Update a connector to use the CDK from this branch ref:
cd airbyte-integrations/connectors/source-example
poe use-cdk-branch tolik0/concurrent-source/fix-concurrent-generator-starvation-deadlock

PR Slash Commands

Airbyte Maintainers can execute the following slash commands on your PR:

  • /autofix - Fixes most formatting and linting issues
  • /poetry-lock - Updates poetry.lock file
  • /test - Runs connector tests with the updated CDK
  • /prerelease - Triggers a prerelease publish with default arguments
  • /poe build - Regenerate git-committed build artifacts, such as the pydantic models which are generated from the manifest JSON schema in YAML.
  • /poe <command> - Runs any poe command in the CDK environment


Copilot AI left a comment


Pull request overview

This PR mitigates a concurrency deadlock in the CDK’s concurrent reading pipeline by enforcing a runtime cap on how many partition-generator tasks can run at once, preventing thread pool starvation of partition readers.

Changes:

  • Add max_concurrent_partition_generators: Optional[int] to ConcurrentReadProcessor and enforce it in start_next_partition_generator().
  • Pass the cap from ConcurrentSource.read() using _initial_number_partitions_to_generate to align runtime behavior with the existing construction-time constraint.
  • Add unit tests to verify generator start is blocked at the cap and allowed below it.

Reviewed changes

Copilot reviewed 3 out of 3 changed files in this pull request and generated 1 comment.

  • airbyte_cdk/sources/concurrent_source/concurrent_read_processor.py: adds and enforces a runtime cap on concurrent partition generators.
  • airbyte_cdk/sources/concurrent_source/concurrent_source.py: wires the cap into ConcurrentReadProcessor construction in read().
  • unit_tests/sources/streams/concurrent/test_concurrent_read_processor.py: adds unit tests covering cap enforcement behavior.
Comments suppressed due to low confidence (1)

airbyte_cdk/sources/concurrent_source/concurrent_read_processor.py:52

  • max_concurrent_partition_generators accepts any int, but 0 or negative values will cause start_next_partition_generator() to always return None (since len(...) >= max will always be true), effectively preventing any partition generation and potentially hanging the sync. Consider validating this argument in __init__ (e.g., raise ValueError when the value is not None and is < 1).
        max_concurrent_partition_generators: Optional[int] = None,
    ):
        """
        This class is responsible for handling items from a concurrent stream read process.
        :param stream_instances_to_read_from: List of streams to read from


Comment thread airbyte_cdk/sources/concurrent_source/concurrent_read_processor.py Outdated

Anatolii Yatsuk (tolik0) commented May 7, 2026

/autofix

Auto-Fix Job Info

This job attempts to auto-fix any linting or formatting issues. If any fixes are made,
those changes will be automatically committed and pushed back to the PR.

Note: This job can only be run by maintainers. On PRs from forks, this command requires
that the PR author has enabled the Allow edits from maintainers option.

PR auto-fix job started... Check job output.

✅ Changes applied successfully.


coderabbitai Bot commented May 7, 2026

No actionable comments were generated in the recent review. 🎉

ℹ️ Recent review info
⚙️ Run configuration

Configuration used: Repository UI

Review profile: CHILL

Plan: Pro

Run ID: 0fce0cd8-cacf-4843-a96e-a8f00d4a07fb

📥 Commits

Reviewing files that changed from the base of the PR and between 3ce4738 and 5990d45.

📒 Files selected for processing (3)
  • airbyte_cdk/sources/concurrent_source/concurrent_read_processor.py
  • airbyte_cdk/sources/concurrent_source/concurrent_source.py
  • unit_tests/sources/streams/concurrent/test_concurrent_read_processor.py
🚧 Files skipped from review as they are similar to previous changes (3)
  • airbyte_cdk/sources/concurrent_source/concurrent_source.py
  • unit_tests/sources/streams/concurrent/test_concurrent_read_processor.py
  • airbyte_cdk/sources/concurrent_source/concurrent_read_processor.py

📝 Walkthrough

Walkthrough

This PR adds an optional max_concurrent_partition_generators parameter to ConcurrentReadProcessor, validates and stores it, enforces the cap in start_next_partition_generator (early return when cap reached), wires the parameter from ConcurrentSource.read(), and adds unit tests for enforcement and invalid inputs.

Changes

Concurrent Partition Generator Limit

  • API Contract & Constructor (airbyte_cdk/sources/concurrent_source/concurrent_read_processor.py): ConcurrentReadProcessor.__init__ signature extended with an optional max_concurrent_partition_generators: Optional[int] = None parameter and docstring entries.
  • Instance State Storage (airbyte_cdk/sources/concurrent_source/concurrent_read_processor.py): constructor validates (>= 1 when provided) and stores max_concurrent_partition_generators as self._max_concurrent_partition_generators.
  • Concurrency Limit Enforcement (airbyte_cdk/sources/concurrent_source/concurrent_read_processor.py): start_next_partition_generator now returns early (None) if the cap is set and currently generating streams meet or exceed it, before processing the queue.
  • ConcurrentSource Integration (airbyte_cdk/sources/concurrent_source/concurrent_source.py): ConcurrentSource.read() passes max_concurrent_partition_generators=self._initial_number_partitions_to_generate when instantiating ConcurrentReadProcessor.
  • Concurrency Limit & Validation Tests (unit_tests/sources/streams/concurrent/test_concurrent_read_processor.py): adds tests that invalid constructor values (0, -1) raise ValueError, that start_next_partition_generator() respects the limit (returns None, no submit), and that it starts and submits when below the limit (updates tracker and calls submit).

Estimated code review effort

🎯 3 (Moderate) | ⏱️ ~20 minutes

Would you like me to add a short inline comment in the constructor clarifying the relationship between this cap and the worker pool size, wdyt?

🚥 Pre-merge checks | ✅ 4 | ❌ 1

❌ Failed checks (1 warning)

  • Docstring Coverage (⚠️ Warning): docstring coverage is 33.33%, below the required threshold of 80.00%. Resolution: write docstrings for the functions missing them to satisfy the coverage threshold.

✅ Passed checks (4 passed)

  • Description Check (✅ Passed): check skipped because CodeRabbit's high-level summary is enabled.
  • Title check (✅ Passed): the title accurately describes the main change, adding runtime enforcement to cap concurrent partition generators to prevent deadlock.
  • Linked Issues check (✅ Passed): check skipped because no linked issues were found for this pull request.
  • Out of Scope Changes check (✅ Passed): check skipped because no linked issues were found for this pull request.

✏️ Tip: You can configure your own custom pre-merge checks in the settings.



@coderabbitai coderabbitai Bot left a comment


🧹 Nitpick comments (2)
airbyte_cdk/sources/concurrent_source/concurrent_read_processor.py (1)

264-272: 💤 Low value

Consider adding a debug log when the cap guard fires.

The block_simultaneous_read deferral path logs at INFO when it skips a stream. When the cap guard fires here there's no log at all, which could make it harder to spot a misconfiguration in production. Would it make sense to add a self._logger.debug(...) here? Something like:

💡 Optional debug log suggestion
         if (
             self._max_concurrent_partition_generators is not None
             and len(self._streams_currently_generating_partitions)
             >= self._max_concurrent_partition_generators
         ):
+            self._logger.debug(
+                f"Concurrent partition generator cap ({self._max_concurrent_partition_generators}) reached "
+                f"({len(self._streams_currently_generating_partitions)} active). "
+                f"Deferring next generator start."
+            )
             return None

wdyt?

🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@airbyte_cdk/sources/concurrent_source/concurrent_read_processor.py` around
lines 264 - 272, Add a debug log before returning None when the concurrent
partition generator cap guard fires: in the block that checks
self._max_concurrent_partition_generators and
len(self._streams_currently_generating_partitions) (the cap guard inside
concurrent_read_processor), call self._logger.debug(...) to record that
generation was skipped and include the current count and the max cap (e.g.,
current=len(self._streams_currently_generating_partitions),
max=self._max_concurrent_partition_generators) so operators can see when the
guard prevented a new generator before still returning None as before.
unit_tests/sources/streams/concurrent/test_concurrent_read_processor.py (1)

866-872: 💤 Low value

The list equality assertion on line 871 compares the object to itself.

In ConcurrentReadProcessor.__init__, self._stream_instances_to_start_partition_generation = stream_instances_to_read_from stores the same reference — no copy is made. So handler._stream_instances_to_start_partition_generation == stream_instances_to_read_from reduces to a == a, which is always True regardless of whether the guard dequeued or re-enqueued anything. The real guard is the submit.assert_not_called() below it. Would you consider tightening the assertion to check the list's actual contents? wdyt?

🔧 Proposed fix for the list assertion
-        assert handler._stream_instances_to_start_partition_generation == stream_instances_to_read_from
+        assert list(handler._stream_instances_to_start_partition_generation) == [self._stream]
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@unit_tests/sources/streams/concurrent/test_concurrent_read_processor.py`
around lines 866 - 872, The test currently compares
handler._stream_instances_to_start_partition_generation to
stream_instances_to_read_from by reference; instead capture the expected
contents before calling start_next_partition_generator (e.g. expected =
list(stream_instances_to_read_from)) and then assert
handler._stream_instances_to_start_partition_generation == expected (or compare
sorted/element-wise contents) so you verify actual list contents rather than
object identity; locate the symbols stream_instances_to_read_from and
handler._stream_instances_to_start_partition_generation in the test and replace
the equality assertion accordingly.

ℹ️ Review info
⚙️ Run configuration

Configuration used: Repository UI

Review profile: CHILL

Plan: Pro

Run ID: 70d6137d-23a6-4af9-a6ee-4e7e4246b3e0

📥 Commits

Reviewing files that changed from the base of the PR and between ccc185f and 3012695.

📒 Files selected for processing (3)
  • airbyte_cdk/sources/concurrent_source/concurrent_read_processor.py
  • airbyte_cdk/sources/concurrent_source/concurrent_source.py
  • unit_tests/sources/streams/concurrent/test_concurrent_read_processor.py

… on cap guard

Addresses reviewer feedback:
- Raise ValueError when max_concurrent_partition_generators is not None and < 1,
  preventing silent hangs from zero/negative values
- Log at DEBUG when the cap guard fires, so operators can observe deferral

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>

Anatolii Yatsuk (tolik0) commented May 7, 2026

/autofix

Auto-Fix Job Info

This job attempts to auto-fix any linting or formating issues. If any fixes are made,
those changes will be automatically committed and pushed back to the PR.

Note: This job can only be run by maintainers. On PRs from forks, this command requires
that the PR author has enabled the Allow edits from maintainers option.

PR auto-fix job started... Check job output.

✅ Changes applied successfully.


github-actions Bot commented May 7, 2026

PyTest Results (Fast)

4 046 tests  +3   4 035 ✅ +3   7m 45s ⏱️ -3s
    1 suites ±0      11 💤 ±0 
    1 files   ±0       0 ❌ ±0 

Results for commit 5990d45. ± Comparison against base commit ccc185f.

♻️ This comment has been updated with latest results.


github-actions Bot commented May 7, 2026

PyTest Results (Full)

4 049 tests  +3   4 037 ✅ +3   11m 4s ⏱️ +3s
    1 suites ±0      12 💤 ±0 
    1 files   ±0       0 ❌ ±0 

Results for commit 5990d45. ± Comparison against base commit ccc185f.

♻️ This comment has been updated with latest results.


Copilot AI left a comment


Pull request overview

Copilot reviewed 3 out of 3 changed files in this pull request and generated 1 comment.

Comment thread airbyte_cdk/sources/concurrent_source/concurrent_read_processor.py Outdated
…le-threaded mode

The previous wording said the value "must be less than the number of workers"
which is incorrect for the single-threaded case (num_workers=1,
initial_number_of_partitions_to_generate=1) that ConcurrentSource.create()
explicitly allows. Reword to "should be less than the number of workers in
multi-worker mode" and note that ConcurrentSource.create() handles the
distinction.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>

Anatolii Yatsuk (tolik0) commented May 7, 2026

/autofix

Auto-Fix Job Info

This job attempts to auto-fix any linting or formating issues. If any fixes are made,
those changes will be automatically committed and pushed back to the PR.

Note: This job can only be run by maintainers. On PRs from forks, this command requires
that the PR author has enabled the Allow edits from maintainers option.

PR auto-fix job started... Check job output.

🟦 Job completed successfully (no changes).


Copilot AI left a comment


Pull request overview

Copilot reviewed 3 out of 3 changed files in this pull request and generated 1 comment.

Comment thread airbyte_cdk/sources/concurrent_source/concurrent_source.py
…urrentSource.create()

Without this check, passing 0 bypasses the existing too_many_generator assertion
(0 < num_workers is always true), then propagates into ConcurrentReadProcessor as
max_concurrent_partition_generators=0 and raises ValueError at read() time instead
of at construction time. Fail fast at create() so the error is raised consistently.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>

Anatolii Yatsuk (tolik0) commented May 7, 2026

/autofix

Auto-Fix Job Info

This job attempts to auto-fix any linting or formating issues. If any fixes are made,
those changes will be automatically committed and pushed back to the PR.

Note: This job can only be run by maintainers. On PRs from forks, this command requires
that the PR author has enabled the Allow edits from maintainers option.

PR auto-fix job started... Check job output.

🟦 Job completed successfully (no changes).


Copilot AI left a comment


Pull request overview

Copilot reviewed 3 out of 3 changed files in this pull request and generated no new comments.

Comments suppressed due to low confidence (1)

airbyte_cdk/sources/concurrent_source/concurrent_source.py:59

  • ConcurrentSource.create() uses an assert to enforce initial_number_of_partitions_to_generate < num_workers (except single-threaded mode). Because Python assertions can be disabled with -O, this safety check may be skipped in production, reintroducing the thread-pool starvation/deadlock risk this PR is trying to prevent. Prefer raising a regular exception (e.g., ValueError) instead of assert for this user-input validation.
        too_many_generator = (
            not is_single_threaded and initial_number_of_partitions_to_generate >= num_workers
        )
        assert not too_many_generator, (
            "It is required to have more workers than threads generating partitions"
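The reviewer's point about `-O` can be verified directly (a standalone demonstration, not CDK code):

```python
import subprocess
import sys

snippet = "assert False, 'validation skipped under -O'"

# Normally the assert fires and the process exits non-zero...
assert subprocess.run([sys.executable, "-c", snippet]).returncode != 0

# ...but with -O, assert statements are compiled away entirely, so the
# same program exits cleanly and the validation never runs.
assert subprocess.run([sys.executable, "-O", "-c", snippet]).returncode == 0
```

This is why user-input validation in create() is safer as an explicit ValueError than as an assert.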

@devin-ai-integration

Review

Solid fix. The root cause analysis is thorough and the recovery guarantee reasoning is convincing. A few observations:

Correctness

The cap placement is correct — it sits after the "no streams left" early return but before the block_simultaneous_read while-loop, so we avoid pointless iteration when we're already at capacity.

The key invariant holds: on_partition_generation_completed removes from _streams_currently_generating_partitions before calling start_next_partition_generator, so the guard always passes on that path and the pipeline keeps draining. The cap only blocks the redundant calls from on_partition_complete_sentinel, which is exactly the scenario that caused the starvation.

Thread safety is fine — all accesses to _streams_currently_generating_partitions and the cap check happen on the main thread (from _consume_from_queue), so no races.

The initial setup in _submit_initial_partition_generators loops exactly N = _initial_number_partitions_to_generate times, and the cap is also N, so the cap is never hit during initialization (after N-1 successful calls, len(...) = N-1 < N). The Nth call succeeds, bringing it to N, and any subsequent call is correctly blocked.
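That initialization invariant can be spot-checked with a toy model of the counter (hypothetical names, not the CDK implementation):

```python
from typing import List, Optional


def start_next(cap: Optional[int], active: List[object]) -> bool:
    """Toy model of the guard: start a generator unless the cap is reached."""
    if cap is not None and len(active) >= cap:
        return False
    active.append(object())
    return True


N = 3  # stands in for _initial_number_partitions_to_generate
active: List[object] = []

# All N initial starts succeed: before the k-th call, len(active) = k-1 < N.
for _ in range(N):
    assert start_next(N, active)

# Any further call is blocked: len(active) == N >= cap.
assert not start_next(N, active)
```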

Minor suggestions

  1. ValueError test coverage: Consider adding a small test asserting that ConcurrentReadProcessor(max_concurrent_partition_generators=0) raises ValueError. The validation is there but not covered by the new tests.

  2. Log level: The cap-hit message uses debug. In production, this could be valuable signal when diagnosing slow syncs or unexpected stalling. Would info (or at least a counter metric) be more appropriate? Not blocking — debug is fine if you want to keep logs clean.

  3. create() validation: The new initial_number_of_partitions_to_generate < 1 check in create() is a good addition — without it, initial=0 would silently pass the existing assert not too_many_generator (since 0 >= num_workers is False for any num_workers >= 1), leading to zero generators and a hung sync.

LGTM

The fix is minimal, well-scoped, and the tests cover the two key scenarios (at-cap → blocked, below-cap → proceeds). No concerns on correctness or backward compatibility (None default preserves existing behavior for any caller that doesn't pass the new param).

…s values

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>

Anatolii Yatsuk (tolik0) commented May 7, 2026

/prerelease

Prerelease Job Info

This job triggers the publish workflow with default arguments to create a prerelease.

Prerelease job started... Check job output.

✅ Prerelease workflow triggered successfully.

View the publish workflow run: https://github.com/airbytehq/airbyte-python-cdk/actions/runs/25490693965


3 participants