Skip to content

Commit f0a28d9

Browse files
Merge remote-tracking branch 'origin/stas/tracing-concurrent-egress' into SGPINF-1863-span-queue-telemetry-stas-tracing-concurrent-egress
2 parents 7f956b7 + c6b117e commit f0a28d9

2 files changed

Lines changed: 238 additions & 50 deletions

File tree

src/agentex/lib/core/tracing/span_queue.py

Lines changed: 122 additions & 47 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,12 @@
2222
_DEFAULT_MAX_SIZE = 0
2323
# Total attempts per batch for a *transient* failure (1 == no retry).
2424
_DEFAULT_MAX_RETRIES = 1
25+
# Max number of batch-export HTTP requests in flight at once. The export
26+
# backend (EGP) processes each upsert_batch in ~150ms but serves many requests
27+
# concurrently; issuing one batch at a time caps per-pod egress at ~1/latency.
28+
# Sending several concurrently lets a pod keep up with span production under
29+
# load. ``1`` restores the old strictly-serial behavior.
30+
_DEFAULT_CONCURRENCY = 8
2531
# HTTP statuses worth retrying at the queue level. These are explicit
2632
# backpressure / transient signals; everything else (esp. 401/403/4xx auth and
2733
# validation errors) is a permanent failure that re-enqueuing cannot fix. Note
@@ -79,15 +85,23 @@ class AsyncSpanQueue:
7985
"""Background FIFO queue for async span processing.
8086
8187
Span events are enqueued synchronously (non-blocking) and drained by a
82-
background task. Items are processed in batches: all START events in a
83-
batch are flushed concurrently, then all END events, so that per-span
84-
start-before-end ordering is preserved while HTTP calls for independent
85-
spans execute in parallel.
86-
87-
Once the drain loop picks up the first item, it lingers up to
88-
``linger_ms`` waiting for more items to coalesce into the same batch.
89-
Without the linger the drain almost always returned size-1 batches under
90-
real agent workloads, because spans typically arrive a few ms apart.
88+
background task. The drain coalesces ready events into batches and
89+
*dispatches* each batch's export as its own task, so up to ``concurrency``
90+
batch requests can be in flight at once. This matters because each
91+
``upsert_batch`` HTTP call takes tens-to-hundreds of ms server-side; issuing
92+
them one at a time caps a pod's egress at ~1/latency and lets a backlog
93+
build under load.
94+
95+
Ordering guarantee: a span's START export always completes before its END
96+
export is issued. END batches wait on the START batches that were in flight
97+
when they were formed; because a span's START is always enqueued before its
98+
END, that span's START send is either still in flight (and waited on) or
99+
already finished. Independent spans export fully concurrently.
100+
101+
Once the drain loop picks up the first item, it lingers up to ``linger_ms``
102+
waiting for more items to coalesce into the same batch. Without the linger
103+
the drain almost always returned size-1 batches under real agent workloads,
104+
because spans typically arrive a few ms apart.
91105
92106
Reliability:
93107
- ``max_size`` bounds the queue. When full, new events are dropped and
@@ -104,6 +118,7 @@ def __init__(
104118
linger_ms: int | None = None,
105119
max_size: int | None = None,
106120
max_retries: int | None = None,
121+
concurrency: int | None = None,
107122
) -> None:
108123
resolved_max_size = (
109124
_read_int_env("AGENTEX_SPAN_QUEUE_MAX_SIZE", _DEFAULT_MAX_SIZE) if max_size is None else max(0, max_size)
@@ -118,6 +133,17 @@ def __init__(
118133
if max_retries is None
119134
else max(1, max_retries)
120135
)
136+
self._concurrency = (
137+
_read_int_env("AGENTEX_SPAN_QUEUE_CONCURRENCY", _DEFAULT_CONCURRENCY, minimum=1)
138+
if concurrency is None
139+
else max(1, concurrency)
140+
)
141+
# Bounds concurrent export HTTP requests.
142+
self._send_sema = asyncio.Semaphore(self._concurrency)
143+
# Outstanding dispatched send tasks, and the subset that are START
144+
# sends (END sends wait on these to preserve per-span ordering).
145+
self._inflight: set[asyncio.Task[None]] = set()
146+
self._inflight_starts: set[asyncio.Task[None]] = set()
121147
# Total spans dropped for any reason (full queue, shutdown, permanent
122148
# failure, exhausted retries). Surfaced for metrics/observability so
123149
# span loss stops being silent.
@@ -186,6 +212,11 @@ def _ensure_drain_running(self) -> None:
186212

187213
async def _drain_loop(self) -> None:
188214
while True:
215+
# Backpressure: cap the number of in-flight send tasks so the drain
216+
# does not run unboundedly ahead of the exporters.
217+
while len(self._inflight) >= self._concurrency:
218+
await asyncio.wait(set(self._inflight), return_when=asyncio.FIRST_COMPLETED)
219+
189220
# Block until at least one item is available.
190221
first = await self._queue.get()
191222
batch: list[_SpanQueueItem] = [first]
@@ -213,39 +244,59 @@ async def _drain_loop(self) -> None:
213244
except asyncio.QueueEmpty:
214245
break
215246

216-
try:
217-
_metrics.record_batch_coalesced(
218-
queue_depth=self._queue.qsize() + len(batch),
219-
batch_items=batch,
220-
)
247+
_metrics.record_batch_coalesced(
248+
queue_depth=self._queue.qsize() + len(batch),
249+
batch_items=batch,
250+
)
221251

222-
# Separate START and END events. Processing all STARTs before
223-
# ENDs ensures that on_span_start completes before on_span_end
224-
# for any span whose both events land in the same batch.
225-
starts = [i for i in batch if i.event_type == SpanEventType.START]
226-
ends = [i for i in batch if i.event_type == SpanEventType.END]
227-
228-
if starts:
229-
phase_start = time.perf_counter()
230-
await self._process_items(starts)
231-
_metrics.record_batch_phase(
232-
phase="start",
233-
size=len(starts),
234-
duration_ms=(time.perf_counter() - phase_start) * 1000.0,
235-
)
236-
if ends:
237-
phase_start = time.perf_counter()
238-
await self._process_items(ends)
239-
_metrics.record_batch_phase(
240-
phase="end",
241-
size=len(ends),
242-
duration_ms=(time.perf_counter() - phase_start) * 1000.0,
243-
)
244-
finally:
245-
for _ in batch:
246-
self._queue.task_done()
247-
# Release span data for GC.
248-
batch.clear()
252+
# Separate START and END events and dispatch each as its own send
253+
# task. Dispatching STARTs first (so they are registered before the
254+
# END snapshot) guarantees an END never outruns a START of the same
255+
# span whose events land in this batch.
256+
starts = [i for i in batch if i.event_type == SpanEventType.START]
257+
ends = [i for i in batch if i.event_type == SpanEventType.END]
258+
if starts:
259+
self._dispatch(starts, SpanEventType.START)
260+
if ends:
261+
# Re-check backpressure before the second dispatch so a batch
262+
# carrying both event types can't push _inflight past the cap.
263+
while len(self._inflight) >= self._concurrency:
264+
await asyncio.wait(set(self._inflight), return_when=asyncio.FIRST_COMPLETED)
265+
self._dispatch(ends, SpanEventType.END)
266+
267+
def _dispatch(self, items: list[_SpanQueueItem], event_type: SpanEventType) -> None:
268+
"""Spawn a background task to export ``items``.
269+
270+
END sends snapshot the currently in-flight START tasks and wait for them
271+
before issuing, preserving the per-span START-before-END invariant.
272+
"""
273+
barrier = tuple(self._inflight_starts) if event_type == SpanEventType.END else ()
274+
task = asyncio.create_task(self._run_send(items, barrier))
275+
self._inflight.add(task)
276+
task.add_done_callback(self._inflight.discard)
277+
if event_type == SpanEventType.START:
278+
self._inflight_starts.add(task)
279+
task.add_done_callback(self._inflight_starts.discard)
280+
281+
async def _run_send(self, items: list[_SpanQueueItem], barrier: tuple[asyncio.Task[None], ...]) -> None:
282+
try:
283+
if barrier:
284+
# Wait for the START sends this END batch depends on. Their
285+
# exceptions are irrelevant here — we only need them finished.
286+
await asyncio.gather(*barrier, return_exceptions=True)
287+
phase_start = time.perf_counter()
288+
await self._process_items(items)
289+
if items:
290+
_metrics.record_batch_phase(
291+
phase=items[0].event_type.value,
292+
size=len(items),
293+
duration_ms=(time.perf_counter() - phase_start) * 1000.0,
294+
)
295+
finally:
296+
# Mark every item done so shutdown's queue.join() can complete only
297+
# once all sends (and their retries) have finished.
298+
for _ in items:
299+
self._queue.task_done()
249300

250301
async def _process_items(self, items: list[_SpanQueueItem]) -> None:
251302
"""Dispatch a batch of same-event-type items to each processor in one call.
@@ -277,10 +328,12 @@ async def _handle(
277328
) -> None:
278329
spans = [item.span for item in items]
279330
try:
280-
if event_type == SpanEventType.START:
281-
await p.on_spans_start(spans)
282-
else:
283-
await p.on_spans_end(spans)
331+
# Hold a concurrency slot only for the duration of the HTTP call.
332+
async with self._send_sema:
333+
if event_type == SpanEventType.START:
334+
await p.on_spans_start(spans)
335+
else:
336+
await p.on_spans_end(spans)
284337
except Exception as exc:
285338
self._handle_failure(p, items, event_type, exc)
286339

@@ -334,7 +387,14 @@ def _handle_failure(
334387

335388
def _reenqueue(self, item: _SpanQueueItem, p: AsyncTracingProcessor) -> None:
336389
"""Put a single failed item back on the queue, scoped to the processor
337-
that failed, with an incremented attempt count."""
390+
that failed, with an incremented attempt count.
391+
392+
NOTE: a re-enqueued START goes to the *back* of the queue. If an END
393+
for the same span is dispatched concurrently before this START is picked
394+
up again, the END's barrier snapshot won't contain it, breaking the
395+
START-before-END guarantee for that span. This is benign at the default
396+
``max_retries=1`` (retries disabled) but must be addressed before
397+
enabling retries by default."""
338398
try:
339399
self._queue.put_nowait(
340400
_SpanQueueItem(
@@ -354,11 +414,17 @@ def _reenqueue(self, item: _SpanQueueItem, p: AsyncTracingProcessor) -> None:
354414

355415
async def shutdown(self, timeout: float = 30.0) -> None:
356416
self._stopping = True
357-
if self._queue.empty() and (self._drain_task is None or self._drain_task.done()):
417+
drain_idle = self._drain_task is None or self._drain_task.done()
418+
if self._queue.empty() and drain_idle and not self._inflight:
358419
return
420+
421+
timed_out = False
359422
try:
423+
# join() returns once every enqueued (and re-enqueued) item has been
424+
# marked done by its send task.
360425
await asyncio.wait_for(self._queue.join(), timeout=timeout)
361426
except asyncio.TimeoutError:
427+
timed_out = True
362428
remaining = self._queue.qsize()
363429
logger.warning(
364430
"Span queue shutdown timed out after %.1fs with %d items remaining", timeout, remaining
@@ -371,6 +437,15 @@ async def shutdown(self, timeout: float = 30.0) -> None:
371437
except asyncio.CancelledError:
372438
pass
373439

440+
# Clean up any in-flight send tasks. On a clean shutdown these are
441+
# already finishing; on timeout, cancel the stragglers so we don't hang.
442+
inflight = list(self._inflight)
443+
if inflight:
444+
if timed_out:
445+
for task in inflight:
446+
task.cancel()
447+
await asyncio.gather(*inflight, return_exceptions=True)
448+
374449

375450
_default_span_queue: AsyncSpanQueue | None = None
376451

tests/lib/core/tracing/test_span_queue.py

Lines changed: 116 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -426,9 +426,10 @@ async def block_first(spans: list[Span]) -> None:
426426
proc.on_spans_start = AsyncMock(side_effect=block_first)
427427
proc.on_spans_end = AsyncMock()
428428

429-
# max_size=1, no linger: the drain pulls item-0 and blocks; item-1 fills
430-
# the queue; items 2 and 3 are dropped.
431-
queue = AsyncSpanQueue(max_size=1, linger_ms=0)
429+
# max_size=1, no linger, concurrency=1: the drain dispatches item-0 and
430+
# then blocks at the in-flight cap; item-1 fills the queue; items 2 and 3
431+
# are dropped.
432+
queue = AsyncSpanQueue(max_size=1, linger_ms=0, concurrency=1)
432433

433434
queue.enqueue(SpanEventType.START, _make_span("s0"), [proc])
434435
await asyncio.sleep(0.02) # let the drain pick up s0 and block
@@ -537,6 +538,118 @@ async def always_503(spans: list[Span]) -> None:
537538
assert queue.dropped_spans == 1
538539

539540

541+
class TestAsyncSpanQueueConcurrency:
542+
"""Span export should issue multiple batch requests concurrently (bounded),
543+
so per-pod egress isn't capped at one in-flight request — while still
544+
guaranteeing a span's START send completes before its END send.
545+
"""
546+
547+
async def test_batches_dispatched_concurrently_up_to_bound(self):
548+
current = 0
549+
max_seen = 0
550+
lock = asyncio.Lock()
551+
552+
async def slow_start(spans: list[Span]) -> None:
553+
nonlocal current, max_seen
554+
async with lock:
555+
current += 1
556+
max_seen = max(max_seen, current)
557+
await asyncio.sleep(0.05)
558+
async with lock:
559+
current -= 1
560+
561+
proc = AsyncMock()
562+
proc.on_spans_start = AsyncMock(side_effect=slow_start)
563+
proc.on_spans_end = AsyncMock()
564+
565+
# batch_size=1 → each span is its own batch/send; concurrency=4 caps
566+
# simultaneous in-flight sends.
567+
queue = AsyncSpanQueue(batch_size=1, linger_ms=0, concurrency=4)
568+
for i in range(8):
569+
queue.enqueue(SpanEventType.START, _make_span(f"s{i}"), [proc])
570+
571+
await queue.shutdown()
572+
573+
assert proc.on_spans_start.call_count == 8
574+
assert 2 <= max_seen <= 4, f"expected bounded concurrency (2..4), saw {max_seen}"
575+
576+
async def test_concurrency_one_serializes(self):
577+
current = 0
578+
max_seen = 0
579+
lock = asyncio.Lock()
580+
581+
async def slow_start(spans: list[Span]) -> None:
582+
nonlocal current, max_seen
583+
async with lock:
584+
current += 1
585+
max_seen = max(max_seen, current)
586+
await asyncio.sleep(0.03)
587+
async with lock:
588+
current -= 1
589+
590+
proc = AsyncMock()
591+
proc.on_spans_start = AsyncMock(side_effect=slow_start)
592+
proc.on_spans_end = AsyncMock()
593+
594+
queue = AsyncSpanQueue(batch_size=1, linger_ms=0, concurrency=1)
595+
for i in range(4):
596+
queue.enqueue(SpanEventType.START, _make_span(f"s{i}"), [proc])
597+
598+
await queue.shutdown()
599+
600+
assert max_seen == 1, f"concurrency=1 must serialize sends, saw {max_seen}"
601+
602+
async def test_concurrent_is_faster_than_serial(self):
603+
async def slow_start(spans: list[Span]) -> None:
604+
await asyncio.sleep(0.05)
605+
606+
proc = AsyncMock()
607+
proc.on_spans_start = AsyncMock(side_effect=slow_start)
608+
proc.on_spans_end = AsyncMock()
609+
610+
queue = AsyncSpanQueue(batch_size=1, linger_ms=0, concurrency=8)
611+
for i in range(8):
612+
queue.enqueue(SpanEventType.START, _make_span(f"s{i}"), [proc])
613+
614+
start = time.monotonic()
615+
await queue.shutdown()
616+
elapsed = time.monotonic() - start
617+
618+
serial = 8 * 0.05
619+
assert elapsed < serial * 0.5, f"concurrent drain took {elapsed:.3f}s; serial would be {serial:.3f}s"
620+
621+
async def test_end_waits_for_start_of_same_span(self):
622+
"""The per-span ordering invariant: a span's END upsert must not be sent
623+
until its START upsert has completed, even with concurrency enabled."""
624+
log: list[tuple[str, str]] = []
625+
626+
async def on_start(spans: list[Span]) -> None:
627+
log.append(("start_enter", spans[0].id))
628+
await asyncio.sleep(0.05)
629+
log.append(("start_exit", spans[0].id))
630+
631+
async def on_end(spans: list[Span]) -> None:
632+
log.append(("end_enter", spans[0].id))
633+
await asyncio.sleep(0.01)
634+
log.append(("end_exit", spans[0].id))
635+
636+
proc = AsyncMock()
637+
proc.on_spans_start = AsyncMock(side_effect=on_start)
638+
proc.on_spans_end = AsyncMock(side_effect=on_end)
639+
640+
queue = AsyncSpanQueue(batch_size=1, linger_ms=0, concurrency=4)
641+
queue.enqueue(SpanEventType.START, _make_span("A"), [proc])
642+
await asyncio.sleep(0.01) # let the START send begin (and block on sleep)
643+
queue.enqueue(SpanEventType.END, _make_span("A"), [proc])
644+
645+
await queue.shutdown()
646+
647+
# END must not enter until START has exited for the same span.
648+
start_exit = log.index(("start_exit", "A"))
649+
end_enter = log.index(("end_enter", "A"))
650+
assert start_exit < end_enter, f"END began before START completed: {log}"
651+
652+
540653
class TestAsyncSpanQueueIntegration:
541654
async def test_integration_with_async_trace(self):
542655
call_log: list[tuple[str, str]] = []

0 commit comments

Comments
 (0)