Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
31 changes: 31 additions & 0 deletions ARCHITECTURE.md
Original file line number Diff line number Diff line change
Expand Up @@ -257,6 +257,37 @@ Both modes fire the same `EventCallback` (`"spawned"`, `"status_changed"`, `"hea
| `/api/mc/worker/register` | POST | Pre-register worker metadata before spawn |
| `/api/mc/workers` | GET | List active workers from tracker |

## Swarm BFF (Backend for Frontend)

The Swarm Dashboard provides a unified view across all OpenClaw services. The BFF layer (`orchestrator/api/swarm.go`) implements a fan-out pattern:

```text
Browser → GET /api/swarm/overview → Orchestrator
├─→ Warren /admin/health + /admin/agents
├─→ Chronicle /api/v1/metrics/summary + /api/v1/dlq/stats
├─→ Dispatch /api/v1/stats + /api/v1/agents
├─→ PromptForge /api/prompts (array → count)
└─→ Alexandria /api/collections (array → count)
```

All 5 services are fetched concurrently with a 4s context deadline and 3s per-request timeout. Partial failures are isolated: if Chronicle is down, its error appears in the `errors` map while other services return normally.

### Swarm Endpoints

| Endpoint | Method | Purpose |
|----------|--------|---------|
| `/api/swarm/overview` | GET | Fan-out to all 5 services, return unified JSON |
| `/api/swarm/warren/health` | GET | Proxy Warren `/admin/health` |
| `/api/swarm/warren/events` | GET | Proxy Warren `/admin/events` SSE stream |

### Frontend Architecture

The Swarm tab uses a dedicated Zustand store (`useSwarmStore`) that:
- Polls `/api/swarm/overview` every 10s (only when the Live sub-tab is active)
- Connects to Warren SSE events via `EventSource` with exponential backoff
- Derives alerts from cross-service data (service-down detection, DLQ spike detection)
- Provides computed selectors (`useFleetSummary`, `usePipelineSummary`) for dashboard widgets

## Stack

| Component | Language | Purpose |
Expand Down
27 changes: 27 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,33 @@

All notable changes to MissionControl are documented in this file.

## v6.14 — Swarm Dashboard (2026-02-14)

### Swarm BFF (Backend for Frontend)
- New `/api/swarm/overview` endpoint — fans out to Warren, Chronicle, Dispatch, PromptForge, and Alexandria in parallel, returns unified JSON with per-service error isolation
- `/api/swarm/warren/health` — proxies Warren health endpoint
- `/api/swarm/warren/events` — proxies Warren SSE event stream with buffered passthrough
- Shared HTTP client with 3s timeout; overview uses 4s context deadline
- Service URLs configurable via `WARREN_URL`, `CHRONICLE_URL`, `DISPATCH_URL`, `PROMPTFORGE_URL`, `ALEXANDRIA_URL` environment variables

### Swarm Dashboard (Frontend)
- New "Swarm" tab in the main dashboard with Live and Schedule sub-tabs
- **FleetOverview** — agent counts, DLQ depth, prompt/collection counts
- **ServiceStatus** — per-service health indicators with error highlighting
- **TaskPipeline** — dispatch stats (pending, in-progress, completed, failed)
- **AgentGrid** — combined warren + dispatch agent roster
- **EventTimeline** — live SSE event stream from Warren
- Zustand store (`useSwarmStore`) with alert derivation (service-down, DLQ spike detection)
- `useSwarmPolling` hook — 10s interval polling, active only on Live tab
- `useWarrenSSE` hook — EventSource with exponential backoff reconnection

### Testing
- `swarm_test.go` — 6 Go tests: full overview, partial failure, method guard, health proxy, health down, SSE passthrough
- `useSwarmStore.test.ts` — 14 Vitest tests: state actions, alert derivation, DLQ spike detection, fleet/pipeline computations
- `swarm.spec.ts` — 4 Playwright E2E tests: tab visibility, sub-tab navigation, schedule placeholder

---

## v6.13 — Process Purity Phase 5 (2026-02-10)

### Mandatory Task Binding (`mc commit --task`)
Expand Down
5 changes: 5 additions & 0 deletions orchestrator/api/routes.go
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,11 @@ func (s *Server) Routes() http.Handler {
// Stages
mux.HandleFunc("/api/stages/override", s.methodPOST(s.handleStageOverride))

// Swarm BFF
mux.HandleFunc("/api/swarm/overview", s.methodGET(s.handleSwarmOverview))
mux.HandleFunc("/api/swarm/warren/health", s.methodGET(s.handleSwarmWarrenHealth))
mux.HandleFunc("/api/swarm/warren/events", s.methodGET(s.handleSwarmWarrenEvents))

// Placeholders
mux.HandleFunc("/api/openclaw/status", s.methodGET(s.handleOpenClawStatus))
mux.HandleFunc("/api/requirements", s.methodGET(s.handleRequirements))
Expand Down
274 changes: 274 additions & 0 deletions orchestrator/api/swarm.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,274 @@
package api

import (
"context"
"encoding/json"
"fmt"
"io"
"net/http"
"os"
"sync"
"time"
)

// Service base URLs — configurable via environment variables.
var (
warrenURL = envOrDefault("WARREN_URL", "http://localhost:9090")
chronicleURL = envOrDefault("CHRONICLE_URL", "http://localhost:8700")
dispatchURL = envOrDefault("DISPATCH_URL", "http://localhost:8600")
promptForgeURL = envOrDefault("PROMPTFORGE_URL", "http://localhost:8400")
alexandriaURL = envOrDefault("ALEXANDRIA_URL", "http://localhost:8500")
)

func envOrDefault(key, fallback string) string {
if v := os.Getenv(key); v != "" {
return v
}
return fallback
}

// swarmClient is a shared HTTP client with reasonable timeouts for fan-out.
var swarmClient = &http.Client{
Timeout: 3 * time.Second,
}

// SwarmOverview is the aggregated response from all backend services.
type SwarmOverview struct {
Warren *json.RawMessage `json:"warren,omitempty"`
Chronicle *json.RawMessage `json:"chronicle,omitempty"`
Dispatch *json.RawMessage `json:"dispatch,omitempty"`
PromptForge *json.RawMessage `json:"promptforge,omitempty"`
Alexandria *json.RawMessage `json:"alexandria,omitempty"`
Errors map[string]string `json:"errors"`
FetchedAt string `json:"fetched_at"`
}

// fetchJSON performs a GET request and returns the raw JSON body.
func fetchJSON(ctx context.Context, url string) (json.RawMessage, error) {
req, err := http.NewRequestWithContext(ctx, http.MethodGet, url, nil)
if err != nil {
return nil, err
}
resp, err := swarmClient.Do(req)
if err != nil {
return nil, err
}
defer resp.Body.Close()

if resp.StatusCode >= 400 {
return nil, fmt.Errorf("HTTP %d", resp.StatusCode)
}

body, err := io.ReadAll(io.LimitReader(resp.Body, 1<<20)) // 1MB limit
if err != nil {
return nil, err
}
return json.RawMessage(body), nil
}

// fetchService fetches multiple endpoints from a single service and merges them
// into a single JSON object with the given keys.
func fetchService(ctx context.Context, baseURL string, endpoints map[string]string) (json.RawMessage, error) {
result := map[string]json.RawMessage{}
var mu sync.Mutex
var wg sync.WaitGroup
var firstErr error

for key, path := range endpoints {
wg.Add(1)
go func(k, p string) {
defer wg.Done()
data, err := fetchJSON(ctx, baseURL+p)
mu.Lock()
defer mu.Unlock()
if err != nil {
if firstErr == nil {
firstErr = err
}
return
}
result[k] = data
}(key, path)
}
wg.Wait()

if len(result) == 0 && firstErr != nil {
return nil, firstErr
}
raw, err := json.Marshal(result)
if err != nil {
return nil, err
}
return json.RawMessage(raw), nil
}

// handleSwarmOverview fans out to all services and returns a unified overview.
func (s *Server) handleSwarmOverview(w http.ResponseWriter, r *http.Request) {
ctx, cancel := context.WithTimeout(r.Context(), 4*time.Second)
defer cancel()

overview := SwarmOverview{
Errors: map[string]string{},
}

type serviceResult struct {
name string
data json.RawMessage
err error
}

ch := make(chan serviceResult, 5)

// Warren: health + agents
go func() {
data, err := fetchService(ctx, warrenURL, map[string]string{
"health": "/admin/health",
"agents": "/admin/agents",
})
ch <- serviceResult{"warren", data, err}
}()

// Chronicle: metrics + DLQ
go func() {
data, err := fetchService(ctx, chronicleURL, map[string]string{
"metrics": "/api/v1/metrics/summary",
"dlq": "/api/v1/dlq/stats",
})
ch <- serviceResult{"chronicle", data, err}
}()

// Dispatch: stats + agents
go func() {
data, err := fetchService(ctx, dispatchURL, map[string]string{
"stats": "/api/v1/stats",
"agents": "/api/v1/agents",
})
ch <- serviceResult{"dispatch", data, err}
}()

// PromptForge: prompt count
go func() {
raw, err := fetchJSON(ctx, promptForgeURL+"/api/prompts")
if err != nil {
ch <- serviceResult{"promptforge", nil, err}
return
}
// Count array length
var items []json.RawMessage
if jsonErr := json.Unmarshal(raw, &items); jsonErr != nil {
// Maybe it's an object with a count field — pass through as-is
result, _ := json.Marshal(map[string]interface{}{"prompts": raw})
ch <- serviceResult{"promptforge", json.RawMessage(result), nil}
return
}
result, _ := json.Marshal(map[string]int{"prompt_count": len(items)})
ch <- serviceResult{"promptforge", json.RawMessage(result), nil}
}()

// Alexandria: collection count
go func() {
raw, err := fetchJSON(ctx, alexandriaURL+"/api/collections")
if err != nil {
ch <- serviceResult{"alexandria", nil, err}
return
}
var items []json.RawMessage
if jsonErr := json.Unmarshal(raw, &items); jsonErr != nil {
result, _ := json.Marshal(map[string]interface{}{"collections": raw})
ch <- serviceResult{"alexandria", json.RawMessage(result), nil}
return
}
result, _ := json.Marshal(map[string]int{"collection_count": len(items)})
ch <- serviceResult{"alexandria", json.RawMessage(result), nil}
}()

// Collect results
for i := 0; i < 5; i++ {
res := <-ch
if res.err != nil {
overview.Errors[res.name] = res.err.Error()
continue
}
raw := res.data
switch res.name {
case "warren":
overview.Warren = &raw
case "chronicle":
overview.Chronicle = &raw
case "dispatch":
overview.Dispatch = &raw
case "promptforge":
overview.PromptForge = &raw
case "alexandria":
overview.Alexandria = &raw
}
}

overview.FetchedAt = time.Now().UTC().Format(time.RFC3339)
writeJSON(w, http.StatusOK, overview)
}

// handleSwarmWarrenHealth proxies Warren /admin/health.
func (s *Server) handleSwarmWarrenHealth(w http.ResponseWriter, r *http.Request) {
ctx, cancel := context.WithTimeout(r.Context(), 3*time.Second)
defer cancel()

data, err := fetchJSON(ctx, warrenURL+"/admin/health")
if err != nil {
respondError(w, http.StatusBadGateway, fmt.Sprintf("warren: %s", err))
return
}
w.Header().Set("Content-Type", "application/json")
w.WriteHeader(http.StatusOK)
w.Write(data)
}

// handleSwarmWarrenEvents proxies Warren /admin/events as an SSE stream.
func (s *Server) handleSwarmWarrenEvents(w http.ResponseWriter, r *http.Request) {
flusher, ok := w.(http.Flusher)
if !ok {
respondError(w, http.StatusInternalServerError, "streaming not supported")
return
}

ctx := r.Context()
req, err := http.NewRequestWithContext(ctx, http.MethodGet, warrenURL+"/admin/events", nil)
if err != nil {
respondError(w, http.StatusInternalServerError, err.Error())
return
}
req.Header.Set("Accept", "text/event-stream")

// Use a client without the default timeout for long-lived SSE.
sseClient := &http.Client{Timeout: 0}
resp, err := sseClient.Do(req)
if err != nil {
respondError(w, http.StatusBadGateway, fmt.Sprintf("warren events: %s", err))
return
}
defer resp.Body.Close()

w.Header().Set("Content-Type", "text/event-stream")
w.Header().Set("Cache-Control", "no-cache")
w.Header().Set("Connection", "keep-alive")
w.Header().Set("X-Accel-Buffering", "no")
w.WriteHeader(http.StatusOK)
flusher.Flush()

buf := make([]byte, 4096)
for {
select {
case <-ctx.Done():
return
default:
n, readErr := resp.Body.Read(buf)
if n > 0 {
w.Write(buf[:n])
flusher.Flush()
}
if readErr != nil {
return
}
}
}
}
Loading
Loading