Skip to content

Commit 02dca08

Browse files
committed
fix: AGENT/SCHEDULED runs misclassified + green up the chat.handover idle test
Two unrelated fixes that both block the ai-chat feature branch. apps/webapp queues concern — locked + specified-queue branch was silently dropping `taskKind`. The TTL-skip optimization on the backgroundWorkerTask lookup also skipped the only place we read `triggerSource`, so AGENT and SCHEDULED runs triggered with both `lockToVersion` and a queue override were annotated as STANDARD and disappeared from the run-list "Source" filter (and replicated to ClickHouse with `task_kind = 'STANDARD'`). The lookup now always runs and includes `triggerSource` in the same select; ttl is still gated on the override being absent. Mirrors the sibling locked-with- default-queue branch (line ~162) and the non-locked branch's `getTaskQueueInfo`. trigger-sdk test harness — `mockChatAgent` was leaving an `ApiClientMissingError` unhandled-rejection trail when an agent's suspend path tripped (the `chat.handover` idle-timeout test reliably hit it). The harness reused the real `SessionInputChannel`, whose `wait()` calls `apiClientManager.clientOrThrow()` — fine in production, fatal in a test process with no `TRIGGER_SECRET_KEY`. Added a `TestSessionInputChannel` subclass that overrides only `wait()` and resolves `{ok:false}` when the harness's run signal aborts; `on`/`once`/`peek`/`send` continue to flow through the real `sessionStreams` global. The harness threads its `runSignal.signal` in via a lazy getter so the channel reads it after the controller is constructed. All 97 sdk tests pass; webapp typecheck is clean.
1 parent eb1dc3a commit 02dca08

3 files changed

Lines changed: 84 additions & 21 deletions

File tree

apps/webapp/app/runEngine/concerns/queues.server.ts

Lines changed: 17 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -107,19 +107,26 @@ export class DefaultQueueManager implements QueueManager {
107107
queueName = specifiedQueue.name;
108108
lockedQueueId = specifiedQueue.id;
109109

110-
// Only fetch task for TTL if caller didn't provide a per-trigger TTL
111-
if (request.body.options?.ttl === undefined) {
112-
const lockedTask = await this.replicaPrisma.backgroundWorkerTask.findFirst({
113-
where: {
114-
workerId: lockedBackgroundWorker.id,
115-
runtimeEnvironmentId: request.environment.id,
116-
slug: request.taskId,
117-
},
118-
select: { ttl: true },
119-
});
110+
// Always fetch the task so we can resolve `triggerSource` (which
111+
// becomes `taskKind` on annotations and replicates to ClickHouse).
112+
// Without this, AGENT/SCHEDULED runs triggered with
113+
// `lockToVersion` + a queue override would be annotated as
114+
// STANDARD and disappear from the run-list "Source" filter.
115+
// `ttl` is read from the same row but only used when the caller
116+
// didn't specify a per-trigger TTL.
117+
const lockedTask = await this.replicaPrisma.backgroundWorkerTask.findFirst({
118+
where: {
119+
workerId: lockedBackgroundWorker.id,
120+
runtimeEnvironmentId: request.environment.id,
121+
slug: request.taskId,
122+
},
123+
select: { ttl: true, triggerSource: true },
124+
});
120125

126+
if (request.body.options?.ttl === undefined) {
121127
taskTtl = lockedTask?.ttl;
122128
}
129+
taskKind = lockedTask?.triggerSource;
123130
} else {
124131
// No queue override - fetch task with queue to get both default queue and TTL
125132
const lockedTask = await this.replicaPrisma.backgroundWorkerTask.findFirst({

packages/trigger-sdk/src/v3/test/mock-chat-agent.ts

Lines changed: 9 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -260,11 +260,15 @@ export function mockChatAgent(
260260
});
261261

262262
// Install the session open override so `sessions.open(id)` returns a
263-
// SessionHandle with an in-memory `.out` that captures writes. `.in`
264-
// stays the real SessionInputChannel — it routes through the
265-
// `sessionStreams` global, which the mock-task-context installs as a
266-
// TestSessionStreamManager.
267-
__setSessionOpenImplForTests((id) => createTestSessionHandle(id, sessionOutState));
263+
// SessionHandle with an in-memory `.out` that captures writes. The
264+
// `.in` channel routes record subscriptions (`on`/`once`/`peek`)
265+
// through the `sessionStreams` global — the mock task context
266+
// installs a `TestSessionStreamManager` there — and stubs `wait()`
267+
// so the suspend path resolves cleanly on `runSignal.abort()` without
268+
// touching the api client.
269+
__setSessionOpenImplForTests((id) =>
270+
createTestSessionHandle(id, sessionOutState, () => runSignal?.signal)
271+
);
268272

269273
// Install the session start override so any test path that invokes
270274
// `sessions.start()` (typically through a server action shim like

packages/trigger-sdk/src/v3/test/test-session-handle.ts

Lines changed: 58 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@ import type {
44
StreamWriteResult,
55
WriterStreamOptions,
66
} from "@trigger.dev/core/v3";
7-
import { ensureReadableStream } from "@trigger.dev/core/v3";
7+
import { ensureReadableStream, ManualWaitpointPromise } from "@trigger.dev/core/v3";
88
import {
99
SessionHandle,
1010
SessionInputChannel,
@@ -13,6 +13,51 @@ import {
1313
SessionSubscribeOptions,
1414
} from "../sessions.js";
1515

16+
/**
17+
* Stub for `SessionInputChannel.wait` that skips the apiClient round-trip
18+
* the production path makes via `createSessionStreamWaitpoint`. Without
19+
* this override, every test that exercises the suspend fallback (e.g.
20+
* the `chat.handover` idle-timeout case) throws `ApiClientMissingError`
21+
* because `apiClientManager.clientOrThrow()` runs in a test process that
22+
* has no `TRIGGER_SECRET_KEY`.
23+
*
24+
* The promise resolves with `{ ok: false, error }` when the harness
25+
* aborts its run signal — that mimics production semantics (suspended
26+
* until something happens, returns cleanly on abort) without making a
27+
* network call.
28+
*/
29+
class TestSessionInputChannel extends SessionInputChannel {
30+
constructor(sessionId: string, private readonly getAbortSignal: () => AbortSignal | undefined) {
31+
super(sessionId);
32+
}
33+
34+
// Override only the `wait` path. `on` / `once` / `peek` / `send`
35+
// continue to flow through the real `sessionStreams` global, which
36+
// the mock task context installs as a `TestSessionStreamManager`.
37+
wait<T = unknown>(): ManualWaitpointPromise<T> {
38+
return new ManualWaitpointPromise<T>((resolve: (value: { ok: false; error: Error }) => void) => {
39+
const signal = this.getAbortSignal();
40+
if (!signal) {
41+
// Harness hasn't wired up its run signal yet — nothing to abort
42+
// on. Stay pending; the run loop should never reach this state
43+
// in practice but we don't want to throw here either.
44+
return;
45+
}
46+
const onAbort = () => {
47+
resolve({
48+
ok: false,
49+
error: new Error("session.in.wait() aborted by test harness"),
50+
});
51+
};
52+
if (signal.aborted) {
53+
onAbort();
54+
return;
55+
}
56+
signal.addEventListener("abort", onAbort, { once: true });
57+
});
58+
}
59+
}
60+
1661
/**
1762
* Per-session in-memory state collected from `.out` writes during a test.
1863
* Owned by the mock-chat-agent harness; updated by {@link TestSessionOutputChannel}.
@@ -201,16 +246,23 @@ export class TestSessionOutputChannel extends SessionOutputChannel {
201246

202247
/**
203248
* Construct a {@link SessionHandle} whose `.out` channel captures writes in
204-
* memory while `.in` reuses the real {@link SessionInputChannel} (which
205-
* routes through the `sessionStreams` global — the mock task context
206-
* installs a `TestSessionStreamManager` there).
249+
* memory and whose `.in` channel routes through the `sessionStreams`
250+
* global for record subscriptions (`on` / `once` / `peek`) but stubs
251+
* `wait()` to skip the apiClient round-trip — see
252+
* {@link TestSessionInputChannel}.
253+
*
254+
* `getAbortSignal` lets the channel observe the harness's run signal so
255+
* `wait()` resolves cleanly on close. Pass a getter (not the signal
256+
* directly) so the channel reads it lazily — the harness creates its
257+
* `AbortController` after the override is installed.
207258
*/
208259
export function createTestSessionHandle(
209260
sessionId: string,
210-
state: TestSessionOutState
261+
state: TestSessionOutState,
262+
getAbortSignal: () => AbortSignal | undefined = () => undefined
211263
): SessionHandle {
212264
return new SessionHandle(sessionId, {
213-
in: new SessionInputChannel(sessionId),
265+
in: new TestSessionInputChannel(sessionId, getAbortSignal),
214266
out: new TestSessionOutputChannel(sessionId, state),
215267
});
216268
}

0 commit comments

Comments
 (0)