Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 22 additions & 0 deletions crates/storage-postgres/src/data/ddl.rs
Original file line number Diff line number Diff line change
Expand Up @@ -229,6 +229,28 @@ impl PostgresEngine {
.map_err(|e| StorageError::Internal(e.to_string()))?;
}

// Index on base table key columns for delete_index_row_multi lookups.
{
let mut base_key_cols = vec!["base_pk".to_owned()];
for (i, &(_, sk_type)) in base_sks.iter().enumerate() {
let col = if i == 0 {
format!("base_{}", sk_column(sk_type))
} else {
format!("base_{}", sk_column_n(i, sk_type))
};
base_key_cols.push(col);
}
let idx_name = format!("_ddb_{index_id}_base_key_idx");
let base_key_idx = format!(
"CREATE INDEX \"{idx_name}\" ON {idx_table} ({})",
base_key_cols.join(", ")
);
sqlx::query(&base_key_idx)
.execute(&mut **tx)
.await
.map_err(|e| StorageError::Internal(e.to_string()))?;
}

Ok(())
}

Expand Down
68 changes: 68 additions & 0 deletions crates/storage-postgres/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -325,6 +325,69 @@ impl PostgresEngine {
pub fn data_pool(&self) -> &PgPool {
&self.data_pool
}

/// Ensure every GSI/LSI table has an index on its base table key columns.
///
/// This index is required for efficient deletes during GSI propagation.
/// Without it, `delete_index_row_multi` performs a sequential scan.
/// Runs at startup using `CREATE INDEX IF NOT EXISTS` (idempotent).
/// Uses `CONCURRENTLY` to avoid blocking concurrent writes on large tables.
pub async fn ensure_gsi_base_key_indexes(&self) -> Result<(), StorageError> {
use extenddb_core::types::{AttributeDefinition, KeySchemaElement};
use extenddb_storage::util::{sk_column, sk_column_n};

// Query the catalog for all indexes and their base table key schema.
let rows: Vec<(String, serde_json::Value, serde_json::Value)> = sqlx::query_as(
"SELECT i.index_id, t.key_schema, t.attribute_definitions \
FROM indexes i \
JOIN tables t ON i.table_id = t.table_id",
)
.fetch_all(&self.pool)
.await
.map_err(|e| StorageError::Internal(e.to_string()))?;

for (index_id, ks_json, ad_json) in rows {
let base_ks: Vec<KeySchemaElement> = serde_json::from_value(ks_json)
.map_err(|e| StorageError::Internal(e.to_string()))?;
let attr_defs: Vec<AttributeDefinition> = serde_json::from_value(ad_json)
.map_err(|e| StorageError::Internal(e.to_string()))?;

let base_sks = data::all_sort_key_info(&base_ks, &attr_defs);
let idx_table = data::index_table_name(&index_id);
// Strip quotes for the index name (idx_table is quoted like "_ddb_<id>")
let bare_table = format!("_ddb_{index_id}");
let idx_name = format!("{bare_table}_base_key_idx");

let mut base_key_cols = vec!["base_pk".to_owned()];
for (i, &(_, sk_type)) in base_sks.iter().enumerate() {
let col = if i == 0 {
format!("base_{}", sk_column(sk_type))
} else {
format!("base_{}", sk_column_n(i, sk_type))
};
base_key_cols.push(col);
}

let sql = format!(
"CREATE INDEX CONCURRENTLY IF NOT EXISTS \"{}\" ON {} ({})",
idx_name,
idx_table,
base_key_cols.join(", ")
);

// CONCURRENTLY cannot run inside a transaction, so use the pool directly.
sqlx::query(&sql)
.execute(&self.data_pool)
.await
.map_err(|e| {
StorageError::Internal(format!(
"Failed to create base key index on {idx_table}: {e}"
))
})?;
}

Ok(())
}
}

// ============================================================================
Expand Down Expand Up @@ -436,6 +499,11 @@ inventory::submit! {
_ => BackendError::InitializationFailed(e.to_string()),
})?;

// Ensure GSI base key indexes exist (idempotent, non-blocking).
if let Err(e) = engine.ensure_gsi_base_key_indexes().await {
tracing::error!("Failed to ensure GSI base key indexes: {e}");
}

// Recover control plane transitions (ignore errors)
match engine.process_control_plane_transitions().await {
Ok(ref t) if t.is_empty() => {}
Expand Down
Loading