Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -158,6 +158,8 @@ The runtime state schema is separately versioned as `RuntimeState.schemaVersion:

The domain boundary is explicit: `Decomposer` creates the concrete graph, `Scheduler` computes ready tasks, `WorkerRuntime` dispatches and heartbeats work, `Verifier` validates evidence refs, and `GateEngine` decides transitions.

Runtime events are append-only `schemaVersion: 1` envelopes with monotonic `seq`, stable `eventId`, `idempotencyKey`, `type`, `category`, timestamp, run id, and a typed payload. The event taxonomy covers run lifecycle, objective/policy decisions, graph readiness, claim/lease transitions, worker readiness/heartbeat/retention/safe-close, evidence, evaluation, repair, integration, and reward records. Replay applies events by ascending sequence into derived state; duplicate event ids must be byte-equivalent, stale leases and missing workers recover only through explicit events, and `run.sealed` is terminal.

## Workflow Map

### Lightweight Or Artifact-Backed Workflows
Expand Down
123 changes: 123 additions & 0 deletions src/domain/runtime-events.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,123 @@
export type RuntimeEventType =
| "run.created" | "run.objective_updated" | "policy.candidate_recorded" | "policy.simulated" | "policy.selected"
| "task_graph.created" | "task.ready" | "task.claimed" | "task.lease_renewed" | "task.lease_expired" | "task.transitioned"
| "worker.provisioned" | "worker.ready" | "worker.heartbeat" | "worker.retained" | "worker.cleanup_released"
| "evidence.recorded" | "evaluation.recorded" | "repair.created" | "integration.attempted" | "reward.recorded" | "run.sealed";

export type RuntimeEventCategory = "run" | "objective" | "policy" | "task" | "worker" | "evidence" | "evaluation" | "repair" | "integration" | "reward";
export type RuntimeEventReplayAction = "create-run" | "upsert-objective" | "record-policy" | "upsert-task" | "upsert-worker" | "record-evidence" | "record-evaluation" | "record-repair" | "record-integration" | "record-reward" | "seal-run";

export interface RuntimeEventEnvelope {
schemaVersion: 1;
eventId: string;
runId: string;
type: RuntimeEventType;
category: RuntimeEventCategory;
at: string;
seq: number;
idempotencyKey: string;
payload: Record<string, unknown>;
}

export interface RuntimeEventSpec {
type: RuntimeEventType;
category: RuntimeEventCategory;
replay: RuntimeEventReplayAction;
requiredPayload: readonly string[];
terminal?: true;
}

export const RUNTIME_EVENT_SCHEMA_VERSION = 1;

export const RUNTIME_EVENT_SPECS: readonly RuntimeEventSpec[] = [
spec("run.created", "run", "create-run", ["goal"]),
spec("run.objective_updated", "objective", "upsert-objective", ["objectiveRef"]),
spec("policy.candidate_recorded", "policy", "record-policy", ["policyCandidateRef"]),
spec("policy.simulated", "policy", "record-policy", ["policyCandidateRef", "simulationRef"]),
spec("policy.selected", "policy", "record-policy", ["policySelectionRef"]),
spec("task_graph.created", "task", "upsert-task", ["taskGraphRef"]),
spec("task.ready", "task", "upsert-task", ["taskId", "reason"]),
spec("task.claimed", "task", "upsert-task", ["taskId", "workerId", "claimToken", "leaseExpiresAt"]),
spec("task.lease_renewed", "task", "upsert-task", ["taskId", "claimToken", "leaseExpiresAt"]),
spec("task.lease_expired", "task", "upsert-task", ["taskId", "claimToken", "recoveryPolicy"]),
spec("task.transitioned", "task", "upsert-task", ["taskId", "from", "to"]),
spec("worker.provisioned", "worker", "upsert-worker", ["workerId", "adapterId"]),
spec("worker.ready", "worker", "upsert-worker", ["workerId", "readinessNonce"]),
spec("worker.heartbeat", "worker", "upsert-worker", ["workerId"]),
spec("worker.retained", "worker", "upsert-worker", ["workerId", "reason"]),
spec("worker.cleanup_released", "worker", "upsert-worker", ["workerId", "safeCloseRef"]),
spec("evidence.recorded", "evidence", "record-evidence", ["evidenceRef"]),
spec("evaluation.recorded", "evaluation", "record-evaluation", ["evaluationRef", "verdict"]),
spec("repair.created", "repair", "record-repair", ["sourceTaskId", "repairTaskId", "reason"]),
spec("integration.attempted", "integration", "record-integration", ["integrationCandidateRef", "dryRunRef"]),
spec("reward.recorded", "reward", "record-reward", ["rewardRef"]),
spec("run.sealed", "run", "seal-run", ["finalReportRef"], true),
];

export const RUNTIME_EVENT_REPLAY_RULES = [
"Apply events by ascending seq; equal seq/eventId replay is invalid unless idempotencyKey and payload match an already-applied event.",
"Replay is additive: events upsert derived run/task/worker projections and append evidence, evaluation, reward, repair, and integration ledgers.",
"task.lease_expired is explicit recovery evidence; stale leases are recoverable only after this event and never by destructive cleanup.",
"worker.heartbeat updates liveness only; missing heartbeat becomes worker.retained or task.lease_expired through explicit recovery events.",
"run.sealed is terminal: later non-run.sealed events for the same run are invalid and must not mutate projected state.",
] as const;

export function validateRuntimeEvents(events: readonly unknown[]): { ok: boolean; issues: string[] } {
const issues: string[] = [];
const seen = new Map<string, string>();
const seenSeq = new Map<number, { eventId: string; fingerprint: string }>();
let lastSeq = 0;
let sealedAtSeq: number | undefined;
for (let index = 0; index < events.length; index += 1) {
const event = events[index];
const prefix = `events[${index}]`;
if (!isRecord(event)) { issues.push(`${prefix} must be an object`); continue; }
validateEvent(event, prefix, issues);
const eventId = typeof event.eventId === "string" ? event.eventId : "";
const fingerprint = JSON.stringify(event);
if (eventId) {
const prior = seen.get(eventId);
if (prior && prior !== fingerprint) issues.push(`${prefix}.eventId duplicates with different payload`);
seen.set(eventId, fingerprint);
}
const seq = typeof event.seq === "number" && Number.isInteger(event.seq) ? event.seq : undefined;
if (seq !== undefined) {
const priorSeq = seenSeq.get(seq);
if (seq < lastSeq) issues.push(`${prefix}.seq must be monotonic`);
if (priorSeq && (priorSeq.eventId !== eventId || priorSeq.fingerprint !== fingerprint)) issues.push(`${prefix}.seq duplicates with different event`);
if (!priorSeq) seenSeq.set(seq, { eventId, fingerprint });
if (seq > lastSeq) lastSeq = seq;
}
if (sealedAtSeq !== undefined && event.type !== "run.sealed") issues.push(`${prefix} occurs after terminal run.sealed`);
if (event.type === "run.sealed" && typeof event.seq === "number") sealedAtSeq = event.seq;
}
return { ok: issues.length === 0, issues };
}

export function runtimeEventCatalogMarkdown(): string {
return RUNTIME_EVENT_SPECS.map((item) => `- \`${item.type}\` (${item.category}) payload: ${item.requiredPayload.join(", ")}; replay: ${item.replay}${item.terminal ? "; terminal" : ""}`).join("\n");
}

const specsByType = new Map<RuntimeEventType, RuntimeEventSpec>(RUNTIME_EVENT_SPECS.map((item) => [item.type, item]));

function validateEvent(event: Record<string, unknown>, prefix: string, issues: string[]): void {
if (event.schemaVersion !== RUNTIME_EVENT_SCHEMA_VERSION) issues.push(`${prefix}.schemaVersion must be 1`);
needText(event.eventId, `${prefix}.eventId`, issues); needText(event.runId, `${prefix}.runId`, issues); needText(event.idempotencyKey, `${prefix}.idempotencyKey`, issues);
if (typeof event.seq !== "number" || !Number.isInteger(event.seq) || event.seq < 1) issues.push(`${prefix}.seq must be a positive integer`);
if (typeof event.at !== "string" || !Number.isFinite(Date.parse(event.at))) issues.push(`${prefix}.at must be a timestamp`);
if (!isRecord(event.payload)) issues.push(`${prefix}.payload must be an object`);
const specForType = typeof event.type === "string" ? specsByType.get(event.type as RuntimeEventType) : undefined;
if (!specForType) { issues.push(`${prefix}.type is unsupported`); return; }
if (event.category !== specForType.category) issues.push(`${prefix}.category must be ${specForType.category}`);
if (!isRecord(event.payload)) return;
for (const key of specForType.requiredPayload) needText(event.payload[key], `${prefix}.payload.${key}`, issues);
if ((event.type === "task.claimed" || event.type === "task.lease_renewed") && typeof event.payload.leaseExpiresAt === "string" && !Number.isFinite(Date.parse(event.payload.leaseExpiresAt))) issues.push(`${prefix}.payload.leaseExpiresAt must be a timestamp`);
}

function spec(type: RuntimeEventType, category: RuntimeEventCategory, replay: RuntimeEventReplayAction, requiredPayload: readonly string[], terminal?: true): RuntimeEventSpec {
return { type, category, replay, requiredPayload, ...(terminal ? { terminal } : {}) };
}
function needText(value: unknown, label: string, issues: string[]): void {
if (typeof value !== "string" || !value.trim()) issues.push(`${label} is required`);
}
function isRecord(value: unknown): value is Record<string, unknown> { return typeof value === "object" && value !== null && !Array.isArray(value); }
47 changes: 47 additions & 0 deletions test/runtime-events.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
import * as assert from "node:assert/strict";
import { test } from "node:test";
import { RUNTIME_EVENT_REPLAY_RULES, RUNTIME_EVENT_SPECS, runtimeEventCatalogMarkdown, validateRuntimeEvents, type RuntimeEventEnvelope } from "../src/domain/runtime-events.js";

const at = "2026-01-01T00:00:00.000Z";
function event(overrides: Partial<RuntimeEventEnvelope> = {}): RuntimeEventEnvelope {
return { schemaVersion: 1, eventId: "e1", runId: "run-1", type: "task.claimed", category: "task", at, seq: 1, idempotencyKey: "run-1/task-1/claim", payload: { taskId: "task-1", workerId: "worker-1", claimToken: "claim-1", leaseExpiresAt: "2026-01-01T00:05:00.000Z" }, ...overrides };
}

test("runtime event catalog covers MVP categories and replay rules", () => {
assert.ok(RUNTIME_EVENT_SPECS.some((item) => item.type === "run.sealed" && item.terminal));
assert.ok(RUNTIME_EVENT_SPECS.some((item) => item.type === "task.lease_expired" && item.requiredPayload.includes("recoveryPolicy")));
assert.ok(RUNTIME_EVENT_SPECS.some((item) => item.type === "worker.cleanup_released"));
assert.ok(runtimeEventCatalogMarkdown().includes("`policy.simulated`"));
assert.ok(RUNTIME_EVENT_REPLAY_RULES.some((rule) => rule.includes("run.sealed is terminal")));
});

test("runtime events validate required envelope and payload surfaces", () => {
assert.deepEqual(validateRuntimeEvents([event()]), { ok: true, issues: [] });
const malformed = validateRuntimeEvents([{ ...event(), schemaVersion: 2, seq: 0, category: "worker", payload: { taskId: "task-1", workerId: "worker-1", claimToken: "claim-1", leaseExpiresAt: "not-time" } }]);
assert.equal(malformed.ok, false);
assert.ok(malformed.issues.includes("events[0].schemaVersion must be 1"));
assert.ok(malformed.issues.includes("events[0].seq must be a positive integer"));
assert.ok(malformed.issues.includes("events[0].category must be task"));
assert.ok(malformed.issues.includes("events[0].payload.leaseExpiresAt must be a timestamp"));
});

test("runtime events reject conflicting duplicate ids and mutation after seal", () => {
const sealed = event({ eventId: "e2", type: "run.sealed", category: "run", seq: 2, idempotencyKey: "seal", payload: { finalReportRef: "final.md" } });
const afterSeal = event({ eventId: "e3", seq: 3 });
const duplicate = event({ payload: { taskId: "other", workerId: "worker-1", claimToken: "claim-1", leaseExpiresAt: "2026-01-01T00:05:00.000Z" } });
const result = validateRuntimeEvents([event(), duplicate, sealed, afterSeal]);
assert.equal(result.ok, false);
assert.ok(result.issues.includes("events[1].eventId duplicates with different payload"));
assert.ok(result.issues.includes("events[1].seq duplicates with different event"));
assert.ok(result.issues.includes("events[3] occurs after terminal run.sealed"));
});

test("runtime events enforce monotonic seq while allowing exact idempotent duplicates", () => {
const first = event();
assert.deepEqual(validateRuntimeEvents([first, first]), { ok: true, issues: [] });
const sameSeqDifferentId = event({ eventId: "e2", idempotencyKey: "other" });
assert.ok(validateRuntimeEvents([first, sameSeqDifferentId]).issues.includes("events[1].seq duplicates with different event"));
const outOfOrder = event({ eventId: "e3", seq: 1, idempotencyKey: "late" });
const result = validateRuntimeEvents([event({ seq: 2 }), outOfOrder]);
assert.ok(result.issues.includes("events[1].seq must be monotonic"));
});
Loading