Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions .golangci.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
version: "2"
linters:
exclusions:
rules:
- path: _test\.go
linters:
errcheck
2 changes: 1 addition & 1 deletion bench_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ func BenchmarkMulti(b *testing.B) {
}

func MkdirBench(b *testing.B) string {
name := strings.Replace(b.Name(), "/", "_", -1)
name := strings.ReplaceAll(b.Name(), "/", "_")

currentDir, err := os.Getwd()
require.NoError(b, err)
Expand Down
4 changes: 2 additions & 2 deletions blocking.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,14 +50,14 @@ func (l *blockingLog) ConsumeBlocking(ctx context.Context, offset int64, maxCoun
if err := l.notify.Wait(ctx, offset); err != nil {
return 0, nil, err
}
return l.Log.Consume(offset, maxCount)
return l.Consume(offset, maxCount)
}

func (l *blockingLog) ConsumeByKeyBlocking(ctx context.Context, key []byte, offset int64, maxCount int64) (int64, []Message, error) {
if err := l.notify.Wait(ctx, offset); err != nil {
return 0, nil, err
}
return l.Log.ConsumeByKey(key, offset, maxCount)
return l.ConsumeByKey(key, offset, maxCount)
}

func (l *blockingLog) Close() error {
Expand Down
4 changes: 2 additions & 2 deletions index/format.go
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,7 @@ func Write(path string, opts Params, index []Item) error {
if err != nil {
return err
}
defer w.Close()
defer func() { _ = w.Close() }() // ignoring since its only applicable if an error has happened

for _, item := range index {
if err := w.Write(item); err != nil {
Expand All @@ -109,7 +109,7 @@ func Read(path string, opts Params) ([]Item, error) {
if err != nil {
return nil, fmt.Errorf("read index open: %w", err)
}
defer f.Close()
defer func() { _ = f.Close() }()

stat, err := os.Stat(path)
if err != nil {
Expand Down
14 changes: 7 additions & 7 deletions log.go
Original file line number Diff line number Diff line change
Expand Up @@ -267,10 +267,10 @@ func (l *log) GetByKey(key []byte) (message.Message, error) {
for i := len(l.readers) - 1; i >= 0; i-- {
rdr := l.readers[i]

switch msg, err := rdr.GetByKey(key, hash, tctx); {
case err == nil:
switch msg, err := rdr.GetByKey(key, hash, tctx); err {
case nil:
return msg, nil
case err == index.ErrKeyNotFound:
case index.ErrKeyNotFound:
// not in this segment, try the rest
default:
return message.Invalid, err
Expand Down Expand Up @@ -303,15 +303,15 @@ func (l *log) GetByTime(start time.Time) (message.Message, error) {
for i := len(l.readers) - 1; i >= 0; i-- {
rdr := l.readers[i]

switch msg, err := rdr.GetByTime(ts, tctx); {
case err == nil:
switch msg, err := rdr.GetByTime(ts, tctx); err {
case nil:
return msg, nil
case err == index.ErrTimeBeforeStart:
case index.ErrTimeBeforeStart:
// not in this segment, try the rest
if i == 0 {
return rdr.Get(message.OffsetOldest)
}
case err == index.ErrTimeAfterEnd:
case index.ErrTimeAfterEnd:
// time is between end of this and begin next
if i < len(l.readers)-1 {
nextRdr := l.readers[i+1]
Expand Down
10 changes: 5 additions & 5 deletions reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -132,10 +132,10 @@ func (r *reader) ConsumeByKey(key, keyHash []byte, offset, maxCount int64) (int6
}

positions, err := ix.Keys(keyHash)
switch {
case err == nil:
switch err {
case nil:
break
case err == index.ErrKeyNotFound:
case index.ErrKeyNotFound:
nextOffset, err := ix.GetNextOffset()
if err != nil {
return OffsetInvalid, nil, err
Expand All @@ -152,8 +152,8 @@ func (r *reader) ConsumeByKey(key, keyHash []byte, offset, maxCount int64) (int6
defer r.messagesInuse.Add(-1)

var msgs []message.Message
for i := 0; i < len(positions); i++ {
msg, err := messages.Get(positions[i])
for _, position := range positions {
msg, err := messages.Get(position)
if err != nil {
return OffsetInvalid, nil, err
}
Expand Down
12 changes: 6 additions & 6 deletions segment/segment.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ func (s Segment) Check(params index.Params) error {
if err != nil {
return err
}
defer log.Close()
defer func() { _ = log.Close() }()

var position, indexTime int64
var logIndex []index.Item
Expand Down Expand Up @@ -110,13 +110,13 @@ func (s Segment) Recover(params index.Params) error {
if err != nil {
return err
}
defer log.Close()
defer func() { _ = log.Close() }()

restore, err := message.OpenWriter(s.Log + ".recover")
if err != nil {
return err
}
defer restore.Close()
defer func() { _ = restore.Close() }() // ignoring since its only applicable if an error has happened

var position, indexTime int64
var corrupted = false
Expand Down Expand Up @@ -217,7 +217,7 @@ func (s Segment) Reindex(params index.Params) ([]index.Item, error) {
if err != nil {
return nil, err
}
defer log.Close()
defer func() { _ = log.Close() }()

return s.ReindexReader(params, log)
}
Expand Down Expand Up @@ -346,13 +346,13 @@ func (src Segment) Rewrite(dropOffsets map[int64]struct{}, params index.Params)
if err != nil {
return nil, err
}
defer srcLog.Close()
defer func() { _ = srcLog.Close() }()

dstLog, err := message.OpenWriter(dst.Log)
if err != nil {
return nil, err
}
defer dstLog.Close()
defer func() { _ = dstLog.Close() }() // ignoring since its only applicable if an error has happened

result.SurviveOffsets = map[int64]struct{}{}
result.DeletedOffsets = map[int64]struct{}{}
Expand Down
4 changes: 2 additions & 2 deletions segment/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ func copyFile(src, dst string) error {
if err != nil {
return fmt.Errorf("copy src open: %w", err)
}
defer fsrc.Close()
defer func() { _ = fsrc.Close() }()

stat, err := fsrc.Stat()
if err != nil {
Expand All @@ -43,7 +43,7 @@ func copyFile(src, dst string) error {
if err != nil {
return fmt.Errorf("copy dst open: %w", err)
}
defer fdst.Close()
defer func() { _ = fdst.Close() }() // ignoring since its only applicable if an error has happened

switch n, err := io.Copy(fdst, fsrc); {
case err != nil:
Expand Down
4 changes: 2 additions & 2 deletions typed_blocking.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,14 +50,14 @@ func (l *tlogBlocking[K, V]) ConsumeBlocking(ctx context.Context, offset int64,
if err := l.notify.Wait(ctx, offset); err != nil {
return 0, nil, err
}
return l.TLog.Consume(offset, maxCount)
return l.Consume(offset, maxCount)
}

func (l *tlogBlocking[K, V]) ConsumeByKeyBlocking(ctx context.Context, key K, empty bool, offset int64, maxCount int64) (int64, []TMessage[K, V], error) {
if err := l.notify.Wait(ctx, offset); err != nil {
return 0, nil, err
}
return l.TLog.ConsumeByKey(key, empty, offset, maxCount)
return l.ConsumeByKey(key, empty, offset, maxCount)
}

func (l *tlogBlocking[K, V]) Close() error {
Expand Down