Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
25 changes: 21 additions & 4 deletions api.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package klevdb

import (
"errors"
"fmt"
"time"

"github.com/klev-dev/klevdb/index"
Expand Down Expand Up @@ -139,29 +140,45 @@ type Log interface {

// Stat stats a store directory, without opening the store
func Stat(dir string, opts Options) (Stats, error) {
return segment.StatDir(dir, index.Params{
stat, err := segment.StatDir(dir, index.Params{
Times: opts.TimeIndex,
Keys: opts.KeyIndex,
})
if err != nil {
return Stats{}, fmt.Errorf("[klevdb.Stat] %s stat: %w", dir, err)
}
return stat, nil
}

// Backup backups a store directory to another location, without opening the store
func Backup(src, dst string) error {
return segment.BackupDir(src, dst)
err := segment.BackupDir(src, dst)
if err != nil {
return fmt.Errorf("[klevdb.Backup] %s backup to %s: %w", src, dst, err)
}
return nil
}

// Check runs an integrity check, without opening the store
func Check(dir string, opts Options) error {
return segment.CheckDir(dir, index.Params{
err := segment.CheckDir(dir, index.Params{
Times: opts.TimeIndex,
Keys: opts.KeyIndex,
})
if err != nil {
return fmt.Errorf("[klevdb.Check] %s check: %w", dir, err)
}
return nil
}

// Recover rewrites the storage to include all messages prior the first that fails an integrity check
func Recover(dir string, opts Options) error {
return segment.RecoverDir(dir, index.Params{
err := segment.RecoverDir(dir, index.Params{
Times: opts.TimeIndex,
Keys: opts.KeyIndex,
})
if err != nil {
return fmt.Errorf("[klevdb.Record] %s recover: %w", dir, err)
}
return nil
}
46 changes: 35 additions & 11 deletions blocking.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,9 @@
package klevdb

import "context"
import (
"context"
"fmt"
)

type BlockingLog interface {
Log
Expand All @@ -12,18 +15,24 @@ type BlockingLog interface {
ConsumeByKeyBlocking(ctx context.Context, key []byte, offset int64, maxCount int64) (nextOffset int64, messages []Message, err error)
}

// TODO docs
func OpenBlocking(dir string, opts Options) (BlockingLog, error) {
l, err := Open(dir, opts)
if err != nil {
return nil, err
return nil, fmt.Errorf("[klevdb.OpenBlocking] %s open: %w", dir, err)
}
return WrapBlocking(l)
w, err := WrapBlocking(l)
if err != nil {
return nil, fmt.Errorf("[klevdb.OpenBlocking] %s wrap: %w", dir, err)
}
return w, nil
}

// TODO docs
func WrapBlocking(l Log) (BlockingLog, error) {
next, err := l.NextOffset()
if err != nil {
return nil, err
return nil, fmt.Errorf("[klevdb.WrapBlocking] %s next offset: %w", l, err)
}
return &blockingLog{l, NewOffsetNotify(next)}, nil
}
Expand All @@ -35,27 +44,42 @@ type blockingLog struct {

func (l *blockingLog) Publish(messages []Message) (int64, error) {
nextOffset, err := l.Log.Publish(messages)
if err != nil {
return OffsetInvalid, fmt.Errorf("[klevdb.BlockingLog.Publish] %s publish: %w", l.Log, err)
}

l.notify.Set(nextOffset)
return nextOffset, err
return nextOffset, nil
}

func (l *blockingLog) ConsumeBlocking(ctx context.Context, offset int64, maxCount int64) (int64, []Message, error) {
if err := l.notify.Wait(ctx, offset); err != nil {
return 0, nil, err
return OffsetInvalid, nil, fmt.Errorf("[klevdb.BlockingLog.ConsumeBlocking] %s wait: %w", l.Log, err)
}
return l.Log.Consume(offset, maxCount)
next, msgs, err := l.Log.Consume(offset, maxCount)
if err != nil {
return OffsetInvalid, nil, fmt.Errorf("[klevdb.BlockingLog.ConsumeBlocking] %s consume: %w", l.Log, err)
}
return next, msgs, nil
}

func (l *blockingLog) ConsumeByKeyBlocking(ctx context.Context, key []byte, offset int64, maxCount int64) (int64, []Message, error) {
if err := l.notify.Wait(ctx, offset); err != nil {
return 0, nil, err
return OffsetInvalid, nil, fmt.Errorf("[klevdb.BlockingLog.ConsumeByKeyBlocking] %s wait: %w", l.Log, err)
}
return l.Log.ConsumeByKey(key, offset, maxCount)
next, msgs, err := l.Log.ConsumeByKey(key, offset, maxCount)
if err != nil {
return OffsetInvalid, nil, fmt.Errorf("[klevdb.BlockingLog.ConsumeByKeyBlocking] %s consume: %w", l.Log, err)
}
return next, msgs, nil
}

func (l *blockingLog) Close() error {
if err := l.notify.Close(); err != nil {
return err
return fmt.Errorf("[klevdb.BlockingLog.Close] %s notify close: %w", l.Log, err)
}
if err := l.Log.Close(); err != nil {
return fmt.Errorf("[klevdb.BlockingLog.Close] %s close: %w", l.Log, err)
}
return l.Log.Close()
return nil
}
17 changes: 11 additions & 6 deletions compact/deletes.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package compact

import (
"context"
"fmt"
"time"

art "github.com/plar/go-adaptive-radix-tree/v2"
Expand All @@ -17,7 +18,7 @@ import (
func FindDeletes(ctx context.Context, l klevdb.Log, before time.Time) (map[int64]struct{}, error) {
maxOffset, err := l.NextOffset()
if err != nil {
return nil, err
return nil, fmt.Errorf("[compact.FindDeletes] %s next offset: %w", l, err)
}

var keyOffset = art.New()
Expand All @@ -27,7 +28,7 @@ SEARCH:
for offset := klevdb.OffsetOldest; offset < maxOffset; {
nextOffset, msgs, err := l.Consume(offset, 32)
if err != nil {
return nil, err
return nil, fmt.Errorf("[compact.FindDeletes] %s consume %d: %w", l, offset, err)
}
offset = nextOffset

Expand All @@ -51,12 +52,12 @@ SEARCH:
}

if err := ctx.Err(); err != nil {
return nil, err
return nil, fmt.Errorf("[compact.FindDeletes] %s canceled %d: %w", l, offset, err)
}
}

if err := ctx.Err(); err != nil {
return nil, err
return nil, fmt.Errorf("[compact.FindDeletes] %s canceled: %w", l, err)
}

return offsets, nil
Expand All @@ -72,7 +73,11 @@ SEARCH:
func Deletes(ctx context.Context, l klevdb.Log, before time.Time) (map[int64]struct{}, int64, error) {
offsets, err := FindDeletes(ctx, l, before)
if err != nil {
return nil, 0, err
return nil, 0, fmt.Errorf("[compact.Deletes] %s find: %w", l, err)
}
return l.Delete(offsets)
m, sz, err := l.Delete(offsets)
if err != nil {
return nil, 0, fmt.Errorf("[compact.Deletes] %s delete: %w", l, err)
}
return m, sz, nil
}
17 changes: 11 additions & 6 deletions compact/updates.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package compact

import (
"context"
"fmt"
"time"

art "github.com/plar/go-adaptive-radix-tree/v2"
Expand All @@ -17,7 +18,7 @@ import (
func FindUpdates(ctx context.Context, l klevdb.Log, before time.Time) (map[int64]struct{}, error) {
maxOffset, err := l.NextOffset()
if err != nil {
return nil, err
return nil, fmt.Errorf("[compact.FindUpdates] %s next offset: %w", l, err)
}

var keyOffset = art.New()
Expand All @@ -27,7 +28,7 @@ SEARCH:
for offset := klevdb.OffsetOldest; offset < maxOffset; {
nextOffset, msgs, err := l.Consume(offset, 32)
if err != nil {
return nil, err
return nil, fmt.Errorf("[compact.FindUpdates] %s consume %d: %w", l, offset, err)
}
offset = nextOffset

Expand All @@ -42,12 +43,12 @@ SEARCH:
}

if err := ctx.Err(); err != nil {
return nil, err
return nil, fmt.Errorf("[compact.FindUpdates] %s canceled %d: %w", l, offset, err)
}
}

if err := ctx.Err(); err != nil {
return nil, err
return nil, fmt.Errorf("[compact.FindUpdates] %s canceled: %w", l, err)
}

return offsets, nil
Expand All @@ -63,7 +64,11 @@ SEARCH:
func Updates(ctx context.Context, l klevdb.Log, before time.Time) (map[int64]struct{}, int64, error) {
offsets, err := FindUpdates(ctx, l, before)
if err != nil {
return nil, 0, err
return nil, 0, fmt.Errorf("[compact.Updates] %s find: %w", l, err)
}
return l.Delete(offsets)
m, sz, err := l.Delete(offsets)
if err != nil {
return nil, 0, fmt.Errorf("[compact.Updates] %s delete: %w", l, err)
}
return m, sz, nil
}
7 changes: 4 additions & 3 deletions delete.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package klevdb

import (
"context"
"fmt"
"time"

"golang.org/x/exp/maps"
Expand All @@ -24,7 +25,7 @@ func DeleteMultiWithWait(d time.Duration) DeleteMultiBackoff {
case <-t.C:
return nil
case <-ctx.Done():
return ctx.Err()
return fmt.Errorf("[klevdb.DeleteMultiWithWait] canceled: %w", ctx.Err())
}
}
}
Expand All @@ -48,7 +49,7 @@ func DeleteMulti(ctx context.Context, l Log, offsets map[int64]struct{}, backoff
deleted, size, err := l.Delete(offsets)
switch {
case err != nil:
return deletedOffsets, deletedSize, err
return deletedOffsets, deletedSize, fmt.Errorf("[klevdb.DeleteMulti] delete: %w", err)
case len(deleted) == 0:
return deletedOffsets, deletedSize, nil
}
Expand All @@ -61,7 +62,7 @@ func DeleteMulti(ctx context.Context, l Log, offsets map[int64]struct{}, backoff
})

if err := backoff(ctx); err != nil {
return deletedOffsets, deletedSize, err
return deletedOffsets, deletedSize, fmt.Errorf("[klevdb.DeleteMulti] backoff: %w", err)
}
}

Expand Down
Loading
Loading