Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
40 changes: 35 additions & 5 deletions sdk/python/agentfield/node_logs.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
from __future__ import annotations

import json
import io
import os
import queue
import secrets
Expand Down Expand Up @@ -93,15 +94,19 @@ def append(self, stream: str, text: str, max_line_bytes: int) -> None:
ts = datetime.now(timezone.utc).strftime("%Y-%m-%dT%H:%M:%S.%f")[:-3] + "Z"
with self._lock:
self._seq += 1
entry = LogEntry(seq=self._seq, ts=ts, stream=stream, line=raw, truncated=truncated)
entry = LogEntry(
seq=self._seq, ts=ts, stream=stream, line=raw, truncated=truncated
)
self._entries.append(entry)
self._approx_bytes += len(entry.line.encode("utf-8")) + 64
while self._approx_bytes > self._max_bytes and len(self._entries) > 1:
old = self._entries.popleft()
self._approx_bytes -= len(old.line.encode("utf-8")) + 64
_notify_followers()

def snapshot_after(self, since_seq: int, limit: Optional[int] = None) -> List[LogEntry]:
def snapshot_after(
self, since_seq: int, limit: Optional[int] = None
) -> List[LogEntry]:
with self._lock:
items = [e for e in self._entries if e.seq > since_seq]
if limit is not None and limit > 0:
Expand All @@ -119,7 +124,7 @@ def max_seq(self) -> int:
return self._seq


class _TeeTextIO(TextIO):
class _TeeTextIO(io.TextIOBase):
"""Write-through to original stream and log ring (line-buffered by \\n)."""

def __init__(
Expand All @@ -146,10 +151,33 @@ def write(self, s: str) -> int:
self._ring.append(self._stream_name, line, self._max_line_bytes)
return len(s)

def writelines(self, lines) -> None:
for line in lines:
self.write(line)

def flush(self) -> None:
self._original.flush()

# Minimal TextIO protocol for print()
def fileno(self) -> int:
return self._original.fileno()

def readable(self) -> bool:
return bool(self._original.readable())

def writable(self) -> bool:
return bool(self._original.writable())

def seekable(self) -> bool:
return bool(self._original.seekable())

def close(self) -> None:
if self.closed:
return
if self._buf:
self._ring.append(self._stream_name, self._buf, self._max_line_bytes)
self._buf = ""
super().close()

@property
def encoding(self) -> str:
return getattr(self._original, "encoding", "utf-8") or "utf-8"
Expand Down Expand Up @@ -221,7 +249,9 @@ def iter_tail_ndjson(
ring = get_ring()
cap_tail = tail_lines
if since_seq > 0:
entries = ring.snapshot_after(since_seq, limit=cap_tail if cap_tail > 0 else None)
entries = ring.snapshot_after(
since_seq, limit=cap_tail if cap_tail > 0 else None
)
else:
n = cap_tail if cap_tail > 0 else 200
entries = ring.tail(n)
Expand Down
172 changes: 172 additions & 0 deletions sdk/python/tests/test_harness_cli.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,172 @@
"""Tests for shared subprocess helpers used by CLI harness providers."""

from __future__ import annotations

import asyncio
from unittest.mock import AsyncMock, MagicMock, patch

import pytest

from agentfield.harness._cli import (
estimate_cli_cost,
extract_final_text,
parse_jsonl,
run_cli,
strip_ansi,
)


def test_strip_ansi_removes_colors():
assert strip_ansi("\x1b[31mError\x1b[0m") == "Error"


@pytest.mark.asyncio
async def test_run_cli_success():
process = MagicMock()
process.communicate = AsyncMock(return_value=(b"OK", b""))
process.returncode = 0

create_process = AsyncMock(return_value=process)

with patch("asyncio.create_subprocess_exec", create_process):
stdout, stderr, returncode = await run_cli(
["agentfield", "status"],
env={"AGENTFIELD_TEST": "1"},
cwd=".",
timeout=1,
)

assert stdout == "OK"
assert stderr == ""
assert returncode == 0
create_process.assert_awaited_once()
_, kwargs = create_process.call_args
assert kwargs["env"]["AGENTFIELD_TEST"] == "1"
assert kwargs["cwd"] == "."
assert kwargs["stdout"] is asyncio.subprocess.PIPE
assert kwargs["stderr"] is asyncio.subprocess.PIPE


@pytest.mark.asyncio
async def test_run_cli_timeout():
class HangingProcess:
returncode = None

def __init__(self) -> None:
self.killed = False
self.wait = AsyncMock(return_value=None)

async def communicate(self):
await asyncio.sleep(1)
return b"", b""

def kill(self):
self.killed = True

process = HangingProcess()

with patch("asyncio.create_subprocess_exec", AsyncMock(return_value=process)):
with pytest.raises(TimeoutError, match="CLI command timed out"):
await run_cli(["agentfield", "hang"], timeout=0.01)

assert process.killed is True
process.wait.assert_awaited_once()


def test_parse_jsonl_skips_invalid():
events = parse_jsonl('{"type":"a"}\nnot-json\n{"type":"b"}')

assert events == [{"type": "a"}, {"type": "b"}]


def test_extract_final_text_codex_style():
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🟧 [HIGH] extract_final_text only tests 1 of 4 event-type branches

extract_final_text in agentfield/harness/_cli.py:68-98 handles four event types:

  • item.completed (Codex) ← tested here
  • result ← untested
  • turn.completed ← untested
  • message / assistant ← untested

Untested branches in a "final answer extraction" path can silently return None or wrong text in production for non-Codex CLI providers. Suggest adding a case per event type plus an empty-events case.


🤖 Reviewed by AgentField PR Review Harness

events = [
{"type": "item.completed", "item": {"type": "agent_message", "text": "first"}},
{
"type": "item.completed",
"item": {"type": "agent_message", "text": "final answer"},
},
]

assert extract_final_text(events) == "final answer"


@pytest.mark.parametrize(
("events", "expected"),
[
([{"type": "result", "result": "result answer"}], "result answer"),
([{"type": "result", "text": "text answer"}], "text answer"),
([{"type": "turn.completed", "text": "turn answer"}], "turn answer"),
([{"type": "message", "content": "message answer"}], "message answer"),
([{"type": "assistant", "text": "assistant answer"}], "assistant answer"),
],
)
def test_extract_final_text_event_variants(events, expected):
assert extract_final_text(events) == expected


def test_extract_final_text_empty_events():
assert extract_final_text([]) is None


def test_estimate_cli_cost_calls_litellm():
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🟧 [HIGH] estimate_cli_cost except path is untested

Source (_cli.py:113-123) has try: import litellm ... except Exception: return None. Only the success path returning 0.05 is exercised; the "litellm-missing" or "completion_cost raises" branches silently return None. Callers MUST treat None as "unknown" rather than "free" — exactly the kind of defensive code that rots untested and quietly corrupts billing/accounting if broken.

Suggested additions:

  • ImportError when litellm is absent
  • completion_cost returning 0 or None
  • completion_cost raising

🤖 Reviewed by AgentField PR Review Harness

mock_litellm = MagicMock()
mock_litellm.completion_cost.return_value = 0.05

with patch.dict("sys.modules", {"litellm": mock_litellm}):
cost = estimate_cli_cost(
model="openai/gpt-4o",
prompt="Summarize this run",
result_text="Done",
)

assert cost == 0.05
mock_litellm.completion_cost.assert_called_once_with(
model="openai/gpt-4o",
prompt="Summarize this run",
completion="Done",
)


def test_estimate_cli_cost_returns_none_without_model():
assert estimate_cli_cost(model="", prompt="prompt", result_text="Done") is None


def test_estimate_cli_cost_returns_none_when_litellm_missing():
with patch.dict("sys.modules", {"litellm": None}):
cost = estimate_cli_cost(
model="openai/gpt-4o",
prompt="Summarize this run",
result_text="Done",
)

assert cost is None


@pytest.mark.parametrize("raw_cost", [0, None])
def test_estimate_cli_cost_returns_none_for_non_positive_cost(raw_cost):
mock_litellm = MagicMock()
mock_litellm.completion_cost.return_value = raw_cost

with patch.dict("sys.modules", {"litellm": mock_litellm}):
cost = estimate_cli_cost(
model="openai/gpt-4o",
prompt="Summarize this run",
result_text="Done",
)

assert cost is None


def test_estimate_cli_cost_returns_none_when_litellm_raises():
mock_litellm = MagicMock()
mock_litellm.completion_cost.side_effect = RuntimeError("pricing unavailable")

with patch.dict("sys.modules", {"litellm": mock_litellm}):
cost = estimate_cli_cost(
model="openai/gpt-4o",
prompt="Summarize this run",
result_text="Done",
)

assert cost is None
Loading
Loading