Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 1 addition & 14 deletions sentry_streams/sentry_streams/adapters/arroyo/rust_arroyo.py
Original file line number Diff line number Diff line change
Expand Up @@ -269,19 +269,6 @@ def sink(self, step: Sink[Any], stream: Route) -> Route:
step.validate()

if isinstance(step, GCSSink):
if sink_config := self.steps_config.get(step.name):
bucket = (
step.bucket if not sink_config.get("bucket") else str(sink_config.get("bucket"))
)
parallelism_config = cast(Mapping[str, Any], sink_config.get("parallelism"))
if parallelism_config:
thread_count = cast(int, parallelism_config["threads"])
else:
thread_count = 1
else:
bucket = step.bucket
thread_count = 1

# TODO: This object_generator is just used to get the name of the object that is being written.
# Fix this to wrap the actual step instead of just the object_generator.
# This will at least capture the number of calls to the step, if nothing else.
Expand All @@ -294,7 +281,7 @@ def wrapped_generator() -> str:

logger.info(f"Adding GCS sink: {step.name} to pipeline")
self.__consumers[stream.source].add_step(
RuntimeOperator.GCSSink(route, bucket, wrapped_generator, thread_count)
RuntimeOperator.GCSSink(route, step.bucket, wrapped_generator, step.thread_count)
)

elif isinstance(step, DevNullSink):
Expand Down
10 changes: 10 additions & 0 deletions sentry_streams/sentry_streams/pipeline/pipeline.py
Original file line number Diff line number Diff line change
Expand Up @@ -287,8 +287,18 @@ class GCSSink(Sink[TIn]):

bucket: str
object_generator: Callable[[], str]
thread_count: int = 1
step_type: StepType = StepType.SINK

def override_config(self, loaded_config: Mapping[str, Any]) -> None:
    """Override bucket and thread_count from deployment configuration.

    Settings that are absent from ``loaded_config`` leave the current
    attribute values untouched.

    Args:
        loaded_config: Deployment configuration for this step. The keys
            read here are ``bucket`` and ``parallelism``; the latter is
            expected to be a mapping whose ``threads`` entry sets the
            thread count.
    """
    # A falsy bucket (missing, None or "") keeps the bucket already set.
    bucket = loaded_config.get("bucket")
    if bucket:
        self.bucket = str(bucket)

    # The key may be present with a null value; normalise to an empty
    # mapping so the membership test below cannot raise TypeError.
    parallelism_config: Mapping[str, Any] = loaded_config.get("parallelism") or {}
    if "threads" in parallelism_config:
        self.thread_count = int(parallelism_config["threads"])


@dataclass
class StreamSink(Sink[TIn]):
Expand Down
41 changes: 41 additions & 0 deletions sentry_streams/tests/test_pipeline.py
Original file line number Diff line number Diff line change
Expand Up @@ -483,3 +483,44 @@ def test_devnullsink_override_config() -> None:
assert sink.batch_time_ms == 5000.0
assert sink.average_sleep_time_ms == 100.0
assert sink.max_sleep_time_ms == 200.0


def test_gcssink_override_config() -> None:
    """Deployment config replaces both the bucket and the thread count of a GCSSink."""
    from sentry_streams.pipeline.pipeline import GCSSink

    # Overrides as they would arrive from the deployment configuration.
    overrides = {"bucket": "override-bucket", "parallelism": {"threads": 4}}

    gcs_sink = GCSSink[str](
        name="gcs_sink",
        bucket="default-bucket",
        object_generator=lambda: "test-file.txt",
        thread_count=1,
    )
    gcs_sink.override_config(overrides)

    # Both constructor-supplied values must have been replaced.
    assert (gcs_sink.bucket, gcs_sink.thread_count) == ("override-bucket", 4)


def test_gcssink_override_config_empty() -> None:
    """An empty deployment config leaves a GCSSink's constructor values intact."""
    from sentry_streams.pipeline.pipeline import GCSSink

    gcs_sink = GCSSink[str](
        name="gcs_sink",
        bucket="original-bucket",
        object_generator=lambda: "test-file.txt",
        thread_count=3,
    )

    # Nothing to override: no bucket key, no parallelism section.
    no_overrides: Mapping[str, Any] = {}
    gcs_sink.override_config(no_overrides)

    assert (gcs_sink.bucket, gcs_sink.thread_count) == ("original-bucket", 3)
4 changes: 2 additions & 2 deletions sentry_streams/uv.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.