-
Notifications
You must be signed in to change notification settings - Fork 44
fix: propagate types and $parameters into stream_groups items #1003
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change | ||||||
|---|---|---|---|---|---|---|---|---|
|
|
@@ -67,6 +67,14 @@ | |||||||
| "DynamicSchemaLoader.retriever": "SimpleRetriever", | ||||||||
| # SchemaTypeIdentifier | ||||||||
| "SchemaTypeIdentifier.types_map": "TypesMap", | ||||||||
| # StreamGroup | ||||||||
| "StreamGroup.streams": "DeclarativeStream", | ||||||||
| } | ||||||||
|
|
||||||||
| # Mapping for fields that use the JSON Schema additionalProperties pattern (Dict[str, ComponentType]). | ||||||||
| # The transformer uses this to identify dict values as typed components and recurse into each. | ||||||||
| ADDITIONAL_PROPERTIES_TYPES: Mapping[str, str] = { | ||||||||
| "DeclarativeSource.stream_groups": "StreamGroup", | ||||||||
| } | ||||||||
|
|
||||||||
| # We retain a separate registry for custom components to automatically insert the type if it is missing. This is intended to | ||||||||
|
|
@@ -159,12 +167,30 @@ def propagate_types_and_parameters( | |||||||
| # We exclude propagating a parameter that matches the current field name because that would result in an infinite cycle | ||||||||
| excluded_parameter = current_parameters.pop(field_name, None) | ||||||||
| parent_type_field_identifier = f"{propagated_component.get('type')}.{field_name}" | ||||||||
| propagated_component[field_name] = self.propagate_types_and_parameters( | ||||||||
| parent_type_field_identifier, | ||||||||
| field_value, | ||||||||
| current_parameters, | ||||||||
| use_parent_parameters=use_parent_parameters, | ||||||||
| additional_props_type = ADDITIONAL_PROPERTIES_TYPES.get( | ||||||||
| parent_type_field_identifier | ||||||||
| ) | ||||||||
| if additional_props_type: | ||||||||
| # Handle additionalProperties pattern: field_value is a dict of | ||||||||
| # component dicts keyed by arbitrary names. | ||||||||
| for key, value in field_value.items(): | ||||||||
| if isinstance(value, dict): | ||||||||
| if "type" not in value: | ||||||||
| value["type"] = additional_props_type | ||||||||
| field_value[key] = self.propagate_types_and_parameters( | ||||||||
| parent_type_field_identifier, | ||||||||
| value, | ||||||||
| current_parameters, | ||||||||
| use_parent_parameters=use_parent_parameters, | ||||||||
| ) | ||||||||
|
||||||||
| ) | |
| ) | |
| propagated_component[field_name] = field_value |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Good point — added the explicit propagated_component[field_name] = field_value assignment in 5066d4a to make the data flow consistent with the else branch.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
DEFAULT_MODEL_TYPESmapsStreamGroup.streamstoDeclarativeStream, which assumes theStreamGroup.streamsfield contains stream component objects. However, the generated Pydantic schema currently definesStreamGroup.streamsasList[str](airbyte_cdk/sources/declarative/models/declarative_component_schema.py:3155-3160). Please reconcile the expected manifest shape (and update either the model/schema or this default-type mapping) so type propagation matches the canonical StreamGroup definition.There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The Pydantic model
StreamGroup.streamsis typed asList[str]but the YAML schema (declarative_component_schema.yaml:4266-4273) defines it as:The transformer operates on the resolved manifest dict (pre-pydantic), where stream items are full component dicts, not strings. The pydantic model is not involved at this stage —
ManifestComponentTransformerruns before model instantiation, processing raw YAML-resolved dicts. The_apply_stream_groupsmethod inconcurrent_declarative_source.pysimilarly reads from the resolved manifest dict directly (accessing.get("streams", [])and.get("name", "")on each item).So the
DEFAULT_MODEL_TYPESentry here is correct: at the point the transformer runs,StreamGroup.streamsitems areDeclarativeStreamcomponent dicts that need type inference and parameter propagation.