Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
38 changes: 38 additions & 0 deletions ss/pebbledb/db.go
Original file line number Diff line number Diff line change
Expand Up @@ -176,13 +176,15 @@
}

func (db *Database) SetLatestVersion(version int64) error {
fmt.Printf("[%d] SSDEBUG - SetLatestVersion version %d\n", time.Now().UnixNano(), version)

Check warning

Code scanning / CodeQL

Calling the system time Warning

Calling the system time may be a possible source of non-determinism
var ts [VersionSize]byte
binary.LittleEndian.PutUint64(ts[:], uint64(version))
err := db.storage.Set([]byte(latestVersionKey), ts[:], defaultWriteOpts)
return err
}

func (db *Database) GetLatestVersion() (int64, error) {
fmt.Printf("[%d] SSDEBUG - GetLatestVersion\n", time.Now().UnixNano())

Check warning

Code scanning / CodeQL

Calling the system time Warning

Calling the system time may be a possible source of non-determinism
bz, closer, err := db.storage.Get([]byte(latestVersionKey))
if err != nil {
if errors.Is(err, pebble.ErrNotFound) {
Expand All @@ -201,6 +203,7 @@
}

func (db *Database) SetEarliestVersion(version int64, ignoreVersion bool) error {
fmt.Printf("[%d] SSDEBUG - SetEarliestVersion version %d ignoreVersion %t\n", time.Now().UnixNano(), version, ignoreVersion)

Check warning

Code scanning / CodeQL

Calling the system time Warning

Calling the system time may be a possible source of non-determinism
if version > db.earliestVersion || ignoreVersion {
db.earliestVersion = version

Expand All @@ -212,10 +215,12 @@
}

func (db *Database) GetEarliestVersion() (int64, error) {
fmt.Printf("[%d] SSDEBUG - GetEarliestVersion\n", time.Now().UnixNano())

Check warning

Code scanning / CodeQL

Calling the system time Warning

Calling the system time may be a possible source of non-determinism
return db.earliestVersion, nil
}

func (db *Database) SetLastRangeHashed(latestHashed int64) error {
fmt.Printf("[%d] SSDEBUG - SetLastRangeHashed latestHashed %d\n", time.Now().UnixNano(), latestHashed)

Check warning

Code scanning / CodeQL

Calling the system time Warning

Calling the system time may be a possible source of non-determinism
var ts [VersionSize]byte
binary.LittleEndian.PutUint64(ts[:], uint64(latestHashed))

Expand All @@ -229,6 +234,7 @@

// GetLastRangeHashed returns the highest block that has been fully hashed in ranges.
func (db *Database) GetLastRangeHashed() (int64, error) {
fmt.Printf("[%d] SSDEBUG - GetLastRangeHashed\n", time.Now().UnixNano())

Check warning

Code scanning / CodeQL

Calling the system time Warning

Calling the system time may be a possible source of non-determinism
// Return the cached value
db.lastRangeHashedMu.RLock()
cachedValue := db.lastRangeHashedCache
Expand Down Expand Up @@ -258,11 +264,13 @@

// SetLatestKey sets the latest key processed during migration.
func (db *Database) SetLatestMigratedKey(key []byte) error {
fmt.Printf("[%d] SSDEBUG - SetLatestMigratedKey key %s (length %d)\n", time.Now().UnixNano(), string(key), len(key))

Check warning

Code scanning / CodeQL

Calling the system time Warning

Calling the system time may be a possible source of non-determinism
return db.storage.Set([]byte(latestMigratedKeyMetadata), key, defaultWriteOpts)
}

// GetLatestKey retrieves the latest key processed during migration.
func (db *Database) GetLatestMigratedKey() ([]byte, error) {
fmt.Printf("[%d] SSDEBUG - GetLatestMigratedKey\n", time.Now().UnixNano())

Check warning

Code scanning / CodeQL

Calling the system time Warning

Calling the system time may be a possible source of non-determinism
bz, closer, err := db.storage.Get([]byte(latestMigratedKeyMetadata))
if err != nil {
if errors.Is(err, pebble.ErrNotFound) {
Expand All @@ -276,11 +284,13 @@

// SetLatestModule sets the latest module processed during migration.
func (db *Database) SetLatestMigratedModule(module string) error {
fmt.Printf("[%d] SSDEBUG - SetLatestMigratedModule module %s\n", time.Now().UnixNano(), module)

Check warning

Code scanning / CodeQL

Calling the system time Warning

Calling the system time may be a possible source of non-determinism
return db.storage.Set([]byte(latestMigratedModuleMetadata), []byte(module), defaultWriteOpts)
}

// GetLatestModule retrieves the latest module processed during migration.
func (db *Database) GetLatestMigratedModule() (string, error) {
fmt.Printf("[%d] SSDEBUG - GetLatestMigratedModule\n", time.Now().UnixNano())

Check warning

Code scanning / CodeQL

Calling the system time Warning

Calling the system time may be a possible source of non-determinism
bz, closer, err := db.storage.Get([]byte(latestMigratedModuleMetadata))
if err != nil {
if errors.Is(err, pebble.ErrNotFound) {
Expand All @@ -293,6 +303,7 @@
}

func (db *Database) Has(storeKey string, version int64, key []byte) (bool, error) {
fmt.Printf("[%d] SSDEBUG - Has storeKey %s version %d key %s (length %d)\n", time.Now().UnixNano(), storeKey, version, string(key), len(key))

Check warning

Code scanning / CodeQL

Calling the system time Warning

Calling the system time may be a possible source of non-determinism
if version < db.earliestVersion {
return false, nil
}
Expand All @@ -306,6 +317,7 @@
}

func (db *Database) Get(storeKey string, targetVersion int64, key []byte) ([]byte, error) {
fmt.Printf("[%d] SSDEBUG - Get storeKey %s targetVersion %d key %s (length %d)\n", time.Now().UnixNano(), storeKey, targetVersion, string(key), len(key))

Check warning

Code scanning / CodeQL

Calling the system time Warning

Calling the system time may be a possible source of non-determinism
if targetVersion < db.earliestVersion {
return nil, nil
}
Expand Down Expand Up @@ -346,6 +358,7 @@
}

func (db *Database) ApplyChangeset(version int64, cs *proto.NamedChangeSet) error {
fmt.Printf("[%d] SSDEBUG - ApplyChangeset version %d store %s pairs length %d\n", time.Now().UnixNano(), version, cs.Name, len(cs.Changeset.Pairs))
// Check if version is 0 and change it to 1
// We do this specifically since keys written as part of genesis state come in as version 0
// But pebbledb treats version 0 as special, so apply the changeset at version 1 instead
Expand Down Expand Up @@ -377,6 +390,7 @@
}

func (db *Database) ApplyChangesetAsync(version int64, changesets []*proto.NamedChangeSet) error {
fmt.Printf("[%d] SSDEBUG - ApplyChangesetAsync version %d changesets length %d\n", time.Now().UnixNano(), version, len(changesets))
// Write to WAL first
if db.streamHandler != nil {
entry := proto.ChangelogEntry{
Expand Down Expand Up @@ -413,6 +427,7 @@
}

func (db *Database) computeMissingRanges(latestVersion int64) error {
fmt.Printf("[%d] SSDEBUG - computeMissingRanges latestVersion %d\n", time.Now().UnixNano(), latestVersion)
lastHashed, err := db.GetLastRangeHashed()
if err != nil {
return fmt.Errorf("failed to get last hashed range: %w", err)
Expand Down Expand Up @@ -446,6 +461,7 @@
}

func (db *Database) computeHashForRange(beginBlock, endBlock int64) error {
fmt.Printf("[%d] SSDEBUG - computeHashForRange beginBlock %d endBlock %d\n", time.Now().UnixNano(), beginBlock, endBlock)
chunkSize := endBlock - beginBlock + 1
if chunkSize <= 0 {
// Nothing to do
Expand Down Expand Up @@ -534,6 +550,7 @@
// it has been updated. This occurs when that module's keys are updated in between pruning runs, the node after is restarted.
// This is not a large issue given the next time that module is updated, it will be properly pruned thereafter.
func (db *Database) Prune(version int64) error {
fmt.Printf("[%d] SSDEBUG - Prune version %d\n", time.Now().UnixNano(), version)
earliestVersion := version + 1 // we increment by 1 to include the provided version

itr, err := db.storage.NewIter(nil)
Expand Down Expand Up @@ -639,6 +656,14 @@
}

func (db *Database) Iterator(storeKey string, version int64, start, end []byte) (types.DBIterator, error) {
var startStr, endStr string
if start != nil {
startStr = string(start)
}

Check warning

Code scanning / CodeQL

Calling the system time Warning

Calling the system time may be a possible source of non-determinism
if end != nil {
endStr = string(end)
}
fmt.Printf("[%d] SSDEBUG - Iterator storeKey %s version %d start %s (length %d) end %s (length %d)\n", time.Now().UnixNano(), storeKey, version, startStr, len(start), endStr, len(end))
if (start != nil && len(start) == 0) || (end != nil && len(end) == 0) {
return nil, errorutils.ErrKeyEmpty
}
Expand Down Expand Up @@ -677,6 +702,14 @@
}

func (db *Database) ReverseIterator(storeKey string, version int64, start, end []byte) (types.DBIterator, error) {
var startStr, endStr string
if start != nil {
startStr = string(start)
}

Check warning

Code scanning / CodeQL

Calling the system time Warning

Calling the system time may be a possible source of non-determinism
if end != nil {
endStr = string(end)
}
fmt.Printf("[%d] SSDEBUG - ReverseIterator storeKey %s version %d start %s (length %d) end %s (length %d)\n", time.Now().UnixNano(), storeKey, version, startStr, len(start), endStr, len(end))
if (start != nil && len(start) == 0) || (end != nil && len(end) == 0) {
return nil, errorutils.ErrKeyEmpty
}
Expand Down Expand Up @@ -705,6 +738,7 @@
// Import loads the initial version of the state in parallel with numWorkers goroutines
// TODO: Potentially add retries instead of panics
func (db *Database) Import(version int64, ch <-chan types.SnapshotNode) error {
fmt.Printf("[%d] SSDEBUG - Import version %d\n", time.Now().UnixNano(), version)
var wg sync.WaitGroup

worker := func() {
Expand Down Expand Up @@ -752,6 +786,7 @@
}

func (db *Database) RawImport(ch <-chan types.RawSnapshotNode) error {
fmt.Printf("[%d] SSDEBUG - RawImport\n", time.Now().UnixNano())
var wg sync.WaitGroup

worker := func() {
Expand Down Expand Up @@ -834,6 +869,7 @@

// RawIterate iterates over all keys and values for a store
func (db *Database) RawIterate(storeKey string, fn func(key []byte, value []byte, version int64) bool) (bool, error) {
fmt.Printf("[%d] SSDEBUG - RawIterate storeKey %s\n", time.Now().UnixNano(), storeKey)
// Iterate through all keys and values for a store
lowerBound := MVCCEncode(prependStoreKey(storeKey, nil), 0)
prefix := storePrefix(storeKey)
Expand Down Expand Up @@ -892,6 +928,7 @@
}

func (db *Database) DeleteKeysAtVersion(module string, version int64) error {
fmt.Printf("[%d] SSDEBUG - DeleteKeysAtVersion module %s version %d\n", time.Now().UnixNano(), module, version)

batch, err := NewBatch(db.storage, version)
if err != nil {
Expand Down Expand Up @@ -1024,6 +1061,7 @@

// WriteBlockRangeHash writes a hash for a range of blocks to the database
func (db *Database) WriteBlockRangeHash(storeKey string, beginBlockRange, endBlockRange int64, hash []byte) error {
fmt.Printf("[%d] SSDEBUG - WriteBlockRangeHash storeKey %s beginBlockRange %d endBlockRange %d hash %s (length %d)\n", time.Now().UnixNano(), storeKey, beginBlockRange, endBlockRange, string(hash), len(hash))
key := []byte(fmt.Sprintf(HashTpl, storeKey, beginBlockRange, endBlockRange))
err := db.storage.Set(key, hash, defaultWriteOpts)
if err != nil {
Expand Down
28 changes: 26 additions & 2 deletions ss/pebbledb/iterator.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
import (
"bytes"
"fmt"
"time"

"github.com/cockroachdb/pebble"
"github.com/sei-protocol/sei-db/ss/types"
Expand All @@ -29,6 +30,14 @@
}

func newPebbleDBIterator(src *pebble.Iterator, prefix, mvccStart, mvccEnd []byte, version int64, earliestVersion int64, reverse bool) *iterator {
var startStr, endStr string
if mvccStart != nil {
startStr = string(mvccStart)
}
if mvccEnd != nil {
endStr = string(mvccEnd)
}
fmt.Printf("[%d] SSDEBUG - newPebbleDBIterator prefix %s (length %d) mvccStart %s (length %d) mvccEnd %s (length %d) version %d earliestVersion %d reverse %t\n", time.Now().UnixNano(), string(prefix), len(prefix), startStr, len(mvccStart), endStr, len(mvccEnd), version, earliestVersion, reverse)

Check warning

Code scanning / CodeQL

Calling the system time Warning

Calling the system time may be a possible source of non-determinism
// Return invalid iterator if requested iterator height is lower than earliest version after pruning
if version < earliestVersion {
return &iterator{
Expand Down Expand Up @@ -100,10 +109,12 @@
// Domain returns the domain of the iterator. The caller must not modify the
// return values.
func (itr *iterator) Domain() ([]byte, []byte) {
fmt.Printf("[%d] SSDEBUG - iterator.Domain\n", time.Now().UnixNano())

Check warning

Code scanning / CodeQL

Calling the system time Warning

Calling the system time may be a possible source of non-determinism
return itr.start, itr.end
}

func (itr *iterator) Key() []byte {
fmt.Printf("[%d] SSDEBUG - iterator.Key\n", time.Now().UnixNano())

Check warning

Code scanning / CodeQL

Calling the system time Warning

Calling the system time may be a possible source of non-determinism
itr.assertIsValid()

key, _, ok := SplitMVCCKey(itr.source.Key())
Expand All @@ -114,10 +125,13 @@
}

keyCopy := slices.Clone(key)
return keyCopy[len(itr.prefix):]
result := keyCopy[len(itr.prefix):]
fmt.Printf("[%d] SSDEBUG - iterator.Key returning key %s (length %d)\n", time.Now().UnixNano(), string(result), len(result))

Check warning

Code scanning / CodeQL

Calling the system time Warning

Calling the system time may be a possible source of non-determinism
return result
}

func (itr *iterator) Value() []byte {
fmt.Printf("[%d] SSDEBUG - iterator.Value\n", time.Now().UnixNano())

Check warning

Code scanning / CodeQL

Calling the system time Warning

Calling the system time may be a possible source of non-determinism
itr.assertIsValid()

val, _, ok := SplitMVCCKey(itr.source.Value())
Expand All @@ -127,10 +141,13 @@
panic(fmt.Sprintf("invalid PebbleDB MVCC value: %s", itr.source.Key()))
}

return slices.Clone(val)
result := slices.Clone(val)
fmt.Printf("[%d] SSDEBUG - iterator.Value returning value %s (length %d)\n", time.Now().UnixNano(), string(result), len(result))

Check warning

Code scanning / CodeQL

Calling the system time Warning

Calling the system time may be a possible source of non-determinism
return result
}

func (itr *iterator) nextForward() {
fmt.Printf("[%d] SSDEBUG - iterator.nextForward\n", time.Now().UnixNano())

Check warning

Code scanning / CodeQL

Calling the system time Warning

Calling the system time may be a possible source of non-determinism
if !itr.source.Valid() {
itr.valid = false
return
Expand Down Expand Up @@ -220,6 +237,7 @@
}

func (itr *iterator) nextReverse() {
fmt.Printf("[%d] SSDEBUG - iterator.nextReverse\n", time.Now().UnixNano())

Check warning

Code scanning / CodeQL

Calling the system time Warning

Calling the system time may be a possible source of non-determinism
if !itr.source.Valid() {
itr.valid = false
return
Expand Down Expand Up @@ -289,6 +307,7 @@
}

func (itr *iterator) Next() {
fmt.Printf("[%d] SSDEBUG - iterator.Next reverse %t\n", time.Now().UnixNano(), itr.reverse)

Check warning

Code scanning / CodeQL

Calling the system time Warning

Calling the system time may be a possible source of non-determinism
if itr.reverse {
itr.nextReverse()
} else {
Expand All @@ -297,6 +316,7 @@
}

func (itr *iterator) Valid() bool {
fmt.Printf("[%d] SSDEBUG - iterator.Valid\n", time.Now().UnixNano())

Check warning

Code scanning / CodeQL

Calling the system time Warning

Calling the system time may be a possible source of non-determinism
// once invalid, forever invalid
if !itr.valid || !itr.source.Valid() {
itr.valid = false
Expand All @@ -321,10 +341,12 @@
}

func (itr *iterator) Error() error {
fmt.Printf("[%d] SSDEBUG - iterator.Error\n", time.Now().UnixNano())

Check warning

Code scanning / CodeQL

Calling the system time Warning

Calling the system time may be a possible source of non-determinism
return itr.source.Error()
}

func (itr *iterator) Close() error {
fmt.Printf("[%d] SSDEBUG - iterator.Close\n", time.Now().UnixNano())

Check warning

Code scanning / CodeQL

Calling the system time Warning

Calling the system time may be a possible source of non-determinism
_ = itr.source.Close()
itr.source = nil
itr.valid = false
Expand All @@ -343,6 +365,7 @@
// pair is tombstoned, the caller should call Next(). Note, this method assumes
// the caller assures the iterator is valid first!
func (itr *iterator) cursorTombstoned() bool {
fmt.Printf("[%d] SSDEBUG - iterator.cursorTombstoned\n", time.Now().UnixNano())

Check warning

Code scanning / CodeQL

Calling the system time Warning

Calling the system time may be a possible source of non-determinism
_, tombBz, ok := SplitMVCCKey(itr.source.Value())
if !ok {
// XXX: This should not happen as that would indicate we have a malformed
Expand Down Expand Up @@ -370,6 +393,7 @@
}

func (itr *iterator) DebugRawIterate() {
fmt.Printf("[%d] SSDEBUG - iterator.DebugRawIterate\n", time.Now().UnixNano())

Check warning

Code scanning / CodeQL

Calling the system time Warning

Calling the system time may be a possible source of non-determinism
valid := itr.source.Valid()
if valid {
// The first key may not represent the desired target version, so move the
Expand Down
Loading