Skip to content

Commit 3a60add

Browse files
authored
feat(tracing): Add background queue for async span processing (#303)
* fix(tracing): Fix memory leak in SGP tracing processors

  SGPSyncTracingProcessor and SGPAsyncTracingProcessor accumulated spans in the self._spans dict on every request but never removed them, because on_span_end() used dict.get() (read-only) instead of dict.pop() (read-and-remove). The only cleanup was in shutdown(), which is never called. After this fix, spans are removed from the dict when they complete, preventing unbounded memory growth.

* feat(tracing): Add background queue for async span processing

  AsyncTrace.start_span() and end_span() previously awaited processor HTTP calls inline, adding 8 blocking round-trips per request. This change moves processor calls into a background FIFO queue so callers enqueue and return immediately.

  - Add AsyncSpanQueue with sequential drain loop, error logging, and graceful shutdown with configurable timeout
  - Wire shutdown into FastAPI lifespan teardown in base_acp_server
  - FIFO ordering preserves the start-before-end invariant required by SGPAsyncTracingProcessor's internal _spans dict

* fix(tests): Use context manager patches for sync processor tests

  The @patch decorators on _make_processor expired before the test bodies ran, so on_span_start/on_span_end hit the real create_span and flush. Refactored to a @staticmethod using 'with patch(...)' context managers, matching the async test class pattern.

* fix(tests): Satisfy pyright type check for span output assignment

  Use an explicit dict[str, object] annotation to avoid an invariance error when assigning to Span.output (Dict[str, object] | ... | None).

* fix(tracing): Deep-copy spans before enqueuing to background queue

  Processors like SGPAsyncTracingProcessor mutate span.data in-place via _add_source_to_span. With async background processing, this raced with the caller, who holds a reference to the same span object. Deep-copying via model_copy(deep=True) decouples the two.
1 parent f43dac4 commit 3a60add

File tree

6 files changed

+343
-10
lines changed

6 files changed

+343
-10
lines changed
Lines changed: 15 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,19 @@
11
from agentex.types.span import Span
22
from agentex.lib.core.tracing.trace import Trace, AsyncTrace
33
from agentex.lib.core.tracing.tracer import Tracer, AsyncTracer
4+
from agentex.lib.core.tracing.span_queue import (
5+
AsyncSpanQueue,
6+
get_default_span_queue,
7+
shutdown_default_span_queue,
8+
)
49

5-
__all__ = ["Trace", "AsyncTrace", "Span", "Tracer", "AsyncTracer"]
10+
__all__ = [
11+
"Trace",
12+
"AsyncTrace",
13+
"Span",
14+
"Tracer",
15+
"AsyncTracer",
16+
"AsyncSpanQueue",
17+
"get_default_span_queue",
18+
"shutdown_default_span_queue",
19+
]
Lines changed: 111 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,111 @@
1+
from __future__ import annotations
2+
3+
import asyncio
4+
from enum import Enum
5+
from dataclasses import dataclass
6+
7+
from agentex.types.span import Span
8+
from agentex.lib.utils.logging import make_logger
9+
from agentex.lib.core.tracing.processors.tracing_processor_interface import (
10+
AsyncTracingProcessor,
11+
)
12+
13+
# Module-level logger; used below for dropped-event warnings, processor
# errors, and shutdown timeouts.
logger = make_logger(__name__)
14+
15+
16+
class SpanEventType(str, Enum):
    """Kind of span lifecycle event carried through the span queue.

    The ``str`` mixin makes every member compare equal to its plain string
    value (``SpanEventType.START == "start"``).
    """

    # Dispatched to each processor's ``on_span_start``.
    START = "start"
    # Dispatched to each processor's ``on_span_end``.
    END = "end"
19+
20+
21+
@dataclass
class _SpanQueueItem:
    """One unit of queued work: a span event plus the processors to notify."""

    # Whether this is a span-start or a span-end notification.
    event_type: SpanEventType
    # The span object handed to each processor callback.
    span: Span
    # Processors whose on_span_start / on_span_end is invoked for this event.
    processors: list[AsyncTracingProcessor]
26+
27+
28+
class AsyncSpanQueue:
    """Background FIFO queue for async span processing.

    Span events are enqueued synchronously (non-blocking) and processed
    sequentially by a background drain task. This keeps tracing HTTP calls
    off the critical request path while preserving start-before-end ordering.
    """

    def __init__(self) -> None:
        # No maxsize is given, so put_nowait() in enqueue() cannot raise QueueFull.
        self._queue: asyncio.Queue[_SpanQueueItem] = asyncio.Queue()
        # Created lazily by _ensure_drain_running(); asyncio.create_task()
        # needs a running event loop, which may not exist at construction time.
        self._drain_task: asyncio.Task[None] | None = None
        # Set by shutdown(); once True, enqueue() drops new events.
        self._stopping = False

    def enqueue(
        self,
        event_type: SpanEventType,
        span: Span,
        processors: list[AsyncTracingProcessor],
    ) -> None:
        """Queue a span event for background delivery and return immediately.

        Args:
            event_type: Whether the event is a span start or a span end.
            span: The span passed to each processor callback.
            processors: Processors to notify for this event.

        Events are dropped (with a warning) once shutdown() has been called.
        """
        if self._stopping:
            logger.warning("Span queue is shutting down, dropping %s event for span %s", event_type.value, span.id)
            return
        self._ensure_drain_running()
        self._queue.put_nowait(_SpanQueueItem(event_type=event_type, span=span, processors=processors))

    def _ensure_drain_running(self) -> None:
        """Start the drain task if it was never started or has finished."""
        if self._drain_task is None or self._drain_task.done():
            self._drain_task = asyncio.create_task(self._drain_loop())

    async def _drain_loop(self) -> None:
        """Consume queued events forever, one at a time, in FIFO order.

        For each event, all of its processors are invoked concurrently, but
        the next event is not started until the current one has completed.
        Processor failures are logged and never stop the loop.
        """
        while True:
            item = await self._queue.get()
            try:
                if item.event_type == SpanEventType.START:
                    coros = [p.on_span_start(item.span) for p in item.processors]
                else:
                    coros = [p.on_span_end(item.span) for p in item.processors]
                results = await asyncio.gather(*coros, return_exceptions=True)
                for result in results:
                    # gather(return_exceptions=True) can also hand back
                    # CancelledError, which is a BaseException; checking only
                    # Exception would drop such failures without a trace.
                    if isinstance(result, BaseException):
                        logger.error(
                            "Tracing processor error during %s for span %s",
                            item.event_type.value,
                            item.span.id,
                            exc_info=result,
                        )
            except Exception:
                logger.exception("Unexpected error in span queue drain loop for span %s", item.span.id)
            finally:
                # Always balance get() so that queue.join() in shutdown() can complete.
                self._queue.task_done()

    async def shutdown(self, timeout: float = 30.0) -> None:
        """Stop accepting events, flush what is queued, then stop the drain task.

        Args:
            timeout: Maximum number of seconds to wait for queued events to be
                processed. Events still pending after that are abandoned and a
                warning is logged.
        """
        self._stopping = True
        if self._queue.empty() and (self._drain_task is None or self._drain_task.done()):
            return
        # If events are still queued but the drain task has died (for example
        # it was cancelled externally), nothing would ever call task_done() and
        # join() below would block for the full timeout. Restart it so the
        # pending events are actually flushed. No-op when the task is alive.
        self._ensure_drain_running()
        try:
            await asyncio.wait_for(self._queue.join(), timeout=timeout)
        except asyncio.TimeoutError:
            logger.warning(
                "Span queue shutdown timed out after %.1fs with %d items remaining", timeout, self._queue.qsize()
            )
        if self._drain_task is not None and not self._drain_task.done():
            self._drain_task.cancel()
            try:
                await self._drain_task
            except asyncio.CancelledError:
                # Expected: this is the cancellation requested just above.
                pass
95+
96+
97+
# Module-level default queue. Created on first call to get_default_span_queue()
# and set back to None by shutdown_default_span_queue().
_default_span_queue: AsyncSpanQueue | None = None
98+
99+
100+
def get_default_span_queue() -> AsyncSpanQueue:
    """Return the module-level default span queue, creating it on first use."""
    global _default_span_queue
    queue = _default_span_queue
    if queue is None:
        # First caller (or first caller after a shutdown) builds a fresh queue.
        queue = _default_span_queue = AsyncSpanQueue()
    return queue
105+
106+
107+
async def shutdown_default_span_queue(timeout: float = 30.0) -> None:
    """Shut down the module-level default span queue, if one exists.

    Args:
        timeout: Seconds to wait for queued span events to be flushed;
            forwarded to the queue's ``shutdown()``.

    The default queue reference is cleared afterwards so that the next call to
    ``get_default_span_queue()`` creates a fresh queue.
    """
    global _default_span_queue
    queue = _default_span_queue
    if queue is None:
        return
    try:
        await queue.shutdown(timeout=timeout)
    finally:
        # Clear the reference even when shutdown() raises or is cancelled;
        # otherwise every later caller would receive a queue stuck in its
        # stopping state, which drops all events.
        _default_span_queue = None

src/agentex/lib/core/tracing/trace.py

Lines changed: 10 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,6 @@
11
from __future__ import annotations
22

33
import uuid
4-
import asyncio
54
from typing import Any, AsyncGenerator
65
from datetime import UTC, datetime
76
from contextlib import contextmanager, asynccontextmanager
@@ -12,6 +11,11 @@
1211
from agentex.types.span import Span
1312
from agentex.lib.utils.logging import make_logger
1413
from agentex.lib.utils.model_utils import recursive_model_dump
14+
from agentex.lib.core.tracing.span_queue import (
15+
SpanEventType,
16+
AsyncSpanQueue,
17+
get_default_span_queue,
18+
)
1519
from agentex.lib.core.tracing.processors.tracing_processor_interface import (
1620
SyncTracingProcessor,
1721
AsyncTracingProcessor,
@@ -173,17 +177,20 @@ def __init__(
173177
processors: list[AsyncTracingProcessor],
174178
client: AsyncAgentex,
175179
trace_id: str | None = None,
180+
span_queue: AsyncSpanQueue | None = None,
176181
):
177182
"""
178183
Initialize a new trace with the specified trace ID.
179184
180185
Args:
181186
trace_id: Required trace ID to use for this trace.
182187
processors: Optional list of tracing processors to use for this trace.
188+
span_queue: Optional span queue for background processing.
183189
"""
184190
self.processors = processors
185191
self.client = client
186192
self.trace_id = trace_id
193+
self._span_queue = span_queue or get_default_span_queue()
187194

188195
async def start_span(
189196
self,
@@ -225,9 +232,7 @@ async def start_span(
225232
)
226233

227234
if self.processors:
228-
await asyncio.gather(
229-
*[processor.on_span_start(span) for processor in self.processors]
230-
)
235+
self._span_queue.enqueue(SpanEventType.START, span.model_copy(deep=True), self.processors)
231236

232237
return span
233238

@@ -252,9 +257,7 @@ async def end_span(
252257
span.data = recursive_model_dump(span.data) if span.data else None
253258

254259
if self.processors:
255-
await asyncio.gather(
256-
*[processor.on_span_end(span) for processor in self.processors]
257-
)
260+
self._span_queue.enqueue(SpanEventType.END, span.model_copy(deep=True), self.processors)
258261

259262
return span
260263

src/agentex/lib/core/tracing/tracer.py

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22

33
from agentex import Agentex, AsyncAgentex
44
from agentex.lib.core.tracing.trace import Trace, AsyncTrace
5+
from agentex.lib.core.tracing.span_queue import AsyncSpanQueue
56
from agentex.lib.core.tracing.tracing_processor_manager import (
67
get_sync_tracing_processors,
78
get_async_tracing_processors,
@@ -55,12 +56,13 @@ def __init__(self, client: AsyncAgentex):
5556
"""
5657
self.client = client
5758

58-
def trace(self, trace_id: str | None = None) -> AsyncTrace:
59+
def trace(self, trace_id: str | None = None, span_queue: AsyncSpanQueue | None = None) -> AsyncTrace:
5960
"""
6061
Create a new trace with the given trace ID.
6162
6263
Args:
6364
trace_id: The trace ID to use.
65+
span_queue: Optional span queue for background processing.
6466
6567
Returns:
6668
A new AsyncTrace instance.
@@ -69,4 +71,5 @@ def trace(self, trace_id: str | None = None) -> AsyncTrace:
6971
processors=get_async_tracing_processors(),
7072
client=self.client,
7173
trace_id=trace_id,
74+
span_queue=span_queue,
7275
)

src/agentex/lib/sdk/fastacp/base/base_acp_server.py

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@
3232
from agentex.lib.environment_variables import EnvironmentVariables, refreshed_environment_variables
3333
from agentex.types.task_message_update import TaskMessageUpdate, StreamTaskMessageFull
3434
from agentex.types.task_message_content import TaskMessageContent
35+
from agentex.lib.core.tracing.span_queue import shutdown_default_span_queue
3536
from agentex.lib.sdk.fastacp.base.constants import (
3637
FASTACP_HEADER_SKIP_EXACT,
3738
FASTACP_HEADER_SKIP_PREFIXES,
@@ -103,7 +104,10 @@ async def lifespan_context(app: FastAPI): # noqa: ARG001
103104
else:
104105
logger.warning("AGENTEX_BASE_URL not set, skipping agent registration")
105106

106-
yield
107+
try:
108+
yield
109+
finally:
110+
await shutdown_default_span_queue()
107111

108112
return lifespan_context
109113

0 commit comments

Comments
 (0)