Skip to content

fix: propagate types and $parameters into stream_groups items#1003

Open
devin-ai-integration[bot] wants to merge 2 commits intomainfrom
devin/1777343889-fix-stream-groups-transformer
Open

fix: propagate types and $parameters into stream_groups items#1003
devin-ai-integration[bot] wants to merge 2 commits intomainfrom
devin/1777343889-fix-stream-groups-transformer

Conversation

@devin-ai-integration
Copy link
Copy Markdown
Contributor

Summary

ManifestComponentTransformer.propagate_types_and_parameters() did not recurse into the stream_groups field because it uses the JSON Schema additionalProperties pattern (Dict[str, StreamGroup]). The transformer handled direct component dicts and lists of components, but not dicts-of-components. When processing stream_groups, it entered the untyped-component early return without processing anything.

This meant streams defined inside stream_groups did not get their type inferred (e.g., DeclarativeStream) and $parameters were not propagated into them or their children.

Changes

  1. Added "StreamGroup.streams": "DeclarativeStream" to DEFAULT_MODEL_TYPES so stream items inside a StreamGroup get the correct default type.
  2. Added a new ADDITIONAL_PROPERTIES_TYPES mapping ("DeclarativeSource.stream_groups": "StreamGroup") to tell the transformer that dict values under stream_groups are StreamGroup components.
  3. Modified the dict-field handling in the main recursion loop: when a field matches ADDITIONAL_PROPERTIES_TYPES, iterate the dict's values, assign the mapped type on each, and recurse into each individually.

Prior art

Declarative-First Evaluation

This fix is entirely within the CDK's manifest parsing infrastructure (ManifestComponentTransformer). It adds a new mapping and extends the existing recursion logic — no custom Python components are involved.

Test Coverage

Added 3 regression tests:

  • test_stream_groups_propagate_types_and_parameters — verifies type inference and full $parameters propagation through nested stream components inside a group.
  • test_stream_groups_with_multiple_groups — verifies multiple groups each get type=StreamGroup and proper recursion.
  • test_stream_groups_preserves_explicit_types — verifies explicit types on groups/streams are not overwritten.

All 21 tests in test_manifest_component_transformer.py pass locally.

Resolves https://github.com/airbytehq/airbyte-internal-issues/issues/16257:

Related to https://github.com/airbytehq/airbyte-internal-issues/issues/16256:

Review & Testing Checklist for Human

  • Verify the ADDITIONAL_PROPERTIES_TYPES handling correctly iterates dict values and does not break existing dict-field recursion
  • Confirm _apply_stream_groups in concurrent_declarative_source.py is unaffected by the added type: StreamGroup field on group configs
  • Test with a real connector manifest that uses stream_groups (e.g., the source-slack PR at fix(source-slack): add stream_groups to serialize substream parent walks airbyte#77131) to verify end-to-end behavior

Notes

  • StreamGroup has no type field in the YAML schema — the transformer now assigns type=StreamGroup implicitly via ADDITIONAL_PROPERTIES_TYPES.
  • Adding type: StreamGroup to group config dicts does not break _apply_stream_groups since it reads from the resolved manifest dict directly and ignores unknown fields.
  • This unblocks fix(source-slack): add stream_groups to serialize substream parent walks airbyte#77131 (source-slack stream_groups).

Link to Devin session: https://app.devin.ai/sessions/33a873e377924cb79b54a3a06cd23d79

ManifestComponentTransformer did not recurse into the stream_groups
field because it uses the JSON Schema additionalProperties pattern
(Dict[str, StreamGroup]) which the transformer did not handle.

Changes:
- Add StreamGroup.streams -> DeclarativeStream to DEFAULT_MODEL_TYPES
- Add ADDITIONAL_PROPERTIES_TYPES mapping for DeclarativeSource.stream_groups
- Handle dict-of-components traversal in the main recursion loop
- Add 3 regression tests for stream_groups type/parameter propagation

Co-Authored-By: bot_apk <apk@cognition.ai>
@devin-ai-integration
Copy link
Copy Markdown
Contributor Author

🤖 Devin AI Engineer

I'll be helping with this pull request! Here's what you should know:

✅ I will automatically:

  • Address comments on this PR. Add '(aside)' to your comment to have me ignore it.
  • Look at CI failures and help fix them

Note: I can only respond to comments from users who have write access to this repository.

⚙️ Control Options:

  • Disable automatic comment and CI monitoring

@github-actions
Copy link
Copy Markdown

👋 Greetings, Airbyte Team Member!

Here are some helpful tips and reminders for your convenience.

💡 Show Tips and Tricks

Testing This CDK Version

You can test this version of the CDK using the following:

# Run the CLI from this branch:
uvx 'git+https://github.com/airbytehq/airbyte-python-cdk.git@devin/1777343889-fix-stream-groups-transformer#egg=airbyte-python-cdk[dev]' --help

# Update a connector to use the CDK from this branch ref:
cd airbyte-integrations/connectors/source-example
poe use-cdk-branch devin/1777343889-fix-stream-groups-transformer

PR Slash Commands

Airbyte Maintainers can execute the following slash commands on your PR:

  • /autofix - Fixes most formatting and linting issues
  • /poetry-lock - Updates poetry.lock file
  • /test - Runs connector tests with the updated CDK
  • /prerelease - Triggers a prerelease publish with default arguments
  • /poe build - Regenerate git-committed build artifacts, such as the pydantic models which are generated from the manifest JSON schema in YAML.
  • /poe <command> - Runs any poe command in the CDK environment
📚 Show Repo Guidance

Helpful Resources

📝 Edit this welcome message.

@github-actions
Copy link
Copy Markdown

github-actions Bot commented Apr 28, 2026

PyTest Results (Fast)

4 037 tests  +3   4 026 ✅ +3   7m 55s ⏱️ -7s
    1 suites ±0      11 💤 ±0 
    1 files   ±0       0 ❌ ±0 

Results for commit 5066d4a. ± Comparison against base commit 60bae81.

♻️ This comment has been updated with latest results.

@github-actions
Copy link
Copy Markdown

github-actions Bot commented Apr 28, 2026

PyTest Results (Full)

4 040 tests  +3   4 028 ✅ +3   11m 31s ⏱️ +34s
    1 suites ±0      12 💤 ±0 
    1 files   ±0       0 ❌ ±0 

Results for commit 5066d4a. ± Comparison against base commit 60bae81.

♻️ This comment has been updated with latest results.

Copy link
Copy Markdown
Contributor

Copilot AI left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pull request overview

Fixes ManifestComponentTransformer.propagate_types_and_parameters() so it properly recurses into stream_groups (an additionalProperties/dict-of-components field), enabling default type inference and $parameters propagation for components nested under stream groups.

Changes:

  • Add default typing for StreamGroup.streams items and introduce an ADDITIONAL_PROPERTIES_TYPES registry for dict-of-components fields.
  • Extend recursion logic to detect additionalProperties fields (e.g., DeclarativeSource.stream_groups), assign missing type to each dict value, and recurse into each value.
  • Add regression tests covering type inference/parameter propagation, multiple groups, and preservation of explicit type fields.

Reviewed changes

Copilot reviewed 2 out of 2 changed files in this pull request and generated 2 comments.

File Description
airbyte_cdk/sources/declarative/parsers/manifest_component_transformer.py Adds additionalProperties-aware recursion for stream_groups and default type inference for nested stream items.
unit_tests/sources/declarative/parsers/test_manifest_component_transformer.py Adds regression tests ensuring stream groups receive inferred types and propagated $parameters.

💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.

Comment on lines +70 to +71
# StreamGroup
"StreamGroup.streams": "DeclarativeStream",
Copy link

Copilot AI Apr 28, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

DEFAULT_MODEL_TYPES maps StreamGroup.streams to DeclarativeStream, which assumes the StreamGroup.streams field contains stream component objects. However, the generated Pydantic schema currently defines StreamGroup.streams as List[str] (airbyte_cdk/sources/declarative/models/declarative_component_schema.py:3155-3160). Please reconcile the expected manifest shape (and update either the model/schema or this default-type mapping) so type propagation matches the canonical StreamGroup definition.

Suggested change
# StreamGroup
"StreamGroup.streams": "DeclarativeStream",

Copilot uses AI. Check for mistakes.
Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The Pydantic model StreamGroup.streams is typed as List[str] but the YAML schema (declarative_component_schema.yaml:4266-4273) defines it as:

streams:
  type: array
  items:
    anyOf:
      - "$ref": "#/definitions/DeclarativeStream"

The transformer operates on the resolved manifest dict (pre-pydantic), where stream items are full component dicts, not strings. The pydantic model is not involved at this stage — ManifestComponentTransformer runs before model instantiation, processing raw YAML-resolved dicts. The _apply_stream_groups method in concurrent_declarative_source.py similarly reads from the resolved manifest dict directly (accessing .get("streams", []) and .get("name", "") on each item).

So the DEFAULT_MODEL_TYPES entry here is correct: at the point the transformer runs, StreamGroup.streams items are DeclarativeStream component dicts that need type inference and parameter propagation.

value,
current_parameters,
use_parent_parameters=use_parent_parameters,
)
Copy link

Copilot AI Apr 28, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In the additionalProperties branch, the updated dict is only mutated via the local field_value alias and never explicitly re-assigned back to propagated_component[field_name]. This works because field_value references the same dict, but it’s brittle/inconsistent with the non-additionalProperties path (which assigns the recursive result). Consider assigning propagated_component[field_name] = field_value after processing to make the data flow explicit and avoid surprises if this code is later refactored (e.g., if field_value stops being a plain dict).

Suggested change
)
)
propagated_component[field_name] = field_value

Copilot uses AI. Check for mistakes.
Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good point — added the explicit propagated_component[field_name] = field_value assignment in 5066d4a to make the data flow consistent with the else branch.

Makes data flow explicit and consistent with the non-additionalProperties
branch, as suggested in code review.

Co-Authored-By: bot_apk <apk@cognition.ai>
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

1 participant