Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
148 changes: 78 additions & 70 deletions crates/bindings-typescript/src/server/procedures.ts
Original file line number Diff line number Diff line change
Expand Up @@ -18,13 +18,11 @@ import {
} from '../lib/type_builders';
import { bsatnBaseSize } from '../lib/util';
import { Uuid } from '../lib/uuid';
import type { HttpClient } from '../server/http_internal';
import { httpClient } from './http_internal';
import { httpClient, type HttpClient } from './http_internal';
import { makeRandom, type Random } from './rng';
import { callUserFunction, ReducerCtxImpl, sys } from './runtime';
import type { SchemaInner } from './schema';

const { freeze } = Object;

export type ProcedureFn<
S extends UntypedSchemaDef,
Params extends ParamsObj,
Expand All @@ -37,7 +35,7 @@ export interface ProcedureCtx<S extends UntypedSchemaDef> {
readonly timestamp: Timestamp;
readonly connectionId: ConnectionId | null;
readonly http: HttpClient;
readonly counter_uuid: { value: number };
readonly random: Random;
withTx<T>(body: (ctx: TransactionCtx<S>) => T): T;
newUuidV4(): Uuid;
newUuidV7(): Uuid;
Expand Down Expand Up @@ -107,76 +105,86 @@ export function callProcedure(
moduleCtx.procedures[id];
const args = deserializeArgs(new BinaryReader(argsBuf));

const ctx: ProcedureCtx<UntypedSchemaDef> = {
const ctx: ProcedureCtx<UntypedSchemaDef> = new ProcedureCtxImpl(
sender,
timestamp,
connectionId,
http: httpClient,
// **Note:** must be 0..=u32::MAX
counter_uuid: { value: Number(0) },
get identity() {
return new Identity(sys.identity());
},
withTx(body) {
const run = () => {
const timestamp = sys.procedure_start_mut_tx();

try {
const ctx: TransactionCtx<UntypedSchemaDef> = new ReducerCtxImpl(
sender,
new Timestamp(timestamp),
connectionId
);
return body(ctx);
} catch (e) {
sys.procedure_abort_mut_tx();
throw e;
}
};

let res = run();
try {
sys.procedure_commit_mut_tx();
return res;
} catch {
// ignore the commit error
}
console.warn('committing anonymous transaction failed');
res = run();
try {
sys.procedure_commit_mut_tx();
return res;
} catch (e) {
throw new Error('transaction retry failed again', { cause: e });
}
},
/**
* Create a new random {@link Uuid} `v4` using the {@link crypto} RNG.
*
* WARN: Until we use a spacetime RNG this make calls non-deterministic.
*/
newUuidV4(): Uuid {
// TODO: Use a spacetime RNG when available
const bytes = crypto.getRandomValues(new Uint8Array(16));
return Uuid.fromRandomBytesV4(bytes);
},

/**
* Create a new sortable {@link Uuid} `v7` using the {@link crypto} RNG, counter,
* and the timestamp.
*
* WARN: Until we use a spacetime RNG this make calls non-deterministic.
*/
newUuidV7(): Uuid {
// TODO: Use a spacetime RNG when available
const bytes = crypto.getRandomValues(new Uint8Array(10));
return Uuid.fromCounterV7(this.counter_uuid, this.timestamp, bytes);
},
};
freeze(ctx);
connectionId
);

const ret = callUserFunction(fn, ctx, args);
const retBuf = new BinaryWriter(returnTypeBaseSize);
serializeReturn(retBuf, ret);
return retBuf.getBuffer();
}

type IProcedureCtx<S extends UntypedSchemaDef> = ProcedureCtx<S>;
const ProcedureCtxImpl = class ProcedureCtx<S extends UntypedSchemaDef>
implements IProcedureCtx<S>
{
#identity: Identity | undefined;
#uuidCounter: { value: 0 } | undefined;
#random: Random | undefined;

constructor(
readonly sender: Identity,
readonly timestamp: Timestamp,
readonly connectionId: ConnectionId | null
) {}

get identity() {
return (this.#identity ??= new Identity(sys.identity()));
}

get random() {
return (this.#random ??= makeRandom(this.timestamp));
}

get http() {
return httpClient;
}

withTx<T>(body: (ctx: TransactionCtx<S>) => T): T {
const run = () => {
const timestamp = sys.procedure_start_mut_tx();

try {
const ctx: TransactionCtx<UntypedSchemaDef> = new ReducerCtxImpl(
this.sender,
new Timestamp(timestamp),
this.connectionId
);
return body(ctx);
} catch (e) {
sys.procedure_abort_mut_tx();
throw e;
}
};

let res = run();
try {
sys.procedure_commit_mut_tx();
return res;
} catch {
// ignore the commit error
}
console.warn('committing anonymous transaction failed');
res = run();
try {
sys.procedure_commit_mut_tx();
return res;
} catch (e) {
throw new Error('transaction retry failed again', { cause: e });
}
}

newUuidV4(): Uuid {
const bytes = this.random.fill(new Uint8Array(16));
return Uuid.fromRandomBytesV4(bytes);
}

newUuidV7(): Uuid {
const bytes = this.random.fill(new Uint8Array(4));
const counter = (this.#uuidCounter ??= { value: 0 });
return Uuid.fromCounterV7(counter, this.timestamp, bytes);
}
};
1 change: 0 additions & 1 deletion crates/bindings-typescript/src/server/runtime.ts
Original file line number Diff line number Diff line change
Expand Up @@ -240,7 +240,6 @@ export const ReducerCtxImpl = class ReducerCtx<
* Create a new random {@link Uuid} `v4` using this `ReducerCtx`'s RNG.
*/
newUuidV4(): Uuid {
// TODO: Use a spacetime RNG when available
const bytes = this.random.fill(new Uint8Array(16));
return Uuid.fromRandomBytesV4(bytes);
}
Expand Down
Loading