2020_DEFAULT_MAX_SIZE = 0
2121# Total attempts per batch for a *transient* failure (1 == no retry).
2222_DEFAULT_MAX_RETRIES = 1
23+ # Max number of batch-export HTTP requests in flight at once. The export
24+ # backend (EGP) processes each upsert_batch in ~150ms but serves many requests
25+ # concurrently; issuing one batch at a time caps per-pod egress at ~1/latency.
26+ # Sending several concurrently lets a pod keep up with span production under
27+ # load. ``1`` restores the old strictly-serial behavior.
28+ _DEFAULT_CONCURRENCY = 3
2329# HTTP statuses worth retrying at the queue level. These are explicit
2430# backpressure / transient signals; everything else (esp. 401/403/4xx auth and
2531# validation errors) is a permanent failure that re-enqueuing cannot fix. Note
@@ -76,15 +82,23 @@ class AsyncSpanQueue:
7682 """Background FIFO queue for async span processing.
7783
7884 Span events are enqueued synchronously (non-blocking) and drained by a
79- background task. Items are processed in batches: all START events in a
80- batch are flushed concurrently, then all END events, so that per-span
81- start-before-end ordering is preserved while HTTP calls for independent
82- spans execute in parallel.
83-
84- Once the drain loop picks up the first item, it lingers up to
85- ``linger_ms`` waiting for more items to coalesce into the same batch.
86- Without the linger the drain almost always returned size-1 batches under
87- real agent workloads, because spans typically arrive a few ms apart.
85+ background task. The drain coalesces ready events into batches and
86+ *dispatches* each batch's export as its own task, so up to ``concurrency``
87+ batch requests can be in flight at once. This matters because each
88+ ``upsert_batch`` HTTP call takes tens-to-hundreds of ms server-side; issuing
89+ them one at a time caps a pod's egress at ~1/latency and lets a backlog
90+ build under load.
91+
92+ Ordering guarantee: a span's START export always completes before its END
93+ export is issued. END batches wait on the START batches that were in flight
94+ when they were formed; because a span's START is always enqueued before its
95+ END, that span's START send is either still in flight (and waited on) or
96+ already finished. Independent spans export fully concurrently.
97+
98+ Once the drain loop picks up the first item, it lingers up to ``linger_ms``
99+ waiting for more items to coalesce into the same batch. Without the linger
100+ the drain almost always returned size-1 batches under real agent workloads,
101+ because spans typically arrive a few ms apart.
88102
89103 Reliability:
90104 - ``max_size`` bounds the queue. When full, new events are dropped and
@@ -101,6 +115,7 @@ def __init__(
101115 linger_ms : int | None = None ,
102116 max_size : int | None = None ,
103117 max_retries : int | None = None ,
118+ concurrency : int | None = None ,
104119 ) -> None :
105120 resolved_max_size = (
106121 _read_int_env ("AGENTEX_SPAN_QUEUE_MAX_SIZE" , _DEFAULT_MAX_SIZE ) if max_size is None else max (0 , max_size )
@@ -115,6 +130,17 @@ def __init__(
115130 if max_retries is None
116131 else max (1 , max_retries )
117132 )
133+ self ._concurrency = (
134+ _read_int_env ("AGENTEX_SPAN_QUEUE_CONCURRENCY" , _DEFAULT_CONCURRENCY , minimum = 1 )
135+ if concurrency is None
136+ else max (1 , concurrency )
137+ )
138+ # Bounds concurrent export HTTP requests.
139+ self ._send_sema = asyncio .Semaphore (self ._concurrency )
140+ # Outstanding dispatched send tasks, and the subset that are START
141+ # sends (END sends wait on these to preserve per-span ordering).
142+ self ._inflight : set [asyncio .Task [None ]] = set ()
143+ self ._inflight_starts : set [asyncio .Task [None ]] = set ()
118144 # Total spans dropped for any reason (full queue, shutdown, permanent
119145 # failure, exhausted retries). Surfaced for metrics/observability so
120146 # span loss stops being silent.
@@ -169,6 +195,11 @@ def _ensure_drain_running(self) -> None:
169195
170196 async def _drain_loop (self ) -> None :
171197 while True :
198+ # Backpressure: cap the number of in-flight send tasks so the drain
199+ # does not run unboundedly ahead of the exporters.
200+ while len (self ._inflight ) >= self ._concurrency :
201+ await asyncio .wait (set (self ._inflight ), return_when = asyncio .FIRST_COMPLETED )
202+
172203 # Block until at least one item is available.
173204 first = await self ._queue .get ()
174205 batch : list [_SpanQueueItem ] = [first ]
@@ -196,22 +227,47 @@ async def _drain_loop(self) -> None:
196227 except asyncio .QueueEmpty :
197228 break
198229
199- try :
200- # Separate START and END events. Processing all STARTs before
201- # ENDs ensures that on_span_start completes before on_span_end
202- # for any span whose both events land in the same batch.
203- starts = [i for i in batch if i .event_type == SpanEventType .START ]
204- ends = [i for i in batch if i .event_type == SpanEventType .END ]
205-
206- if starts :
207- await self ._process_items (starts )
208- if ends :
209- await self ._process_items (ends )
210- finally :
211- for _ in batch :
212- self ._queue .task_done ()
213- # Release span data for GC.
214- batch .clear ()
230+ # Separate START and END events and dispatch each as its own send
231+ # task. Dispatching STARTs first (so they are registered before the
232+ # END snapshot) guarantees an END never outruns a START of the same
233+ # span whose events land in this batch.
234+ starts = [i for i in batch if i .event_type == SpanEventType .START ]
235+ ends = [i for i in batch if i .event_type == SpanEventType .END ]
236+ if starts :
237+ self ._dispatch (starts , SpanEventType .START )
238+ if ends :
239+ # Re-check backpressure before the second dispatch so a batch
240+ # carrying both event types can't push _inflight past the cap.
241+ while len (self ._inflight ) >= self ._concurrency :
242+ await asyncio .wait (set (self ._inflight ), return_when = asyncio .FIRST_COMPLETED )
243+ self ._dispatch (ends , SpanEventType .END )
244+
245+ def _dispatch (self , items : list [_SpanQueueItem ], event_type : SpanEventType ) -> None :
246+ """Spawn a background task to export ``items``.
247+
248+ END sends snapshot the currently in-flight START tasks and wait for them
249+ before issuing, preserving the per-span START-before-END invariant.
250+ """
251+ barrier = tuple (self ._inflight_starts ) if event_type == SpanEventType .END else ()
252+ task = asyncio .create_task (self ._run_send (items , barrier ))
253+ self ._inflight .add (task )
254+ task .add_done_callback (self ._inflight .discard )
255+ if event_type == SpanEventType .START :
256+ self ._inflight_starts .add (task )
257+ task .add_done_callback (self ._inflight_starts .discard )
258+
259+ async def _run_send (self , items : list [_SpanQueueItem ], barrier : tuple [asyncio .Task [None ], ...]) -> None :
260+ try :
261+ if barrier :
262+ # Wait for the START sends this END batch depends on. Their
263+ # exceptions are irrelevant here — we only need them finished.
264+ await asyncio .gather (* barrier , return_exceptions = True )
265+ await self ._process_items (items )
266+ finally :
267+ # Mark every item done so shutdown's queue.join() can complete only
268+ # once all sends (and their retries) have finished.
269+ for _ in items :
270+ self ._queue .task_done ()
215271
216272 async def _process_items (self , items : list [_SpanQueueItem ]) -> None :
217273 """Dispatch a batch of same-event-type items to each processor in one call.
@@ -243,10 +299,12 @@ async def _handle(
243299 ) -> None :
244300 spans = [item .span for item in items ]
245301 try :
246- if event_type == SpanEventType .START :
247- await p .on_spans_start (spans )
248- else :
249- await p .on_spans_end (spans )
302+ # Hold a concurrency slot only for the duration of the HTTP call.
303+ async with self ._send_sema :
304+ if event_type == SpanEventType .START :
305+ await p .on_spans_start (spans )
306+ else :
307+ await p .on_spans_end (spans )
250308 except Exception as exc :
251309 self ._handle_failure (p , items , event_type , exc )
252310
@@ -288,7 +346,14 @@ def _handle_failure(
288346
289347 def _reenqueue (self , item : _SpanQueueItem , p : AsyncTracingProcessor ) -> None :
290348 """Put a single failed item back on the queue, scoped to the processor
291- that failed, with an incremented attempt count."""
349+ that failed, with an incremented attempt count.
350+
351+ NOTE: a re-enqueued START goes to the *back* of the queue. If an END
352+ for the same span is dispatched concurrently before this START is picked
353+ up again, the END's barrier snapshot won't contain it, breaking the
354+ START-before-END guarantee for that span. This is benign at the default
355+ ``max_retries=1`` (retries disabled) but must be addressed before
356+ enabling retries by default."""
292357 try :
293358 self ._queue .put_nowait (
294359 _SpanQueueItem (
@@ -307,21 +372,37 @@ def _reenqueue(self, item: _SpanQueueItem, p: AsyncTracingProcessor) -> None:
307372
308373 async def shutdown (self , timeout : float = 30.0 ) -> None :
309374 self ._stopping = True
310- if self ._queue .empty () and (self ._drain_task is None or self ._drain_task .done ()):
375+ drain_idle = self ._drain_task is None or self ._drain_task .done ()
376+ if self ._queue .empty () and drain_idle and not self ._inflight :
311377 return
378+
379+ timed_out = False
312380 try :
381+ # join() returns once every enqueued (and re-enqueued) item has been
382+ # marked done by its send task.
313383 await asyncio .wait_for (self ._queue .join (), timeout = timeout )
314384 except asyncio .TimeoutError :
385+ timed_out = True
315386 logger .warning (
316387 "Span queue shutdown timed out after %.1fs with %d items remaining" , timeout , self ._queue .qsize ()
317388 )
389+
318390 if self ._drain_task is not None and not self ._drain_task .done ():
319391 self ._drain_task .cancel ()
320392 try :
321393 await self ._drain_task
322394 except asyncio .CancelledError :
323395 pass
324396
397+ # Clean up any in-flight send tasks. On a clean shutdown these are
398+ # already finishing; on timeout, cancel the stragglers so we don't hang.
399+ inflight = list (self ._inflight )
400+ if inflight :
401+ if timed_out :
402+ for task in inflight :
403+ task .cancel ()
404+ await asyncio .gather (* inflight , return_exceptions = True )
405+
325406
326407_default_span_queue : AsyncSpanQueue | None = None
327408
0 commit comments