Skip to content

Commit 0fa3370

Browse files
authored
feat(sequencers/based): add based batch time (#2911)
<!-- Please read and fill out this form before submitting your PR. Please make sure you have reviewed our contributors guide before submitting your first PR. NOTE: PR titles should follow semantic commits: https://www.conventionalcommits.org/en/v1.0.0/ --> ## Overview ref: #2906 Add based block time based on epoch da time. DO NOT MERGE YET, based on #2908 to avoid conflicts later. <!-- Please provide an explanation of the PR, including the appropriate context, background, goal, and rationale. If there is an issue with this information, please provide a tl;dr and link the issue. Ex: Closes #<issue number> -->
1 parent f1aa2cb commit 0fa3370

File tree

5 files changed

+388
-7
lines changed

5 files changed

+388
-7
lines changed

block/internal/da/client_test.go

Lines changed: 223 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -523,3 +523,226 @@ func TestClient_Retrieve_Timeout(t *testing.T) {
523523
assert.Assert(t, result.Message != "")
524524
})
525525
}
526+
527+
func TestClient_RetrieveHeaders(t *testing.T) {
528+
logger := zerolog.Nop()
529+
dataLayerHeight := uint64(100)
530+
mockIDs := [][]byte{[]byte("id1")}
531+
mockBlobs := [][]byte{[]byte("header-blob")}
532+
mockTimestamp := time.Now()
533+
534+
mockDAInstance := &mockDA{
535+
getIDsFunc: func(ctx context.Context, height uint64, namespace []byte) (*coreda.GetIDsResult, error) {
536+
return &coreda.GetIDsResult{
537+
IDs: mockIDs,
538+
Timestamp: mockTimestamp,
539+
}, nil
540+
},
541+
getFunc: func(ctx context.Context, ids []coreda.ID, namespace []byte) ([]coreda.Blob, error) {
542+
return mockBlobs, nil
543+
},
544+
}
545+
546+
client := NewClient(Config{
547+
DA: mockDAInstance,
548+
Logger: logger,
549+
Namespace: "test-header-ns",
550+
DataNamespace: "test-data-ns",
551+
})
552+
553+
result := client.RetrieveHeaders(context.Background(), dataLayerHeight)
554+
555+
assert.Equal(t, coreda.StatusSuccess, result.Code)
556+
assert.Equal(t, dataLayerHeight, result.Height)
557+
assert.Equal(t, len(mockBlobs), len(result.Data))
558+
}
559+
560+
func TestClient_RetrieveData(t *testing.T) {
561+
logger := zerolog.Nop()
562+
dataLayerHeight := uint64(200)
563+
mockIDs := [][]byte{[]byte("id1"), []byte("id2")}
564+
mockBlobs := [][]byte{[]byte("data-blob-1"), []byte("data-blob-2")}
565+
mockTimestamp := time.Now()
566+
567+
mockDAInstance := &mockDA{
568+
getIDsFunc: func(ctx context.Context, height uint64, namespace []byte) (*coreda.GetIDsResult, error) {
569+
return &coreda.GetIDsResult{
570+
IDs: mockIDs,
571+
Timestamp: mockTimestamp,
572+
}, nil
573+
},
574+
getFunc: func(ctx context.Context, ids []coreda.ID, namespace []byte) ([]coreda.Blob, error) {
575+
return mockBlobs, nil
576+
},
577+
}
578+
579+
client := NewClient(Config{
580+
DA: mockDAInstance,
581+
Logger: logger,
582+
Namespace: "test-header-ns",
583+
DataNamespace: "test-data-ns",
584+
})
585+
586+
result := client.RetrieveData(context.Background(), dataLayerHeight)
587+
588+
assert.Equal(t, coreda.StatusSuccess, result.Code)
589+
assert.Equal(t, dataLayerHeight, result.Height)
590+
assert.Equal(t, len(mockBlobs), len(result.Data))
591+
}
592+
593+
func TestClient_RetrieveBatched(t *testing.T) {
594+
logger := zerolog.Nop()
595+
dataLayerHeight := uint64(100)
596+
597+
// Create 200 IDs to exceed default batch size
598+
numIDs := 200
599+
mockIDs := make([][]byte, numIDs)
600+
for i := range numIDs {
601+
mockIDs[i] = []byte{byte(i)}
602+
}
603+
604+
// Track which batches were requested
605+
batchCalls := []int{}
606+
607+
mockDAInstance := &mockDA{
608+
getIDsFunc: func(ctx context.Context, height uint64, namespace []byte) (*coreda.GetIDsResult, error) {
609+
return &coreda.GetIDsResult{
610+
IDs: mockIDs,
611+
Timestamp: time.Now(),
612+
}, nil
613+
},
614+
getFunc: func(ctx context.Context, ids []coreda.ID, namespace []byte) ([]coreda.Blob, error) {
615+
batchCalls = append(batchCalls, len(ids))
616+
// Return a blob for each ID in the batch
617+
blobs := make([][]byte, len(ids))
618+
for i := range ids {
619+
blobs[i] = []byte("blob")
620+
}
621+
return blobs, nil
622+
},
623+
}
624+
625+
client := NewClient(Config{
626+
DA: mockDAInstance,
627+
Logger: logger,
628+
Namespace: "test-ns",
629+
DataNamespace: "test-data-ns",
630+
RetrieveBatchSize: 50, // Set smaller batch size for testing
631+
})
632+
633+
encodedNamespace := coreda.NamespaceFromString("test-ns")
634+
result := client.Retrieve(context.Background(), dataLayerHeight, encodedNamespace.Bytes())
635+
636+
assert.Equal(t, coreda.StatusSuccess, result.Code)
637+
assert.Equal(t, numIDs, len(result.Data))
638+
639+
// Should have made 4 batches: 50 + 50 + 50 + 50 = 200
640+
assert.Equal(t, 4, len(batchCalls))
641+
assert.Equal(t, 50, batchCalls[0])
642+
assert.Equal(t, 50, batchCalls[1])
643+
assert.Equal(t, 50, batchCalls[2])
644+
assert.Equal(t, 50, batchCalls[3])
645+
}
646+
647+
func TestClient_RetrieveBatched_PartialBatch(t *testing.T) {
648+
logger := zerolog.Nop()
649+
dataLayerHeight := uint64(100)
650+
651+
// Create 175 IDs to test partial batch at the end
652+
numIDs := 175
653+
mockIDs := make([][]byte, numIDs)
654+
for i := range numIDs {
655+
mockIDs[i] = []byte{byte(i)}
656+
}
657+
658+
batchCalls := []int{}
659+
660+
mockDAInstance := &mockDA{
661+
getIDsFunc: func(ctx context.Context, height uint64, namespace []byte) (*coreda.GetIDsResult, error) {
662+
return &coreda.GetIDsResult{
663+
IDs: mockIDs,
664+
Timestamp: time.Now(),
665+
}, nil
666+
},
667+
getFunc: func(ctx context.Context, ids []coreda.ID, namespace []byte) ([]coreda.Blob, error) {
668+
batchCalls = append(batchCalls, len(ids))
669+
blobs := make([][]byte, len(ids))
670+
for i := range ids {
671+
blobs[i] = []byte("blob")
672+
}
673+
return blobs, nil
674+
},
675+
}
676+
677+
client := NewClient(Config{
678+
DA: mockDAInstance,
679+
Logger: logger,
680+
Namespace: "test-ns",
681+
DataNamespace: "test-data-ns",
682+
RetrieveBatchSize: 50,
683+
})
684+
685+
encodedNamespace := coreda.NamespaceFromString("test-ns")
686+
result := client.Retrieve(context.Background(), dataLayerHeight, encodedNamespace.Bytes())
687+
688+
assert.Equal(t, coreda.StatusSuccess, result.Code)
689+
assert.Equal(t, numIDs, len(result.Data))
690+
691+
// Should have made 4 batches: 50 + 50 + 50 + 25 = 175
692+
assert.Equal(t, 4, len(batchCalls))
693+
assert.Equal(t, 50, batchCalls[0])
694+
assert.Equal(t, 50, batchCalls[1])
695+
assert.Equal(t, 50, batchCalls[2])
696+
assert.Equal(t, 25, batchCalls[3]) // Partial batch
697+
}
698+
699+
func TestClient_RetrieveBatched_ErrorInSecondBatch(t *testing.T) {
700+
logger := zerolog.Nop()
701+
dataLayerHeight := uint64(100)
702+
703+
// Create 200 IDs to require multiple batches
704+
numIDs := 200
705+
mockIDs := make([][]byte, numIDs)
706+
for i := range numIDs {
707+
mockIDs[i] = []byte{byte(i)}
708+
}
709+
710+
batchCallCount := 0
711+
712+
mockDAInstance := &mockDA{
713+
getIDsFunc: func(ctx context.Context, height uint64, namespace []byte) (*coreda.GetIDsResult, error) {
714+
return &coreda.GetIDsResult{
715+
IDs: mockIDs,
716+
Timestamp: time.Now(),
717+
}, nil
718+
},
719+
getFunc: func(ctx context.Context, ids []coreda.ID, namespace []byte) ([]coreda.Blob, error) {
720+
batchCallCount++
721+
// Fail on second batch
722+
if batchCallCount == 2 {
723+
return nil, errors.New("network error in batch 2")
724+
}
725+
blobs := make([][]byte, len(ids))
726+
for i := range ids {
727+
blobs[i] = []byte("blob")
728+
}
729+
return blobs, nil
730+
},
731+
}
732+
733+
client := NewClient(Config{
734+
DA: mockDAInstance,
735+
Logger: logger,
736+
Namespace: "test-ns",
737+
DataNamespace: "test-data-ns",
738+
RetrieveBatchSize: 50,
739+
})
740+
741+
encodedNamespace := coreda.NamespaceFromString("test-ns")
742+
result := client.Retrieve(context.Background(), dataLayerHeight, encodedNamespace.Bytes())
743+
744+
assert.Equal(t, coreda.StatusError, result.Code)
745+
assert.Assert(t, result.Message != "")
746+
// Error message should mention the batch range
747+
assert.Assert(t, len(result.Message) > 0)
748+
}

block/internal/da/forced_inclusion_retriever.go

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ import (
44
"context"
55
"errors"
66
"fmt"
7+
"time"
78

89
"github.com/rs/zerolog"
910

@@ -25,6 +26,7 @@ type ForcedInclusionRetriever struct {
2526

2627
// ForcedInclusionEvent contains forced inclusion transactions retrieved from DA.
2728
type ForcedInclusionEvent struct {
29+
Timestamp time.Time
2830
StartDaHeight uint64
2931
EndDaHeight uint64
3032
Txs [][]byte
@@ -158,6 +160,10 @@ func (r *ForcedInclusionRetriever) processForcedInclusionBlobs(
158160
}
159161
}
160162

163+
if result.Timestamp.After(event.Timestamp) {
164+
event.Timestamp = result.Timestamp
165+
}
166+
161167
r.logger.Debug().
162168
Uint64("height", height).
163169
Int("blob_count", len(result.Data)).

block/internal/da/forced_inclusion_retriever_test.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -244,6 +244,7 @@ func TestForcedInclusionRetriever_RetrieveForcedIncludedTxs_MultiHeightEpoch(t *
244244
assert.Assert(t, event != nil)
245245
assert.Equal(t, event.StartDaHeight, uint64(100))
246246
assert.Equal(t, event.EndDaHeight, uint64(102))
247+
assert.Assert(t, event.Timestamp.After(time.Time{}))
247248

248249
// Should have collected all txs from all heights
249250
expectedTxCount := len(testBlobsByHeight[100]) + len(testBlobsByHeight[101]) + len(testBlobsByHeight[102])
@@ -334,6 +335,7 @@ func TestForcedInclusionRetriever_processForcedInclusionBlobs(t *testing.T) {
334335
} else {
335336
assert.NilError(t, err)
336337
assert.Equal(t, len(event.Txs), tt.expectedTxCount)
338+
assert.Equal(t, event.Timestamp, time.Time{})
337339
}
338340
})
339341
}

sequencers/based/sequencer.go

Lines changed: 17 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,8 @@ type BasedSequencer struct {
3737

3838
// Cached transactions from the current DA block being processed
3939
currentBatchTxs [][]byte
40+
// DA epoch end time for timestamp calculation
41+
currentDAEndTime time.Time
4042
}
4143

4244
// NewBasedSequencer creates a new based sequencer instance
@@ -97,13 +99,15 @@ func (s *BasedSequencer) GetNextBatch(ctx context.Context, req coresequencer.Get
9799
// If we have no cached transactions or we've consumed all from the current DA block,
98100
// fetch the next DA epoch
99101
daHeight := s.GetDAHeight()
102+
100103
if len(s.currentBatchTxs) == 0 || s.checkpoint.TxIndex >= uint64(len(s.currentBatchTxs)) {
101-
daEndHeight, err := s.fetchNextDAEpoch(ctx, req.MaxBytes)
104+
daEndTime, daEndHeight, err := s.fetchNextDAEpoch(ctx, req.MaxBytes)
102105
if err != nil {
103106
return nil, err
104107
}
105108

106109
daHeight = daEndHeight
110+
s.currentDAEndTime = daEndTime
107111
}
108112

109113
// Create batch from current position up to MaxBytes
@@ -129,15 +133,21 @@ func (s *BasedSequencer) GetNextBatch(ctx context.Context, req coresequencer.Get
129133
}
130134
}
131135

136+
// Calculate timestamp based on remaining transactions after this batch
137+
// The timestamp corresponds to the last block time of a DA epoch, based on the remaining transactions to be executed.
138+
// this is done in order to handle the case where a DA epoch must fit in multiple blocks
139+
remainingTxs := uint64(len(s.currentBatchTxs)) - s.checkpoint.TxIndex
140+
timestamp := s.currentDAEndTime.Add(-time.Duration(remainingTxs) * time.Millisecond)
141+
132142
return &coresequencer.GetNextBatchResponse{
133143
Batch: batch,
134-
Timestamp: time.Time{}, // TODO(@julienrbrt): we need to use DA block timestamp for determinism
144+
Timestamp: timestamp,
135145
BatchData: req.LastBatchData,
136146
}, nil
137147
}
138148

139149
// fetchNextDAEpoch fetches transactions from the next DA epoch
140-
func (s *BasedSequencer) fetchNextDAEpoch(ctx context.Context, maxBytes uint64) (uint64, error) {
150+
func (s *BasedSequencer) fetchNextDAEpoch(ctx context.Context, maxBytes uint64) (time.Time, uint64, error) {
141151
currentDAHeight := s.checkpoint.DAHeight
142152

143153
s.logger.Debug().
@@ -149,16 +159,16 @@ func (s *BasedSequencer) fetchNextDAEpoch(ctx context.Context, maxBytes uint64)
149159
if err != nil {
150160
// Check if forced inclusion is not configured
151161
if errors.Is(err, block.ErrForceInclusionNotConfigured) {
152-
return 0, block.ErrForceInclusionNotConfigured
162+
return time.Time{}, 0, block.ErrForceInclusionNotConfigured
153163
} else if errors.Is(err, coreda.ErrHeightFromFuture) {
154164
// If we get a height from future error, stay at current position
155165
// We'll retry the same height on the next call until DA produces that block
156166
s.logger.Debug().
157167
Uint64("da_height", currentDAHeight).
158168
Msg("DA height from future, waiting for DA to produce block")
159-
return 0, nil
169+
return time.Time{}, 0, nil
160170
}
161-
return 0, fmt.Errorf("failed to retrieve forced inclusion transactions: %w", err)
171+
return time.Time{}, 0, fmt.Errorf("failed to retrieve forced inclusion transactions: %w", err)
162172
}
163173

164174
// Validate and filter transactions
@@ -188,7 +198,7 @@ func (s *BasedSequencer) fetchNextDAEpoch(ctx context.Context, maxBytes uint64)
188198
// Cache the transactions for this DA epoch
189199
s.currentBatchTxs = validTxs
190200

191-
return forcedTxsEvent.EndDaHeight, nil
201+
return forcedTxsEvent.Timestamp.UTC(), forcedTxsEvent.EndDaHeight, nil
192202
}
193203

194204
// createBatchFromCheckpoint creates a batch from the current checkpoint position respecting MaxBytes

0 commit comments

Comments
 (0)