Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
111 changes: 0 additions & 111 deletions packages/engine/migrate/bootstrap.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ import { SQL, semver } from "bun";
export async function bootstrap(sql: SQL): Promise<void> {
await ensurePrerequisites(sql);
await ensureRoles(sql);
await ensureEmbeddingSchema(sql);
}

async function ensurePrerequisites(sql: SQL): Promise<void> {
Expand Down Expand Up @@ -94,113 +93,3 @@ async function ensureRoles(sql: SQL): Promise<void> {
$block$;
`);
}

async function ensureEmbeddingSchema(sql: SQL): Promise<void> {
await sql.unsafe(`create schema if not exists embedding`);

await sql.unsafe(`
create table if not exists embedding.queue
( id bigint generated always as identity primary key
, schema_name text not null
, memory_id uuid not null
, embedding_version int not null
, vt timestamptz not null default now()
, outcome text check (outcome is null or outcome in ('completed', 'failed', 'cancelled'))
, attempts int not null default 0
, max_attempts int not null default 3
, last_error text
, created_at timestamptz not null default now()
)
`);

await sql.unsafe(`
create table if not exists embedding.queue_hist
( id bigint primary key
, schema_name text not null
, memory_id uuid not null
, embedding_version int not null
, vt timestamptz not null
, outcome text
, attempts int not null
, max_attempts int not null
, last_error text
, created_at timestamptz not null
)
`);

await sql.unsafe(`
create index if not exists embedding_queue_claim_idx
on embedding.queue (vt)
where outcome is null;

create index if not exists embedding_queue_memory_idx
on embedding.queue (schema_name, memory_id, embedding_version desc)
where outcome is null;

create index if not exists embedding_queue_archive_idx
on embedding.queue (created_at)
where outcome is not null;
`);

// claim_batch stub function
await sql.unsafe(`
create or replace function embedding.claim_batch(
p_batch_size int default 10,
p_visibility_timeout interval default '30 seconds'
)
returns table (
id bigint,
schema_name text,
memory_id uuid,
embedding_version int,
attempts int
)
language plpgsql
security definer
set search_path to pg_catalog, embedding, pg_temp
as $func$
begin
-- stub: will be replaced by worker package with full implementation
return query
with claimed as (
select q.id
from embedding.queue q
where q.outcome is null
and q.vt <= now()
and q.attempts < q.max_attempts
order by q.vt
limit p_batch_size
for update skip locked
)
update embedding.queue q
set vt = now() + p_visibility_timeout,
attempts = q.attempts + 1
from claimed c
where q.id = c.id
returning q.id, q.schema_name, q.memory_id, q.embedding_version, q.attempts;
end;
$func$;
`);

// Shared trigger function — each engine's triggers pass their schema name as TG_ARGV[0]
await sql.unsafe(`
create or replace function embedding.enqueue_embedding()
returns trigger
language plpgsql volatile security definer
set search_path to pg_catalog, embedding, pg_temp
as $func$
begin
insert into embedding.queue (schema_name, memory_id, embedding_version)
values (TG_ARGV[0], new.id, new.embedding_version);
return new;
end;
$func$;
`);

// Grants — only me_embed needs access; enqueue_embedding() is security definer
await sql.unsafe(`
grant usage on schema embedding to me_embed;
grant select, update on embedding.queue to me_embed;
grant execute on function embedding.claim_batch(int, interval) to me_embed;
`);
}
Loading