Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
32 changes: 32 additions & 0 deletions block/internal/common/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,11 @@ type Metrics struct {
// Forced inclusion metrics
ForcedInclusionTxsInGracePeriod metrics.Gauge // Number of forced inclusion txs currently in grace period
ForcedInclusionTxsMalicious metrics.Counter // Total number of forced inclusion txs marked as malicious

// Sync mode metrics
SyncMode metrics.Gauge // Current sync mode: 0=catchup, 1=follow
SubscribeErrors metrics.Counter // Number of subscription failures
ModeSwitches metrics.Counter // Number of catchup<->follow mode transitions
}

// PrometheusMetrics returns Metrics built using Prometheus client library
Expand Down Expand Up @@ -201,6 +206,28 @@ func PrometheusMetrics(namespace string, labelsAndValues ...string) *Metrics {
Help: "Total number of forced inclusion transactions marked as malicious (past grace boundary)",
}, labels).With(labelsAndValues...)

// Sync mode metrics
m.SyncMode = prometheus.NewGaugeFrom(stdprometheus.GaugeOpts{
Namespace: namespace,
Subsystem: MetricsSubsystem,
Name: "sync_mode",
Help: "Current sync mode: 0=catchup (polling), 1=follow (subscription)",
}, labels).With(labelsAndValues...)

m.SubscribeErrors = prometheus.NewCounterFrom(stdprometheus.CounterOpts{
Namespace: namespace,
Subsystem: MetricsSubsystem,
Name: "subscribe_errors_total",
Help: "Total number of DA subscription failures",
}, labels).With(labelsAndValues...)

m.ModeSwitches = prometheus.NewCounterFrom(stdprometheus.CounterOpts{
Namespace: namespace,
Subsystem: MetricsSubsystem,
Name: "mode_switches_total",
Help: "Total number of sync mode transitions between catchup and follow",
}, labels).With(labelsAndValues...)

// DA Submitter metrics
m.DASubmitterPendingBlobs = prometheus.NewGaugeFrom(stdprometheus.GaugeOpts{
Namespace: namespace,
Expand Down Expand Up @@ -269,6 +296,11 @@ func NopMetrics() *Metrics {
// Forced inclusion metrics
ForcedInclusionTxsInGracePeriod: discard.NewGauge(),
ForcedInclusionTxsMalicious: discard.NewCounter(),

// Sync mode metrics
SyncMode: discard.NewGauge(),
SubscribeErrors: discard.NewCounter(),
ModeSwitches: discard.NewCounter(),
}

// Initialize maps with no-op metrics
Expand Down
24 changes: 24 additions & 0 deletions block/internal/da/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -442,3 +442,27 @@ func (c *client) Validate(ctx context.Context, ids []datypes.ID, proofs []datype

return results, nil
}

// Subscribe opens a streaming subscription for blobs posted to the given
// namespace. The returned channel delivers a SubscriptionResponse each time
// new blobs are included on the DA layer. The subscription is long-lived,
// so the caller's ctx (not a per-call timeout) governs its lifetime.
func (c *client) Subscribe(ctx context.Context, namespace []byte) (<-chan *blobrpc.SubscriptionResponse, error) {
	parsedNS, nsErr := share.NewNamespaceFromBytes(namespace)
	if nsErr != nil {
		return nil, fmt.Errorf("invalid namespace: %w", nsErr)
	}

	return c.blobAPI.Subscribe(ctx, parsedNS)
}

// LocalHead returns the height of the locally synced DA head, i.e. how far
// the connected DA node has synced. Each call is bounded by c.defaultTimeout
// so a slow or unreachable node cannot block the caller indefinitely.
func (c *client) LocalHead(ctx context.Context) (uint64, error) {
	timeoutCtx, cancel := context.WithTimeout(ctx, c.defaultTimeout)
	defer cancel()

	head, err := c.headerAPI.LocalHead(timeoutCtx)
	if err != nil {
		return 0, fmt.Errorf("failed to get local head: %w", err)
	}

	return head.Height, nil
}
10 changes: 10 additions & 0 deletions block/internal/da/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package da
import (
"context"

blobrpc "github.com/evstack/ev-node/pkg/da/jsonrpc"
datypes "github.com/evstack/ev-node/pkg/da/types"
)

Expand All @@ -22,6 +23,15 @@ type Client interface {
GetDataNamespace() []byte
GetForcedInclusionNamespace() []byte
HasForcedInclusionNamespace() bool

// Subscribe subscribes to blobs in the specified namespace.
// Returns a channel that receives subscription responses as new blobs are included.
// Used for follow mode to receive real-time blob notifications.
Subscribe(ctx context.Context, namespace []byte) (<-chan *blobrpc.SubscriptionResponse, error)

// LocalHead returns the height of the locally synced DA head.
// Used to determine if the node is caught up with the DA layer.
LocalHead(ctx context.Context) (uint64, error)
}

// Verifier defines the interface for DA proof verification operations.
Expand Down
10 changes: 7 additions & 3 deletions block/internal/syncing/da_retriever.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,9 @@ import (
// DARetriever defines the interface for retrieving events from the DA layer
type DARetriever interface {
	// RetrieveFromDA fetches the blobs published at the given DA height
	// (from both the header and data namespaces) and converts them into
	// height events. Used by catchup/polling mode.
	RetrieveFromDA(ctx context.Context, daHeight uint64) ([]common.DAHeightEvent, error)
	// ProcessBlobs processes raw blobs from subscription and returns height events.
	// Used by follow mode to process real-time blob notifications.
	ProcessBlobs(ctx context.Context, blobs [][]byte, daHeight uint64) []common.DAHeightEvent
}

// daRetriever handles DA retrieval operations for syncing
Expand Down Expand Up @@ -72,7 +75,7 @@ func (r *daRetriever) RetrieveFromDA(ctx context.Context, daHeight uint64) ([]co
}

r.logger.Debug().Int("blobs", len(blobsResp.Data)).Uint64("da_height", daHeight).Msg("retrieved blob data")
return r.processBlobs(ctx, blobsResp.Data, daHeight), nil
return r.ProcessBlobs(ctx, blobsResp.Data, daHeight), nil
}

// fetchBlobs retrieves blobs from both header and data namespaces
Expand Down Expand Up @@ -148,8 +151,9 @@ func (r *daRetriever) validateBlobResponse(res datypes.ResultRetrieve, daHeight
}
}

// processBlobs processes retrieved blobs to extract headers and data and returns height events
func (r *daRetriever) processBlobs(ctx context.Context, blobs [][]byte, daHeight uint64) []common.DAHeightEvent {
// ProcessBlobs processes retrieved blobs to extract headers and data and returns height events.
// This method implements the DARetriever interface and is used by both polling and subscription modes.
func (r *daRetriever) ProcessBlobs(ctx context.Context, blobs [][]byte, daHeight uint64) []common.DAHeightEvent {
// Decode all blobs
for _, bz := range blobs {
if len(bz) == 0 {
Expand Down
65 changes: 65 additions & 0 deletions block/internal/syncing/da_retriever_mock.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

14 changes: 7 additions & 7 deletions block/internal/syncing/da_retriever_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -148,7 +148,7 @@ func TestDARetriever_ProcessBlobs_HeaderAndData_Success(t *testing.T) {
dataBin, data := makeSignedDataBytes(t, gen.ChainID, 2, addr, pub, signer, 2)
hdrBin, _ := makeSignedHeaderBytes(t, gen.ChainID, 2, addr, pub, signer, nil, &data.Data, nil)

events := r.processBlobs(context.Background(), [][]byte{hdrBin, dataBin}, 77)
events := r.ProcessBlobs(context.Background(), [][]byte{hdrBin, dataBin}, 77)
require.Len(t, events, 1)
assert.Equal(t, uint64(2), events[0].Header.Height())
assert.Equal(t, uint64(2), events[0].Data.Height())
Expand All @@ -172,7 +172,7 @@ func TestDARetriever_ProcessBlobs_HeaderOnly_EmptyDataExpected(t *testing.T) {
// Header with no data hash present should trigger empty data creation (per current logic)
hb, _ := makeSignedHeaderBytes(t, gen.ChainID, 3, addr, pub, signer, nil, nil, nil)

events := r.processBlobs(context.Background(), [][]byte{hb}, 88)
events := r.ProcessBlobs(context.Background(), [][]byte{hb}, 88)
require.Len(t, events, 1)
assert.Equal(t, uint64(3), events[0].Header.Height())
assert.NotNil(t, events[0].Data)
Expand Down Expand Up @@ -282,14 +282,14 @@ func TestDARetriever_ProcessBlobs_CrossDAHeightMatching(t *testing.T) {
hdrBin, _ := makeSignedHeaderBytes(t, gen.ChainID, 5, addr, pub, signer, nil, &data.Data, nil)

// Process header from DA height 100 first
events1 := r.processBlobs(context.Background(), [][]byte{hdrBin}, 100)
events1 := r.ProcessBlobs(context.Background(), [][]byte{hdrBin}, 100)
require.Len(t, events1, 0, "should not create event yet - data is missing")

// Verify header is stored in pending headers
require.Contains(t, r.pendingHeaders, uint64(5), "header should be stored as pending")

// Process data from DA height 102
events2 := r.processBlobs(context.Background(), [][]byte{dataBin}, 102)
events2 := r.ProcessBlobs(context.Background(), [][]byte{dataBin}, 102)
require.Len(t, events2, 1, "should create event when matching data arrives")

event := events2[0]
Expand Down Expand Up @@ -319,7 +319,7 @@ func TestDARetriever_ProcessBlobs_MultipleHeadersCrossDAHeightMatching(t *testin
hdr5Bin, _ := makeSignedHeaderBytes(t, gen.ChainID, 5, addr, pub, signer, nil, &data5.Data, nil)

// Process multiple headers from DA height 200 - should be stored as pending
events1 := r.processBlobs(context.Background(), [][]byte{hdr3Bin, hdr4Bin, hdr5Bin}, 200)
events1 := r.ProcessBlobs(context.Background(), [][]byte{hdr3Bin, hdr4Bin, hdr5Bin}, 200)
require.Len(t, events1, 0, "should not create events yet - all data is missing")

// Verify all headers are stored in pending
Expand All @@ -328,7 +328,7 @@ func TestDARetriever_ProcessBlobs_MultipleHeadersCrossDAHeightMatching(t *testin
require.Contains(t, r.pendingHeaders, uint64(5), "header 5 should be pending")

// Process some data from DA height 203 - should create partial events
events2 := r.processBlobs(context.Background(), [][]byte{data3Bin, data5Bin}, 203)
events2 := r.ProcessBlobs(context.Background(), [][]byte{data3Bin, data5Bin}, 203)
require.Len(t, events2, 2, "should create events for heights 3 and 5")

// Sort events by height for consistent testing
Expand All @@ -352,7 +352,7 @@ func TestDARetriever_ProcessBlobs_MultipleHeadersCrossDAHeightMatching(t *testin
require.NotContains(t, r.pendingHeaders, uint64(5), "header 5 should be removed from pending")

// Process remaining data from DA height 205
events3 := r.processBlobs(context.Background(), [][]byte{data4Bin}, 205)
events3 := r.ProcessBlobs(context.Background(), [][]byte{data4Bin}, 205)
require.Len(t, events3, 1, "should create event for height 4")

// Verify final event for height 4
Expand Down
Loading
Loading