16 changes: 16 additions & 0 deletions .github/workflows/benchmark.yml
@@ -40,3 +40,19 @@ jobs:
alert-threshold: '150%'
fail-on-alert: true
comment-on-alert: true

- name: Run Block Executor benchmarks
run: |
go test -bench=BenchmarkProduceBlock -benchmem -run='^$' \
./block/internal/executing/... > block_executor_output.txt
- name: Store Block Executor benchmark result
uses: benchmark-action/github-action-benchmark@4bdcce38c94cec68da58d012ac24b7b1155efe8b # v1.20.7
with:
name: Block Executor Benchmark
tool: 'go'
output-file-path: block_executor_output.txt
auto-push: true
github-token: ${{ secrets.GITHUB_TOKEN }}
alert-threshold: '150%'
fail-on-alert: true
comment-on-alert: true
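
For reference, the new step only runs benchmarks whose names match BenchmarkProduceBlock under ./block/internal/executing/... (the -run='^$' flag skips regular tests, and -benchmem adds allocation counts). A minimal sketch of such a benchmark is shown below; newBenchExecutor is a hypothetical setup helper, and the real benchmark in the package may differ.

    package executing

    import (
        "context"
        "testing"
    )

    // BenchmarkProduceBlock matches the -bench=BenchmarkProduceBlock filter used in
    // the workflow. newBenchExecutor is a hypothetical helper that wires an Executor
    // to in-memory dependencies for benchmarking.
    func BenchmarkProduceBlock(b *testing.B) {
        e := newBenchExecutor(b) // hypothetical setup helper
        ctx := context.Background()

        b.ReportAllocs() // complements -benchmem in the workflow invocation
        b.ResetTimer()
        for i := 0; i < b.N; i++ {
            if err := e.ProduceBlock(ctx); err != nil {
                b.Fatal(err)
            }
        }
    }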
210 changes: 143 additions & 67 deletions block/internal/executing/executor.go
@@ -14,7 +14,6 @@ import (
"github.com/ipfs/go-datastore"
"github.com/libp2p/go-libp2p/core/crypto"
"github.com/rs/zerolog"
"golang.org/x/sync/errgroup"

"github.com/evstack/ev-node/block/internal/cache"
"github.com/evstack/ev-node/block/internal/common"
@@ -56,6 +55,20 @@ type Executor struct {
// State management
lastState *atomic.Pointer[types.State]

// hasPendingBlock tracks whether a pending block exists in the store,
// avoiding a store lookup on every ProduceBlock call.
hasPendingBlock atomic.Bool

// Cached per-block data to avoid store reads + protobuf deserialization
// in CreateBlock. Updated after each successful block production.
lastHeaderHash types.Hash
lastDataHash types.Hash
lastSignature types.Signature

// pendingCheckCounter amortizes the expensive NumPendingHeaders/NumPendingData
// checks across multiple blocks. Only checked every pendingCheckInterval blocks.
pendingCheckCounter uint64

// Channels for coordination
txNotifyCh chan struct{}
errorCh chan<- error // Channel to report critical execution client failures
@@ -277,6 +290,26 @@ func (e *Executor) initializeState() error {
return fmt.Errorf("failed to migrate legacy pending block: %w", err)
}

// Detect any existing pending block and set the in-memory flag so that
// ProduceBlock can skip unnecessary store lookups on the happy path.
if _, err := e.store.GetMetadata(e.ctx, headerKey); err == nil {
e.hasPendingBlock.Store(true)
}

// Warm the last-block cache so CreateBlock can avoid a store read on the
// very first block after restart.
if state.LastBlockHeight > 0 {
h, d, err := e.store.GetBlockData(e.ctx, state.LastBlockHeight)
if err == nil {
e.lastHeaderHash = h.Hash()
e.lastDataHash = d.Hash()
sig, err := e.store.GetSignature(e.ctx, state.LastBlockHeight)
if err == nil {
e.lastSignature = *sig
}
}
}

// Determine sync target: use Raft height if node is behind Raft consensus
syncTargetHeight := state.LastBlockHeight
if e.raftNode != nil {
Expand Down Expand Up @@ -413,18 +446,26 @@ func (e *Executor) ProduceBlock(ctx context.Context) error {

e.logger.Debug().Uint64("height", newHeight).Msg("producing block")

// check pending limits
// Amortized pending limit check — NumPendingHeaders/NumPendingData call
// advancePastEmptyData which scans the store. Only amortize when the limit
// is large enough that checking every N blocks won't overshoot.
const pendingCheckInterval uint64 = 64
if e.config.Node.MaxPendingHeadersAndData > 0 {
pendingHeaders := e.cache.NumPendingHeaders()
pendingData := e.cache.NumPendingData()
if pendingHeaders >= e.config.Node.MaxPendingHeadersAndData ||
pendingData >= e.config.Node.MaxPendingHeadersAndData {
e.logger.Warn().
Uint64("pending_headers", pendingHeaders).
Uint64("pending_data", pendingData).
Uint64("limit", e.config.Node.MaxPendingHeadersAndData).
Msg("pending limit reached, skipping block production")
return nil
e.pendingCheckCounter++
shouldCheck := e.config.Node.MaxPendingHeadersAndData <= pendingCheckInterval ||
e.pendingCheckCounter%pendingCheckInterval == 0
if shouldCheck {
pendingHeaders := e.cache.NumPendingHeaders()
pendingData := e.cache.NumPendingData()
if pendingHeaders >= e.config.Node.MaxPendingHeadersAndData ||
pendingData >= e.config.Node.MaxPendingHeadersAndData {
e.logger.Warn().
Uint64("pending_headers", pendingHeaders).
Uint64("pending_data", pendingData).
Uint64("limit", e.config.Node.MaxPendingHeadersAndData).
Msg("pending limit reached, skipping block production")
return nil
}
}
}
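
The amortization above reduces to a small predicate. A standalone restatement of the same rule (with made-up names, not part of this change):

    // shouldCheckPending mirrors the logic above: with no limit configured there is
    // nothing to check; a small limit must be checked on every block, since skipping
    // checks could overshoot it; a large limit is only checked every `interval`
    // blocks, amortizing the store scan done by NumPendingHeaders/NumPendingData.
    func shouldCheckPending(counter, limit, interval uint64) bool {
        if limit == 0 {
            return false
        }
        return limit <= interval || counter%interval == 0
    }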

@@ -434,17 +475,22 @@ func (e *Executor) ProduceBlock(ctx context.Context) error {
batchData *BatchData
)

// Check if there's an already stored block at the newHeight
// If there is use that instead of creating a new block
pendingHeader, pendingData, err := e.getPendingBlock(ctx)
if err == nil && pendingHeader != nil && pendingHeader.Height() == newHeight {
e.logger.Info().Uint64("height", newHeight).Msg("using pending block")
header = pendingHeader
data = pendingData
} else if err != nil && !errors.Is(err, datastore.ErrNotFound) {
return fmt.Errorf("failed to get block data: %w", err)
} else {
// Check if there's an already stored block at the newHeight.
// Only hit the store if the in-memory flag indicates a pending block exists.
if e.hasPendingBlock.Load() {
pendingHeader, pendingData, err := e.getPendingBlock(ctx)
if err == nil && pendingHeader != nil && pendingHeader.Height() == newHeight {
e.logger.Info().Uint64("height", newHeight).Msg("using pending block")
header = pendingHeader
data = pendingData
} else if err != nil && !errors.Is(err, datastore.ErrNotFound) {
return fmt.Errorf("failed to get block data: %w", err)
}
}

if header == nil {
// get batch from sequencer
var err error
batchData, err = e.blockProducer.RetrieveBatch(ctx)
if errors.Is(err, common.ErrNoBatch) {
e.logger.Debug().Msg("no batch available")
Expand Down Expand Up @@ -478,14 +524,16 @@ func (e *Executor) ProduceBlock(ctx context.Context) error {

// signing the header is done after applying the block
// as for signing, the state of the block may be required by the signature payload provider.
// For based sequencer, this will return an empty signature
signature, err := e.signHeader(header.Header)
// For based sequencer, this will return an empty signature.
signature, _, err := e.signHeader(&header.Header)
if err != nil {
return fmt.Errorf("failed to sign header: %w", err)
}
header.Signature = signature

if err := e.blockProducer.ValidateBlock(ctx, currentState, header, data); err != nil {
// Structural validation only — skip the expensive Validate() / DACommitment()
// re-computation since we just produced this block ourselves.
if err := currentState.AssertValidSequence(header); err != nil {
e.sendCriticalError(fmt.Errorf("failed to validate block: %w", err))
e.logger.Error().Err(err).Msg("CRITICAL: Permanent block validation error - halting block production")
return fmt.Errorf("failed to validate block: %w", err)
Expand Down Expand Up @@ -533,28 +581,36 @@ func (e *Executor) ProduceBlock(ctx context.Context) error {
}
e.logger.Debug().Uint64("height", newHeight).Msg("proposed block to raft")
}
if err := e.deletePendingBlock(batch); err != nil {
e.logger.Warn().Err(err).Uint64("height", newHeight).Msg("failed to delete pending block metadata")
}

if err := batch.Commit(); err != nil {
return fmt.Errorf("failed to commit batch: %w", err)
}

e.hasPendingBlock.Store(false)

// Update in-memory state after successful commit
e.setLastState(newState)

// broadcast header and data to P2P network
g, broadcastCtx := errgroup.WithContext(e.ctx)
g.Go(func() error {
return e.headerBroadcaster.WriteToStoreAndBroadcast(broadcastCtx, &types.P2PSignedHeader{SignedHeader: header})
})
g.Go(func() error {
return e.dataBroadcaster.WriteToStoreAndBroadcast(broadcastCtx, &types.P2PData{Data: data})
})
if err := g.Wait(); err != nil {
e.logger.Error().Err(err).Msg("failed to broadcast header and/data")
// don't fail block production on broadcast error
// Update last-block cache so the next CreateBlock avoids a store read.
// Reuse newState.LastHeaderHash (already computed by NextState) instead of
// calling header.Hash() again, which would re-marshal + re-hash.
e.lastHeaderHash = newState.LastHeaderHash
e.lastDataHash = data.Hash()
e.lastSignature = signature

// Broadcast header and data to P2P network sequentially.
// IMPORTANT: Header MUST be broadcast before data — the P2P layer validates
// incoming data against the current and previous header, so out-of-order
// delivery would cause validation failures on peers.
if err := e.headerBroadcaster.WriteToStoreAndBroadcast(e.ctx, &types.P2PSignedHeader{
SignedHeader: header,
}); err != nil {
e.logger.Error().Err(err).Msg("failed to broadcast header")
}
if err := e.dataBroadcaster.WriteToStoreAndBroadcast(e.ctx, &types.P2PData{
Data: data,
}); err != nil {
e.logger.Error().Err(err).Msg("failed to broadcast data")
}

e.recordBlockMetrics(newState, data)
Expand Down Expand Up @@ -604,26 +660,36 @@ func (e *Executor) CreateBlock(ctx context.Context, height uint64, batchData *Ba
currentState := e.getLastState()
headerTime := uint64(e.genesis.StartTime.UnixNano())

// Get last block info
// Use cached last block info — populated during initializeState and updated
// after each successful block production. This avoids a store read + protobuf
// deserialization per block.
var lastHeaderHash types.Hash
var lastDataHash types.Hash
var lastSignature types.Signature

if height > e.genesis.InitialHeight {
headerTime = uint64(batchData.UnixNano())

lastHeader, lastData, err := e.store.GetBlockData(ctx, height-1)
if err != nil {
return nil, nil, fmt.Errorf("failed to get last block: %w", err)
}
lastHeaderHash = lastHeader.Hash()
lastDataHash = lastData.Hash()
if len(e.lastHeaderHash) > 0 {
// Fast path: use in-memory cache
lastHeaderHash = e.lastHeaderHash
lastDataHash = e.lastDataHash
lastSignature = e.lastSignature
} else {
// Cold start fallback: read from store
lastHeader, lastData, err := e.store.GetBlockData(ctx, height-1)
if err != nil {
return nil, nil, fmt.Errorf("failed to get last block: %w", err)
}
lastHeaderHash = lastHeader.Hash()
lastDataHash = lastData.Hash()

lastSignaturePtr, err := e.store.GetSignature(ctx, height-1)
if err != nil {
return nil, nil, fmt.Errorf("failed to get last signature: %w", err)
lastSignaturePtr, err := e.store.GetSignature(ctx, height-1)
if err != nil {
return nil, nil, fmt.Errorf("failed to get last signature: %w", err)
}
lastSignature = *lastSignaturePtr
}
lastSignature = *lastSignaturePtr
}

// Get signer info and validator hash
Expand All @@ -642,7 +708,6 @@ func (e *Executor) CreateBlock(ctx context.Context, height uint64, batchData *Ba
return nil, nil, fmt.Errorf("failed to get validator hash: %w", err)
}
} else {
// For based sequencer without signer, use nil pubkey and compute validator hash
var err error
validatorHash, err = e.options.ValidatorHasherProvider(e.genesis.ProposerAddress, nil)
if err != nil {
Expand Down Expand Up @@ -703,16 +768,20 @@ func (e *Executor) CreateBlock(ctx context.Context, height uint64, batchData *Ba
func (e *Executor) ApplyBlock(ctx context.Context, header types.Header, data *types.Data) (types.State, error) {
currentState := e.getLastState()

// Prepare transactions
rawTxs := make([][]byte, len(data.Txs))
for i, tx := range data.Txs {
rawTxs[i] = []byte(tx)
// Convert Txs to [][]byte for the execution client.
// types.Tx is []byte, so this is a type conversion, not a copy.
var rawTxs [][]byte
if n := len(data.Txs); n > 0 {
rawTxs = make([][]byte, n)
for i, tx := range data.Txs {
rawTxs[i] = []byte(tx)
}
}

// Execute transactions
ctx = context.WithValue(ctx, types.HeaderContextKey, header)
execCtx := context.WithValue(ctx, types.HeaderContextKey, header)

newAppHash, err := e.executeTxsWithRetry(ctx, rawTxs, header, currentState)
newAppHash, err := e.executeTxsWithRetry(execCtx, rawTxs, header, currentState)
if err != nil {
e.sendCriticalError(fmt.Errorf("failed to execute transactions: %w", err))
return types.State{}, fmt.Errorf("failed to execute transactions: %w", err)
@@ -727,19 +796,25 @@ func (e *Executor) ApplyBlock(ctx context.Context, header types.Header, data *ty
return newState, nil
}

// signHeader signs the block header
func (e *Executor) signHeader(header types.Header) (types.Signature, error) {
// signHeader signs the block header and returns both the signature and the
// serialized header bytes (signing payload). The caller can reuse headerBytes
// in SaveBlockDataFromBytes to avoid a redundant MarshalBinary call.
func (e *Executor) signHeader(header *types.Header) (types.Signature, []byte, error) {
// For based sequencer, return empty signature as there is no signer
if e.signer == nil {
return types.Signature{}, nil
return types.Signature{}, nil, nil
}

bz, err := e.options.AggregatorNodeSignatureBytesProvider(&header)
bz, err := e.options.AggregatorNodeSignatureBytesProvider(header)
if err != nil {
return nil, fmt.Errorf("failed to get signature payload: %w", err)
return nil, nil, fmt.Errorf("failed to get signature payload: %w", err)
}

return e.signer.Sign(bz)
sig, err := e.signer.Sign(bz)
if err != nil {
return nil, nil, err
}
return sig, bz, nil
}
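
A sketch of how a caller consumes the new three-value return; reusing headerBytes is illustrative only, since the ProduceBlock path above currently discards it with _:

    // Hypothetical caller: keep the signing payload (the serialized header, per the
    // comment on signHeader) so a later step that accepts pre-serialized header
    // bytes, such as the SaveBlockDataFromBytes mentioned above, can skip a
    // redundant MarshalBinary call.
    sig, headerBytes, err := e.signHeader(&header.Header)
    if err != nil {
        return fmt.Errorf("failed to sign header: %w", err)
    }
    header.Signature = sig
    _ = headerBytes // hand to the bytes-accepting save path where available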

// executeTxsWithRetry executes transactions with retry logic.
@@ -804,10 +879,11 @@ func (e *Executor) recordBlockMetrics(newState types.State, data *types.Data) {
return
}

e.metrics.NumTxs.Set(float64(len(data.Txs)))
e.metrics.TotalTxs.Add(float64(len(data.Txs)))
e.metrics.TxsPerBlock.Observe(float64(len(data.Txs)))
e.metrics.BlockSizeBytes.Set(float64(data.Size()))
nTxs := float64(len(data.Txs))
e.metrics.NumTxs.Set(nTxs)
e.metrics.TotalTxs.Add(nTxs)
e.metrics.TxsPerBlock.Observe(nTxs)
e.metrics.BlockSizeBytes.Set(float64(data.TxsByteSize()))
e.metrics.CommittedHeight.Set(float64(data.Metadata.Height))
}
