Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 8 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -447,7 +447,6 @@ These features are intended for implementing intra-service logic.
* `Connector` - connector for the S3 SDK
* `putObjectToS3` - stream steps to put an object in a bucket


## SNS Support
These features are intended for implementing intra-service logic.
* `fromSns` - creates a stream from an SNS topic
Expand All @@ -462,6 +461,14 @@ These features are intended for implementing intra-service logic. They are frequ
* `Connector` - connector for the SQS SDK
* `sendToSqs` - stream steps to send a message to a queue

## WebSocket Support
These features are intended for broadcasting events to connected WebSocket clients via API Gateway WebSocket API. Useful for lightweight real-time notifications (e.g. job completion, status updates) as an alternative to polling or SQS-based live data.
* `Connector` - connector for the API Gateway Management API SDK (`PostToConnectionCommand`, `GetConnectionCommand`, `DeleteConnectionCommand`)
* `publishToConnections` - stream steps to post a message to connected WebSocket clients
* `disconnectConnections` - stream steps to forcibly disconnect WebSocket clients
* `queryConnection` - query step to fetch connection metadata (identity, last active time) and adorn it to the `uow` as `getConnectionResponse`
* `broadcastToWebSocket` - a flavor that composes the full pipeline: filter on event type, format message via `rule.toMessage`, resolve target connections via `rule.toConnections`, fan out and publish to each connection

## Encryption Support
* https://github.com/jgilbert01/aws-lambda-stream/issues/20

Expand Down
8,582 changes: 5,246 additions & 3,336 deletions package-lock.json

Large diffs are not rendered by default.

3 changes: 2 additions & 1 deletion package.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"name": "aws-lambda-stream",
"version": "1.1.20",
"version": "1.2.0",
"description": "Create stream processors with AWS Lambda functions.",
"keywords": [
"aws",
Expand Down Expand Up @@ -64,6 +64,7 @@
"@aws-sdk/client-kinesis": "^3.450.0",
"@aws-sdk/client-kms": "^3.450.0",
"@aws-sdk/client-lambda": "^3.450.0",
"@aws-sdk/client-apigatewaymanagementapi": "^3.450.0",
"@aws-sdk/client-s3": "^3.450.0",
"@aws-sdk/client-secrets-manager": "^3.450.0",
"@aws-sdk/client-scheduler": "^3.450.0",
Expand Down
79 changes: 79 additions & 0 deletions src/connectors/apigatewayclient.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
/* eslint import/no-extraneous-dependencies: ["error", {"devDependencies": true}] */

import Promise from 'bluebird';

import {
ApiGatewayManagementApiClient,
DeleteConnectionCommand,
GetConnectionCommand,
PostToConnectionCommand,
} from '@aws-sdk/client-apigatewaymanagementapi';
import { NodeHttpHandler } from '@smithy/node-http-handler';
import { ConfiguredRetryStrategy } from '@smithy/util-retry';
import { omit, pick } from 'lodash';
import { defaultBackoffDelay } from '../utils/retry';
import { defaultDebugLogger } from '../utils/log';

class Connector {
constructor({
debug,
pipelineId,
endpoint = process.env.WEBSOCKET_ENDPOINT,
timeout = Number(process.env.WEBSOCKET_TIMEOUT) || Number(process.env.TIMEOUT) || 1000,
additionalClientOpts = {},
...opt
}) {
this.debug = (msg) => debug('%j', msg);
this.endpoint = endpoint || 'undefined';
this.client = Connector.getClient(pipelineId, debug, timeout, endpoint, additionalClientOpts);
this.opt = opt;
}

static clients = {};

static getClient(pipelineId, debug, timeout, endpoint, additionalClientOpts) {
const addlRequestHandlerOpts = pick(additionalClientOpts, ['requestHandler']);
const addlClientOpts = omit(additionalClientOpts, ['requestHandler']);

if (!this.clients[pipelineId]) {
this.clients[pipelineId] = new ApiGatewayManagementApiClient({
endpoint,
requestHandler: new NodeHttpHandler({
requestTimeout: timeout,
connectionTimeout: timeout,
...addlRequestHandlerOpts,
}),
retryStrategy: new ConfiguredRetryStrategy(11, defaultBackoffDelay),
logger: defaultDebugLogger(debug),
...addlClientOpts,
});
}
return this.clients[pipelineId];
}

postToConnection(connectionId, data, ctx) {
const params = {
ConnectionId: connectionId,
Data: typeof data === 'string' ? data : JSON.stringify(data),
};

return this._sendCommand(new PostToConnectionCommand(params), ctx);
}

getConnection(connectionId, ctx) {
return this._sendCommand(new GetConnectionCommand({ ConnectionId: connectionId }), ctx);
}

deleteConnection(connectionId, ctx) {
return this._sendCommand(new DeleteConnectionCommand({ ConnectionId: connectionId }), ctx);
}

_sendCommand(command, ctx) {
this.opt.metrics?.capture(this.client, command, 'ws', this.opt, ctx);
return Promise.resolve(this.client.send(command))
.tap(this.debug)
.tapCatch(this.debug);
}
}

export default Connector;
1 change: 1 addition & 0 deletions src/connectors/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -8,3 +8,4 @@ export { default as SecretsMgrConnector } from './secretsmgr';
export { default as S3Connector } from './s3';
export { default as SnsConnector } from './sns';
export { default as SqsConnector } from './sqs';
export { default as ApiGatewayClientConnector } from './apigatewayclient';
1 change: 1 addition & 0 deletions src/flavors/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -9,3 +9,4 @@ export * from './materialize';
export * from './materializeS3';
export * from './sendMessages';
export * from './update';
export * from './websocket';
44 changes: 44 additions & 0 deletions src/flavors/websocket.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
import _ from 'highland';

import {
printStartPipeline, printEndPipeline,
faulty, faultyAsyncStream, faultify,
} from '../utils';

import { publishToConnections } from '../sinks/websocket';
import { filterOnEventType, filterOnContent } from '../filters';

export const broadcastToWebSocket = (rule) => (s) => s // eslint-disable-line import/prefer-default-export
.filter(onEventType(rule))
.tap(printStartPipeline)

.filter(onContent(rule))

.map(toMessage(rule))
.parallel(rule.parallel || Number(process.env.PARALLEL) || 4)

.flatMap(toConnections(rule))

.through(publishToConnections(rule))

.tap(printEndPipeline);

const onEventType = (rule) => faulty((uow) => filterOnEventType(rule, uow));
const onContent = (rule) => faulty((uow) => filterOnContent(rule, uow));

const toMessage = (rule) => faultyAsyncStream(async (uow) => ({
...uow,
message: await faultify(rule.toMessage)(uow, rule),
}));

// rule.toConnections returns a promise resolving to an array of { connectionId }
// flatMap fans out the uow into one per connection
const toConnections = (rule) => (uow) => {
if (!rule.toConnections) return _([uow]);
const p = faultify(rule.toConnections)(uow, rule)
.then((connections) => connections.map((conn) => ({
...uow,
connectionId: conn.connectionId,
})));
return _(p).flatten();
};
1 change: 1 addition & 0 deletions src/queries/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -2,3 +2,4 @@ export * from './claimcheck';
export * from './dynamodb';
export * from './secretsmgr';
export * from './s3';
export * from './websocket';
36 changes: 36 additions & 0 deletions src/queries/websocket.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
import _ from 'highland';

import Connector from '../connectors/apigatewayclient';

import { rejectWithFault } from '../utils/faults';
import { debug as d } from '../utils/print';
import { ratelimit } from '../utils/ratelimit';

export const queryConnection = ({ // eslint-disable-line import/prefer-default-export
id: pipelineId,
debug = d('ws'),
endpoint = process.env.WEBSOCKET_ENDPOINT,
getConnectionResponseField = 'getConnectionResponse',
parallel = Number(process.env.WEBSOCKET_PARALLEL) || Number(process.env.PARALLEL) || 8,
step = 'getConnection',
...opt
} = {}) => {
const connector = new Connector({
pipelineId, debug, endpoint, ...opt,
});

const get = (uow) => {
if (!uow.connectionId) return _(Promise.resolve(uow));

const p = () => connector.getConnection(uow.connectionId, uow)
.then((getConnectionResponse) => ({ ...uow, [getConnectionResponseField]: getConnectionResponse }))
.catch(rejectWithFault(uow));

return _(uow.metrics?.w(p, step) || /* istanbul ignore next */ p()); // wrap promise in a stream
};

return (s) => s
.through(ratelimit(opt))
.map(get)
.parallel(parallel);
};
1 change: 1 addition & 0 deletions src/sinks/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -7,3 +7,4 @@ export * from './eventbridge';
export * from './s3';
export * from './sns';
export * from './sqs';
export * from './websocket';
76 changes: 76 additions & 0 deletions src/sinks/websocket.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
import _ from 'highland';

import Connector from '../connectors/apigatewayclient';

import { rejectWithFault } from '../utils/faults';
import { debug as d } from '../utils/print';
import { ratelimit } from '../utils/ratelimit';

export const publishToConnections = ({
id: pipelineId,
debug = d('ws'),
endpoint = process.env.WEBSOCKET_ENDPOINT,
messageField = 'message',
parallel = Number(process.env.WEBSOCKET_PARALLEL) || Number(process.env.PARALLEL) || 8,
step = 'postToConnection',
...opt
} = {}) => {
const connector = new Connector({
pipelineId, debug, endpoint, ...opt,
});

const post = (uow) => {
if (!uow.connectionId || !uow[messageField]) return _(Promise.resolve(uow));

const p = () => connector.postToConnection(uow.connectionId, uow[messageField], uow)
.then((postResponse) => ({ ...uow, postResponse }))
.catch((err) => {
// 410 = connection is gone, clean up silently
if (err.statusCode === 410 || err.$metadata?.httpStatusCode === 410) {
return { ...uow, postResponse: { statusCode: 410, connectionId: uow.connectionId } };
}
return rejectWithFault(uow)(err);
});

return _(uow.metrics?.w(p, step) || /* istanbul ignore next */ p()); // wrap promise in a stream
};

return (s) => s
.through(ratelimit(opt))
.map(post)
.parallel(parallel);
};

export const disconnectConnections = ({
id: pipelineId,
debug = d('ws'),
endpoint = process.env.WEBSOCKET_ENDPOINT,
parallel = Number(process.env.WEBSOCKET_PARALLEL) || Number(process.env.PARALLEL) || 8,
step = 'deleteConnection',
...opt
} = {}) => {
const connector = new Connector({
pipelineId, debug, endpoint, ...opt,
});

const disconnect = (uow) => {
if (!uow.connectionId) return _(Promise.resolve(uow));

const p = () => connector.deleteConnection(uow.connectionId, uow)
.then((deleteResponse) => ({ ...uow, deleteResponse }))
.catch((err) => {
// 410 = connection already gone, clean up silently
if (err.statusCode === 410 || err.$metadata?.httpStatusCode === 410) {
return { ...uow, deleteResponse: { statusCode: 410, connectionId: uow.connectionId } };
}
return rejectWithFault(uow)(err);
});

return _(uow.metrics?.w(p, step) || /* istanbul ignore next */ p()); // wrap promise in a stream
};

return (s) => s
.through(ratelimit(opt))
.map(disconnect)
.parallel(parallel);
};
93 changes: 93 additions & 0 deletions test/unit/connectors/apigatewayclient.test.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,93 @@
import 'mocha';
import { expect } from 'chai';
import sinon from 'sinon';
import { mockClient } from 'aws-sdk-client-mock';
import {
ApiGatewayManagementApiClient,
DeleteConnectionCommand,
GetConnectionCommand,
PostToConnectionCommand,
} from '@aws-sdk/client-apigatewaymanagementapi';

import Connector from '../../../src/connectors/apigatewayclient';

import { debug } from '../../../src/utils';

describe('connectors/websocket.js', () => {
let mockWs = mockClient(ApiGatewayManagementApiClient);

beforeEach(() => {
mockWs = mockClient(ApiGatewayManagementApiClient);
});

afterEach(() => {
mockWs.restore();
});

it('should reuse client per pipeline', () => {
const client1 = Connector.getClient('ws-test1', debug('test'), 1000, 'https://test');
const client2 = Connector.getClient('ws-test1', debug('test'), 1000, 'https://test');
const client3 = Connector.getClient('ws-test2', debug('test'), 1000, 'https://test');

expect(client1).to.eq(client2);
expect(client2).to.not.eq(client3);
});

it('should post to connection with object data', async () => {
const spy = sinon.spy((_) => ({}));
mockWs.on(PostToConnectionCommand).callsFake(spy);

const data = await new Connector({
debug: debug('ws'),
endpoint: 'https://test.execute-api.us-west-2.amazonaws.com/dev',
}).postToConnection('conn-1', { type: 'thing-updated', data: { id: '1' } });

expect(spy).to.have.been.calledWith({
ConnectionId: 'conn-1',
Data: JSON.stringify({ type: 'thing-updated', data: { id: '1' } }),
});
expect(data).to.deep.equal({});
});

it('should post to connection with string data', async () => {
const spy = sinon.spy((_) => ({}));
mockWs.on(PostToConnectionCommand).callsFake(spy);

const data = await new Connector({
debug: debug('ws'),
endpoint: 'https://test.execute-api.us-west-2.amazonaws.com/dev',
}).postToConnection('conn-1', 'raw string');

expect(spy).to.have.been.calledWith({
ConnectionId: 'conn-1',
Data: 'raw string',
});
expect(data).to.deep.equal({});
});

it('should get connection', async () => {
const spy = sinon.spy((_) => ({ ConnectedAt: '2024-01-01T00:00:00Z', LastActiveAt: '2024-01-01T01:00:00Z' }));
mockWs.on(GetConnectionCommand).callsFake(spy);

const data = await new Connector({
debug: debug('ws'),
endpoint: 'https://test.execute-api.us-west-2.amazonaws.com/dev',
}).getConnection('conn-1');

expect(spy).to.have.been.calledWith({ ConnectionId: 'conn-1' });
expect(data).to.deep.equal({ ConnectedAt: '2024-01-01T00:00:00Z', LastActiveAt: '2024-01-01T01:00:00Z' });
});

it('should delete connection', async () => {
const spy = sinon.spy((_) => ({}));
mockWs.on(DeleteConnectionCommand).callsFake(spy);

const data = await new Connector({
debug: debug('ws'),
endpoint: 'https://test.execute-api.us-west-2.amazonaws.com/dev',
}).deleteConnection('conn-1');

expect(spy).to.have.been.calledWith({ ConnectionId: 'conn-1' });
expect(data).to.deep.equal({});
});
});
Loading