Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
160 changes: 158 additions & 2 deletions packages/client/lib/commands/XADD.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,67 @@ describe('XADD', () => {
['XADD', 'key', 'MAXLEN', '~', '1000', 'LIMIT', '100', 'ACKED', '*', 'field', 'value']
);
});

it('with policy', () => {
assert.deepEqual(
parseArgs(XADD, 'key', '*', {
field: 'value'
}, {
policy: STREAM_DELETION_POLICY.KEEPREF
}),
['XADD', 'key', 'KEEPREF', '*', 'field', 'value']
);
});

it('with IDMPAUTO', () => {
assert.deepEqual(
parseArgs(XADD, 'key', '*', {
field: 'value'
}, {
IDMPAUTO: { pid: 'producer1' }
}),
['XADD', 'key', 'IDMPAUTO', 'producer1', '*', 'field', 'value']
);
});

it('with IDMP', () => {
assert.deepEqual(
parseArgs(XADD, 'key', '*', {
field: 'value'
}, {
IDMP: { pid: 'producer1', iid: '42' }
}),
['XADD', 'key', 'IDMP', 'producer1', '42', '*', 'field', 'value']
);
});

it('with policy and IDMPAUTO', () => {
assert.deepEqual(
parseArgs(XADD, 'key', '*', {
field: 'value'
}, {
policy: STREAM_DELETION_POLICY.DELREF,
IDMPAUTO: { pid: 'producer1' }
}),
['XADD', 'key', 'DELREF', 'IDMPAUTO', 'producer1', '*', 'field', 'value']
);
});

it('with policy, IDMP, and TRIM', () => {
assert.deepEqual(
parseArgs(XADD, 'key', '*', {
field: 'value'
}, {
policy: STREAM_DELETION_POLICY.ACKED,
IDMP: { pid: 'producer1', iid: 'msg123' },
TRIM: {
strategy: 'MAXLEN',
threshold: 1000
}
}),
['XADD', 'key', 'ACKED', 'IDMP', 'producer1', 'msg123', 'MAXLEN', '1000', '*', 'field', 'value']
);
});
});

testUtils.testAll('xAdd', async client => {
Expand All @@ -128,7 +189,7 @@ describe('XADD', () => {
'xAdd with TRIM policy',
async (client) => {
assert.equal(
typeof await client.xAdd('{tag}key', '*',
typeof await client.xAdd('{tag}key', '*',
{ field: 'value' },
{
TRIM: {
Expand All @@ -151,7 +212,7 @@ describe('XADD', () => {
'xAdd with all TRIM options',
async (client) => {
assert.equal(
typeof await client.xAdd('{tag}key2', '*',
typeof await client.xAdd('{tag}key2', '*',
{ field: 'value' },
{
TRIM: {
Expand All @@ -171,4 +232,99 @@ describe('XADD', () => {
cluster: { ...GLOBAL.CLUSTERS.OPEN, minimumDockerVersion: [8, 2] },
}
);

testUtils.testAll(
'xAdd with policy',
async (client) => {
assert.equal(
typeof await client.xAdd('{tag}key3', '*',
{ field: 'value' },
{
policy: STREAM_DELETION_POLICY.KEEPREF
}
),
'string'
);
},
{
client: { ...GLOBAL.SERVERS.OPEN, minimumDockerVersion: [8, 6] },
cluster: { ...GLOBAL.CLUSTERS.OPEN, minimumDockerVersion: [8, 6] },
}
);

testUtils.testAll(
'xAdd with IDMPAUTO',
async (client) => {
const id1 = await client.xAdd('{tag}key4', '*',
{ field1: 'value1', field2: 'value2' },
{
IDMPAUTO: { pid: 'producer1' }
}
);
assert.equal(typeof id1, 'string');

// Adding the same content with same producer should return the same ID (idempotent)
const id2 = await client.xAdd('{tag}key4', '*',
{ field1: 'value1', field2: 'value2' },
{
IDMPAUTO: { pid: 'producer1' }
}
);
assert.equal(id1, id2);
},
{
client: { ...GLOBAL.SERVERS.OPEN, minimumDockerVersion: [8, 6] },
cluster: { ...GLOBAL.CLUSTERS.OPEN, minimumDockerVersion: [8, 6] },
}
);

testUtils.testAll(
'xAdd with IDMP',
async (client) => {
const id1 = await client.xAdd('{tag}key5', '*',
{ field: 'value' },
{
IDMP: { pid: 'producer1', iid: '42' }
}
);
assert.equal(typeof id1, 'string');

// Adding with same producer and iid should return the same ID (idempotent)
const id2 = await client.xAdd('{tag}key5', '*',
{ field: 'value' },
{
IDMP: { pid: 'producer1', iid: '42' }
}
);
assert.equal(id1, id2);
},
{
client: { ...GLOBAL.SERVERS.OPEN, minimumDockerVersion: [8, 6] },
cluster: { ...GLOBAL.CLUSTERS.OPEN, minimumDockerVersion: [8, 6] },
}
);

testUtils.testAll(
'xAdd with policy, IDMP, and TRIM',
async (client) => {
assert.equal(
typeof await client.xAdd('{tag}key6', '*',
{ field: 'value' },
{
policy: STREAM_DELETION_POLICY.ACKED,
IDMP: { pid: 'producer1', iid: 'msg123' },
TRIM: {
strategy: 'MAXLEN',
threshold: 1000
}
}
),
'string'
);
},
{
client: { ...GLOBAL.SERVERS.OPEN, minimumDockerVersion: [8, 6] },
cluster: { ...GLOBAL.CLUSTERS.OPEN, minimumDockerVersion: [8, 6] },
}
);
});
38 changes: 34 additions & 4 deletions packages/client/lib/commands/XADD.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,13 @@ import { Tail } from './generic-transformers';

/**
* Options for the XADD command
*
*
* @property policy - Reference tracking policy for the entry (KEEPREF, DELREF, or ACKED) - added in 8.6
* @property IDMPAUTO - Automatically calculate an idempotent ID based on entry content to prevent duplicate entries - added in 8.6
* @property IDMPAUTO.pid - Producer ID, must be unique per producer and consistent across restarts
* @property IDMP - Use a specific idempotent ID to prevent duplicate entries - added in 8.6
* @property IDMP.pid - Producer ID, must be unique per producer and consistent across restarts
* @property IDMP.iid - Idempotent ID (binary string), must be unique per message and per pid
* @property TRIM - Optional trimming configuration
* @property TRIM.strategy - Trim strategy: MAXLEN (by length) or MINID (by ID)
* @property TRIM.strategyModifier - Exact ('=') or approximate ('~') trimming
Expand All @@ -14,25 +20,36 @@ import { Tail } from './generic-transformers';
* @property TRIM.policy - Policy to apply when trimming entries (optional, defaults to KEEPREF)
*/
export interface XAddOptions {
/** added in 8.6 */
policy?: StreamDeletionPolicy;
/** added in 8.6 */
IDMPAUTO?: {
pid: RedisArgument;
};
/** added in 8.6 */
IDMP?: {
pid: RedisArgument;
iid: RedisArgument;
};
TRIM?: {
strategy?: 'MAXLEN' | 'MINID';
strategyModifier?: '=' | '~';
threshold: number;
limit?: number;
/** added in 8.2 */
/** added in 8.6 */
policy?: StreamDeletionPolicy;
};
}

/**
* Parses arguments for the XADD command
*
* @param optional - Optional command modifier
* @param optional - Optional command modifier (e.g., NOMKSTREAM)
* @param parser - The command parser
* @param key - The stream key
* @param id - Message ID (* for auto-generation)
* @param message - Key-value pairs representing the message fields
* @param options - Additional options for stream trimming
* @param options - Additional options for reference tracking, idempotency, and trimming
*/
export function parseXAddArguments(
optional: RedisArgument | undefined,
Expand All @@ -48,6 +65,19 @@ export function parseXAddArguments(
parser.push(optional);
}

// Reference tracking policy (KEEPREF | DELREF | ACKED)
if (options?.policy) {
parser.push(options.policy);
}

// Idempotency options (IDMPAUTO or IDMP)
if (options?.IDMPAUTO) {
parser.push('IDMPAUTO', options.IDMPAUTO.pid);
} else if (options?.IDMP) {
parser.push('IDMP', options.IDMP.pid, options.IDMP.iid);
}

// Trimming options
if (options?.TRIM) {
if (options.TRIM.strategy) {
parser.push(options.TRIM.strategy);
Expand Down
Loading