feat(streams): Add topic name override for StreamSource and StreamSink#257
feat(streams): Add topic name override for StreamSource and StreamSink#257
Conversation
Add the ability to override topic names for StreamSource and StreamSink steps via deployment configuration using the "topic" key. The override is optional and falls back to the hardcoded stream_name if not provided. Changes: - Add override_config() method to StreamSource class - Add override_config() method to StreamSink class - Update Arroyo Rust adapter to call override_config() for sources - Update Arroyo legacy adapter to call override_config() for sources - Add comprehensive test coverage (4 new tests) This follows the existing override_config() pattern used by GCSSink and maintains full backwards compatibility with existing pipelines. Co-Authored-By: Claude Sonnet 4.5 <noreply@anthropic.com>
Semver Impact of This PR🟡 Minor (new features) 📋 Changelog PreviewThis is how your changes will appear in the changelog. New Features ✨
Internal Changes 🔧Deps
🤖 This preview updates automatically when you update the PR. |
| # Apply config overrides and validate | ||
| step_config: Mapping[str, Any] = self.config.get(step.name, {}) | ||
| step.override_config(step_config) | ||
| step.validate() |
There was a problem hiding this comment.
Bug: The legacy ArroyoAdapter.sink() method doesn't call override_config(), causing topic overrides for StreamSink to be silently ignored, unlike the new Rust adapter.
Severity: HIGH
Suggested Fix
In sentry_streams/sentry_streams/adapters/arroyo/adapter.py, before adding the StreamSinkStep, call step.override_config() to apply any configuration overrides. This can be done by fetching the step_config from self.steps_config and passing it to step.override_config(step_config), mirroring the implementation in add_source() and the rust_arroyo.py adapter's sink() method.
Prompt for AI Agent
Review the code at the location below. A potential bug has been identified by an AI
agent.
Verify if this is a real issue. If it is, propose a fix; if not, explain why it's not
valid.
Location: sentry_streams/sentry_streams/adapters/arroyo/adapter.py#L107-L110
Potential issue: The `ArroyoAdapter.sink()` method in `adapter.py` fails to call
`step.override_config()` before adding a `StreamSinkStep`. As a result, if a deployment
configuration provides a `topic` override for a `StreamSink`, it will be silently
ignored. The sink will continue to write to its hardcoded `stream_name` instead of the
intended, overridden topic. This creates an inconsistency with the `rust_arroyo.py`
adapter, which correctly handles this override, leading to different behavior depending
on which adapter is used.
Did we get this right? 👍 / 👎 to inform future reviews.
There was a problem hiding this comment.
Cursor Bugbot has reviewed your changes and found 2 potential issues.
Bugbot Autofix is OFF. To automatically fix reported issues with cloud agents, enable autofix in the Cursor dashboard.
| # Apply config overrides and validate | ||
| step_config: Mapping[str, Any] = self.steps_config.get(step.name, {}) | ||
| step.override_config(step_config) | ||
| step.validate() |
There was a problem hiding this comment.
Redundant config fetch in rust_arroyo source method
Low Severity
step_config on line 252 fetches the same value from self.steps_config as source_config on line 248, since source_name == step.name. The source_config is already asserted non-None on line 249, making the {} default on step_config unreachable dead code. The existing source_config variable could be passed directly to override_config() instead of performing a redundant dictionary lookup.
| def override_config(self, loaded_config: Mapping[str, Any]) -> None: | ||
| """Override topic name from deployment configuration.""" | ||
| if loaded_config.get("topic"): | ||
| self.stream_name = str(loaded_config.get("topic")) |
There was a problem hiding this comment.
Falsy topic values silently skip the override
Low Severity
Both StreamSource.override_config() and StreamSink.override_config() use if loaded_config.get("topic"): as a truthy check. If a deployment config explicitly sets topic to an empty string "", the override silently does not apply and the original stream_name is kept, with no warning. Using if "topic" in loaded_config: or if loaded_config.get("topic") is not None: would be more precise and consistent with the DevNullSink pattern (which uses is not None checks).
Additional Locations (1)
There was a problem hiding this comment.
We don't care about falsey topics.
| def override_config(self, loaded_config: Mapping[str, Any]) -> None: | ||
| """Override topic name from deployment configuration.""" | ||
| if loaded_config.get("topic"): | ||
| self.stream_name = str(loaded_config.get("topic")) |
There was a problem hiding this comment.
We don't care about falsey topics.


Summary
Add the ability to override topic names for
StreamSourceandStreamSinksteps via deployment configuration using the"topic"key. The override is optional and falls back to the hardcodedstream_nameif not provided.Changes
override_config()method toStreamSourceclassoverride_config()method toStreamSinkclassoverride_config()for sourcesoverride_config()for sourcesImplementation Details
This follows the existing
override_config()pattern used byGCSSink, ensuring consistency across the codebase. The implementation maintains full backwards compatibility with existing pipelines.Usage Example
Pipeline Code (remains unchanged):
Deployment YAML (new optional configuration):
Result:
Test plan
🤖 Generated with Claude Code