Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .github/package.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"name": "@datastream/github-workflows",
"version": "0.4.0",
"version": "0.5.0",
"private": true,
"engines": {
"node": ">=24.0"
Expand Down
2 changes: 1 addition & 1 deletion biome.json
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
{
"$schema": "https://biomejs.dev/schemas/2.4.12/schema.json",
"$schema": "https://biomejs.dev/schemas/2.4.14/schema.json",
"vcs": {
"enabled": true,
"clientKind": "git",
Expand Down
2,243 changes: 1,307 additions & 936 deletions package-lock.json

Large diffs are not rendered by default.

9 changes: 4 additions & 5 deletions package.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"name": "@datastream/monorepo",
"version": "0.4.0",
"version": "0.5.0",
"description": "Streams made easy.",
"private": true,
"type": "module",
Expand Down Expand Up @@ -67,9 +67,9 @@
"homepage": "https://datastream.js.org",
"devDependencies": {
"@biomejs/biome": "^2.0.0",
"@commitlint/cli": "^20.0.0",
"@commitlint/config-conventional": "^20.0.0",
"@types/node": "25.6.0",
"@commitlint/cli": "^21.0.0",
"@commitlint/config-conventional": "^21.0.0",
"@types/node": "^25.0.0",
"esbuild": "^0.28.0",
"fast-check": "^4.0.0",
"husky": "^9.0.0",
Expand All @@ -83,7 +83,6 @@
"@willfarrell-ds/cli": {
"mathjs": ">=15.2.0"
},
"fast-xml-parser": ">=5.5.12",
"license-check-and-add": {
"minimatch": "^10.2.5"
}
Expand Down
2 changes: 1 addition & 1 deletion packages/aws/README.md
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
<div align="center">
<h1>&lt;datastream&gt; `aws`</h1>
<img alt="datastream logo" src="https://raw.githubusercontent.com/willfarrell/datastream/main/docs/img/datastream-logo.svg"/>
<p><strong>AWS service streams for CloudWatch Logs, DynamoDB, Kinesis, Lambda, S3, SNS, and SQS.</strong></p>
<p><strong>AWS service streams for CloudWatch Logs, DynamoDB, DynamoDB Streams, Kinesis, Lambda, S3, SNS, and SQS.</strong></p>
<p>
<a href="https://github.com/willfarrell/datastream/actions/workflows/test-unit.yml"><img src="https://github.com/willfarrell/datastream/actions/workflows/test-unit.yml/badge.svg" alt="GitHub Actions unit test status"></a>
<a href="https://github.com/willfarrell/datastream/actions/workflows/test-dast.yml"><img src="https://github.com/willfarrell/datastream/actions/workflows/test-dast.yml/badge.svg" alt="GitHub Actions dast test status"></a>
Expand Down
18 changes: 18 additions & 0 deletions packages/aws/dynamodb-streams.d.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
// Copyright 2026 will Farrell, and datastream contributors.
// SPDX-License-Identifier: MIT
import type { DatastreamReadable, StreamOptions } from "@datastream/core";

export function awsDynamoDBStreamsSetClient(
dynamoDBStreamsClient: unknown,
): void;

export function awsDynamoDBStreamsGetRecordsStream(
options: {
client?: unknown;
ShardIterator?: string;
pollingActive?: boolean;
pollingDelay?: number;
[key: string]: unknown;
},
streamOptions?: StreamOptions,
): Promise<DatastreamReadable>;
43 changes: 43 additions & 0 deletions packages/aws/dynamodb-streams.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
// Copyright 2026 will Farrell, and datastream contributors.
// SPDX-License-Identifier: MIT
import {
DynamoDBStreamsClient,
GetRecordsCommand,
} from "@aws-sdk/client-dynamodb-streams";
import { awsClientDefaults } from "./client.js";

let client = new DynamoDBStreamsClient(awsClientDefaults);
export const awsDynamoDBStreamsSetClient = (dynamoDBStreamsClient) => {
client = dynamoDBStreamsClient;
};

export const awsDynamoDBStreamsGetRecordsStream = async (
options,
streamOptions = {},
) => {
const { pollingActive, pollingDelay = 1000, ...streamsOptions } = options;
async function* command(opts) {
let expectMore = true;
while (expectMore) {
const response = await client.send(new GetRecordsCommand(opts), {
abortSignal: streamOptions.signal,
});
const records = response.Records ?? [];
for (const item of records) {
yield item;
}
opts.ShardIterator = response.NextShardIterator;
expectMore =
opts.ShardIterator !== null && (pollingActive || records.length > 0);
if (pollingActive && records.length === 0 && pollingDelay > 0) {
await new Promise((resolve) => setTimeout(resolve, pollingDelay));
}
}
}
return command({ ...streamsOptions });
};

export default {
setClient: awsDynamoDBStreamsSetClient,
getRecordsStream: awsDynamoDBStreamsGetRecordsStream,
};
180 changes: 180 additions & 0 deletions packages/aws/dynamodb-streams.test.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,180 @@
import { deepStrictEqual } from "node:assert";
import test from "node:test";
import {
DynamoDBStreamsClient,
GetRecordsCommand,
} from "@aws-sdk/client-dynamodb-streams";
import {
awsDynamoDBStreamsGetRecordsStream,
awsDynamoDBStreamsSetClient,
} from "@datastream/aws/dynamodb-streams";

import { streamToArray } from "@datastream/core";
import { mockClient } from "aws-sdk-client-mock";

let variant = "unknown";
for (const execArgv of process.execArgv) {
const flag = "--conditions=";
if (execArgv.includes(flag)) {
variant = execArgv.replace(flag, "");
}
}

test(`${variant}: awsDynamoDBStreamsGetRecordsStream should get chunk`, async (_t) => {
const client = mockClient(DynamoDBStreamsClient);
awsDynamoDBStreamsSetClient(client);

client
.on(GetRecordsCommand)
.resolvesOnce({
Records: [{ eventID: "1" }],
NextShardIterator: "iter2",
})
.resolvesOnce({
Records: [],
NextShardIterator: "iter3",
});

const options = { ShardIterator: "iter1" };
const stream = await awsDynamoDBStreamsGetRecordsStream(options);
const output = await streamToArray(stream);

deepStrictEqual(output, [{ eventID: "1" }]);
});

test(`${variant}: awsDynamoDBStreamsGetRecordsStream should handle empty Records (undefined)`, async (_t) => {
const client = mockClient(DynamoDBStreamsClient);
awsDynamoDBStreamsSetClient(client);

client.on(GetRecordsCommand).resolves({ NextShardIterator: "iter2" });

const options = { ShardIterator: "iter1" };
const stream = await awsDynamoDBStreamsGetRecordsStream(options);
const output = await streamToArray(stream);

deepStrictEqual(output, []);
});

test(`${variant}: awsDynamoDBStreamsGetRecordsStream should track NextShardIterator across calls`, async (_t) => {
const client = mockClient(DynamoDBStreamsClient);
awsDynamoDBStreamsSetClient(client);

client
.on(GetRecordsCommand)
.resolvesOnce({
Records: [{ eventID: "1" }],
NextShardIterator: "iter2",
})
.resolvesOnce({
Records: [{ eventID: "2" }],
NextShardIterator: "iter3",
})
.resolvesOnce({
Records: [],
NextShardIterator: "iter4",
});

const options = { ShardIterator: "iter1" };
const stream = await awsDynamoDBStreamsGetRecordsStream(options);
const output = await streamToArray(stream);

deepStrictEqual(output, [{ eventID: "1" }, { eventID: "2" }]);
});

test(`${variant}: awsDynamoDBStreamsGetRecordsStream should stop when NextShardIterator is null (closed shard)`, async (_t) => {
const client = mockClient(DynamoDBStreamsClient);
awsDynamoDBStreamsSetClient(client);

client.on(GetRecordsCommand).resolvesOnce({
Records: [{ eventID: "1" }],
NextShardIterator: null,
});

const options = { ShardIterator: "iter1" };
const stream = await awsDynamoDBStreamsGetRecordsStream(options);
const output = await streamToArray(stream);

deepStrictEqual(output, [{ eventID: "1" }]);
});

test(`${variant}: awsDynamoDBStreamsGetRecordsStream should keep polling with pollingActive option`, async (_t) => {
const client = mockClient(DynamoDBStreamsClient);
awsDynamoDBStreamsSetClient(client);

client
.on(GetRecordsCommand)
.resolvesOnce({ Records: [{ eventID: "1" }], NextShardIterator: "iter2" })
.resolvesOnce({ Records: [], NextShardIterator: "iter3" })
.resolvesOnce({ Records: [{ eventID: "2" }], NextShardIterator: "iter4" })
.resolves({ Records: [], NextShardIterator: "iter5" });

const options = {
ShardIterator: "iter1",
pollingActive: true,
pollingDelay: 0,
};
const stream = await awsDynamoDBStreamsGetRecordsStream(options);

const output = [];
for await (const item of stream) {
output.push(item);
if (output.length >= 2) break;
}

deepStrictEqual(output, [{ eventID: "1" }, { eventID: "2" }]);
});

test(`${variant}: awsDynamoDBStreamsGetRecordsStream should delay polling when pollingActive and no records`, async (t) => {
t.mock.timers.enable({ apis: ["setTimeout"] });

const client = mockClient(DynamoDBStreamsClient);
awsDynamoDBStreamsSetClient(client);

client
.on(GetRecordsCommand)
.resolvesOnce({ Records: [], NextShardIterator: "iter2" })
.resolvesOnce({ Records: [{ eventID: "1" }], NextShardIterator: "iter3" })
.resolves({ Records: [], NextShardIterator: "iter4" });

const options = {
ShardIterator: "iter1",
pollingActive: true,
pollingDelay: 1000,
};
const stream = await awsDynamoDBStreamsGetRecordsStream(options);

const output = [];
const consuming = (async () => {
for await (const item of stream) {
output.push(item);
if (output.length >= 1) break;
}
})();

await new Promise((resolve) => setImmediate(resolve));
t.mock.timers.tick(1000);

await consuming;

deepStrictEqual(output, [{ eventID: "1" }]);
});

// *** AbortSignal *** //
test(`${variant}: awsDynamoDBStreamsGetRecordsStream should pass signal to client`, async (_t) => {
const client = mockClient(DynamoDBStreamsClient);
awsDynamoDBStreamsSetClient(client);
client.on(GetRecordsCommand).resolves({
Records: [{ eventID: "1" }],
NextShardIterator: null,
});

const controller = new AbortController();
const options = { ShardIterator: "iter-1" };
const stream = await awsDynamoDBStreamsGetRecordsStream(options, {
signal: controller.signal,
});
await streamToArray(stream);

const calls = client.commandCalls(GetRecordsCommand);
deepStrictEqual(calls[0].args[1]?.abortSignal, controller.signal);
});
32 changes: 16 additions & 16 deletions packages/aws/index.d.ts
Original file line number Diff line number Diff line change
@@ -1,42 +1,42 @@
// Copyright 2026 will Farrell, and datastream contributors.
// SPDX-License-Identifier: MIT
export {
awsCloudWatchLogsGetLogEventsStream,
awsCloudWatchLogsFilterLogEventsStream,
awsCloudWatchLogsGetLogEventsStream,
awsCloudWatchLogsSetClient,
} from "@datastream/aws/cloudwatch-logs";
export {
awsKinesisGetRecordsStream,
awsKinesisPutRecordsStream,
awsKinesisSetClient,
} from "@datastream/aws/kinesis";
export {
awsS3GetObjectStream,
awsS3PutObjectStream,
awsS3ChecksumStream,
awsS3SetClient,
} from "@datastream/aws/s3";
export {
awsDynamoDBQueryStream,
awsDynamoDBScanStream,
awsDynamoDBDeleteItemStream,
awsDynamoDBExecuteStatementStream,
awsDynamoDBGetItemStream,
awsDynamoDBPutItemStream,
awsDynamoDBDeleteItemStream,
awsDynamoDBQueryStream,
awsDynamoDBScanStream,
awsDynamoDBSetClient,
} from "@datastream/aws/dynamodb";
export {
awsKinesisGetRecordsStream,
awsKinesisPutRecordsStream,
awsKinesisSetClient,
} from "@datastream/aws/kinesis";
export {
awsLambdaReadableStream,
awsLambdaResponseStream,
awsLambdaSetClient,
} from "@datastream/aws/lambda";
export {
awsS3ChecksumStream,
awsS3GetObjectStream,
awsS3PutObjectStream,
awsS3SetClient,
} from "@datastream/aws/s3";
export {
awsSNSPublishMessageStream,
awsSNSSetClient,
} from "@datastream/aws/sns";
export {
awsSQSDeleteMessageStream,
awsSQSReceiveMessageStream,
awsSQSSendMessageStream,
awsSQSDeleteMessageStream,
awsSQSSetClient,
} from "@datastream/aws/sqs";
1 change: 1 addition & 0 deletions packages/aws/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
// SPDX-License-Identifier: MIT
export * from "@datastream/aws/cloudwatch-logs";
export * from "@datastream/aws/dynamodb";
export * from "@datastream/aws/dynamodb-streams";
export * from "@datastream/aws/kinesis";
export * from "@datastream/aws/lambda";
export * from "@datastream/aws/s3";
Expand Down
Loading
Loading