Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
33 changes: 33 additions & 0 deletions frac/active_token_list.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package frac

import (
"bytes"
"context"
"fmt"
"hash/crc32"
Expand Down Expand Up @@ -38,6 +39,38 @@ func (tp *activeTokenProvider) GetToken(tid uint32) []byte {
return tp.tidToVal[id]
}

// FindContains returns the tids of all tokens that contain needle as a
// substring. Every tid from FirstTID to LastTID (inclusive) is examined.
// An empty needle yields no matches. The returned error is always nil.
func (tp *activeTokenProvider) FindContains(needle []byte) ([]uint32, error) {
	if len(needle) == 0 {
		return nil, nil
	}

	var matched []uint32
	for cur := tp.FirstTID(); cur <= tp.LastTID(); cur++ {
		if !bytes.Contains(tp.GetToken(cur), needle) {
			continue
		}
		matched = append(matched, cur)
	}
	return matched, nil
}

// FindToken returns the tids of tokens accepted by the provided searcher
// (predicate). Only tids in the inclusive range
// [searcher.FirstTID(), searcher.LastTID()] are examined. The first error
// reported by searcher.Check aborts the scan and is returned as is.
func (tp *activeTokenProvider) FindToken(searcher pattern.Searcher) ([]uint32, error) {
	var found []uint32
	for cur, last := searcher.FirstTID(), searcher.LastTID(); cur <= last; cur++ {
		ok, err := searcher.Check(tp.GetToken(cur))
		switch {
		case err != nil:
			return nil, err
		case ok:
			found = append(found, cur)
		}
	}
	return found, nil
}

// FirstTID returns the first tid served by this provider, which is always 1.
func (tp *activeTokenProvider) FirstTID() uint32 {
return 1
}
Expand Down
57 changes: 56 additions & 1 deletion frac/fraction_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1395,14 +1395,69 @@ func (s *FractionTestSuite) TestSearchLargeFrac() {
toTime: midTime,
},

// other queries
// wildcards
{
name: "trace_id:trace-4*",
query: "trace_id:trace-4*",
filter: func(doc *testDoc) bool { return strings.Contains(doc.traceId, "trace-4") },
fromTime: fromTime,
toTime: toTime,
},
{
name: "id:*1* OR id:*2*",
query: "id:*1* OR id:*2*",
filter: func(doc *testDoc) bool {
return strings.Contains(doc.id, "1") || strings.Contains(doc.id, "2")
},
fromTime: fromTime,
toTime: toTime,
},
{
name: "id:*1* AND id:*2*",
query: "id:*1* AND id:*2*",
filter: func(doc *testDoc) bool {
return strings.Contains(doc.id, "1") && strings.Contains(doc.id, "2")
},
fromTime: fromTime,
toTime: toTime,
},
{
name: "id:*1 OR id:*2 OR id:*3",
query: "id:*1 OR id:*2 OR id:*3",
filter: func(doc *testDoc) bool {
return strings.HasSuffix(doc.id, "1") || strings.HasSuffix(doc.id, "2") || strings.HasSuffix(doc.id, "3")
},
fromTime: fromTime,
toTime: toTime,
},
{
name: "message:*re*",
query: "message:*re*",
filter: func(doc *testDoc) bool {
return strings.Contains(doc.message, "re")
},
fromTime: fromTime,
toTime: toTime,
},
{
name: "message:*uest OR id:*1",
query: "message:*uest OR id:*1",
filter: func(doc *testDoc) bool {
// the only message token that matches '*uest' is 'request'
return strings.Contains(doc.message, "request") || strings.HasSuffix(doc.id, "1")
},
fromTime: fromTime,
toTime: toTime,
},
{
name: "service:*a*",
query: "service:*a*",
filter: func(doc *testDoc) bool {
return strings.Contains(doc.service, "a")
},
fromTime: fromTime,
toTime: toTime,
},
}

for _, tc := range searchTestCases {
Expand Down
26 changes: 26 additions & 0 deletions frac/sealed/token/block_loader.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package token

import (
"bytes"
"encoding/binary"
"fmt"
"math"
Expand All @@ -10,6 +11,7 @@ import (

"github.com/ozontech/seq-db/cache"
"github.com/ozontech/seq-db/logger"
"github.com/ozontech/seq-db/pattern"
"github.com/ozontech/seq-db/storage"
)

Expand Down Expand Up @@ -60,6 +62,30 @@ func (b *Block) GetToken(index int) []byte {
return b.Payload[offset : offset+l]
}

// contains returns the in-block indexes, taken from the inclusive range
// [from, to], of the tokens that contain needle as a substring.
// The result is never nil and the returned error is always nil.
func (b *Block) contains(from, to int, needle []byte) ([]int, error) {
	hits := []int{}
	for idx := from; idx <= to; idx++ {
		if token := b.GetToken(idx); !bytes.Contains(token, needle) {
			continue
		}
		hits = append(hits, idx)
	}
	return hits, nil
}

// find returns the in-block indexes, taken from the inclusive range
// [from, to], of the tokens accepted by searcher.Check. The first error
// returned by searcher.Check stops the scan and is returned with a nil slice.
// On success the result is never nil.
func (b *Block) find(from, to int, searcher pattern.Searcher) ([]int, error) {
	hits := []int{}
	for idx := from; idx <= to; idx++ {
		matched, err := searcher.Check(b.GetToken(idx))
		switch {
		case err != nil:
			return nil, err
		case matched:
			hits = append(hits, idx)
		}
	}
	return hits, nil
}

// BlockLoader is responsible for Reading from disk, unpacking and caching tokens blocks.
// NOT THREAD SAFE. Do not use concurrently.
// Use your own BlockLoader instance for each search query
Expand Down
32 changes: 32 additions & 0 deletions frac/sealed/token/provider.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@ package token
import (
"math"
"sort"

"github.com/ozontech/seq-db/pattern"
)

type Provider struct {
Expand Down Expand Up @@ -56,3 +58,33 @@ func (tp *Provider) GetToken(tid uint32) []byte {
block := tp.findBlock(entry.BlockIndex)
return block.GetToken(entry.GetIndexInTokensBlock(tid))
}

// FindContains returns the tids of all tokens, from FirstTID to LastTID
// inclusive, that contain needle as a substring.
func (tp *Provider) FindContains(needle []byte) ([]uint32, error) {
	first, last := tp.FirstTID(), tp.LastTID()
	containsNeedle := func(block *Block, from, to int) ([]int, error) {
		return block.contains(from, to, needle)
	}
	return tp.findInBlocks(first, last, containsNeedle)
}

// FindToken returns the tids of tokens accepted by searcher. Only tids in the
// inclusive range [searcher.FirstTID(), searcher.LastTID()] are considered.
func (tp *Provider) FindToken(searcher pattern.Searcher) ([]uint32, error) {
	first, last := searcher.FirstTID(), searcher.LastTID()
	checkTokens := func(block *Block, from, to int) ([]int, error) {
		return block.find(from, to, searcher)
	}
	return tp.findInBlocks(first, last, checkTokens)
}

// findInBlocks runs search over the tokens blocks of the provider and
// converts the in-block indexes it reports back into tids.
//
// firstTID and lastTID bound the tids to examine (both inclusive). For every
// table entry whose tid range intersects [firstTID, lastTID], search is called
// with the entry's block and the inclusive index range inside that block.
// Entries outside the requested range are skipped before their block is
// looked up, so a narrowed search does not touch unrelated blocks.
//
// The first error returned by search aborts the scan and is returned with a
// nil slice.
func (tp *Provider) findInBlocks(firstTID, lastTID uint32, search func(*Block, int, int) ([]int, error)) ([]uint32, error) {
	// An inverted range is how searchers signal "nothing can match".
	if firstTID > lastTID {
		return nil, nil
	}

	var tids []uint32

	for _, entry := range tp.entries {
		// Skip entries that cannot contain any requested tid: looking up the
		// block would be wasted work, and narrowIndexes would be asked to
		// convert tids that do not belong to the entry.
		if entry.StartTID > lastTID || entry.getLastTID() < firstTID {
			continue
		}

		block := tp.findBlock(entry.BlockIndex)
		firstIndex, lastIndex := entry.narrowIndexes(firstTID, lastTID)
		indexes, err := search(block, firstIndex, lastIndex)
		if err != nil {
			return nil, err
		}
		for _, idx := range indexes {
			// Translate the in-block index back to a tid using the entry's
			// starting index and starting tid.
			tid := entry.StartTID + uint32(idx-int(entry.StartIndex))
			tids = append(tids, tid)
		}
	}
	return tids, nil
}
15 changes: 15 additions & 0 deletions frac/sealed/token/table_entry.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,21 @@ func (t *TableEntry) getLastTID() uint32 {
return t.StartTID + t.ValCount - 1
}

// narrowIndexes clamps the inclusive tid range [firstTID, lastTID] to the tids
// covered by this entry and returns the tokens-block indexes of the clamped
// bounds (first, last).
//
// NOTE(review): when the requested range does not intersect the entry, the
// clamped lower bound is greater than the upper one and a bound may lie
// outside the entry; what GetIndexInTokensBlock returns for such a tid is not
// visible here — confirm callers only pass intersecting ranges.
func (t *TableEntry) narrowIndexes(firstTID, lastTID uint32) (int, int) {
	lo, hi := t.StartTID, t.getLastTID()
	if firstTID > lo {
		lo = firstTID
	}
	if lastTID < hi {
		hi = lastTID
	}
	return t.GetIndexInTokensBlock(lo), t.GetIndexInTokensBlock(hi)
}

func (t *TableEntry) checkTIDInBlock(tid uint32) bool {
if tid < t.StartTID {
return false
Expand Down
63 changes: 34 additions & 29 deletions pattern/pattern.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@ import (

type tokenProvider interface {
GetToken(uint32) []byte
FindContains(needle []byte) ([]uint32, error)
FindToken(searcher Searcher) ([]uint32, error)
FirstTID() uint32
LastTID() uint32
Ordered() bool
Expand All @@ -27,11 +29,11 @@ type baseSearch struct {
last int
}

func (s *baseSearch) firstTID() uint32 {
// FirstTID returns the lower bound of the search range (s.first) as a uint32.
func (s *baseSearch) FirstTID() uint32 {
return uint32(s.first)
}

func (s *baseSearch) lastTID() uint32 {
// LastTID returns the upper bound of the search range (s.last) as a uint32.
func (s *baseSearch) LastTID() uint32 {
return uint32(s.last)
}

Expand Down Expand Up @@ -67,7 +69,7 @@ func (s *literalSearch) Narrow(tp tokenProvider) {
s.last = s.first - 1 // begin > end: will be considered empty
}

func (s *literalSearch) check(val []byte) (bool, error) {
func (s *literalSearch) Check(val []byte) (bool, error) {
if s.narrowed {
return len(s.value) == len(val), nil
}
Expand Down Expand Up @@ -165,7 +167,7 @@ func findSequence(haystack []byte, needles [][]byte) int {
return len(needles)
}

func (s *wildcardSearch) check(val []byte) (bool, error) {
// Check reports whether val passes the prefix, suffix and middle checks of
// the wildcard pattern, evaluated in that order with short-circuiting.
// The returned error is always nil.
func (s *wildcardSearch) Check(val []byte) (bool, error) {
return s.checkPrefix(val) && s.checkSuffix(val) && s.checkMiddle(val), nil
}

Expand All @@ -181,7 +183,7 @@ func newRangeTextSearch(base baseSearch, token *parser.Range) *rangeTextSearch {
}
}

func (s *rangeTextSearch) check(val []byte) (bool, error) {
func (s *rangeTextSearch) Check(val []byte) (bool, error) {
valStr := string(val)
if s.token.From.Kind != parser.TermSymbol {
if s.token.IncludeFrom {
Expand Down Expand Up @@ -244,7 +246,7 @@ func newRangeNumberSearch(base baseSearch, token *parser.Range) *rangeNumberSear
return s
}

func (s *rangeNumberSearch) check(rawVal []byte) (bool, error) {
func (s *rangeNumberSearch) Check(rawVal []byte) (bool, error) {
val, err := strconv.ParseFloat(string(rawVal), 64)
if err != nil || isNaNOrInf(val) {
return false, nil
Expand Down Expand Up @@ -301,7 +303,7 @@ func newRangeIPSearch(base baseSearch, token *parser.IPRange) *rangeIpSearch {
return s
}

func (s *rangeIpSearch) check(rawVal []byte) (bool, error) {
func (s *rangeIpSearch) Check(rawVal []byte) (bool, error) {
val, err := netip.ParseAddr(string(rawVal))
if err != nil {
return false, nil
Expand All @@ -324,7 +326,7 @@ func newReSearch(base baseSearch, token *parser.Re) *reSearch {
return &reSearch{baseSearch: base, r: token.CompiledExpression}
}

func (s *reSearch) check(rawVal []byte) (bool, error) {
func (s *reSearch) Check(rawVal []byte) (bool, error) {
if config.MaxRegexTokensCheck > 0 && s.checked >= config.MaxRegexTokensCheck {
return false, errors.New(
"'re' filter exceeded token limit: " +
Expand All @@ -335,13 +337,13 @@ func (s *reSearch) check(rawVal []byte) (bool, error) {
return s.r.Match(rawVal), nil
}

type searcher interface {
firstTID() uint32
lastTID() uint32
check(val []byte) (bool, error)
// Searcher is a token predicate together with the inclusive range of tids
// that token providers should test against it.
type Searcher interface {
// FirstTID returns the first tid of the range to examine.
FirstTID() uint32
// LastTID returns the last tid (inclusive) of the range to examine.
LastTID() uint32
// Check reports whether the token value val matches. Token providers stop
// scanning and propagate the error when Check returns a non-nil error.
Check(val []byte) (bool, error)
}

func newSearcher(token parser.Token, tp tokenProvider) searcher {
func newSearcher(token parser.Token, tp tokenProvider) Searcher {
base := baseSearch{
first: int(tp.FirstTID()),
last: int(tp.LastTID()),
Expand Down Expand Up @@ -390,22 +392,25 @@ func isNaNOrInf(f float64) bool {
return math.IsNaN(f) || math.IsInf(f, 0)
}

func Search(ctx context.Context, t parser.Token, tp tokenProvider) ([]uint32, error) {
tids := []uint32{}
s := newSearcher(t, tp)
for tid := s.firstTID(); tid <= s.lastTID(); tid++ {
if tid&1023 == 0 && util.IsCancelled(ctx) {
return nil, ctx.Err()
}

match, err := s.check(tp.GetToken(tid))
if err != nil {
return nil, err
}
// isSimpleWildcardContains checks if this AST token is simple wildcard like 'foo:*bar*'
func isSimpleWildcardContains(token parser.Token) (needle []byte, ok bool) {
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please add a comment with an example of an expression that fits/matches this check

lit, ok := token.(*parser.Literal)
if !ok || len(lit.Terms) != 3 {
return nil, false
}
if !lit.Terms[0].IsWildcard() || lit.Terms[1].Kind != parser.TermText || !lit.Terms[2].IsWildcard() {
return nil, false
}
return []byte(lit.Terms[1].Data), true
}

if match {
tids = append(tids, tid)
}
func Search(ctx context.Context, t parser.Token, tp tokenProvider) ([]uint32, error) {
if util.IsCancelled(ctx) {
return nil, ctx.Err()
}
if needle, ok := isSimpleWildcardContains(t); ok {
return tp.FindContains(needle)
}
return tids, nil
s := newSearcher(t, tp)
return tp.FindToken(s)
}
Loading
Loading