feat(sentry_integration): exec app in subprocess #217
base: main
Changes from all commits
Changes to the Python runner module (`sentry_streams.runner`):

```diff
@@ -1,10 +1,13 @@
 import importlib
 import json
 import logging
-from typing import Any, Optional, cast
+import multiprocessing
+import sys
+from typing import Any, Mapping, Optional, cast

 import click
 import jsonschema
+import sentry_sdk
 import yaml

 from sentry_streams.adapters.loader import load_adapter
```
```diff
@@ -59,13 +62,36 @@ def iterate_edges(
         step_streams[branch_name] = next_step_stream[branch_name]


+def _load_pipeline_in_process(application: str) -> Pipeline[Any]:
```
**Collaborator:** nit: this method does not know it runs in a separate process. It can work without the separate process. Just call it [...]
```diff
+    """
+    Worker function that runs in a separate process to load the pipeline.
+    Returns the Pipeline object directly, or raises an exception on error.
+
+    Customer code exceptions are allowed to propagate naturally so that the customer's
+    Sentry SDK (if initialized) can capture them.
+    """
+    import contextlib
+
+    pipeline_globals: dict[str, Any] = {}
+
+    with contextlib.redirect_stdout(sys.stderr):
+        with open(application, "r") as f:
+            exec(f.read(), pipeline_globals)
+
+    if "pipeline" not in pipeline_globals:
+        raise ValueError("Application file must define a 'pipeline' variable")
+
+    pipeline = cast(Pipeline[Any], pipeline_globals["pipeline"])
+    return pipeline
+
+
 def load_runtime(
     name: str,
     log_level: str,
     adapter: str,
-    config: str,
     segment_id: Optional[str],
     application: str,
+    environment_config: Mapping[str, Any],
```
**High Severity (automated review):** Rust caller uses incompatible old function signature. The Rust runner still passes `runtime_config.config_file` positionally, but the new `load_runtime` signature no longer has a `config` parameter, so the call passes seven arguments to a six-parameter function.
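To make the mismatch concrete, here is a minimal Python sketch of the two sides. The argument values are placeholders and the call is a simplified stand-in for the argument tuple the Rust runner builds; it is not code from this PR.

```python
# Simplified new signature from this PR:
def load_runtime(name, log_level, adapter, segment_id, application, environment_config):
    ...


# Roughly the argument order the Rust runner still uses (values are placeholders):
try:
    load_runtime(
        "my-pipeline",    # name
        "INFO",           # log_level
        "arroyo",         # adapter
        "deploy.yaml",    # config file path, a parameter that no longer exists
        None,             # segment_id
        "app.py",         # application
        {"metrics": {}},  # environment_config
    )
except TypeError as exc:
    # load_runtime() takes 6 positional arguments but 7 were given
    print(exc)
```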
```diff
 ) -> Any:

     logging.basicConfig(
@@ -74,22 +100,13 @@ def load_runtime(
         datefmt="%Y-%m-%d %H:%M:%S",
     )

-    pipeline_globals: dict[str, Any] = {}
-
-    with open(application) as f:
-        exec(f.read(), pipeline_globals)
-
-    with open(config, "r") as config_file:
-        environment_config = yaml.safe_load(config_file)
-
-    config_template = importlib.resources.files("sentry_streams") / "config.json"
-    with config_template.open("r") as file:
-        schema = json.load(file)
-
-    try:
-        jsonschema.validate(environment_config, schema)
-    except Exception:
-        raise
+    # Execute the application in a subprocess to build the pipeline
+    # The subprocess will return the pipeline object or raise an exception
+    # Note: Customer print() and logging statements (redirected to stderr)
+    # do not trigger platform Sentry alerts.
+    with multiprocessing.Pool(processes=1) as pool:
+        pipeline: Pipeline[Any] = pool.apply(_load_pipeline_in_process, (application,))
```
**Medium Severity (automated review):** Subprocess pickle breaks pipelines containing lambdas. The new subprocess-based loading returns the `Pipeline` object from a `multiprocessing.Pool` worker, so the pipeline must be picklable; pipelines whose steps hold lambdas (or other unpicklable objects) will fail to load.
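The constraint behind this finding, shown as a standalone sketch (standard library behaviour, not code from this PR): whatever a `multiprocessing.Pool` worker returns has to be pickled to cross the process boundary, and lambdas are not picklable.

```python
import pickle


def build_pipeline_like_object():
    # Stands in for a pipeline step configured with a lambda.
    return {"map_step": lambda value: value + 1}


if __name__ == "__main__":
    pickle.dumps({"name": "test"})  # plain data round-trips fine
    try:
        # This is what pool.apply() would have to do with the returned pipeline.
        pickle.dumps(build_pipeline_like_object())
    except Exception as exc:
        print(f"cannot pickle a pipeline holding a lambda: {exc!r}")
```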
```diff
+
+    logger.info("Successfully loaded pipeline from subprocess")

     metric_config = environment_config.get("metrics", {})
     if metric_config.get("type") == "datadog":
@@ -113,7 +130,6 @@ def load_runtime(
         metric_config = {}

     assigned_segment_id = int(segment_id) if segment_id else None
-    pipeline: Pipeline[Any] = pipeline_globals["pipeline"]
     runtime: Any = load_adapter(adapter, environment_config, assigned_segment_id, metric_config)
     translator = RuntimeTranslator(runtime)
```
**Collaborator:** I think there is a use case we did not consider, and it is an issue. Though if we changed that and sent config file mistakes to product, the system would be wrong anyway, as streaming owns the config file content. Do we have a plan to address this issue?

**Contributor (author):** I did not address this issue yet. The configuration YAML file should have its own validation step, and it has not been built yet. In any case, like you said, it will be a problem if this step is the first place an invalid deployment config file errors out.

**Member:** @fpacifici not sure what the course of action here is, though; a mismatch between the deployment config and pipeline settings could be either team's responsibility. If a team renames its steps and does not update the config file in advance, then we should not have to react to that. I think we should probably try to prevent this issue in CI if it becomes a big one.

**Collaborator:** I think we do not have a precise story for how to manage this use case in a CI/CD environment. I think this should be discussed in a followup.
```diff
@@ -177,7 +193,24 @@ def main(
     segment_id: Optional[str],
     application: str,
 ) -> None:
-    runtime = load_runtime(name, log_level, adapter, config, segment_id, application)
+    with open(config, "r") as config_file:
+        environment_config = yaml.safe_load(config_file)
+
+    config_template = importlib.resources.files("sentry_streams") / "config.json"
+    with config_template.open("r") as file:
+        schema = json.load(file)
+
+    try:
+        jsonschema.validate(environment_config, schema)
+    except Exception:
+        raise
+
+    streaming_platform_dsn = environment_config.get("streaming_platform_dsn")
+
+    if streaming_platform_dsn:
+        sentry_sdk.init(
+            dsn=streaming_platform_dsn,
+            send_default_pii=True,
```
**Collaborator:** Why is this True? Usually we should not send PII.
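For comparison, a minimal sketch of initializing without opting into PII; the DSN value is a placeholder, and leaving the flag out falls back to the SDK default, which does not send PII.

```python
import sentry_sdk

streaming_platform_dsn = "https://platform@example.com/456"  # placeholder DSN

# Omitting send_default_pii keeps the SDK default behaviour (no PII is sent).
sentry_sdk.init(dsn=streaming_platform_dsn)
```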
```diff
+        )
+    runtime = load_runtime(name, log_level, adapter, segment_id, application, environment_config)
     runtime.run()
```
Changes to the Rust runner CLI:

```diff
@@ -74,16 +74,33 @@ pub fn run(args: Args) -> Result<(), Box<dyn std::error::Error>> {
     })?;

     let runtime: Py<PyAny> = traced_with_gil!(|py| {
+        // Read and parse the config file as YAML to create environment_config
+        let yaml_module = py.import("yaml")?;
+        let config_path = runtime_config
+            .config_file
+            .to_str()
+            .ok_or_else(|| pyo3::exceptions::PyValueError::new_err("Invalid config file path"))?;
+
+        let config_file = std::fs::File::open(config_path).map_err(|e| {
+            pyo3::exceptions::PyIOError::new_err(format!("Failed to open config file: {}", e))
+        })?;
+        let config_reader = std::io::BufReader::new(config_file);
+        let config_str = std::io::read_to_string(config_reader).map_err(|e| {
+            pyo3::exceptions::PyIOError::new_err(format!("Failed to read config file: {}", e))
+        })?;
+
+        let environment_config = yaml_module.getattr("safe_load")?.call1((config_str,))?;
+
```
**Collaborator** (on lines +77 to +93): Please let's not duplicate this logic. If you are making this change because now the [...]. More importantly, how do we initialize the platform SDK when we start the runner with this CLI?
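One possible shape for the de-duplication being asked for, sketched as an assumption rather than as part of this PR: keep the config loading, validation, and platform SDK initialization in one helper inside the Python package (the name `prepare_environment` is hypothetical) and call it from both the Python CLI and the Rust runner before `load_runtime`.

```python
import importlib.resources
import json
from typing import Any, Mapping

import jsonschema
import sentry_sdk
import yaml


def prepare_environment(config_path: str) -> Mapping[str, Any]:
    """Load and validate the deployment config, init the platform SDK, return the config."""
    with open(config_path, "r") as config_file:
        environment_config = yaml.safe_load(config_file)

    # Same schema validation the Python CLI performs today.
    schema_file = importlib.resources.files("sentry_streams") / "config.json"
    with schema_file.open("r") as file:
        jsonschema.validate(environment_config, json.load(file))

    # Same platform SDK initialization, now shared by both entry points.
    streaming_platform_dsn = environment_config.get("streaming_platform_dsn")
    if streaming_platform_dsn:
        sentry_sdk.init(dsn=streaming_platform_dsn)

    return environment_config
```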
```diff
         let runtime = py
             .import("sentry_streams.runner")?
             .getattr("load_runtime")?
             .call1((
                 runtime_config.name,
                 runtime_config.log_level,
                 runtime_config.adapter_name,
                 runtime_config.config_file,
                 runtime_config.segment_id,
                 runtime_config.application_name,
+                environment_config,
```
**Medium Severity (automated review):** Config validation skipped when using Rust runner. The Rust entry point only parses the YAML config; the jsonschema validation now performed in the Python CLI's `main` never runs on this path.

**Medium Severity (automated review):** Sentry SDK not initialized when using Rust runner. The Sentry SDK initialization code for platform observability was added only to the Python CLI's `main`, so pipelines started through the Rust runner never initialize the platform SDK.
```diff
             ))?
             .unbind();
         PyResult::Ok(runtime)
```
**Collaborator:** I am not sure I get what you are trying to validate with this test.
New test file (`@@ -0,0 +1,148 @@`, added in full):

```python
from typing import Any, Generator, List, Optional
from unittest.mock import patch

import pytest
import sentry_sdk
from sentry_sdk.transport import Transport

from sentry_streams.pipeline.pipeline import Pipeline
from sentry_streams.runner import load_runtime


class CaptureTransport(Transport):

    def __init__(self, *args: Any, **kwargs: Any) -> None:
        super().__init__(*args, **kwargs)
        self.events: List[Any] = []
        self.envelopes: List[Any] = []

    def capture_event(self, event: Any) -> None:
        self.events.append(event)
        return None

    def capture_envelope(self, envelope: Any) -> None:
        self.envelopes.append(envelope)
        return None

    def flush(self, timeout: float, callback: Optional[Any] = None) -> None:
        """Flush is called when SDK shuts down."""
        pass


@pytest.fixture
def temp_fixture_dir(tmp_path: Any) -> Any:
    fixture_dir = tmp_path / "fixtures"
    fixture_dir.mkdir()
    return fixture_dir


@pytest.fixture(autouse=True)
def reset_metrics_backend() -> Generator[None, None, None]:
```
**Collaborator** (on lines +12 to +40): These do not seem to be relevant for the test. No assertion is run on any instance of these. If you want to test the two separate SDK initializations, then you will have to check that the Transport actually catches events when load fails.
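A sketch of the kind of check the reviewer is describing, reusing the fixtures and imports above. It assumes the runner is expected to report load failures to the platform SDK, which is exactly what such a test would verify.

```python
def test_platform_sdk_captures_load_failure(
    platform_transport: CaptureTransport, temp_fixture_dir: Any
) -> None:
    sentry_sdk.init(dsn="https://platform@example.com/456", transport=platform_transport)

    # An application file that fails to define `pipeline`.
    app_file = temp_fixture_dir / "missing_pipeline.py"
    app_file.write_text("my_pipeline = None\n")

    with pytest.raises(ValueError):
        load_runtime(
            name="test",
            log_level="INFO",
            adapter="arroyo",
            segment_id=None,
            application=str(app_file),
            environment_config={"metrics": {"type": "dummy"}},
        )

    sentry_sdk.flush()
    # The capturing transport should have actually received the failure.
    assert platform_transport.events or platform_transport.envelopes
```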
```python
    """Reset the global metrics backend between tests."""
    from sentry_streams import metrics

    try:
        from arroyo.utils import metrics as arroyo_metrics

        has_arroyo = True
    except ImportError:
        has_arroyo = False

    # Reset before each test
    metrics.metrics._metrics_backend = None
    if has_arroyo:
        arroyo_metrics._metrics_backend = None

    yield

    # Reset to None after each test
    metrics.metrics._metrics_backend = None
    if has_arroyo:
        arroyo_metrics._metrics_backend = None


@pytest.fixture
def platform_transport() -> CaptureTransport:
    transport = CaptureTransport()
    # Clear any existing Sentry client
    sentry_sdk.get_client().close()
    return transport


def test_multiprocess_pipe_communication_success(
    platform_transport: CaptureTransport, temp_fixture_dir: Any
) -> None:
    sentry_sdk.init(
        dsn="https://platform@example.com/456",
        transport=platform_transport,
    )

    app_file = temp_fixture_dir / "simple_app.py"
    app_file.write_text(
        """
from sentry_streams.pipeline import streaming_source
pipeline = streaming_source(name="test", stream_name="test-stream")
"""
    )
```
**Collaborator** (on lines +81 to +85): Please create a module containing the pipeline you want to test; do not generate a Python file on the fly. There is no gain in doing so, while there is the downside that type checking will not understand this string.
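What that could look like, sketched under the assumption that the fixture lives at `tests/fixtures/simple_app.py` (the path is hypothetical; the module body is the same pipeline the test currently writes as a string):

```python
# tests/fixtures/simple_app.py (hypothetical location): a real, importable,
# type-checked module instead of a source string written to a temp file at test time.
from sentry_streams.pipeline import streaming_source

pipeline = streaming_source(name="test", stream_name="test-stream")
```

The test would then pass this file's path as `application` instead of generating `app_file` on the fly.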
```python
    )

    with (
        patch("sentry_streams.runner.load_adapter") as mock_load_adapter,
        patch("sentry_streams.runner.iterate_edges") as mock_iterate_edges,
    ):
        mock_runtime = type(
            "MockRuntime",
            (),
            {
                "run": lambda self: None,
                "source": lambda self, step: "mock_stream",
                "complex_step_override": lambda self: {},
            },
        )()
        mock_load_adapter.return_value = mock_runtime
```
**Collaborator** (on lines +88 to +101): Please use the dummyAdapter rather than mocking everything.
```python
        runtime = load_runtime(
            name="test",
            log_level="INFO",
            adapter="arroyo",
            segment_id=None,
            application=str(app_file),
            environment_config={"metrics": {"type": "dummy"}},
        )

        assert runtime is not None

        mock_iterate_edges.assert_called_once()
        pipeline_arg = mock_iterate_edges.call_args[0][0]  # First positional argument
        assert isinstance(pipeline_arg, Pipeline)


def test_subprocess_sends_error_status_with_details(
    platform_transport: CaptureTransport, temp_fixture_dir: Any
) -> None:
    """Test that detailed error messages are captured when subprocess sends status='error'."""
    sentry_sdk.init(
        dsn="https://platform@example.com/456",
        transport=platform_transport,
    )

    # Create an app file that doesn't define 'pipeline' variable
    app_file = temp_fixture_dir / "missing_pipeline.py"
    app_file.write_text(
        """
from sentry_streams.pipeline import streaming_source
# Intentionally not defining 'pipeline' variable
my_pipeline = streaming_source(name="test", stream_name="test-stream")
"""
    )

    with pytest.raises(ValueError) as exc_info:
        load_runtime(
            name="test",
            log_level="INFO",
            adapter="arroyo",
            segment_id=None,
            application=str(app_file),
            environment_config={"metrics": {"type": "dummy"}},
        )

    assert "Application file must define a 'pipeline' variable" in str(exc_info.value)
```
**Review comment** (on the `streaming_platform_dsn` config): The DSN is not enough. At times we need to pass parameters to the Sentry integration. Please make this an object, and one of the fields will be the DSN. For now you do not have to add additional fields, but we certainly will. Turning a string into an object will be tricky, as it will break existing configs. Adding a field to an object will be trivial.
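A sketch of the config shape being requested; the `streaming_platform_sentry` key and any field beyond `dsn` are assumptions, not part of this PR.

```python
import sentry_sdk

# Hypothetical nested object in the deployment config instead of a bare DSN string,
# leaving room for more Sentry options later without another breaking change.
environment_config = {
    "streaming_platform_sentry": {
        "dsn": "https://platform@example.com/456",
        # e.g. an "environment" or sampling field could be added here later
    },
}

sentry_config = environment_config.get("streaming_platform_sentry")
if sentry_config:
    sentry_sdk.init(dsn=sentry_config["dsn"])
```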