Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 14 additions & 6 deletions internal/run.go
Original file line number Diff line number Diff line change
Expand Up @@ -200,6 +200,8 @@ func RunSubgraph(ctx context.Context, cancel context.CancelFunc, port int, openB
})
}

allRunning := false

for {
select {
case <-ctx.Done():
Expand Down Expand Up @@ -275,15 +277,20 @@ func RunSubgraph(ctx context.Context, cancel context.CancelFunc, port int, openB
logger.Println("βœ… exiting because all requested tasks completed and none should be restarted")
cancel()
} else if len(remainingTasks) == 0 {
logger.Println("πŸ”΅ all requested tasks are running:")
// print a list of running tasks, and their ports
for _, node := range subgraph.Nodes {
if (node.Phase == "running" || node.Phase == "stalled") && node.Task.Ports != nil {
for _, port := range node.Task.Ports {
logger.Printf(" - %s: http://localhost:%d\n", node.Name, port.HostPort)
if !allRunning {
allRunning = true
logger.Println("πŸ”΅ all requested tasks are running:")
// print a list of running tasks, and their ports
for _, node := range subgraph.Nodes {
if (node.Phase == "running" || node.Phase == "stalled") && node.Task.Ports != nil {
for _, port := range node.Task.Ports {
logger.Printf(" - %s: http://localhost:%d\n", node.Name, port.HostPort)
}
}
}
}
} else {
allRunning = false
}

// if the event is a string, it is the name of the task to run
Expand All @@ -308,6 +315,7 @@ func RunSubgraph(ctx context.Context, cancel context.CancelFunc, port int, openB
node := subgraph.Nodes[taskName]

node.cancel()
allRunning = false

// each task is executed in a separate goroutine
wg.Add(1)
Expand Down
64 changes: 64 additions & 0 deletions internal/run_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -437,6 +437,70 @@ sleep 30
err := RunSubgraph(ctx, cancel, 0, false, logger, wf, []string{"job"}, nil)
assert.NoError(t, err)
})

t.Run("Ready message printed only once when service stays running", func(t *testing.T) {
ctx, cancel, logger, buffer := setup(t)
defer cancel()

wf := &types.Workflow{
Tasks: map[string]types.Task{
"service": {Command: []string{"sleep", "30"}, Type: types.TaskTypeService},
},
}
wg := &sync.WaitGroup{}
wg.Add(1)
go func() {
defer wg.Done()
err := RunSubgraph(ctx, cancel, 0, false, logger, wf, []string{"service"}, nil)
assert.NoError(t, err)
}()

sleep(t)
cancel()
wg.Wait()

// The ready message should appear exactly once
count := strings.Count(buffer.String(), "πŸ”΅ all requested tasks are running:")
assert.Equal(t, 1, count)
})

t.Run("Ready message printed again after service restarts", func(t *testing.T) {
ctx, cancel, logger, buffer := setup(t)
defer cancel()

wf := &types.Workflow{
Tasks: map[string]types.Task{
"service": {
Command: []string{"sleep", "30"},
Watch: []string{"testdata/marker"},
Type: types.TaskTypeService,
},
},
}

wg := &sync.WaitGroup{}
wg.Add(1)
go func() {
defer wg.Done()
err := RunSubgraph(ctx, cancel, 0, false, logger, wf, []string{"service"}, nil)
assert.NoError(t, err)
}()

sleep(t)

// Trigger a restart by modifying the watched file
err := os.WriteFile("testdata/marker", nil, 0644)
assert.NoError(t, err)

sleep(t)

cancel()
wg.Wait()

// The ready message should appear twice: once initially, once after restart
count := strings.Count(buffer.String(), "πŸ”΅ all requested tasks are running:")
assert.Equal(t, 2, count)
})
}

func sleep(t *testing.T) {
Expand Down
Loading