Skip to content

Commit e4f1547

Browse files
committed
Add support for toUIMessageStream() options
1 parent 1c56bcf commit e4f1547

File tree

5 files changed

+230
-13
lines changed

5 files changed

+230
-13
lines changed

docs/ai-chat/backend.mdx

Lines changed: 85 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -504,6 +504,89 @@ run: async ({ messages, signal }) => {
504504
Longer warm timeout means faster responses but more compute usage. Set to `0` to suspend immediately after each turn (minimum latency cost, slight delay on next message).
505505
</Info>
506506

507+
#### Stream options
508+
509+
Control how `streamText` results are converted to the frontend stream via `toUIMessageStream()`. Set static defaults on the task, or override per-turn.
510+
511+
##### Error handling with onError
512+
513+
When `streamText` encounters an error mid-stream (rate limits, API failures, network errors), the `onError` callback converts it to a string that's sent to the frontend as an `{ type: "error", errorText }` chunk. The AI SDK's `useChat` receives this via its `onError` callback.
514+
515+
By default, the raw error message is sent to the frontend. Use `onError` to sanitize errors and avoid leaking internal details:
516+
517+
```ts
518+
export const myChat = chat.task({
519+
id: "my-chat",
520+
uiMessageStreamOptions: {
521+
onError: (error) => {
522+
// Log the full error server-side for debugging
523+
console.error("Stream error:", error);
524+
// Return a sanitized message — this is what the frontend sees
525+
if (error instanceof Error && error.message.includes("rate limit")) {
526+
return "Rate limited — please wait a moment and try again.";
527+
}
528+
return "Something went wrong. Please try again.";
529+
},
530+
},
531+
run: async ({ messages, signal }) => {
532+
return streamText({ model: openai("gpt-4o"), messages, abortSignal: signal });
533+
},
534+
});
535+
```
536+
537+
`onError` is also called for tool execution errors, so a single handler covers both LLM errors and tool failures.
538+
539+
On the frontend, handle the error in `useChat`:
540+
541+
```tsx
542+
const { messages, sendMessage } = useChat({
543+
transport,
544+
onError: (error) => {
545+
// error.message contains the string returned by your onError handler
546+
toast.error(error.message);
547+
},
548+
});
549+
```
550+
551+
##### Reasoning and sources
552+
553+
Control which AI SDK features are forwarded to the frontend:
554+
555+
```ts
556+
export const myChat = chat.task({
557+
id: "my-chat",
558+
uiMessageStreamOptions: {
559+
sendReasoning: true, // Forward model reasoning (default: true)
560+
sendSources: true, // Forward source citations (default: false)
561+
},
562+
run: async ({ messages, signal }) => {
563+
return streamText({ model: openai("gpt-4o"), messages, abortSignal: signal });
564+
},
565+
});
566+
```
567+
568+
##### Per-turn overrides
569+
570+
Override per-turn with `chat.setUIMessageStreamOptions()` — per-turn values merge with the static config (per-turn wins on conflicts). The override is cleared automatically after each turn.
571+
572+
```ts
573+
run: async ({ messages, clientData, signal }) => {
574+
// Enable reasoning only for certain models
575+
if (clientData.model?.includes("claude")) {
576+
chat.setUIMessageStreamOptions({ sendReasoning: true });
577+
}
578+
return streamText({ model: openai(clientData.model ?? "gpt-4o"), messages, abortSignal: signal });
579+
},
580+
```
581+
582+
`chat.setUIMessageStreamOptions()` works across all abstraction levels — `chat.task()`, `chat.createSession()` / `turn.complete()`, and `chat.pipeAndCapture()`.
583+
584+
See [ChatUIMessageStreamOptions](/ai-chat/reference#chatuimessagestreamoptions) for the full reference.
585+
586+
<Note>
587+
`onFinish` is managed internally for response capture and cannot be overridden here. Use `streamText`'s `onFinish` callback for custom finish handling, or use [raw task mode](#raw-task-with-primitives) for full control over `toUIMessageStream()`.
588+
</Note>
589+
507590
### Manual mode with task()
508591

509592
If you need full control over task options, use the standard `task()` with `ChatTaskPayload` and `chat.pipe()`:
@@ -647,6 +730,8 @@ for await (const turn of session) {
647730

648731
For full control, use a standard `task()` with the composable primitives from the `chat` namespace. You manage everything: the turn loop, stop signals, message accumulation, and turn-complete signaling.
649732

733+
Raw task mode also lets you call `.toUIMessageStream()` yourself with any options — including `onFinish` and `originalMessages`. This is the right choice when you need complete control over the stream conversion beyond what `chat.setUIMessageStreamOptions()` provides.
734+
650735
### Primitives
651736

652737
| Primitive | Description |

docs/ai-chat/reference.mdx

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@ Options for `chat.task()`.
2323
| `chatAccessTokenTTL` | `string` | `"1h"` | How long the scoped access token remains valid |
2424
| `preloadWarmTimeoutInSeconds` | `number` | Same as `warmTimeoutInSeconds` | Warm timeout after `onPreload` fires |
2525
| `preloadTimeout` | `string` | Same as `turnTimeout` | Suspend timeout for preloaded runs |
26+
| `uiMessageStreamOptions` | `ChatUIMessageStreamOptions` || Default options for `toUIMessageStream()`. Per-turn override via `chat.setUIMessageStreamOptions()` |
2627

2728
Plus all standard [TaskOptions](/tasks/overview)`retry`, `queue`, `machine`, `maxDuration`, etc.
2829

@@ -156,12 +157,28 @@ All methods available on the `chat` object from `@trigger.dev/sdk/ai`.
156157
| `chat.setTurnTimeout(duration)` | Override turn timeout at runtime (e.g. `"2h"`) |
157158
| `chat.setTurnTimeoutInSeconds(seconds)` | Override turn timeout at runtime (in seconds) |
158159
| `chat.setWarmTimeoutInSeconds(seconds)` | Override warm timeout at runtime |
160+
| `chat.setUIMessageStreamOptions(options)` | Override `toUIMessageStream()` options for the current turn |
159161
| `chat.defer(promise)` | Run background work in parallel with streaming, awaited before `onTurnComplete` |
160162
| `chat.isStopped()` | Check if the current turn was stopped by the user |
161163
| `chat.cleanupAbortedParts(message)` | Remove incomplete parts from a stopped response message |
162164
| `chat.stream` | Typed chat output stream — use `.writer()`, `.pipe()`, `.append()`, `.read()` |
163165
| `chat.MessageAccumulator` | Class that accumulates conversation messages across turns |
164166

167+
## ChatUIMessageStreamOptions
168+
169+
Options for customizing `toUIMessageStream()`. Set as static defaults via `uiMessageStreamOptions` on `chat.task()`, or override per-turn via `chat.setUIMessageStreamOptions()`. See [Stream options](/ai-chat/backend#stream-options) for usage examples.
170+
171+
Derived from the AI SDK's `UIMessageStreamOptions` with `onFinish`, `originalMessages`, and `generateMessageId` omitted (managed internally).
172+
173+
| Option | Type | Default | Description |
174+
|--------|------|---------|-------------|
175+
| `onError` | `(error: unknown) => string` | Raw error message | Called on LLM errors and tool execution errors. Return a sanitized string — sent as `{ type: "error", errorText }` to the frontend. |
176+
| `sendReasoning` | `boolean` | `true` | Send reasoning parts to the client |
177+
| `sendSources` | `boolean` | `false` | Send source parts to the client |
178+
| `sendFinish` | `boolean` | `true` | Send the finish event. Set to `false` when chaining multiple `streamText` calls. |
179+
| `sendStart` | `boolean` | `true` | Send the message start event. Set to `false` when chaining. |
180+
| `messageMetadata` | `(options: { part }) => metadata` || Extract message metadata to send to the client. Called on `start` and `finish` events. |
181+
165182
## TriggerChatTransport options
166183

167184
Options for the frontend transport constructor and `useTriggerChatTransport` hook.

packages/trigger-sdk/src/v3/ai.ts

Lines changed: 103 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,7 @@ import {
1414
type TaskSchema,
1515
type TaskWithSchema,
1616
} from "@trigger.dev/core/v3";
17-
import type { ModelMessage, UIMessage, UIMessageChunk } from "ai";
17+
import type { ModelMessage, UIMessage, UIMessageChunk, UIMessageStreamOptions } from "ai";
1818
import type { StreamWriteResult } from "@trigger.dev/core/v3";
1919
import { convertToModelMessages, dynamicTool, generateId as generateMessageId, jsonSchema, JSONSchema7, Schema, Tool, ToolCallOptions, zodSchema } from "ai";
2020
import { type Attributes, trace } from "@opentelemetry/api";
@@ -399,6 +399,10 @@ const chatDeferKey = locals.create<Set<Promise<unknown>>>("chat.defer");
399399
*/
400400
const chatPipeCountKey = locals.create<number>("chat.pipeCount");
401401
const chatStopControllerKey = locals.create<AbortController>("chat.stopController");
402+
/** Static (task-level) UIMessageStream options, set once during chatTask setup. @internal */
403+
const chatUIStreamStaticKey = locals.create<ChatUIMessageStreamOptions>("chat.uiMessageStreamOptions.static");
404+
/** Per-turn UIMessageStream options, set via chat.setUIMessageStreamOptions(). @internal */
405+
const chatUIStreamPerTurnKey = locals.create<ChatUIMessageStreamOptions>("chat.uiMessageStreamOptions.perTurn");
402406

403407
/**
404408
* Options for `pipeChat`.
@@ -423,6 +427,23 @@ export type PipeChatOptions = {
423427
spanName?: string;
424428
};
425429

430+
/**
431+
* Options for customizing the `toUIMessageStream()` call used when piping
432+
* `streamText` results to the frontend.
433+
*
434+
* Set static defaults via `uiMessageStreamOptions` on `chat.task()`, or
435+
* override per-turn via `chat.setUIMessageStreamOptions()`.
436+
*
437+
* `onFinish`, `originalMessages`, and `generateMessageId` are omitted because
438+
* they are managed internally for response capture and message accumulation.
439+
* Use `streamText`'s `onFinish` for custom finish handling, or drop down to
440+
* raw task mode with `chat.pipe()` for full control.
441+
*/
442+
export type ChatUIMessageStreamOptions = Omit<
443+
UIMessageStreamOptions<UIMessage>,
444+
"onFinish" | "originalMessages" | "generateMessageId"
445+
>;
446+
426447
/**
427448
* An object with a `toUIMessageStream()` method (e.g. `StreamTextResult` from `streamText()`).
428449
*/
@@ -803,6 +824,35 @@ export type ChatTaskOptions<
803824
* @default Same as `turnTimeout`
804825
*/
805826
preloadTimeout?: string;
827+
828+
/**
829+
* Default options for `toUIMessageStream()` when auto-piping or using
830+
* `turn.complete()` / `chat.pipeAndCapture()`.
831+
*
832+
* Controls how the `StreamTextResult` is converted to a `UIMessageChunk`
833+
* stream — error handling, reasoning/source visibility, metadata, etc.
834+
*
835+
* Can be overridden per-turn by calling `chat.setUIMessageStreamOptions()`
836+
* inside `run()` or lifecycle hooks. Per-turn values are merged on top
837+
* of these defaults (per-turn wins on conflicts).
838+
*
839+
* `onFinish`, `originalMessages`, and `generateMessageId` are managed
840+
* internally and cannot be overridden here. Use `streamText`'s `onFinish`
841+
* for custom finish handling, or drop to raw task mode for full control.
842+
*
843+
* @example
844+
* ```ts
845+
* chat.task({
846+
* id: "my-chat",
847+
* uiMessageStreamOptions: {
848+
* sendReasoning: true,
849+
* onError: (error) => error instanceof Error ? error.message : "An error occurred.",
850+
* },
851+
* run: async ({ messages, signal }) => { ... },
852+
* });
853+
* ```
854+
*/
855+
uiMessageStreamOptions?: ChatUIMessageStreamOptions;
806856
};
807857

808858
/**
@@ -851,6 +901,7 @@ function chatTask<
851901
chatAccessTokenTTL = "1h",
852902
preloadWarmTimeoutInSeconds,
853903
preloadTimeout,
904+
uiMessageStreamOptions,
854905
...restOptions
855906
} = options;
856907

@@ -867,6 +918,11 @@ function chatTask<
867918
activeSpan.setAttribute("gen_ai.conversation.id", payload.chatId);
868919
}
869920

921+
// Store static UIMessageStream options in locals so resolveUIMessageStreamOptions() can read them
922+
if (uiMessageStreamOptions) {
923+
locals.set(chatUIStreamStaticKey, uiMessageStreamOptions);
924+
}
925+
870926
let currentWirePayload = payload;
871927
const continuation = payload.continuation ?? false;
872928
const previousRunId = payload.previousRunId;
@@ -1192,6 +1248,7 @@ function chatTask<
11921248
if ((locals.get(chatPipeCountKey) ?? 0) === 0 && isUIMessageStreamable(result)) {
11931249
onFinishAttached = true;
11941250
const uiStream = result.toUIMessageStream({
1251+
...resolveUIMessageStreamOptions(),
11951252
onFinish: ({ responseMessage }: { responseMessage: UIMessage }) => {
11961253
capturedResponseMessage = responseMessage;
11971254
resolveOnFinish!();
@@ -1447,6 +1504,48 @@ function setWarmTimeoutInSeconds(seconds: number): void {
14471504
metadata.set(WARM_TIMEOUT_METADATA_KEY, seconds);
14481505
}
14491506

1507+
/**
1508+
* Override the `toUIMessageStream()` options for the current turn.
1509+
*
1510+
* These options control how the `StreamTextResult` is converted to a
1511+
* `UIMessageChunk` stream — error handling, reasoning/source visibility,
1512+
* message metadata, etc.
1513+
*
1514+
* Per-turn options are merged on top of the static `uiMessageStreamOptions`
1515+
* set on `chat.task()`. Per-turn values win on conflicts.
1516+
*
1517+
* @example
1518+
* ```ts
1519+
* run: async ({ messages, signal }) => {
1520+
* chat.setUIMessageStreamOptions({
1521+
* sendReasoning: true,
1522+
* onError: (error) => error instanceof Error ? error.message : "An error occurred.",
1523+
* });
1524+
* return streamText({ model, messages, abortSignal: signal });
1525+
* }
1526+
* ```
1527+
*/
1528+
function setUIMessageStreamOptions(options: ChatUIMessageStreamOptions): void {
1529+
locals.set(chatUIStreamPerTurnKey, options);
1530+
}
1531+
1532+
/**
1533+
* Resolve the effective UIMessageStream options by merging:
1534+
* 1. Static task-level options (from `chat.task({ uiMessageStreamOptions })`)
1535+
* 2. Per-turn overrides (from `chat.setUIMessageStreamOptions()`)
1536+
*
1537+
* Per-turn values win on conflicts. Clears the per-turn override after reading
1538+
* so it doesn't leak into subsequent turns.
1539+
* @internal
1540+
*/
1541+
function resolveUIMessageStreamOptions(): ChatUIMessageStreamOptions {
1542+
const staticOptions = locals.get(chatUIStreamStaticKey) ?? {};
1543+
const perTurnOptions = locals.get(chatUIStreamPerTurnKey) ?? {};
1544+
// Clear per-turn override so it doesn't leak into subsequent turns
1545+
locals.set(chatUIStreamPerTurnKey, undefined);
1546+
return { ...staticOptions, ...perTurnOptions };
1547+
}
1548+
14501549
// ---------------------------------------------------------------------------
14511550
// Stop detection
14521551
// ---------------------------------------------------------------------------
@@ -1641,6 +1740,7 @@ async function pipeChatAndCapture(
16411740
const onFinishPromise = new Promise<void>((r) => { resolveOnFinish = r; });
16421741

16431742
const uiStream = source.toUIMessageStream({
1743+
...resolveUIMessageStreamOptions(),
16441744
onFinish: ({ responseMessage }: { responseMessage: UIMessage }) => {
16451745
captured = responseMessage;
16461746
resolveOnFinish!();
@@ -2180,6 +2280,8 @@ export const chat = {
21802280
setTurnTimeoutInSeconds,
21812281
/** Override the warm timeout at runtime. See {@link setWarmTimeoutInSeconds}. */
21822282
setWarmTimeoutInSeconds,
2283+
/** Override toUIMessageStream() options for the current turn. See {@link setUIMessageStreamOptions}. */
2284+
setUIMessageStreamOptions,
21832285
/** Check if the current turn was stopped by the user. See {@link isStopped}. */
21842286
isStopped,
21852287
/** Clean up aborted parts from a UIMessage. See {@link cleanupAbortedParts}. */

0 commit comments

Comments
 (0)