Skip to content

Commit cc1ce9b

Browse files
committed
build full example with persisting messages, adding necessary hooks, and documenting it all
1 parent 6b4e3dd commit cc1ce9b

File tree

21 files changed

+2114
-177
lines changed

21 files changed

+2114
-177
lines changed

docs/guides/ai-chat.mdx

Lines changed: 621 additions & 111 deletions
Large diffs are not rendered by default.

packages/trigger-sdk/src/v3/ai.ts

Lines changed: 223 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -379,6 +379,47 @@ async function pipeChat(
379379
* emits a control chunk and suspends via `messagesInput.wait()`. The frontend
380380
* transport resumes the same run by sending the next message via input streams.
381381
*/
382+
/**
383+
* Event passed to the `onChatStart` callback.
384+
*/
385+
export type ChatStartEvent = {
386+
/** The unique identifier for the chat session. */
387+
chatId: string;
388+
/** The initial model-ready messages for this conversation. */
389+
messages: ModelMessage[];
390+
/** Custom data from the frontend (passed via `metadata` on `sendMessage()` or the transport). */
391+
clientData: unknown;
392+
};
393+
394+
/**
395+
* Event passed to the `onTurnComplete` callback.
396+
*/
397+
export type TurnCompleteEvent = {
398+
/** The unique identifier for the chat session. */
399+
chatId: string;
400+
/** The full accumulated conversation in model format (all turns so far). */
401+
messages: ModelMessage[];
402+
/**
403+
* The full accumulated conversation in UI format (all turns so far).
404+
* This is the format expected by `useChat` — store this for persistence.
405+
*/
406+
uiMessages: UIMessage[];
407+
/**
408+
* Only the new model messages from this turn (user message(s) + assistant response).
409+
* Useful for appending to an existing conversation record.
410+
*/
411+
newMessages: ModelMessage[];
412+
/**
413+
* Only the new UI messages from this turn (user message(s) + assistant response).
414+
* Useful for inserting individual message records instead of overwriting the full history.
415+
*/
416+
newUIMessages: UIMessage[];
417+
/** The assistant's response for this turn (undefined if `pipeChat` was used manually). */
418+
responseMessage: UIMessage | undefined;
419+
/** The turn number (0-indexed). */
420+
turn: number;
421+
};
422+
382423
export type ChatTaskOptions<TIdentifier extends string> = Omit<
383424
TaskOptions<TIdentifier, ChatTaskWirePayload, unknown>,
384425
"run"
@@ -394,6 +435,35 @@ export type ChatTaskOptions<TIdentifier extends string> = Omit<
394435
*/
395436
run: (payload: ChatTaskRunPayload) => Promise<unknown>;
396437

438+
/**
439+
* Called on the first turn (turn 0) of a new run, before the `run` function executes.
440+
*
441+
* Use this to create the chat record in your database when a new conversation starts.
442+
*
443+
* @example
444+
* ```ts
445+
* onChatStart: async ({ chatId, messages, clientData }) => {
446+
* await db.chat.create({ data: { id: chatId, userId: clientData.userId } });
447+
* }
448+
* ```
449+
*/
450+
onChatStart?: (event: ChatStartEvent) => Promise<void> | void;
451+
452+
/**
453+
* Called after each turn completes (after the response is captured, before waiting
454+
* for the next message). Also fires on the final turn.
455+
*
456+
* Use this to persist the conversation to your database after each assistant response.
457+
*
458+
* @example
459+
* ```ts
460+
* onTurnComplete: async ({ chatId, messages }) => {
461+
* await db.chat.update({ where: { id: chatId }, data: { messages } });
462+
* }
463+
* ```
464+
*/
465+
onTurnComplete?: (event: TurnCompleteEvent) => Promise<void> | void;
466+
397467
/**
398468
* Maximum number of conversational turns (message round-trips) a single run
399469
* will handle before ending. After this many turns the run completes
@@ -456,6 +526,8 @@ function chatTask<TIdentifier extends string>(
456526
): Task<TIdentifier, ChatTaskWirePayload, unknown> {
457527
const {
458528
run: userRun,
529+
onChatStart,
530+
onTurnComplete,
459531
maxTurns = 100,
460532
turnTimeout = "1h",
461533
warmTimeoutInSeconds = 30,
@@ -478,6 +550,10 @@ function chatTask<TIdentifier extends string>(
478550
// user message(s) and the captured assistant response.
479551
let accumulatedMessages: ModelMessage[] = [];
480552

553+
// Accumulated UI messages for persistence. Mirrors the model accumulator
554+
// but in frontend-friendly UIMessage format (with parts, id, etc.).
555+
let accumulatedUIMessages: UIMessage[] = [];
556+
481557
// Mutable reference to the current turn's stop controller so the
482558
// stop input stream listener (registered once) can abort the right turn.
483559
let currentStopController: AbortController | undefined;
@@ -549,15 +625,52 @@ function chatTask<TIdentifier extends string>(
549625
// Turn 2+: only the new message(s) → appended to the accumulator.
550626
const incomingModelMessages = await convertToModelMessages(uiMessages);
551627

628+
// Track new messages for this turn (user input + assistant response).
629+
const turnNewModelMessages: ModelMessage[] = [];
630+
const turnNewUIMessages: UIMessage[] = [];
631+
552632
if (turn === 0) {
553633
accumulatedMessages = incomingModelMessages;
634+
accumulatedUIMessages = [...uiMessages];
635+
// On first turn, the "new" messages are just the last user message
636+
// (the rest is history). We'll add the response after streaming.
637+
if (uiMessages.length > 0) {
638+
turnNewUIMessages.push(uiMessages[uiMessages.length - 1]!);
639+
const lastModel = incomingModelMessages[incomingModelMessages.length - 1];
640+
if (lastModel) turnNewModelMessages.push(lastModel);
641+
}
554642
} else if (currentWirePayload.trigger === "regenerate-message") {
555643
// Regenerate: frontend sent full history with last assistant message
556644
// removed. Reset the accumulator to match.
557645
accumulatedMessages = incomingModelMessages;
646+
accumulatedUIMessages = [...uiMessages];
647+
// No new user messages for regenerate — just the response (added below)
558648
} else {
559649
// Submit: frontend sent only the new user message(s). Append to accumulator.
560650
accumulatedMessages.push(...incomingModelMessages);
651+
accumulatedUIMessages.push(...uiMessages);
652+
turnNewModelMessages.push(...incomingModelMessages);
653+
turnNewUIMessages.push(...uiMessages);
654+
}
655+
656+
// Fire onChatStart on the first turn
657+
if (turn === 0 && onChatStart) {
658+
await tracer.startActiveSpan(
659+
"onChatStart()",
660+
async () => {
661+
await onChatStart({
662+
chatId: currentWirePayload.chatId,
663+
messages: accumulatedMessages,
664+
clientData: wireMetadata,
665+
});
666+
},
667+
{
668+
attributes: {
669+
[SemanticInternalAttributes.STYLE_ICON]: "task-hook-onStart",
670+
[SemanticInternalAttributes.COLLAPSED]: true,
671+
},
672+
}
673+
);
561674
}
562675

563676
// Captured by the onFinish callback below — works even on abort/stop.
@@ -602,11 +715,14 @@ function chatTask<TIdentifier extends string>(
602715
// The onFinish callback fires even on abort/stop, so partial responses
603716
// from stopped generation are captured correctly.
604717
if (capturedResponseMessage) {
718+
accumulatedUIMessages.push(capturedResponseMessage);
719+
turnNewUIMessages.push(capturedResponseMessage);
605720
try {
606721
const responseModelMessages = await convertToModelMessages([
607722
stripProviderMetadata(capturedResponseMessage),
608723
]);
609724
accumulatedMessages.push(...responseModelMessages);
725+
turnNewModelMessages.push(...responseModelMessages);
610726
} catch {
611727
// Conversion failed — skip accumulation for this turn
612728
}
@@ -618,6 +734,30 @@ function chatTask<TIdentifier extends string>(
618734

619735
if (runSignal.aborted) return "exit";
620736

737+
// Fire onTurnComplete after response capture
738+
if (onTurnComplete) {
739+
await tracer.startActiveSpan(
740+
"onTurnComplete()",
741+
async () => {
742+
await onTurnComplete({
743+
chatId: currentWirePayload.chatId,
744+
messages: accumulatedMessages,
745+
uiMessages: accumulatedUIMessages,
746+
newMessages: turnNewModelMessages,
747+
newUIMessages: turnNewUIMessages,
748+
responseMessage: capturedResponseMessage,
749+
turn,
750+
});
751+
},
752+
{
753+
attributes: {
754+
[SemanticInternalAttributes.STYLE_ICON]: "task-hook-onComplete",
755+
[SemanticInternalAttributes.COLLAPSED]: true,
756+
},
757+
}
758+
);
759+
}
760+
621761
// Write turn-complete control chunk so frontend closes its stream
622762
await writeTurnCompleteChunk(currentWirePayload.chatId);
623763

@@ -629,9 +769,12 @@ function chatTask<TIdentifier extends string>(
629769

630770
// Phase 1: Keep the run warm for quick response to the next message.
631771
// The run stays active (using compute) during this window.
632-
if (warmTimeoutInSeconds > 0) {
772+
const effectiveWarmTimeout =
773+
(metadata.get(WARM_TIMEOUT_METADATA_KEY) as number | undefined) ?? warmTimeoutInSeconds;
774+
775+
if (effectiveWarmTimeout > 0) {
633776
const warm = await messagesInput.once({
634-
timeoutMs: warmTimeoutInSeconds * 1000,
777+
timeoutMs: effectiveWarmTimeout * 1000,
635778
spanName: "waiting (warm)",
636779
});
637780

@@ -643,8 +786,11 @@ function chatTask<TIdentifier extends string>(
643786
}
644787

645788
// Phase 2: Suspend the task (frees compute) until the next message arrives
789+
const effectiveTurnTimeout =
790+
(metadata.get(TURN_TIMEOUT_METADATA_KEY) as string | undefined) ?? turnTimeout;
791+
646792
const next = await messagesInput.wait({
647-
timeout: turnTimeout,
793+
timeout: effectiveTurnTimeout,
648794
spanName: "waiting (suspended)",
649795
});
650796

@@ -693,13 +839,87 @@ function chatTask<TIdentifier extends string>(
693839
* const token = await chat.createAccessToken("my-chat");
694840
* ```
695841
*/
842+
// ---------------------------------------------------------------------------
843+
// Runtime configuration helpers
844+
// ---------------------------------------------------------------------------
845+
846+
const TURN_TIMEOUT_METADATA_KEY = "chat.turnTimeout";
847+
const WARM_TIMEOUT_METADATA_KEY = "chat.warmTimeout";
848+
849+
/**
850+
* Override the turn timeout for subsequent turns in the current run.
851+
*
852+
* The turn timeout controls how long the run stays suspended (freeing compute)
853+
* waiting for the next user message. When it expires, the run completes
854+
* gracefully and the next message starts a fresh run.
855+
*
856+
* Call from inside a `chatTask` run function to adjust based on context.
857+
*
858+
* @param duration - A duration string (e.g. `"5m"`, `"1h"`, `"30s"`)
859+
*
860+
* @example
861+
* ```ts
862+
* run: async ({ messages, signal }) => {
863+
* chat.setTurnTimeout("2h");
864+
* return streamText({ model, messages, abortSignal: signal });
865+
* }
866+
* ```
867+
*/
868+
function setTurnTimeout(duration: string): void {
869+
metadata.set(TURN_TIMEOUT_METADATA_KEY, duration);
870+
}
871+
872+
/**
873+
* Override the turn timeout in seconds for subsequent turns in the current run.
874+
*
875+
* @param seconds - Number of seconds to wait for the next message before ending the run
876+
*
877+
* @example
878+
* ```ts
879+
* run: async ({ messages, signal }) => {
880+
* chat.setTurnTimeoutInSeconds(3600); // 1 hour
881+
* return streamText({ model, messages, abortSignal: signal });
882+
* }
883+
* ```
884+
*/
885+
function setTurnTimeoutInSeconds(seconds: number): void {
886+
metadata.set(TURN_TIMEOUT_METADATA_KEY, `${seconds}s`);
887+
}
888+
889+
/**
890+
* Override the warm timeout for subsequent turns in the current run.
891+
*
892+
* The warm timeout controls how long the run stays active (using compute)
893+
* after each turn, waiting for the next message. During this window,
894+
* responses are instant. After it expires, the run suspends.
895+
*
896+
* @param seconds - Number of seconds to stay warm (0 to suspend immediately)
897+
*
898+
* @example
899+
* ```ts
900+
* run: async ({ messages, signal }) => {
901+
* chat.setWarmTimeoutInSeconds(60);
902+
* return streamText({ model, messages, abortSignal: signal });
903+
* }
904+
* ```
905+
*/
906+
function setWarmTimeoutInSeconds(seconds: number): void {
907+
metadata.set(WARM_TIMEOUT_METADATA_KEY, seconds);
908+
}
909+
696910
export const chat = {
697911
/** Create a chat task. See {@link chatTask}. */
698912
task: chatTask,
699913
/** Pipe a stream to the chat transport. See {@link pipeChat}. */
700914
pipe: pipeChat,
701915
/** Create a public access token for a chat task. See {@link createChatAccessToken}. */
702916
createAccessToken: createChatAccessToken,
917+
/** Override the turn timeout at runtime (duration string). See {@link setTurnTimeout}. */
918+
setTurnTimeout,
919+
/** Override the turn timeout at runtime (seconds). See {@link setTurnTimeoutInSeconds}. */
920+
setTurnTimeoutInSeconds,
921+
/** Override the warm timeout at runtime. See {@link setWarmTimeoutInSeconds}. */
922+
setWarmTimeoutInSeconds,
703923
};
704924

705925
/**

packages/trigger-sdk/src/v3/chat-react.ts

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,7 @@
2323
* ```
2424
*/
2525

26-
import { useRef } from "react";
26+
import { useEffect, useRef } from "react";
2727
import {
2828
TriggerChatTransport,
2929
type TriggerChatTransportOptions,
@@ -57,6 +57,9 @@ export type UseTriggerChatTransportOptions<TTask extends AnyTask = AnyTask> = Om
5757
* For dynamic access tokens, pass a function — it will be called on each
5858
* request without needing to recreate the transport.
5959
*
60+
* The `onSessionChange` callback is kept in a ref so the transport always
61+
* calls the latest version without needing to be recreated.
62+
*
6063
* @example
6164
* ```tsx
6265
* import { useChat } from "@ai-sdk/react";
@@ -80,5 +83,12 @@ export function useTriggerChatTransport<TTask extends AnyTask = AnyTask>(
8083
if (ref.current === null) {
8184
ref.current = new TriggerChatTransport(options);
8285
}
86+
87+
// Keep onSessionChange up to date without recreating the transport
88+
const { onSessionChange } = options;
89+
useEffect(() => {
90+
ref.current?.setOnSessionChange(onSessionChange);
91+
}, [onSessionChange]);
92+
8393
return ref.current;
8494
}

0 commit comments

Comments
 (0)