Skip to content

Commit 6669012

Browse files
feat(tracing): OTel span queue and export telemetry (SGPINF-1863) (#373)
Add OpenTelemetry metrics for async span queue processing and SGP export: queue depth, batch lag, drain duration, shutdown flush timing, and export success/failure counters with bounded HTTP status labels. Introduce AGENTEX_TRACING_METRICS=0|false|no|off kill switch to disable SDK-side recording without code changes. Linear: SGPINF-1863
1 parent 2e1a2da commit 6669012

9 files changed

Lines changed: 760 additions & 2 deletions

File tree

CHANGELOG.md

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,11 @@
11
# Changelog
22

3+
## Unreleased
4+
5+
### Features
6+
7+
* **tracing:** emit OTel metrics for async span queue depth, batch drain, and SGP export success/failure (HTTP status labels). Disable SDK-side recording with ``AGENTEX_TRACING_METRICS=0``.
8+
39
## 0.11.6 (2026-05-29)
410

511
Full Changelog: [v0.11.5...v0.11.6](https://github.com/scaleapi/scale-agentex-python/compare/v0.11.5...v0.11.6)
Lines changed: 100 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,100 @@
1+
"""Tests for ``agentex.lib.core.observability.tracing_metrics``."""
2+
3+
from __future__ import annotations
4+
5+
import agentex.lib.core.observability.tracing_metrics as tracing_metrics
6+
from agentex.lib.core.observability.tracing_metrics import (
7+
TracingMetrics,
8+
processor_label,
9+
get_tracing_metrics,
10+
classify_export_error,
11+
)
12+
13+
14+
class TestClassifyExportError:
15+
def test_scale_gp_authentication_error(self):
16+
class AuthenticationError(Exception):
17+
pass
18+
19+
exc = AuthenticationError("Error code: 401 - {'message': 'Not authorized to access Account'}")
20+
assert classify_export_error(exc) == ("authentication", "401")
21+
22+
def test_rate_limit_code(self):
23+
class APIError(Exception):
24+
pass
25+
26+
exc = APIError("Error code: 429 - rate limited")
27+
assert classify_export_error(exc) == ("rate_limit", "429")
28+
29+
def test_server_error_code(self):
30+
class APIError(Exception):
31+
pass
32+
33+
exc = APIError("Error code: 503 - unavailable")
34+
assert classify_export_error(exc) == ("server_error", "5xx")
35+
36+
def test_out_of_range_code_uses_bounded_label(self):
37+
class APIError(Exception):
38+
pass
39+
40+
exc = APIError("Error code: 100 - continue")
41+
assert classify_export_error(exc) == ("other_error", "other")
42+
43+
def test_timeout_by_name(self):
44+
class APITimeoutError(Exception):
45+
pass
46+
47+
assert classify_export_error(APITimeoutError("slow")) == ("timeout", "timeout")
48+
49+
def test_unknown_error(self):
50+
class WeirdError(Exception):
51+
pass
52+
53+
assert classify_export_error(WeirdError("boom")) == ("other_error", "unknown")
54+
55+
56+
class TestProcessorLabel:
57+
def test_sgp_async_processor(self):
58+
class SGPAsyncTracingProcessor:
59+
pass
60+
61+
assert processor_label(SGPAsyncTracingProcessor()) == "sgp"
62+
63+
def test_other_processor(self):
64+
class AgentexAsyncTracingProcessor:
65+
pass
66+
67+
assert processor_label(AgentexAsyncTracingProcessor()) == "other"
68+
69+
70+
class TestGetTracingMetrics:
71+
def test_returns_tracing_metrics_instance(self, monkeypatch):
72+
monkeypatch.setattr(tracing_metrics, "_tracing_metrics", None)
73+
m = get_tracing_metrics()
74+
assert isinstance(m, TracingMetrics)
75+
76+
def test_singleton_returns_same_instance(self, monkeypatch):
77+
monkeypatch.setattr(tracing_metrics, "_tracing_metrics", None)
78+
first = get_tracing_metrics()
79+
second = get_tracing_metrics()
80+
assert first is second
81+
82+
def test_instruments_exist(self, monkeypatch):
83+
monkeypatch.setattr(tracing_metrics, "_tracing_metrics", None)
84+
m = get_tracing_metrics()
85+
for name in (
86+
"span_events_enqueued",
87+
"span_events_dropped",
88+
"queue_depth",
89+
"queue_lag",
90+
"batch_items",
91+
"batch_size",
92+
"batch_drain_duration",
93+
"export_batches",
94+
"export_spans",
95+
"export_batch_failures",
96+
"export_span_failures",
97+
"shutdown_timeouts",
98+
"shutdown_remaining_items",
99+
):
100+
assert hasattr(m, name), f"missing instrument: {name}"
Lines changed: 143 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,143 @@
1+
"""Tests for ``agentex.lib.core.observability.tracing_metrics_recording``."""
2+
3+
from __future__ import annotations
4+
5+
from unittest.mock import MagicMock, patch
6+
7+
import agentex.lib.core.observability.tracing_metrics_recording as recording
8+
9+
10+
class _Item:
11+
def __init__(self, enqueued_at: float | None) -> None:
12+
self.enqueued_at = enqueued_at
13+
14+
15+
class TestIsMetricsEnabled:
16+
def setup_method(self) -> None:
17+
recording._metrics_enabled = None
18+
recording._tracing = None
19+
20+
def test_enabled_by_default(self, monkeypatch):
21+
monkeypatch.delenv("AGENTEX_TRACING_METRICS", raising=False)
22+
assert recording.is_metrics_enabled() is True
23+
24+
def test_disabled_by_zero(self, monkeypatch):
25+
monkeypatch.setenv("AGENTEX_TRACING_METRICS", "0")
26+
recording._metrics_enabled = None
27+
assert recording.is_metrics_enabled() is False
28+
29+
30+
class TestRecordingHelpers:
31+
def setup_method(self) -> None:
32+
recording._metrics_enabled = None
33+
recording._tracing = None
34+
35+
def test_record_span_enqueued_when_disabled_does_not_load_metrics(self, monkeypatch):
36+
monkeypatch.setenv("AGENTEX_TRACING_METRICS", "0")
37+
recording._metrics_enabled = None
38+
with patch(
39+
"agentex.lib.core.observability.tracing_metrics.get_tracing_metrics"
40+
) as mock_get:
41+
recording.record_span_enqueued("start")
42+
mock_get.assert_not_called()
43+
44+
def test_record_span_enqueued_when_enabled(self, monkeypatch):
45+
monkeypatch.setenv("AGENTEX_TRACING_METRICS", "1")
46+
recording._metrics_enabled = None
47+
mock_metrics = MagicMock()
48+
with patch(
49+
"agentex.lib.core.observability.tracing_metrics.get_tracing_metrics",
50+
return_value=mock_metrics,
51+
):
52+
recording.record_span_enqueued("end")
53+
mock_metrics.span_events_enqueued.add.assert_called_once_with(1, {"event_type": "end"})
54+
55+
def test_monotonic_if_enabled_respects_kill_switch(self, monkeypatch):
56+
monkeypatch.setenv("AGENTEX_TRACING_METRICS", "0")
57+
recording._metrics_enabled = None
58+
assert recording.monotonic_if_enabled() is None
59+
60+
def test_record_batch_coalesced_records_lag(self, monkeypatch):
61+
monkeypatch.setenv("AGENTEX_TRACING_METRICS", "1")
62+
recording._metrics_enabled = None
63+
mock_metrics = MagicMock()
64+
with patch(
65+
"agentex.lib.core.observability.tracing_metrics.get_tracing_metrics",
66+
return_value=mock_metrics,
67+
), patch("agentex.lib.core.observability.tracing_metrics_recording.time.monotonic", return_value=10.0):
68+
recording.record_batch_coalesced(
69+
queue_depth=3,
70+
batch_items=[_Item(9.5), _Item(9.0)],
71+
)
72+
mock_metrics.queue_depth.record.assert_called_once_with(3)
73+
mock_metrics.batch_items.record.assert_called_once_with(2)
74+
mock_metrics.queue_lag.record.assert_called_once_with(1000.0)
75+
76+
def test_record_export_failure(self, monkeypatch):
77+
monkeypatch.setenv("AGENTEX_TRACING_METRICS", "1")
78+
recording._metrics_enabled = None
79+
mock_metrics = MagicMock()
80+
81+
class AuthenticationError(Exception):
82+
pass
83+
84+
exc = AuthenticationError("Error code: 401 - denied")
85+
processor = type("SGPAsyncTracingProcessor", (), {})()
86+
87+
with patch(
88+
"agentex.lib.core.observability.tracing_metrics.get_tracing_metrics",
89+
return_value=mock_metrics,
90+
):
91+
recording.record_export_failure(
92+
processor=processor,
93+
event_type="start",
94+
span_count=5,
95+
exc=exc,
96+
)
97+
98+
mock_metrics.export_batch_failures.add.assert_called_once()
99+
mock_metrics.export_span_failures.add.assert_called_once_with(
100+
5,
101+
{
102+
"processor": "sgp",
103+
"event_type": "start",
104+
"http_code": "401",
105+
"error_class": "authentication",
106+
},
107+
)
108+
109+
def test_record_export_success(self, monkeypatch):
110+
monkeypatch.setenv("AGENTEX_TRACING_METRICS", "1")
111+
recording._metrics_enabled = None
112+
mock_metrics = MagicMock()
113+
with patch(
114+
"agentex.lib.core.observability.tracing_metrics.get_tracing_metrics",
115+
return_value=mock_metrics,
116+
):
117+
recording.record_export_success(event_type="end", span_count=12, processor="sgp")
118+
119+
mock_metrics.export_batches.add.assert_called_once_with(
120+
1,
121+
{"processor": "sgp", "event_type": "end"},
122+
)
123+
mock_metrics.export_spans.add.assert_called_once_with(
124+
12,
125+
{"processor": "sgp", "event_type": "end"},
126+
)
127+
128+
def test_record_export_success_accepts_processor_label(self, monkeypatch):
129+
monkeypatch.setenv("AGENTEX_TRACING_METRICS", "1")
130+
recording._metrics_enabled = None
131+
mock_metrics = MagicMock()
132+
with patch(
133+
"agentex.lib.core.observability.tracing_metrics.get_tracing_metrics",
134+
return_value=mock_metrics,
135+
):
136+
recording.record_export_success(
137+
event_type="start", span_count=3, processor="other"
138+
)
139+
140+
mock_metrics.export_batches.add.assert_called_once_with(
141+
1,
142+
{"processor": "other", "event_type": "start"},
143+
)

0 commit comments

Comments
 (0)