fix(verifier): reconnect WebSocket on drop and add per-attempt RPC timeouts #31
Changes from all commits
```diff
@@ -5,6 +5,7 @@ import (
 	"sync"
 	"time"
 
+	ethereum "github.com/ethereum/go-ethereum"
 	"github.com/ethereum/go-ethereum/core/types"
 	"github.com/evstack/ev-metrics/internal/clients/celestia"
 	"github.com/evstack/ev-metrics/internal/clients/evm"
```
```diff
@@ -56,7 +57,6 @@ func (e *exporter) ExportMetrics(ctx context.Context, m *metrics.Metrics) error
 	if err != nil {
 		return err
 	}
-	defer sub.Unsubscribe()
 
 	// create buffered channel for block queue
 	blockQueue := make(chan *types.Header, e.workers*2)
```
```diff
@@ -74,18 +74,46 @@ func (e *exporter) ExportMetrics(ctx context.Context, m *metrics.Metrics) error
 	e.logger.Info().Int("workers", e.workers).Msg("started verification work pool")
 
 	// pre-initialize submission metrics so both blob types are always visible
 	// in Prometheus output, even before any block has been fully processed.
 	m.InitializeSubmissionMetrics(e.chainID)
 
 	// ticker to refresh submission duration metric every 10 seconds
 	refreshTicker := time.NewTicker(10 * time.Second)
 	defer refreshTicker.Stop()
 
+	// shutdown cleanly unsubscribes and waits for all workers to finish.
+	// It captures sub by reference so reassignment during reconnection is reflected.
+	shutdown := func() {
+		sub.Unsubscribe()
+		close(blockQueue)
+		workerGroup.Wait()
+	}
+
 	// main subscription loop
 	for {
 		select {
 		case <-ctx.Done():
 			e.logger.Info().Msg("stopping block verification")
-			close(blockQueue)
-			workerGroup.Wait()
+			shutdown()
 			return nil
+		case subErr := <-sub.Err():
+			// WebSocket subscription dropped — reconnect with backoff.
+			if subErr != nil {
+				e.logger.Error().Err(subErr).Msg("WebSocket subscription error, reconnecting")
+			} else {
+				e.logger.Warn().Msg("WebSocket subscription closed, reconnecting")
+			}
+			sub.Unsubscribe()
+			newSub := e.reconnectSubscription(ctx, headers)
+			if newSub == nil {
+				// context was cancelled during reconnection
+				close(blockQueue)
+				workerGroup.Wait()
+				return nil
+			}
+			sub = newSub
+			e.logger.Info().Msg("WebSocket subscription re-established")
 		case <-refreshTicker.C:
 			// ensure that submission duration is always included in the 60 second window.
 			m.RefreshSubmissionDuration()
```
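The comment on the `shutdown` closure above relies on Go's closure semantics: a closure captures the variable itself, not a snapshot of its value, so after `sub = newSub` the same `shutdown` func tears down the new subscription. A minimal standalone sketch (illustrative names only, not code from this PR):

```go
package main

import "fmt"

func main() {
	// Closures capture variables, not values: reassigning x after the
	// closure is created changes what the closure sees when it runs.
	x := "initial subscription"
	cleanup := func() { fmt.Println("cleaning up:", x) }

	x = "reconnected subscription" // analogous to sub = newSub
	cleanup()                      // prints: cleaning up: reconnected subscription
}
```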
```diff
@@ -106,14 +134,40 @@ func (e *exporter) ExportMetrics(ctx context.Context, m *metrics.Metrics) error
 			case blockQueue <- header:
 				// block queued successfully
 			case <-ctx.Done():
-				close(blockQueue)
-				workerGroup.Wait()
+				shutdown()
 				return nil
 			}
 		}
 	}
 }
 
+// reconnectSubscription attempts to re-establish the WebSocket block header subscription
+// with exponential backoff. Returns nil if the context is cancelled before reconnecting.
+func (e *exporter) reconnectSubscription(ctx context.Context, headers chan *types.Header) ethereum.Subscription {
+	backoff := 5 * time.Second
+	const maxBackoff = 60 * time.Second
+
+	for {
+		select {
+		case <-ctx.Done():
+			return nil
+		case <-time.After(backoff):
+		}
+
+		sub, err := e.evmClient.SubscribeNewHead(ctx, headers)
+		if err != nil {
+			if backoff*2 < maxBackoff {
+				backoff *= 2
+			} else {
+				backoff = maxBackoff
+			}
+			e.logger.Warn().Err(err).Dur("retry_in", backoff).Msg("failed to reconnect WebSocket subscription, retrying")
+			continue
+		}
+		return sub
+	}
+}
+
 // processBlocks processes blocks from the queue
 func (e *exporter) processBlocks(ctx context.Context, m *metrics.Metrics, workerID int, blockQueue chan *types.Header) {
 	logger := e.logger.With().Int("worker_id", workerID).Logger()
```
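For reference, the doubling rule in `reconnectSubscription` (double until another doubling would reach the cap, then hold at the cap) yields waits of 5s, 10s, 20s, 40s, 60s, 60s, and so on. The standalone snippet below simply replays that arithmetic and is not part of the change:

```go
package main

import (
	"fmt"
	"time"
)

func main() {
	// Reproduce the backoff progression used by reconnectSubscription.
	backoff := 5 * time.Second
	const maxBackoff = 60 * time.Second

	for attempt := 1; attempt <= 7; attempt++ {
		fmt.Printf("attempt %d waits %v before retrying\n", attempt, backoff)
		if backoff*2 < maxBackoff {
			backoff *= 2
		} else {
			backoff = maxBackoff
		}
	}
}
```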
```diff
@@ -153,6 +207,12 @@ func (e *exporter) onVerified(m *metrics.Metrics, namespace string, blockHeight,
 	}
 }
 
+// verifyAttemptTimeout caps how long a single verification attempt (all RPC calls
+// combined) may take. Without this, a slow or hung Celestia/ev-node endpoint can
+// block a worker goroutine indefinitely, eventually filling the block queue and
+// freezing metrics.
+const verifyAttemptTimeout = 30 * time.Second
+
 // verifyBlock attempts to verify a DA height for a given block status.
 func (e *exporter) verifyBlock(ctx context.Context, m *metrics.Metrics, header *types.Header) bool {
 	blockHeight := header.Number.Uint64()
```
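The constant only has effect because every RPC in one attempt is issued through a context derived with `context.WithTimeout`, as the refactored `verifyAttempt` in the next hunk does. A self-contained sketch of the pattern (the `fakeRPC` helper is hypothetical, standing in for an ev-node or Celestia call):

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"time"
)

// fakeRPC stands in for a slow ev-node/Celestia call: it blocks until the
// simulated work finishes or the attempt's deadline expires.
func fakeRPC(ctx context.Context, work time.Duration) error {
	select {
	case <-time.After(work):
		return nil
	case <-ctx.Done():
		return ctx.Err()
	}
}

func main() {
	// One attempt: all calls made with attemptCtx share a single deadline,
	// so a hung endpoint fails the attempt instead of blocking a worker forever.
	attemptCtx, cancel := context.WithTimeout(context.Background(), 100*time.Millisecond)
	defer cancel()

	err := fakeRPC(attemptCtx, time.Second)                // simulates an unresponsive endpoint
	fmt.Println(errors.Is(err, context.DeadlineExceeded)) // true
}
```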
```diff
@@ -199,89 +259,103 @@ func (e *exporter) verifyBlock(ctx context.Context, m *metrics.Metrics, header *
 			// proceed with retry
 		}
 
-		blockResult, err := e.evnodeClient.GetBlock(ctx, blockHeight)
-		if err != nil {
-			logger.Warn().Err(err).Int("attempt", retries).Msg("failed to re-query block from ev-node")
-			continue
-		}
-
-		daHeight := blockResult.HeaderDaHeight
-		if namespace == "data" {
-			daHeight = blockResult.DataDaHeight
-		}
-
-		if daHeight == 0 {
-			logger.Debug().Int("attempt", retries).Msg("block still not submitted to DA, will retry")
-			continue
-		}
-
-		blockResultWithBlobs, err := e.evnodeClient.GetBlockWithBlobs(ctx, blockHeight)
-		if err != nil {
-			logger.Warn().Err(err).Int("attempt", retries).Msg("failed to query block from ev-node")
-			continue
-		}
-
-		daBlockTime, err := e.celestiaClient.GetBlockTimestamp(ctx, daHeight)
-		if err != nil {
-			logger.Warn().Err(err).Uint64("da_height", daHeight).Msg("failed to get da block timestamp")
-			continue
-		}
-
-		// the time taken from block time to DA inclusion time.
-		submissionDuration := daBlockTime.Sub(blockTime)
-
-		switch namespace {
-		case "header":
-			verified, err := e.celestiaClient.VerifyBlobAtHeight(ctx, blockResultWithBlobs.HeaderBlob, daHeight, e.headerNS)
-			if err != nil {
-				logger.Warn().Err(err).Uint64("da_height", daHeight).Msg("verification failed")
-				continue
-			}
-
-			if verified {
-				logger.Info().
-					Uint64("da_height", daHeight).
-					Dur("duration", time.Since(startTime)).
-					Msg("header blob verified on Celestia")
-				e.onVerified(m, namespace, blockHeight, daHeight, true, submissionDuration)
-				return false
-			}
-
-		case "data":
-			if len(blockResultWithBlobs.DataBlob) == 0 {
-				logger.Info().
-					Dur("duration", time.Since(startTime)).
-					Msg("empty data block - no verification needed")
-				e.onVerified(m, namespace, blockHeight, daHeight, true, submissionDuration)
-				return false
-			}
-
-			// perform actual verification between bytes from ev-node and Celestia.
-			verified, err := e.celestiaClient.VerifyDataBlobAtHeight(ctx, blockResultWithBlobs.DataBlob, daHeight, e.dataNS)
-			if err != nil {
-				logger.Warn().Err(err).Uint64("da_height", daHeight).Msg("verification failed")
-				continue
-			}
-
-			if verified {
-				logger.Info().
-					Uint64("da_height", daHeight).
-					Dur("duration", time.Since(startTime)).
-					Msg("data blob verified on Celestia")
-				e.onVerified(m, namespace, blockHeight, daHeight, true, submissionDuration)
-				return false
-			}
-			logger.Warn().Uint64("da_height", daHeight).Int("attempt", retries).Msg("verification failed, will retry")
-
-		default:
-			logger.Error().Str("namespace", namespace).Msg("unknown namespace type")
+		if e.verifyAttempt(ctx, m, logger, retries, blockHeight, namespace, blockTime, startTime) {
+			return false
 		}
 	}
 
 	// if loop completes without success, log final error
 	logger.Error().Msg("max retries exhausted: failed to verify block")
 	e.onVerified(m, namespace, blockHeight, 0, false, 0)
 	return true
 }
+
+// verifyAttempt performs one bounded RPC attempt to verify a block against Celestia DA.
+// It returns true when retrying is no longer needed (verified, or permanent failure),
+// and false when the caller should retry.
+// Each call is bounded by verifyAttemptTimeout so workers cannot hang indefinitely
+// on slow or unresponsive ev-node / Celestia endpoints.
+func (e *exporter) verifyAttempt(ctx context.Context, m *metrics.Metrics, logger zerolog.Logger, retries int, blockHeight uint64, namespace string, blockTime time.Time, startTime time.Time) bool {
+	attemptCtx, cancel := context.WithTimeout(ctx, verifyAttemptTimeout)
+	defer cancel()
+
+	blockResult, err := e.evnodeClient.GetBlock(attemptCtx, blockHeight)
+	if err != nil {
+		logger.Warn().Err(err).Int("attempt", retries).Msg("failed to re-query block from ev-node")
+		return false
+	}
+
+	daHeight := blockResult.HeaderDaHeight
+	if namespace == "data" {
+		daHeight = blockResult.DataDaHeight
+	}
+
+	if daHeight == 0 {
+		logger.Debug().Int("attempt", retries).Msg("block still not submitted to DA, will retry")
+		return false
+	}
+
+	blockResultWithBlobs, err := e.evnodeClient.GetBlockWithBlobs(attemptCtx, blockHeight)
+	if err != nil {
+		logger.Warn().Err(err).Int("attempt", retries).Msg("failed to query block from ev-node")
+		return false
+	}
+
+	daBlockTime, err := e.celestiaClient.GetBlockTimestamp(attemptCtx, daHeight)
+	if err != nil {
+		logger.Warn().Err(err).Uint64("da_height", daHeight).Msg("failed to get da block timestamp")
+		return false
+	}
+
+	// the time taken from block time to DA inclusion time.
+	submissionDuration := daBlockTime.Sub(blockTime)
+
+	switch namespace {
+	case "header":
+		verified, err := e.celestiaClient.VerifyBlobAtHeight(attemptCtx, blockResultWithBlobs.HeaderBlob, daHeight, e.headerNS)
+		if err != nil {
+			logger.Warn().Err(err).Uint64("da_height", daHeight).Msg("verification failed")
+			return false
+		}
+		if verified {
+			logger.Info().
+				Uint64("da_height", daHeight).
+				Dur("duration", time.Since(startTime)).
+				Msg("header blob verified on Celestia")
+			e.onVerified(m, namespace, blockHeight, daHeight, true, submissionDuration)
+			return true
+		}
+		logger.Warn().Uint64("da_height", daHeight).Int("attempt", retries).Msg("header verification failed, will retry")
+
+	case "data":
+		if len(blockResultWithBlobs.DataBlob) == 0 {
+			logger.Info().
+				Dur("duration", time.Since(startTime)).
+				Msg("empty data block - no verification needed")
+			e.onVerified(m, namespace, blockHeight, daHeight, true, submissionDuration)
+			return true
+		}
+
+		// perform actual verification between bytes from ev-node and Celestia.
+		verified, err := e.celestiaClient.VerifyDataBlobAtHeight(attemptCtx, blockResultWithBlobs.DataBlob, daHeight, e.dataNS)
+		if err != nil {
+			logger.Warn().Err(err).Uint64("da_height", daHeight).Msg("verification failed")
+			return false
+		}
+		if verified {
+			logger.Info().
+				Uint64("da_height", daHeight).
+				Dur("duration", time.Since(startTime)).
+				Msg("data blob verified on Celestia")
+			e.onVerified(m, namespace, blockHeight, daHeight, true, submissionDuration)
+			return true
+		}
+		logger.Warn().Uint64("da_height", daHeight).Int("attempt", retries).Msg("verification failed, will retry")
+
+	default:
+		logger.Error().Str("namespace", namespace).Msg("unknown namespace type")
+		return true
+	}
+
+	return false
+}
```

Review comment on lines +262 to +271: Re-enqueued block records both a failure and a success metric for the same logical event.
Review comment: The `defer sub.Unsubscribe()` was removed, which is correct given the new reconnection logic. However, it's important to ensure that `sub` is always unsubscribed when the `ExportMetrics` function exits, even if the `reconnectSubscription` loop is entered and then the context is cancelled. The current logic handles this by calling `sub.Unsubscribe()` in the `ctx.Done()` cases within the `select` loop, but the initial `sub` created before the loop might not be unsubscribed if an error occurs before entering the loop or if the function returns early for other reasons.
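One way to cover every exit path, sketched here as a suggestion rather than code from this PR, is a single deferred closure placed right after the initial subscription succeeds. It assumes `Unsubscribe` tolerates being called more than once (go-ethereum's client subscriptions document this), so it can coexist with the explicit calls in `shutdown` and the reconnect path:

```go
sub, err := e.evmClient.SubscribeNewHead(ctx, headers)
if err != nil {
	return err
}
// Sketch: the deferred closure captures the variable sub, so whichever
// subscription is current when ExportMetrics returns (initial or reconnected)
// gets torn down, including returns taken before the main loop is entered.
defer func() {
	if sub != nil {
		sub.Unsubscribe()
	}
}()
```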