Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
213 changes: 212 additions & 1 deletion daemon-rs/src/db.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
// SPDX-License-Identifier: MIT
use std::collections::HashSet;
use std::path::Path;

use rusqlite::{params, Connection};
Expand Down Expand Up @@ -65,6 +66,176 @@ pub fn configure(conn: &Connection) -> rusqlite::Result<()> {
Ok(())
}

type MigrationDef = (&'static str, &'static str);

const SCHEMA_MIGRATIONS: [MigrationDef; 5] = [
("001_initial_schema", "initial_schema"),
("002_aging_columns", "aging_columns"),
("003_focus_table", "focus_table"),
("004_crystal_tables", "crystal_tables"),
("005_quality_dedup_columns", "quality_dedup_columns"),
];

/// Return ordered schema migration definitions.
pub fn migration_definitions() -> &'static [MigrationDef] {
&SCHEMA_MIGRATIONS
}

/// Ensure schema migration tracking table exists.
pub fn ensure_schema_migrations_table(conn: &Connection) -> rusqlite::Result<()> {
conn.execute_batch(
r#"
CREATE TABLE IF NOT EXISTS schema_migrations (
id INTEGER PRIMARY KEY AUTOINCREMENT,
version TEXT NOT NULL UNIQUE,
name TEXT NOT NULL,
applied_at TEXT NOT NULL DEFAULT (datetime('now'))
);
"#,
)?;
Ok(())
}

fn migration_error(msg: impl Into<String>) -> rusqlite::Error {
rusqlite::Error::InvalidParameterName(msg.into())
}

fn apply_migration(conn: &Connection, version: &str) -> rusqlite::Result<()> {
match version {
// Baseline marker for pre-versioned schemas.
"001_initial_schema" => Ok(()),
"002_aging_columns" => {
migrate_aging_columns(conn);
if table_has_column(conn, "memories", "compressed_text")
&& table_has_column(conn, "memories", "age_tier")
&& table_has_column(conn, "decisions", "compressed_text")
&& table_has_column(conn, "decisions", "age_tier")
{
Ok(())
} else {
Err(migration_error(
"aging migration did not create expected columns",
))
}
}
"003_focus_table" => {
migrate_focus_table(conn);
if table_exists(conn, "focus_sessions") {
Ok(())
} else {
Err(migration_error("focus table migration did not create focus_sessions"))
}
}
"004_crystal_tables" => {
crate::crystallize::migrate_crystal_tables(conn);
if table_exists(conn, "memory_clusters") && table_exists(conn, "cluster_members") {
Ok(())
} else {
Err(migration_error(
"crystal migration did not create memory_clusters/cluster_members",
))
}
}
"005_quality_dedup_columns" => {
ensure_column(
conn,
"memories",
"ALTER TABLE memories ADD COLUMN merged_count INTEGER DEFAULT 0",
)?;
ensure_column(
conn,
"memories",
"ALTER TABLE memories ADD COLUMN quality INTEGER DEFAULT 50",
)?;
ensure_column(
conn,
"decisions",
"ALTER TABLE decisions ADD COLUMN merged_count INTEGER DEFAULT 0",
)?;
ensure_column(
conn,
"decisions",
"ALTER TABLE decisions ADD COLUMN quality INTEGER DEFAULT 50",
)?;
let _ = conn.execute(
"UPDATE memories SET merged_count = 0 WHERE merged_count IS NULL",
[],
);
let _ = conn.execute("UPDATE memories SET quality = 50 WHERE quality IS NULL", []);
let _ = conn.execute(
"UPDATE decisions SET merged_count = 0 WHERE merged_count IS NULL",
[],
);
let _ = conn.execute("UPDATE decisions SET quality = 50 WHERE quality IS NULL", []);
Ok(())
}
other => Err(migration_error(format!("unknown schema migration: {other}"))),
}
}

/// Return already-applied migration versions.
pub fn applied_migration_versions(conn: &Connection) -> rusqlite::Result<Vec<String>> {
ensure_schema_migrations_table(conn)?;
let mut stmt =
conn.prepare("SELECT version FROM schema_migrations ORDER BY id ASC, version ASC")?;
let rows = stmt.query_map([], |row| row.get::<_, String>(0))?;
Ok(rows.filter_map(|r| r.ok()).collect())
}

/// Return pending migration versions in execution order.
pub fn pending_migration_versions(conn: &Connection) -> rusqlite::Result<Vec<String>> {
let applied: HashSet<String> = applied_migration_versions(conn)?.into_iter().collect();
let mut pending = Vec::new();
for (version, _) in migration_definitions() {
if !applied.contains(*version) {
pending.push((*version).to_string());
}
}
Ok(pending)
}

/// Execute pending schema migrations in-order and record each in
/// `schema_migrations`. Returns the number of newly-applied migrations.
pub fn run_pending_migrations(conn: &Connection) -> usize {
if let Err(e) = ensure_schema_migrations_table(conn) {
eprintln!("[db] schema migration setup failed: {e}");
return 0;
}

let mut applied_set: HashSet<String> = match applied_migration_versions(conn) {
Ok(v) => v.into_iter().collect(),
Err(e) => {
eprintln!("[db] failed to read applied migrations: {e}");
return 0;
}
};

let mut applied_count = 0usize;
for (version, name) in migration_definitions() {
if applied_set.contains(*version) {
continue;
}

if let Err(e) = apply_migration(conn, version) {
eprintln!("[db] migration {version} ({name}) failed: {e}");
break;
}

if let Err(e) = conn.execute(
"INSERT INTO schema_migrations (version, name) VALUES (?1, ?2)",
params![version, name],
) {
eprintln!("[db] failed to record migration {version} ({name}): {e}");
break;
}

applied_set.insert((*version).to_string());
applied_count += 1;
}

applied_count
}

/// Create all 12 application tables and supporting indexes if they do not
/// already exist.
pub fn initialize_schema(conn: &Connection) -> rusqlite::Result<()> {
Expand Down Expand Up @@ -231,6 +402,13 @@ pub fn initialize_schema(conn: &Connection) -> rusqlite::Result<()> {
hits INTEGER DEFAULT 0
);

CREATE TABLE IF NOT EXISTS schema_migrations (
id INTEGER PRIMARY KEY AUTOINCREMENT,
version TEXT NOT NULL UNIQUE,
name TEXT NOT NULL,
applied_at TEXT NOT NULL DEFAULT (datetime('now'))
);

-- FTS5 full-text search indexes (trigram tokenizer for code/identifier matching)
CREATE VIRTUAL TABLE IF NOT EXISTS memories_fts USING fts5(
text, source, tags,
Expand Down Expand Up @@ -706,7 +884,7 @@ fn ensure_column(conn: &Connection, table: &str, alter_sql: &str) -> rusqlite::R
}
}

fn table_exists(conn: &Connection, table: &str) -> bool {
pub fn table_exists(conn: &Connection, table: &str) -> bool {
conn.query_row(
"SELECT 1 FROM sqlite_master WHERE type='table' AND name = ?1 LIMIT 1",
params![table],
Expand Down Expand Up @@ -1216,6 +1394,7 @@ mod tests {
"feed",
"feed_acks",
"context_cache",
"schema_migrations",
"memories_fts",
"decisions_fts",
"recall_feedback",
Expand Down Expand Up @@ -1364,6 +1543,38 @@ mod tests {
}
}

#[test]
fn test_run_pending_migrations_applies_all_once() {
let conn = Connection::open_in_memory().unwrap();
configure(&conn).unwrap();
initialize_schema(&conn).unwrap();

let first_applied = run_pending_migrations(&conn);
assert_eq!(first_applied, migration_definitions().len());

let second_applied = run_pending_migrations(&conn);
assert_eq!(second_applied, 0);

let pending = pending_migration_versions(&conn).unwrap();
assert!(
pending.is_empty(),
"no pending migrations expected after first run"
);

let recorded: i64 = conn
.query_row("SELECT COUNT(*) FROM schema_migrations", [], |row| row.get(0))
.unwrap();
assert_eq!(recorded as usize, migration_definitions().len());

assert!(table_has_column(&conn, "memories", "merged_count"));
assert!(table_has_column(&conn, "memories", "quality"));
assert!(table_has_column(&conn, "decisions", "merged_count"));
assert!(table_has_column(&conn, "decisions", "quality"));
assert!(table_exists(&conn, "focus_sessions"));
assert!(table_exists(&conn, "memory_clusters"));
assert!(table_exists(&conn, "cluster_members"));
}

#[test]
fn test_team_migration_creates_owner_scoped_schema() {
let conn = Connection::open_in_memory().unwrap();
Expand Down
Loading
Loading