Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion sei-tendermint/internal/autobahn/avail/inner_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ func TestPruneMismatchedIndices(t *testing.T) {
committee, keys := types.GenCommittee(rng, 4)
ds := data.NewState(&data.Config{
Committee: committee,
}, utils.None[data.BlockStore]())
}, utils.None[data.BlockStore](), utils.OrPanic1(data.NewDataWAL(utils.None[string]())))
state, err := NewState(keys[0], ds, utils.None[string]())
require.NoError(t, err)

Expand Down
30 changes: 15 additions & 15 deletions sei-tendermint/internal/autobahn/avail/state_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,7 @@ func testState(t *testing.T, stateDir utils.Option[string]) {
if err := scope.Run(ctx, func(ctx context.Context, s scope.Scope) error {
ds := data.NewState(&data.Config{
Committee: committee,
}, utils.None[data.BlockStore]())
}, utils.None[data.BlockStore](), utils.OrPanic1(data.NewDataWAL(utils.None[string]())))
s.SpawnBgNamed("data.State.Run()", func() error {
return utils.IgnoreCancel(ds.Run(ctx))
})
Expand Down Expand Up @@ -237,7 +237,7 @@ func TestStateRestartFromPersisted(t *testing.T) {
var wantNextBlocks map[types.LaneID]types.BlockNumber

require.NoError(t, scope.Run(t.Context(), func(ctx context.Context, s scope.Scope) error {
ds := data.NewState(&data.Config{Committee: committee}, utils.None[data.BlockStore]())
ds := data.NewState(&data.Config{Committee: committee}, utils.None[data.BlockStore](), utils.OrPanic1(data.NewDataWAL(utils.None[string]())))
s.SpawnBgNamed("data.Run", func() error {
return utils.IgnoreCancel(ds.Run(ctx))
})
Expand Down Expand Up @@ -312,7 +312,7 @@ func TestStateRestartFromPersisted(t *testing.T) {
}))

// Phase 2: Restart from the same directory.
ds2 := data.NewState(&data.Config{Committee: committee}, utils.None[data.BlockStore]())
ds2 := data.NewState(&data.Config{Committee: committee}, utils.None[data.BlockStore](), utils.OrPanic1(data.NewDataWAL(utils.None[string]())))
state2, err := NewState(keys[0], ds2, utils.Some(dir))
require.NoError(t, err)

Expand All @@ -337,7 +337,7 @@ func TestStateMismatchedQCs(t *testing.T) {

ds := data.NewState(&data.Config{
Committee: committee,
}, utils.None[data.BlockStore]())
}, utils.None[data.BlockStore](), utils.OrPanic1(data.NewDataWAL(utils.None[string]())))
state, err := NewState(keys[0], ds, utils.None[string]())
require.NoError(t, err)
ctx := t.Context()
Expand Down Expand Up @@ -394,7 +394,7 @@ func TestPushBlockRejectsBadParentHash(t *testing.T) {

ds := data.NewState(&data.Config{
Committee: committee,
}, utils.None[data.BlockStore]())
}, utils.None[data.BlockStore](), utils.OrPanic1(data.NewDataWAL(utils.None[string]())))
state := utils.OrPanic1(NewState(keys[0], ds, utils.None[string]()))

// Produce a valid first block on our lane.
Expand All @@ -419,7 +419,7 @@ func TestPushBlockRejectsWrongSigner(t *testing.T) {

ds := data.NewState(&data.Config{
Committee: committee,
}, utils.None[data.BlockStore]())
}, utils.None[data.BlockStore](), utils.OrPanic1(data.NewDataWAL(utils.None[string]())))
state := utils.OrPanic1(NewState(keys[0], ds, utils.None[string]()))

// Create a block on keys[0]'s lane but sign it with keys[1].
Expand All @@ -437,7 +437,7 @@ func TestNewStateWithPersistence(t *testing.T) {

t.Run("empty dir loads fresh state", func(t *testing.T) {
dir := t.TempDir()
ds := data.NewState(&data.Config{Committee: committee}, utils.None[data.BlockStore]())
ds := data.NewState(&data.Config{Committee: committee}, utils.None[data.BlockStore](), utils.OrPanic1(data.NewDataWAL(utils.None[string]())))

state, err := NewState(keys[0], ds, utils.Some(dir))
require.NoError(t, err)
Expand All @@ -450,7 +450,7 @@ func TestNewStateWithPersistence(t *testing.T) {

t.Run("loads persisted AppQC", func(t *testing.T) {
dir := t.TempDir()
ds := data.NewState(&data.Config{Committee: committee}, utils.None[data.BlockStore]())
ds := data.NewState(&data.Config{Committee: committee}, utils.None[data.BlockStore](), utils.OrPanic1(data.NewDataWAL(utils.None[string]())))

roadIdx := types.RoadIndex(7)
globalNum := types.GlobalBlockNumber(50)
Expand Down Expand Up @@ -491,7 +491,7 @@ func TestNewStateWithPersistence(t *testing.T) {

t.Run("loads persisted blocks", func(t *testing.T) {
dir := t.TempDir()
ds := data.NewState(&data.Config{Committee: committee}, utils.None[data.BlockStore]())
ds := data.NewState(&data.Config{Committee: committee}, utils.None[data.BlockStore](), utils.OrPanic1(data.NewDataWAL(utils.None[string]())))
lane := keys[0].Public()

// Persist blocks using BlockPersister.
Expand All @@ -515,7 +515,7 @@ func TestNewStateWithPersistence(t *testing.T) {

t.Run("loads persisted AppQC and blocks together", func(t *testing.T) {
dir := t.TempDir()
ds := data.NewState(&data.Config{Committee: committee}, utils.None[data.BlockStore]())
ds := data.NewState(&data.Config{Committee: committee}, utils.None[data.BlockStore](), utils.OrPanic1(data.NewDataWAL(utils.None[string]())))
lane := keys[0].Public()

roadIdx := types.RoadIndex(2)
Expand Down Expand Up @@ -568,7 +568,7 @@ func TestNewStateWithPersistence(t *testing.T) {

t.Run("loads persisted commitQCs", func(t *testing.T) {
dir := t.TempDir()
ds := data.NewState(&data.Config{Committee: committee}, utils.None[data.BlockStore]())
ds := data.NewState(&data.Config{Committee: committee}, utils.None[data.BlockStore](), utils.OrPanic1(data.NewDataWAL(utils.None[string]())))

// Persist CommitQCs to disk.
cp, _, err := persist.NewCommitQCPersister(utils.Some(dir))
Expand All @@ -593,7 +593,7 @@ func TestNewStateWithPersistence(t *testing.T) {

t.Run("loads persisted commitQCs with AppQC", func(t *testing.T) {
dir := t.TempDir()
ds := data.NewState(&data.Config{Committee: committee}, utils.None[data.BlockStore]())
ds := data.NewState(&data.Config{Committee: committee}, utils.None[data.BlockStore](), utils.OrPanic1(data.NewDataWAL(utils.None[string]())))

// Persist AppQC at road index 1.
roadIdx := types.RoadIndex(1)
Expand Down Expand Up @@ -666,7 +666,7 @@ func TestNewStateWithPersistence(t *testing.T) {

t.Run("anchor past all persisted commitQCs truncates WAL", func(t *testing.T) {
dir := t.TempDir()
ds := data.NewState(&data.Config{Committee: committee}, utils.None[data.BlockStore]())
ds := data.NewState(&data.Config{Committee: committee}, utils.None[data.BlockStore](), utils.OrPanic1(data.NewDataWAL(utils.None[string]())))

// Build a chain of 10 CommitQCs (indices 0-9).
qcs := make([]*types.CommitQC, 10)
Expand Down Expand Up @@ -709,7 +709,7 @@ func TestNewStateWithPersistence(t *testing.T) {

t.Run("anchor past all persisted blocks truncates lane WAL", func(t *testing.T) {
dir := t.TempDir()
ds := data.NewState(&data.Config{Committee: committee}, utils.None[data.BlockStore]())
ds := data.NewState(&data.Config{Committee: committee}, utils.None[data.BlockStore](), utils.OrPanic1(data.NewDataWAL(utils.None[string]())))
lane := keys[0].Public()

// Persist commitQCs 0-9 and blocks 0-2 for one lane.
Expand Down Expand Up @@ -757,7 +757,7 @@ func TestNewStateWithPersistence(t *testing.T) {

t.Run("corrupt AppQC data returns error", func(t *testing.T) {
dir := t.TempDir()
ds := data.NewState(&data.Config{Committee: committee}, utils.None[data.BlockStore]())
ds := data.NewState(&data.Config{Committee: committee}, utils.None[data.BlockStore](), utils.OrPanic1(data.NewDataWAL(utils.None[string]())))

// Write a valid PersistedWrapper whose Data payload is garbage.
// This simulates corruption at the application data level while
Expand Down
2 changes: 1 addition & 1 deletion sei-tendermint/internal/autobahn/consensus/inner_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -938,7 +938,7 @@ func TestRunOutputsPersistErrorPropagates(t *testing.T) {
// and terminates the consensus component (instead of panicking).
rng := utils.TestRng()
committee, keys := types.GenCommittee(rng, 4)
ds := data.NewState(&data.Config{Committee: committee}, utils.None[data.BlockStore]())
ds := data.NewState(&data.Config{Committee: committee}, utils.None[data.BlockStore](), utils.OrPanic1(data.NewDataWAL(utils.None[string]())))

wantErr := errors.New("disk on fire")
pers := utils.Some[persist.Persister[*pb.PersistedInner]](failPersister[*pb.PersistedInner]{err: wantErr})
Expand Down
207 changes: 207 additions & 0 deletions sei-tendermint/internal/autobahn/consensus/persist/globalblocks.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,207 @@
package persist

import (
"encoding/binary"
"fmt"
"path/filepath"

"github.com/sei-protocol/sei-chain/sei-tendermint/internal/autobahn/types"
"github.com/sei-protocol/sei-chain/sei-tendermint/libs/utils"
)

const globalBlocksDir = "globalblocks"

// LoadedGlobalBlock is a block loaded from disk during state restoration.
type LoadedGlobalBlock struct {
Number types.GlobalBlockNumber
Block *types.Block
}

// numberedBlockEntry pairs a GlobalBlockNumber with a Block for WAL storage.
// Block doesn't carry its GlobalBlockNumber (that's assigned by the ordering
// layer), so we embed it in each WAL entry to make entries self-describing.
// A bit hacky to write GlobalBlockNumber into the entry, but we will have
// real storage solutions soon.
type numberedBlockEntry struct {
Number types.GlobalBlockNumber
Block *types.Block
}

// numberedBlockCodec serializes numberedBlockEntry as [8-byte LE number][proto block].
type numberedBlockCodec struct{}

func (numberedBlockCodec) Marshal(e numberedBlockEntry) []byte {
var buf [8]byte
binary.LittleEndian.PutUint64(buf[:], uint64(e.Number))
return append(buf[:], types.BlockConv.Marshal(e.Block)...)
}

func (numberedBlockCodec) Unmarshal(raw []byte) (numberedBlockEntry, error) {
if len(raw) < 8 {
return numberedBlockEntry{}, fmt.Errorf("global block entry too short: %d bytes", len(raw))
}
n := types.GlobalBlockNumber(binary.LittleEndian.Uint64(raw[:8]))
block, err := types.BlockConv.Unmarshal(raw[8:])
if err != nil {
return numberedBlockEntry{}, fmt.Errorf("unmarshal block %d: %w", n, err)
}
return numberedBlockEntry{Number: n, Block: block}, nil
}

// globalBlockState is the mutable state protected by GlobalBlockPersister's mutex.
type globalBlockState struct {
iw utils.Option[*indexedWAL[numberedBlockEntry]]
next types.GlobalBlockNumber
}

func (s *globalBlockState) persistBlock(n types.GlobalBlockNumber, block *types.Block) error {
if n < s.next {
return nil
}
if n > s.next {
return fmt.Errorf("global block %d out of sequence (next=%d)", n, s.next)
}
if iw, ok := s.iw.Get(); ok {
if err := iw.Write(numberedBlockEntry{Number: n, Block: block}); err != nil {
return fmt.Errorf("persist global block %d: %w", n, err)
}
}
s.next = n + 1
return nil
}

func (s *globalBlockState) truncateBefore(n types.GlobalBlockNumber) error {
if n == 0 {
return nil
}
iw, ok := s.iw.Get()
if n >= s.next {
s.next = n
if ok && iw.Count() > 0 {
if err := iw.TruncateAll(); err != nil {
return err
}
}
return nil
}
if !ok || iw.Count() == 0 {
return nil
}
firstGlobal := s.next - types.GlobalBlockNumber(iw.Count())
if n <= firstGlobal {
return nil
}
walIdx := iw.FirstIdx() + uint64(n-firstGlobal)
if err := iw.TruncateBefore(walIdx, func(entry numberedBlockEntry) error {
if entry.Number != n {
return fmt.Errorf("global block at WAL index %d has number %d, expected %d (index mapping broken)", walIdx, entry.Number, n)
}
return nil
}); err != nil {
return fmt.Errorf("truncate global block WAL before %d: %w", n, err)
}
return nil
}

// GlobalBlockPersister manages persistence of globally-ordered blocks using a WAL.
// Each entry embeds its GlobalBlockNumber since Block doesn't carry it.
// When stateDir is None, all disk I/O is skipped (no-op mode).
// All public methods are safe for concurrent use.
type GlobalBlockPersister struct {
state utils.Mutex[*globalBlockState]
loadedBlocks []LoadedGlobalBlock // set once at construction, cleared after first read
}

// NewGlobalBlockPersister opens (or creates) a WAL in the globalblocks/ subdir
// and replays all persisted entries. Loaded blocks are available via
// LoadedBlocks() (one-shot). When stateDir is None, returns a no-op persister.
func NewGlobalBlockPersister(stateDir utils.Option[string]) (*GlobalBlockPersister, error) {
sd, ok := stateDir.Get()
if !ok {
return &GlobalBlockPersister{state: utils.NewMutex(&globalBlockState{})}, nil
}
dir := filepath.Join(sd, globalBlocksDir)
iw, err := openIndexedWAL(dir, numberedBlockCodec{})
if err != nil {
return nil, fmt.Errorf("open global block WAL in %s: %w", dir, err)
}

s := &globalBlockState{iw: utils.Some(iw)}
loaded, err := loadAllGlobalBlocks(s)
if err != nil {
_ = iw.Close()
return nil, err
}
if len(loaded) > 0 {
s.next = loaded[len(loaded)-1].Number + 1
}
return &GlobalBlockPersister{
state: utils.NewMutex(s),
loadedBlocks: loaded,
}, nil
}

// LoadedBlocks returns blocks replayed from the WAL at startup and clears
// them from memory. Only meaningful on the first call after construction.
func (gp *GlobalBlockPersister) LoadedBlocks() []LoadedGlobalBlock {
loaded := gp.loadedBlocks
gp.loadedBlocks = nil
return loaded
}

// LoadNext returns the next GlobalBlockNumber expected by the persister.
func (gp *GlobalBlockPersister) LoadNext() types.GlobalBlockNumber {
for s := range gp.state.Lock() {
return s.next
}
panic("unreachable")
}

// PersistBlock appends a block to the WAL. Duplicates are silently ignored.
// Gaps return an error.
func (gp *GlobalBlockPersister) PersistBlock(n types.GlobalBlockNumber, block *types.Block) error {
for s := range gp.state.Lock() {
return s.persistBlock(n, block)
}
panic("unreachable")
}

// TruncateBefore removes all entries before n from the WAL.
func (gp *GlobalBlockPersister) TruncateBefore(n types.GlobalBlockNumber) error {
for s := range gp.state.Lock() {
return s.truncateBefore(n)
}
panic("unreachable")
}

// Close shuts down the WAL.
func (gp *GlobalBlockPersister) Close() error {
for s := range gp.state.Lock() {
iw, ok := s.iw.Get()
if !ok {
return nil
}
s.iw = utils.None[*indexedWAL[numberedBlockEntry]]()
return iw.Close()
}
panic("unreachable")
}

func loadAllGlobalBlocks(s *globalBlockState) ([]LoadedGlobalBlock, error) {
iw, ok := s.iw.Get()
if !ok {
return nil, nil
}
entries, err := iw.ReadAll()
if err != nil {
return nil, err
}
loaded := make([]LoadedGlobalBlock, 0, len(entries))
for i, entry := range entries {
if i > 0 && entry.Number != loaded[i-1].Number+1 {
return nil, fmt.Errorf("gap in global blocks: number %d follows %d", entry.Number, loaded[i-1].Number)
}
loaded = append(loaded, LoadedGlobalBlock(entry))
}
return loaded, nil
}
Loading
Loading