Changes from all commits (35 commits)
896170b
i think i added python sentry
victoria-yining-huang Dec 9, 2025
60b4035
add
victoria-yining-huang Dec 16, 2025
bd3f9f1
add test_sdk
victoria-yining-huang Dec 16, 2025
1de7afe
add progress
victoria-yining-huang Dec 17, 2025
a0f034f
worked jan 7
victoria-yining-huang Jan 7, 2026
b091cd6
make customer logs pass through
victoria-yining-huang Jan 9, 2026
1058263
exit main process if sub fails
victoria-yining-huang Jan 9, 2026
89b6507
remove test script
victoria-yining-huang Jan 9, 2026
7ccbdfc
remove return code check
victoria-yining-huang Jan 9, 2026
a5bb343
move sentry sdk into main
victoria-yining-huang Jan 9, 2026
e65a942
change env var
victoria-yining-huang Jan 9, 2026
2c97503
add tests
victoria-yining-huang Jan 12, 2026
490fada
lol did not add tests now added
victoria-yining-huang Jan 12, 2026
f8a20a5
add test on customer print
victoria-yining-huang Jan 12, 2026
e2fed79
rename file
victoria-yining-huang Jan 12, 2026
6133766
make dsn come from yaml
victoria-yining-huang Jan 21, 2026
9a64a32
only read yaml once
victoria-yining-huang Jan 21, 2026
024c09e
use multiprocessing module
victoria-yining-huang Jan 22, 2026
a346fea
remove module
victoria-yining-huang Jan 22, 2026
97c9720
remove redundant raises
victoria-yining-huang Jan 22, 2026
6b8736e
remove third party multiprocessing
victoria-yining-huang Jan 23, 2026
29b794e
all tests verified and pass
victoria-yining-huang Jan 23, 2026
9e65bc6
just one conn'
victoria-yining-huang Jan 23, 2026
7823864
remove lib
victoria-yining-huang Jan 23, 2026
8810710
reset metrics backend in tests
victoria-yining-huang Jan 23, 2026
b68fb44
that child close was not needed
victoria-yining-huang Jan 23, 2026
1476cc8
typing annotations
victoria-yining-huang Jan 23, 2026
071c87e
remove daed code
victoria-yining-huang Jan 23, 2026
06f7e3a
add more detailed tests
victoria-yining-huang Jan 23, 2026
5a8c1ec
refine error paths
victoria-yining-huang Jan 26, 2026
8466979
fix return flow
victoria-yining-huang Jan 26, 2026
7e20803
remove code by mistake
victoria-yining-huang Jan 26, 2026
280fd9b
use multiprocessing pool
victoria-yining-huang Jan 26, 2026
98c86f1
typecheck
victoria-yining-huang Jan 26, 2026
c440385
match rust func signatures
victoria-yining-huang Jan 27, 2026
3 changes: 3 additions & 0 deletions sentry_streams/sentry_streams/config.json
@@ -13,6 +13,9 @@
},
"metrics": {
"$ref": "#/definitions/Metrics"
},
"streaming_platform_dsn": {
"type": "string"
Collaborator, on lines +16 to +18:

The DSN is not enough. At times we need to pass parameters to the sentry integration.
Please make this an object, with the DSN as one of its fields. For now you do not have to add additional fields, but we certainly will later. Turning a string into an object will be tricky, as it will break existing configs; adding a field to an object will be trivial.
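For illustration, a minimal sketch of the requested object form, written as the Python-dict equivalent of the config.json definition; only the dsn field is grounded in this PR, and the wrapper shape is an assumption:

# Hypothetical schema fragment: wrap the DSN in an object so more
# integration options can be added later without breaking existing configs.
sentry_platform_definition = {
    "type": "object",
    "properties": {
        "dsn": {"type": "string"},
    },
    "required": ["dsn"],
}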

}
}
},
73 changes: 53 additions & 20 deletions sentry_streams/sentry_streams/runner.py
@@ -1,10 +1,13 @@
import importlib
import json
import logging
from typing import Any, Optional, cast
import multiprocessing
import sys
from typing import Any, Mapping, Optional, cast

import click
import jsonschema
import sentry_sdk
import yaml

from sentry_streams.adapters.loader import load_adapter
@@ -59,13 +62,36 @@ def iterate_edges(
step_streams[branch_name] = next_step_stream[branch_name]


def _load_pipeline_in_process(application: str) -> Pipeline[Any]:
Collaborator:

nit: this method does not know it runs in a separate process; it can work without one. Just call it _load_pipeline, and explain the subprocess rationale in the docstring.

"""
Worker function that runs in a separate process to load the pipeline.
Returns the Pipeline object directly, or raises an exception on error.

Customer code exceptions are allowed to propagate naturally so that the customer's
Sentry SDK (if initialized) can capture them.
"""
import contextlib

pipeline_globals: dict[str, Any] = {}

with contextlib.redirect_stdout(sys.stderr):
with open(application, "r") as f:
exec(f.read(), pipeline_globals)

if "pipeline" not in pipeline_globals:
raise ValueError("Application file must define a 'pipeline' variable")

pipeline = cast(Pipeline[Any], pipeline_globals["pipeline"])
return pipeline


def load_runtime(
name: str,
log_level: str,
adapter: str,
config: str,
segment_id: Optional[str],
application: str,
environment_config: Mapping[str, Any],
Automated review comment:

Rust caller uses incompatible old function signature (High Severity)

The load_runtime function signature changed: the config parameter was removed, environment_config: Mapping[str, Any] was added, and parameter order shifted. However, the Rust code in run.rs still calls this function with the old positional arguments (name, log_level, adapter_name, config_file, segment_id, application_name). This causes a type mismatch where config_file (a string path) gets passed where segment_id is expected, and application_name (a string) gets passed where environment_config (a mapping) is expected, breaking the Rust integration entirely.


Automated review comment:

Rust caller uses outdated load_runtime function signature (High Severity)

The load_runtime function signature changed from (name, log_level, adapter, config, segment_id, application) to (name, log_level, adapter, segment_id, application, environment_config), but the Rust caller in run.rs was not updated. The Rust code passes config_file where segment_id is now expected, segment_id where application is expected, and application_name (a string) where environment_config (a Mapping) is expected. This will crash when environment_config.get("metrics", {}) is called because strings don't have a .get() method.

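A minimal sketch of the failure mode both findings describe, using the signatures quoted above:

from typing import Any, Mapping, Optional

def load_runtime(
    name: str,
    log_level: str,
    adapter: str,
    segment_id: Optional[str],
    application: str,
    environment_config: Mapping[str, Any],
) -> Any:
    # First place this PR's version of the function touches the mapping.
    return environment_config.get("metrics", {})

# The Rust caller still uses the old positional order
# (name, log_level, adapter_name, config_file, segment_id, application_name):
load_runtime("name", "INFO", "arroyo", "config.yaml", "segment-0", "app.py")
# AttributeError: 'str' object has no attribute 'get'
# ("config.yaml" lands in segment_id; "app.py" lands in environment_config)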

) -> Any:

logging.basicConfig(
@@ -74,22 +100,13 @@ def load_runtime(
datefmt="%Y-%m-%d %H:%M:%S",
)

pipeline_globals: dict[str, Any] = {}

with open(application) as f:
exec(f.read(), pipeline_globals)

with open(config, "r") as config_file:
environment_config = yaml.safe_load(config_file)

config_template = importlib.resources.files("sentry_streams") / "config.json"
with config_template.open("r") as file:
schema = json.load(file)

try:
jsonschema.validate(environment_config, schema)
except Exception:
raise
# Execute the application in a subprocess to build the pipeline
# The subprocess will return the pipeline object or raise an exception
# Note: Customer print() and logging statements (redirected to stderr)
# do not trigger platform Sentry alerts.
with multiprocessing.Pool(processes=1) as pool:
pipeline: Pipeline[Any] = pool.apply(_load_pipeline_in_process, (application,))
Automated review comment:

Subprocess pickle breaks pipelines containing lambdas (Medium Severity)

The new multiprocessing.Pool.apply() approach requires the Pipeline object to be picklable when returning from the subprocess. However, Pipeline objects commonly contain Step instances with lambda functions (e.g., Map(function=lambda msg: ...)), and Python lambdas are not picklable. Existing examples like billing.py with aggregate_func=lambda: OutcomesBuffer() and tests using function=lambda msg: ... will fail with PicklingError at runtime.

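A standard-library reproduction of this concern, assuming a pipeline step holds a lambda as in the cited examples:

import multiprocessing

def build_pipeline() -> dict:
    # Stand-in for executing an application file whose pipeline contains
    # a lambda step, e.g. Map(function=lambda msg: ...).
    return {"function": lambda msg: msg}

if __name__ == "__main__":
    with multiprocessing.Pool(processes=1) as pool:
        # The worker must pickle the return value to send it back to the
        # parent; lambdas are not picklable, so this raises a
        # multiprocessing.pool.MaybeEncodingError wrapping the PicklingError.
        pool.apply(build_pipeline)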

logger.info("Successfully loaded pipeline from subprocess")

metric_config = environment_config.get("metrics", {})
if metric_config.get("type") == "datadog":
Expand All @@ -113,7 +130,6 @@ def load_runtime(
metric_config = {}

assigned_segment_id = int(segment_id) if segment_id else None
pipeline: Pipeline[Any] = pipeline_globals["pipeline"]
runtime: Any = load_adapter(adapter, environment_config, assigned_segment_id, metric_config)
translator = RuntimeTranslator(runtime)
Collaborator:

I think there is a use case we did not consider, and it is an issue.
The pipeline definition is coupled with the configuration file. If I rename a step in the pipeline definition and the config file references it, the config may become invalid, though we would only find out at this step, and that error would go to streaming.

Though if we changed that and sent config file mistakes to product, the system would be wrong anyway, as streaming owns the config file content.

Do we have a plan to address this issue?

Contributor (author):

I did not address this issue yet. The configuration YAML file should have its own validation step, which has not been built yet. In any case, as you said, it will be a problem if this step is the first place an invalid deployment config file errors out.

Member:

@fpacifici not sure what the course of action here is, though. A mismatch between deployment config and pipeline settings could be either team's responsibility; if a team renames its steps and does not update the config file in advance, then we should not have to react to that.

I think we should probably try to prevent this issue in CI if it becomes a big one.

Collaborator:

> I think we should probably try to prevent this issue in CI if it becomes a big one.

I think we do not have a precise story for how to manage this use case in a CI/CD environment. If I change the application, the change may be breaking with respect to what the config says. The way this works today would require us to switch the configuration at the same time the new sha is deployed, which is not something we can really enforce now.

I think this should be discussed in a follow-up.

@@ -177,7 +193,24 @@ def main(
segment_id: Optional[str],
application: str,
) -> None:
runtime = load_runtime(name, log_level, adapter, config, segment_id, application)
with open(config, "r") as config_file:
environment_config = yaml.safe_load(config_file)
config_template = importlib.resources.files("sentry_streams") / "config.json"
with config_template.open("r") as file:
schema = json.load(file)

try:
jsonschema.validate(environment_config, schema)
except Exception:
raise

streaming_platform_dsn = environment_config.get("streaming_platform_dsn")
if streaming_platform_dsn:
sentry_sdk.init(
dsn=streaming_platform_dsn,
send_default_pii=True,
Collaborator:

Why is this True? Usually we should not send PII.

)
runtime = load_runtime(name, log_level, adapter, segment_id, application, environment_config)
runtime.run()


19 changes: 18 additions & 1 deletion sentry_streams/src/run.rs
@@ -74,16 +74,33 @@ pub fn run(args: Args) -> Result<(), Box<dyn std::error::Error>> {
})?;

let runtime: Py<PyAny> = traced_with_gil!(|py| {
// Read and parse the config file as YAML to create environment_config
let yaml_module = py.import("yaml")?;
let config_path = runtime_config
.config_file
.to_str()
.ok_or_else(|| pyo3::exceptions::PyValueError::new_err("Invalid config file path"))?;

let config_file = std::fs::File::open(config_path).map_err(|e| {
pyo3::exceptions::PyIOError::new_err(format!("Failed to open config file: {}", e))
})?;
let config_reader = std::io::BufReader::new(config_file);
let config_str = std::io::read_to_string(config_reader).map_err(|e| {
pyo3::exceptions::PyIOError::new_err(format!("Failed to read config file: {}", e))
})?;

let environment_config = yaml_module.getattr("safe_load")?.call1((config_str,))?;

Collaborator, on lines +77 to +93:

Please let's not duplicate this logic.
Every time we duplicate logic between Python and Rust, we make changes harder, because two places have to be updated.

If you are making this change because load_runtime now takes the config structure instead of the config file name, please have two methods: one takes the file name, reads it, instantiates the SDK, and then calls the other method, passing the parsed config. This Rust function would call the first method.

More importantly, how do we initialize the platform SDK when we start the runner with this CLI?
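A hedged sketch of that split, in Python; the wrapper name is an assumption, and the Rust entry point would call it instead of load_runtime, which would also restore schema validation and platform SDK initialization on the Rust path (the two findings below):

import importlib.resources
import json
from typing import Any, Mapping, Optional

import jsonschema
import sentry_sdk
import yaml

# Assumes load_runtime(name, log_level, adapter, segment_id, application,
# environment_config) as defined elsewhere in runner.py in this PR.

def load_runtime_from_file(
    name: str,
    log_level: str,
    adapter: str,
    segment_id: Optional[str],
    application: str,
    config: str,
) -> Any:
    # Read and validate the YAML config once, initialize the platform SDK,
    # then delegate to load_runtime with the parsed mapping.
    with open(config, "r") as config_file:
        environment_config: Mapping[str, Any] = yaml.safe_load(config_file)

    config_template = importlib.resources.files("sentry_streams") / "config.json"
    with config_template.open("r") as file:
        schema = json.load(file)
    jsonschema.validate(environment_config, schema)

    dsn = environment_config.get("streaming_platform_dsn")
    if dsn:
        sentry_sdk.init(dsn=dsn)

    return load_runtime(name, log_level, adapter, segment_id, application, environment_config)

main() would then shrink to a call to this wrapper, and run.rs could drop its duplicated YAML parsing.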

let runtime = py
.import("sentry_streams.runner")?
.getattr("load_runtime")?
.call1((
runtime_config.name,
runtime_config.log_level,
runtime_config.adapter_name,
runtime_config.config_file,
runtime_config.segment_id,
runtime_config.application_name,
environment_config,
Automated review comment:

Config validation skipped when using Rust runner (Medium Severity)

The jsonschema.validate() call for config validation was moved from load_runtime() to main(). However, the Rust runner (run.rs) calls load_runtime() directly, bypassing main(). This means config files are no longer validated against the JSON schema when using the Rust entry point, which is a regression from the previous behavior where validation happened inside load_runtime() regardless of entry point.


Automated review comment:

Sentry SDK not initialized when using Rust runner (Medium Severity)

The Sentry SDK initialization code for platform observability was added only to the Python CLI's main() function. The Rust runner (run.rs) calls load_runtime() directly, bypassing main(), so sentry_sdk.init() is never called. This means platform errors occurring when using the Rust entry point won't be reported to Sentry, defeating the purpose of the Sentry integration feature described in the PR.


))?
.unbind();
PyResult::Ok(runtime)
148 changes: 148 additions & 0 deletions sentry_streams/tests/test_sentry_transports.py
Collaborator:

I am not sure I get what you are trying to validate with this test.

  • The test says test_sentry_transport, but there is no assertion on whether the mock Transport provided is ever used.
  • Having a test for the load_runtime method is a good idea (we do not seem to have one), but then I would assert the properties of the returned runtime using the Dummy runtime, rather than mocking the graph construction methods, which are the logic you are testing.

@@ -0,0 +1,148 @@
from typing import Any, Generator, List, Optional
from unittest.mock import patch

import pytest
import sentry_sdk
from sentry_sdk.transport import Transport

from sentry_streams.pipeline.pipeline import Pipeline
from sentry_streams.runner import load_runtime


class CaptureTransport(Transport):

def __init__(self, *args: Any, **kwargs: Any) -> None:
super().__init__(*args, **kwargs)
self.events: List[Any] = []
self.envelopes: List[Any] = []

def capture_event(self, event: Any) -> None:
self.events.append(event)
return None

def capture_envelope(self, envelope: Any) -> None:
self.envelopes.append(envelope)
return None

def flush(self, timeout: float, callback: Optional[Any] = None) -> None:
"""Flush is called when SDK shuts down."""
pass


@pytest.fixture
def temp_fixture_dir(tmp_path: Any) -> Any:
fixture_dir = tmp_path / "fixtures"
fixture_dir.mkdir()
return fixture_dir


@pytest.fixture(autouse=True)
def reset_metrics_backend() -> Generator[None, None, None]:
Collaborator, on lines +12 to +40:

These do not seem to be relevant for the test. No assertion is run on any instance of these.
If you want to test load_runtime, then you can remove these and just test that load_runtime returns the right runtime when the pipeline is good and that it raises an exception when it is not.

If you want to test the two separate SDK initializations, then you will have to check that the Transport actually catches events when the load fails.

"""Reset the global metrics backend between tests."""
from sentry_streams import metrics

try:
from arroyo.utils import metrics as arroyo_metrics

has_arroyo = True
except ImportError:
has_arroyo = False

# Reset before each test
metrics.metrics._metrics_backend = None
if has_arroyo:
arroyo_metrics._metrics_backend = None

yield

# Reset to None after each test
metrics.metrics._metrics_backend = None
if has_arroyo:
arroyo_metrics._metrics_backend = None


@pytest.fixture
def platform_transport() -> CaptureTransport:
transport = CaptureTransport()
# Clear any existing Sentry client
sentry_sdk.get_client().close()
return transport


def test_multiprocess_pipe_communication_success(
platform_transport: CaptureTransport, temp_fixture_dir: Any
) -> None:
sentry_sdk.init(
dsn="https://platform@example.com/456",
transport=platform_transport,
)

app_file = temp_fixture_dir / "simple_app.py"
app_file.write_text(
"""
from sentry_streams.pipeline import streaming_source
pipeline = streaming_source(name="test", stream_name="test-stream")
"""
Collaborator, on lines +81 to +85:

Please create a module containing the pipeline you want to test; do not generate a Python file on the fly. There is no gain in doing so, while there is the downside that type checking will not understand this string.
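A sketch of the suggested fixture module (the path is hypothetical), which type checkers can analyze:

# tests/fixtures/simple_app.py (hypothetical location)
# Same pipeline as the generated string, but importable and type-checkable.
from sentry_streams.pipeline import streaming_source

pipeline = streaming_source(name="test", stream_name="test-stream")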

)

with (
patch("sentry_streams.runner.load_adapter") as mock_load_adapter,
patch("sentry_streams.runner.iterate_edges") as mock_iterate_edges,
):
mock_runtime = type(
"MockRuntime",
(),
{
"run": lambda self: None,
"source": lambda self, step: "mock_stream",
"complex_step_override": lambda self: {},
},
)()
mock_load_adapter.return_value = mock_runtime
Collaborator, on lines +88 to +101:

Please use the dummyAdapter rather than mocking everything
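A hedged sketch of that suggestion, reusing the fixture module proposed above; the "dummy" adapter name is an assumption about how the dummy adapter is registered with load_adapter:

from sentry_streams.runner import load_runtime

def test_load_runtime_with_dummy_adapter() -> None:
    # Exercises the real graph construction instead of mocking
    # load_adapter and iterate_edges.
    runtime = load_runtime(
        name="test",
        log_level="INFO",
        adapter="dummy",  # assumed registered name of the dummy adapter
        segment_id=None,
        application="tests/fixtures/simple_app.py",  # module sketched earlier
        environment_config={"metrics": {"type": "dummy"}},
    )
    # Assert properties of the returned runtime rather than mock call counts.
    assert runtime is not None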


runtime = load_runtime(
name="test",
log_level="INFO",
adapter="arroyo",
segment_id=None,
application=str(app_file),
environment_config={"metrics": {"type": "dummy"}},
)

assert runtime is not None

mock_iterate_edges.assert_called_once()
pipeline_arg = mock_iterate_edges.call_args[0][0] # First positional argument
assert isinstance(pipeline_arg, Pipeline)


def test_subprocess_sends_error_status_with_details(
platform_transport: CaptureTransport, temp_fixture_dir: Any
) -> None:
"""Test that detailed error messages are captured when subprocess sends status='error'."""
sentry_sdk.init(
dsn="https://platform@example.com/456",
transport=platform_transport,
)

# Create an app file that doesn't define 'pipeline' variable
app_file = temp_fixture_dir / "missing_pipeline.py"
app_file.write_text(
"""
from sentry_streams.pipeline import streaming_source
# Intentionally not defining 'pipeline' variable
my_pipeline = streaming_source(name="test", stream_name="test-stream")
"""
)

with pytest.raises(ValueError) as exc_info:
load_runtime(
name="test",
log_level="INFO",
adapter="arroyo",
segment_id=None,
application=str(app_file),
environment_config={"metrics": {"type": "dummy"}},
)

assert "Application file must define a 'pipeline' variable" in str(exc_info.value)