-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathagent.go
More file actions
147 lines (121 loc) · 4.88 KB
/
agent.go
File metadata and controls
147 lines (121 loc) · 4.88 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
package core
import (
"context"
"fmt"
"sync"
)
// StreamConsumer is the callback type Run accepts for observing streamed model output.
// Run hands it to the query loop unchanged. Per Run's contract it is invoked once for
// each Delta when streaming is used; passing a nil StreamConsumer makes Run use the
// non-streaming Complete path instead.
//
// Thinking deltas are for display only; the consumer must not persist them.
type StreamConsumer func(d Delta)
// Agent is the central unit of execution. It wraps a system prompt, a model configuration,
// a set of tools, optional long-term memory, and a callback chain.
//
// An Agent is safe for concurrent use: multiple goroutines may call Run simultaneously,
// each with its own Conversation. Internal state is read-only after construction.
// NOTE(review): Run in this file only reads the Agent's fields and mutates the
// Conversation it is given; the query loop also receives the Agent, so confirm it
// does not write to it either.
type Agent struct {
// ID is a unique identifier for this agent instance, used in memory entries and logs.
ID string
// Instructions is the system prompt prepended to every conversation on Run.
// If the conversation already starts with a system message, Instructions is not prepended.
// An empty string disables the prepend entirely.
Instructions string
// Model holds the provider and generation parameters.
Model ModelConfig
// Tools is the set of tools available to this agent.
// The agent builds an internal name→tool map at the start of each Run call, keyed by
// Tool.Name(). If two tools report the same name, the one later in the slice wins.
Tools []Tool
// Memory is the optional long-term memory store. Nil disables long-term memory.
Memory MemoryStore
// Callbacks is the lifecycle hook chain. A nil slice is a valid no-op chain.
// Run invokes BeforeAgentLoop/AfterAgentLoop on it, and tool dispatch invokes
// BeforeToolExecution/AfterToolExecution around every tool call.
Callbacks CallbackChain
// MaxTurns caps LLM sampling rounds per Run; 0 means unlimited (default).
// The value is passed to the query loop as-is.
MaxTurns int
// QueryEventSink receives query-loop observability events when non-nil.
QueryEventSink func(QueryEvent)
// QueryLoopHooks wires optional query-loop extension points (permission view, stop hooks).
QueryLoopHooks QueryLoopHooks
// QueryTracking optionally supplies chain identity for sub-agent runs; nil uses a root chain per Run.
QueryTracking *QueryTracking
}
// Run drives one full agent loop over conv using the Claude-parity query loop
// (prepare → model → recoveries → tools → prefetch stubs; see query_loop_*.go).
//
// Sequence of work:
//  1. If Instructions is non-empty and conv does not already begin with a system
//     message, a system message carrying Instructions is inserted at the front.
//  2. A name→tool lookup table is built from Tools (once per Run).
//  3. The BeforeAgentLoop callback runs; an error aborts Run.
//  4. The query loop runs. Per iteration it fires BeforeLLMCall, asks the provider
//     for a completion (Stream when streamConsumer is non-nil, otherwise Complete),
//     fires AfterLLMCall, and either dispatches the requested tool calls
//     concurrently and continues, or stops when no tool calls are present.
//  5. The AfterAgentLoop callback runs; an error aborts Run.
//
// When streamConsumer is non-nil it is invoked for each delta, and only the final
// assistant message (from Delta.Final) is appended to the conversation.
//
// Run mutates conv in place and returns that same pointer on success. On any error
// it returns (nil, err); conv may already have been modified by then (for example,
// the system message is inserted before any callback runs).
// ctx cancellation aborts the loop at the next blocking call.
func (a *Agent) Run(ctx context.Context, conv *Conversation, streamConsumer StreamConsumer) (*Conversation, error) {
	// conv is only inspected when there is a prompt to inject.
	if a.Instructions != "" && (len(conv.Messages) == 0 || conv.Messages[0].Role != RoleSystem) {
		systemMsg := Message{Role: RoleSystem, Content: a.Instructions}
		conv.Messages = append([]Message{systemMsg}, conv.Messages...)
	}

	// Index the tools by name up front so each dispatch is a plain map lookup.
	byName := make(map[string]Tool, len(a.Tools))
	for idx := range a.Tools {
		tool := a.Tools[idx]
		byName[tool.Name()] = tool
	}

	beforeErr := a.Callbacks.BeforeAgentLoop(ctx, conv)
	if beforeErr != nil {
		return nil, beforeErr
	}

	params := &queryLoopParams{
		Agent:          a,
		Conv:           conv,
		StreamConsumer: streamConsumer,
		MaxTurns:       a.MaxTurns,
		EventSink:      a.QueryEventSink,
		Tracking:       a.QueryTracking,
		Hooks:          a.QueryLoopHooks,
		// The loop dispatches tools through this closure so it never needs the lookup table itself.
		DispatchTools: func(ctx context.Context, calls []ToolCall) []ToolResult {
			return a.dispatchTools(ctx, calls, byName)
		},
	}
	if loopErr := runQueryLoop(ctx, params); loopErr != nil {
		return nil, loopErr
	}

	if afterErr := a.Callbacks.AfterAgentLoop(ctx, conv); afterErr != nil {
		return nil, afterErr
	}
	return conv, nil
}
// dispatchTools executes all tool calls concurrently, one goroutine per call, and
// returns the results in call order: results[i] always belongs to calls[i].
// It blocks until every goroutine has finished. An empty calls slice yields an
// empty, non-nil result slice.
//
// For each call:
//  1. BeforeToolExecution runs first and may return a modified call or reject it.
//     On rejection the tool is not executed and the result carries the callback's
//     error, identified by the ORIGINAL call's ID and Name. The ToolCall returned
//     alongside a non-nil error is deliberately ignored: by Go convention it is
//     meaningless, and a callback returning a zero ToolCall would otherwise produce
//     a result that cannot be matched back to its request.
//  2. The (possibly modified) call's Name is looked up in toolMap; a missing entry
//     produces an "unknown tool" error result.
//  3. The tool is executed, AfterToolExecution is invoked with the outcome, and the
//     result is stored. Execution errors are reported via ToolResult.Err.
//
// toolMap is only read here, and each goroutine writes only its own slot of the
// result slice, so no additional locking is needed.
func (a *Agent) dispatchTools(ctx context.Context, calls []ToolCall, toolMap map[string]Tool) []ToolResult {
	results := make([]ToolResult, len(calls))

	var wg sync.WaitGroup
	for i, call := range calls {
		wg.Add(1)
		// Pass the loop variables as arguments so each goroutine owns its copy.
		go func(i int, call ToolCall) {
			defer wg.Done()

			// Keep the original call intact; only trust the callback's value on success.
			vetted, err := a.Callbacks.BeforeToolExecution(ctx, call)
			if err != nil {
				results[i] = ToolResult{CallID: call.ID, Name: call.Name, Err: err}
				return
			}

			t, ok := toolMap[vetted.Name]
			if !ok {
				results[i] = ToolResult{
					CallID: vetted.ID,
					Name:   vetted.Name,
					Err:    fmt.Errorf("unknown tool: %s", vetted.Name),
				}
				return
			}

			content, execErr := t.Execute(ctx, vetted.Params)
			result := ToolResult{CallID: vetted.ID, Name: vetted.Name, Content: content, Err: execErr}
			// AfterToolExecution errors are not fatal to the loop, so they are intentionally dropped.
			_ = a.Callbacks.AfterToolExecution(ctx, vetted, result)
			results[i] = result
		}(i, call)
	}
	wg.Wait()
	return results
}