Skip to content

Commit d51ed95

Browse files
committed
Add stopping support and fix an issue with the OpenAI Responses API and stopped streams
1 parent dd5da5a commit d51ed95

File tree

4 files changed

+68
-24
lines changed

4 files changed

+68
-24
lines changed

packages/trigger-sdk/src/v3/ai.ts

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -216,6 +216,28 @@ export type ChatTaskRunPayload = ChatTaskPayload & ChatTaskSignals;
216216
const messagesInput = streams.input<ChatTaskPayload>({ id: CHAT_MESSAGES_STREAM_ID });
217217
const stopInput = streams.input<{ stop: true; message?: string }>({ id: CHAT_STOP_STREAM_ID });
218218

219+
/**
220+
* Strips provider-specific IDs from message parts so that partial/stopped
221+
* assistant responses don't cause 404s when sent back to the provider
222+
* (e.g. OpenAI Responses API message IDs).
223+
* @internal
224+
*/
225+
function sanitizeMessages<TMessage extends UIMessage>(messages: TMessage[]): TMessage[] {
226+
return messages.map((msg) => {
227+
if (msg.role !== "assistant" || !msg.parts) return msg;
228+
return {
229+
...msg,
230+
parts: msg.parts.map((part: any) => {
231+
// Strip provider-specific metadata (e.g. OpenAI Responses API itemId)
232+
// and streaming state from assistant message parts. These cause 404s
233+
// when partial/stopped responses are sent back to the provider.
234+
const { providerMetadata, state, id, ...rest } = part;
235+
return rest;
236+
}),
237+
};
238+
});
239+
}
240+
219241
/**
220242
* Tracks how many times `pipeChat` has been called in the current `chatTask` run.
221243
* Used to prevent double-piping when a user both calls `pipeChat()` manually
@@ -454,6 +476,7 @@ function chatTask<TIdentifier extends string>(
454476
try {
455477
const result = await userRun({
456478
...currentPayload,
479+
messages: sanitizeMessages(currentPayload.messages),
457480
signal: combinedSignal,
458481
cancelSignal,
459482
stopSignal,

packages/trigger-sdk/src/v3/chat.ts

Lines changed: 23 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,8 @@ const DEFAULT_STREAM_KEY = "chat";
3030
const DEFAULT_BASE_URL = "https://api.trigger.dev";
3131
const DEFAULT_STREAM_TIMEOUT_SECONDS = 120;
3232

33+
34+
3335
/**
3436
* Options for creating a TriggerChatTransport.
3537
*/
@@ -91,6 +93,8 @@ type ChatSessionState = {
9193
publicAccessToken: string;
9294
/** Last SSE event ID — used to resume the stream without replaying old events. */
9395
lastEventId?: string;
96+
/** Set when the stream was aborted mid-turn (stop). On reconnect, skip chunks until __trigger_turn_complete. */
97+
skipToTurnComplete?: boolean;
9498
};
9599

96100
/**
@@ -164,14 +168,12 @@ export class TriggerChatTransport implements ChatTransport<UIMessage> {
164168
};
165169

166170
const session = this.sessions.get(chatId);
167-
168171
// If we have an existing run, send the message via input stream
169172
// to resume the conversation in the same run.
170173
if (session?.runId) {
171174
try {
172175
const apiClient = new ApiClient(this.baseURL, session.publicAccessToken);
173176
await apiClient.sendInputStream(session.runId, CHAT_MESSAGES_STREAM_ID, payload);
174-
175177
return this.subscribeToStream(
176178
session.runId,
177179
session.publicAccessToken,
@@ -205,7 +207,6 @@ export class TriggerChatTransport implements ChatTransport<UIMessage> {
205207
runId,
206208
publicAccessToken: publicAccessToken ?? currentToken,
207209
});
208-
209210
return this.subscribeToStream(
210211
runId,
211212
publicAccessToken ?? currentToken,
@@ -256,7 +257,8 @@ export class TriggerChatTransport implements ChatTransport<UIMessage> {
256257
abortSignal.addEventListener(
257258
"abort",
258259
() => {
259-
if (session?.runId) {
260+
if (session) {
261+
session.skipToTurnComplete = true;
260262
const api = new ApiClient(this.baseURL, session.publicAccessToken);
261263
api
262264
.sendInputStream(session.runId, CHAT_STOP_STREAM_ID, { stop: true })
@@ -283,16 +285,18 @@ export class TriggerChatTransport implements ChatTransport<UIMessage> {
283285
try {
284286
const sseStream = await subscription.subscribe();
285287
const reader = sseStream.getReader();
288+
let chunkCount = 0;
286289

287290
try {
288291
while (true) {
289292
const { done, value } = await reader.read();
290293

291294
if (done) {
292-
// Stream closed without a control chunk — the run has
293-
// ended (or was killed). Clear the session so that the
294-
// next message triggers a fresh run.
295-
if (chatId) {
295+
// Only delete session if the stream ended naturally (not aborted by stop).
296+
// When the user clicks stop, the abort closes the SSE reader which
297+
// returns done=true, but the run is still alive and waiting for
298+
// the next message via input streams.
299+
if (chatId && !combinedSignal.aborted) {
296300
this.sessions.delete(chatId);
297301
}
298302
controller.close();
@@ -315,11 +319,17 @@ export class TriggerChatTransport implements ChatTransport<UIMessage> {
315319
if (value.chunk != null && typeof value.chunk === "object") {
316320
const chunk = value.chunk as Record<string, unknown>;
317321

318-
// Intercept the turn-complete control chunk emitted by
319-
// `chatTask` after the AI response stream completes. This
320-
// chunk is never forwarded to the AI SDK consumer.
322+
// After a stop, skip leftover chunks from the stopped turn
323+
// until we see the __trigger_turn_complete marker.
324+
if (session?.skipToTurnComplete) {
325+
if (chunk.type === "__trigger_turn_complete") {
326+
session.skipToTurnComplete = false;
327+
chunkCount = 0;
328+
}
329+
continue;
330+
}
331+
321332
if (chunk.type === "__trigger_turn_complete" && chatId) {
322-
// Abort the underlying fetch to close the SSE connection
323333
internalAbort.abort();
324334
try {
325335
controller.close();
@@ -329,6 +339,7 @@ export class TriggerChatTransport implements ChatTransport<UIMessage> {
329339
return;
330340
}
331341

342+
chunkCount++;
332343
controller.enqueue(chunk as unknown as UIMessageChunk);
333344
}
334345
}

references/ai-chat/src/components/chat.tsx

Lines changed: 18 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -79,7 +79,7 @@ export function Chat() {
7979
baseURL: process.env.NEXT_PUBLIC_TRIGGER_API_URL,
8080
});
8181

82-
const { messages, sendMessage, status, error } = useChat({
82+
const { messages, sendMessage, stop, status, error } = useChat({
8383
transport,
8484
});
8585

@@ -152,13 +152,23 @@ export function Chat() {
152152
placeholder="Type a message…"
153153
className="flex-1 rounded-lg border border-gray-300 px-3 py-2 text-sm outline-none focus:border-blue-500 focus:ring-1 focus:ring-blue-500"
154154
/>
155-
<button
156-
type="submit"
157-
disabled={!input.trim() || status === "streaming"}
158-
className="rounded-lg bg-blue-600 px-4 py-2 text-sm font-medium text-white hover:bg-blue-700 disabled:opacity-50"
159-
>
160-
Send
161-
</button>
155+
{status === "streaming" ? (
156+
<button
157+
type="button"
158+
onClick={stop}
159+
className="rounded-lg bg-red-600 px-4 py-2 text-sm font-medium text-white hover:bg-red-700"
160+
>
161+
Stop
162+
</button>
163+
) : (
164+
<button
165+
type="submit"
166+
disabled={!input.trim()}
167+
className="rounded-lg bg-blue-600 px-4 py-2 text-sm font-medium text-white hover:bg-blue-700 disabled:opacity-50"
168+
>
169+
Send
170+
</button>
171+
)}
162172
</form>
163173
</div>
164174
);

references/ai-chat/src/trigger/chat.ts

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
import { chat } from "@trigger.dev/sdk/ai";
2-
import { streamText, convertToModelMessages, tool } from "ai";
2+
import { streamText, convertToModelMessages, tool, stepCountIs } from "ai";
33
import { openai } from "@ai-sdk/openai";
44
import { z } from "zod";
55
import os from "node:os";
@@ -64,14 +64,14 @@ declare const Deno: unknown;
6464

6565
export const aiChat = chat.task({
6666
id: "ai-chat",
67-
run: async ({ messages, signal }) => {
67+
run: async ({ messages, stopSignal }) => {
6868
return streamText({
6969
model: openai("gpt-4o-mini"),
7070
system: "You are a helpful assistant. Be concise and friendly.",
7171
messages: await convertToModelMessages(messages),
7272
tools: { inspectEnvironment },
73-
maxSteps: 3,
74-
abortSignal: signal,
73+
stopWhen: stepCountIs(10),
74+
abortSignal: stopSignal,
7575
});
7676
},
7777
});

0 commit comments

Comments
 (0)