Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,10 @@ target/
cortex.db
cortex.db-wal
cortex.db-shm
cortex.lock
cortex.pid
cortex.token
write_buffer.jsonl
.cortex/

# Logs
Expand Down
69 changes: 65 additions & 4 deletions daemon-rs/src/compaction.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ const FEEDBACK_AGGREGATION_DAYS: i64 = 60;
pub struct CompactionResult {
pub events_pruned: usize,
pub archived_text_stripped: usize,
pub expired_pruned: usize,
pub crystal_embeddings_pruned: usize,
pub feedback_aggregated: usize,
pub bytes_before: i64,
Expand All @@ -51,18 +52,22 @@ pub fn run_compaction(conn: &Connection) -> CompactionResult {
// 2. Archived entry text cleanup
result.archived_text_stripped = strip_archived_text(conn);

// 3. Crystal member embedding pruning
// 3. Hard-expiration cleanup
result.expired_pruned = prune_expired_entries(conn);

// 4. Crystal member embedding pruning
result.crystal_embeddings_pruned = prune_crystal_member_embeddings(conn);

// 4. Feedback aggregation
// 5. Feedback aggregation
result.feedback_aggregated = aggregate_old_feedback(conn);

// 5. Reclaim space
// 6. Reclaim space
let _ = conn.execute_batch("PRAGMA wal_checkpoint(TRUNCATE);");
// VACUUM is expensive but reclaims space from deletions.
// Only run if we deleted a meaningful amount.
let total_deleted = result.events_pruned
+ result.archived_text_stripped
+ result.expired_pruned
+ result.crystal_embeddings_pruned
+ result.feedback_aggregated;
if total_deleted > 100 {
Expand All @@ -74,8 +79,9 @@ pub fn run_compaction(conn: &Connection) -> CompactionResult {
if total_deleted > 0 {
let saved_kb = (result.bytes_before - result.bytes_after) / 1024;
eprintln!(
"[compaction] Pruned: {} events, {} archived texts, {} crystal embeddings, {} feedback rows. Saved {}KB",
"[compaction] Pruned: {} events, {} archived texts, {} expired rows, {} crystal embeddings, {} feedback rows. Saved {}KB",
result.events_pruned, result.archived_text_stripped,
result.expired_pruned,
result.crystal_embeddings_pruned, result.feedback_aggregated,
saved_kb
);
Expand Down Expand Up @@ -125,6 +131,26 @@ fn strip_archived_text(conn: &Connection) -> usize {
count
}

fn prune_expired_entries(conn: &Connection) -> usize {
let mut count = 0usize;

count += conn
.execute(
"DELETE FROM memories WHERE expires_at IS NOT NULL AND expires_at < datetime('now')",
[],
)
.unwrap_or(0);

count += conn
.execute(
"DELETE FROM decisions WHERE expires_at IS NOT NULL AND expires_at < datetime('now')",
[],
)
.unwrap_or(0);

count
}

// ─── Crystal member embedding pruning ───────────────────────────────────────

/// Remove individual embeddings for entries that are members of a crystal.
Expand Down Expand Up @@ -261,6 +287,7 @@ mod tests {
let conn = Connection::open_in_memory().unwrap();
crate::db::configure(&conn).unwrap();
crate::db::initialize_schema(&conn).unwrap();
crate::db::run_pending_migrations(&conn);
crate::crystallize::migrate_crystal_tables(&conn);
conn
}
Expand Down Expand Up @@ -319,6 +346,7 @@ mod tests {
// Empty DB should compact cleanly
assert_eq!(result.events_pruned, 0);
assert_eq!(result.archived_text_stripped, 0);
assert_eq!(result.expired_pruned, 0);
}

#[test]
Expand All @@ -329,5 +357,38 @@ mod tests {
// All counts should be 0 for empty DB
assert!(breakdown.iter().all(|(_, count)| *count == 0));
}

#[test]
fn test_prune_expired_entries() {
let conn = setup();
conn.execute(
"INSERT INTO memories (text, source, status, expires_at) VALUES ('expired memory', 'ttl::mem', 'active', datetime('now', '-1 second'))",
[],
)
.unwrap();
conn.execute(
"INSERT INTO decisions (decision, context, status, expires_at) VALUES ('expired decision', 'ttl::dec', 'active', datetime('now', '-1 second'))",
[],
)
.unwrap();

let deleted = prune_expired_entries(&conn);
assert_eq!(deleted, 2);

let mem_count: i64 = conn
.query_row("SELECT COUNT(*) FROM memories WHERE source = 'ttl::mem'", [], |r| {
r.get(0)
})
.unwrap();
let dec_count: i64 = conn
.query_row(
"SELECT COUNT(*) FROM decisions WHERE context = 'ttl::dec'",
[],
|r| r.get(0),
)
.unwrap();
assert_eq!(mem_count, 0);
assert_eq!(dec_count, 0);
}
}

8 changes: 6 additions & 2 deletions daemon-rs/src/daemon_lifecycle.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@ use std::time::Duration;

use crate::auth::CortexPaths;

const RESPAWN_HEALTH_TIMEOUT_SECS: u64 = 90;

/// Check if the daemon responds to /health within 2s.
pub async fn daemon_healthy(port: u16) -> bool {
let client = match reqwest::Client::builder()
Expand Down Expand Up @@ -91,12 +93,14 @@ pub async fn try_respawn(paths: &CortexPaths) -> bool {
return false;
}

let healthy = wait_for_health(paths.port, Duration::from_secs(10)).await;
let healthy =
wait_for_health(paths.port, Duration::from_secs(RESPAWN_HEALTH_TIMEOUT_SECS)).await;
if healthy {
eprintln!("[cortex-lifecycle] Daemon respawned successfully on port {}", paths.port);
} else {
eprintln!(
"[cortex-lifecycle] Daemon did not become healthy within 10s after respawn"
"[cortex-lifecycle] Daemon did not become healthy within {}s after respawn",
RESPAWN_HEALTH_TIMEOUT_SECS
);
}
healthy
Expand Down
68 changes: 66 additions & 2 deletions daemon-rs/src/db.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
use std::collections::HashSet;
use std::path::Path;

use rusqlite::{params, Connection};
use rusqlite::{params, Connection, OptionalExtension};

/// Result of an auto-repair attempt.
#[derive(Debug)]
Expand Down Expand Up @@ -68,12 +68,13 @@ pub fn configure(conn: &Connection) -> rusqlite::Result<()> {

type MigrationDef = (&'static str, &'static str);

const SCHEMA_MIGRATIONS: [MigrationDef; 5] = [
const SCHEMA_MIGRATIONS: [MigrationDef; 6] = [
("001_initial_schema", "initial_schema"),
("002_aging_columns", "aging_columns"),
("003_focus_table", "focus_table"),
("004_crystal_tables", "crystal_tables"),
("005_quality_dedup_columns", "quality_dedup_columns"),
("006", "ttl_expiration"),
];

/// Return ordered schema migration definitions.
Expand Down Expand Up @@ -169,6 +170,19 @@ fn apply_migration(conn: &Connection, version: &str) -> rusqlite::Result<()> {
let _ = conn.execute("UPDATE decisions SET quality = 50 WHERE quality IS NULL", []);
Ok(())
}
"006" => {
ensure_column(
conn,
"memories",
"ALTER TABLE memories ADD COLUMN expires_at TEXT",
)?;
ensure_column(
conn,
"decisions",
"ALTER TABLE decisions ADD COLUMN expires_at TEXT",
)?;
Ok(())
}
other => Err(migration_error(format!("unknown schema migration: {other}"))),
}
}
Expand Down Expand Up @@ -976,6 +990,32 @@ pub fn rebuild_fts(conn: &Connection) -> rusqlite::Result<()> {
Ok(())
}

/// Seed FTS indexes at most once per database.
///
/// Uses a marker row in `schema_migrations` so startup does not rescan the
/// entire corpus on every daemon restart.
pub fn rebuild_fts_if_needed(conn: &Connection) -> rusqlite::Result<bool> {
let already_seeded = conn
.query_row(
"SELECT 1 FROM schema_migrations WHERE version = 'fts_seeded_v1' LIMIT 1",
[],
|row| row.get::<_, i64>(0),
)
.optional()?;

if already_seeded.is_some() {
return Ok(false);
}

rebuild_fts(conn)?;
conn.execute(
"INSERT OR IGNORE INTO schema_migrations (version, name, applied_at)
VALUES ('fts_seeded_v1', 'fts_seeded', datetime('now'))",
[],
)?;
Ok(true)
}

/// Run `PRAGMA integrity_check` and return `true` when the database reports
/// `ok`.
pub fn verify_integrity(conn: &Connection) -> rusqlite::Result<bool> {
Expand Down Expand Up @@ -1373,6 +1413,28 @@ mod tests {
assert_eq!(count2, 0);
}

#[test]
fn test_rebuild_fts_if_needed_rebuilds_once_for_empty_fts() {
let conn = Connection::open_in_memory().unwrap();
configure(&conn).unwrap();
initialize_schema(&conn).unwrap();

let rebuilt = rebuild_fts_if_needed(&conn).unwrap();
assert!(rebuilt, "first call should seed FTS marker");

let marker_rows: i64 = conn
.query_row(
"SELECT COUNT(*) FROM schema_migrations WHERE version = 'fts_seeded_v1'",
[],
|row| row.get(0),
)
.unwrap();
assert_eq!(marker_rows, 1, "expected FTS marker row to be persisted");

let rebuilt_again = rebuild_fts_if_needed(&conn).unwrap();
assert!(!rebuilt_again, "second call should skip when marker already exists");
}

#[test]
fn test_solo_schema_baseline_unchanged() {
let conn = Connection::open_in_memory().unwrap();
Expand Down Expand Up @@ -1568,8 +1630,10 @@ mod tests {

assert!(table_has_column(&conn, "memories", "merged_count"));
assert!(table_has_column(&conn, "memories", "quality"));
assert!(table_has_column(&conn, "memories", "expires_at"));
assert!(table_has_column(&conn, "decisions", "merged_count"));
assert!(table_has_column(&conn, "decisions", "quality"));
assert!(table_has_column(&conn, "decisions", "expires_at"));
assert!(table_exists(&conn, "focus_sessions"));
assert!(table_exists(&conn, "memory_clusters"));
assert!(table_exists(&conn, "cluster_members"));
Expand Down
11 changes: 7 additions & 4 deletions daemon-rs/src/handlers/boot.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ use axum::response::Response;
use serde::Deserialize;
use serde_json::json;

use super::{ensure_auth, json_response, now_iso};
use super::{ensure_auth_with_caller, json_response, now_iso};
use crate::compiler;
use crate::db::checkpoint_wal_best_effort;
use crate::state::RuntimeState;
Expand All @@ -26,9 +26,10 @@ pub async fn handle_boot(
Query(query): Query<BootQuery>,
headers: HeaderMap,
) -> Response {
if let Err(resp) = ensure_auth(&headers, &state) {
return resp;
}
let caller_id = match ensure_auth_with_caller(&headers, &state) {
Ok(id) => id,
Err(resp) => return resp,
};
let agent = query
.agent
.or_else(|| {
Expand All @@ -38,6 +39,8 @@ pub async fn handle_boot(
.map(|s| s.to_string())
})
.unwrap_or_else(|| "unknown".to_string());
super::register_agent_presence_from_headers(&state, &headers, caller_id).await;

let profile = query.profile.unwrap_or_else(|| "full".to_string());
let max_tokens = query.budget.unwrap_or(600);

Expand Down
30 changes: 14 additions & 16 deletions daemon-rs/src/handlers/conductor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -159,6 +159,11 @@ fn parse_json_array(raw: &str) -> Value {
serde_json::from_str(raw).unwrap_or_else(|_| json!([]))
}

fn is_valid_agent_label(agent: &str) -> bool {
let trimmed = agent.trim();
!trimmed.is_empty() && trimmed.len() <= 160 && !trimmed.chars().any(|ch| ch.is_control())
}

fn parse_timestamp_ms(value: &str) -> i64 {
if value.trim().is_empty() {
return 0;
Expand Down Expand Up @@ -344,13 +349,13 @@ fn fetch_sessions(
{
(
"SELECT session_id, agent, project, files_json, description, started_at, last_heartbeat, expires_at
FROM sessions WHERE owner_id = ?1 ORDER BY started_at ASC",
FROM sessions WHERE owner_id = ?1 ORDER BY last_heartbeat DESC",
vec![Box::new(owner_id)],
)
} else {
(
"SELECT session_id, agent, project, files_json, description, started_at, last_heartbeat, expires_at
FROM sessions ORDER BY started_at ASC",
FROM sessions ORDER BY last_heartbeat DESC",
vec![],
)
};
Expand Down Expand Up @@ -941,6 +946,12 @@ pub async fn handle_session_start(
)
}
};
if !is_valid_agent_label(&agent) {
return json_response(
StatusCode::BAD_REQUEST,
json!({ "error": "Invalid agent label" }),
);
}

let ttl = body.ttl.unwrap_or(SESSION_TTL_SECONDS).max(1);
let owner_id = owner_id_from_state(&state);
Expand Down Expand Up @@ -1029,25 +1040,12 @@ pub async fn handle_session_heartbeat(
}

let agent = body.agent.unwrap_or_default().trim().to_string();
if agent.is_empty() {
if !is_valid_agent_label(&agent) {
return json_response(
StatusCode::BAD_REQUEST,
json!({ "error": "Missing or invalid required field: agent" }),
);
}
if agent.len() > 100 {
return json_response(
StatusCode::BAD_REQUEST,
json!({ "error": "Invalid agent: name too long (max 100 chars)" }),
);
}
let agent_re = Regex::new(r"^[a-zA-Z0-9_-]+$").unwrap();
if !agent_re.is_match(&agent) {
return json_response(
StatusCode::BAD_REQUEST,
json!({ "error": "Invalid agent: name contains invalid characters (use alphanumeric, underscore, hyphen only)" }),
);
}

let owner_id = owner_id_from_state(&state);
let conn = state.db.lock().await;
Expand Down
Loading
Loading