Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 5 additions & 3 deletions index/keys.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,15 @@ package index

import (
"encoding/binary"
"fmt"
"hash/fnv"

art "github.com/plar/go-adaptive-radix-tree/v2"

"github.com/klev-dev/klevdb/message"
art "github.com/plar/go-adaptive-radix-tree/v2"
)

var ErrKeyNotFound = fmt.Errorf("key: %w", message.ErrNotFound)

func KeyHash(key []byte) uint64 {
hasher := fnv.New64a()
hasher.Write(key)
Expand Down Expand Up @@ -41,5 +43,5 @@ func Keys(keys art.Tree, keyHash []byte) ([]int64, error) {
return v.([]int64), nil
}

return nil, message.ErrNotFound
return nil, ErrKeyNotFound
}
6 changes: 3 additions & 3 deletions index/keys_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ func TestKeys(t *testing.T) {
t.Run("Empty", func(t *testing.T) {
keys := art.New()
pos, err := Keys(keys, KeyHashEncoded(1))
require.ErrorIs(t, message.ErrNotFound, err)
require.ErrorIs(t, err, message.ErrNotFound)
require.Empty(t, pos)
})

Expand All @@ -27,7 +27,7 @@ func TestKeys(t *testing.T) {
require.ElementsMatch(t, []int64{item.Position}, pos)

pos, err = Keys(keys, KeyHashEncoded(321))
require.ErrorIs(t, message.ErrNotFound, err)
require.ErrorIs(t, err, message.ErrNotFound)
require.Empty(t, pos)
})

Expand All @@ -48,7 +48,7 @@ func TestKeys(t *testing.T) {
require.ElementsMatch(t, []int64{item3.Position}, pos)

pos, err = Keys(keys, KeyHashEncoded(321))
require.ErrorIs(t, message.ErrNotFound, err)
require.ErrorIs(t, err, message.ErrNotFound)
require.Empty(t, pos)
})
}
17 changes: 10 additions & 7 deletions index/offset.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,11 +6,14 @@ import (
"github.com/klev-dev/klevdb/message"
)

var ErrIndexEmpty = fmt.Errorf("%w: no items", message.ErrInvalidOffset)
var ErrOffsetIndexEmpty = fmt.Errorf("%w: no offset items", message.ErrInvalidOffset)
var ErrOffsetBeforeStart = fmt.Errorf("%w: offset before start", message.ErrNotFound)
var ErrOffsetAfterEnd = fmt.Errorf("%w: offset after end", message.ErrInvalidOffset)
var ErrOffsetNotFound = fmt.Errorf("%w: offset not found", message.ErrNotFound)

func Consume(items []Item, offset int64) (int64, int64, error) {
if len(items) == 0 {
return 0, 0, ErrIndexEmpty
return 0, 0, ErrOffsetIndexEmpty
}

switch offset {
Expand All @@ -32,7 +35,7 @@ func Consume(items []Item, offset int64) (int64, int64, error) {
endItem := items[endIndex]
switch {
case offset > endItem.Offset:
return 0, 0, message.ErrInvalidOffset
return 0, 0, ErrOffsetAfterEnd
case offset == endItem.Offset:
return endItem.Position, endItem.Position, nil
}
Expand All @@ -55,7 +58,7 @@ func Consume(items []Item, offset int64) (int64, int64, error) {

func Get(items []Item, offset int64) (int64, error) {
if len(items) == 0 {
return 0, ErrIndexEmpty
return 0, ErrOffsetIndexEmpty
}

switch offset {
Expand All @@ -69,7 +72,7 @@ func Get(items []Item, offset int64) (int64, error) {
beginItem := items[beginIndex]
switch {
case offset < beginItem.Offset:
return 0, message.ErrNotFound
return 0, ErrOffsetBeforeStart
case offset == beginItem.Offset:
return beginItem.Position, nil
}
Expand All @@ -78,7 +81,7 @@ func Get(items []Item, offset int64) (int64, error) {
endItem := items[endIndex]
switch {
case offset > endItem.Offset:
return 0, message.ErrNotFound
return 0, ErrOffsetAfterEnd
case offset == endItem.Offset:
return endItem.Position, nil
}
Expand All @@ -96,5 +99,5 @@ func Get(items []Item, offset int64) (int64, error) {
}
}

return 0, message.ErrNotFound
return 0, ErrOffsetNotFound
}
34 changes: 17 additions & 17 deletions index/offset_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,28 +25,28 @@ func TestConsume(t *testing.T) {
err error
}{
// empty tests
{items: nil, offset: 0, err: ErrIndexEmpty},
{items: nil, offset: 0, err: ErrOffsetIndexEmpty},
// single item tests
{items: []int64{1}, offset: message.OffsetOldest, position: 1, max: 1},
{items: []int64{1}, offset: message.OffsetNewest, position: 1, max: 1},
{items: []int64{1}, offset: 0, position: 1, max: 1},
{items: []int64{1}, offset: 1, position: 1, max: 1},
{items: []int64{1}, offset: 2, err: message.ErrInvalidOffset},
{items: []int64{1}, offset: 2, err: ErrOffsetAfterEnd},
// continuous tests
{items: []int64{1, 2, 3}, offset: message.OffsetOldest, position: 1, max: 3},
{items: []int64{1, 2, 3}, offset: message.OffsetNewest, position: 3, max: 3},
{items: []int64{1, 2, 3}, offset: 0, position: 1, max: 3},
{items: []int64{1, 2, 3}, offset: 1, position: 1, max: 3},
{items: []int64{1, 2, 3}, offset: 3, position: 3, max: 3},
{items: []int64{1, 2, 3}, offset: 4, err: message.ErrInvalidOffset},
{items: []int64{1, 2, 3}, offset: 4, err: ErrOffsetAfterEnd},
// gaps tests
{items: []int64{1, 3}, offset: message.OffsetOldest, position: 1, max: 3},
{items: []int64{1, 3}, offset: message.OffsetNewest, position: 3, max: 3},
{items: []int64{1, 3}, offset: 0, position: 1, max: 3},
{items: []int64{1, 3}, offset: 1, position: 1, max: 3},
{items: []int64{1, 3}, offset: 2, position: 3, max: 3},
{items: []int64{1, 3}, offset: 3, position: 3, max: 3},
{items: []int64{1, 3}, offset: 4, err: message.ErrInvalidOffset},
{items: []int64{1, 3}, offset: 4, err: ErrOffsetAfterEnd},
{items: []int64{1, 3, 5}, offset: message.OffsetOldest, position: 1, max: 5},
{items: []int64{1, 3, 5}, offset: message.OffsetNewest, position: 5, max: 5},
{items: []int64{1, 3, 5}, offset: 0, position: 1, max: 5},
Expand All @@ -55,7 +55,7 @@ func TestConsume(t *testing.T) {
{items: []int64{1, 3, 5}, offset: 3, position: 3, max: 5},
{items: []int64{1, 3, 5}, offset: 4, position: 5, max: 5},
{items: []int64{1, 3, 5}, offset: 5, position: 5, max: 5},
{items: []int64{1, 3, 5}, offset: 6, err: message.ErrInvalidOffset},
{items: []int64{1, 3, 5}, offset: 6, err: ErrOffsetAfterEnd},
}

for _, tc := range tests {
Expand All @@ -76,37 +76,37 @@ func TestGet(t *testing.T) {
err error
}{
// empty tests
{items: nil, offset: 0, err: ErrIndexEmpty},
{items: nil, offset: 0, err: ErrOffsetIndexEmpty},
// single item tests
{items: []int64{1}, offset: message.OffsetOldest, position: 1},
{items: []int64{1}, offset: message.OffsetNewest, position: 1},
{items: []int64{1}, offset: 0, err: message.ErrNotFound},
{items: []int64{1}, offset: 0, err: ErrOffsetBeforeStart},
{items: []int64{1}, offset: 1, position: 1},
{items: []int64{1}, offset: 2, err: message.ErrNotFound},
{items: []int64{1}, offset: 2, err: ErrOffsetAfterEnd},
// continuous tests
{items: []int64{1, 2, 3}, offset: message.OffsetOldest, position: 1},
{items: []int64{1, 2, 3}, offset: message.OffsetNewest, position: 3},
{items: []int64{1, 2, 3}, offset: 0, err: message.ErrNotFound},
{items: []int64{1, 2, 3}, offset: 0, err: ErrOffsetBeforeStart},
{items: []int64{1, 2, 3}, offset: 1, position: 1},
{items: []int64{1, 2, 3}, offset: 3, position: 3},
{items: []int64{1, 2, 3}, offset: 4, err: message.ErrNotFound},
{items: []int64{1, 2, 3}, offset: 4, err: ErrOffsetAfterEnd},
// gaps tests
{items: []int64{1, 3}, offset: message.OffsetOldest, position: 1},
{items: []int64{1, 3}, offset: message.OffsetNewest, position: 3},
{items: []int64{1, 3}, offset: 0, err: message.ErrNotFound},
{items: []int64{1, 3}, offset: 0, err: ErrOffsetBeforeStart},
{items: []int64{1, 3}, offset: 1, position: 1},
{items: []int64{1, 3}, offset: 2, err: message.ErrNotFound},
{items: []int64{1, 3}, offset: 2, err: ErrOffsetNotFound},
{items: []int64{1, 3}, offset: 3, position: 3},
{items: []int64{1, 3}, offset: 4, err: message.ErrNotFound},
{items: []int64{1, 3}, offset: 4, err: ErrOffsetAfterEnd},
{items: []int64{1, 3, 5}, offset: message.OffsetOldest, position: 1},
{items: []int64{1, 3, 5}, offset: message.OffsetNewest, position: 5},
{items: []int64{1, 3, 5}, offset: 0, err: message.ErrNotFound},
{items: []int64{1, 3, 5}, offset: 0, err: ErrOffsetBeforeStart},
{items: []int64{1, 3, 5}, offset: 1, position: 1},
{items: []int64{1, 3, 5}, offset: 2, err: message.ErrNotFound},
{items: []int64{1, 3, 5}, offset: 2, err: ErrOffsetNotFound},
{items: []int64{1, 3, 5}, offset: 3, position: 3},
{items: []int64{1, 3, 5}, offset: 4, err: message.ErrNotFound},
{items: []int64{1, 3, 5}, offset: 4, err: ErrOffsetNotFound},
{items: []int64{1, 3, 5}, offset: 5, position: 5},
{items: []int64{1, 3, 5}, offset: 6, err: message.ErrNotFound},
{items: []int64{1, 3, 5}, offset: 6, err: ErrOffsetAfterEnd},
}

for _, tc := range tests {
Expand Down
12 changes: 9 additions & 3 deletions index/times.go
Original file line number Diff line number Diff line change
@@ -1,21 +1,27 @@
package index

import (
"errors"
"fmt"
"sort"

"github.com/klev-dev/klevdb/message"
)

var ErrTimeIndexEmpty = fmt.Errorf("%w: no time items", message.ErrInvalidOffset)
var ErrTimeBeforeStart = errors.New("time before start")
var ErrTimeAfterEnd = errors.New("time after end")

func Time(items []Item, ts int64) (int64, error) {
if len(items) == 0 {
return 0, ErrIndexEmpty
return 0, ErrTimeIndexEmpty
}

beginIndex := 0
beginItem := items[beginIndex]
switch {
case ts < beginItem.Timestamp:
return 0, message.ErrInvalidOffset
return 0, ErrTimeBeforeStart
case ts == beginItem.Timestamp:
return beginItem.Position, nil
}
Expand All @@ -24,7 +30,7 @@ func Time(items []Item, ts int64) (int64, error) {
endItem := items[endIndex]
switch {
case endItem.Timestamp < ts:
return 0, message.ErrNotFound
return 0, ErrTimeAfterEnd
}

foundIndex := sort.Search(len(items), func(midIndex int) bool {
Expand Down
7 changes: 3 additions & 4 deletions index/times_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ import (
"fmt"
"testing"

"github.com/klev-dev/klevdb/message"
"github.com/stretchr/testify/require"
)

Expand All @@ -21,19 +20,19 @@ func TestTime(t *testing.T) {
t.Run("Empty", func(t *testing.T) {
items := gen()
_, err := Time(items, 1)
require.ErrorIs(t, ErrIndexEmpty, err)
require.ErrorIs(t, ErrTimeIndexEmpty, err)
})

t.Run("Before", func(t *testing.T) {
items := gen(1)
_, err := Time(items, 0)
require.ErrorIs(t, message.ErrInvalidOffset, err)
require.ErrorIs(t, ErrTimeBeforeStart, err)
})

t.Run("After", func(t *testing.T) {
items := gen(1)
_, err := Time(items, 2)
require.ErrorIs(t, message.ErrNotFound, err)
require.ErrorIs(t, ErrTimeAfterEnd, err)
})

t.Run("Exact", func(t *testing.T) {
Expand Down
33 changes: 17 additions & 16 deletions log.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,9 @@ import (
)

var errNoKeyIndex = fmt.Errorf("%w by key", ErrNoIndex)
var errKeyNotFound = fmt.Errorf("key %w", ErrNotFound)
var errKeyNotFound = fmt.Errorf("key %w", message.ErrNotFound)
var errNoTimeIndex = fmt.Errorf("%w by time", ErrNoIndex)
var errTimeNotFound = fmt.Errorf("time %w", ErrNotFound)
var errTimeNotFound = fmt.Errorf("time %w", message.ErrNotFound)
var errDeleteRelative = fmt.Errorf("%w: delete relative offsets", message.ErrInvalidOffset)

// Open create a log based on a dir and set of options
Expand Down Expand Up @@ -196,15 +196,13 @@ func (l *log) Consume(offset int64, maxCount int64) (int64, []message.Message, e
l.readersMu.RLock()
defer l.readersMu.RUnlock()

rdr, index := segment.Consume(l.readers, offset)
rdr, segmentIndex := segment.Consume(l.readers, offset)

nextOffset, msgs, err := rdr.Consume(offset, maxCount)
if err != nil && err == message.ErrInvalidOffset {
if index < len(l.readers)-1 {
// this is after the end, consume starting the next one
next := l.readers[index+1]
return next.Consume(message.OffsetOldest, maxCount)
}
if err == index.ErrOffsetAfterEnd && segmentIndex < len(l.readers)-1 {
// this is after the end, consume starting the next one
next := l.readers[segmentIndex+1]
return next.Consume(message.OffsetOldest, maxCount)
}

return nextOffset, msgs, err
Expand Down Expand Up @@ -243,12 +241,16 @@ func (l *log) Get(offset int64) (message.Message, error) {
l.readersMu.RLock()
defer l.readersMu.RUnlock()

rdr, _, err := segment.Get(l.readers, offset)
rdr, segmentIndex, err := segment.Get(l.readers, offset)
if err != nil {
return message.Invalid, err
}

return rdr.Get(offset)
msg, err := rdr.Get(offset)
if err == index.ErrOffsetAfterEnd && segmentIndex < len(l.readers)-1 {
return msg, index.ErrOffsetNotFound
}
return msg, err
}

func (l *log) GetByKey(key []byte) (message.Message, error) {
Expand All @@ -267,7 +269,7 @@ func (l *log) GetByKey(key []byte) (message.Message, error) {
switch msg, err := rdr.GetByKey(key, hash); {
case err == nil:
return msg, nil
case err == message.ErrNotFound:
case err == index.ErrKeyNotFound:
// not in this segment, try the rest
default:
return message.Invalid, err
Expand Down Expand Up @@ -302,19 +304,18 @@ func (l *log) GetByTime(start time.Time) (message.Message, error) {
switch msg, err := rdr.GetByTime(ts); {
case err == nil:
return msg, nil
case err == message.ErrInvalidOffset:
case err == index.ErrTimeBeforeStart:
// not in this segment, try the rest
if i == 0 {
return rdr.Get(message.OffsetOldest)
}
case err == message.ErrNotFound:
case err == index.ErrTimeAfterEnd:
// time is between end of this and begin next
if i < len(l.readers)-1 {
nextRdr := l.readers[i+1]
return nextRdr.Get(message.OffsetOldest)
}

return message.Invalid, err
return message.Invalid, errTimeNotFound
default:
return message.Invalid, err
}
Expand Down
Loading
Loading