-
Notifications
You must be signed in to change notification settings - Fork 86
perf: add SSAR cache, event log tail reads, and agent status cache #1026
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Changes from all commits
6048e42
1613729
34fdfe0
96d0e18
b0ab35c
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,335 @@ | ||
| package handlers | ||
|
|
||
| import ( | ||
| "encoding/json" | ||
| "fmt" | ||
| "os" | ||
| "path/filepath" | ||
| "sync" | ||
| "testing" | ||
| "time" | ||
|
|
||
| "ambient-code-backend/types" | ||
| ) | ||
|
|
||
| // setupEventLog creates a temporary event log with N events for benchmarking. | ||
| func setupEventLog(b *testing.B, eventCount int) (stateDir string, sessionName string) { | ||
| b.Helper() | ||
| stateDir = b.TempDir() | ||
| sessionName = "bench-session" | ||
| sessionDir := filepath.Join(stateDir, "sessions", sessionName) | ||
| if err := os.MkdirAll(sessionDir, 0755); err != nil { | ||
| b.Fatal(err) | ||
| } | ||
|
|
||
| logPath := filepath.Join(sessionDir, "agui-events.jsonl") | ||
| f, err := os.Create(logPath) | ||
| if err != nil { | ||
| b.Fatal(err) | ||
| } | ||
|
|
||
| // Write a realistic event sequence ending with RUN_STARTED (so status = "working") | ||
| threadID := "thread-1" | ||
| runID := "run-1" | ||
| for i := 0; i < eventCount-1; i++ { | ||
| evt := map[string]interface{}{ | ||
| "type": "TEXT_MESSAGE_CONTENT", | ||
| "threadId": threadID, | ||
| "runId": runID, | ||
| "messageId": fmt.Sprintf("msg-%d", i), | ||
| "delta": "some text content for benchmarking purposes", | ||
| "timestamp": time.Now().UnixMilli(), | ||
| } | ||
| data, _ := json.Marshal(evt) | ||
| f.Write(append(data, '\n')) | ||
| } | ||
| // Last event: RUN_STARTED (makes DeriveAgentStatus return "working") | ||
| lastEvt := map[string]interface{}{ | ||
| "type": "RUN_STARTED", | ||
| "threadId": threadID, | ||
| "runId": runID, | ||
| "timestamp": time.Now().UnixMilli(), | ||
| } | ||
| data, _ := json.Marshal(lastEvt) | ||
| f.Write(append(data, '\n')) | ||
| f.Close() | ||
|
|
||
| return stateDir, sessionName | ||
| } | ||
|
|
||
| // BenchmarkEnrichAgentStatus_Uncached measures the cost of deriving agent status | ||
| // from the event log without caching (the old behavior). | ||
| func BenchmarkEnrichAgentStatus_Uncached(b *testing.B) { | ||
| stateDir, sessionName := setupEventLog(b, 10000) | ||
|
|
||
| // Point the websocket package at our temp dir | ||
| origDerive := DeriveAgentStatusFromEvents | ||
| defer func() { DeriveAgentStatusFromEvents = origDerive }() | ||
|
|
||
| // Import the real DeriveAgentStatus from websocket package via the function pointer. | ||
| // Since we can't import websocket here (circular), simulate with a file-scanning function. | ||
| DeriveAgentStatusFromEvents = func(name string) string { | ||
| path := filepath.Join(stateDir, "sessions", name, "agui-events.jsonl") | ||
| return deriveStatusFromFile(path) | ||
| } | ||
|
|
||
| session := &types.AgenticSession{ | ||
| Metadata: map[string]interface{}{"name": sessionName}, | ||
| Status: &types.AgenticSessionStatus{Phase: "Running"}, | ||
| } | ||
|
|
||
| b.ResetTimer() | ||
| for i := 0; i < b.N; i++ { | ||
| // Clear cache to force file scan every time | ||
| agentStatusCache.Lock() | ||
| delete(agentStatusCache.entries, sessionName) | ||
| agentStatusCache.Unlock() | ||
|
|
||
| enrichAgentStatus(session) | ||
| } | ||
| } | ||
|
|
||
| // BenchmarkEnrichAgentStatus_Cached measures the cost with caching (the fix). | ||
| func BenchmarkEnrichAgentStatus_Cached(b *testing.B) { | ||
| stateDir, sessionName := setupEventLog(b, 10000) | ||
|
|
||
| origDerive := DeriveAgentStatusFromEvents | ||
| defer func() { DeriveAgentStatusFromEvents = origDerive }() | ||
|
|
||
| DeriveAgentStatusFromEvents = func(name string) string { | ||
| path := filepath.Join(stateDir, "sessions", name, "agui-events.jsonl") | ||
| return deriveStatusFromFile(path) | ||
| } | ||
|
|
||
| session := &types.AgenticSession{ | ||
| Metadata: map[string]interface{}{"name": sessionName}, | ||
| Status: &types.AgenticSessionStatus{Phase: "Running"}, | ||
| } | ||
|
|
||
| // Prime the cache with one call | ||
| enrichAgentStatus(session) | ||
|
|
||
| b.ResetTimer() | ||
| for i := 0; i < b.N; i++ { | ||
| enrichAgentStatus(session) | ||
| } | ||
| } | ||
|
|
||
| // BenchmarkEnrichAgentStatus_Concurrent measures cached path under contention. | ||
| func BenchmarkEnrichAgentStatus_Concurrent(b *testing.B) { | ||
| stateDir, _ := setupEventLog(b, 10000) | ||
|
|
||
| origDerive := DeriveAgentStatusFromEvents | ||
| defer func() { DeriveAgentStatusFromEvents = origDerive }() | ||
|
|
||
| DeriveAgentStatusFromEvents = func(name string) string { | ||
| path := filepath.Join(stateDir, "sessions", name, "agui-events.jsonl") | ||
| return deriveStatusFromFile(path) | ||
| } | ||
|
|
||
| // Create 20 "running" sessions (simulates a list page with 20 running) | ||
| sessions := make([]*types.AgenticSession, 20) | ||
| for i := 0; i < 20; i++ { | ||
| sessions[i] = &types.AgenticSession{ | ||
| Metadata: map[string]interface{}{"name": "bench-session"}, | ||
| Status: &types.AgenticSessionStatus{Phase: "Running"}, | ||
| } | ||
| } | ||
|
|
||
| // Prime cache | ||
| enrichAgentStatus(sessions[0]) | ||
|
|
||
| b.ResetTimer() | ||
| b.RunParallel(func(pb *testing.PB) { | ||
| // Each goroutine gets its own session to avoid racing on AgentStatus mutation | ||
| session := &types.AgenticSession{ | ||
| Metadata: map[string]interface{}{"name": "bench-session"}, | ||
| Status: &types.AgenticSessionStatus{Phase: "Running"}, | ||
| } | ||
| for pb.Next() { | ||
| enrichAgentStatus(session) | ||
| } | ||
| }) | ||
coderabbitai[bot] marked this conversation as resolved.
Show resolved
Hide resolved
|
||
| } | ||
|
|
||
| // BenchmarkEnrichAgentStatus_ListPage simulates enriching all sessions in a | ||
| // paginated list response (20 running sessions, as the frontend would see). | ||
| func BenchmarkEnrichAgentStatus_ListPage(b *testing.B) { | ||
| stateDir, _ := setupEventLog(b, 10000) | ||
|
|
||
| origDerive := DeriveAgentStatusFromEvents | ||
| defer func() { DeriveAgentStatusFromEvents = origDerive }() | ||
|
|
||
| DeriveAgentStatusFromEvents = func(name string) string { | ||
| path := filepath.Join(stateDir, "sessions", name, "agui-events.jsonl") | ||
| return deriveStatusFromFile(path) | ||
| } | ||
|
|
||
| sessions := make([]types.AgenticSession, 20) | ||
| for i := 0; i < 20; i++ { | ||
| sessions[i] = types.AgenticSession{ | ||
| Metadata: map[string]interface{}{"name": "bench-session"}, | ||
| Status: &types.AgenticSessionStatus{Phase: "Running"}, | ||
| } | ||
| } | ||
|
|
||
| b.Run("uncached", func(b *testing.B) { | ||
| for i := 0; i < b.N; i++ { | ||
| for j := range sessions { | ||
| // Clear cache before each session to force a file scan every time | ||
| agentStatusCache.Lock() | ||
| delete(agentStatusCache.entries, "bench-session") | ||
| agentStatusCache.Unlock() | ||
|
|
||
| enrichAgentStatus(&sessions[j]) | ||
| } | ||
syntaxsdev marked this conversation as resolved.
Show resolved
Hide resolved
|
||
| } | ||
| }) | ||
|
|
||
| b.Run("cached", func(b *testing.B) { | ||
| // Prime | ||
| for j := range sessions { | ||
| enrichAgentStatus(&sessions[j]) | ||
| } | ||
| b.ResetTimer() | ||
| for i := 0; i < b.N; i++ { | ||
| for j := range sessions { | ||
| enrichAgentStatus(&sessions[j]) | ||
| } | ||
| } | ||
| }) | ||
| } | ||
|
|
||
| // deriveStatusFromFile simulates DeriveAgentStatus by tail-scanning the event log. | ||
| // This mirrors the real implementation in websocket/agui_store.go:DeriveAgentStatus. | ||
| func deriveStatusFromFile(path string) string { | ||
| const maxTailBytes = 20 * 1024 * 1024 | ||
|
|
||
| file, err := os.Open(path) | ||
| if err != nil { | ||
| return "" | ||
| } | ||
| defer file.Close() | ||
|
|
||
| stat, err := file.Stat() | ||
| if err != nil { | ||
| return "" | ||
| } | ||
|
|
||
| fileSize := stat.Size() | ||
| var data []byte | ||
|
|
||
| if fileSize <= maxTailBytes { | ||
| data, err = os.ReadFile(path) | ||
| if err != nil { | ||
| return "" | ||
| } | ||
| } else { | ||
| offset := fileSize - maxTailBytes | ||
| file.Seek(offset, 0) | ||
| data = make([]byte, maxTailBytes) | ||
| n, _ := file.Read(data) | ||
| data = data[:n] | ||
| } | ||
|
|
||
| // Scan backwards for lifecycle events | ||
| lines := splitTailLines(data) | ||
| for i := len(lines) - 1; i >= 0; i-- { | ||
| if len(lines[i]) == 0 { | ||
| continue | ||
| } | ||
| var evt map[string]interface{} | ||
| if err := json.Unmarshal(lines[i], &evt); err != nil { | ||
| continue | ||
| } | ||
| evtType, _ := evt["type"].(string) | ||
| switch evtType { | ||
| case "RUN_STARTED": | ||
| return "working" | ||
| case "RUN_FINISHED", "RUN_ERROR": | ||
| return "idle" | ||
| } | ||
| } | ||
| return "" | ||
| } | ||
|
|
||
| func splitTailLines(data []byte) [][]byte { | ||
| var lines [][]byte | ||
| start := 0 | ||
| for i, b := range data { | ||
| if b == '\n' { | ||
| if i > start { | ||
| lines = append(lines, data[start:i]) | ||
| } | ||
| start = i + 1 | ||
| } | ||
| } | ||
| if start < len(data) { | ||
| lines = append(lines, data[start:]) | ||
| } | ||
| return lines | ||
| } | ||
|
|
||
| // TestAgentStatusCache_Correctness verifies the cache behaves correctly. | ||
| func TestAgentStatusCache_Correctness(t *testing.T) { | ||
| callCount := 0 | ||
| origDerive := DeriveAgentStatusFromEvents | ||
| defer func() { DeriveAgentStatusFromEvents = origDerive }() | ||
|
|
||
| DeriveAgentStatusFromEvents = func(name string) string { | ||
| callCount++ | ||
| return "working" | ||
| } | ||
|
Comment on lines
+275
to
+282
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. 🧩 Analysis chain🏁 Script executed: find . -name "agent_status_bench_test.go" -type fRepository: ambient-code/platform Length of output: 121 🏁 Script executed: cat -n ./components/backend/handlers/agent_status_bench_test.go | sed -n '270,290p'Repository: ambient-code/platform Length of output: 771 🏁 Script executed: cat -n ./components/backend/handlers/agent_status_bench_test.go | sed -n '315,340p'Repository: ambient-code/platform Length of output: 659 🏁 Script executed: cat -n ./components/backend/handlers/agent_status_bench_test.go | sed -n '290,320p'Repository: ambient-code/platform Length of output: 1226 🏁 Script executed: cat -n ./components/backend/handlers/agent_status_bench_test.go | sed -n '334,345p'Repository: ambient-code/platform Length of output: 91 🏁 Script executed: cat -n ./components/backend/handlers/agent_status_bench_test.go | sed -n '330,350p'Repository: ambient-code/platform Length of output: 157 🏁 Script executed: grep -n "callCount" ./components/backend/handlers/agent_status_bench_test.goRepository: ambient-code/platform Length of output: 366 🏁 Script executed: cat -n ./components/backend/handlers/agent_status_bench_test.go | sed -n '1,30p'Repository: ambient-code/platform Length of output: 907 Make the concurrency check race-free and assert the outcome. The stub increments a plain Use 🤖 Prompt for AI Agents |
||
|
|
||
| session := &types.AgenticSession{ | ||
| Metadata: map[string]interface{}{"name": "test-session"}, | ||
| Status: &types.AgenticSessionStatus{Phase: "Running"}, | ||
| } | ||
|
|
||
| // Clear cache | ||
| agentStatusCache.Lock() | ||
| agentStatusCache.entries = make(map[string]agentStatusCacheEntry) | ||
| agentStatusCache.Unlock() | ||
|
|
||
| // First call — cache miss, should call DeriveAgentStatusFromEvents | ||
| enrichAgentStatus(session) | ||
| if callCount != 1 { | ||
| t.Fatalf("expected 1 call, got %d", callCount) | ||
| } | ||
|
|
||
| // Second call within TTL — cache hit, should NOT call again | ||
| enrichAgentStatus(session) | ||
| if callCount != 1 { | ||
| t.Fatalf("expected 1 call (cached), got %d", callCount) | ||
| } | ||
|
|
||
| // Verify status was set | ||
| if *session.Status.AgentStatus != "working" { | ||
| t.Fatalf("expected 'working', got %q", *session.Status.AgentStatus) | ||
| } | ||
|
|
||
| // Non-running session should skip cache entirely | ||
| stopped := &types.AgenticSession{ | ||
| Metadata: map[string]interface{}{"name": "stopped-session"}, | ||
| Status: &types.AgenticSessionStatus{Phase: "Stopped"}, | ||
| } | ||
| enrichAgentStatus(stopped) | ||
| if callCount != 1 { | ||
| t.Fatalf("expected no call for stopped session, got %d", callCount) | ||
| } | ||
|
|
||
| // Concurrent safety | ||
| var wg sync.WaitGroup | ||
| for i := 0; i < 50; i++ { | ||
| wg.Add(1) | ||
| go func() { | ||
| defer wg.Done() | ||
| s := &types.AgenticSession{ | ||
| Metadata: map[string]interface{}{"name": "test-session"}, | ||
| Status: &types.AgenticSessionStatus{Phase: "Running"}, | ||
| } | ||
| enrichAgentStatus(s) | ||
| }() | ||
| } | ||
| wg.Wait() | ||
| } | ||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Pin the cache entry past the timed section.
agentStatusCacheis TTL-based incomponents/backend/handlers/sessions.go, but these benchmarks only warm it once beforeResetTimer(). On longer-benchtimeruns, the entry can expire mid-loop and the reported “cached” numbers start including file-derivation work again.Suggested change
Apply the same pattern to
BenchmarkEnrichAgentStatus_ConcurrentandBenchmarkEnrichAgentStatus_ListPage/cached.Also applies to: 139-152, 189-199
🤖 Prompt for AI Agents