Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -32,5 +32,5 @@ bench-multi:

.PHONY: update-libs
update-libs:
go get -u github.com/klev-dev/kleverr@main
go get -u ./...
go mod tidy
11 changes: 9 additions & 2 deletions blocking.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,16 +2,18 @@ package klevdb

import "context"

// BlockingLog enhances log adding blocking consume
type BlockingLog interface {
Log

// ConsumeBlocking is similar to Consume, but if offset is equal to the next offsetit will block until next event is produced
// ConsumeBlocking is similar to Consume, but if offset is equal to the next offset it will block until next message is produced
ConsumeBlocking(ctx context.Context, offset int64, maxCount int64) (nextOffset int64, messages []Message, err error)

// ConsumeByKeyBlocking is similar to ConsumeBlocking, but only returns messages matching the key
ConsumeByKeyBlocking(ctx context.Context, key []byte, offset int64, maxCount int64) (nextOffset int64, messages []Message, err error)
}

// OpenBlocking opens log and wraps it with support for blocking consume
func OpenBlocking(dir string, opts Options) (BlockingLog, error) {
l, err := Open(dir, opts)
if err != nil {
Expand All @@ -20,6 +22,7 @@ func OpenBlocking(dir string, opts Options) (BlockingLog, error) {
return WrapBlocking(l)
}

// WrapBlocking wraps log with support for blocking consume
func WrapBlocking(l Log) (BlockingLog, error) {
next, err := l.NextOffset()
if err != nil {
Expand All @@ -35,8 +38,12 @@ type blockingLog struct {

func (l *blockingLog) Publish(messages []Message) (int64, error) {
nextOffset, err := l.Log.Publish(messages)
if err != nil {
return OffsetInvalid, err
}

l.notify.Set(nextOffset)
return nextOffset, err
return nextOffset, nil
}

func (l *blockingLog) ConsumeBlocking(ctx context.Context, offset int64, maxCount int64) (int64, []Message, error) {
Expand Down
1 change: 0 additions & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@ toolchain go1.23.1

require (
github.com/gofrs/flock v0.12.1
github.com/klev-dev/kleverr v0.1.0
github.com/mr-tron/base58 v1.2.0
github.com/plar/go-adaptive-radix-tree/v2 v2.0.3
github.com/stretchr/testify v1.9.0
Expand Down
2 changes: 0 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,6 @@ github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/gofrs/flock v0.12.1 h1:MTLVXXHf8ekldpJk3AKicLij9MdwOWkZ+a/jHHZby9E=
github.com/gofrs/flock v0.12.1/go.mod h1:9zxTsyu5xtJ9DK+1tFZyibEV7y3uwDxPPfbxeeHCoD0=
github.com/klev-dev/kleverr v0.1.0 h1:UnBDKFlHFy6bnN5M/fQ3uCI4G91ciCf1jX3dj1EqL9k=
github.com/klev-dev/kleverr v0.1.0/go.mod h1:DV1tEcfsgAzKraeb/7nux27wOJs8w9P8fLB6GT7DmGM=
github.com/kr/pretty v0.3.1 h1:flRD4NNwYAUpkphVc1HcthR4KEIFJ65n8Mw5qdRn3LE=
github.com/kr/pretty v0.3.1/go.mod h1:hoEshYVHaxMs3cyo3Yncou5ZscifuDolrwPKZanG3xk=
github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY=
Expand Down
39 changes: 17 additions & 22 deletions index/format.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,13 +3,13 @@ package index
import (
"encoding/binary"
"errors"
"fmt"
"io"
"os"

"github.com/klev-dev/kleverr"
)

var ErrCorrupted = errors.New("index corrupted")
var errIndexSize = fmt.Errorf("%w: unaligned index size", ErrCorrupted)

type Writer struct {
opts Params
Expand All @@ -22,20 +22,18 @@ type Writer struct {
func OpenWriter(path string, opts Params) (*Writer, error) {
f, err := os.OpenFile(path, os.O_WRONLY|os.O_APPEND|os.O_CREATE, 0600)
if err != nil {
return nil, kleverr.Newf("could not open index: %w", err)
return nil, fmt.Errorf("write index open: %w", err)
}

pos := int64(0)
if stat, err := f.Stat(); err != nil {
return nil, kleverr.Newf("could not stat index: %w", err)
} else {
pos = stat.Size()
stat, err := f.Stat()
if err != nil {
return nil, fmt.Errorf("write index stat: %w", err)
}

return &Writer{
opts: opts,
f: f,
pos: pos,
pos: stat.Size(),
keyOffset: opts.keyOffset(),
}, nil
}
Expand All @@ -57,7 +55,7 @@ func (w *Writer) Write(it Item) error {
}

if n, err := w.f.Write(w.buff); err != nil {
return kleverr.Newf("failed to write index: %w", err)
return fmt.Errorf("write index: %w", err)
} else {
w.pos += int64(n)
}
Expand All @@ -71,26 +69,23 @@ func (w *Writer) Size() int64 {

func (w *Writer) Sync() error {
if err := w.f.Sync(); err != nil {
return kleverr.Newf("could not sync index: %w", err)
return fmt.Errorf("write index sync: %w", err)
}
return nil
}

func (w *Writer) Close() error {
if err := w.f.Close(); err != nil {
return kleverr.Newf("could not close index: %w", err)
return fmt.Errorf("write index close: %w", err)
}
return nil
}

func (w *Writer) SyncAndClose() error {
if err := w.f.Sync(); err != nil {
return kleverr.Newf("could not sync index: %w", err)
}
if err := w.f.Close(); err != nil {
return kleverr.Newf("could not close index: %w", err)
if err := w.Sync(); err != nil {
return err
}
return nil
return w.Close()
}

func Write(path string, opts Params, index []Item) error {
Expand All @@ -112,24 +107,24 @@ func Write(path string, opts Params, index []Item) error {
func Read(path string, opts Params) ([]Item, error) {
f, err := os.Open(path)
if err != nil {
return nil, kleverr.Newf("could not open index: %w", err)
return nil, fmt.Errorf("read index open: %w", err)
}
defer f.Close()

stat, err := os.Stat(path)
if err != nil {
return nil, kleverr.Newf("could not stat index: %w", err)
return nil, fmt.Errorf("read index stat: %w", err)
}
dataSize := stat.Size()

itemSize := opts.Size()
if dataSize%itemSize > 0 {
return nil, kleverr.Newf("%w: unexpected data len: %d", ErrCorrupted, dataSize)
return nil, errIndexSize
}

data := make([]byte, dataSize)
if _, err = io.ReadFull(f, data); err != nil {
return nil, kleverr.Newf("could not read index: %w", err)
return nil, fmt.Errorf("read index: %w", err)
}

var keyOffset = opts.keyOffset()
Expand Down
36 changes: 21 additions & 15 deletions log.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package klevdb

import (
"errors"
"fmt"
"os"
"path/filepath"
"sync"
Expand All @@ -14,9 +15,14 @@ import (
"github.com/klev-dev/klevdb/index"
"github.com/klev-dev/klevdb/message"
"github.com/klev-dev/klevdb/segment"
"github.com/klev-dev/kleverr"
)

var errNoKeyIndex = fmt.Errorf("%w by key", ErrNoIndex)
var errKeyNotFound = fmt.Errorf("key %w", ErrNotFound)
var errNoTimeIndex = fmt.Errorf("%w by time", ErrNoIndex)
var errTimeNotFound = fmt.Errorf("time %w", ErrNotFound)
var errDeleteRelative = fmt.Errorf("%w: delete relative offsets", message.ErrInvalidOffset)

// Open create a log based on a dir and set of options
func Open(dir string, opts Options) (result Log, err error) {
if opts.Rollover == 0 {
Expand All @@ -25,30 +31,30 @@ func Open(dir string, opts Options) (result Log, err error) {

if opts.CreateDirs {
if err := os.MkdirAll(dir, 0700); err != nil {
return nil, kleverr.Newf("could not create log dirs: %w", err)
return nil, fmt.Errorf("open create dirs: %w", err)
}
}

lock := flock.New(filepath.Join(dir, ".lock"))
if opts.Readonly {
switch ok, err := lock.TryRLock(); {
case err != nil:
return nil, kleverr.Newf("could not lock: %w", err)
return nil, fmt.Errorf("open read lock: %w", err)
case !ok:
return nil, kleverr.Newf("log already writing locked")
return nil, fmt.Errorf("open already writing locked")
}
} else {
switch ok, err := lock.TryLock(); {
case err != nil:
return nil, kleverr.Newf("could not lock: %w", err)
return nil, fmt.Errorf("open lock: %w", err)
case !ok:
return nil, kleverr.Newf("log already locked")
return nil, fmt.Errorf("open already locked")
}
}
defer func() {
if err != nil {
if lerr := lock.Unlock(); lerr != nil {
err = kleverr.Newf("%w: could not release lock: %w", err, lerr)
err = fmt.Errorf("%w: open release lock: %w", err, lerr)
}
}
}()
Expand Down Expand Up @@ -206,7 +212,7 @@ func (l *log) Consume(offset int64, maxCount int64) (int64, []message.Message, e

func (l *log) ConsumeByKey(key []byte, offset int64, maxCount int64) (int64, []message.Message, error) {
if !l.opts.KeyIndex {
return OffsetInvalid, nil, kleverr.Newf("%w by key", ErrNoIndex)
return OffsetInvalid, nil, errNoKeyIndex
}

hash := index.KeyHashEncoded(index.KeyHash(key))
Expand Down Expand Up @@ -247,7 +253,7 @@ func (l *log) Get(offset int64) (message.Message, error) {

func (l *log) GetByKey(key []byte) (message.Message, error) {
if !l.opts.KeyIndex {
return message.Invalid, kleverr.Newf("%w by key", ErrNoIndex)
return message.Invalid, errNoKeyIndex
}

hash := index.KeyHashEncoded(index.KeyHash(key))
Expand All @@ -269,7 +275,7 @@ func (l *log) GetByKey(key []byte) (message.Message, error) {
}

// not in any segment, so just return the error
return message.Invalid, kleverr.Newf("key %w", message.ErrNotFound)
return message.Invalid, errKeyNotFound
}

func (l *log) OffsetByKey(key []byte) (int64, error) {
Expand All @@ -282,7 +288,7 @@ func (l *log) OffsetByKey(key []byte) (int64, error) {

func (l *log) GetByTime(start time.Time) (message.Message, error) {
if !l.opts.TimeIndex {
return message.Invalid, kleverr.Newf("%w by time", ErrNoIndex)
return message.Invalid, errNoTimeIndex
}

ts := start.UnixMicro()
Expand Down Expand Up @@ -314,7 +320,7 @@ func (l *log) GetByTime(start time.Time) (message.Message, error) {
}
}

return message.Invalid, kleverr.Newf("time %w", message.ErrNotFound)
return message.Invalid, errTimeNotFound
}

func (l *log) OffsetByTime(start time.Time) (int64, time.Time, error) {
Expand Down Expand Up @@ -371,7 +377,7 @@ func (l *log) Delete(offsets map[int64]struct{}) (map[int64]struct{}, int64, err

newWriter, newReader, err := l.writer.Delete(rs)
switch {
case errors.Is(err, errSegmentChanged):
case err == errSegmentChanged:
return nil, 0, nil
case err != nil:
return nil, 0, err
Expand Down Expand Up @@ -419,7 +425,7 @@ func (l *log) findDeleteReader(offsets map[int64]struct{}) (*reader, error) {
lowestOffset := orderedOffsets[0]

if lowestOffset < 0 {
return nil, kleverr.Newf("%w: cannot delete relative offsets", message.ErrInvalidOffset)
return nil, errDeleteRelative
}

l.readersMu.RLock()
Expand Down Expand Up @@ -544,7 +550,7 @@ func (l *log) Close() error {
}

if err := l.lock.Unlock(); err != nil {
return kleverr.Newf("could not release lock: %w", err)
return fmt.Errorf("close unlock: %w", err)
}

return nil
Expand Down
15 changes: 3 additions & 12 deletions log_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@ import (

"github.com/klev-dev/klevdb/message"
"github.com/klev-dev/klevdb/segment"
"github.com/klev-dev/kleverr"
)

func publishBatched(t *testing.T, l Log, msgs []Message, batchLen int) {
Expand Down Expand Up @@ -1556,7 +1555,7 @@ func testConcurrentPubsubRecent(t *testing.T) {
for ctx.Err() == nil {
next, msgs, err := s.Consume(offset, 32)
if err != nil {
return kleverr.Newf("could not consume offset %d: %w", offset, err)
return fmt.Errorf("could not consume offset %d: %w", offset, err)
}

if offset == next {
Expand Down Expand Up @@ -1591,11 +1590,7 @@ func testConcurrentPubsubRecent(t *testing.T) {
return nil
})

err = g.Wait()
if serr := kleverr.Get(err); serr != nil {
fmt.Println(serr.Print())
}
require.NoError(t, err)
require.NoError(t, g.Wait())
}

func testConcurrentConsume(t *testing.T) {
Expand Down Expand Up @@ -1742,9 +1737,5 @@ func testConcurrentGC(t *testing.T) {
return nil
})

err = g.Wait()
if serr := kleverr.Get(err); serr != nil {
fmt.Println(serr.Print())
}
require.NoError(t, err)
require.NoError(t, g.Wait())
}
Loading
Loading