Skip to content

Commit e281cfb

Browse files
committed
Backports
1 parent 7d30f97 commit e281cfb

File tree

10 files changed

+566
-71
lines changed

10 files changed

+566
-71
lines changed

.mockery.yaml

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,11 @@ packages:
3030
dir: ./test/mocks
3131
pkgname: mocks
3232
filename: store.go
33+
Batch:
34+
config:
35+
dir: ./test/mocks
36+
pkgname: mocks
37+
filename: batch.go
3338
github.com/celestiaorg/go-header:
3439
interfaces:
3540
Store:

block/internal/cache/pending_data.go

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ package cache
22

33
import (
44
"context"
5+
"errors"
56
"fmt"
67

78
"github.com/rs/zerolog"
@@ -29,8 +30,17 @@ type PendingData struct {
2930
base *pendingBase[*types.Data]
3031
}
3132

33+
var errInFlightData = errors.New("inflight data")
34+
3235
func fetchData(ctx context.Context, store store.Store, height uint64) (*types.Data, error) {
3336
_, data, err := store.GetBlockData(ctx, height)
37+
if err != nil {
38+
return nil, err
39+
}
40+
// in the executor, WIP data is temporary stored. skip them until the process is completed
41+
if data.Height() == 0 {
42+
return nil, errInFlightData
43+
}
3444
return data, err
3545
}
3646

block/internal/cache/pending_headers.go

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ package cache
22

33
import (
44
"context"
5+
"errors"
56
"fmt"
67

78
"github.com/rs/zerolog"
@@ -26,8 +27,17 @@ type PendingHeaders struct {
2627
base *pendingBase[*types.SignedHeader]
2728
}
2829

30+
var errInFlightHeader = errors.New("inflight header")
31+
2932
func fetchSignedHeader(ctx context.Context, store storepkg.Store, height uint64) (*types.SignedHeader, error) {
3033
header, err := store.GetHeader(ctx, height)
34+
if err != nil {
35+
return nil, err
36+
}
37+
// in the executor, WIP headers are temporary stored. skip them until the process is completed
38+
if header.Height() == 0 {
39+
return nil, errInFlightHeader
40+
}
3141
return header, err
3242
}
3343

block/internal/executing/executor.go

Lines changed: 11 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -227,6 +227,9 @@ func (e *Executor) initializeState() error {
227227
if err := batch.SetHeight(state.LastBlockHeight); err != nil {
228228
return fmt.Errorf("failed to set store height: %w", err)
229229
}
230+
if err := batch.UpdateState(state); err != nil {
231+
return fmt.Errorf("failed to update state: %w", err)
232+
}
230233
if err := batch.Commit(); err != nil {
231234
return fmt.Errorf("failed to commit batch: %w", err)
232235
}
@@ -236,7 +239,8 @@ func (e *Executor) initializeState() error {
236239

237240
// Sync execution layer with store on startup
238241
execReplayer := common.NewReplayer(e.store, e.exec, e.genesis, e.logger)
239-
if err := execReplayer.SyncToHeight(e.ctx, state.LastBlockHeight); err != nil {
242+
syncTargetHeight := state.LastBlockHeight
243+
if err := execReplayer.SyncToHeight(e.ctx, syncTargetHeight); err != nil {
240244
e.sendCriticalError(fmt.Errorf("failed to sync execution layer: %w", err))
241245
return fmt.Errorf("failed to sync execution layer: %w", err)
242246
}
@@ -281,7 +285,7 @@ func (e *Executor) executionLoop() {
281285
}
282286
txsAvailable := false
283287

284-
for {
288+
for e.ctx.Err() == nil {
285289
select {
286290
case <-e.ctx.Done():
287291
return
@@ -316,6 +320,10 @@ func (e *Executor) executionLoop() {
316320

317321
// ProduceBlock creates, validates, and stores a new block.
318322
func (e *Executor) ProduceBlock(ctx context.Context) error {
323+
if ctx.Err() != nil {
324+
return ctx.Err()
325+
}
326+
319327
start := time.Now()
320328
defer func() {
321329
if e.metrics.OperationDuration["block_production"] != nil {
@@ -582,7 +590,7 @@ func (e *Executor) CreateBlock(ctx context.Context, height uint64, batchData *Ba
582590
}
583591

584592
for i, tx := range batchData.Transactions {
585-
data.Txs[i] = types.Tx(tx)
593+
data.Txs[i] = tx
586594
}
587595

588596
// Set data hash

block/internal/submitting/submitter.go

Lines changed: 27 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -128,6 +128,7 @@ func (s *Submitter) Start(ctx context.Context) error {
128128

129129
// Start DA submission loop if signer is available (aggregator nodes only)
130130
if s.signer != nil {
131+
s.logger.Info().Msg("starting DA submission loop")
131132
s.wg.Add(1)
132133
go func() {
133134
defer s.wg.Done()
@@ -150,7 +151,18 @@ func (s *Submitter) Stop() error {
150151
if s.cancel != nil {
151152
s.cancel()
152153
}
153-
s.wg.Wait()
154+
// Wait for goroutines to finish with a timeout to prevent hanging
155+
done := make(chan struct{})
156+
go func() {
157+
s.wg.Wait()
158+
close(done)
159+
}()
160+
select {
161+
case <-done:
162+
// All goroutines finished cleanly
163+
case <-time.After(5 * time.Second):
164+
s.logger.Warn().Msg("submitter shutdown timed out waiting for goroutines, proceeding anyway")
165+
}
154166
s.logger.Info().Msg("submitter stopped")
155167
return nil
156168
}
@@ -180,8 +192,14 @@ func (s *Submitter) daSubmissionLoop() {
180192
// For strategy decision, we need to estimate the size
181193
// We'll fetch headers to check, but only submit if strategy approves
182194
if s.headerSubmissionMtx.TryLock() {
195+
s.logger.Debug().Time("t", time.Now()).Uint64("headers", headersNb).Msg("Header submission in progress")
196+
s.wg.Add(1)
183197
go func() {
184-
defer s.headerSubmissionMtx.Unlock()
198+
defer func() {
199+
s.headerSubmissionMtx.Unlock()
200+
s.logger.Debug().Time("t", time.Now()).Uint64("headers", headersNb).Msg("Header submission completed")
201+
s.wg.Done()
202+
}()
185203

186204
// Get headers with marshalled bytes from cache
187205
headers, marshalledHeaders, err := s.cache.GetPendingHeaders(s.ctx)
@@ -233,10 +251,15 @@ func (s *Submitter) daSubmissionLoop() {
233251
if dataNb > 0 {
234252
lastSubmitNanos := s.lastDataSubmit.Load()
235253
timeSinceLastSubmit := time.Since(time.Unix(0, lastSubmitNanos))
236-
254+
s.logger.Debug().Time("t", time.Now()).Uint64("data", dataNb).Msg("Data submission in progress")
255+
s.wg.Add(1)
237256
if s.dataSubmissionMtx.TryLock() {
238257
go func() {
239-
defer s.dataSubmissionMtx.Unlock()
258+
defer func() {
259+
s.dataSubmissionMtx.Unlock()
260+
s.logger.Debug().Time("t", time.Now()).Uint64("data", dataNb).Msg("Data submission completed")
261+
s.wg.Done()
262+
}()
240263

241264
// Get data with marshalled bytes from cache
242265
signedDataList, marshalledData, err := s.cache.GetPendingData(s.ctx)

block/internal/syncing/assert.go

Lines changed: 43 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,43 @@
1+
package syncing
2+
3+
import (
4+
"errors"
5+
"fmt"
6+
7+
"github.com/evstack/ev-node/pkg/genesis"
8+
"github.com/evstack/ev-node/types"
9+
)
10+
11+
func assertExpectedProposer(genesis genesis.Genesis, proposerAddr []byte) error {
12+
if string(proposerAddr) != string(genesis.ProposerAddress) {
13+
return fmt.Errorf("unexpected proposer: got %x, expected %x",
14+
proposerAddr, genesis.ProposerAddress)
15+
}
16+
return nil
17+
}
18+
19+
func assertValidSignedData(signedData *types.SignedData, genesis genesis.Genesis) error {
20+
if signedData == nil || signedData.Txs == nil {
21+
return errors.New("empty signed data")
22+
}
23+
24+
if err := assertExpectedProposer(genesis, signedData.Signer.Address); err != nil {
25+
return err
26+
}
27+
28+
dataBytes, err := signedData.Data.MarshalBinary()
29+
if err != nil {
30+
return fmt.Errorf("failed to get signed data payload: %w", err)
31+
}
32+
33+
valid, err := signedData.Signer.PubKey.Verify(dataBytes, signedData.Signature)
34+
if err != nil {
35+
return fmt.Errorf("failed to verify signature: %w", err)
36+
}
37+
38+
if !valid {
39+
return fmt.Errorf("invalid signature")
40+
}
41+
42+
return nil
43+
}

block/internal/syncing/da_retriever.go

Lines changed: 2 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -346,38 +346,12 @@ func (r *daRetriever) tryDecodeData(bz []byte, daHeight uint64) *types.Data {
346346

347347
// assertExpectedProposer validates the proposer address
348348
func (r *daRetriever) assertExpectedProposer(proposerAddr []byte) error {
349-
if string(proposerAddr) != string(r.genesis.ProposerAddress) {
350-
return fmt.Errorf("unexpected proposer: got %x, expected %x",
351-
proposerAddr, r.genesis.ProposerAddress)
352-
}
353-
return nil
349+
return assertExpectedProposer(r.genesis, proposerAddr)
354350
}
355351

356352
// assertValidSignedData validates signed data using the configured signature provider
357353
func (r *daRetriever) assertValidSignedData(signedData *types.SignedData) error {
358-
if signedData == nil || signedData.Txs == nil {
359-
return errors.New("empty signed data")
360-
}
361-
362-
if err := r.assertExpectedProposer(signedData.Signer.Address); err != nil {
363-
return err
364-
}
365-
366-
dataBytes, err := signedData.Data.MarshalBinary()
367-
if err != nil {
368-
return fmt.Errorf("failed to get signed data payload: %w", err)
369-
}
370-
371-
valid, err := signedData.Signer.PubKey.Verify(dataBytes, signedData.Signature)
372-
if err != nil {
373-
return fmt.Errorf("failed to verify signature: %w", err)
374-
}
375-
376-
if !valid {
377-
return fmt.Errorf("invalid signature")
378-
}
379-
380-
return nil
354+
return assertValidSignedData(signedData, r.genesis)
381355
}
382356

383357
// isEmptyDataExpected checks if empty data is expected for a header

0 commit comments

Comments
 (0)