Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion CLAUDE.md
Original file line number Diff line number Diff line change
Expand Up @@ -191,4 +191,5 @@ Data structures often include:

### Comments

Documenting deltas is not important or useful. A developer who has never worked on the project will not gain extra information if you add a comment stating that something was removed or changed because they don't know what was there before. The only time you would be adding a comment for something NOT being there is if its unintuitive for why its not there in the first place.
- Write comments as normal, complete sentences. Avoid fragmented structures with parentheticals and dashes like `// Spawn engine (if configured) - regardless of start kind`. Instead, write `// Spawn the engine if configured`. Especially avoid dashes (hyphens are OK).
- Documenting deltas is not important or useful. A developer who has never worked on the project will not gain extra information if you add a comment stating that something was removed or changed because they don't know what was there before. The only time you would be adding a comment for something NOT being there is if its unintuitive for why its not there in the first place.
273 changes: 146 additions & 127 deletions rivetkit-typescript/packages/rivetkit/runtime/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -17,21 +17,30 @@ import { logger } from "../src/registry/log";
import { crossPlatformServe, findFreePort } from "@/registry/serve";
import { ManagerDriver } from "@/manager/driver";
import { buildServerlessRouter } from "@/serverless/router";
import { Registry } from "@/registry";
import type { Registry } from "@/registry";

/**
* Defines what type of server is being started. Used internally for
* Registry.#start
**/
/** Tracks whether the runtime was started as serverless or runner. */
export type StartKind = "serverless" | "runner";

/**
* Manages the lifecycle of RivetKit.
*
* Startup happens in two phases:
* 1. `Runtime.create()` initializes shared infrastructure like the manager
* server and engine process. This runs before we know the deployment mode.
* 2. `startServerless()` or `startRunner()` configures mode-specific behavior.
* These are idempotent and called lazily when the first request arrives
* or when explicitly starting a runner.
*/
export class Runtime<A extends RegistryActors> {
#registry: Registry<A>;
managerPort?: number;
#config: RegistryConfig;
#driver: DriverConfig;
#kind: StartKind;
#managerDriver: ManagerDriver;
#startKind?: StartKind;

managerPort?: number;
#serverlessRouter?: ReturnType<typeof buildServerlessRouter>["router"];

get config() {
return this.#config;
Expand All @@ -45,185 +54,195 @@ export class Runtime<A extends RegistryActors> {
return this.#managerDriver;
}

#serverlessRouter?: ReturnType<typeof buildServerlessRouter>["router"];

constructor(registry: Registry<A>, kind: StartKind) {
/** Use Runtime.create() instead */
private constructor(
registry: Registry<A>,
config: RegistryConfig,
driver: DriverConfig,
managerDriver: ManagerDriver,
managerPort?: number,
) {
this.#registry = registry;
this.#kind = kind;

const config = this.#registry.parseConfig();
this.#config = config;
this.#driver = driver;
this.#managerDriver = managerDriver;
this.managerPort = managerPort;
}

static async create<A extends RegistryActors>(
registry: Registry<A>,
): Promise<Runtime<A>> {
logger().info("rivetkit starting");

// Promise for any async operations we need to wait to complete
const readyPromises: Promise<unknown>[] = [];
const config = registry.parseConfig();

// Configure logger
if (config.logging?.baseLogger) {
// Use provided base logger
configureBaseLogger(config.logging.baseLogger);
} else {
// Configure default logger with log level from config getPinoLevel
// will handle env variable priority
configureDefaultLogger(config.logging?.level);
}

// Handle spawnEngine before choosing driver
// Start engine
invariant(
!(
kind === "serverless" &&
config.serverless.spawnEngine &&
config.serveManager
),
"cannot specify spawnEngine and serveManager together",
!(config.serverless.spawnEngine && config.serveManager),
"cannot specify both spawnEngine and serveManager",
);

if (kind === "serverless" && config.serverless.spawnEngine) {
this.managerPort = ENGINE_PORT;
const driver = chooseDefaultDriver(config);
const managerDriver = driver.manager(config);

// Start main server. This is either:
// - Manager: Run a server in-process on port 6420 that mimics the
// engine's API for development.
// - Engine: Download and run the full Rivet engine binary on port
// 6420. This is a fallback for platforms that cannot use the manager
// like Next.js.
//
// We do this before startServerless or startRunner has been called
// since the engine API needs to be available on port 6420 before
// anything else happens. For example, serverless platforms use
// `registry.handler(req)` so `startServerless` is called lazily.
// Starting the server preemptively allows for clients to reach 6420
// BEFORE `startServerless` is called.
let managerPort: number | undefined;
if (config.serverless.spawnEngine) {
invariant(
!config.serveManager,
"cannot specify spawnEngine and serveManager together",
);

managerPort = ENGINE_PORT;
logger().debug({
msg: "run engine requested",
msg: "spawning engine",
version: config.serverless.engineVersion,
});

// Start the engine
const engineProcessPromise = ensureEngineProcess({
await ensureEngineProcess({
version: config.serverless.engineVersion,
});

// Chain ready promise
readyPromises.push(engineProcessPromise);
}

// Choose the driver based on configuration
const driver = chooseDefaultDriver(config);

// Create manager driver (always needed for actor driver + inline client)
const managerDriver = driver.manager(config);

// Start manager
if (config.serveManager) {
// Configure getUpgradeWebSocket lazily so we can assign it in crossPlatformServe
} else if (config.serveManager) {
let upgradeWebSocket: any;
const getUpgradeWebSocket: GetUpgradeWebSocket = () =>
upgradeWebSocket;
managerDriver.setGetUpgradeWebSocket(getUpgradeWebSocket);

// Build router
const { router: managerRouter } = buildManagerRouter(
config,
managerDriver,
getUpgradeWebSocket,
);

// Serve manager
const serverPromise = (async () => {
const managerPort = await findFreePort(config.managerPort);
this.managerPort = managerPort;

const out = await crossPlatformServe(
config,
managerPort,
managerRouter,
);
upgradeWebSocket = out.upgradeWebSocket;
})();
readyPromises.push(serverPromise);
}

// Build serverless router
if (kind === "serverless") {
this.#serverlessRouter = buildServerlessRouter(
driver,
managerPort = await findFreePort(config.managerPort);
const out = await crossPlatformServe(
config,
).router;
managerPort,
managerRouter,
);
upgradeWebSocket = out.upgradeWebSocket;
}

this.#driver = driver;
this.#managerDriver = managerDriver;

// Log and print welcome after all ready promises complete
// biome-ignore lint/nursery/noFloatingPromises: bg promise
Promise.all(readyPromises).then(async () => this.#onAfterReady());
}

async #onAfterReady() {
const config = this.#config;
const kind = this.#kind;
const driver = this.#driver;
const managerDriver = this.#managerDriver;

// Auto-start actor driver for drivers that require it.
//
// This is only enabled for runner config since serverless will
// auto-start the actor driver on `GET /start`.
if (kind === "runner" && config.runner && driver.autoStartActorDriver) {
logger().debug("starting actor driver");
const inlineClient =
createClientWithDriver<Registry<A>>(managerDriver);
driver.actor(config, managerDriver, inlineClient);
}
// Create runtime
const runtime = new Runtime(
registry,
config,
driver,
managerDriver,
managerPort,
);

// Log starting
// Log ready
const driverLog = managerDriver.extraStartupLog?.() ?? {};
logger().info({
msg: "rivetkit ready",
driver: driver.name,
definitions: Object.keys(config.use).length,
...driverLog,
});
invariant(this.managerPort, "managerPort should be set");
const inspectorUrl = getInspectorUrl(config, this.managerPort);
if (inspectorUrl && config.inspector.enabled) {
logger().info({
msg: "inspector ready",
url: inspectorUrl,
});

return runtime;
}

startServerless(): void {
if (this.#startKind === "serverless") return;
invariant(!this.#startKind, "Runtime already started as runner");
this.#startKind = "serverless";

this.#serverlessRouter = buildServerlessRouter(
this.#driver,
this.#config,
).router;

this.#printWelcome();

if (this.#config.serverless.configureRunnerPool) {
// biome-ignore lint/nursery/noFloatingPromises: intentional
configureServerlessRunner(this.#config);
}
}

startRunner(): void {
if (this.#startKind === "runner") return;
invariant(!this.#startKind, "Runtime already started as serverless");
this.#startKind = "runner";

if (this.#config.runner && this.#driver.autoStartActorDriver) {
logger().debug("starting actor driver");
const inlineClient = createClientWithDriver<Registry<A>>(
this.#managerDriver,
);
this.#driver.actor(this.#config, this.#managerDriver, inlineClient);
}

// Print welcome information
if (!config.noWelcome) {
console.log();
console.log(` RivetKit ${pkg.version} (${driver.displayName})`);
// Only show endpoint if manager is running or engine is spawned
this.#printWelcome();
}

#printWelcome(): void {
if (this.#config.noWelcome) return;

const inspectorUrl = this.managerPort
? getInspectorUrl(this.#config, this.managerPort)
: undefined;

console.log();
console.log(` RivetKit ${pkg.version} (${this.#driver.displayName})`);

if (this.#startKind === "serverless") {
const shouldShowEndpoint =
config.serveManager ||
(kind === "serverless" && config.serverless.spawnEngine);
this.#config.serveManager ||
this.#config.serverless.spawnEngine;
if (
kind === "serverless" &&
config.serverless.advertiseEndpoint &&
this.#config.serverless.advertiseEndpoint &&
shouldShowEndpoint
) {
console.log(
` - Endpoint: ${config.serverless.advertiseEndpoint}`,
` - Endpoint: ${this.#config.serverless.advertiseEndpoint}`,
);
}
if (kind === "serverless" && config.serverless.spawnEngine) {
if (this.#config.serverless.spawnEngine) {
const padding = " ".repeat(Math.max(0, 13 - "Engine".length));
console.log(
` - Engine:${padding}v${config.serverless.engineVersion}`,
` - Engine:${padding}v${this.#config.serverless.engineVersion}`,
);
}
const displayInfo = managerDriver.displayInformation();
for (const [k, v] of Object.entries(displayInfo.properties)) {
const padding = " ".repeat(Math.max(0, 13 - k.length));
console.log(` - ${k}:${padding}${v}`);
}
if (inspectorUrl && config.inspector.enabled) {
console.log(` - Inspector: ${inspectorUrl}`);
}
console.log();
}

// Configure serverless runner if enabled when actor driver is disabled
if (kind === "serverless" && config.serverless.configureRunnerPool) {
await configureServerlessRunner(config);
const displayInfo = this.#managerDriver.displayInformation();
for (const [k, v] of Object.entries(displayInfo.properties)) {
const padding = " ".repeat(Math.max(0, 13 - k.length));
console.log(` - ${k}:${padding}${v}`);
}

if (inspectorUrl && this.#config.inspector.enabled) {
console.log(` - Inspector: ${inspectorUrl}`);
}
console.log();
}

public handler(request: Request): Response | Promise<Response> {
invariant(this.#kind === "serverless", "kind not serverless");
invariant(this.#serverlessRouter, "missing serverless router");
/** Handle serverless request */
handleServerlessRequest(request: Request): Response | Promise<Response> {
invariant(
this.#startKind === "serverless",
"not started as serverless",
);
invariant(this.#serverlessRouter, "serverless router not initialized");
return this.#serverlessRouter.fetch(request);
}
}
Loading
Loading