Skip to content
1 change: 0 additions & 1 deletion build-docs.sh
Original file line number Diff line number Diff line change
Expand Up @@ -25,4 +25,3 @@ then
else
git status
fi

5 changes: 5 additions & 0 deletions eslint.config.mjs
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,11 @@ const opts = tseslint.config(
"no-restricted-globals": ["error", "URL", "TextDecoder", "TextEncoder"],
},
},
{
rules: {
"@typescript-eslint/no-unused-vars": ["error", { argsIgnorePattern: "^_", destructuredArrayIgnorePattern: "^_" }],
},
},
);

export default opts;
9 changes: 6 additions & 3 deletions src/crdt-clock.ts
Original file line number Diff line number Diff line change
Expand Up @@ -13,9 +13,10 @@ import {
type BaseBlockstore,
type CarTransaction,
PARAM,
throwFalsy,
} from "./types.js";
import { applyHeadQueue, ApplyHeadQueue } from "./apply-head-queue.js";
import { ensureLogger } from "./utils.js";
import { arrayFromAsyncIterable, ensureLogger } from "./utils.js";

export class CRDTClockImpl {
// todo: track local and remote clocks independently, merge on read
Expand Down Expand Up @@ -70,8 +71,10 @@ export class CRDTClockImpl {
async processUpdates(updatesAcc: DocUpdate<DocTypes>[], all: boolean, prevHead: ClockHead) {
let internalUpdates = updatesAcc;
if (this.watchers.size && !all) {
const changes = await clockChangesSince<DocTypes>(this.blockstore, this.head, prevHead, {}, this.logger);
internalUpdates = changes.result;
const changes = await arrayFromAsyncIterable(
clockChangesSince(throwFalsy(this.blockstore), this.head, prevHead, {}, this.logger),
);
internalUpdates = changes;
}
this.zoomers.forEach((fn) => fn());
this.notifyWatchers(internalUpdates || []);
Expand Down
131 changes: 94 additions & 37 deletions src/crdt-helpers.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ import { parse } from "multiformats/link";
import { sha256 as hasher } from "multiformats/hashes/sha2";
import * as codec from "@ipld/dag-cbor";
import { put, get, entries, root } from "@fireproof/vendor/@web3-storage/pail/crdt";
import { EventBlockView, EventLink, Operation, PutOperation } from "@fireproof/vendor/@web3-storage/pail/crdt/api";
import { EventBlockView, EventLink, Operation, PutOperation, UnknownLink } from "@fireproof/vendor/@web3-storage/pail/crdt/api";
import { EventFetcher, vis } from "@fireproof/vendor/@web3-storage/pail/clock";
import * as Batch from "@fireproof/vendor/@web3-storage/pail/crdt/batch";
import {
Expand All @@ -30,18 +30,24 @@ import {
CarTransaction,
BaseBlockstore,
PARAM,
ClockLink,
DocFragment,
Row,
DocumentRow,
IndexKey,
} from "./types.js";
import { Result } from "@fireproof/vendor/@web3-storage/pail/crdt/api";
import { Logger } from "@adviser/cement";
import { CarTransactionImpl } from "./blockstore/transaction.js";

// eslint-disable-next-line @typescript-eslint/no-unused-vars
function time(tag: string) {
// @ts-expect-error "charwise" has no types
import charwise from "charwise";

function time(_tag: string) {
// console.time(tag)
}

// eslint-disable-next-line @typescript-eslint/no-unused-vars
function timeEnd(tag: string) {
function timeEnd(_tag: string) {
// console.timeEnd(tag)
}

Expand Down Expand Up @@ -127,6 +133,10 @@ export async function applyBulkUpdateToCrdt<T extends DocTypes>(
return { head: result.head } satisfies CRDTMeta;
}

export function docUpdateToDocWithId<T extends DocTypes>({ id, del, value }: DocUpdate<T>): DocWithId<T> {
return (del ? { _id: id, _deleted: true } : { _id: id, ...value }) as DocWithId<T>;
}

// this whole thing can get pulled outside of the write queue
async function writeDocContent<T extends DocTypes>(
store: StoreRuntime,
Expand Down Expand Up @@ -279,50 +289,74 @@ class DirtyEventFetcher<T> extends EventFetcher<T> {
}
}

export async function clockChangesSince<T extends DocTypes>(
export async function* clockUpdatesSince<K extends IndexKeyType, T extends DocTypes, R extends DocFragment>(
blocks: BlockFetcher,
head: ClockHead,
since: ClockHead,
opts: ChangesOptions = {},
logger: Logger,
allowedKeys?: Set<string>,
): AsyncGenerator<Row<K, R> & { clock: ClockLink }> {
for await (const { id, clock, docLink } of clockChangesSince(blocks, head, since, opts, logger, allowedKeys)) {
const { doc } = await getValueFromLink<T>(blocks, docLink, logger);
yield { id, key: [charwise.encode(id) as K, id], value: docValues<T, R>(doc) as R, clock };
}
}

export async function* clockUpdatesSinceWithDoc<K extends IndexKeyType, T extends DocTypes, R extends DocFragment>(
blocks: BlockFetcher,
head: ClockHead,
since: ClockHead,
opts: ChangesOptions = {},
logger: Logger,
allowedKeys?: Set<string>,
): AsyncGenerator<DocumentRow<K, T, R> & { clock: ClockLink }> {
for await (const { id, clock, docLink } of clockChangesSince(blocks, head, since, opts, logger, allowedKeys)) {
const { doc } = await getValueFromLink<T>(blocks, docLink, logger);
const key: IndexKey<K> = [charwise.encode(id) as K, id];

// NOTE: Technically not correct, probably best to remove when removing the old top-level API.
if (!doc) yield { id, key, doc: { _id: id, _deleted: true } as DocWithId<T>, value: [] as R, clock };
else yield { id, key, doc, value: docValues<T, R>(doc) as R, clock };
}
}

export function clockChangesSince(
blocks: BlockFetcher,
head: ClockHead,
since: ClockHead,
opts: ChangesOptions,
logger: Logger,
): Promise<{ result: DocUpdate<T>[]; head: ClockHead }> {
allowedKeys?: Set<string>,
): AsyncGenerator<{ id: string; docLink: UnknownLink; clock: ClockLink }> {
const eventsFetcher = (
opts.dirty ? new DirtyEventFetcher<Operation>(logger, blocks) : new EventFetcher<Operation>(blocks)
) as EventFetcher<Operation>;
const keys = new Set<string>();
const updates = await gatherUpdates<T>(
blocks,
eventsFetcher,
head,
since,
[],
keys,
new Set<string>(),
opts.limit || Infinity,
logger,
);
return { result: updates.reverse(), head };
return gatherUpdates(eventsFetcher, head, since, keys, new Set<string>(), opts.limit || Infinity, logger, allowedKeys);
}

async function gatherUpdates<T extends DocTypes>(
blocks: BlockFetcher,
async function* gatherUpdates(
eventsFetcher: EventFetcher<Operation>,
head: ClockHead,
since: ClockHead,
updates: DocUpdate<T>[] = [],
keys: Set<string>,
didLinks: Set<string>,
limit: number,
logger: Logger,
): Promise<DocUpdate<T>[]> {
if (limit <= 0) return updates;
allowedKeys?: Set<string>,
): AsyncGenerator<{ id: string; docLink: UnknownLink; clock: ClockLink }> {
if (limit <= 0) return;

// if (Math.random() < 0.001) console.log('gatherUpdates', head.length, since.length, updates.length)
const sHead = head.map((l) => l.toString());

for (const link of since) {
if (sHead.includes(link.toString())) {
return updates;
return;
}
}

for (const link of head) {
if (didLinks.has(link.toString())) continue;
didLinks.add(link.toString());
Expand All @@ -337,34 +371,54 @@ async function gatherUpdates<T extends DocTypes>(
}
for (let i = ops.length - 1; i >= 0; i--) {
const { key, value } = ops[i];
if (!keys.has(key)) {
if (!keys.has(key) && (allowedKeys === undefined || allowedKeys.has(key))) {
// todo option to see all updates
const docValue = await getValueFromLink<T>(blocks, value, logger);
if (key === PARAM.GENESIS_CID) {
continue;
}
updates.push({ id: key, value: docValue.doc, del: docValue.del, clock: link });
yield { id: key, docLink: value, clock: link };
limit--;
keys.add(key);
}
}
if (event.parents) {
updates = await gatherUpdates(blocks, eventsFetcher, event.parents, since, updates, keys, didLinks, limit, logger);
yield* gatherUpdates(eventsFetcher, event.parents, since, keys, didLinks, limit, logger);
}
}
return updates;
}

export async function* getAllEntries<T extends DocTypes>(blocks: BlockFetcher, head: ClockHead, logger: Logger) {
// return entries(blocks, head)
for await (const [key, link] of entries(blocks, head)) {
if (key !== PARAM.GENESIS_CID) {
const docValue = await getValueFromLink(blocks, link, logger);
yield { id: key, value: docValue.doc, del: docValue.del } as DocUpdate<T>;
export async function* getAllEntries<K extends IndexKeyType, T extends DocTypes, R extends DocFragment>(
blocks: BlockFetcher,
head: ClockHead,
logger: Logger,
): AsyncGenerator<Row<K, R>> {
for await (const [id, link] of entries(blocks, head)) {
if (id !== PARAM.GENESIS_CID) {
const { doc } = await getValueFromLink<T>(blocks, link, logger);
yield { id, key: [charwise.encode(id) as K, id], value: docValues<T, R>(doc) as R };
}
}
}

export async function* getAllEntriesWithDoc<K extends IndexKeyType, T extends DocTypes, R extends DocFragment>(
blocks: BlockFetcher,
head: ClockHead,
logger: Logger,
): AsyncGenerator<DocumentRow<K, T, R>> {
for await (const [id, link] of entries(blocks, head)) {
if (id !== PARAM.GENESIS_CID) {
const { doc } = await getValueFromLink<T>(blocks, link, logger);
yield { id, key: [charwise.encode(id) as K, id], doc: doc, value: docValues<T, R>(doc) as R };
}
}
}

export function docValues<T extends DocTypes, R extends DocFragment>(doc: DocWithId<T>) {
return Object.entries(doc)
.filter(([k, _v]) => !k.startsWith("_"))
.map(([_k, v]) => v as R);
}

export async function* clockVis(blocks: BlockFetcher, head: ClockHead) {
for await (const line of vis(blocks, head)) {
yield line;
Expand Down Expand Up @@ -431,7 +485,10 @@ export async function doCompact(blockLog: CompactFetcher, head: ClockHead, logge
timeEnd("compact root blocks");

time("compact changes");
await clockChangesSince(blockLog, head, [], {}, logger);
// TODO
for await (const x of clockChangesSince(blockLog, head, [], {}, logger)) {
void x;
}
timeEnd("compact changes");

isCompacting = false;
Expand Down
Loading
Loading