Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
69 changes: 5 additions & 64 deletions cmd/entire/cli/agent/claudecode/generate_streaming_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,10 @@ import (
"os"
"os/exec"
"path/filepath"
"strings"
"testing"

"github.com/entireio/cli/cmd/entire/cli/agent"
"github.com/entireio/cli/cmd/entire/cli/agent/testutil"
)

func TestGenerateTextStreaming_Success(t *testing.T) {
Expand All @@ -21,7 +21,7 @@ func TestGenerateTextStreaming_Success(t *testing.T) {
}

agentInst := &ClaudeCodeAgent{
CommandRunner: fakeStreamCmd(string(fixture), "", 0),
CommandRunner: testutil.FakeStreamCmd(string(fixture), "", 0),
}

var phases []agent.ProgressPhase
Expand Down Expand Up @@ -57,8 +57,8 @@ func TestGenerateTextStreaming_FallbackOnUnrecognizedFlag(t *testing.T) {
// Old CLI: exit non-zero with stderr complaining about --output-format=stream-json.
// Fallback path is exercised by routing the *second* call (GenerateText) to a
// canned non-streaming envelope.
streamCall := fakeStreamCmd("", "error: unknown flag: --output-format=stream-json", 1)
nonStreamCall := fakeStreamCmd(`{"is_error":false,"result":"fallback ok","subtype":"success"}`, "", 0)
streamCall := testutil.FakeStreamCmd("", "error: unknown flag: --output-format=stream-json", 1)
nonStreamCall := testutil.FakeStreamCmd(`{"is_error":false,"result":"fallback ok","subtype":"success"}`, "", 0)
calls := 0
agentInst := &ClaudeCodeAgent{
CommandRunner: func(ctx context.Context, name string, args ...string) *exec.Cmd {
Expand Down Expand Up @@ -100,7 +100,7 @@ func TestGenerateTextStreaming_EnvelopeErrorSurfaced(t *testing.T) {
}

agentInst := &ClaudeCodeAgent{
CommandRunner: fakeStreamCmd(string(fixture), "", 0),
CommandRunner: testutil.FakeStreamCmd(string(fixture), "", 0),
}
_, err = agentInst.GenerateTextStreaming(context.Background(), "test", "haiku", nil)
if err == nil {
Expand Down Expand Up @@ -135,62 +135,3 @@ func equalPhases(a, b []agent.ProgressPhase) bool {
}
return true
}

// fakeStreamCmd returns a CommandRunner factory whose *exec.Cmd, when Start()'d
// and Wait()'d, produces stdout/stderr/exit-code as configured. Implementation
// uses 'sh -c' to write fixture data; on Windows runners we'd need PowerShell,
// but our CI is Linux/macOS.
func fakeStreamCmd(stdout, stderr string, exitCode int) func(ctx context.Context, name string, args ...string) *exec.Cmd {
return func(ctx context.Context, _ string, _ ...string) *exec.Cmd {
script := buildFakeShellScript(stdout, stderr, exitCode)
return exec.CommandContext(ctx, "sh", "-c", script)
}
}

func buildFakeShellScript(stdout, stderr string, exitCode int) string {
var sb strings.Builder
if stdout != "" {
sb.WriteString("cat <<'__EOF__'\n")
sb.WriteString(stdout)
if !strings.HasSuffix(stdout, "\n") {
sb.WriteString("\n")
}
sb.WriteString("__EOF__\n")
}
if stderr != "" {
sb.WriteString("cat <<'__EOF__' 1>&2\n")
sb.WriteString(stderr)
if !strings.HasSuffix(stderr, "\n") {
sb.WriteString("\n")
}
sb.WriteString("__EOF__\n")
}
if exitCode != 0 {
sb.WriteString("exit ")
sb.WriteString(itoa(exitCode))
sb.WriteString("\n")
}
return sb.String()
}

func itoa(n int) string {
if n == 0 {
return "0"
}
neg := n < 0
if neg {
n = -n
}
var buf [12]byte
i := len(buf)
for n > 0 {
i--
buf[i] = byte('0' + n%10)
n /= 10
}
if neg {
i--
buf[i] = '-'
}
return string(buf[i:])
}
153 changes: 153 additions & 0 deletions cmd/entire/cli/agent/codex/generate_streaming.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,153 @@
package codex

import (
"bufio"
"context"
"encoding/json"
"errors"
"fmt"
"io"
"os/exec"
"strings"
"time"

"github.com/entireio/cli/cmd/entire/cli/agent"
)

const streamBufferMax = 4 * 1024 * 1024 // 4 MiB

// parseCodexStream consumes `codex exec --json` NDJSON output, dispatches
// progress callbacks, and returns the final agent_message text.
func parseCodexStream(stdout io.Reader, progress agent.ProgressFn) (string, error) {
scanner := bufio.NewScanner(stdout)
scanner.Buffer(make([]byte, 64*1024), streamBufferMax)

var (
resultText string
sawTurnComplete bool
usage *codexStreamUsage
turnStartedAt time.Time
turnDuration time.Duration
malformed int
)

dispatch := func(p agent.GenerationProgress) {
if progress != nil {
progress(p)
}
}

for scanner.Scan() {
line := scanner.Bytes()
if len(line) == 0 {
continue
}
var ev codexStreamEvent
if err := json.Unmarshal(line, &ev); err != nil {
// Codex may emit transient noise (blank lines, partial flushes); a
// schema-incompatible line is recoverable per-event but tracked so
// protocol regressions surface in the "no agent_message" error
// instead of disappearing silently.
malformed++
continue
}

switch ev.Type {
case "turn.started":
turnStartedAt = time.Now()
dispatch(agent.GenerationProgress{Phase: agent.PhaseConnecting})

case "item.completed":
// Codex emits the full agent_message in one item; we capture
// the text but defer PhaseFirstToken until turn.completed so
// the cached_input_tokens usage clause can be attached. The
// CLI buffers and emits items in one chunk per turn, so there
// is no incremental "first token" signal to surface anyway.
if ev.Item != nil && ev.Item.Type == "agent_message" {
resultText = ev.Item.Text
}

case "turn.completed":
sawTurnComplete = true
usage = ev.Usage
if !turnStartedAt.IsZero() {
turnDuration = time.Since(turnStartedAt)
}

case "turn.failed", "error":
return "", fmt.Errorf("codex turn failed: %s", agent.SafeErrorMessage(line))
}
}
if err := scanner.Err(); err != nil {
return "", fmt.Errorf("reading codex stream: %w", err)
}
if !sawTurnComplete {
return "", errors.New("codex stream ended without a turn.completed event")
}
if resultText == "" {
if malformed > 0 {
return "", fmt.Errorf("codex stream produced no agent_message (%d malformed lines skipped)", malformed)
}
return "", errors.New("codex stream produced no agent_message")
}
if progress != nil {
firstToken := agent.GenerationProgress{Phase: agent.PhaseFirstToken}
if usage != nil {
firstToken.InputTokens = usage.InputTokens
firstToken.CachedInputTokens = usage.CachedInputTokens
}
dispatch(firstToken)

done := agent.GenerationProgress{Phase: agent.PhaseDone}
if usage != nil {
done.OutputTokens = usage.OutputTokens
done.InputTokens = usage.InputTokens
done.CachedInputTokens = usage.CachedInputTokens
}
if turnDuration > 0 {
done.DurationMs = int(turnDuration.Milliseconds())
}
dispatch(done)
}
return resultText, nil
}

// GenerateTextStreaming implements agent.StreamingTextGenerator.
func (c *CodexAgent) GenerateTextStreaming(
ctx context.Context,
prompt, model string,
progress agent.ProgressFn,
) (string, error) {
tmpl := &agent.StreamingGeneratorTemplate{
AgentName: "codex",
BuildCmd: c.buildStreamCmd,
Parser: parseCodexStream,
LooksLikeUnrecognizedFlag: func(stderr string) bool {
return agent.LooksLikeUnrecognizedFlag(stderr, "json")
},
}

result, err := tmpl.Generate(ctx, prompt, model, progress)
if err != nil {
if errors.Is(err, agent.ErrUnrecognizedStreamingFlag) {
return c.GenerateText(ctx, prompt, model)
}
return "", fmt.Errorf("codex streaming generate: %w", err)
}
return result, nil
}

func (c *CodexAgent) buildStreamCmd(ctx context.Context, prompt, model string) *exec.Cmd {
commandRunner := c.CommandRunner
if commandRunner == nil {
commandRunner = exec.CommandContext
}
args := []string{"exec", "--skip-git-repo-check", "--json"}
if model != "" {
args = append(args, "--model", model)
}
args = append(args, "-")
cmd := commandRunner(ctx, "codex", args...)
cmd.Stdin = strings.NewReader(prompt)
return cmd
}
132 changes: 132 additions & 0 deletions cmd/entire/cli/agent/codex/generate_streaming_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,132 @@
package codex

import (
"bytes"
"context"
"os"
"os/exec"
"path/filepath"
"strings"
"testing"

"github.com/entireio/cli/cmd/entire/cli/agent"
"github.com/entireio/cli/cmd/entire/cli/agent/testutil"
)

func TestParseCodexStream_Success(t *testing.T) {
t.Parallel()

data, err := os.ReadFile(filepath.Join("testdata", "stream_success.jsonl"))
if err != nil {
t.Fatalf("read fixture: %v", err)
}

var phases []agent.ProgressPhase
result, err := parseCodexStream(bytes.NewReader(data), func(p agent.GenerationProgress) {
phases = append(phases, p.Phase)
})
if err != nil {
t.Fatalf("parse: %v", err)
}
if result != "Hello, world." {
t.Errorf("result = %q, want %q", result, "Hello, world.")
}

counts := map[agent.ProgressPhase]int{}
for _, p := range phases {
counts[p]++
}
if counts[agent.PhaseConnecting] != 1 {
t.Errorf("PhaseConnecting count = %d, want 1", counts[agent.PhaseConnecting])
}
if counts[agent.PhaseFirstToken] != 1 {
t.Errorf("PhaseFirstToken count = %d, want 1", counts[agent.PhaseFirstToken])
}
if counts[agent.PhaseDone] != 1 {
t.Errorf("PhaseDone count = %d, want 1", counts[agent.PhaseDone])
}
}

func TestParseCodexStream_ErrorEnvelope(t *testing.T) {
t.Parallel()

data, err := os.ReadFile(filepath.Join("testdata", "stream_error.jsonl"))
if err != nil {
t.Fatalf("read fixture: %v", err)
}

_, err = parseCodexStream(bytes.NewReader(data), nil)
if err == nil {
t.Fatal("expected error from error envelope")
}
if !strings.Contains(err.Error(), "model not found") {
t.Errorf("error %q should mention 'model not found'", err)
}
}

func TestParseCodexStream_MissingTurnCompleted(t *testing.T) {
t.Parallel()

stream := `{"type":"thread.started","thread_id":"t"}
{"type":"item.completed","item":{"id":"i","type":"agent_message","text":"partial"}}
`
_, err := parseCodexStream(strings.NewReader(stream), nil)
if err == nil {
t.Fatal("expected error when stream lacks turn.completed")
}
}

func TestCodexGenerateTextStreaming_Success(t *testing.T) {
t.Parallel()

fixture, err := os.ReadFile(filepath.Join("testdata", "stream_success.jsonl"))
if err != nil {
t.Fatalf("read fixture: %v", err)
}

c := &CodexAgent{
CommandRunner: testutil.FakeStreamCmd(string(fixture), "", 0),
}

var phases []agent.ProgressPhase
result, err := c.GenerateTextStreaming(context.Background(), "test prompt", "haiku", func(p agent.GenerationProgress) {
phases = append(phases, p.Phase)
})
if err != nil {
t.Fatalf("unexpected error: %v", err)
}
if result != "Hello, world." {
t.Errorf("result = %q, want %q", result, "Hello, world.")
}
if len(phases) != 3 {
t.Errorf("phases = %v (count %d), want 3 (Connecting, FirstToken, Done)", phases, len(phases))
}
}

func TestCodexGenerateTextStreaming_FallbackOnUnrecognizedFlag(t *testing.T) {
t.Parallel()

streamCall := testutil.FakeStreamCmd("", "error: unknown flag: --json", 1)
nonStreamCall := testutil.FakeStreamCmd("fallback response", "", 0)
calls := 0
c := &CodexAgent{
CommandRunner: func(ctx context.Context, name string, args ...string) *exec.Cmd {
calls++
if calls == 1 {
return streamCall(ctx, name, args...)
}
return nonStreamCall(ctx, name, args...)
},
}

result, err := c.GenerateTextStreaming(context.Background(), "test", "haiku", nil)
if err != nil {
t.Fatalf("unexpected error: %v", err)
}
if !strings.Contains(result, "fallback") {
t.Errorf("result = %q, want substring 'fallback'", result)
}
if calls != 2 {
t.Errorf("calls = %d, want 2 (streaming + fallback)", calls)
}
}
Loading
Loading