Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
64 changes: 61 additions & 3 deletions packages/db/src/collection/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,10 @@ import {
CollectionRequiresConfigError,
CollectionRequiresSyncConfigError,
} from '../errors'
import { getBuilderFromConfig } from '../query/live/collection-registry.js'
import { currentStateAsChanges } from './change-events'
import { TrackedSourceRecordsManager } from './tracked-source-records.js'
import { registerTrackedSourceRecordsManager } from './tracked-source-records-store.js'

import { CollectionStateManager } from './state'
import { CollectionChangesManager } from './changes'
Expand Down Expand Up @@ -34,6 +37,9 @@ import type {
SingleResult,
StringCollationConfig,
SubscribeChangesOptions,
SubscribeTrackedSourceRecordsOptions,
TrackedSourceRecord,
TrackedSourceRecordsChange,
Transaction as TransactionType,
UtilsRecord,
WritableDeep,
Expand All @@ -44,6 +50,14 @@ import type { WithVirtualProps } from '../virtual-props.js'

export type { CollectionIndexMetadata } from './events.js'

type LiveQueryTrackedSourceView = {
snapshot: () => Array<TrackedSourceRecord>
subscribe: (
callback: (change: TrackedSourceRecordsChange) => void,
options?: SubscribeTrackedSourceRecordsOptions,
) => () => void
}

/**
* Enhanced Collection interface that includes both data type T and utilities TUtils
* @template T - The type of items in the collection
Expand Down Expand Up @@ -255,9 +269,7 @@ export function createCollection(
schema?: StandardSchemaV1
},
): Collection<any, string | number, UtilsRecord, any, any> {
const collection = new CollectionImpl<any, string | number, any, any, any>(
options,
)
const collection = new CollectionImpl(options)

// Attach utils to collection
if (options.utils) {
Expand Down Expand Up @@ -299,6 +311,12 @@ export class CollectionImpl<
// The core state of the collection is "public" so that is accessible in tests
// and for debugging
public _state: CollectionStateManager<TOutput, TKey, TSchema, TInput>
// Aggregated view of source-records currently being used by active live
// queries that depend on this collection.
private readonly _trackedSourceRecords: TrackedSourceRecordsManager<TKey>
// For live-query collections only: a live-query-local view of "source
// records this query is currently using." Undefined on base collections.
private readonly _liveQueryTrackedSourceView?: LiveQueryTrackedSourceView

/**
* When set, collection consumers should defer processing incoming data
Expand Down Expand Up @@ -354,6 +372,10 @@ export class CollectionImpl<
this._mutations = new CollectionMutationsManager(config, this.id)
this._state = new CollectionStateManager(config)
this._sync = new CollectionSyncManager(config, this.id)
this._trackedSourceRecords = new TrackedSourceRecordsManager<TKey>(this.id)
registerTrackedSourceRecordsManager(this, this._trackedSourceRecords)
this._liveQueryTrackedSourceView =
getBuilderFromConfig(config)?.liveQueryTrackedSourceView

this.comparisonOpts = buildCompareOptionsFromConfig(config)

Expand Down Expand Up @@ -941,6 +963,42 @@ export class CollectionImpl<
return this._changes.subscribeChanges(callback, options)
}

/**
* Snapshot of source records currently being tracked through this
* collection's data flow.
*
* On a base collection: the union of records OF this collection being
* used by any active live query. Each record appears once regardless
* of how many queries reference it.
*
* On a live query collection: the records FROM this query's source
* collections that the query is currently using.
*
* Both views answer "what source records are currently flowing through
* me," from opposite ends of the data-flow graph.
*/
public getTrackedSourceRecords(): Array<TrackedSourceRecord> {
return (
this._liveQueryTrackedSourceView?.snapshot() ??
this._trackedSourceRecords.get()
)
}

/**
* Subscribe to changes in the set of source records tracked through this
* collection's data flow. See `getTrackedSourceRecords` for the per-
* collection-type semantics.
*/
public subscribeTrackedSourceRecords(
callback: (change: TrackedSourceRecordsChange) => void,
options?: SubscribeTrackedSourceRecordsOptions,
): () => void {
if (this._liveQueryTrackedSourceView) {
return this._liveQueryTrackedSourceView.subscribe(callback, options)
}
return this._trackedSourceRecords.subscribe(callback, options)
}

/**
* Subscribe to a collection event
*/
Expand Down
22 changes: 22 additions & 0 deletions packages/db/src/collection/tracked-source-records-store.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
import type { TrackedSourceRecordsManager } from './tracked-source-records.js'

const trackedSourceRecordsManagers = new WeakMap<
object,
TrackedSourceRecordsManager
>()

export function registerTrackedSourceRecordsManager(
collection: object,
manager: TrackedSourceRecordsManager,
): void {
trackedSourceRecordsManagers.set(collection, manager)
}

export function applyTrackedSourceRecordDelta(
collection: object | undefined,
added: Iterable<string | number>,
removed: Iterable<string | number>,
): void {
if (!collection) return
trackedSourceRecordsManagers.get(collection)?.apply(added, removed)
}
98 changes: 98 additions & 0 deletions packages/db/src/collection/tracked-source-records.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,98 @@
import type {
SubscribeTrackedSourceRecordsOptions,
TrackedSourceRecord,
TrackedSourceRecordsChange,
} from '../types.js'

type Entry<TKey> = { key: TKey; refCount: number }

/**
* Per-base-collection tracked source records manager.
*
* Refcounts over active live queries that depend on this collection. Each
* live query's aggregator pushes its net alias-level transitions here; this
* manager dedupes across queries and emits to subscribers only on true 0↔1
* transitions.
*/
export class TrackedSourceRecordsManager<
TKey extends string | number = string | number,
> {
// Keys are primitives; use them directly as the Map key. No serialization.
private readonly entries = new Map<TKey, Entry<TKey>>()
private readonly listeners = new Set<
(change: TrackedSourceRecordsChange) => void
>()

constructor(private readonly collectionId: string) {}

apply(added: Iterable<TKey>, removed: Iterable<TKey>): void {
const keyDeltas = new Map<TKey, number>()
for (const key of added) {
const currentDelta = keyDeltas.get(key) ?? 0
keyDeltas.set(key, currentDelta + 1)
}
for (const key of removed) {
const currentDelta = keyDeltas.get(key) ?? 0
keyDeltas.set(key, currentDelta - 1)
}

const netAdded: Array<TKey> = []
const netRemoved: Array<TKey> = []

for (const [key, delta] of keyDeltas) {
if (delta === 0) continue
const existing = this.entries.get(key)

if (delta > 0) {
if (existing) {
existing.refCount += delta
} else {
this.entries.set(key, { key, refCount: delta })
netAdded.push(key)
}
continue
}

if (!existing) {
continue
}

const nextRefCount = existing.refCount + delta
if (nextRefCount <= 0) {
this.entries.delete(key)
netRemoved.push(existing.key)
} else {
existing.refCount = nextRefCount
}
}

if (netAdded.length === 0 && netRemoved.length === 0) return
if (this.listeners.size === 0) return
const change: TrackedSourceRecordsChange = {
added: netAdded.map((key) => this.toRecord(key)),
removed: netRemoved.map((key) => this.toRecord(key)),
}
for (const listener of this.listeners) listener(change)
}

get(): Array<TrackedSourceRecord> {
return Array.from(this.entries.values(), ({ key }) => this.toRecord(key))
}

subscribe(
callback: (change: TrackedSourceRecordsChange) => void,
options?: SubscribeTrackedSourceRecordsOptions,
): () => void {
this.listeners.add(callback)
if (options?.includeInitialState && this.entries.size > 0) {
callback({ added: this.get(), removed: [] })
}
return () => {
this.listeners.delete(callback)
}
}

private toRecord(key: TKey): TrackedSourceRecord {
return { collectionId: this.collectionId, key }
}
}
9 changes: 7 additions & 2 deletions packages/db/src/query/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -84,8 +84,13 @@ export {
// One-shot query execution
export { queryOnce, type QueryOnceConfig } from './query-once.js'

export { type LiveQueryCollectionConfig } from './live/types.js'
export { type LiveQueryCollectionUtils } from './live/collection-config-builder.js'
export type {
SubscribeTrackedSourceRecordsOptions,
TrackedSourceRecord,
TrackedSourceRecordsChange,
} from '../types.js'
export type { LiveQueryCollectionConfig } from './live/types.js'
export { type LiveQueryCollectionUtils } from './live-query-collection.js'

// Predicate utilities for predicate push-down
export {
Expand Down
38 changes: 20 additions & 18 deletions packages/db/src/query/live-query-collection.ts
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,8 @@ import type {
RootQueryResult,
} from './builder/types.js'

export type { LiveQueryCollectionUtils } from './live/collection-config-builder.js'

type CollectionConfigForContext<
TContext extends Context,
TResult extends object,
Expand Down Expand Up @@ -180,26 +182,26 @@ export function createLiveQueryCollection<
TContext,
TResult
> & { utils: LiveQueryCollectionUtils & TUtils }
} else {
// Config object case
const config = configOrQuery as LiveQueryCollectionConfig<
TContext,
TResult
> & { utils?: TUtils }
// Same overload implementation limitation as above: the config has already
// been validated by the public signatures, but the branch loses that precision.
const options = liveQueryCollectionOptions(config as any)
}

// Merge custom utils if provided, preserving the getBuilder() method for dependency tracking
if (config.utils) {
options.utils = { ...options.utils, ...config.utils }
}
// Config object case. Same overload implementation limitation as above:
// the config has already been validated by the public signatures, but the
// branch loses that precision.
const config = configOrQuery as LiveQueryCollectionConfig<
TContext,
TResult
> & { utils?: TUtils }
const options = liveQueryCollectionOptions(config as any)

return bridgeToCreateCollection(options) as CollectionForContext<
TContext,
TResult
> & { utils: LiveQueryCollectionUtils & TUtils }
// Merge custom utils if provided, preserving the getBuilder() method for dependency tracking
if (config.utils) {
options.utils = { ...options.utils, ...config.utils }
}

return bridgeToCreateCollection(options) as CollectionForContext<
TContext,
TResult
> & { utils: LiveQueryCollectionUtils & TUtils }
}

/**
Expand All @@ -212,13 +214,13 @@ function bridgeToCreateCollection<
>(
options: CollectionConfig<TResult> & { utils: TUtils },
): Collection<TResult, string | number, TUtils> {
const builder = getBuilderFromConfig(options)
const collection = createCollection(options as any) as unknown as Collection<
TResult,
string | number,
LiveQueryCollectionUtils
>

const builder = getBuilderFromConfig(options)
if (builder) {
registerCollectionBuilder(collection, builder)
}
Expand Down
Loading
Loading