feat(streams): Add topic name override for StreamSource and StreamSink#257

Open
fpacifici wants to merge 1 commit into main from fpacifici/override_topic
Conversation

@fpacifici
Collaborator

Summary

Add the ability to override topic names for StreamSource and StreamSink steps via deployment configuration using the "topic" key. The override is optional and falls back to the hardcoded stream_name if not provided.

Changes

  • Add override_config() method to StreamSource class
  • Add override_config() method to StreamSink class
  • Update Arroyo Rust adapter to call override_config() for sources
  • Update Arroyo legacy adapter to call override_config() for sources
  • Add comprehensive test coverage (4 new tests)

Implementation Details

This follows the existing override_config() pattern used by GCSSink, ensuring consistency across the codebase. The implementation maintains full backwards compatibility with existing pipelines.

Usage Example

Pipeline Code (remains unchanged):

pipeline = (
    streaming_source(name="my_source", stream_name="events")
    .apply(Map(name="transform", function=transform_func))
    .sink(StreamSink(name="my_sink", stream_name="output"))
)

Deployment YAML (new optional configuration):

pipeline:
  segments:
    - steps_config:
        my_source:
          starts_segment: true
          bootstrap_servers: ["kafka:9092"]
          topic: "production-events-v2"  # Override source topic

        my_sink:
          bootstrap_servers: ["kafka:9092"]
          topic: "production-output-v2"  # Override sink topic

Result:

  • Source reads from "production-events-v2" (not "events")
  • Sink writes to "production-output-v2" (not "output")
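
A minimal sketch of the fallback behaviour described above. `StreamSourceSketch` is a hypothetical stand-in, not the library's `StreamSource`; only the body of `override_config()` mirrors the snippet under review in this PR, and the step name `other_source` is made up for illustration.

```python
from typing import Any, Mapping


class StreamSourceSketch:
    """Hypothetical stand-in for StreamSource; not the library class."""

    def __init__(self, name: str, stream_name: str) -> None:
        self.name = name
        self.stream_name = stream_name

    def override_config(self, loaded_config: Mapping[str, Any]) -> None:
        # Same truthy check as the override_config() shown in this PR.
        if loaded_config.get("topic"):
            self.stream_name = str(loaded_config.get("topic"))


# Per-step deployment config, shaped like steps_config in the YAML above.
steps_config: Mapping[str, Mapping[str, Any]] = {
    "my_source": {
        "starts_segment": True,
        "bootstrap_servers": ["kafka:9092"],
        "topic": "production-events-v2",
    },
}

# A step with a "topic" entry picks up the override.
overridden = StreamSourceSketch("my_source", "events")
overridden.override_config(steps_config.get(overridden.name, {}))

# A step without any config keeps its hardcoded stream_name.
untouched = StreamSourceSketch("other_source", "events")
untouched.override_config(steps_config.get(untouched.name, {}))

print(overridden.stream_name, untouched.stream_name)
```

Under these assumptions the first step ends up on the overridden topic and the second keeps `events`, which is the backwards-compatible path for existing pipelines.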

Test plan

  • All 4 new unit tests pass
  • All 29 pipeline tests pass (no regressions)
  • Type checking passes with no errors
  • Pre-commit hooks pass

🤖 Generated with Claude Code

Co-Authored-By: Claude Sonnet 4.5 <noreply@anthropic.com>
@fpacifici fpacifici requested a review from a team as a code owner March 13, 2026 02:34
@github-actions

Semver Impact of This PR

🟡 Minor (new features)

📋 Changelog Preview

This is how your changes will appear in the changelog.
Entries from this PR are highlighted with a left border (blockquote style).


New Features ✨

  • (streams) Add topic name override for StreamSource and StreamSink by fpacifici in #257

Internal Changes 🔧

Deps

  • Bump virtualenv from 20.31.2 to 20.36.1 in /sentry_streams by dependabot in #248
  • Bump bytes from 1.10.1 to 1.11.1 in /sentry_streams by dependabot in #242
  • Bump urllib3 from 2.5.0 to 2.6.3 in /sentry_streams by dependabot in #239
  • Bump protobuf from 5.29.5 to 5.29.6 in /sentry_streams by dependabot in #244

🤖 This preview updates automatically when you update the PR.

Comment on lines +107 to +110
# Apply config overrides and validate
step_config: Mapping[str, Any] = self.config.get(step.name, {})
step.override_config(step_config)
step.validate()

Bug: The legacy ArroyoAdapter.sink() method doesn't call override_config(), causing topic overrides for StreamSink to be silently ignored, unlike the new Rust adapter.
Severity: HIGH

Suggested Fix

In sentry_streams/sentry_streams/adapters/arroyo/adapter.py, before adding the StreamSinkStep, call step.override_config() to apply any configuration overrides. This can be done by fetching the step_config from self.steps_config and passing it to step.override_config(step_config), mirroring the implementation in add_source() and the rust_arroyo.py adapter's sink() method.
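
The suggested fix could look roughly like the sketch below. `LegacyAdapterSketch` and `SinkSketch` are hypothetical stand-ins and do not reproduce the real `ArroyoAdapter`; the `steps_config` attribute name is taken from the suggestion text and may differ in the legacy adapter, and `sink_topics` exists only to make the effect observable.

```python
from typing import Any, Dict, Mapping


class SinkSketch:
    """Hypothetical stand-in for StreamSink."""

    def __init__(self, name: str, stream_name: str) -> None:
        self.name = name
        self.stream_name = stream_name

    def override_config(self, loaded_config: Mapping[str, Any]) -> None:
        # Mirrors the override_config() body shown in this PR.
        if loaded_config.get("topic"):
            self.stream_name = str(loaded_config.get("topic"))


class LegacyAdapterSketch:
    """Hypothetical adapter; only the override-before-use ordering matters."""

    def __init__(self, steps_config: Mapping[str, Mapping[str, Any]]) -> None:
        self.steps_config = steps_config
        # Records the topic each sink would write to (illustration only).
        self.sink_topics: Dict[str, str] = {}

    def sink(self, step: SinkSketch) -> None:
        # Apply deployment overrides before the step is wired in,
        # so the sink does not silently keep its hardcoded stream_name.
        step_config: Mapping[str, Any] = self.steps_config.get(step.name, {})
        step.override_config(step_config)
        self.sink_topics[step.name] = step.stream_name


adapter = LegacyAdapterSketch({"my_sink": {"topic": "production-output-v2"}})
adapter.sink(SinkSketch("my_sink", "output"))
print(adapter.sink_topics)
```

The point of the sketch is the ordering: the override is applied before anything reads `stream_name`, which is what the report says is missing from the legacy `sink()` path.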

Prompt for AI Agent
Review the code at the location below. A potential bug has been identified by an AI
agent.
Verify if this is a real issue. If it is, propose a fix; if not, explain why it's not
valid.

Location: sentry_streams/sentry_streams/adapters/arroyo/adapter.py#L107-L110

Potential issue: The `ArroyoAdapter.sink()` method in `adapter.py` fails to call
`step.override_config()` before adding a `StreamSinkStep`. As a result, if a deployment
configuration provides a `topic` override for a `StreamSink`, it will be silently
ignored. The sink will continue to write to its hardcoded `stream_name` instead of the
intended, overridden topic. This creates an inconsistency with the `rust_arroyo.py`
adapter, which correctly handles this override, leading to different behavior depending
on which adapter is used.

@cursor cursor bot left a comment

Cursor Bugbot has reviewed your changes and found 2 potential issues.

# Apply config overrides and validate
step_config: Mapping[str, Any] = self.steps_config.get(step.name, {})
step.override_config(step_config)
step.validate()

Redundant config fetch in rust_arroyo source method

Low Severity

step_config on line 252 fetches the same value from self.steps_config as source_config on line 248, since source_name == step.name. The source_config is already asserted non-None on line 249, making the {} default on step_config unreachable dead code. The existing source_config variable could be passed directly to override_config() instead of performing a redundant dictionary lookup.
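
One way to read this suggestion, sketched with hypothetical names: `add_source` and `SourceStepSketch` are illustrative and not the adapter's real API, and the assertion message is made up. The already-asserted `source_config` is passed straight to `override_config()` instead of being looked up a second time.

```python
from typing import Any, Mapping, Optional


class SourceStepSketch:
    """Hypothetical stand-in for a StreamSource step."""

    def __init__(self, name: str, stream_name: str) -> None:
        self.name = name
        self.stream_name = stream_name
        self.validated = False

    def override_config(self, loaded_config: Mapping[str, Any]) -> None:
        # Mirrors the override_config() body shown in this PR.
        if loaded_config.get("topic"):
            self.stream_name = str(loaded_config.get("topic"))

    def validate(self) -> None:
        # Placeholder for the real validation logic.
        self.validated = True


def add_source(
    steps_config: Mapping[str, Mapping[str, Any]], step: SourceStepSketch
) -> None:
    # Single lookup: the config is required, so no {} default is needed.
    source_config: Optional[Mapping[str, Any]] = steps_config.get(step.name)
    assert source_config is not None, f"Config not provided for source {step.name}"

    # Reuse the same mapping for the override instead of fetching it again.
    step.override_config(source_config)
    step.validate()


step = SourceStepSketch("my_source", "events")
add_source({"my_source": {"topic": "production-events-v2"}}, step)
print(step.stream_name, step.validated)
```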

def override_config(self, loaded_config: Mapping[str, Any]) -> None:
    """Override topic name from deployment configuration."""
    if loaded_config.get("topic"):
        self.stream_name = str(loaded_config.get("topic"))

Falsy topic values silently skip the override

Low Severity

Both StreamSource.override_config() and StreamSink.override_config() use if loaded_config.get("topic"): as a truthy check. If a deployment config explicitly sets topic to an empty string "", the override silently does not apply and the original stream_name is kept, with no warning. Using if "topic" in loaded_config: or if loaded_config.get("topic") is not None: would be more precise and consistent with the DevNullSink pattern (which uses is not None checks).
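
To make the difference between the three checks concrete, here is a small standalone comparison. The helper names and the sample configs are illustrative only and do not come from the PR.

```python
from typing import Any, Dict, Mapping, Tuple


def truthy_check(cfg: Mapping[str, Any]) -> bool:
    # The check currently used in the PR.
    return bool(cfg.get("topic"))


def key_present_check(cfg: Mapping[str, Any]) -> bool:
    # First alternative mentioned in the comment.
    return "topic" in cfg


def not_none_check(cfg: Mapping[str, Any]) -> bool:
    # Second alternative mentioned in the comment.
    return cfg.get("topic") is not None


cases: Dict[str, Mapping[str, Any]] = {
    "missing": {},
    "empty": {"topic": ""},
    "none": {"topic": None},
    "set": {"topic": "events-v2"},
}

# For each case: (truthy, key present, not None) -> would the override apply?
results: Dict[str, Tuple[bool, bool, bool]] = {
    label: (truthy_check(cfg), key_present_check(cfg), not_none_check(cfg))
    for label, cfg in cases.items()
}

for label, outcome in results.items():
    print(label, outcome)
```

The checks only disagree on the empty-string and explicit `None` cases. With the stricter checks an empty `topic` would be applied as the stream name rather than skipped, so whether the truthy check is a bug depends on whether an empty topic is ever a meaningful value.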

Member

We don't care about falsey topics.

def override_config(self, loaded_config: Mapping[str, Any]) -> None:
    """Override topic name from deployment configuration."""
    if loaded_config.get("topic"):
        self.stream_name = str(loaded_config.get("topic"))
Member

We don't care about falsey topics.
