Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 8 additions & 5 deletions airbyte_cdk/sources/declarative/checks/check_stream.py
Original file line number Diff line number Diff line change
Expand Up @@ -126,11 +126,14 @@ def _check_dynamic_streams_availability(
logger: logging.Logger,
) -> Tuple[bool, Any]:
"""Checks the availability of dynamic streams."""
dynamic_streams = source.resolved_manifest.get("dynamic_streams", []) # type: ignore[attr-defined] # The source's resolved_manifest manifest is checked before calling this method
dynamic_stream_name_to_dynamic_stream = {
ds.get("name", f"dynamic_stream_{i}"): ds for i, ds in enumerate(dynamic_streams)
}
generated_streams = self._map_generated_streams(source.dynamic_streams) # type: ignore[attr-defined] # The source's dynamic_streams manifest is checked before calling this method
try:
dynamic_streams = source.resolved_manifest.get("dynamic_streams", []) # type: ignore[attr-defined] # The source's resolved_manifest manifest is checked before calling this method
dynamic_stream_name_to_dynamic_stream = {
ds.get("name", f"dynamic_stream_{i}"): ds for i, ds in enumerate(dynamic_streams)
}
generated_streams = self._map_generated_streams(source.dynamic_streams) # type: ignore[attr-defined] # The source's dynamic_streams manifest is checked before calling this method
except Exception as error:
return self._log_error(logger, "resolving dynamic streams for check", error)

for check_config in self.dynamic_streams_check_configs: # type: ignore[union-attr] # None value for self.dynamic_streams_check_configs handled in __post_init__
if check_config.dynamic_stream_name not in dynamic_stream_name_to_dynamic_stream:
Expand Down
88 changes: 87 additions & 1 deletion unit_tests/sources/declarative/checks/test_check_stream.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,10 @@
from jsonschema.exceptions import ValidationError

from airbyte_cdk.models import Status
from airbyte_cdk.sources.declarative.checks.check_stream import CheckStream
from airbyte_cdk.sources.declarative.checks.check_stream import (
CheckStream,
DynamicStreamCheckConfig,
)
from airbyte_cdk.sources.declarative.concurrent_declarative_source import (
ConcurrentDeclarativeSource,
)
Expand Down Expand Up @@ -692,3 +695,86 @@ def test_check_stream_only_type_provided():
)
with pytest.raises(ValueError):
source.check(logger, _CONFIG)


class _RaisingManifest:
"""Dict-like manifest whose `get` call raises, simulating a failure such as a
`ConfigComponentsResolver` error while resolving `dynamic_streams`."""

def get(self, *args, **kwargs):
raise RuntimeError("ConfigComponentsResolver failed to resolve manifest")


class _FailingResolvedManifestSource:
"""Fake Source whose resolved_manifest fails when `dynamic_streams` is looked up."""

resolved_manifest = _RaisingManifest()
dynamic_streams: list = []

def __init__(self, streams):
self._streams = streams

def streams(self, config):
return self._streams


class _FailingDynamicStreamsSource:
"""Fake Source whose `dynamic_streams` property raises when accessed from inside
`_check_dynamic_streams_availability`. The first access (from the `hasattr` gate
in `check_connection`) succeeds; subsequent accesses raise, simulating a flaky
resolver or a resolver whose second call fails."""

resolved_manifest = {"dynamic_streams": [{"name": "some_dynamic_stream"}]}

def __init__(self, streams):
self._streams = streams
self._access_count = 0

def streams(self, config):
return self._streams

@property
def dynamic_streams(self):
self._access_count += 1
if self._access_count > 1:
raise RuntimeError("dynamic stream generation failed")
return []


@pytest.mark.parametrize(
"source_factory, expected_error_substring",
[
pytest.param(
_FailingResolvedManifestSource,
"ConfigComponentsResolver failed to resolve manifest",
id="resolved_manifest_get_raises",
),
pytest.param(
_FailingDynamicStreamsSource,
"dynamic stream generation failed",
id="dynamic_streams_property_raises",
),
],
)
def test_check_dynamic_streams_availability_returns_error_when_resolution_raises(
source_factory, expected_error_substring
):
"""`_check_dynamic_streams_availability` must funnel resolution errors through
`_log_error` and return `(False, message)` rather than propagating the exception."""
static_stream = MagicMock(spec=Stream)
static_stream.name = "static_stream"
source = source_factory([static_stream])

check_stream = CheckStream(
stream_names=[],
parameters={},
dynamic_streams_check_configs=[
DynamicStreamCheckConfig(dynamic_stream_name="some_dynamic_stream", stream_count=1)
],
)

is_available, message = check_stream.check_connection(source, logger, config)

assert is_available is False
assert "resolving dynamic streams for check" in message
assert expected_error_substring in message
Loading