@@ -59,6 +59,8 @@ type Database struct {
5959 config config.StateStoreConfig
6060 // Earliest version for db after pruning
6161 earliestVersion int64
62+ // Latest version for db
63+ latestVersion int64
6264
6365 // Map of module to when each was last updated
6466 // Used in pruning to skip over stores that have not been updated recently
@@ -115,20 +117,31 @@ func New(dataDir string, config config.StateStoreConfig) (*Database, error) {
115117 opts .FlushSplitBytes = opts .Levels [0 ].TargetFileSize
116118 opts = opts .EnsureDefaults ()
117119
120+ //TODO: add a new config and check if readonly = true to support readonly mode
121+
118122 db , err := pebble .Open (dataDir , opts )
119123 if err != nil {
120124 return nil , fmt .Errorf ("failed to open PebbleDB: %w" , err )
121125 }
122126
127+ // Initialize earliest version
123128 earliestVersion , err := retrieveEarliestVersion (db )
124129 if err != nil {
125- return nil , fmt .Errorf ("failed to open PebbleDB: %w" , err )
130+ return nil , fmt .Errorf ("failed to retrieve earliest version: %w" , err )
131+ }
132+
133+ // Initialize latest version
134+ latestVersion , err := retrieveLatestVersion (db )
135+ if err != nil {
136+ return nil , fmt .Errorf ("failed to retrieve latest version: %w" , err )
126137 }
138+
127139 database := & Database {
128140 storage : db ,
129141 asyncWriteWG : sync.WaitGroup {},
130142 config : config ,
131143 earliestVersion : earliestVersion ,
144+ latestVersion : latestVersion ,
132145 pendingChanges : make (chan VersionedChangesets , config .AsyncWriteBuffer ),
133146 }
134147
@@ -159,12 +172,6 @@ func New(dataDir string, config config.StateStoreConfig) (*Database, error) {
159172 return database , nil
160173}
161174
162- func NewWithDB (storage * pebble.DB ) * Database {
163- return & Database {
164- storage : storage ,
165- }
166- }
167-
168175func (db * Database ) Close () error {
169176 if db .streamHandler != nil {
170177 _ = db .streamHandler .Close ()
@@ -182,32 +189,37 @@ func (db *Database) SetLatestVersion(version int64) error {
182189 if version < 0 {
183190 return fmt .Errorf ("version must be non-negative" )
184191 }
192+ db .latestVersion = version
185193 var ts [VersionSize ]byte
186194 binary .LittleEndian .PutUint64 (ts [:], uint64 (version ))
187195 err := db .storage .Set ([]byte (latestVersionKey ), ts [:], defaultWriteOpts )
188196 return err
189197}
190198
191- func (db * Database ) GetLatestVersion () (int64 , error ) {
192- bz , closer , err := db .storage .Get ([]byte (latestVersionKey ))
193- if err != nil {
199+ func (db * Database ) GetLatestVersion () int64 {
200+ return db .latestVersion
201+ }
202+
203+ // Retrieve latestVersion from db, if not found, return 0.
204+ func retrieveLatestVersion (db * pebble.DB ) (int64 , error ) {
205+ bz , closer , err := db .Get ([]byte (latestVersionKey ))
206+ defer func () {
207+ if closer != nil {
208+ _ = closer .Close ()
209+ }
210+ }()
211+ if err != nil || len (bz ) == 0 {
194212 if errors .Is (err , pebble .ErrNotFound ) {
195- // in case of a fresh database
196213 return 0 , nil
197214 }
198-
199215 return 0 , err
200216 }
201217
202- if len (bz ) == 0 {
203- return 0 , closer .Close ()
204- }
205-
206218 uz := binary .LittleEndian .Uint64 (bz )
207219 if uz > math .MaxInt64 {
208220 return 0 , fmt .Errorf ("latest version in database overflows int64: %d" , uz )
209221 }
210- return int64 (uz ), closer . Close ()
222+ return int64 (uz ), nil
211223}
212224
213225func (db * Database ) SetEarliestVersion (version int64 , ignoreVersion bool ) error {
@@ -216,16 +228,37 @@ func (db *Database) SetEarliestVersion(version int64, ignoreVersion bool) error
216228 }
217229 if version > db .earliestVersion || ignoreVersion {
218230 db .earliestVersion = version
219-
220231 var ts [VersionSize ]byte
221232 binary .LittleEndian .PutUint64 (ts [:], uint64 (version ))
222233 return db .storage .Set ([]byte (earliestVersionKey ), ts [:], defaultWriteOpts )
223234 }
224235 return nil
225236}
226237
227- func (db * Database ) GetEarliestVersion () (int64 , error ) {
228- return db .earliestVersion , nil
238+ func (db * Database ) GetEarliestVersion () int64 {
239+ return db .earliestVersion
240+ }
241+
242+ // Retrieves earliest version from db, if not found, return 0
243+ func retrieveEarliestVersion (db * pebble.DB ) (int64 , error ) {
244+ bz , closer , err := db .Get ([]byte (earliestVersionKey ))
245+ defer func () {
246+ if closer != nil {
247+ _ = closer .Close ()
248+ }
249+ }()
250+ if err != nil || len (bz ) == 0 {
251+ if errors .Is (err , pebble .ErrNotFound ) {
252+ return 0 , nil
253+ }
254+ return 0 , err
255+ }
256+
257+ ubz := binary .LittleEndian .Uint64 (bz )
258+ if ubz > math .MaxInt64 {
259+ return 0 , fmt .Errorf ("earliest version in database overflows int64: %d" , ubz )
260+ }
261+ return int64 (ubz ), nil
229262}
230263
231264func (db * Database ) SetLastRangeHashed (latestHashed int64 ) error {
@@ -253,29 +286,6 @@ func (db *Database) GetLastRangeHashed() (int64, error) {
253286 return cachedValue , nil
254287}
255288
256- // Retrieves earliest version from db
257- func retrieveEarliestVersion (db * pebble.DB ) (int64 , error ) {
258- bz , closer , err := db .Get ([]byte (earliestVersionKey ))
259- if err != nil {
260- if errors .Is (err , pebble .ErrNotFound ) {
261- // in case of a fresh database
262- return 0 , nil
263- }
264-
265- return 0 , err
266- }
267-
268- if len (bz ) == 0 {
269- return 0 , closer .Close ()
270- }
271-
272- ubz := binary .LittleEndian .Uint64 (bz )
273- if ubz > math .MaxInt64 {
274- return 0 , fmt .Errorf ("earliest version in database overflows int64: %d" , ubz )
275- }
276- return int64 (ubz ), closer .Close ()
277- }
278-
279289// SetLatestKey sets the latest key processed during migration.
280290func (db * Database ) SetLatestMigratedKey (key []byte ) error {
281291 return db .storage .Set ([]byte (latestMigratedKeyMetadata ), key , defaultWriteOpts )
@@ -373,6 +383,7 @@ func (db *Database) ApplyChangeset(version int64, cs *proto.NamedChangeSet) erro
373383 version = 1
374384 }
375385
386+ // Create batch and persist latest version in the batch
376387 b , err := NewBatch (db .storage , version )
377388 if err != nil {
378389 return err
@@ -383,17 +394,20 @@ func (db *Database) ApplyChangeset(version int64, cs *proto.NamedChangeSet) erro
383394 if err := b .Delete (cs .Name , kvPair .Key ); err != nil {
384395 return err
385396 }
386- } else {
387- if err := b .Set (cs .Name , kvPair .Key , kvPair .Value ); err != nil {
388- return err
389- }
397+ } else if err := b .Set (cs .Name , kvPair .Key , kvPair .Value ); err != nil {
398+ return err
390399 }
391400 }
392401
393402 // Mark the store as updated
394403 db .storeKeyDirty .Store (cs .Name , version )
395404
396- return b .Write ()
405+ if err := b .Write (); err != nil {
406+ return err
407+ }
408+ // Update latest version on write success
409+ db .latestVersion = version
410+ return nil
397411}
398412
399413func (db * Database ) ApplyChangesetAsync (version int64 , changesets []* proto.NamedChangeSet ) error {
@@ -537,10 +551,6 @@ func (db *Database) writeAsyncInBackground() {
537551 panic (err )
538552 }
539553 }
540- err := db .SetLatestVersion (version )
541- if err != nil {
542- panic (err )
543- }
544554 }
545555 }
546556
0 commit comments