Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
31 commits
Select commit Hold shift + click to select a range
52900d1
Add block_simultaneous_read to DefaultStream
tolik0 Dec 30, 2025
0b8561e
Change `block_simultaneous_read` to string
tolik0 Jan 6, 2026
c6e89c1
Fix StreamFacade
tolik0 Jan 6, 2026
4868f26
Fix NoneType error when all streams are blocked
tolik0 Jan 9, 2026
6b614b3
Fix unit tests
tolik0 Jan 12, 2026
46e7c19
Auto-fix lint and format issues
Jan 12, 2026
b82dfbc
Add retry deferred streams on stream completion
tolik0 Jan 13, 2026
0c0f4ef
Fix unit tests
tolik0 Jan 13, 2026
cc1d47c
More fixes for unit tests
tolik0 Jan 13, 2026
6f5b4ea
refactor: replace per-stream block_simultaneous_read with top-level s…
devin-ai-integration[bot] Feb 25, 2026
d5ff69e
refactor: move stream_name_to_group into ModelToComponentFactory
devin-ai-integration[bot] Feb 25, 2026
5ed978f
refactor: use stream_groups manifest in factory test instead of hardc…
devin-ai-integration[bot] Feb 25, 2026
b41a4b3
fix: only include parent stream in stream_groups to avoid deadlock
devin-ai-integration[bot] Feb 26, 2026
a231e8c
style: fix ruff format for long line
devin-ai-integration[bot] Feb 26, 2026
d3c8067
refactor: move _build_stream_name_to_group into ModelToComponentFactory
devin-ai-integration[bot] Feb 27, 2026
3a3cf8b
refactor: resolve stream_groups from actual stream instances instead …
devin-ai-integration[bot] Mar 3, 2026
7ccf6cf
Fix stream format in schema
tolik0 Mar 4, 2026
8d3b1f2
refactor: add get_partition_router() helper to DefaultStream
devin-ai-integration[bot] Mar 4, 2026
77b25c5
feat: validate no parent-child streams share a group to prevent deadlock
devin-ai-integration[bot] Mar 4, 2026
3014458
feat: assert partition generation queue is empty when all streams are…
devin-ai-integration[bot] Mar 4, 2026
48711d3
refactor: move inline imports to module level in default_stream.py an…
devin-ai-integration[bot] Mar 4, 2026
5e4ed38
fix: unwrap GroupingPartitionRouter in get_partition_router() to dete…
devin-ai-integration[bot] Mar 4, 2026
0e710d2
fix: handle GroupingPartitionRouter at call sites instead of in get_p…
devin-ai-integration[bot] Mar 4, 2026
a0fbd89
feat: check active_groups is empty in is_done() safety check
devin-ai-integration[bot] Mar 4, 2026
d8ef4fa
test: add missing unit tests for GroupingPartitionRouter, active_grou…
devin-ai-integration[bot] Mar 4, 2026
f000313
fix: make deadlock validation check all ancestors, not just direct pa…
devin-ai-integration[bot] Mar 4, 2026
68850ec
style: alphabetize StreamGroup and BlockSimultaneousSyncsAction in sc…
devin-ai-integration[bot] Mar 5, 2026
b183f80
style: move BlockSimultaneousSyncsAction next to StreamGroup for easi…
devin-ai-integration[bot] Mar 5, 2026
e1bd626
fix: add default HTTP request timeout to prevent indefinite hangs
devin-ai-integration[bot] Mar 6, 2026
29951b2
fix: match test assertions against AirbyteTracedException message field
devin-ai-integration[bot] Mar 6, 2026
b0537da
style: fix ruff format in test_concurrent_read_processor.py
devin-ai-integration[bot] Mar 6, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
232 changes: 218 additions & 14 deletions airbyte_cdk/sources/concurrent_source/concurrent_read_processor.py

Large diffs are not rendered by default.

71 changes: 70 additions & 1 deletion airbyte_cdk/sources/declarative/concurrent_declarative_source.py
Original file line number Diff line number Diff line change
Expand Up @@ -76,12 +76,19 @@
from airbyte_cdk.sources.declarative.parsers.model_to_component_factory import (
ModelToComponentFactory,
)
from airbyte_cdk.sources.declarative.partition_routers.grouping_partition_router import (
GroupingPartitionRouter,
)
from airbyte_cdk.sources.declarative.partition_routers.substream_partition_router import (
SubstreamPartitionRouter,
)
from airbyte_cdk.sources.declarative.resolvers import COMPONENTS_RESOLVER_TYPE_MAPPING
from airbyte_cdk.sources.declarative.spec.spec import Spec
from airbyte_cdk.sources.declarative.types import Config, ConnectionDefinition
from airbyte_cdk.sources.message.concurrent_repository import ConcurrentMessageRepository
from airbyte_cdk.sources.message.repository import InMemoryMessageRepository
from airbyte_cdk.sources.streams.concurrent.abstract_stream import AbstractStream
from airbyte_cdk.sources.streams.concurrent.default_stream import DefaultStream
from airbyte_cdk.sources.streams.concurrent.partitions.types import QueueItem
from airbyte_cdk.sources.utils.slice_logger import (
AlwaysLogSliceLogger,
Expand Down Expand Up @@ -405,6 +412,8 @@ def streams(self, config: Mapping[str, Any]) -> List[AbstractStream]: # type: i
if api_budget_model:
self._constructor.set_api_budget(api_budget_model, self._config)

prepared_configs = self._initialize_cache_for_parent_streams(deepcopy(stream_configs))

source_streams = [
self._constructor.create_component(
(
Expand All @@ -416,10 +425,70 @@ def streams(self, config: Mapping[str, Any]) -> List[AbstractStream]: # type: i
self._config,
emit_connector_builder_messages=self._emit_connector_builder_messages,
)
for stream_config in self._initialize_cache_for_parent_streams(deepcopy(stream_configs))
for stream_config in prepared_configs
]

self._apply_stream_groups(source_streams)

return source_streams

def _apply_stream_groups(self, streams: List[AbstractStream]) -> None:
"""Set block_simultaneous_read on streams based on the manifest's stream_groups config.

Iterates over the resolved manifest's stream_groups and matches group membership
against actual created stream instances by name. Validates that no stream shares a
group with any of its parent streams, which would cause a deadlock.
"""
stream_groups = self._source_config.get("stream_groups", {})
if not stream_groups:
return

# Build stream_name -> group_name mapping from the resolved manifest
stream_name_to_group: Dict[str, str] = {}
for group_name, group_config in stream_groups.items():
for stream_ref in group_config.get("streams", []):
if isinstance(stream_ref, dict):
stream_name = stream_ref.get("name", "")
if stream_name:
stream_name_to_group[stream_name] = group_name
Comment on lines +446 to +453
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major

Could we fail fast on invalid stream_groups membership, wdyt?

Lines 446-453 let the last group win for duplicate entries, and Lines 475-490 silently skip unknown or unsupported targets. That makes a typo or double-assignment look like a runtime blocking bug instead of a clear manifest error.

🛠️ Possible guardrails
         stream_name_to_group: Dict[str, str] = {}
         for group_name, group_config in stream_groups.items():
             for stream_ref in group_config.get("streams", []):
                 if isinstance(stream_ref, dict):
                     stream_name = stream_ref.get("name", "")
                     if stream_name:
+                        previous_group = stream_name_to_group.get(stream_name)
+                        if previous_group and previous_group != group_name:
+                            raise AirbyteTracedException(
+                                message=f"Stream '{stream_name}' is assigned to multiple stream groups.",
+                                internal_message=(
+                                    f"Stream '{stream_name}' appears in both "
+                                    f"'{previous_group}' and '{group_name}'."
+                                ),
+                                failure_type=FailureType.config_error,
+                            )
                         stream_name_to_group[stream_name] = group_name

         # Validate no stream shares a group with any of its ancestor streams
         stream_name_to_instance: Dict[str, AbstractStream] = {s.name: s for s in streams}
+        invalid_targets = [
+            name
+            for name in stream_name_to_group
+            if name not in stream_name_to_instance
+            or not isinstance(stream_name_to_instance[name], DefaultStream)
+        ]
+        if invalid_targets:
+            raise AirbyteTracedException(
+                message="stream_groups references an unknown or unsupported stream.",
+                internal_message=(
+                    "stream_groups entries must target created DefaultStream instances. "
+                    f"Got: {sorted(invalid_targets)}"
+                ),
+                failure_type=FailureType.config_error,
+            )

Also applies to: 475-490

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@airbyte_cdk/sources/declarative/concurrent_declarative_source.py` around
lines 446 - 453, The current build of stream_name_to_group from stream_groups
silently allows duplicate stream entries (last group wins) and later processing
silently skips unknown/unsupported targets; change this to fail-fast: when
iterating stream_groups in the block that builds stream_name_to_group, detect if
a stream name already exists and raise a clear manifest/config error (include
the duplicate stream name and both group names) instead of overwriting;
additionally, in the downstream target-processing logic (the code that checks
targets and currently skips unknown/unsupported targets), replace silent skips
with validation that raises a manifest/config error when a target refers to a
stream not present in stream_name_to_group or when a target type is unsupported,
so typos and double assignments surface immediately (use the symbols
stream_groups, stream_name_to_group and the target-processing function/method
where unknown targets are handled to locate the changes).


# Validate no stream shares a group with any of its ancestor streams
stream_name_to_instance: Dict[str, AbstractStream] = {s.name: s for s in streams}

Comment on lines +446 to +457
Copy link

Copilot AI Mar 6, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

_apply_stream_groups() only extracts stream names when stream_groups[*].streams entries are dicts with a name field. If stream_groups is meant to support JSON-reference strings (as documented in the generated model and PR description), those entries will currently be ignored and no groups will be applied. Expand parsing to handle the reference format you intend to support (e.g., strings like #/definitions/<stream_key> or {"$ref": ...}) and map them to created stream instances reliably.

Suggested change
# Build stream_name -> group_name mapping from the resolved manifest
stream_name_to_group: Dict[str, str] = {}
for group_name, group_config in stream_groups.items():
for stream_ref in group_config.get("streams", []):
if isinstance(stream_ref, dict):
stream_name = stream_ref.get("name", "")
if stream_name:
stream_name_to_group[stream_name] = group_name
# Validate no stream shares a group with any of its ancestor streams
stream_name_to_instance: Dict[str, AbstractStream] = {s.name: s for s in streams}
# Map existing stream instance names for reliable resolution of references
stream_name_to_instance: Dict[str, AbstractStream] = {s.name: s for s in streams}
def _resolve_stream_ref_to_name(stream_ref: Any) -> Optional[str]:
"""
Normalize a stream reference entry from stream_groups[*].streams into a stream name.
Supported formats:
- {"name": "<stream_name>"}
- {"$ref": "#/definitions/<stream_name_or_key>"}
- "#/definitions/<stream_name_or_key>"
"""
# Direct dict with explicit name (current behavior)
if isinstance(stream_ref, Mapping):
name_val = stream_ref.get("name")
if isinstance(name_val, str) and name_val:
return name_val
ref_val = stream_ref.get("$ref")
if isinstance(ref_val, str):
ref_str = ref_val
else:
return None
elif isinstance(stream_ref, str):
ref_str = stream_ref
else:
return None
# At this point, ref_str is a reference string. For simple JSON-pointer-like
# references such as "#/definitions/<stream_key>", use the last path segment.
ref_str = ref_str.strip()
if not ref_str:
return None
# Ignore fragment prefix "#/" if present, then take the last segment
if ref_str.startswith("#/"):
path = ref_str[2:]
elif ref_str.startswith("#"):
path = ref_str[1:]
else:
path = ref_str
candidate = path.split("/")[-1] if path else ""
if not candidate:
return None
# Only return names that correspond to an actual created stream
return candidate if candidate in stream_name_to_instance else None
# Build stream_name -> group_name mapping from the resolved manifest
stream_name_to_group: Dict[str, str] = {}
for group_name, group_config in stream_groups.items():
for stream_ref in group_config.get("streams", []):
stream_name = _resolve_stream_ref_to_name(stream_ref)
if stream_name:
stream_name_to_group[stream_name] = group_name
# Validate no stream shares a group with any of its ancestor streams

Copilot uses AI. Check for mistakes.
def _collect_all_ancestor_names(
    stream_name: str, _seen: Optional[Set[str]] = None
) -> Set[str]:
    """Recursively collect the names of all ancestor (parent, grandparent, ...) streams.

    Walks SubstreamPartitionRouter parent configs, unwrapping a
    GroupingPartitionRouter if present. The ``_seen`` guard protects against
    infinite recursion should the manifest ever contain a cyclic parent
    relationship (previously this would recurse forever).

    :param stream_name: Name of the stream whose ancestors to collect.
    :param _seen: Internal accumulator of already-visited stream names.
    :return: Set of ancestor stream names (empty if none are discoverable).
    """
    seen = _seen if _seen is not None else set()
    if stream_name in seen:
        # Cycle guard: this stream was already visited on the current walk.
        return set()
    seen.add(stream_name)

    ancestors: Set[str] = set()
    inst = stream_name_to_instance.get(stream_name)
    # Only DefaultStream exposes get_partition_router(); other stream types
    # have no discoverable parents from here.
    if not isinstance(inst, DefaultStream):
        return ancestors
    router = inst.get_partition_router()
    # GroupingPartitionRouter wraps another router; unwrap it to reach any
    # SubstreamPartitionRouter underneath.
    if isinstance(router, GroupingPartitionRouter):
        router = router.underlying_partition_router
    if not isinstance(router, SubstreamPartitionRouter):
        return ancestors
    for parent_config in router.parent_stream_configs:
        parent_name = parent_config.stream.name
        ancestors.add(parent_name)
        ancestors.update(_collect_all_ancestor_names(parent_name, seen))
    return ancestors

for stream in streams:
if not isinstance(stream, DefaultStream) or stream.name not in stream_name_to_group:
continue
group_name = stream_name_to_group[stream.name]
for ancestor_name in _collect_all_ancestor_names(stream.name):
if stream_name_to_group.get(ancestor_name) == group_name:
raise ValueError(
f"Stream '{stream.name}' and its parent stream '{ancestor_name}' "
f"are both in group '{group_name}'. "
f"A child stream must not share a group with its parent to avoid deadlock."
)

# Apply group to matching stream instances
for stream in streams:
if isinstance(stream, DefaultStream) and stream.name in stream_name_to_group:
stream.block_simultaneous_read = stream_name_to_group[stream.name]

@staticmethod
def _initialize_cache_for_parent_streams(
stream_configs: List[Dict[str, Any]],
Expand Down
46 changes: 46 additions & 0 deletions airbyte_cdk/sources/declarative/declarative_component_schema.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,15 @@ properties:
"$ref": "#/definitions/ConcurrencyLevel"
api_budget:
"$ref": "#/definitions/HTTPAPIBudget"
stream_groups:
title: Stream Groups
description: >
Groups of streams that share a common resource and should not be read simultaneously.
Each group defines a set of stream references and an action that controls how concurrent
reads are managed. Only applies to ConcurrentDeclarativeSource.
type: object
additionalProperties:
"$ref": "#/definitions/StreamGroup"
max_concurrent_async_job_count:
title: Maximum Concurrent Asynchronous Jobs
description: Maximum number of concurrent asynchronous jobs to run. This property is only relevant for sources/streams that support asynchronous job execution through the AsyncRetriever (e.g. a report-based stream that initiates a job, polls the job status, and then fetches the job results). This is often set by the API's maximum number of concurrent jobs on the account level. Refer to the API's documentation for this information.
Expand Down Expand Up @@ -4126,6 +4135,43 @@ definitions:
- "$ref": "#/definitions/ConfigRemoveFields"
- "$ref": "#/definitions/CustomConfigTransformation"
default: []
StreamGroup:
title: Stream Group
description: >
A group of streams that share a common resource and should not be read simultaneously.
Streams in the same group will be blocked from concurrent reads based on the specified action.
type: object
required:
- streams
- action
properties:
streams:
title: Streams
description: >
List of references to streams that belong to this group.
type: array
items:
anyOf:
Comment on lines +4151 to +4154
Copy link

Copilot AI Mar 6, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

StreamGroup.streams is described as “references to streams”, but the schema currently restricts items to #/definitions/DeclarativeStream objects. This doesn’t match the intended interface shown in the PR description (string refs like "#/definitions/my_stream") and will force users to duplicate full stream definitions inside stream_groups. Update the schema to accept reference strings (or $ref-style objects) and document the expected format consistently.

Suggested change
List of references to streams that belong to this group.
type: array
items:
anyOf:
List of references to streams that belong to this group.
Each entry may be a reference string (e.g. "#/definitions/my_stream"),
a $ref-style object (e.g. { "$ref": "#/definitions/my_stream" }),
or an inline DeclarativeStream definition.
type: array
items:
anyOf:
- type: string
- type: object
required:
- "$ref"
properties:
"$ref":
type: string
additionalProperties: false

Copilot uses AI. Check for mistakes.
- "$ref": "#/definitions/DeclarativeStream"
action:
title: Action
description: The action to apply to streams in this group.
"$ref": "#/definitions/BlockSimultaneousSyncsAction"
BlockSimultaneousSyncsAction:
title: Block Simultaneous Syncs Action
description: >
Action that prevents streams in the same group from being read concurrently.
When applied to a stream group, streams with this action will be deferred if
another stream in the same group is currently active.
This is useful for APIs that don't allow concurrent access to the same
endpoint or session. Only applies to ConcurrentDeclarativeSource.
type: object
required:
- type
properties:
type:
type: string
enum: [BlockSimultaneousSyncsAction]
SubstreamPartitionRouter:
title: Substream Partition Router
description: Partition router that is used to retrieve records that have been partitioned according to records from the specified parent streams. An example of a parent stream is automobile brands and the substream would be the various car models associated with each brand.
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,3 @@
# Copyright (c) 2025 Airbyte, Inc., all rights reserved.

# generated by datamodel-codegen:
# filename: declarative_component_schema.yaml

Expand Down Expand Up @@ -2359,6 +2357,11 @@ class Config:
spec: Optional[Spec] = None
concurrency_level: Optional[ConcurrencyLevel] = None
api_budget: Optional[HTTPAPIBudget] = None
stream_groups: Optional[Dict[str, StreamGroup]] = Field(
None,
description="Groups of streams that share a common resource and should not be read simultaneously. Each group defines a set of stream references and an action that controls how concurrent reads are managed. Only applies to ConcurrentDeclarativeSource.",
title="Stream Groups",
)
max_concurrent_async_job_count: Optional[Union[int, str]] = Field(
None,
description="Maximum number of concurrent asynchronous jobs to run. This property is only relevant for sources/streams that support asynchronous job execution through the AsyncRetriever (e.g. a report-based stream that initiates a job, polls the job status, and then fetches the job results). This is often set by the API's maximum number of concurrent jobs on the account level. Refer to the API's documentation for this information.",
Expand Down Expand Up @@ -2394,6 +2397,11 @@ class Config:
spec: Optional[Spec] = None
concurrency_level: Optional[ConcurrencyLevel] = None
api_budget: Optional[HTTPAPIBudget] = None
stream_groups: Optional[Dict[str, StreamGroup]] = Field(
None,
description="Groups of streams that share a common resource and should not be read simultaneously. Each group defines a set of stream references and an action that controls how concurrent reads are managed. Only applies to ConcurrentDeclarativeSource.",
title="Stream Groups",
)
max_concurrent_async_job_count: Optional[Union[int, str]] = Field(
None,
description="Maximum number of concurrent asynchronous jobs to run. This property is only relevant for sources/streams that support asynchronous job execution through the AsyncRetriever (e.g. a report-based stream that initiates a job, polls the job status, and then fetches the job results). This is often set by the API's maximum number of concurrent jobs on the account level. Refer to the API's documentation for this information.",
Expand Down Expand Up @@ -3055,6 +3063,23 @@ class AsyncRetriever(BaseModel):
parameters: Optional[Dict[str, Any]] = Field(None, alias="$parameters")


class BlockSimultaneousSyncsAction(BaseModel):
    # Generated (datamodel-codegen) model for the BlockSimultaneousSyncsAction
    # schema entry: a marker action carrying only its type discriminator, with
    # no configuration options of its own.
    type: Literal["BlockSimultaneousSyncsAction"]


class StreamGroup(BaseModel):
streams: List[str] = Field(
...,
description='List of references to streams that belong to this group. Use JSON references to stream definitions (e.g., "#/definitions/my_stream").',
Comment on lines +3071 to +3073
Copy link

Copilot AI Mar 6, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The generated Pydantic model defines StreamGroup.streams as List[str] (JSON reference strings), but the JSON schema in declarative_component_schema.yaml currently defines streams items as DeclarativeStream objects. This schema/model mismatch will cause validation/parsing failures depending on which path consumes the manifest. Align the model and schema (preferably by making the schema accept the same reference-string format used by the model and docs).

Suggested change
streams: List[str] = Field(
...,
description='List of references to streams that belong to this group. Use JSON references to stream definitions (e.g., "#/definitions/my_stream").',
streams: List[Union[str, "DeclarativeStream"]] = Field(
...,
description='List of streams that belong to this group. Each item can be either a JSON reference to a stream definition (e.g., "#/definitions/my_stream") or an inlined DeclarativeStream object.',

Copilot uses AI. Check for mistakes.
title="Streams",
)
action: BlockSimultaneousSyncsAction = Field(
...,
description="The action to apply to streams in this group.",
title="Action",
)


class SubstreamPartitionRouter(BaseModel):
type: Literal["SubstreamPartitionRouter"]
parent_stream_configs: List[ParentStreamConfig] = Field(
Expand Down
15 changes: 15 additions & 0 deletions airbyte_cdk/sources/streams/concurrent/abstract_stream.py
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,21 @@ def cursor(self) -> Cursor:
:return: The cursor associated with this stream.
"""

@property
def block_simultaneous_read(self) -> str:
    """Name of the blocking group this stream belongs to, or "" for none.

    Subclasses may override this to return a non-empty group name. When a
    group name is returned, partition generation for this stream is deferred
    while:
      - another stream in the same group is already active, or
      - any of its parent streams belongs to an active group.

    This lets streams that share a resource (e.g. an API endpoint or session)
    be serialized even when they have no parent-child relationship.

    :return: The blocking group name, or an empty string to allow concurrent reads.
    """
    # Base implementation opts out of blocking entirely.
    return ""

@abstractmethod
def check_availability(self) -> StreamAvailability:
"""
Expand Down
5 changes: 5 additions & 0 deletions airbyte_cdk/sources/streams/concurrent/adapters.py
Original file line number Diff line number Diff line change
Expand Up @@ -196,6 +196,11 @@ def cursor_field(self) -> Union[str, List[str]]:
def cursor(self) -> Optional[Cursor]: # type: ignore[override] # StreamFaced expects to use only airbyte_cdk.sources.streams.concurrent.cursor.Cursor
return self._cursor

@property
def block_simultaneous_read(self) -> str:
    """Expose the wrapped concurrent stream's blocking-group name unchanged."""
    return self._abstract_stream.block_simultaneous_read

# FIXME the lru_cache seems to be mostly there because of typing issue
@lru_cache(maxsize=None)
def get_json_schema(self) -> Mapping[str, Any]:
Expand Down
27 changes: 27 additions & 0 deletions airbyte_cdk/sources/streams/concurrent/default_stream.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,13 @@
from typing import Any, Callable, Iterable, List, Mapping, Optional, Union

from airbyte_cdk.models import AirbyteStream, SyncMode
from airbyte_cdk.sources.declarative.incremental.concurrent_partition_cursor import (
ConcurrentPerPartitionCursor,
)
from airbyte_cdk.sources.declarative.partition_routers.partition_router import PartitionRouter
from airbyte_cdk.sources.declarative.stream_slicers.declarative_partition_generator import (
StreamSlicerPartitionGenerator,
)
from airbyte_cdk.sources.streams.concurrent.abstract_stream import AbstractStream
from airbyte_cdk.sources.streams.concurrent.availability_strategy import StreamAvailability
from airbyte_cdk.sources.streams.concurrent.cursor import Cursor, CursorField
Expand All @@ -26,6 +33,7 @@ def __init__(
cursor: Cursor,
namespace: Optional[str] = None,
supports_file_transfer: bool = False,
block_simultaneous_read: str = "",
) -> None:
self._stream_partition_generator = partition_generator
self._name = name
Expand All @@ -36,6 +44,7 @@ def __init__(
self._cursor = cursor
self._namespace = namespace
self._supports_file_transfer = supports_file_transfer
self._block_simultaneous_read = block_simultaneous_read

def generate_partitions(self) -> Iterable[Partition]:
    # Delegate to the configured partition generator; `yield from` keeps this
    # lazy so callers can consume partitions without materializing them all.
    yield from self._stream_partition_generator.generate()
Expand Down Expand Up @@ -94,6 +103,24 @@ def log_stream_sync_configuration(self) -> None:
def cursor(self) -> Cursor:
return self._cursor

@property
def block_simultaneous_read(self) -> str:
    """Blocking-group name for this stream; an empty string disables blocking."""
    return self._block_simultaneous_read

@block_simultaneous_read.setter
def block_simultaneous_read(self, value: str) -> None:
    """Assign the blocking-group name (set after construction by the source)."""
    self._block_simultaneous_read = value

def get_partition_router(self) -> PartitionRouter | None:
    """Return the partition router backing this stream, or None if not discoverable.

    Unwraps decorator layers that expose their inner slicer via a
    ``_stream_slicer`` attribute (e.g. the slice-limiting wrapper used for
    test reads), so a wrapped ConcurrentPerPartitionCursor is still found
    instead of silently returning None.
    """
    if not isinstance(self._stream_partition_generator, StreamSlicerPartitionGenerator):
        return None
    # NOTE(review): relies on private attributes of the generator/slicer;
    # assumes decorator layers expose the wrapped slicer as `_stream_slicer`
    # — confirm against the actual wrapper classes.
    stream_slicer = self._stream_partition_generator._stream_slicer
    while not isinstance(stream_slicer, ConcurrentPerPartitionCursor) and hasattr(
        stream_slicer, "_stream_slicer"
    ):
        stream_slicer = stream_slicer._stream_slicer
    if not isinstance(stream_slicer, ConcurrentPerPartitionCursor):
        return None
    return stream_slicer._partition_router
Comment on lines +115 to +122
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major

Could get_partition_router() unwrap test-read decorators too, wdyt?

StreamSlicerPartitionGenerator wraps the slicer when slice_limit is enabled, so the isinstance(..., ConcurrentPerPartitionCursor) check at Line 120 returns False for those streams even though the underlying cursor still has a partition router. Any caller using this helper for parent discovery or deadlock validation will silently get None in that mode.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@airbyte_cdk/sources/streams/concurrent/default_stream.py` around lines 115 -
122, get_partition_router currently returns None when the slicer is wrapped
(e.g., by StreamSlicerPartitionGenerator/test-read decorators) because it only
checks isinstance(stream_slicer, ConcurrentPerPartitionCursor); change it to
unwrap decorator layers: after obtaining stream_slicer from
self._stream_partition_generator._stream_slicer, loop while the object exposes
an inner slicer (e.g., has attribute _stream_slicer or _wrapped) and set
stream_slicer = stream_slicer._stream_slicer (or stream_slicer._wrapped) until
you reach the underlying instance, then check isinstance(...,
ConcurrentPerPartitionCursor) and return its _partition_router; this preserves
existing behavior for unwrapped slicers and recovers the partition router when
decorators are present.


def check_availability(self) -> StreamAvailability:
"""
Check stream availability by attempting to read the first record of the stream.
Expand Down
12 changes: 10 additions & 2 deletions airbyte_cdk/sources/streams/http/http_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,8 @@ def monkey_patched_get_item(self, key): # type: ignore # this interface is a co
class HttpClient:
_DEFAULT_MAX_RETRY: int = 5
_DEFAULT_MAX_TIME: int = 60 * 10
_DEFAULT_CONNECT_TIMEOUT: int = 30
_DEFAULT_READ_TIMEOUT: int = 300
Comment on lines +88 to +89
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major

Could we keep an override path for these new default timeouts before landing this, wdyt?

This now applies (30, 300) to every caller that omits timeout. That includes the common HttpStream._fetch_next_page() path in airbyte_cdk/sources/streams/http/http.py:505-540 and the declarative HttpRequester.send_request() path in airbyte_cdk/sources/declarative/requesters/http_requester.py:446-490, which currently only passes {"stream": ...}. For declarative sources, that means a new hard 5-minute read cap with no manifest-level escape hatch, so long-running exports/long-poll endpoints would start failing with no connector-side workaround. Could we thread an optional timeout through the declarative requester, or add an explicit opt-out in the same change, wdyt?

Also applies to: 593-601

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@airbyte_cdk/sources/streams/http/http_client.py` around lines 88 - 89, The
new module-level defaults _DEFAULT_CONNECT_TIMEOUT and _DEFAULT_READ_TIMEOUT are
being applied universally and can unexpectedly cap declarative and HttpStream
flows; update the code to allow callers to opt out or override: add an optional
timeout parameter (or allow timeout=None) to HttpRequester.send_request and
thread it through to where requests are made (and to HttpStream._fetch_next_page
usage), ensure the declarative requester (requesters/http_requester.py) honors a
passed-in timeout and does not substitute the (30,300) default when None is
provided, and add documentation/comments and tests to cover both explicit
override and opt-out behavior so long-running/long-poll endpoints can bypass the
hard 5-minute read cap.

_ACTIONS_TO_RETRY_ON = {
ResponseAction.RETRY,
ResponseAction.RATE_LIMITED,
Expand Down Expand Up @@ -586,11 +588,17 @@ def send_request(
verify=request_kwargs.get("verify"),
cert=request_kwargs.get("cert"),
)
request_kwargs = {**request_kwargs, **env_settings}
mutable_request_kwargs: Dict[str, Any] = {**request_kwargs, **env_settings}

if "timeout" not in mutable_request_kwargs:
mutable_request_kwargs["timeout"] = (
self._DEFAULT_CONNECT_TIMEOUT,
self._DEFAULT_READ_TIMEOUT,
)

response: requests.Response = self._send_with_retry(
request=request,
request_kwargs=request_kwargs,
request_kwargs=mutable_request_kwargs,
log_formatter=log_formatter,
exit_on_rate_limit=exit_on_rate_limit,
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5281,6 +5281,62 @@ def test_catalog_defined_cursor_field_stream_missing():
assert stream._cursor_field.supports_catalog_defined_cursor_field == True


def test_block_simultaneous_read_from_stream_groups():
    """Test that factory-created streams default to empty block_simultaneous_read.

    The factory no longer handles stream_groups — that's done by
    ConcurrentDeclarativeSource._apply_stream_groups after stream creation.
    This test verifies the factory creates streams without group info.
    """
    # Minimal single-stream manifest; intentionally contains no top-level
    # stream_groups, so any non-empty group on the created stream would mean
    # the factory is (incorrectly) consuming group configuration itself.
    # NOTE(review): consider also adding a manifest WITH a stream_groups entry
    # to prove the factory ignores it — TODO confirm with the team.
    content = """
definitions:
  parent_stream:
    type: DeclarativeStream
    name: "parent"
    primary_key: "id"
    retriever:
      type: SimpleRetriever
      requester:
        type: HttpRequester
        url_base: "https://api.example.com"
        path: "/parent"
        http_method: "GET"
        authenticator:
          type: BearerAuthenticator
          api_token: "{{ config['api_key'] }}"
      record_selector:
        type: RecordSelector
        extractor:
          type: DpathExtractor
          field_path: []
    schema_loader:
      type: InlineSchemaLoader
      schema:
        type: object
        properties:
          id:
            type: string
"""

    config = {"api_key": "test_key"}

    # Parse and resolve the manifest the same way the source does at runtime.
    parsed_manifest = YamlDeclarativeSource._parse(content)
    resolved_manifest = resolver.preprocess_manifest(parsed_manifest)

    factory = ModelToComponentFactory()

    # Propagate types/parameters into the stream definition before building it.
    parent_manifest = transformer.propagate_types_and_parameters(
        "", resolved_manifest["definitions"]["parent_stream"], {}
    )
    parent_stream: DefaultStream = factory.create_component(
        model_type=DeclarativeStreamModel, component_definition=parent_manifest, config=config
    )

    assert isinstance(parent_stream, DefaultStream)
    assert parent_stream.name == "parent"
    # Default is the empty string: no blocking group assigned by the factory.
    assert parent_stream.block_simultaneous_read == ""

Comment on lines +5284 to +5338
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟡 Minor

Could this test include an actual stream_groups entry, wdyt?

Right now the fixture at Line 5291 never defines stream_groups, so this only proves the default is "". If the factory accidentally starts consuming top-level groups again, this test would still pass.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@unit_tests/sources/declarative/parsers/test_model_to_component_factory.py`
around lines 5284 - 5338, Update the
test_block_simultaneous_read_from_stream_groups to include a top-level
stream_groups entry in the YAML fixture so the factory is exercised with group
data present; keep the rest of the fixture the same and still assert that
factory.create_component (ModelToComponentFactory.create_component invoked with
DeclarativeStreamModel) returns a DefaultStream with name "parent" and
block_simultaneous_read == "" — this ensures the factory does not consume
top-level stream_groups while still verifying the default behavior.


def get_schema_loader(stream: DefaultStream):
assert isinstance(
stream._stream_partition_generator._partition_factory._schema_loader,
Expand Down
Loading
Loading