fix: propagate types and $parameters into stream_groups items#1003
fix: propagate types and $parameters into stream_groups items#1003devin-ai-integration[bot] wants to merge 2 commits intomainfrom
Conversation
ManifestComponentTransformer did not recurse into the stream_groups field because it uses the JSON Schema additionalProperties pattern (Dict[str, StreamGroup]) which the transformer did not handle. Changes: - Add StreamGroup.streams -> DeclarativeStream to DEFAULT_MODEL_TYPES - Add ADDITIONAL_PROPERTIES_TYPES mapping for DeclarativeSource.stream_groups - Handle dict-of-components traversal in the main recursion loop - Add 3 regression tests for stream_groups type/parameter propagation Co-Authored-By: bot_apk <apk@cognition.ai>
🤖 Devin AI EngineerI'll be helping with this pull request! Here's what you should know: ✅ I will automatically:
Note: I can only respond to comments from users who have write access to this repository. ⚙️ Control Options:
|
👋 Greetings, Airbyte Team Member!Here are some helpful tips and reminders for your convenience. 💡 Show Tips and TricksTesting This CDK VersionYou can test this version of the CDK using the following: # Run the CLI from this branch:
uvx 'git+https://github.com/airbytehq/airbyte-python-cdk.git@devin/1777343889-fix-stream-groups-transformer#egg=airbyte-python-cdk[dev]' --help
# Update a connector to use the CDK from this branch ref:
cd airbyte-integrations/connectors/source-example
poe use-cdk-branch devin/1777343889-fix-stream-groups-transformerPR Slash CommandsAirbyte Maintainers can execute the following slash commands on your PR:
|
There was a problem hiding this comment.
Pull request overview
Fixes ManifestComponentTransformer.propagate_types_and_parameters() so it properly recurses into stream_groups (an additionalProperties/dict-of-components field), enabling default type inference and $parameters propagation for components nested under stream groups.
Changes:
- Add default typing for
StreamGroup.streamsitems and introduce anADDITIONAL_PROPERTIES_TYPESregistry for dict-of-components fields. - Extend recursion logic to detect
additionalPropertiesfields (e.g.,DeclarativeSource.stream_groups), assign missingtypeto each dict value, and recurse into each value. - Add regression tests covering type inference/parameter propagation, multiple groups, and preservation of explicit
typefields.
Reviewed changes
Copilot reviewed 2 out of 2 changed files in this pull request and generated 2 comments.
| File | Description |
|---|---|
airbyte_cdk/sources/declarative/parsers/manifest_component_transformer.py |
Adds additionalProperties-aware recursion for stream_groups and default type inference for nested stream items. |
unit_tests/sources/declarative/parsers/test_manifest_component_transformer.py |
Adds regression tests ensuring stream groups receive inferred types and propagated $parameters. |
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
| # StreamGroup | ||
| "StreamGroup.streams": "DeclarativeStream", |
There was a problem hiding this comment.
DEFAULT_MODEL_TYPES maps StreamGroup.streams to DeclarativeStream, which assumes the StreamGroup.streams field contains stream component objects. However, the generated Pydantic schema currently defines StreamGroup.streams as List[str] (airbyte_cdk/sources/declarative/models/declarative_component_schema.py:3155-3160). Please reconcile the expected manifest shape (and update either the model/schema or this default-type mapping) so type propagation matches the canonical StreamGroup definition.
| # StreamGroup | |
| "StreamGroup.streams": "DeclarativeStream", |
There was a problem hiding this comment.
The Pydantic model StreamGroup.streams is typed as List[str] but the YAML schema (declarative_component_schema.yaml:4266-4273) defines it as:
streams:
type: array
items:
anyOf:
- "$ref": "#/definitions/DeclarativeStream"The transformer operates on the resolved manifest dict (pre-pydantic), where stream items are full component dicts, not strings. The pydantic model is not involved at this stage — ManifestComponentTransformer runs before model instantiation, processing raw YAML-resolved dicts. The _apply_stream_groups method in concurrent_declarative_source.py similarly reads from the resolved manifest dict directly (accessing .get("streams", []) and .get("name", "") on each item).
So the DEFAULT_MODEL_TYPES entry here is correct: at the point the transformer runs, StreamGroup.streams items are DeclarativeStream component dicts that need type inference and parameter propagation.
| value, | ||
| current_parameters, | ||
| use_parent_parameters=use_parent_parameters, | ||
| ) |
There was a problem hiding this comment.
In the additionalProperties branch, the updated dict is only mutated via the local field_value alias and never explicitly re-assigned back to propagated_component[field_name]. This works because field_value references the same dict, but it’s brittle/inconsistent with the non-additionalProperties path (which assigns the recursive result). Consider assigning propagated_component[field_name] = field_value after processing to make the data flow explicit and avoid surprises if this code is later refactored (e.g., if field_value stops being a plain dict).
| ) | |
| ) | |
| propagated_component[field_name] = field_value |
There was a problem hiding this comment.
Good point — added the explicit propagated_component[field_name] = field_value assignment in 5066d4a to make the data flow consistent with the else branch.
Makes data flow explicit and consistent with the non-additionalProperties branch, as suggested in code review. Co-Authored-By: bot_apk <apk@cognition.ai>
Summary
ManifestComponentTransformer.propagate_types_and_parameters()did not recurse into thestream_groupsfield because it uses the JSON SchemaadditionalPropertiespattern (Dict[str, StreamGroup]). The transformer handled direct component dicts and lists of components, but not dicts-of-components. When processingstream_groups, it entered the untyped-component early return without processing anything.This meant streams defined inside
stream_groupsdid not get theirtypeinferred (e.g.,DeclarativeStream) and$parameterswere not propagated into them or their children.Changes
"StreamGroup.streams": "DeclarativeStream"toDEFAULT_MODEL_TYPESso stream items inside aStreamGroupget the correct default type.ADDITIONAL_PROPERTIES_TYPESmapping ("DeclarativeSource.stream_groups": "StreamGroup") to tell the transformer that dict values understream_groupsareStreamGroupcomponents.ADDITIONAL_PROPERTIES_TYPES, iterate the dict's values, assign the mapped type on each, and recurse into each individually.Prior art
DynamicDeclarativeStream.stream_templateandDynamicDeclarativeStream.components_resolverentries toDEFAULT_MODEL_TYPESfor an identical class of bug.stream_groupsfeature but did not update the transformer.Declarative-First Evaluation
This fix is entirely within the CDK's manifest parsing infrastructure (
ManifestComponentTransformer). It adds a new mapping and extends the existing recursion logic — no custom Python components are involved.Test Coverage
Added 3 regression tests:
test_stream_groups_propagate_types_and_parameters— verifies type inference and full$parameterspropagation through nested stream components inside a group.test_stream_groups_with_multiple_groups— verifies multiple groups each gettype=StreamGroupand proper recursion.test_stream_groups_preserves_explicit_types— verifies explicit types on groups/streams are not overwritten.All 21 tests in
test_manifest_component_transformer.pypass locally.Resolves https://github.com/airbytehq/airbyte-internal-issues/issues/16257:
Related to https://github.com/airbytehq/airbyte-internal-issues/issues/16256:
Review & Testing Checklist for Human
ADDITIONAL_PROPERTIES_TYPEShandling correctly iterates dict values and does not break existing dict-field recursion_apply_stream_groupsinconcurrent_declarative_source.pyis unaffected by the addedtype: StreamGroupfield on group configsstream_groups(e.g., the source-slack PR at fix(source-slack): add stream_groups to serialize substream parent walks airbyte#77131) to verify end-to-end behaviorNotes
StreamGrouphas notypefield in the YAML schema — the transformer now assignstype=StreamGroupimplicitly viaADDITIONAL_PROPERTIES_TYPES.type: StreamGroupto group config dicts does not break_apply_stream_groupssince it reads from the resolved manifest dict directly and ignores unknown fields.Link to Devin session: https://app.devin.ai/sessions/33a873e377924cb79b54a3a06cd23d79