Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
224 changes: 149 additions & 75 deletions pkg/exporters/verifier/verifier.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"sync"
"time"

ethereum "github.com/ethereum/go-ethereum"
"github.com/ethereum/go-ethereum/core/types"
"github.com/evstack/ev-metrics/internal/clients/celestia"
"github.com/evstack/ev-metrics/internal/clients/evm"
Expand Down Expand Up @@ -56,7 +57,6 @@ func (e *exporter) ExportMetrics(ctx context.Context, m *metrics.Metrics) error
if err != nil {
return err
}
defer sub.Unsubscribe()

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

The defer sub.Unsubscribe() was removed, which is correct given the new reconnection logic. However, it's important to ensure that the sub is always unsubscribed when the ExportMetrics function exits, even if the reconnectSubscription loop is entered and then the context is cancelled. The current logic handles this by calling sub.Unsubscribe() in the ctx.Done() cases within the select loop, but the initial sub created before the loop might not be unsubscribed if an error occurs before entering the loop or if the function returns early for other reasons.

// create buffered channel for block queue
blockQueue := make(chan *types.Header, e.workers*2)
Expand All @@ -74,18 +74,46 @@ func (e *exporter) ExportMetrics(ctx context.Context, m *metrics.Metrics) error

e.logger.Info().Int("workers", e.workers).Msg("started verification work pool")

// pre-initialize submission metrics so both blob types are always visible
// in Prometheus output, even before any block has been fully processed.
m.InitializeSubmissionMetrics(e.chainID)

// ticker to refresh submission duration metric every 10 seconds
refreshTicker := time.NewTicker(10 * time.Second)
defer refreshTicker.Stop()

// shutdown cleanly unsubscribes and waits for all workers to finish.
// It captures sub by reference so reassignment during reconnection is reflected.
shutdown := func() {
sub.Unsubscribe()
close(blockQueue)
workerGroup.Wait()
}

// main subscription loop
for {
select {
case <-ctx.Done():
e.logger.Info().Msg("stopping block verification")
close(blockQueue)
workerGroup.Wait()
shutdown()
return nil
case subErr := <-sub.Err():
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

The subErr := <-sub.Err(): case handles both actual errors and a closed channel (where subErr would be nil). The logging distinguishes between these two scenarios, which is good for debugging.

// WebSocket subscription dropped — reconnect with backoff.
if subErr != nil {
e.logger.Error().Err(subErr).Msg("WebSocket subscription error, reconnecting")
} else {
e.logger.Warn().Msg("WebSocket subscription closed, reconnecting")
}
sub.Unsubscribe()
newSub := e.reconnectSubscription(ctx, headers)
if newSub == nil {
// context was cancelled during reconnection
close(blockQueue)
workerGroup.Wait()
return nil
}
sub = newSub
e.logger.Info().Msg("WebSocket subscription re-established")
case <-refreshTicker.C:
// ensure that submission duration is always included in the 60 second window.
m.RefreshSubmissionDuration()
Expand All @@ -106,14 +134,40 @@ func (e *exporter) ExportMetrics(ctx context.Context, m *metrics.Metrics) error
case blockQueue <- header:
// block queued successfully
case <-ctx.Done():
close(blockQueue)
workerGroup.Wait()
shutdown()
return nil
}
}
}
}

// reconnectSubscription attempts to re-establish the WebSocket block header subscription
// with exponential backoff. Returns nil if the context is cancelled before reconnecting.
func (e *exporter) reconnectSubscription(ctx context.Context, headers chan *types.Header) ethereum.Subscription {
backoff := 5 * time.Second
const maxBackoff = 60 * time.Second

for {
select {
case <-ctx.Done():
return nil
case <-time.After(backoff):
}

sub, err := e.evmClient.SubscribeNewHead(ctx, headers)
if err != nil {
if backoff*2 < maxBackoff {
backoff *= 2
} else {
backoff = maxBackoff
Comment on lines +159 to +162
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

The exponential backoff logic is correctly implemented, ensuring that the retry interval never exceeds maxBackoff. This prevents excessively frequent retries when the endpoint has a persistent issue.

}
e.logger.Warn().Err(err).Dur("retry_in", backoff).Msg("failed to reconnect WebSocket subscription, retrying")
continue
}
return sub
}
}

// processBlocks processes blocks from the queue
func (e *exporter) processBlocks(ctx context.Context, m *metrics.Metrics, workerID int, blockQueue chan *types.Header) {
logger := e.logger.With().Int("worker_id", workerID).Logger()
Expand Down Expand Up @@ -153,6 +207,12 @@ func (e *exporter) onVerified(m *metrics.Metrics, namespace string, blockHeight,
}
}

// verifyAttemptTimeout caps how long a single verification attempt (all RPC calls
// combined) may take. Without this, a slow or hung Celestia/ev-node endpoint can
// block a worker goroutine indefinitely, eventually filling the block queue and
// freezing metrics.
const verifyAttemptTimeout = 30 * time.Second

// verifyBlock attempts to verify a DA height for a given block status.
func (e *exporter) verifyBlock(ctx context.Context, m *metrics.Metrics, header *types.Header) bool {
blockHeight := header.Number.Uint64()
Expand Down Expand Up @@ -199,89 +259,103 @@ func (e *exporter) verifyBlock(ctx context.Context, m *metrics.Metrics, header *
// proceed with retry
}

blockResult, err := e.evnodeClient.GetBlock(ctx, blockHeight)
if err != nil {
logger.Warn().Err(err).Int("attempt", retries).Msg("failed to re-query block from ev-node")
continue
}

daHeight := blockResult.HeaderDaHeight
if namespace == "data" {
daHeight = blockResult.DataDaHeight
}

if daHeight == 0 {
logger.Debug().Int("attempt", retries).Msg("block still not submitted to DA, will retry")
continue
if e.verifyAttempt(ctx, m, logger, retries, blockHeight, namespace, blockTime, startTime) {
return false
}
Comment on lines +262 to 264
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

The verifyAttempt function now encapsulates the RPC calls with a timeout, which is a significant improvement for preventing worker goroutine hangs. The return value true for verifyAttempt indicates that no further retries are needed for the current block, which correctly propagates to the verifyBlock function to stop its retry loop.

}

blockResultWithBlobs, err := e.evnodeClient.GetBlockWithBlobs(ctx, blockHeight)
if err != nil {
logger.Warn().Err(err).Int("attempt", retries).Msg("failed to query block from ev-node")
continue
}
// if loop completes without success, log final error
logger.Error().Msg("max retries exhausted: failed to verify block")
e.onVerified(m, namespace, blockHeight, 0, false, 0)
return true
}
Comment on lines +262 to +271
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟡 Minor

Re-enqueued block records both a failure and a success metric for the same logical event.

After max retries, line 265 calls onVerified(m, namespace, blockHeight, 0, false, 0), which increments the failure counter and marks the block missing. When the re-enqueued block is later verified, onVerified(…, true, …) also fires, incrementing the success counter and removing the missing-block entry. Depending on how dashboards/alerts interpret these counters, a single block can produce one failure and one success — inflating both. If the intent is to track only the terminal outcome, the onVerified(false) call at exhaustion could be moved to a place where the block is definitively abandoned (i.e., never re-enqueued), or the re-enqueue path could clear the missing-block entry before re-trying.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@pkg/exporters/verifier/verifier.go` around lines 258 - 267, The code calls
e.onVerified(m, namespace, blockHeight, 0, false, 0) when max retries are
exhausted, which records a failure immediately even though the block may be
re-enqueued and later succeed; move or guard this failure reporting so it only
records terminal abandonment: either (a) remove the onVerified(false) call from
the retry loop exit and call onVerified(false) only from the code path that
definitively abandons a block, or (b) when re-enqueuing inside verifyAttempt (or
whatever path triggers a retry), clear the missing-block state before
re-queueing so a future onVerified(true) does not produce a succeeding counter
for an already-failed record; update references around verifyAttempt, the loop
that checks retries, and the onVerified(m, namespace, blockHeight, ...)
invocation accordingly to ensure only the terminal outcome is recorded.


daBlockTime, err := e.celestiaClient.GetBlockTimestamp(ctx, daHeight)
if err != nil {
logger.Warn().Err(err).Uint64("da_height", daHeight).Msg("failed to get da block timestamp")
continue
}
// verifyAttempt performs one bounded RPC attempt to verify a block against Celestia DA.
// It returns true when retrying is no longer needed (verified, or permanent failure),
// and false when the caller should retry.
// Each call is bounded by verifyAttemptTimeout so workers cannot hang indefinitely
// on slow or unresponsive ev-node / Celestia endpoints.
func (e *exporter) verifyAttempt(ctx context.Context, m *metrics.Metrics, logger zerolog.Logger, retries int, blockHeight uint64, namespace string, blockTime time.Time, startTime time.Time) bool {
attemptCtx, cancel := context.WithTimeout(ctx, verifyAttemptTimeout)
defer cancel()
Comment on lines +279 to +280
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

Using context.WithTimeout and defer cancel() ensures that all RPC calls within a single verifyAttempt are bounded by verifyAttemptTimeout. This directly addresses the problem of worker goroutines hanging indefinitely.


// the time taken from block time to DA inclusion time.
submissionDuration := daBlockTime.Sub(blockTime)
blockResult, err := e.evnodeClient.GetBlock(attemptCtx, blockHeight)
if err != nil {
logger.Warn().Err(err).Int("attempt", retries).Msg("failed to re-query block from ev-node")
return false
}

switch namespace {
case "header":
verified, err := e.celestiaClient.VerifyBlobAtHeight(ctx, blockResultWithBlobs.HeaderBlob, daHeight, e.headerNS)
daHeight := blockResult.HeaderDaHeight
if namespace == "data" {
daHeight = blockResult.DataDaHeight
}

if err != nil {
logger.Warn().Err(err).Uint64("da_height", daHeight).Msg("verification failed")
continue
}
if daHeight == 0 {
logger.Debug().Int("attempt", retries).Msg("block still not submitted to DA, will retry")
return false
}

if verified {
logger.Info().
Uint64("da_height", daHeight).
Dur("duration", time.Since(startTime)).
Msg("header blob verified on Celestia")
e.onVerified(m, namespace, blockHeight, daHeight, true, submissionDuration)
return false
}
blockResultWithBlobs, err := e.evnodeClient.GetBlockWithBlobs(attemptCtx, blockHeight)
if err != nil {
logger.Warn().Err(err).Int("attempt", retries).Msg("failed to query block from ev-node")
return false
}

case "data":
if len(blockResultWithBlobs.DataBlob) == 0 {
logger.Info().
Dur("duration", time.Since(startTime)).
Msg("empty data block - no verification needed")
e.onVerified(m, namespace, blockHeight, daHeight, true, submissionDuration)
return false
}
daBlockTime, err := e.celestiaClient.GetBlockTimestamp(attemptCtx, daHeight)
if err != nil {
logger.Warn().Err(err).Uint64("da_height", daHeight).Msg("failed to get da block timestamp")
return false
}

// perform actual verification between bytes from ev-node and Celestia.
verified, err := e.celestiaClient.VerifyDataBlobAtHeight(ctx, blockResultWithBlobs.DataBlob, daHeight, e.dataNS)
if err != nil {
logger.Warn().Err(err).Uint64("da_height", daHeight).Msg("verification failed")
continue
}
// the time taken from block time to DA inclusion time.
submissionDuration := daBlockTime.Sub(blockTime)

if verified {
logger.Info().
Uint64("da_height", daHeight).
Dur("duration", time.Since(startTime)).
Msg("data blob verified on Celestia")
e.onVerified(m, namespace, blockHeight, daHeight, true, submissionDuration)
return false
}
logger.Warn().Uint64("da_height", daHeight).Int("attempt", retries).Msg("verification failed, will retry")
switch namespace {
case "header":
verified, err := e.celestiaClient.VerifyBlobAtHeight(attemptCtx, blockResultWithBlobs.HeaderBlob, daHeight, e.headerNS)
if err != nil {
logger.Warn().Err(err).Uint64("da_height", daHeight).Msg("verification failed")
return false
}
if verified {
logger.Info().
Uint64("da_height", daHeight).
Dur("duration", time.Since(startTime)).
Msg("header blob verified on Celestia")
e.onVerified(m, namespace, blockHeight, daHeight, true, submissionDuration)
return true
}
logger.Warn().Uint64("da_height", daHeight).Int("attempt", retries).Msg("header verification failed, will retry")

case "data":
if len(blockResultWithBlobs.DataBlob) == 0 {
logger.Info().
Dur("duration", time.Since(startTime)).
Msg("empty data block - no verification needed")
e.onVerified(m, namespace, blockHeight, daHeight, true, submissionDuration)
return true
}

default:
logger.Error().Str("namespace", namespace).Msg("unknown namespace type")
// perform actual verification between bytes from ev-node and Celestia.
verified, err := e.celestiaClient.VerifyDataBlobAtHeight(attemptCtx, blockResultWithBlobs.DataBlob, daHeight, e.dataNS)
if err != nil {
logger.Warn().Err(err).Uint64("da_height", daHeight).Msg("verification failed")
return false
}
if verified {
logger.Info().
Uint64("da_height", daHeight).
Dur("duration", time.Since(startTime)).
Msg("data blob verified on Celestia")
e.onVerified(m, namespace, blockHeight, daHeight, true, submissionDuration)
return true
}
logger.Warn().Uint64("da_height", daHeight).Int("attempt", retries).Msg("verification failed, will retry")

default:
logger.Error().Str("namespace", namespace).Msg("unknown namespace type")
return true
}

// if loop completes without success, log final error
logger.Error().Msg("max retries exhausted: failed to verify block")
e.onVerified(m, namespace, blockHeight, 0, false, 0)
return true
return false
}
19 changes: 19 additions & 0 deletions pkg/metrics/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -701,6 +701,25 @@ func (m *Metrics) RecordJsonRpcRequestDuration(chainID string, duration time.Dur
m.JsonRpcRequestDurationSummary.WithLabelValues(chainID).Observe(duration.Seconds())
}

// InitializeSubmissionMetrics pre-initializes submission-related metrics for all
// known blob types so they are always visible in Prometheus output from startup,
// regardless of whether any blocks have been processed yet.
//
// Gauge and Counter label combinations are registered at zero. The
// SubmissionDuration Summary is registered through GetMetricWithLabelValues,
// which creates the child without recording a fake observation: quantiles show
// NaN and count/sum show 0, accurately representing "no data yet".
func (m *Metrics) InitializeSubmissionMetrics(chainID string) {
	blobTypes := [...]string{"header", "data"}
	for _, bt := range blobTypes {
		m.UnsubmittedBlocksTotal.WithLabelValues(chainID, bt).Set(0)
		m.SubmissionAttemptsTotal.WithLabelValues(chainID, bt).Add(0)
		m.SubmissionFailuresTotal.WithLabelValues(chainID, bt).Add(0)
		// Touch the Summary child so it is exported without skewing its stats.
		_, _ = m.SubmissionDuration.GetMetricWithLabelValues(chainID, bt)
	}
}

// InitializeJsonRpcSloThresholds initializes the constant SLO threshold gauges for JSON-RPC requests
func (m *Metrics) InitializeJsonRpcSloThresholds(chainID string) {
m.JsonRpcRequestSloSeconds.WithLabelValues(chainID, "0.5").Set(0.2)
Expand Down