Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
215 changes: 115 additions & 100 deletions pnpm-lock.yaml

Large diffs are not rendered by default.

5 changes: 3 additions & 2 deletions rivetkit-typescript/packages/rivetkit/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -168,13 +168,14 @@
"actor-config-schema-gen": "tsx scripts/actor-config-schema-gen.ts"
},
"dependencies": {
"@rivet-dev/agent-os-core": "^0.1.1",
"@hono/node-server": "^1.18.2",
"@hono/node-ws": "^1.1.1",
"@hono/zod-openapi": "^1.1.5",
"@rivet-dev/agent-os-core": "^0.1.1",
"@rivetkit/bare-ts": "^0.6.2",
"@rivetkit/engine-cli": "workspace:*",
"@rivetkit/engine-envoy-protocol": "workspace:*",
"@rivetkit/on-change": "6.0.1",
"@rivetkit/rivetkit-napi": "workspace:*",
"@rivetkit/rivetkit-wasm": "workspace:*",
"@rivetkit/traces": "workspace:*",
Expand All @@ -192,10 +193,10 @@
"zod": "^4.1.0"
},
"devDependencies": {
"@biomejs/biome": "^2.3",
"@copilotkit/llmock": "^1.6.0",
"@rivet-dev/agent-os-common": "*",
"@rivet-dev/agent-os-pi": "^0.1.1",
"@biomejs/biome": "^2.3",
"@standard-schema/spec": "^1.0.0",
"@types/invariant": "^2",
"@types/node": "^22.13.1",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import {
import { assertUnreachable, stringifyError } from "@/common/utils";
import type { UniversalWebSocket } from "@/common/websocket-interface";
import type { EngineControlClient } from "@/engine-client/driver";
import type { CborSerializable } from "@/common/encoding";
import {
decodeCborCompat,
deserializeWithEncoding,
Expand Down Expand Up @@ -1269,7 +1270,7 @@ export class ActorConnRaw {
name: msg.body.val.name,
args: bufferToArrayBuffer(
encodeCborCompat(
msg.body.val.args,
msg.body.val.args as CborSerializable,
),
),
},
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import {
} from "@/common/client-protocol-zod";
import { deconstructError } from "@/common/utils";
import type { EngineControlClient } from "@/engine-client/driver";
import type { CborSerializable } from "@/common/encoding";
import { decodeCborCompat, deserializeWithEncoding, encodeCborCompat } from "@/serde";
import { bufferToArrayBuffer } from "@/utils";
import type {
Expand Down Expand Up @@ -332,7 +333,7 @@ export class ActorHandleRaw {
args,
}),
requestToBare: (args): protocol.HttpActionRequest => ({
args: bufferToArrayBuffer(encodeCborCompat(args)),
args: bufferToArrayBuffer(encodeCborCompat(args as CborSerializable)),
}),
responseFromJson: (json): Response => json.output as Response,
responseFromBare: (bare): Response =>
Expand Down
3 changes: 2 additions & 1 deletion rivetkit-typescript/packages/rivetkit/src/client/queue.ts
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import {
type HttpQueueSendResponse as HttpQueueSendResponseJson,
HttpQueueSendResponseSchema,
} from "@/common/client-protocol-zod";
import type { CborSerializable } from "@/common/encoding";
import { decodeCborCompat, encodeCborCompat } from "@/serde";
import { bufferToArrayBuffer } from "@/utils";
import { sendHttpRequest } from "./utils";
Expand Down Expand Up @@ -111,7 +112,7 @@ export function createQueueSender(
}),
requestToBare: (value): protocol.HttpQueueSendRequest => ({
name: value.name ?? name,
body: bufferToArrayBuffer(encodeCborCompat(value.body)),
body: bufferToArrayBuffer(encodeCborCompat(value.body as CborSerializable)),
wait: value.wait ?? false,
timeout:
value.timeout !== undefined ? BigInt(value.timeout) : null,
Expand Down
235 changes: 230 additions & 5 deletions rivetkit-typescript/packages/rivetkit/src/common/encoding.ts
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,168 @@ const JSON_COMPAT_BIGINT = "$BigInt";
const JSON_COMPAT_ARRAY_BUFFER = "$ArrayBuffer";
const JSON_COMPAT_UINT8_ARRAY = "$Uint8Array";
const JSON_COMPAT_UNDEFINED = "$Undefined";
const JSON_COMPAT_SET = "$Set";

/**
* Recursive type representing all values that can be serialized via CBOR
* (cbor-x). Matches the supported CBOR tag set: primitives, BigInt (tags 2/3),
* Date (tags 0/1), Error (tag 27), TypedArrays (tags 64-82), ArrayBuffer,
* Map (tag 259), Set (custom $Set encoding), arrays, and plain objects.
*/
export type CborSerializable =
| string
| number
| boolean
| null
| undefined
| bigint
| Date
| Error
| ArrayBuffer
| Uint8Array
| Uint8ClampedArray
| Uint16Array
| Uint32Array
| BigUint64Array
| Int8Array
| Int16Array
| Int32Array
| BigInt64Array
| Float32Array
| Float64Array
| CborSerializable[]
| Map<CborSerializable, CborSerializable>
| Set<CborSerializable>
| { [key: string]: CborSerializable };

function isTypedArray(value: unknown): boolean {
return (
value instanceof Uint8ClampedArray ||
value instanceof Uint16Array ||
value instanceof Uint32Array ||
value instanceof BigUint64Array ||
value instanceof Int8Array ||
value instanceof Int16Array ||
value instanceof Int32Array ||
value instanceof BigInt64Array ||
value instanceof Float32Array ||
value instanceof Float64Array
);
}

/**
* Recursively validates that a value is CBOR serializable. Throws TypeError
* with a descriptive message for non-serializable values.
*/
export function assertCborSerializable(
value: unknown,
path = "",
): asserts value is CborSerializable {
if (
value === null ||
value === undefined ||
typeof value === "string" ||
typeof value === "number" ||
typeof value === "boolean" ||
typeof value === "bigint"
) {
return;
}

if (typeof value === "function") {
throw new TypeError(
`Value at ${path || "root"} is a function and is not CBOR serializable`,
);
}

if (typeof value === "symbol") {
throw new TypeError(
`Value at ${path || "root"} is a symbol and is not CBOR serializable`,
);
}

if (
value instanceof Date ||
value instanceof Error ||
value instanceof ArrayBuffer ||
value instanceof Uint8Array ||
isTypedArray(value)
) {
return;
}

if (value instanceof RegExp) {
throw new TypeError(
`Value at ${path || "root"} is a RegExp and is not CBOR serializable`,
);
}

if (value instanceof WeakMap) {
throw new TypeError(
`Value at ${path || "root"} is a WeakMap and is not CBOR serializable`,
);
}

if (value instanceof WeakSet) {
throw new TypeError(
`Value at ${path || "root"} is a WeakSet and is not CBOR serializable`,
);
}

if (value instanceof WeakRef) {
throw new TypeError(
`Value at ${path || "root"} is a WeakRef and is not CBOR serializable`,
);
}

if (value instanceof Promise) {
throw new TypeError(
`Value at ${path || "root"} is a Promise and is not CBOR serializable`,
);
}

if (value instanceof Map) {
for (const [k, v] of value.entries()) {
assertCborSerializable(k, `${path || "root"}.key(${String(k)})`);
assertCborSerializable(v, `${path || "root"}.value(${String(k)})`);
}
return;
}

if (value instanceof Set) {
let index = 0;
for (const item of value.values()) {
assertCborSerializable(item, `${path || "root"}.set[${index}]`);
index++;
}
return;
}

if (Array.isArray(value)) {
for (let i = 0; i < value.length; i++) {
assertCborSerializable(value[i], `${path || "root"}[${i}]`);
}
return;
}

if (isPlainObject(value)) {
for (const key in value) {
assertCborSerializable(
value[key as keyof typeof value],
path ? `${path}.${key}` : key,
);
}
return;
}

const typeName =
typeof value === "object" && value !== null
? value.constructor?.name ?? typeof value
: typeof value;
throw new TypeError(
`Value at ${path || "root"} of type "${typeName}" is not CBOR serializable`,
);
}

export const EncodingSchema = z.enum(["json", "cbor", "bare"]);

Expand Down Expand Up @@ -112,38 +274,95 @@ function isPlainObject(value: unknown): value is Record<string, unknown> {
return proto === Object.prototype || proto === null;
}

export function encodeJsonCompatValue(input: any): any {
export function encodeJsonCompatValue(input: CborSerializable): unknown {
// Primitives
if (input === null) {
return input;
}
if (input === undefined) {
return [JSON_COMPAT_UNDEFINED, 0];
}
if (
typeof input === "string" ||
typeof input === "number" ||
typeof input === "boolean"
) {
return input;
}
if (typeof input === "bigint") {
return [JSON_COMPAT_BIGINT, input.toString()];
}

// Binary types with custom encoding
if (input instanceof ArrayBuffer) {
return [JSON_COMPAT_ARRAY_BUFFER, base64EncodeArrayBuffer(input)];
}
if (input instanceof Uint8Array) {
return [JSON_COMPAT_UINT8_ARRAY, base64EncodeUint8Array(input)];
}

// TypedArrays pass through for cbor-x native handling
if (isTypedArray(input)) {
return input;
}

// Date and Error pass through for cbor-x native handling
if (input instanceof Date || input instanceof Error) {
return input;
}

// Set uses custom tag encoding
if (input instanceof Set) {
const encoded = [...input.values()].map((v) =>
encodeJsonCompatValue(v as CborSerializable),
);
return [JSON_COMPAT_SET, encoded];
}

// Map recurses into keys and values
if (input instanceof Map) {
const encoded = new Map<unknown, unknown>();
for (const [k, v] of input.entries()) {
encoded.set(
encodeJsonCompatValue(k as CborSerializable),
encodeJsonCompatValue(v as CborSerializable),
);
}
return encoded;
}

// Arrays
if (Array.isArray(input)) {
const encoded = input.map((value) => encodeJsonCompatValue(value));
const encoded = input.map((value) =>
encodeJsonCompatValue(value as CborSerializable),
);
if (
encoded.length === 2 &&
typeof encoded[0] === "string" &&
encoded[0].startsWith("$")
(encoded[0] as string).startsWith("$")
) {
return ["$" + encoded[0], encoded[1]];
}
return encoded;
}

// Plain objects
if (isPlainObject(input)) {
const encoded: Record<string, unknown> = {};
for (const [key, value] of Object.entries(input)) {
encoded[key] = encodeJsonCompatValue(value);
encoded[key] = encodeJsonCompatValue(value as CborSerializable);
}
return encoded;
}
return input;

// Not serializable
const typeName =
typeof input === "object" && input !== null
? (input as object).constructor?.name ?? typeof input
: typeof input;
throw new TypeError(
`Value of type "${typeName}" is not CBOR serializable`,
);
}

export interface JsonCompatReviveOptions {
Expand Down Expand Up @@ -182,6 +401,12 @@ export function reviveJsonCompatValue(
if (input[0] === JSON_COMPAT_UNDEFINED) {
return undefined;
}
if (input[0] === JSON_COMPAT_SET) {
const items = (input[1] as unknown[]).map((v) =>
reviveJsonCompatValue(v, options),
);
return new Set(items);
}
if (input[0].startsWith("$$")) {
return [
input[0].substring(1),
Expand Down
4 changes: 2 additions & 2 deletions rivetkit-typescript/packages/rivetkit/src/common/router.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ import {
HEADER_ACTOR_ID,
HEADER_ACTOR_KEY,
} from "@/common/actor-router-consts";
import type { Encoding } from "@/common/encoding";
import type { CborSerializable, Encoding } from "@/common/encoding";
import {
getRequestEncoding,
getRequestExposeInternalError,
Expand Down Expand Up @@ -111,7 +111,7 @@ export function handleRouteError(error: unknown, c: HonoContext) {
code: value.code,
message: value.message,
metadata: value.metadata
? bufferToArrayBuffer(encodeCborCompat(value.metadata))
? bufferToArrayBuffer(encodeCborCompat(value.metadata as CborSerializable))
: null,
actor: value.actor
? {
Expand Down
Loading
Loading