Skip to content

Commit 580fa29

Browse files
committed
feat: l1infotree publish GER reorged
1 parent 7fe13be commit 580fa29

4 files changed

Lines changed: 335 additions & 12 deletions

File tree

l1infotreesync/l1infotreesync.go

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -487,3 +487,27 @@ func (s *L1InfoTreeSync) IsUpToDate(ctx context.Context, l1Client aggkittypes.Ba
487487

488488
return lastProcessedBlock >= finalizedBlock.Number.Uint64(), nil
489489
}
490+
491+
// SubscribeToGERReorg allows subscribers to receive notifications when GERs are removed due to reorgs.
492+
// The returned channel will receive GERReorgEvent instances containing information about the reorged block
493+
// and all affected L1InfoTreeLeaf entries.
494+
//
495+
// Parameters:
496+
// - subscriberName: A unique identifier for the subscriber (used for logging and debugging)
497+
//
498+
// Returns:
499+
// - A receive-only channel that will receive GERReorgEvent notifications
500+
//
501+
// Example usage:
502+
//
503+
// reorgCh := l1InfoTreeSync.SubscribeToGERReorg("aggsender")
504+
// go func() {
505+
// for event := range reorgCh {
506+
// log.Infof("Received reorg affecting %d GERs from block %d",
507+
// len(event.ReorgedLeaves), event.FirstReorgedBlock)
508+
// // Handle the reorg event
509+
// }
510+
// }()
511+
func (s *L1InfoTreeSync) SubscribeToGERReorg(subscriberName string) <-chan GERReorgEvent {
512+
return s.processor.gerReorgNotifier.Subscribe(subscriberName)
513+
}

l1infotreesync/l1infotreesync_test.go

Lines changed: 41 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -332,3 +332,44 @@ func TestL1InfoTreeSync_GetCompletionPercentage(t *testing.T) {
332332
mockEVMDriver.EXPECT().GetCompletionPercentage().Return(&percent).Once()
333333
require.Equal(t, &percent, s.GetCompletionPercentage())
334334
}
335+
336+
func TestL1InfoTreeSync_SubscribeToGERReorg(t *testing.T) {
337+
t.Parallel()
338+
339+
ctx := context.Background()
340+
dbPath := path.Join(t.TempDir(), "l1infotreesync_subscribe_reorg.sqlite")
341+
342+
l1InfoTreeSync, err := NewReadOnly(ctx, dbPath)
343+
require.NoError(t, err)
344+
345+
// Subscribe via public interface
346+
reorgCh := l1InfoTreeSync.SubscribeToGERReorg("test-consumer")
347+
348+
// Create state
349+
info := &UpdateL1InfoTree{
350+
MainnetExitRoot: common.HexToHash("beef"),
351+
RollupExitRoot: common.HexToHash("5ca1e"),
352+
ParentHash: common.HexToHash("1010101"),
353+
Timestamp: 420,
354+
BlockPosition: 0,
355+
}
356+
err = l1InfoTreeSync.processor.ProcessBlock(ctx, sync.Block{
357+
Num: 1,
358+
Hash: common.HexToHash("block1"),
359+
Events: []interface{}{Event{UpdateL1InfoTree: info}},
360+
})
361+
require.NoError(t, err)
362+
363+
// Trigger reorg
364+
err = l1InfoTreeSync.processor.Reorg(ctx, 1)
365+
require.NoError(t, err)
366+
367+
// Verify event received
368+
select {
369+
case event := <-reorgCh:
370+
require.Equal(t, uint64(1), event.FirstReorgedBlock)
371+
require.Len(t, event.ReorgedLeaves, 1)
372+
case <-time.After(1 * time.Second):
373+
t.Fatal("timeout waiting for reorg event")
374+
}
375+
}

l1infotreesync/processor.go

Lines changed: 51 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ import (
66
"errors"
77
"fmt"
88
mutex "sync"
9+
"time"
910

1011
aggkitcommon "github.com/agglayer/aggkit/common"
1112
"github.com/agglayer/aggkit/db"
@@ -28,13 +29,14 @@ var (
2829
)
2930

3031
type processor struct {
31-
db *sql.DB
32-
l1InfoTree treetypes.FullTreer
33-
rollupExitTree treetypes.FullTreer
34-
mu mutex.RWMutex
35-
halted bool
36-
haltedReason string
37-
log *log.Logger
32+
db *sql.DB
33+
l1InfoTree treetypes.FullTreer
34+
rollupExitTree treetypes.FullTreer
35+
mu mutex.RWMutex
36+
halted bool
37+
haltedReason string
38+
log *log.Logger
39+
gerReorgNotifier aggkitcommon.PubSub[GERReorgEvent]
3840
}
3941

4042
// UpdateL1InfoTree representation of the UpdateL1InfoTree event
@@ -121,6 +123,18 @@ func (l *L1InfoTreeInitial) String() string {
121123
return fmt.Sprintf("BlockNumber: %d, LeafCount: %d, L1InfoRoot: %s", l.BlockNumber, l.LeafCount, l.L1InfoRoot.String())
122124
}
123125

126+
// GERReorgEvent represents information about GERs removed during a reorg
127+
type GERReorgEvent struct {
128+
FirstReorgedBlock uint64 // Block number where reorg started
129+
ReorgedLeaves []*L1InfoTreeLeaf // All affected L1InfoTree leaves
130+
Timestamp uint64 // Unix timestamp when reorg occurred
131+
}
132+
133+
func (g *GERReorgEvent) String() string {
134+
return fmt.Sprintf("GERReorgEvent{FirstReorgedBlock: %d, ReorgedLeaves: %d, Timestamp: %d}",
135+
g.FirstReorgedBlock, len(g.ReorgedLeaves), g.Timestamp)
136+
}
137+
124138
// Hash as expected by the tree
125139
func (l *L1InfoTreeLeaf) GetHash() common.Hash {
126140
rawTimestamp := aggkitcommon.Uint64ToBigEndianBytes(l.Timestamp)
@@ -148,10 +162,11 @@ func newProcessor(dbPath string) (*processor, error) {
148162
return nil, err
149163
}
150164
return &processor{
151-
db: database,
152-
l1InfoTree: tree.NewAppendOnlyTree(database, migrations.L1InfoTreePrefix),
153-
rollupExitTree: tree.NewUpdatableTree(database, migrations.RollupExitTreePrefix),
154-
log: log.WithFields("processor", "l1infotreesync"),
165+
db: database,
166+
l1InfoTree: tree.NewAppendOnlyTree(database, migrations.L1InfoTreePrefix),
167+
rollupExitTree: tree.NewUpdatableTree(database, migrations.RollupExitTreePrefix),
168+
log: log.WithFields("processor", "l1infotreesync"),
169+
gerReorgNotifier: aggkitcommon.NewGenericSubscriber[GERReorgEvent](),
155170
}, nil
156171
}
157172

@@ -327,6 +342,18 @@ func (p *processor) Reorg(ctx context.Context, firstReorgedBlock uint64) error {
327342
}
328343
}()
329344

345+
// Query affected L1InfoTreeLeaves BEFORE cascade delete
346+
var reorgedLeaves []*L1InfoTreeLeaf
347+
err = meddler.QueryAll(tx, &reorgedLeaves,
348+
`SELECT * FROM l1info_leaf WHERE block_num >= $1 ORDER BY block_num ASC, block_pos ASC;`,
349+
firstReorgedBlock)
350+
if err != nil && !errors.Is(err, sql.ErrNoRows) {
351+
return fmt.Errorf("failed to query affected l1info_leaf entries: %w", err)
352+
}
353+
354+
p.log.Debugf("found %d l1info_leaf entries to be reorged from block %d",
355+
len(reorgedLeaves), firstReorgedBlock)
356+
330357
res, err := tx.Exec(`DELETE FROM block WHERE num >= $1;`, firstReorgedBlock)
331358
if err != nil {
332359
return err
@@ -349,12 +376,24 @@ func (p *processor) Reorg(ctx context.Context, firstReorgedBlock uint64) error {
349376
return err
350377
}
351378

352-
p.log.Infof("reorged to block %d, %d rows affected", firstReorgedBlock, rowsAffected)
379+
p.log.Infof("reorged to block %d, %d block rows affected, %d l1info_leaf entries removed",
380+
firstReorgedBlock, rowsAffected, len(reorgedLeaves))
353381

354382
shouldRollback = false
355383

356384
if rowsAffected > 0 {
357385
p.unhalt()
386+
387+
// Publish notification ONLY if there were affected leaves
388+
if len(reorgedLeaves) > 0 {
389+
event := GERReorgEvent{
390+
FirstReorgedBlock: firstReorgedBlock,
391+
ReorgedLeaves: reorgedLeaves,
392+
Timestamp: uint64(time.Now().Unix()),
393+
}
394+
p.log.Infof("publishing GER reorg event: %s", event.String())
395+
p.gerReorgNotifier.Publish(event)
396+
}
358397
}
359398
return nil
360399
}

0 commit comments

Comments
 (0)