Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
67 changes: 67 additions & 0 deletions js/dev/server.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
import { describe, expect, test, vi } from "vitest";
import {
buildCompletionWebhookPayload,
dispatchCompletionWebhook,
} from "./server";

// Minimal experiment-summary fixture shared by every test in this file.
// `projectName` and `experimentName` use the camelCase keys that the payload
// builder is asserted (below) to surface under `payload.experiment`.
const summary = {
projectName: "completion-webhook-test",
experimentName: "completion-webhook-test-exp",
// A single perfect score; the tests only need the summary to be non-trivial.
scores: {
exact_match: {
name: "exact_match",
score: 1,
},
},
};

describe("completion webhook delivery", () => {
  const webhookUrl = "https://example.com/webhook";
  // Tiny timeout/backoff values: `sleep` is mocked, so no real waiting happens,
  // but distinct delays let the tests verify which backoff entry was used.
  const retryOptions = { timeoutMs: 5, attempts: 3, backoffMs: [1, 2, 4] };

  test("builds expected payload shape", () => {
    const payload = buildCompletionWebhookPayload(summary);

    expect(payload.event).toBe("experiment.completed");
    expect(payload.summary.projectName).toBe("completion-webhook-test");
    expect(payload.experiment.projectName).toBe("completion-webhook-test");
    expect(payload.experiment.experimentName).toBe(
      "completion-webhook-test-exp",
    );
    // A genuine ISO-8601 timestamp survives a Date round-trip unchanged.
    expect(new Date(payload.timestamp).toISOString()).toBe(payload.timestamp);
  });

  test("retries transient failures and succeeds", async () => {
    const sleep = vi.fn(async (_ms: number) => {});
    // One network error, one 5xx, then success.
    const fetchImpl = vi
      .fn<typeof fetch>()
      .mockRejectedValueOnce(new Error("network"))
      .mockResolvedValueOnce(new Response("bad", { status: 500 }))
      .mockResolvedValueOnce(new Response("ok", { status: 200 }));

    await dispatchCompletionWebhook(webhookUrl, summary, {
      fetchImpl,
      sleep,
      ...retryOptions,
    });

    expect(fetchImpl).toHaveBeenCalledTimes(3);
    expect(fetchImpl).toHaveBeenCalledWith(
      webhookUrl,
      expect.objectContaining({ method: "POST" }),
    );
    // Two failures => two waits, using the first two backoff entries in order.
    expect(sleep).toHaveBeenCalledTimes(2);
    expect(sleep).toHaveBeenNthCalledWith(1, 1);
    expect(sleep).toHaveBeenNthCalledWith(2, 2);
  });

  test("throws after final failure", async () => {
    const sleep = vi.fn(async (_ms: number) => {});
    const fetchImpl = vi
      .fn<typeof fetch>()
      .mockResolvedValue(new Response("bad", { status: 500 }));

    await expect(
      dispatchCompletionWebhook(webhookUrl, summary, {
        fetchImpl,
        sleep,
        ...retryOptions,
      }),
    ).rejects.toThrow("status 500");

    expect(fetchImpl).toHaveBeenCalledTimes(3);
    // No wait after the last attempt: only the gaps between attempts sleep.
    expect(sleep).toHaveBeenCalledTimes(2);
    expect(sleep).toHaveBeenNthCalledWith(1, 1);
    expect(sleep).toHaveBeenNthCalledWith(2, 2);
  });
});
152 changes: 152 additions & 0 deletions js/dev/server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,141 @@ export interface DevServerOpts {
orgName?: string;
}

/** Default number of delivery attempts when the caller passes no `attempts`. */
const WEBHOOK_ATTEMPTS = 3;
/**
 * Default waits (in milliseconds) between consecutive attempts; the entry at
 * index `attempt - 1` is used, and the last entry is reused past the end.
 */
const WEBHOOK_BACKOFF_MS = [1_000, 2_000, 4_000];
/** Default per-attempt timeout (in milliseconds) before the request is aborted. */
const WEBHOOK_TIMEOUT_MS = 10_000;

/**
 * JSON body that `dispatchCompletionWebhook` POSTs to the webhook URL.
 * Instances are produced by `buildCompletionWebhookPayload`.
 */
type CompletionWebhookPayload = {
// Fixed event name; the only value this payload type permits.
event: "experiment.completed";
// The summary after a JSON round-trip, so it holds only JSON-representable data.
summary: Record<string, unknown>;
// Identifying strings lifted out of `summary`. Each field is looked up under
// its camelCase key first and its snake_case key second, and is left undefined
// when neither key holds a non-empty string.
experiment: {
projectId?: string;
projectName?: string;
projectUrl?: string;
experimentId?: string;
experimentName?: string;
experimentUrl?: string;
};
// Time the payload was built, formatted with `Date.prototype.toISOString`.
timestamp: string;
};

// Summary input accepted by the webhook helpers: either a typed
// `ExperimentSummary` or any plain string-keyed record.
type SerializableSummary = ExperimentSummary | Record<string, unknown>;

/**
 * Optional overrides for `dispatchCompletionWebhook`. Every field falls back
 * to a module-level default or built-in when omitted.
 */
type DispatchCompletionWebhookOptions = {
// Total number of POST attempts; defaults to WEBHOOK_ATTEMPTS.
attempts?: number;
// Wait (ms) before each retry, indexed by the attempt that just failed; the
// last entry is reused when there are more retries than entries. Defaults to
// WEBHOOK_BACKOFF_MS.
backoffMs?: number[];
// Milliseconds after which a single attempt is aborted; defaults to
// WEBHOOK_TIMEOUT_MS.
timeoutMs?: number;
// Replacement for the global `fetch` (the tests inject a mock here).
fetchImpl?: typeof fetch;
// Replacement for the module's setTimeout-based `sleep` (the tests inject a
// mock here so retries do not actually wait).
sleep?: (ms: number) => Promise<void>;
};

/**
 * Returns the first non-empty string found in `record` under any of `keys`,
 * checking the keys in the order given.
 *
 * @param record Object to read from.
 * @param keys Candidate property names, highest priority first.
 * @returns The matching string, or `undefined` when no key holds one.
 */
function pickString(record: Record<string, unknown>, keys: string[]) {
  for (let index = 0; index < keys.length; index += 1) {
    const candidate = record[keys[index]];
    // Skip anything that is not a string, as well as the empty string.
    if (typeof candidate !== "string" || candidate.length === 0) {
      continue;
    }
    return candidate;
  }
  return undefined;
}

/**
 * Resolves after roughly `ms` milliseconds, using `setTimeout`.
 *
 * @param ms Delay in milliseconds.
 * @returns A promise that resolves (with no value) once the timer fires.
 */
function sleep(ms: number) {
  return new Promise<void>((wake) => setTimeout(wake, ms));
}

/**
 * Type guard for "plain object-like" values: any non-null object that is not
 * an array.
 *
 * @param value Value to inspect.
 * @returns `true` when `value` can be treated as a string-keyed record.
 */
function isRecord(value: unknown): value is Record<string, unknown> {
  // `typeof null` is "object", so null has to be excluded explicitly.
  if (value === null || typeof value !== "object") {
    return false;
  }
  return !Array.isArray(value);
}

/**
 * Produces a JSON-safe copy of `summary` by round-tripping it through
 * `JSON.stringify` / `JSON.parse`.
 *
 * @param summary Summary to copy.
 * @returns The parsed copy when it is a non-array object, otherwise `{}`.
 */
function serializeSummary(summary: SerializableSummary): Record<string, unknown> {
  const roundTripped: unknown = JSON.parse(JSON.stringify(summary));
  // Arrays and primitives are not valid payload summaries; fall back to empty.
  return isRecord(roundTripped) ? roundTripped : {};
}

/**
 * Builds the `experiment.completed` webhook body for a finished experiment.
 *
 * The summary is first reduced to a JSON-safe record; the identifying fields
 * under `experiment` are then copied from it, accepting either the camelCase
 * or the snake_case spelling of each key (camelCase wins when both exist).
 *
 * @param summary Experiment summary (typed or plain record).
 * @returns The payload, stamped with the current time in ISO format.
 */
export function buildCompletionWebhookPayload(summary: SerializableSummary) {
  const serialized = serializeSummary(summary);
  // Look a field up under its camelCase name, then its snake_case name.
  const lookup = (camelKey: string, snakeKey: string) =>
    pickString(serialized, [camelKey, snakeKey]);

  const experiment = {
    projectId: lookup("projectId", "project_id"),
    projectName: lookup("projectName", "project_name"),
    projectUrl: lookup("projectUrl", "project_url"),
    experimentId: lookup("experimentId", "experiment_id"),
    experimentName: lookup("experimentName", "experiment_name"),
    experimentUrl: lookup("experimentUrl", "experiment_url"),
  };

  return {
    event: "experiment.completed",
    summary: serialized,
    experiment,
    timestamp: new Date().toISOString(),
  } satisfies CompletionWebhookPayload;
}

/**
 * POSTs the `experiment.completed` payload for `summary` to `webhookUrl`,
 * retrying failed attempts with a configurable backoff.
 *
 * An attempt fails when the request rejects (network error or timeout) or the
 * response status is not 2xx. Between attempts the function waits for the
 * backoff entry matching the attempt that just failed, reusing the last entry
 * once the list is exhausted.
 *
 * @param webhookUrl Absolute http(s) URL to deliver to.
 * @param summary Experiment summary to embed in the payload.
 * @param options Overrides for attempts, backoff, timeout, `fetch` and `sleep`.
 * @returns Resolves once any attempt receives a 2xx response.
 * @throws Error immediately if `webhookUrl` is not a valid http(s) URL, or the
 *   error from the final attempt once every attempt has failed.
 */
export async function dispatchCompletionWebhook(
  webhookUrl: string,
  summary: SerializableSummary,
  options: DispatchCompletionWebhookOptions = {},
) {
  const requestedAttempts = options.attempts ?? WEBHOOK_ATTEMPTS;
  // Always try at least once: a zero, negative or NaN count would otherwise
  // skip the loop entirely and report success without sending anything.
  const attempts = Number.isFinite(requestedAttempts)
    ? Math.max(1, Math.floor(requestedAttempts))
    : WEBHOOK_ATTEMPTS;
  const backoffMs = options.backoffMs ?? WEBHOOK_BACKOFF_MS;
  const timeoutMs = options.timeoutMs ?? WEBHOOK_TIMEOUT_MS;
  const fetchImpl = options.fetchImpl ?? fetch;
  const sleepFn = options.sleep ?? sleep;

  // A malformed URL can never succeed, so fail before spending any retries.
  assertHttpWebhookUrl(webhookUrl);

  // The payload is identical for every attempt; serialize it once.
  const body = JSON.stringify(buildCompletionWebhookPayload(summary));
  let lastError: Error | null = null;

  for (let attempt = 1; attempt <= attempts; attempt++) {
    const controller = new AbortController();
    const timeoutId = setTimeout(() => controller.abort(), timeoutMs);
    try {
      const response = await fetchImpl(webhookUrl, {
        method: "POST",
        headers: {
          "content-type": "application/json",
        },
        body,
        signal: controller.signal,
      });

      // The body is never read; cancel it so the connection can be released.
      discardWebhookResponseBody(response);

      if (response.ok) {
        return;
      }

      lastError = new Error(
        `Webhook request failed with status ${response.status}`,
      );
    } catch (error) {
      if (controller.signal.aborted) {
        // Only our own timer aborts this controller, so this was a timeout.
        lastError = new Error(
          `Webhook request timed out after ${timeoutMs}ms`,
        );
      } else {
        lastError =
          error instanceof Error
            ? error
            : new Error(`Webhook failed: ${error}`);
      }
    } finally {
      clearTimeout(timeoutId);
    }

    if (attempt < attempts) {
      // Clamp to the last entry; an empty list means "retry immediately".
      const delayIndex = Math.min(attempt - 1, backoffMs.length - 1);
      const delayMs = backoffMs[delayIndex] ?? 0;
      if (delayMs > 0) {
        await sleepFn(delayMs);
      }
    }
  }

  throw lastError ?? new Error("Webhook request failed");
}

/** Throws unless `webhookUrl` parses as an absolute http: or https: URL. */
function assertHttpWebhookUrl(webhookUrl: string): void {
  let parsed: URL;
  try {
    parsed = new URL(webhookUrl);
  } catch {
    throw new Error(`Invalid webhook URL: ${webhookUrl}`);
  }
  if (parsed.protocol !== "http:" && parsed.protocol !== "https:") {
    throw new Error(`Unsupported webhook URL protocol: ${parsed.protocol}`);
  }
}

/** Best-effort cancel of an unread response body; never throws. */
function discardWebhookResponseBody(response: Response): void {
  try {
    // Deliberately not awaited and errors ignored: this is cleanup only and
    // must not turn a delivered webhook into a reported failure.
    void response.body?.cancel().catch(() => undefined);
  } catch {
    // Mocked or already-consumed bodies may not support cancel(); ignore.
  }
}

export function runDevServer(
// eslint-disable-next-line @typescript-eslint/no-explicit-any
evaluators: EvaluatorDef<any, any, any, any, any>[],
Expand Down Expand Up @@ -137,6 +272,7 @@ export function runDevServer(
parent,
experiment_name,
project_id,
on_complete_webhook,
data,
scores,
stream,
Expand Down Expand Up @@ -253,6 +389,22 @@ export function runDevServer(
);
}
},
onComplete: async (completionSummary) => {
if (!on_complete_webhook) {
return;
}
try {
await dispatchCompletionWebhook(
on_complete_webhook,
completionSummary,
);
} catch (error) {
console.error(
`Failed to deliver completion webhook to ${on_complete_webhook}`,
error,
);
}
},
parent: parseParent(parent),
parameters: parameters ?? {},
},
Expand Down
1 change: 1 addition & 0 deletions js/dev/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ export const evalBodySchema = z.object({
.nullish(),
experiment_name: z.string().nullish(),
project_id: z.string().nullish(),
on_complete_webhook: z.string().nullish(),
parent: invokeParentSchema.optional(),
stream: z.boolean().optional(),
});
Expand Down
81 changes: 81 additions & 0 deletions js/src/framework.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -676,6 +676,87 @@ test("Eval with returnResults: true collects all results", async () => {
expect(result.summary.scores.exact_match.score).toBe(1);
});

test("Eval onComplete is called exactly once on success", async () => {
  // Spy standing in for a user-supplied completion hook.
  const completionSpy = vi.fn();

  const { summary } = await Eval(
    "test-on-complete",
    {
      data: [{ input: "hello", expected: "hello world" }],
      task: (input) => `${input} world`,
      scores: [
        ({ output, expected }) => ({
          name: "exact_match",
          score: output === expected ? 1 : 0,
        }),
      ],
    },
    { noSendLogs: true, onComplete: completionSpy },
  );

  // The hook fires once and receives the same summary that Eval returns.
  expect(completionSpy).toHaveBeenCalledTimes(1);
  expect(completionSpy).toHaveBeenCalledWith(summary);
});

test("Eval onComplete runs after flush", async () => {
  await _exportsForTestingOnly.simulateLoginForTests();

  _exportsForTestingOnly.useTestBackgroundLogger();

  // Restore global test state even when Eval or an assertion throws, so a
  // failure here cannot leak the test logger/login into later tests.
  try {
    // Dedicated state + in-memory logger for the evaluator, so its flush can
    // be observed separately from the global test logger.
    const evaluatorState = new BraintrustState({
      apiKey: "test-api-key",
      appUrl: "https://example.com",
    });
    const evaluatorMemoryLogger = new TestBackgroundLogger();
    evaluatorState.setOverrideBgLogger(evaluatorMemoryLogger);

    // Export a parent span so Eval takes the `parent` code path.
    const logger = initLogger({ projectName: "test", projectId: "pid" });
    const span = logger.startSpan({ name: "parent-span" });
    const parentStr = await span.export();
    span.end();

    const evaluatorFlushSpy = vi.spyOn(evaluatorMemoryLogger, "flush");
    // By the time the hook runs, the evaluator's logger must have flushed.
    const onComplete = vi.fn(() => {
      expect(evaluatorFlushSpy).toHaveBeenCalled();
    });

    await Eval(
      "test-parent-flush-on-complete",
      {
        data: [{ input: 1, expected: 2 }],
        task: (input) => input * 2,
        scores: [],
        state: evaluatorState,
      },
      { parent: parentStr, onComplete },
    );

    expect(onComplete).toHaveBeenCalledTimes(1);
  } finally {
    _exportsForTestingOnly.clearTestBackgroundLogger();
    _exportsForTestingOnly.simulateLogoutForTests();
  }
});

test("Eval behavior is unchanged when onComplete is omitted", async () => {
  // No onComplete option at all: Eval should still return results + summary.
  const { results, summary } = await Eval(
    "test-no-on-complete",
    {
      data: [{ input: "foo", expected: "foo bar" }],
      task: (input) => `${input} bar`,
      scores: [
        ({ output, expected }) => ({
          name: "exact_match",
          score: output === expected ? 1 : 0,
        }),
      ],
    },
    { noSendLogs: true },
  );

  expect(results).toHaveLength(1);
  expect(summary.projectName).toBe("test-no-on-complete");
  expect(summary.scores.exact_match.score).toBe(1);
});

test("tags can be appended and logged to root span", async () => {
await _exportsForTestingOnly.simulateLoginForTests();
const memoryLogger = _exportsForTestingOnly.useTestBackgroundLogger();
Expand Down
17 changes: 16 additions & 1 deletion js/src/framework.ts
Original file line number Diff line number Diff line change
Expand Up @@ -546,6 +546,11 @@ export interface EvalOptions<EvalReport, Parameters extends EvalParameters> {
* @param metadata
*/
onStart?: (metadata: Omit<ExperimentSummary, "scores" | "metrics">) => void;
/**
* A callback function that is called once after the eval has completed and spans are flushed.
* @param summary
*/
onComplete?: (summary: ExperimentSummary) => void | Promise<void>;
/**
* A function that will be called with progress events, which can be used to
* display intermediate progress.
Expand Down Expand Up @@ -683,6 +688,8 @@ export async function Eval<
}

const resolvedReporter = options.reporter || defaultReporter;
let result: EvalResultWithSummary<Input, Output, Expected, Metadata> | null =
null;
try {
const { data, baseExperiment: defaultBaseExperiment } = callEvaluatorData(
evaluator.data,
Expand Down Expand Up @@ -770,14 +777,22 @@ export async function Eval<
verbose: true,
jsonl: false,
});
return ret;
result = ret;
} finally {
if (experiment) {
await experiment.flush().catch(console.error);
} else if (options.parent) {
await flush({ state: evaluator.state }).catch(console.error);
}
}

if (!result) {
throw new Error("Eval completed without a result");
}
if (options.onComplete) {
await options.onComplete(result.summary);
}
return result;
} finally {
progressReporter.stop();
}
Expand Down
Loading