Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 9 additions & 0 deletions .changeset/fix-index-stale-bucket.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
---
'@tanstack/db': patch
---

fix(db): prevent indexes from leaving a key in two buckets at once

`BasicIndex` and `BTreeIndex` now track each key's currently-indexed value internally and use that on `remove`/`update`, instead of trusting the caller-supplied `oldItem`. If `oldItem` evaluated to a value that didn't match the bucket the key actually lived in, the previous implementation silently no-oped on the bucket map while the subsequent `add` placed the key in a new bucket, leaving the key in two buckets simultaneously. Calling `add` for a key that's already indexed also now drops the previous bucket first, so a missed `remove` can't double-bucket the key.

This was the underlying cause of stale auto-index buckets after an optimistic→synced update on the indexed column (e.g. a kanban card snapping back to its old column).
74 changes: 49 additions & 25 deletions packages/db/src/indexes/basic-index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,11 @@ export class BasicIndex<
private sortedValues: Array<any> = []
// Set of all indexed PKs
private indexedKeys = new Set<TKey>()
// Reverse map: key -> currently indexed value. Lets `remove`/`update`
// find the bucket a key actually lives in instead of trusting the
// caller-supplied old item, which can be stale across optimistic→synced
// lifecycle races.
private keyToValue = new Map<TKey, any>()
// Comparator function
private compareFn: (a: any, b: any) => number = defaultComparator

Expand Down Expand Up @@ -88,6 +93,13 @@ export class BasicIndex<

const normalizedValue = normalizeValue(indexedValue)

// If the key is already indexed, drop it from its current bucket first so
// an `add` after a stale or missed `remove` can't leave the key in two
// buckets at once.
if (this.indexedKeys.has(key)) {
this.removeFromBucket(key, this.keyToValue.get(key))
}

if (this.valueMap.has(normalizedValue)) {
// Value already exists, just add the key to the set
this.valueMap.get(normalizedValue)!.add(key)
Expand All @@ -105,40 +117,49 @@ export class BasicIndex<
}

this.indexedKeys.add(key)
this.keyToValue.set(key, normalizedValue)
this.updateTimestamp()
}

private removeFromBucket(key: TKey, normalizedValue: any): void {
if (normalizedValue === undefined && !this.valueMap.has(undefined)) {
// Key wasn't tracked yet; nothing to remove from valueMap.
return
}
const keySet = this.valueMap.get(normalizedValue)
if (!keySet) return
keySet.delete(key)
if (keySet.size === 0) {
this.valueMap.delete(normalizedValue)
deleteInSortedArray(this.sortedValues, normalizedValue, this.compareFn)
}
}

/**
* Removes a value from the index
*/
remove(key: TKey, item: any): void {
let indexedValue: any
try {
indexedValue = this.evaluateIndexExpression(item)
} catch (error) {
console.warn(
`Failed to evaluate index expression for key ${key} during removal:`,
error,
)
this.indexedKeys.delete(key)
this.updateTimestamp()
return
}

const normalizedValue = normalizeValue(indexedValue)

if (this.valueMap.has(normalizedValue)) {
const keySet = this.valueMap.get(normalizedValue)!
keySet.delete(key)

if (keySet.size === 0) {
// No more keys for this value, remove from map and sorted array
this.valueMap.delete(normalizedValue)
deleteInSortedArray(this.sortedValues, normalizedValue, this.compareFn)
// Use the tracked bucket for this key. We intentionally do NOT trust
// the caller-supplied `item`: across optimistic→synced lifecycle races
// the previousValue passed in can disagree with the value we actually
// indexed, which would silently leave the key in a stale bucket.
if (this.keyToValue.has(key)) {
this.removeFromBucket(key, this.keyToValue.get(key))
} else {
// Fall back to evaluating the supplied item (covers the case where
// the key was never indexed but caller still asks to remove it).
try {
const indexedValue = this.evaluateIndexExpression(item)
this.removeFromBucket(key, normalizeValue(indexedValue))
} catch (error) {
console.warn(
`Failed to evaluate index expression for key ${key} during removal:`,
error,
)
}
}

this.indexedKeys.delete(key)
this.keyToValue.delete(key)
this.updateTimestamp()
}

Expand Down Expand Up @@ -168,8 +189,10 @@ export class BasicIndex<
{ cause: error },
)
}
entriesArray.push({ key, value: normalizeValue(indexedValue) })
const normalizedValue = normalizeValue(indexedValue)
entriesArray.push({ key, value: normalizedValue })
this.indexedKeys.add(key)
this.keyToValue.set(key, normalizedValue)
}

// Group by value
Expand All @@ -194,6 +217,7 @@ export class BasicIndex<
this.valueMap.clear()
this.sortedValues = []
this.indexedKeys.clear()
this.keyToValue.clear()
this.updateTimestamp()
}

Expand Down
67 changes: 42 additions & 25 deletions packages/db/src/indexes/btree-index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,11 @@ export class BTreeIndex<
private orderedEntries: BTree<any, undefined> // we don't associate values with the keys of the B+ tree (the keys are indexed values)
private valueMap = new Map<any, Set<TKey>>() // instead we store a mapping of indexed values to a set of PKs
private indexedKeys = new Set<TKey>()
// Reverse map: key -> currently indexed value. Lets `remove`/`update`
// find the bucket a key actually lives in instead of trusting the
// caller-supplied old item, which can be stale across optimistic→synced
// lifecycle races.
private keyToValue = new Map<TKey, any>()
private compareFn: (a: any, b: any) => number = defaultComparator

constructor(
Expand Down Expand Up @@ -93,6 +98,13 @@ export class BTreeIndex<
// Normalize the value for Map key usage
const normalizedValue = normalizeForBTree(indexedValue)

// If the key is already indexed, drop it from its current bucket first so
// an `add` after a stale or missed `remove` can't leave the key in two
// buckets at once.
if (this.indexedKeys.has(key)) {
this.removeFromBucket(key, this.keyToValue.get(key))
}

// Check if this value already exists
if (this.valueMap.has(normalizedValue)) {
// Add to existing set
Expand All @@ -105,41 +117,45 @@ export class BTreeIndex<
}

this.indexedKeys.add(key)
this.keyToValue.set(key, normalizedValue)
this.updateTimestamp()
}

private removeFromBucket(key: TKey, normalizedValue: any): void {
const keySet = this.valueMap.get(normalizedValue)
if (!keySet) return
keySet.delete(key)
if (keySet.size === 0) {
this.valueMap.delete(normalizedValue)
this.orderedEntries.delete(normalizedValue)
}
}

/**
* Removes a value from the index
*/
remove(key: TKey, item: any): void {
let indexedValue: any
try {
indexedValue = this.evaluateIndexExpression(item)
} catch (error) {
console.warn(
`Failed to evaluate index expression for key ${key} during removal:`,
error,
)
return
}

// Normalize the value for Map key usage
const normalizedValue = normalizeForBTree(indexedValue)

if (this.valueMap.has(normalizedValue)) {
const keySet = this.valueMap.get(normalizedValue)!
keySet.delete(key)

// If set is now empty, remove the entry entirely
if (keySet.size === 0) {
this.valueMap.delete(normalizedValue)

// Remove from ordered entries
this.orderedEntries.delete(normalizedValue)
// Use the tracked bucket for this key. We intentionally do NOT trust
// the caller-supplied `item`: across optimistic→synced lifecycle races
// the previousValue passed in can disagree with the value we actually
// indexed, which would silently leave the key in a stale bucket.
if (this.keyToValue.has(key)) {
this.removeFromBucket(key, this.keyToValue.get(key))
} else {
// Fall back to evaluating the supplied item (covers the case where
// the key was never indexed but caller still asks to remove it).
try {
const indexedValue = this.evaluateIndexExpression(item)
this.removeFromBucket(key, normalizeForBTree(indexedValue))
} catch (error) {
console.warn(
`Failed to evaluate index expression for key ${key} during removal:`,
error,
)
}
}

this.indexedKeys.delete(key)
this.keyToValue.delete(key)
this.updateTimestamp()
}

Expand Down Expand Up @@ -169,6 +185,7 @@ export class BTreeIndex<
this.orderedEntries.clear()
this.valueMap.clear()
this.indexedKeys.clear()
this.keyToValue.clear()
this.updateTimestamp()
}

Expand Down
127 changes: 127 additions & 0 deletions packages/db/tests/index-key-bucket-tracking.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,127 @@
import { describe, expect, it } from 'vitest'
import { createCollection } from '../src/collection/index.js'
import { createTransaction } from '../src/transactions'
import { BasicIndex } from '../src/indexes/basic-index'
import { BTreeIndex } from '../src/indexes/btree-index'
import { eq } from '../src/query/builder/functions'
import { PropRef } from '../src/query/ir'
import type { BaseIndex, IndexConstructor } from '../src/indexes/base-index'
import type { ChangeMessageOrDeleteKeyMessage } from '../src/types'

interface Project {
id: string
name: string
current_stage_id: string
updated_at: string
}

const row = (
id: string,
stage: string,
t: string = `2023-01-01T00:00:00Z`,
): Project => ({
id,
name: `Project-${id}`,
current_stage_id: stage,
updated_at: t,
})

describe.each([
[`BasicIndex`, BasicIndex],
[`BTreeIndex`, BTreeIndex],
] as Array<[string, IndexConstructor<string>]>)(
`%s: a key lives in at most one bucket at a time`,
(_name, IndexType) => {
it(`update moves the key out of its actual bucket, regardless of the oldItem passed`, () => {
const index = new IndexType(1, new PropRef([`current_stage_id`]))
index.add(`P`, row(`P`, `A`))
expect(index.equalityLookup(`A`)).toEqual(new Set([`P`]))

// The oldItem disagrees with the value the index recorded for P.
index.update(`P`, row(`P`, `B`), row(`P`, `C`))

expect(index.equalityLookup(`A`)).toEqual(new Set())
expect(index.equalityLookup(`B`)).toEqual(new Set())
expect(index.equalityLookup(`C`)).toEqual(new Set([`P`]))
})

it(`add called twice without an intervening remove keeps the key in the latest bucket only`, () => {
const index = new IndexType(1, new PropRef([`current_stage_id`]))
index.add(`P`, row(`P`, `A`))
index.add(`P`, row(`P`, `B`))

expect(index.equalityLookup(`A`)).toEqual(new Set())
expect(index.equalityLookup(`B`)).toEqual(new Set([`P`]))
})

it(`an optimistic→synced update on the indexed column ends with the row in the new bucket`, async () => {
let resolveMutation: () => void
let begin!: () => void
let write!: (m: ChangeMessageOrDeleteKeyMessage<Project, string>) => void
let commit!: () => void
let markReady!: () => void

const collection = createCollection<Project, string>({
id: `projects`,
getKey: (item) => item.id,
autoIndex: `eager`,
defaultIndexType: IndexType,
startSync: true,
sync: {
sync: (ctx) => {
begin = ctx.begin
write = ctx.write
commit = ctx.commit
markReady = ctx.markReady
begin()
write({ type: `insert`, value: row(`P`, `A`) })
commit()
markReady()
},
},
})
await collection.stateWhenReady()

collection.subscribeChanges(() => {}, {
whereExpression: eq(new PropRef([`current_stage_id`]), `A`),
includeInitialState: true,
})
const idx = Array.from(
collection.indexes.values(),
)[0] as BaseIndex<string>

// Offline-style optimistic mutation (non-direct ambient transaction).
const tx = createTransaction({
autoCommit: false,
mutationFn: async () => {
await new Promise<void>((resolve) => {
resolveMutation = resolve
})
},
})
tx.mutate(() => {
collection.update(`P`, (draft) => {
draft.current_stage_id = `B`
})
})

const commitPromise = tx.commit()

begin()
write({
type: `update`,
value: row(`P`, `B`, `2023-01-02T00:00:00Z`),
})
commit()

resolveMutation!()
await commitPromise
await Promise.resolve()
await Promise.resolve()

expect(collection.get(`P`)?.current_stage_id).toBe(`B`)
expect(idx.equalityLookup(`A`)).toEqual(new Set())
expect(idx.equalityLookup(`B`)).toEqual(new Set([`P`]))
})
},
)
Loading