|
| 1 | +# AI Chat Architecture |
| 2 | + |
| 3 | +## System Overview |
| 4 | + |
| 5 | +```mermaid |
| 6 | +graph TB |
| 7 | + subgraph Frontend["Frontend (Browser)"] |
| 8 | + UC[useChat Hook] |
| 9 | + TCT[TriggerChatTransport] |
| 10 | + UI[Chat UI Components] |
| 11 | + end |
| 12 | +
|
| 13 | + subgraph Platform["Trigger.dev Platform"] |
| 14 | + API[REST API] |
| 15 | + RS[Realtime Streams] |
| 16 | + RE[Run Engine] |
| 17 | + end |
| 18 | +
|
| 19 | + subgraph Worker["Task Worker"] |
| 20 | + CT[chat.task Turn Loop] |
| 21 | + ST[streamText / AI SDK] |
| 22 | + LLM[LLM Provider] |
| 23 | + SUB[Subtasks via ai.tool] |
| 24 | + end |
| 25 | +
|
| 26 | + UI -->|user types| UC |
| 27 | + UC -->|sendMessages| TCT |
| 28 | + TCT -->|triggerTask / sendInputStream| API |
| 29 | + API -->|queue run / deliver input| RE |
| 30 | + RE -->|execute| CT |
| 31 | + CT -->|call| ST |
| 32 | + ST -->|API call| LLM |
| 33 | + LLM -->|stream chunks| ST |
| 34 | + ST -->|UIMessageChunks| RS |
| 35 | + RS -->|SSE| TCT |
| 36 | + TCT -->|ReadableStream| UC |
| 37 | + UC -->|update| UI |
| 38 | + CT -->|triggerAndWait| SUB |
| 39 | + SUB -->|chat.stream target:root| RS |
| 40 | +``` |
| 41 | + |
| 42 | +## Detailed Flow: New Chat (First Message) |
| 43 | + |
| 44 | +```mermaid |
| 45 | +sequenceDiagram |
| 46 | + participant User |
| 47 | + participant useChat as useChat + Transport |
| 48 | + participant API as Trigger.dev API |
| 49 | + participant Task as chat.task Worker |
| 50 | + participant LLM as LLM Provider |
| 51 | +
|
| 52 | + User->>useChat: sendMessage("Hello") |
| 53 | + useChat->>useChat: No session for chatId → trigger new run |
| 54 | +
|
| 55 | + useChat->>API: triggerTask(payload, tags: [chat:id]) |
| 56 | + API-->>useChat: { runId, publicAccessToken } |
| 57 | + useChat->>useChat: Store session, subscribe to SSE |
| 58 | +
|
| 59 | + API->>Task: Start run with ChatTaskWirePayload |
| 60 | +
|
| 61 | + Note over Task: Preload phase skipped (trigger ≠ "preload") |
| 62 | +
|
| 63 | + rect rgb(240, 248, 255) |
| 64 | + Note over Task: Turn 0 |
| 65 | + Task->>Task: convertToModelMessages(uiMessages) |
| 66 | + Task->>Task: Mint access token |
| 67 | + Task->>Task: onChatStart({ chatId, messages, clientData }) |
| 68 | + Task->>Task: onTurnStart({ chatId, messages, uiMessages }) |
| 69 | + Task->>LLM: streamText({ model, messages, abortSignal }) |
| 70 | + LLM-->>Task: Stream response chunks |
| 71 | + Task->>API: streams.pipe("chat", uiStream) |
| 72 | + API-->>useChat: SSE: UIMessageChunks |
| 73 | + useChat-->>User: Render streaming text |
| 74 | + Task->>Task: onFinish → capturedResponseMessage |
| 75 | + Task->>Task: Accumulate response in messages |
| 76 | + Task->>API: Write __trigger_turn_complete chunk |
| 77 | + API-->>useChat: SSE: { type: __trigger_turn_complete, publicAccessToken } |
| 78 | + useChat->>useChat: Close stream, update session |
| 79 | + Task->>Task: onTurnComplete({ messages, uiMessages, stopped }) |
| 80 | + end |
| 81 | +
|
| 82 | + rect rgb(255, 248, 240) |
| 83 | + Note over Task: Wait for next message |
| 84 | + Task->>Task: messagesInput.once() [warm, 30s] |
| 85 | + Note over Task: No message → suspend |
| 86 | + Task->>Task: messagesInput.wait() [suspended, 1h] |
| 87 | + end |
| 88 | +``` |
| 89 | + |
| 90 | +## Detailed Flow: Multi-Turn (Subsequent Messages) |
| 91 | + |
| 92 | +```mermaid |
| 93 | +sequenceDiagram |
| 94 | + participant User |
| 95 | + participant useChat as useChat + Transport |
| 96 | + participant API as Trigger.dev API |
| 97 | + participant Task as chat.task Worker |
| 98 | + participant LLM as LLM Provider |
| 99 | +
|
| 100 | + Note over Task: Suspended, waiting for message |
| 101 | +
|
| 102 | + User->>useChat: sendMessage("Tell me more") |
| 103 | + useChat->>useChat: Session exists → send via input stream |
| 104 | + useChat->>API: sendInputStream(runId, "chat-messages", payload) |
| 105 | + Note right of useChat: Only sends new message<br/>(not full history) |
| 106 | +
|
| 107 | + API->>Task: Deliver to messagesInput |
| 108 | + Task->>Task: Wake from suspend |
| 109 | +
|
| 110 | + rect rgb(240, 248, 255) |
| 111 | + Note over Task: Turn 1 |
| 112 | + Task->>Task: Append new message to accumulators |
| 113 | + Task->>Task: Mint fresh access token |
| 114 | + Task->>Task: onTurnStart({ turn: 1, messages }) |
| 115 | + Task->>LLM: streamText({ messages: [all accumulated] }) |
| 116 | + LLM-->>Task: Stream response |
| 117 | + Task->>API: streams.pipe("chat", uiStream) |
| 118 | + API-->>useChat: SSE: UIMessageChunks |
| 119 | + useChat-->>User: Render streaming text |
| 120 | + Task->>API: Write __trigger_turn_complete |
| 121 | + Task->>Task: onTurnComplete({ turn: 1 }) |
| 122 | + end |
| 123 | +
|
| 124 | + Task->>Task: Wait for next message (warm → suspend) |
| 125 | +``` |
| 126 | + |
| 127 | +## Stop Signal Flow |
| 128 | + |
| 129 | +```mermaid |
| 130 | +sequenceDiagram |
| 131 | + participant User |
| 132 | + participant useChat as useChat + Transport |
| 133 | + participant API as Trigger.dev API |
| 134 | + participant Task as chat.task Worker |
| 135 | + participant LLM as LLM Provider |
| 136 | +
|
| 137 | + Note over Task: Streaming response... |
| 138 | +
|
| 139 | + User->>useChat: Click "Stop" |
| 140 | + useChat->>API: sendInputStream(runId, "chat-stop", { stop: true }) |
| 141 | + useChat->>useChat: Set skipToTurnComplete = true |
| 142 | +
|
| 143 | + API->>Task: Deliver to stopInput |
| 144 | + Task->>Task: stopController.abort() |
| 145 | + Task->>LLM: AbortSignal fires |
| 146 | + LLM-->>Task: Stream ends (AbortError) |
| 147 | + Task->>Task: Catch AbortError, fall through |
| 148 | + Task->>Task: await onFinishPromise (race condition fix) |
| 149 | + Task->>Task: cleanupAbortedParts(responseMessage) |
| 150 | + Note right of Task: Remove partial tool calls<br/>Mark streaming parts as done |
| 151 | +
|
| 152 | + Task->>API: Write __trigger_turn_complete |
| 153 | + API-->>useChat: SSE: __trigger_turn_complete |
| 154 | + useChat->>useChat: skipToTurnComplete = false, close stream |
| 155 | +
|
| 156 | + Task->>Task: onTurnComplete({ stopped: true, responseMessage: cleaned }) |
| 157 | + Task->>Task: Wait for next message |
| 158 | +``` |
| 159 | + |
| 160 | +## Preload Flow |
| 161 | + |
| 162 | +```mermaid |
| 163 | +sequenceDiagram |
| 164 | + participant User |
| 165 | + participant useChat as useChat + Transport |
| 166 | + participant API as Trigger.dev API |
| 167 | + participant Task as chat.task Worker |
| 168 | +
|
| 169 | + User->>useChat: Click "New Chat" |
| 170 | + useChat->>API: transport.preload(chatId) |
| 171 | + Note right of useChat: payload: { messages: [], trigger: "preload" }<br/>tags: [chat:id, preload:true] |
| 172 | + API-->>useChat: { runId, publicAccessToken } |
| 173 | + useChat->>useChat: Store session |
| 174 | +
|
| 175 | + API->>Task: Start run (trigger = "preload") |
| 176 | +
|
| 177 | + rect rgb(240, 255, 240) |
| 178 | + Note over Task: Preload Phase |
| 179 | + Task->>Task: Mint access token |
| 180 | + Task->>Task: onPreload({ chatId, clientData }) |
| 181 | + Note right of Task: DB setup, load user context,<br/>load dynamic tools |
| 182 | + Task->>Task: messagesInput.once() [warm] |
| 183 | + Note over Task: Waiting for first message... |
| 184 | + end |
| 185 | +
|
| 186 | + Note over User: User is typing... |
| 187 | +
|
| 188 | + User->>useChat: sendMessage("Hello") |
| 189 | + useChat->>useChat: Session exists → send via input stream |
| 190 | + useChat->>API: sendInputStream(runId, "chat-messages", payload) |
| 191 | + API->>Task: Deliver message |
| 192 | +
|
| 193 | + rect rgb(240, 248, 255) |
| 194 | + Note over Task: Turn 0 (preloaded = true) |
| 195 | + Task->>Task: onChatStart({ preloaded: true }) |
| 196 | + Task->>Task: onTurnStart({ preloaded: true }) |
| 197 | + Task->>Task: run() with preloaded dynamic tools ready |
| 198 | + end |
| 199 | +``` |
| 200 | + |
| 201 | +## Subtask Streaming (Tool as Task) |
| 202 | + |
| 203 | +```mermaid |
| 204 | +sequenceDiagram |
| 205 | + participant useChat as useChat + Transport |
| 206 | + participant API as Trigger.dev API |
| 207 | + participant Chat as chat.task |
| 208 | + participant LLM as LLM Provider |
| 209 | + participant Sub as Subtask (ai.tool) |
| 210 | +
|
| 211 | + Chat->>LLM: streamText({ tools: { research: ai.tool(task) } }) |
| 212 | + LLM-->>Chat: Tool call: research({ query, urls }) |
| 213 | +
|
| 214 | + Chat->>API: triggerAndWait(subtask, input) |
| 215 | + Note right of Chat: Passes toolCallId, chatId,<br/>clientData via metadata |
| 216 | +
|
| 217 | + API->>Sub: Start subtask |
| 218 | +
|
| 219 | + Sub->>Sub: ai.chatContextOrThrow() → { chatId, clientData } |
| 220 | + Sub->>API: chat.stream.writer({ target: "root" }) |
| 221 | + Note right of Sub: Write data-research-progress<br/>chunks to parent's stream |
| 222 | + API-->>useChat: SSE: data-* chunks |
| 223 | + useChat-->>useChat: Render progress UI |
| 224 | +
|
| 225 | + Sub-->>Chat: Return result |
| 226 | + Chat->>LLM: Tool result |
| 227 | + LLM-->>Chat: Continue response |
| 228 | +``` |
| 229 | + |
| 230 | +## Continuation Flow (Run Timeout / Cancel) |
| 231 | + |
| 232 | +```mermaid |
| 233 | +sequenceDiagram |
| 234 | + participant User |
| 235 | + participant useChat as useChat + Transport |
| 236 | + participant API as Trigger.dev API |
| 237 | + participant Task as chat.task Worker |
| 238 | +
|
| 239 | + Note over Task: Previous run timed out / was cancelled |
| 240 | +
|
| 241 | + User->>useChat: sendMessage("Continue") |
| 242 | + useChat->>API: sendInputStream(runId, payload) |
| 243 | + API-->>useChat: Error (run dead) |
| 244 | +
|
| 245 | + useChat->>useChat: Delete session, set isContinuation = true |
| 246 | + useChat->>API: triggerTask(payload, continuation: true, previousRunId) |
| 247 | + API-->>useChat: New { runId, publicAccessToken } |
| 248 | +
|
| 249 | + API->>Task: Start new run |
| 250 | +
|
| 251 | + rect rgb(255, 245, 238) |
| 252 | + Note over Task: Turn 0 (continuation = true) |
| 253 | + Task->>Task: cleanupAbortedParts(incoming messages) |
| 254 | + Note right of Task: Strip incomplete tool calls<br/>from previous run's response |
| 255 | + Task->>Task: onChatStart({ continuation: true, previousRunId }) |
| 256 | + Task->>Task: Normal turn flow... |
| 257 | + end |
| 258 | +``` |
| 259 | + |
| 260 | +## Hook Lifecycle |
| 261 | + |
| 262 | +```mermaid |
| 263 | +graph TD |
| 264 | + START([Run Starts]) --> IS_PRELOAD{trigger = preload?} |
| 265 | +
|
| 266 | + IS_PRELOAD -->|Yes| PRELOAD[onPreload] |
| 267 | + PRELOAD --> WAIT_MSG[Wait for first message<br/>warm → suspend] |
| 268 | + WAIT_MSG --> TURN0 |
| 269 | +
|
| 270 | + IS_PRELOAD -->|No| TURN0 |
| 271 | +
|
| 272 | + TURN0[Turn 0] --> CHAT_START[onChatStart<br/>continuation, preloaded] |
| 273 | + CHAT_START --> TURN_START_0[onTurnStart] |
| 274 | + TURN_START_0 --> RUN_0[run → streamText] |
| 275 | + RUN_0 --> TURN_COMPLETE_0[onTurnComplete<br/>stopped, responseMessage] |
| 276 | +
|
| 277 | + TURN_COMPLETE_0 --> WAIT{Wait for<br/>next message} |
| 278 | + WAIT -->|Message arrives| TURN_N[Turn N] |
| 279 | + WAIT -->|Timeout| END_RUN([Run Ends]) |
| 280 | +
|
| 281 | + TURN_N --> TURN_START_N[onTurnStart] |
| 282 | + TURN_START_N --> RUN_N[run → streamText] |
| 283 | + RUN_N --> TURN_COMPLETE_N[onTurnComplete] |
| 284 | + TURN_COMPLETE_N --> WAIT |
| 285 | +``` |
| 286 | + |
| 287 | +## Stream Architecture |
| 288 | + |
| 289 | +```mermaid |
| 290 | +graph LR |
| 291 | + subgraph Output["Output Stream (chat)"] |
| 292 | + direction TB |
| 293 | + O1[UIMessageChunks<br/>text, reasoning, tools] |
| 294 | + O2[data-* custom chunks] |
| 295 | + O3[__trigger_turn_complete<br/>control chunk] |
| 296 | + end |
| 297 | +
|
| 298 | + subgraph Input["Input Streams"] |
| 299 | + direction TB |
| 300 | + I1[chat-messages<br/>User messages] |
| 301 | + I2[chat-stop<br/>Stop signal] |
| 302 | + end |
| 303 | +
|
| 304 | + Frontend -->|sendInputStream| I1 |
| 305 | + Frontend -->|sendInputStream| I2 |
| 306 | + I1 -->|messagesInput.once/wait| Worker |
| 307 | + I2 -->|stopInput.on| Worker |
| 308 | + Worker -->|streams.pipe / chat.stream| Output |
| 309 | + Subtask -->|chat.stream target:root| Output |
| 310 | + Output -->|SSE /realtime/v1/streams| Frontend |
| 311 | +``` |
0 commit comments