33 changes: 29 additions & 4 deletions packages/redis-worker/src/fair-queue/index.ts
@@ -89,6 +89,7 @@ export class FairQueue<TPayloadSchema extends z.ZodTypeAny = z.ZodUnknown> {
   private cooloffEnabled: boolean;
   private cooloffThreshold: number;
   private cooloffPeriodMs: number;
+  private maxCooloffStatesSize: number;
   private queueCooloffStates = new Map<string, QueueCooloffState>();

   // Global rate limiter
@@ -142,6 +143,7 @@ export class FairQueue<TPayloadSchema extends z.ZodTypeAny = z.ZodUnknown> {
     this.cooloffEnabled = options.cooloff?.enabled ?? true;
     this.cooloffThreshold = options.cooloff?.threshold ?? 10;
     this.cooloffPeriodMs = options.cooloff?.periodMs ?? 10_000;
+    this.maxCooloffStatesSize = options.cooloff?.maxStatesSize ?? 1000;

     // Global rate limiter
     this.globalRateLimiter = options.globalRateLimiter;
@@ -878,8 +880,11 @@ export class FairQueue<TPayloadSchema extends z.ZodTypeAny = z.ZodUnknown> {
         }
         this.#resetCooloff(queueId);
       } else {
-        this.batchedSpanManager.incrementStat(loopId, "claim_failures");
-        this.#incrementCooloff(queueId);
+        // Don't increment cooloff here - the queue was either:
+        // 1. Empty (removed from master, cache cleaned up)
+        // 2. Concurrency blocked (message released back to queue)
+        // Neither case warrants cooloff as they're not failures
+        this.batchedSpanManager.incrementStat(loopId, "claim_skipped");
       }
     }
   }
@@ -904,6 +909,8 @@ export class FairQueue<TPayloadSchema extends z.ZodTypeAny = z.ZodUnknown> {
     if (this.concurrencyManager) {
       const check = await this.concurrencyManager.canProcess(descriptor);
       if (!check.allowed) {
+        // Queue at max concurrency, back off to avoid repeated attempts
+        this.#incrementCooloff(queueId);
         return false;
       }
     }
@@ -953,6 +960,8 @@ export class FairQueue<TPayloadSchema extends z.ZodTypeAny = z.ZodUnknown> {
         queueItemsKey,
         masterQueueKey
       );
+      // Concurrency reservation failed, back off to avoid repeated attempts
+      this.#incrementCooloff(queueId);
       return false;
     }
   }
@@ -1214,8 +1223,11 @@ export class FairQueue<TPayloadSchema extends z.ZodTypeAny = z.ZodUnknown> {
         this.#resetCooloff(queueId);
         slotsUsed++;
       } else {
-        this.batchedSpanManager.incrementStat(loopId, "process_failures");
-        this.#incrementCooloff(queueId);
+        // Don't increment cooloff here - the queue was either:
+        // 1. Empty (removed from master, cache cleaned up)
+        // 2. Concurrency blocked (message released back to queue)
+        // Neither case warrants cooloff as they're not failures
+        this.batchedSpanManager.incrementStat(loopId, "process_skipped");
         break; // Queue empty or blocked, try next queue
       }
     }
@@ -1245,6 +1257,8 @@ export class FairQueue<TPayloadSchema extends z.ZodTypeAny = z.ZodUnknown> {
     if (this.concurrencyManager) {
       const check = await this.concurrencyManager.canProcess(descriptor);
       if (!check.allowed) {
+        // Queue at max concurrency, back off to avoid repeated attempts
+        this.#incrementCooloff(queueId);
         return false;
       }
     }
@@ -1294,6 +1308,8 @@ export class FairQueue<TPayloadSchema extends z.ZodTypeAny = z.ZodUnknown> {
         queueItemsKey,
         masterQueueKey
       );
+      // Concurrency reservation failed, back off to avoid repeated attempts
+      this.#incrementCooloff(queueId);
       return false;
     }
   }
@@ -1717,6 +1733,15 @@ export class FairQueue<TPayloadSchema extends z.ZodTypeAny = z.ZodUnknown> {
   }

   #incrementCooloff(queueId: string): void {
+    // Safety check: if the cache is too large, just clear it
+    if (this.queueCooloffStates.size >= this.maxCooloffStatesSize) {
+      this.logger.warn("Cooloff states cache hit size cap, clearing all entries", {
+        size: this.queueCooloffStates.size,
+        cap: this.maxCooloffStatesSize,
+      });
+      this.queueCooloffStates.clear();
+    }
+
Comment on lines 1735 to +1743 (Contributor):

⚠️ Potential issue | 🟠 Major

Disruptive cache clearing strategy.

Clearing the entire cooloff cache when the size cap is reached has significant drawbacks:

  • All queues lose their cooloff state, including those legitimately in cooloff
  • Queues that were backing off will be retried immediately, potentially causing a thundering herd
  • Failure counts for queues tracking consecutive failures are lost

Consider these alternatives:

  1. LRU eviction: Remove only the least recently used entries
  2. Selective removal: Remove only normal state entries (tracking failures), keep cooloff entries until they expire
  3. Partial clearing: Remove oldest 10-20% of entries rather than all
🔎 Suggested LRU-based approach

Track access times and evict oldest entries:

+  private queueCooloffStatesAccessOrder: string[] = [];
+
   #incrementCooloff(queueId: string): void {
-    // Safety check: if the cache is too large, just clear it
-    if (this.queueCooloffStates.size >= this.maxCooloffStatesSize) {
-      this.logger.warn("Cooloff states cache hit size cap, clearing all entries", {
-        size: this.queueCooloffStates.size,
-        cap: this.maxCooloffStatesSize,
-      });
-      this.queueCooloffStates.clear();
-    }
+    // Safety check: if at capacity, remove oldest entry (LRU)
+    if (this.queueCooloffStates.size >= this.maxCooloffStatesSize) {
+      const oldestQueue = this.queueCooloffStatesAccessOrder.shift();
+      if (oldestQueue) {
+        this.queueCooloffStates.delete(oldestQueue);
+        this.logger.debug("Evicted oldest cooloff state", {
+          queueId: oldestQueue,
+          remainingSize: this.queueCooloffStates.size,
+        });
+      }
+    }

     const state = this.queueCooloffStates.get(queueId) ?? {
       tag: "normal" as const,
       consecutiveFailures: 0,
     };
+    
+    // Update access order
+    const existingIndex = this.queueCooloffStatesAccessOrder.indexOf(queueId);
+    if (existingIndex !== -1) {
+      this.queueCooloffStatesAccessOrder.splice(existingIndex, 1);
+    }
+    this.queueCooloffStatesAccessOrder.push(queueId);

     // ... rest of function
   }

Committable suggestion skipped: line range outside the PR's diff.

🤖 Prompt for AI Agents
In packages/redis-worker/src/fair-queue/index.ts around lines 1735 to 1743, the
current behavior clears the entire queueCooloffStates map when the size cap is
hit, which drops all cooloff and failure-tracking state; instead implement a
targeted eviction strategy: maintain access order (e.g., use the existing Map
insertion order by re-setting an entry on access to mark it recent or add an
access timestamp) and when maxCooloffStatesSize is exceeded evict entries until
under the cap by first removing entries in "normal" (failure-tracking) state,
then the oldest entries (LRU), or evict the oldest 10-20% as a fallback; update
any access patterns to refresh order/timestamps so LRU works correctly and
replace the single clear() call with code that removes only selected entries
until size < cap.
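
A minimal sketch of the eviction that prompt describes, for illustration only: the helper and the "cooloff" variant's fields are assumptions, not code from this PR. It leans on Map insertion order, dropping "normal" failure-tracking entries first so queues in active cooloff keep their back-off, and only then evicting the oldest entries as a fallback.

// Sketch only - the "cooloff" variant's fields are assumed.
type QueueCooloffState =
  | { tag: "normal"; consecutiveFailures: number }
  | { tag: "cooloff"; until: number };

function evictCooloffStates(
  states: Map<string, QueueCooloffState>,
  cap: number
): void {
  if (states.size < cap) return;

  // Pass 1: drop failure-tracking entries first; queues in active
  // cooloff keep their back-off until it expires.
  for (const [queueId, state] of states) {
    if (states.size < cap) return;
    if (state.tag === "normal") states.delete(queueId);
  }

  // Pass 2: still at capacity, so evict the oldest ~10% of entries
  // in insertion order.
  let toDrop = Math.max(1, Math.ceil(cap / 10));
  for (const queueId of states.keys()) {
    if (toDrop-- <= 0) break;
    states.delete(queueId);
  }
}

// On every read or update, refresh recency so insertion order tracks use:
//   const s = states.get(queueId);
//   if (s) { states.delete(queueId); states.set(queueId, s); }

Both passes delete while iterating, which is well-defined for JavaScript Map iterators, so no snapshot of the keys is needed.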


     const state = this.queueCooloffStates.get(queueId) ?? {
       tag: "normal" as const,
       consecutiveFailures: 0,
66 changes: 66 additions & 0 deletions packages/redis-worker/src/fair-queue/tests/fairQueue.test.ts
@@ -728,6 +728,72 @@ describe("FairQueue", () => {
       await queue.close();
     }
   );
+
+  redisTest(
+    "should clear cooloff states when size cap is exceeded",
+    { timeout: 15000 },
+    async ({ redisOptions }) => {
+      keys = new DefaultFairQueueKeyProducer({ prefix: "test" });
+
+      const scheduler = new DRRScheduler({
+        redis: redisOptions,
+        keys,
+        quantum: 10,
+        maxDeficit: 100,
+      });
+
+      const queue = new FairQueue({
+        redis: redisOptions,
+        keys,
+        scheduler,
+        payloadSchema: TestPayloadSchema,
+        shardCount: 1,
+        consumerCount: 1,
+        consumerIntervalMs: 20,
+        visibilityTimeoutMs: 5000,
+        cooloff: {
+          enabled: true,
+          threshold: 1, // Enter cooloff after 1 failure
+          periodMs: 100, // Short cooloff for testing
+          maxStatesSize: 5, // Very small cap for testing
+        },
+        startConsumers: false,
+      });
+
+      // Enqueue messages to multiple queues
+      for (let i = 0; i < 10; i++) {
+        await queue.enqueue({
+          queueId: `tenant:t${i}:queue:q1`,
+          tenantId: `t${i}`,
+          payload: { value: `msg-${i}` },
+        });
+      }
+
+      const processed: string[] = [];
+
+      // Handler that always fails to trigger cooloff
+      queue.onMessage(async (ctx) => {
+        processed.push(ctx.message.payload.value);
+        await ctx.fail(new Error("Forced failure"));
+      });
+
+      queue.start();
+
+      // Wait for some messages to be processed and fail
+      await vi.waitFor(
+        () => {
+          expect(processed.length).toBeGreaterThanOrEqual(5);
+        },
+        { timeout: 10000 }
+      );
+
+      // The cooloff states size should be capped (test that it doesn't grow unbounded)
+      const cacheSizes = queue.getCacheSizes();
+      expect(cacheSizes.cooloffStatesSize).toBeLessThanOrEqual(10); // Some buffer for race conditions
Test passes vacuously due to disabled cooloff

The test comment says "Handler that always fails to trigger cooloff" but ctx.fail() never triggered cooloff - cooloff was only triggered by dequeue failures (empty queue or concurrency blocked). Since #incrementCooloff is now dead code, cooloffStatesSize will always be 0, making the assertion toBeLessThanOrEqual(10) pass trivially without actually testing the size cap behavior. The test provides false confidence that the cap works when cooloff is actually non-functional.



+      await queue.close();
+    }
+  );
 });

 describe("inspection methods", () => {
2 changes: 2 additions & 0 deletions packages/redis-worker/src/fair-queue/types.ts
@@ -309,6 +309,8 @@ export interface CooloffOptions {
   threshold?: number;
   /** Duration of cooloff period in milliseconds (default: 10000) */
   periodMs?: number;
+  /** Maximum number of cooloff state entries before triggering cleanup (default: 1000) */
+  maxStatesSize?: number;
 }

 /**
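
For reference, a hypothetical configuration sketch using the expanded interface; the import path and the chosen values are assumptions, not part of this PR:

import type { CooloffOptions } from "./types"; // path assumed

// Values mirror the documented defaults; tune per workload.
const cooloff: CooloffOptions = {
  enabled: true,       // cooloff is on by default
  threshold: 10,       // consecutive dequeue failures before entering cooloff
  periodMs: 10_000,    // how long a queue in cooloff is skipped
  maxStatesSize: 1000, // cap on tracked per-queue states before cleanup
};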