feat: Add block_simultaneous_read with top-level stream_groups interface #870
Anatolii Yatsuk (tolik0) wants to merge 30 commits into main
👋 Greetings, Airbyte Team Member! Here are some helpful tips and reminders for your convenience.
Testing This CDK Version
You can test this version of the CDK using the following:
# Run the CLI from this branch:
uvx 'git+https://github.com/airbytehq/airbyte-python-cdk.git@tolik0/concurrent-source/add-block_simultaneous_read#egg=airbyte-python-cdk[dev]' --help
# Update a connector to use the CDK from this branch ref:
cd airbyte-integrations/connectors/source-example
poe use-cdk-branch tolik0/concurrent-source/add-block_simultaneous_read
Helpful Resources
PR Slash Commands
Airbyte Maintainers can execute the following slash commands on your PR:
- /autofix
- /prerelease
Pull request overview
This PR introduces a block_simultaneous_read feature to the Python CDK that prevents concurrent execution of streams sharing the same resource (API endpoint, session, or rate limit pool). The feature uses string-based group identifiers where streams with matching non-empty group names will not run concurrently, addressing issues like duplicate API calls when streams function as both standalone and parent streams.
Changes:
- Added `block_simultaneous_read` property to stream interfaces and schema definitions with empty string as default (backward compatible)
- Implemented blocking logic in `ConcurrentReadProcessor` that defers streams when their group or parent's group is active
- Added comprehensive test coverage for various blocking scenarios including parent-child relationships and multi-level hierarchies
Reviewed changes
Copilot reviewed 9 out of 9 changed files in this pull request and generated 1 comment.
| File | Description |
|---|---|
| test_concurrent_read_processor.py | Added comprehensive test suite covering all blocking scenarios |
| test_model_to_component_factory.py | Added integration test verifying manifest-to-stream property flow |
| default_stream.py | Added block_simultaneous_read property to DefaultStream |
| adapters.py | Added property adapter for legacy stream compatibility |
| abstract_stream.py | Added abstract property definition with documentation |
| model_to_component_factory.py | Integrated property from manifest to stream construction |
| declarative_component_schema.py | Generated schema with new property definition |
| declarative_component_schema.yaml | Added schema definition with comprehensive documentation |
| concurrent_read_processor.py | Implemented core blocking logic with group tracking and deferral |
Comments suppressed due to low confidence (1)
airbyte_cdk/sources/declarative/declarative_component_schema.yaml:1
- The description change for `use_cache` appears unrelated to the `block_simultaneous_read` feature. This change should be separated into its own PR or have an explanation for why it's included in this feature PR.
"$schema": http://json-schema.org/draft-07/schema#
📝 Walkthrough
Adds declarative-configurable stream groups and a stream-level …
Sequence Diagram(s)
sequenceDiagram
participant Processor as ConcurrentReadProcessor
participant Queue as StreamQueue
participant Generator as PartitionGenerator
participant Tracker as ActiveStreamTracker
Processor->>Processor: start_next_partition_generator()
loop attempts until a stream starts or all blocked
Processor->>Queue: peek next stream
Processor->>Tracker: collect group + parent chain
alt blocked (self/parent/group active)
Processor->>Queue: defer stream (requeue to tail)
else not blocked
Processor->>Tracker: mark stream & parents active
Processor->>Tracker: associate stream with blocking group
Processor->>Generator: submit partition generator
Generator-->>Processor: STARTED status
end
end
Generator->>Processor: on_partition_generation_completed()
Processor->>Tracker: deactivate completed stream & related parents
Processor->>Processor: start_next_partition_generator() (may yield status)
Processor->>Processor: _on_stream_is_done()
Processor->>Tracker: remove stream from active sets and update groups
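The deferral loop in the diagram above can be sketched in a few lines. This is a simplified illustration, not the CDK implementation: `StreamSpec`, `GroupAwareScheduler`, and their fields are hypothetical names, and the sketch assumes a stream is blocked whenever its own group or any direct parent's group is currently held by a running stream.

```python
from collections import deque
from dataclasses import dataclass
from typing import Dict, Iterable, Optional, Set, Tuple


@dataclass(frozen=True)
class StreamSpec:
    """Hypothetical stand-in for a stream; not a CDK class."""

    name: str
    group: str = ""  # empty string means the stream takes part in no blocking group
    parents: Tuple[str, ...] = ()


class GroupAwareScheduler:
    """Illustrative scheduler: defers streams whose group, or a parent's group, is active."""

    def __init__(self, streams: Iterable[StreamSpec]) -> None:
        specs = list(streams)
        self._specs: Dict[str, StreamSpec] = {s.name: s for s in specs}
        self._queue = deque(specs)
        # Running stream name -> groups it holds (its own plus its parents').
        self._held: Dict[str, Set[str]] = {}

    def _groups_needed(self, spec: StreamSpec) -> Set[str]:
        groups = {spec.group}
        groups.update(self._specs[p].group for p in spec.parents if p in self._specs)
        groups.discard("")  # ungrouped streams never block anything
        return groups

    def _active_groups(self) -> Set[str]:
        return set().union(*self._held.values())

    def start_next(self) -> Optional[str]:
        """Start the first unblocked stream; blocked streams are requeued at the tail."""
        for _ in range(len(self._queue)):  # at most one full pass over the queue
            spec = self._queue.popleft()
            needed = self._groups_needed(spec)
            if needed & self._active_groups():
                self._queue.append(spec)  # defer and try the next stream
                continue
            self._held[spec.name] = needed
            return spec.name
        return None  # queue is empty or every remaining stream is blocked

    def on_stream_done(self, name: str) -> None:
        """Release the groups held by a finished stream."""
        self._held.pop(name, None)
```

In this sketch a caller would invoke `start_next()` again after each `on_stream_done()` so that deferred streams get another chance once their blocker has released its groups.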
Estimated code review effort: 🎯 4 (Complex) | ⏱️ ~45 minutes
Want me to call out specific lines or logic hotspots in concurrent_read_processor.py for deeper review (e.g., max_attempts loop, parent-collection edge cases, or log verbosity)? wdyt?
Actionable comments posted: 4
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (1)
airbyte_cdk/sources/concurrent_source/concurrent_read_processor.py (1)
90-135: Possible correctness issue: parent “deactivation” can un-block a parent that’s still independently active, wdyt?
Right now, after child partition generation completes you discard each parent from `_active_stream_names`/`_active_groups` purely based on being a parent. If the parent stream is also actively generating partitions or still reading its own partitions, this can weaken the blocking guarantee for that parent’s group.
Proposed guard before deactivating a parent
  parent_streams = self._collect_all_parent_stream_names(stream_name)
  for parent_stream_name in parent_streams:
      if parent_stream_name in self._active_stream_names:
+         # If the parent is still independently active (generating partitions or reading partitions),
+         # don't deactivate it just because a child's partition generation finished.
+         if (
+             parent_stream_name in self._streams_currently_generating_partitions
+             or len(self._streams_to_running_partitions.get(parent_stream_name, set())) > 0
+         ):
+             continue
          self._logger.debug(f"Removing '{parent_stream_name}' from active streams")
          self._active_stream_names.discard(parent_stream_name)
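The guard proposed above can be expressed as a small pure function, which makes the condition easy to test in isolation. This is a sketch under assumptions: `parents_safe_to_deactivate` is a hypothetical helper, and its two state arguments only mirror the shape of the processor's `_streams_currently_generating_partitions` and `_streams_to_running_partitions` attributes named in the review.

```python
from typing import Dict, Iterable, List, Set


def parents_safe_to_deactivate(
    parent_names: Iterable[str],
    generating: Set[str],
    running_partitions: Dict[str, Set[str]],
) -> List[str]:
    """Return the parents that are not independently active.

    A parent is kept active when it is still generating partitions or still
    has partitions being read; only the remaining parents may be deactivated.
    """
    return [
        name
        for name in parent_names
        if name not in generating and not running_partitions.get(name)
    ]
```

Keeping the decision in one function means the "child finished" path and the "stream done" path can share the same rule rather than each re-deriving it.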
🤖 Fix all issues with AI agents
In @airbyte_cdk/sources/declarative/declarative_component_schema.yaml:
- Around line 1556-1575: The docs claim block_simultaneous_read accepts "" or
null but the schema defines only type: string, causing validation errors; update
the declarative_component_schema.yaml to make block_simultaneous_read accept
null by changing its type to allow both string and null (e.g., type: [ "string",
"null" ] and keep default ""), or alternatively remove the word "null" from the
description so the docs match the current type: string; ensure the change
references the block_simultaneous_read entry and adjust the default/description
accordingly.
In @airbyte_cdk/sources/declarative/models/declarative_component_schema.py:
- Around line 2746-2749: The Field use_cache in declarative_component_schema.py
currently has default=False but the description implies a default of True;
update the description to match the default (e.g., change the sentence “Only set
this to false if you are certain that caching should be disabled…” to “Only set
this to true if you are certain that caching should be enabled…”) or, if the
intent was to enable caching by default, flip the Field default to True—locate
the use_cache Field definition and make the description and default consistent.
In @unit_tests/sources/streams/concurrent/test_concurrent_read_processor.py:
- Around line 1175-1208: The test test_multiple_blocked_streams_retry_in_order
is incomplete and doesn't assert retry ordering; update the test to
deterministically exercise
ConcurrentReadProcessor.start_next_partition_generator and then assert the
expected order and retry behavior: after starting the parent (verify "parent" in
handler._active_stream_names), call handler.start_next_partition_generator
repeatedly and assert that child1 is deferred before child2 (inspect
handler._stream_instances_to_start_partition_generation order or pop sequence),
then simulate parent completion (mark partition generation finished or remove
"parent" from _active_stream_names) and call start_next_partition_generator to
verify that child1 is retried/started before child2. Use the existing mock
streams (parent, child1, child2) and the handler instance to make these concrete
assertions.
- Around line 1047-1073: The test's docstring is accidentally split into two
adjacent string literals ("""Test that blocking doesn't occur when
block_simultaneous_read=""" """) which is confusing; replace it with a single
proper triple-quoted docstring for test_no_defer_when_flag_false (e.g. """Test
that blocking doesn't occur when block_simultaneous_read=''""") and leave the
call to _create_mock_stream(stream1, block_simultaneous_read="") and references
to ConcurrentReadProcessor unchanged.
🧹 Nitpick comments (10)
airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py (1)
2098-2122: Propagation looks correct; consider normalizing whitespace-only group names (optional), wdyt?
`model.block_simultaneous_read or ""` keeps backward compatibility, nice. If you want to reduce accidental “blocking enabled” from values like `" "` in manifests, would you consider trimming here?
Proposed tweak
- block_simultaneous_read=model.block_simultaneous_read or "",
+ block_simultaneous_read=(model.block_simultaneous_read or "").strip(),
airbyte_cdk/sources/streams/concurrent/default_stream.py (2)
18-41: Consider defensively coercing `None` to `""` in the ctor, wdyt?
Even with type hints, callers can pass `None` at runtime; making this robust in the ctor avoids surprising downstream issues.
Proposed tweak
- self._block_simultaneous_read = block_simultaneous_read
+ self._block_simultaneous_read = block_simultaneous_read or ""
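Both tweaks above (trimming whitespace and coercing `None`) amount to the same normalization step. A minimal sketch, assuming a hypothetical helper named `normalize_group` that is not part of the CDK:

```python
from typing import Optional


def normalize_group(value: Optional[str]) -> str:
    """Map None and whitespace-only names to "" (no blocking); trim real names."""
    return (value or "").strip()
```

With this normalization, only a name that still contains characters after trimming would enable blocking, so a stray `" "` in a manifest behaves like an unset value.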
86-103: Would adding `block_simultaneous_read` to the debug “sync configuration” log help diagnosability, wdyt?
Right now `log_stream_sync_configuration()` logs PK + cursor; including the group could make concurrency issues much easier to debug.
unit_tests/sources/declarative/parsers/test_model_to_component_factory.py (1)
5217-5353: Nice end-to-end propagation test; consider also asserting the nested parent stream instance on the child carries the group, wdyt?
Right now you assert `parent_stream.block_simultaneous_read` and `child_stream.block_simultaneous_read`, but not that the `ParentStreamConfig.stream` created under `child_stream`’s `SubstreamPartitionRouter` also has `"issues_endpoint"` (covers the “parent stream instantiated as a dependency” path too). Also, the inline comments still mention `true` though this is a string field.
unit_tests/sources/streams/concurrent/test_concurrent_read_processor.py (2)
809-839: Mocking `block_simultaneous_read` as an attribute is fine here; could you make logger assertions less brittle, wdyt?
Instead of `"... in str(call)"`, consider asserting on `self._logger.info.call_args_list[i].args[0]` (or using `assert_any_call` with a stable substring) so formatting changes don’t break tests.
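A sketch of the less brittle assertion style suggested above, using only `unittest.mock`. The log messages and the `info_messages` helper are hypothetical; the real processor's wording may differ.

```python
import logging
from typing import List
from unittest.mock import Mock


def info_messages(logger_mock: Mock) -> List[str]:
    """Return the first positional argument of every logger.info(...) call."""
    return [call.args[0] for call in logger_mock.info.call_args_list if call.args]


logger = Mock(spec=logging.Logger)
# Hypothetical messages standing in for what the processor would log.
logger.info("Deferring stream 'child': parent 'parent' is active")
logger.info("Starting partition generation for stream 'other'")

messages = info_messages(logger)
# Assert on the message text itself rather than on str(call).
assert any("Deferring stream 'child'" in message for message in messages)
```

Matching on the message argument keeps the test independent of how `mock.call` objects are rendered as strings.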
976-1015: This test mutates internal state in a way that can create duplicates; can we avoid appending “parent” twice, wdyt?
`start_next_partition_generator()` already appends `"parent"` to `_streams_currently_generating_partitions`; the extra `append("parent")` can leave a stale entry after `.remove()` and make later invariants weird.
airbyte_cdk/sources/concurrent_source/concurrent_read_processor.py (4)
69-89: Per-stream `info` log on init may be noisy; would `debug` be enough, wdyt?
This logs once per stream with a group; on connectors with many streams this could be a lot at `info`.
228-336: Docs/comments still mention `block_simultaneous_read=True` but the API is string-based; update wording, wdyt?
This is minor, but the docstring bullets (and some inline comments) still describe it like a boolean flag, which makes the grouping semantics harder to follow.
60-61: Do we want `_stream_instances_to_start_partition_generation` to be a copy to avoid mutating the caller’s list, wdyt?
Since you pop/append/reorder, copying is a cheap way to avoid surprising aliasing.
Proposed tweak
- self._stream_instances_to_start_partition_generation = stream_instances_to_read_from
+ self._stream_instances_to_start_partition_generation = list(stream_instances_to_read_from)
Also applies to: 240-335
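The aliasing concern behind the last tweak is easy to demonstrate. In this sketch, `QueueOwner` is a hypothetical stand-in for the processor and strings stand in for stream instances:

```python
from typing import List, Optional


class QueueOwner:
    """Hypothetical stand-in for a processor that pops and requeues its pending streams."""

    def __init__(self, streams: List[str]) -> None:
        # Shallow copy: reordering or popping here no longer touches the caller's list.
        self._pending = list(streams)

    def pop_next(self) -> Optional[str]:
        return self._pending.pop(0) if self._pending else None


caller_list = ["parent", "child"]
owner = QueueOwner(caller_list)
owner.pop_next()  # removes "parent" from the owner's private queue only
```

Without the `list(...)` copy, the `pop_next()` call would also have removed `"parent"` from `caller_list`.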
367-407: Parent collection relies on private attributes; would a small helper/`getattr` chain improve readability, wdyt?
The logic is sound, but the nested `hasattr` chain is a bit hard to scan and easy to break during refactors.
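One possible shape for such a helper replaces the `hasattr` chain with `isinstance` checks. The classes below are hypothetical stand-ins that only mimic a generator → slicer → router nesting; they are not the CDK types and the attribute names are assumptions.

```python
from dataclasses import dataclass
from typing import Optional, Tuple


@dataclass
class Router:
    """Stand-in for a partition router that knows its parent stream names."""

    parent_stream_names: Tuple[str, ...] = ()


@dataclass
class PerPartitionCursor:
    """Stand-in for a cursor that wraps a partition router."""

    partition_router: Router


@dataclass
class SlicerPartitionGenerator:
    """Stand-in for a partition generator built around a stream slicer."""

    stream_slicer: object


@dataclass
class Stream:
    name: str
    partition_generator: object = None


def get_partition_router(stream: Stream) -> Optional[Router]:
    """Walk generator -> slicer -> router, returning None when any link is missing."""
    generator = stream.partition_generator
    if not isinstance(generator, SlicerPartitionGenerator):
        return None
    slicer = generator.stream_slicer
    if isinstance(slicer, PerPartitionCursor):
        return slicer.partition_router
    if isinstance(slicer, Router):
        return slicer
    return None
```

Each early return names exactly which link was absent, which tends to survive refactors better than a nested attribute probe.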
🧠 Learnings (2)
📚 Learning: 2024-12-11T16:34:46.319Z
Learnt from: pnilan
Repo: airbytehq/airbyte-python-cdk PR: 0
File: :0-0
Timestamp: 2024-12-11T16:34:46.319Z
Learning: In the airbytehq/airbyte-python-cdk repository, the `declarative_component_schema.py` file is auto-generated from `declarative_component_schema.yaml` and should be ignored in the recommended reviewing order.
Applied to files:
airbyte_cdk/sources/declarative/models/declarative_component_schema.py
airbyte_cdk/sources/declarative/declarative_component_schema.yaml
📚 Learning: 2024-11-18T23:40:06.391Z
Learnt from: ChristoGrab
Repo: airbytehq/airbyte-python-cdk PR: 58
File: airbyte_cdk/sources/declarative/yaml_declarative_source.py:0-0
Timestamp: 2024-11-18T23:40:06.391Z
Learning: When modifying the `YamlDeclarativeSource` class in `airbyte_cdk/sources/declarative/yaml_declarative_source.py`, avoid introducing breaking changes like altering method signatures within the scope of unrelated PRs. Such changes should be addressed separately to minimize impact on existing implementations.
Applied to files:
airbyte_cdk/sources/declarative/declarative_component_schema.yaml
🔇 Additional comments (4)
airbyte_cdk/sources/declarative/models/declarative_component_schema.py (1)
2498-2502: Field type should match YAML schema: make `block_simultaneous_read` a non-Optional `str`.
The YAML source defines this as `type: string` with `default: ""`, but the generated Python schema makes it `Optional[str]`. While the factory normalization (`model.block_simultaneous_read or ""`) ensures runtime safety, the type inconsistency is worth fixing. Consider updating the codegen to generate `str` instead of `Optional[str]` to match the YAML definition and avoid the appearance of a nullable value when the schema itself doesn't allow it.
⛔ Skipped due to learnings (pnilan, 2024-12-11; ChristoGrab, PR 58, 2024-11-18)
airbyte_cdk/sources/streams/concurrent/abstract_stream.py (1)
88-101: LGTM: good backward-compatible default + clear contract.
Non-abstract default `""` is a solid choice to avoid forcing implementations.
airbyte_cdk/sources/streams/concurrent/adapters.py (1)
199-203: LGTM: simple, correct delegation.
airbyte_cdk/sources/concurrent_source/concurrent_read_processor.py (1)
425-439: Cleanup on stream completion looks good.
Removing only the completed stream (not parents) from active tracking matches the intent and keeps parent lifecycle separate.
/autofix
Actionable comments posted: 1
🤖 Fix all issues with AI agents
In @unit_tests/sources/streams/concurrent/test_concurrent_read_processor.py:
- Around line 1238-1248: The failing linter flagged formatting in the assertion
block; run the code formatter (ruff format) to fix whitespace/line-wrapping and
ensure list comprehension and multi-line statements follow project style. In
practice, reformat the block that builds children_active and the two assertions
referencing handler._active_stream_names so the list comprehension and asserts
conform to ruff rules (e.g., proper indentation and line breaks).
🧹 Nitpick comments (1)
unit_tests/sources/streams/concurrent/test_concurrent_read_processor.py (1)
948-974: Misleading test name - consider renaming?
The test name `test_retry_blocked_stream_after_blocker_done` suggests it tests the retry mechanism after a blocker completes, but the actual behavior (as the docstring correctly states) is verifying that streams with different groups don't block each other.
Would something like `test_different_groups_dont_block_each_other` or `test_streams_with_different_groups_run_concurrently` better describe the test's intent, wdyt?
🪛 GitHub Actions: Linters
unit_tests/sources/streams/concurrent/test_concurrent_read_processor.py
[warning] 1238-1248: Ruff format suggested code style changes in test_concurrent_read_processor.py. Run 'ruff format' to apply formatting changes.
🔇 Additional comments (3)
unit_tests/sources/streams/concurrent/test_concurrent_read_processor.py (3)
800-838: LGTM!
The helper methods `_create_mock_stream` and `_create_mock_stream_with_parent` are well-structured and effectively reduce boilerplate across the tests. The parent relationship mocking correctly mirrors the retriever/partition_router pattern used in the actual implementation.
1249-1308: Nice edge case coverage!
These two tests (`test_child_without_flag_blocked_by_parent_with_flag` and `test_child_with_flag_not_blocked_by_parent_without_flag`) nicely cover the asymmetric blocking behavior. This ensures the feature correctly respects the parent's blocking group regardless of the child's configuration, which is important for the use case where a parent API endpoint needs protection.
1310-1347: LGTM - validates the core group-based blocking feature!
This test is particularly important as it validates the primary use case mentioned in the PR objectives (source-intercom). It confirms that unrelated streams sharing a resource (same group name) correctly block each other, which is the key feature of `block_simultaneous_read`.
…tream_groups
- Remove block_simultaneous_read property from DeclarativeStream schema
- Add top-level stream_groups with StreamGroup and BlockSimultaneousSyncsAction
- ConcurrentDeclarativeSource parses stream_groups and injects block_simultaneous_read into stream configs before factory processing
- Internal blocking logic in ConcurrentReadProcessor unchanged
- Update tests for new interface
Co-Authored-By: unknown <>

- Add stream_name_to_group parameter to ModelToComponentFactory.__init__()
- Add set_stream_name_to_group() method for post-init configuration
- Factory now looks up block_simultaneous_read from its own mapping
- Remove config injection hack from ConcurrentDeclarativeSource.streams()
- Update tests to use factory-based approach instead of extra fields
Co-Authored-By: unknown <>

…oded dict
- Test now defines stream_groups with references in the manifest YAML
- Uses _build_stream_name_to_group() to derive the mapping from manifest
- Removed test_set_stream_name_to_group (redundant with the manifest-based test)
- Added ConcurrentDeclarativeSource import for _build_stream_name_to_group
Co-Authored-By: unknown <>

Child streams that depend on parent streams should not be in the same group, as this would cause a deadlock (child needs to read parent).
Co-Authored-By: unknown <>

Co-Authored-By: unknown <>

- Factory now owns the stream_groups resolution via set_stream_groups(manifest)
- ConcurrentDeclarativeSource just calls factory.set_stream_groups(manifest)
- Removed _build_stream_name_to_group from ConcurrentDeclarativeSource
- Updated tests to use factory's _build_stream_name_to_group directly
Co-Authored-By: unknown <>

…of factory
- Removed _build_stream_name_to_group, set_stream_groups, _stream_name_to_group from factory
- Factory no longer knows about stream_groups at all
- Added _apply_stream_groups to ConcurrentDeclarativeSource: creates streams first, then sets block_simultaneous_read on matching DefaultStream instances
- Added block_simultaneous_read setter on DefaultStream
- Replaced mock-based tests with parametrized tests using real DefaultStream instances
Co-Authored-By: unknown <>
Replace hasattr chain in ConcurrentReadProcessor._collect_all_parent_stream_names with DefaultStream.get_partition_router() that safely traverses the internal partition_generator -> stream_slicer -> partition_router chain using isinstance checks. Co-Authored-By: unknown <>
_apply_stream_groups now checks that no stream shares a group with any of its parent streams (via get_partition_router). Raises ValueError at config time if a deadlock-causing configuration is detected. Co-Authored-By: unknown <>
… done Adds a safety check in is_done() that raises AirbyteTracedException (system_error) if streams remain in the partition generation queue after all streams are marked done. Also moves inline imports to module level and updates test mocks to use DefaultStream with get_partition_router(). Co-Authored-By: unknown <>
…d concurrent_declarative_source.py Co-Authored-By: unknown <>
…ct parent streams Co-Authored-By: unknown <>
…artition_router() Co-Authored-By: unknown <>
Co-Authored-By: unknown <>
…ps check, and get_partition_router Co-Authored-By: unknown <>
…rents Co-Authored-By: unknown <>
…hema definitions Co-Authored-By: unknown <>
…er reading Co-Authored-By: unknown <>
fa6bdbe to b56b376
Co-Authored-By: alfredo.garcia@airbyte.io <freddy.garcia7.fg@gmail.com>
…atch actual error message Co-Authored-By: alfredo.garcia@airbyte.io <freddy.garcia7.fg@gmail.com>
Summary
Adds a `block_simultaneous_read` feature to prevent multiple streams from running concurrently when they share the same resource. The feature is configured via a top-level `stream_groups` structure in the manifest (rather than per-stream properties).
Interface
- `stream_groups` is a top-level manifest property (alongside `streams`, `definitions`, `check`, etc.)
- Each group has a name (e.g. `crm_objects`), a list of stream `$ref` references, and an `action`
- `BlockSimultaneousSyncsAction` is the only action type for now
Implementation
- Schema (`declarative_component_schema.yaml`): Removed per-stream `block_simultaneous_read` property from `DeclarativeStream`. Added top-level `stream_groups` with `StreamGroup` and `BlockSimultaneousSyncsAction` definitions.
- Pydantic models (`declarative_component_schema.py`): Added `BlockSimultaneousSyncsAction`, `StreamGroup` classes. Added `stream_groups: Optional[Dict[str, StreamGroup]]` to `DeclarativeSource1`/`DeclarativeSource2`.
- `ConcurrentDeclarativeSource._apply_stream_groups()`: Resolves stream groups from actual stream instances after stream creation. Validates that no stream shares a group with any of its parent streams (deadlock prevention). Sets `block_simultaneous_read` on matching `DefaultStream` instances.
- `DefaultStream.get_partition_router()`: New helper method that safely traverses the `partition_generator → stream_slicer → partition_router` chain using `isinstance` checks, replacing the `hasattr` chains in `ConcurrentReadProcessor`.
- `ConcurrentReadProcessor` (core blocking logic): Uses group-based deferral/retry with parent-child awareness. Added `is_done()` safety check that raises `AirbyteTracedException` if streams remain in the partition generation queue after all streams are marked done.
Blocking Behavior
First use case: source-intercom — prevents duplicate concurrent requests to the companies endpoint.
Resolves: https://github.com/airbytehq/oncall/issues/8346
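The group resolution and deadlock validation described in this summary can be sketched as follows. This is an illustration under assumptions, not the PR's code: the dictionary shape of `stream_groups` (a `streams` list of already-resolved definitions with a `name` key, plus an `action`), both function names, and the `parents_by_stream` argument are hypothetical, and only direct parents are checked here.

```python
from typing import Dict, List, Mapping


def build_stream_name_to_group(stream_groups: Mapping[str, Mapping]) -> Dict[str, str]:
    """Flatten {group_name: {"streams": [...]}} into {stream_name: group_name}."""
    mapping: Dict[str, str] = {}
    for group_name, group in stream_groups.items():
        for stream in group.get("streams", []):
            mapping[stream["name"]] = group_name
    return mapping


def validate_no_parent_in_same_group(
    name_to_group: Mapping[str, str],
    parents_by_stream: Mapping[str, List[str]],
) -> None:
    """Raise ValueError when a stream shares a blocking group with one of its parents."""
    for stream_name, group in name_to_group.items():
        for parent in parents_by_stream.get(stream_name, []):
            if name_to_group.get(parent) == group:
                raise ValueError(
                    f"Stream '{stream_name}' and its parent '{parent}' are both in "
                    f"group '{group}'; the child could never read its parent."
                )


# Assumed manifest fragment after $ref resolution; names are illustrative.
stream_groups = {
    "crm_objects": {
        "streams": [{"name": "deals"}, {"name": "companies"}],
        "action": {"type": "BlockSimultaneousSyncsAction"},
    }
}
name_to_group = build_stream_name_to_group(stream_groups)
```

Running the validation at configuration time, as the summary describes, turns a potential runtime deadlock into an immediate and explainable error.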
Updates since last revision
- `get_partition_router()` helper on `DefaultStream`. Replaces messy `hasattr` chains in `ConcurrentReadProcessor._collect_all_parent_stream_names()` with a clean method that uses `isinstance` checks to traverse `StreamSlicerPartitionGenerator → ConcurrentPerPartitionCursor → partition_router`.
- Parent-group validation in `_apply_stream_groups()`. Raises `ValueError` at config time if a stream and any of its parent streams are in the same blocking group.
- `is_done()` safety check. Raises `AirbyteTracedException` (system_error) if `_stream_instances_to_start_partition_generation` is not empty after all streams are marked done, which catches stuck-stream bugs at runtime.
- Inline imports (`ConcurrentPerPartitionCursor`, `StreamSlicerPartitionGenerator`, `PartitionRouter`, `SubstreamPartitionRouter`) are now at the top of files per Python coding standards.
Review & Testing Checklist for Human
- … `ValueError` at config time with a clear message.
- `get_partition_router()` handles all stream types: Test with both `DefaultStream` (with `StreamSlicerPartitionGenerator`) and legacy `DeclarativeStream` paths. Confirm it returns `None` for streams without partition routers.
- `is_done()` check doesn't mask real issues: If the safety check triggers, investigate the root cause (why streams remained in the queue) rather than just fixing the symptom.
- … `stream_groups` config format to confirm end-to-end blocking behavior and verify no deadlocks occur.
- … (`ConcurrentPerPartitionCursor`, `StreamSlicerPartitionGenerator`, `SubstreamPartitionRouter`) should not cause circular dependencies in production connectors.
Recommended test plan:
- … `stream_groups` referencing 2+ streams via `$ref`
- … `ConcurrentDeclarativeSource` and verify streams in the same group are read sequentially
Notes
- … `stream_groups` continue to work unchanged.
- … `ConcurrentReadProcessor` still uses string-based group identifiers, so the blocking logic is unchanged.
- … `test_read_with_concurrent_and_synchronous_streams_with_concurrent_state` fails with SQLite locking issues (pre-existing, not caused by these changes).
Summary by CodeRabbit
New Features
- … `get_partition_router()` on `DefaultStream` for safe partition router access.
- … `is_done()` to detect stuck streams in partition generation queue.
Bug Fixes
Tests