Skip to content

Commit e57eb44

Browse files
committed
rebase from merge problem 2
1 parent 41fc2b9 commit e57eb44

File tree

6 files changed

+77
-63
lines changed

6 files changed

+77
-63
lines changed

block/internal/syncing/da_retriever.go

Lines changed: 61 additions & 43 deletions
Original file line numberDiff line numberDiff line change
@@ -11,21 +11,26 @@ import (
1111

1212
"github.com/evstack/ev-node/block/internal/cache"
1313
"github.com/evstack/ev-node/block/internal/common"
14-
dapkg "github.com/evstack/ev-node/da"
14+
da "github.com/evstack/ev-node/da"
1515
"github.com/evstack/ev-node/pkg/config"
1616
"github.com/evstack/ev-node/pkg/genesis"
1717
"github.com/evstack/ev-node/types"
1818
pb "github.com/evstack/ev-node/types/pb/evnode/v1"
1919
)
2020

21-
// DARetriever handles DA retrieval operations for syncing
22-
type DARetriever struct {
23-
da dapkg.DA
24-
cache cache.Manager
21+
// DARetriever defines the interface for retrieving events from the DA layer
22+
type DARetriever interface {
23+
RetrieveFromDA(ctx context.Context, daHeight uint64) ([]common.DAHeightEvent, error)
24+
}
25+
26+
// daRetriever handles DA retrieval operations for syncing
27+
type daRetriever struct {
28+
da da.DA
29+
cache cache.CacheManager
2530
genesis genesis.Genesis
2631
logger zerolog.Logger
2732

28-
// calculate namespaces bytes once and reuse them
33+
// namespace bytes calculated once
2934
namespaceBz []byte
3035
namespaceDataBz []byte
3136

@@ -37,26 +42,26 @@ type DARetriever struct {
3742

3843
// NewDARetriever creates a new DA retriever
3944
func NewDARetriever(
40-
da dapkg.DA,
41-
cache cache.Manager,
45+
daClient da.DA,
46+
cache cache.CacheManager,
4247
config config.Config,
4348
genesis genesis.Genesis,
4449
logger zerolog.Logger,
45-
) *DARetriever {
46-
return &DARetriever{
47-
da: da,
50+
) *daRetriever {
51+
return &daRetriever{
52+
da: daClient,
4853
cache: cache,
4954
genesis: genesis,
5055
logger: logger.With().Str("component", "da_retriever").Logger(),
51-
namespaceBz: dapkg.NamespaceFromString(config.DA.GetNamespace()).Bytes(),
52-
namespaceDataBz: dapkg.NamespaceFromString(config.DA.GetDataNamespace()).Bytes(),
56+
namespaceBz: da.NamespaceFromString(config.DA.GetNamespace()).Bytes(),
57+
namespaceDataBz: da.NamespaceFromString(config.DA.GetDataNamespace()).Bytes(),
5358
pendingHeaders: make(map[uint64]*types.SignedHeader),
5459
pendingData: make(map[uint64]*types.Data),
5560
}
5661
}
5762

5863
// RetrieveFromDA retrieves blocks from the specified DA height and returns height events
59-
func (r *DARetriever) RetrieveFromDA(ctx context.Context, daHeight uint64) ([]common.DAHeightEvent, error) {
64+
func (r *daRetriever) RetrieveFromDA(ctx context.Context, daHeight uint64) ([]common.DAHeightEvent, error) {
6065
r.logger.Debug().Uint64("da_height", daHeight).Msg("retrieving from DA")
6166
blobsResp, err := r.fetchBlobs(ctx, daHeight)
6267
if err != nil {
@@ -72,9 +77,9 @@ func (r *DARetriever) RetrieveFromDA(ctx context.Context, daHeight uint64) ([]co
7277
return r.processBlobs(ctx, blobsResp.Data, daHeight), nil
7378
}
7479

75-
// fetchBlobs retrieves blobs from the DA layer
76-
func (r *DARetriever) fetchBlobs(ctx context.Context, daHeight uint64) (dapkg.ResultRetrieve, error) {
77-
// Retrieve from both namespaces
80+
// fetchBlobs retrieves blobs from both header and data namespaces
81+
func (r *daRetriever) fetchBlobs(ctx context.Context, daHeight uint64) (da.ResultRetrieve, error) {
82+
// Retrieve from header namespace
7883
headerRes := r.da.Retrieve(ctx, daHeight, r.namespaceBz)
7984

8085
// If namespaces are the same, return header result
@@ -87,57 +92,57 @@ func (r *DARetriever) fetchBlobs(ctx context.Context, daHeight uint64) (dapkg.Re
8792
// Validate responses
8893
headerErr := r.validateBlobResponse(headerRes, daHeight)
8994
// ignoring error not found, as data can have data
90-
if headerErr != nil && !errors.Is(headerErr, dapkg.ErrBlobNotFound) {
95+
if headerErr != nil && !errors.Is(headerErr, da.ErrBlobNotFound) {
9196
return headerRes, headerErr
9297
}
9398

9499
dataErr := r.validateBlobResponse(dataRes, daHeight)
95100
// ignoring error not found, as header can have data
96-
if dataErr != nil && !errors.Is(dataErr, dapkg.ErrBlobNotFound) {
101+
if dataErr != nil && !errors.Is(dataErr, da.ErrBlobNotFound) {
97102
return dataRes, dataErr
98103
}
99104

100105
// Combine successful results
101-
combinedResult := dapkg.ResultRetrieve{
102-
BaseResult: dapkg.BaseResult{
103-
Code: dapkg.StatusSuccess,
106+
combinedResult := da.ResultRetrieve{
107+
BaseResult: da.BaseResult{
108+
Code: da.StatusSuccess,
104109
Height: daHeight,
105110
},
106111
Data: make([][]byte, 0),
107112
}
108113

109-
if headerRes.Code == dapkg.StatusSuccess {
114+
if headerRes.Code == da.StatusSuccess {
110115
combinedResult.Data = append(combinedResult.Data, headerRes.Data...)
111116
combinedResult.IDs = append(combinedResult.IDs, headerRes.IDs...)
112117
}
113118

114-
if dataRes.Code == dapkg.StatusSuccess {
119+
if dataRes.Code == da.StatusSuccess {
115120
combinedResult.Data = append(combinedResult.Data, dataRes.Data...)
116121
combinedResult.IDs = append(combinedResult.IDs, dataRes.IDs...)
117122
}
118123

119124
// Re-throw error not found if both were not found.
120125
if len(combinedResult.Data) == 0 && len(combinedResult.IDs) == 0 {
121126
r.logger.Debug().Uint64("da_height", daHeight).Msg("no blob data found")
122-
combinedResult.Code = dapkg.StatusNotFound
123-
combinedResult.Message = dapkg.ErrBlobNotFound.Error()
124-
return combinedResult, dapkg.ErrBlobNotFound
127+
combinedResult.Code = da.StatusNotFound
128+
combinedResult.Message = da.ErrBlobNotFound.Error()
129+
return combinedResult, da.ErrBlobNotFound
125130
}
126131

127132
return combinedResult, nil
128133
}
129134

130135
// validateBlobResponse validates a blob response from DA layer
131-
// those are the only error code returned by dapkg.RetrieveWithHelpers
132-
func (r *DARetriever) validateBlobResponse(res dapkg.ResultRetrieve, daHeight uint64) error {
136+
// those are the only error code returned by da.RetrieveWithHelpers
137+
func (r *daRetriever) validateBlobResponse(res da.ResultRetrieve, daHeight uint64) error {
133138
switch res.Code {
134-
case dapkg.StatusError:
139+
case da.StatusError:
135140
return fmt.Errorf("DA retrieval failed: %s", res.Message)
136-
case dapkg.StatusHeightFromFuture:
137-
return fmt.Errorf("%w: height from future", dapkg.ErrHeightFromFuture)
138-
case dapkg.StatusNotFound:
139-
return fmt.Errorf("%w: blob not found", dapkg.ErrBlobNotFound)
140-
case dapkg.StatusSuccess:
141+
case da.StatusHeightFromFuture:
142+
return fmt.Errorf("%w: height from future", da.ErrHeightFromFuture)
143+
case da.StatusNotFound:
144+
return fmt.Errorf("%w: blob not found", da.ErrBlobNotFound)
145+
case da.StatusSuccess:
141146
r.logger.Debug().Uint64("da_height", daHeight).Msg("successfully retrieved from DA")
142147
return nil
143148
default:
@@ -146,7 +151,7 @@ func (r *DARetriever) validateBlobResponse(res dapkg.ResultRetrieve, daHeight ui
146151
}
147152

148153
// processBlobs processes retrieved blobs to extract headers and data and returns height events
149-
func (r *DARetriever) processBlobs(ctx context.Context, blobs [][]byte, daHeight uint64) []common.DAHeightEvent {
154+
func (r *daRetriever) processBlobs(ctx context.Context, blobs [][]byte, daHeight uint64) []common.DAHeightEvent {
150155
// Decode all blobs
151156
for _, bz := range blobs {
152157
if len(bz) == 0 {
@@ -207,15 +212,28 @@ func (r *DARetriever) processBlobs(ctx context.Context, blobs [][]byte, daHeight
207212
}
208213

209214
events = append(events, event)
215+
}
210216

211-
r.logger.Info().Uint64("height", height).Uint64("da_height", daHeight).Msg("processed block from DA")
217+
if len(events) > 0 {
218+
startHeight := events[0].Header.Height()
219+
endHeight := events[0].Header.Height()
220+
for _, event := range events {
221+
h := event.Header.Height()
222+
if h < startHeight {
223+
startHeight = h
224+
}
225+
if h > endHeight {
226+
endHeight = h
227+
}
228+
}
229+
r.logger.Info().Uint64("da_height", daHeight).Uint64("start_height", startHeight).Uint64("end_height", endHeight).Msg("processed blocks from DA")
212230
}
213231

214232
return events
215233
}
216234

217235
// tryDecodeHeader attempts to decode a blob as a header
218-
func (r *DARetriever) tryDecodeHeader(bz []byte, daHeight uint64) *types.SignedHeader {
236+
func (r *daRetriever) tryDecodeHeader(bz []byte, daHeight uint64) *types.SignedHeader {
219237
header := new(types.SignedHeader)
220238
var headerPb pb.SignedHeader
221239

@@ -245,7 +263,7 @@ func (r *DARetriever) tryDecodeHeader(bz []byte, daHeight uint64) *types.SignedH
245263
headerHash := header.Hash().String()
246264
r.cache.SetHeaderDAIncluded(headerHash, daHeight, header.Height())
247265

248-
r.logger.Info().
266+
r.logger.Debug().
249267
Str("header_hash", headerHash).
250268
Uint64("da_height", daHeight).
251269
Uint64("height", header.Height()).
@@ -255,7 +273,7 @@ func (r *DARetriever) tryDecodeHeader(bz []byte, daHeight uint64) *types.SignedH
255273
}
256274

257275
// tryDecodeData attempts to decode a blob as signed data
258-
func (r *DARetriever) tryDecodeData(bz []byte, daHeight uint64) *types.Data {
276+
func (r *daRetriever) tryDecodeData(bz []byte, daHeight uint64) *types.Data {
259277
var signedData types.SignedData
260278
if err := signedData.UnmarshalBinary(bz); err != nil {
261279
return nil
@@ -276,7 +294,7 @@ func (r *DARetriever) tryDecodeData(bz []byte, daHeight uint64) *types.Data {
276294
dataHash := signedData.Data.DACommitment().String()
277295
r.cache.SetDataDAIncluded(dataHash, daHeight, signedData.Height())
278296

279-
r.logger.Info().
297+
r.logger.Debug().
280298
Str("data_hash", dataHash).
281299
Uint64("da_height", daHeight).
282300
Uint64("height", signedData.Height()).
@@ -286,7 +304,7 @@ func (r *DARetriever) tryDecodeData(bz []byte, daHeight uint64) *types.Data {
286304
}
287305

288306
// assertExpectedProposer validates the proposer address
289-
func (r *DARetriever) assertExpectedProposer(proposerAddr []byte) error {
307+
func (r *daRetriever) assertExpectedProposer(proposerAddr []byte) error {
290308
if string(proposerAddr) != string(r.genesis.ProposerAddress) {
291309
return fmt.Errorf("unexpected proposer: got %x, expected %x",
292310
proposerAddr, r.genesis.ProposerAddress)
@@ -295,7 +313,7 @@ func (r *DARetriever) assertExpectedProposer(proposerAddr []byte) error {
295313
}
296314

297315
// assertValidSignedData validates signed data using the configured signature provider
298-
func (r *DARetriever) assertValidSignedData(signedData *types.SignedData) error {
316+
func (r *daRetriever) assertValidSignedData(signedData *types.SignedData) error {
299317
if signedData == nil || signedData.Txs == nil {
300318
return errors.New("empty signed data")
301319
}

block/internal/syncing/da_retriever_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -270,7 +270,7 @@ func TestDARetriever_tryDecodeData_InvalidSignatureOrProposer(t *testing.T) {
270270
}
271271

272272
func TestDARetriever_validateBlobResponse(t *testing.T) {
273-
r := &DARetriever{logger: zerolog.Nop()}
273+
r := &daRetriever{logger: zerolog.Nop()}
274274
// StatusSuccess -> nil
275275
err := r.validateBlobResponse(da.ResultRetrieve{BaseResult: da.BaseResult{Code: da.StatusSuccess}}, 1)
276276
assert.NoError(t, err)

block/internal/syncing/p2p_handler.go

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,11 @@ import (
1515
"github.com/evstack/ev-node/types"
1616
)
1717

18+
type p2pHandler interface {
19+
ProcessHeight(ctx context.Context, height uint64, heightInCh chan<- common.DAHeightEvent) error
20+
SetProcessedHeight(height uint64)
21+
}
22+
1823
// P2PHandler coordinates block retrieval from P2P stores for the syncer.
1924
// It waits for both header and data to be available at a given height,
2025
// validates their consistency, and emits events to the syncer for processing.

block/internal/syncing/syncer.go

Lines changed: 6 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -25,15 +25,6 @@ import (
2525
"github.com/evstack/ev-node/types"
2626
)
2727

28-
type daRetriever interface {
29-
RetrieveFromDA(ctx context.Context, daHeight uint64) ([]common.DAHeightEvent, error)
30-
}
31-
32-
type p2pHandler interface {
33-
ProcessHeight(ctx context.Context, height uint64, heightInCh chan<- common.DAHeightEvent) error
34-
SetProcessedHeight(height uint64)
35-
}
36-
3728
// Syncer handles block synchronization from DA and P2P sources.
3829
type Syncer struct {
3930
// Core components
@@ -42,7 +33,7 @@ type Syncer struct {
4233
da da.DA
4334

4435
// Shared components
45-
cache cache.Manager
36+
cache cache.CacheManager
4637
metrics *common.Metrics
4738

4839
// Configuration
@@ -65,8 +56,8 @@ type Syncer struct {
6556
errorCh chan<- error // Channel to report critical execution client failures
6657

6758
// Handlers
68-
daRetrieverHandler daRetriever
69-
p2pHandler p2pHandler
59+
daRetriever DARetriever
60+
p2pHandler p2pHandler
7061

7162
// Logging
7263
logger zerolog.Logger
@@ -85,7 +76,7 @@ func NewSyncer(
8576
store store.Store,
8677
exec coreexecutor.Executor,
8778
da da.DA,
88-
cache cache.Manager,
79+
cache cache.CacheManager,
8980
metrics *common.Metrics,
9081
config config.Config,
9182
genesis genesis.Genesis,
@@ -123,7 +114,7 @@ func (s *Syncer) Start(ctx context.Context) error {
123114
}
124115

125116
// Initialize handlers
126-
s.daRetrieverHandler = NewDARetriever(s.da, s.cache, s.config, s.genesis, s.logger)
117+
s.daRetriever = NewDARetriever(s.da, s.cache, s.config, s.genesis, s.logger)
127118
s.p2pHandler = NewP2PHandler(s.headerStore.Store(), s.dataStore.Store(), s.cache, s.genesis, s.logger)
128119
if currentHeight, err := s.store.Height(s.ctx); err != nil {
129120
s.logger.Error().Err(err).Msg("failed to set initial processed height for p2p handler")
@@ -290,7 +281,7 @@ func (s *Syncer) fetchDAUntilCaughtUp() error {
290281

291282
daHeight := max(s.daRetrieverHeight.Load(), s.cache.DaHeight())
292283

293-
events, err := s.daRetrieverHandler.RetrieveFromDA(s.ctx, daHeight)
284+
events, err := s.daRetriever.RetrieveFromDA(s.ctx, daHeight)
294285
if err != nil {
295286
switch {
296287
case errors.Is(err, da.ErrBlobNotFound):

block/internal/syncing/syncer_benchmark_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -58,7 +58,7 @@ func BenchmarkSyncerIO(b *testing.B) {
5858
}
5959
require.Len(b, fixt.s.heightInCh, 0)
6060

61-
assert.Equal(b, spec.heights+daHeightOffset, fixt.s.daHeight.Load())
61+
assert.Equal(b, spec.heights+daHeightOffset, fixt.s.daRetrieverHeight.Load())
6262
gotStoreHeight, err := fixt.s.store.Height(b.Context())
6363
require.NoError(b, err)
6464
assert.Equal(b, spec.heights, gotStoreHeight)

block/internal/syncing/syncer_test.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -419,7 +419,7 @@ func TestSyncLoopPersistState(t *testing.T) {
419419
requireEmptyChan(t, errorCh)
420420

421421
t.Log("sync workers on instance1 completed")
422-
require.Equal(t, myFutureDAHeight, syncerInst1.daHeight.Load())
422+
require.Equal(t, myFutureDAHeight, syncerInst1.daRetrieverHeight.Load())
423423

424424
// wait for all events consumed
425425
require.NoError(t, cacheMgr.SaveToDisk())
@@ -469,7 +469,7 @@ func TestSyncLoopPersistState(t *testing.T) {
469469
Run(func(arg mock.Arguments) {
470470
cancel()
471471
// retrieve last one again
472-
assert.Equal(t, syncerInst2.daHeight.Load(), arg.Get(1).(uint64))
472+
assert.Equal(t, syncerInst2.daRetrieverHeight.Load(), arg.Get(1).(uint64))
473473
}).
474474
Return(nil, nil)
475475

@@ -617,7 +617,7 @@ func TestSyncer_InitializeState_CallsReplayer(t *testing.T) {
617617
exec: mockExec,
618618
genesis: gen,
619619
lastState: &atomic.Pointer[types.State]{},
620-
daHeight: &atomic.Uint64{},
620+
daRetrieverHeight: &atomic.Uint64{},
621621
logger: zerolog.Nop(),
622622
ctx: context.Background(),
623623
cache: cm,

0 commit comments

Comments
 (0)