fix: harden leader verification and optimize scans; add MVCC/route co… #315
Changes from all commits: 3defc84, 68353fc, 984ea8c, f6bc81e, 7ad54bd, 71737c6, 322b188, 0a1b8e9, e01f39b, 16d3b50
New file — tests for the distribution gRPC adapter:

@@ -0,0 +1,48 @@

```go
package adapter

import (
	"context"
	"testing"

	"github.com/bootjp/elastickv/distribution"
	pb "github.com/bootjp/elastickv/proto"
	"github.com/stretchr/testify/require"
)

func TestDistributionServerGetRoute_HitAndMiss(t *testing.T) {
	t.Parallel()

	engine := distribution.NewEngine()
	engine.UpdateRoute([]byte("a"), []byte("m"), 1)
	engine.UpdateRoute([]byte("m"), nil, 2)

	s := NewDistributionServer(engine)
	ctx := context.Background()

	hit, err := s.GetRoute(ctx, &pb.GetRouteRequest{Key: []byte("b")})
	require.NoError(t, err)
	require.Equal(t, []byte("a"), hit.Start)
	require.Equal(t, []byte("m"), hit.End)
	require.Equal(t, uint64(1), hit.RaftGroupId)

	miss, err := s.GetRoute(ctx, &pb.GetRouteRequest{Key: []byte("0")})
	require.NoError(t, err)
	require.Equal(t, uint64(0), miss.RaftGroupId)
	require.Nil(t, miss.Start)
	require.Nil(t, miss.End)
}

func TestDistributionServerGetTimestamp_IsMonotonic(t *testing.T) {
	t.Parallel()

	s := NewDistributionServer(distribution.NewEngine())
	ctx := context.Background()

	first, err := s.GetTimestamp(ctx, &pb.GetTimestampRequest{})
	require.NoError(t, err)

	second, err := s.GetTimestamp(ctx, &pb.GetTimestampRequest{})
	require.NoError(t, err)

	require.Greater(t, second.Timestamp, first.Timestamp)
}
```
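The hit/miss expectations follow from half-open routing ranges: a route [start, end) owns every key k with start <= k < end, and a nil end means the range is unbounded above. Below is a minimal standalone sketch of that lookup rule — the `route` type and `lookup` function are illustrative stand-ins, not the engine's actual implementation:

```go
package main

import (
	"bytes"
	"fmt"
)

// route mirrors the shape implied by UpdateRoute(start, end, groupID):
// a half-open key range [Start, End) owned by one raft group.
type route struct {
	Start, End []byte
	GroupID    uint64
}

// lookup returns the first route whose half-open range contains key.
// A nil End means the range extends to the end of the keyspace.
func lookup(routes []route, key []byte) (route, bool) {
	for _, r := range routes {
		if bytes.Compare(key, r.Start) >= 0 &&
			(r.End == nil || bytes.Compare(key, r.End) < 0) {
			return r, true
		}
	}
	return route{}, false
}

func main() {
	routes := []route{
		{Start: []byte("a"), End: []byte("m"), GroupID: 1},
		{Start: []byte("m"), End: nil, GroupID: 2},
	}
	hit, ok := lookup(routes, []byte("b"))
	fmt.Println(ok, hit.GroupID) // true 1: "b" falls inside ["a", "m")
	_, ok = lookup(routes, []byte("0"))
	fmt.Println(ok) // false: "0" sorts before "a", so no route owns it
}
```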
Changes to the Redis server:

@@ -39,6 +39,7 @@ const (

```go
const (
	redisLatestCommitTimeout = 5 * time.Second
	redisDispatchTimeout     = 10 * time.Second
	maxByteValue             = 0xFF
)

//nolint:mnd
```
@@ -383,19 +384,50 @@ func (r *RedisServer) localKeysExact(pattern []byte) ([][]byte, error) {

```go
	if res {
		return [][]byte{bytes.Clone(pattern)}, nil
	}

	isList, err := r.isListKeyAt(context.Background(), pattern, readTS)
	if err != nil {
		return nil, err
	}
	if isList {
		return [][]byte{bytes.Clone(pattern)}, nil
	}
	return [][]byte{}, nil
}

func (r *RedisServer) localKeysPattern(pattern []byte) ([][]byte, error) {
	start, end := patternScanBounds(pattern)
	keyset := map[string][]byte{}

	mergeScannedKeys := func(scanStart, scanEnd []byte) error {
		readTS := r.readTS()
		keys, err := r.store.ScanAt(context.Background(), scanStart, scanEnd, math.MaxInt, readTS)
		if err != nil {
			return errors.WithStack(err)
		}
		for k, v := range r.collectUserKeys(keys, pattern) {
			keyset[k] = v
		}
		return nil
	}

	if err := mergeScannedKeys(start, end); err != nil {
		return nil, err
	}

	// User-key bounded scans like "foo*" do not naturally include internal list
	// keys ("!lst|..."), so scan list namespaces separately with mapped bounds.
	if start != nil || end != nil {
		metaStart, metaEnd := listPatternScanBounds(store.ListMetaPrefix, pattern)
		if err := mergeScannedKeys(metaStart, metaEnd); err != nil {
			return nil, err
		}

		itemStart, itemEnd := listPatternScanBounds(store.ListItemPrefix, pattern)
		if err := mergeScannedKeys(itemStart, itemEnd); err != nil {
			return nil, err
		}
	}

	out := make([][]byte, 0, len(keyset))
	for _, v := range keyset {
```
@@ -404,22 +436,102 @@ func (r *RedisServer) localKeysPattern(pattern []byte) ([][]byte, error) {

```go
	return out, nil
}

func patternScanBounds(pattern []byte) ([]byte, []byte) {
	if bytes.Equal(pattern, []byte("*")) {
		return nil, nil
	}

	i := bytes.IndexByte(pattern, '*')
	if i <= 0 {
		return nil, nil
	}

	start := bytes.Clone(pattern[:i])
	return start, prefixScanEnd(start)
}

func listPatternScanBounds(prefix string, pattern []byte) ([]byte, []byte) {
	userStart, userEnd := patternScanBounds(pattern)
	prefixBytes := []byte(prefix)

	if userStart == nil && userEnd == nil {
		return prefixBytes, prefixScanEnd(prefixBytes)
	}

	start := append(bytes.Clone(prefixBytes), userStart...)
	if userEnd == nil {
		return start, prefixScanEnd(prefixBytes)
	}
	end := append(bytes.Clone(prefixBytes), userEnd...)
	return start, end
}

func prefixScanEnd(prefix []byte) []byte {
	if len(prefix) == 0 {
		return nil
	}

	end := bytes.Clone(prefix)
	for i := len(end) - 1; i >= 0; i-- {
		if end[i] == maxByteValue {
			continue
		}
		end[i]++
		return end[:i+1]
	}

	return nil
}

func matchesAsteriskPattern(pattern, key []byte) bool {
	parts := bytes.Split(pattern, []byte("*"))
	if len(parts) == 1 {
		return bytes.Equal(pattern, key)
	}

	pos := 0
	if len(parts[0]) > 0 {
		if !bytes.HasPrefix(key, parts[0]) {
			return false
		}
		pos = len(parts[0])
	}

	for i := 1; i < len(parts)-1; i++ {
		part := parts[i]
		if len(part) == 0 {
			continue
		}
		idx := bytes.Index(key[pos:], part)
		if idx < 0 {
			return false
		}
		pos += idx + len(part)
	}

	last := parts[len(parts)-1]
	if len(last) > 0 && !bytes.HasSuffix(key, last) {
		return false
	}

	return true
}

func (r *RedisServer) collectUserKeys(kvs []*store.KVPair, pattern []byte) map[string][]byte {
	keyset := map[string][]byte{}
	for _, kvPair := range kvs {
		if store.IsListMetaKey(kvPair.Key) || store.IsListItemKey(kvPair.Key) {
			if userKey := store.ExtractListUserKey(kvPair.Key); userKey != nil {
				if !matchesAsteriskPattern(pattern, userKey) {
					continue
				}
				keyset[string(userKey)] = userKey
			}
			continue
		}
		if !matchesAsteriskPattern(pattern, kvPair.Key) {
			continue
		}
		keyset[string(kvPair.Key)] = kvPair.Key
	}
	return keyset
```
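The bound computation hinges on `prefixScanEnd` producing the smallest key greater than every key sharing the prefix: increment the last byte that is not 0xFF and truncate after it. A standalone sketch of that successor rule with a few worked cases — the helper body is copied from the diff for illustration:

```go
package main

import (
	"bytes"
	"fmt"
)

const maxByteValue = 0xFF

// prefixScanEnd mirrors the PR's helper: it returns the exclusive upper
// bound for a prefix scan, or nil when no finite bound exists (i.e. the
// prefix is all 0xFF bytes, so the scan runs to the end of the keyspace).
func prefixScanEnd(prefix []byte) []byte {
	if len(prefix) == 0 {
		return nil
	}
	end := bytes.Clone(prefix)
	for i := len(end) - 1; i >= 0; i-- {
		if end[i] == maxByteValue {
			continue
		}
		end[i]++
		return end[:i+1]
	}
	return nil
}

func main() {
	fmt.Printf("%q\n", prefixScanEnd([]byte("foo")))      // "fop": last byte bumped
	fmt.Printf("%q\n", prefixScanEnd([]byte("a\xff")))    // "b": trailing 0xFF dropped, 'a' bumped
	fmt.Println(prefixScanEnd([]byte("\xff\xff")) == nil) // true: no finite upper bound
}
```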
@@ -507,6 +619,20 @@ type listTxnState struct {

```go
	metaExists bool
	appends    [][]byte
	deleted    bool
	purge      bool
	purgeMeta  store.ListMeta
}

func stageListDelete(st *listTxnState) {
	if st == nil {
		return
	}
	if st.metaExists {
		st.purge = true
		st.purgeMeta = st.meta
	}
	st.deleted = true
	st.appends = nil
}

func (t *txnContext) load(key []byte) (*txnValue, error) {
```
@@ -586,16 +712,22 @@ func (t *txnContext) applySet(cmd redcon.Command) (redisResult, error) {

```go
}

func (t *txnContext) applyDel(cmd redcon.Command) (redisResult, error) {
	// Handle list delete through txn-local list state so subsequent commands in
	// the same MULTI observe the staged delete consistently.
	if st, ok := t.listStates[string(cmd.Args[1])]; ok {
		stageListDelete(st)
		return redisResult{typ: resultInt, integer: 1}, nil
	}
	isList, err := t.server.isListKeyAt(context.Background(), cmd.Args[1], t.startTS)
	if err != nil {
		return redisResult{}, err
	}
	if isList {
		st, err := t.loadListState(cmd.Args[1])
		if err != nil {
			return redisResult{}, err
		}
		stageListDelete(st)
		return redisResult{typ: resultInt, integer: 1}, nil
	}
}
```
@@ -647,6 +779,17 @@ func (t *txnContext) applyRPush(cmd redcon.Command) (redisResult, error) {

```go
	if err != nil {
		return redisResult{}, err
	}
	if st.deleted {
		if st.metaExists {
			st.purge = true
			st.purgeMeta = st.meta
		}
		// DEL followed by RPUSH in the same transaction recreates the list.
		st.deleted = false
		st.metaExists = false
		st.meta = store.ListMeta{}
		st.appends = nil
	}

	for _, v := range cmd.Args[2:] {
		st.appends = append(st.appends, bytes.Clone(v))
```
@@ -761,6 +904,24 @@ func (t *txnContext) buildKeyElems() []*kv.Elem[kv.OP] {

```go
	return elems
}

func listDeleteMeta(st *listTxnState) (store.ListMeta, bool) {
	switch {
	case st.metaExists:
		return st.meta, true
	case st.purge:
		return st.purgeMeta, true
	default:
		return store.ListMeta{}, false
	}
}

func appendListDeleteOps(elems []*kv.Elem[kv.OP], userKey []byte, meta store.ListMeta) []*kv.Elem[kv.OP] {
	for seq := meta.Head; seq < meta.Tail; seq++ {
		elems = append(elems, &kv.Elem[kv.OP]{Op: kv.Del, Key: listItemKey(userKey, seq)})
	}
	return append(elems, &kv.Elem[kv.OP]{Op: kv.Del, Key: listMetaKey(userKey)})
}

func (t *txnContext) buildListElems() ([]*kv.Elem[kv.OP], error) {
	listKeys := make([]string, 0, len(t.listStates))
	for k := range t.listStates {
```
@@ -774,16 +935,17 @@ func (t *txnContext) buildListElems() ([]*kv.Elem[kv.OP], error) {

```go
	userKey := []byte(k)

	if st.deleted {
		if meta, ok := listDeleteMeta(st); ok {
			elems = appendListDeleteOps(elems, userKey, meta)
		}
		continue
	}
	if len(st.appends) == 0 {
		continue
	}
	if st.purge {
		elems = appendListDeleteOps(elems, userKey, st.purgeMeta)
	}

	startSeq := st.meta.Head + st.meta.Len
	for i, v := range st.appends {
```
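The staged-delete flow is easiest to see end to end: within one MULTI, DEL records the persisted bounds for a later purge, and a following RPUSH clears the deleted flag so the list is rebuilt from sequence zero while the purge tombstones are still emitted at commit. Below is a reduced, standalone sketch of those transitions; `listState`, `del`, and `rpush` are simplified stand-ins for the PR's `listTxnState`, `stageListDelete`, and `applyRPush`, not the server's actual types:

```go
package main

import "fmt"

// listState is a reduced stand-in for the PR's listTxnState.
type listState struct {
	metaExists bool
	head, tail int64 // persisted item bounds [head, tail)
	deleted    bool
	purge      bool
	appends    []string
}

func del(st *listState) {
	if st.metaExists {
		st.purge = true // remember persisted bounds for commit-time tombstones
	}
	st.deleted = true
	st.appends = nil
}

func rpush(st *listState, vals ...string) {
	if st.deleted {
		// DEL followed by RPUSH in the same transaction recreates the list;
		// the purge flag keeps the old entries' deletion in the commit.
		st.deleted = false
		st.metaExists = false
	}
	st.appends = append(st.appends, vals...)
}

func main() {
	// A list persisted with two items, then MULTI { DEL; RPUSH a b }.
	st := &listState{metaExists: true, head: 0, tail: 2}
	del(st)
	rpush(st, "a", "b")

	// Commit: purge tombstones for old items, then fresh appends from seq 0.
	if st.purge {
		for seq := st.head; seq < st.tail; seq++ {
			fmt.Println("Del item", seq)
		}
		fmt.Println("Del meta")
	}
	for i, v := range st.appends {
		fmt.Println("Put item", i, v)
	}
}
```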
@@ -1010,7 +1172,7 @@ func (r *RedisServer) listRPush(ctx context.Context, key []byte, values [][]byte

```go
}

func (r *RedisServer) deleteList(ctx context.Context, key []byte) error {
	_, exists, err := r.loadListMeta(ctx, key)
	if err != nil {
		return err
	}
```
@@ -1021,8 +1183,16 @@ func (r *RedisServer) deleteList(ctx context.Context, key []byte) error {

```go
	start := listItemKey(key, math.MinInt64)
	end := listItemKey(key, math.MaxInt64)

	startTS := r.readTS()
	if startTS == ^uint64(0) && r.coordinator != nil && r.coordinator.Clock() != nil {
		startTS = r.coordinator.Clock().Next()
	}

	// Keep DEL atomic by deleting all persisted list entries and metadata in one
	// transaction at a single snapshot timestamp. This can allocate large slices
	// for very large lists; if the storage layer grows range-delete support, this
	// path should move to a streaming/range tombstone strategy.
	kvs, err := r.store.ScanAt(ctx, start, end, math.MaxInt, startTS)
	if err != nil {
		return errors.WithStack(err)
	}
```
Comment on lines +1195 to 1198 — Contributor:

This implementation of `deleteList` scans every list item into memory and emits one delete operation per item, which can cause high memory usage for very large lists. A more scalable approach would be to use a range deletion at the storage layer if the underlying store supports it. For example, PebbleDB has a `DeleteRange` operation. If implementing a range delete is not immediately feasible, it would be good to add a comment here noting the potential for high memory usage with large lists.
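For reference, the primitive the comment alludes to does exist in Pebble: a single range tombstone covers [start, end) without materializing the keys. A minimal sketch of what that could look like — assuming Pebble as the backing store (which this PR does not establish) and with purely illustrative key bounds:

```go
package main

import "github.com/cockroachdb/pebble"

func main() {
	db, err := pebble.Open("demo-db", &pebble.Options{})
	if err != nil {
		panic(err)
	}
	defer db.Close()

	// DeleteRange writes one range tombstone covering [start, end), so
	// deleting a huge list does not require scanning its items into memory.
	// The "!lst|..." bounds are hypothetical stand-ins for the internal
	// list-item key layout.
	start := []byte("!lst|mylist|")
	end := []byte("!lst|mylist|\xff")
	if err := db.DeleteRange(start, end, pebble.Sync); err != nil {
		panic(err)
	}
}
```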
@@ -1034,10 +1204,7 @@ func (r *RedisServer) deleteList(ctx context.Context, key []byte) error {

```go
	// delete meta last
	ops = append(ops, &kv.Elem[kv.OP]{Op: kv.Del, Key: listMetaKey(key)})

	group := &kv.OperationGroup[kv.OP]{IsTxn: true, StartTS: startTS, Elems: ops}

	_, err = r.coordinator.Dispatch(ctx, group)
	return errors.WithStack(err)
}
```

Contributor:

This is a critical correctness fix. By adding `StartTS` to the `OperationGroup`, the dispatched delete commits against the same snapshot timestamp used for the scan, rather than an implicit latest timestamp.
Contributor:

The `KEYS` command should operate on a consistent snapshot of the data. In the current implementation of `localKeysPattern`, `r.readTS()` is called inside the `mergeScannedKeys` closure. Since `mergeScannedKeys` is called multiple times for a single `KEYS` command, each scan could potentially use a different timestamp if writes are occurring concurrently. This can lead to inconsistent results.

To ensure atomicity and a consistent view, the read timestamp should be fetched once at the beginning of `localKeysPattern` and reused for all subsequent scans within that function.

References: `KEYS` is not a transaction; it is a single command that should present a consistent view of the database, so the principle of using a single snapshot timestamp applies.
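The fix the comment asks for is mechanical: capture the timestamp once and close over it. A self-contained sketch of the hoisted shape, using a fake store and clock rather than the server's real types:

```go
package main

import "fmt"

// fakeStore stands in for the real MVCC store; ScanAt reads as of ts.
type fakeStore struct{}

func (fakeStore) ScanAt(start, end []byte, ts uint64) []string {
	return []string{fmt.Sprintf("scan[%q,%q)@%d", start, end, ts)}
}

var clock uint64

// readTS simulates a timestamp source that advances under concurrent writes.
func readTS() uint64 { clock++; return clock }

func main() {
	st := fakeStore{}

	// Hoisted: one snapshot timestamp shared by every scan in this KEYS call,
	// instead of calling readTS() inside the closure on each invocation.
	ts := readTS()
	merge := func(start, end []byte) {
		for _, r := range st.ScanAt(start, end, ts) {
			fmt.Println(r)
		}
	}
	merge([]byte("foo"), []byte("fop"))
	merge([]byte("!lst|foo"), []byte("!lst|fop")) // same ts: consistent view
}
```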