Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions .mockery.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,11 @@ packages:
dir: ./test/mocks
pkgname: mocks
filename: store.go
Batch:
config:
dir: ./test/mocks
pkgname: mocks
filename: batch.go
github.com/celestiaorg/go-header:
interfaces:
Store:
Expand Down
10 changes: 10 additions & 0 deletions block/internal/cache/pending_data.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package cache

import (
"context"
"errors"
"fmt"

"github.com/rs/zerolog"
Expand Down Expand Up @@ -29,8 +30,17 @@ type PendingData struct {
base *pendingBase[*types.Data]
}

var errInFlightData = errors.New("inflight data")

func fetchData(ctx context.Context, store store.Store, height uint64) (*types.Data, error) {
_, data, err := store.GetBlockData(ctx, height)
if err != nil {
return nil, err
}
// in the executor, WIP data is temporary stored. skip them until the process is completed
if data.Height() == 0 {
return nil, errInFlightData
}
return data, err
}

Expand Down
10 changes: 10 additions & 0 deletions block/internal/cache/pending_headers.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package cache

import (
"context"
"errors"
"fmt"

"github.com/rs/zerolog"
Expand All @@ -26,8 +27,17 @@ type PendingHeaders struct {
base *pendingBase[*types.SignedHeader]
}

var errInFlightHeader = errors.New("inflight header")

func fetchSignedHeader(ctx context.Context, store storepkg.Store, height uint64) (*types.SignedHeader, error) {
header, err := store.GetHeader(ctx, height)
if err != nil {
return nil, err
}
// in the executor, WIP headers are temporary stored. skip them until the process is completed
if header.Height() == 0 {
return nil, errInFlightHeader
}
return header, err
}

Expand Down
14 changes: 11 additions & 3 deletions block/internal/executing/executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -227,6 +227,9 @@ func (e *Executor) initializeState() error {
if err := batch.SetHeight(state.LastBlockHeight); err != nil {
return fmt.Errorf("failed to set store height: %w", err)
}
if err := batch.UpdateState(state); err != nil {
return fmt.Errorf("failed to update state: %w", err)
}
if err := batch.Commit(); err != nil {
return fmt.Errorf("failed to commit batch: %w", err)
}
Expand All @@ -236,7 +239,8 @@ func (e *Executor) initializeState() error {

// Sync execution layer with store on startup
execReplayer := common.NewReplayer(e.store, e.exec, e.genesis, e.logger)
if err := execReplayer.SyncToHeight(e.ctx, state.LastBlockHeight); err != nil {
syncTargetHeight := state.LastBlockHeight
if err := execReplayer.SyncToHeight(e.ctx, syncTargetHeight); err != nil {
e.sendCriticalError(fmt.Errorf("failed to sync execution layer: %w", err))
return fmt.Errorf("failed to sync execution layer: %w", err)
}
Expand Down Expand Up @@ -281,7 +285,7 @@ func (e *Executor) executionLoop() {
}
txsAvailable := false

for {
for e.ctx.Err() == nil {
select {
case <-e.ctx.Done():
return
Expand Down Expand Up @@ -316,6 +320,10 @@ func (e *Executor) executionLoop() {

// ProduceBlock creates, validates, and stores a new block.
func (e *Executor) ProduceBlock(ctx context.Context) error {
if ctx.Err() != nil {
return ctx.Err()
}

start := time.Now()
defer func() {
if e.metrics.OperationDuration["block_production"] != nil {
Expand Down Expand Up @@ -582,7 +590,7 @@ func (e *Executor) CreateBlock(ctx context.Context, height uint64, batchData *Ba
}

for i, tx := range batchData.Transactions {
data.Txs[i] = types.Tx(tx)
data.Txs[i] = tx
}

// Set data hash
Expand Down
31 changes: 27 additions & 4 deletions block/internal/submitting/submitter.go
Original file line number Diff line number Diff line change
Expand Up @@ -128,6 +128,7 @@ func (s *Submitter) Start(ctx context.Context) error {

// Start DA submission loop if signer is available (aggregator nodes only)
if s.signer != nil {
s.logger.Info().Msg("starting DA submission loop")
s.wg.Add(1)
go func() {
defer s.wg.Done()
Expand All @@ -150,7 +151,18 @@ func (s *Submitter) Stop() error {
if s.cancel != nil {
s.cancel()
}
s.wg.Wait()
// Wait for goroutines to finish with a timeout to prevent hanging
done := make(chan struct{})
go func() {
s.wg.Wait()
close(done)
}()
select {
case <-done:
// All goroutines finished cleanly
case <-time.After(5 * time.Second):
s.logger.Warn().Msg("submitter shutdown timed out waiting for goroutines, proceeding anyway")
}
s.logger.Info().Msg("submitter stopped")
return nil
}
Expand Down Expand Up @@ -180,8 +192,14 @@ func (s *Submitter) daSubmissionLoop() {
// For strategy decision, we need to estimate the size
// We'll fetch headers to check, but only submit if strategy approves
if s.headerSubmissionMtx.TryLock() {
s.logger.Debug().Time("t", time.Now()).Uint64("headers", headersNb).Msg("Header submission in progress")
s.wg.Add(1)
go func() {
defer s.headerSubmissionMtx.Unlock()
defer func() {
s.headerSubmissionMtx.Unlock()
s.logger.Debug().Time("t", time.Now()).Uint64("headers", headersNb).Msg("Header submission completed")
s.wg.Done()
}()

// Get headers with marshalled bytes from cache
headers, marshalledHeaders, err := s.cache.GetPendingHeaders(s.ctx)
Expand Down Expand Up @@ -233,10 +251,15 @@ func (s *Submitter) daSubmissionLoop() {
if dataNb > 0 {
lastSubmitNanos := s.lastDataSubmit.Load()
timeSinceLastSubmit := time.Since(time.Unix(0, lastSubmitNanos))

if s.dataSubmissionMtx.TryLock() {
s.logger.Debug().Time("t", time.Now()).Uint64("data", dataNb).Msg("Data submission in progress")
s.wg.Add(1)
go func() {
defer s.dataSubmissionMtx.Unlock()
defer func() {
s.dataSubmissionMtx.Unlock()
s.logger.Debug().Time("t", time.Now()).Uint64("data", dataNb).Msg("Data submission completed")
s.wg.Done()
}()

// Get data with marshalled bytes from cache
signedDataList, marshalledData, err := s.cache.GetPendingData(s.ctx)
Expand Down
43 changes: 43 additions & 0 deletions block/internal/syncing/assert.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
package syncing

import (
"errors"
"fmt"

"github.com/evstack/ev-node/pkg/genesis"
"github.com/evstack/ev-node/types"
)

func assertExpectedProposer(genesis genesis.Genesis, proposerAddr []byte) error {
if string(proposerAddr) != string(genesis.ProposerAddress) {
return fmt.Errorf("unexpected proposer: got %x, expected %x",
proposerAddr, genesis.ProposerAddress)
}
return nil
}

func assertValidSignedData(signedData *types.SignedData, genesis genesis.Genesis) error {
if signedData == nil || signedData.Txs == nil {
return errors.New("empty signed data")
}

if err := assertExpectedProposer(genesis, signedData.Signer.Address); err != nil {
return err
}

dataBytes, err := signedData.Data.MarshalBinary()
if err != nil {
return fmt.Errorf("failed to get signed data payload: %w", err)
}

valid, err := signedData.Signer.PubKey.Verify(dataBytes, signedData.Signature)
if err != nil {
return fmt.Errorf("failed to verify signature: %w", err)
}

if !valid {
return fmt.Errorf("invalid signature")
}

return nil
}
30 changes: 2 additions & 28 deletions block/internal/syncing/da_retriever.go
Original file line number Diff line number Diff line change
Expand Up @@ -346,38 +346,12 @@ func (r *daRetriever) tryDecodeData(bz []byte, daHeight uint64) *types.Data {

// assertExpectedProposer validates the proposer address
func (r *daRetriever) assertExpectedProposer(proposerAddr []byte) error {
if string(proposerAddr) != string(r.genesis.ProposerAddress) {
return fmt.Errorf("unexpected proposer: got %x, expected %x",
proposerAddr, r.genesis.ProposerAddress)
}
return nil
return assertExpectedProposer(r.genesis, proposerAddr)
}

// assertValidSignedData validates signed data using the configured signature provider
func (r *daRetriever) assertValidSignedData(signedData *types.SignedData) error {
if signedData == nil || signedData.Txs == nil {
return errors.New("empty signed data")
}

if err := r.assertExpectedProposer(signedData.Signer.Address); err != nil {
return err
}

dataBytes, err := signedData.Data.MarshalBinary()
if err != nil {
return fmt.Errorf("failed to get signed data payload: %w", err)
}

valid, err := signedData.Signer.PubKey.Verify(dataBytes, signedData.Signature)
if err != nil {
return fmt.Errorf("failed to verify signature: %w", err)
}

if !valid {
return fmt.Errorf("invalid signature")
}

return nil
return assertValidSignedData(signedData, r.genesis)
}

// isEmptyDataExpected checks if empty data is expected for a header
Expand Down
Loading
Loading