Skip to content

Commit 336a6f0

Browse files
committed
implement LRU cache to replace the Unkey memory store — much better performance and no event-loop (ELU) blocking
1 parent 00fb2e5 commit 336a6f0

File tree

9 files changed

+332
-194
lines changed

9 files changed

+332
-194
lines changed

apps/webapp/app/services/realtime/v1StreamsGlobal.server.ts

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
import {
22
createCache,
3-
createMemoryStore,
3+
createLRUMemoryStore,
44
DefaultStatefulContext,
55
Namespace,
66
RedisCacheStore,
@@ -97,7 +97,7 @@ function initializeS2RealtimeStreamsCache() {
9797
useModernCacheKeyBuilder: true,
9898
});
9999

100-
const memoryStore = createMemoryStore(5000, 0.001);
100+
const memoryStore = createLRUMemoryStore(5000);
101101

102102
return createCache({
103103
accessToken: new Namespace<string>(ctx, {

apps/webapp/app/v3/services/worker/workerGroupTokenService.server.ts

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
import { createCache, createMemoryStore, DefaultStatefulContext, Namespace } from "@internal/cache";
1+
import { createCache, createLRUMemoryStore, DefaultStatefulContext, Namespace } from "@internal/cache";
22
import {
33
CheckpointInput,
44
CompleteRunAttemptResult,
@@ -39,7 +39,7 @@ function createAuthenticatedWorkerInstanceCache() {
3939
authenticatedWorkerInstance: new Namespace<AuthenticatedWorkerInstance>(
4040
new DefaultStatefulContext(),
4141
{
42-
stores: [createMemoryStore(1000, 0.001)],
42+
stores: [createLRUMemoryStore(1000)],
4343
fresh: 60_000 * 10, // 10 minutes
4444
stale: 60_000 * 11, // 11 minutes
4545
}

internal-packages/cache/package.json

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@
1010
"@trigger.dev/core": "workspace:*",
1111
"@unkey/cache": "^1.5.0",
1212
"@unkey/error": "^0.2.0",
13+
"lru-cache": "^11.2.4",
1314
"superjson": "^2.2.1"
1415
},
1516
"scripts": {

internal-packages/cache/src/index.ts

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,3 +8,8 @@ export {
88
export { type Result, Ok, Err } from "@unkey/error";
99
export { RedisCacheStore } from "./stores/redis.js";
1010
export { createMemoryStore, type MemoryStore } from "./stores/memory.js";
11+
export {
12+
LRUMemoryStore,
13+
createLRUMemoryStore,
14+
type LRUMemoryStoreConfig,
15+
} from "./stores/lruMemory.js";
internal-packages/cache/src/stores/lruMemory.ts

Lines changed: 167 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,167 @@
1+
import { LRUCache } from "lru-cache";
2+
import { CacheError } from "@unkey/cache";
3+
import type { Store, Entry } from "@unkey/cache/stores";
4+
import { Ok, Err, type Result } from "@unkey/error";
5+
6+
export type LRUMemoryStoreConfig = {
7+
/**
8+
* Maximum number of items to store in the cache.
9+
* This is a hard limit - the cache will never exceed this size.
10+
*/
11+
max: number;
12+
13+
/**
14+
* Name for metrics/tracing.
15+
* @default "lru-memory"
16+
*/
17+
name?: string;
18+
};
19+
20+
/**
21+
* A memory store implementation using lru-cache.
22+
*
23+
* This provides O(1) get/set/delete operations and automatic LRU eviction
24+
* without blocking the event loop (unlike @unkey/cache's MemoryStore which
25+
* uses O(n) synchronous iteration for eviction).
26+
*
27+
* TTL is checked lazily on get() - expired items are not proactively removed
28+
* but will be evicted by LRU when the cache is full.
29+
*/
30+
// eslint-disable-next-line @typescript-eslint/no-explicit-any
31+
export class LRUMemoryStore<TNamespace extends string, TValue = any>
32+
implements Store<TNamespace, TValue>
33+
{
34+
readonly name: string;
35+
private readonly cache: LRUCache<string, Entry<TValue>>;
36+
37+
constructor(config: LRUMemoryStoreConfig) {
38+
this.name = config.name ?? "lru-memory";
39+
this.cache = new LRUCache<string, Entry<TValue>>({
40+
max: config.max,
41+
// Don't use ttlAutopurge - it creates a setTimeout per item which
42+
// doesn't scale well at high throughput (thousands of items/second).
43+
// Instead, we check TTL lazily on get().
44+
ttlAutopurge: false,
45+
// Allow returning stale values - the cache layer handles SWR semantics
46+
allowStale: true,
47+
// Use the staleUntil timestamp for TTL calculation
48+
ttl: 1, // Placeholder, we set per-item TTL in set()
49+
});
50+
}
51+
52+
private buildCacheKey(namespace: TNamespace, key: string): string {
53+
return `${namespace}::${key}`;
54+
}
55+
56+
async get(
57+
namespace: TNamespace,
58+
key: string
59+
): Promise<Result<Entry<TValue> | undefined, CacheError>> {
60+
try {
61+
const cacheKey = this.buildCacheKey(namespace, key);
62+
const entry = this.cache.get(cacheKey);
63+
64+
if (!entry) {
65+
return Ok(undefined);
66+
}
67+
68+
// Check if entry is expired (past staleUntil)
69+
// The cache layer will handle fresh vs stale semantics
70+
if (entry.staleUntil <= Date.now()) {
71+
// Remove expired entry
72+
this.cache.delete(cacheKey);
73+
return Ok(undefined);
74+
}
75+
76+
return Ok(entry);
77+
} catch (err) {
78+
return Err(
79+
new CacheError({
80+
tier: this.name,
81+
key,
82+
message: err instanceof Error ? err.message : String(err),
83+
})
84+
);
85+
}
86+
}
87+
88+
async set(
89+
namespace: TNamespace,
90+
key: string,
91+
entry: Entry<TValue>
92+
): Promise<Result<void, CacheError>> {
93+
try {
94+
const cacheKey = this.buildCacheKey(namespace, key);
95+
96+
// Calculate TTL from staleUntil timestamp
97+
const ttl = Math.max(0, entry.staleUntil - Date.now());
98+
99+
this.cache.set(cacheKey, entry, { ttl });
100+
101+
return Ok(undefined as void);
102+
} catch (err) {
103+
return Err(
104+
new CacheError({
105+
tier: this.name,
106+
key,
107+
message: err instanceof Error ? err.message : String(err),
108+
})
109+
);
110+
}
111+
}
112+
113+
async remove(
114+
namespace: TNamespace,
115+
keys: string | string[]
116+
): Promise<Result<void, CacheError>> {
117+
try {
118+
const keyArray = Array.isArray(keys) ? keys : [keys];
119+
120+
for (const key of keyArray) {
121+
const cacheKey = this.buildCacheKey(namespace, key);
122+
this.cache.delete(cacheKey);
123+
}
124+
125+
return Ok(undefined as void);
126+
} catch (err) {
127+
return Err(
128+
new CacheError({
129+
tier: this.name,
130+
key: Array.isArray(keys) ? keys.join(",") : keys,
131+
message: err instanceof Error ? err.message : String(err),
132+
})
133+
);
134+
}
135+
}
136+
137+
/**
138+
* Returns the current number of items in the cache.
139+
*/
140+
get size(): number {
141+
return this.cache.size;
142+
}
143+
144+
/**
145+
* Clears all items from the cache.
146+
*/
147+
clear(): void {
148+
this.cache.clear();
149+
}
150+
}
151+
152+
/**
153+
* Creates an LRU memory store with the specified maximum size.
154+
*
155+
* This is a drop-in replacement for createMemoryStore() that uses lru-cache
156+
* instead of @unkey/cache's MemoryStore, providing:
157+
* - O(1) operations (vs O(n) eviction in MemoryStore)
158+
* - No event loop blocking
159+
* - Strict memory bounds (hard max vs soft cap)
160+
*
161+
* @param maxItems Maximum number of items to store
162+
* @param name Optional name for metrics/tracing (default: "lru-memory")
163+
*/
164+
// eslint-disable-next-line @typescript-eslint/no-explicit-any
165+
export function createLRUMemoryStore(maxItems: number, name?: string): LRUMemoryStore<string, any> {
166+
return new LRUMemoryStore({ max: maxItems, name });
167+
}

internal-packages/run-engine/src/engine/billingCache.ts

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,14 +1,13 @@
11
import {
22
createCache,
3+
createLRUMemoryStore,
34
DefaultStatefulContext,
4-
MemoryStore,
55
Namespace,
66
Ok,
77
RedisCacheStore,
88
type UnkeyCache,
99
type CacheError,
1010
type Result,
11-
createMemoryStore,
1211
} from "@internal/cache";
1312
import type { RedisOptions } from "@internal/redis";
1413
import type { Logger } from "@trigger.dev/core/logger";
@@ -53,7 +52,7 @@ export class BillingCache {
5352

5453
this.cache = createCache({
5554
currentPlan: new Namespace<BillingPlan>(ctx, {
56-
stores: [createMemoryStore(1000), redisCacheStore],
55+
stores: [createLRUMemoryStore(1000), redisCacheStore],
5756
fresh: BILLING_FRESH_TTL,
5857
stale: BILLING_STALE_TTL,
5958
}),

internal-packages/run-engine/src/engine/systems/runAttemptSystem.ts

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,7 @@
11
import {
22
createCache,
3-
createMemoryStore,
3+
createLRUMemoryStore,
44
DefaultStatefulContext,
5-
MemoryStore,
65
Namespace,
76
RedisCacheStore,
87
UnkeyCache,
@@ -130,7 +129,7 @@ export class RunAttemptSystem {
130129
this.delayedRunSystem = options.delayedRunSystem;
131130

132131
const ctx = new DefaultStatefulContext();
133-
const memory = createMemoryStore(5000, 0.001);
132+
const memory = createLRUMemoryStore(5000);
134133
const redisCacheStore = new RedisCacheStore({
135134
name: "run-attempt-system",
136135
connection: {

internal-packages/run-engine/src/run-queue/fairQueueSelectionStrategy.ts

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -2,11 +2,10 @@ import { createRedisClient, Redis, type RedisOptions } from "@internal/redis";
22
import { startSpan, type Tracer } from "@internal/tracing";
33
import {
44
createCache,
5+
createLRUMemoryStore,
56
DefaultStatefulContext,
67
Namespace,
78
type UnkeyCache,
8-
MemoryStore,
9-
createMemoryStore,
109
} from "@internal/cache";
1110
import { randomUUID } from "crypto";
1211
import seedrandom from "seedrandom";
@@ -107,7 +106,7 @@ export class FairQueueSelectionStrategy implements RunQueueSelectionStrategy {
107106

108107
constructor(private options: FairQueueSelectionStrategyOptions) {
109108
const ctx = new DefaultStatefulContext();
110-
const memory = createMemoryStore(1000);
109+
const memory = createLRUMemoryStore(1000);
111110

112111
this._cache = createCache({
113112
concurrencyLimit: new Namespace<number>(ctx, {

0 commit comments

Comments
 (0)