Skip to content

Commit 4093883

Browse files
authored
fix(fair-queue): ensure concurrency is released when a message reaches visibility timeout to prevent concurrency leaks (#2907)
## Summary

Fixes a concurrency leak in the batch queue where visibility timeout reclaims do not release concurrency slots.

**The bug:** When a message's visibility timeout expires (60s), `reclaimTimedOut` puts the message back in the queue but does NOT release the concurrency slot. The messageId stays in the concurrency set (`engine:batch:concurrency:tenant:{envId}`), counting against the tenant limit even though the message is no longer in-flight.

This causes:

1. The tenant appears to be at capacity when checking `SCARD >= limit`
2. New messages get released back to the queue instead of being processed
3. Messages get stuck in an infinite loop and the master queue grows indefinitely

**The fix:**

- Modified `reclaimTimedOut` to capture message data (including tenantId) BEFORE releasing from in-flight
- `reclaimTimedOut` now returns `ReclaimedMessageInfo[]` with messageId, queueId, tenantId, and metadata
- `#reclaimTimedOutMessages` now passes each shard's reclaimed messages to the new `concurrencyManager.releaseBatch()`, which removes every message from its concurrency sets in a single pipeline

## Test plan

- [x] Added test: `should return reclaimed message info with tenantId for concurrency release`
- [x] Added test: `should return empty array when no messages have timed out`
- [x] Added test: `should reclaim multiple timed-out messages and return all their info`
- [x] Added test: `should use fallback tenantId extraction when message data is missing or corrupted`
- [x] Updated `raceConditions.test.ts` for new return type
- [x] All tests passing
- [ ] Monitor production after deploy for concurrency leak recurrence

refs TRI-7049

<!-- devin-review-badge-begin -->

---

<a href="https://app.devin.ai/review/triggerdotdev/trigger.dev/pull/2907"> <picture> <source media="(prefers-color-scheme: dark)" srcset="https://static.devin.ai/assets/gh-open-in-devin-review-dark.svg?v=1"> <img src="https://static.devin.ai/assets/gh-open-in-devin-review-light.svg?v=1" alt="Open with Devin"> </picture> </a>

<!-- devin-review-badge-end -->
1 parent fe5178f commit 4093883

File tree

6 files changed

+373
-18
lines changed

6 files changed

+373
-18
lines changed

packages/redis-worker/src/fair-queue/concurrency.ts

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -106,6 +106,30 @@ export class ConcurrencyManager {
106106
await pipeline.exec();
107107
}
108108

109+
/**
 * Frees the concurrency slots held by a batch of messages.
 *
 * For every message, one SREM per configured concurrency group is queued on a
 * shared pipeline, and the pipeline is executed once at the end. This avoids
 * running a separate pipeline per message, as repeated release() calls would.
 *
 * @param messages - Pairs of queue descriptor and message ID to release. An
 *   empty array is a no-op and no pipeline is created.
 */
async releaseBatch(
  messages: Array<{ queue: QueueDescriptor; messageId: string }>
): Promise<void> {
  // Nothing to release: skip building a pipeline altogether
  if (messages.length === 0) return;

  const batch = this.redis.pipeline();

  for (const entry of messages) {
    for (const group of this.groups) {
      // Each group derives its own ID from the queue descriptor; the message
      // is removed from that group's concurrency set.
      batch.srem(
        this.keys.concurrencyKey(group.name, group.extractGroupId(entry.queue)),
        entry.messageId
      );
    }
  }

  await batch.exec();
}
132+
109133
/**
110134
* Get current concurrency for a specific group.
111135
*/

packages/redis-worker/src/fair-queue/index.ts

Lines changed: 25 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1343,13 +1343,36 @@ export class FairQueue<TPayloadSchema extends z.ZodTypeAny = z.ZodUnknown> {
13431343
let totalReclaimed = 0;
13441344

13451345
for (let shardId = 0; shardId < this.shardCount; shardId++) {
1346-
const reclaimed = await this.visibilityManager.reclaimTimedOut(shardId, (queueId) => ({
1346+
const reclaimedMessages = await this.visibilityManager.reclaimTimedOut(shardId, (queueId) => ({
13471347
queueKey: this.keys.queueKey(queueId),
13481348
queueItemsKey: this.keys.queueItemsKey(queueId),
13491349
masterQueueKey: this.keys.masterQueueKey(this.masterQueue.getShardForQueue(queueId)),
13501350
}));
13511351

1352-
totalReclaimed += reclaimed;
1352+
// Release concurrency for all reclaimed messages in a single batch
1353+
// This is critical: when a message times out, its concurrency slot must be freed
1354+
// so the message can be processed again when it's re-claimed from the queue
1355+
if (this.concurrencyManager && reclaimedMessages.length > 0) {
1356+
try {
1357+
await this.concurrencyManager.releaseBatch(
1358+
reclaimedMessages.map((msg) => ({
1359+
queue: {
1360+
id: msg.queueId,
1361+
tenantId: msg.tenantId,
1362+
metadata: msg.metadata ?? {},
1363+
},
1364+
messageId: msg.messageId,
1365+
}))
1366+
);
1367+
} catch (error) {
1368+
this.logger.error("Failed to release concurrency for reclaimed messages", {
1369+
count: reclaimedMessages.length,
1370+
error: error instanceof Error ? error.message : String(error),
1371+
});
1372+
}
1373+
}
1374+
1375+
totalReclaimed += reclaimedMessages.length;
13531376
}
13541377

13551378
if (totalReclaimed > 0) {

packages/redis-worker/src/fair-queue/tests/raceConditions.test.ts

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -624,12 +624,12 @@ describe("Race Condition Tests", () => {
624624
await new Promise((resolve) => setTimeout(resolve, 300));
625625

626626
// Try to reclaim (should find nothing because heartbeat extended the deadline)
627-
const reclaimed = await manager.reclaimTimedOut(0, (queueId) => ({
627+
const reclaimedMessages = await manager.reclaimTimedOut(0, (queueId) => ({
628628
queueKey: keys.queueKey(queueId),
629629
queueItemsKey: keys.queueItemsKey(queueId),
630630
masterQueueKey: keys.masterQueueKey(0),
631631
}));
632-
reclaimResults.push(reclaimed);
632+
reclaimResults.push(reclaimedMessages.length);
633633
}
634634

635635
// Heartbeats should have kept the message alive

packages/redis-worker/src/fair-queue/tests/visibility.test.ts

Lines changed: 250 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@ import { describe, expect } from "vitest";
22
import { redisTest } from "@internal/testcontainers";
33
import { createRedisClient } from "@internal/redis";
44
import { VisibilityManager, DefaultFairQueueKeyProducer } from "../index.js";
5-
import type { FairQueueKeyProducer } from "../types.js";
5+
import type { FairQueueKeyProducer, ReclaimedMessageInfo } from "../types.js";
66

77
describe("VisibilityManager", () => {
88
let keys: FairQueueKeyProducer;
@@ -597,5 +597,254 @@ describe("VisibilityManager", () => {
597597
}
598598
);
599599
});
600+
601+
describe("reclaimTimedOut", () => {
602+
redisTest(
  "should return reclaimed message info with tenantId for concurrency release",
  { timeout: 10000 },
  async ({ redisOptions }) => {
    keys = new DefaultFairQueueKeyProducer({ prefix: "test" });

    const manager = new VisibilityManager({
      redis: redisOptions,
      keys,
      shardCount: 1,
      defaultTimeoutMs: 100, // Very short timeout
    });

    const redis = createRedisClient(redisOptions);

    // try/finally so the manager and the client are closed even when an
    // assertion fails; otherwise a failing test leaks open connections.
    try {
      const queueId = "tenant:t1:queue:reclaim-test";
      const queueKey = keys.queueKey(queueId);
      const queueItemsKey = keys.queueItemsKey(queueId);
      const masterQueueKey = keys.masterQueueKey(0);

      // Add and claim a message
      const messageId = "reclaim-msg";
      const storedMessage = {
        id: messageId,
        queueId,
        tenantId: "t1",
        payload: { id: 1, value: "test" },
        timestamp: Date.now() - 1000,
        attempt: 1,
        metadata: { orgId: "org-123" },
      };

      await redis.zadd(queueKey, storedMessage.timestamp, messageId);
      await redis.hset(queueItemsKey, messageId, JSON.stringify(storedMessage));

      // Claim with very short timeout
      const claimResult = await manager.claim(queueId, queueKey, queueItemsKey, "consumer-1", 100);
      expect(claimResult.claimed).toBe(true);

      // Wait for timeout to expire
      await new Promise((resolve) => setTimeout(resolve, 150));

      // Reclaim should return the message info
      const reclaimedMessages = await manager.reclaimTimedOut(0, (qId) => ({
        queueKey: keys.queueKey(qId),
        queueItemsKey: keys.queueItemsKey(qId),
        masterQueueKey,
      }));

      expect(reclaimedMessages).toHaveLength(1);
      expect(reclaimedMessages[0]).toEqual({
        messageId,
        queueId,
        tenantId: "t1",
        metadata: { orgId: "org-123" },
      });

      // Verify message is back in queue
      const queueCount = await redis.zcard(queueKey);
      expect(queueCount).toBe(1);

      // Verify message is back in queue with its original timestamp (not the deadline).
      // WITHSCORES yields [member, score, ...]; Number() turns a missing score
      // into NaN, which fails the assertion without needing a non-null `!`.
      const queueMessages = await redis.zrange(queueKey, 0, -1, "WITHSCORES");
      expect(queueMessages[0]).toBe(messageId);
      expect(Number(queueMessages[1])).toBe(storedMessage.timestamp);

      // Verify message is no longer in-flight
      const inflightCount = await manager.getTotalInflightCount();
      expect(inflightCount).toBe(0);
    } finally {
      await manager.close();
      await redis.quit();
    }
  }
);
675+
676+
redisTest(
  "should return empty array when no messages have timed out",
  { timeout: 10000 },
  async ({ redisOptions }) => {
    keys = new DefaultFairQueueKeyProducer({ prefix: "test" });

    const manager = new VisibilityManager({
      redis: redisOptions,
      keys,
      shardCount: 1,
      defaultTimeoutMs: 60000, // Long timeout
    });

    const redis = createRedisClient(redisOptions);

    // try/finally so the manager and the client are closed even when an
    // assertion fails; otherwise a failing test leaks open connections.
    try {
      const queueId = "tenant:t1:queue:no-timeout";
      const queueKey = keys.queueKey(queueId);
      const queueItemsKey = keys.queueItemsKey(queueId);
      const masterQueueKey = keys.masterQueueKey(0);

      // Add and claim a message with long timeout
      const messageId = "long-timeout-msg";
      const storedMessage = {
        id: messageId,
        queueId,
        tenantId: "t1",
        payload: { id: 1 },
        timestamp: Date.now() - 1000,
        attempt: 1,
      };

      await redis.zadd(queueKey, storedMessage.timestamp, messageId);
      await redis.hset(queueItemsKey, messageId, JSON.stringify(storedMessage));

      // The claim must succeed: without an in-flight message the empty-result
      // assertion below would pass vacuously and prove nothing.
      const claimResult = await manager.claim(queueId, queueKey, queueItemsKey, "consumer-1");
      expect(claimResult.claimed).toBe(true);

      // Reclaim should return empty array (message hasn't timed out)
      const reclaimedMessages = await manager.reclaimTimedOut(0, (qId) => ({
        queueKey: keys.queueKey(qId),
        queueItemsKey: keys.queueItemsKey(qId),
        masterQueueKey,
      }));

      expect(reclaimedMessages).toHaveLength(0);
    } finally {
      await manager.close();
      await redis.quit();
    }
  }
);
724+
725+
redisTest(
  "should reclaim multiple timed-out messages and return all their info",
  { timeout: 10000 },
  async ({ redisOptions }) => {
    keys = new DefaultFairQueueKeyProducer({ prefix: "test" });

    const manager = new VisibilityManager({
      redis: redisOptions,
      keys,
      shardCount: 1,
      defaultTimeoutMs: 100,
    });

    const redis = createRedisClient(redisOptions);

    // try/finally so the manager and the client are closed even when an
    // assertion fails; otherwise a failing test leaks open connections.
    try {
      const masterQueueKey = keys.masterQueueKey(0);

      // Add and claim messages for two different tenants
      for (const tenant of ["t1", "t2"]) {
        const queueId = `tenant:${tenant}:queue:multi-reclaim`;
        const queueKey = keys.queueKey(queueId);
        const queueItemsKey = keys.queueItemsKey(queueId);

        const messageId = `msg-${tenant}`;
        const storedMessage = {
          id: messageId,
          queueId,
          tenantId: tenant,
          payload: { id: 1 },
          timestamp: Date.now() - 1000,
          attempt: 1,
        };

        await redis.zadd(queueKey, storedMessage.timestamp, messageId);
        await redis.hset(queueItemsKey, messageId, JSON.stringify(storedMessage));

        // Assert the claim so a setup failure is reported here rather than as
        // a confusing length mismatch after the reclaim.
        const claimResult = await manager.claim(
          queueId,
          queueKey,
          queueItemsKey,
          "consumer-1",
          100
        );
        expect(claimResult.claimed).toBe(true);
      }

      // Wait for timeout
      await new Promise((resolve) => setTimeout(resolve, 150));

      // Reclaim should return both messages
      const reclaimedMessages = await manager.reclaimTimedOut(0, (qId) => ({
        queueKey: keys.queueKey(qId),
        queueItemsKey: keys.queueItemsKey(qId),
        masterQueueKey,
      }));

      expect(reclaimedMessages).toHaveLength(2);

      // Verify both tenants are represented (sorted, so reclaim order does not matter)
      const tenantIds = reclaimedMessages.map((m: ReclaimedMessageInfo) => m.tenantId).sort();
      expect(tenantIds).toEqual(["t1", "t2"]);
    } finally {
      await manager.close();
      await redis.quit();
    }
  }
);
783+
784+
redisTest(
  "should use fallback tenantId extraction when message data is missing or corrupted",
  { timeout: 10000 },
  async ({ redisOptions }) => {
    keys = new DefaultFairQueueKeyProducer({ prefix: "test" });

    const manager = new VisibilityManager({
      redis: redisOptions,
      keys,
      shardCount: 1,
      defaultTimeoutMs: 100,
    });

    const redis = createRedisClient(redisOptions);

    // try/finally so the manager and the client are closed even when an
    // assertion fails; otherwise a failing test leaks open connections.
    try {
      const queueId = "tenant:t1:queue:fallback-test";
      const queueKey = keys.queueKey(queueId);
      const queueItemsKey = keys.queueItemsKey(queueId);
      const masterQueueKey = keys.masterQueueKey(0);
      const inflightDataKey = keys.inflightDataKey(0);

      // Add and claim a message
      const messageId = "fallback-msg";
      const storedMessage = {
        id: messageId,
        queueId,
        tenantId: "t1",
        payload: { id: 1 },
        timestamp: Date.now() - 1000,
        attempt: 1,
        metadata: { orgId: "org-123" },
      };

      await redis.zadd(queueKey, storedMessage.timestamp, messageId);
      await redis.hset(queueItemsKey, messageId, JSON.stringify(storedMessage));

      // Claim the message
      const claimResult = await manager.claim(queueId, queueKey, queueItemsKey, "consumer-1", 100);
      expect(claimResult.claimed).toBe(true);

      // Corrupt the in-flight data by setting invalid JSON
      await redis.hset(inflightDataKey, messageId, "not-valid-json{{{");

      // Wait for timeout
      await new Promise((resolve) => setTimeout(resolve, 150));

      // Reclaim should still work using fallback extraction
      const reclaimedMessages = await manager.reclaimTimedOut(0, (qId) => ({
        queueKey: keys.queueKey(qId),
        queueItemsKey: keys.queueItemsKey(qId),
        masterQueueKey,
      }));

      expect(reclaimedMessages).toHaveLength(1);
      expect(reclaimedMessages[0]).toEqual({
        messageId,
        queueId,
        tenantId: "t1", // Extracted from queueId via fallback
        metadata: {}, // Empty metadata since we couldn't parse the stored message
      });
    } finally {
      await manager.close();
      await redis.quit();
    }
  }
);
848+
});
600849
});
601850

packages/redis-worker/src/fair-queue/types.ts

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -133,6 +133,25 @@ export interface ConcurrencyCheckResult {
133133
blockedBy?: ConcurrencyState;
134134
}
135135

136+
// ============================================================================
137+
// Visibility Types
138+
// ============================================================================
139+
140+
/**
 * Describes one message that was put back on its queue after its visibility
 * timeout expired.
 *
 * Callers use these fields to rebuild a queue descriptor and free the
 * concurrency slot the message was still holding.
 */
export interface ReclaimedMessageInfo {
  /** ID of the reclaimed message; this is the member removed from the concurrency sets. */
  messageId: string;
  /** ID of the queue the message was returned to. */
  queueId: string;
  /** Tenant that owns the message, needed to release its concurrency slot. */
  tenantId: string;
  /** Optional extra metadata that concurrency groups may use to derive their group ID. */
  metadata?: Record<string, unknown>;
}
154+
136155
// ============================================================================
137156
// Scheduler Types
138157
// ============================================================================

0 commit comments

Comments
 (0)