Skip to content

Commit cef3df3

Browse files
Nick Ficanoclaude
andcommitted
effect(runtime-stores): add IdempotencyStore + ResumeStore Effect services
Wrap the (principal, idempotency_key) and session-id → resume-record maps in SynchronizedRef-backed Effect.Services so concurrent fibers can race checkAndStore for the same key and all observe the same canonical jobId, closing the cross-fiber race in #26. The legacy IdempotencyStore / ResumeStore classes remain in place; ARCPServer keeps using them unchanged. New service tags: arcp/IdempotencyStoreService, arcp/ResumeStoreService. Closes #42 Closes #26 Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
1 parent 9d07fd3 commit cef3df3

2 files changed

Lines changed: 413 additions & 1 deletion

File tree

packages/runtime/src/stores.ts

Lines changed: 209 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
import { randomBytes } from "node:crypto";
22

3-
import type { JobId, ResumeToken } from "@arcp/core";
3+
import { type JobId, type ResumeToken, TaggedResumeWindowExpired } from "@arcp/core";
4+
import { Effect, SynchronizedRef } from "effect";
45

56
export interface IdempotencyEntry {
67
jobId: JobId;
@@ -70,3 +71,210 @@ export class ResumeStore {
7071
}
7172
}
7273
}
74+
75+
// ============================================================================
76+
// Effect-shaped twin — `IdempotencyStoreService`
77+
// ============================================================================
78+
79+
/**
80+
* Compose the storage key the legacy `IdempotencyStore` uses. Matches the
81+
* `${principal}::${idempotency_key}` shape produced by `job-runner.ts` so the
82+
* two store impls remain interchangeable while migration is in flight.
83+
*/
84+
export function idempotencyKey(principal: string, key: string): string {
85+
return `${principal}::${key}`;
86+
}
87+
88+
type IdempotencyMap = ReadonlyMap<string, IdempotencyEntry>;
89+
type IdempotencyRef = SynchronizedRef.SynchronizedRef<IdempotencyMap>;
90+
91+
function withEntry(
92+
map: IdempotencyMap,
93+
key: string,
94+
entry: IdempotencyEntry,
95+
): IdempotencyMap {
96+
const next = new Map(map);
97+
next.set(key, entry);
98+
return next;
99+
}
100+
101+
function sweepMap<V extends { expiresAt: number }>(
102+
map: ReadonlyMap<string, V>,
103+
now: number,
104+
): ReadonlyMap<string, V> {
105+
let mutated: Map<string, V> | null = null;
106+
for (const [k, v] of map.entries()) {
107+
if (v.expiresAt <= now) {
108+
mutated ??= new Map(map);
109+
mutated.delete(k);
110+
}
111+
}
112+
return mutated ?? map;
113+
}
114+
115+
function makeIdempotencyOps(ref: IdempotencyRef) {
116+
return {
117+
get: (principal: string, key: string): Effect.Effect<IdempotencyEntry | undefined> =>
118+
SynchronizedRef.get(ref).pipe(
119+
Effect.map((m) => m.get(idempotencyKey(principal, key))),
120+
),
121+
set: (
122+
principal: string,
123+
key: string,
124+
entry: IdempotencyEntry,
125+
): Effect.Effect<void> =>
126+
SynchronizedRef.update(ref, (m) =>
127+
withEntry(m, idempotencyKey(principal, key), entry),
128+
),
129+
checkAndStore: (
130+
principal: string,
131+
key: string,
132+
entry: IdempotencyEntry,
133+
): Effect.Effect<IdempotencyEntry> =>
134+
SynchronizedRef.modify(
135+
ref,
136+
(m): readonly [IdempotencyEntry, IdempotencyMap] => {
137+
const k = idempotencyKey(principal, key);
138+
const existing = m.get(k);
139+
if (existing !== undefined && existing.expiresAt > Date.now()) {
140+
return [existing, m];
141+
}
142+
return [entry, withEntry(m, k, entry)];
143+
},
144+
),
145+
sweep: (now: number = Date.now()): Effect.Effect<void> =>
146+
SynchronizedRef.update(ref, (m) => sweepMap(m, now)),
147+
snapshot: SynchronizedRef.get(ref).pipe(
148+
Effect.map((m) => new Map(m) as ReadonlyMap<string, IdempotencyEntry>),
149+
),
150+
} as const;
151+
}
152+
153+
/**
154+
* Effect-shaped twin of {@link IdempotencyStore}. Backs the
155+
* `(principal, idempotency_key) → IdempotencyEntry` map with a
156+
* {@link SynchronizedRef} so concurrent fibers can race a `checkAndStore` for
157+
* the same key and all observe the same canonical entry — closing the race
158+
* documented in #26. Key composition follows {@link idempotencyKey}.
159+
*
160+
* The legacy {@link IdempotencyStore} class is preserved during migration; the
161+
* service is a behavioral twin, not a strict wrapper.
162+
*/
163+
export class IdempotencyStoreService extends Effect.Service<IdempotencyStoreService>()(
164+
"arcp/IdempotencyStoreService",
165+
{
166+
effect: Effect.gen(function* () {
167+
const ref = yield* SynchronizedRef.make<IdempotencyMap>(new Map());
168+
return makeIdempotencyOps(ref);
169+
}),
170+
},
171+
) {}
172+
173+
// ============================================================================
174+
// Effect-shaped twin — `ResumeStoreService`
175+
// ============================================================================
176+
177+
type ResumeMap = ReadonlyMap<string, ResumeRecord>;
178+
type ResumeRef = SynchronizedRef.SynchronizedRef<ResumeMap>;
179+
180+
export type ResumeStoreFailure = TaggedResumeWindowExpired;
181+
182+
function withResume(
183+
map: ResumeMap,
184+
sessionId: string,
185+
record: ResumeRecord,
186+
): ResumeMap {
187+
const next = new Map(map);
188+
next.set(sessionId, record);
189+
return next;
190+
}
191+
192+
function withoutResume(map: ResumeMap, sessionId: string): ResumeMap {
193+
if (!map.has(sessionId)) return map;
194+
const next = new Map(map);
195+
next.delete(sessionId);
196+
return next;
197+
}
198+
199+
type TakeOutcome =
200+
| { readonly kind: "hit"; readonly record: ResumeRecord }
201+
| { readonly kind: "missing" }
202+
| { readonly kind: "expired" };
203+
204+
function takeResumeOutcome(
205+
ref: ResumeRef,
206+
sessionId: string,
207+
now: number,
208+
): Effect.Effect<TakeOutcome> {
209+
return SynchronizedRef.modify(ref, (map): readonly [TakeOutcome, ResumeMap] => {
210+
const existing = map.get(sessionId);
211+
if (existing === undefined) {
212+
return [{ kind: "missing" }, map];
213+
}
214+
if (existing.expiresAt < now) {
215+
return [{ kind: "expired" }, withoutResume(map, sessionId)];
216+
}
217+
return [
218+
{ kind: "hit", record: existing },
219+
withoutResume(map, sessionId),
220+
];
221+
});
222+
}
223+
224+
function takeResume(
225+
ref: ResumeRef,
226+
sessionId: string,
227+
now: number,
228+
): Effect.Effect<ResumeRecord, ResumeStoreFailure> {
229+
return takeResumeOutcome(ref, sessionId, now).pipe(
230+
Effect.flatMap((outcome) => {
231+
if (outcome.kind === "hit") return Effect.succeed(outcome.record);
232+
const message =
233+
outcome.kind === "missing"
234+
? `No resume record for session "${sessionId}"`
235+
: `Resume window has expired for session "${sessionId}"`;
236+
return Effect.fail(new TaggedResumeWindowExpired({ message }));
237+
}),
238+
);
239+
}
240+
241+
function makeResumeOps(ref: ResumeRef) {
242+
return {
243+
get: (sessionId: string): Effect.Effect<ResumeRecord | undefined> =>
244+
SynchronizedRef.get(ref).pipe(Effect.map((m) => m.get(sessionId))),
245+
store: (sessionId: string, record: ResumeRecord): Effect.Effect<void> =>
246+
SynchronizedRef.update(ref, (m) => withResume(m, sessionId, record)),
247+
consume: (
248+
sessionId: string,
249+
now: number = Date.now(),
250+
): Effect.Effect<ResumeRecord, ResumeStoreFailure> =>
251+
takeResume(ref, sessionId, now),
252+
delete: (sessionId: string): Effect.Effect<void> =>
253+
SynchronizedRef.update(ref, (m) => withoutResume(m, sessionId)),
254+
sweep: (now: number = Date.now()): Effect.Effect<void> =>
255+
SynchronizedRef.update(ref, (m) => sweepMap(m, now)),
256+
snapshot: SynchronizedRef.get(ref).pipe(
257+
Effect.map((m) => new Map(m) as ReadonlyMap<string, ResumeRecord>),
258+
),
259+
} as const;
260+
}
261+
262+
/**
263+
* Effect-shaped twin of {@link ResumeStore}. Backs the
264+
* `session_id → ResumeRecord` map with a {@link SynchronizedRef} so concurrent
265+
* fibers can `store`, `consume`, and `sweep` without trampling each other.
266+
*
267+
* The legacy {@link ResumeStore} class is preserved during migration; the
268+
* service is a behavioral twin, not a strict wrapper. {@link newResumeToken}
269+
* is intentionally left as a free function because it is pure randomness — it
270+
* does not touch service state.
271+
*/
272+
export class ResumeStoreService extends Effect.Service<ResumeStoreService>()(
273+
"arcp/ResumeStoreService",
274+
{
275+
effect: Effect.gen(function* () {
276+
const ref = yield* SynchronizedRef.make<ResumeMap>(new Map());
277+
return makeResumeOps(ref);
278+
}),
279+
},
280+
) {}

0 commit comments

Comments
 (0)