Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions framework/.changeset/v0.13.1.md
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
- Add resource leak detector automation
133 changes: 133 additions & 0 deletions framework/leak/detector.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,133 @@
package leak

/*
Resource leak detector
This module provides a Prometheus-based leak detector for long-running soak tests. It detects leaks by comparing the median resource usage at the start and end of a test and flags any increases that breach configured thresholds.

Usage Note: Set the WarmUpDuration to at least 20% of your test length for reliable metrics.
It is also recommend to use it with 3h+ soak tests for less false-positives.
*/

import (
"fmt"
"strconv"
"time"

f "github.com/smartcontractkit/chainlink-testing-framework/framework"
)

// ResourceLeakCheckerConfig is resource leak checker config with Prometheus base URL
type ResourceLeakCheckerConfig struct {
PrometheusBaseURL string
}

// ResourceLeakChecker is resource leak cheker instance
type ResourceLeakChecker struct {
PrometheusURL string
c PromQuerier
}

// WithPrometheusBaseURL sets Prometheus base URL, example http://localhost:9099
func WithPrometheusBaseURL(url string) func(*ResourceLeakChecker) {
return func(rlc *ResourceLeakChecker) {
rlc.PrometheusURL = url
}
}

// WithQueryClient sets Prometheus query client
func WithQueryClient(c PromQuerier) func(*ResourceLeakChecker) {
return func(rlc *ResourceLeakChecker) {
rlc.c = c
}
}

// PromQueries is an interface for querying Prometheus containing only methods we need for detecting resource leaks
type PromQuerier interface {
Query(query string, timestamp time.Time) (*f.PrometheusQueryResponse, error)
}

// NewResourceLeakChecker creates a new resource leak checker
func NewResourceLeakChecker(opts ...func(*ResourceLeakChecker)) *ResourceLeakChecker {
lc := &ResourceLeakChecker{}
for _, o := range opts {
o(lc)
}
if lc.c == nil {
lc.c = f.NewPrometheusQueryClient(f.LocalPrometheusBaseURL)
}
return lc
}

// CheckConfig describes leak check configuration
type CheckConfig struct {
Query string
Start time.Time
End time.Time
WarmUpDuration time.Duration
}

// MeasureLeak measures resource leak between start and end timestamps
// WarmUpDuration is used to ignore warm up interval results for more stable comparison
func (rc *ResourceLeakChecker) MeasureLeak(
c *CheckConfig,
) (float64, error) {
if c.Start.After(c.End) {
return 0, fmt.Errorf("start time is greated than end time: %s -> %s", c.Start, c.End)
}
if c.WarmUpDuration > c.End.Sub(c.Start)/2 {
return 0, fmt.Errorf("warm up duration can't be more than 50 percent of test interval between start and end timestamps: %s", c.WarmUpDuration)
}
startWithWarmUp := c.Start.Add(c.WarmUpDuration)
memStart, err := rc.c.Query(c.Query, startWithWarmUp)
if err != nil {
return 0, fmt.Errorf("failed to get memory for the test start: %w", err)
}

memEnd, err := rc.c.Query(c.Query, c.End)
if err != nil {
return 0, fmt.Errorf("failed to get memory for the test end: %w", err)
}

resStart := memStart.Data.Result
resEnd := memEnd.Data.Result
if len(resStart) == 0 {
return 0, fmt.Errorf("no results for start timestamp: %s", c.Start)
}
if len(resEnd) == 0 {
return 0, fmt.Errorf("no results for end timestamp: %s", c.End)
}

if len(resStart[0].Value) < 2 {
return 0, fmt.Errorf("invalid Prometheus response for start timestamp, should have timestamp and value: %s", c.Start)
}
if len(resEnd[0].Value) < 2 {
return 0, fmt.Errorf("invalid Prometheus response for end timestamp, should have timestamp and value: %s", c.End)
}

memStartVal, startOk := memStart.Data.Result[0].Value[1].(string)
if !startOk {
return 0, fmt.Errorf("invalid Prometheus response value for timestamp: %s, value: %v", c.Start, memStart.Data.Result[0].Value[1])
}
memEndVal, endOk := memEnd.Data.Result[0].Value[1].(string)
if !endOk {
return 0, fmt.Errorf("invalid Prometheus response value for timestamp: %s, value: %v", c.End, memEnd.Data.Result[0].Value[1])
}

memStartValFloat, err := strconv.ParseFloat(memStartVal, 64)
if err != nil {
return 0, fmt.Errorf("start quantile can't be parsed from string: %w", err)
}
memEndValFloat, err := strconv.ParseFloat(memEndVal, 64)
if err != nil {
return 0, fmt.Errorf("start quantile can't be parsed from string: %w", err)
}

totalIncreasePercentage := (memEndValFloat / memStartValFloat * 100) - 100

f.L.Debug().
Float64("Start", memStartValFloat).
Float64("End", memEndValFloat).
Float64("Increase", totalIncreasePercentage).
Msg("Memory increase total (percentage)")
return totalIncreasePercentage, nil
}
116 changes: 116 additions & 0 deletions framework/leak/detector_cl_node.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,116 @@
package leak

import (
"errors"
"fmt"
"time"

"github.com/smartcontractkit/chainlink-testing-framework/framework"
)

// ClNodesCheck contains thresholds which can be verified for each Chainlink node
// it is recommended to set some WarmUpDuration, 20% of overall test time
// to have more stable results
type CLNodesCheck struct {
NumNodes int
Start time.Time
End time.Time
WarmUpDuration time.Duration
CPUThreshold float64
MemoryThreshold float64
}

// CLNodesLeakDetector is Chainlink node specific resource leak detector
// can be used with both local and remote Chainlink node sets (DONs)
type CLNodesLeakDetector struct {
Mode string
CPUQuery, MemoryQuery string
c *ResourceLeakChecker
}

// WithCPUQuery allows to override CPU leak query (Prometheus)
func WithCPUQuery(q string) func(*CLNodesLeakDetector) {
return func(cd *CLNodesLeakDetector) {
cd.CPUQuery = q
}
}

// WithCPUQuery allows to override Memory leak query (Prometheus)
func WithMemoryQuery(q string) func(*CLNodesLeakDetector) {
return func(cd *CLNodesLeakDetector) {
cd.MemoryQuery = q
}
}

// NewCLNodesLeakDetector create new Chainlink node specific resource leak detector with Prometheus client
func NewCLNodesLeakDetector(c *ResourceLeakChecker, opts ...func(*CLNodesLeakDetector)) (*CLNodesLeakDetector, error) {
cd := &CLNodesLeakDetector{
c: c,
}
for _, o := range opts {
o(cd)
}
if cd.Mode == "" {
cd.Mode = "devenv"
}
switch cd.Mode {
case "devenv":
cd.CPUQuery = `sum(rate(container_cpu_usage_seconds_total{name=~"don-node%d"}[5m])) * 100`
cd.MemoryQuery = `quantile_over_time(0.5, container_memory_rss{name="don-node%d"}[1h]) / 1024 / 1024`
case "griddle":
return nil, fmt.Errorf("not implemented yet")
default:
return nil, fmt.Errorf("invalid mode, use: 'devenv' or 'griddle'")
}
return cd, nil
}

// Check runs all resource leak checks and returns errors if threshold reached for any of them
func (cd *CLNodesLeakDetector) Check(t *CLNodesCheck) error {
if t.NumNodes == 0 {
return fmt.Errorf("cl nodes num must be > 0")
}
memoryDiffs := make([]float64, 0)
cpuDiffs := make([]float64, 0)
errs := make([]error, 0)
for i := range t.NumNodes {
memoryDiff, err := cd.c.MeasureLeak(&CheckConfig{
Query: fmt.Sprintf(cd.MemoryQuery, i),
Start: t.Start,
End: t.End,
WarmUpDuration: t.WarmUpDuration,
})
if err != nil {
return fmt.Errorf("memory leak check failed: %w", err)
}
memoryDiffs = append(memoryDiffs, memoryDiff)
cpuDiff, err := cd.c.MeasureLeak(&CheckConfig{
Query: fmt.Sprintf(cd.CPUQuery, i),
Start: t.Start,
End: t.End,
WarmUpDuration: t.WarmUpDuration,
})
if err != nil {
return fmt.Errorf("cpu leak check failed: %w", err)
}
cpuDiffs = append(cpuDiffs, cpuDiff)

if memoryDiff >= t.MemoryThreshold {
errs = append(errs, fmt.Errorf(
"Memory leak detected for node %d and interval: [%s -> %s], diff: %.f",
i, t.Start, t.End, memoryDiff,
))
}
if cpuDiff >= t.CPUThreshold {
errs = append(errs, fmt.Errorf(
"CPU leak detected for node %d and interval: [%s -> %s], diff: %.f",
i, t.Start, t.End, cpuDiff,
))
}
}
framework.L.Info().
Any("MemoryDiffs", memoryDiffs).
Any("CPUDiffs", cpuDiffs).
Msg("Leaks info")
return errors.Join(errs...)
}
48 changes: 48 additions & 0 deletions framework/leak/detector_fake.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
package leak

import (
"time"

f "github.com/smartcontractkit/chainlink-testing-framework/framework"
)

var _ PromQuerier = (*FakeQueryClient)(nil)

type FakeQueryClient struct {
isStartResp bool
startResp *f.PrometheusQueryResponse
endResp *f.PrometheusQueryResponse
}

func NewFakeQueryClient() *FakeQueryClient {
return &FakeQueryClient{}
}

func (qc *FakeQueryClient) SetResponses(sr *f.PrometheusQueryResponse, er *f.PrometheusQueryResponse) {
qc.isStartResp = true
qc.startResp = sr
qc.endResp = er
}

func (qc *FakeQueryClient) Query(query string, timestamp time.Time) (*f.PrometheusQueryResponse, error) {
if qc.isStartResp {
qc.isStartResp = false
return qc.startResp, nil
}
qc.isStartResp = true
return qc.endResp, nil
}

func PromSingleValueResponse(val string) *f.PrometheusQueryResponse {
return &f.PrometheusQueryResponse{
Data: &f.PromQueryResponseData{
Result: []f.PromQueryResponseResult{
{
Metric: map[string]string{},
// timestamp is irrelevant in tests, we trust Prometheus
Value: []interface{}{"", val},
},
},
},
}
}
Loading
Loading