@@ -1567,6 +1567,156 @@ function cleanupAbortedParts(message: UIMessage): UIMessage {
15671567 };
15681568}
15691569
1570+ // ---------------------------------------------------------------------------
1571+ // Composable primitives for raw task chat
1572+ // ---------------------------------------------------------------------------
1573+
1574+ /**
1575+ * Create a managed stop signal wired to the chat stop input stream.
1576+ *
1577+ * Call once at the start of your run. Use `signal` as the abort signal for
1578+ * `streamText`. Call `reset()` at the start of each turn to get a fresh
1579+ * per-turn signal. Call `cleanup()` when the run ends.
1580+ *
1581+ * @example
1582+ * ```ts
1583+ * const stop = chat.createStopSignal();
1584+ * for (let turn = 0; turn < 100; turn++) {
1585+ * stop.reset();
1586+ * const result = streamText({ model, messages, abortSignal: stop.signal });
1587+ * await chat.pipe(result);
1588+ * // ...
1589+ * }
1590+ * stop.cleanup();
1591+ * ```
1592+ */
1593+ function createStopSignal(): { readonly signal: AbortSignal; reset: () => void; cleanup: () => void } {
1594+ let controller = new AbortController();
1595+ const sub = stopInput.on((data) => {
1596+ controller.abort(data?.message || "stopped");
1597+ });
1598+ return {
1599+ get signal() { return controller.signal; },
1600+ reset() { controller = new AbortController(); },
1601+ cleanup() { sub.off(); },
1602+ };
1603+ }
1604+
1605+ /**
1606+ * Signal the frontend that the current turn is complete.
1607+ *
1608+ * The `TriggerChatTransport` intercepts this to close the ReadableStream
1609+ * for the current turn. Call after piping the response stream.
1610+ *
1611+ * @example
1612+ * ```ts
1613+ * await chat.pipe(result);
1614+ * await chat.writeTurnComplete();
1615+ * ```
1616+ */
1617+ async function chatWriteTurnComplete(options?: { publicAccessToken?: string }): Promise<void> {
1618+ await writeTurnCompleteChunk(undefined, options?.publicAccessToken);
1619+ }
1620+
1621+ /**
1622+ * Pipe a `StreamTextResult` (or similar) to the chat stream and capture
1623+ * the assistant's response message via `onFinish`.
1624+ *
1625+ * Combines `toUIMessageStream()` + `onFinish` callback + `chat.pipe()`.
1626+ * Returns the captured `UIMessage`, or `undefined` if capture failed.
1627+ *
1628+ * @example
1629+ * ```ts
1630+ * const result = streamText({ model, messages, abortSignal: signal });
1631+ * const response = await chat.pipeAndCapture(result, { signal });
1632+ * if (response) conversation.addResponse(response);
1633+ * ```
1634+ */
1635+ async function pipeChatAndCapture(
1636+ source: UIMessageStreamable,
1637+ options?: { signal?: AbortSignal; spanName?: string }
1638+ ): Promise<UIMessage | undefined> {
1639+ let captured: UIMessage | undefined;
1640+ let resolveOnFinish: () => void;
1641+ const onFinishPromise = new Promise<void>((r) => { resolveOnFinish = r; });
1642+
1643+ const uiStream = source.toUIMessageStream({
1644+ onFinish: ({ responseMessage }: { responseMessage: UIMessage }) => {
1645+ captured = responseMessage;
1646+ resolveOnFinish!();
1647+ },
1648+ });
1649+
1650+ await pipeChat(uiStream, { signal: options?.signal, spanName: options?.spanName ?? "stream response" });
1651+ await onFinishPromise;
1652+
1653+ return captured;
1654+ }
1655+
1656+ /**
1657+ * Accumulates conversation messages across turns.
1658+ *
1659+ * Handles the transport protocol: turn 0 sends full history (replace),
1660+ * subsequent turns send only new messages (append), regenerate sends
1661+ * full history minus last assistant message (replace).
1662+ *
1663+ * @example
1664+ * ```ts
1665+ * const conversation = new chat.MessageAccumulator();
1666+ * for (let turn = 0; turn < 100; turn++) {
1667+ * const messages = await conversation.addIncoming(payload.messages, payload.trigger, turn);
1668+ * const result = streamText({ model, messages });
1669+ * const response = await chat.pipeAndCapture(result);
1670+ * if (response) await conversation.addResponse(response);
1671+ * }
1672+ * ```
1673+ */
1674+ class ChatMessageAccumulator {
1675+ modelMessages: ModelMessage[] = [];
1676+ uiMessages: UIMessage[] = [];
1677+
1678+ /**
1679+ * Add incoming messages from the transport payload.
1680+ * Returns the full accumulated model messages for `streamText`.
1681+ */
1682+ async addIncoming(
1683+ messages: UIMessage[],
1684+ trigger: string,
1685+ turn: number
1686+ ): Promise<ModelMessage[]> {
1687+ const cleaned = messages.map((m) =>
1688+ m.role === "assistant" ? cleanupAbortedParts(m) : m
1689+ );
1690+ const model = await convertToModelMessages(cleaned);
1691+
1692+ if (turn === 0 || trigger === "regenerate-message") {
1693+ this.modelMessages = model;
1694+ this.uiMessages = [...cleaned];
1695+ } else {
1696+ this.modelMessages.push(...model);
1697+ this.uiMessages.push(...cleaned);
1698+ }
1699+ return this.modelMessages;
1700+ }
1701+
1702+ /**
1703+ * Add the assistant's response to the accumulator.
1704+ * Call after `pipeAndCapture` with the captured response.
1705+ */
1706+ async addResponse(response: UIMessage): Promise<void> {
1707+ if (!response.id) {
1708+ response = { ...response, id: generateMessageId() };
1709+ }
1710+ this.uiMessages.push(response);
1711+ try {
1712+ const msgs = await convertToModelMessages([stripProviderMetadata(response)]);
1713+ this.modelMessages.push(...msgs);
1714+ } catch {
1715+ // Conversion failed — skip model message accumulation for this response
1716+ }
1717+ }
1718+ }
1719+
15701720// ---------------------------------------------------------------------------
15711721// chat.local — per-run typed data with Proxy access
15721722// ---------------------------------------------------------------------------
@@ -1825,6 +1975,16 @@ export const chat = {
18251975 defer: chatDefer,
18261976 /** Typed chat output stream for writing custom chunks or piping from subtasks. */
18271977 stream: chatStream,
1978+ /** Pre-built input stream for receiving messages from the transport. */
1979+ messages: messagesInput,
1980+ /** Create a managed stop signal wired to the stop input stream. See {@link createStopSignal}. */
1981+ createStopSignal,
1982+ /** Signal the frontend that the current turn is complete. See {@link chatWriteTurnComplete}. */
1983+ writeTurnComplete: chatWriteTurnComplete,
1984+ /** Pipe a stream and capture the response message. See {@link pipeChatAndCapture}. */
1985+ pipeAndCapture: pipeChatAndCapture,
1986+ /** Message accumulator class for raw task chat. See {@link ChatMessageAccumulator}. */
1987+ MessageAccumulator: ChatMessageAccumulator,
18281988};
18291989
18301990/**
0 commit comments