Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,10 @@ Newer updates can be found here: [GitHub Release Notes](https://github.com/airby

# Changelog

## Unreleased

low-code: Add opt-in `config_check_streams_path` to `CheckStream` so connections can supply a hidden array-of-strings config field that overrides the manifest's `stream_names` for the duration of the check, and convert the previous `ValueError` raised on unknown stream names into a `(False, message)` result that lists all unknowns and their source (config path vs. manifest).

## 6.5.2

bugfix: Ensure that streams with partition router are not executed concurrently
Expand Down
68 changes: 60 additions & 8 deletions airbyte_cdk/sources/declarative/checks/check_stream.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@
from dataclasses import InitVar, dataclass
from typing import Any, Dict, List, Mapping, Optional, Tuple, Union

import dpath

from airbyte_cdk.sources import Source
from airbyte_cdk.sources.declarative.checks.connection_checker import ConnectionChecker
from airbyte_cdk.sources.streams.concurrent.abstract_stream import AbstractStream
Expand Down Expand Up @@ -41,16 +43,21 @@ class DynamicStreamCheckConfig:

@dataclass
class CheckStream(ConnectionChecker):
"""
Checks the connections by checking availability of one or many streams selected by the developer
"""Checks the connection by verifying availability of one or many streams.

Attributes:
stream_name (List[str]): names of streams to check
stream_names: Manifest-declared default stream names to check.
dynamic_streams_check_configs: Optional dynamic-stream check configs.
config_check_streams_path: Optional dot-delimited path into the
user-provided config whose value (when present and a non-empty
list) overrides `stream_names` for this check. When empty,
missing, or `None`, the manifest's `stream_names` is used.
"""

stream_names: List[str]
parameters: InitVar[Mapping[str, Any]]
dynamic_streams_check_configs: Optional[List[DynamicStreamCheckConfig]] = None
config_check_streams_path: Optional[str] = None

def __post_init__(self, parameters: Mapping[str, Any]) -> None:
self._parameters = parameters
Expand All @@ -63,6 +70,32 @@ def _log_error(self, logger: logging.Logger, action: str, error: Exception) -> T
logger.error(error_message + f"Error traceback: \n {traceback.format_exc()}", exc_info=True)
return False, error_message

def _resolve_effective_stream_names(
self, config: Mapping[str, Any]
) -> Tuple[Optional[List[str]], Optional[str]]:
"""Resolves the list of stream names to check for this connection.

Returns a `(stream_names, error_message)` tuple. When `error_message`
is set, the caller should short-circuit with `(False, error_message)`.
When `config_check_streams_path` is unset, or the referenced value is
missing or an empty list, falls back to the manifest's `stream_names`.
"""
if not self.config_check_streams_path:
return self.stream_names, None

configured_value = dpath.get(
dict(config), self.config_check_streams_path, separator=".", default=None
)
if configured_value is None:
return self.stream_names, None
if not isinstance(configured_value, list):
return None, (
f"Config field '{self.config_check_streams_path}' must be a list of stream names."
)
if not configured_value:
return self.stream_names, None
return list(configured_value), None

def check_connection(
self,
source: Source,
Expand All @@ -78,12 +111,31 @@ def check_connection(
return self._log_error(logger, "discovering streams", error)

stream_name_to_stream = {s.name: s for s in streams}
for stream_name in self.stream_names:
if stream_name not in stream_name_to_stream:
raise ValueError(
f"{stream_name} is not part of the catalog. Expected one of {list(stream_name_to_stream.keys())}."
)

effective_stream_names, override_error = self._resolve_effective_stream_names(config)
if override_error is not None:
return False, override_error

source_label = (
f"config path '{self.config_check_streams_path}'"
if self.config_check_streams_path and effective_stream_names is not self.stream_names
else "manifest"
)

unknown_stream_names = [
stream_name
for stream_name in (effective_stream_names or [])
if stream_name not in stream_name_to_stream
]
if unknown_stream_names:
available = list(stream_name_to_stream.keys())
message = (
f"Stream(s) {unknown_stream_names} from {source_label} are not part of "
f"the catalog. Expected one of {available}."
)
return False, message

for stream_name in effective_stream_names or []:
stream_availability, message = self._check_stream_availability(
stream_name_to_stream, stream_name, logger
)
Expand Down
66 changes: 66 additions & 0 deletions airbyte_cdk/sources/declarative/concurrent_declarative_source.py
Original file line number Diff line number Diff line change
Expand Up @@ -213,6 +213,8 @@ def __init__(
self._validate_source()
# apply additional post-processing to the manifest
self._post_process_manifest()
# inject the hidden config-check-streams-path property into the spec, if requested
self._inject_config_check_streams_property()

spec: Optional[Mapping[str, Any]] = self._source_config.get("spec")
self._spec_component: Optional[Spec] = (
Expand Down Expand Up @@ -293,6 +295,70 @@ def _post_process_manifest(self) -> None:
# apply manifest normalization, if required
self._normalize_manifest()

def _inject_config_check_streams_property(self) -> None:
"""Injects the hidden `config_check_streams_path` property into the spec.

When the manifest's `check` block is a `CheckStream` with a
`config_check_streams_path` set, ensure the connector spec exposes a
hidden array-of-strings property at that path so platforms can plumb
the override through. The injected property is not added to
`required`. If the spec already declares a property at that key, its
existing shape is preserved and `airbyte_hidden: true` is forced.

Manifests without a `spec` block are left unchanged — the runtime
override still works because it reads directly from the user's
config; the injection is only meaningful for platforms that surface
the connector spec.

v1 only supports single-level paths. Dotted paths raise a clear error
so the manifest author can simplify their config schema or wait for
v2 nesting support.
"""
check = self._source_config.get("check")
if not isinstance(check, Mapping) or check.get("type") != "CheckStream":
return

path = check.get("config_check_streams_path")
if not isinstance(path, str) or not path:
return

if "." in path:
raise ValueError(
"CheckStream.config_check_streams_path only supports single-level paths in v1. "
f"Got '{path}'. Declare a top-level config field and reference it without dots."
)

spec = self._source_config.get("spec")
if not isinstance(spec, dict):
return
connection_specification = spec.setdefault("connection_specification", {})
if not isinstance(connection_specification, dict):
raise ValueError(
"Expected 'spec.connection_specification' to be a mapping; got "
f"{type(connection_specification).__name__}."
)
properties = connection_specification.setdefault("properties", {})
if not isinstance(properties, dict):
raise ValueError(
"Expected 'spec.connection_specification.properties' to be a mapping; got "
f"{type(properties).__name__}."
)

existing = properties.get(path)
if isinstance(existing, dict):
existing["airbyte_hidden"] = True
else:
properties[path] = {
"type": "array",
"items": {"type": "string"},
"title": "Check Streams Override",
"description": (
"Internal override for the streams that should be exercised during "
"connection check. Set by the platform; not exposed in the standard UI."
),
"airbyte_hidden": True,
}

def _migrate_manifest(self) -> None:
"""
This method is used to migrate the manifest. It should be called after the manifest has been validated.
Expand Down
15 changes: 15 additions & 0 deletions airbyte_cdk/sources/declarative/declarative_component_schema.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -343,6 +343,21 @@ definitions:
type: array
items:
"$ref": "#/definitions/DynamicStreamCheckConfig"
config_check_streams_path:
title: Config Check Streams Path
description: |-
Optional dot-delimited path into the user's config that, when present,
overrides `stream_names` for the duration of the check. The referenced
config value must be a list of stream names. When empty or missing, the
manifest's `stream_names` is used. Setting this also injects a hidden
array-of-strings property at the top level of the connector spec
(single-level paths only in v1) so platforms that allow a connection
to override the check-time streams can plumb the value through without
exposing it in the standard config UI.
type: string
examples:
- "check_streams_override"
- "advanced.check_streams"
DynamicStreamCheckConfig:
type: object
required:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1609,6 +1609,12 @@ class CheckStream(BaseModel):
title="Stream Names",
)
dynamic_streams_check_configs: Optional[List[DynamicStreamCheckConfig]] = None
config_check_streams_path: Optional[str] = Field(
None,
description="Optional dot-delimited path into the user's config that, when present,\noverrides `stream_names` for the duration of the check. The referenced\nconfig value must be a list of stream names. When empty or missing, the\nmanifest's `stream_names` is used. Setting this also injects a hidden\narray-of-strings property at the top level of the connector spec\n(single-level paths only in v1) so platforms that allow a connection\nto override the check-time streams can plumb the value through without\nexposing it in the standard config UI.",
examples=["check_streams_override", "advanced.check_streams"],
title="Config Check Streams Path",
)


class IncrementingCountCursor(BaseModel):
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1269,6 +1269,7 @@ def create_check_stream(
return CheckStream(
stream_names=model.stream_names or [],
dynamic_streams_check_configs=dynamic_streams_check_configs,
config_check_streams_path=model.config_check_streams_path,
parameters={},
)

Expand Down
Loading
Loading