4 changes: 2 additions & 2 deletions build.go
@@ -24,7 +24,7 @@ import (
 	index "github.com/blevesearch/bleve_index_api"
 )
 
-const Version uint32 = 17
+const Version uint32 = 16
 
 const Type string = "zap"
 
@@ -165,7 +165,7 @@ func InitSegmentBase(mem []byte, memCRC uint32, chunkMode uint32, numDocs uint64
 		numDocs:             numDocs,
 		storedIndexOffset:   storedIndexOffset,
 		sectionsIndexOffset: sectionsIndexOffset,
-		fieldDvReaders:      make([]map[uint16]*docValueReader, len(segmentSections)),
+		fieldDvReaders:      make([][]*docValueReader, len(segmentSections)),
 		updatedFields:       make(map[string]*index.UpdateFieldInfo),
 		invIndexCache:       newInvertedIndexCache(),
 		vecIndexCache:       newVectorIndexCache(),
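The `fieldDvReaders` field changes from a map keyed by field ID to a slice indexed by field ID, per segment section. Below is a minimal sketch of the access-pattern difference, assuming the motivation is that field IDs are small, dense `uint16` values; the stand-in `docValueReader` type and field names are illustrative, not zapx internals:

```go
package main

import "fmt"

// docValueReader is a stand-in for zapx's internal reader type.
type docValueReader struct{ field string }

func main() {
	const numSections = 2

	// Before: one map per section, keyed by field ID - a hash lookup per access.
	byMap := make([]map[uint16]*docValueReader, numSections)
	byMap[0] = map[uint16]*docValueReader{3: {field: "title"}}

	// After: one slice per section, indexed directly by field ID.
	// Dense, small IDs make a nil-padded slice cheaper to read than a map.
	bySlice := make([][]*docValueReader, numSections)
	bySlice[0] = make([]*docValueReader, 4) // sized to max field ID + 1
	bySlice[0][3] = &docValueReader{field: "title"}

	fmt.Println(byMap[0][3].field, bySlice[0][3].field) // title title
}
```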
6 changes: 2 additions & 4 deletions cmd/zap/cmd/docvalue.go
@@ -23,6 +23,7 @@ import (
 	"sort"
 	"strconv"
 
+	index "github.com/blevesearch/bleve_index_api"
 	zap "github.com/blevesearch/zapx/v16"
 	"github.com/golang/snappy"
 	"github.com/spf13/cobra"
@@ -168,13 +169,10 @@ func dumpDocValueResults(data []byte, args []string, field string, id int, field
 		return err
 	}
 
-	var termSeparator byte = 0xff
-	var termSeparatorSplitSlice = []byte{termSeparator}
-
 	// pick the terms for the given docNum
 	uncompressed = uncompressed[start:end]
 	for {
-		i := bytes.Index(uncompressed, termSeparatorSplitSlice)
+		i := bytes.IndexByte(uncompressed, index.DocValueTermSeparator)
 		if i < 0 {
 			break
 		}
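The command previously redeclared the 0xff terms separator locally; it now reuses the constant exported by `bleve_index_api`. A self-contained sketch of the splitting loop (the local constant below mirrors the exported one, assuming it is the same 0xff byte the deleted variable held):

```go
package main

import (
	"bytes"
	"fmt"
)

// DocValueTermSeparator mirrors the exported bleve_index_api constant
// (assumed to be the same 0xff byte the deleted local variable held).
const DocValueTermSeparator byte = 0xff

func main() {
	// Terms for one document, each followed by the separator byte.
	uncompressed := []byte("bleve\xffscorch\xffzap\xff")

	for {
		i := bytes.IndexByte(uncompressed, DocValueTermSeparator)
		if i < 0 {
			break
		}
		fmt.Println(string(uncompressed[:i])) // bleve, then scorch, then zap
		uncompressed = uncompressed[i+1:]
	}
}
```

Switching to `bytes.IndexByte` also drops the helper `termSeparatorSplitSlice` that `bytes.Index` required, since the separator is a single byte.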
44 changes: 25 additions & 19 deletions contentcoder.go
@@ -30,11 +30,6 @@ func init() {
 	reflectStaticSizeMetaData = int(reflect.TypeOf(md).Size())
 }
 
-var (
-	termSeparator           byte = 0xff
-	termSeparatorSplitSlice      = []byte{termSeparator}
-)
-
 type chunkedContentCoder struct {
 	bytesWritten uint64 // moved to top to correct alignment issues on ARM, 386 and 32-bit MIPS.
 
@@ -47,6 +42,7 @@ type chunkedContentCoder struct {
 
 	w                io.Writer
 	progressiveWrite bool
+	skipEncode       bool
 
 	chunkMeta    []MetaData
 	chunkMetaBuf bytes.Buffer
@@ -63,7 +59,7 @@ type MetaData struct {
 // newChunkedContentCoder returns a new chunk content coder which
 // packs data into chunks based on the provided chunkSize
 func newChunkedContentCoder(chunkSize uint64, maxDocNum uint64,
-	w io.Writer, progressiveWrite bool,
+	w io.Writer, progressiveWrite bool, skipEncode bool,
 ) *chunkedContentCoder {
 	total := maxDocNum/chunkSize + 1
 	rv := &chunkedContentCoder{
@@ -72,6 +68,7 @@ func newChunkedContentCoder(chunkSize uint64, maxDocNum uint64,
 		chunkMeta:        make([]MetaData, 0, total),
 		w:                w,
 		progressiveWrite: progressiveWrite,
+		skipEncode:       skipEncode,
 	}
 
 	return rv
@@ -121,25 +118,34 @@ func (c *chunkedContentCoder) getBytesWritten() uint64 {
 func (c *chunkedContentCoder) flushContents() error {
 	// flush the contents, with meta information at first
 	buf := make([]byte, binary.MaxVarintLen64)
-	n := binary.PutUvarint(buf, uint64(len(c.chunkMeta)))
-	_, err := c.chunkMetaBuf.Write(buf[:n])
-	if err != nil {
-		return err
-	}
-
-	// write out the metaData slice
-	for _, meta := range c.chunkMeta {
-		_, err := writeUvarints(&c.chunkMetaBuf, meta.DocNum, meta.DocDvOffset)
+	var metaData []byte
+	if c.chunkSize != 1 {
+		n := binary.PutUvarint(buf, uint64(len(c.chunkMeta)))
+		_, err := c.chunkMetaBuf.Write(buf[:n])
 		if err != nil {
 			return err
 		}
+
+		// write out the metaData slice
+		for _, meta := range c.chunkMeta {
+			_, err := writeUvarints(&c.chunkMetaBuf, meta.DocNum, meta.DocDvOffset)
+			if err != nil {
+				return err
+			}
+		}
+
+		// write the metadata to final data
+		metaData = c.chunkMetaBuf.Bytes()
+		c.final = append(c.final, c.chunkMetaBuf.Bytes()...)
 	}
 
-	// write the metadata to final data
-	metaData := c.chunkMetaBuf.Bytes()
-	c.final = append(c.final, c.chunkMetaBuf.Bytes()...)
 	// write the compressed data to the final data
-	c.compressed = snappy.Encode(c.compressed[:cap(c.compressed)], c.chunkBuf.Bytes())
+	if c.skipEncode {
+		c.compressed = c.chunkBuf.Bytes()
+	} else {
+		c.compressed = snappy.Encode(c.compressed[:cap(c.compressed)], c.chunkBuf.Bytes())
+	}
 
 	c.incrementBytesWritten(uint64(len(c.compressed)))
 	c.final = append(c.final, c.compressed...)
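Two behavioral changes land in `flushContents`: when `chunkSize == 1` the per-chunk metadata block (entry count plus docNum/offset uvarint pairs) is omitted, since a one-document chunk makes it redundant; and when `skipEncode` is set the chunk bytes are appended raw instead of snappy-encoded. A hedged sketch of the resulting chunk layout — a simplified stand-in, not the real coder, which reuses buffers and `MetaData` structs:

```go
package main

import (
	"encoding/binary"
	"fmt"

	"github.com/golang/snappy"
)

// flushChunk sketches the layout flushContents now produces for one chunk.
func flushChunk(chunkSize uint64, docNums, offsets []uint64, data []byte, skipEncode bool) []byte {
	var out []byte
	if chunkSize != 1 {
		// metadata block: entry count, then (docNum, dvOffset) uvarint pairs
		buf := make([]byte, binary.MaxVarintLen64)
		out = append(out, buf[:binary.PutUvarint(buf, uint64(len(docNums)))]...)
		for i := range docNums {
			out = append(out, buf[:binary.PutUvarint(buf, docNums[i])]...)
			out = append(out, buf[:binary.PutUvarint(buf, offsets[i])]...)
		}
	}
	// content block: raw bytes when encoding is skipped, snappy otherwise
	if skipEncode {
		return append(out, data...)
	}
	return append(out, snappy.Encode(nil, data)...)
}

func main() {
	// Mirrors the two-doc, chunkSize=2 test case in contentcoder_test.go below.
	chunk := flushChunk(2, []uint64{0, 1}, []uint64{6, 12}, []byte("upsidescorch"), false)
	fmt.Printf("% x\n", chunk)
	// 02 00 06 01 0c 0c 2c 75 70 73 69 64 65 73 63 6f 72 63 68
}
```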
180 changes: 136 additions & 44 deletions contentcoder_test.go
@@ -33,10 +33,25 @@ func TestChunkedContentCoder(t *testing.T) {
 			docNums:   []uint64{0},
 			vals:      [][]byte{[]byte("bleve")},
 			// 1 chunk, chunk-0 length 7(b), value
-			expected: []byte{0x1, 0x0, 0x5, 0x5, 0x10, 0x62, 0x6c, 0x65, 0x76, 0x65,
-				0xa,
-				0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x1,
-				0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x1},
+			expected: []byte{
+				0x5, 0x10, 0x62, 0x6c, 0x65, 0x76, 0x65, // compressed value - "bleve"
+				0x7, // chunk offset - chunk ends at byte 7
+				0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x1, // number of offset entries - 1 entry
+				0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x1, // number of chunks - 1 chunk
+			},
+		},
+		{
+			maxDocNum: 0,
+			chunkSize: 2,
+			docNums:   []uint64{0},
+			vals:      [][]byte{[]byte("bleve")},
+			// 1 chunk, chunk-0 length 10(b), meta + value
+			expected: []byte{
+				0x1, 0x0, 0x5, 0x5, 0x10, 0x62, 0x6c, 0x65, 0x76, 0x65, // meta + compressed value - "bleve"
+				0xa, // chunk offset - chunk ends at byte 10
+				0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x1, // number of offset entries - 1 entry
+				0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x1, // number of chunks - 1 chunk
+			},
 		},
 		{
 			maxDocNum: 1,
@@ -47,17 +62,38 @@ func TestChunkedContentCoder(t *testing.T) {
 				[]byte("scorch"),
 			},
 
-			expected: []byte{0x1, 0x0, 0x6, 0x6, 0x14, 0x75, 0x70, 0x73, 0x69, 0x64,
-				0x65, 0x1, 0x1, 0x6, 0x6, 0x14, 0x73, 0x63, 0x6f, 0x72, 0x63, 0x68,
-				0xb, 0x16,
-				0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x2,
-				0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x2},
+			expected: []byte{
+				0x6, 0x14, 0x75, 0x70, 0x73, 0x69, 0x64, 0x65, // compressed value - "upside"
+				0x6, 0x14, 0x73, 0x63, 0x6f, 0x72, 0x63, 0x68, // compressed value - "scorch"
+				0x8, 0x10, // chunk offsets - chunks end at bytes 8 and 16
+				0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x2, // number of offset entries - 2 entries
+				0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x2, // number of chunks - 2 chunks
+			},
+		},
+		{
+			maxDocNum: 1,
+			chunkSize: 2,
+			docNums:   []uint64{0, 1},
+			vals: [][]byte{
+				[]byte("upside"),
+				[]byte("scorch"),
+			},
+
+			expected: []byte{
+				0x2,      // meta - 2 documents in chunk
+				0x0, 0x6, // meta - docNum 0, offset 6
+				0x1, 0xc, // meta - docNum 1, offset 12
+				0xc, 0x2c, 0x75, 0x70, 0x73, 0x69, 0x64, 0x65, 0x73, 0x63, 0x6f, 0x72, 0x63, 0x68, // compressed value - "upsidescorch"
+				0x13, // chunk offset - chunk ends at byte 19
+				0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x1, // number of offset entries - 1 entry
+				0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x1, // number of chunks - 1 chunk
+			},
 		},
 	}
 
 	for _, test := range tests {
 		var actual bytes.Buffer
-		cic := newChunkedContentCoder(test.chunkSize, test.maxDocNum, &actual, false)
+		cic := newChunkedContentCoder(test.chunkSize, test.maxDocNum, &actual, false, false)
 		for i, docNum := range test.docNums {
 			err := cic.Add(docNum, test.vals[i])
 			if err != nil {
@@ -77,47 +113,103 @@ func TestChunkedContentCoder(t *testing.T) {
 }
 
 func TestChunkedContentCoders(t *testing.T) {
-	maxDocNum := uint64(5)
-	chunkSize := uint64(1)
-	docNums := []uint64{0, 1, 2, 3, 4, 5}
-	vals := [][]byte{
-		[]byte("scorch"),
-		[]byte("does"),
-		[]byte("better"),
-		[]byte("than"),
-		[]byte("upside"),
-		[]byte("down"),
+	tests := []struct {
+		maxDocNum  uint64
+		chunkSize  uint64
+		skipSnappy bool
+		docNums    []uint64
+		vals       [][]byte
+	}{
+		{
+			maxDocNum:  5,
+			chunkSize:  1,
+			skipSnappy: false,
+			docNums:    []uint64{0, 1, 2, 3, 4, 5},
+			vals: [][]byte{
+				[]byte("scorch"),
+				[]byte("does"),
+				[]byte("better"),
+				[]byte("than"),
+				[]byte("upside"),
+				[]byte("down"),
+			},
+		},
+		{
+			maxDocNum:  5,
+			chunkSize:  2,
+			skipSnappy: false,
+			docNums:    []uint64{0, 1, 2, 3, 4, 5},
+			vals: [][]byte{
+				[]byte("scorch"),
+				[]byte("does"),
+				[]byte("better"),
+				[]byte("than"),
+				[]byte("upside"),
+				[]byte("down"),
+			},
+		},
+		{
+			maxDocNum:  5,
+			chunkSize:  1,
+			skipSnappy: true,
+			docNums:    []uint64{0, 1, 2, 3, 4, 5},
+			vals: [][]byte{
+				[]byte("scorch"),
+				[]byte("does"),
+				[]byte("better"),
+				[]byte("than"),
+				[]byte("upside"),
+				[]byte("down"),
+			},
+		},
+		{
+			maxDocNum:  5,
+			chunkSize:  2,
+			skipSnappy: true,
+			docNums:    []uint64{0, 1, 2, 3, 4, 5},
+			vals: [][]byte{
+				[]byte("scorch"),
+				[]byte("does"),
+				[]byte("better"),
+				[]byte("than"),
+				[]byte("upside"),
+				[]byte("down"),
+			},
+		},
 	}
 
-	var actual1, actual2 bytes.Buffer
-	// chunkedContentCoder that writes out at the end
-	cic1 := newChunkedContentCoder(chunkSize, maxDocNum, &actual1, false)
-	// chunkedContentCoder that writes out in chunks
-	cic2 := newChunkedContentCoder(chunkSize, maxDocNum, &actual2, true)
+	for _, test := range tests {
+		var actual1, actual2 bytes.Buffer
+		// chunkedContentCoder that writes out at the end
+		cic1 := newChunkedContentCoder(test.chunkSize, test.maxDocNum, &actual1, false, test.skipSnappy)
+		// chunkedContentCoder that writes out in chunks
+		cic2 := newChunkedContentCoder(test.chunkSize, test.maxDocNum, &actual2, true, test.skipSnappy)
 
-	for i, docNum := range docNums {
-		err := cic1.Add(docNum, vals[i])
-		if err != nil {
-			t.Fatalf("error adding to intcoder: %v", err)
-		}
-		err = cic2.Add(docNum, vals[i])
-		if err != nil {
-			t.Fatalf("error adding to intcoder: %v", err)
+		for i, docNum := range test.docNums {
+			err := cic1.Add(docNum, test.vals[i])
+			if err != nil {
+				t.Fatalf("error adding to intcoder: %v", err)
+			}
+			err = cic2.Add(docNum, test.vals[i])
+			if err != nil {
+				t.Fatalf("error adding to intcoder: %v", err)
+			}
 		}
-	}
-	_ = cic1.Close()
-	_ = cic2.Close()
+		_ = cic1.Close()
+		_ = cic2.Close()
 
-	_, err := cic1.Write()
-	if err != nil {
-		t.Fatalf("error writing: %v", err)
-	}
-	_, err = cic2.Write()
-	if err != nil {
-		t.Fatalf("error writing: %v", err)
-	}
+		_, err := cic1.Write()
+		if err != nil {
+			t.Fatalf("error writing: %v", err)
+		}
+		_, err = cic2.Write()
+		if err != nil {
+			t.Fatalf("error writing: %v", err)
+		}
 
-	if !bytes.Equal(actual1.Bytes(), actual2.Bytes()) {
-		t.Errorf("%s != %s", actual1.String(), actual2.String())
+		if !bytes.Equal(actual1.Bytes(), actual2.Bytes()) {
+			t.Errorf("%s != %s", actual1.String(), actual2.String())
+		}
 	}
 }
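To make the byte-level comments concrete, here is a hedged decode of the first expected slice by hand. It hardcodes the single one-byte offset entry of that tiny case; the real reader walks the offset uvarints generically:

```go
package main

import (
	"encoding/binary"
	"fmt"

	"github.com/golang/snappy"
)

func main() {
	// First expected slice above: chunkSize 1, one doc, value "bleve".
	data := []byte{
		0x5, 0x10, 0x62, 0x6c, 0x65, 0x76, 0x65, // chunk 0: snappy("bleve")
		0x7, // uvarint: chunk 0 ends at offset 7
		0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x1, // big-endian uint64: 1 offset entry
		0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x1, // big-endian uint64: 1 chunk
	}

	n := len(data)
	numChunks := binary.BigEndian.Uint64(data[n-8:])
	numOffsets := binary.BigEndian.Uint64(data[n-16 : n-8])

	end, _ := binary.Uvarint(data[n-17 : n-16]) // the lone offset entry
	val, _ := snappy.Decode(nil, data[:end])

	fmt.Println(numChunks, numOffsets, string(val)) // 1 1 bleve
}
```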