Skip to content

Commit 63424fa

Browse files
committed
feat(supervisor): add snapshot delay for compute path via timer wheel
delay compute snapshot requests to avoid wasted work on short-lived waitpoints (e.g. triggerAndWait resolving in <5s). configurable via COMPUTE_SNAPSHOT_DELAY_MS (default 5s).
1 parent 0edc308 commit 63424fa

File tree

5 files changed

+482
-13
lines changed

5 files changed

+482
-13
lines changed

apps/supervisor/src/env.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -83,6 +83,7 @@ const Env = z
8383
COMPUTE_GATEWAY_AUTH_TOKEN: z.string().optional(),
8484
COMPUTE_GATEWAY_TIMEOUT_MS: z.coerce.number().int().default(30_000),
8585
COMPUTE_SNAPSHOTS_ENABLED: BoolEnv.default(false),
86+
COMPUTE_SNAPSHOT_DELAY_MS: z.coerce.number().int().min(0).max(60_000).default(5_000),
8687

8788
// Kubernetes settings
8889
KUBERNETES_FORCE_ENABLED: BoolEnv.default(false),

apps/supervisor/src/index.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -429,6 +429,7 @@ class ManagedSupervisor {
429429

430430
async stop() {
431431
this.logger.log("Shutting down");
432+
await this.workloadServer.stop();
432433
await this.workerSession.stop();
433434

434435
// Optional services
Lines changed: 254 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,254 @@
1+
import { describe, it, expect, vi, beforeEach, afterEach } from "vitest";
2+
import { TimerWheel } from "./timerWheel.js";
3+
4+
describe("TimerWheel", () => {
5+
beforeEach(() => {
6+
vi.useFakeTimers();
7+
});
8+
9+
afterEach(() => {
10+
vi.useRealTimers();
11+
});
12+
13+
it("dispatches item after delay", () => {
14+
const dispatched: string[] = [];
15+
const wheel = new TimerWheel<string>({
16+
delayMs: 3000,
17+
onExpire: (item) => dispatched.push(item.key),
18+
});
19+
20+
wheel.start();
21+
wheel.submit("run-1", "snapshot-data");
22+
23+
// Not yet
24+
vi.advanceTimersByTime(2900);
25+
expect(dispatched).toEqual([]);
26+
27+
// After delay
28+
vi.advanceTimersByTime(200);
29+
expect(dispatched).toEqual(["run-1"]);
30+
31+
wheel.stop();
32+
});
33+
34+
it("cancels item before it fires", () => {
35+
const dispatched: string[] = [];
36+
const wheel = new TimerWheel<string>({
37+
delayMs: 3000,
38+
onExpire: (item) => dispatched.push(item.key),
39+
});
40+
41+
wheel.start();
42+
wheel.submit("run-1", "data");
43+
44+
vi.advanceTimersByTime(1000);
45+
expect(wheel.cancel("run-1")).toBe(true);
46+
47+
vi.advanceTimersByTime(5000);
48+
expect(dispatched).toEqual([]);
49+
expect(wheel.size).toBe(0);
50+
51+
wheel.stop();
52+
});
53+
54+
it("cancel returns false for unknown key", () => {
55+
const wheel = new TimerWheel<string>({
56+
delayMs: 3000,
57+
onExpire: () => {},
58+
});
59+
expect(wheel.cancel("nonexistent")).toBe(false);
60+
});
61+
62+
it("deduplicates: resubmitting same key replaces the entry", () => {
63+
const dispatched: { key: string; data: string }[] = [];
64+
const wheel = new TimerWheel<string>({
65+
delayMs: 3000,
66+
onExpire: (item) => dispatched.push({ key: item.key, data: item.data }),
67+
});
68+
69+
wheel.start();
70+
wheel.submit("run-1", "old-data");
71+
72+
vi.advanceTimersByTime(1000);
73+
wheel.submit("run-1", "new-data");
74+
75+
// Original would have fired at t=3000, but was replaced
76+
// New one fires at t=1000+3000=4000
77+
vi.advanceTimersByTime(2100);
78+
expect(dispatched).toEqual([]);
79+
80+
vi.advanceTimersByTime(1000);
81+
expect(dispatched).toEqual([{ key: "run-1", data: "new-data" }]);
82+
83+
wheel.stop();
84+
});
85+
86+
it("handles many concurrent items", () => {
87+
const dispatched: string[] = [];
88+
const wheel = new TimerWheel<string>({
89+
delayMs: 3000,
90+
onExpire: (item) => dispatched.push(item.key),
91+
});
92+
93+
wheel.start();
94+
95+
for (let i = 0; i < 1000; i++) {
96+
wheel.submit(`run-${i}`, `data-${i}`);
97+
}
98+
expect(wheel.size).toBe(1000);
99+
100+
vi.advanceTimersByTime(3100);
101+
expect(dispatched.length).toBe(1000);
102+
expect(wheel.size).toBe(0);
103+
104+
wheel.stop();
105+
});
106+
107+
it("handles items submitted at different times", () => {
108+
const dispatched: string[] = [];
109+
const wheel = new TimerWheel<string>({
110+
delayMs: 3000,
111+
onExpire: (item) => dispatched.push(item.key),
112+
});
113+
114+
wheel.start();
115+
116+
wheel.submit("run-1", "data");
117+
vi.advanceTimersByTime(1000);
118+
wheel.submit("run-2", "data");
119+
vi.advanceTimersByTime(1000);
120+
wheel.submit("run-3", "data");
121+
122+
// t=2000: nothing yet
123+
expect(dispatched).toEqual([]);
124+
125+
// t=3100: run-1 fires
126+
vi.advanceTimersByTime(1100);
127+
expect(dispatched).toEqual(["run-1"]);
128+
129+
// t=4100: run-2 fires
130+
vi.advanceTimersByTime(1000);
131+
expect(dispatched).toEqual(["run-1", "run-2"]);
132+
133+
// t=5100: run-3 fires
134+
vi.advanceTimersByTime(1000);
135+
expect(dispatched).toEqual(["run-1", "run-2", "run-3"]);
136+
137+
wheel.stop();
138+
});
139+
140+
it("setDelay changes delay for new items only", () => {
141+
const dispatched: string[] = [];
142+
const wheel = new TimerWheel<string>({
143+
delayMs: 3000,
144+
onExpire: (item) => dispatched.push(item.key),
145+
});
146+
147+
wheel.start();
148+
149+
wheel.submit("run-1", "data"); // 3s delay
150+
151+
vi.advanceTimersByTime(500);
152+
wheel.setDelay(1000);
153+
wheel.submit("run-2", "data"); // 1s delay
154+
155+
// t=1500: run-2 should have fired (submitted at t=500 with 1s delay)
156+
vi.advanceTimersByTime(1100);
157+
expect(dispatched).toEqual(["run-2"]);
158+
159+
// t=3100: run-1 fires at its original 3s delay
160+
vi.advanceTimersByTime(1500);
161+
expect(dispatched).toEqual(["run-2", "run-1"]);
162+
163+
wheel.stop();
164+
});
165+
166+
it("stop returns unprocessed items", () => {
167+
const dispatched: string[] = [];
168+
const wheel = new TimerWheel<string>({
169+
delayMs: 3000,
170+
onExpire: (item) => dispatched.push(item.key),
171+
});
172+
173+
wheel.start();
174+
wheel.submit("run-1", "data-1");
175+
wheel.submit("run-2", "data-2");
176+
wheel.submit("run-3", "data-3");
177+
178+
const remaining = wheel.stop();
179+
expect(dispatched).toEqual([]);
180+
expect(wheel.size).toBe(0);
181+
expect(remaining.length).toBe(3);
182+
expect(remaining.map((r) => r.key).sort()).toEqual(["run-1", "run-2", "run-3"]);
183+
expect(remaining.find((r) => r.key === "run-1")?.data).toBe("data-1");
184+
});
185+
186+
it("after stop, new submissions are silently dropped", () => {
187+
const dispatched: string[] = [];
188+
const wheel = new TimerWheel<string>({
189+
delayMs: 3000,
190+
onExpire: (item) => dispatched.push(item.key),
191+
});
192+
193+
wheel.start();
194+
wheel.stop();
195+
196+
wheel.submit("run-late", "data");
197+
expect(dispatched).toEqual([]);
198+
expect(wheel.size).toBe(0);
199+
});
200+
201+
it("tracks size correctly through submit/cancel/dispatch", () => {
202+
const wheel = new TimerWheel<string>({
203+
delayMs: 3000,
204+
onExpire: () => {},
205+
});
206+
207+
wheel.start();
208+
209+
wheel.submit("a", "data");
210+
wheel.submit("b", "data");
211+
expect(wheel.size).toBe(2);
212+
213+
wheel.cancel("a");
214+
expect(wheel.size).toBe(1);
215+
216+
vi.advanceTimersByTime(3100);
217+
expect(wheel.size).toBe(0);
218+
219+
wheel.stop();
220+
});
221+
222+
it("clamps delay to valid range", () => {
223+
const dispatched: string[] = [];
224+
225+
// Very small delay (should be at least 1 tick = 100ms)
226+
const wheel = new TimerWheel<string>({
227+
delayMs: 0,
228+
onExpire: (item) => dispatched.push(item.key),
229+
});
230+
231+
wheel.start();
232+
wheel.submit("run-1", "data");
233+
234+
vi.advanceTimersByTime(200);
235+
expect(dispatched).toEqual(["run-1"]);
236+
237+
wheel.stop();
238+
});
239+
240+
it("multiple cancel calls are safe", () => {
241+
const wheel = new TimerWheel<string>({
242+
delayMs: 3000,
243+
onExpire: () => {},
244+
});
245+
246+
wheel.start();
247+
wheel.submit("run-1", "data");
248+
249+
expect(wheel.cancel("run-1")).toBe(true);
250+
expect(wheel.cancel("run-1")).toBe(false);
251+
252+
wheel.stop();
253+
});
254+
});

0 commit comments

Comments
 (0)