Skip to content

Commit 96f3fb2

Browse files
committed
Added fallback to extract tenantId from queueId when message data is missing or corrupted
Renamed misleading originalScore variable to _deadlineScore (it's the visibility deadline, not original timestamp)
1 parent 90e5615 commit 96f3fb2

File tree

2 files changed

+92
-7
lines changed

2 files changed

+92
-7
lines changed

packages/redis-worker/src/fair-queue/tests/visibility.test.ts

Lines changed: 70 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -659,6 +659,11 @@ describe("VisibilityManager", () => {
659659
const queueCount = await redis.zcard(queueKey);
660660
expect(queueCount).toBe(1);
661661

662+
// Verify message is back in queue with its original timestamp (not the deadline)
663+
const queueMessages = await redis.zrange(queueKey, 0, -1, "WITHSCORES");
664+
expect(queueMessages[0]).toBe(messageId);
665+
expect(parseInt(queueMessages[1]!)).toBe(storedMessage.timestamp);
666+
662667
// Verify message is no longer in-flight
663668
const inflightCount = await manager.getTotalInflightCount();
664669
expect(inflightCount).toBe(0);
@@ -775,6 +780,71 @@ describe("VisibilityManager", () => {
775780
await redis.quit();
776781
}
777782
);
783+
784+
redisTest(
785+
"should use fallback tenantId extraction when message data is missing or corrupted",
786+
{ timeout: 10000 },
787+
async ({ redisOptions }) => {
788+
keys = new DefaultFairQueueKeyProducer({ prefix: "test" });
789+
790+
const manager = new VisibilityManager({
791+
redis: redisOptions,
792+
keys,
793+
shardCount: 1,
794+
defaultTimeoutMs: 100,
795+
});
796+
797+
const redis = createRedisClient(redisOptions);
798+
const queueId = "tenant:t1:queue:fallback-test";
799+
const queueKey = keys.queueKey(queueId);
800+
const queueItemsKey = keys.queueItemsKey(queueId);
801+
const masterQueueKey = keys.masterQueueKey(0);
802+
const inflightDataKey = keys.inflightDataKey(0);
803+
804+
// Add and claim a message
805+
const messageId = "fallback-msg";
806+
const storedMessage = {
807+
id: messageId,
808+
queueId,
809+
tenantId: "t1",
810+
payload: { id: 1 },
811+
timestamp: Date.now() - 1000,
812+
attempt: 1,
813+
metadata: { orgId: "org-123" },
814+
};
815+
816+
await redis.zadd(queueKey, storedMessage.timestamp, messageId);
817+
await redis.hset(queueItemsKey, messageId, JSON.stringify(storedMessage));
818+
819+
// Claim the message
820+
const claimResult = await manager.claim(queueId, queueKey, queueItemsKey, "consumer-1", 100);
821+
expect(claimResult.claimed).toBe(true);
822+
823+
// Corrupt the in-flight data by setting invalid JSON
824+
await redis.hset(inflightDataKey, messageId, "not-valid-json{{{");
825+
826+
// Wait for timeout
827+
await new Promise((resolve) => setTimeout(resolve, 150));
828+
829+
// Reclaim should still work using fallback extraction
830+
const reclaimedMessages = await manager.reclaimTimedOut(0, (qId) => ({
831+
queueKey: keys.queueKey(qId),
832+
queueItemsKey: keys.queueItemsKey(qId),
833+
masterQueueKey,
834+
}));
835+
836+
expect(reclaimedMessages).toHaveLength(1);
837+
expect(reclaimedMessages[0]).toEqual({
838+
messageId,
839+
queueId,
840+
tenantId: "t1", // Extracted from queueId via fallback
841+
metadata: {}, // Empty metadata since we couldn't parse the stored message
842+
});
843+
844+
await manager.close();
845+
await redis.quit();
846+
}
847+
);
778848
});
779849
});
780850

packages/redis-worker/src/fair-queue/visibility.ts

Lines changed: 22 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -45,8 +45,8 @@ export class VisibilityManager {
4545
this.shardCount = options.shardCount;
4646
this.defaultTimeoutMs = options.defaultTimeoutMs;
4747
this.logger = options.logger ?? {
48-
debug: () => {},
49-
error: () => {},
48+
debug: () => { },
49+
error: () => { },
5050
};
5151

5252
this.#registerCommands();
@@ -403,8 +403,8 @@ export class VisibilityManager {
403403

404404
for (let i = 0; i < timedOut.length; i += 2) {
405405
const member = timedOut[i];
406-
const originalScore = timedOut[i + 1];
407-
if (!member || !originalScore) {
406+
const _deadlineScore = timedOut[i + 1]; // This is the visibility deadline, not the original timestamp
407+
if (!member || !_deadlineScore) {
408408
continue;
409409
}
410410
const { messageId, queueId } = this.#parseMember(member);
@@ -422,8 +422,9 @@ export class VisibilityManager {
422422
}
423423
}
424424

425-
// Re-add to queue with original score (or now if not available)
426-
const score = parseFloat(originalScore) || now;
425+
// Re-add to queue with original timestamp to preserve priority
426+
// Fall back to now if we can't get the original timestamp
427+
const score = storedMessage?.timestamp ?? now;
427428
await this.redis.releaseMessage(
428429
inflightKey,
429430
inflightDataKey,
@@ -437,19 +438,33 @@ export class VisibilityManager {
437438
);
438439

439440
// Track reclaimed message for concurrency release
441+
// Always add to reclaimedMessages to avoid concurrency leaks
440442
if (storedMessage) {
441443
reclaimedMessages.push({
442444
messageId,
443445
queueId,
444446
tenantId: storedMessage.tenantId,
445447
metadata: storedMessage.metadata,
446448
});
449+
} else {
450+
// Fallback: extract tenantId from queueId when message data is missing or corrupted
451+
// This ensures concurrency is released even if we can't get the full metadata
452+
this.logger.error("Missing or corrupted message data during reclaim, using fallback", {
453+
messageId,
454+
queueId,
455+
});
456+
reclaimedMessages.push({
457+
messageId,
458+
queueId,
459+
tenantId: this.keys.extractTenantId(queueId),
460+
metadata: {},
461+
});
447462
}
448463

449464
this.logger.debug("Reclaimed timed-out message", {
450465
messageId,
451466
queueId,
452-
originalScore,
467+
deadline: _deadlineScore,
453468
});
454469
} catch (error) {
455470
this.logger.error("Failed to reclaim message", {

0 commit comments

Comments
 (0)