Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 10 additions & 0 deletions relay-dynamic-config/src/global.rs
Original file line number Diff line number Diff line change
Expand Up @@ -191,6 +191,16 @@ pub struct Options {
)]
pub eap_outcomes_rollout_rate: f32,

/// Rollout rate for accepted outcomes for spans being emitted by EAP instead of Relay.
///
/// Rate needs to be between `0.0` and `1.0`.
#[serde(
rename = "relay.eap-span-outcomes.rollout-rate",
deserialize_with = "default_on_error",
skip_serializing_if = "is_default"
)]
pub eap_span_outcomes_rollout_rate: f32,

/// All other unknown options.
#[serde(flatten)]
other: HashMap<String, Value>,
Expand Down
70 changes: 48 additions & 22 deletions relay-server/src/services/store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -631,6 +631,15 @@ impl StoreService {
let scoping = message.scoping();
let received_at = message.received_at();

let relay_emits_accepted_outcome = !utils::is_rolled_out(
scoping.organization_id.value(),
self.global_config
.current()
.options
.eap_span_outcomes_rollout_rate,
)
.is_keep();

let meta = SpanMeta {
organization_id: scoping.organization_id,
project_id: scoping.project_id,
Expand All @@ -639,6 +648,7 @@ impl StoreService {
retention_days: message.retention_days,
downsampled_retention_days: message.downsampled_retention_days,
received: datetime_to_timestamp(received_at),
accepted_outcome_emitted: relay_emits_accepted_outcome,
};

let result = message.try_accept(|span| {
Expand All @@ -664,17 +674,19 @@ impl StoreService {
via = "processing"
);

// XXX: Temporarily produce span outcomes. Keep in sync with either EAP
// or the segments consumer, depending on which will produce outcomes later.
self.outcome_aggregator.send(TrackOutcome {
category: DataCategory::SpanIndexed,
event_id: None,
outcome: Outcome::Accepted,
quantity: 1,
remote_addr: None,
scoping,
timestamp: received_at,
});
if relay_emits_accepted_outcome {
// XXX: Temporarily produce span outcomes. Keep in sync with either EAP
// or the segments consumer, depending on which will produce outcomes later.
self.outcome_aggregator.send(TrackOutcome {
category: DataCategory::SpanIndexed,
event_id: None,
outcome: Outcome::Accepted,
quantity: 1,
remote_addr: None,
scoping,
timestamp: received_at,
});
}
}
}

Expand Down Expand Up @@ -1031,6 +1043,15 @@ impl StoreService {
key_id,
} = scoping;

let relay_emits_accepted_outcome = !utils::is_rolled_out(
scoping.organization_id.value(),
self.global_config
.current()
.options
.eap_span_outcomes_rollout_rate,
)
.is_keep();

let payload = item.payload();
let message = SpanKafkaMessageRaw {
meta: SpanMeta {
Expand All @@ -1041,6 +1062,7 @@ impl StoreService {
retention_days,
downsampled_retention_days,
received: datetime_to_timestamp(received_at),
accepted_outcome_emitted: relay_emits_accepted_outcome,
},
span: serde_json::from_slice(&payload)
.map_err(|e| StoreError::EncodingFailed(e.into()))?,
Expand All @@ -1065,17 +1087,19 @@ impl StoreService {
},
)?;

// XXX: Temporarily produce span outcomes. Keep in sync with either EAP
// or the segments consumer, depending on which will produce outcomes later.
self.outcome_aggregator.send(TrackOutcome {
category: DataCategory::SpanIndexed,
event_id: None,
outcome: Outcome::Accepted,
quantity: 1,
remote_addr: None,
scoping,
timestamp: received_at,
});
if relay_emits_accepted_outcome {
// XXX: Temporarily produce span outcomes. Keep in sync with either EAP
// or the segments consumer, depending on which will produce outcomes later.
self.outcome_aggregator.send(TrackOutcome {
category: DataCategory::SpanIndexed,
event_id: None,
outcome: Outcome::Accepted,
quantity: 1,
remote_addr: None,
scoping,
timestamp: received_at,
});
}

Ok(())
}
Expand Down Expand Up @@ -1372,6 +1396,8 @@ struct SpanMeta {
retention_days: u16,
/// Number of days until the downsampled version of this data should be deleted.
downsampled_retention_days: u16,
/// Indicates whether Relay already emitted an accepted outcome or if EAP still needs to emit it.
accepted_outcome_emitted: bool,
}

#[derive(Clone, Debug, Serialize)]
Expand Down
45 changes: 44 additions & 1 deletion tests/integration/test_ai.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,27 @@
from datetime import datetime, timezone
from unittest import mock

import pytest

from sentry_relay.consts import DataCategory

from .asserts import time_within_delta


@pytest.mark.parametrize(
"eap_span_outcomes_rollout_rate",
[
pytest.param(0.0, id="relay_emits_accepted_outcome"),
pytest.param(1.0, id="eap_emits_accepted_outcome"),
],
)
def test_ai_spans_example_transaction(
mini_sentry, relay, relay_with_processing, spans_consumer
mini_sentry,
relay,
relay_with_processing,
spans_consumer,
outcomes_consumer,
eap_span_outcomes_rollout_rate,
):
"""
Asserts the span output of an example AI agent workflow.
Expand All @@ -14,6 +30,7 @@ def test_ai_spans_example_transaction(
The example was taken from a test application with a real agentic workflow.
"""
spans_consumer = spans_consumer()
outcomes_consumer = outcomes_consumer()

project_id = 42
mini_sentry.add_full_project_config(project_id)
Expand All @@ -30,6 +47,10 @@ def test_ai_spans_example_transaction(
},
},
}
mini_sentry.global_config["options"][
"relay.eap-span-outcomes.rollout-rate"
] = eap_span_outcomes_rollout_rate
relay_emits_accepted_outcome = eap_span_outcomes_rollout_rate == 0.0

relay = relay(relay_with_processing())

Expand Down Expand Up @@ -441,6 +462,7 @@ def test_ai_spans_example_transaction(
"project_id": 42,
"received": time_within_delta(),
"retention_days": 90,
"accepted_outcome_emitted": relay_emits_accepted_outcome,
"span_id": "13e7c1ffd66981f0",
"start_timestamp": time_within_delta(),
"status": "ok",
Expand Down Expand Up @@ -572,6 +594,7 @@ def test_ai_spans_example_transaction(
"project_id": 42,
"received": time_within_delta(),
"retention_days": 90,
"accepted_outcome_emitted": relay_emits_accepted_outcome,
"span_id": "af7b547f4d5f4f49",
"start_timestamp": time_within_delta(),
"status": "ok",
Expand Down Expand Up @@ -646,6 +669,7 @@ def test_ai_spans_example_transaction(
"project_id": 42,
"received": time_within_delta(),
"retention_days": 90,
"accepted_outcome_emitted": relay_emits_accepted_outcome,
"span_id": "5ff84ff6bf512012",
"start_timestamp": time_within_delta(),
"status": "ok",
Expand Down Expand Up @@ -714,6 +738,7 @@ def test_ai_spans_example_transaction(
"project_id": 42,
"received": time_within_delta(),
"retention_days": 90,
"accepted_outcome_emitted": relay_emits_accepted_outcome,
"span_id": "d2fd68d7cf6eb933",
"start_timestamp": time_within_delta(),
"status": "ok",
Expand Down Expand Up @@ -786,6 +811,7 @@ def test_ai_spans_example_transaction(
"project_id": 42,
"received": time_within_delta(),
"retention_days": 90,
"accepted_outcome_emitted": relay_emits_accepted_outcome,
"span_id": "672681a999129905",
"start_timestamp": time_within_delta(),
"status": "ok",
Expand Down Expand Up @@ -851,6 +877,7 @@ def test_ai_spans_example_transaction(
"project_id": 42,
"received": time_within_delta(),
"retention_days": 90,
"accepted_outcome_emitted": relay_emits_accepted_outcome,
"span_id": "e5bb8f1d156e7649",
"start_timestamp": time_within_delta(),
"status": "ok",
Expand Down Expand Up @@ -923,6 +950,7 @@ def test_ai_spans_example_transaction(
"project_id": 42,
"received": time_within_delta(),
"retention_days": 90,
"accepted_outcome_emitted": relay_emits_accepted_outcome,
"span_id": "3d755d9884113eba",
"start_timestamp": time_within_delta(),
"status": "ok",
Expand Down Expand Up @@ -1054,6 +1082,7 @@ def test_ai_spans_example_transaction(
"project_id": 42,
"received": time_within_delta(),
"retention_days": 90,
"accepted_outcome_emitted": relay_emits_accepted_outcome,
"span_id": "bdf1648756367ee5",
"start_timestamp": time_within_delta(),
"status": "ok",
Expand Down Expand Up @@ -1128,6 +1157,7 @@ def test_ai_spans_example_transaction(
"project_id": 42,
"received": time_within_delta(),
"retention_days": 90,
"accepted_outcome_emitted": relay_emits_accepted_outcome,
"span_id": "203e925b464ad87b",
"start_timestamp": time_within_delta(),
"status": "ok",
Expand Down Expand Up @@ -1177,9 +1207,22 @@ def test_ai_spans_example_transaction(
"project_id": 42,
"received": time_within_delta(),
"retention_days": 90,
"accepted_outcome_emitted": relay_emits_accepted_outcome,
"span_id": "657cf984a6a4e59b",
"start_timestamp": time_within_delta(),
"status": "ok",
"trace_id": "a9351cd574f092f6acad48e250981f11",
},
]

if relay_emits_accepted_outcome:
assert outcomes_consumer.get_aggregated_outcomes() == [
{
"category": DataCategory.SPAN_INDEXED.value,
"key_id": 123,
"org_id": 1,
"outcome": 0,
"project_id": 42,
"quantity": 10,
}
]
29 changes: 29 additions & 0 deletions tests/integration/test_spans.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,13 @@


@pytest.mark.parametrize("performance_issues_spans", [False, True])
@pytest.mark.parametrize(
"eap_span_outcomes_rollout_rate",
[
pytest.param(0.0, id="relay_emits_accepted_outcome"),
pytest.param(1.0, id="eap_emits_accepted_outcome"),
],
)
def test_span_extraction(
mini_sentry,
relay_with_processing,
Expand All @@ -34,11 +41,19 @@ def test_span_extraction(
events_consumer,
metrics_consumer,
performance_issues_spans,
outcomes_consumer,
eap_span_outcomes_rollout_rate,
):
spans_consumer = spans_consumer()
transactions_consumer = transactions_consumer()
events_consumer = events_consumer()
metrics_consumer = metrics_consumer()
outcomes_consumer = outcomes_consumer()

mini_sentry.global_config["options"][
"relay.eap-span-outcomes.rollout-rate"
] = eap_span_outcomes_rollout_rate
relay_emits_accepted_outcome = eap_span_outcomes_rollout_rate == 0.0

relay = relay_with_processing(options=TEST_CONFIG)
project_id = 42
Expand Down Expand Up @@ -167,6 +182,7 @@ def test_span_extraction(
"project_id": 42,
"key_id": 123,
"retention_days": 90,
"accepted_outcome_emitted": relay_emits_accepted_outcome,
"span_id": "bbbbbbbbbbbbbbbb",
"start_timestamp": start.timestamp(),
"status": "ok",
Expand Down Expand Up @@ -237,6 +253,7 @@ def test_span_extraction(
"project_id": 42,
"key_id": 123,
"retention_days": 90,
"accepted_outcome_emitted": relay_emits_accepted_outcome,
"span_id": "968cff94913ebb07",
"start_timestamp": start_timestamp.timestamp(),
"status": "ok",
Expand All @@ -247,6 +264,18 @@ def test_span_extraction(

spans_consumer.assert_empty()

if relay_emits_accepted_outcome:
assert outcomes_consumer.get_aggregated_outcomes() == [
{
"category": DataCategory.SPAN_INDEXED.value,
"key_id": 123,
"org_id": 1,
"outcome": 0,
"project_id": 42,
"quantity": 2,
}
]


def test_duplicate_performance_score(mini_sentry, relay):
relay = relay(mini_sentry, options=TEST_CONFIG)
Expand Down
3 changes: 3 additions & 0 deletions tests/integration/test_spans_standalone.py
Original file line number Diff line number Diff line change
Expand Up @@ -166,6 +166,7 @@ def test_lcp_span(
"project_id": 42,
"received": time_within(ts),
"retention_days": 90,
"accepted_outcome_emitted": True,
"span_id": "9fd17741416e8e4e",
"start_timestamp": time_within(ts.timestamp() - 0.5),
"status": "ok",
Expand Down Expand Up @@ -342,6 +343,7 @@ def test_cls_span(
"project_id": 42,
"received": time_within(ts),
"retention_days": 90,
"accepted_outcome_emitted": True,
"span_id": "be6fa380c55f2fcb",
"start_timestamp": time_within(ts.timestamp() - 0.5),
"status": "ok",
Expand Down Expand Up @@ -502,6 +504,7 @@ def test_inp_span(
"project_id": 42,
"received": time_within(ts),
"retention_days": 90,
"accepted_outcome_emitted": True,
"span_id": "a6f029fbe0e2389a",
"start_timestamp": time_within(ts.timestamp() - 0.5),
"status": "ok",
Expand Down
Loading
Loading