Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
114 changes: 62 additions & 52 deletions ss/pebbledb/db.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,8 @@ type Database struct {
config config.StateStoreConfig
// Earliest version for db after pruning
earliestVersion int64
// Latest version for db
latestVersion int64

// Map of module to when each was last updated
// Used in pruning to skip over stores that have not been updated recently
Expand Down Expand Up @@ -115,20 +117,31 @@ func New(dataDir string, config config.StateStoreConfig) (*Database, error) {
opts.FlushSplitBytes = opts.Levels[0].TargetFileSize
opts = opts.EnsureDefaults()

//TODO: add a new config and check if readonly = true to support readonly mode

db, err := pebble.Open(dataDir, opts)
if err != nil {
return nil, fmt.Errorf("failed to open PebbleDB: %w", err)
}

// Initialize earliest version
earliestVersion, err := retrieveEarliestVersion(db)
if err != nil {
return nil, fmt.Errorf("failed to open PebbleDB: %w", err)
return nil, fmt.Errorf("failed to retrieve earliest version: %w", err)
}

// Initialize latest version
latestVersion, err := retrieveLatestVersion(db)
if err != nil {
return nil, fmt.Errorf("failed to retrieve latest version: %w", err)
}

database := &Database{
storage: db,
asyncWriteWG: sync.WaitGroup{},
config: config,
earliestVersion: earliestVersion,
latestVersion: latestVersion,
pendingChanges: make(chan VersionedChangesets, config.AsyncWriteBuffer),
}

Expand Down Expand Up @@ -159,12 +172,6 @@ func New(dataDir string, config config.StateStoreConfig) (*Database, error) {
return database, nil
}

func NewWithDB(storage *pebble.DB) *Database {
return &Database{
storage: storage,
}
}

func (db *Database) Close() error {
if db.streamHandler != nil {
_ = db.streamHandler.Close()
Expand All @@ -182,32 +189,37 @@ func (db *Database) SetLatestVersion(version int64) error {
if version < 0 {
return fmt.Errorf("version must be non-negative")
}
db.latestVersion = version
var ts [VersionSize]byte
binary.LittleEndian.PutUint64(ts[:], uint64(version))
err := db.storage.Set([]byte(latestVersionKey), ts[:], defaultWriteOpts)
return err
}

func (db *Database) GetLatestVersion() (int64, error) {
bz, closer, err := db.storage.Get([]byte(latestVersionKey))
if err != nil {
func (db *Database) GetLatestVersion() int64 {
return db.latestVersion
}

// Retrieve latestVersion from db, if not found, return 0.
func retrieveLatestVersion(db *pebble.DB) (int64, error) {
bz, closer, err := db.Get([]byte(latestVersionKey))
defer func() {
if closer != nil {
_ = closer.Close()
}
}()
if err != nil || len(bz) == 0 {
if errors.Is(err, pebble.ErrNotFound) {
// in case of a fresh database
return 0, nil
}

return 0, err
}

if len(bz) == 0 {
return 0, closer.Close()
}

uz := binary.LittleEndian.Uint64(bz)
if uz > math.MaxInt64 {
return 0, fmt.Errorf("latest version in database overflows int64: %d", uz)
}
return int64(uz), closer.Close()
return int64(uz), nil
}

func (db *Database) SetEarliestVersion(version int64, ignoreVersion bool) error {
Expand All @@ -216,16 +228,37 @@ func (db *Database) SetEarliestVersion(version int64, ignoreVersion bool) error
}
if version > db.earliestVersion || ignoreVersion {
db.earliestVersion = version

var ts [VersionSize]byte
binary.LittleEndian.PutUint64(ts[:], uint64(version))
return db.storage.Set([]byte(earliestVersionKey), ts[:], defaultWriteOpts)
}
return nil
}

func (db *Database) GetEarliestVersion() (int64, error) {
return db.earliestVersion, nil
func (db *Database) GetEarliestVersion() int64 {
return db.earliestVersion
}

// Retrieves earliest version from db, if not found, return 0
func retrieveEarliestVersion(db *pebble.DB) (int64, error) {
bz, closer, err := db.Get([]byte(earliestVersionKey))
defer func() {
if closer != nil {
_ = closer.Close()
}
}()
if err != nil || len(bz) == 0 {
if errors.Is(err, pebble.ErrNotFound) {
return 0, nil
}
return 0, err
}

ubz := binary.LittleEndian.Uint64(bz)
if ubz > math.MaxInt64 {
return 0, fmt.Errorf("earliest version in database overflows int64: %d", ubz)
}
return int64(ubz), nil
}

func (db *Database) SetLastRangeHashed(latestHashed int64) error {
Expand Down Expand Up @@ -253,29 +286,6 @@ func (db *Database) GetLastRangeHashed() (int64, error) {
return cachedValue, nil
}

// Retrieves earliest version from db
func retrieveEarliestVersion(db *pebble.DB) (int64, error) {
bz, closer, err := db.Get([]byte(earliestVersionKey))
if err != nil {
if errors.Is(err, pebble.ErrNotFound) {
// in case of a fresh database
return 0, nil
}

return 0, err
}

if len(bz) == 0 {
return 0, closer.Close()
}

ubz := binary.LittleEndian.Uint64(bz)
if ubz > math.MaxInt64 {
return 0, fmt.Errorf("earliest version in database overflows int64: %d", ubz)
}
return int64(ubz), closer.Close()
}

// SetLatestKey sets the latest key processed during migration.
func (db *Database) SetLatestMigratedKey(key []byte) error {
return db.storage.Set([]byte(latestMigratedKeyMetadata), key, defaultWriteOpts)
Expand Down Expand Up @@ -373,6 +383,7 @@ func (db *Database) ApplyChangeset(version int64, cs *proto.NamedChangeSet) erro
version = 1
}

// Create batch and persist latest version in the batch
b, err := NewBatch(db.storage, version)
if err != nil {
return err
Expand All @@ -383,17 +394,20 @@ func (db *Database) ApplyChangeset(version int64, cs *proto.NamedChangeSet) erro
if err := b.Delete(cs.Name, kvPair.Key); err != nil {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

As a general practice i recommend against overriding error like this, specially in cases like this since the scope of err is tighly limited to the if block. Ditto for next if statement.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fixed, thanks for pointing that out!

return err
}
} else {
if err := b.Set(cs.Name, kvPair.Key, kvPair.Value); err != nil {
return err
}
} else if err := b.Set(cs.Name, kvPair.Key, kvPair.Value); err != nil {
return err
}
}

// Mark the store as updated
db.storeKeyDirty.Store(cs.Name, version)

return b.Write()
if err := b.Write(); err != nil {
return err
}
// Update latest version on write success
db.latestVersion = version
return nil
}

func (db *Database) ApplyChangesetAsync(version int64, changesets []*proto.NamedChangeSet) error {
Expand Down Expand Up @@ -537,10 +551,6 @@ func (db *Database) writeAsyncInBackground() {
panic(err)
}
}
err := db.SetLatestVersion(version)
if err != nil {
panic(err)
}
}
}

Expand Down
2 changes: 1 addition & 1 deletion ss/pruning/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ func (m *Manager) Start() {
go func() {
for {
pruneStartTime := time.Now()
latestVersion, _ := m.stateStore.GetLatestVersion()
latestVersion := m.stateStore.GetLatestVersion()
pruneVersion := latestVersion - m.keepRecent
if pruneVersion > 0 {
// prune all versions up to and including the pruneVersion
Expand Down
Loading
Loading