Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
150 changes: 99 additions & 51 deletions plugins/snapshots/devbox/storage/bolt.go
Original file line number Diff line number Diff line change
Expand Up @@ -241,7 +241,7 @@ func CreateSnapshot(ctx context.Context, kind snapshots.Kind, key, parent string
}
}

err = createBucketIfNotExists(ctx, func(ctx context.Context, bkt, pbkt *bolt.Bucket) error {
err = createBucketIfNotExists(ctx, func(ctx context.Context, _ *bolt.Bucket, bkt, pbkt *bolt.Bucket) error {
var spbkt *bolt.Bucket
if parent != "" {
spbkt = bkt.Bucket([]byte(parent))
Expand Down Expand Up @@ -315,7 +315,7 @@ func CommitActive(ctx context.Context, key, name string, usage snapshots.Usage,
}

var id uint64
err := createBucketIfNotExists(ctx, func(ctx context.Context, bkt, pbkt *bolt.Bucket) error {
err := createBucketIfNotExists(ctx, func(ctx context.Context, version, bkt, pbkt *bolt.Bucket) error {
if cbkt := bkt.Bucket([]byte(name)); cbkt != nil {
return fmt.Errorf("snapshot %q already exists: %w", name, errdefs.ErrAlreadyExists)
}
Expand Down Expand Up @@ -357,7 +357,7 @@ func CommitActive(ctx context.Context, key, name string, usage snapshots.Usage,
if err := cbkt.Put(DevboxKeyContentID, contentID); err != nil {
return err
}
root := pbkt.Bucket(DevboxStoragePathBucket)
root := version.Bucket(DevboxStoragePathBucket)
if root != nil {
if contentBkt := root.Bucket(contentID); contentBkt != nil {
snapshotKey := contentBkt.Get(DevboxKeySnapshotKey)
Expand Down Expand Up @@ -473,12 +473,15 @@ func GetSnapshotDevboxInfo(ctx context.Context, key string) (string, string, err
contentID string
mountPath string
)
err := withSnapshotBucket(ctx, key, func(ctx context.Context, bkt, pbkt *bolt.Bucket) error {
contentID = string(bkt.Get(DevboxKeyContentID))
err := withDevboxStorageBucket(ctx, func(ctx context.Context, bkt, root *bolt.Bucket) error {
sbkt := bkt.Bucket([]byte(key))
if sbkt == nil {
return fmt.Errorf("snapshot %q does not exist: %w", key, errdefs.ErrNotFound)
}
contentID = string(sbkt.Get(DevboxKeyContentID))
if contentID == "" {
return nil
}
root := pbkt.Bucket(DevboxStoragePathBucket)
if root == nil {
return nil
}
Expand All @@ -497,7 +500,7 @@ func GetSnapshotDevboxInfo(ctx context.Context, key string) (string, string, err

// SetDevboxContent associates a snapshot key with a devbox content record.
func SetDevboxContent(ctx context.Context, key, contentID, lvName, mountPath string) error {
return withBucket(ctx, func(ctx context.Context, bkt, pbkt *bolt.Bucket) error {
return withCreatedDevboxStorageBucket(ctx, func(ctx context.Context, bkt, root *bolt.Bucket) error {
sbkt := bkt.Bucket([]byte(key))
if sbkt == nil {
return fmt.Errorf("snapshot %q does not exist: %w", key, errdefs.ErrNotFound)
Expand All @@ -506,10 +509,6 @@ func SetDevboxContent(ctx context.Context, key, contentID, lvName, mountPath str
return err
}

root, err := pbkt.CreateBucketIfNotExists(DevboxStoragePathBucket)
if err != nil {
return err
}
cbkt, err := root.CreateBucketIfNotExists([]byte(contentID))
if err != nil {
return err
Expand All @@ -534,7 +533,7 @@ func RemoveDevbox(ctx context.Context, key string) (string, error) {
var mountPath string

log.G(ctx).WithField("key", key).Warnf("[REMOVE-DEVBOX-TRACE] RemoveDevbox called with key")
err := withBucket(ctx, func(ctx context.Context, bkt, pbkt *bolt.Bucket) error {
err := withDevboxStorageBucket(ctx, func(ctx context.Context, bkt, root *bolt.Bucket) error {
sbkt := bkt.Bucket([]byte(key))
if sbkt == nil {
log.G(ctx).WithField("key", key).Warnf("[REMOVE-DEVBOX-TRACE] devbox snapshot bucket for key %s does not exist", key)
Expand All @@ -545,7 +544,6 @@ func RemoveDevbox(ctx context.Context, key string) (string, error) {
return nil
}

root := pbkt.Bucket(DevboxStoragePathBucket)
if root == nil {
return nil
}
Expand Down Expand Up @@ -577,11 +575,10 @@ func RemoveDevbox(ctx context.Context, key string) (string, error) {
}

// GetDevboxLvName returns the LV name for a content ID. If snapshotKey is
// provided, it is used as a fallback lookup path.
// provided, it is used to resolve the content ID from the snapshot record.
func GetDevboxLvName(ctx context.Context, contentID, snapshotKey string) (string, error) {
var lvName string
err := withBucket(ctx, func(ctx context.Context, bkt, pbkt *bolt.Bucket) error {
root := pbkt.Bucket(DevboxStoragePathBucket)
err := withDevboxStorageBucket(ctx, func(ctx context.Context, bkt, root *bolt.Bucket) error {
if root != nil && contentID != "" {
if cbkt := root.Bucket([]byte(contentID)); cbkt != nil {
lvName = string(cbkt.Get(DevboxKeyLvName))
Expand Down Expand Up @@ -618,8 +615,7 @@ func GetDevboxLvName(ctx context.Context, contentID, snapshotKey string) (string
// attached to any snapshot and are ready for LV cleanup.
func GetRemovedDevboxContents(ctx context.Context) ([]RemovedDevboxContent, error) {
var contents []RemovedDevboxContent
err := withBucket(ctx, func(ctx context.Context, bkt, pbkt *bolt.Bucket) error {
root := pbkt.Bucket(DevboxStoragePathBucket)
err := withDevboxStorageBucket(ctx, func(ctx context.Context, bkt, root *bolt.Bucket) error {
if root == nil {
return nil
}
Expand Down Expand Up @@ -657,8 +653,7 @@ func GetRemovedDevboxContents(ctx context.Context) ([]RemovedDevboxContent, erro
// GetDevboxLvNames returns all devbox LV names keyed by LV name.
func GetDevboxLvNames(ctx context.Context) (map[string]struct{}, error) {
names := map[string]struct{}{}
err := withBucket(ctx, func(ctx context.Context, bkt, pbkt *bolt.Bucket) error {
root := pbkt.Bucket(DevboxStoragePathBucket)
err := withDevboxStorageBucket(ctx, func(ctx context.Context, bkt, root *bolt.Bucket) error {
if root == nil {
return nil
}
Expand All @@ -681,8 +676,7 @@ func GetDevboxLvNames(ctx context.Context) (map[string]struct{}, error) {
}

func DeleteDevboxContent(ctx context.Context, contentID string) error {
return withBucket(ctx, func(ctx context.Context, bkt, pbkt *bolt.Bucket) error {
root := pbkt.Bucket(DevboxStoragePathBucket)
return withDevboxStorageBucket(ctx, func(ctx context.Context, bkt, root *bolt.Bucket) error {
if root == nil {
return errdefs.ErrNotFound
}
Expand All @@ -694,7 +688,7 @@ func DeleteDevboxContent(ctx context.Context, contentID string) error {
// returns the recorded mount path.
func SetUnmountedWithKey(ctx context.Context, key string) (string, error) {
var mountPath string
err := withBucket(ctx, func(ctx context.Context, bkt, pbkt *bolt.Bucket) error {
err := withDevboxStorageBucket(ctx, func(ctx context.Context, bkt, root *bolt.Bucket) error {
sbkt := bkt.Bucket([]byte(key))
if sbkt == nil {
return fmt.Errorf("snapshot %q does not exist: %w", key, errdefs.ErrNotFound)
Expand All @@ -704,7 +698,6 @@ func SetUnmountedWithKey(ctx context.Context, key string) (string, error) {
return errdefs.ErrNotFound
}

root := pbkt.Bucket(DevboxStoragePathBucket)
if root == nil {
return errdefs.ErrNotFound
}
Expand All @@ -726,8 +719,7 @@ func SetUnmountedWithKey(ctx context.Context, key string) (string, error) {

// SetDevboxContentStatusRemoved marks a devbox content record as removed.
func SetDevboxContentStatusRemoved(ctx context.Context, contentID string) error {
return withBucket(ctx, func(ctx context.Context, bkt, pbkt *bolt.Bucket) error {
root := pbkt.Bucket(DevboxStoragePathBucket)
return withDevboxStorageBucket(ctx, func(ctx context.Context, bkt, root *bolt.Bucket) error {
if root == nil {
return errdefs.ErrNotFound
}
Expand All @@ -751,7 +743,7 @@ func withBucket(ctx context.Context, fn func(context.Context, *bolt.Bucket, *bol
return fn(ctx, version.Bucket(bucketKeySnapshot), version.Bucket(bucketKeyParents))
}

func createBucketIfNotExists(ctx context.Context, fn func(context.Context, *bolt.Bucket, *bolt.Bucket) error) error {
func createBucketIfNotExists(ctx context.Context, fn func(context.Context, *bolt.Bucket, *bolt.Bucket, *bolt.Bucket) error) error {
tx, ok := ctx.Value(transactionKey{}).(*bolt.Tx)
if !ok || tx == nil {
return ErrNoTransaction
Expand All @@ -769,7 +761,7 @@ func createBucketIfNotExists(ctx context.Context, fn func(context.Context, *bolt
if err != nil {
return fmt.Errorf("failed to create parents bucket: %w", err)
}
return fn(ctx, bkt, pbkt)
return fn(ctx, version, bkt, pbkt)
}

func withSnapshotBucket(ctx context.Context, key string, fn func(context.Context, *bolt.Bucket, *bolt.Bucket) error) error {
Expand All @@ -792,6 +784,42 @@ func withSnapshotBucket(ctx context.Context, key string, fn func(context.Context
return fn(ctx, sbkt, version.Bucket(bucketKeyParents))
}

func withDevboxStorageBucket(ctx context.Context, fn func(context.Context, *bolt.Bucket, *bolt.Bucket) error) error {
tx, ok := ctx.Value(transactionKey{}).(*bolt.Tx)
if !ok || tx == nil {
return ErrNoTransaction
}
version := tx.Bucket(bucketKeyStorageVersion)
if version == nil {
return fmt.Errorf("bucket does not exist: %w", errdefs.ErrNotFound)
}
bkt := version.Bucket(bucketKeySnapshot)
if bkt == nil {
return fmt.Errorf("snapshots bucket does not exist: %w", errdefs.ErrNotFound)
}
return fn(ctx, bkt, version.Bucket(DevboxStoragePathBucket))
}

func withCreatedDevboxStorageBucket(ctx context.Context, fn func(context.Context, *bolt.Bucket, *bolt.Bucket) error) error {
tx, ok := ctx.Value(transactionKey{}).(*bolt.Tx)
if !ok || tx == nil {
return ErrNoTransaction
}
version := tx.Bucket(bucketKeyStorageVersion)
if version == nil {
return fmt.Errorf("bucket does not exist: %w", errdefs.ErrNotFound)
}
bkt := version.Bucket(bucketKeySnapshot)
if bkt == nil {
return fmt.Errorf("snapshots bucket does not exist: %w", errdefs.ErrNotFound)
}
root, err := version.CreateBucketIfNotExists(DevboxStoragePathBucket)
if err != nil {
return err
}
return fn(ctx, bkt, root)
}

func sequenceNext(bkt *bolt.Bucket) (uint64, error) {
return bkt.NextSequence()
}
Expand Down Expand Up @@ -832,50 +860,70 @@ func readSnapshot(bkt *bolt.Bucket, id *uint64, si *snapshots.Info) error {
}

func putID(bkt *bolt.Bucket, id uint64) error {
buf := make([]byte, 8)
binary.BigEndian.PutUint64(buf, id)
return bkt.Put(bucketKeyID, buf)
idEncoded, err := encodeID(id)
if err != nil {
return err
}
return bkt.Put(bucketKeyID, idEncoded)
}

func readID(bkt *bolt.Bucket) uint64 {
v := bkt.Get(bucketKeyID)
if len(v) == 0 {
return 0
}
return binary.BigEndian.Uint64(v)
id, _ := binary.Uvarint(bkt.Get(bucketKeyID))
return id
}

func putKind(bkt *bolt.Bucket, kind snapshots.Kind) error {
buf := make([]byte, 8)
binary.BigEndian.PutUint64(buf, uint64(kind))
return bkt.Put(bucketKeyKind, buf)
return bkt.Put(bucketKeyKind, []byte{byte(kind)})
}

func readKind(bkt *bolt.Bucket) snapshots.Kind {
v := bkt.Get(bucketKeyKind)
if len(v) == 0 {
if len(v) != 1 {
return 0
}
return snapshots.Kind(binary.BigEndian.Uint64(v))
return snapshots.Kind(v[0])
}

func putUsage(bkt *bolt.Bucket, usage snapshots.Usage) error {
buf := make([]byte, 8)
binary.BigEndian.PutUint64(buf, uint64(usage.Inodes))
if err := bkt.Put(bucketKeyInodes, buf); err != nil {
return err
for _, entry := range []struct {
key []byte
value int64
}{
{bucketKeyInodes, usage.Inodes},
{bucketKeySize, usage.Size},
} {
encoded, err := encodeSize(entry.value)
if err != nil {
return err
}
if err := bkt.Put(entry.key, encoded); err != nil {
return err
}
}
binary.BigEndian.PutUint64(buf, uint64(usage.Size))
return bkt.Put(bucketKeySize, buf)
return nil
}

func getUsage(bkt *bolt.Bucket, usage *snapshots.Usage) {
if v := bkt.Get(bucketKeyInodes); len(v) > 0 {
usage.Inodes = int64(binary.BigEndian.Uint64(v))
usage.Inodes, _ = binary.Varint(bkt.Get(bucketKeyInodes))
usage.Size, _ = binary.Varint(bkt.Get(bucketKeySize))
}

func encodeID(id uint64) ([]byte, error) {
var buf [binary.MaxVarintLen64]byte
encoded := buf[:binary.PutUvarint(buf[:], id)]
if len(encoded) == 0 {
return nil, fmt.Errorf("failed encoding id = %v", id)
}
if v := bkt.Get(bucketKeySize); len(v) > 0 {
usage.Size = int64(binary.BigEndian.Uint64(v))
return encoded, nil
}

func encodeSize(size int64) ([]byte, error) {
var buf [binary.MaxVarintLen64]byte
encoded := buf[:binary.PutVarint(buf[:], size)]
if len(encoded) == 0 {
return nil, fmt.Errorf("failed encoding size = %v", size)
}
return encoded, nil
}

var (
Expand Down
Loading