Skip to content
1 change: 1 addition & 0 deletions packages/storage/src/common.ts
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ export {
MAX_CURSOR_LENGTH,
} from "./tabular/Cursor";
export type { PageCursor } from "./tabular/Cursor";
export * from "./tabular/HttpTabularProxyStorage";
export * from "./tabular/HuggingFaceTabularStorage";
export * from "./tabular/InMemoryTabularMigrationApplier";
export * from "./tabular/InMemoryTabularStorage";
Expand Down
293 changes: 293 additions & 0 deletions packages/storage/src/tabular/HttpTabularProxyStorage.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,293 @@
/**
* @license
* Copyright 2026 Steven Roussey <sroussey@gmail.com>
* SPDX-License-Identifier: Apache-2.0
*/

import { createServiceToken, deepEqual, makeFingerprint } from "@workglow/util";
import { DataPortSchemaObject, FromSchema, TypedArraySchemaOptions } from "@workglow/util/schema";
import { PollingSubscriptionManager } from "../util/PollingSubscriptionManager";
import { BaseTabularStorage, ClientProvidedKeysOption } from "./BaseTabularStorage";
import type {
AnyTabularStorage,
AutoGeneratedKeys,
CoveringIndexQueryOptions,
DeleteSearchCriteria,
InsertEntity,
Page,
PageRequest,
QueryOptions,
SearchCriteria,
SimplifyPrimaryKey,
TabularChangePayload,
TabularSubscribeOptions,
} from "./ITabularStorage";

export const HTTP_TABULAR_PROXY_REPOSITORY = createServiceToken<AnyTabularStorage>(
"storage.tabularRepository.httpProxy"
);

/**
* Fetch-shaped function used to forward operations to the server. The signature
* matches the DOM `fetch` so a real browser fetch, an in-process Hono `app.fetch`,
* or the Electron `window.desktop.api` shim all work without adaptation.
*/
export type HttpTabularProxyFetch = (path: string, init?: RequestInit) => Promise<Response>;

export interface HttpTabularProxyOptions<
Schema extends DataPortSchemaObject,
PrimaryKeyNames extends ReadonlyArray<keyof Schema["properties"]>,
> {
readonly fetch: HttpTabularProxyFetch;
readonly table: string;
readonly schema: Schema;
readonly primaryKey: PrimaryKeyNames;
readonly indexes?: readonly (
| keyof Schema["properties"]
| readonly (keyof Schema["properties"])[]
)[];
/** Optional base path. Defaults to `/api/storage`. Trailing slashes are stripped. */
readonly basePath?: string;
/** Forwarded to BaseTabularStorage. Defaults to "if-missing". */
readonly clientProvidedKeys?: ClientProvidedKeysOption;
}
Comment on lines +45 to +53

function trimTrailingSlashes(value: string): string {
let end = value.length;
while (end > 0 && value.charCodeAt(end - 1) === 47) {
end--;
}
return end === value.length ? value : value.slice(0, end);
}

/**
* Storage adapter that forwards every {@link ITabularStorage} operation as
* `POST {basePath}/{table}/{op}` through an injected fetch impl.
*
* Transport is the fetch impl's concern — this class doesn't know whether the
* request crosses a socket (dev), a `MessageChannelMain` port (Electron prod),
* or runs in-process against `app.fetch` (tests / web mode).
*
* Wire format is JSON: values must be JSON-serialisable. Schemas using
* `Uint8Array` (blob) or `bigint` columns are NOT supported through this
* proxy — those types don't round-trip through `JSON.stringify`/`res.json()`.
* Add a field-aware (de)serializer here if a binary-valued schema ever needs
* to go over the proxy.
*/
export class HttpTabularProxyStorage<
Schema extends DataPortSchemaObject,
PrimaryKeyNames extends ReadonlyArray<keyof Schema["properties"]>,
Entity = FromSchema<Schema, TypedArraySchemaOptions>,
PrimaryKey = SimplifyPrimaryKey<Entity, PrimaryKeyNames>,
Value = Omit<Entity, PrimaryKeyNames[number] & keyof Entity>,
InsertType = InsertEntity<Entity, AutoGeneratedKeys<Schema>>,
> extends BaseTabularStorage<Schema, PrimaryKeyNames, Entity, PrimaryKey, Value, InsertType> {
protected readonly fetchImpl: HttpTabularProxyFetch;
protected readonly table: string;
protected readonly basePath: string;
private pollingManager: PollingSubscriptionManager<
Entity,
string,
TabularChangePayload<Entity>
> | null = null;

constructor(opts: HttpTabularProxyOptions<Schema, PrimaryKeyNames>) {
const indexes = (opts.indexes ?? []) as readonly (keyof Entity | readonly (keyof Entity)[])[];
super(opts.schema, opts.primaryKey, indexes, opts.clientProvidedKeys ?? "if-missing");
this.fetchImpl = opts.fetch;
this.table = opts.table;
this.basePath = trimTrailingSlashes(opts.basePath ?? "/api/storage");
}

/**
* Build the URL for an operation under this table.
*/
protected url(op: string): string {
return `${this.basePath}/${encodeURIComponent(this.table)}/${op}`;
}

/**
* Send a POST with a JSON body. Throws on non-2xx, propagating the server's
* `{ error }` body when present.
*/
protected async call<T>(op: string, body: unknown): Promise<T> {
const res = await this.fetchImpl(this.url(op), {
method: "POST",
headers: { "Content-Type": "application/json" },
body: JSON.stringify(body ?? {}),
});
if (!res.ok) {
let message: string;
try {
const data = (await res.json()) as { error?: string };
message = data?.error ?? `HTTP ${res.status}`;
} catch {
message = `HTTP ${res.status}`;
}
throw new Error(`HttpTabularProxyStorage(${this.table}/${op}): ${message}`);
}
return (await res.json()) as T;
}
Comment on lines +113 to +130

async put(value: InsertType): Promise<Entity> {
const { entity } = await this.call<{ entity: Entity }>("put", { value });
this.events.emit("put", entity);
return entity;
}

async putBulk(values: InsertType[]): Promise<Entity[]> {
if (values.length === 0) return [];
const { entities } = await this.call<{ entities: Entity[] }>("putBulk", { values });
for (const entity of entities) this.events.emit("put", entity);
return entities;
}

async get(key: PrimaryKey): Promise<Entity | undefined> {
const { entity } = await this.call<{ entity: Entity | null }>("get", { key });
const result = entity ?? undefined;
this.events.emit("get", key, result);
return result;
}

async delete(key: PrimaryKey | Entity): Promise<void> {
await this.call<{ ok: true }>("delete", { key });
const { key: normalizedKey } = this.separateKeyValueFromCombined(key as Entity);
this.events.emit("delete", normalizedKey as keyof Entity);
}

override async getBulk(keys: readonly PrimaryKey[]): Promise<Entity[]> {
if (keys.length === 0) return [];
const { entities } = await this.call<{ entities: Entity[] }>("getBulk", { keys });
this.events.emit("getBulk", keys, entities);
return entities;
}

async query(
criteria: SearchCriteria<Entity>,
options?: QueryOptions<Entity>
): Promise<Entity[] | undefined> {
this.validateQueryParams(criteria, options);
const { entities } = await this.call<{ entities: Entity[] | null }>("query", {
criteria,
options,
});
const result = entities ?? undefined;
this.events.emit("query", criteria as Partial<Entity>, result);
return result;
}

async getAll(options?: QueryOptions<Entity>): Promise<Entity[] | undefined> {
this.validateGetAllOptions(options);
const { entities } = await this.call<{ entities: Entity[] | null }>("getAll", { options });
return entities ?? undefined;
}

async size(): Promise<number> {
const { size } = await this.call<{ size: number }>("size", {});
return size;
}

override async count(criteria?: SearchCriteria<Entity>): Promise<number> {
const { count } = await this.call<{ count: number }>("count", { criteria });
return count;
}

async deleteAll(): Promise<void> {
await this.call<{ ok: true }>("deleteAll", {});
this.events.emit("clearall");
}

async deleteSearch(criteria: DeleteSearchCriteria<Entity>): Promise<void> {
await this.call<{ ok: true }>("deleteSearch", { criteria });
}

async getOffsetPage(offset: number, limit: number): Promise<Entity[] | undefined> {
const { entities } = await this.call<{ entities: Entity[] | null }>("getOffsetPage", {
offset,
limit,
});
return entities ?? undefined;
}

override async getPage(request: PageRequest<Entity> = {}): Promise<Page<Entity>> {
this.validatePageRequest(request);
const { page } = await this.call<{ page: Page<Entity> }>("getPage", { request });
return page;
}

override async queryPage(
criteria: SearchCriteria<Entity>,
request: PageRequest<Entity> = {}
): Promise<Page<Entity>> {
this.validatePageRequest(request);
const { page } = await this.call<{ page: Page<Entity> }>("queryPage", {
criteria,
request,
});
return page;
}

override async queryIndex<K extends keyof Entity & string>(
criteria: SearchCriteria<Entity>,
options: CoveringIndexQueryOptions<Entity, K>
): Promise<Pick<Entity, K>[]> {
this.validateSelect(options);
this.validateQueryParams(criteria, options);
const { entities } = await this.call<{ entities: Pick<Entity, K>[] }>("queryIndex", {
criteria,
options,
});
return entities;
Comment on lines +230 to +240
}

override subscribeToChanges(
callback: (change: TabularChangePayload<Entity>) => void,
options?: TabularSubscribeOptions
): () => void {
if (this.pollingManager === null) {
this.pollingManager = new PollingSubscriptionManager<
Entity,
string,
TabularChangePayload<Entity>
>(
async () => {
const rows = (await this.getAll()) ?? [];
const map = new Map<string, Entity>();
for (const row of rows) {
const { key } = this.separateKeyValueFromCombined(row);
const fingerprint = await makeFingerprint(key);
map.set(fingerprint, row);
}
return map;
},
(a, b) => deepEqual(a, b),
{
insert: (item) => ({ type: "INSERT", new: item }),
update: (oldItem, newItem) => ({ type: "UPDATE", old: oldItem, new: newItem }),
delete: (item) => ({ type: "DELETE", old: item }),
},
{ defaultIntervalMs: 2000 }
);
}
const intervalMs = options?.pollingIntervalMs;
const unsubscribe = this.pollingManager.subscribe(
callback,
intervalMs !== undefined ? { intervalMs } : undefined
);
return () => {
unsubscribe();
if (this.pollingManager !== null && !this.pollingManager.hasSubscriptions) {
this.pollingManager.destroy();
this.pollingManager = null;
}
};
}

override destroy(): void {
if (this.pollingManager !== null) {
this.pollingManager.destroy();
this.pollingManager = null;
}
super.destroy();
}
}
Loading