Skip to content

Commit b143027

Browse files
Fix/sdk stream root fallback (#2874)
1 parent 4093883 commit b143027

File tree

3 files changed

+71
-2
lines changed

3 files changed

+71
-2
lines changed
Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
---
2+
"@trigger.dev/sdk": patch
3+
---
4+
5+
Aligned the SDK's `getRunIdForOptions` logic with the Core package to handle semantic targets (`root`, `parent`) in root tasks.
Lines changed: 64 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,64 @@
1+
import { describe, it, expect, vi, beforeEach } from "vitest";
2+
import { streams } from "./streams.js";
3+
import { taskContext, realtimeStreams } from "@trigger.dev/core/v3";
4+
5+
vi.mock("@trigger.dev/core/v3", async (importOriginal) => {
6+
const original = await importOriginal<typeof import("@trigger.dev/core/v3")>();
7+
return {
8+
...original,
9+
taskContext: {
10+
ctx: {
11+
run: {
12+
id: "run_123",
13+
// parentTaskRunId and rootTaskRunId are undefined for root tasks
14+
},
15+
},
16+
},
17+
realtimeStreams: {
18+
pipe: vi.fn().mockReturnValue({
19+
wait: () => Promise.resolve(),
20+
stream: new ReadableStream(),
21+
}),
22+
},
23+
};
24+
});
25+
26+
describe("streams.pipe consistency", () => {
27+
beforeEach(() => {
28+
vi.clearAllMocks();
29+
});
30+
31+
it("should not throw and should use self runId when target is 'root' in a root task", async () => {
32+
const mockStream = new ReadableStream();
33+
34+
// This should not throw anymore
35+
const { waitUntilComplete } = streams.pipe("test-key", mockStream, {
36+
target: "root",
37+
});
38+
39+
expect(realtimeStreams.pipe).toHaveBeenCalledWith(
40+
"test-key",
41+
mockStream,
42+
expect.objectContaining({
43+
target: "run_123",
44+
})
45+
);
46+
});
47+
48+
it("should not throw and should use self runId when target is 'parent' in a root task", async () => {
49+
const mockStream = new ReadableStream();
50+
51+
// This should not throw anymore
52+
const { waitUntilComplete } = streams.pipe("test-key", mockStream, {
53+
target: "parent",
54+
});
55+
56+
expect(realtimeStreams.pipe).toHaveBeenCalledWith(
57+
"test-key",
58+
mockStream,
59+
expect.objectContaining({
60+
target: "run_123",
61+
})
62+
);
63+
});
64+
});

packages/trigger-sdk/src/v3/streams.ts

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -665,11 +665,11 @@ export const streams = {
665665
function getRunIdForOptions(options?: RealtimeStreamOperationOptions): string | undefined {
666666
if (options?.target) {
667667
if (options.target === "parent") {
668-
return taskContext.ctx?.run?.parentTaskRunId;
668+
return taskContext.ctx?.run?.parentTaskRunId ?? taskContext.ctx?.run?.id;
669669
}
670670

671671
if (options.target === "root") {
672-
return taskContext.ctx?.run?.rootTaskRunId;
672+
return taskContext.ctx?.run?.rootTaskRunId ?? taskContext.ctx?.run?.id;
673673
}
674674

675675
if (options.target === "self") {

0 commit comments

Comments
 (0)