@@ -17,6 +17,7 @@ import type { StreamWriteResult } from "@trigger.dev/core/v3";
1717import { convertToModelMessages , dynamicTool , generateId as generateMessageId , jsonSchema , JSONSchema7 , Schema , Tool , ToolCallOptions , zodSchema } from "ai" ;
1818import { type Attributes , trace } from "@opentelemetry/api" ;
1919import { auth } from "./auth.js" ;
20+ import { locals } from "./locals.js" ;
2021import { metadata } from "./metadata.js" ;
2122import { streams } from "./streams.js" ;
2223import { createTask } from "./shared.js" ;
@@ -239,12 +240,11 @@ const messagesInput = streams.input<ChatTaskWirePayload>({ id: CHAT_MESSAGES_STR
239240const stopInput = streams . input < { stop : true ; message ?: string } > ( { id : CHAT_STOP_STREAM_ID } ) ;
240241
241242/**
242- * Tracks how many times `pipeChat` has been called in the current `chatTask` run.
243- * Used to prevent double-piping when a user both calls `pipeChat()` manually
244- * and returns a streamable from their `run` function.
243+ * Run-scoped pipe counter. Stored in locals so concurrent runs in the
244+ * same worker don't share state.
245245 * @internal
246246 */
247- let _chatPipeCount = 0 ;
247+ const chatPipeCountKey = locals . create < number > ( "chat.pipeCount" ) ;
248248
249249/**
250250 * Options for `pipeChat`.
@@ -336,7 +336,7 @@ async function pipeChat(
336336 source : UIMessageStreamable | AsyncIterable < unknown > | ReadableStream < unknown > ,
337337 options ?: PipeChatOptions
338338) : Promise < void > {
339- _chatPipeCount ++ ;
339+ locals . set ( chatPipeCountKey , ( locals . get ( chatPipeCountKey ) ?? 0 ) + 1 ) ;
340340 const streamKey = options ?. streamKey ?? CHAT_STREAM_KEY ;
341341
342342 let stream : AsyncIterable < unknown > | ReadableStream < unknown > ;
@@ -662,7 +662,7 @@ function chatTask<TIdentifier extends string>(
662662 const turnResult = await tracer . startActiveSpan (
663663 `chat turn ${ turn + 1 } ` ,
664664 async ( ) => {
665- _chatPipeCount = 0 ;
665+ locals . set ( chatPipeCountKey , 0 ) ;
666666
667667 // Per-turn stop controller (reset each turn)
668668 const stopController = new AbortController ( ) ;
@@ -792,7 +792,7 @@ function chatTask<TIdentifier extends string>(
792792 // Auto-pipe if the run function returned a StreamTextResult or similar,
793793 // but only if pipeChat() wasn't already called manually during this turn.
794794 // We call toUIMessageStream ourselves to attach onFinish for response capture.
795- if ( _chatPipeCount === 0 && isUIMessageStreamable ( result ) ) {
795+ if ( ( locals . get ( chatPipeCountKey ) ?? 0 ) === 0 && isUIMessageStreamable ( result ) ) {
796796 const uiStream = result . toUIMessageStream ( {
797797 onFinish : ( { responseMessage } : { responseMessage : UIMessage } ) => {
798798 capturedResponseMessage = responseMessage ;
0 commit comments