Skip to content

Commit b979984

Browse files
committed
fix(cloudflare): avoid repeated flush lock wrapping
1 parent e887b98 commit b979984

2 files changed

Lines changed: 99 additions & 12 deletions

File tree

packages/cloudflare/src/flush.ts

Lines changed: 63 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,17 @@ type FlushLock = {
77
readonly finalize: () => Promise<void>;
88
};
99

10+
type FlushLockRegistry = {
11+
readonly locks: Set<FlushLockInternal>;
12+
};
13+
14+
type FlushLockInternal = FlushLock & {
15+
readonly acquire: () => void;
16+
readonly release: () => void;
17+
};
18+
19+
const flushLockRegistries = new WeakMap<ExecutionContext['waitUntil'], FlushLockRegistry>();
20+
1021
/**
1122
* Enhances the given execution context by wrapping its `waitUntil` method with a proxy
1223
* to monitor pending tasks, and provides a flusher function to ensure all tasks
@@ -16,27 +27,69 @@ type FlushLock = {
1627
* @return {FlushLock} Returns a flusher function if a valid context is provided, otherwise undefined.
1728
*/
1829
export function makeFlushLock(context: ExecutionContext): FlushLock {
30+
const registry = getOrCreateFlushLockRegistry(context);
1931
let resolveAllDone: () => void = () => undefined;
2032
const allDone = new Promise<void>(res => {
2133
resolveAllDone = res;
2234
});
2335
let pending = 0;
36+
37+
const lock: FlushLockInternal = {
38+
ready: allDone,
39+
acquire: () => {
40+
pending++;
41+
},
42+
release: () => {
43+
if (--pending === 0) {
44+
registry.locks.delete(lock);
45+
resolveAllDone();
46+
}
47+
},
48+
finalize: () => {
49+
if (pending === 0) {
50+
registry.locks.delete(lock);
51+
resolveAllDone();
52+
}
53+
return allDone;
54+
},
55+
};
56+
57+
registry.locks.add(lock);
58+
return Object.freeze(lock);
59+
}
60+
61+
function getOrCreateFlushLockRegistry(context: ExecutionContext): FlushLockRegistry {
62+
// eslint-disable-next-line @typescript-eslint/unbound-method
63+
const waitUntil = context.waitUntil;
64+
const existingRegistry = flushLockRegistries.get(waitUntil);
65+
66+
if (existingRegistry) {
67+
return existingRegistry;
68+
}
69+
70+
const registry: FlushLockRegistry = { locks: new Set() };
2471
const originalWaitUntil = context.waitUntil.bind(context) as typeof context.waitUntil;
25-
context.waitUntil = promise => {
26-
pending++;
72+
const instrumentedWaitUntil: typeof context.waitUntil = promise => {
73+
// Snapshot active locks so locks created after this call do not wait for earlier waitUntil work.
74+
const locks = [...registry.locks];
75+
76+
for (const lock of locks) {
77+
lock.acquire();
78+
}
79+
2780
return originalWaitUntil(
2881
promise.finally(() => {
29-
if (--pending === 0) resolveAllDone();
82+
for (const lock of locks) {
83+
lock.release();
84+
}
3085
}),
3186
);
3287
};
33-
return Object.freeze({
34-
ready: allDone,
35-
finalize: () => {
36-
if (pending === 0) resolveAllDone();
37-
return allDone;
38-
},
39-
});
88+
89+
flushLockRegistries.set(instrumentedWaitUntil, registry);
90+
context.waitUntil = instrumentedWaitUntil;
91+
92+
return registry;
4093
}
4194

4295
/**

packages/cloudflare/test/flush.test.ts

Lines changed: 36 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -30,9 +30,12 @@ describe('Flush buffer test', () => {
3030
await expect(lock.ready).resolves.toBeUndefined();
3131
});
3232

33-
it('does not grow the waitUntil wrapper stack on repeated flush lock creation', () => {
33+
it('does not grow the waitUntil wrapper stack on repeated flush lock creation', async () => {
34+
const waitUntilPromises: Promise<void>[] = [];
3435
const context: ExecutionContext = {
35-
waitUntil: vi.fn(),
36+
waitUntil: vi.fn(promise => {
37+
waitUntilPromises.push(promise);
38+
}),
3639
passThroughOnException: vi.fn(),
3740
};
3841

@@ -41,6 +44,37 @@ describe('Flush buffer test', () => {
4144
}
4245

4346
expect(() => context.waitUntil(Promise.resolve())).not.toThrow();
47+
await Promise.all(waitUntilPromises);
48+
});
49+
50+
it('creates a fresh flush lock when waitUntil was already instrumented', async () => {
51+
const waitUntilPromises: Promise<void>[] = [];
52+
const context: ExecutionContext = {
53+
waitUntil: vi.fn(promise => {
54+
waitUntilPromises.push(promise);
55+
}),
56+
passThroughOnException: vi.fn(),
57+
};
58+
59+
const firstLock = makeFlushLock(context);
60+
await firstLock.finalize();
61+
62+
let resolveWaitUntil!: () => void;
63+
const secondTask = new Promise<void>(resolve => {
64+
resolveWaitUntil = resolve;
65+
});
66+
const secondLock = makeFlushLock(context);
67+
68+
context.waitUntil(secondTask);
69+
void secondLock.finalize();
70+
71+
await Promise.resolve();
72+
expect(waitUntilPromises).toHaveLength(1);
73+
await expect(Promise.race([secondLock.ready, Promise.resolve('pending')])).resolves.toBe('pending');
74+
75+
resolveWaitUntil();
76+
await Promise.all(waitUntilPromises);
77+
await expect(secondLock.ready).resolves.toBeUndefined();
4478
});
4579
});
4680

0 commit comments

Comments
 (0)