Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
76 changes: 40 additions & 36 deletions src/watch.ts
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,6 @@ export class Watch {
const signal = AbortSignal.any([controller.signal, timeoutSignal]);

const ctx = new RequestContext(watchURL.toString(), HttpMethod.GET);
await this.config.applySecurityAuthentication(ctx);

let doneCalled: boolean = false;
const doneCallOnce = (err: any) => {
Expand All @@ -59,45 +58,50 @@ export class Watch {
}
};

try {
const response = await fetch(watchURL, {
method: 'GET',
headers: ctx.getHeaders(),
dispatcher: ctx.getDispatcher(),
signal,
});
const startWatch = async (): Promise<void> => {
try {
await this.config.applySecurityAuthentication(ctx);

if (response.status === 200) {
const body = Readable.fromWeb(response.body! as any);
const response = await fetch(watchURL, {
method: 'GET',
headers: ctx.getHeaders(),
dispatcher: ctx.getDispatcher(),
signal,
});

body.on('error', doneCallOnce);
body.on('close', () => doneCallOnce(null));
body.on('finish', () => doneCallOnce(null));
if (response.status === 200) {
const body = Readable.fromWeb(response.body! as any);

const lines = createInterface(body);
lines.on('error', doneCallOnce);
lines.on('close', () => doneCallOnce(null));
lines.on('finish', () => doneCallOnce(null));
lines.on('line', (line) => {
try {
const data = JSON.parse(line.toString());
callback(data.type, data.object, data);
} catch {
// ignore parse errors
}
});
} else {
const statusText =
response.statusText || STATUS_CODES[response.status] || 'Internal Server Error';
const error = new Error(statusText) as Error & {
statusCode: number | undefined;
};
error.statusCode = response.status;
throw error;
body.on('error', doneCallOnce);
body.on('close', () => doneCallOnce(null));
body.on('finish', () => doneCallOnce(null));

const lines = createInterface(body);
lines.on('error', doneCallOnce);
lines.on('close', () => doneCallOnce(null));
lines.on('finish', () => doneCallOnce(null));
lines.on('line', (line) => {
try {
const data = JSON.parse(line.toString());
callback(data.type, data.object, data);
} catch {
// ignore parse errors
}
});
} else {
const statusText =
response.statusText || STATUS_CODES[response.status] || 'Internal Server Error';
const error = new Error(statusText) as Error & {
statusCode: number | undefined;
};
error.statusCode = response.status;
throw error;
}
} catch (err) {
doneCallOnce(err);
}
} catch (err) {
doneCallOnce(err);
}
};
startWatch().catch(doneCallOnce);

return controller;
}
Expand Down
49 changes: 47 additions & 2 deletions src/watch_test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,10 @@ describe('Watch', () => {

let doneCalled = false;
let doneErr: any;
let doneResolve!: () => void;
const donePromise = new Promise<void>((resolve) => {
doneResolve = resolve;
});

await watch.watch(
path,
Expand All @@ -73,8 +77,10 @@ describe('Watch', () => {
(err: any) => {
doneCalled = true;
doneErr = err;
doneResolve();
},
);
await donePromise;
strictEqual(doneCalled, true);
strictEqual(doneErr.toString(), 'Error: Internal Server Error');
mockAgent.assertNoPendingInterceptors();
Expand Down Expand Up @@ -170,7 +176,7 @@ describe('Watch', () => {
const watch = new Watch(kc);

let doneCalled = 0;
let doneResolve: () => void;
let doneResolve!: () => void;

const donePromise = new Promise<void>((resolve) => {
doneResolve = resolve;
Expand Down Expand Up @@ -364,7 +370,7 @@ describe('Watch', () => {

let doneErr: any;

let doneResolve: () => void;
let doneResolve!: () => void;
const donePromise = new Promise<void>((resolve) => {
doneResolve = resolve;
});
Expand All @@ -386,6 +392,45 @@ describe('Watch', () => {
strictEqual(doneErr.name, 'TimeoutError');
});

it('should return abort controller before receiving response data', async (t) => {
const kc = await setupMockSystem(t, (_req: any, _res: any) => {
// Intentionally do not write headers/body so fetch stays pending.
});
const watch = new Watch(kc);

let doneErr: any;

let doneResolve!: () => void;
const donePromise = new Promise<void>((resolve) => {
doneResolve = resolve;
});

const controllerPromise = watch.watch(
'/some/path/to/object',
{},
() => {
throw new Error('Unexpected data received');
},
(err: any) => {
doneErr = err;
doneResolve();
},
);

const controller = await Promise.race([
controllerPromise,
new Promise<AbortController>((_, reject) => {
setTimeout(() => {
reject(new Error('watch() did not return AbortController in time'));
}, 100);
}),
]);

controller.abort();
await donePromise;
strictEqual(doneErr?.name, 'AbortError');
});

it('should throw on empty config', async () => {
const kc = new KubeConfig();
const watch = new Watch(kc);
Expand Down