Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
335 changes: 335 additions & 0 deletions components/backend/handlers/agent_status_bench_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,335 @@
package handlers

import (
"encoding/json"
"fmt"
"os"
"path/filepath"
"sync"
"testing"
"time"

"ambient-code-backend/types"
)

// setupEventLog creates a temporary event log for benchmarking and returns the
// state directory and session name that locate it.
//
// The log is written to <stateDir>/sessions/<sessionName>/agui-events.jsonl as
// one JSON object per line: eventCount-1 TEXT_MESSAGE_CONTENT events followed
// by a single RUN_STARTED event, so that a backwards scan for lifecycle events
// yields "working". The RUN_STARTED event is always written, so the file holds
// at least one line even when eventCount is less than 1.
//
// Any failure to create, encode, write or close the log aborts the benchmark
// via b.Fatal instead of silently leaving a truncated fixture behind.
func setupEventLog(b *testing.B, eventCount int) (stateDir string, sessionName string) {
	b.Helper()
	stateDir = b.TempDir()
	sessionName = "bench-session"
	sessionDir := filepath.Join(stateDir, "sessions", sessionName)
	if err := os.MkdirAll(sessionDir, 0755); err != nil {
		b.Fatal(err)
	}

	logPath := filepath.Join(sessionDir, "agui-events.jsonl")
	f, err := os.Create(logPath)
	if err != nil {
		b.Fatal(err)
	}
	// b.Fatal unwinds through deferred calls, so this releases the handle on
	// every failure path; the success path closes explicitly to check the error.
	closed := false
	defer func() {
		if !closed {
			f.Close()
		}
	}()

	// writeEvent appends one event as a single JSON line.
	writeEvent := func(evt map[string]interface{}) {
		data, err := json.Marshal(evt)
		if err != nil {
			b.Fatalf("encoding event: %v", err)
		}
		if _, err := f.Write(append(data, '\n')); err != nil {
			b.Fatalf("writing %s: %v", logPath, err)
		}
	}

	// Write a realistic event sequence ending with RUN_STARTED (so status = "working")
	threadID := "thread-1"
	runID := "run-1"
	for i := 0; i < eventCount-1; i++ {
		writeEvent(map[string]interface{}{
			"type":      "TEXT_MESSAGE_CONTENT",
			"threadId":  threadID,
			"runId":     runID,
			"messageId": fmt.Sprintf("msg-%d", i),
			"delta":     "some text content for benchmarking purposes",
			"timestamp": time.Now().UnixMilli(),
		})
	}
	// Last event: RUN_STARTED, the most recent lifecycle event in the log.
	writeEvent(map[string]interface{}{
		"type":      "RUN_STARTED",
		"threadId":  threadID,
		"runId":     runID,
		"timestamp": time.Now().UnixMilli(),
	})

	closed = true
	if err := f.Close(); err != nil {
		b.Fatalf("closing %s: %v", logPath, err)
	}

	return stateDir, sessionName
}

// BenchmarkEnrichAgentStatus_Uncached times enrichAgentStatus when every call
// has to derive the status from the event log (the pre-cache behavior). The
// cache entry for the session is evicted before each call so that no call can
// be answered from agentStatusCache.
func BenchmarkEnrichAgentStatus_Uncached(b *testing.B) {
	logRoot, name := setupEventLog(b, 10000)

	running := &types.AgenticSession{
		Metadata: map[string]interface{}{"name": name},
		Status:   &types.AgenticSessionStatus{Phase: "Running"},
	}

	// Swap the derivation hook for a local file scan of the fixture log and put
	// the previous hook back when the benchmark returns. The websocket package
	// cannot be imported here (circular), so deriveStatusFromFile stands in for
	// its DeriveAgentStatus.
	previous := DeriveAgentStatusFromEvents
	defer func() { DeriveAgentStatusFromEvents = previous }()
	DeriveAgentStatusFromEvents = func(session string) string {
		return deriveStatusFromFile(filepath.Join(logRoot, "sessions", session, "agui-events.jsonl"))
	}

	b.ResetTimer()
	for remaining := b.N; remaining > 0; remaining-- {
		// Evict the entry so this iteration pays for a full file scan.
		agentStatusCache.Lock()
		delete(agentStatusCache.entries, name)
		agentStatusCache.Unlock()

		enrichAgentStatus(running)
	}
}

// BenchmarkEnrichAgentStatus_Cached measures the cost of enrichAgentStatus with
// caching (the fix): the cache is warmed by one untimed call and the timed loop
// then calls enrichAgentStatus repeatedly for the same session.
//
// NOTE(review): the cache appears to be TTL-based (TestAgentStatusCache_Correctness
// speaks of a hit "within TTL"). If the entry can expire during a long -benchtime
// run, some timed iterations would include a file scan again — confirm the TTL
// or pin the entry before b.ResetTimer().
func BenchmarkEnrichAgentStatus_Cached(b *testing.B) {
// Fixture: a 10000-event log whose last lifecycle event is RUN_STARTED.
stateDir, sessionName := setupEventLog(b, 10000)

// Replace the derivation hook for the duration of the benchmark and restore
// the previous value on return.
origDerive := DeriveAgentStatusFromEvents
defer func() { DeriveAgentStatusFromEvents = origDerive }()

// The stub scans the fixture log under stateDir for the requested session.
DeriveAgentStatusFromEvents = func(name string) string {
path := filepath.Join(stateDir, "sessions", name, "agui-events.jsonl")
return deriveStatusFromFile(path)
}

// A session in the "Running" phase, named after the fixture log.
session := &types.AgenticSession{
Metadata: map[string]interface{}{"name": sessionName},
Status: &types.AgenticSessionStatus{Phase: "Running"},
}

// Prime the cache with one call
enrichAgentStatus(session)

// Exclude fixture creation and cache priming from the measurement.
b.ResetTimer()
for i := 0; i < b.N; i++ {
enrichAgentStatus(session)
}
Comment on lines +109 to +115
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major

Pin the cache entry past the timed section.

agentStatusCache is TTL-based in components/backend/handlers/sessions.go, but these benchmarks only warm it once before ResetTimer(). On longer -benchtime runs, the entry can expire mid-loop and the reported “cached” numbers start including file-derivation work again.

Suggested change
 // Prime the cache with one call
 enrichAgentStatus(session)
+agentStatusCache.Lock()
+if entry, ok := agentStatusCache.entries[sessionName]; ok {
+	entry.expiresAt = time.Now().Add(time.Hour)
+	agentStatusCache.entries[sessionName] = entry
+}
+agentStatusCache.Unlock()

 b.ResetTimer()

Apply the same pattern to BenchmarkEnrichAgentStatus_Concurrent and BenchmarkEnrichAgentStatus_ListPage/cached.

Also applies to: 139-152, 189-199

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@components/backend/handlers/agent_status_bench_test.go` around lines 109 -
115, The benchmark is warming agentStatusCache only before ResetTimer(), but the
TTL can expire during long runs, skewing cached metrics; update each benchmark
(enrichAgentStatus calls in BenchmarkEnrichAgentStatus_Cached,
BenchmarkEnrichAgentStatus_Concurrent,
BenchmarkEnrichAgentStatus_ListPage/cached) to "pin" the cache entry after
warming by explicitly extending its TTL (e.g. re-set or update the entry via the
same cache API used in sessions.go such as agentStatusCache.Set or a
touch/update method on the session) immediately before b.ResetTimer(), ensuring
the cached entry cannot expire during the timed loop.

}

// BenchmarkEnrichAgentStatus_Concurrent measures the cached path under
// contention: after one untimed priming call, b.RunParallel drives
// enrichAgentStatus from several goroutines that all ask for the same session
// name.
func BenchmarkEnrichAgentStatus_Concurrent(b *testing.B) {
	stateDir, sessionName := setupEventLog(b, 10000)

	// Replace the derivation hook with a scan of the fixture log and restore
	// the previous value when the benchmark returns.
	origDerive := DeriveAgentStatusFromEvents
	defer func() { DeriveAgentStatusFromEvents = origDerive }()

	DeriveAgentStatusFromEvents = func(name string) string {
		path := filepath.Join(stateDir, "sessions", name, "agui-events.jsonl")
		return deriveStatusFromFile(path)
	}

	// newSession builds a fresh "Running" session that points at the fixture
	// log, so the priming call and every worker use the same name.
	newSession := func() *types.AgenticSession {
		return &types.AgenticSession{
			Metadata: map[string]interface{}{"name": sessionName},
			Status:   &types.AgenticSessionStatus{Phase: "Running"},
		}
	}

	// Prime cache
	enrichAgentStatus(newSession())

	b.ResetTimer()
	b.RunParallel(func(pb *testing.PB) {
		// Each goroutine gets its own session to avoid racing on AgentStatus mutation
		session := newSession()
		for pb.Next() {
			enrichAgentStatus(session)
		}
	})
}

// BenchmarkEnrichAgentStatus_ListPage models enriching one page of a paginated
// list response: 20 sessions in the "Running" phase, all named after the same
// fixture log. The "uncached" sub-benchmark evicts the cache entry before every
// session; the "cached" sub-benchmark enriches the page once untimed and then
// measures repeated passes over it.
func BenchmarkEnrichAgentStatus_ListPage(b *testing.B) {
	logRoot, _ := setupEventLog(b, 10000)

	// Derive status by scanning the fixture log; restore the hook on return.
	previous := DeriveAgentStatusFromEvents
	defer func() { DeriveAgentStatusFromEvents = previous }()
	DeriveAgentStatusFromEvents = func(session string) string {
		return deriveStatusFromFile(filepath.Join(logRoot, "sessions", session, "agui-events.jsonl"))
	}

	page := make([]types.AgenticSession, 20)
	for idx := range page {
		page[idx] = types.AgenticSession{
			Metadata: map[string]interface{}{"name": "bench-session"},
			Status:   &types.AgenticSessionStatus{Phase: "Running"},
		}
	}

	b.Run("uncached", func(b *testing.B) {
		for pass := 0; pass < b.N; pass++ {
			for idx := range page {
				// Drop the entry first so each session costs a file scan.
				agentStatusCache.Lock()
				delete(agentStatusCache.entries, "bench-session")
				agentStatusCache.Unlock()

				enrichAgentStatus(&page[idx])
			}
		}
	})

	b.Run("cached", func(b *testing.B) {
		// Warm the cache with one untimed pass over the page.
		for idx := range page {
			enrichAgentStatus(&page[idx])
		}
		b.ResetTimer()
		for pass := 0; pass < b.N; pass++ {
			for idx := range page {
				enrichAgentStatus(&page[idx])
			}
		}
	})
}

// deriveStatusFromFile simulates DeriveAgentStatus by tail-scanning the event log.
// This mirrors the real implementation in websocket/agui_store.go:DeriveAgentStatus.
//
// At most the last 20 MiB of the file at path are read and split into lines;
// the lines are then walked from newest to oldest and the first lifecycle
// event found decides the result:
//
//	RUN_STARTED             -> "working"
//	RUN_FINISHED, RUN_ERROR -> "idle"
//
// Lines that are empty or are not valid JSON objects are skipped, which also
// covers a first line cut in half by the tail window. The empty string is
// returned when the file cannot be opened or inspected, or when it contains
// no lifecycle event.
func deriveStatusFromFile(path string) string {
	const maxTailBytes = 20 * 1024 * 1024

	file, err := os.Open(path)
	if err != nil {
		return ""
	}
	defer file.Close()

	stat, err := file.Stat()
	if err != nil {
		return ""
	}

	// Read the tail window through the handle that is already open rather than
	// opening the file a second time.
	fileSize := stat.Size()
	var offset int64
	if fileSize > maxTailBytes {
		offset = fileSize - maxTailBytes
	}
	data := make([]byte, fileSize-offset)
	// ReadAt keeps reading until the buffer is full and reports an error
	// whenever it delivers fewer bytes than requested (for example if the log
	// shrank after Stat). The error is deliberately not fatal: whatever was
	// read is scanned, and an empty read falls through to "".
	n, _ := file.ReadAt(data, offset)
	data = data[:n]

	// Scan backwards for lifecycle events
	lines := splitTailLines(data)
	for i := len(lines) - 1; i >= 0; i-- {
		if len(lines[i]) == 0 {
			continue
		}
		var evt map[string]interface{}
		if err := json.Unmarshal(lines[i], &evt); err != nil {
			continue
		}
		evtType, _ := evt["type"].(string)
		switch evtType {
		case "RUN_STARTED":
			return "working"
		case "RUN_FINISHED", "RUN_ERROR":
			return "idle"
		}
	}
	return ""
}

// splitTailLines splits data on '\n' and returns the non-empty lines in order.
// The returned slices alias data. A final line without a trailing newline is
// included; empty lines are dropped.
func splitTailLines(data []byte) [][]byte {
	var lines [][]byte
	start := 0
	for i, c := range data {
		if c == '\n' {
			if i > start {
				lines = append(lines, data[start:i])
			}
			start = i + 1
		}
	}
	if start < len(data) {
		lines = append(lines, data[start:])
	}
	return lines
}

// TestAgentStatusCache_Correctness verifies the cache behaves correctly by
// counting how often enrichAgentStatus falls through to the derivation hook:
// once on a miss, not at all on a hit, and not at all for a non-running session.
func TestAgentStatusCache_Correctness(t *testing.T) {
// callCount records how many times the stubbed derivation hook ran.
// NOTE(review): it is a plain int with no synchronization, so it is only safe
// while the hook is never invoked from two goroutines at once.
callCount := 0
// Swap in the stub for this test only; the original hook is restored on return.
origDerive := DeriveAgentStatusFromEvents
defer func() { DeriveAgentStatusFromEvents = origDerive }()

// The stub always reports "working" and counts each invocation.
DeriveAgentStatusFromEvents = func(name string) string {
callCount++
return "working"
}
Comment on lines +275 to +282
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major

🧩 Analysis chain

🏁 Script executed:

find . -name "agent_status_bench_test.go" -type f

Repository: ambient-code/platform

Length of output: 121


🏁 Script executed:

cat -n ./components/backend/handlers/agent_status_bench_test.go | sed -n '270,290p'

Repository: ambient-code/platform

Length of output: 771


🏁 Script executed:

cat -n ./components/backend/handlers/agent_status_bench_test.go | sed -n '315,340p'

Repository: ambient-code/platform

Length of output: 659


🏁 Script executed:

cat -n ./components/backend/handlers/agent_status_bench_test.go | sed -n '290,320p'

Repository: ambient-code/platform

Length of output: 1226


🏁 Script executed:

cat -n ./components/backend/handlers/agent_status_bench_test.go | sed -n '334,345p'

Repository: ambient-code/platform

Length of output: 91


🏁 Script executed:

cat -n ./components/backend/handlers/agent_status_bench_test.go | sed -n '330,350p'

Repository: ambient-code/platform

Length of output: 157


🏁 Script executed:

grep -n "callCount" ./components/backend/handlers/agent_status_bench_test.go

Repository: ambient-code/platform

Length of output: 366


🏁 Script executed:

cat -n ./components/backend/handlers/agent_status_bench_test.go | sed -n '1,30p'

Repository: ambient-code/platform

Length of output: 907


Make the concurrency check race-free and assert the outcome.

The stub increments a plain callCount (line 280) that all 50 concurrent goroutines can reach without synchronization. The concurrent test section (lines 321-334) never checks that callCount remained at 1 after wg.Wait(). If the cache regresses and multiple workers fall through to derivation, this test races on its own bookkeeping and still passes.

Use atomic.AddInt32(&callCount, 1) instead of callCount++, and add an assertion after line 334 to verify the cache prevented extra derivations under concurrent load.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@components/backend/handlers/agent_status_bench_test.go` around lines 275 -
282, Replace the plain callCount increment with an atomic counter and assert its
value after the goroutines complete: change callCount to an int32 (used by the
test stub for DeriveAgentStatusFromEvents), increment it with
atomic.AddInt32(&callCount, 1) inside the stub, and after wg.Wait() add an
assertion that the counter equals 1 to verify the cache prevented extra
derivations; keep the defer that restores DeriveAgentStatusFromEvents and
reference the same stub and wg.Wait() code paths when making the change.


session := &types.AgenticSession{
	Metadata: map[string]interface{}{"name": "test-session"},
	Status:   &types.AgenticSessionStatus{Phase: "Running"},
}

// Clear cache so the first call below is guaranteed to be a miss.
agentStatusCache.Lock()
agentStatusCache.entries = make(map[string]agentStatusCacheEntry)
agentStatusCache.Unlock()

// First call — cache miss, should call DeriveAgentStatusFromEvents
enrichAgentStatus(session)
if callCount != 1 {
	t.Fatalf("expected 1 call, got %d", callCount)
}

// Second call within TTL — cache hit, should NOT call again
enrichAgentStatus(session)
if callCount != 1 {
	t.Fatalf("expected 1 call (cached), got %d", callCount)
}

// Verify status was set. Check for nil first so a missing status fails the
// test with a message instead of panicking on the dereference.
if session.Status.AgentStatus == nil {
	t.Fatal("expected AgentStatus to be set, got nil")
}
if *session.Status.AgentStatus != "working" {
	t.Fatalf("expected 'working', got %q", *session.Status.AgentStatus)
}

// Non-running session should skip cache entirely
stopped := &types.AgenticSession{
	Metadata: map[string]interface{}{"name": "stopped-session"},
	Status:   &types.AgenticSessionStatus{Phase: "Stopped"},
}
enrichAgentStatus(stopped)
if callCount != 1 {
	t.Fatalf("expected no call for stopped session, got %d", callCount)
}

// Concurrent safety: 50 goroutines ask for the already-cached session, each
// with its own session value so they do not share the AgentStatus field.
var wg sync.WaitGroup
for i := 0; i < 50; i++ {
	wg.Add(1)
	go func() {
		defer wg.Done()
		s := &types.AgenticSession{
			Metadata: map[string]interface{}{"name": "test-session"},
			Status:   &types.AgenticSessionStatus{Phase: "Running"},
		}
		enrichAgentStatus(s)
	}()
}
wg.Wait()

// Every concurrent call should have been a cache hit. Reading callCount here
// is ordered after all goroutines by wg.Wait().
if callCount != 1 {
	t.Fatalf("expected no extra calls under concurrency, got %d", callCount)
}
}
Loading