Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,14 @@
"DynamicSchemaLoader.retriever": "SimpleRetriever",
# SchemaTypeIdentifier
"SchemaTypeIdentifier.types_map": "TypesMap",
# StreamGroup
"StreamGroup.streams": "DeclarativeStream",
Comment on lines +70 to +71
Copy link

Copilot AI Apr 28, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

DEFAULT_MODEL_TYPES maps StreamGroup.streams to DeclarativeStream, which assumes the StreamGroup.streams field contains stream component objects. However, the generated Pydantic schema currently defines StreamGroup.streams as List[str] (airbyte_cdk/sources/declarative/models/declarative_component_schema.py:3155-3160). Please reconcile the expected manifest shape (and update either the model/schema or this default-type mapping) so type propagation matches the canonical StreamGroup definition.

Suggested change
# StreamGroup
"StreamGroup.streams": "DeclarativeStream",

Copilot uses AI. Check for mistakes.
Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The Pydantic model StreamGroup.streams is typed as List[str] but the YAML schema (declarative_component_schema.yaml:4266-4273) defines it as:

streams:
  type: array
  items:
    anyOf:
      - "$ref": "#/definitions/DeclarativeStream"

The transformer operates on the resolved manifest dict (pre-Pydantic), where stream items are full component dicts, not strings. The Pydantic model is not involved at this stage: ManifestComponentTransformer runs before model instantiation and processes the raw, YAML-resolved dicts. The _apply_stream_groups method in concurrent_declarative_source.py likewise reads directly from the resolved manifest dict (accessing .get("streams", []) and .get("name", "") on each item).

So the DEFAULT_MODEL_TYPES entry here is correct: at the point the transformer runs, StreamGroup.streams items are DeclarativeStream component dicts that need type inference and parameter propagation.

}

# Registry of fields that follow the JSON Schema additionalProperties pattern, i.e. fields whose
# value is a Dict[str, ComponentType] keyed by arbitrary names.
#   key:   "<ParentType>.<field_name>" identifier of the field
#   value: component type the transformer gives to each dict value that has no explicit "type"
#          before recursing into it
ADDITIONAL_PROPERTIES_TYPES: Mapping[str, str] = {
    "DeclarativeSource.stream_groups": "StreamGroup",
}

# We retain a separate registry for custom components to automatically insert the type if it is missing. This is intended to
Expand Down Expand Up @@ -159,12 +167,30 @@ def propagate_types_and_parameters(
# We exclude propagating a parameter that matches the current field name because that would result in an infinite cycle
excluded_parameter = current_parameters.pop(field_name, None)
parent_type_field_identifier = f"{propagated_component.get('type')}.{field_name}"
propagated_component[field_name] = self.propagate_types_and_parameters(
parent_type_field_identifier,
field_value,
current_parameters,
use_parent_parameters=use_parent_parameters,
additional_props_type = ADDITIONAL_PROPERTIES_TYPES.get(
parent_type_field_identifier
)
if additional_props_type:
# Handle additionalProperties pattern: field_value is a dict of
# component dicts keyed by arbitrary names.
for key, value in field_value.items():
if isinstance(value, dict):
if "type" not in value:
value["type"] = additional_props_type
field_value[key] = self.propagate_types_and_parameters(
parent_type_field_identifier,
value,
current_parameters,
use_parent_parameters=use_parent_parameters,
)
Copy link

Copilot AI Apr 28, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In the additionalProperties branch, the updated dict is only mutated via the local field_value alias and never explicitly re-assigned back to propagated_component[field_name]. This works because field_value references the same dict, but it’s brittle/inconsistent with the non-additionalProperties path (which assigns the recursive result). Consider assigning propagated_component[field_name] = field_value after processing to make the data flow explicit and avoid surprises if this code is later refactored (e.g., if field_value stops being a plain dict).

Suggested change
)
)
propagated_component[field_name] = field_value

Copilot uses AI. Check for mistakes.
Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good point — added the explicit propagated_component[field_name] = field_value assignment in 5066d4a to make the data flow consistent with the else branch.

propagated_component[field_name] = field_value
else:
propagated_component[field_name] = self.propagate_types_and_parameters(
parent_type_field_identifier,
field_value,
current_parameters,
use_parent_parameters=use_parent_parameters,
)
if excluded_parameter:
current_parameters[field_name] = excluded_parameter
elif isinstance(field_value, typing.List):
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -743,3 +743,130 @@ def test_dynamic_stream_use_parent_parameters_configuration():
assert result_false["retriever"]["name"] == "retriever_name"
# When use_parent_parameters=True, parent's $parameters win
assert result_true["retriever"]["name"] == "parent_name"


def test_stream_groups_propagate_types_and_parameters():
    """Check type and $parameters propagation through ``stream_groups``.

    ``stream_groups`` follows the additionalProperties pattern
    (Dict[str, StreamGroup]). The transformer is expected to visit each
    group value, tag it with type=StreamGroup, and then carry types and
    $parameters down into the nested DeclarativeStream items.
    """
    # The stream deliberately omits "type" so that it has to be inferred.
    users_stream = {
        "$parameters": {"name": "users", "primary_key": "id"},
        "retriever": {
            "type": "SimpleRetriever",
            "record_selector": {
                "type": "RecordSelector",
                "extractor": {"type": "DpathExtractor", "field_path": []},
            },
            "requester": {
                "type": "HttpRequester",
                "url_base": "https://api.example.com",
                "path": "/users",
            },
        },
    }
    # The group deliberately omits "type" as well.
    manifest = {
        "type": "DeclarativeSource",
        "streams": [],
        "stream_groups": {
            "group_a": {
                "streams": [users_stream],
                "action": {"type": "BlockSimultaneousSyncsAction"},
            },
        },
    }

    propagated = ManifestComponentTransformer().propagate_types_and_parameters(
        "", manifest, {}
    )

    group_a = propagated["stream_groups"]["group_a"]
    # The group value receives the StreamGroup type.
    assert group_a["type"] == "StreamGroup"
    # The action component keeps its own type.
    assert group_a["action"]["type"] == "BlockSimultaneousSyncsAction"

    first_stream = group_a["streams"][0]
    # The stream type comes from DEFAULT_MODEL_TYPES.
    assert first_stream["type"] == "DeclarativeStream"

    # $parameters reach the stream itself, its retriever and the requester.
    retriever = first_stream["retriever"]
    requester = retriever["requester"]
    for node in (first_stream, retriever, requester):
        assert node["name"] == "users"
        assert node["primary_key"] == "id"
    assert requester["$parameters"] == {
        "name": "users",
        "primary_key": "id",
    }


def test_stream_groups_with_multiple_groups():
    """Every group in ``stream_groups`` is typed as StreamGroup and recursed into."""

    def make_group(stream_name):
        # Build fresh dicts on every call so the groups never share objects
        # that the transformer might mutate.
        return {
            "streams": [
                {
                    "$parameters": {"name": stream_name},
                    "retriever": {"type": "SimpleRetriever"},
                },
            ],
            "action": {"type": "BlockSimultaneousSyncsAction"},
        }

    manifest = {
        "type": "DeclarativeSource",
        "streams": [],
        "stream_groups": {
            "api_v1": make_group("posts"),
            "api_v2": make_group("comments"),
        },
    }

    propagated = ManifestComponentTransformer().propagate_types_and_parameters(
        "", manifest, {}
    )
    groups = propagated["stream_groups"]

    # Each group value is tagged with the StreamGroup type.
    for group_key in ("api_v1", "api_v2"):
        assert groups[group_key]["type"] == "StreamGroup"

    # Each nested stream gets its inferred type and its own name parameter.
    for group_key, expected_name in (("api_v1", "posts"), ("api_v2", "comments")):
        only_stream = groups[group_key]["streams"][0]
        assert only_stream["type"] == "DeclarativeStream"
        assert only_stream["name"] == expected_name


def test_stream_groups_preserves_explicit_types():
    """Explicit ``type`` values on a group and on its stream are still present after propagation."""
    typed_stream = {
        "type": "DeclarativeStream",
        "retriever": {"type": "SimpleRetriever"},
    }
    typed_group = {
        "type": "StreamGroup",
        "streams": [typed_stream],
        "action": {"type": "BlockSimultaneousSyncsAction"},
    }
    manifest = {
        "type": "DeclarativeSource",
        "streams": [],
        "stream_groups": {"my_group": typed_group},
    }

    propagated = ManifestComponentTransformer().propagate_types_and_parameters(
        "", manifest, {}
    )

    my_group = propagated["stream_groups"]["my_group"]
    assert my_group["type"] == "StreamGroup"
    assert my_group["streams"][0]["type"] == "DeclarativeStream"
Loading