Skip to content

Commit 61c087b

Browse files
committed
Fix resource leaks in DirectLineStreaming after end() is called
- Cancel pending timers on end() via AbortController-aware sleep() - Stop refreshToken() loop early on abort and after fatal 403 - Unsubscribe from connectionStatus$ in waitUntilOnline() on resolve/reject - Use cancellable sleep() for retry delays in connectWithRetryAsync() - Add unit tests covering all leak fixes (sync and async paths) Fixes #433
1 parent 18f369c commit 61c087b

File tree

2 files changed

+430
-5
lines changed

2 files changed

+430
-5
lines changed
Lines changed: 392 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,392 @@
1+
/**
2+
* Tests for leak fixes in DirectLineStreaming after end() is called.
3+
*
4+
* Covers:
5+
* 1. sleep() resolves immediately when end() is called (abort signal)
6+
* 2. refreshToken() exits early when end() is called (before and after sleep)
7+
* 3. refreshToken() returns after fatal 403 instead of continuing the loop
8+
* 4. connectWithRetryAsync() retry delay is cancellable via end()
9+
* 5. waitUntilOnline() unsubscribes from connectionStatus$ on resolve/reject
10+
*/
11+
12+
import { ConnectionStatus } from './directLine';
13+
14+
// We need to capture the WebSocketClient instances to control them in tests.
15+
let mockWebSocketClientInstances: MockWebSocketClient[] = [];
16+
17+
class MockWebSocketClient {
18+
#disconnectionHandler: ((reason?: string) => void) | undefined;
19+
#connectResolve: (() => void) | undefined;
20+
#connectReject: ((err: Error) => void) | undefined;
21+
22+
connect: jest.Mock;
23+
disconnect: jest.Mock;
24+
send: jest.Mock;
25+
26+
constructor(init: any) {
27+
this.#disconnectionHandler = init.disconnectionHandler;
28+
29+
this.connect = jest.fn(
30+
() =>
31+
new Promise<void>((resolve, reject) => {
32+
this.#connectResolve = resolve;
33+
this.#connectReject = reject;
34+
})
35+
);
36+
37+
this.disconnect = jest.fn(() => {
38+
this.#disconnectionHandler?.('disconnect() called');
39+
});
40+
41+
this.send = jest.fn(async () => ({
42+
statusCode: 200,
43+
streams: [
44+
{
45+
readAsString: async () =>
46+
JSON.stringify({ conversationId: 'conv-123' })
47+
}
48+
]
49+
}));
50+
51+
mockWebSocketClientInstances.push(this);
52+
}
53+
54+
// Test helpers to simulate connection lifecycle.
55+
__test__resolveConnect() {
56+
this.#connectResolve?.();
57+
}
58+
59+
__test__rejectConnect(err: Error) {
60+
this.#connectReject?.(err);
61+
}
62+
63+
__test__simulateDisconnect(reason?: string) {
64+
this.#disconnectionHandler?.(reason);
65+
}
66+
}
67+
68+
jest.mock('./streaming/WebSocketClientWithNetworkInformation', () => ({
69+
__esModule: true,
70+
default: function (...args: any[]) {
71+
return new MockWebSocketClient(args[0]);
72+
}
73+
}));
74+
75+
// Mock cross-fetch to prevent real network calls.
76+
// jest.mock is hoisted, so we use jest.fn() inline and retrieve it later.
77+
jest.mock('cross-fetch', () => ({
78+
__esModule: true,
79+
default: jest.fn()
80+
}));
81+
82+
// Import after mocks.
83+
import { DirectLineStreaming } from './directLineStreaming';
84+
import _mockFetchImport from 'cross-fetch';
85+
86+
const mockFetch = _mockFetchImport as unknown as jest.Mock;
87+
88+
beforeEach(() => {
89+
jest.useFakeTimers({ now: 0 });
90+
mockWebSocketClientInstances = [];
91+
mockFetch.mockReset();
92+
93+
// Default: token refresh returns 200.
94+
mockFetch.mockResolvedValue({
95+
ok: true,
96+
json: async () => ({ token: 'new-token' })
97+
});
98+
});
99+
100+
afterEach(() => {
101+
jest.useRealTimers();
102+
});
103+
104+
/**
105+
* Helper: creates a DirectLineStreaming instance, subscribes to activity$,
106+
* and drives the connection through to Online state.
107+
*/
108+
async function createAndConnect(): Promise<{
109+
directLine: DirectLineStreaming;
110+
client: MockWebSocketClient;
111+
}> {
112+
const directLine = new DirectLineStreaming({
113+
domain: 'https://test.bot',
114+
token: 'test-token'
115+
});
116+
117+
// Subscribe to activity$ to kick off the connection.
118+
directLine.activity$.subscribe({ next() {}, error() {}, complete() {} });
119+
120+
// Let microtasks flush so connectWithRetryAsync starts.
121+
await jest.advanceTimersByTimeAsync(0);
122+
123+
const client = mockWebSocketClientInstances[mockWebSocketClientInstances.length - 1];
124+
125+
// Simulate successful WebSocket connection.
126+
client.__test__resolveConnect();
127+
await jest.advanceTimersByTimeAsync(0);
128+
129+
return { directLine, client };
130+
}
131+
132+
// ---------------------------------------------------------------------------
133+
// 1. sleep() resolves immediately when _endAbortController is aborted
134+
// ---------------------------------------------------------------------------
135+
describe('sleep() abort on end()', () => {
136+
test('calling end() during refreshToken sleep should stop the token refresh loop (no dangling timer)', async () => {
137+
const { directLine, client } = await createAndConnect();
138+
139+
// At this point, refreshToken() is waiting for waitUntilOnline() which has resolved,
140+
// and then it sleeps for refreshTokenInterval (15 minutes).
141+
// Advance only partway through the 15min sleep.
142+
await jest.advanceTimersByTimeAsync(5 * 60 * 1000);
143+
144+
// Now call end(). This should abort the sleep immediately.
145+
directLine.end();
146+
147+
// Let microtasks complete.
148+
await jest.advanceTimersByTimeAsync(0);
149+
150+
// The refresh loop should have exited. No further fetch calls should be made.
151+
const fetchCallCountAfterEnd = mockFetch.mock.calls.length;
152+
153+
// Advance time way past when the next refresh would have happened.
154+
await jest.advanceTimersByTimeAsync(60 * 60 * 1000);
155+
156+
// No new fetch calls should have been made.
157+
expect(mockFetch.mock.calls.length).toBe(fetchCallCountAfterEnd);
158+
});
159+
});
160+
161+
// ---------------------------------------------------------------------------
162+
// 2. refreshToken() exits early when end() is called before/after sleep
163+
// ---------------------------------------------------------------------------
164+
describe('refreshToken() abort checks', () => {
165+
test('refreshToken should not make fetch calls after end() is called', async () => {
166+
const { directLine, client } = await createAndConnect();
167+
168+
// No refresh fetch yet (haven't advanced 15 minutes).
169+
const fetchCalls = mockFetch.mock.calls.filter(
170+
([url]: [string]) => typeof url === 'string' && url.includes('/tokens/refresh')
171+
);
172+
expect(fetchCalls).toHaveLength(0);
173+
174+
// Call end().
175+
directLine.end();
176+
await jest.advanceTimersByTimeAsync(0);
177+
178+
// Advance past the refresh interval.
179+
await jest.advanceTimersByTimeAsync(30 * 60 * 1000);
180+
181+
// There should be no refresh calls.
182+
const refreshCallsAfterEnd = mockFetch.mock.calls.filter(
183+
([url]: [string]) => typeof url === 'string' && url.includes('/tokens/refresh')
184+
);
185+
expect(refreshCallsAfterEnd).toHaveLength(0);
186+
});
187+
188+
test('refreshToken should stop after abort even between sleep and fetch', async () => {
189+
const { directLine, client } = await createAndConnect();
190+
191+
// Advance to just before the refresh sleep would end.
192+
await jest.advanceTimersByTimeAsync(15 * 60 * 1000 - 100);
193+
194+
// Call end() right before the sleep resolves.
195+
directLine.end();
196+
await jest.advanceTimersByTimeAsync(0);
197+
198+
// The sleep resolves immediately due to abort, then the abort check returns.
199+
// Advance more time.
200+
await jest.advanceTimersByTimeAsync(30 * 60 * 1000);
201+
202+
// No token refresh should have been attempted.
203+
const refreshCalls = mockFetch.mock.calls.filter(
204+
([url]: [string]) => typeof url === 'string' && url.includes('/tokens/refresh')
205+
);
206+
expect(refreshCalls).toHaveLength(0);
207+
});
208+
});
209+
210+
// ---------------------------------------------------------------------------
211+
// 3. refreshToken() returns after fatal 403
212+
// ---------------------------------------------------------------------------
213+
describe('refreshToken() on fatal 403', () => {
214+
test('should stop refresh loop and not continue retrying after 403', async () => {
215+
const { directLine, client } = await createAndConnect();
216+
217+
// Make the refresh endpoint return 403.
218+
mockFetch.mockResolvedValue({
219+
ok: false,
220+
status: 403,
221+
statusText: 'Forbidden',
222+
json: async () => ({})
223+
});
224+
225+
// Advance through the first refresh interval.
226+
await jest.advanceTimersByTimeAsync(15 * 60 * 1000);
227+
await jest.advanceTimersByTimeAsync(0);
228+
229+
// The first refresh attempt should have been made and returned 403.
230+
const refreshCalls = mockFetch.mock.calls.filter(
231+
([url]: [string]) => typeof url === 'string' && url.includes('/tokens/refresh')
232+
);
233+
expect(refreshCalls.length).toBeGreaterThanOrEqual(1);
234+
235+
const callCountAfter403 = mockFetch.mock.calls.length;
236+
237+
// Advance far past further refresh intervals.
238+
await jest.advanceTimersByTimeAsync(60 * 60 * 1000);
239+
240+
// No more refresh calls should be made (the loop returned after 403).
241+
expect(mockFetch.mock.calls.length).toBe(callCountAfter403);
242+
});
243+
});
244+
245+
// ---------------------------------------------------------------------------
246+
// 4. connectWithRetryAsync() retry delay is cancellable via end()
247+
// ---------------------------------------------------------------------------
248+
describe('connectWithRetryAsync() retry sleep cancellation', () => {
249+
test('calling end() during retry delay should stop reconnection attempts promptly', async () => {
250+
const { directLine, client } = await createAndConnect();
251+
252+
// Simulate a disconnect to trigger reconnection retries.
253+
client.__test__simulateDisconnect('test disconnect');
254+
await jest.advanceTimersByTimeAsync(0);
255+
256+
// The first retry should begin after a delay of 3-15 seconds.
257+
// Advance partway into the retry delay.
258+
await jest.advanceTimersByTimeAsync(1000);
259+
260+
// Call end() while waiting for the retry delay.
261+
directLine.end();
262+
await jest.advanceTimersByTimeAsync(0);
263+
264+
// Record the current state after end() has been processed.
265+
const clientCountAfterEnd = mockWebSocketClientInstances.length;
266+
267+
// Advance time far past any retry delays.
268+
await jest.advanceTimersByTimeAsync(60 * 1000);
269+
270+
// No additional WebSocket clients should have been created after end() settled.
271+
expect(mockWebSocketClientInstances.length).toBe(clientCountAfterEnd);
272+
});
273+
274+
test('calling end() should prevent further reconnection attempts', async () => {
275+
const { directLine, client } = await createAndConnect();
276+
277+
// Tick for 1 minute to make the connection "stable" (resets retry count).
278+
await jest.advanceTimersByTimeAsync(60_000);
279+
280+
const statusValues: ConnectionStatus[] = [];
281+
directLine.connectionStatus$.subscribe(s => statusValues.push(s));
282+
283+
// Simulate disconnect.
284+
client.__test__simulateDisconnect('test disconnect');
285+
await jest.advanceTimersByTimeAsync(0);
286+
287+
// Call end() immediately.
288+
directLine.end();
289+
await jest.advanceTimersByTimeAsync(0);
290+
291+
// Should observe Ended status.
292+
expect(statusValues).toContain(ConnectionStatus.Ended);
293+
294+
// Advance time past all possible retries.
295+
await jest.advanceTimersByTimeAsync(120_000);
296+
297+
const clientsAfterEnd = mockWebSocketClientInstances.length;
298+
299+
// No further connection attempts.
300+
await jest.advanceTimersByTimeAsync(120_000);
301+
expect(mockWebSocketClientInstances.length).toBe(clientsAfterEnd);
302+
});
303+
});
304+
305+
// ---------------------------------------------------------------------------
306+
// 5. waitUntilOnline() cleans up subscription
307+
// ---------------------------------------------------------------------------
308+
describe('waitUntilOnline() subscription cleanup', () => {
309+
test('should unsubscribe from connectionStatus$ after going online (async case)', async () => {
310+
const directLine = new DirectLineStreaming({
311+
domain: 'https://test.bot',
312+
token: 'test-token'
313+
});
314+
315+
// Subscribe to activity$ to kick off the connection.
316+
directLine.activity$.subscribe({ next() {}, error() {}, complete() {} });
317+
await jest.advanceTimersByTimeAsync(0);
318+
319+
const client = mockWebSocketClientInstances[mockWebSocketClientInstances.length - 1];
320+
321+
const observerCountBeforeOnline = (directLine.connectionStatus$ as any).observers.length;
322+
323+
// Simulate successful connection — this triggers Online and refreshToken's waitUntilOnline resolves.
324+
client.__test__resolveConnect();
325+
await jest.advanceTimersByTimeAsync(0);
326+
327+
// After going Online, the waitUntilOnline() subscription from refreshToken should be cleaned up.
328+
// Observer count should not have grown (the waitUntilOnline subscription was added then removed).
329+
const observerCountAfterOnline = (directLine.connectionStatus$ as any).observers.length;
330+
expect(observerCountAfterOnline).toBeLessThanOrEqual(observerCountBeforeOnline);
331+
332+
// Clean up.
333+
directLine.end();
334+
await jest.advanceTimersByTimeAsync(0);
335+
336+
// After end(), no observers should remain subscribed (connectionStatus$ completed).
337+
expect((directLine.connectionStatus$ as any).observers.length).toBe(0);
338+
});
339+
340+
test('should unsubscribe from connectionStatus$ when already online (synchronous case)', async () => {
341+
const { directLine } = await createAndConnect();
342+
343+
// Status is already Online. Calling waitUntilOnline() will subscribe to a BehaviorSubject
344+
// that synchronously emits Online. The subscription must still be cleaned up.
345+
const observerCountBefore = (directLine.connectionStatus$ as any).observers.length;
346+
347+
// Call waitUntilOnline() via refreshToken indirectly by triggering another refresh cycle.
348+
// Instead, we can test it more directly: force a second waitUntilOnline by disconnecting
349+
// and reconnecting, then checking observer count is stable.
350+
351+
// Disconnect and reconnect to trigger another waitUntilOnline() call in refreshToken.
352+
const client = mockWebSocketClientInstances[mockWebSocketClientInstances.length - 1];
353+
client.__test__simulateDisconnect('test');
354+
await jest.advanceTimersByTimeAsync(0);
355+
356+
// A new connection attempt is made. Simulate success.
357+
const newClient = mockWebSocketClientInstances[mockWebSocketClientInstances.length - 1];
358+
newClient.__test__resolveConnect();
359+
await jest.advanceTimersByTimeAsync(0);
360+
361+
// Now we're back Online. A new refreshToken waitUntilOnline() resolved synchronously
362+
// (status was briefly Connecting, then Online). Observer count should not have grown.
363+
const observerCountAfterReconnect = (directLine.connectionStatus$ as any).observers.length;
364+
expect(observerCountAfterReconnect).toBeLessThanOrEqual(observerCountBefore);
365+
366+
// Clean up.
367+
directLine.end();
368+
await jest.advanceTimersByTimeAsync(0);
369+
expect((directLine.connectionStatus$ as any).observers.length).toBe(0);
370+
});
371+
});
372+
373+
// ---------------------------------------------------------------------------
374+
// Integration: end() should not leave any pending timers
375+
// ---------------------------------------------------------------------------
376+
describe('end() cleanup integration', () => {
377+
test('after end(), advancing time should not trigger any activity', async () => {
378+
const { directLine, client } = await createAndConnect();
379+
380+
directLine.end();
381+
await jest.advanceTimersByTimeAsync(0);
382+
383+
const fetchCountAfterEnd = mockFetch.mock.calls.length;
384+
const clientCountAfterEnd = mockWebSocketClientInstances.length;
385+
386+
// Advance time by 2 hours - nothing should happen.
387+
await jest.advanceTimersByTimeAsync(2 * 60 * 60 * 1000);
388+
389+
expect(mockFetch.mock.calls.length).toBe(fetchCountAfterEnd);
390+
expect(mockWebSocketClientInstances.length).toBe(clientCountAfterEnd);
391+
});
392+
});

0 commit comments

Comments
 (0)