Skip to content

Commit 01dcada

Browse files
authored
feat(tracing): adding block production tracing (#2980)
<!-- Please read and fill out this form before submitting your PR. Please make sure you have reviewed our contributors guide before submitting your first PR. NOTE: PR titles should follow semantic commits: https://www.conventionalcommits.org/en/v1.0.0/ --> ## Overview Follows the same pattern as before: extracts a `BlockProducer` interface and introduces a tracing implementation that wraps the real one. The tracing wrapper is applied only when telemetry is enabled. <!-- Please provide an explanation of the PR, including the appropriate context, background, goal, and rationale. If there is an issue with this information, please provide a tl;dr and link the issue. Ex: Closes #<issue number> -->
1 parent 3146f0c commit 01dcada

File tree

8 files changed

+548
-38
lines changed

8 files changed

+548
-38
lines changed

block/components.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -220,6 +220,10 @@ func NewAggregatorComponents(
220220
return nil, fmt.Errorf("failed to create executor: %w", err)
221221
}
222222

223+
if config.Instrumentation.IsTracingEnabled() {
224+
executor.SetBlockProducer(executing.WithTracingBlockProducer(executor))
225+
}
226+
223227
reaper, err := reaping.NewReaper(
224228
exec,
225229
sequencer,
Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,27 @@
1+
package executing
2+
3+
import (
4+
"context"
5+
6+
"github.com/evstack/ev-node/types"
7+
)
8+
9+
// BlockProducer defines the block production operations that can be traced.
10+
// The Executor implements this interface, and a tracing decorator can wrap it
11+
// to add OpenTelemetry spans to each operation.
12+
type BlockProducer interface {
13+
// ProduceBlock creates, validates, stores, and broadcasts a new block.
14+
ProduceBlock(ctx context.Context) error
15+
16+
// RetrieveBatch gets the next batch of transactions from the sequencer; callers check the error for common.ErrNoBatch.
17+
RetrieveBatch(ctx context.Context) (*BatchData, error)
18+
19+
// CreateBlock constructs a new block (signed header and data) at the given height from the given batch data.
20+
CreateBlock(ctx context.Context, height uint64, batchData *BatchData) (*types.SignedHeader, *types.Data, error)
21+
22+
// ApplyBlock executes the block transactions and returns the new state.
23+
ApplyBlock(ctx context.Context, header types.Header, data *types.Data) (types.State, error)
24+
25+
// ValidateBlock validates block structure and state transitions. The Executor implementation ignores ctx.
26+
ValidateBlock(ctx context.Context, lastState types.State, header *types.SignedHeader, data *types.Data) error
27+
}

block/internal/executing/executor.go

Lines changed: 40 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,8 @@ import (
2525
"github.com/evstack/ev-node/types"
2626
)
2727

28+
var _ BlockProducer = (*Executor)(nil)
29+
2830
// Executor handles block production, transaction processing, and state management
2931
type Executor struct {
3032
// Core components
@@ -60,6 +62,10 @@ type Executor struct {
6062
ctx context.Context
6163
cancel context.CancelFunc
6264
wg sync.WaitGroup
65+
66+
// blockProducer is the interface used for block production operations.
67+
// It defaults to the Executor itself, but can be replaced with a tracing wrapper.
68+
blockProducer BlockProducer
6369
}
6470

6571
// NewExecutor creates a new block executor.
@@ -101,7 +107,7 @@ func NewExecutor(
101107
}
102108
}
103109

104-
return &Executor{
110+
e := &Executor{
105111
store: store,
106112
exec: exec,
107113
sequencer: sequencer,
@@ -117,7 +123,15 @@ func NewExecutor(
117123
txNotifyCh: make(chan struct{}, 1),
118124
errorCh: errorCh,
119125
logger: logger.With().Str("component", "executor").Logger(),
120-
}, nil
126+
}
127+
e.blockProducer = e
128+
return e, nil
129+
}
130+
131+
// SetBlockProducer sets the block producer interface, allowing injection of
132+
// a tracing wrapper or other decorator.
// bp is stored as-is; no nil check is performed here.
// NOTE(review): the field is assigned without any locking in this method;
// presumably callers must invoke it before Start — confirm.
133+
func (e *Executor) SetBlockProducer(bp BlockProducer) {
134+
e.blockProducer = bp
121135
}
122136

123137
// Start begins the execution component
@@ -279,7 +293,7 @@ func (e *Executor) executionLoop() {
279293
continue
280294
}
281295

282-
if err := e.produceBlock(); err != nil {
296+
if err := e.blockProducer.ProduceBlock(e.ctx); err != nil {
283297
e.logger.Error().Err(err).Msg("failed to produce block")
284298
}
285299
txsAvailable = false
@@ -288,7 +302,7 @@ func (e *Executor) executionLoop() {
288302

289303
case <-lazyTimerCh:
290304
e.logger.Debug().Msg("Lazy timer triggered block production")
291-
if err := e.produceBlock(); err != nil {
305+
if err := e.blockProducer.ProduceBlock(e.ctx); err != nil {
292306
e.logger.Error().Err(err).Msg("failed to produce block from lazy timer")
293307
}
294308
// Reset lazy timer
@@ -300,8 +314,8 @@ func (e *Executor) executionLoop() {
300314
}
301315
}
302316

303-
// produceBlock creates, validates, and stores a new block
304-
func (e *Executor) produceBlock() error {
317+
// ProduceBlock creates, validates, and stores a new block.
318+
func (e *Executor) ProduceBlock(ctx context.Context) error {
305319
start := time.Now()
306320
defer func() {
307321
if e.metrics.OperationDuration["block_production"] != nil {
@@ -338,7 +352,7 @@ func (e *Executor) produceBlock() error {
338352

339353
// Check if there's an already stored block at the newHeight
340354
// If there is use that instead of creating a new block
341-
pendingHeader, pendingData, err := e.store.GetBlockData(e.ctx, newHeight)
355+
pendingHeader, pendingData, err := e.store.GetBlockData(ctx, newHeight)
342356
if err == nil {
343357
e.logger.Info().Uint64("height", newHeight).Msg("using pending block")
344358
header = pendingHeader
@@ -347,7 +361,7 @@ func (e *Executor) produceBlock() error {
347361
return fmt.Errorf("failed to get block data: %w", err)
348362
} else {
349363
// get batch from sequencer
350-
batchData, err = e.retrieveBatch(e.ctx)
364+
batchData, err = e.blockProducer.RetrieveBatch(ctx)
351365
if errors.Is(err, common.ErrNoBatch) {
352366
e.logger.Debug().Msg("no batch available")
353367
return nil
@@ -357,13 +371,13 @@ func (e *Executor) produceBlock() error {
357371
return fmt.Errorf("failed to retrieve batch: %w", err)
358372
}
359373

360-
header, data, err = e.createBlock(e.ctx, newHeight, batchData)
374+
header, data, err = e.blockProducer.CreateBlock(ctx, newHeight, batchData)
361375
if err != nil {
362376
return fmt.Errorf("failed to create block: %w", err)
363377
}
364378

365379
// saved early for crash recovery, will be overwritten later with the final signature
366-
batch, err := e.store.NewBatch(e.ctx)
380+
batch, err := e.store.NewBatch(ctx)
367381
if err != nil {
368382
return fmt.Errorf("failed to create batch for early save: %w", err)
369383
}
@@ -378,12 +392,12 @@ func (e *Executor) produceBlock() error {
378392
// Pass force-included mask through context for execution optimization
379393
// Force-included txs (from DA) MUST be validated as they're from untrusted sources
380394
// Mempool txs can skip validation as they were validated when added to mempool
381-
ctx := e.ctx
395+
applyCtx := ctx
382396
if batchData != nil && batchData.Batch != nil && batchData.ForceIncludedMask != nil {
383-
ctx = coreexecutor.WithForceIncludedMask(ctx, batchData.ForceIncludedMask)
397+
applyCtx = coreexecutor.WithForceIncludedMask(applyCtx, batchData.ForceIncludedMask)
384398
}
385399

386-
newState, err := e.applyBlock(ctx, header.Header, data)
400+
newState, err := e.blockProducer.ApplyBlock(applyCtx, header.Header, data)
387401
if err != nil {
388402
return fmt.Errorf("failed to apply block: %w", err)
389403
}
@@ -400,13 +414,13 @@ func (e *Executor) produceBlock() error {
400414
}
401415
header.Signature = signature
402416

403-
if err := e.validateBlock(currentState, header, data); err != nil {
417+
if err := e.blockProducer.ValidateBlock(ctx, currentState, header, data); err != nil {
404418
e.sendCriticalError(fmt.Errorf("failed to validate block: %w", err))
405419
e.logger.Error().Err(err).Msg("CRITICAL: Permanent block validation error - halting block production")
406420
return fmt.Errorf("failed to validate block: %w", err)
407421
}
408422

409-
batch, err := e.store.NewBatch(e.ctx)
423+
batch, err := e.store.NewBatch(ctx)
410424
if err != nil {
411425
return fmt.Errorf("failed to create batch: %w", err)
412426
}
@@ -431,9 +445,9 @@ func (e *Executor) produceBlock() error {
431445
e.setLastState(newState)
432446

433447
// broadcast header and data to P2P network
434-
g, ctx := errgroup.WithContext(e.ctx)
435-
g.Go(func() error { return e.headerBroadcaster.WriteToStoreAndBroadcast(ctx, header) })
436-
g.Go(func() error { return e.dataBroadcaster.WriteToStoreAndBroadcast(ctx, data) })
448+
g, broadcastCtx := errgroup.WithContext(ctx)
449+
g.Go(func() error { return e.headerBroadcaster.WriteToStoreAndBroadcast(broadcastCtx, header) })
450+
g.Go(func() error { return e.dataBroadcaster.WriteToStoreAndBroadcast(broadcastCtx, data) })
437451
if err := g.Wait(); err != nil {
438452
e.logger.Error().Err(err).Msg("failed to broadcast header and/data")
439453
// don't fail block production on broadcast error
@@ -449,8 +463,8 @@ func (e *Executor) produceBlock() error {
449463
return nil
450464
}
451465

452-
// retrieveBatch gets the next batch of transactions from the sequencer
453-
func (e *Executor) retrieveBatch(ctx context.Context) (*BatchData, error) {
466+
// RetrieveBatch gets the next batch of transactions from the sequencer.
467+
func (e *Executor) RetrieveBatch(ctx context.Context) (*BatchData, error) {
454468
req := coresequencer.GetNextBatchRequest{
455469
Id: []byte(e.genesis.ChainID),
456470
MaxBytes: common.DefaultMaxBlobSize,
@@ -481,8 +495,8 @@ func (e *Executor) retrieveBatch(ctx context.Context) (*BatchData, error) {
481495
}, nil
482496
}
483497

484-
// createBlock creates a new block from the given batch
485-
func (e *Executor) createBlock(ctx context.Context, height uint64, batchData *BatchData) (*types.SignedHeader, *types.Data, error) {
498+
// CreateBlock creates a new block from the given batch.
499+
func (e *Executor) CreateBlock(ctx context.Context, height uint64, batchData *BatchData) (*types.SignedHeader, *types.Data, error) {
486500
currentState := e.getLastState()
487501
headerTime := uint64(e.genesis.StartTime.UnixNano())
488502

@@ -581,8 +595,8 @@ func (e *Executor) createBlock(ctx context.Context, height uint64, batchData *Ba
581595
return header, data, nil
582596
}
583597

584-
// applyBlock applies the block to get the new state
585-
func (e *Executor) applyBlock(ctx context.Context, header types.Header, data *types.Data) (types.State, error) {
598+
// ApplyBlock applies the block to get the new state.
599+
func (e *Executor) ApplyBlock(ctx context.Context, header types.Header, data *types.Data) (types.State, error) {
586600
currentState := e.getLastState()
587601

588602
// Prepare transactions
@@ -654,8 +668,8 @@ func (e *Executor) executeTxsWithRetry(ctx context.Context, rawTxs [][]byte, hea
654668
return nil, nil
655669
}
656670

657-
// validateBlock validates the created block
658-
func (e *Executor) validateBlock(lastState types.State, header *types.SignedHeader, data *types.Data) error {
671+
// ValidateBlock validates the created block.
672+
func (e *Executor) ValidateBlock(_ context.Context, lastState types.State, header *types.SignedHeader, data *types.Data) error {
659673
// Set custom verifier for aggregator node signature
660674
header.SetCustomVerifierForAggregator(e.options.AggregatorNodeSignatureBytesProvider)
661675

block/internal/executing/executor_lazy_test.go

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -94,8 +94,8 @@ func TestLazyMode_ProduceBlockLogic(t *testing.T) {
9494

9595
mockSeq.EXPECT().GetDAHeight().Return(uint64(0)).Once()
9696

97-
// Direct call to produceBlock should work (this is what lazy timer does)
98-
err = exec.produceBlock()
97+
// Direct call to ProduceBlock should work (this is what lazy timer does)
98+
err = exec.ProduceBlock(exec.ctx)
9999
require.NoError(t, err)
100100

101101
h1, err := memStore.Height(context.Background())
@@ -118,7 +118,7 @@ func TestLazyMode_ProduceBlockLogic(t *testing.T) {
118118

119119
mockSeq.EXPECT().GetDAHeight().Return(uint64(0)).Once()
120120

121-
err = exec.produceBlock()
121+
err = exec.ProduceBlock(exec.ctx)
122122
require.NoError(t, err)
123123

124124
h2, err := memStore.Height(context.Background())
@@ -209,7 +209,7 @@ func TestRegularMode_ProduceBlockLogic(t *testing.T) {
209209

210210
mockSeq.EXPECT().GetDAHeight().Return(uint64(0)).Once()
211211

212-
err = exec.produceBlock()
212+
err = exec.ProduceBlock(exec.ctx)
213213
require.NoError(t, err)
214214

215215
h1, err := memStore.Height(context.Background())

block/internal/executing/executor_logic_test.go

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -117,7 +117,7 @@ func TestProduceBlock_EmptyBatch_SetsEmptyDataHash(t *testing.T) {
117117
mockSeq.EXPECT().GetDAHeight().Return(uint64(0)).Once()
118118

119119
// produce one block
120-
err = exec.produceBlock()
120+
err = exec.ProduceBlock(exec.ctx)
121121
require.NoError(t, err)
122122

123123
// Verify height and stored block
@@ -202,14 +202,14 @@ func TestPendingLimit_SkipsProduction(t *testing.T) {
202202

203203
mockSeq.EXPECT().GetDAHeight().Return(uint64(0)).Once()
204204

205-
require.NoError(t, exec.produceBlock())
205+
require.NoError(t, exec.ProduceBlock(exec.ctx))
206206
h1, err := memStore.Height(context.Background())
207207
require.NoError(t, err)
208208
assert.Equal(t, uint64(1), h1)
209209

210210
// With limit=1 and lastSubmitted default 0, pending >= 1 so next production should be skipped
211-
// No new expectations; produceBlock should return early before hitting sequencer
212-
require.NoError(t, exec.produceBlock())
211+
// No new expectations; ProduceBlock should return early before hitting sequencer
212+
require.NoError(t, exec.ProduceBlock(exec.ctx))
213213
h2, err := memStore.Height(context.Background())
214214
require.NoError(t, err)
215215
assert.Equal(t, h1, h2, "height should not change when production is skipped")

block/internal/executing/executor_restart_test.go

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -95,7 +95,7 @@ func TestExecutor_RestartUsesPendingHeader(t *testing.T) {
9595

9696
mockSeq1.EXPECT().GetDAHeight().Return(uint64(0)).Once()
9797

98-
err = exec1.produceBlock()
98+
err = exec1.ProduceBlock(exec1.ctx)
9999
require.NoError(t, err)
100100

101101
// Verify first block was produced
@@ -214,7 +214,7 @@ func TestExecutor_RestartUsesPendingHeader(t *testing.T) {
214214

215215
// Note: mockSeq2 should NOT receive GetNextBatch calls because pending block should be used
216216

217-
err = exec2.produceBlock()
217+
err = exec2.ProduceBlock(exec2.ctx)
218218
require.NoError(t, err)
219219

220220
// Verify height advanced to 2
@@ -316,7 +316,7 @@ func TestExecutor_RestartNoPendingHeader(t *testing.T) {
316316

317317
mockSeq1.EXPECT().GetDAHeight().Return(uint64(0)).Once()
318318

319-
err = exec1.produceBlock()
319+
err = exec1.ProduceBlock(exec1.ctx)
320320
require.NoError(t, err)
321321

322322
// Stop first executor
@@ -372,7 +372,7 @@ func TestExecutor_RestartNoPendingHeader(t *testing.T) {
372372

373373
mockSeq2.EXPECT().GetDAHeight().Return(uint64(0)).Once()
374374

375-
err = exec2.produceBlock()
375+
err = exec2.ProduceBlock(exec2.ctx)
376376
require.NoError(t, err)
377377

378378
// Verify normal operation

0 commit comments

Comments
 (0)