Skip to content

Commit 3c38a45

Browse files
authored
Add and expose latest version as watermark (#116)
## Describe your changes and provide context This PR add an in memory latestVersion for each database, change interface to handle getLatestVersion always from in memory to avoid any I/O read. Add logic to retrieve latestVersion during database initialization. ## Testing performed to validate your change Added and fix unit test for latestVersion
1 parent 0a109bb commit 3c38a45

File tree

7 files changed

+193
-156
lines changed

7 files changed

+193
-156
lines changed

ss/pebbledb/db.go

Lines changed: 62 additions & 52 deletions
Original file line numberDiff line numberDiff line change
@@ -59,6 +59,8 @@ type Database struct {
5959
config config.StateStoreConfig
6060
// Earliest version for db after pruning
6161
earliestVersion int64
62+
// Latest version for db
63+
latestVersion int64
6264

6365
// Map of module to when each was last updated
6466
// Used in pruning to skip over stores that have not been updated recently
@@ -115,20 +117,31 @@ func New(dataDir string, config config.StateStoreConfig) (*Database, error) {
115117
opts.FlushSplitBytes = opts.Levels[0].TargetFileSize
116118
opts = opts.EnsureDefaults()
117119

120+
//TODO: add a new config and check if readonly = true to support readonly mode
121+
118122
db, err := pebble.Open(dataDir, opts)
119123
if err != nil {
120124
return nil, fmt.Errorf("failed to open PebbleDB: %w", err)
121125
}
122126

127+
// Initialize earliest version
123128
earliestVersion, err := retrieveEarliestVersion(db)
124129
if err != nil {
125-
return nil, fmt.Errorf("failed to open PebbleDB: %w", err)
130+
return nil, fmt.Errorf("failed to retrieve earliest version: %w", err)
131+
}
132+
133+
// Initialize latest version
134+
latestVersion, err := retrieveLatestVersion(db)
135+
if err != nil {
136+
return nil, fmt.Errorf("failed to retrieve latest version: %w", err)
126137
}
138+
127139
database := &Database{
128140
storage: db,
129141
asyncWriteWG: sync.WaitGroup{},
130142
config: config,
131143
earliestVersion: earliestVersion,
144+
latestVersion: latestVersion,
132145
pendingChanges: make(chan VersionedChangesets, config.AsyncWriteBuffer),
133146
}
134147

@@ -159,12 +172,6 @@ func New(dataDir string, config config.StateStoreConfig) (*Database, error) {
159172
return database, nil
160173
}
161174

162-
func NewWithDB(storage *pebble.DB) *Database {
163-
return &Database{
164-
storage: storage,
165-
}
166-
}
167-
168175
func (db *Database) Close() error {
169176
if db.streamHandler != nil {
170177
_ = db.streamHandler.Close()
@@ -182,32 +189,37 @@ func (db *Database) SetLatestVersion(version int64) error {
182189
if version < 0 {
183190
return fmt.Errorf("version must be non-negative")
184191
}
192+
db.latestVersion = version
185193
var ts [VersionSize]byte
186194
binary.LittleEndian.PutUint64(ts[:], uint64(version))
187195
err := db.storage.Set([]byte(latestVersionKey), ts[:], defaultWriteOpts)
188196
return err
189197
}
190198

191-
func (db *Database) GetLatestVersion() (int64, error) {
192-
bz, closer, err := db.storage.Get([]byte(latestVersionKey))
193-
if err != nil {
199+
func (db *Database) GetLatestVersion() int64 {
200+
return db.latestVersion
201+
}
202+
203+
// Retrieve latestVersion from db, if not found, return 0.
204+
func retrieveLatestVersion(db *pebble.DB) (int64, error) {
205+
bz, closer, err := db.Get([]byte(latestVersionKey))
206+
defer func() {
207+
if closer != nil {
208+
_ = closer.Close()
209+
}
210+
}()
211+
if err != nil || len(bz) == 0 {
194212
if errors.Is(err, pebble.ErrNotFound) {
195-
// in case of a fresh database
196213
return 0, nil
197214
}
198-
199215
return 0, err
200216
}
201217

202-
if len(bz) == 0 {
203-
return 0, closer.Close()
204-
}
205-
206218
uz := binary.LittleEndian.Uint64(bz)
207219
if uz > math.MaxInt64 {
208220
return 0, fmt.Errorf("latest version in database overflows int64: %d", uz)
209221
}
210-
return int64(uz), closer.Close()
222+
return int64(uz), nil
211223
}
212224

213225
func (db *Database) SetEarliestVersion(version int64, ignoreVersion bool) error {
@@ -216,16 +228,37 @@ func (db *Database) SetEarliestVersion(version int64, ignoreVersion bool) error
216228
}
217229
if version > db.earliestVersion || ignoreVersion {
218230
db.earliestVersion = version
219-
220231
var ts [VersionSize]byte
221232
binary.LittleEndian.PutUint64(ts[:], uint64(version))
222233
return db.storage.Set([]byte(earliestVersionKey), ts[:], defaultWriteOpts)
223234
}
224235
return nil
225236
}
226237

227-
func (db *Database) GetEarliestVersion() (int64, error) {
228-
return db.earliestVersion, nil
238+
func (db *Database) GetEarliestVersion() int64 {
239+
return db.earliestVersion
240+
}
241+
242+
// Retrieves earliest version from db, if not found, return 0
243+
func retrieveEarliestVersion(db *pebble.DB) (int64, error) {
244+
bz, closer, err := db.Get([]byte(earliestVersionKey))
245+
defer func() {
246+
if closer != nil {
247+
_ = closer.Close()
248+
}
249+
}()
250+
if err != nil || len(bz) == 0 {
251+
if errors.Is(err, pebble.ErrNotFound) {
252+
return 0, nil
253+
}
254+
return 0, err
255+
}
256+
257+
ubz := binary.LittleEndian.Uint64(bz)
258+
if ubz > math.MaxInt64 {
259+
return 0, fmt.Errorf("earliest version in database overflows int64: %d", ubz)
260+
}
261+
return int64(ubz), nil
229262
}
230263

231264
func (db *Database) SetLastRangeHashed(latestHashed int64) error {
@@ -253,29 +286,6 @@ func (db *Database) GetLastRangeHashed() (int64, error) {
253286
return cachedValue, nil
254287
}
255288

256-
// Retrieves earliest version from db
257-
func retrieveEarliestVersion(db *pebble.DB) (int64, error) {
258-
bz, closer, err := db.Get([]byte(earliestVersionKey))
259-
if err != nil {
260-
if errors.Is(err, pebble.ErrNotFound) {
261-
// in case of a fresh database
262-
return 0, nil
263-
}
264-
265-
return 0, err
266-
}
267-
268-
if len(bz) == 0 {
269-
return 0, closer.Close()
270-
}
271-
272-
ubz := binary.LittleEndian.Uint64(bz)
273-
if ubz > math.MaxInt64 {
274-
return 0, fmt.Errorf("earliest version in database overflows int64: %d", ubz)
275-
}
276-
return int64(ubz), closer.Close()
277-
}
278-
279289
// SetLatestKey sets the latest key processed during migration.
280290
func (db *Database) SetLatestMigratedKey(key []byte) error {
281291
return db.storage.Set([]byte(latestMigratedKeyMetadata), key, defaultWriteOpts)
@@ -373,6 +383,7 @@ func (db *Database) ApplyChangeset(version int64, cs *proto.NamedChangeSet) erro
373383
version = 1
374384
}
375385

386+
// Create batch and persist latest version in the batch
376387
b, err := NewBatch(db.storage, version)
377388
if err != nil {
378389
return err
@@ -383,17 +394,20 @@ func (db *Database) ApplyChangeset(version int64, cs *proto.NamedChangeSet) erro
383394
if err := b.Delete(cs.Name, kvPair.Key); err != nil {
384395
return err
385396
}
386-
} else {
387-
if err := b.Set(cs.Name, kvPair.Key, kvPair.Value); err != nil {
388-
return err
389-
}
397+
} else if err := b.Set(cs.Name, kvPair.Key, kvPair.Value); err != nil {
398+
return err
390399
}
391400
}
392401

393402
// Mark the store as updated
394403
db.storeKeyDirty.Store(cs.Name, version)
395404

396-
return b.Write()
405+
if err := b.Write(); err != nil {
406+
return err
407+
}
408+
// Update latest version on write success
409+
db.latestVersion = version
410+
return nil
397411
}
398412

399413
func (db *Database) ApplyChangesetAsync(version int64, changesets []*proto.NamedChangeSet) error {
@@ -537,10 +551,6 @@ func (db *Database) writeAsyncInBackground() {
537551
panic(err)
538552
}
539553
}
540-
err := db.SetLatestVersion(version)
541-
if err != nil {
542-
panic(err)
543-
}
544554
}
545555
}
546556

ss/pruning/manager.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -41,7 +41,7 @@ func (m *Manager) Start() {
4141
go func() {
4242
for {
4343
pruneStartTime := time.Now()
44-
latestVersion, _ := m.stateStore.GetLatestVersion()
44+
latestVersion := m.stateStore.GetLatestVersion()
4545
pruneVersion := latestVersion - m.keepRecent
4646
if pruneVersion > 0 {
4747
// prune all versions up to and including the pruneVersion

0 commit comments

Comments
 (0)