Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
40 changes: 20 additions & 20 deletions internal/runner/ci_node_executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,19 +8,12 @@ import (

"github.com/DataDog/ddtest/internal/constants"
"github.com/DataDog/ddtest/internal/framework"
"github.com/DataDog/ddtest/internal/settings"
"golang.org/x/sync/errgroup"
)

// runCINodeTests executes tests for a specific CI node (one split, not the whole tests set).
// It further splits the node's tests among local workers based on ci_node_workers setting.
func runCINodeTests(ctx context.Context, framework framework.Framework, workerEnvMap map[string]string, ciNode int) error {
return runCINodeTestsWithWorkers(ctx, framework, workerEnvMap, ciNode, settings.GetCiNodeWorkers())
}

// runCINodeTestsWithWorkers is the internal implementation that accepts ciNodeWorkers as a parameter
// for easier testing.
func runCINodeTestsWithWorkers(ctx context.Context, framework framework.Framework, workerEnvMap map[string]string, ciNode int, ciNodeWorkers int) error {
func runCINodeTests(ctx context.Context, framework framework.Framework, workerEnvMap map[string]string, ciNode int, ciNodeWorkers int, testFileWeights map[string]int) error {
runnerFilePath := fmt.Sprintf("%s/runner-%d", constants.TestsSplitDir, ciNode)
if _, err := os.Stat(runnerFilePath); os.IsNotExist(err) {
return fmt.Errorf("runner file for ci-node %d does not exist: %s", ciNode, runnerFilePath)
Expand All @@ -46,14 +39,22 @@ func runCINodeTestsWithWorkers(ctx context.Context, framework framework.Framewor
slog.Info("Running tests for CI node in parallel mode",
"ciNode", ciNode, "ciNodeWorkers", ciNodeWorkers, "testFilesCount", len(testFiles))

groups := subsplitTestsBetweenWorkers(testFiles, ciNodeWorkers)
if testFileWeights == nil {
testFileWeights = map[string]int{}
}
groups := subsplitTestsBetweenWorkers(testFiles, ciNodeWorkers, testFileWeights)

var g errgroup.Group
for workerIndex, groupFiles := range groups {
if len(groupFiles) == 0 {
continue
}

slog.Debug("Assigned test files to CI node worker",
"ciNode", ciNode,
"workerIndex", workerIndex,
"testFiles", groupFiles)

g.Go(func() error {
return runTestBatch(ctx, framework, groupFiles, workerEnvMap, ciNode, workerIndex)
})
Expand All @@ -66,21 +67,20 @@ func runCINodeTestsWithWorkers(ctx context.Context, framework framework.Framewor
}

// subsplitTestsBetweenWorkers splits a CI node's test files among local workers
// using simple round-robin distribution.
func subsplitTestsBetweenWorkers(testFiles []string, n int) [][]string {
// using the same weighted distribution algorithm used for CI node splits.
func subsplitTestsBetweenWorkers(testFiles []string, n int, testFileWeights map[string]int) [][]string {
if n <= 0 {
n = 1
}

result := make([][]string, n)
for i := range result {
result[i] = []string{}
}

for i, file := range testFiles {
groupIndex := i % n
result[groupIndex] = append(result[groupIndex], file)
nodeTestFiles := make(map[string]int, len(testFiles))
for _, testFile := range testFiles {
if cachedWeight, ok := testFileWeights[testFile]; ok && cachedWeight > 0 {
nodeTestFiles[testFile] = cachedWeight
} else {
nodeTestFiles[testFile] = defaultTestFileWeight
}
}

return result
return DistributeTestFiles(nodeTestFiles, n)
}
79 changes: 59 additions & 20 deletions internal/runner/ci_node_executor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,9 +27,9 @@ func TestRunCINodeTests_SingleWorker(t *testing.T) {
}

// Test with single worker (ciNodeWorkers=1)
err := runCINodeTestsWithWorkers(context.Background(), mockFramework, map[string]string{}, 1, 1)
err := runCINodeTests(context.Background(), mockFramework, map[string]string{}, 1, 1, nil)
if err != nil {
t.Fatalf("runCINodeTestsWithWorkers() should not return error, got: %v", err)
t.Fatalf("runCINodeTests() should not return error, got: %v", err)
}

// Verify RunTests was called exactly once
Expand All @@ -50,6 +50,7 @@ func TestRunCINodeTests_MultipleWorkers(t *testing.T) {
oldWd, _ := os.Getwd()
defer func() { _ = os.Chdir(oldWd) }()
_ = os.Chdir(tempDir)
logs := captureLogs(t)

// Setup test split directory and files - 4 test files for ci-node 1
_ = os.MkdirAll(filepath.Join(constants.PlanDirectory, "tests-split"), 0755)
Expand All @@ -62,9 +63,9 @@ func TestRunCINodeTests_MultipleWorkers(t *testing.T) {
}

// Test with 2 workers on ci-node 1
err := runCINodeTestsWithWorkers(context.Background(), mockFramework, map[string]string{}, 1, 2)
err := runCINodeTests(context.Background(), mockFramework, map[string]string{}, 1, 2, nil)
if err != nil {
t.Fatalf("runCINodeTestsWithWorkers() should not return error, got: %v", err)
t.Fatalf("runCINodeTests() should not return error, got: %v", err)
}

// Verify RunTests was called twice (once per worker)
Expand All @@ -84,6 +85,21 @@ func TestRunCINodeTests_MultipleWorkers(t *testing.T) {
if !slices.Equal(allFiles, expectedFiles) {
t.Errorf("Expected all test files %v to be distributed, got %v", expectedFiles, allFiles)
}

logOutput := logs.String()
if strings.Contains(logOutput, "Assigned tests to CI node worker") {
t.Errorf("Expected no INFO logs for CI node worker assignments, got logs: %s", logOutput)
}
if strings.Count(logOutput, "Assigned test files to CI node worker") != 2 ||
!strings.Contains(logOutput, "ciNode=1") ||
!strings.Contains(logOutput, "workerIndex=0") ||
!strings.Contains(logOutput, "workerIndex=1") ||
!strings.Contains(logOutput, "test/file1_test.rb") ||
!strings.Contains(logOutput, "test/file2_test.rb") ||
!strings.Contains(logOutput, "test/file3_test.rb") ||
!strings.Contains(logOutput, "test/file4_test.rb") {
t.Errorf("Expected DEBUG logs with assigned CI node worker test files, got logs: %s", logOutput)
}
}

func TestRunCINodeTests_NodeIndexMatchesCINode(t *testing.T) {
Expand All @@ -108,9 +124,9 @@ func TestRunCINodeTests_NodeIndexMatchesCINode(t *testing.T) {
}

// Test with 2 workers on ci-node 1
err := runCINodeTestsWithWorkers(context.Background(), mockFramework, workerEnvMap, 1, 2)
err := runCINodeTests(context.Background(), mockFramework, workerEnvMap, 1, 2, nil)
if err != nil {
t.Fatalf("runCINodeTestsWithWorkers() should not return error, got: %v", err)
t.Fatalf("runCINodeTests() should not return error, got: %v", err)
}

// Verify RunTests was called twice
Expand Down Expand Up @@ -161,9 +177,9 @@ func TestRunCINodeTests_SingleWorkerNodeIndex(t *testing.T) {
"WORKER_INDEX": "{{workerIndex}}",
}

err := runCINodeTestsWithWorkers(context.Background(), mockFramework, workerEnvMap, 2, 1)
err := runCINodeTests(context.Background(), mockFramework, workerEnvMap, 2, 1, nil)
if err != nil {
t.Fatalf("runCINodeTestsWithWorkers() should not return error, got: %v", err)
t.Fatalf("runCINodeTests() should not return error, got: %v", err)
}

calls := mockFramework.GetRunTestsCalls()
Expand All @@ -190,9 +206,9 @@ func TestRunCINodeTests_FileNotFound(t *testing.T) {

mockFramework := &MockFramework{FrameworkName: "rspec"}

err := runCINodeTestsWithWorkers(context.Background(), mockFramework, map[string]string{}, 2, 1)
err := runCINodeTests(context.Background(), mockFramework, map[string]string{}, 2, 1, nil)
if err == nil {
t.Error("runCINodeTestsWithWorkers() should return error when runner file doesn't exist")
t.Error("runCINodeTests() should return error when runner file doesn't exist")
}

expectedMsg := "runner file for ci-node 2 does not exist"
Expand All @@ -214,9 +230,9 @@ func TestRunCINodeTests_EmptyFile(t *testing.T) {
mockFramework := &MockFramework{FrameworkName: "rspec"}

// Should not error for empty file, just not run any tests
err := runCINodeTestsWithWorkers(context.Background(), mockFramework, map[string]string{}, 0, 2)
err := runCINodeTests(context.Background(), mockFramework, map[string]string{}, 0, 2, nil)
if err != nil {
t.Fatalf("runCINodeTestsWithWorkers() should not return error for empty file, got: %v", err)
t.Fatalf("runCINodeTests() should not return error for empty file, got: %v", err)
}

// Verify no tests were run
Expand All @@ -228,13 +244,13 @@ func TestRunCINodeTests_EmptyFile(t *testing.T) {
func TestSubsplitTestsBetweenWorkers(t *testing.T) {
t.Run("even split", func(t *testing.T) {
files := []string{"a", "b", "c", "d"}
result := subsplitTestsBetweenWorkers(files, 2)
result := subsplitTestsBetweenWorkers(files, 2, map[string]int{})

if len(result) != 2 {
t.Fatalf("Expected 2 groups, got %d", len(result))
}

// Round-robin: a->0, b->1, c->0, d->1
// Equal default weights keep this balanced as a->0, b->1, c->0, d->1.
expected0 := []string{"a", "c"}
expected1 := []string{"b", "d"}

Expand All @@ -248,13 +264,13 @@ func TestSubsplitTestsBetweenWorkers(t *testing.T) {

t.Run("uneven split", func(t *testing.T) {
files := []string{"a", "b", "c", "d", "e"}
result := subsplitTestsBetweenWorkers(files, 2)
result := subsplitTestsBetweenWorkers(files, 2, map[string]int{})

if len(result) != 2 {
t.Fatalf("Expected 2 groups, got %d", len(result))
}

// Round-robin: a->0, b->1, c->0, d->1, e->0
// Equal default weights keep this balanced as a->0, b->1, c->0, d->1, e->0.
expected0 := []string{"a", "c", "e"}
expected1 := []string{"b", "d"}

Expand All @@ -268,7 +284,7 @@ func TestSubsplitTestsBetweenWorkers(t *testing.T) {

t.Run("more groups than files", func(t *testing.T) {
files := []string{"a", "b"}
result := subsplitTestsBetweenWorkers(files, 4)
result := subsplitTestsBetweenWorkers(files, 4, map[string]int{})

if len(result) != 4 {
t.Fatalf("Expected 4 groups, got %d", len(result))
Expand All @@ -291,7 +307,7 @@ func TestSubsplitTestsBetweenWorkers(t *testing.T) {

t.Run("single group", func(t *testing.T) {
files := []string{"a", "b", "c"}
result := subsplitTestsBetweenWorkers(files, 1)
result := subsplitTestsBetweenWorkers(files, 1, map[string]int{})

if len(result) != 1 {
t.Fatalf("Expected 1 group, got %d", len(result))
Expand All @@ -303,7 +319,7 @@ func TestSubsplitTestsBetweenWorkers(t *testing.T) {
})

t.Run("empty input", func(t *testing.T) {
result := subsplitTestsBetweenWorkers([]string{}, 3)
result := subsplitTestsBetweenWorkers([]string{}, 3, map[string]int{})

if len(result) != 3 {
t.Fatalf("Expected 3 groups, got %d", len(result))
Expand All @@ -318,7 +334,7 @@ func TestSubsplitTestsBetweenWorkers(t *testing.T) {

t.Run("zero groups defaults to 1", func(t *testing.T) {
files := []string{"a", "b"}
result := subsplitTestsBetweenWorkers(files, 0)
result := subsplitTestsBetweenWorkers(files, 0, map[string]int{})

if len(result) != 1 {
t.Fatalf("Expected 1 group for n=0, got %d", len(result))
Expand All @@ -328,4 +344,27 @@ func TestSubsplitTestsBetweenWorkers(t *testing.T) {
t.Errorf("Expected all files in single group, got %v", result[0])
}
})

t.Run("weighted split keeps heavy file isolated", func(t *testing.T) {
files := []string{"a", "b", "c", "d"}
weights := map[string]int{
"a": 100,
"b": 1,
"c": 1,
"d": 1,
}

result := subsplitTestsBetweenWorkers(files, 2, weights)

if len(result) != 2 {
t.Fatalf("Expected 2 groups, got %d", len(result))
}

if !slices.Equal(result[0], []string{"a"}) {
t.Errorf("Expected heavy file to be alone in group 0, got %v", result[0])
}
if !slices.Equal(result[1], []string{"b", "c", "d"}) {
t.Errorf("Expected light files in group 1, got %v", result[1])
}
})
}
Loading
Loading