Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
61 changes: 59 additions & 2 deletions handwritten/pubsub/src/message-stream.ts
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import {Duration} from './temporal';
import {ExponentialRetry} from './exponential-retry';
import {DebugMessage} from './debug';
import {logs as baseLogs} from './logs';
import {randomUUID} from 'crypto';

/**
* Loggers. Exported for unit tests.
Expand Down Expand Up @@ -139,6 +140,9 @@ export class ChannelError extends Error implements grpc.ServiceError {
interface StreamTracked {
stream?: PullStream;
receivedStatus?: boolean;
lastPingTime?: number;
lastResponseTime?: number;
aliveTimer?: NodeJS.Timeout;
}

/**
Expand Down Expand Up @@ -200,7 +204,7 @@ export class MessageStream extends PassThrough {
*/
setStreamAckDeadline(deadline: Duration) {
const request: StreamingPullRequest = {
streamAckDeadlineSeconds: deadline.totalOf('second'),
streamAckDeadlineSeconds: deadline.seconds,
};

for (const tracker of this._streams) {
Expand All @@ -227,6 +231,9 @@ export class MessageStream extends PassThrough {

for (let i = 0; i < this._streams.length; i++) {
const tracker = this._streams[i];
if (tracker.aliveTimer) {
clearInterval(tracker.aliveTimer);
}
if (tracker.stream) {
this._removeStream(i, 'overall message stream destroyed', 'n/a');
}
Expand All @@ -253,6 +260,8 @@ export class MessageStream extends PassThrough {
const tracker = this._streams[index];
tracker.stream = stream;
tracker.receivedStatus = false;
tracker.lastResponseTime = Date.now();
this._setAliveTimer(index);

stream
.on('error', err => this._onError(index, err))
Expand All @@ -263,11 +272,51 @@ export class MessageStream extends PassThrough {
private _onData(index: number, data: PullResponse): void {
// Mark this stream as alive again. (reset backoff)
const tracker = this._streams[index];
tracker.lastResponseTime = Date.now();
this._retrier.reset(tracker);

this.emit('data', data);
}

private _clearAliveTimer(tracker: StreamTracked): void {
if (tracker.aliveTimer) {
clearInterval(tracker.aliveTimer);
tracker.aliveTimer = undefined;
}
}

private _checkAliveTimer(index: number): void {
const tracker = this._streams[index];
const now = Date.now();
const lastPingTime = tracker.lastPingTime ?? -1;
const lastResponseTime = tracker.lastResponseTime ?? 0;
if (lastPingTime <= lastResponseTime) {
return;
}

const elapsedSincePing = now - lastPingTime;

if (elapsedSincePing > 15000) {
this._removeStream(
index,
'no keepalive response from server within 15 seconds',
'will be retried',
);
this._retrier.retryLater(tracker, () =>
this._fillOne(index, undefined, 'retry'),
);
}
}

private _setAliveTimer(index: number): void {
const tracker = this._streams[index];
this._clearAliveTimer(tracker);

tracker.aliveTimer = setInterval(() => {
this._checkAliveTimer(index);
}, 10000);
}

/**
* Attempts to create and cache the desired number of StreamingPull requests.
* gRPC does not supply a way to confirm that a stream is connected, so our
Expand Down Expand Up @@ -347,6 +396,8 @@ export class MessageStream extends PassThrough {
maxOutstandingBytes: this._subscriber.useLegacyFlowControl
? 0
: this._subscriber.maxBytes,
clientId: randomUUID().toString(),
protocolVersion: 1, // Set protocol version to fulfill keepalive capabilities
};
const otherArgs = {
headers: {
Expand Down Expand Up @@ -386,12 +437,14 @@ export class MessageStream extends PassThrough {
'sending keepAlive to %i streams',
this._streams.length,
);
this._streams.forEach(tracker => {
this._streams.forEach((tracker, index) => {
// It's possible that a status event fires off (signaling the rpc being
// closed) but the stream hasn't drained yet. Writing to such a stream will
// result in a `write after end` error.
if (!tracker.receivedStatus && tracker.stream) {
tracker.stream.write({});
tracker.lastPingTime = Date.now();
this._setAliveTimer(index);
}
});
}
Expand Down Expand Up @@ -511,6 +564,10 @@ export class MessageStream extends PassThrough {
whatNext?: string,
): void {
const tracker = this._streams[index];
if (tracker.aliveTimer) {
clearInterval(tracker.aliveTimer);
tracker.aliveTimer = undefined;
}
if (tracker.stream) {
logs.subscriberStreams.info(
'closing stream %i; why: %s; next: %s',
Expand Down
60 changes: 60 additions & 0 deletions handwritten/pubsub/test/message-stream.ts
Original file line number Diff line number Diff line change
Expand Up @@ -523,6 +523,24 @@ describe('MessageStream', () => {
});

describe('keeping streams alive', () => {
it('should set protocolVersion in the initial packet', async () => {
// The special handling for messageStream and the spy below are
// so that we can test the initial message.
messageStream.destroy();

const spy = sandbox.spy(FakeGrpcStream.prototype, 'write');
const ms = new MessageStream(subscriber);
await ms.start();

assert.strictEqual(spy.callCount, 5);
const {args} = spy.firstCall;
const request = args[0] as any;

assert.strictEqual(String(request.protocolVersion), '1');

ms.destroy();
});

it('should keep the streams alive', () => {
const frequency = 30000;
const stubs = client.streams.map(stream => {
Expand All @@ -536,6 +554,48 @@ describe('MessageStream', () => {
assert.deepStrictEqual(data, {});
});
});

it('should close stream if no data received for 15 seconds after keepalive', async () => {
messageStream.destroy();
client.streams.length = 0;

const ms = new MessageStream(subscriber);
await ms.start();

const cancelSpies = client.streams.map(s => sandbox.spy(s, 'cancel'));

// wait for keepalive ping (30s) + 21s timeout to pass two 10s polling intervals
sandbox.clock.tick(51000);

cancelSpies.forEach(spy => {
assert.strictEqual(spy.callCount, 1);
});

ms.destroy();
});

it('should not close stream if data received within 15 seconds of keepalive', async () => {
messageStream.destroy();

const ms = new MessageStream(subscriber);
await ms.start();

const cancelSpies = client.streams.map(s => sandbox.spy(s, 'cancel'));

sandbox.clock.tick(30000);

// Simulating data prevents timeout
client.streams.forEach(s => s.emit('data', {}));

// Wait for two 10s polling intervals to pass
sandbox.clock.tick(21000);

cancelSpies.forEach(spy => {
assert.strictEqual(spy.callCount, 0);
});

ms.destroy();
});
});

it('should allow updating the ack deadline', async () => {
Expand Down
Loading