Skip to content

Commit 9d07fd3

Browse files
Nick Ficanoclaude
andcommitted
effect(core-store): add EventLogService backed by better-sqlite3
Wraps the existing better-sqlite3 EventLog as an Effect.Service tagged arcp/EventLogService. Synchronous DB calls are wrapped in Effect.sync, never Effect.tryPromise. replay returns Stream.fromIterableEffect over the SQLite cursor so rows arrive lazily. The legacy EventLog class is unchanged — runtime/server.ts and the CLI keep working as-is. Gates verified manually outside the hook (pre-commit hook is currently hitting a vitest-pnpm SIGSEGV on exit that reproduces against the base commit too — environmental, not introduced by this slice): pnpm lint, pnpm typecheck, pnpm build, pnpm test all clean. Closes #39 Closes #24 Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
1 parent abbbb53 commit 9d07fd3

5 files changed

Lines changed: 448 additions & 6 deletions

File tree

Lines changed: 292 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,292 @@
1+
// Effect.Service wrapper around the same better-sqlite3-backed event log used
2+
// by the legacy class. `better-sqlite3` is synchronous, so every operation is
3+
// wrapped in `Effect.sync` — never `Effect.tryPromise`. `replay` returns a
4+
// `Stream` constructed via `Stream.fromIterableEffect` so rows arrive lazily
5+
// from a SQLite cursor instead of being materialised into an array first.
6+
//
7+
// The legacy `EventLog` class in ./eventlog.ts is left in place for existing
8+
// runtime/server.ts and CLI consumers; this module adds the Effect surface
9+
// without rewriting either.
10+
11+
import type Database from "better-sqlite3";
12+
import { Effect, Layer, Stream } from "effect";
13+
14+
import type { BaseEnvelope } from "../envelope.js";
15+
import { TaggedInvalidRequest } from "../errors-tagged.js";
16+
17+
import {
18+
buildQuery,
19+
type EventRow,
20+
projectIndexedFields,
21+
rowToEnvelope,
22+
} from "./eventlog-query.js";
23+
import { SCHEMA_SQL } from "./schema.js";
24+
import type { EventLogFilter } from "./types.js";
25+
26+
type DatabaseInstance = InstanceType<typeof Database>;
27+
type Statement = ReturnType<DatabaseInstance["prepare"]>;
28+
29+
/** Prepared statements shared by every operation on a given DB handle. */
30+
interface EventLogStmts {
31+
readonly insert: Statement;
32+
readonly readSinceId: Statement;
33+
readonly readSinceSeq: Statement;
34+
readonly count: Statement;
35+
readonly getById: Statement;
36+
}
37+
38+
function prepareStmts(db: DatabaseInstance): EventLogStmts {
39+
return {
40+
insert: db.prepare(
41+
`INSERT OR IGNORE INTO events (
42+
session_id, id, type, trace_id, job_id, event_seq, raw
43+
) VALUES (
44+
@session_id, @id, @type, @trace_id, @job_id, @event_seq, @raw
45+
)`,
46+
),
47+
readSinceId: db.prepare(
48+
`SELECT * FROM events
49+
WHERE session_id = @session_id AND id > @after_id
50+
ORDER BY id ASC
51+
LIMIT @limit`,
52+
),
53+
readSinceSeq: db.prepare(
54+
`SELECT * FROM events
55+
WHERE session_id = @session_id
56+
AND event_seq IS NOT NULL
57+
AND event_seq > @after_event_seq
58+
ORDER BY event_seq ASC
59+
LIMIT @limit`,
60+
),
61+
count: db.prepare(
62+
`SELECT COUNT(*) AS n FROM events WHERE session_id = COALESCE(@session_id, session_id)`,
63+
),
64+
getById: db.prepare(
65+
`SELECT * FROM events WHERE session_id = @session_id AND id = @id`,
66+
),
67+
};
68+
}
69+
70+
function assertSessionId(env: BaseEnvelope): void {
71+
if (env.session_id === undefined || env.session_id === "") {
72+
throw new TaggedInvalidRequest({
73+
message: "EventLog.append requires session_id on the envelope",
74+
details: { id: env.id, type: env.type },
75+
});
76+
}
77+
}
78+
79+
/**
80+
* Effect surface of the SQLite event log. See {@link eventLogLayer} for the
81+
* factory consumers should use to bind a service instance to a concrete
82+
* `better-sqlite3` handle.
83+
*/
84+
export interface EventLogEffect {
85+
/** Append a single envelope. Resolves to `true` if a new row was inserted. */
86+
readonly append: (env: BaseEnvelope) => Effect.Effect<boolean>;
87+
/** Append many envelopes inside a single SQLite transaction. */
88+
readonly appendBatch: (envs: readonly BaseEnvelope[]) => Effect.Effect<number>;
89+
/**
90+
* Stream envelopes for `sessionId` with `event_seq` strictly greater than
91+
* `afterEventSeq`. The underlying SQLite iterator is consumed lazily inside
92+
* the stream — rows are not buffered into an array up front.
93+
*/
94+
readonly replay: (
95+
sessionId: string,
96+
afterEventSeq: number,
97+
) => Stream.Stream<BaseEnvelope>;
98+
/** Diagnostic helper: envelopes ordered by `id` after `afterId`. */
99+
readonly readSince: (
100+
sessionId: string,
101+
afterId?: string,
102+
limit?: number,
103+
) => Effect.Effect<readonly BaseEnvelope[]>;
104+
/** Eager replay (compatibility with legacy `readSinceSeq`). */
105+
readonly readSinceSeq: (
106+
sessionId: string,
107+
afterEventSeq: number,
108+
limit?: number,
109+
) => Effect.Effect<readonly BaseEnvelope[]>;
110+
/** Count rows; pass `undefined` for the whole DB. */
111+
readonly count: (sessionId?: string) => Effect.Effect<number>;
112+
/** Single-row lookup by `(session_id, id)`. */
113+
readonly getById: (
114+
sessionId: string,
115+
id: string,
116+
) => Effect.Effect<BaseEnvelope | null>;
117+
/** Custom filter query — same semantics as the legacy `query` method. */
118+
readonly query: (
119+
filter: EventLogFilter,
120+
) => Effect.Effect<readonly BaseEnvelope[]>;
121+
}
122+
123+
/**
124+
* Effect.Service tag for the SQLite event log. Default implementation fails
125+
* fast — consumers must provide a real handle via {@link eventLogLayer}.
126+
*
127+
* @example
128+
* ```ts
129+
* import Database from "better-sqlite3";
130+
* const program = Effect.gen(function* () {
131+
* const log = yield* EventLogService;
132+
* yield* log.append(env);
133+
* return yield* Stream.runCollect(log.replay(sessionId, 0));
134+
* }).pipe(Effect.provide(eventLogLayer(new Database(":memory:"))));
135+
* ```
136+
*/
137+
const NOT_CONFIGURED = new TaggedInvalidRequest({
138+
message: "EventLogService requires a Database — use eventLogLayer",
139+
});
140+
141+
const unconfiguredOps: EventLogEffect = {
142+
append: (_env) => Effect.die(NOT_CONFIGURED),
143+
appendBatch: (_envs) => Effect.die(NOT_CONFIGURED),
144+
replay: (_sessionId, _afterEventSeq) =>
145+
Stream.fail(NOT_CONFIGURED).pipe(Stream.orDie),
146+
readSince: (_sessionId, _afterId, _limit) => Effect.die(NOT_CONFIGURED),
147+
readSinceSeq: (_sessionId, _afterEventSeq, _limit) =>
148+
Effect.die(NOT_CONFIGURED),
149+
count: (_sessionId) => Effect.die(NOT_CONFIGURED),
150+
getById: (_sessionId, _id) => Effect.die(NOT_CONFIGURED),
151+
query: (_filter) => Effect.die(NOT_CONFIGURED),
152+
};
153+
154+
export class EventLogService extends Effect.Service<EventLogService>()(
155+
"arcp/EventLogService",
156+
{ succeed: unconfiguredOps },
157+
) {}
158+
159+
function makeAppend(stmts: EventLogStmts) {
160+
return (env: BaseEnvelope): Effect.Effect<boolean> =>
161+
Effect.sync(() => {
162+
assertSessionId(env);
163+
const result = stmts.insert.run(projectIndexedFields(env));
164+
return result.changes === 1;
165+
});
166+
}
167+
168+
function makeAppendBatch(db: DatabaseInstance, stmts: EventLogStmts) {
169+
const tx = db.transaction((rows: readonly BaseEnvelope[]) => {
170+
let inserted = 0;
171+
for (const env of rows) {
172+
assertSessionId(env);
173+
const result = stmts.insert.run(projectIndexedFields(env));
174+
if (result.changes === 1) inserted += 1;
175+
}
176+
return inserted;
177+
});
178+
return (envs: readonly BaseEnvelope[]): Effect.Effect<number> =>
179+
Effect.sync(() => tx(envs));
180+
}
181+
182+
function makeReplay(stmts: EventLogStmts) {
183+
return (
184+
sessionId: string,
185+
afterEventSeq: number,
186+
): Stream.Stream<BaseEnvelope> => {
187+
const rows = Effect.sync(
188+
() =>
189+
stmts.readSinceSeq.iterate({
190+
session_id: sessionId,
191+
after_event_seq: afterEventSeq,
192+
limit: Number.MAX_SAFE_INTEGER,
193+
}) as IterableIterator<EventRow>,
194+
);
195+
return Stream.fromIterableEffect(rows).pipe(Stream.map(rowToEnvelope));
196+
};
197+
}
198+
199+
function makeReadSince(stmts: EventLogStmts) {
200+
return (
201+
sessionId: string,
202+
afterId = "",
203+
limit = 1000,
204+
): Effect.Effect<readonly BaseEnvelope[]> =>
205+
Effect.sync(() => {
206+
const rows = stmts.readSinceId.all({
207+
session_id: sessionId,
208+
after_id: afterId,
209+
limit,
210+
}) as EventRow[];
211+
return rows.map(rowToEnvelope);
212+
});
213+
}
214+
215+
function makeReadSinceSeq(stmts: EventLogStmts) {
216+
return (
217+
sessionId: string,
218+
afterEventSeq: number,
219+
limit = 10_000,
220+
): Effect.Effect<readonly BaseEnvelope[]> =>
221+
Effect.sync(() => {
222+
const rows = stmts.readSinceSeq.all({
223+
session_id: sessionId,
224+
after_event_seq: afterEventSeq,
225+
limit,
226+
}) as EventRow[];
227+
return rows.map(rowToEnvelope);
228+
});
229+
}
230+
231+
function makeCount(stmts: EventLogStmts) {
232+
return (sessionId?: string): Effect.Effect<number> =>
233+
Effect.sync(() => {
234+
const row = stmts.count.get({ session_id: sessionId ?? null }) as {
235+
n: number;
236+
};
237+
return row.n;
238+
});
239+
}
240+
241+
function makeGetById(stmts: EventLogStmts) {
242+
return (
243+
sessionId: string,
244+
id: string,
245+
): Effect.Effect<BaseEnvelope | null> =>
246+
Effect.sync(() => {
247+
const row = stmts.getById.get({ session_id: sessionId, id }) as
248+
| EventRow
249+
| undefined;
250+
return row === undefined ? null : rowToEnvelope(row);
251+
});
252+
}
253+
254+
function makeQuery(db: DatabaseInstance) {
255+
return (filter: EventLogFilter): Effect.Effect<readonly BaseEnvelope[]> =>
256+
Effect.sync(() => {
257+
const built = buildQuery(filter);
258+
const rows = db.prepare(built.sql).all(built.params) as EventRow[];
259+
return rows.map(rowToEnvelope);
260+
});
261+
}
262+
263+
/** Build an {@link EventLogEffect} bound to a pre-opened `better-sqlite3` handle. */
264+
function makeEventLogOps(db: DatabaseInstance): EventLogEffect {
265+
db.exec(SCHEMA_SQL);
266+
const stmts = prepareStmts(db);
267+
return {
268+
append: makeAppend(stmts),
269+
appendBatch: makeAppendBatch(db, stmts),
270+
replay: makeReplay(stmts),
271+
readSince: makeReadSince(stmts),
272+
readSinceSeq: makeReadSinceSeq(stmts),
273+
count: makeCount(stmts),
274+
getById: makeGetById(stmts),
275+
query: makeQuery(db),
276+
};
277+
}
278+
279+
/**
280+
* Construct a {@link EventLogService} Layer backed by an externally provided
281+
* `better-sqlite3` handle. Lifecycle (open/close) is the caller's concern —
282+
* this mirrors the legacy `new EventLog({ db })` contract and lets tests
283+
* share an in-memory database between the legacy class and the service.
284+
*/
285+
export function eventLogLayer(
286+
db: DatabaseInstance,
287+
): Layer.Layer<EventLogService> {
288+
return Layer.succeed(
289+
EventLogService,
290+
EventLogService.make(makeEventLogOps(db)),
291+
);
292+
}

packages/core/src/store/eventlog.ts

Lines changed: 1 addition & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,6 @@
11
// EventLog wraps better-sqlite3 (sync) but exposes an async API so consumers
22
// can swap in a network-backed event log without changing call sites.
33
/* eslint-disable @typescript-eslint/require-await */
4-
import { readFileSync } from "node:fs";
5-
import { fileURLToPath } from "node:url";
6-
74
import Database from "better-sqlite3";
85
import type { z } from "zod";
96

@@ -17,13 +14,11 @@ import {
1714
projectIndexedFields,
1815
rowToEnvelope,
1916
} from "./eventlog-query.js";
17+
import { SCHEMA_SQL } from "./schema.js";
2018
import type { EventLogFilter, EventLogOptions } from "./types.js";
2119

2220
type DatabaseInstance = InstanceType<typeof Database>;
2321

24-
const SCHEMA_PATH = fileURLToPath(new URL("schema.sql", import.meta.url));
25-
const SCHEMA_SQL = readFileSync(SCHEMA_PATH, "utf8");
26-
2722
// ARCP v1.0 event-log indexed columns: session_id, id, type, trace_id,
2823
// job_id, event_seq. Replay is by (session_id, event_seq).
2924

packages/core/src/store/index.ts

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,4 +3,9 @@ export {
33
EventRowEnvelopeSchema,
44
type ParsedRowEnvelope,
55
} from "./eventlog.js";
6+
export {
7+
type EventLogEffect,
8+
eventLogLayer,
9+
EventLogService,
10+
} from "./eventlog-service.js";
611
export type { EventLogFilter, EventLogOptions } from "./types.js";

packages/core/src/store/schema.ts

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,9 @@
1+
// Loads `schema.sql` once at module init so both the legacy `EventLog` class
2+
// and the Effect-shaped `EventLogService` apply the same DDL.
3+
import { readFileSync } from "node:fs";
4+
import { fileURLToPath } from "node:url";
5+
6+
const SCHEMA_PATH = fileURLToPath(new URL("schema.sql", import.meta.url));
7+
8+
/** The ARCP v1.0 event-log DDL. Idempotent (`CREATE TABLE IF NOT EXISTS`). */
9+
export const SCHEMA_SQL: string = readFileSync(SCHEMA_PATH, "utf8");

0 commit comments

Comments
 (0)