-
Notifications
You must be signed in to change notification settings - Fork 40
feat: Add block_simultaneous_read with stream_groups + fix default HTTP timeout #940
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
52900d1
0b8561e
c6e89c1
4868f26
6b614b3
46e7c19
b82dfbc
0c0f4ef
cc1d47c
6f5b4ea
d5ff69e
5ed978f
b41a4b3
a231e8c
d3c8067
3a3cf8b
7ccf6cf
8d3b1f2
77b25c5
3014458
48711d3
5e4ed38
0e710d2
a0fbd89
d8ef4fa
f000313
68850ec
b183f80
e1bd626
29951b2
b0537da
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Large diffs are not rendered by default.
| Original file line number | Diff line number | Diff line change | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
|
|
@@ -76,12 +76,19 @@ | |||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| from airbyte_cdk.sources.declarative.parsers.model_to_component_factory import ( | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| ModelToComponentFactory, | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| ) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| from airbyte_cdk.sources.declarative.partition_routers.grouping_partition_router import ( | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| GroupingPartitionRouter, | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| ) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| from airbyte_cdk.sources.declarative.partition_routers.substream_partition_router import ( | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| SubstreamPartitionRouter, | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| ) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| from airbyte_cdk.sources.declarative.resolvers import COMPONENTS_RESOLVER_TYPE_MAPPING | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| from airbyte_cdk.sources.declarative.spec.spec import Spec | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| from airbyte_cdk.sources.declarative.types import Config, ConnectionDefinition | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| from airbyte_cdk.sources.message.concurrent_repository import ConcurrentMessageRepository | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| from airbyte_cdk.sources.message.repository import InMemoryMessageRepository | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| from airbyte_cdk.sources.streams.concurrent.abstract_stream import AbstractStream | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| from airbyte_cdk.sources.streams.concurrent.default_stream import DefaultStream | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| from airbyte_cdk.sources.streams.concurrent.partitions.types import QueueItem | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| from airbyte_cdk.sources.utils.slice_logger import ( | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| AlwaysLogSliceLogger, | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
|
@@ -405,6 +412,8 @@ def streams(self, config: Mapping[str, Any]) -> List[AbstractStream]: # type: i | |||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| if api_budget_model: | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| self._constructor.set_api_budget(api_budget_model, self._config) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| prepared_configs = self._initialize_cache_for_parent_streams(deepcopy(stream_configs)) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| source_streams = [ | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| self._constructor.create_component( | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| ( | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
|
@@ -416,10 +425,70 @@ def streams(self, config: Mapping[str, Any]) -> List[AbstractStream]: # type: i | |||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| self._config, | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| emit_connector_builder_messages=self._emit_connector_builder_messages, | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| ) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| for stream_config in self._initialize_cache_for_parent_streams(deepcopy(stream_configs)) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| for stream_config in prepared_configs | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| ] | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| self._apply_stream_groups(source_streams) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| return source_streams | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| def _apply_stream_groups(self, streams: List[AbstractStream]) -> None: | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| """Set block_simultaneous_read on streams based on the manifest's stream_groups config. | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| Iterates over the resolved manifest's stream_groups and matches group membership | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| against actual created stream instances by name. Validates that no stream shares a | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| group with any of its parent streams, which would cause a deadlock. | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| """ | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| stream_groups = self._source_config.get("stream_groups", {}) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| if not stream_groups: | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| return | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| # Build stream_name -> group_name mapping from the resolved manifest | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| stream_name_to_group: Dict[str, str] = {} | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| for group_name, group_config in stream_groups.items(): | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| for stream_ref in group_config.get("streams", []): | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| if isinstance(stream_ref, dict): | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| stream_name = stream_ref.get("name", "") | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| if stream_name: | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| stream_name_to_group[stream_name] = group_name | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| # Validate no stream shares a group with any of its ancestor streams | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| stream_name_to_instance: Dict[str, AbstractStream] = {s.name: s for s in streams} | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
Comment on lines
+446
to
+457
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| # Build stream_name -> group_name mapping from the resolved manifest | |
| stream_name_to_group: Dict[str, str] = {} | |
| for group_name, group_config in stream_groups.items(): | |
| for stream_ref in group_config.get("streams", []): | |
| if isinstance(stream_ref, dict): | |
| stream_name = stream_ref.get("name", "") | |
| if stream_name: | |
| stream_name_to_group[stream_name] = group_name | |
| # Validate no stream shares a group with any of its ancestor streams | |
| stream_name_to_instance: Dict[str, AbstractStream] = {s.name: s for s in streams} | |
| # Map existing stream instance names for reliable resolution of references | |
| stream_name_to_instance: Dict[str, AbstractStream] = {s.name: s for s in streams} | |
| def _resolve_stream_ref_to_name(stream_ref: Any) -> Optional[str]: | |
| """ | |
| Normalize a stream reference entry from stream_groups[*].streams into a stream name. | |
| Supported formats: | |
| - {"name": "<stream_name>"} | |
| - {"$ref": "#/definitions/<stream_name_or_key>"} | |
| - "#/definitions/<stream_name_or_key>" | |
| """ | |
| # Direct dict with explicit name (current behavior) | |
| if isinstance(stream_ref, Mapping): | |
| name_val = stream_ref.get("name") | |
| if isinstance(name_val, str) and name_val: | |
| return name_val | |
| ref_val = stream_ref.get("$ref") | |
| if isinstance(ref_val, str): | |
| ref_str = ref_val | |
| else: | |
| return None | |
| elif isinstance(stream_ref, str): | |
| ref_str = stream_ref | |
| else: | |
| return None | |
| # At this point, ref_str is a reference string. For simple JSON-pointer-like | |
| # references such as "#/definitions/<stream_key>", use the last path segment. | |
| ref_str = ref_str.strip() | |
| if not ref_str: | |
| return None | |
| # Ignore fragment prefix "#/" if present, then take the last segment | |
| if ref_str.startswith("#/"): | |
| path = ref_str[2:] | |
| elif ref_str.startswith("#"): | |
| path = ref_str[1:] | |
| else: | |
| path = ref_str | |
| candidate = path.split("/")[-1] if path else "" | |
| if not candidate: | |
| return None | |
| # Only return names that correspond to an actual created stream | |
| return candidate if candidate in stream_name_to_instance else None | |
| # Build stream_name -> group_name mapping from the resolved manifest | |
| stream_name_to_group: Dict[str, str] = {} | |
| for group_name, group_config in stream_groups.items(): | |
| for stream_ref in group_config.get("streams", []): | |
| stream_name = _resolve_stream_ref_to_name(stream_ref) | |
| if stream_name: | |
| stream_name_to_group[stream_name] = group_name | |
| # Validate no stream shares a group with any of its ancestor streams |
| Original file line number | Diff line number | Diff line change | ||||||||||||||||||||||||||||||||||||||
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
|
|
@@ -45,6 +45,15 @@ properties: | |||||||||||||||||||||||||||||||||||||||
| "$ref": "#/definitions/ConcurrencyLevel" | ||||||||||||||||||||||||||||||||||||||||
| api_budget: | ||||||||||||||||||||||||||||||||||||||||
| "$ref": "#/definitions/HTTPAPIBudget" | ||||||||||||||||||||||||||||||||||||||||
| stream_groups: | ||||||||||||||||||||||||||||||||||||||||
| title: Stream Groups | ||||||||||||||||||||||||||||||||||||||||
| description: > | ||||||||||||||||||||||||||||||||||||||||
| Groups of streams that share a common resource and should not be read simultaneously. | ||||||||||||||||||||||||||||||||||||||||
| Each group defines a set of stream references and an action that controls how concurrent | ||||||||||||||||||||||||||||||||||||||||
| reads are managed. Only applies to ConcurrentDeclarativeSource. | ||||||||||||||||||||||||||||||||||||||||
| type: object | ||||||||||||||||||||||||||||||||||||||||
| additionalProperties: | ||||||||||||||||||||||||||||||||||||||||
| "$ref": "#/definitions/StreamGroup" | ||||||||||||||||||||||||||||||||||||||||
| max_concurrent_async_job_count: | ||||||||||||||||||||||||||||||||||||||||
| title: Maximum Concurrent Asynchronous Jobs | ||||||||||||||||||||||||||||||||||||||||
| description: Maximum number of concurrent asynchronous jobs to run. This property is only relevant for sources/streams that support asynchronous job execution through the AsyncRetriever (e.g. a report-based stream that initiates a job, polls the job status, and then fetches the job results). This is often set by the API's maximum number of concurrent jobs on the account level. Refer to the API's documentation for this information. | ||||||||||||||||||||||||||||||||||||||||
|
|
@@ -4126,6 +4135,43 @@ definitions: | |||||||||||||||||||||||||||||||||||||||
| - "$ref": "#/definitions/ConfigRemoveFields" | ||||||||||||||||||||||||||||||||||||||||
| - "$ref": "#/definitions/CustomConfigTransformation" | ||||||||||||||||||||||||||||||||||||||||
| default: [] | ||||||||||||||||||||||||||||||||||||||||
| StreamGroup: | ||||||||||||||||||||||||||||||||||||||||
| title: Stream Group | ||||||||||||||||||||||||||||||||||||||||
| description: > | ||||||||||||||||||||||||||||||||||||||||
| A group of streams that share a common resource and should not be read simultaneously. | ||||||||||||||||||||||||||||||||||||||||
| Streams in the same group will be blocked from concurrent reads based on the specified action. | ||||||||||||||||||||||||||||||||||||||||
| type: object | ||||||||||||||||||||||||||||||||||||||||
| required: | ||||||||||||||||||||||||||||||||||||||||
| - streams | ||||||||||||||||||||||||||||||||||||||||
| - action | ||||||||||||||||||||||||||||||||||||||||
| properties: | ||||||||||||||||||||||||||||||||||||||||
| streams: | ||||||||||||||||||||||||||||||||||||||||
| title: Streams | ||||||||||||||||||||||||||||||||||||||||
| description: > | ||||||||||||||||||||||||||||||||||||||||
| List of references to streams that belong to this group. | ||||||||||||||||||||||||||||||||||||||||
| type: array | ||||||||||||||||||||||||||||||||||||||||
| items: | ||||||||||||||||||||||||||||||||||||||||
| anyOf: | ||||||||||||||||||||||||||||||||||||||||
|
Comment on lines
+4151
to
+4154
|
||||||||||||||||||||||||||||||||||||||||
| List of references to streams that belong to this group. | |
| type: array | |
| items: | |
| anyOf: | |
| List of references to streams that belong to this group. | |
| Each entry may be a reference string (e.g. "#/definitions/my_stream"), | |
| a $ref-style object (e.g. { "$ref": "#/definitions/my_stream" }), | |
| or an inline DeclarativeStream definition. | |
| type: array | |
| items: | |
| anyOf: | |
| - type: string | |
| - type: object | |
| required: | |
| - "$ref" | |
| properties: | |
| "$ref": | |
| type: string | |
| additionalProperties: false |
| Original file line number | Diff line number | Diff line change | ||||||||||||
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
| @@ -1,5 +1,3 @@ | ||||||||||||||
| # Copyright (c) 2025 Airbyte, Inc., all rights reserved. | ||||||||||||||
|
|
||||||||||||||
| # generated by datamodel-codegen: | ||||||||||||||
| # filename: declarative_component_schema.yaml | ||||||||||||||
|
|
||||||||||||||
|
|
@@ -2359,6 +2357,11 @@ class Config: | |||||||||||||
| spec: Optional[Spec] = None | ||||||||||||||
| concurrency_level: Optional[ConcurrencyLevel] = None | ||||||||||||||
| api_budget: Optional[HTTPAPIBudget] = None | ||||||||||||||
| stream_groups: Optional[Dict[str, StreamGroup]] = Field( | ||||||||||||||
| None, | ||||||||||||||
| description="Groups of streams that share a common resource and should not be read simultaneously. Each group defines a set of stream references and an action that controls how concurrent reads are managed. Only applies to ConcurrentDeclarativeSource.", | ||||||||||||||
| title="Stream Groups", | ||||||||||||||
| ) | ||||||||||||||
| max_concurrent_async_job_count: Optional[Union[int, str]] = Field( | ||||||||||||||
| None, | ||||||||||||||
| description="Maximum number of concurrent asynchronous jobs to run. This property is only relevant for sources/streams that support asynchronous job execution through the AsyncRetriever (e.g. a report-based stream that initiates a job, polls the job status, and then fetches the job results). This is often set by the API's maximum number of concurrent jobs on the account level. Refer to the API's documentation for this information.", | ||||||||||||||
|
|
@@ -2394,6 +2397,11 @@ class Config: | |||||||||||||
| spec: Optional[Spec] = None | ||||||||||||||
| concurrency_level: Optional[ConcurrencyLevel] = None | ||||||||||||||
| api_budget: Optional[HTTPAPIBudget] = None | ||||||||||||||
| stream_groups: Optional[Dict[str, StreamGroup]] = Field( | ||||||||||||||
| None, | ||||||||||||||
| description="Groups of streams that share a common resource and should not be read simultaneously. Each group defines a set of stream references and an action that controls how concurrent reads are managed. Only applies to ConcurrentDeclarativeSource.", | ||||||||||||||
| title="Stream Groups", | ||||||||||||||
| ) | ||||||||||||||
| max_concurrent_async_job_count: Optional[Union[int, str]] = Field( | ||||||||||||||
| None, | ||||||||||||||
| description="Maximum number of concurrent asynchronous jobs to run. This property is only relevant for sources/streams that support asynchronous job execution through the AsyncRetriever (e.g. a report-based stream that initiates a job, polls the job status, and then fetches the job results). This is often set by the API's maximum number of concurrent jobs on the account level. Refer to the API's documentation for this information.", | ||||||||||||||
|
|
@@ -3055,6 +3063,23 @@ class AsyncRetriever(BaseModel): | |||||||||||||
| parameters: Optional[Dict[str, Any]] = Field(None, alias="$parameters") | ||||||||||||||
|
|
||||||||||||||
|
|
||||||||||||||
| class BlockSimultaneousSyncsAction(BaseModel): | ||||||||||||||
| type: Literal["BlockSimultaneousSyncsAction"] | ||||||||||||||
|
|
||||||||||||||
|
|
||||||||||||||
| class StreamGroup(BaseModel): | ||||||||||||||
| streams: List[str] = Field( | ||||||||||||||
| ..., | ||||||||||||||
| description='List of references to streams that belong to this group. Use JSON references to stream definitions (e.g., "#/definitions/my_stream").', | ||||||||||||||
|
Comment on lines
+3071
to
+3073
|
||||||||||||||
| streams: List[str] = Field( | |
| ..., | |
| description='List of references to streams that belong to this group. Use JSON references to stream definitions (e.g., "#/definitions/my_stream").', | |
| streams: List[Union[str, "DeclarativeStream"]] = Field( | |
| ..., | |
| description='List of streams that belong to this group. Each item can be either a JSON reference to a stream definition (e.g., "#/definitions/my_stream") or an inlined DeclarativeStream object.', |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -6,6 +6,13 @@ | |
| from typing import Any, Callable, Iterable, List, Mapping, Optional, Union | ||
|
|
||
| from airbyte_cdk.models import AirbyteStream, SyncMode | ||
| from airbyte_cdk.sources.declarative.incremental.concurrent_partition_cursor import ( | ||
| ConcurrentPerPartitionCursor, | ||
| ) | ||
| from airbyte_cdk.sources.declarative.partition_routers.partition_router import PartitionRouter | ||
| from airbyte_cdk.sources.declarative.stream_slicers.declarative_partition_generator import ( | ||
| StreamSlicerPartitionGenerator, | ||
| ) | ||
| from airbyte_cdk.sources.streams.concurrent.abstract_stream import AbstractStream | ||
| from airbyte_cdk.sources.streams.concurrent.availability_strategy import StreamAvailability | ||
| from airbyte_cdk.sources.streams.concurrent.cursor import Cursor, CursorField | ||
|
|
@@ -26,6 +33,7 @@ def __init__( | |
| cursor: Cursor, | ||
| namespace: Optional[str] = None, | ||
| supports_file_transfer: bool = False, | ||
| block_simultaneous_read: str = "", | ||
| ) -> None: | ||
| self._stream_partition_generator = partition_generator | ||
| self._name = name | ||
|
|
@@ -36,6 +44,7 @@ def __init__( | |
| self._cursor = cursor | ||
| self._namespace = namespace | ||
| self._supports_file_transfer = supports_file_transfer | ||
| self._block_simultaneous_read = block_simultaneous_read | ||
|
|
||
| def generate_partitions(self) -> Iterable[Partition]: | ||
| yield from self._stream_partition_generator.generate() | ||
|
|
@@ -94,6 +103,24 @@ def log_stream_sync_configuration(self) -> None: | |
| def cursor(self) -> Cursor: | ||
| return self._cursor | ||
|
|
||
| @property | ||
| def block_simultaneous_read(self) -> str: | ||
| """Returns the blocking group name for this stream, or empty string if no blocking""" | ||
| return self._block_simultaneous_read | ||
|
|
||
| @block_simultaneous_read.setter | ||
| def block_simultaneous_read(self, value: str) -> None: | ||
| self._block_simultaneous_read = value | ||
|
|
||
| def get_partition_router(self) -> PartitionRouter | None: | ||
| """Return the partition router for this stream, or None if not available.""" | ||
| if not isinstance(self._stream_partition_generator, StreamSlicerPartitionGenerator): | ||
| return None | ||
| stream_slicer = self._stream_partition_generator._stream_slicer | ||
| if not isinstance(stream_slicer, ConcurrentPerPartitionCursor): | ||
| return None | ||
| return stream_slicer._partition_router | ||
|
Comment on lines
+115
to
+122
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Could
🤖 Prompt for AI Agents |
||
|
|
||
| def check_availability(self) -> StreamAvailability: | ||
| """ | ||
| Check stream availability by attempting to read the first record of the stream. | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -85,6 +85,8 @@ def monkey_patched_get_item(self, key): # type: ignore # this interface is a co | |
| class HttpClient: | ||
| _DEFAULT_MAX_RETRY: int = 5 | ||
| _DEFAULT_MAX_TIME: int = 60 * 10 | ||
| _DEFAULT_CONNECT_TIMEOUT: int = 30 | ||
| _DEFAULT_READ_TIMEOUT: int = 300 | ||
|
Comment on lines
+88
to
+89
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Could we keep an override path for these new default timeouts before landing this, wdyt? This now applies Also applies to: 593-601 🤖 Prompt for AI Agents |
||
| _ACTIONS_TO_RETRY_ON = { | ||
| ResponseAction.RETRY, | ||
| ResponseAction.RATE_LIMITED, | ||
|
|
@@ -586,11 +588,17 @@ def send_request( | |
| verify=request_kwargs.get("verify"), | ||
| cert=request_kwargs.get("cert"), | ||
| ) | ||
| request_kwargs = {**request_kwargs, **env_settings} | ||
| mutable_request_kwargs: Dict[str, Any] = {**request_kwargs, **env_settings} | ||
|
|
||
| if "timeout" not in mutable_request_kwargs: | ||
| mutable_request_kwargs["timeout"] = ( | ||
| self._DEFAULT_CONNECT_TIMEOUT, | ||
| self._DEFAULT_READ_TIMEOUT, | ||
| ) | ||
|
|
||
| response: requests.Response = self._send_with_retry( | ||
| request=request, | ||
| request_kwargs=request_kwargs, | ||
| request_kwargs=mutable_request_kwargs, | ||
| log_formatter=log_formatter, | ||
| exit_on_rate_limit=exit_on_rate_limit, | ||
| ) | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -5281,6 +5281,62 @@ def test_catalog_defined_cursor_field_stream_missing(): | |
| assert stream._cursor_field.supports_catalog_defined_cursor_field == True | ||
|
|
||
|
|
||
| def test_block_simultaneous_read_from_stream_groups(): | ||
| """Test that factory-created streams default to empty block_simultaneous_read. | ||
|
|
||
| The factory no longer handles stream_groups — that's done by | ||
| ConcurrentDeclarativeSource._apply_stream_groups after stream creation. | ||
| This test verifies the factory creates streams without group info. | ||
| """ | ||
| content = """ | ||
| definitions: | ||
| parent_stream: | ||
| type: DeclarativeStream | ||
| name: "parent" | ||
| primary_key: "id" | ||
| retriever: | ||
| type: SimpleRetriever | ||
| requester: | ||
| type: HttpRequester | ||
| url_base: "https://api.example.com" | ||
| path: "/parent" | ||
| http_method: "GET" | ||
| authenticator: | ||
| type: BearerAuthenticator | ||
| api_token: "{{ config['api_key'] }}" | ||
| record_selector: | ||
| type: RecordSelector | ||
| extractor: | ||
| type: DpathExtractor | ||
| field_path: [] | ||
| schema_loader: | ||
| type: InlineSchemaLoader | ||
| schema: | ||
| type: object | ||
| properties: | ||
| id: | ||
| type: string | ||
| """ | ||
|
|
||
| config = {"api_key": "test_key"} | ||
|
|
||
| parsed_manifest = YamlDeclarativeSource._parse(content) | ||
| resolved_manifest = resolver.preprocess_manifest(parsed_manifest) | ||
|
|
||
| factory = ModelToComponentFactory() | ||
|
|
||
| parent_manifest = transformer.propagate_types_and_parameters( | ||
| "", resolved_manifest["definitions"]["parent_stream"], {} | ||
| ) | ||
| parent_stream: DefaultStream = factory.create_component( | ||
| model_type=DeclarativeStreamModel, component_definition=parent_manifest, config=config | ||
| ) | ||
|
|
||
| assert isinstance(parent_stream, DefaultStream) | ||
| assert parent_stream.name == "parent" | ||
| assert parent_stream.block_simultaneous_read == "" | ||
|
|
||
|
Comment on lines
+5284
to
+5338
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Could this test include an actual Right now the fixture at Line 5291 never defines 🤖 Prompt for AI Agents |
||
|
|
||
| def get_schema_loader(stream: DefaultStream): | ||
| assert isinstance( | ||
| stream._stream_partition_generator._partition_factory._schema_loader, | ||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Could we fail fast on invalid
stream_groupsmembership, wdyt?Lines 446-453 let the last group win for duplicate entries, and Lines 475-490 silently skip unknown or unsupported targets. That makes a typo or double-assignment look like a runtime blocking bug instead of a clear manifest error.
🛠️ Possible guardrails
stream_name_to_group: Dict[str, str] = {} for group_name, group_config in stream_groups.items(): for stream_ref in group_config.get("streams", []): if isinstance(stream_ref, dict): stream_name = stream_ref.get("name", "") if stream_name: + previous_group = stream_name_to_group.get(stream_name) + if previous_group and previous_group != group_name: + raise AirbyteTracedException( + message=f"Stream '{stream_name}' is assigned to multiple stream groups.", + internal_message=( + f"Stream '{stream_name}' appears in both " + f"'{previous_group}' and '{group_name}'." + ), + failure_type=FailureType.config_error, + ) stream_name_to_group[stream_name] = group_name # Validate no stream shares a group with any of its ancestor streams stream_name_to_instance: Dict[str, AbstractStream] = {s.name: s for s in streams} + invalid_targets = [ + name + for name in stream_name_to_group + if name not in stream_name_to_instance + or not isinstance(stream_name_to_instance[name], DefaultStream) + ] + if invalid_targets: + raise AirbyteTracedException( + message="stream_groups references an unknown or unsupported stream.", + internal_message=( + "stream_groups entries must target created DefaultStream instances. " + f"Got: {sorted(invalid_targets)}" + ), + failure_type=FailureType.config_error, + )Also applies to: 475-490
🤖 Prompt for AI Agents