Skip to content

Commit 72594a4

Browse files
authored
fix(dashboard): properly clean up trace pubsub redis clients to prevent redis/memory/ELU leaks in the dashboard (#2901)
1 parent 5504e7f commit 72594a4

File tree

4 files changed

+141
-102
lines changed

4 files changed

+141
-102
lines changed
Lines changed: 109 additions & 87 deletions
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,12 @@
1-
import { TaskRun } from "@trigger.dev/database";
2-
import { eventStream } from "remix-utils/sse/server";
31
import { PrismaClient, prisma } from "~/db.server";
42
import { logger } from "~/services/logger.server";
3+
import { singleton } from "~/utils/singleton";
4+
import { createSSELoader } from "~/utils/sse";
55
import { throttle } from "~/utils/throttle";
66
import { tracePubSub } from "~/v3/services/tracePubSub.server";
77

8-
const pingInterval = 1000;
8+
const PING_INTERVAL = 1000;
9+
const STREAM_TIMEOUT = 30 * 1000; // 30 seconds
910

1011
export class RunStreamPresenter {
1112
#prismaClient: PrismaClient;
@@ -14,105 +15,126 @@ export class RunStreamPresenter {
1415
this.#prismaClient = prismaClient;
1516
}
1617

17-
public async call({
18-
request,
19-
runFriendlyId,
20-
}: {
21-
request: Request;
22-
runFriendlyId: TaskRun["friendlyId"];
23-
}) {
24-
const run = await this.#prismaClient.taskRun.findFirst({
25-
where: {
26-
friendlyId: runFriendlyId,
27-
},
28-
select: {
29-
traceId: true,
30-
},
31-
});
18+
public createLoader() {
19+
const prismaClient = this.#prismaClient;
3220

33-
if (!run) {
34-
return new Response("Not found", { status: 404 });
35-
}
21+
return createSSELoader({
22+
timeout: STREAM_TIMEOUT,
23+
interval: PING_INTERVAL,
24+
handler: async (context) => {
25+
const runFriendlyId = context.params.runParam;
3626

37-
logger.info("RunStreamPresenter.call", {
38-
runFriendlyId,
39-
traceId: run.traceId,
40-
});
41-
42-
let pinger: NodeJS.Timeout | undefined = undefined;
43-
44-
const { unsubscribe, eventEmitter } = await tracePubSub.subscribeToTrace(run.traceId);
45-
46-
return eventStream(request.signal, (send, close) => {
47-
const safeSend = (args: { event?: string; data: string }) => {
48-
try {
49-
send(args);
50-
} catch (error) {
51-
if (error instanceof Error) {
52-
if (error.name !== "TypeError") {
53-
logger.debug("Error sending SSE, aborting", {
54-
error: {
55-
name: error.name,
56-
message: error.message,
57-
stack: error.stack,
58-
},
59-
args,
60-
});
61-
}
62-
} else {
63-
logger.debug("Unknown error sending SSE, aborting", {
64-
error,
65-
args,
66-
});
67-
}
68-
69-
close();
27+
if (!runFriendlyId) {
28+
throw new Response("Missing runParam", { status: 400 });
7029
}
71-
};
7230

73-
const throttledSend = throttle(safeSend, 1000);
74-
75-
eventEmitter.addListener("message", (event) => {
76-
throttledSend({ data: event });
77-
});
31+
const run = await prismaClient.taskRun.findFirst({
32+
where: {
33+
friendlyId: runFriendlyId,
34+
},
35+
select: {
36+
traceId: true,
37+
},
38+
});
7839

79-
pinger = setInterval(() => {
80-
if (request.signal.aborted) {
81-
return close();
40+
if (!run) {
41+
throw new Response("Not found", { status: 404 });
8242
}
8343

84-
safeSend({ event: "ping", data: new Date().toISOString() });
85-
}, pingInterval);
86-
87-
return function clear() {
88-
logger.info("RunStreamPresenter.abort", {
44+
logger.info("RunStreamPresenter.start", {
8945
runFriendlyId,
9046
traceId: run.traceId,
9147
});
9248

93-
clearInterval(pinger);
94-
95-
eventEmitter.removeAllListeners();
49+
// Subscribe to trace updates
50+
const { unsubscribe, eventEmitter } = await tracePubSub.subscribeToTrace(run.traceId);
51+
52+
// Store throttled send function and message listener for cleanup
53+
let throttledSend: ReturnType<typeof throttle> | undefined;
54+
let messageListener: ((event: string) => void) | undefined;
55+
56+
return {
57+
initStream: ({ send }) => {
58+
// Create throttled send function
59+
throttledSend = throttle((args: { event?: string; data: string }) => {
60+
try {
61+
send(args);
62+
} catch (error) {
63+
if (error instanceof Error) {
64+
if (error.name !== "TypeError") {
65+
logger.debug("Error sending SSE in RunStreamPresenter", {
66+
error: {
67+
name: error.name,
68+
message: error.message,
69+
stack: error.stack,
70+
},
71+
});
72+
}
73+
}
74+
// Abort the stream on send error
75+
context.controller.abort("Send error");
76+
}
77+
}, 1000);
78+
79+
// Set up message listener for pub/sub events
80+
messageListener = (event: string) => {
81+
throttledSend?.({ data: event });
82+
};
83+
eventEmitter.addListener("message", messageListener);
84+
85+
context.debug("Subscribed to trace pub/sub");
86+
},
87+
88+
iterator: ({ send }) => {
89+
// Send ping to keep connection alive
90+
try {
91+
send({ event: "ping", data: new Date().toISOString() });
92+
} catch (error) {
93+
// If we can't send a ping, the connection is likely dead
94+
return false;
95+
}
96+
},
9697

97-
unsubscribe()
98-
.then(() => {
99-
logger.info("RunStreamPresenter.abort.unsubscribe succeeded", {
100-
runFriendlyId,
101-
traceId: run.traceId,
102-
});
103-
})
104-
.catch((error) => {
105-
logger.error("RunStreamPresenter.abort.unsubscribe failed", {
98+
cleanup: () => {
99+
logger.info("RunStreamPresenter.cleanup", {
106100
runFriendlyId,
107101
traceId: run.traceId,
108-
error: {
109-
name: error.name,
110-
message: error.message,
111-
stack: error.stack,
112-
},
113102
});
114-
});
115-
};
103+
104+
// Remove message listener
105+
if (messageListener) {
106+
eventEmitter.removeListener("message", messageListener);
107+
}
108+
eventEmitter.removeAllListeners();
109+
110+
// Unsubscribe from Redis pub/sub
111+
unsubscribe()
112+
.then(() => {
113+
logger.info("RunStreamPresenter.cleanup.unsubscribe succeeded", {
114+
runFriendlyId,
115+
traceId: run.traceId,
116+
});
117+
})
118+
.catch((error) => {
119+
logger.error("RunStreamPresenter.cleanup.unsubscribe failed", {
120+
runFriendlyId,
121+
traceId: run.traceId,
122+
error: {
123+
name: error.name,
124+
message: error.message,
125+
stack: error.stack,
126+
},
127+
});
128+
});
129+
},
130+
};
131+
},
116132
});
117133
}
118134
}
135+
136+
// Export a singleton loader for the route to use
137+
export const runStreamLoader = singleton("runStreamLoader", () => {
138+
const presenter = new RunStreamPresenter();
139+
return presenter.createLoader();
140+
});
Lines changed: 6 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -1,13 +1,11 @@
11
import type { LoaderFunctionArgs } from "@remix-run/server-runtime";
2-
import { z } from "zod";
3-
import { RunStreamPresenter } from "~/presenters/v3/RunStreamPresenter.server";
2+
import { runStreamLoader } from "~/presenters/v3/RunStreamPresenter.server";
43
import { requireUserId } from "~/services/session.server";
54

6-
export async function loader({ request, params }: LoaderFunctionArgs) {
7-
await requireUserId(request);
5+
export async function loader(args: LoaderFunctionArgs) {
6+
// Authenticate the user before starting the stream
7+
await requireUserId(args.request);
88

9-
const { runParam } = z.object({ runParam: z.string() }).parse(params);
10-
11-
const presenter = new RunStreamPresenter();
12-
return presenter.call({ request, runFriendlyId: runParam });
9+
// Delegate to the SSE loader
10+
return runStreamLoader(args);
1311
}

apps/webapp/app/services/realtime/redisRealtimeStreams.server.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -272,7 +272,7 @@ export class RedisRealtimeStreams implements StreamIngestor, StreamResponder {
272272
await redis.quit().catch(console.error);
273273
}
274274

275-
signal.addEventListener("abort", cleanup);
275+
signal.addEventListener("abort", cleanup, { once: true });
276276

277277
return new Response(stream, {
278278
headers: {

apps/webapp/app/v3/services/tracePubSub.server.ts

Lines changed: 25 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,8 @@ import { createRedisClient, RedisClient, RedisWithClusterOptions } from "~/redis
22
import { EventEmitter } from "node:events";
33
import { env } from "~/env.server";
44
import { singleton } from "~/utils/singleton";
5+
import { Gauge } from "prom-client";
6+
import { metricsRegister } from "~/metrics.server";
57

68
export type TracePubSubOptions = {
79
redis: RedisWithClusterOptions;
@@ -15,7 +17,10 @@ export class TracePubSub {
1517
this._publisher = createRedisClient("trigger:eventRepoPublisher", this._options.redis);
1618
}
1719

18-
// TODO: do this more efficiently
20+
get subscriberCount() {
21+
return this._subscriberCount;
22+
}
23+
1924
async publish(traceIds: string[]) {
2025
if (traceIds.length === 0) return;
2126
const uniqueTraces = new Set(traceIds.map((e) => `events:${e}`));
@@ -40,15 +45,18 @@ export class TracePubSub {
4045

4146
const eventEmitter = new EventEmitter();
4247

43-
// Define the message handler.
44-
redis.on("message", (_, message) => {
48+
// Define the message handler - store reference so we can remove it later.
49+
const messageHandler = (_: string, message: string) => {
4550
eventEmitter.emit("message", message);
46-
});
51+
};
52+
redis.on("message", messageHandler);
4753

4854
// Return a function that can be used to unsubscribe.
4955
const unsubscribe = async () => {
56+
// Remove the message listener before closing the connection
57+
redis.off("message", messageHandler);
5058
await redis.unsubscribe(channel);
51-
redis.quit();
59+
await redis.quit();
5260
this._subscriberCount--;
5361
};
5462

@@ -62,7 +70,7 @@ export class TracePubSub {
6270
export const tracePubSub = singleton("tracePubSub", initializeTracePubSub);
6371

6472
function initializeTracePubSub() {
65-
return new TracePubSub({
73+
const pubSub = new TracePubSub({
6674
redis: {
6775
port: env.PUBSUB_REDIS_PORT,
6876
host: env.PUBSUB_REDIS_HOST,
@@ -72,4 +80,15 @@ function initializeTracePubSub() {
7280
clusterMode: env.PUBSUB_REDIS_CLUSTER_MODE_ENABLED === "1",
7381
},
7482
});
83+
84+
new Gauge({
85+
name: "trace_pub_sub_subscribers",
86+
help: "Number of trace pub sub subscribers",
87+
collect() {
88+
this.set(pubSub.subscriberCount);
89+
},
90+
registers: [metricsRegister],
91+
});
92+
93+
return pubSub;
7594
}

0 commit comments

Comments
 (0)