Skip to content

Commit 419dd21

Browse files
committed
use the new interval service everywhere
1 parent 1023d13 commit 419dd21

File tree

5 files changed

+25
-130
lines changed

5 files changed

+25
-130
lines changed

apps/supervisor/src/services/podCleaner.ts

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
import { SimpleStructuredLogger } from "@trigger.dev/core/v3/utils/structuredLogger";
22
import { K8sApi } from "../clients/kubernetes.js";
33
import { createK8sApi } from "../clients/kubernetes.js";
4-
import { HeartbeatService } from "@trigger.dev/core/v3";
4+
import { IntervalService } from "@trigger.dev/core/v3";
55
import { Counter, Gauge, Registry } from "prom-client";
66
import { register } from "../metrics.js";
77

@@ -19,7 +19,7 @@ export class PodCleaner {
1919
private readonly namespace: string;
2020

2121
private readonly batchSize: number;
22-
private readonly deletionHeartbeat: HeartbeatService;
22+
private readonly deletionInterval: IntervalService;
2323

2424
// Metrics
2525
private readonly register: Registry;
@@ -32,10 +32,10 @@ export class PodCleaner {
3232
this.namespace = opts.namespace;
3333
this.batchSize = opts.batchSize ?? 500;
3434

35-
this.deletionHeartbeat = new HeartbeatService({
35+
this.deletionInterval = new IntervalService({
3636
intervalMs: opts.intervalMs ?? 10000,
3737
leadingEdge: true,
38-
heartbeat: this.deleteCompletedPods.bind(this),
38+
onInterval: this.deleteCompletedPods.bind(this),
3939
});
4040

4141
// Initialize metrics
@@ -57,11 +57,11 @@ export class PodCleaner {
5757
}
5858

5959
async start() {
60-
this.deletionHeartbeat.start();
60+
this.deletionInterval.start();
6161
}
6262

6363
async stop() {
64-
this.deletionHeartbeat.stop();
64+
this.deletionInterval.stop();
6565
}
6666

6767
private async deleteCompletedPods() {

apps/webapp/app/v3/authenticatedSocketConnection.server.ts

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
import {
22
clientWebsocketMessages,
3-
HeartbeatService,
3+
IntervalService,
44
serverWebsocketMessages,
55
} from "@trigger.dev/core/v3";
66
import { ZodMessageHandler, ZodMessageSender } from "@trigger.dev/core/v3/zodMessageHandler";
@@ -19,7 +19,7 @@ export class AuthenticatedSocketConnection {
1919
private _sender: ZodMessageSender<typeof serverWebsocketMessages>;
2020
private _consumer: DevQueueConsumer;
2121
private _messageHandler: ZodMessageHandler<typeof clientWebsocketMessages>;
22-
private _pingService: HeartbeatService;
22+
private _pingService: IntervalService;
2323

2424
constructor(
2525
public ws: WebSocket,
@@ -75,8 +75,8 @@ export class AuthenticatedSocketConnection {
7575
// });
7676
});
7777

78-
this._pingService = new HeartbeatService({
79-
heartbeat: async () => {
78+
this._pingService = new IntervalService({
79+
onInterval: async () => {
8080
if (ws.readyState !== WebSocket.OPEN) {
8181
logger.debug("[AuthenticatedSocketConnection] Websocket not open, skipping ping");
8282
return;

packages/cli-v3/src/entryPoints/dev-run-controller.ts

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
import {
22
CompleteRunAttemptResult,
33
DequeuedMessage,
4-
HeartbeatService,
4+
IntervalService,
55
LogLevel,
66
RunExecutionData,
77
TaskRunExecution,
@@ -44,9 +44,9 @@ export class DevRunController {
4444
private taskRunProcess?: TaskRunProcess;
4545
private readonly worker: BackgroundWorker;
4646
private readonly httpClient: CliApiClient;
47-
private readonly runHeartbeat: HeartbeatService;
47+
private readonly runHeartbeat: IntervalService;
4848
private readonly heartbeatIntervalSeconds: number;
49-
private readonly snapshotPoller: HeartbeatService;
49+
private readonly snapshotPoller: IntervalService;
5050
private readonly snapshotPollIntervalSeconds: number;
5151

5252
private state:
@@ -78,8 +78,8 @@ export class DevRunController {
7878

7979
this.httpClient = opts.httpClient;
8080

81-
this.snapshotPoller = new HeartbeatService({
82-
heartbeat: async () => {
81+
this.snapshotPoller = new IntervalService({
82+
onInterval: async () => {
8383
if (!this.runFriendlyId) {
8484
logger.debug("[DevRunController] Skipping snapshot poll, no run ID");
8585
return;
@@ -121,8 +121,8 @@ export class DevRunController {
121121
},
122122
});
123123

124-
this.runHeartbeat = new HeartbeatService({
125-
heartbeat: async () => {
124+
this.runHeartbeat = new IntervalService({
125+
onInterval: async () => {
126126
if (!this.runFriendlyId || !this.snapshotFriendlyId) {
127127
logger.debug("[DevRunController] Skipping heartbeat, no run ID or snapshot ID");
128128
return;
Lines changed: 4 additions & 109 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
import { HeartbeatService, RunExecutionData } from "@trigger.dev/core/v3";
1+
import { IntervalService } from "@trigger.dev/core/v3";
22
import { WorkloadHttpClient } from "@trigger.dev/core/v3/runEngineWorker";
33
import { RunLogger } from "./logger.js";
44

@@ -17,7 +17,7 @@ export class RunExecutionHeartbeat {
1717
private readonly httpClient: WorkloadHttpClient;
1818
private readonly logger: RunLogger;
1919
private readonly heartbeatIntervalMs: number;
20-
private readonly heartbeat: HeartbeatService;
20+
private readonly heartbeat: IntervalService;
2121

2222
constructor(opts: RunExecutionHeartbeatOptions) {
2323
this.runFriendlyId = opts.runFriendlyId;
@@ -36,8 +36,8 @@ export class RunExecutionHeartbeat {
3636
},
3737
});
3838

39-
this.heartbeat = new HeartbeatService({
40-
heartbeat: async () => {
39+
this.heartbeat = new IntervalService({
40+
onInterval: async () => {
4141
this.logger.sendDebugLog({
4242
runId: this.runFriendlyId,
4343
message: "heartbeat: started",
@@ -90,108 +90,3 @@ export class RunExecutionHeartbeat {
9090
this.heartbeat.stop();
9191
}
9292
}
93-
94-
type RunExecutionSnapshotPollerOptions = {
95-
runFriendlyId: string;
96-
snapshotFriendlyId: string;
97-
httpClient: WorkloadHttpClient;
98-
logger: RunLogger;
99-
snapshotPollIntervalSeconds: number;
100-
handleSnapshotChange: (execution: RunExecutionData) => Promise<void>;
101-
};
102-
103-
class RunExecutionSnapshotPoller {
104-
private readonly logger: RunLogger;
105-
private readonly poller: HeartbeatService;
106-
private readonly httpClient: WorkloadHttpClient;
107-
108-
private readonly runFriendlyId: string;
109-
private readonly snapshotFriendlyId: string;
110-
111-
private readonly handleSnapshotChange: (execution: RunExecutionData) => Promise<void>;
112-
113-
constructor(opts: RunExecutionSnapshotPollerOptions) {
114-
this.logger = opts.logger;
115-
this.httpClient = opts.httpClient;
116-
117-
this.runFriendlyId = opts.runFriendlyId;
118-
this.snapshotFriendlyId = opts.snapshotFriendlyId;
119-
120-
this.handleSnapshotChange = opts.handleSnapshotChange;
121-
122-
this.poller = new HeartbeatService({
123-
heartbeat: async () => {
124-
if (!this.runFriendlyId) {
125-
this.logger.sendDebugLog({
126-
runId: this.runFriendlyId,
127-
message: "Skipping snapshot poll, no run ID",
128-
});
129-
return;
130-
}
131-
132-
this.logger.sendDebugLog({
133-
runId: this.runFriendlyId,
134-
message: "Polling for latest snapshot",
135-
});
136-
137-
this.logger.sendDebugLog({
138-
runId: this.runFriendlyId,
139-
message: `snapshot poll: started`,
140-
properties: {
141-
snapshotId: this.snapshotFriendlyId,
142-
},
143-
});
144-
145-
const response = await this.httpClient.getRunExecutionData(this.runFriendlyId);
146-
147-
if (!response.success) {
148-
this.logger.sendDebugLog({
149-
runId: this.runFriendlyId,
150-
message: "Snapshot poll failed",
151-
properties: {
152-
error: response.error,
153-
},
154-
});
155-
156-
this.logger.sendDebugLog({
157-
runId: this.runFriendlyId,
158-
message: `snapshot poll: failed`,
159-
properties: {
160-
snapshotId: this.snapshotFriendlyId,
161-
error: response.error,
162-
},
163-
});
164-
165-
return;
166-
}
167-
168-
await this.handleSnapshotChange(response.data.execution);
169-
},
170-
intervalMs: opts.snapshotPollIntervalSeconds * 1000,
171-
leadingEdge: false,
172-
onError: async (error) => {
173-
this.logger.sendDebugLog({
174-
runId: this.runFriendlyId,
175-
message: "Failed to poll for snapshot",
176-
properties: { error: error instanceof Error ? error.message : String(error) },
177-
});
178-
},
179-
});
180-
}
181-
182-
resetCurrentInterval() {
183-
this.poller.resetCurrentInterval();
184-
}
185-
186-
updateInterval(intervalMs: number) {
187-
this.poller.updateInterval(intervalMs);
188-
}
189-
190-
start() {
191-
this.poller.start();
192-
}
193-
194-
stop() {
195-
this.poller.stop();
196-
}
197-
}

packages/cli-v3/src/entryPoints/managed/poller.ts

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
import { WorkloadHttpClient } from "@trigger.dev/core/v3/runEngineWorker";
22
import { RunLogger } from "./logger.js";
3-
import { HeartbeatService, RunExecutionData } from "@trigger.dev/core/v3";
3+
import { IntervalService, RunExecutionData } from "@trigger.dev/core/v3";
44

55
export type RunExecutionSnapshotPollerOptions = {
66
runFriendlyId: string;
@@ -19,7 +19,7 @@ export class RunExecutionSnapshotPoller {
1919
private readonly logger: RunLogger;
2020
private readonly snapshotPollIntervalMs: number;
2121
private readonly handleSnapshotChange: (runData: RunExecutionData) => Promise<void>;
22-
private readonly poller: HeartbeatService;
22+
private readonly poller: IntervalService;
2323

2424
constructor(opts: RunExecutionSnapshotPollerOptions) {
2525
this.runFriendlyId = opts.runFriendlyId;
@@ -39,8 +39,8 @@ export class RunExecutionSnapshotPoller {
3939
},
4040
});
4141

42-
this.poller = new HeartbeatService({
43-
heartbeat: async () => {
42+
this.poller = new IntervalService({
43+
onInterval: async () => {
4444
if (!this.runFriendlyId) {
4545
this.logger.sendDebugLog({
4646
runId: this.runFriendlyId,

0 commit comments

Comments
 (0)