Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
**Features**:

- Convert measurements to attributes based on information from `sentry-conventions`. This is gated behind a project feature flag. ([#6007](https://github.com/getsentry/relay/pull/6007))
- Implement timestamp shifts based on a sequence number provided by SDKs. ([#6014](https://github.com/getsentry/relay/pull/6014))

**Bug Fixes**:

Expand Down
164 changes: 160 additions & 4 deletions relay-event-normalization/src/eap/time.rs
Original file line number Diff line number Diff line change
@@ -1,18 +1,24 @@
//! Time normalization for EAP items.

use chrono::{DateTime, Utc};
use relay_conventions::attributes::SENTRY__TIMESTAMP__SEQUENCE;
use relay_event_schema::{
processor::{self, ProcessValue, ProcessingState},
protocol::{OurLog, SpanV2, Timestamp, TraceMetric},
protocol::{Attributes, OurLog, SpanV2, Timestamp, TraceMetric},
};
use relay_protocol::{Annotated, ErrorKind};
use relay_protocol::{Annotated, ErrorKind, Remark, RemarkType};
use std::time::Duration;

use crate::ClockDriftProcessor;

/// Configuration parameters for [`normalize`].
#[derive(Debug, Default, Clone, Copy)]
pub struct Config {
/// Apply a time sequence shift as provided by the SDK on the timestamp.
///
/// This must run only once so that the same timestamp is not shifted multiple times. It should
/// therefore be limited to processing Relays.
pub apply_sequence_shift: bool,
/// Timestamp when the item was received.
pub received_at: DateTime<Utc>,
/// Client local timestamp when the SDK sent the item.
Expand Down Expand Up @@ -41,7 +47,8 @@ where
let timestamp = item
.value_mut()
.as_mut()
.and_then(|t| t.reference_timestamp_mut().value().copied());
.map(|t| t.reference_timestamp_mut())
.and_then(|ts| ts.value().copied());

if let Some(timestamp) = timestamp {
if config
Expand Down Expand Up @@ -69,6 +76,28 @@ where
processor.apply_correction_meta(item.reference_timestamp_mut().meta_mut());
}
}

let sequence = item
.value()
.and_then(|t| t.timestamp_sequence())
.filter(|d| *d > 0);

let timestamp = item
.value_mut()
.as_mut()
.map(|t| t.reference_timestamp_mut());

if config.apply_sequence_shift
&& let Some(sequence) = sequence
&& let Some(ts) = timestamp
&& let Some(ts_value) = ts.value_mut()
{
// Apply the time shift unconditionally. This may push the timestamp slightly past
// `max_in_future` (by at most `u32::MAX` nanoseconds, i.e. ~4.3s), which is preferable to
// losing the ordering.
ts_value.0 += chrono::TimeDelta::nanoseconds(sequence.into());
ts.meta_mut()
.add_remark(Remark::new(RemarkType::Substituted, "timestamp.sequence"));
}
Comment thread
Dav1dde marked this conversation as resolved.
}

/// Items which can be processed by [`normalize`].
Expand All @@ -77,32 +106,64 @@ pub trait TimeNormalize: ProcessValue {
///
/// Represents the timestamp when the item was created.
fn reference_timestamp_mut(&mut self) -> &mut Annotated<Timestamp>;

/// A tie breaker sent from SDKs for timestamps.
///
/// This is usually stored in [`SENTRY__TIMESTAMP__SEQUENCE`] and applied as additional
/// nanoseconds to the timestamp.
fn timestamp_sequence(&self) -> Option<u32>;
}

/// Time normalization for logs, anchored on the log's `timestamp` field.
impl TimeNormalize for OurLog {
    /// Returns the log's `timestamp`, which acts as its reference timestamp.
    fn reference_timestamp_mut(&mut self) -> &mut Annotated<Timestamp> {
        let Self { timestamp, .. } = self;
        timestamp
    }

    /// Looks up the timestamp sequence in the log's attributes.
    fn timestamp_sequence(&self) -> Option<u32> {
        let Self { attributes, .. } = self;
        get_timestamp_sequence(attributes)
    }
}

/// Time normalization for spans, anchored on the span's `start_timestamp` field.
impl TimeNormalize for SpanV2 {
/// Returns the span's `start_timestamp`, which acts as its reference timestamp.
fn reference_timestamp_mut(&mut self) -> &mut Annotated<Timestamp> {
&mut self.start_timestamp
}

/// Always returns `None`: no timestamp sequence is reported for spans.
fn timestamp_sequence(&self) -> Option<u32> {
// Not supported for spans.
//
// If this ever becomes necessary to add, extra care must be taken to not create invalid
// spans where the start timestamp is moved after the end timestamp.
None
Comment on lines +133 to +137
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could we potentially resolve this for spans (if it ever becomes necessary, not now) by having reference_timestamp(_mut) return an iterator of timestamps?

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Something like that was my idea, I was though thinking instead of applying it separately, there is maybe a way where we can adjust the clockdrift processor or a separate processor which applies the offset to all timestamps equally.

}
}

/// Time normalization for trace metrics, anchored on the metric's `timestamp` field.
impl TimeNormalize for TraceMetric {
    /// Returns the metric's `timestamp`, which acts as its reference timestamp.
    fn reference_timestamp_mut(&mut self) -> &mut Annotated<Timestamp> {
        let Self { timestamp, .. } = self;
        timestamp
    }

    /// Looks up the timestamp sequence in the metric's attributes.
    fn timestamp_sequence(&self) -> Option<u32> {
        let Self { attributes, .. } = self;
        get_timestamp_sequence(attributes)
    }
}

fn get_timestamp_sequence(attributes: &Annotated<Attributes>) -> Option<u32> {
attributes
.value()
.and_then(|attrs| attrs.get_value(SENTRY__TIMESTAMP__SEQUENCE))
.and_then(|v| v.as_f64())
.map(|v| v as _)
Comment thread
cursor[bot] marked this conversation as resolved.
}

#[cfg(test)]
mod tests {
use super::*;

use relay_event_schema::processor::ProcessValue;
use relay_protocol::{Annotated, Empty, FromValue, IntoValue, assert_annotated_snapshot};
use relay_protocol::{
Annotated, Empty, FromValue, IntoValue, assert_annotated_snapshot, get_value,
};

#[derive(Debug, Clone, FromValue, IntoValue, Empty, ProcessValue)]
struct TestItem {
Expand All @@ -114,6 +175,10 @@ mod tests {
fn reference_timestamp_mut(&mut self) -> &mut Annotated<Timestamp> {
&mut self.base
}

fn timestamp_sequence(&self) -> Option<u32> {
Some(123)
}
}

fn ts(secs: i64) -> Timestamp {
Expand Down Expand Up @@ -255,4 +320,95 @@ mod tests {
}
"#);
}

/// With `apply_sequence_shift` enabled and every other option left at its default, the sequence
/// reported by `TestItem` (123) is added as nanoseconds to the reference timestamp (`base`) and
/// a `timestamp.sequence` remark is recorded on it. The `other` timestamp keeps its value.
#[test]
fn test_normalize_time_sequence_shift() {
let mut item = Annotated::new(TestItem {
base: ts(90_000).into(),
other: ts(80_000).into(),
});

// Only the sequence shift is switched on explicitly.
let config = Config {
apply_sequence_shift: true,
..Default::default()
};

normalize(&mut item, config);

// The only metadata expected is the remark on `base`.
insta::assert_json_snapshot!(IntoValue::extract_meta_tree(&item), @r#"
{
"base": {
"": {
"rem": [
[
"timestamp.sequence",
"s"
]
]
}
}
}
"#);

// The raw values are asserted directly instead of through a snapshot, because the
// serialization format of `Timestamp` does not preserve nanosecond precision.
assert_eq!(
get_value!(item.base!).0,
DateTime::from_timestamp_secs(90_000).unwrap() + chrono::TimeDelta::nanoseconds(123)
);
// `other` is not the reference timestamp and must be unchanged.
assert_eq!(
get_value!(item.other!).0,
DateTime::from_timestamp_secs(80_000).unwrap()
);
}

/// Combines a future-timestamp correction with the sequence shift.
///
/// `base` (90_000) lies further past `received_at` (10_000) than `max_in_future` (10s) allows.
/// The assertions expect `base` to end up at 10_000 plus the 123ns sequence shift, `other` to
/// end up at 0, and both a `future_timestamp` error and a `timestamp.sequence` remark on `base`.
#[test]
fn test_normalize_time_sequence_shift_and_correction() {
let mut item = Annotated::new(TestItem {
base: ts(90_000).into(),
other: ts(80_000).into(),
});

let config = Config {
apply_sequence_shift: true,
received_at: ts(10_000).0,
max_in_future: Some(Duration::from_secs(10)),
..Default::default()
};

normalize(&mut item, config);

// Both the correction error and the sequence remark are expected on `base`.
insta::assert_json_snapshot!(IntoValue::extract_meta_tree(&item), @r#"
{
"base": {
"": {
"rem": [
[
"timestamp.sequence",
"s"
]
],
"err": [
[
"future_timestamp",
{
"sdk_time": "1970-01-02T01:00:00+00:00",
"server_time": "1970-01-01T02:46:40+00:00"
}
]
]
}
}
}
"#);

// `base` is expected at the receive time plus the 123ns sequence shift.
assert_eq!(
get_value!(item.base!).0,
DateTime::from_timestamp_secs(10_000).unwrap() + chrono::TimeDelta::nanoseconds(123)
);
// `other` is expected at 0, without the sequence shift.
assert_eq!(
get_value!(item.other!).0,
DateTime::from_timestamp_secs(0).unwrap()
);
}
}
1 change: 1 addition & 0 deletions relay-server/src/processing/utils/normalize.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ where
F: FnOnce(&RetentionsConfig) -> Option<&RetentionConfig>,
{
eap::time::Config {
apply_sequence_shift: ctx.is_processing(),
received_at: headers.meta().received_at(),
sent_at: headers.sent_at(),
max_in_past: Some(retention_days_to_duration(
Expand Down
88 changes: 87 additions & 1 deletion tests/integration/test_ourlogs.py
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ def envelope_with_sentry_logs(*payloads: dict, metadata=None) -> Envelope:
def timestamps(ts: datetime):
return {
"sentry.observed_timestamp_nanos": {
"stringValue": time_within(ts, expect_resolution="ns")
"stringValue": time_within_delta(ts, expect_resolution="ns")
},
"sentry.timestamp_precise": {
"intValue": time_within_delta(
Expand Down Expand Up @@ -1097,6 +1097,92 @@ def test_time_corrections(mini_sentry, relay, delta, error):
}


def test_time_sequence_shift(mini_sentry, relay_with_processing, items_consumer):
    """Check that `sentry.timestamp.sequence` shifts a log's timestamp.

    Sends a single log whose sequence attribute equals one second in nanoseconds
    through a processing Relay, then expects the produced item to carry the
    shifted timestamps, the original sequence attribute and a
    `timestamp.sequence` remark in the timestamp metadata.
    """
    items_consumer = items_consumer()

    project_id = 42
    project_config = mini_sentry.add_full_project_config(project_id)
    project_config["config"]["features"] = ["organizations:ourlogs-ingestion"]

    relay = relay_with_processing(options=TEST_CONFIG)

    ts = datetime.now(timezone.utc)
    seq_shift_in_secs = 1.0
    shifted_ts = ts + timedelta(seconds=seq_shift_in_secs)

    log_payload = {
        "timestamp": ts.timestamp(),
        "trace_id": "5b8efff798038103d269b633813fc60c",
        "span_id": "eee19b7ec3c1b175",
        "level": "error",
        "body": "foo",
        "attributes": {
            "sentry.timestamp.sequence": {
                "value": int(seq_shift_in_secs * 1e9),
                "type": "integer",
            },
        },
    }
    envelope = envelope_with_sentry_logs(log_payload)

    relay.send_envelope(project_id, envelope)

    # Fetch the item first; the expected matchers are created afterwards.
    item = items_consumer.get_item()

    expected_attributes = {
        "browser.name": {"stringValue": "Firefox"},
        "browser.version": {"stringValue": "42.0"},
        "sentry._meta.fields.timestamp": {
            "stringValue": '{"meta":{"":{"rem":[["timestamp.sequence","s"]]}}}',
        },
        "sentry.body": {"stringValue": "foo"},
        "sentry.observed_timestamp_nanos": {
            "stringValue": time_within_delta(ts, expect_resolution="ns")
        },
        "sentry.payload_size_bytes": {"intValue": "36"},
        "sentry.severity_text": {"stringValue": "error"},
        "sentry.span_id": {"stringValue": "eee19b7ec3c1b175"},
        "sentry.timestamp.sequence": {"intValue": "1000000000"},
        "sentry.timestamp_precise": {
            "intValue": time_within_delta(
                shifted_ts,
                delta=timedelta(seconds=0),
                expect_resolution="ns",
                precision="us",
            )
        },
    }

    expected_item = {
        "attributes": expected_attributes,
        "clientSampleRate": 1.0,
        "downsampledRetentionDays": 90,
        "itemId": any(),
        "itemType": "TRACE_ITEM_TYPE_LOG",
        "organizationId": "1",
        "projectId": "42",
        "received": time_within_delta(),
        "retentionDays": 90,
        "serverSampleRate": 1.0,
        "timestamp": time_within_delta(
            shifted_ts, delta=timedelta(), precision="ms"
        ),
        "traceId": "5b8efff798038103d269b633813fc60c",
    }

    assert item == expected_item


@pytest.mark.parametrize(
"metadata,client_ip,browser",
[
Expand Down
Loading
Loading