25 changes: 23 additions & 2 deletions sc/memiavl/db.go
@@ -6,6 +6,7 @@ import (
"fmt"
"os"
"path/filepath"
"runtime"
"sort"
"strconv"
"strings"
@@ -85,11 +86,20 @@ const (
)

func OpenDB(logger logger.Logger, targetVersion int64, opts Options) (*DB, error) {
restartTime := time.Now()

var (
err error
fileLock FileLock
)

// Maximize CPU utilization: set GOMAXPROCS to all available cores (this has
// been the Go runtime default since Go 1.5, unless the GOMAXPROCS env var
// lowered it). For this I/O-bound startup it lets the scheduler run other
// goroutines on idle cores while some block on disk.
numCPU := runtime.NumCPU()
prevGOMAXPROCS := runtime.GOMAXPROCS(numCPU)
logger.Info("Set GOMAXPROCS for parallel tree processing", "prev", prevGOMAXPROCS, "new", numCPU)

if err := opts.Validate(); err != nil {
return nil, fmt.Errorf("invalid commit store options: %w", err)
}
@@ -128,6 +138,10 @@ func OpenDB(logger logger.Logger, targetVersion int64, opts Options) (*DB, error
if err != nil {
return nil, err
}
for _, tree := range mtree.trees {
tree.snapshot.nodesMap.PrepareForRandomRead()
tree.snapshot.leavesMap.PrepareForRandomRead()
}

// Create rlog manager and open the rlog file
streamHandler, err := changelog.NewStream(logger, utils.GetChangelogPath(opts.Dir), changelog.Config{
@@ -141,12 +155,16 @@ func OpenDB(logger logger.Logger, targetVersion int64, opts Options) (*DB, error

if targetVersion == 0 || targetVersion > mtree.Version() {
logger.Info("Start catching up and replaying the MemIAVL changelog file")
if err := mtree.Catchup(streamHandler, targetVersion); err != nil {
if err := mtree.CatchupWithStartTime(streamHandler, targetVersion, restartTime); err != nil {
return nil, errorutils.Join(err, streamHandler.Close())
}
logger.Info(fmt.Sprintf("Finished the replay and caught up to version %d", targetVersion))
}

// Print total startup time
totalElapsed := time.Since(restartTime).Seconds()
fmt.Printf("[STARTUP] Total time: %.1fs (load + replay)\n", totalElapsed)

if opts.LoadForOverwriting && targetVersion > 0 {
currentSnapshot, err := os.Readlink(currentPath(opts.Dir))
if err != nil {
Expand Down Expand Up @@ -212,9 +230,13 @@ func OpenDB(logger logger.Logger, targetVersion int64, opts Options) (*DB, error
if db.streamHandler == nil {
fmt.Println("[Debug] DB steam handler is nil??")
}

return db, nil
}

// Prefetch functions removed: EBS disk I/O was too slow (35 min for 60 GB is
// unacceptable); batched sorted replay in multitree.go is used instead.

func removeTmpDirs(rootDir string) error {
entries, err := os.ReadDir(rootDir)
if err != nil {
@@ -388,7 +410,6 @@ func (db *DB) checkBackgroundSnapshotRewrite() error {
return fmt.Errorf("switch multitree failed: %w", err)
}
db.logger.Info("switched to new memiavl snapshot", "version", db.MultiTree.Version())

db.pruneSnapshots()
default:
}
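Taken together, the db.go changes give startup a clear shape: load the snapshot mmaps with prefetch-friendly hints, flip the node/leaf maps to random-access hints, then replay the changelog while timing the whole process. A condensed sketch of the new OpenDB flow (names are from this diff; error handling, locking, and option plumbing are elided for illustration):

restart := time.Now()

// Phase 1: load snapshots; mmaps are created with MADV_SEQUENTIAL + MADV_WILLNEED.
mtree, err := LoadMultiTree(dir, zeroCopy, cacheSize)
if err != nil {
	return nil, err
}

// Phase 2: switch to steady-state hints for point lookups.
for _, tree := range mtree.trees {
	tree.snapshot.nodesMap.PrepareForRandomRead()
	tree.snapshot.leavesMap.PrepareForRandomRead()
}

// Phase 3: replay the changelog, carrying the process start time so the
// summary line can report total startup latency.
if err := mtree.CatchupWithStartTime(streamHandler, targetVersion, restart); err != nil {
	return nil, err
}
fmt.Printf("[STARTUP] Total time: %.1fs (load + replay)\n", time.Since(restart).Seconds())
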
24 changes: 24 additions & 0 deletions sc/memiavl/mmap.go
@@ -5,6 +5,7 @@ import (

"github.com/ledgerwatch/erigon-lib/mmap"
"github.com/sei-protocol/sei-db/common/errors"
"golang.org/x/sys/unix"
)

// MmapFile manages the resources of an mmap-ed file
@@ -18,6 +19,16 @@ type MmapFile struct {
// NewMmap opens the file and creates the mmap with PROT_READ and MAP_SHARED.
// Prefetch-friendly hints (MADV_SEQUENTIAL + MADV_WILLNEED) are applied up
// front; call PrepareForRandomRead to switch back to MADV_RANDOM afterwards.
func NewMmap(path string) (*MmapFile, error) {
return newMmapInternal(path, true)
}

// NewMmapNoPreload opens the file and creates the mmap without prefetch hints.
// Used for small/inactive trees to avoid unnecessary OS readahead.
func NewMmapNoPreload(path string) (*MmapFile, error) {
return newMmapInternal(path, false)
}

func newMmapInternal(path string, withPrefetch bool) (*MmapFile, error) {
file, err := os.Open(path)
if err != nil {
return nil, err
@@ -29,13 +40,26 @@ func NewMmap(path string) (*MmapFile, error) {
return nil, err
}

// Apply madvise hints based on prefetch flag
if len(data) > 0 {
if withPrefetch {
// Override default MADV_RANDOM with SEQUENTIAL + WILLNEED to favor prefetching
_ = unix.Madvise(data, unix.MADV_SEQUENTIAL)
_ = unix.Madvise(data, unix.MADV_WILLNEED)
}
}

return &MmapFile{
file: file,
data: data,
handle: handle,
}, nil
}

// PrepareForRandomRead switches the mapping to MADV_RANDOM, undoing the
// sequential warmup hints once startup reads are done.
func (m *MmapFile) PrepareForRandomRead() {
_ = unix.Madvise(m.data, unix.MADV_RANDOM)
}

// Close closes the file and mmap handles
func (m *MmapFile) Close() error {
var err error
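The two madvise phases are the core of the warmup strategy: MADV_SEQUENTIAL plus MADV_WILLNEED makes the kernel read ahead aggressively while the file is streamed once, and MADV_RANDOM afterwards stops readahead from evicting useful pages during point lookups. A self-contained sketch of that lifecycle, assuming Linux and golang.org/x/sys/unix (the file name is illustrative):

package main

import (
	"fmt"
	"os"

	"golang.org/x/sys/unix"
)

func main() {
	f, err := os.Open("nodes.snapshot") // illustrative path
	if err != nil {
		panic(err)
	}
	defer f.Close()

	st, err := f.Stat()
	if err != nil {
		panic(err)
	}
	if st.Size() == 0 {
		return // nothing to warm
	}
	data, err := unix.Mmap(int(f.Fd()), 0, int(st.Size()), unix.PROT_READ, unix.MAP_SHARED)
	if err != nil {
		panic(err)
	}
	defer unix.Munmap(data)

	// Phase 1: sequential warmup. SEQUENTIAL enables aggressive readahead;
	// WILLNEED asks the kernel to start faulting pages in immediately.
	_ = unix.Madvise(data, unix.MADV_SEQUENTIAL)
	_ = unix.Madvise(data, unix.MADV_WILLNEED)

	// Touch one byte per page so the whole file lands in the page cache.
	var sum byte
	for i := 0; i < len(data); i += os.Getpagesize() {
		sum += data[i]
	}

	// Phase 2: steady state. Queries are point lookups, so switch to
	// MADV_RANDOM to disable readahead.
	_ = unix.Madvise(data, unix.MADV_RANDOM)
	fmt.Printf("warmed %d bytes (checksum %d)\n", len(data), sum)
}
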
59 changes: 55 additions & 4 deletions sc/memiavl/multitree.go
@@ -7,6 +7,7 @@ import (
"os"
"path/filepath"
"sort"
"time"

"github.com/alitto/pond"
"github.com/cosmos/iavl"
@@ -65,11 +66,16 @@ func NewEmptyMultiTree(initialVersion uint32, cacheSize int) *MultiTree {
}

func LoadMultiTree(dir string, zeroCopy bool, cacheSize int) (*MultiTree, error) {
loadStartTime := time.Now()

metadata, err := readMetadata(dir)
if err != nil {
return nil, err
}

// Print snapshot version information
fmt.Printf("[SNAPSHOT] Loading snapshot version: %d from directory: %s\n", metadata.CommitInfo.Version, dir)

entries, err := os.ReadDir(dir)
if err != nil {
return nil, err
@@ -83,13 +89,17 @@ func LoadMultiTree(dir string, zeroCopy bool, cacheSize int) (*MultiTree, error)
}
name := e.Name()
treeNames = append(treeNames, name)
fmt.Printf("[LOADING] Opening snapshot for tree: %s\n", name)
snapshot, err := OpenSnapshot(filepath.Join(dir, name))
if err != nil {
return nil, err
}
treeMap[name] = NewFromSnapshot(snapshot, zeroCopy, cacheSize)
}

loadElapsed := time.Since(loadStartTime).Seconds()
fmt.Printf("[LOADING] All %d trees loaded in %.1fs\n", len(treeNames), loadElapsed)

slices.Sort(treeNames)

trees := make([]NamedTree, len(treeNames))
@@ -224,9 +234,12 @@ func (t *MultiTree) ApplyUpgrades(upgrades []*proto.TreeNameUpgrade) error {
}
t.trees[i].Name = upgrade.Name
default:
// add tree
// add tree (created dynamically by an upgrade entry during replay, e.g.,
// acc/bank/evm around version ~216K)
tree := NewWithInitialVersion(uint32(utils.NextVersion(t.Version(), t.initialVersion)))
t.trees = append(t.trees, NamedTree{Tree: tree, Name: upgrade.Name})
newTree := NamedTree{Tree: tree, Name: upgrade.Name}
t.trees = append(t.trees, newTree)

tree.startBackgroundWriteLargeBuffer(100, upgrade.Name)
}
}

@@ -315,6 +328,13 @@ func (t *MultiTree) UpdateCommitInfo() {

// Catchup replays the new entries in the rlog file to catch the tree up to the target (or latest) version.
func (t *MultiTree) Catchup(stream types.Stream[proto.ChangelogEntry], endVersion int64) error {
return t.CatchupWithStartTime(stream, endVersion, time.Time{})
}

// CatchupWithStartTime is like Catchup but additionally tracks the total elapsed time from process start.
func (t *MultiTree) CatchupWithStartTime(stream types.Stream[proto.ChangelogEntry], endVersion int64, processStartTime time.Time) error {
replayStartTime := time.Now()
var perTreeReplayLatency = make(map[string]int64)
lastIndex, err := stream.LastOffset()
if err != nil {
return fmt.Errorf("read rlog last index failed, %w", err)
@@ -339,17 +359,28 @@
return fmt.Errorf("target index %d is in the future, latest index: %d", endIndex, lastIndex)
}

// Start async write workers for each tree with a LARGE buffer for cold start.
// On a 128GB machine we can be aggressive with buffer sizes (~60GB total),
// letting the publisher finish quickly instead of blocking on slow trees.
fmt.Printf("[REPLAY INIT] Starting background workers for %d existing trees\n", len(t.trees))
for _, namedTree := range t.trees {
namedTree.Tree.startBackgroundWriteLargeBuffer(100, namedTree.Name)
}

var replayCount = 0
err = stream.Replay(firstIndex, endIndex, func(index uint64, entry proto.ChangelogEntry) error {
if err := t.ApplyUpgrades(entry.Upgrades); err != nil {
return err
}
updatedTrees := make(map[string]bool)
for _, cs := range entry.Changesets {
startTime := time.Now()
treeName := cs.Name
t.TreeByName(treeName).ApplyChangeSetAsync(cs.Changeset)
updatedTrees[treeName] = true
perTreeReplayLatency[treeName] += time.Since(startTime).Nanoseconds()
}
// For trees without changes, we still need to bump the version
for _, tree := range t.trees {
if _, found := updatedTrees[tree.Name]; !found {
tree.ApplyChangeSetAsync(iavl.ChangeSet{})
Expand All @@ -359,18 +390,38 @@ func (t *MultiTree) Catchup(stream types.Stream[proto.ChangelogEntry], endVersio
t.lastCommitInfo.StoreInfos = []proto.StoreInfo{}
replayCount++
if replayCount%1000 == 0 {
fmt.Printf("Replayed %d changelog entries\n", replayCount)
fmt.Printf("Replayed %d changelog entries \n", replayCount)
}
return nil
})

// Wait for all async writes to complete
fmt.Printf("Waiting for all trees to complete processing...\n")

for _, tree := range t.trees {
startTime := time.Now()
tree.WaitToCompleteAsyncWrite()
perTreeReplayLatency[tree.Name] += time.Since(startTime).Nanoseconds()
}

if err != nil {
return err
}

for _, tree := range t.trees {
fmt.Printf("[Replay] Tree %s took %dms to replay changelog\n", tree.Name, perTreeReplayLatency[tree.Name]/1000000)
}

// Print final summary with timing
replayElapsed := time.Since(replayStartTime).Seconds()
if !processStartTime.IsZero() {
totalCatchupTime := time.Since(processStartTime).Seconds()
fmt.Printf("[REPLAY] Total replay %d entries in %.1fs (%.1f entries/sec) | Total catchup process time: %.1fs\n",
replayCount, replayElapsed, float64(replayCount)/replayElapsed, totalCatchupTime)
} else {
fmt.Printf("[REPLAY] Total replay %d entries in %.1fs (%.1f entries/sec)\n",
replayCount, replayElapsed, float64(replayCount)/replayElapsed)
}

t.UpdateCommitInfo()
return nil
}
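The replay path relies on three helpers that this diff calls but does not define here: startBackgroundWriteLargeBuffer, ApplyChangeSetAsync, and WaitToCompleteAsyncWrite. Assuming they follow the usual buffered-channel worker pattern, a minimal sketch could look like this (a guess at the shape, not the PR's actual implementation; iavl.ChangeSet is from github.com/cosmos/iavl, already imported above):

// Hypothetical per-tree async write worker.
type treeWorker struct {
	ch   chan iavl.ChangeSet // large buffer: the publisher rarely blocks
	done chan struct{}
}

// start launches one goroutine per tree; writes stay ordered within a tree
// while different trees progress in parallel.
func (w *treeWorker) start(bufSize int, apply func(iavl.ChangeSet) error) {
	w.ch = make(chan iavl.ChangeSet, bufSize)
	w.done = make(chan struct{})
	go func() {
		defer close(w.done)
		for cs := range w.ch {
			_ = apply(cs)
		}
	}()
}

// ApplyChangeSetAsync enqueues a changeset without waiting for it to land.
func (w *treeWorker) ApplyChangeSetAsync(cs iavl.ChangeSet) { w.ch <- cs }

// WaitToCompleteAsyncWrite drains the queue and waits for the worker to exit.
func (w *treeWorker) WaitToCompleteAsyncWrite() {
	close(w.ch)
	<-w.done
}
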
15 changes: 15 additions & 0 deletions sc/memiavl/opts.go
@@ -2,6 +2,7 @@ package memiavl

import (
"errors"
"runtime"

"github.com/sei-protocol/sei-db/config"
)
@@ -35,6 +36,14 @@ type Options struct {

// Limit the number of concurrent snapshot writers
SnapshotWriterLimit int

// PrefetchOnStartup enables sequential page-cache warmup of snapshot files
// (nodes/leaves and, optionally, kvs) before replaying the changelog.
PrefetchOnStartup bool
// PrefetchIncludeKVs controls whether to include the large kvs file in the warmup.
PrefetchIncludeKVs bool
// PrefetchConcurrency limits number of stores warmed in parallel; <=0 defaults to NumCPU.
PrefetchConcurrency int
}

func (opts Options) Validate() error {
@@ -61,4 +70,10 @@ func (opts *Options) FillDefaults() {
if opts.CacheSize < 0 {
opts.CacheSize = config.DefaultCacheSize
}

// Fill only when unset, matching the other defaults in this function.
if opts.SnapshotKeepRecent == 0 {
opts.SnapshotKeepRecent = config.DefaultSnapshotKeepRecent
}

if opts.PrefetchConcurrency <= 0 {
opts.PrefetchConcurrency = runtime.NumCPU()
}
}
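
For context, a hypothetical caller enabling the new prefetch options could look like the following; only the Prefetch* fields are introduced by this diff, and the remaining fields plus the OpenDB call shape are taken from the surrounding code (db.Close is an assumption):

opts := memiavl.Options{
	Dir:                 "/data/memiavl.db", // illustrative path
	PrefetchOnStartup:   true,               // warm nodes/leaves before replay
	PrefetchIncludeKVs:  false,              // skip the large kvs file
	PrefetchConcurrency: 0,                  // <=0 falls back to runtime.NumCPU()
}
opts.FillDefaults()

db, err := memiavl.OpenDB(logger, 0, opts) // targetVersion 0 = latest
if err != nil {
	panic(err)
}
defer db.Close()
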
42 changes: 42 additions & 0 deletions sc/memiavl/resid.go
@@ -0,0 +1,42 @@
package memiavl

import (
"fmt"
"runtime"
"unsafe"

"golang.org/x/sys/unix"
)

// residentRatio returns the fraction of b's pages that are resident in the
// page cache. Uses mincore on Linux; other platforms get an unsupported error.
func residentRatio(b []byte) (float64, error) {
if len(b) == 0 {
return 1, nil
}
if runtime.GOOS != "linux" {
return 0, fmt.Errorf("residentRatio unsupported on %s", runtime.GOOS)
}

pageSize := unix.Getpagesize()
numPages := (len(b) + pageSize - 1) / pageSize
if numPages == 0 {
return 1, nil
}
vec := make([]byte, numPages)

addr := uintptr(unsafe.Pointer(&b[0]))
length := uintptr(len(b))
_, _, errno := unix.Syscall(unix.SYS_MINCORE, addr, length, uintptr(unsafe.Pointer(&vec[0])))
if errno != 0 {
return 0, errno
}

present := 0
for _, v := range vec {
if v&1 == 1 {
present++
}
}
return float64(present) / float64(len(vec)), nil
}
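
residentRatio pairs naturally with the warmup path: poll it to decide when enough of a mapping is cached to stop prefetching, rather than warming blindly. A hypothetical polling loop (illustration only, not code from this PR; assumes the time package is imported):

// Wait until ~99% of the mapping is resident, or give up after a deadline.
deadline := time.Now().Add(2 * time.Minute)
for time.Now().Before(deadline) {
	ratio, err := residentRatio(m.data)
	if err != nil || ratio >= 0.99 {
		break // unsupported platform, or warm enough: proceed either way
	}
	time.Sleep(500 * time.Millisecond)
}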