Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 11 additions & 2 deletions ss/pebbledb/db.go
Original file line number Diff line number Diff line change
Expand Up @@ -677,6 +677,7 @@ func prefixEnd(b []byte) []byte {
}

func (db *Database) ReverseIterator(storeKey string, version int64, start, end []byte) (types.DBIterator, error) {
fmt.Printf("[Debug] Creating reverset iterator for storeKey %s, version %d, start %X, end %X\n", storeKey, version, start, end)
if (start != nil && len(start) == 0) || (end != nil && len(end) == 0) {
return nil, errorutils.ErrKeyEmpty
}
Expand All @@ -689,15 +690,23 @@ func (db *Database) ReverseIterator(storeKey string, version int64, start, end [

var upperBound []byte
if end != nil {
upperBound = MVCCEncode(prependStoreKey(storeKey, end), 0)
upperBound = MVCCEncode(prependStoreKey(storeKey, end), version+1)
} else {
upperBound = MVCCEncode(prefixEnd(storePrefix(storeKey)), 0)
upperBound = MVCCEncode(prefixEnd(storePrefix(storeKey)), version+1)
}

itr, err := db.storage.NewIter(&pebble.IterOptions{LowerBound: lowerBound, UpperBound: upperBound})
if err != nil {
return nil, fmt.Errorf("failed to create PebbleDB iterator: %w", err)
}
valid := itr.SeekLT(upperBound)
if !valid {
fmt.Printf("Invalid reverse iterator\n")
return nil, errors.New("invalid range")
}
currKey, currKeyVersion, _ := SplitMVCCKey(itr.Key())
curKeyVersionDecoded, _ := decodeUint64Ascending(currKeyVersion)
fmt.Printf("[Debug] After seekLT, currKey is %X, currKeyVersion is %d\n", currKey, curKeyVersionDecoded)

return newPebbleDBIterator(itr, storePrefix(storeKey), start, end, version, db.earliestVersion, true), nil
}
Expand Down
79 changes: 61 additions & 18 deletions ss/pebbledb/iterator.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,14 +42,9 @@
}
}

// move the underlying PebbleDB iterator to the first key
var valid bool
if reverse {
valid = src.Last()
} else {
valid = src.First()
}
valid := true

// move the underlying PebbleDB iterator to the first key
itr := &iterator{
source: src,
prefix: prefix,
Expand All @@ -66,12 +61,12 @@
// XXX: This should not happen as that would indicate we have a malformed MVCC key.
panic(fmt.Sprintf("invalid PebbleDB MVCC key: %s", itr.source.Key()))
}

curKeyVersionDecoded, err := decodeUint64Ascending(currKeyVersion)
if err != nil {
itr.valid = false
return itr
}
fmt.Printf("[Debug] Intialize iterator with currKey %X, version %d\n", currKey, curKeyVersionDecoded)

Check failure on line 69 in ss/pebbledb/iterator.go

View workflow job for this annotation

GitHub Actions / lint

`Intialize` is a misspelling of `Initialize` (misspell)

// We need to check whether initial key iterator visits has a version <= requested version
// If larger version, call next to find another key which does
Expand Down Expand Up @@ -136,35 +131,48 @@
return
}

currKey, _, ok := SplitMVCCKey(itr.source.Key())
currKey, currKeyVersionBz, ok := SplitMVCCKey(itr.source.Key())
if !ok {
// XXX: This should not happen as that would indicate we have a malformed
// MVCC key.
panic(fmt.Sprintf("invalid PebbleDB MVCC key: %s", itr.source.Key()))
}
currKeyVersion, _ := decodeUint64Ascending(currKeyVersionBz) // Assuming decode won't fail here based on prior checks
fmt.Printf("nextForward: currKey=%s, currKeyVersion=%d\n", string(currKey), currKeyVersion)

next := itr.source.NextPrefix()

// First move the iterator to the next prefix, which may not correspond to the
// desired version for that key, e.g. if the key was written at a later version,
// so we seek back to the latest desired version, s.t. the version is <= itr.version.
if next {
nextKey, _, ok := SplitMVCCKey(itr.source.Key())
nextKey, nextKeyVersionBz, ok := SplitMVCCKey(itr.source.Key())
if !ok {
// XXX: This should not happen as that would indicate we have a malformed
// MVCC key.
itr.valid = false
return
}
nextKeyVersion, _ := decodeUint64Ascending(nextKeyVersionBz) // Assuming decode won't fail here
fmt.Printf("nextForward: nextPrefix moved to nextKey=%s, nextKeyVersion=%d\n", string(nextKey), nextKeyVersion)

if !bytes.HasPrefix(nextKey, itr.prefix) {
// the next key must have itr.prefix as the prefix
fmt.Printf("nextForward: nextKey %s does not have prefix %s\n", string(nextKey), string(itr.prefix))
itr.valid = false
return
}

// Move the iterator to the closest version to the desired version, so we
// append the current iterator key to the prefix and seek to that key.
itr.valid = itr.source.SeekLT(MVCCEncode(nextKey, itr.version+1))
seekTarget := MVCCEncode(nextKey, itr.version+1)
fmt.Printf("nextForward: seeking LT %s (key=%s, version=%d)\n", string(seekTarget), string(nextKey), itr.version+1)
itr.valid = itr.source.SeekLT(seekTarget)

if !itr.valid {
fmt.Printf("nextForward: SeekLT returned invalid\n")
return
}

tmpKey, tmpKeyVersion, ok := SplitMVCCKey(itr.source.Key())
if !ok {
Expand All @@ -173,13 +181,24 @@
itr.valid = false
return
}
tmpKeyVersionDecoded, err := decodeUint64Ascending(tmpKeyVersion)
if err != nil {
itr.valid = false
return
}
fmt.Printf("nextForward: SeekLT landed on tmpKey=%s, tmpKeyVersion=%d\n", string(tmpKey), tmpKeyVersionDecoded)

// There exists cases where the SeekLT() call moved us back to the same key
// we started at, so we must move to next key, i.e. two keys forward.
if bytes.Equal(tmpKey, currKey) {
fmt.Printf("nextForward: SeekLT moved back to original key %s, calling NextPrefix and nextForward recursively\n", string(currKey))
if itr.source.NextPrefix() {
itr.nextForward()

// Need to re-check key and version after recursive call returns
if !itr.valid {
return // Recursive call invalidated the iterator
}
_, tmpKeyVersion, ok = SplitMVCCKey(itr.source.Key())
if !ok {
// XXX: This should not happen as that would indicate we have a malformed
Expand All @@ -189,32 +208,37 @@
}

} else {
fmt.Printf("nextForward: NextPrefix after SeekLT returned false\n")
itr.valid = false
return
}
}

// We need to verify that every Next call either moves the iterator to a key whose version
// is less than or equal to requested iterator version, or exhausts the iterator
tmpKeyVersionDecoded, err := decodeUint64Ascending(tmpKeyVersion)
// Re-decode version after potential recursive call
tmpKeyVersionDecoded, err = decodeUint64Ascending(tmpKeyVersion)
if err != nil {
itr.valid = false
return
}

// If iterator is at a entry whose version is higher than requested version, call nextForward again
if tmpKeyVersionDecoded > itr.version {
fmt.Printf("nextForward: tmpKeyVersion %d > target version %d, calling nextForward recursively\n", tmpKeyVersionDecoded, itr.version)
itr.nextForward()
}

// The cursor might now be pointing at a key/value pair that is tombstoned.
// If so, we must move the cursor.
if itr.valid && itr.cursorTombstoned() {
fmt.Printf("nextForward: cursor is tombstoned, calling nextForward recursively\n")
itr.nextForward()
}

return
}
fmt.Printf("nextForward: NextPrefix returned false\n")

itr.valid = false
}
Expand All @@ -225,37 +249,52 @@
return
}

currKey, _, ok := SplitMVCCKey(itr.source.Key())
currKey, currKeyVersionBz, ok := SplitMVCCKey(itr.source.Key())
if !ok {
// XXX: This should not happen as that would indicate we have a malformed
// MVCC key.
panic(fmt.Sprintf("invalid PebbleDB MVCC key: %s", itr.source.Key()))
panic(fmt.Sprintf("invalid PebbleDB MVCC key: %X", itr.source.Key()))
}
currKeyVersion, _ := decodeUint64Ascending(currKeyVersionBz) // Assuming decode won't fail here
fmt.Printf("nextReverse: currKey=%X, currKeyVersion=%d\n", currKey, currKeyVersion)

next := itr.source.SeekLT(MVCCEncode(currKey, 0))
seekTarget := MVCCEncode(currKey, 0)
fmt.Printf("nextReverse: seeking LT %s (key=%X, version=0)\n", string(seekTarget), currKey)
next := itr.source.SeekLT(seekTarget)

// First move the iterator to the next prefix, which may not correspond to the
// desired version for that key, e.g. if the key was written at a later version,
// so we seek back to the latest desired version, s.t. the version is <= itr.version.
if next {
nextKey, _, ok := SplitMVCCKey(itr.source.Key())
nextKey, nextKeyVersionBz, ok := SplitMVCCKey(itr.source.Key())
if !ok {
// XXX: This should not happen as that would indicate we have a malformed
// MVCC key.
itr.valid = false
return
}
nextKeyVersion, _ := decodeUint64Ascending(nextKeyVersionBz) // Assuming decode won't fail here
fmt.Printf("nextReverse: SeekLT(currKey, 0) moved to nextKey=%X, nextKeyVersion=%d\n", nextKey, nextKeyVersion)

if !bytes.HasPrefix(nextKey, itr.prefix) {
// the next key must have itr.prefix as the prefix
fmt.Printf("nextReverse: nextKey %X does not have prefix %s\n", nextKey, string(itr.prefix))
itr.valid = false
return
}

// Move the iterator to the closest version to the desired version, so we
// append the current iterator key to the prefix and seek to that key.
itr.valid = itr.source.SeekLT(MVCCEncode(nextKey, itr.version+1))
seekTargetClosest := MVCCEncode(nextKey, itr.version+1)
fmt.Printf("nextReverse: seeking LT %s (key=%X, version=%d)\n", string(seekTargetClosest), nextKey, itr.version+1)
itr.valid = itr.source.SeekLT(seekTargetClosest)

if !itr.valid {
fmt.Printf("nextReverse: SeekLT(nextKey, version+1) returned invalid\n")
return
}

_, tmpKeyVersion, ok := SplitMVCCKey(itr.source.Key())
tmpKey, tmpKeyVersion, ok := SplitMVCCKey(itr.source.Key()) // Renamed from _ to tmpKey for logging
if !ok {
// XXX: This should not happen as that would indicate we have a malformed
// MVCC key.
Expand All @@ -270,20 +309,24 @@
itr.valid = false
return
}
fmt.Printf("nextReverse: SeekLT(nextKey, version+1) landed on tmpKey=%X, tmpKeyVersion=%d\n", tmpKey, tmpKeyVersionDecoded)

// If iterator is at a entry whose version is higher than requested version, call nextReverse again
if tmpKeyVersionDecoded > itr.version {
fmt.Printf("nextReverse: tmpKeyVersion %d > target version %d, calling nextReverse recursively\n", tmpKeyVersionDecoded, itr.version)
itr.nextReverse()
}

// The cursor might now be pointing at a key/value pair that is tombstoned.
// If so, we must move the cursor.
if itr.valid && itr.cursorTombstoned() {
fmt.Printf("nextReverse: cursor is tombstoned, calling nextReverse recursively\n")
itr.nextReverse()
}

return
}
fmt.Printf("nextReverse: SeekLT(currKey, 0) returned false\n")

itr.valid = false
}
Expand Down
1 change: 1 addition & 0 deletions tools/cmd/seidb/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ func main() {
operations.PruneCmd(),
operations.DumpIAVLCmd(),
operations.StateSizeCmd(),
operations.TestIteratorCmd(),
operations.ReplayChangelogCmd())
if err := rootCmd.Execute(); err != nil {
fmt.Println(err)
Expand Down
57 changes: 57 additions & 0 deletions tools/cmd/seidb/operations/test_iterator.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
package operations

import (
"encoding/hex"
"fmt"

"github.com/sei-protocol/sei-db/common/logger"
"github.com/sei-protocol/sei-db/config"
"github.com/sei-protocol/sei-db/ss"
"github.com/spf13/cobra"
)

func TestIteratorCmd() *cobra.Command {
iteratorCmd := &cobra.Command{
Use: "test-iterator",
Short: "Test forward or reverse iterator",
Run: executeIterator,
}

iteratorCmd.PersistentFlags().StringP("home-dir", "d", "/root/.sei", "Database Directory")
iteratorCmd.PersistentFlags().StringP("start", "s", "07", "Start key")
iteratorCmd.PersistentFlags().StringP("end", "e", "08", "End key")

return iteratorCmd
}

func executeIterator(cmd *cobra.Command, _ []string) {
homeDir, _ := cmd.Flags().GetString("home-dir")
start, _ := cmd.Flags().GetString("start")
end, _ := cmd.Flags().GetString("end")
IterateDbData(homeDir, start, end)
}

func IterateDbData(homeDir string, start string, end string) {
ssConfig := config.DefaultStateStoreConfig()
ssConfig.KeepRecent = 0
ssStore, err := ss.NewStateStore(logger.NewNopLogger(), homeDir, ssConfig)
if err != nil {
panic(err)
}
defer ssStore.Close()
if err != nil {
panic(err)
}
fmt.Printf("Start reverse iteration\n")
startPos, _ := hex.DecodeString(start)
endPos, _ := hex.DecodeString(end)
iter, err := ssStore.ReverseIterator("oracle", 98350313, startPos, endPos)
if err != nil {
panic(err)
}
for ; iter.Valid(); iter.Next() {
fmt.Printf("key: %X, value %X\n", iter.Key(), iter.Value())
}
iter.Close()
fmt.Printf("Complete reverse iteration\n")
}
Loading