Skip to content

Commit 2b2a25b

Browse files
fix(tracing): address Greptile review on queue depth and labels
Include the already-dequeued batch size in queue_depth sampling, clamp out-of-range HTTP status codes to the bounded "other" label, and parameterize the processor label of record_export_success (defaulting to "sgp").
1 parent 7893cfd commit 2b2a25b

7 files changed

Lines changed: 57 additions & 6 deletions

File tree

src/agentex/lib/core/observability/tests/test_tracing_metrics.py

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,13 @@ class APIError(Exception):
3333
exc = APIError("Error code: 503 - unavailable")
3434
assert classify_export_error(exc) == ("server_error", "5xx")
3535

36+
def test_out_of_range_code_uses_bounded_label(self):
37+
class APIError(Exception):
38+
pass
39+
40+
exc = APIError("Error code: 100 - continue")
41+
assert classify_export_error(exc) == ("other_error", "other")
42+
3643
def test_timeout_by_name(self):
3744
class APITimeoutError(Exception):
3845
pass

src/agentex/lib/core/observability/tests/test_tracing_metrics_recording.py

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -122,3 +122,20 @@ def test_record_export_success(self, monkeypatch):
122122
12,
123123
{"processor": "sgp", "event_type": "end", "outcome": "success"},
124124
)
125+
126+
def test_record_export_success_accepts_processor_label(self, monkeypatch):
127+
monkeypatch.setenv("AGENTEX_TRACING_METRICS", "1")
128+
recording._metrics_enabled = None
129+
mock_metrics = MagicMock()
130+
with patch(
131+
"agentex.lib.core.observability.tracing_metrics.get_tracing_metrics",
132+
return_value=mock_metrics,
133+
):
134+
recording.record_export_success(
135+
event_type="start", span_count=3, processor="other"
136+
)
137+
138+
mock_metrics.export_batches.add.assert_called_once_with(
139+
1,
140+
{"processor": "other", "event_type": "start", "outcome": "success"},
141+
)

src/agentex/lib/core/observability/tracing_metrics.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -148,7 +148,7 @@ def classify_export_error(exc: BaseException) -> tuple[str, str]:
148148
return "client_error", "4xx"
149149
if 500 <= code < 600:
150150
return "server_error", "5xx"
151-
return "other_error", str(code)
151+
return "other_error", "other"
152152

153153
if any(s in name for s in ("Authentication", "Permission")):
154154
return "authentication", "unknown"

src/agentex/lib/core/observability/tracing_metrics_recording.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -97,13 +97,13 @@ def record_batch_phase(*, phase: str, size: int, duration_ms: float) -> None:
9797
pass
9898

9999

100-
def record_export_success(*, event_type: str, span_count: int) -> None:
100+
def record_export_success(*, event_type: str, span_count: int, processor: str = "sgp") -> None:
101101
if not is_metrics_enabled():
102102
return
103103
try:
104104
from agentex.lib.core.observability.tracing_metrics import get_tracing_metrics
105105

106-
attrs = {"processor": "sgp", "event_type": event_type, "outcome": "success"}
106+
attrs = {"processor": processor, "event_type": event_type, "outcome": "success"}
107107
metrics = get_tracing_metrics()
108108
metrics.export_batches.add(1, attrs)
109109
metrics.export_spans.add(span_count, attrs)

src/agentex/lib/core/tracing/processors/sgp_tracing_processor.py

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -160,7 +160,9 @@ async def on_spans_start(self, spans: list[Span]) -> None:
160160

161161
sgp_spans = [_build_sgp_span(span, self.env_vars) for span in spans]
162162
await client.spans.upsert_batch(items=[s.to_request_params() for s in sgp_spans])
163-
_metrics.record_export_success(event_type="start", span_count=len(spans))
163+
_metrics.record_export_success(
164+
event_type="start", span_count=len(spans), processor="sgp"
165+
)
164166

165167
@override
166168
async def on_spans_end(self, spans: list[Span]) -> None:
@@ -177,7 +179,9 @@ async def on_spans_end(self, spans: list[Span]) -> None:
177179
sgp_span.end_time = span.end_time.isoformat() # type: ignore[union-attr]
178180
sgp_spans.append(sgp_span)
179181
await client.spans.upsert_batch(items=[s.to_request_params() for s in sgp_spans])
180-
_metrics.record_export_success(event_type="end", span_count=len(sgp_spans))
182+
_metrics.record_export_success(
183+
event_type="end", span_count=len(sgp_spans), processor="sgp"
184+
)
181185

182186
@override
183187
async def shutdown(self) -> None:

src/agentex/lib/core/tracing/span_queue.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -138,7 +138,7 @@ async def _drain_loop(self) -> None:
138138

139139
try:
140140
_metrics.record_batch_coalesced(
141-
queue_depth=self._queue.qsize(),
141+
queue_depth=self._queue.qsize() + len(batch),
142142
batch_items=batch,
143143
)
144144

tests/lib/core/tracing/test_span_queue.py

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -503,6 +503,29 @@ async def capture_end(span: Span) -> None:
503503

504504

505505
class TestAsyncSpanQueueMetrics:
506+
async def test_batch_coalesced_records_depth_including_batch(self, monkeypatch):
507+
monkeypatch.setenv("AGENTEX_TRACING_METRICS", "1")
508+
import agentex.lib.core.observability.tracing_metrics_recording as recording
509+
510+
recording._metrics_enabled = None
511+
proc = _make_processor()
512+
queue = AsyncSpanQueue(linger_ms=0)
513+
recorded_depths: list[int] = []
514+
515+
def capture_coalesced(*, queue_depth: int, batch_items: object) -> None:
516+
recorded_depths.append(queue_depth)
517+
518+
with patch.object(recording, "record_batch_coalesced", side_effect=capture_coalesced):
519+
for _ in range(3):
520+
queue.enqueue(SpanEventType.START, _make_span(), [proc])
521+
await asyncio.sleep(0.05)
522+
await queue.shutdown()
523+
524+
assert recorded_depths, "expected at least one coalesced batch"
525+
assert recorded_depths[0] >= 3, (
526+
f"queue_depth should include batch items removed from queue, got {recorded_depths[0]}"
527+
)
528+
506529
async def test_enqueue_records_enqueued_metric(self, monkeypatch):
507530
monkeypatch.setenv("AGENTEX_TRACING_METRICS", "1")
508531
import agentex.lib.core.observability.tracing_metrics_recording as recording

0 commit comments

Comments
 (0)