17 changes: 17 additions & 0 deletions block/internal/executing/executor.go
@@ -430,6 +430,23 @@ func (e *Executor) produceBlock() error {
	// Update in-memory state after successful commit
	e.setLastState(newState)

	// Run height-based pruning of stored block data if enabled. Pruning is
	// best-effort and must not cause block production to fail; note that it
	// runs synchronously in the block-production path and may add latency
	// when large ranges are pruned.
	if e.config.Node.PruningEnabled && e.config.Node.PruningKeepRecent > 0 && e.config.Node.PruningInterval > 0 {
		if newHeight%e.config.Node.PruningInterval == 0 {
			// Compute the prune floor: all heights <= targetHeight are candidates
			// for pruning of header/data/signature/index entries.
			if newHeight > e.config.Node.PruningKeepRecent {
				targetHeight := newHeight - e.config.Node.PruningKeepRecent
				if err := e.store.PruneBlocks(e.ctx, targetHeight); err != nil {
					e.logger.Error().Err(err).Uint64("target_height", targetHeight).Msg("failed to prune old block data")
				}
			}
		}
	}

	// broadcast header and data to P2P network
	g, ctx := errgroup.WithContext(e.ctx)
	g.Go(func() error { return e.headerBroadcaster.WriteToStoreAndBroadcast(ctx, header) })
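To make the trigger arithmetic above concrete, here is a self-contained sketch of the same checks; `pruneTarget` and the constant values are illustrative, not part of the PR:

```go
package main

import "fmt"

// Illustrative values, not the project defaults: keep the most recent
// 1000 blocks and run pruning every 100 blocks.
const (
	keepRecent uint64 = 1000
	interval   uint64 = 100
)

// pruneTarget mirrors the executor's trigger logic: pruning fires only on
// heights that are multiples of the interval, and only once the chain is
// taller than the retention window.
func pruneTarget(newHeight uint64) (uint64, bool) {
	if newHeight%interval != 0 || newHeight <= keepRecent {
		return 0, false
	}
	// Everything at or below the returned height becomes prunable;
	// heights (newHeight-keepRecent, newHeight] are retained.
	return newHeight - keepRecent, true
}

func main() {
	fmt.Println(pruneTarget(5000)) // 4000 true  -> heights 1..4000 prunable
	fmt.Println(pruneTarget(5050)) // 0 false    -> not on the interval
	fmt.Println(pruneTarget(900))  // 0 false    -> chain shorter than window
}
```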
11 changes: 11 additions & 0 deletions pkg/config/config.go
@@ -210,6 +210,13 @@ type NodeConfig struct {
	// Readiness / health configuration
	ReadinessWindowSeconds   uint64 `mapstructure:"readiness_window_seconds" yaml:"readiness_window_seconds" comment:"Time window in seconds used to calculate ReadinessMaxBlocksBehind based on block time. Default: 15 seconds."`
	ReadinessMaxBlocksBehind uint64 `mapstructure:"readiness_max_blocks_behind" yaml:"readiness_max_blocks_behind" comment:"How many blocks behind best-known head the node can be and still be considered ready. 0 means must be exactly at head."`

	// Pruning configuration
	// When enabled, the node will periodically prune old block data (headers, data,
	// signatures, and hash index) from the local store while keeping recent history.
	PruningEnabled    bool   `mapstructure:"pruning_enabled" yaml:"pruning_enabled" comment:"Enable height-based pruning of stored block data. When disabled, all blocks are kept (archive mode)."`
	PruningKeepRecent uint64 `mapstructure:"pruning_keep_recent" yaml:"pruning_keep_recent" comment:"Number of most recent blocks to retain. Older blocks will have their header/data/signature removed from the local store. 0 means keep all blocks."`
	PruningInterval   uint64 `mapstructure:"pruning_interval" yaml:"pruning_interval" comment:"Run pruning every N blocks. Must be >= 1 when pruning is enabled."`
}

// LogConfig contains all logging configuration parameters
@@ -341,6 +348,10 @@ func AddFlags(cmd *cobra.Command) {
	cmd.Flags().Uint64(FlagReadinessWindowSeconds, def.Node.ReadinessWindowSeconds, "time window in seconds for calculating readiness threshold based on block time (default: 15s)")
	cmd.Flags().Uint64(FlagReadinessMaxBlocksBehind, def.Node.ReadinessMaxBlocksBehind, "how many blocks behind best-known head the node can be and still be considered ready (0 = must be at head)")
	cmd.Flags().Duration(FlagScrapeInterval, def.Node.ScrapeInterval.Duration, "interval at which the reaper polls the execution layer for new transactions")

	// Pruning configuration flags
	cmd.Flags().Bool(FlagPrefixEvnode+"node.pruning_enabled", def.Node.PruningEnabled, "enable height-based pruning of stored block data (headers, data, signatures, index)")
	cmd.Flags().Uint64(FlagPrefixEvnode+"node.pruning_keep_recent", def.Node.PruningKeepRecent, "number of most recent blocks to retain when pruning is enabled (0 = keep all)")
	cmd.Flags().Uint64(FlagPrefixEvnode+"node.pruning_interval", def.Node.PruningInterval, "run pruning every N blocks (must be >= 1 when pruning is enabled)")

	// Data Availability configuration flags
	cmd.Flags().String(FlagDAAddress, def.DA.Address, "DA address (host:port)")
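As a usage note, the new fields can also be set programmatically on top of the defaults. A minimal sketch, assuming the package is imported as `config`; the retention and interval values are arbitrary examples, and archive mode remains the default:

```go
cfg := config.DefaultConfig()
cfg.Node.PruningEnabled = true    // switch off archive mode
cfg.Node.PruningKeepRecent = 1000 // keep the 1000 most recent blocks
cfg.Node.PruningInterval = 100    // attempt pruning every 100 blocks
```

The equivalent YAML keys, per the struct tags above, are `pruning_enabled`, `pruning_keep_recent`, and `pruning_interval`.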
3 changes: 3 additions & 0 deletions pkg/config/defaults.go
@@ -69,6 +69,9 @@ func DefaultConfig() Config {
			ReadinessWindowSeconds:   defaultReadinessWindowSeconds,
			ReadinessMaxBlocksBehind: calculateReadinessMaxBlocksBehind(defaultBlockTime.Duration, defaultReadinessWindowSeconds),
			ScrapeInterval:           DurationWrapper{1 * time.Second},
			PruningEnabled:           false,
			PruningKeepRecent:        0,
			PruningInterval:          0,
		},
		DA: DAConfig{
			Address: "http://localhost:7980",
6 changes: 6 additions & 0 deletions pkg/store/keys.go
@@ -25,6 +25,12 @@ const (
	// LastSubmittedHeaderHeightKey is the key used for persisting the last submitted header height in store.
	LastSubmittedHeaderHeightKey = "last-submitted-header-height"

	// LastPrunedBlockHeightKey is the metadata key used for persisting the last
	// pruned block height in the store. All block data (header, data,
	// signature, and hash index) for heights <= this value are considered
	// pruned and may be missing from the store.
	LastPrunedBlockHeightKey = "last-pruned-block-height"

	headerPrefix    = "h"
	dataPrefix      = "d"
	signaturePrefix = "c"
79 changes: 79 additions & 0 deletions pkg/store/store.go
@julienrbrt (Member) commented on Jan 16, 2026:

Given we have a lot of different stores and keys, are we planning to prune those as well:

(and eventually the ev-abci store)

It will be inconsistent if we don't allow height pruning of all those stores (at once).

@@ -274,6 +274,85 @@ func (s *DefaultStore) Rollback(ctx context.Context, height uint64, aggregator bool) error {
	return nil
}

// PruneBlocks removes block data (header, data, signature, and hash index)
// up to and including the given height from the store. It does not modify
// the current chain height or any state snapshots.
//
// This method is intended for long-term storage reduction and is safe to
// call repeatedly with the same or increasing heights.
func (s *DefaultStore) PruneBlocks(ctx context.Context, height uint64) error {
	// Track the last successfully pruned height so we can resume across restarts.
	var lastPruned uint64
	meta, err := s.GetMetadata(ctx, LastPrunedBlockHeightKey)
	if err != nil {
		if !errors.Is(err, ds.ErrNotFound) {
			return fmt.Errorf("failed to get last pruned height: %w", err)
		}
	} else if len(meta) == heightLength {
		lastPruned, err = decodeHeight(meta)
		if err != nil {
			return fmt.Errorf("failed to decode last pruned height: %w", err)
		}
	}

	// Nothing new to prune.
	if height <= lastPruned {
		return nil
	}

	batch, err := s.db.Batch(ctx)
	if err != nil {
		return fmt.Errorf("failed to create a new batch for pruning: %w", err)
	}

	// Delete block data for heights in (lastPruned, height].
	for h := lastPruned + 1; h <= height; h++ {
		// Get the header blob to compute the hash index key. If the header is
		// already missing (e.g. due to previous partial pruning), just skip
		// this height.
		headerBlob, err := s.db.Get(ctx, ds.NewKey(getHeaderKey(h)))
		if err != nil {
			if errors.Is(err, ds.ErrNotFound) {
				continue
			}
			return fmt.Errorf("failed to get header at height %d during pruning: %w", h, err)
		}

		if err := batch.Delete(ctx, ds.NewKey(getHeaderKey(h))); err != nil {
			return fmt.Errorf("failed to delete header at height %d during pruning: %w", h, err)
		}

		if err := batch.Delete(ctx, ds.NewKey(getDataKey(h))); err != nil {
			if !errors.Is(err, ds.ErrNotFound) {
				return fmt.Errorf("failed to delete data at height %d during pruning: %w", h, err)
			}
		}

		if err := batch.Delete(ctx, ds.NewKey(getSignatureKey(h))); err != nil {
			if !errors.Is(err, ds.ErrNotFound) {
				return fmt.Errorf("failed to delete signature at height %d during pruning: %w", h, err)
			}
		}

		headerHash := sha256.Sum256(headerBlob)
		if err := batch.Delete(ctx, ds.NewKey(getIndexKey(headerHash[:]))); err != nil {
			if !errors.Is(err, ds.ErrNotFound) {
				return fmt.Errorf("failed to delete index for height %d during pruning: %w", h, err)
			}
		}
	}

	// Persist the updated last pruned height.
	if err := batch.Put(ctx, ds.NewKey(getMetaKey(LastPrunedBlockHeightKey)), encodeHeight(height)); err != nil {
		return fmt.Errorf("failed to update last pruned height: %w", err)
	}

	if err := batch.Commit(ctx); err != nil {
		return fmt.Errorf("failed to commit pruning batch: %w", err)
	}

	return nil
}

const heightLength = 8

func encodeHeight(height uint64) []byte {
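The diff truncates `encodeHeight` above. For context, a sketch that would be consistent with `heightLength = 8` and the `len(meta) == heightLength` check in `PruneBlocks` is fixed-width big-endian encoding; this is an assumption about the elided helpers, not the PR's verbatim code:

```go
package store

import (
	"encoding/binary"
	"fmt"
)

const heightLength = 8

// Assumed sketch of the helpers elided in the diff above: a fixed-width
// big-endian encoding of the height, matching heightLength = 8.
func encodeHeight(height uint64) []byte {
	buf := make([]byte, heightLength)
	binary.BigEndian.PutUint64(buf, height)
	return buf
}

func decodeHeight(heightBytes []byte) (uint64, error) {
	if len(heightBytes) != heightLength {
		return 0, fmt.Errorf("invalid height length: %d", len(heightBytes))
	}
	return binary.BigEndian.Uint64(heightBytes), nil
}
```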
51 changes: 51 additions & 0 deletions pkg/store/store_test.go
@@ -734,6 +734,57 @@ func TestRollback(t *testing.T) {
	require.Equal(rollbackToHeight, state.LastBlockHeight)
}

func TestPruneBlocks_RemovesOldBlockDataOnly(t *testing.T) {
	t.Parallel()

	ctx := context.Background()
	ds, err := NewTestInMemoryKVStore()
	require.NoError(t, err)

	s := New(ds).(*DefaultStore)

	// create and store a few blocks with headers, data, signatures and state
	batch, err := s.NewBatch(ctx)
	require.NoError(t, err)

	var lastState types.State
	for h := uint64(1); h <= 5; h++ {
		header := &types.SignedHeader{Header: types.Header{BaseHeader: types.BaseHeader{Height: h}}}
		data := &types.Data{}
		sig := types.Signature([]byte{byte(h)})

		require.NoError(t, batch.SaveBlockData(header, data, &sig))

		// fake state snapshot per height
		lastState = types.State{LastBlockHeight: h}
		require.NoError(t, batch.UpdateState(lastState))
	}
	require.NoError(t, batch.SetHeight(5))
	require.NoError(t, batch.Commit())

	// prune everything up to height 3
	require.NoError(t, s.PruneBlocks(ctx, 3))

	// old block data should be gone
	for h := uint64(1); h <= 3; h++ {
		_, _, err := s.GetBlockData(ctx, h)
		assert.Error(t, err, "expected block data at height %d to be pruned", h)
	}

	// recent block data should remain
	for h := uint64(4); h <= 5; h++ {
		_, _, err := s.GetBlockData(ctx, h)
		assert.NoError(t, err, "expected block data at height %d to be kept", h)
	}

	// state snapshots are not pruned by PruneBlocks
	for h := uint64(1); h <= 5; h++ {
		st, err := s.GetStateAtHeight(ctx, h)
		assert.NoError(t, err, "expected state at height %d to remain", h)
		assert.Equal(t, h, st.LastBlockHeight)
	}
}

// TestRollbackToSameHeight verifies that rollback to same height is a no-op
func TestRollbackToSameHeight(t *testing.T) {
	t.Parallel()
19 changes: 18 additions & 1 deletion pkg/store/types.go
@@ -30,9 +30,15 @@ type Batch interface {
}

// Store is minimal interface for storing and retrieving blocks, commits and state.
//
// It is composed from three concerns:
//   - Reader: read access to blocks, state, and metadata
//   - Rollback: consensus rollback logic (used for chain reorgs / recovery)
//   - Pruner: long-term height-based pruning of historical block data
type Store interface {
	Reader
	Rollback
	Pruner

	// SetMetadata saves arbitrary value in the store.
	//
@@ -75,3 +81,14 @@ type Rollback interface {
	// Aggregator is used to determine if the rollback is performed on the aggregator node.
	Rollback(ctx context.Context, height uint64, aggregator bool) error
}

// Pruner provides long-term, height-based pruning of historical block data.
//
// Implementations SHOULD be idempotent and safe to call multiple times for
// the same or increasing target heights.
type Pruner interface {
	// PruneBlocks removes block data (header, data, signature, and hash index)
	// up to and including the given height from the store, without modifying
	// state snapshots or the current chain height.
	PruneBlocks(ctx context.Context, height uint64) error
}
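Because the last-pruned height is persisted and only moves forward, callers can treat `PruneBlocks` as monotonic and idempotent. An illustrative call pattern, with `store` standing in for any `Pruner` implementation, `ctx` a `context.Context`, and error handling elided for brevity:

```go
_ = store.PruneBlocks(ctx, 100) // prunes heights (lastPruned, 100]
_ = store.PruneBlocks(ctx, 100) // no-op: nothing new at or below 100
_ = store.PruneBlocks(ctx, 150) // prunes only heights (100, 150]
_ = store.PruneBlocks(ctx, 120) // no-op: 120 <= last pruned height (150)
```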