Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
87 changes: 87 additions & 0 deletions packages/client/lib/commands/XCFGSET.spec.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,87 @@
import { strict as assert } from 'node:assert';
import testUtils, { GLOBAL } from '../test-utils';
import XCFGSET from './XCFGSET';
import { parseArgs } from './generic-transformers';

describe('XCFGSET', () => {
describe('transformArguments', () => {
it('simple', () => {
assert.deepEqual(
parseArgs(XCFGSET, 'mystream'),
['XCFGSET', 'mystream']
);
});

it('with IDMP_DURATION', () => {
assert.deepEqual(
parseArgs(XCFGSET, 'mystream', {
IDMP_DURATION: 120
}),
['XCFGSET', 'mystream', 'IDMP-DURATION', '120']
);
});

it('with IDMP_MAXSIZE', () => {
assert.deepEqual(
parseArgs(XCFGSET, 'mystream', {
IDMP_MAXSIZE: 5000
}),
['XCFGSET', 'mystream', 'IDMP-MAXSIZE', '5000']
);
});

it('with IDMP_DURATION and IDMP_MAXSIZE', () => {
assert.deepEqual(
parseArgs(XCFGSET, 'mystream', {
IDMP_DURATION: 120,
IDMP_MAXSIZE: 5000
}),
['XCFGSET', 'mystream', 'IDMP-DURATION', '120', 'IDMP-MAXSIZE', '5000']
);
});
});

testUtils.testAll('xCfgSet with IDMP_DURATION', async client => {
await client.xAdd('key', '*', { field: 'value' });

assert.equal(
await client.xCfgSet('key', {
IDMP_DURATION: 60
}),
'OK'
);
}, {
client: { ...GLOBAL.SERVERS.OPEN, minimumDockerVersion: [8, 6] },
cluster: { ...GLOBAL.CLUSTERS.OPEN, minimumDockerVersion: [8, 6] }
});

testUtils.testAll('xCfgSet with IDMP_MAXSIZE', async client => {
await client.xAdd('key', '*', { field: 'value' });

assert.equal(
await client.xCfgSet('key', {
IDMP_MAXSIZE: 10000
}),
'OK'
);
}, {
client: { ...GLOBAL.SERVERS.OPEN, minimumDockerVersion: [8, 6] },
cluster: { ...GLOBAL.CLUSTERS.OPEN, minimumDockerVersion: [8, 6] }
});

testUtils.testAll('xCfgSet with IDMP_DURATION and IDMP_MAXSIZE', async client => {
await client.xAdd('key', '*', { field: 'value' });

assert.equal(
await client.xCfgSet('key', {
IDMP_DURATION: 120,
IDMP_MAXSIZE: 5000
}),
'OK'
);
}, {
client: { ...GLOBAL.SERVERS.OPEN, minimumDockerVersion: [8, 6] },
cluster: { ...GLOBAL.CLUSTERS.OPEN, minimumDockerVersion: [8, 6] }
});
});

62 changes: 62 additions & 0 deletions packages/client/lib/commands/XCFGSET.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
import { CommandParser } from '../client/parser';
import { RedisArgument, SimpleStringReply, Command } from '../RESP/types';

/**
* Options for the XCFGSET command
*
* @property IDMP_DURATION - How long Redis remembers each iid in seconds (1-300 seconds)
* @property IDMP_MAXSIZE - Maximum number of iids Redis remembers per pid (1-1,000,000 iids)
*/
export interface XCfgSetOptions {
/**
* How long Redis remembers each iid in seconds.
* - Minimum value: 1 second
* - Maximum value: 300 seconds
* - Default: 100 seconds (or value set by stream-idmp-duration config parameter)
* - Operational guarantee: Redis won't forget an iid for this duration (unless maxsize is reached)
* - Should accommodate application crash recovery time
*/
IDMP_DURATION?: number;
/**
* Maximum number of iids Redis remembers per pid.
* - Minimum value: 1 iid
* - Maximum value: 1,000,000 (1M) iids
* - Default: 100 iids (or value set by stream-idmp-maxsize config parameter)
* - Should be set to: mark-delay [in msec] × (messages/msec) + margin
* - Example: 10K msgs/sec (10 msgs/msec), 80 msec mark-delay → maxsize = 10 × 80 + margin = 1000 iids
*/
IDMP_MAXSIZE?: number;
}

export default {
IS_READ_ONLY: false,
/**
* Configures the idempotency parameters for a stream's IDMP map.
* Sets how long Redis remembers each iid and the maximum number of iids to track.
* This command clears the existing IDMP map (Redis forgets all previously stored iids),
* but only if the configuration value actually changes.
*
* @param parser - The command parser
* @param key - The name of the stream
* @param options - Optional idempotency configuration parameters
* @returns 'OK' on success
*/
parseCommand(
parser: CommandParser,
key: RedisArgument,
options?: XCfgSetOptions
) {
parser.push('XCFGSET');
parser.pushKey(key);

if (options?.IDMP_DURATION !== undefined) {
parser.push('IDMP-DURATION', options.IDMP_DURATION.toString());
}

if (options?.IDMP_MAXSIZE !== undefined) {
parser.push('IDMP-MAXSIZE', options.IDMP_MAXSIZE.toString());
}
},
transformReply: undefined as unknown as () => SimpleStringReply<'OK'>
} as const satisfies Command;

3 changes: 3 additions & 0 deletions packages/client/lib/commands/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -291,6 +291,7 @@ import XAUTOCLAIM_JUSTID from './XAUTOCLAIM_JUSTID';
import XAUTOCLAIM from './XAUTOCLAIM';
import XCLAIM_JUSTID from './XCLAIM_JUSTID';
import XCLAIM from './XCLAIM';
import XCFGSET from './XCFGSET';
import XDEL from './XDEL';
import XDELEX from './XDELEX';
import XGROUP_CREATE from './XGROUP_CREATE';
Expand Down Expand Up @@ -966,6 +967,8 @@ export default {
xClaimJustId: XCLAIM_JUSTID,
XCLAIM,
xClaim: XCLAIM,
XCFGSET,
xCfgSet: XCFGSET,
XDEL,
xDel: XDEL,
XDELEX,
Expand Down
Loading