Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
29 changes: 24 additions & 5 deletions src/claude_agent_sdk/_internal/transport/subprocess_cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -540,8 +540,29 @@ async def _read_messages_impl(self) -> AsyncIterator[dict[str, Any]]:
if not json_line:
continue

# Keep accumulating partial JSON until we can parse it
json_buffer += json_line
# Try standalone parse first (most common case).
# This prevents non-JSON lines (e.g. verbose HTTP logs,
# sandbox debug messages) from poisoning the buffer.
if not json_buffer:
try:
data = json.loads(json_line)
yield data
continue
except json.JSONDecodeError:
# Only start buffering if line looks like start of
# a JSON object. The stream-json protocol emits
# JSON objects, so "{" is the expected start.
if json_line.startswith("{"):
json_buffer = json_line
else:
logger.debug(
"Skipping non-JSON line on stdout: %s",
json_line[:100],
)
continue
else:
# Accumulate into existing buffer
json_buffer += json_line

if len(json_buffer) > self._max_buffer_size:
buffer_length = len(json_buffer)
Expand All @@ -558,9 +579,7 @@ async def _read_messages_impl(self) -> AsyncIterator[dict[str, Any]]:
json_buffer = ""
yield data
except json.JSONDecodeError:
# We are speculatively decoding the buffer until we get
# a full JSON object. If there is an actual issue, we
# raise an error after exceeding the configured limit.
# Still incomplete, keep buffering
continue

except anyio.ClosedResourceError:
Expand Down
101 changes: 101 additions & 0 deletions tests/test_subprocess_buffering.py
Original file line number Diff line number Diff line change
Expand Up @@ -327,3 +327,104 @@ async def _test() -> None:
assert messages[2]["subtype"] == "end"

anyio.run(_test)

def test_non_json_lines_dont_poison_buffer(self) -> None:
    """Check that stray non-JSON stdout lines are skipped, not buffered.

    Three valid JSON messages are interleaved with plain-text lines (e.g.
    verbose HTTP logs, sandbox debug messages). Every JSON message must
    still be yielded, in order, with its payload intact.
    Reproduces the bug described in
    https://github.com/anthropics/claude-agent-sdk-python/issues/347
    """

    async def _test() -> None:
        init_msg = {"type": "system", "subtype": "init", "session_id": "abc123"}
        assistant_msg = {
            "type": "assistant",
            "content": [{"type": "text", "text": "Hello"}],
        }
        result_msg = {"type": "result", "result": "done", "is_error": False}

        # Valid JSON messages with log-style noise in between each of them.
        stdout_chunks = [
            json.dumps(init_msg) + "\n",
            "2026-02-19T01:12:58.573Z [DEBUG] configureGlobalMTLS starting\n",
            "[Anthropic SDK INFO] post https://api.example.com status 200\n",
            json.dumps(assistant_msg) + "\n",
            "Warning: some random stderr leaked to stdout\n",
            json.dumps(result_msg) + "\n",
        ]

        transport = SubprocessCLITransport(prompt="test", options=make_options())

        # Stand-in process object: returncode None, wait() resolves to None.
        fake_process = MagicMock(
            returncode=None,
            wait=AsyncMock(return_value=None),
        )
        transport._process = fake_process
        transport._stdout_stream = MockTextReceiveStream(stdout_chunks)
        transport._stderr_stream = MockTextReceiveStream([])

        messages: list[Any] = [msg async for msg in transport.read_messages()]

        # Only the three JSON payloads may come through.
        assert len(messages) == 3
        first, second, third = messages
        assert first["type"] == "system"
        assert first["session_id"] == "abc123"
        assert second["type"] == "assistant"
        assert second["content"][0]["text"] == "Hello"
        assert third["type"] == "result"
        assert third["result"] == "done"

    anyio.run(_test)

def test_non_json_lines_between_split_json(self) -> None:
    """Check that a non-JSON line does not corrupt a following split JSON.

    A plain-text log line is emitted right before a large JSON object that
    arrives in two separate reads. The split object must still be
    reassembled and the messages on either side of it must be yielded.
    """

    async def _test() -> None:
        leading_json = json.dumps({"type": "system", "subtype": "start"})
        trailing_json = json.dumps({"type": "system", "subtype": "end"})

        big_payload = {
            "type": "assistant",
            "message": {
                "content": [{"type": "text", "text": "y" * 5000}],
            },
        }
        big_json = json.dumps(big_payload)
        split_at = 1000

        # The large object is delivered in two reads, preceded by a log line.
        stdout_chunks = [
            leading_json + "\n",
            "Some debug log line that is not JSON\n",
            big_json[:split_at],
            big_json[split_at:],
            "\n" + trailing_json,
        ]

        transport = SubprocessCLITransport(prompt="test", options=make_options())

        # Stand-in process object: returncode None, wait() resolves to None.
        fake_process = MagicMock(
            returncode=None,
            wait=AsyncMock(return_value=None),
        )
        transport._process = fake_process
        transport._stdout_stream = MockTextReceiveStream(stdout_chunks)
        transport._stderr_stream = MockTextReceiveStream([])

        messages: list[Any] = [msg async for msg in transport.read_messages()]

        assert len(messages) == 3
        start_msg, assistant_msg, end_msg = messages
        assert start_msg["type"] == "system"
        assert start_msg["subtype"] == "start"
        assert assistant_msg["type"] == "assistant"
        assert len(assistant_msg["message"]["content"][0]["text"]) == 5000
        assert end_msg["type"] == "system"
        assert end_msg["subtype"] == "end"

    anyio.run(_test)