Skip to content

Commit f43dac4

Browse files
authored
fix(tracing): Fix memory leak in SGP tracing processors (#302)
* fix(tracing): Fix memory leak in SGP tracing processors SGPSyncTracingProcessor and SGPAsyncTracingProcessor accumulated spans in self._spans dict on every request but never removed them, since on_span_end() used dict.get() (read-only) instead of dict.pop() (read-and-remove). The only cleanup was in shutdown() which is never called. After this fix, spans are removed from the dict when they complete, preventing unbounded memory growth. * fix(tests): Use context manager patches for sync processor tests @patch decorators on _make_processor expired before test bodies ran, so on_span_start/on_span_end hit the real create_span and flush. Refactored to @staticmethod with 'with patch(...)' context managers matching the async test class pattern.
1 parent 74cb1a0 commit f43dac4

File tree

5 files changed

+169
-9
lines changed

5 files changed

+169
-9
lines changed

src/agentex/lib/core/tracing/processors/sgp_tracing_processor.py

Lines changed: 5 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,7 @@ def __init__(self, config: SGPTracingProcessorConfig):
3030
disabled = config.sgp_api_key == "" or config.sgp_account_id == ""
3131
tracing.init(
3232
SGPClient(
33-
api_key=config.sgp_api_key,
33+
api_key=config.sgp_api_key,
3434
account_id=config.sgp_account_id,
3535
base_url=config.sgp_base_url,
3636
),
@@ -72,11 +72,9 @@ def on_span_start(self, span: Span) -> None:
7272

7373
@override
7474
def on_span_end(self, span: Span) -> None:
75-
sgp_span = self._spans.get(span.id)
75+
sgp_span = self._spans.pop(span.id, None)
7676
if sgp_span is None:
77-
logger.warning(
78-
f"Span {span.id} not found in stored spans, skipping span end"
79-
)
77+
logger.warning(f"Span {span.id} not found in stored spans, skipping span end")
8078
return
8179

8280
self._add_source_to_span(span)
@@ -151,11 +149,9 @@ async def on_span_start(self, span: Span) -> None:
151149

152150
@override
153151
async def on_span_end(self, span: Span) -> None:
154-
sgp_span = self._spans.get(span.id)
152+
sgp_span = self._spans.pop(span.id, None)
155153
if sgp_span is None:
156-
logger.warning(
157-
f"Span {span.id} not found in stored spans, skipping span end"
158-
)
154+
logger.warning(f"Span {span.id} not found in stored spans, skipping span end")
159155
return
160156

161157
self._add_source_to_span(span)

tests/lib/core/__init__.py

Whitespace-only changes.

tests/lib/core/tracing/__init__.py

Whitespace-only changes.

tests/lib/core/tracing/processors/__init__.py

Whitespace-only changes.
Lines changed: 164 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,164 @@
1+
from __future__ import annotations
2+
3+
import uuid
4+
from datetime import UTC, datetime
5+
from unittest.mock import AsyncMock, MagicMock, patch
6+
7+
from agentex.types.span import Span
8+
from agentex.lib.types.tracing import SGPTracingProcessorConfig
9+
10+
MODULE = "agentex.lib.core.tracing.processors.sgp_tracing_processor"
11+
12+
13+
def _make_config() -> SGPTracingProcessorConfig:
14+
return SGPTracingProcessorConfig(
15+
sgp_api_key="test-key",
16+
sgp_account_id="test-account",
17+
)
18+
19+
20+
def _make_span(span_id: str | None = None) -> Span:
21+
return Span(
22+
id=span_id or str(uuid.uuid4()),
23+
name="test-span",
24+
start_time=datetime.now(UTC),
25+
trace_id="trace-1",
26+
)
27+
28+
29+
def _make_mock_sgp_span() -> MagicMock:
30+
sgp_span = MagicMock()
31+
sgp_span.to_request_params.return_value = {"mock": "params"}
32+
sgp_span.start_time = None
33+
sgp_span.end_time = None
34+
sgp_span.output = None
35+
sgp_span.metadata = None
36+
return sgp_span
37+
38+
39+
# ---------------------------------------------------------------------------
40+
# Sync processor tests
41+
# ---------------------------------------------------------------------------
42+
43+
44+
class TestSGPSyncTracingProcessorMemoryLeak:
45+
@staticmethod
46+
def _make_processor():
47+
mock_env = MagicMock()
48+
mock_env.refresh.return_value = MagicMock(ACP_TYPE=None, AGENT_NAME=None, AGENT_ID=None)
49+
mock_create_span = MagicMock(side_effect=lambda **kwargs: _make_mock_sgp_span())
50+
51+
with patch(f"{MODULE}.EnvironmentVariables", mock_env), \
52+
patch(f"{MODULE}.SGPClient"), \
53+
patch(f"{MODULE}.tracing"), \
54+
patch(f"{MODULE}.flush_queue"), \
55+
patch(f"{MODULE}.create_span", mock_create_span):
56+
from agentex.lib.core.tracing.processors.sgp_tracing_processor import (
57+
SGPSyncTracingProcessor,
58+
)
59+
60+
processor = SGPSyncTracingProcessor(_make_config())
61+
62+
return processor, mock_create_span
63+
64+
def test_spans_not_leaked_after_completed_lifecycle(self):
65+
processor, _ = self._make_processor()
66+
67+
with patch(f"{MODULE}.create_span", side_effect=lambda **kw: _make_mock_sgp_span()):
68+
for _ in range(100):
69+
span = _make_span()
70+
processor.on_span_start(span)
71+
span.end_time = datetime.now(UTC)
72+
processor.on_span_end(span)
73+
74+
assert len(processor._spans) == 0, (
75+
f"Expected 0 spans after 100 complete lifecycles, got {len(processor._spans)} — memory leak!"
76+
)
77+
78+
def test_spans_present_during_active_lifecycle(self):
79+
processor, _ = self._make_processor()
80+
81+
with patch(f"{MODULE}.create_span", side_effect=lambda **kw: _make_mock_sgp_span()):
82+
span = _make_span()
83+
processor.on_span_start(span)
84+
assert len(processor._spans) == 1, "Span should be tracked while active"
85+
86+
span.end_time = datetime.now(UTC)
87+
processor.on_span_end(span)
88+
assert len(processor._spans) == 0, "Span should be removed after end"
89+
90+
def test_span_end_for_unknown_span_is_noop(self):
91+
processor, _ = self._make_processor()
92+
93+
span = _make_span()
94+
# End a span that was never started — should not raise
95+
span.end_time = datetime.now(UTC)
96+
processor.on_span_end(span)
97+
98+
assert len(processor._spans) == 0
99+
100+
101+
# ---------------------------------------------------------------------------
102+
# Async processor tests
103+
# ---------------------------------------------------------------------------
104+
105+
106+
class TestSGPAsyncTracingProcessorMemoryLeak:
107+
@staticmethod
108+
def _make_processor():
109+
mock_env = MagicMock()
110+
mock_env.refresh.return_value = MagicMock(ACP_TYPE=None, AGENT_NAME=None, AGENT_ID=None)
111+
mock_create_span = MagicMock(side_effect=lambda **kwargs: _make_mock_sgp_span())
112+
113+
mock_async_client = MagicMock()
114+
mock_async_client.spans.upsert_batch = AsyncMock()
115+
116+
with patch(f"{MODULE}.EnvironmentVariables", mock_env), \
117+
patch(f"{MODULE}.create_span", mock_create_span), \
118+
patch(f"{MODULE}.AsyncSGPClient", return_value=mock_async_client):
119+
from agentex.lib.core.tracing.processors.sgp_tracing_processor import (
120+
SGPAsyncTracingProcessor,
121+
)
122+
123+
processor = SGPAsyncTracingProcessor(_make_config())
124+
125+
# Wire up the mock client after construction (constructor stores it)
126+
processor.sgp_async_client = mock_async_client
127+
128+
# Keep create_span mock active for on_span_start calls
129+
return processor, mock_create_span
130+
131+
async def test_spans_not_leaked_after_completed_lifecycle(self):
132+
processor, _ = self._make_processor()
133+
134+
with patch(f"{MODULE}.create_span", side_effect=lambda **kw: _make_mock_sgp_span()):
135+
for _ in range(100):
136+
span = _make_span()
137+
await processor.on_span_start(span)
138+
span.end_time = datetime.now(UTC)
139+
await processor.on_span_end(span)
140+
141+
assert len(processor._spans) == 0, (
142+
f"Expected 0 spans after 100 complete lifecycles, got {len(processor._spans)} — memory leak!"
143+
)
144+
145+
async def test_spans_present_during_active_lifecycle(self):
146+
processor, _ = self._make_processor()
147+
148+
with patch(f"{MODULE}.create_span", side_effect=lambda **kw: _make_mock_sgp_span()):
149+
span = _make_span()
150+
await processor.on_span_start(span)
151+
assert len(processor._spans) == 1, "Span should be tracked while active"
152+
153+
span.end_time = datetime.now(UTC)
154+
await processor.on_span_end(span)
155+
assert len(processor._spans) == 0, "Span should be removed after end"
156+
157+
async def test_span_end_for_unknown_span_is_noop(self):
158+
processor, _ = self._make_processor()
159+
160+
span = _make_span()
161+
span.end_time = datetime.now(UTC)
162+
await processor.on_span_end(span)
163+
164+
assert len(processor._spans) == 0

0 commit comments

Comments
 (0)