Skip to content

Commit c26a567

Browse files
committed
Use seq with sequencer queue
1 parent c68477c commit c26a567

File tree

2 files changed

+127
-48
lines changed

2 files changed

+127
-48
lines changed

sequencers/single/queue.go

Lines changed: 83 additions & 43 deletions
Original file line numberDiff line numberDiff line change
@@ -2,9 +2,11 @@ package single
22

33
import (
44
"context"
5+
"encoding/binary"
56
"encoding/hex"
67
"errors"
78
"fmt"
9+
"strconv"
810
"sync"
911

1012
ds "github.com/ipfs/go-datastore"
@@ -20,30 +22,56 @@ import (
2022
// ErrQueueFull is returned when the batch queue has reached its maximum size
2123
var ErrQueueFull = errors.New("batch queue is full")
2224

25+
// initialSeqNum is the starting sequence number for new queues.
26+
// It is set to the middle of the uint64 range to allow for both
27+
// appending (incrementing) and prepending (decrementing) transactions.
28+
const initialSeqNum = uint64(0x8000000000000000)
29+
2330
func newPrefixKV(kvStore ds.Batching, prefix string) ds.Batching {
2431
return ktds.Wrap(kvStore, ktds.PrefixTransform{Prefix: ds.NewKey(prefix)})
2532
}
2633

34+
// queuedItem holds a batch and its associated persistence key
35+
type queuedItem struct {
36+
Batch coresequencer.Batch
37+
Key string
38+
}
39+
2740
// BatchQueue implements a persistent queue for transaction batches
2841
type BatchQueue struct {
29-
queue []coresequencer.Batch
42+
queue []queuedItem
3043
head int // index of the first element in the queue
3144
maxQueueSize int // maximum number of batches allowed in queue (0 = unlimited)
32-
mu sync.Mutex
33-
db ds.Batching
45+
46+
// Sequence numbers for generating new keys
47+
nextAddSeq uint64
48+
nextPrependSeq uint64
49+
50+
mu sync.Mutex
51+
db ds.Batching
3452
}
3553

3654
// NewBatchQueue creates a new BatchQueue with the specified maximum size.
3755
// If maxSize is 0, the queue will be unlimited.
3856
func NewBatchQueue(db ds.Batching, prefix string, maxSize int) *BatchQueue {
3957
return &BatchQueue{
40-
queue: make([]coresequencer.Batch, 0),
41-
head: 0,
42-
maxQueueSize: maxSize,
43-
db: newPrefixKV(db, prefix),
58+
queue: make([]queuedItem, 0),
59+
head: 0,
60+
maxQueueSize: maxSize,
61+
db: newPrefixKV(db, prefix),
62+
nextAddSeq: initialSeqNum,
63+
nextPrependSeq: initialSeqNum - 1,
4464
}
4565
}
4666

67+
// seqToKey converts a sequence number to a hex-encoded big-endian key.
68+
// We use big-endian so that lexicographical sort order matches numeric order.
69+
func seqToKey(seq uint64) string {
70+
b := make([]byte, 8)
71+
binary.BigEndian.PutUint64(b, seq)
72+
return hex.EncodeToString(b)
73+
}
74+
4775
// AddBatch adds a new transaction to the queue and writes it to the WAL.
4876
// Returns ErrQueueFull if the queue has reached its maximum size.
4977
func (bq *BatchQueue) AddBatch(ctx context.Context, batch coresequencer.Batch) error {
@@ -57,12 +85,14 @@ func (bq *BatchQueue) AddBatch(ctx context.Context, batch coresequencer.Batch) e
5785
return ErrQueueFull
5886
}
5987

60-
if err := bq.persistBatch(ctx, batch); err != nil {
88+
key := seqToKey(bq.nextAddSeq)
89+
if err := bq.persistBatch(ctx, batch, key); err != nil {
6190
return err
6291
}
92+
bq.nextAddSeq++
6393

6494
// Then add to in-memory queue
65-
bq.queue = append(bq.queue, batch)
95+
bq.queue = append(bq.queue, queuedItem{Batch: batch, Key: key})
6696

6797
return nil
6898
}
@@ -72,25 +102,27 @@ func (bq *BatchQueue) AddBatch(ctx context.Context, batch coresequencer.Batch) e
72102
// The batch is persisted to the DB to ensure durability in case of crashes.
73103
//
74104
// NOTE: Prepend intentionally bypasses the maxQueueSize limit to ensure high-priority
75-
// transactions can always be re-queued. This means the effective queue size may temporarily
76-
// exceed the configured maximum when Prepend is used. This is by design to prevent loss
77-
// of transactions that have already been accepted but couldn't fit in the current batch.
105+
// transactions can always be re-queued.
78106
func (bq *BatchQueue) Prepend(ctx context.Context, batch coresequencer.Batch) error {
79107
bq.mu.Lock()
80108
defer bq.mu.Unlock()
81109

82-
if err := bq.persistBatch(ctx, batch); err != nil {
110+
key := seqToKey(bq.nextPrependSeq)
111+
if err := bq.persistBatch(ctx, batch, key); err != nil {
83112
return err
84113
}
114+
bq.nextPrependSeq--
115+
116+
item := queuedItem{Batch: batch, Key: key}
85117

86118
// Then add to in-memory queue
87119
// If we have room before head, use it
88120
if bq.head > 0 {
89121
bq.head--
90-
bq.queue[bq.head] = batch
122+
bq.queue[bq.head] = item
91123
} else {
92124
// Need to expand the queue at the front
93-
bq.queue = append([]coresequencer.Batch{batch}, bq.queue...)
125+
bq.queue = append([]queuedItem{item}, bq.queue...)
94126
}
95127

96128
return nil
@@ -106,8 +138,9 @@ func (bq *BatchQueue) Next(ctx context.Context) (*coresequencer.Batch, error) {
106138
return &coresequencer.Batch{Transactions: nil}, nil
107139
}
108140

109-
batch := bq.queue[bq.head]
110-
bq.queue[bq.head] = coresequencer.Batch{} // Release memory for the dequeued element
141+
item := bq.queue[bq.head]
142+
// Release memory for the dequeued element
143+
bq.queue[bq.head] = queuedItem{}
111144
bq.head++
112145

113146
// Compact when head gets too large to prevent memory leaks
@@ -116,59 +149,72 @@ func (bq *BatchQueue) Next(ctx context.Context) (*coresequencer.Batch, error) {
116149
// frequent compactions on small queues
117150
if bq.head > len(bq.queue)/2 && bq.head > 100 {
118151
remaining := copy(bq.queue, bq.queue[bq.head:])
119-
// Zero out the rest of the slice to release memory
152+
// Zero out the rest of the slice
120153
for i := remaining; i < len(bq.queue); i++ {
121-
bq.queue[i] = coresequencer.Batch{}
154+
bq.queue[i] = queuedItem{}
122155
}
123156
bq.queue = bq.queue[:remaining]
124157
bq.head = 0
125158
}
126159

127-
hash, err := batch.Hash()
128-
if err != nil {
129-
return &coresequencer.Batch{Transactions: nil}, err
130-
}
131-
key := hex.EncodeToString(hash)
132-
133160
// Delete the batch from the WAL since it's been processed
134-
err = bq.db.Delete(ctx, ds.NewKey(key))
135-
if err != nil {
161+
// Use the stored key directly
162+
if err := bq.db.Delete(ctx, ds.NewKey(item.Key)); err != nil {
136163
// Log the error but continue
137164
fmt.Printf("Error deleting processed batch: %v\n", err)
138165
}
139166

140-
return &batch, nil
167+
return &item.Batch, nil
141168
}
142169

143170
// Load reloads all batches from WAL file into the in-memory queue after a crash or restart
144171
func (bq *BatchQueue) Load(ctx context.Context) error {
145172
bq.mu.Lock()
146173
defer bq.mu.Unlock()
147174

148-
// Clear the current queue
149-
bq.queue = make([]coresequencer.Batch, 0)
175+
// Clear the current queue and reset sequences
176+
bq.queue = make([]queuedItem, 0)
150177
bq.head = 0
178+
bq.nextAddSeq = initialSeqNum
179+
bq.nextPrependSeq = initialSeqNum - 1
151180

152-
q := query.Query{}
181+
q := query.Query{
182+
Orders: []query.Order{query.OrderByKey{}},
183+
}
153184
results, err := bq.db.Query(ctx, q)
154185
if err != nil {
155186
return fmt.Errorf("error querying datastore: %w", err)
156187
}
157188
defer results.Close()
158189

159-
// Load each batch
160190
for result := range results.Next() {
161191
if result.Error != nil {
162192
fmt.Printf("Error reading entry from datastore: %v\n", result.Error)
163193
continue
164194
}
195+
// We care about the last part of the key (the sequence number)
196+
// ds.Key usually has a leading slash.
197+
key := ds.NewKey(result.Key).Name()
198+
165199
pbBatch := &pb.Batch{}
166200
err := proto.Unmarshal(result.Value, pbBatch)
167201
if err != nil {
168-
fmt.Printf("Error decoding batch for key '%s': %v. Skipping entry.\n", result.Key, err)
202+
fmt.Printf("Error decoding batch for key '%s': %v. Skipping entry.\n", key, err)
169203
continue
170204
}
171-
bq.queue = append(bq.queue, coresequencer.Batch{Transactions: pbBatch.Txs})
205+
206+
batch := coresequencer.Batch{Transactions: pbBatch.Txs}
207+
bq.queue = append(bq.queue, queuedItem{Batch: batch, Key: key})
208+
209+
// Update sequences based on loaded keys to avoid collisions
210+
if seq, err := strconv.ParseUint(key, 16, 64); err == nil {
211+
if seq >= bq.nextAddSeq {
212+
bq.nextAddSeq = seq + 1
213+
}
214+
if seq <= bq.nextPrependSeq {
215+
bq.nextPrependSeq = seq - 1
216+
}
217+
}
172218
}
173219

174220
return nil
@@ -182,14 +228,8 @@ func (bq *BatchQueue) Size() int {
182228
return len(bq.queue) - bq.head
183229
}
184230

185-
// persistBatch persists a batch to the datastore
186-
func (bq *BatchQueue) persistBatch(ctx context.Context, batch coresequencer.Batch) error {
187-
hash, err := batch.Hash()
188-
if err != nil {
189-
return err
190-
}
191-
key := hex.EncodeToString(hash)
192-
231+
// persistBatch persists a batch to the datastore with the given key
232+
func (bq *BatchQueue) persistBatch(ctx context.Context, batch coresequencer.Batch, key string) error {
193233
pbBatch := &pb.Batch{
194234
Txs: batch.Transactions,
195235
}
@@ -199,7 +239,7 @@ func (bq *BatchQueue) persistBatch(ctx context.Context, batch coresequencer.Batc
199239
return err
200240
}
201241

202-
// First write to DB for durability
242+
// Write to DB
203243
if err := bq.db.Put(ctx, ds.NewKey(key), encodedBatch); err != nil {
204244
return err
205245
}

sequencers/single/queue_test.go

Lines changed: 44 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -227,24 +227,27 @@ func TestLoad_WithMixedData(t *testing.T) {
227227
require.NotNil(bq)
228228

229229
// 1. Add valid batch data under the correct prefix
230+
// Use valid hex sequence keys to ensure Load parses them correctly if needed
231+
key1 := "8000000000000001"
230232
validBatch1 := createTestBatch(t, 3)
231233
hash1, err := validBatch1.Hash()
232234
require.NoError(err)
233235
hexHash1 := hex.EncodeToString(hash1)
234236
pbBatch1 := &pb.Batch{Txs: validBatch1.Transactions}
235237
encodedBatch1, err := proto.Marshal(pbBatch1)
236238
require.NoError(err)
237-
err = rawDB.Put(ctx, ds.NewKey(queuePrefix+hexHash1), encodedBatch1)
239+
err = rawDB.Put(ctx, ds.NewKey(queuePrefix+key1), encodedBatch1)
238240
require.NoError(err)
239241

242+
key2 := "8000000000000002"
240243
validBatch2 := createTestBatch(t, 5)
241244
hash2, err := validBatch2.Hash()
242245
require.NoError(err)
243246
hexHash2 := hex.EncodeToString(hash2)
244247
pbBatch2 := &pb.Batch{Txs: validBatch2.Transactions}
245248
encodedBatch2, err := proto.Marshal(pbBatch2)
246249
require.NoError(err)
247-
err = rawDB.Put(ctx, ds.NewKey(queuePrefix+hexHash2), encodedBatch2)
250+
err = rawDB.Put(ctx, ds.NewKey(queuePrefix+key2), encodedBatch2)
248251
require.NoError(err)
249252

250253
// 3. Add data outside the queue's prefix
@@ -257,8 +260,8 @@ func TestLoad_WithMixedData(t *testing.T) {
257260

258261
// Ensure all data is initially present in the raw DB
259262
initialKeys := map[string]bool{
260-
queuePrefix + hexHash1: true,
261-
queuePrefix + hexHash2: true,
263+
queuePrefix + key1: true,
264+
queuePrefix + key2: true,
262265
otherDataKey1.String(): true,
263266
otherDataKey2.String(): true,
264267
}
@@ -286,7 +289,7 @@ func TestLoad_WithMixedData(t *testing.T) {
286289
loadedHashes := make(map[string]bool)
287290
bq.mu.Lock()
288291
for i := bq.head; i < len(bq.queue); i++ {
289-
h, _ := bq.queue[i].Hash()
292+
h, _ := bq.queue[i].Batch.Hash()
290293
loadedHashes[hex.EncodeToString(h)] = true
291294
}
292295
bq.mu.Unlock()
@@ -302,6 +305,42 @@ func TestLoad_WithMixedData(t *testing.T) {
302305
require.Equal([]byte("more data"), val)
303306
}
304307

308+
func TestBatchQueue_Load_SetsSequencesProperly(t *testing.T) {
309+
ctx := context.Background()
310+
db := ds.NewMapDatastore()
311+
prefix := "test-load-sequences"
312+
313+
// Build some persisted state with both AddBatch and Prepend so we have
314+
// keys on both sides of the initialSeqNum.
315+
q1 := NewBatchQueue(db, prefix, 0)
316+
require.NoError(t, q1.Load(ctx))
317+
318+
require.NoError(t, q1.AddBatch(ctx, coresequencer.Batch{Transactions: [][]byte{[]byte("add-1")}})) // initialSeqNum
319+
require.NoError(t, q1.AddBatch(ctx, coresequencer.Batch{Transactions: [][]byte{[]byte("add-2")}})) // initialSeqNum+1
320+
321+
require.NoError(t, q1.Prepend(ctx, coresequencer.Batch{Transactions: [][]byte{[]byte("pre-1")}})) // initialSeqNum-1
322+
require.NoError(t, q1.Prepend(ctx, coresequencer.Batch{Transactions: [][]byte{[]byte("pre-2")}})) // initialSeqNum-2
323+
324+
// Simulate restart.
325+
q2 := NewBatchQueue(db, prefix, 0)
326+
require.NoError(t, q2.Load(ctx))
327+
328+
// After Load(), the sequencers should be positioned to avoid collisions:
329+
// - nextAddSeq should be (maxSeq + 1)
330+
// - nextPrependSeq should be (minSeq - 1)
331+
require.Equal(t, initialSeqNum+2, q2.nextAddSeq, "nextAddSeq should continue after the max loaded key")
332+
require.Equal(t, initialSeqNum-3, q2.nextPrependSeq, "nextPrependSeq should continue before the min loaded key")
333+
334+
// Verify we actually use those sequences when persisting new items.
335+
require.NoError(t, q2.AddBatch(ctx, coresequencer.Batch{Transactions: [][]byte{[]byte("add-after-load")}}))
336+
_, err := q2.db.Get(ctx, ds.NewKey(seqToKey(initialSeqNum+2)))
337+
require.NoError(t, err, "expected AddBatch after Load to persist using nextAddSeq key")
338+
339+
require.NoError(t, q2.Prepend(ctx, coresequencer.Batch{Transactions: [][]byte{[]byte("pre-after-load")}}))
340+
_, err = q2.db.Get(ctx, ds.NewKey(seqToKey(initialSeqNum-3)))
341+
require.NoError(t, err, "expected Prepend after Load to persist using nextPrependSeq key")
342+
}
343+
305344
func TestConcurrency(t *testing.T) {
306345
bq := setupTestQueue(t)
307346
ctx := context.Background()

0 commit comments

Comments
 (0)