Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
26 changes: 24 additions & 2 deletions pkg/cli/runner.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,8 @@ type Config struct {
AutoApprove bool
HideToolCalls bool
OutputJSON bool
ExecutionMode string
MaxIterations int
}

// Run executes an agent in non-TUI mode, handling user input and runtime events
Expand All @@ -53,6 +55,26 @@ func Run(ctx context.Context, out *Printer, cfg Config, rt runtime.Runtime, sess
ctx = telemetry.WithClient(ctx, telemetryClient)
}

var orch runtime.Orchestrator

switch cfg.ExecutionMode {
case "loop":
orch = runtime.NewLoopAgentOrchestrator(
runtime.NewSingleAgentOrchestrator(rt),
cfg.MaxIterations,
func(ctx context.Context, iter int, events []runtime.Event) bool {
// você pode customizar a condição de saída, por agora false (loop até MaxIterations)
return false
},
)
case "sequential":
orch = runtime.NewSequentialAgentOrchestrator(rt)
case "parallel":
orch = runtime.NewParallelAgentOrchestrator(rt)
default:
orch = runtime.NewSingleAgentOrchestrator(rt)
}

sess.Title = "Running agent"
// If the last received event was an error, return it. That way the exit code
// will be non-zero if the agent failed.
Expand All @@ -67,7 +89,7 @@ func Run(ctx context.Context, out *Printer, cfg Config, rt runtime.Runtime, sess
sess.AddMessage(PrepareUserMessage(ctx, rt, userInput, cfg.AttachmentPath))

if cfg.OutputJSON {
for event := range rt.RunStream(ctx, sess) {
for event := range orch.Run(ctx, sess) {
switch e := event.(type) {
case *runtime.ToolCallConfirmationEvent:
if !cfg.AutoApprove {
Expand All @@ -90,7 +112,7 @@ func Run(ctx context.Context, out *Printer, cfg Config, rt runtime.Runtime, sess
firstLoop := true
lastAgent := rt.CurrentAgentName()
var lastConfirmedToolCallID string
for event := range rt.RunStream(ctx, sess) {
for event := range orch.Run(ctx, sess) {
agentName := event.GetAgentName()
if agentName != "" && (firstLoop || lastAgent != agentName) {
if !firstLoop {
Expand Down
12 changes: 12 additions & 0 deletions pkg/runtime/orchestrator.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
package runtime

import (
"context"

"github.com/docker/cagent/pkg/session"
)

// Orchestrator defines how agents are executed.
type Orchestrator interface {
Run(ctx context.Context, sess *session.Session) <-chan Event
}
82 changes: 82 additions & 0 deletions pkg/runtime/orchestrator_loop.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,82 @@
package runtime

import (
"context"

"github.com/docker/cagent/pkg/session"
)

// ExitCondition decides whether the loop should stop.
type ExitCondition func(
ctx context.Context,
iteration int,
events []Event,
) bool

// LoopAgentOrchestrator runs an orchestrator repeatedly until an exit condition is met.
type LoopAgentOrchestrator struct {
body Orchestrator
maxIterations int
exitConditions []ExitCondition
}

// NewLoopAgentOrchestrator creates a loop orchestrator.
// maxIterations <= 0 means unlimited.
func NewLoopAgentOrchestrator(
body Orchestrator,
maxIterations int,
exitConditions ...ExitCondition,
) *LoopAgentOrchestrator {
return &LoopAgentOrchestrator{
body: body,
maxIterations: maxIterations,
exitConditions: exitConditions,
}
}

func (o *LoopAgentOrchestrator) Run(
ctx context.Context,
sess *session.Session,
) <-chan Event {
out := make(chan Event)

go func() {
defer close(out)

iteration := 0

for {
if ctx.Err() != nil {
return
}

if o.maxIterations > 0 && iteration >= o.maxIterations {
return
}

events := o.body.Run(ctx, sess)

var collected []Event

for ev := range events {
collected = append(collected, ev)

select {
case out <- ev:
case <-ctx.Done():
return
}
}

for _, cond := range o.exitConditions {
if cond(ctx, iteration, collected) {
return
}
}

iteration++
}
}()

return out
}
72 changes: 72 additions & 0 deletions pkg/runtime/orchestrator_parallel.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
package runtime

import (
"context"
"sync"

"github.com/docker/cagent/pkg/session"
)

// ParallelAgentOrchestrator runs agents in parallel but emits events
// in the order of the runtimes.
type ParallelAgentOrchestrator struct {
runtimes []Runtime
}

func NewParallelAgentOrchestrator(rts ...Runtime) *ParallelAgentOrchestrator {
return &ParallelAgentOrchestrator{runtimes: rts}
}

func (o *ParallelAgentOrchestrator) Run(
ctx context.Context,
sess *session.Session,
) <-chan Event {
out := make(chan Event)

type stream struct {
events <-chan Event
}

streams := make([]stream, len(o.runtimes))

var wg sync.WaitGroup
wg.Add(len(o.runtimes))

for i, rt := range o.runtimes {
ch := make(chan Event)
streams[i] = stream{events: ch}

go func(rt Runtime, out chan Event) {
defer wg.Done()
defer close(out)

events := rt.RunStream(ctx, sess)
for event := range events {
select {
case out <- event:
case <-ctx.Done():
return
}
}
}(rt, ch)
}

go func() {
defer close(out)

// Emit strictly in runtime order
for _, s := range streams {
for event := range s.events {
select {
case out <- event:
case <-ctx.Done():
return
}
}
}

wg.Wait()
}()

return out
}
40 changes: 40 additions & 0 deletions pkg/runtime/orchestrator_sequential.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
package runtime

import (
"context"

"github.com/docker/cagent/pkg/session"
)

// SequentialAgentOrchestrator runs agents one after another.
type SequentialAgentOrchestrator struct {
runtimes []Runtime
}

func NewSequentialAgentOrchestrator(rts ...Runtime) *SequentialAgentOrchestrator {
return &SequentialAgentOrchestrator{runtimes: rts}
}

func (o *SequentialAgentOrchestrator) Run(
ctx context.Context,
sess *session.Session,
) <-chan Event {
out := make(chan Event)

go func() {
defer close(out)

for _, rt := range o.runtimes {
events := rt.RunStream(ctx, sess)
for event := range events {
select {
case out <- event:
case <-ctx.Done():
return
}
}
}
}()

return out
}
23 changes: 23 additions & 0 deletions pkg/runtime/orchestrator_single.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
package runtime

import (
"context"

"github.com/docker/cagent/pkg/session"
)

// SingleAgentOrchestrator runs exactly one agent (current behavior)
type SingleAgentOrchestrator struct {
rt Runtime
}

func NewSingleAgentOrchestrator(rt Runtime) *SingleAgentOrchestrator {
return &SingleAgentOrchestrator{rt: rt}
}

func (o *SingleAgentOrchestrator) Run(
ctx context.Context,
sess *session.Session,
) <-chan Event {
return o.rt.RunStream(ctx, sess)
}
62 changes: 62 additions & 0 deletions pkg/runtime/runtime_orchestrator_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
package runtime_test

import (
"context"
"testing"

"github.com/docker/cagent/pkg/runtime"
"github.com/docker/cagent/pkg/session"
"github.com/docker/cagent/pkg/tools"
)

// ------------------------
// fakeRuntime implements runtime.Runtime for testing.
// ------------------------
type fakeRuntime struct {
events []runtime.Event
}

func (f *fakeRuntime) RunStream(ctx context.Context, _ *session.Session) <-chan runtime.Event {
ch := make(chan runtime.Event)
go func() {
defer close(ch)
for _, ev := range f.events {
select {
case ch <- ev:
case <-ctx.Done():
return
}
}
}()
return ch
}

func (f *fakeRuntime) Resume(ctx context.Context, _ runtime.ResumeType) {}

func (f *fakeRuntime) CurrentAgentInfo(ctx context.Context) runtime.CurrentAgentInfo {
return runtime.CurrentAgentInfo{Name: "test-agent"}
}

func (f *fakeRuntime) CurrentAgentName() string { return "test-agent" }

func (f *fakeRuntime) CurrentAgentTools(ctx context.Context) ([]tools.Tool, error) {
return nil, nil
}

func (f *fakeRuntime) EmitStartupInfo(ctx context.Context, ch chan runtime.Event) {}

func (f *fakeRuntime) ResetStartupInfo() {}

func TestFakeRuntime_RunStream_smoke(t *testing.T) {
ctx := t.Context()

fr := &fakeRuntime{
events: []runtime.Event{},
}

ch := fr.RunStream(ctx, nil)

// drain channel to avoid goroutine leak
for range ch {
}
}