Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions pkg/node/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -438,7 +438,7 @@ func NewBee(
}
}(probe)

stamperStore, err := InitStamperStore(logger, o.DataDir, stateStore)
stamperStore, wasClean, err := InitStamperStore(logger, o.DataDir, stateStore)
if err != nil {
return nil, fmt.Errorf("failed to initialize stamper store: %w", err)
}
Expand Down Expand Up @@ -660,7 +660,7 @@ func NewBee(
b.p2pService = p2ps
b.p2pHalter = p2ps

post, err := postage.NewService(logger, stamperStore, batchStore, chainID)
post, err := postage.NewService(logger, stamperStore, batchStore, chainID, wasClean)
if err != nil {
return nil, fmt.Errorf("postage service: %w", err)
}
Expand Down
12 changes: 6 additions & 6 deletions pkg/node/statestore.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ func InitStateStore(logger log.Logger, dataDir string, cacheCapacity uint64) (st
} else {
dataDir = filepath.Join(dataDir, "statestore")
}
ldb, err := leveldbstore.New(dataDir, nil)
ldb, _, err := leveldbstore.New(dataDir, nil)
if err != nil {
return nil, nil, err
}
Expand All @@ -45,18 +45,18 @@ func InitStateStore(logger log.Logger, dataDir string, cacheCapacity uint64) (st
// InitStamperStore will create new stamper store with the given path to the
// data directory. When given an empty directory path, the function will instead
// initialize an in-memory state store that will not be persisted.
func InitStamperStore(logger log.Logger, dataDir string, stateStore storage.StateStorer) (storage.Store, error) {
// The returned bool indicates whether the previous shutdown was unclean (dirty).
func InitStamperStore(logger log.Logger, dataDir string, stateStore storage.StateStorer) (storage.Store, bool, error) {
if dataDir == "" {
logger.Warning("using in-mem stamper store, no node state will be persisted")
} else {
dataDir = filepath.Join(dataDir, "stamperstore")
}
stamperStore, err := leveldbstore.New(dataDir, nil)
store, dirty, err := leveldbstore.New(dataDir, nil)
if err != nil {
return nil, err
return nil, false, err
}

return stamperStore, nil
return store, dirty, nil
}

const (
Expand Down
95 changes: 88 additions & 7 deletions pkg/postage/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import (
"io"
"math/big"
"sync"
"time"

"github.com/ethersphere/bee/v2/pkg/log"
"github.com/ethersphere/bee/v2/pkg/storage"
Expand All @@ -27,6 +28,11 @@ const (
blockThreshold = 10
)

const (
// stampIssuerSaveInterval is how often dirty stamp issuers are flushed to disk.
stampIssuerSaveInterval = time.Minute
)

var (
// ErrNotFound is the error returned when issuer with given batch ID does not exist.
ErrNotFound = errors.New("not found")
Expand Down Expand Up @@ -54,18 +60,24 @@ type service struct {
postageStore Storer
chainID int64
issuers []*StampIssuer

quit chan struct{}
done chan struct{}
}

// NewService constructs a new Service.
func NewService(logger log.Logger, store storage.Store, postageStore Storer, chainID int64) (Service, error) {
// NewService constructs a new Service. wasClean indicates whether the previous
// shutdown was graceful; if false, bucket counts are recovered from the stamp store.
func NewService(logger log.Logger, store storage.Store, postageStore Storer, chainID int64, wasClean bool) (Service, error) {
s := &service{
logger: logger.WithName(loggerName).Register(),
store: store,
postageStore: postageStore,
chainID: chainID,
quit: make(chan struct{}),
done: make(chan struct{}),
}

return s, s.store.Iterate(
err := s.store.Iterate(
storage.Query{
Factory: func() storage.Item {
return new(StampIssuerItem)
Expand All @@ -75,6 +87,70 @@ func NewService(logger log.Logger, store storage.Store, postageStore Storer, cha
_ = s.add(issuer)
return false, nil
})
if err != nil {
return nil, err
}

if !wasClean {
s.logger.Info("recovering bucket counts from stamper store")
if err := s.recoverBuckets(); err != nil {
s.logger.Error(err, "postage stamper store recovery failed")
}
}

go s.run()

return s, nil
}

// recoverBuckets rebuilds the per-bucket collision counts of all loaded stamp
// issuers by replaying the StampItems persisted in the stamper store. It is
// invoked when the previous shutdown was unclean, i.e. the issuers' bucket
// state on disk may be stale. Per-item recovery errors are logged and do not
// abort the iteration.
func (s *service) recoverBuckets() error {
	s.mtx.Lock()
	defer s.mtx.Unlock()

	// Index issuers by batch ID so each stamp item resolves in O(1) instead
	// of a linear scan over the issuer slice per item.
	byBatch := make(map[string]*StampIssuer, len(s.issuers))
	for _, issuer := range s.issuers {
		byBatch[string(issuer.data.BatchID)] = issuer
	}

	return s.store.Iterate(
		storage.Query{
			Factory: func() storage.Item { return new(StampItem) },
		}, func(result storage.Result) (bool, error) {
			item := result.Entry.(*StampItem)
			if issuer, ok := byBatch[string(item.BatchID)]; ok {
				if err := issuer.recover(item.BatchIndex); err != nil {
					s.logger.Error(err, "postage recovery of bucket count failed")
				} else {
					// Mark recovered state so the periodic flusher (or Close)
					// persists the corrected bucket counts.
					issuer.dirty = true
				}
			}
			return false, nil
		})
}

// run is the background flush loop: on every tick it persists stamp issuers
// whose state changed since the last save, until the service is closed. The
// done channel is closed on exit so Close can wait for the loop to finish.
func (s *service) run() {
	defer close(s.done)
	// A long interval is used deliberately to significantly reduce disk writes.
	ticker := time.NewTicker(stampIssuerSaveInterval)
	defer ticker.Stop()

	for {
		select {
		case <-ticker.C:
			// Snapshot the issuer list under the lock; the actual saves run
			// outside it to keep the critical section short.
			s.mtx.Lock()
			snapshot := append([]*StampIssuer(nil), s.issuers...)
			s.mtx.Unlock()

			for _, si := range snapshot {
				if !si.isDirty() {
					continue
				}
				if err := s.save(si); err != nil {
					s.logger.Error(err, "failed to save stamp issuer")
				}
			}
		case <-s.quit:
			return
		}
	}
}

// Add adds a stamp issuer to the active issuers.
Expand Down Expand Up @@ -163,9 +239,8 @@ func (ps *service) GetStampIssuer(batchID []byte) (*StampIssuer, func() error, e
return nil, nil, ErrNotUsable
}
return st, func() error {
ps.mtx.Lock()
defer ps.mtx.Unlock()
return ps.save(st)
st.setDirty(true)
return nil
}, nil
}
}
Expand All @@ -182,15 +257,21 @@ func (ps *service) save(st *StampIssuer) error {
}); err != nil {
return err
}
st.dirty = false
return nil
}

// Close stops the background flush loop, waits for it to exit, and then
// persists every stamp issuer that still has unsaved (dirty) state. Save
// errors are joined so all issuers get a flush attempt before returning.
func (ps *service) Close() error {
	close(ps.quit)
	<-ps.done

	ps.mtx.Lock()
	defer ps.mtx.Unlock()
	var err error
	for _, issuer := range ps.issuers {
		// Only dirty issuers need flushing; clean ones are already on disk.
		if issuer.isDirty() {
			err = errors.Join(err, ps.save(issuer))
		}
	}
	return err
}
Expand Down
122 changes: 118 additions & 4 deletions pkg/postage/service_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ func TestSaveLoad(t *testing.T) {
defer store.Close()
pstore := pstoremock.New()
saved := func(id int64) postage.Service {
ps, err := postage.NewService(log.Noop, store, pstore, id)
ps, err := postage.NewService(log.Noop, store, pstore, id, true)
if err != nil {
t.Fatal(err)
}
Expand All @@ -48,7 +48,7 @@ func TestSaveLoad(t *testing.T) {
return ps
}
loaded := func(id int64) postage.Service {
ps, err := postage.NewService(log.Noop, store, pstore, id)
ps, err := postage.NewService(log.Noop, store, pstore, id, true)
if err != nil {
t.Fatal(err)
}
Expand All @@ -57,6 +57,7 @@ func TestSaveLoad(t *testing.T) {
test := func(id int64) {
psS := saved(id)
psL := loaded(id)
defer psL.Close()

sMap := map[string]struct{}{}
stampIssuers := psS.StampIssuers()
Expand Down Expand Up @@ -87,10 +88,12 @@ func TestGetStampIssuer(t *testing.T) {
}
validBlockNumber := testChainState.Block - uint64(postage.BlockThreshold+1)
pstore := pstoremock.New(pstoremock.WithChainState(testChainState))
ps, err := postage.NewService(log.Noop, store, pstore, chainID)
ps, err := postage.NewService(log.Noop, store, pstore, chainID, true)
if err != nil {
t.Fatal(err)
}
defer ps.Close()

ids := make([][]byte, 8)
for i := range ids {
id := make([]byte, 32)
Expand Down Expand Up @@ -224,7 +227,7 @@ func TestSetExpired(t *testing.T) {
return bytes.Equal(b, batch), nil
}))

ps, err := postage.NewService(log.Noop, store, pstore, 1)
ps, err := postage.NewService(log.Noop, store, pstore, 1, true)
if err != nil {
t.Fatal(err)
}
Expand Down Expand Up @@ -298,3 +301,114 @@ func TestSetExpired(t *testing.T) {

testutil.CleanupCloser(t, ps)
}

// TestCrashRecovery verifies that bucket counts are restored from stamp items
// when the service starts after an unclean shutdown (wasClean=false).
func TestCrashRecovery(t *testing.T) {
t.Parallel()

store := inmemstore.New()
defer store.Close()
pstore := pstoremock.New()

issuer := newTestStampIssuer(t, 1000)
batchID := issuer.ID()

// Pick two random chunk addresses and compute their bucket indices.
chunkAddr0 := swarm.RandAddress(t)
chunkAddr1 := swarm.RandAddress(t)
bIdx0 := postage.ToBucket(issuer.BucketDepth(), chunkAddr0)
bIdx1 := postage.ToBucket(issuer.BucketDepth(), chunkAddr1)

// Write StampItems directly, simulating stamps issued before a crash
// without the issuer bucket state being saved.
// bIdx0: issued at collision count 2 → bucket should recover to 3
// bIdx1: issued at collision count 0 → bucket should recover to 1
items := []*postage.StampItem{
postage.NewStampItem().WithBatchID(batchID).WithChunkAddress(chunkAddr0).WithBatchIndex(postage.IndexToBytes(bIdx0, 2)),
postage.NewStampItem().WithBatchID(batchID).WithChunkAddress(chunkAddr1).WithBatchIndex(postage.IndexToBytes(bIdx1, 0)),
}
for _, item := range items {
if err := store.Put(item); err != nil {
t.Fatal(err)
}
}

// Save the issuer with zero bucket counts to the store.
ps, err := postage.NewService(log.Noop, store, pstore, 1, true)
if err != nil {
t.Fatal(err)
}
if err := ps.Add(issuer); err != nil {
t.Fatal(err)
}
if err := ps.Close(); err != nil {
t.Fatal(err)
}

Comment thread
martinconic marked this conversation as resolved.
// Verify that the issuer on disk still has zero bucket counts (i.e., recovery
// has not happened yet). Open with wasClean=true to skip recovery.
psCheck, err := postage.NewService(log.Noop, store, pstore, 1, true)
if err != nil {
t.Fatal(err)
}
checkIssuers := psCheck.StampIssuers()
if len(checkIssuers) != 1 {
t.Fatalf("pre-recovery check: expected 1 issuer, got %d", len(checkIssuers))
}
checkBuckets := checkIssuers[0].Buckets()
if checkBuckets[bIdx0] != 0 {
t.Errorf("pre-recovery check: bucket %d: want 0, got %d", bIdx0, checkBuckets[bIdx0])
}
if checkBuckets[bIdx1] != 0 {
t.Errorf("pre-recovery check: bucket %d: want 0, got %d", bIdx1, checkBuckets[bIdx1])
}
if err := psCheck.Close(); err != nil {
t.Fatal(err)
}

// Restart with wasClean=false — should trigger bucket recovery.
ps2, err := postage.NewService(log.Noop, store, pstore, 1, false)
if err != nil {
t.Fatal(err)
}

issuers := ps2.StampIssuers()
if len(issuers) != 1 {
t.Fatalf("expected 1 issuer, got %d", len(issuers))
}

buckets := issuers[0].Buckets()
if buckets[bIdx0] != 3 {
t.Errorf("bucket %d: want 3, got %d", bIdx0, buckets[bIdx0])
}
if buckets[bIdx1] != 1 {
t.Errorf("bucket %d: want 1, got %d", bIdx1, buckets[bIdx1])
}

// Clean shutdown — recovered counts must be flushed to disk.
if err := ps2.Close(); err != nil {
t.Fatal(err)
}

// Restart with wasClean=true — recovery is skipped, but counts must still
// be correct because ps2.Close() persisted the recovered state.
ps3, err := postage.NewService(log.Noop, store, pstore, 1, true)
if err != nil {
t.Fatal(err)
}
defer ps3.Close()

issuers3 := ps3.StampIssuers()
if len(issuers3) != 1 {
t.Fatalf("post-recovery clean restart: expected 1 issuer, got %d", len(issuers3))
}

buckets3 := issuers3[0].Buckets()
if buckets3[bIdx0] != 3 {
t.Errorf("post-recovery clean restart: bucket %d: want 3, got %d", bIdx0, buckets3[bIdx0])
}
if buckets3[bIdx1] != 1 {
t.Errorf("post-recovery clean restart: bucket %d: want 1, got %d", bIdx1, buckets3[bIdx1])
}
}
Loading
Loading