2 changes: 2 additions & 0 deletions .pre-commit-config.yaml
@@ -43,6 +43,8 @@ repos:
types-pyYAML,
types-jsonschema,
"sentry-kafka-schemas>=1.2.0",
"types-protobuf",
"sentry-protos>=0.1.74"
Collaborator: This is FSL; we cannot import it into an Apache 2.0 library.

Member: Do I understand it right that this is only used for tests? In that case, why not write your own protobuf schema?

Contributor Author (@ayirr7, May 20, 2025): I can do that; I just relied on existing ones for ease of use and didn't know about the license issue.
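A minimal sketch of what such a test-only schema could look like, built at runtime with google.protobuf.proto_builder so the tests would not depend on sentry-protos; the message and field names below are hypothetical, not part of this PR:

```python
from collections import OrderedDict

from google.protobuf import descriptor_pb2, proto_builder

# Hypothetical test-only message; field names and types chosen for illustration.
TestActivation = proto_builder.MakeSimpleProtoClass(
    OrderedDict(
        [
            ("id", descriptor_pb2.FieldDescriptorProto.TYPE_STRING),
            ("attempts", descriptor_pb2.FieldDescriptorProto.TYPE_INT64),
        ]
    ),
    full_name="sentry_streams.tests.TestActivation",
)

# Round-trip through the wire format, as the Serializer/Parser steps would.
msg = TestActivation(id="abc", attempts=1)
assert TestActivation.FromString(msg.SerializeToString()) == msg
```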

]
files: ^sentry_streams/.+
- repo: https://github.com/pycqa/isort
1 change: 1 addition & 0 deletions sentry_streams/pyproject.toml
@@ -15,6 +15,7 @@ dependencies = [
"pyyaml>=6.0.2",
"jsonschema>=4.23.0",
"sentry-kafka-schemas>=1.2.0",
"types-protobuf>=6.30.2.20250516",
]

[dependency-groups]
3 changes: 2 additions & 1 deletion sentry_streams/sentry_streams/examples/batching.py
@@ -3,6 +3,7 @@
from sentry_streams.pipeline import Batch, FlatMap, streaming_source
from sentry_streams.pipeline.batch import unbatch
from sentry_streams.pipeline.chain import Parser, Serializer
from sentry_streams.pipeline.message import MessageSchema

pipeline = streaming_source(
name="myinput",
@@ -19,6 +20,6 @@
FlatMap(function=unbatch),
)

chain3 = chain2.apply("serializer", Serializer()).sink(
chain3 = chain2.apply("serializer", Serializer(schema_type=MessageSchema.JSON)).sink(
"kafkasink2", stream_name="transformed-events"
) # flush the batches to the Sink
5 changes: 3 additions & 2 deletions sentry_streams/sentry_streams/examples/blq.py
@@ -6,10 +6,11 @@
)
from sentry_streams.pipeline import segment, streaming_source
from sentry_streams.pipeline.chain import Parser, Serializer
from sentry_streams.pipeline.message import MessageSchema

storage_branch = (
segment(name="recent", msg_type=IngestMetric)
.apply("serializer1", Serializer())
.apply("serializer1", Serializer(schema_type=MessageSchema.JSON))
.broadcast(
"send_message_to_DBs",
routes=[
@@ -25,7 +26,7 @@

save_delayed_message = (
segment(name="delayed", msg_type=IngestMetric)
.apply("serializer2", Serializer())
.apply("serializer2", Serializer(schema_type=MessageSchema.JSON))
.sink(
"kafkasink3",
stream_name="transformed-events-3",
10 changes: 5 additions & 5 deletions sentry_streams/sentry_streams/examples/multi_chain.py
@@ -2,7 +2,7 @@

from sentry_streams.pipeline import Map, multi_chain, streaming_source
from sentry_streams.pipeline.chain import Parser, Serializer
from sentry_streams.pipeline.message import Message
from sentry_streams.pipeline.message import Message, MessageSchema


def do_something(msg: Message[IngestMetric]) -> Message[IngestMetric]:
@@ -16,20 +16,20 @@ def do_something(msg: Message[IngestMetric]) -> Message[IngestMetric]:
streaming_source("ingest", stream_name="ingest-metrics")
.apply("parse_msg", Parser(msg_type=IngestMetric))
.apply("process", Map(do_something))
.apply("serialize", Serializer())
.apply("serialize", Serializer(schema_type=MessageSchema.JSON))
.sink("eventstream", stream_name="events"),
# Snuba chain to Clickhouse
streaming_source("snuba", stream_name="ingest-metrics")
.apply("snuba_parse_msg", Parser(msg_type=IngestMetric))
.apply("snuba_serialize", Serializer())
.apply("snuba_serialize", Serializer(schema_type=MessageSchema.JSON))
.sink(
"clickhouse",
stream_name="someewhere",
),
# Super Big Consumer chain
streaming_source("sbc", stream_name="ingest-metrics")
.apply("sbc_parse_msg", Parser(msg_type=IngestMetric))
.apply("sbc_serialize", Serializer())
.apply("sbc_serialize", Serializer(schema_type=MessageSchema.JSON))
.sink(
"sbc_sink",
stream_name="someewhere",
@@ -38,7 +38,7 @@ def do_something(msg: Message[IngestMetric]) -> Message[IngestMetric]:
streaming_source("post_process", stream_name="ingest-metrics")
.apply("post_parse_msg", Parser(msg_type=IngestMetric))
.apply("postprocess", Map(do_something))
.apply("postprocess_serialize", Serializer())
.apply("postprocess_serialize", Serializer(schema_type=MessageSchema.JSON))
.sink(
"devnull",
stream_name="someewhereelse",
4 changes: 2 additions & 2 deletions sentry_streams/sentry_streams/examples/transformer.py
@@ -10,7 +10,7 @@
Serializer,
)
from sentry_streams.pipeline.function_template import Accumulator
from sentry_streams.pipeline.message import Message
from sentry_streams.pipeline.message import Message, MessageSchema
from sentry_streams.pipeline.window import SlidingWindow

# The simplest possible pipeline.
@@ -59,7 +59,7 @@ def merge(self, other: Self) -> Self:

chain3 = chain2.apply(
"serializer",
Serializer(), # pass in the standard message serializer function
Serializer(schema_type=MessageSchema.JSON), # pass in the standard message serializer function
) # ExtensibleChain[bytes]

chain4 = chain3.sink(
7 changes: 5 additions & 2 deletions sentry_streams/sentry_streams/pipeline/chain.py
@@ -2,6 +2,7 @@

from abc import ABC, abstractmethod
from dataclasses import dataclass
from functools import partial
from typing import (
Callable,
Generic,
@@ -23,7 +24,7 @@
InputType,
OutputType,
)
from sentry_streams.pipeline.message import Message
from sentry_streams.pipeline.message import Message, MessageSchema
from sentry_streams.pipeline.msg_parser import msg_parser, msg_serializer
from sentry_streams.pipeline.pipeline import (
Aggregate,
@@ -151,12 +152,14 @@ class Serializer(Applier[Message[TIn], bytes], Generic[TIn]):
sink step which writes to Kafka.
"""

schema_type: MessageSchema

def build_step(self, name: str, ctx: Pipeline, previous: Step) -> Step:
return MapStep(
name=name,
ctx=ctx,
inputs=[previous],
function=msg_serializer,
function=partial(msg_serializer, schema_type=self.schema_type),
)


6 changes: 6 additions & 0 deletions sentry_streams/sentry_streams/pipeline/message.py
@@ -1,6 +1,7 @@
from __future__ import annotations

from dataclasses import dataclass
from enum import Enum
from typing import (
Any,
Generic,
@@ -15,6 +16,11 @@
TIn = TypeVar("TIn") # TODO: Consider naming this TPayload


class MessageSchema(Enum):
PROTOBUF = "protobuf"
JSON = "json"


# A message with a generic payload
@dataclass(frozen=True)
class Message(Generic[TIn]):
14 changes: 11 additions & 3 deletions sentry_streams/sentry_streams/pipeline/msg_parser.py
@@ -1,7 +1,9 @@
import json
from typing import Any

from sentry_streams.pipeline.message import Message
from google.protobuf.message import Message as ProtoMessage

from sentry_streams.pipeline.message import Message, MessageSchema

# TODO: Push the following to docs
# Standard message decoders and encoders live here
@@ -21,7 +23,13 @@ def msg_parser(msg: Message[bytes]) -> Any:
return decoded


def msg_serializer(msg: Message[Any]) -> bytes:
def msg_serializer(msg: Message[Any], schema_type: MessageSchema) -> bytes:
payload = msg.payload

return json.dumps(payload).encode("utf-8")
if schema_type is MessageSchema.PROTOBUF:
assert isinstance(payload, ProtoMessage)
return payload.SerializeToString()
elif schema_type is MessageSchema.JSON:
return json.dumps(payload).encode("utf-8")
else:
raise Exception(f"Unknown codec / message schema type {schema_type}")
Collaborator: I think you can catch this with the type checker using assert_never.
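A sketch of that suggestion, not part of the diff: the final branch becomes an exhaustiveness check the type checker can verify (assuming typing_extensions for Python < 3.11; on 3.11+ assert_never lives in typing).

```python
import json
from typing import Any

from google.protobuf.message import Message as ProtoMessage
from typing_extensions import assert_never  # typing.assert_never on Python 3.11+

from sentry_streams.pipeline.message import Message, MessageSchema


def msg_serializer(msg: Message[Any], schema_type: MessageSchema) -> bytes:
    payload = msg.payload

    if schema_type is MessageSchema.PROTOBUF:
        assert isinstance(payload, ProtoMessage)
        return payload.SerializeToString()
    elif schema_type is MessageSchema.JSON:
        return json.dumps(payload).encode("utf-8")
    else:
        # mypy narrows schema_type to Never here only if every MessageSchema
        # member is handled above; adding a new variant turns this into an error.
        assert_never(schema_type)
```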

48 changes: 38 additions & 10 deletions sentry_streams/tests/adapters/arroyo/conftest.py
@@ -8,6 +8,7 @@
from arroyo.types import Topic
from arroyo.utils.clock import MockedClock
from sentry_kafka_schemas.schema_types.ingest_metrics_v1 import IngestMetric
from sentry_protos.sentry.v1.taskworker_pb2 import TaskActivation

from sentry_streams.pipeline.chain import (
Filter,
@@ -19,13 +20,10 @@
streaming_source,
)
from sentry_streams.pipeline.function_template import Accumulator
from sentry_streams.pipeline.message import Message
from sentry_streams.pipeline.message import Message, MessageSchema
from sentry_streams.pipeline.pipeline import Pipeline
from sentry_streams.pipeline.window import SlidingWindow

# def decode(msg: bytes) -> str:
# return msg.decode("utf-8")


def basic_map(msg: Message[IngestMetric]) -> IngestMetric:
payload = msg.payload
@@ -42,6 +40,7 @@ def broker() -> LocalBroker[KafkaPayload]:
broker.create_topic(Topic("transformed-events"), 1)
broker.create_topic(Topic("transformed-events-2"), 1)
broker.create_topic(Topic("ingest-metrics"), 1)
broker.create_topic(Topic("taskworker-output"), 1)
return broker


@@ -112,7 +111,7 @@ def pipeline() -> Pipeline:
.apply("decoder", Parser(msg_type=IngestMetric))
.apply("myfilter", Filter(lambda msg: msg.payload["type"] == "s"))
.apply("mymap", Map(basic_map))
.apply("serializer", Serializer())
.apply("serializer", Serializer(schema_type=MessageSchema.JSON))
.sink("kafkasink", stream_name="transformed-events")
)

@@ -130,7 +129,7 @@ def reduce_pipeline(transformer: Callable[[], TestTransformerBatch]) -> Pipeline
.apply("decoder", Parser(msg_type=IngestMetric))
.apply("mymap", Map(basic_map))
.apply("myreduce", Reducer(reduce_window, transformer))
.apply("serializer", Serializer())
.apply("serializer", Serializer(schema_type=MessageSchema.JSON))
.sink("kafkasink", stream_name="transformed-events")
)

@@ -141,12 +140,12 @@
def router_pipeline() -> Pipeline:
branch_1 = (
segment("set_branch", IngestMetric)
.apply("serializer", Serializer())
.apply("serializer", Serializer(schema_type=MessageSchema.JSON))
.sink("kafkasink1", stream_name="transformed-events")
)
branch_2 = (
segment("not_set_branch", IngestMetric)
.apply("serializer2", Serializer())
.apply("serializer2", Serializer(schema_type=MessageSchema.JSON))
.sink("kafkasink2", stream_name="transformed-events-2")
)

@@ -174,13 +173,13 @@ def broadcast_pipeline() -> Pipeline:
branch_1 = (
segment("even_branch", IngestMetric)
.apply("mymap1", Map(basic_map))
.apply("serializer", Serializer())
.apply("serializer", Serializer(schema_type=MessageSchema.JSON))
.sink("kafkasink1", stream_name="transformed-events")
)
branch_2 = (
segment("odd_branch", IngestMetric)
.apply("mymap2", Map(basic_map))
.apply("serializer2", Serializer())
.apply("serializer2", Serializer(schema_type=MessageSchema.JSON))
.sink("kafkasink2", stream_name="transformed-events-2")
)

@@ -200,3 +199,32 @@
)

return pipeline


@pytest.fixture
def basic_proto_pipeline() -> Pipeline:

pipeline = streaming_source(
name="myinput", stream_name="taskworker-ingest"
) # ExtensibleChain[Message[bytes]]

chain1 = pipeline.apply(
"parser",
Parser(
msg_type=TaskActivation,
), # pass in the standard message parser function
) # ExtensibleChain[Message[TaskActivation]]

chain2 = chain1.apply(
"serializer",
Serializer(
schema_type=MessageSchema.PROTOBUF
), # pass in the standard message serializer function
) # ExtensibleChain[bytes]

chain2.sink(
"kafkasink2",
stream_name="taskworker-output",
) # Chain

return pipeline
4 changes: 2 additions & 2 deletions sentry_streams/tests/adapters/arroyo/message_helpers.py
@@ -67,13 +67,13 @@ def make_value_msg(


def make_kafka_msg(
payload: str,
payload: bytes,
topic: str,
offset: int,
) -> Message[Any]:
return Message(
BrokerValue(
payload=KafkaPayload(None, payload.encode("utf-8"), []),
payload=KafkaPayload(None, payload, []),
partition=Partition(Topic(topic), 0),
offset=offset,
timestamp=datetime.now(),