Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions .changeset/chatty-files-hear.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
"@tanstack/db": patch
---

Add `subscribeKeyChanges` for subscribing to future changes for a single collection key.
73 changes: 73 additions & 0 deletions packages/db/src/collection/changes.ts
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ export class CollectionChangesManager<

public activeSubscribersCount = 0
public changeSubscriptions = new Set<CollectionSubscription>()
private keyChangeSubscriptions = new Map<TKey, Set<CollectionSubscription>>()
public batchedEvents: Array<ChangeMessage<TOutput, TKey>> = []
public shouldBatchEvents = false

Expand Down Expand Up @@ -112,6 +113,39 @@ export class CollectionChangesManager<
for (const subscription of this.changeSubscriptions) {
subscription.emitEvents(enrichedEvents)
}

this.emitKeyChangeEvents(enrichedEvents)
}

private emitKeyChangeEvents(
enrichedEvents: Array<ChangeMessage<WithVirtualProps<TOutput, TKey>, TKey>>,
): void {
const changesBySubscription = new Map<
CollectionSubscription,
Array<ChangeMessage<WithVirtualProps<TOutput, TKey>, TKey>>
>()

for (const change of enrichedEvents) {
const subscriptions = this.keyChangeSubscriptions.get(change.key)

if (!subscriptions) {
continue
}

for (const subscription of subscriptions) {
const existingChanges = changesBySubscription.get(subscription)

if (existingChanges) {
existingChanges.push(change)
} else {
changesBySubscription.set(subscription, [change])
}
}
}

for (const [subscription, changes] of changesBySubscription) {
subscription.emitEvents(changes)
}
}

/**
Expand Down Expand Up @@ -176,6 +210,45 @@ export class CollectionChangesManager<
return subscription
}

/**
* Subscribe to future changes for a single collection key.
*/
public subscribeKeyChanges(
key: TKey,
callback: (
changes: Array<ChangeMessage<WithVirtualProps<TOutput, TKey>, TKey>>,
) => void,
): CollectionSubscription {
const subscription = new CollectionSubscription(this.collection, callback, {
onUnsubscribe: () => {
this.removeSubscriber()

const subscriptions = this.keyChangeSubscriptions.get(key)
if (!subscriptions) {
return
}

subscriptions.delete(subscription)

if (subscriptions.size === 0) {
this.keyChangeSubscriptions.delete(key)
}
},
})

// Key subscriptions do not request a snapshot. Callers can read the current
// row with collection.get(key), then use this method for future row changes.
subscription.markAllStateAsSeen()

const subscriptions = this.keyChangeSubscriptions.get(key) ?? new Set()
subscriptions.add(subscription)
this.keyChangeSubscriptions.set(key, subscriptions)

this.addSubscriber()

return subscription
}

/**
* Increment the active subscribers count and start sync if needed
*/
Expand Down
27 changes: 27 additions & 0 deletions packages/db/src/collection/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -941,6 +941,33 @@ export class CollectionImpl<
return this._changes.subscribeChanges(callback, options)
}

/**
* Subscribe to future changes for a single collection key.
*
* This does not emit the current row. Use collection.get(key) to read the
* current value, then subscribeKeyChanges(key, callback) to react to future
* inserts, updates, and deletes for that key.
*
* @example
* const currentTodo = todos.get("todo-1")
*
* const subscription = todos.subscribeKeyChanges("todo-1", (changes) => {
* for (const change of changes) {
* console.log(change.type, change.value)
* }
* })
*
* // Later: subscription.unsubscribe()
*/
public subscribeKeyChanges(
key: TKey,
callback: (
changes: Array<ChangeMessage<WithVirtualProps<TOutput, TKey>, TKey>>,
) => void,
): CollectionSubscription {
return this._changes.subscribeKeyChanges(key, callback)
}

/**
* Subscribe to a collection event
*/
Expand Down
183 changes: 183 additions & 0 deletions packages/db/tests/collection-subscribe-changes.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,11 @@ import type {
// Helper function to wait for changes to be processed
const waitForChanges = () => new Promise((resolve) => setTimeout(resolve, 10))

type SyncFunctions<T extends object, TKey extends string | number> = Pick<
Parameters<SyncConfig<T, TKey>[`sync`]>[0],
`begin` | `write` | `commit` | `markReady`
>

const normalizeChange = <T extends Record<string, any>>(
change: ChangeMessage<T>,
): ChangeMessage<T> => ({
Expand Down Expand Up @@ -2154,6 +2159,184 @@ describe(`Collection.subscribeChanges`, () => {
})
})

describe(`Collection.subscribeKeyChanges`, () => {
type TestItem = { id: string; value: string }

const createManualSyncCollection = (id: string) => {
let syncFns: SyncFunctions<TestItem, string> | undefined

const collection = createCollection<TestItem, string>({
id,
getKey: (item) => item.id,
sync: {
sync: (params) => {
syncFns = params
params.markReady()
},
},
})

collection.startSyncImmediate()

if (!syncFns) {
throw new Error(`Sync functions were not initialized`)
}

return { collection, syncFns }
}

const writeSyncedChanges = (
syncFns: SyncFunctions<TestItem, string>,
changes: Array<Omit<ChangeMessage<TestItem, string>, `key`>>,
) => {
syncFns.begin()

for (const change of changes) {
syncFns.write(change)
}

syncFns.commit()
}

it(`should emit only future changes for the subscribed key`, () => {
const { collection, syncFns } = createManualSyncCollection(
`subscribe-key-changes-filter-test`,
)
const changes: Array<
ChangeMessage<OutputWithVirtual<TestItem, string>, string>
> = []

const subscription = collection.subscribeKeyChanges(`row-1`, (events) => {
changes.push(...events)
})

writeSyncedChanges(syncFns, [
{ type: `insert`, value: { id: `row-1`, value: `one` } },
{ type: `insert`, value: { id: `row-2`, value: `two` } },
])

expect(changes).toHaveLength(1)
expect(changes[0]).toMatchObject({
type: `insert`,
key: `row-1`,
value: { id: `row-1`, value: `one` },
})

changes.length = 0

writeSyncedChanges(syncFns, [
{ type: `update`, value: { id: `row-2`, value: `two updated` } },
])

expect(changes).toEqual([])

writeSyncedChanges(syncFns, [
{ type: `update`, value: { id: `row-1`, value: `one updated` } },
])

expect(changes).toHaveLength(1)
expect(changes[0]).toMatchObject({
type: `update`,
key: `row-1`,
value: { id: `row-1`, value: `one updated` },
})

subscription.unsubscribe()
})

it(`should emit matching sync changes written while the subscription starts sync`, () => {
const changes: Array<
ChangeMessage<OutputWithVirtual<TestItem, string>, string>
> = []

const collection = createCollection<TestItem, string>({
id: `subscribe-key-changes-start-sync-test`,
getKey: (item) => item.id,
sync: {
sync: ({ begin, write, commit, markReady }) => {
begin()
write({
type: `insert`,
value: { id: `row-1`, value: `one` },
})
write({
type: `insert`,
value: { id: `row-2`, value: `two` },
})
commit()
markReady()
},
},
})

const subscription = collection.subscribeKeyChanges(`row-1`, (events) => {
changes.push(...events)
})

expect(changes).toHaveLength(1)
expect(changes[0]).toMatchObject({
type: `insert`,
key: `row-1`,
value: { id: `row-1`, value: `one` },
})

subscription.unsubscribe()
})

it(`should not emit an initial snapshot and should still emit future deletes`, () => {
const { collection, syncFns } = createManualSyncCollection(
`subscribe-key-changes-delete-test`,
)
const changes: Array<
ChangeMessage<OutputWithVirtual<TestItem, string>, string>
> = []

writeSyncedChanges(syncFns, [
{ type: `insert`, value: { id: `row-1`, value: `one` } },
])

const subscription = collection.subscribeKeyChanges(`row-1`, (events) => {
changes.push(...events)
})

expect(changes).toEqual([])

writeSyncedChanges(syncFns, [
{ type: `delete`, value: { id: `row-1`, value: `one` } },
])

expect(changes).toHaveLength(1)
expect(changes[0]).toMatchObject({
type: `delete`,
key: `row-1`,
value: { id: `row-1`, value: `one` },
})

subscription.unsubscribe()
})

it(`should stop emitting changes after unsubscribe`, () => {
const { collection, syncFns } = createManualSyncCollection(
`subscribe-key-changes-unsubscribe-test`,
)
const changes: Array<
ChangeMessage<OutputWithVirtual<TestItem, string>, string>
> = []

const subscription = collection.subscribeKeyChanges(`row-1`, (events) => {
changes.push(...events)
})

subscription.unsubscribe()

writeSyncedChanges(syncFns, [
{ type: `insert`, value: { id: `row-1`, value: `one` } },
])

expect(changes).toEqual([])
})
})

describe(`Virtual properties`, () => {
it(`should include virtual properties in change messages`, async () => {
const changes: Array<
Expand Down
24 changes: 23 additions & 1 deletion packages/db/tests/collection.test-d.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ import { assertType, describe, expectTypeOf, it } from 'vitest'
import { z } from 'zod'
import { createCollection } from '../src/collection/index.js'
import type { OutputWithVirtual } from './utils'
import type { OperationConfig } from '../src/types'
import type { ChangeMessage, OperationConfig } from '../src/types'
import type { StandardSchemaV1 } from '@standard-schema/spec'

describe(`Collection.update type tests`, () => {
Expand Down Expand Up @@ -49,6 +49,28 @@ describe(`Collection.update type tests`, () => {
})
})

describe(`Collection.subscribeKeyChanges type tests`, () => {
type TypeTestItem = { id: string; value: number }

const testCollection = createCollection<TypeTestItem, string>({
getKey: (item) => item.id,
sync: { sync: () => {} },
})

it(`should type callback changes by collection key and output`, () => {
const subscription = testCollection.subscribeKeyChanges(
`id1`,
(changes) => {
expectTypeOf(changes).toEqualTypeOf<
Array<ChangeMessage<OutputWithVirtual<TypeTestItem, string>, string>>
>()
},
)

expectTypeOf(subscription.unsubscribe).toEqualTypeOf<() => void>()
})
})

describe(`Collection type resolution tests`, () => {
// Define test types
type ExplicitType = { id: string; explicit: boolean }
Expand Down