-
Notifications
You must be signed in to change notification settings - Fork 17
Add and expose latest version as watermark #116
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
d70c912
6c44357
d76d6c7
3bee995
1640542
5d87eec
8a98170
1518f68
6b7dab2
f372213
10e313a
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -59,6 +59,8 @@ type Database struct { | |
| config config.StateStoreConfig | ||
| // Earliest version for db after pruning | ||
| earliestVersion int64 | ||
| // Latest version for db | ||
| latestVersion int64 | ||
|
|
||
| // Map of module to when each was last updated | ||
| // Used in pruning to skip over stores that have not been updated recently | ||
|
|
@@ -115,20 +117,31 @@ func New(dataDir string, config config.StateStoreConfig) (*Database, error) { | |
| opts.FlushSplitBytes = opts.Levels[0].TargetFileSize | ||
| opts = opts.EnsureDefaults() | ||
|
|
||
| //TODO: add a new config and check if readonly = true to support readonly mode | ||
|
|
||
| db, err := pebble.Open(dataDir, opts) | ||
| if err != nil { | ||
| return nil, fmt.Errorf("failed to open PebbleDB: %w", err) | ||
| } | ||
|
|
||
| // Initialize earliest version | ||
| earliestVersion, err := retrieveEarliestVersion(db) | ||
| if err != nil { | ||
| return nil, fmt.Errorf("failed to open PebbleDB: %w", err) | ||
| return nil, fmt.Errorf("failed to retrieve earliest version: %w", err) | ||
| } | ||
|
|
||
| // Initialize latest version | ||
| latestVersion, err := retrieveLatestVersion(db) | ||
| if err != nil { | ||
| return nil, fmt.Errorf("failed to retrieve latest version: %w", err) | ||
| } | ||
|
|
||
| database := &Database{ | ||
| storage: db, | ||
| asyncWriteWG: sync.WaitGroup{}, | ||
| config: config, | ||
| earliestVersion: earliestVersion, | ||
| latestVersion: latestVersion, | ||
| pendingChanges: make(chan VersionedChangesets, config.AsyncWriteBuffer), | ||
| } | ||
|
|
||
|
|
@@ -159,12 +172,6 @@ func New(dataDir string, config config.StateStoreConfig) (*Database, error) { | |
| return database, nil | ||
| } | ||
|
|
||
| func NewWithDB(storage *pebble.DB) *Database { | ||
| return &Database{ | ||
| storage: storage, | ||
| } | ||
| } | ||
|
|
||
| func (db *Database) Close() error { | ||
| if db.streamHandler != nil { | ||
| _ = db.streamHandler.Close() | ||
|
|
@@ -182,32 +189,37 @@ func (db *Database) SetLatestVersion(version int64) error { | |
| if version < 0 { | ||
| return fmt.Errorf("version must be non-negative") | ||
| } | ||
| db.latestVersion = version | ||
| var ts [VersionSize]byte | ||
| binary.LittleEndian.PutUint64(ts[:], uint64(version)) | ||
| err := db.storage.Set([]byte(latestVersionKey), ts[:], defaultWriteOpts) | ||
| return err | ||
| } | ||
|
|
||
| func (db *Database) GetLatestVersion() (int64, error) { | ||
| bz, closer, err := db.storage.Get([]byte(latestVersionKey)) | ||
| if err != nil { | ||
| func (db *Database) GetLatestVersion() int64 { | ||
| return db.latestVersion | ||
| } | ||
|
|
||
| // Retrieve latestVersion from db, if not found, return 0. | ||
| func retrieveLatestVersion(db *pebble.DB) (int64, error) { | ||
| bz, closer, err := db.Get([]byte(latestVersionKey)) | ||
| defer func() { | ||
| if closer != nil { | ||
| _ = closer.Close() | ||
| } | ||
| }() | ||
| if err != nil || len(bz) == 0 { | ||
| if errors.Is(err, pebble.ErrNotFound) { | ||
yzang2019 marked this conversation as resolved.
Show resolved
Hide resolved
|
||
| // in case of a fresh database | ||
| return 0, nil | ||
| } | ||
|
|
||
| return 0, err | ||
| } | ||
|
|
||
| if len(bz) == 0 { | ||
| return 0, closer.Close() | ||
| } | ||
|
|
||
| uz := binary.LittleEndian.Uint64(bz) | ||
| if uz > math.MaxInt64 { | ||
| return 0, fmt.Errorf("latest version in database overflows int64: %d", uz) | ||
| } | ||
| return int64(uz), closer.Close() | ||
| return int64(uz), nil | ||
| } | ||
|
|
||
| func (db *Database) SetEarliestVersion(version int64, ignoreVersion bool) error { | ||
|
|
@@ -216,16 +228,37 @@ func (db *Database) SetEarliestVersion(version int64, ignoreVersion bool) error | |
| } | ||
| if version > db.earliestVersion || ignoreVersion { | ||
| db.earliestVersion = version | ||
|
|
||
| var ts [VersionSize]byte | ||
| binary.LittleEndian.PutUint64(ts[:], uint64(version)) | ||
| return db.storage.Set([]byte(earliestVersionKey), ts[:], defaultWriteOpts) | ||
| } | ||
| return nil | ||
| } | ||
|
|
||
| func (db *Database) GetEarliestVersion() (int64, error) { | ||
| return db.earliestVersion, nil | ||
| func (db *Database) GetEarliestVersion() int64 { | ||
| return db.earliestVersion | ||
| } | ||
|
|
||
| // Retrieves earliest version from db, if not found, return 0 | ||
| func retrieveEarliestVersion(db *pebble.DB) (int64, error) { | ||
| bz, closer, err := db.Get([]byte(earliestVersionKey)) | ||
| defer func() { | ||
| if closer != nil { | ||
| _ = closer.Close() | ||
| } | ||
| }() | ||
| if err != nil || len(bz) == 0 { | ||
| if errors.Is(err, pebble.ErrNotFound) { | ||
| return 0, nil | ||
| } | ||
| return 0, err | ||
| } | ||
|
|
||
| ubz := binary.LittleEndian.Uint64(bz) | ||
| if ubz > math.MaxInt64 { | ||
| return 0, fmt.Errorf("earliest version in database overflows int64: %d", ubz) | ||
| } | ||
| return int64(ubz), nil | ||
| } | ||
|
|
||
| func (db *Database) SetLastRangeHashed(latestHashed int64) error { | ||
|
|
@@ -253,29 +286,6 @@ func (db *Database) GetLastRangeHashed() (int64, error) { | |
| return cachedValue, nil | ||
| } | ||
|
|
||
| // Retrieves earliest version from db | ||
| func retrieveEarliestVersion(db *pebble.DB) (int64, error) { | ||
| bz, closer, err := db.Get([]byte(earliestVersionKey)) | ||
| if err != nil { | ||
| if errors.Is(err, pebble.ErrNotFound) { | ||
| // in case of a fresh database | ||
| return 0, nil | ||
| } | ||
|
|
||
| return 0, err | ||
| } | ||
|
|
||
| if len(bz) == 0 { | ||
| return 0, closer.Close() | ||
| } | ||
|
|
||
| ubz := binary.LittleEndian.Uint64(bz) | ||
| if ubz > math.MaxInt64 { | ||
| return 0, fmt.Errorf("earliest version in database overflows int64: %d", ubz) | ||
| } | ||
| return int64(ubz), closer.Close() | ||
| } | ||
|
|
||
| // SetLatestKey sets the latest key processed during migration. | ||
| func (db *Database) SetLatestMigratedKey(key []byte) error { | ||
| return db.storage.Set([]byte(latestMigratedKeyMetadata), key, defaultWriteOpts) | ||
|
|
@@ -373,6 +383,7 @@ func (db *Database) ApplyChangeset(version int64, cs *proto.NamedChangeSet) erro | |
| version = 1 | ||
| } | ||
|
|
||
| // Create batch and persist latest version in the batch | ||
| b, err := NewBatch(db.storage, version) | ||
| if err != nil { | ||
| return err | ||
|
|
@@ -383,17 +394,20 @@ func (db *Database) ApplyChangeset(version int64, cs *proto.NamedChangeSet) erro | |
| if err := b.Delete(cs.Name, kvPair.Key); err != nil { | ||
|
Collaborator
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. As a general practice i recommend against overriding error like this, specially in cases like this since the scope of err is tighly limited to the if block. Ditto for next if statement.
Collaborator
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Fixed, thanks for pointing that out! |
||
| return err | ||
| } | ||
| } else { | ||
| if err := b.Set(cs.Name, kvPair.Key, kvPair.Value); err != nil { | ||
| return err | ||
| } | ||
| } else if err := b.Set(cs.Name, kvPair.Key, kvPair.Value); err != nil { | ||
| return err | ||
| } | ||
| } | ||
|
|
||
| // Mark the store as updated | ||
| db.storeKeyDirty.Store(cs.Name, version) | ||
|
|
||
| return b.Write() | ||
| if err := b.Write(); err != nil { | ||
| return err | ||
| } | ||
| // Update latest version on write success | ||
| db.latestVersion = version | ||
| return nil | ||
| } | ||
|
|
||
| func (db *Database) ApplyChangesetAsync(version int64, changesets []*proto.NamedChangeSet) error { | ||
|
|
@@ -537,10 +551,6 @@ func (db *Database) writeAsyncInBackground() { | |
| panic(err) | ||
| } | ||
| } | ||
| err := db.SetLatestVersion(version) | ||
| if err != nil { | ||
| panic(err) | ||
| } | ||
| } | ||
| } | ||
|
|
||
|
|
||
Uh oh!
There was an error while loading. Please reload this page.