Skip to content

Commit 56603be

Browse files
Committed: "Add warmTimeoutInSeconds option"
Commit 56603be (1 parent: d51ed95)

File tree

3 files changed: +53 additions, −19 deletions (lines changed)

packages/trigger-sdk/src/v3/ai.ts

Lines changed: 35 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -406,6 +406,18 @@ export type ChatTaskOptions<TIdentifier extends string> = Omit<
406406
* @default "1h"
407407
*/
408408
turnTimeout?: string;
409+
410+
/**
411+
* How long (in seconds) to keep the run warm after each turn before suspending.
412+
* During this window the run stays active and can respond instantly to the
413+
* next message. After this timeout, the run suspends (frees compute) and waits
414+
* via `inputStream.wait()`.
415+
*
416+
* Set to `0` to suspend immediately after each turn.
417+
*
418+
* @default 30
419+
*/
420+
warmTimeoutInSeconds?: number;
409421
};
410422

411423
/**
@@ -438,7 +450,13 @@ export type ChatTaskOptions<TIdentifier extends string> = Omit<
438450
function chatTask<TIdentifier extends string>(
439451
options: ChatTaskOptions<TIdentifier>
440452
): Task<TIdentifier, ChatTaskPayload, unknown> {
441-
const { run: userRun, maxTurns = 100, turnTimeout = "1h", ...restOptions } = options;
453+
const {
454+
run: userRun,
455+
maxTurns = 100,
456+
turnTimeout = "1h",
457+
warmTimeoutInSeconds = 30,
458+
...restOptions
459+
} = options;
442460

443461
return createTask<TIdentifier, ChatTaskPayload, unknown>({
444462
...restOptions,
@@ -512,15 +530,29 @@ function chatTask<TIdentifier extends string>(
512530
continue;
513531
}
514532

515-
// Suspend the task (frees compute) until the next message arrives
533+
// Phase 1: Keep the run warm for quick response to the next message.
534+
// The run stays active (using compute) during this window.
535+
if (warmTimeoutInSeconds > 0) {
536+
const warm = await messagesInput.once({
537+
timeoutMs: warmTimeoutInSeconds * 1000,
538+
});
539+
540+
if (warm.ok) {
541+
// Message arrived while warm — respond instantly
542+
currentPayload = warm.output;
543+
continue;
544+
}
545+
}
546+
547+
// Phase 2: Suspend the task (frees compute) until the next message arrives
516548
const next = await messagesInput.wait({ timeout: turnTimeout });
517549

518550
if (!next.ok) {
519551
// Timed out waiting for the next message — end the conversation
520552
return;
521553
}
522554

523-
currentPayload = next.output as ChatTaskPayload;
555+
currentPayload = next.output;
524556
}
525557
} finally {
526558
stopSub.off();

packages/trigger-sdk/src/v3/streams.ts

Lines changed: 14 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -750,23 +750,20 @@ function input<TData>(opts: { id: string }): RealtimeDefinedInputStream<TData> {
750750

751751
const apiClient = apiClientManager.clientOrThrow();
752752

753+
// Create the waitpoint before the span so we have the entity ID upfront
754+
const response = await apiClient.createInputStreamWaitpoint(ctx.run.id, {
755+
streamId: opts.id,
756+
timeout: options?.timeout,
757+
idempotencyKey: options?.idempotencyKey,
758+
idempotencyKeyTTL: options?.idempotencyKeyTTL,
759+
tags: options?.tags,
760+
lastSeqNum: inputStreams.lastSeqNum(opts.id),
761+
});
762+
753763
const result = await tracer.startActiveSpan(
754764
`inputStream.wait()`,
755765
async (span) => {
756-
// 1. Create a waitpoint linked to this input stream
757-
const response = await apiClient.createInputStreamWaitpoint(ctx.run.id, {
758-
streamId: opts.id,
759-
timeout: options?.timeout,
760-
idempotencyKey: options?.idempotencyKey,
761-
idempotencyKeyTTL: options?.idempotencyKeyTTL,
762-
tags: options?.tags,
763-
lastSeqNum: inputStreams.lastSeqNum(opts.id),
764-
});
765-
766-
// Set the entity ID now that we have the waitpoint ID
767-
span.setAttribute(SemanticInternalAttributes.ENTITY_ID, response.waitpointId);
768-
769-
// 2. Block the run on the waitpoint
766+
// 1. Block the run on the waitpoint
770767
const waitResponse = await apiClient.waitForWaitpointToken({
771768
runFriendlyId: ctx.run.id,
772769
waitpointFriendlyId: response.waitpointId,
@@ -776,10 +773,10 @@ function input<TData>(opts: { id: string }): RealtimeDefinedInputStream<TData> {
776773
throw new Error("Failed to block on input stream waitpoint");
777774
}
778775

779-
// 3. Suspend the task
776+
// 2. Suspend the task
780777
const waitResult = await runtime.waitUntil(response.waitpointId);
781778

782-
// 4. Parse the output
779+
// 3. Parse the output
783780
const data =
784781
waitResult.output !== undefined
785782
? await conditionallyImportAndParsePacket(
@@ -806,6 +803,7 @@ function input<TData>(opts: { id: string }): RealtimeDefinedInputStream<TData> {
806803
attributes: {
807804
[SemanticInternalAttributes.STYLE_ICON]: "wait",
808805
[SemanticInternalAttributes.ENTITY_TYPE]: "waitpoint",
806+
[SemanticInternalAttributes.ENTITY_ID]: response.waitpointId,
809807
streamId: opts.id,
810808
...accessoryAttributes({
811809
items: [

references/ai-chat/src/trigger/chat.ts

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -64,6 +64,7 @@ declare const Deno: unknown;
6464

6565
export const aiChat = chat.task({
6666
id: "ai-chat",
67+
warmTimeoutInSeconds: 10,
6768
run: async ({ messages, stopSignal }) => {
6869
return streamText({
6970
model: openai("gpt-4o-mini"),
@@ -72,6 +73,9 @@ export const aiChat = chat.task({
7273
tools: { inspectEnvironment },
7374
stopWhen: stepCountIs(10),
7475
abortSignal: stopSignal,
76+
experimental_telemetry: {
77+
isEnabled: true,
78+
}
7579
});
7680
},
7781
});

Comments (0)