Skip to content

Commit 90e5615

Browse files
committed
fix(fair-queue): ensure concurrency is released when a message reaches visibility timeout to prevent concurrency leaks
1 parent fe5178f commit 90e5615

File tree

5 files changed

+258
-11
lines changed

5 files changed

+258
-11
lines changed

packages/redis-worker/src/fair-queue/index.ts

Lines changed: 26 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1343,13 +1343,37 @@ export class FairQueue<TPayloadSchema extends z.ZodTypeAny = z.ZodUnknown> {
13431343
let totalReclaimed = 0;
13441344

13451345
for (let shardId = 0; shardId < this.shardCount; shardId++) {
1346-
const reclaimed = await this.visibilityManager.reclaimTimedOut(shardId, (queueId) => ({
1346+
const reclaimedMessages = await this.visibilityManager.reclaimTimedOut(shardId, (queueId) => ({
13471347
queueKey: this.keys.queueKey(queueId),
13481348
queueItemsKey: this.keys.queueItemsKey(queueId),
13491349
masterQueueKey: this.keys.masterQueueKey(this.masterQueue.getShardForQueue(queueId)),
13501350
}));
13511351

1352-
totalReclaimed += reclaimed;
1352+
// Release concurrency for each reclaimed message
1353+
// This is critical: when a message times out, its concurrency slot must be freed
1354+
// so the message can be processed again when it's re-claimed from the queue
1355+
if (this.concurrencyManager && reclaimedMessages.length > 0) {
1356+
for (const msg of reclaimedMessages) {
1357+
try {
1358+
await this.concurrencyManager.release(
1359+
{
1360+
id: msg.queueId,
1361+
tenantId: msg.tenantId,
1362+
metadata: msg.metadata ?? {},
1363+
},
1364+
msg.messageId
1365+
);
1366+
} catch (error) {
1367+
this.logger.error("Failed to release concurrency for reclaimed message", {
1368+
messageId: msg.messageId,
1369+
queueId: msg.queueId,
1370+
error: error instanceof Error ? error.message : String(error),
1371+
});
1372+
}
1373+
}
1374+
}
1375+
1376+
totalReclaimed += reclaimedMessages.length;
13531377
}
13541378

13551379
if (totalReclaimed > 0) {

packages/redis-worker/src/fair-queue/tests/raceConditions.test.ts

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -624,12 +624,12 @@ describe("Race Condition Tests", () => {
624624
await new Promise((resolve) => setTimeout(resolve, 300));
625625

626626
// Try to reclaim (should find nothing because heartbeat extended the deadline)
627-
const reclaimed = await manager.reclaimTimedOut(0, (queueId) => ({
627+
const reclaimedMessages = await manager.reclaimTimedOut(0, (queueId) => ({
628628
queueKey: keys.queueKey(queueId),
629629
queueItemsKey: keys.queueItemsKey(queueId),
630630
masterQueueKey: keys.masterQueueKey(0),
631631
}));
632-
reclaimResults.push(reclaimed);
632+
reclaimResults.push(reclaimedMessages.length);
633633
}
634634

635635
// Heartbeats should have kept the message alive

packages/redis-worker/src/fair-queue/tests/visibility.test.ts

Lines changed: 180 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@ import { describe, expect } from "vitest";
22
import { redisTest } from "@internal/testcontainers";
33
import { createRedisClient } from "@internal/redis";
44
import { VisibilityManager, DefaultFairQueueKeyProducer } from "../index.js";
5-
import type { FairQueueKeyProducer } from "../types.js";
5+
import type { FairQueueKeyProducer, ReclaimedMessageInfo } from "../types.js";
66

77
describe("VisibilityManager", () => {
88
let keys: FairQueueKeyProducer;
@@ -597,5 +597,184 @@ describe("VisibilityManager", () => {
597597
}
598598
);
599599
});
600+
601+
describe("reclaimTimedOut", () => {
602+
redisTest(
603+
"should return reclaimed message info with tenantId for concurrency release",
604+
{ timeout: 10000 },
605+
async ({ redisOptions }) => {
606+
keys = new DefaultFairQueueKeyProducer({ prefix: "test" });
607+
608+
const manager = new VisibilityManager({
609+
redis: redisOptions,
610+
keys,
611+
shardCount: 1,
612+
defaultTimeoutMs: 100, // Very short timeout
613+
});
614+
615+
const redis = createRedisClient(redisOptions);
616+
const queueId = "tenant:t1:queue:reclaim-test";
617+
const queueKey = keys.queueKey(queueId);
618+
const queueItemsKey = keys.queueItemsKey(queueId);
619+
const masterQueueKey = keys.masterQueueKey(0);
620+
621+
// Add and claim a message
622+
const messageId = "reclaim-msg";
623+
const storedMessage = {
624+
id: messageId,
625+
queueId,
626+
tenantId: "t1",
627+
payload: { id: 1, value: "test" },
628+
timestamp: Date.now() - 1000,
629+
attempt: 1,
630+
metadata: { orgId: "org-123" },
631+
};
632+
633+
await redis.zadd(queueKey, storedMessage.timestamp, messageId);
634+
await redis.hset(queueItemsKey, messageId, JSON.stringify(storedMessage));
635+
636+
// Claim with very short timeout
637+
const claimResult = await manager.claim(queueId, queueKey, queueItemsKey, "consumer-1", 100);
638+
expect(claimResult.claimed).toBe(true);
639+
640+
// Wait for timeout to expire
641+
await new Promise((resolve) => setTimeout(resolve, 150));
642+
643+
// Reclaim should return the message info
644+
const reclaimedMessages = await manager.reclaimTimedOut(0, (qId) => ({
645+
queueKey: keys.queueKey(qId),
646+
queueItemsKey: keys.queueItemsKey(qId),
647+
masterQueueKey,
648+
}));
649+
650+
expect(reclaimedMessages).toHaveLength(1);
651+
expect(reclaimedMessages[0]).toEqual({
652+
messageId,
653+
queueId,
654+
tenantId: "t1",
655+
metadata: { orgId: "org-123" },
656+
});
657+
658+
// Verify message is back in queue
659+
const queueCount = await redis.zcard(queueKey);
660+
expect(queueCount).toBe(1);
661+
662+
// Verify message is no longer in-flight
663+
const inflightCount = await manager.getTotalInflightCount();
664+
expect(inflightCount).toBe(0);
665+
666+
await manager.close();
667+
await redis.quit();
668+
}
669+
);
670+
671+
redisTest(
672+
"should return empty array when no messages have timed out",
673+
{ timeout: 10000 },
674+
async ({ redisOptions }) => {
675+
keys = new DefaultFairQueueKeyProducer({ prefix: "test" });
676+
677+
const manager = new VisibilityManager({
678+
redis: redisOptions,
679+
keys,
680+
shardCount: 1,
681+
defaultTimeoutMs: 60000, // Long timeout
682+
});
683+
684+
const redis = createRedisClient(redisOptions);
685+
const queueId = "tenant:t1:queue:no-timeout";
686+
const queueKey = keys.queueKey(queueId);
687+
const queueItemsKey = keys.queueItemsKey(queueId);
688+
const masterQueueKey = keys.masterQueueKey(0);
689+
690+
// Add and claim a message with long timeout
691+
const messageId = "long-timeout-msg";
692+
const storedMessage = {
693+
id: messageId,
694+
queueId,
695+
tenantId: "t1",
696+
payload: { id: 1 },
697+
timestamp: Date.now() - 1000,
698+
attempt: 1,
699+
};
700+
701+
await redis.zadd(queueKey, storedMessage.timestamp, messageId);
702+
await redis.hset(queueItemsKey, messageId, JSON.stringify(storedMessage));
703+
704+
await manager.claim(queueId, queueKey, queueItemsKey, "consumer-1");
705+
706+
// Reclaim should return empty array (message hasn't timed out)
707+
const reclaimedMessages = await manager.reclaimTimedOut(0, (qId) => ({
708+
queueKey: keys.queueKey(qId),
709+
queueItemsKey: keys.queueItemsKey(qId),
710+
masterQueueKey,
711+
}));
712+
713+
expect(reclaimedMessages).toHaveLength(0);
714+
715+
await manager.close();
716+
await redis.quit();
717+
}
718+
);
719+
720+
redisTest(
721+
"should reclaim multiple timed-out messages and return all their info",
722+
{ timeout: 10000 },
723+
async ({ redisOptions }) => {
724+
keys = new DefaultFairQueueKeyProducer({ prefix: "test" });
725+
726+
const manager = new VisibilityManager({
727+
redis: redisOptions,
728+
keys,
729+
shardCount: 1,
730+
defaultTimeoutMs: 100,
731+
});
732+
733+
const redis = createRedisClient(redisOptions);
734+
const masterQueueKey = keys.masterQueueKey(0);
735+
736+
// Add and claim messages for two different tenants
737+
for (const tenant of ["t1", "t2"]) {
738+
const queueId = `tenant:${tenant}:queue:multi-reclaim`;
739+
const queueKey = keys.queueKey(queueId);
740+
const queueItemsKey = keys.queueItemsKey(queueId);
741+
742+
const messageId = `msg-${tenant}`;
743+
const storedMessage = {
744+
id: messageId,
745+
queueId,
746+
tenantId: tenant,
747+
payload: { id: 1 },
748+
timestamp: Date.now() - 1000,
749+
attempt: 1,
750+
};
751+
752+
await redis.zadd(queueKey, storedMessage.timestamp, messageId);
753+
await redis.hset(queueItemsKey, messageId, JSON.stringify(storedMessage));
754+
755+
await manager.claim(queueId, queueKey, queueItemsKey, "consumer-1", 100);
756+
}
757+
758+
// Wait for timeout
759+
await new Promise((resolve) => setTimeout(resolve, 150));
760+
761+
// Reclaim should return both messages
762+
const reclaimedMessages = await manager.reclaimTimedOut(0, (qId) => ({
763+
queueKey: keys.queueKey(qId),
764+
queueItemsKey: keys.queueItemsKey(qId),
765+
masterQueueKey,
766+
}));
767+
768+
expect(reclaimedMessages).toHaveLength(2);
769+
770+
// Verify both tenants are represented
771+
const tenantIds = reclaimedMessages.map((m: ReclaimedMessageInfo) => m.tenantId).sort();
772+
expect(tenantIds).toEqual(["t1", "t2"]);
773+
774+
await manager.close();
775+
await redis.quit();
776+
}
777+
);
778+
});
600779
});
601780

packages/redis-worker/src/fair-queue/types.ts

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -133,6 +133,25 @@ export interface ConcurrencyCheckResult {
133133
blockedBy?: ConcurrencyState;
134134
}
135135

136+
// ============================================================================
137+
// Visibility Types
138+
// ============================================================================
139+
140+
/**
141+
* Information about a reclaimed message from visibility timeout.
142+
* Used to release concurrency after a message is returned to the queue.
143+
*/
144+
export interface ReclaimedMessageInfo {
145+
/** Message ID */
146+
messageId: string;
147+
/** Queue ID */
148+
queueId: string;
149+
/** Tenant ID for concurrency release */
150+
tenantId: string;
151+
/** Additional metadata for concurrency group extraction */
152+
metadata?: Record<string, unknown>;
153+
}
154+
136155
// ============================================================================
137156
// Scheduler Types
138157
// ============================================================================

packages/redis-worker/src/fair-queue/visibility.ts

Lines changed: 31 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,12 @@
11
import { createRedisClient, type Redis, type RedisOptions } from "@internal/redis";
22
import { jumpHash } from "@trigger.dev/core/v3/serverOnly";
3-
import type { ClaimResult, FairQueueKeyProducer, InFlightMessage } from "./types.js";
3+
import type {
4+
ClaimResult,
5+
FairQueueKeyProducer,
6+
InFlightMessage,
7+
ReclaimedMessageInfo,
8+
StoredMessage,
9+
} from "./types.js";
410

511
export interface VisibilityManagerOptions {
612
redis: RedisOptions;
@@ -368,7 +374,7 @@ export class VisibilityManager {
368374
*
369375
* @param shardId - The shard to check
370376
* @param getQueueKeys - Function to get queue keys for a queue ID
371-
* @returns Number of messages reclaimed
377+
* @returns Array of reclaimed message info for concurrency release
372378
*/
373379
async reclaimTimedOut(
374380
shardId: number,
@@ -377,7 +383,7 @@ export class VisibilityManager {
377383
queueItemsKey: string;
378384
masterQueueKey: string;
379385
}
380-
): Promise<number> {
386+
): Promise<ReclaimedMessageInfo[]> {
381387
const inflightKey = this.keys.inflightKey(shardId);
382388
const inflightDataKey = this.keys.inflightDataKey(shardId);
383389
const now = Date.now();
@@ -393,7 +399,7 @@ export class VisibilityManager {
393399
100 // Process in batches
394400
);
395401

396-
let reclaimed = 0;
402+
const reclaimedMessages: ReclaimedMessageInfo[] = [];
397403

398404
for (let i = 0; i < timedOut.length; i += 2) {
399405
const member = timedOut[i];
@@ -405,6 +411,17 @@ export class VisibilityManager {
405411
const { queueKey, queueItemsKey, masterQueueKey } = getQueueKeys(queueId);
406412

407413
try {
414+
// Get message data BEFORE releasing so we can extract tenantId for concurrency release
415+
const dataJson = await this.redis.hget(inflightDataKey, messageId);
416+
let storedMessage: StoredMessage | null = null;
417+
if (dataJson) {
418+
try {
419+
storedMessage = JSON.parse(dataJson);
420+
} catch {
421+
// Ignore parse error, proceed with reclaim
422+
}
423+
}
424+
408425
// Re-add to queue with original score (or now if not available)
409426
const score = parseFloat(originalScore) || now;
410427
await this.redis.releaseMessage(
@@ -419,7 +436,15 @@ export class VisibilityManager {
419436
queueId
420437
);
421438

422-
reclaimed++;
439+
// Track reclaimed message for concurrency release
440+
if (storedMessage) {
441+
reclaimedMessages.push({
442+
messageId,
443+
queueId,
444+
tenantId: storedMessage.tenantId,
445+
metadata: storedMessage.metadata,
446+
});
447+
}
423448

424449
this.logger.debug("Reclaimed timed-out message", {
425450
messageId,
@@ -435,7 +460,7 @@ export class VisibilityManager {
435460
}
436461
}
437462

438-
return reclaimed;
463+
return reclaimedMessages;
439464
}
440465

441466
/**

0 commit comments

Comments
 (0)