-
Notifications
You must be signed in to change notification settings - Fork 117
feat(eap): Implement sequence number offsets for eap items #6014
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: master
Are you sure you want to change the base?
Changes from all commits
5d2900b
0598f01
f33fcc8
6f13a00
3bbd9d1
df0096e
e12c9ad
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -1,18 +1,24 @@ | ||
| //! Time normalization for EAP items. | ||
|
|
||
| use chrono::{DateTime, Utc}; | ||
| use relay_conventions::attributes::SENTRY__TIMESTAMP__SEQUENCE; | ||
| use relay_event_schema::{ | ||
| processor::{self, ProcessValue, ProcessingState}, | ||
| protocol::{OurLog, SpanV2, Timestamp, TraceMetric}, | ||
| protocol::{Attributes, OurLog, SpanV2, Timestamp, TraceMetric}, | ||
| }; | ||
| use relay_protocol::{Annotated, ErrorKind}; | ||
| use relay_protocol::{Annotated, ErrorKind, Remark, RemarkType}; | ||
| use std::time::Duration; | ||
|
|
||
| use crate::ClockDriftProcessor; | ||
|
|
||
| /// Configuration parameters for [`normalize`]. | ||
| #[derive(Debug, Default, Clone, Copy)] | ||
| pub struct Config { | ||
| /// Apply a time sequence shift as provided by the SDK on the timestamp. | ||
| /// | ||
| /// This must only run once, to not shift the same timestamp multiple times and therefore should | ||
| /// be limited to processing Relays. | ||
| pub apply_sequence_shift: bool, | ||
| /// Timestamp when the item was received. | ||
| pub received_at: DateTime<Utc>, | ||
| /// Client local timestamp when the SDK sent the item. | ||
|
|
@@ -41,7 +47,8 @@ where | |
| let timestamp = item | ||
| .value_mut() | ||
| .as_mut() | ||
| .and_then(|t| t.reference_timestamp_mut().value().copied()); | ||
| .map(|t| t.reference_timestamp_mut()) | ||
| .and_then(|ts| ts.value().copied()); | ||
|
|
||
| if let Some(timestamp) = timestamp { | ||
| if config | ||
|
|
@@ -69,6 +76,28 @@ where | |
| processor.apply_correction_meta(item.reference_timestamp_mut().meta_mut()); | ||
| } | ||
| } | ||
|
|
||
| let sequence = item | ||
| .value() | ||
| .and_then(|t| t.timestamp_sequence()) | ||
| .filter(|d| *d > 0); | ||
|
|
||
| let timestamp = item | ||
| .value_mut() | ||
| .as_mut() | ||
| .map(|t| t.reference_timestamp_mut()); | ||
|
|
||
| if config.apply_sequence_shift | ||
| && let Some(sequence) = sequence | ||
| && let Some(ts) = timestamp | ||
| && let Some(ts_value) = ts.value_mut() | ||
| { | ||
| // Always unconditionally apply the time-shift, this puts us potentially slightly over `max_in_future`, | ||
| // by up to ~5s, but this is preferable over losing the ordering. | ||
| ts_value.0 += chrono::TimeDelta::nanoseconds(sequence.into()); | ||
| ts.meta_mut() | ||
| .add_remark(Remark::new(RemarkType::Substituted, "timestamp.sequence")); | ||
| } | ||
| } | ||
|
|
||
| /// Items which can be processed by [`normalize`]. | ||
|
|
@@ -77,32 +106,64 @@ pub trait TimeNormalize: ProcessValue { | |
| /// | ||
| /// Represents the timestamp when the item was created. | ||
| fn reference_timestamp_mut(&mut self) -> &mut Annotated<Timestamp>; | ||
|
|
||
| /// A tie breaker sent from SDKs for timestamps. | ||
| /// | ||
| /// This is usually stored in [`SENTRY__TIMESTAMP__SEQUENCE`] and applied as additional | ||
| /// nanoseconds to the timestamp. | ||
| fn timestamp_sequence(&self) -> Option<u32>; | ||
| } | ||
|
|
||
| impl TimeNormalize for OurLog { | ||
| fn reference_timestamp_mut(&mut self) -> &mut Annotated<Timestamp> { | ||
| &mut self.timestamp | ||
| } | ||
|
|
||
| fn timestamp_sequence(&self) -> Option<u32> { | ||
| get_timestamp_sequence(&self.attributes) | ||
| } | ||
| } | ||
|
|
||
| impl TimeNormalize for SpanV2 { | ||
| fn reference_timestamp_mut(&mut self) -> &mut Annotated<Timestamp> { | ||
| &mut self.start_timestamp | ||
| } | ||
|
|
||
| fn timestamp_sequence(&self) -> Option<u32> { | ||
| // Not supported for spans. | ||
| // | ||
| // If this ever becomes necessary to add, extra care must be taken to not create invalid | ||
| // spans where the start timestamp is moved after the end timestamp. | ||
| None | ||
|
Comment on lines
+133
to
+137
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Could we potentially resolve this for spans (if it ever becomes necessary, not now) by having
Member
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Something like that was my idea, I was though thinking instead of applying it separately, there is maybe a way where we can adjust the clockdrift processor or a separate processor which applies the offset to all timestamps equally. |
||
| } | ||
| } | ||
|
|
||
| impl TimeNormalize for TraceMetric { | ||
| fn reference_timestamp_mut(&mut self) -> &mut Annotated<Timestamp> { | ||
| &mut self.timestamp | ||
| } | ||
|
|
||
| fn timestamp_sequence(&self) -> Option<u32> { | ||
| get_timestamp_sequence(&self.attributes) | ||
| } | ||
| } | ||
|
|
||
| fn get_timestamp_sequence(attributes: &Annotated<Attributes>) -> Option<u32> { | ||
| attributes | ||
| .value() | ||
| .and_then(|attrs| attrs.get_value(SENTRY__TIMESTAMP__SEQUENCE)) | ||
| .and_then(|v| v.as_f64()) | ||
| .map(|v| v as _) | ||
|
cursor[bot] marked this conversation as resolved.
|
||
| } | ||
|
|
||
| #[cfg(test)] | ||
| mod tests { | ||
| use super::*; | ||
|
|
||
| use relay_event_schema::processor::ProcessValue; | ||
| use relay_protocol::{Annotated, Empty, FromValue, IntoValue, assert_annotated_snapshot}; | ||
| use relay_protocol::{ | ||
| Annotated, Empty, FromValue, IntoValue, assert_annotated_snapshot, get_value, | ||
| }; | ||
|
|
||
| #[derive(Debug, Clone, FromValue, IntoValue, Empty, ProcessValue)] | ||
| struct TestItem { | ||
|
|
@@ -114,6 +175,10 @@ mod tests { | |
| fn reference_timestamp_mut(&mut self) -> &mut Annotated<Timestamp> { | ||
| &mut self.base | ||
| } | ||
|
|
||
| fn timestamp_sequence(&self) -> Option<u32> { | ||
| Some(123) | ||
| } | ||
| } | ||
|
|
||
| fn ts(secs: i64) -> Timestamp { | ||
|
|
@@ -255,4 +320,95 @@ mod tests { | |
| } | ||
| "#); | ||
| } | ||
|
|
||
| #[test] | ||
| fn test_normalize_time_sequence_shift() { | ||
| let mut item = Annotated::new(TestItem { | ||
| base: ts(90_000).into(), | ||
| other: ts(80_000).into(), | ||
| }); | ||
|
|
||
| let config = Config { | ||
| apply_sequence_shift: true, | ||
| ..Default::default() | ||
| }; | ||
|
|
||
| normalize(&mut item, config); | ||
|
|
||
| insta::assert_json_snapshot!(IntoValue::extract_meta_tree(&item), @r#" | ||
| { | ||
| "base": { | ||
| "": { | ||
| "rem": [ | ||
| [ | ||
| "timestamp.sequence", | ||
| "s" | ||
| ] | ||
| ] | ||
| } | ||
| } | ||
| } | ||
| "#); | ||
|
|
||
| // Need to assert the raw values instead of a snapshot because the serialization format of | ||
| // `Timestamp` is not precise enough for nanosecond precision. | ||
| assert_eq!( | ||
| get_value!(item.base!).0, | ||
| DateTime::from_timestamp_secs(90_000).unwrap() + chrono::TimeDelta::nanoseconds(123) | ||
| ); | ||
| assert_eq!( | ||
| get_value!(item.other!).0, | ||
| DateTime::from_timestamp_secs(80_000).unwrap() | ||
| ); | ||
| } | ||
|
|
||
| #[test] | ||
| fn test_normalize_time_sequence_shift_and_correction() { | ||
| let mut item = Annotated::new(TestItem { | ||
| base: ts(90_000).into(), | ||
| other: ts(80_000).into(), | ||
| }); | ||
|
|
||
| let config = Config { | ||
| apply_sequence_shift: true, | ||
| received_at: ts(10_000).0, | ||
| max_in_future: Some(Duration::from_secs(10)), | ||
| ..Default::default() | ||
| }; | ||
|
|
||
| normalize(&mut item, config); | ||
|
|
||
| insta::assert_json_snapshot!(IntoValue::extract_meta_tree(&item), @r#" | ||
| { | ||
| "base": { | ||
| "": { | ||
| "rem": [ | ||
| [ | ||
| "timestamp.sequence", | ||
| "s" | ||
| ] | ||
| ], | ||
| "err": [ | ||
| [ | ||
| "future_timestamp", | ||
| { | ||
| "sdk_time": "1970-01-02T01:00:00+00:00", | ||
| "server_time": "1970-01-01T02:46:40+00:00" | ||
| } | ||
| ] | ||
| ] | ||
| } | ||
| } | ||
| } | ||
| "#); | ||
|
|
||
| assert_eq!( | ||
| get_value!(item.base!).0, | ||
| DateTime::from_timestamp_secs(10_000).unwrap() + chrono::TimeDelta::nanoseconds(123) | ||
| ); | ||
| assert_eq!( | ||
| get_value!(item.other!).0, | ||
| DateTime::from_timestamp_secs(0).unwrap() | ||
| ); | ||
| } | ||
| } | ||
Uh oh!
There was an error while loading. Please reload this page.