Skip to content

Commit f6fac04

Browse files
committed
feat: Introduce conflict resolution and checkpointing tools for agents, and add a Multi-Agent Coordination Protocol (MCP) server.
1 parent 934d855 commit f6fac04

19 files changed

Lines changed: 2732 additions & 6 deletions

README.md

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
# Sofia - AI Workspace Assistant 🧠✨
22

3-
![Version](https://img.shields.io/badge/version-v0.0.80-blue)
3+
![Version](https://img.shields.io/badge/version-v0.0.81-blue)
44
Sofia är en avancerad, kontextmedveten AI-assistent och multi-agent-orkestrerare skriven i Go. Designad för att fungera som en fullstack-utvecklare, systemarkitekt och projektledare. Genom att integrera direkt i den lokala utvecklingsmiljön kan Sofia läsa/skriva filer, exekvera terminalkommandon, schemalägga uppgifter och delegera arbete till specialiserade sub-agenter.
55

66
## ✨ Huvudfunktioner

cmd/sofia/internal/helpers.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,7 @@ import (
1212
const Logo = "🪲"
1313

1414
var (
15-
version = "v0.0.80"
15+
version = "v0.0.81"
1616
gitCommit string
1717
buildTime string
1818
goVersion string
Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,36 @@
1+
package mcpserver
2+
3+
import (
4+
"github.com/spf13/cobra"
5+
)
6+
7+
// NewMCPServerCommand creates the `sofia mcp-server` command.
8+
func NewMCPServerCommand() *cobra.Command {
9+
var (
10+
transport string
11+
addr string
12+
debug bool
13+
)
14+
15+
cmd := &cobra.Command{
16+
Use: "mcp-server",
17+
Short: "Start Sofia as an MCP server",
18+
Long: `Expose Sofia as a Model Context Protocol (MCP) server.
19+
Other AI agents and tools can connect to Sofia and use its capabilities
20+
via the standard MCP protocol.
21+
22+
Supports two transport modes:
23+
stdio - Communicate via stdin/stdout (default, for subprocess-based clients)
24+
sse - Communicate via HTTP Server-Sent Events (for network clients)`,
25+
Args: cobra.NoArgs,
26+
RunE: func(_ *cobra.Command, _ []string) error {
27+
return runMCPServer(transport, addr, debug)
28+
},
29+
}
30+
31+
cmd.Flags().StringVarP(&transport, "transport", "t", "stdio", "Transport mode: stdio or sse")
32+
cmd.Flags().StringVarP(&addr, "addr", "a", ":9090", "Listen address for SSE transport")
33+
cmd.Flags().BoolVarP(&debug, "debug", "d", false, "Enable debug logging")
34+
35+
return cmd
36+
}
Lines changed: 55 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,55 @@
1+
package mcpserver
2+
3+
import (
4+
"context"
5+
"fmt"
6+
7+
"github.com/grasberg/sofia/cmd/sofia/internal"
8+
"github.com/grasberg/sofia/pkg/agent"
9+
"github.com/grasberg/sofia/pkg/bus"
10+
"github.com/grasberg/sofia/pkg/logger"
11+
"github.com/grasberg/sofia/pkg/mcp"
12+
"github.com/grasberg/sofia/pkg/providers"
13+
)
14+
15+
func runMCPServer(transport, addr string, debug bool) error {
16+
if debug {
17+
logger.SetLevel(logger.DEBUG)
18+
}
19+
20+
// When running as stdio MCP server, suppress all non-protocol output to stderr
21+
// since stdout is reserved for MCP JSON-RPC messages.
22+
if transport == "stdio" {
23+
logger.SetLevel(logger.ERROR)
24+
}
25+
26+
cfg, err := internal.LoadConfig()
27+
if err != nil {
28+
return fmt.Errorf("error loading config: %w", err)
29+
}
30+
31+
provider, _, err := providers.CreateProvider(cfg)
32+
if err != nil {
33+
return fmt.Errorf("error creating provider: %w", err)
34+
}
35+
36+
msgBus := bus.NewMessageBus()
37+
agentLoop := agent.NewAgentLoop(cfg, msgBus, provider)
38+
39+
// Start agent loop in background
40+
ctx, cancel := context.WithCancel(context.Background())
41+
defer cancel()
42+
go func() { _ = agentLoop.Run(ctx) }() //nolint:errcheck
43+
44+
sofiaServer := mcp.NewSofiaServer(agentLoop)
45+
46+
switch transport {
47+
case "stdio":
48+
return sofiaServer.ServeStdio()
49+
case "sse":
50+
logger.InfoC("mcp-server", fmt.Sprintf("MCP SSE server starting on %s", addr))
51+
return sofiaServer.ServeSSE(addr)
52+
default:
53+
return fmt.Errorf("unknown transport: %s (use 'stdio' or 'sse')", transport)
54+
}
55+
}

cmd/sofia/main.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@ import (
1616
"github.com/grasberg/sofia/cmd/sofia/internal/agent"
1717
"github.com/grasberg/sofia/cmd/sofia/internal/cron"
1818
"github.com/grasberg/sofia/cmd/sofia/internal/gateway"
19+
"github.com/grasberg/sofia/cmd/sofia/internal/mcpserver"
1920
"github.com/grasberg/sofia/cmd/sofia/internal/onboard"
2021
"github.com/grasberg/sofia/cmd/sofia/internal/version"
2122
)
@@ -34,6 +35,7 @@ func NewSofiaCommand() *cobra.Command {
3435
agent.NewAgentCommand(),
3536
gateway.NewGatewayCommand(),
3637
cron.NewCronCommand(),
38+
mcpserver.NewMCPServerCommand(),
3739
version.NewVersionCommand(),
3840
)
3941

cmd/sofia/main_test.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,7 @@ func TestNewSofiaCommand(t *testing.T) {
3636
"agent",
3737
"cron",
3838
"gateway",
39+
"mcp-server",
3940
"onboard",
4041
"version",
4142
}

pkg/agent/loop.go

Lines changed: 128 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -24,10 +24,12 @@ import (
2424

2525
"github.com/grasberg/sofia/pkg/autonomy"
2626
"github.com/grasberg/sofia/pkg/bus"
27+
"github.com/grasberg/sofia/pkg/checkpoint"
2728
"github.com/grasberg/sofia/pkg/channels"
2829
"github.com/grasberg/sofia/pkg/config"
2930
"github.com/grasberg/sofia/pkg/constants"
3031
"github.com/grasberg/sofia/pkg/logger"
32+
mcpPkg "github.com/grasberg/sofia/pkg/mcp"
3133
"github.com/grasberg/sofia/pkg/memory"
3234
"github.com/grasberg/sofia/pkg/providers"
3335
"github.com/grasberg/sofia/pkg/routing"
@@ -53,6 +55,7 @@ type AgentLoop struct {
5355
activeStatus atomic.Value // string
5456
planManager *tools.PlanManager
5557
scratchpad *tools.SharedScratchpad
58+
checkpointMgr *checkpoint.Manager
5659
a2aRouter *A2ARouter
5760

5861
// Rate limiting state
@@ -110,6 +113,7 @@ func NewAgentLoop(cfg *config.Config, msgBus *bus.MessageBus, provider providers
110113

111114
planMgr := tools.NewPlanManager()
112115
scratchpad := tools.NewSharedScratchpad()
116+
checkpointMgr := checkpoint.NewManager(memDB)
113117
a2aRouter := NewA2ARouter()
114118

115119
// Register all agents with the A2A router
@@ -127,6 +131,7 @@ func NewAgentLoop(cfg *config.Config, msgBus *bus.MessageBus, provider providers
127131
fallback: fallbackChain,
128132
planManager: planMgr,
129133
scratchpad: scratchpad,
134+
checkpointMgr: checkpointMgr,
130135
a2aRouter: a2aRouter,
131136
rpmCounts: make(map[string]int),
132137
rpmResetTime: make(map[string]time.Time),
@@ -149,14 +154,14 @@ func NewAgentLoop(cfg *config.Config, msgBus *bus.MessageBus, provider providers
149154
al.startAutonomyServices(provider)
150155

151156
// Register shared tools to all agents.
152-
registerSharedTools(cfg, msgBus, registry, provider, al.runSpawnedTaskAsAgent, planMgr, scratchpad, memDB, a2aRouter)
157+
registerSharedTools(cfg, msgBus, registry, provider, al.runSpawnedTaskAsAgent, planMgr, scratchpad, checkpointMgr, memDB, a2aRouter)
153158

154159
al.activeAgentID.Store("")
155160
al.activeStatus.Store("Idle")
156161
return al
157162
}
158163

159-
// registerSharedTools registers tools that are shared across all agents (web, message, spawn, plan, scratchpad, orchestrate, a2a).
164+
// registerSharedTools registers tools that are shared across all agents.
160165
func registerSharedTools(
161166
cfg *config.Config,
162167
msgBus *bus.MessageBus,
@@ -165,6 +170,7 @@ func registerSharedTools(
165170
agentTaskRunner func(ctx context.Context, agentID, sessionKey, task, originChannel, originChatID string) (string, error),
166171
planMgr *tools.PlanManager,
167172
scratchpad *tools.SharedScratchpad,
173+
checkpointMgr *checkpoint.Manager,
168174
memDB *memory.MemoryDB,
169175
a2aRouter *A2ARouter,
170176
) {
@@ -265,6 +271,9 @@ func registerSharedTools(
265271
// Scratchpad — agent-to-agent shared key-value store
266272
agent.Tools.Register(tools.NewScratchpadTool(scratchpad, "default"))
267273

274+
// Checkpoint — save/restore execution state mid-task
275+
agent.Tools.Register(tools.NewCheckpointTool(checkpointMgr, agentID))
276+
268277
// Subagent (synchronous) tool
269278
subagentTool := tools.NewSubagentTool(subagentManager)
270279
agent.Tools.Register(subagentTool)
@@ -293,6 +302,9 @@ func registerSharedTools(
293302
orchTool := tools.NewOrchestrateTool(orchCfg)
294303
agent.Tools.Register(orchTool)
295304

305+
// Conflict Resolution — detect and resolve conflicting outputs from parallel agents
306+
agent.Tools.Register(tools.NewConflictResolveTool(scratchpad))
307+
296308
// Knowledge Graph — semantic memory with structured entities and relationships
297309
if memDB != nil {
298310
agent.Tools.Register(tools.NewKnowledgeGraphTool(memDB, agentID))
@@ -462,7 +474,7 @@ func (al *AgentLoop) ReloadAgents() {
462474
al.a2aRouter.Register(id)
463475
}
464476

465-
registerSharedTools(al.cfg, al.bus, newRegistry, provider, al.runSpawnedTaskAsAgent, al.planManager, al.scratchpad, al.memDB, al.a2aRouter)
477+
registerSharedTools(al.cfg, al.bus, newRegistry, provider, al.runSpawnedTaskAsAgent, al.planManager, al.scratchpad, al.checkpointMgr, al.memDB, al.a2aRouter)
466478

467479
al.registryMu.Lock()
468480
al.registry = newRegistry
@@ -952,6 +964,13 @@ func (al *AgentLoop) runAgentLoop(ctx context.Context, agent *AgentInstance, opt
952964
// 1. Update tool contexts
953965
al.updateToolContexts(agent, opts.Channel, opts.ChatID)
954966

967+
// 1a. Set checkpoint tool session key
968+
if cpTool, ok := agent.Tools.Get("checkpoint"); ok {
969+
if ct, ok := cpTool.(*tools.CheckpointTool); ok {
970+
ct.SetSessionKey(opts.SessionKey)
971+
}
972+
}
973+
955974
// 1b. Signal thinking status to the channel (only if we intend to send a response)
956975
if opts.SendResponse && opts.Channel != "" && opts.ChatID != "" && !constants.IsInternalChannel(opts.Channel) {
957976
al.bus.PublishOutbound(bus.OutboundMessage{
@@ -1028,6 +1047,11 @@ func (al *AgentLoop) runAgentLoop(ctx context.Context, agent *AgentInstance, opt
10281047
agent.Sessions.AddMessage(opts.SessionKey, "assistant", finalContent)
10291048
agent.Sessions.Save(opts.SessionKey)
10301049

1050+
// 6b. Cleanup auto-checkpoints after successful completion
1051+
if err := al.checkpointMgr.Cleanup(opts.SessionKey); err != nil {
1052+
logger.WarnCF(agentComp, "Failed to cleanup checkpoints", map[string]any{"error": err.Error()})
1053+
}
1054+
10311055
// 7. Optional: summarization
10321056
if opts.EnableSummary {
10331057
al.maybeSummarize(agent, opts.SessionKey, opts.Channel, opts.ChatID)
@@ -1402,6 +1426,17 @@ func (al *AgentLoop) runLLMIteration(
14021426
// Save assistant message with tool calls to session
14031427
agent.Sessions.AddFullMessage(opts.SessionKey, assistantMsg)
14041428

1429+
// Auto-checkpoint before tool execution
1430+
cpName := fmt.Sprintf("auto:iter-%d", iteration)
1431+
if _, cpErr := al.checkpointMgr.Create(opts.SessionKey, agent.ID, cpName, iteration); cpErr != nil {
1432+
logger.WarnCF(agentComp, "Failed to create auto-checkpoint", map[string]any{
1433+
"error": cpErr.Error(), "iteration": iteration,
1434+
})
1435+
} else {
1436+
logger.DebugCF(agentComp, fmt.Sprintf("CHECKPOINT: saved %s", cpName),
1437+
map[string]any{"iteration": iteration})
1438+
}
1439+
14051440
// Execute tool calls — parallel or sequential based on config
14061441
type toolCallResult struct {
14071442
index int
@@ -1568,6 +1603,38 @@ func (al *AgentLoop) runLLMIteration(
15681603
agent.Sessions.AddFullMessage(opts.SessionKey, tcr.resultMsg)
15691604
}
15701605

1606+
// Auto-rollback: if error count reaches threshold, rollback to last checkpoint
1607+
const autoRollbackThreshold = 3
1608+
if errorCount >= autoRollbackThreshold {
1609+
cp, restoredMsgs, rbErr := al.checkpointMgr.RollbackToLatest(opts.SessionKey)
1610+
if rbErr != nil {
1611+
logger.WarnCF(agentComp, "Auto-rollback failed", map[string]any{"error": rbErr.Error()})
1612+
} else if cp != nil {
1613+
logger.InfoCF(agentComp,
1614+
fmt.Sprintf("CHECKPOINT: auto-rollback to %q (iter %d) after %d errors", cp.Name, cp.Iteration, errorCount),
1615+
map[string]any{"checkpoint_id": cp.ID, "errors": errorCount})
1616+
1617+
// Rebuild in-memory messages from restored state
1618+
messages = agent.ContextBuilder.BuildMessages(
1619+
restoredMsgs,
1620+
agent.Sessions.GetSummary(opts.SessionKey),
1621+
"",
1622+
nil, opts.Channel, opts.ChatID,
1623+
)
1624+
1625+
// Inject rollback notice so the LLM knows what happened
1626+
messages = append(messages, providers.Message{
1627+
Role: "user",
1628+
Content: fmt.Sprintf("[SYSTEM: Auto-rollback triggered after %d consecutive tool errors. "+
1629+
"State restored to checkpoint %q (iteration %d). "+
1630+
"Please try a different approach.]", errorCount, cp.Name, cp.Iteration),
1631+
})
1632+
1633+
errorCount = 0 // Reset for the new attempt
1634+
continue // Restart the iteration loop from the restored state
1635+
}
1636+
}
1637+
15711638
// If any tool requires confirmation, stop the loop and let the user respond
15721639
if confirmationNeeded {
15731640
finalContent = "Waiting for user confirmation before proceeding."
@@ -2183,6 +2250,64 @@ func extractPeer(msg bus.InboundMessage) *routing.RoutePeer {
21832250
return &routing.RoutePeer{Kind: peerKind, ID: peerID}
21842251
}
21852252

2253+
// ListAgentIDs returns all registered agent IDs.
2254+
func (al *AgentLoop) ListAgentIDs() []string {
2255+
return al.getRegistry().ListAgentIDs()
2256+
}
2257+
2258+
// ListAgentTools returns tool names for a given agent. If agentID is empty, uses default agent.
2259+
func (al *AgentLoop) ListAgentTools(agentID string) []string {
2260+
reg := al.getRegistry()
2261+
if agentID == "" {
2262+
agent := reg.GetDefaultAgent()
2263+
if agent == nil {
2264+
return nil
2265+
}
2266+
return agent.Tools.List()
2267+
}
2268+
agent, ok := reg.GetAgent(agentID)
2269+
if !ok {
2270+
return nil
2271+
}
2272+
return agent.Tools.List()
2273+
}
2274+
2275+
// ListSessionMetas returns lightweight metadata for all sessions.
2276+
func (al *AgentLoop) ListSessionMetas() []mcpPkg.SessionMeta {
2277+
sm := al.GetDefaultSessionManager()
2278+
if sm == nil {
2279+
return nil
2280+
}
2281+
sessions := sm.ListSessions()
2282+
result := make([]mcpPkg.SessionMeta, len(sessions))
2283+
for i, s := range sessions {
2284+
result[i] = mcpPkg.SessionMeta{
2285+
Key: s.Key,
2286+
Channel: s.Channel,
2287+
Preview: s.Preview,
2288+
MessageCount: s.MessageCount,
2289+
}
2290+
}
2291+
return result
2292+
}
2293+
2294+
// GetSessionHistory returns messages for a session key.
2295+
func (al *AgentLoop) GetSessionHistory(sessionKey string) []mcpPkg.MessageInfo {
2296+
sm := al.GetDefaultSessionManager()
2297+
if sm == nil {
2298+
return nil
2299+
}
2300+
history := sm.GetHistory(sessionKey)
2301+
result := make([]mcpPkg.MessageInfo, len(history))
2302+
for i, m := range history {
2303+
result[i] = mcpPkg.MessageInfo{
2304+
Role: m.Role,
2305+
Content: m.Content,
2306+
}
2307+
}
2308+
return result
2309+
}
2310+
21862311
// extractParentPeer extracts the parent peer (reply-to) from inbound message metadata.
21872312
func extractParentPeer(msg bus.InboundMessage) *routing.RoutePeer {
21882313
parentKind := msg.Metadata["parent_peer_kind"]

0 commit comments

Comments
 (0)