feat(sentry_integration): exec app in subprocess #217
base: main
Conversation
Also, it would be nice to have some sort of test for this. You can test using custom transports in the SDK.
The current approach means that if no customer Sentry is configured, pickling errors and pipeline build errors will fail silently. Nobody gets alerted. Should we mandate that every customer app must have a Sentry?
No. We build a platform; we would not mandate how clients manage their errors.
Taking a step back, I think we should be notified even when the application code fails on startup; it is still useful for us.
I think we should get those errors back and send them to our integration as well. We can tag them or lower the severity so we can filter them out, but we should have visibility on them no matter how the customer sets up their integrations.
We do not want product to get infra errors in their integration, but I think we should at least have visibility on product errors whether or not the product sets up Sentry.
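The routing discussed here (forward application errors to the platform integration, but tagged and downgraded so they can be filtered out of platform alerting) could look roughly like this. This is a hypothetical sketch; `TaggedEvent` and `route_error` are illustrative names, not part of the PR:

```python
from dataclasses import dataclass, field
from typing import Dict


@dataclass
class TaggedEvent:
    """What the platform integration would receive for a given failure."""

    exception: BaseException
    level: str
    tags: Dict[str, str] = field(default_factory=dict)


def route_error(exc: BaseException, *, from_application_code: bool) -> TaggedEvent:
    if from_application_code:
        # Customer code failed: still visible to the platform team, but
        # tagged and lowered in severity so it can be filtered out.
        return TaggedEvent(exc, level="warning", tags={"origin": "application"})
    # Platform (infra) failure: full-severity alert.
    return TaggedEvent(exc, level="error", tags={"origin": "platform"})
```

With this shape, a platform Sentry project can filter on `origin:application` without losing visibility into customer startup failures.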
```python
assigned_segment_id = int(segment_id) if segment_id else None
pipeline: Pipeline[Any] = pipeline_globals["pipeline"]
runtime: Any = load_adapter(adapter, environment_config, assigned_segment_id, metric_config)
translator = RuntimeTranslator(runtime)
```
I think there is a use case we did not consider, and it is an issue.
The pipeline definition is coupled with the configuration file. If I rename a step in the pipeline definition and the config file references it, the config may become invalid, though we would only find out at this step. That error would go to streaming.
Though if we changed that and sent config file mistakes to product, the system would still be wrong, as streaming owns the config file content.
Do we have a plan to address this issue?
I did not address this issue yet. The configuration YAML file should have its own validation step, and it has not been built yet. In any case, like you said, it will be a problem if this step is the first place an invalid deployment config file errors out.
@fpacifici not sure what the course of action here is, though; a mismatch between deployment config and pipeline settings could be either team's responsibility. If a team renames its steps and does not update the config file in advance, then we should not have to react to that.
I think we should probably try to prevent this issue in CI if it becomes a big one.
> I think we should probably try to prevent this issue in CI if it becomes a big one.
I think we do not have a precise story for how to manage this use case in a CI/CD environment.
If I change the application, the change may be breaking with respect to what the config says. The way this works today would require us to switch the configuration at the same time the new sha is deployed, which is not something we can really enforce now.
I think this should be discussed in a follow-up.
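A lightweight CI guard along the lines discussed above could compare the step names referenced by the deployment config against the steps defined in the pipeline, and fail the build before deploy if they drifted. This is a hypothetical sketch; the `steps_config` key and the flat step-name structure are assumptions, not the project's real schema:

```python
from typing import Any, Iterable, List, Mapping


def find_unknown_steps(
    config: Mapping[str, Any], pipeline_steps: Iterable[str]
) -> List[str]:
    """Return config-referenced step names that no longer exist in the pipeline."""
    known = set(pipeline_steps)
    referenced = config.get("steps_config", {})  # hypothetical config key
    return sorted(name for name in referenced if name not in known)


# In CI: a renamed pipeline step that the config still references is caught
# before deploy rather than at runtime.
unknown = find_unknown_steps(
    {"steps_config": {"myfilter": {}, "renamed_step": {}}},
    ["source", "myfilter", "sink"],
)
assert unknown == ["renamed_step"]
```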
@fpacifici added new feature:
@untitaker added tests using transports
Cursor Bugbot has reviewed your changes and found 8 potential issues.
Bugbot Autofix is OFF. To automatically fix reported issues with Cloud Agents, enable Autofix in the Cursor dashboard.
Semver Impact of This PR: 🟡 Minor (new features)
Changelog Preview: New Features ✨, Other
Cursor Bugbot has reviewed your changes and found 3 potential issues.
Cursor Bugbot has reviewed your changes and found 2 potential issues.
```python
config: str,
segment_id: Optional[str],
application: str,
environment_config: Mapping[str, Any],
```
Rust caller uses incompatible old function signature
High Severity
The load_runtime function signature changed: the config parameter was removed, environment_config: Mapping[str, Any] was added, and parameter order shifted. However, the Rust code in run.rs still calls this function with the old positional arguments (name, log_level, adapter_name, config_file, segment_id, application_name). This causes a type mismatch where config_file (a string path) gets passed where segment_id is expected, and application_name (a string) gets passed where environment_config (a mapping) is expected, breaking the Rust integration entirely.
```python
# Note: Customer print() and logging statements (redirected to stderr)
# do not trigger platform Sentry alerts.
with multiprocessing.Pool(processes=1) as pool:
    pipeline: Pipeline[Any] = pool.apply(_load_pipeline_in_process, (application,))
```
Subprocess pickle breaks pipelines containing lambdas
Medium Severity
The new multiprocessing.Pool.apply() approach requires the Pipeline object to be picklable when returning from the subprocess. However, Pipeline objects commonly contain Step instances with lambda functions (e.g., Map(function=lambda msg: ...)), and Python lambdas are not picklable. Existing examples like billing.py with aggregate_func=lambda: OutcomesBuffer() and tests using function=lambda msg: ... will fail with PicklingError at runtime.
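The pickling constraint is easy to demonstrate with the standard library: named, importable callables round-trip through pickle by reference, while lambdas do not, which is why returning a `Pipeline` containing lambda steps from a worker process fails.

```python
import pickle

# A named, importable callable pickles by qualified-name reference.
assert pickle.loads(pickle.dumps(len)) is len

# A lambda has no importable qualified name, so pickling it fails.
try:
    pickle.dumps(lambda msg: msg)
    lambda_pickled = True
except (pickle.PicklingError, AttributeError):
    lambda_pickled = False

assert not lambda_pickled
```

The usual workarounds are replacing lambdas with module-level functions, or returning a picklable description of the pipeline from the subprocess instead of the built object itself.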
Cursor Bugbot has reviewed your changes and found 1 potential issue.
```python
config: str,
segment_id: Optional[str],
application: str,
environment_config: Mapping[str, Any],
```
Rust caller uses outdated load_runtime function signature
High Severity
The load_runtime function signature changed from (name, log_level, adapter, config, segment_id, application) to (name, log_level, adapter, segment_id, application, environment_config), but the Rust caller in run.rs was not updated. The Rust code passes config_file where segment_id is now expected, segment_id where application is expected, and application_name (a string) where environment_config (a Mapping) is expected. This will crash when environment_config.get("metrics", {}) is called because strings don't have a .get() method.
Cursor Bugbot has reviewed your changes and found 2 potential issues.
```rust
runtime_config.config_file,
runtime_config.segment_id,
runtime_config.application_name,
environment_config,
```
Config validation skipped when using Rust runner
Medium Severity
The jsonschema.validate() call for config validation was moved from load_runtime() to main(). However, the Rust runner (run.rs) calls load_runtime() directly, bypassing main(). This means config files are no longer validated against the JSON schema when using the Rust entry point, which is a regression from the previous behavior where validation happened inside load_runtime() regardless of entry point.
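One way to avoid this class of regression is to keep validation on the shared path, so every entry point (the Python CLI and the Rust runner) goes through it. This is a hypothetical sketch of that structure; the names and the stand-in check are illustrative, not the project's real code:

```python
from typing import Any, Mapping


def _validate_config(environment_config: Mapping[str, Any]) -> None:
    # Stand-in for the real jsonschema.validate(...) call.
    if "env" not in environment_config:
        raise ValueError("config missing required 'env' section")


def load_runtime(environment_config: Mapping[str, Any]) -> str:
    # Validation lives here, so it runs for every caller, including the
    # Rust runner that bypasses main().
    _validate_config(environment_config)
    return "runtime"  # placeholder for the real runtime object


def main(environment_config: Mapping[str, Any]) -> str:
    # The Python CLI simply delegates; no duplicate validation needed.
    return load_runtime(environment_config)
```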
```rust
runtime_config.config_file,
runtime_config.segment_id,
runtime_config.application_name,
environment_config,
```
Sentry SDK not initialized when using Rust runner
Medium Severity
The Sentry SDK initialization code for platform observability was added only to the Python CLI's main() function. The Rust runner (run.rs) calls load_runtime() directly, bypassing main(), so sentry_sdk.init() is never called. This means platform errors occurring when using the Rust entry point won't be reported to Sentry, defeating the purpose of the Sentry integration feature described in the PR.
fpacifici left a comment
Please see the comments inline
```rust
// Read and parse the config file as YAML to create environment_config
let yaml_module = py.import("yaml")?;
let config_path = runtime_config
    .config_file
    .to_str()
    .ok_or_else(|| pyo3::exceptions::PyValueError::new_err("Invalid config file path"))?;

let config_file = std::fs::File::open(config_path).map_err(|e| {
    pyo3::exceptions::PyIOError::new_err(format!("Failed to open config file: {}", e))
})?;
let config_reader = std::io::BufReader::new(config_file);
let config_str = std::io::read_to_string(config_reader).map_err(|e| {
    pyo3::exceptions::PyIOError::new_err(format!("Failed to read config file: {}", e))
})?;

let environment_config = yaml_module.getattr("safe_load")?.call1((config_str,))?;
```
Please let's not duplicate this logic.
Every time we duplicate logic between Python and Rust, we make it harder to make changes, as two places have to be updated.
If you are making this change because load_runtime now takes the config structure instead of the config file name, please have two methods: one takes the file name, reads it, initializes the SDK, and then calls the other method, passing the parsed config.
This Rust function will call the first method.
More importantly, how do we initialize the platform SDK when we start the runner with this CLI?
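The suggested two-method split could be sketched as below: one function owns file reading and SDK initialization, the other takes the already-parsed config, and the Rust runner calls the file-based wrapper. This is illustrative only; the function names are hypothetical, and `json` stands in for the project's YAML parsing to keep the sketch stdlib-only:

```python
import json
from pathlib import Path
from typing import Any, Mapping


def load_runtime_from_config(environment_config: Mapping[str, Any]) -> Mapping[str, Any]:
    # The real implementation would build and return the runtime; this
    # sketch just echoes the parsed config.
    return environment_config


def load_runtime_from_file(config_path: str) -> Mapping[str, Any]:
    # Single place that reads the file and would init the platform SDK,
    # so both the Python CLI and the Rust runner share it.
    environment_config = json.loads(Path(config_path).read_text())
    # init_platform_sdk(environment_config)  # hypothetical SDK init hook
    return load_runtime_from_config(environment_config)
```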
```python
        step_streams[branch_name] = next_step_stream[branch_name]


def _load_pipeline_in_process(application: str) -> Pipeline[Any]:
```
nit: this method does not know it runs in a separate process; it can work without one. Just call it _load_pipeline, then provide the rationale in the docstring.
```json
},
"streaming_platform_dsn": {
    "type": "string"
```
The DSN is not enough. At times we need to pass parameters to the Sentry integration.
Please make this an object, with the DSN as one of its fields. For now you do not have to add additional fields, but we certainly will. Turning a string into an object later will be tricky, as it will break existing configs; adding a field to an object is trivial.
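The requested shape could look like the fragment below: an object with the DSN as one field, so later additions are backward compatible. This is a sketch, not the PR's actual schema; the key name and any field beyond `dsn` are hypothetical:

```json
"streaming_platform": {
    "type": "object",
    "properties": {
        "dsn": { "type": "string" }
    },
    "required": ["dsn"]
}
```

Adding, say, a sample-rate or environment field later is then a non-breaking schema change.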
```python
if streaming_platform_dsn:
    sentry_sdk.init(
        dsn=streaming_platform_dsn,
        send_default_pii=True,
```
Why is this True? Usually we should not send PII.
I am not sure I get what you are trying to validate with this test.
- The test says test_sentry_transport, but there is no assertion on whether the mock Transport provided is ever used.
- Having a test for the load_runtime method is a good idea (we do not seem to have one), but then I would assert the properties of the returned runtime using the Dummy runtime, rather than mocking the graph-construction methods, which are the logic you are testing.
```python
class CaptureTransport(Transport):

    def __init__(self, *args: Any, **kwargs: Any) -> None:
        super().__init__(*args, **kwargs)
        self.events: List[Any] = []
        self.envelopes: List[Any] = []

    def capture_event(self, event: Any) -> None:
        self.events.append(event)
        return None

    def capture_envelope(self, envelope: Any) -> None:
        self.envelopes.append(envelope)
        return None

    def flush(self, timeout: float, callback: Optional[Any] = None) -> None:
        """Flush is called when SDK shuts down."""
        pass


@pytest.fixture
def temp_fixture_dir(tmp_path: Any) -> Any:
    fixture_dir = tmp_path / "fixtures"
    fixture_dir.mkdir()
    return fixture_dir


@pytest.fixture(autouse=True)
def reset_metrics_backend() -> Generator[None, None, None]:
```
These do not seem to be relevant for the test. No assertion is run on any instance of these.
If you want to test load_runtime, you can remove these and just test that load_runtime returns the right runtime when the pipeline is good, and that it raises an exception when it is not.
If you want to test the two separate SDK initializations, then you will have to check that the Transport actually catches events when load fails.
```python
with (
    patch("sentry_streams.runner.load_adapter") as mock_load_adapter,
    patch("sentry_streams.runner.iterate_edges") as mock_iterate_edges,
):
    mock_runtime = type(
        "MockRuntime",
        (),
        {
            "run": lambda self: None,
            "source": lambda self, step: "mock_stream",
            "complex_step_override": lambda self: {},
        },
    )()
    mock_load_adapter.return_value = mock_runtime
```
Please use the DummyAdapter rather than mocking everything.
```python
app_file.write_text(
    """
from sentry_streams.pipeline import streaming_source
pipeline = streaming_source(name="test", stream_name="test-stream")
"""
```
Please create a module containing the pipeline you want to test; do not generate a Python file on the fly. There is no gain in doing so, while there is the downside that type checking will not understand this string.
Summary
Changes
Subprocess-based Pipeline Loading (sentry_streams/runner.py)
note: