Skip to content

Commit afc8b08

Browse files
committed
release reclaimed messages in batches using redis pipelines
1 parent 96f3fb2 commit afc8b08

File tree

2 files changed

+36
-13
lines changed

2 files changed

+36
-13
lines changed

packages/redis-worker/src/fair-queue/concurrency.ts

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -106,6 +106,30 @@ export class ConcurrencyManager {
106106
await pipeline.exec();
107107
}
108108

109+
/**
110+
* Release concurrency slots for multiple messages in a single pipeline.
111+
* More efficient than calling release() multiple times.
112+
*/
113+
async releaseBatch(
114+
messages: Array<{ queue: QueueDescriptor; messageId: string }>
115+
): Promise<void> {
116+
if (messages.length === 0) {
117+
return;
118+
}
119+
120+
const pipeline = this.redis.pipeline();
121+
122+
for (const { queue, messageId } of messages) {
123+
for (const group of this.groups) {
124+
const groupId = group.extractGroupId(queue);
125+
const key = this.keys.concurrencyKey(group.name, groupId);
126+
pipeline.srem(key, messageId);
127+
}
128+
}
129+
130+
await pipeline.exec();
131+
}
132+
109133
/**
110134
* Get current concurrency for a specific group.
111135
*/

packages/redis-worker/src/fair-queue/index.ts

Lines changed: 12 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -1349,27 +1349,26 @@ export class FairQueue<TPayloadSchema extends z.ZodTypeAny = z.ZodUnknown> {
13491349
masterQueueKey: this.keys.masterQueueKey(this.masterQueue.getShardForQueue(queueId)),
13501350
}));
13511351

1352-
// Release concurrency for each reclaimed message
1352+
// Release concurrency for all reclaimed messages in a single batch
13531353
// This is critical: when a message times out, its concurrency slot must be freed
13541354
// so the message can be processed again when it's re-claimed from the queue
13551355
if (this.concurrencyManager && reclaimedMessages.length > 0) {
1356-
for (const msg of reclaimedMessages) {
1357-
try {
1358-
await this.concurrencyManager.release(
1359-
{
1356+
try {
1357+
await this.concurrencyManager.releaseBatch(
1358+
reclaimedMessages.map((msg) => ({
1359+
queue: {
13601360
id: msg.queueId,
13611361
tenantId: msg.tenantId,
13621362
metadata: msg.metadata ?? {},
13631363
},
1364-
msg.messageId
1365-
);
1366-
} catch (error) {
1367-
this.logger.error("Failed to release concurrency for reclaimed message", {
13681364
messageId: msg.messageId,
1369-
queueId: msg.queueId,
1370-
error: error instanceof Error ? error.message : String(error),
1371-
});
1372-
}
1365+
}))
1366+
);
1367+
} catch (error) {
1368+
this.logger.error("Failed to release concurrency for reclaimed messages", {
1369+
count: reclaimedMessages.length,
1370+
error: error instanceof Error ? error.message : String(error),
1371+
});
13731372
}
13741373
}
13751374

0 commit comments

Comments
 (0)