Skip to content

Commit d4a2da9

Browse files
authored
fix(sequencers/single): deterministic queue (#2938)
## Overview State should be same after restart. Note to reviewer: This PR contains is a breaking change the the queue store. I am not sure if we need a migration.
1 parent 742b8bb commit d4a2da9

File tree

3 files changed

+325
-50
lines changed

3 files changed

+325
-50
lines changed

sequencers/single/queue.go

Lines changed: 115 additions & 45 deletions
Original file line numberDiff line numberDiff line change
@@ -2,9 +2,11 @@ package single
22

33
import (
44
"context"
5+
"encoding/binary"
56
"encoding/hex"
67
"errors"
78
"fmt"
9+
"strconv"
810
"sync"
911

1012
ds "github.com/ipfs/go-datastore"
@@ -20,26 +22,52 @@ import (
2022
// ErrQueueFull is returned when the batch queue has reached its maximum size
2123
var ErrQueueFull = errors.New("batch queue is full")
2224

25+
// initialSeqNum is the starting sequence number for new queues.
26+
// It is set to the middle of the uint64 range to allow for both
27+
// appending (incrementing) and prepending (decrementing) transactions.
28+
const initialSeqNum = uint64(0x8000000000000000)
29+
30+
// queuedItem holds a batch and its associated persistence key
31+
type queuedItem struct {
32+
Batch coresequencer.Batch
33+
Key string
34+
}
35+
2336
// BatchQueue implements a persistent queue for transaction batches
2437
type BatchQueue struct {
25-
queue []coresequencer.Batch
38+
queue []queuedItem
2639
head int // index of the first element in the queue
2740
maxQueueSize int // maximum number of batches allowed in queue (0 = unlimited)
28-
mu sync.Mutex
29-
db ds.Batching
41+
42+
// Sequence numbers for generating new keys
43+
nextAddSeq uint64
44+
nextPrependSeq uint64
45+
46+
mu sync.Mutex
47+
db ds.Batching
3048
}
3149

3250
// NewBatchQueue creates a new BatchQueue with the specified maximum size.
3351
// If maxSize is 0, the queue will be unlimited.
3452
func NewBatchQueue(db ds.Batching, prefix string, maxSize int) *BatchQueue {
3553
return &BatchQueue{
36-
queue: make([]coresequencer.Batch, 0),
37-
head: 0,
38-
maxQueueSize: maxSize,
39-
db: store.NewPrefixKVStore(db, prefix),
54+
queue: make([]queuedItem, 0),
55+
head: 0,
56+
maxQueueSize: maxSize,
57+
db: store.NewPrefixKVStore(db, prefix),
58+
nextAddSeq: initialSeqNum,
59+
nextPrependSeq: initialSeqNum - 1,
4060
}
4161
}
4262

63+
// seqToKey converts a sequence number to a hex-encoded big-endian key.
64+
// We use big-endian so that lexicographical sort order matches numeric order.
65+
func seqToKey(seq uint64) string {
66+
b := make([]byte, 8)
67+
binary.BigEndian.PutUint64(b, seq)
68+
return hex.EncodeToString(b)
69+
}
70+
4371
// AddBatch adds a new transaction to the queue and writes it to the WAL.
4472
// Returns ErrQueueFull if the queue has reached its maximum size.
4573
func (bq *BatchQueue) AddBatch(ctx context.Context, batch coresequencer.Batch) error {
@@ -53,12 +81,14 @@ func (bq *BatchQueue) AddBatch(ctx context.Context, batch coresequencer.Batch) e
5381
return ErrQueueFull
5482
}
5583

56-
if err := bq.persistBatch(ctx, batch); err != nil {
84+
key := seqToKey(bq.nextAddSeq)
85+
if err := bq.persistBatch(ctx, batch, key); err != nil {
5786
return err
5887
}
88+
bq.nextAddSeq++
5989

6090
// Then add to in-memory queue
61-
bq.queue = append(bq.queue, batch)
91+
bq.queue = append(bq.queue, queuedItem{Batch: batch, Key: key})
6292

6393
return nil
6494
}
@@ -68,25 +98,27 @@ func (bq *BatchQueue) AddBatch(ctx context.Context, batch coresequencer.Batch) e
6898
// The batch is persisted to the DB to ensure durability in case of crashes.
6999
//
70100
// NOTE: Prepend intentionally bypasses the maxQueueSize limit to ensure high-priority
71-
// transactions can always be re-queued. This means the effective queue size may temporarily
72-
// exceed the configured maximum when Prepend is used. This is by design to prevent loss
73-
// of transactions that have already been accepted but couldn't fit in the current batch.
101+
// transactions can always be re-queued.
74102
func (bq *BatchQueue) Prepend(ctx context.Context, batch coresequencer.Batch) error {
75103
bq.mu.Lock()
76104
defer bq.mu.Unlock()
77105

78-
if err := bq.persistBatch(ctx, batch); err != nil {
106+
key := seqToKey(bq.nextPrependSeq)
107+
if err := bq.persistBatch(ctx, batch, key); err != nil {
79108
return err
80109
}
110+
bq.nextPrependSeq--
111+
112+
item := queuedItem{Batch: batch, Key: key}
81113

82114
// Then add to in-memory queue
83115
// If we have room before head, use it
84116
if bq.head > 0 {
85117
bq.head--
86-
bq.queue[bq.head] = batch
118+
bq.queue[bq.head] = item
87119
} else {
88120
// Need to expand the queue at the front
89-
bq.queue = append([]coresequencer.Batch{batch}, bq.queue...)
121+
bq.queue = append([]queuedItem{item}, bq.queue...)
90122
}
91123

92124
return nil
@@ -102,8 +134,9 @@ func (bq *BatchQueue) Next(ctx context.Context) (*coresequencer.Batch, error) {
102134
return &coresequencer.Batch{Transactions: nil}, nil
103135
}
104136

105-
batch := bq.queue[bq.head]
106-
bq.queue[bq.head] = coresequencer.Batch{} // Release memory for the dequeued element
137+
item := bq.queue[bq.head]
138+
// Release memory for the dequeued element
139+
bq.queue[bq.head] = queuedItem{}
107140
bq.head++
108141

109142
// Compact when head gets too large to prevent memory leaks
@@ -112,59 +145,102 @@ func (bq *BatchQueue) Next(ctx context.Context) (*coresequencer.Batch, error) {
112145
// frequent compactions on small queues
113146
if bq.head > len(bq.queue)/2 && bq.head > 100 {
114147
remaining := copy(bq.queue, bq.queue[bq.head:])
115-
// Zero out the rest of the slice to release memory
148+
// Zero out the rest of the slice
116149
for i := remaining; i < len(bq.queue); i++ {
117-
bq.queue[i] = coresequencer.Batch{}
150+
bq.queue[i] = queuedItem{}
118151
}
119152
bq.queue = bq.queue[:remaining]
120153
bq.head = 0
121154
}
122155

123-
hash, err := batch.Hash()
124-
if err != nil {
125-
return &coresequencer.Batch{Transactions: nil}, err
126-
}
127-
key := hex.EncodeToString(hash)
128-
129156
// Delete the batch from the WAL since it's been processed
130-
err = bq.db.Delete(ctx, ds.NewKey(key))
131-
if err != nil {
157+
// Use the stored key directly
158+
if err := bq.db.Delete(ctx, ds.NewKey(item.Key)); err != nil {
132159
// Log the error but continue
133160
fmt.Printf("Error deleting processed batch: %v\n", err)
134161
}
135162

136-
return &batch, nil
163+
return &item.Batch, nil
137164
}
138165

139166
// Load reloads all batches from WAL file into the in-memory queue after a crash or restart
140167
func (bq *BatchQueue) Load(ctx context.Context) error {
141168
bq.mu.Lock()
142169
defer bq.mu.Unlock()
143170

144-
// Clear the current queue
145-
bq.queue = make([]coresequencer.Batch, 0)
171+
// Clear the current queue and reset sequences
172+
bq.queue = make([]queuedItem, 0)
146173
bq.head = 0
174+
bq.nextAddSeq = initialSeqNum
175+
bq.nextPrependSeq = initialSeqNum - 1
147176

148-
q := query.Query{}
177+
q := query.Query{
178+
Orders: []query.Order{query.OrderByKey{}},
179+
}
149180
results, err := bq.db.Query(ctx, q)
150181
if err != nil {
151182
return fmt.Errorf("error querying datastore: %w", err)
152183
}
153184
defer results.Close()
154185

155-
// Load each batch
186+
var legacyItems []queuedItem
156187
for result := range results.Next() {
157188
if result.Error != nil {
158189
fmt.Printf("Error reading entry from datastore: %v\n", result.Error)
159190
continue
160191
}
161-
pbBatch := &pb.Batch{}
162-
err := proto.Unmarshal(result.Value, pbBatch)
192+
// We care about the last part of the key (the sequence number)
193+
// ds.Key usually has a leading slash.
194+
keyName := ds.NewKey(result.Key).Name()
195+
196+
var pbBatch pb.Batch
197+
err := proto.Unmarshal(result.Value, &pbBatch)
163198
if err != nil {
164-
fmt.Printf("Error decoding batch for key '%s': %v. Skipping entry.\n", result.Key, err)
199+
fmt.Printf("Error decoding batch for key '%s': %v. Skipping entry.\n", keyName, err)
200+
continue
201+
}
202+
203+
batch := coresequencer.Batch{Transactions: pbBatch.Txs}
204+
205+
// Check if key is valid hex sequence number (16 hex chars)
206+
// We use strict 16 check because seqToKey always produces 16 hex chars.
207+
isValid := false
208+
if len(keyName) == 16 {
209+
if seq, err := strconv.ParseUint(keyName, 16, 64); err == nil {
210+
isValid = true
211+
if seq >= bq.nextAddSeq {
212+
bq.nextAddSeq = seq + 1
213+
}
214+
if seq <= bq.nextPrependSeq {
215+
bq.nextPrependSeq = seq - 1
216+
}
217+
}
218+
}
219+
if isValid {
220+
bq.queue = append(bq.queue, queuedItem{Batch: batch, Key: keyName})
221+
} else {
222+
legacyItems = append(legacyItems, queuedItem{Batch: batch, Key: result.Key})
223+
}
224+
}
225+
if len(legacyItems) == 0 {
226+
return nil
227+
}
228+
fmt.Printf("Found %d legacy items to migrate...\n", len(legacyItems))
229+
230+
for _, item := range legacyItems {
231+
newKeyName := seqToKey(bq.nextAddSeq)
232+
233+
if err := bq.persistBatch(ctx, item.Batch, newKeyName); err != nil {
234+
fmt.Printf("Failed to migrate legacy item %s: %v\n", item.Key, err)
165235
continue
166236
}
167-
bq.queue = append(bq.queue, coresequencer.Batch{Transactions: pbBatch.Txs})
237+
238+
if err := bq.db.Delete(ctx, ds.NewKey(item.Key)); err != nil {
239+
fmt.Printf("Failed to delete legacy key %s after migration: %v\n", item.Key, err)
240+
}
241+
242+
bq.queue = append(bq.queue, queuedItem{Batch: item.Batch, Key: newKeyName})
243+
bq.nextAddSeq++
168244
}
169245

170246
return nil
@@ -178,14 +254,8 @@ func (bq *BatchQueue) Size() int {
178254
return len(bq.queue) - bq.head
179255
}
180256

181-
// persistBatch persists a batch to the datastore
182-
func (bq *BatchQueue) persistBatch(ctx context.Context, batch coresequencer.Batch) error {
183-
hash, err := batch.Hash()
184-
if err != nil {
185-
return err
186-
}
187-
key := hex.EncodeToString(hash)
188-
257+
// persistBatch persists a batch to the datastore with the given key
258+
func (bq *BatchQueue) persistBatch(ctx context.Context, batch coresequencer.Batch, key string) error {
189259
pbBatch := &pb.Batch{
190260
Txs: batch.Transactions,
191261
}
@@ -195,7 +265,7 @@ func (bq *BatchQueue) persistBatch(ctx context.Context, batch coresequencer.Batc
195265
return err
196266
}
197267

198-
// First write to DB for durability
268+
// Write to DB
199269
if err := bq.db.Put(ctx, ds.NewKey(key), encodedBatch); err != nil {
200270
return err
201271
}

0 commit comments

Comments
 (0)