Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions book/src/SUMMARY.md
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
- [Components Resources](framework/components/resources.md)
- [Containers Network Isolation](framework/components/network_isolation.md)
- [Fake Services](framework/components/mocking.md)
- [Detecting Resource Leaks](framework/resource_leaks.md)
- [Copying Files](framework/copying_files.md)
- [External Environment](framework/components/external.md)
- [Observability Stack](framework/observability/observability_stack.md)
Expand Down
57 changes: 57 additions & 0 deletions book/src/framework/resource_leaks.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
## Resource Leak Detector

We have a simple utility to detect resource leaks in our tests

## CL Nodes Leak Detection

In this example test will fail if any node will consume more than 2 additional cores and allocate 20% more memory at the end of a test.
```go
import (
"github.com/smartcontractkit/chainlink-testing-framework/framework/leak"
)
```

```go
l, err := leak.NewCLNodesLeakDetector(leak.NewResourceLeakChecker())
require.NoError(t, err)
errs := l.Check(&leak.CLNodesCheck{
NumNodes: in.NodeSets[0].Nodes,
Start: start,
End: time.Now(),
WarmUpDuration: 10 * time.Minute,
CPUThreshold: 2000.0,
MemoryThreshold: 20.0,
})
require.NoError(t, errs)
```

## Custom Resource Assertion

You can also use low-level API to verify
```go
diff, err := lc.MeasureDelta(&leak.CheckConfig{
Query: fmt.Sprintf(`quantile_over_time(0.5, container_memory_rss{name="don-node%d"}[1h]) / 1024 / 1024`, i),
Start: mustTime("2026-01-12T21:53:00Z"),
End: mustTime("2026-01-13T10:11:00Z"),
WarmUpDuration: 1 * time.Hour,
})
require.NoError(t, err)
```

## Adding New Queries

You can use our test `hog` to debug new metrics and verify its correctness
```bash
cd framework/leak/cmd
just build
```

Run different hogs
```bash
ctf obs up
go test -v -timeout 1h -run TestCyclicHog
```
Then verify your query
```bash
go test -v -run TestVerifyCyclicHog
```
2 changes: 2 additions & 0 deletions framework/.changeset/v0.13.2.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
- Add pprof dumps download
- Generalize and change CPU query
14 changes: 14 additions & 0 deletions framework/leak/cmd/Dockerfile
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
# Build stage
FROM golang:1.25-alpine AS builder

WORKDIR /app

COPY go.mod ./
RUN go mod download
COPY . .
RUN go build -o resource-hog main.go

FROM alpine:latest
WORKDIR /root/
COPY --from=builder /app/resource-hog .
CMD ["./resource-hog"]
8 changes: 8 additions & 0 deletions framework/leak/cmd/Justfile
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
run:
go run main.go

build:
docker build -t resource-hog:latest .

clean-hog:
docker rm -f resource-hog-test 2>/dev/null || true
3 changes: 3 additions & 0 deletions framework/leak/cmd/go.mod
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
module github.com/smartcontractkit/chainlink-testing-framework/framework/hog

go 1.25.4
149 changes: 149 additions & 0 deletions framework/leak/cmd/main.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,149 @@
package main

import (
"fmt"
"log"
"math"
"os"
"strconv"
"strings"
"time"
)

const (
StepTick = 3 * time.Minute
)

func main() {
fmt.Println("Starting CPU and Memory hog...")

workersSchedule := os.Getenv("WORKERS")
memorySchedule := os.Getenv("MEMORY")
repeatStr := os.Getenv("REPEAT")

leaks := make([][]byte, 0)
workerCounter := 0

go scheduleMemoryLeaks(memorySchedule, parseRepeat(repeatStr), &leaks)
go scheduleCPUWorkers(workersSchedule, parseRepeat(repeatStr), &workerCounter)

select {}
}

func parseRepeat(repeatStr string) int {
if repeatStr == "" {
return 1
}
repeat, _ := strconv.Atoi(repeatStr)
if repeat < 1 {
return 1
}
return repeat
}

func scheduleMemoryLeaks(schedule string, repeat int, leaks *[][]byte) {
if schedule == "" {
return
}

levels := parseSchedule(schedule)

for r := 0; r < repeat; r++ {
for _, target := range levels {
timer := time.NewTimer(StepTick)

current := len(*leaks) / 100

if target > current {
for i := 0; i < target-current; i++ {
leak := make([]byte, 100*1024*1024)
for j := range leak {
leak[j] = byte(j % 256)
}
*leaks = append(*leaks, leak)
}
} else if target < current {
removeCount := current - target
if removeCount > len(*leaks)/100 {
removeCount = len(*leaks) / 100
}
*leaks = (*leaks)[:len(*leaks)-removeCount*100]
}

log.Printf("Memory: %dx100MB", len(*leaks)/100)
<-timer.C
}
}
}

func scheduleCPUWorkers(schedule string, repeat int, counter *int) {
if schedule == "" {
return
}

levels := parseSchedule(schedule)
activeWorkers := 0

for r := 0; r < repeat; r++ {
for _, target := range levels {
timer := time.NewTimer(StepTick)

if target > activeWorkers {
for i := 0; i < target-activeWorkers; i++ {
*counter++
go cpuWorker(*counter)
}
} else if target < activeWorkers {
stopWorkers(activeWorkers - target)
}

activeWorkers = target
log.Printf("CPU Workers: %d", activeWorkers)
<-timer.C
}
}
}

var stopChan = make(chan bool, 1000)

func cpuWorker(id int) {
for {
select {
case <-stopChan:
return
default:
for n := 2; n < 100000; n++ {
isPrime := true
for i := 2; i <= int(math.Sqrt(float64(n))); i++ {
if n%i == 0 {
isPrime = false
break
}
}
_ = isPrime
}
time.Sleep(10 * time.Millisecond)
}
}
}

func stopWorkers(count int) {
for i := 0; i < count; i++ {
select {
case stopChan <- true:
default:
}
}
}

func parseSchedule(schedule string) []int {
parts := strings.Split(schedule, ",")
var result []int
for _, part := range parts {
val, _ := strconv.Atoi(strings.TrimSpace(part))
if val >= 0 {
result = append(result, val)
}
}
return result
}
22 changes: 11 additions & 11 deletions framework/leak/detector.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,9 +66,9 @@ type CheckConfig struct {
WarmUpDuration time.Duration
}

// MeasureLeak measures resource leak between start and end timestamps
// MeasureDelta measures resource leak delta between start and end timestamps
// WarmUpDuration is used to ignore warm up interval results for more stable comparison
func (rc *ResourceLeakChecker) MeasureLeak(
func (rc *ResourceLeakChecker) MeasureDelta(
c *CheckConfig,
) (float64, error) {
if c.Start.After(c.End) {
Expand All @@ -91,10 +91,10 @@ func (rc *ResourceLeakChecker) MeasureLeak(
resStart := memStart.Data.Result
resEnd := memEnd.Data.Result
if len(resStart) == 0 {
return 0, fmt.Errorf("no results for start timestamp: %s", c.Start)
return 0, fmt.Errorf("no results for start timestamp: %s, query: %s", startWithWarmUp, c.Query)
}
if len(resEnd) == 0 {
return 0, fmt.Errorf("no results for end timestamp: %s", c.End)
return 0, fmt.Errorf("no results for end timestamp: %s, query: %s", c.End, c.Query)
}

if len(resStart[0].Value) < 2 {
Expand All @@ -113,21 +113,21 @@ func (rc *ResourceLeakChecker) MeasureLeak(
return 0, fmt.Errorf("invalid Prometheus response value for timestamp: %s, value: %v", c.End, memEnd.Data.Result[0].Value[1])
}

memStartValFloat, err := strconv.ParseFloat(memStartVal, 64)
startValFloat, err := strconv.ParseFloat(memStartVal, 64)
if err != nil {
return 0, fmt.Errorf("start quantile can't be parsed from string: %w", err)
}
memEndValFloat, err := strconv.ParseFloat(memEndVal, 64)
endValFloat, err := strconv.ParseFloat(memEndVal, 64)
if err != nil {
return 0, fmt.Errorf("start quantile can't be parsed from string: %w", err)
}

totalIncreasePercentage := (memEndValFloat / memStartValFloat * 100) - 100
totalIncreasePercentage := (endValFloat / startValFloat * 100) - 100

f.L.Debug().
Float64("Start", memStartValFloat).
Float64("End", memEndValFloat).
f.L.Info().
Float64("Start", startValFloat).
Float64("End", endValFloat).
Float64("Increase", totalIncreasePercentage).
Msg("Memory increase total (percentage)")
Msg("Increase total (percentage)")
return totalIncreasePercentage, nil
}
9 changes: 5 additions & 4 deletions framework/leak/detector_cl_node.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,8 +55,9 @@ func NewCLNodesLeakDetector(c *ResourceLeakChecker, opts ...func(*CLNodesLeakDet
}
switch cd.Mode {
case "devenv":
cd.CPUQuery = `sum(rate(container_cpu_usage_seconds_total{name=~"don-node%d"}[5m])) * 100`
cd.MemoryQuery = `quantile_over_time(0.5, container_memory_rss{name="don-node%d"}[1h]) / 1024 / 1024`
// aggregate it on 5m interval with 2m step for mitigating spikes
cd.CPUQuery = `avg_over_time((sum(rate(container_cpu_usage_seconds_total{name="don-node%d"}[5m])) * 100)[5m:2m])`
cd.MemoryQuery = `avg_over_time(container_memory_rss{name="don-node%d"}[5m]) / 1024 / 1024`
case "griddle":
return nil, fmt.Errorf("not implemented yet")
default:
Expand All @@ -74,7 +75,7 @@ func (cd *CLNodesLeakDetector) Check(t *CLNodesCheck) error {
cpuDiffs := make([]float64, 0)
errs := make([]error, 0)
for i := range t.NumNodes {
memoryDiff, err := cd.c.MeasureLeak(&CheckConfig{
memoryDiff, err := cd.c.MeasureDelta(&CheckConfig{
Query: fmt.Sprintf(cd.MemoryQuery, i),
Start: t.Start,
End: t.End,
Expand All @@ -84,7 +85,7 @@ func (cd *CLNodesLeakDetector) Check(t *CLNodesCheck) error {
return fmt.Errorf("memory leak check failed: %w", err)
}
memoryDiffs = append(memoryDiffs, memoryDiff)
cpuDiff, err := cd.c.MeasureLeak(&CheckConfig{
cpuDiff, err := cd.c.MeasureDelta(&CheckConfig{
Query: fmt.Sprintf(cd.CPUQuery, i),
Start: t.Start,
End: t.End,
Expand Down
Loading
Loading