Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 4 additions & 2 deletions log.go
Original file line number Diff line number Diff line change
Expand Up @@ -259,14 +259,15 @@ func (l *log) GetByKey(key []byte) (message.Message, error) {
}

hash := index.KeyHashEncoded(index.KeyHash(key))
tctx := time.Now().UnixMicro()

l.readersMu.RLock()
defer l.readersMu.RUnlock()

for i := len(l.readers) - 1; i >= 0; i-- {
rdr := l.readers[i]

switch msg, err := rdr.GetByKey(key, hash); {
switch msg, err := rdr.GetByKey(key, hash, tctx); {
case err == nil:
return msg, nil
case err == index.ErrKeyNotFound:
Expand Down Expand Up @@ -294,14 +295,15 @@ func (l *log) GetByTime(start time.Time) (message.Message, error) {
}

ts := start.UnixMicro()
tctx := time.Now().UnixMicro()

l.readersMu.RLock()
defer l.readersMu.RUnlock()

for i := len(l.readers) - 1; i >= 0; i-- {
rdr := l.readers[i]

switch msg, err := rdr.GetByTime(ts); {
switch msg, err := rdr.GetByTime(ts, tctx); {
case err == nil:
return msg, nil
case err == index.ErrTimeBeforeStart:
Expand Down
52 changes: 30 additions & 22 deletions reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,11 +20,11 @@ type reader struct {

messages *message.Reader
messagesMu sync.RWMutex
messagesInuse int64
messagesInuse atomic.Int64

index indexer
indexMu sync.RWMutex
indexLastAccess int64
indexLastAccess atomic.Int64
}

type indexer interface {
Expand Down Expand Up @@ -75,15 +75,15 @@ func (r *reader) GetOffset() int64 {
}

func (r *reader) GetNextOffset() (int64, error) {
index, err := r.getIndex()
index, err := r.getIndexNow()
if err != nil {
return 0, err
}
return index.GetNextOffset()
}

func (r *reader) Consume(offset, maxCount int64) (int64, []message.Message, error) {
index, err := r.getIndex()
index, err := r.getIndexNow()
if err != nil {
return OffsetInvalid, nil, err
}
Expand All @@ -108,7 +108,7 @@ func (r *reader) Consume(offset, maxCount int64) (int64, []message.Message, erro
if err != nil {
return OffsetInvalid, nil, err
}
defer atomic.AddInt64(&r.messagesInuse, -1)
defer r.messagesInuse.Add(-1)

msgs, err := messages.Consume(position, maxPosition, maxCount)
if err != nil {
Expand All @@ -118,7 +118,7 @@ func (r *reader) Consume(offset, maxCount int64) (int64, []message.Message, erro
}

func (r *reader) ConsumeByKey(key, keyHash []byte, offset, maxCount int64) (int64, []message.Message, error) {
ix, err := r.getIndex()
ix, err := r.getIndexNow()
if err != nil {
return OffsetInvalid, nil, err
}
Expand Down Expand Up @@ -149,7 +149,7 @@ func (r *reader) ConsumeByKey(key, keyHash []byte, offset, maxCount int64) (int6
if err != nil {
return OffsetInvalid, nil, err
}
defer atomic.AddInt64(&r.messagesInuse, -1)
defer r.messagesInuse.Add(-1)

var msgs []message.Message
for i := 0; i < len(positions); i++ {
Expand Down Expand Up @@ -180,7 +180,7 @@ func (r *reader) ConsumeByKey(key, keyHash []byte, offset, maxCount int64) (int6
}

func (r *reader) Get(offset int64) (message.Message, error) {
index, err := r.getIndex()
index, err := r.getIndexNow()
if err != nil {
return message.Invalid, err
}
Expand All @@ -194,13 +194,13 @@ func (r *reader) Get(offset int64) (message.Message, error) {
if err != nil {
return message.Invalid, err
}
defer atomic.AddInt64(&r.messagesInuse, -1)
defer r.messagesInuse.Add(-1)

return messages.Get(position)
}

func (r *reader) GetByKey(key, keyHash []byte) (message.Message, error) {
ix, err := r.getIndex()
func (r *reader) GetByKey(key, keyHash []byte, tctx int64) (message.Message, error) {
ix, err := r.getIndexAt(tctx)
if err != nil {
return message.Invalid, err
}
Expand All @@ -214,7 +214,7 @@ func (r *reader) GetByKey(key, keyHash []byte) (message.Message, error) {
if err != nil {
return message.Invalid, err
}
defer atomic.AddInt64(&r.messagesInuse, -1)
defer r.messagesInuse.Add(-1)

for i := len(positions) - 1; i >= 0; i-- {
msg, err := messages.Get(positions[i])
Expand All @@ -229,8 +229,8 @@ func (r *reader) GetByKey(key, keyHash []byte) (message.Message, error) {
return message.Invalid, index.ErrKeyNotFound
}

func (r *reader) GetByTime(ts int64) (message.Message, error) {
index, err := r.getIndex()
func (r *reader) GetByTime(ts int64, tctx int64) (message.Message, error) {
index, err := r.getIndexAt(tctx)
if err != nil {
return message.Invalid, err
}
Expand All @@ -244,7 +244,7 @@ func (r *reader) GetByTime(ts int64) (message.Message, error) {
if err != nil {
return message.Invalid, err
}
defer atomic.AddInt64(&r.messagesInuse, -1)
defer r.messagesInuse.Add(-1)

return messages.Get(position)
}
Expand Down Expand Up @@ -297,9 +297,17 @@ func (r *reader) Delete(rs *segment.RewriteSegment) (*reader, error) {
return r, nil
}

func (r *reader) getIndex() (indexer, error) {
atomic.StoreInt64(&r.indexLastAccess, time.Now().UnixMicro())
func (r *reader) getIndexNow() (indexer, error) {
r.indexLastAccess.Store(time.Now().UnixMicro())
return r.getIndexMarked()
}

func (r *reader) getIndexAt(tctx int64) (indexer, error) {
r.indexLastAccess.Store(tctx)
return r.getIndexMarked()
}

func (r *reader) getIndexMarked() (indexer, error) {
r.indexMu.RLock()
if ix := r.index; ix != nil {
defer r.indexMu.RUnlock()
Expand All @@ -326,7 +334,7 @@ func (r *reader) getIndex() (indexer, error) {
func (r *reader) getMessages() (*message.Reader, error) {
r.messagesMu.RLock()
if msgs := r.messages; msgs != nil {
atomic.AddInt64(&r.messagesInuse, 1)
r.messagesInuse.Add(1)
r.messagesMu.RUnlock()
return msgs, nil
}
Expand All @@ -336,7 +344,7 @@ func (r *reader) getMessages() (*message.Reader, error) {
defer r.messagesMu.Unlock()

if msgs := r.messages; msgs != nil {
atomic.AddInt64(&r.messagesInuse, 1)
r.messagesInuse.Add(1)
return msgs, nil
}

Expand All @@ -346,7 +354,7 @@ func (r *reader) getMessages() (*message.Reader, error) {
}

r.messages = msgs
atomic.AddInt64(&r.messagesInuse, 1)
r.messagesInuse.Add(1)
return msgs, nil
}

Expand All @@ -363,7 +371,7 @@ func (r *reader) GC(unusedFor time.Duration) error {
return nil
}

indexLastAccess := time.UnixMicro(atomic.LoadInt64(&r.indexLastAccess))
indexLastAccess := time.UnixMicro(r.indexLastAccess.Load())
if time.Since(indexLastAccess) < unusedFor {
// only unload segments unused for defined time
return nil
Expand All @@ -374,7 +382,7 @@ func (r *reader) GC(unusedFor time.Duration) error {
r.messagesMu.Lock()
defer r.messagesMu.Unlock()

if r.messages == nil || atomic.LoadInt64(&r.messagesInuse) > 0 {
if r.messages == nil || r.messagesInuse.Load() > 0 {
return nil
}

Expand Down
Loading