Skip to content

Commit b4bb823

Browse files
committed
fix(batch): add batch queue back into master queue after visibility timeout
1 parent d416f34 commit b4bb823

File tree

3 files changed

+66
-24
lines changed

3 files changed

+66
-24
lines changed

packages/redis-worker/src/fair-queue/index.ts

Lines changed: 38 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -815,8 +815,14 @@ export class FairQueue<TPayloadSchema extends z.ZodTypeAny = z.ZodUnknown> {
815815
if (this.concurrencyManager) {
816816
const reserved = await this.concurrencyManager.reserve(descriptor, message.messageId);
817817
if (!reserved) {
818-
// Release message back to queue
819-
await this.visibilityManager.release(message.messageId, queueId, queueKey, queueItemsKey);
818+
// Release message back to queue (and ensure it's in master queue)
819+
await this.visibilityManager.release(
820+
message.messageId,
821+
queueId,
822+
queueKey,
823+
queueItemsKey,
824+
masterQueueKey
825+
);
820826
return false;
821827
}
822828
}
@@ -1056,8 +1062,14 @@ export class FairQueue<TPayloadSchema extends z.ZodTypeAny = z.ZodUnknown> {
10561062
if (this.concurrencyManager) {
10571063
const reserved = await this.concurrencyManager.reserve(descriptor, message.messageId);
10581064
if (!reserved) {
1059-
// Release message back to queue
1060-
await this.visibilityManager.release(message.messageId, queueId, queueKey, queueItemsKey);
1065+
// Release message back to queue (and ensure it's in master queue)
1066+
await this.visibilityManager.release(
1067+
message.messageId,
1068+
queueId,
1069+
queueKey,
1070+
queueItemsKey,
1071+
masterQueueKey
1072+
);
10611073
return false;
10621074
}
10631075
}
@@ -1105,11 +1117,14 @@ export class FairQueue<TPayloadSchema extends z.ZodTypeAny = z.ZodUnknown> {
11051117
try {
11061118
await this.concurrencyManager.release(descriptor, storedMessage.id);
11071119
} catch (releaseError) {
1108-
this.logger.error("Failed to release concurrency slot after payload validation failure", {
1109-
messageId: storedMessage.id,
1110-
queueId,
1111-
error: releaseError instanceof Error ? releaseError.message : String(releaseError),
1112-
});
1120+
this.logger.error(
1121+
"Failed to release concurrency slot after payload validation failure",
1122+
{
1123+
messageId: storedMessage.id,
1124+
queueId,
1125+
error: releaseError instanceof Error ? releaseError.message : String(releaseError),
1126+
}
1127+
);
11131128
}
11141129
}
11151130

@@ -1172,7 +1187,14 @@ export class FairQueue<TPayloadSchema extends z.ZodTypeAny = z.ZodUnknown> {
11721187
);
11731188
},
11741189
release: async () => {
1175-
await this.#releaseMessage(storedMessage, queueId, queueKey, queueItemsKey, descriptor);
1190+
await this.#releaseMessage(
1191+
storedMessage,
1192+
queueId,
1193+
queueKey,
1194+
queueItemsKey,
1195+
masterQueueKey,
1196+
descriptor
1197+
);
11761198
},
11771199
fail: async (error?: Error) => {
11781200
await this.#handleMessageFailure(
@@ -1251,14 +1273,16 @@ export class FairQueue<TPayloadSchema extends z.ZodTypeAny = z.ZodUnknown> {
12511273
queueId: string,
12521274
queueKey: string,
12531275
queueItemsKey: string,
1276+
masterQueueKey: string,
12541277
descriptor: QueueDescriptor
12551278
): Promise<void> {
1256-
// Release back to queue
1279+
// Release back to queue (and update master queue to ensure the queue is picked up)
12571280
await this.visibilityManager.release(
12581281
storedMessage.id,
12591282
queueId,
12601283
queueKey,
12611284
queueItemsKey,
1285+
masterQueueKey,
12621286
Date.now() // Put at back of queue
12631287
);
12641288

@@ -1302,12 +1326,13 @@ export class FairQueue<TPayloadSchema extends z.ZodTypeAny = z.ZodUnknown> {
13021326
attempt: storedMessage.attempt + 1,
13031327
};
13041328

1305-
// Release with delay
1329+
// Release with delay (and ensure queue is in master queue)
13061330
await this.visibilityManager.release(
13071331
storedMessage.id,
13081332
queueId,
13091333
queueKey,
13101334
queueItemsKey,
1335+
masterQueueKey,
13111336
Date.now() + nextDelay
13121337
);
13131338

@@ -1433,6 +1458,7 @@ export class FairQueue<TPayloadSchema extends z.ZodTypeAny = z.ZodUnknown> {
14331458
const reclaimed = await this.visibilityManager.reclaimTimedOut(shardId, (queueId) => ({
14341459
queueKey: this.keys.queueKey(queueId),
14351460
queueItemsKey: this.keys.queueItemsKey(queueId),
1461+
masterQueueKey: this.keys.masterQueueKey(this.masterQueue.getShardForQueue(queueId)),
14361462
}));
14371463

14381464
totalReclaimed += reclaimed;

packages/redis-worker/src/fair-queue/tests/raceConditions.test.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -446,6 +446,7 @@ describe("Race Condition Tests", () => {
446446
const reclaimed = await manager.reclaimTimedOut(0, (queueId) => ({
447447
queueKey: keys.queueKey(queueId),
448448
queueItemsKey: keys.queueItemsKey(queueId),
449+
masterQueueKey: keys.masterQueueKey(0),
449450
}));
450451
reclaimResults.push(reclaimed);
451452
}

packages/redis-worker/src/fair-queue/visibility.ts

Lines changed: 27 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -143,11 +143,7 @@ export class VisibilityManager {
143143

144144
// Use Lua script to atomically check existence and update score
145145
// ZADD XX returns 0 even on successful updates, so we use a custom command
146-
const result = await this.redis.heartbeatMessage(
147-
inflightKey,
148-
member,
149-
newDeadline.toString()
150-
);
146+
const result = await this.redis.heartbeatMessage(inflightKey, member, newDeadline.toString());
151147

152148
const success = result === 1;
153149

@@ -187,13 +183,15 @@ export class VisibilityManager {
187183
* @param queueId - The queue ID
188184
* @param queueKey - The Redis key for the queue
189185
* @param queueItemsKey - The Redis key for the queue items hash
186+
* @param masterQueueKey - The Redis key for the master queue
190187
* @param score - Optional score for the message (defaults to now)
191188
*/
192189
async release<TPayload = unknown>(
193190
messageId: string,
194191
queueId: string,
195192
queueKey: string,
196193
queueItemsKey: string,
194+
masterQueueKey: string,
197195
score?: number
198196
): Promise<void> {
199197
const shardId = this.#getShardForQueue(queueId);
@@ -206,14 +204,17 @@ export class VisibilityManager {
206204
// 1. Get message data from in-flight
207205
// 2. Remove from in-flight
208206
// 3. Add back to queue
207+
// 4. Update master queue to ensure queue is picked up
209208
await this.redis.releaseMessage(
210209
inflightKey,
211210
inflightDataKey,
212211
queueKey,
213212
queueItemsKey,
213+
masterQueueKey,
214214
member,
215215
messageId,
216-
messageScore.toString()
216+
messageScore.toString(),
217+
queueId
217218
);
218219

219220
this.logger.debug("Message released", {
@@ -233,7 +234,11 @@ export class VisibilityManager {
233234
*/
234235
async reclaimTimedOut(
235236
shardId: number,
236-
getQueueKeys: (queueId: string) => { queueKey: string; queueItemsKey: string }
237+
getQueueKeys: (queueId: string) => {
238+
queueKey: string;
239+
queueItemsKey: string;
240+
masterQueueKey: string;
241+
}
237242
): Promise<number> {
238243
const inflightKey = this.keys.inflightKey(shardId);
239244
const inflightDataKey = this.keys.inflightDataKey(shardId);
@@ -259,7 +264,7 @@ export class VisibilityManager {
259264
continue;
260265
}
261266
const { messageId, queueId } = this.#parseMember(member);
262-
const { queueKey, queueItemsKey } = getQueueKeys(queueId);
267+
const { queueKey, queueItemsKey, masterQueueKey } = getQueueKeys(queueId);
263268

264269
try {
265270
// Re-add to queue with original score (or now if not available)
@@ -269,9 +274,11 @@ export class VisibilityManager {
269274
inflightDataKey,
270275
queueKey,
271276
queueItemsKey,
277+
masterQueueKey,
272278
member,
273279
messageId,
274-
score.toString()
280+
score.toString(),
281+
queueId
275282
);
276283

277284
reclaimed++;
@@ -431,18 +438,20 @@ return {messageId, payload}
431438
`,
432439
});
433440

434-
// Atomic release: remove from in-flight, add back to queue
441+
// Atomic release: remove from in-flight, add back to queue, update master queue
435442
this.redis.defineCommand("releaseMessage", {
436-
numberOfKeys: 4,
443+
numberOfKeys: 5,
437444
lua: `
438445
local inflightKey = KEYS[1]
439446
local inflightDataKey = KEYS[2]
440447
local queueKey = KEYS[3]
441448
local queueItemsKey = KEYS[4]
449+
local masterQueueKey = KEYS[5]
442450
443451
local member = ARGV[1]
444452
local messageId = ARGV[2]
445453
local score = tonumber(ARGV[3])
454+
local queueId = ARGV[4]
446455
447456
-- Get message data from in-flight
448457
local payload = redis.call('HGET', inflightDataKey, messageId)
@@ -459,6 +468,10 @@ redis.call('HDEL', inflightDataKey, messageId)
459468
redis.call('ZADD', queueKey, score, messageId)
460469
redis.call('HSET', queueItemsKey, messageId, payload)
461470
471+
-- Update master queue to ensure queue is picked up by consumers
472+
-- Use the message score so older messages get priority
473+
redis.call('ZADD', masterQueueKey, score, queueId)
474+
462475
return 1
463476
`,
464477
});
@@ -505,9 +518,11 @@ declare module "@internal/redis" {
505518
inflightDataKey: string,
506519
queueKey: string,
507520
queueItemsKey: string,
521+
masterQueueKey: string,
508522
member: string,
509523
messageId: string,
510-
score: string
524+
score: string,
525+
queueId: string
511526
): Promise<number>;
512527

513528
heartbeatMessage(inflightKey: string, member: string, newDeadline: string): Promise<number>;

0 commit comments

Comments
 (0)