@@ -2,7 +2,7 @@ import { describe, expect } from "vitest";
22import { redisTest } from "@internal/testcontainers" ;
33import { createRedisClient } from "@internal/redis" ;
44import { VisibilityManager , DefaultFairQueueKeyProducer } from "../index.js" ;
5- import type { FairQueueKeyProducer } from "../types.js" ;
5+ import type { FairQueueKeyProducer , ReclaimedMessageInfo } from "../types.js" ;
66
77describe ( "VisibilityManager" , ( ) => {
88 let keys : FairQueueKeyProducer ;
@@ -597,5 +597,254 @@ describe("VisibilityManager", () => {
597597 }
598598 ) ;
599599 } ) ;
600+
601+ describe ( "reclaimTimedOut" , ( ) => {
602+ redisTest (
603+ "should return reclaimed message info with tenantId for concurrency release" ,
604+ { timeout : 10000 } ,
605+ async ( { redisOptions } ) => {
606+ keys = new DefaultFairQueueKeyProducer ( { prefix : "test" } ) ;
607+
608+ const manager = new VisibilityManager ( {
609+ redis : redisOptions ,
610+ keys,
611+ shardCount : 1 ,
612+ defaultTimeoutMs : 100 , // Very short timeout
613+ } ) ;
614+
615+ const redis = createRedisClient ( redisOptions ) ;
616+ const queueId = "tenant:t1:queue:reclaim-test" ;
617+ const queueKey = keys . queueKey ( queueId ) ;
618+ const queueItemsKey = keys . queueItemsKey ( queueId ) ;
619+ const masterQueueKey = keys . masterQueueKey ( 0 ) ;
620+
621+ // Add and claim a message
622+ const messageId = "reclaim-msg" ;
623+ const storedMessage = {
624+ id : messageId ,
625+ queueId,
626+ tenantId : "t1" ,
627+ payload : { id : 1 , value : "test" } ,
628+ timestamp : Date . now ( ) - 1000 ,
629+ attempt : 1 ,
630+ metadata : { orgId : "org-123" } ,
631+ } ;
632+
633+ await redis . zadd ( queueKey , storedMessage . timestamp , messageId ) ;
634+ await redis . hset ( queueItemsKey , messageId , JSON . stringify ( storedMessage ) ) ;
635+
636+ // Claim with very short timeout
637+ const claimResult = await manager . claim ( queueId , queueKey , queueItemsKey , "consumer-1" , 100 ) ;
638+ expect ( claimResult . claimed ) . toBe ( true ) ;
639+
640+ // Wait for timeout to expire
641+ await new Promise ( ( resolve ) => setTimeout ( resolve , 150 ) ) ;
642+
643+ // Reclaim should return the message info
644+ const reclaimedMessages = await manager . reclaimTimedOut ( 0 , ( qId ) => ( {
645+ queueKey : keys . queueKey ( qId ) ,
646+ queueItemsKey : keys . queueItemsKey ( qId ) ,
647+ masterQueueKey,
648+ } ) ) ;
649+
650+ expect ( reclaimedMessages ) . toHaveLength ( 1 ) ;
651+ expect ( reclaimedMessages [ 0 ] ) . toEqual ( {
652+ messageId,
653+ queueId,
654+ tenantId : "t1" ,
655+ metadata : { orgId : "org-123" } ,
656+ } ) ;
657+
658+ // Verify message is back in queue
659+ const queueCount = await redis . zcard ( queueKey ) ;
660+ expect ( queueCount ) . toBe ( 1 ) ;
661+
662+ // Verify message is back in queue with its original timestamp (not the deadline)
663+ const queueMessages = await redis . zrange ( queueKey , 0 , - 1 , "WITHSCORES" ) ;
664+ expect ( queueMessages [ 0 ] ) . toBe ( messageId ) ;
665+ expect ( parseInt ( queueMessages [ 1 ] ! ) ) . toBe ( storedMessage . timestamp ) ;
666+
667+ // Verify message is no longer in-flight
668+ const inflightCount = await manager . getTotalInflightCount ( ) ;
669+ expect ( inflightCount ) . toBe ( 0 ) ;
670+
671+ await manager . close ( ) ;
672+ await redis . quit ( ) ;
673+ }
674+ ) ;
675+
676+ redisTest (
677+ "should return empty array when no messages have timed out" ,
678+ { timeout : 10000 } ,
679+ async ( { redisOptions } ) => {
680+ keys = new DefaultFairQueueKeyProducer ( { prefix : "test" } ) ;
681+
682+ const manager = new VisibilityManager ( {
683+ redis : redisOptions ,
684+ keys,
685+ shardCount : 1 ,
686+ defaultTimeoutMs : 60000 , // Long timeout
687+ } ) ;
688+
689+ const redis = createRedisClient ( redisOptions ) ;
690+ const queueId = "tenant:t1:queue:no-timeout" ;
691+ const queueKey = keys . queueKey ( queueId ) ;
692+ const queueItemsKey = keys . queueItemsKey ( queueId ) ;
693+ const masterQueueKey = keys . masterQueueKey ( 0 ) ;
694+
695+ // Add and claim a message with long timeout
696+ const messageId = "long-timeout-msg" ;
697+ const storedMessage = {
698+ id : messageId ,
699+ queueId,
700+ tenantId : "t1" ,
701+ payload : { id : 1 } ,
702+ timestamp : Date . now ( ) - 1000 ,
703+ attempt : 1 ,
704+ } ;
705+
706+ await redis . zadd ( queueKey , storedMessage . timestamp , messageId ) ;
707+ await redis . hset ( queueItemsKey , messageId , JSON . stringify ( storedMessage ) ) ;
708+
709+ await manager . claim ( queueId , queueKey , queueItemsKey , "consumer-1" ) ;
710+
711+ // Reclaim should return empty array (message hasn't timed out)
712+ const reclaimedMessages = await manager . reclaimTimedOut ( 0 , ( qId ) => ( {
713+ queueKey : keys . queueKey ( qId ) ,
714+ queueItemsKey : keys . queueItemsKey ( qId ) ,
715+ masterQueueKey,
716+ } ) ) ;
717+
718+ expect ( reclaimedMessages ) . toHaveLength ( 0 ) ;
719+
720+ await manager . close ( ) ;
721+ await redis . quit ( ) ;
722+ }
723+ ) ;
724+
725+ redisTest (
726+ "should reclaim multiple timed-out messages and return all their info" ,
727+ { timeout : 10000 } ,
728+ async ( { redisOptions } ) => {
729+ keys = new DefaultFairQueueKeyProducer ( { prefix : "test" } ) ;
730+
731+ const manager = new VisibilityManager ( {
732+ redis : redisOptions ,
733+ keys,
734+ shardCount : 1 ,
735+ defaultTimeoutMs : 100 ,
736+ } ) ;
737+
738+ const redis = createRedisClient ( redisOptions ) ;
739+ const masterQueueKey = keys . masterQueueKey ( 0 ) ;
740+
741+ // Add and claim messages for two different tenants
742+ for ( const tenant of [ "t1" , "t2" ] ) {
743+ const queueId = `tenant:${ tenant } :queue:multi-reclaim` ;
744+ const queueKey = keys . queueKey ( queueId ) ;
745+ const queueItemsKey = keys . queueItemsKey ( queueId ) ;
746+
747+ const messageId = `msg-${ tenant } ` ;
748+ const storedMessage = {
749+ id : messageId ,
750+ queueId,
751+ tenantId : tenant ,
752+ payload : { id : 1 } ,
753+ timestamp : Date . now ( ) - 1000 ,
754+ attempt : 1 ,
755+ } ;
756+
757+ await redis . zadd ( queueKey , storedMessage . timestamp , messageId ) ;
758+ await redis . hset ( queueItemsKey , messageId , JSON . stringify ( storedMessage ) ) ;
759+
760+ await manager . claim ( queueId , queueKey , queueItemsKey , "consumer-1" , 100 ) ;
761+ }
762+
763+ // Wait for timeout
764+ await new Promise ( ( resolve ) => setTimeout ( resolve , 150 ) ) ;
765+
766+ // Reclaim should return both messages
767+ const reclaimedMessages = await manager . reclaimTimedOut ( 0 , ( qId ) => ( {
768+ queueKey : keys . queueKey ( qId ) ,
769+ queueItemsKey : keys . queueItemsKey ( qId ) ,
770+ masterQueueKey,
771+ } ) ) ;
772+
773+ expect ( reclaimedMessages ) . toHaveLength ( 2 ) ;
774+
775+ // Verify both tenants are represented
776+ const tenantIds = reclaimedMessages . map ( ( m : ReclaimedMessageInfo ) => m . tenantId ) . sort ( ) ;
777+ expect ( tenantIds ) . toEqual ( [ "t1" , "t2" ] ) ;
778+
779+ await manager . close ( ) ;
780+ await redis . quit ( ) ;
781+ }
782+ ) ;
783+
784+ redisTest (
785+ "should use fallback tenantId extraction when message data is missing or corrupted" ,
786+ { timeout : 10000 } ,
787+ async ( { redisOptions } ) => {
788+ keys = new DefaultFairQueueKeyProducer ( { prefix : "test" } ) ;
789+
790+ const manager = new VisibilityManager ( {
791+ redis : redisOptions ,
792+ keys,
793+ shardCount : 1 ,
794+ defaultTimeoutMs : 100 ,
795+ } ) ;
796+
797+ const redis = createRedisClient ( redisOptions ) ;
798+ const queueId = "tenant:t1:queue:fallback-test" ;
799+ const queueKey = keys . queueKey ( queueId ) ;
800+ const queueItemsKey = keys . queueItemsKey ( queueId ) ;
801+ const masterQueueKey = keys . masterQueueKey ( 0 ) ;
802+ const inflightDataKey = keys . inflightDataKey ( 0 ) ;
803+
804+ // Add and claim a message
805+ const messageId = "fallback-msg" ;
806+ const storedMessage = {
807+ id : messageId ,
808+ queueId,
809+ tenantId : "t1" ,
810+ payload : { id : 1 } ,
811+ timestamp : Date . now ( ) - 1000 ,
812+ attempt : 1 ,
813+ metadata : { orgId : "org-123" } ,
814+ } ;
815+
816+ await redis . zadd ( queueKey , storedMessage . timestamp , messageId ) ;
817+ await redis . hset ( queueItemsKey , messageId , JSON . stringify ( storedMessage ) ) ;
818+
819+ // Claim the message
820+ const claimResult = await manager . claim ( queueId , queueKey , queueItemsKey , "consumer-1" , 100 ) ;
821+ expect ( claimResult . claimed ) . toBe ( true ) ;
822+
823+ // Corrupt the in-flight data by setting invalid JSON
824+ await redis . hset ( inflightDataKey , messageId , "not-valid-json{{{" ) ;
825+
826+ // Wait for timeout
827+ await new Promise ( ( resolve ) => setTimeout ( resolve , 150 ) ) ;
828+
829+ // Reclaim should still work using fallback extraction
830+ const reclaimedMessages = await manager . reclaimTimedOut ( 0 , ( qId ) => ( {
831+ queueKey : keys . queueKey ( qId ) ,
832+ queueItemsKey : keys . queueItemsKey ( qId ) ,
833+ masterQueueKey,
834+ } ) ) ;
835+
836+ expect ( reclaimedMessages ) . toHaveLength ( 1 ) ;
837+ expect ( reclaimedMessages [ 0 ] ) . toEqual ( {
838+ messageId,
839+ queueId,
840+ tenantId : "t1" , // Extracted from queueId via fallback
841+ metadata : { } , // Empty metadata since we couldn't parse the stored message
842+ } ) ;
843+
844+ await manager . close ( ) ;
845+ await redis . quit ( ) ;
846+ }
847+ ) ;
848+ } ) ;
600849} ) ;
601850
0 commit comments