Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 5 additions & 5 deletions aggregator/pkg/storage/metrics_aware_storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ func WrapWithMetrics(inner CommitVerificationStorage, m common.AggregatorMonitor
}

func (s *MetricsAwareStorage) SaveCommitVerification(ctx context.Context, record *model.CommitVerificationRecord, aggregationKey model.AggregationKey) error {
return s.captureMetricsNoReturn(ctx, saveOp, func() error {
return captureMetricsNoReturn(ctx, s.metrics(ctx, saveOp), s.logger(ctx), s.slowQueryThreshold, saveOp, func() error {
return s.inner.SaveCommitVerification(ctx, record, aggregationKey)
})
}
Expand Down Expand Up @@ -110,7 +110,7 @@ func (s *MetricsAwareStorage) GetBatchAggregatedReportByMessageIDs(ctx context.C
}

func (s *MetricsAwareStorage) SubmitAggregatedReport(ctx context.Context, report *model.CommitAggregatedReport) error {
return s.captureMetricsNoReturn(ctx, submitReportOp, func() error {
return captureMetricsNoReturn(ctx, s.metrics(ctx, submitReportOp), s.logger(ctx), s.slowQueryThreshold, submitReportOp, func() error {
return s.inner.SubmitAggregatedReport(ctx, report)
})
}
Expand All @@ -134,7 +134,7 @@ func (s *MetricsAwareStorage) Get(ctx context.Context, id string) (*messagerules
}

func (s *MetricsAwareStorage) Delete(ctx context.Context, id string) error {
return s.captureMetricsNoReturn(ctx, deleteMessageDisablementRuleOp, func() error {
return captureMetricsNoReturn(ctx, s.metrics(ctx, deleteMessageDisablementRuleOp), s.logger(ctx), s.slowQueryThreshold, deleteMessageDisablementRuleOp, func() error {
return s.inner.Delete(ctx, id)
})
}
Expand Down Expand Up @@ -200,8 +200,8 @@ func captureMetrics[T any](ctx context.Context, metrics common.AggregatorMetricL
return res, err
}

func (s *MetricsAwareStorage) captureMetricsNoReturn(ctx context.Context, operation string, fn func() error) error {
_, err := captureMetrics(ctx, s.metrics(ctx, operation), s.logger(ctx), s.slowQueryThreshold, operation, func() (struct{}, error) {
func captureMetricsNoReturn(ctx context.Context, metrics common.AggregatorMetricLabeler, l logger.SugaredLogger, threshold time.Duration, operation string, fn func() error) error {
_, err := captureMetrics(ctx, metrics, l, threshold, operation, func() (struct{}, error) {
return struct{}{}, fn()
})
return err
Expand Down
2 changes: 1 addition & 1 deletion indexer/cmd/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -294,5 +294,5 @@ func createPostgresStorage(ctx context.Context, lggr logger.Logger, cfg *config.
lggr.Fatalf("Failed to create postgres storage: %v", err)
}

return dbStore
return storage.WrapWithMetrics(dbStore, indexerMonitoring, lggr)
}
10 changes: 4 additions & 6 deletions indexer/pkg/common/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,12 +28,10 @@ type IndexerMetricLabeler interface {
RecordHTTPRequestDuration(ctx context.Context, duration time.Duration, path, method string, status int)
// IncrementVerificationRecordsCounter increments the verification records counter.
IncrementVerificationRecordsCounter(ctx context.Context)
// RecordStorageQueryDuration records the storage query duration.
RecordStorageQueryDuration(ctx context.Context, duration time.Duration, queryName string, errored bool)
// RecordStorageWriteDuration records the storage write duration.
RecordStorageWriteDuration(ctx context.Context, duration time.Duration)
// RecordStorageInsertErrorsCounter records the storage insert errors counter.
RecordStorageInsertErrorsCounter(ctx context.Context, queryName string)
// RecordStorageLatency records storage operation latency.
RecordStorageLatency(ctx context.Context, duration time.Duration)
// IncrementStorageError increments the storage error counter.
IncrementStorageError(ctx context.Context)
// RecordScannerPollingErrorsCounter records the scanner polling errors counter.
RecordScannerPollingErrorsCounter(ctx context.Context)
// RecordVerificationRecordChannelSizeGauge records the verification record channel size gauge.
Expand Down
55 changes: 15 additions & 40 deletions indexer/pkg/monitoring/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,10 +22,9 @@ type IndexerMetrics struct {
requestDurationSeconds metric.Float64Histogram

// Storage Metrics
verificationRecordsCounter metric.Int64Counter
storageQueryDurationSeconds metric.Float64Histogram
storageWriteDurationSeconds metric.Float64Histogram
storageInsertErrorsCounter metric.Int64Counter
verificationRecordsCounter metric.Int64Counter
storageDurationSeconds metric.Float64Histogram
storageErrorsCounter metric.Int64Counter

// Scanner Metrics
scannerPollingErrorsCounter metric.Int64Counter
Expand Down Expand Up @@ -67,21 +66,13 @@ func InitMetrics() (im *IndexerMetrics, err error) {
return nil, fmt.Errorf("failed to register verification records counter: %w", err)
}

im.storageQueryDurationSeconds, err = beholder.GetMeter().Float64Histogram(
"indexer_storage_query_duration_seconds",
metric.WithDescription("Total duration of querying the storage of the Indexer"),
im.storageDurationSeconds, err = beholder.GetMeter().Float64Histogram(
"indexer_storage_operation_duration_seconds",
metric.WithDescription("Duration of storage operations in the Indexer"),
metric.WithUnit("seconds"),
)
if err != nil {
return nil, fmt.Errorf("failed to register storage query duration histogram: %w", err)
}

im.storageWriteDurationSeconds, err = beholder.GetMeter().Float64Histogram("indexer_storage_write_duration_seconds",
metric.WithDescription("Total duration of writing to the storage of the Indexer"),
metric.WithUnit("seconds"),
)
if err != nil {
return nil, fmt.Errorf("failed to register storage write duration histogram: %w", err)
return nil, fmt.Errorf("failed to register storage operation duration histogram: %w", err)
}

im.discoveryLatencySeconds, err = beholder.GetMeter().Float64Histogram("indexer_message_discovery_latency_seconds",
Expand All @@ -92,11 +83,11 @@ func InitMetrics() (im *IndexerMetrics, err error) {
return nil, fmt.Errorf("failed to register message discovery latency histogram: %w", err)
}

im.storageInsertErrorsCounter, err = beholder.GetMeter().Int64Counter("indexer_storage_insert_errors_total",
metric.WithDescription("Total number of errors when inserting into Indexer Storage"),
im.storageErrorsCounter, err = beholder.GetMeter().Int64Counter("indexer_storage_errors_total",
metric.WithDescription("Total number of storage errors in the Indexer"),
)
if err != nil {
return nil, fmt.Errorf("failed to register storage insert errors counter: %w", err)
return nil, fmt.Errorf("failed to register storage errors counter: %w", err)
}

im.scannerPollingErrorsCounter, err = beholder.GetMeter().Int64Counter("indexer_scanner_polling_errors_total",
Expand Down Expand Up @@ -164,13 +155,7 @@ var grpcPayloadSizeBuckets = []float64{
func MetricViews() []sdkmetric.View {
return []sdkmetric.View{
sdkmetric.NewView(
sdkmetric.Instrument{Name: "indexer_storage_query_duration_seconds"},
sdkmetric.Stream{Aggregation: sdkmetric.AggregationExplicitBucketHistogram{
Boundaries: []float64{0.001, 0.005, 0.01, 0.025, 0.05, 0.1, 0.25, 0.5, 1, 2.5, 5, 10, 60, 300, 720},
}},
),
sdkmetric.NewView(
sdkmetric.Instrument{Name: "indexer_storage_write_duration_seconds"},
sdkmetric.Instrument{Name: "indexer_storage_operation_duration_seconds"},
sdkmetric.Stream{Aggregation: sdkmetric.AggregationExplicitBucketHistogram{
Boundaries: []float64{0.001, 0.005, 0.01, 0.025, 0.05, 0.1, 0.25, 0.5, 1, 2.5, 5, 10, 60, 300, 720},
}},
Expand Down Expand Up @@ -244,24 +229,14 @@ func (c *IndexerMetricLabeler) IncrementVerificationRecordsCounter(ctx context.C
c.im.verificationRecordsCounter.Add(ctx, 1, metric.WithAttributes(otelLabels...))
}

func (c *IndexerMetricLabeler) RecordStorageQueryDuration(ctx context.Context, duration time.Duration, queryName string, errored bool) {
func (c *IndexerMetricLabeler) RecordStorageLatency(ctx context.Context, duration time.Duration) {
otelLabels := beholder.OtelAttributes(c.Labels).AsStringAttributes()
c.im.storageQueryDurationSeconds.Record(ctx, duration.Seconds(), metric.WithAttributes([]attribute.KeyValue{
attribute.String("query", queryName),
attribute.Bool("errored", errored),
}...), metric.WithAttributes(otelLabels...))
c.im.storageDurationSeconds.Record(ctx, duration.Seconds(), metric.WithAttributes(otelLabels...))
}

func (c *IndexerMetricLabeler) RecordStorageWriteDuration(ctx context.Context, duration time.Duration) {
func (c *IndexerMetricLabeler) IncrementStorageError(ctx context.Context) {
otelLabels := beholder.OtelAttributes(c.Labels).AsStringAttributes()
c.im.storageWriteDurationSeconds.Record(ctx, duration.Seconds(), metric.WithAttributes(otelLabels...))
}

func (c *IndexerMetricLabeler) RecordStorageInsertErrorsCounter(ctx context.Context, queryName string) {
otelLabels := beholder.OtelAttributes(c.Labels).AsStringAttributes()
c.im.storageInsertErrorsCounter.Add(ctx, 1, metric.WithAttributes([]attribute.KeyValue{
attribute.String("query", queryName),
}...), metric.WithAttributes(otelLabels...))
c.im.storageErrorsCounter.Add(ctx, 1, metric.WithAttributes(otelLabels...))
}

func (c *IndexerMetricLabeler) RecordScannerPollingErrorsCounter(ctx context.Context) {
Expand Down
12 changes: 2 additions & 10 deletions indexer/pkg/monitoring/noop.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,17 +29,9 @@ func (n *NoopIndexerMetricLabeler) DecrementActiveRequestsCounter(ctx context.Co
func (n *NoopIndexerMetricLabeler) RecordHTTPRequestDuration(ctx context.Context, duration time.Duration, path, method string, status int) {
}
func (n *NoopIndexerMetricLabeler) IncrementVerificationRecordsCounter(ctx context.Context) {}
func (n *NoopIndexerMetricLabeler) RecordStorageQueryDuration(ctx context.Context, duration time.Duration, queryName string, errored bool) {
}

func (n *NoopIndexerMetricLabeler) RecordStorageWriteDuration(ctx context.Context, duration time.Duration) {
}

func (n *NoopIndexerMetricLabeler) RecordStorageInsertErrorsCounter(ctx context.Context, queryName string) {
}

func (n *NoopIndexerMetricLabeler) RecordVerificationRecordRequestDuration(ctx context.Context, duration time.Duration) {
func (n *NoopIndexerMetricLabeler) RecordStorageLatency(ctx context.Context, duration time.Duration) {
}
func (n *NoopIndexerMetricLabeler) IncrementStorageError(ctx context.Context) {}
func (n *NoopIndexerMetricLabeler) RecordScannerPollingErrorsCounter(ctx context.Context) {}
func (n *NoopIndexerMetricLabeler) RecordVerificationRecordChannelSizeGauge(ctx context.Context, size int64) {
}
Expand Down
141 changes: 141 additions & 0 deletions indexer/pkg/storage/metrics_aware_storage.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,141 @@
package storage

import (
"context"
"time"

"github.com/smartcontractkit/chainlink-ccv/indexer/pkg/common"
"github.com/smartcontractkit/chainlink-ccv/protocol"
"github.com/smartcontractkit/chainlink-common/pkg/logger"
)

// operationLabel is the metric label key under which the name of the storage
// operation is attached to the metric labeler.
const operationLabel = "operation"

// Operation names, one per wrapped storage method. Each is used both as the
// value of the operation label and as the "operation" field of the
// slow-operation log entry.
const (
	opGetCCVData            = "GetCCVData"
	opQueryCCVData          = "QueryCCVData"
	opInsertVerifierResults = "InsertVerifierResults"
	opGetMessage            = "GetMessage"
	opQueryMessages         = "QueryMessages"
	opUpdateMessageStatus   = "UpdateMessageStatus"
	opCreateDiscoveryState  = "CreateDiscoveryState"
	opGetDiscoverySequence  = "GetDiscoverySequenceNumber"
	opPersistDiscoveryBatch = "PersistDiscoveryBatch"
)

// defaultSlowQueryThreshold is the slow-operation threshold a
// MetricsAwareStorage starts with when no WithSlowQueryThreshold option
// overrides it.
const defaultSlowQueryThreshold = 500 * time.Millisecond

// MetricsAwareStorage decorates a common.IndexerStorage: every storage call is
// forwarded to the wrapped implementation while its latency and errors are
// reported through the monitoring handle, and operations slower than the
// configured threshold are logged as warnings.
type MetricsAwareStorage struct {
	// inner is the wrapped storage that every call is delegated to.
	inner common.IndexerStorage
	// m supplies the metric labeler used to record latency and errors.
	m common.IndexerMonitoring
	// l receives the slow-operation warnings.
	l logger.Logger
	// slowQueryThreshold is the duration an operation must exceed before it
	// is logged as slow.
	slowQueryThreshold time.Duration
}

// MetricsAwareStorageOption customizes a MetricsAwareStorage while it is being
// constructed by NewMetricsAwareStorage.
type MetricsAwareStorageOption func(*MetricsAwareStorage)

// WithSlowQueryThreshold returns an option that replaces the slow-operation
// threshold of the storage it is applied to with threshold.
func WithSlowQueryThreshold(threshold time.Duration) MetricsAwareStorageOption {
	apply := func(target *MetricsAwareStorage) {
		target.slowQueryThreshold = threshold
	}
	return apply
}

// NewMetricsAwareStorage builds a MetricsAwareStorage around inner.
//
// m provides the metric labeler used to record latency and errors, and l
// receives slow-operation warnings. The slow-operation threshold starts at
// defaultSlowQueryThreshold; opts are then applied in order, so a later option
// overrides an earlier one. Nil options are ignored.
func NewMetricsAwareStorage(inner common.IndexerStorage, m common.IndexerMonitoring, l logger.Logger, opts ...MetricsAwareStorageOption) *MetricsAwareStorage {
	s := &MetricsAwareStorage{
		inner:              inner,
		m:                  m,
		l:                  l,
		slowQueryThreshold: defaultSlowQueryThreshold,
	}
	for _, opt := range opts {
		// Calling a nil function value panics; treat a nil option as a no-op
		// so a conditionally-built option list cannot crash construction.
		if opt == nil {
			continue
		}
		opt(s)
	}
	return s
}

// WrapWithMetrics is a convenience wrapper around NewMetricsAwareStorage that
// returns the decorated storage as a common.IndexerStorage.
func WrapWithMetrics(inner common.IndexerStorage, m common.IndexerMonitoring, l logger.Logger, opts ...MetricsAwareStorageOption) common.IndexerStorage {
	wrapped := NewMetricsAwareStorage(inner, m, l, opts...)
	return wrapped
}

// metrics returns the metric labeler obtained from the monitoring handle with
// the operation label set to operation.
func (s *MetricsAwareStorage) metrics(operation string) common.IndexerMetricLabeler {
	base := s.m.Metrics()
	return base.With(operationLabel, operation)
}

// GetCCVData forwards to the wrapped storage's GetCCVData, recording latency
// and errors under the GetCCVData operation.
func (s *MetricsAwareStorage) GetCCVData(ctx context.Context, messageID protocol.Bytes32) ([]common.VerifierResultWithMetadata, error) {
	fetch := func() ([]common.VerifierResultWithMetadata, error) {
		return s.inner.GetCCVData(ctx, messageID)
	}
	return captureMetrics(ctx, s.metrics(opGetCCVData), s.l, s.slowQueryThreshold, opGetCCVData, fetch)
}

// QueryCCVData forwards to the wrapped storage's QueryCCVData with the same
// arguments, recording latency and errors under the QueryCCVData operation.
func (s *MetricsAwareStorage) QueryCCVData(ctx context.Context, start, end int64, sourceChainSelectors, destChainSelectors []protocol.ChainSelector, limit, offset uint64) (map[string][]common.VerifierResultWithMetadata, error) {
	query := func() (map[string][]common.VerifierResultWithMetadata, error) {
		return s.inner.QueryCCVData(ctx, start, end, sourceChainSelectors, destChainSelectors, limit, offset)
	}
	return captureMetrics(ctx, s.metrics(opQueryCCVData), s.l, s.slowQueryThreshold, opQueryCCVData, query)
}

// InsertVerifierResults forwards to the wrapped storage's
// InsertVerifierResults, recording latency and errors under the
// InsertVerifierResults operation.
func (s *MetricsAwareStorage) InsertVerifierResults(ctx context.Context, verifierResults []common.VerifierResultWithMetadata) error {
	insert := func() error {
		return s.inner.InsertVerifierResults(ctx, verifierResults)
	}
	return captureMetricsNoReturn(ctx, s.metrics(opInsertVerifierResults), s.l, s.slowQueryThreshold, opInsertVerifierResults, insert)
}

// GetMessage forwards to the wrapped storage's GetMessage, recording latency
// and errors under the GetMessage operation.
func (s *MetricsAwareStorage) GetMessage(ctx context.Context, messageID protocol.Bytes32) (common.MessageWithMetadata, error) {
	fetch := func() (common.MessageWithMetadata, error) {
		return s.inner.GetMessage(ctx, messageID)
	}
	return captureMetrics(ctx, s.metrics(opGetMessage), s.l, s.slowQueryThreshold, opGetMessage, fetch)
}

// QueryMessages forwards to the wrapped storage's QueryMessages with the same
// arguments, recording latency and errors under the QueryMessages operation.
func (s *MetricsAwareStorage) QueryMessages(ctx context.Context, start, end int64, sourceChainSelectors, destChainSelectors []protocol.ChainSelector, limit, offset uint64) ([]common.MessageWithMetadata, error) {
	query := func() ([]common.MessageWithMetadata, error) {
		return s.inner.QueryMessages(ctx, start, end, sourceChainSelectors, destChainSelectors, limit, offset)
	}
	return captureMetrics(ctx, s.metrics(opQueryMessages), s.l, s.slowQueryThreshold, opQueryMessages, query)
}

// UpdateMessageStatus forwards to the wrapped storage's UpdateMessageStatus,
// recording latency and errors under the UpdateMessageStatus operation.
func (s *MetricsAwareStorage) UpdateMessageStatus(ctx context.Context, messageID protocol.Bytes32, status common.MessageStatus, lastErr string) error {
	update := func() error {
		return s.inner.UpdateMessageStatus(ctx, messageID, status, lastErr)
	}
	return captureMetricsNoReturn(ctx, s.metrics(opUpdateMessageStatus), s.l, s.slowQueryThreshold, opUpdateMessageStatus, update)
}

// CreateDiscoveryState forwards to the wrapped storage's CreateDiscoveryState,
// recording latency and errors under the CreateDiscoveryState operation.
func (s *MetricsAwareStorage) CreateDiscoveryState(ctx context.Context, discoveryLocation string, startingSequenceNumber int) error {
	create := func() error {
		return s.inner.CreateDiscoveryState(ctx, discoveryLocation, startingSequenceNumber)
	}
	return captureMetricsNoReturn(ctx, s.metrics(opCreateDiscoveryState), s.l, s.slowQueryThreshold, opCreateDiscoveryState, create)
}

// GetDiscoverySequenceNumber forwards to the wrapped storage's
// GetDiscoverySequenceNumber, recording latency and errors under the
// GetDiscoverySequenceNumber operation.
func (s *MetricsAwareStorage) GetDiscoverySequenceNumber(ctx context.Context, discoveryLocation string) (int, error) {
	lookup := func() (int, error) {
		return s.inner.GetDiscoverySequenceNumber(ctx, discoveryLocation)
	}
	return captureMetrics(ctx, s.metrics(opGetDiscoverySequence), s.l, s.slowQueryThreshold, opGetDiscoverySequence, lookup)
}

// PersistDiscoveryBatch forwards to the wrapped storage's
// PersistDiscoveryBatch, recording latency and errors under the
// PersistDiscoveryBatch operation.
func (s *MetricsAwareStorage) PersistDiscoveryBatch(ctx context.Context, batch common.DiscoveryBatch) error {
	persist := func() error {
		return s.inner.PersistDiscoveryBatch(ctx, batch)
	}
	return captureMetricsNoReturn(ctx, s.metrics(opPersistDiscoveryBatch), s.l, s.slowQueryThreshold, opPersistDiscoveryBatch, persist)
}

// captureMetrics runs fn and reports on it through metrics and l.
//
// The elapsed time is recorded via RecordStorageLatency when the function
// exits; if it is greater than threshold a "Slow storage operation" warning
// is logged with the operation name and the duration in milliseconds. When fn
// returns a non-nil error the storage error counter is incremented. The
// result and error of fn are returned unchanged.
func captureMetrics[T any](ctx context.Context, metrics common.IndexerMetricLabeler, l logger.Logger, threshold time.Duration, operation string, fn func() (T, error)) (T, error) {
	startedAt := time.Now()
	// Deferred so the latency is still recorded if fn panics.
	defer func() {
		elapsed := time.Since(startedAt)
		metrics.RecordStorageLatency(ctx, elapsed)
		if elapsed <= threshold {
			return
		}
		l.Warnw("Slow storage operation", "operation", operation, "duration_ms", elapsed.Milliseconds())
	}()

	result, opErr := fn()
	if opErr != nil {
		metrics.IncrementStorageError(ctx)
	}
	return result, opErr
}

// captureMetricsNoReturn is the error-only variant of captureMetrics: it
// adapts fn to the value-returning form using an empty struct as the result,
// and returns only the error.
func captureMetricsNoReturn(ctx context.Context, metrics common.IndexerMetricLabeler, l logger.Logger, threshold time.Duration, operation string, fn func() error) error {
	adapted := func() (struct{}, error) {
		return struct{}{}, fn()
	}
	_, err := captureMetrics(ctx, metrics, l, threshold, operation, adapted)
	return err
}
Loading
Loading