Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 16 additions & 0 deletions crates/storage-postgres/data_migrations/002_gsi_pending.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
-- Copyright 2026 ExtendDB contributors
-- SPDX-License-Identifier: Apache-2.0
-- Persistent queue for async GSI propagation.
-- Inserted atomically within the base write transaction, consumed by
-- background workers. Survives process crash/restart.

CREATE TABLE IF NOT EXISTS gsi_pending (
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't recall where it's buried, but somewhere there is a catalog version identifier that should be updated when the metadata/system table schema is updated, so that extenddb migrate will know that there is a migration to perform. I believe it's in storage-postgres somewhere.

id BIGSERIAL PRIMARY KEY,
table_id TEXT NOT NULL,
old_item JSONB,
new_item JSONB,
ready_at TIMESTAMPTZ NOT NULL DEFAULT NOW()
);

CREATE INDEX IF NOT EXISTS idx_gsi_pending_ready
ON gsi_pending (ready_at);
57 changes: 26 additions & 31 deletions crates/storage-postgres/src/data/delete_item.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,11 +9,12 @@ use extenddb_storage::StreamCapture;
use extenddb_storage::error::StorageError;
use extenddb_storage::util::{SortKeyValue, parse_sk, pk_to_text, sk_column, sk_info};

use super::index::{enqueue_async_indexes, fetch_indexes_for_table, pk_hash, sync_indexes};
use super::index::{fetch_indexes_for_table, has_async_indexes, sync_indexes};
use super::query::check_condition;
use super::tx_helpers::write_stream_record_in_tx;
use super::{data_table_name, json_to_item};
use crate::PostgresEngine;
use crate::gsi_queue::enqueue_gsi_pending;

impl PostgresEngine {
/// Implementation of `DataEngine::delete_item`.
Expand Down Expand Up @@ -155,26 +156,23 @@ impl PostgresEngine {
)
.await?;
}
tx.commit()
.await
.map_err(|e| StorageError::Internal(e.to_string()))?;

// Enqueue async GSI updates after commit (D-4).
if let Some(ref q) = self.gsi_queue {
enqueue_async_indexes(
q,
pk_hash(pk_text.as_ref()),
&key_info.account_id,
&key_info.table_name,
if has_async_indexes(&indexes, sys_delay) {
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This could probably be called once at the beginning (right after indexes is populated).

enqueue_gsi_pending(
&mut tx,
&key_info.table_id,
&key_info.key_schema,
&key_info.attribute_definitions,
&indexes,
old_item_for_idx.as_ref(),
None,
sys_delay,
)
.await;
.await?;
}

tx.commit()
.await
.map_err(|e| StorageError::Internal(e.to_string()))?;

if let Some(ref q) = self.gsi_queue {
q.notify_workers();
}

if return_old {
Expand Down Expand Up @@ -293,26 +291,23 @@ impl PostgresEngine {
)
.await?;
}
tx.commit()
.await
.map_err(|e| StorageError::Internal(e.to_string()))?;

// Enqueue async GSI updates after commit (D-4).
if let Some(ref q) = self.gsi_queue {
enqueue_async_indexes(
q,
pk_hash(pk_text.as_ref()),
&key_info.account_id,
&key_info.table_name,
if has_async_indexes(&indexes, sys_delay) {
enqueue_gsi_pending(
&mut tx,
&key_info.table_id,
&key_info.key_schema,
&key_info.attribute_definitions,
&indexes,
old_item_for_idx.as_ref(),
None,
sys_delay,
)
.await;
.await?;
}

tx.commit()
.await
.map_err(|e| StorageError::Internal(e.to_string()))?;

if let Some(ref q) = self.gsi_queue {
q.notify_workers();
}

if return_old {
Expand Down
60 changes: 9 additions & 51 deletions crates/storage-postgres/src/data/index.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ use super::{all_sort_key_info, index_table_name};

/// Metadata for a single index, used during write-path GSI/LSI sync.
pub(crate) struct IndexMeta {
pub(super) index_name: String,
pub(super) _index_name: String,
pub(super) index_id: String,
pub(super) index_type: String,
pub(super) key_schema: Vec<KeySchemaElement>,
Expand Down Expand Up @@ -48,7 +48,7 @@ pub(crate) async fn fetch_indexes_for_table(
let projection: Projection = serde_json::from_value(proj_json)
.map_err(|e| StorageError::Internal(e.to_string()))?;
Ok(IndexMeta {
index_name: name,
_index_name: name,
index_id: id,
index_type: idx_type,
key_schema,
Expand Down Expand Up @@ -175,56 +175,14 @@ pub(crate) async fn sync_indexes(
Ok(())
}

/// Enqueue async GSI updates for indexes with non-zero propagation delay.
/// Check whether any indexes require async propagation.
///
/// Called after the base table transaction commits. The queue workers apply
/// the index updates after a random delay within the configured range.
#[allow(clippy::too_many_arguments)]
pub(crate) async fn enqueue_async_indexes(
gsi_queue: &crate::gsi_queue::GsiQueue,
pk_hash: u64,
account_id: &str,
table_name: &str,
table_id: &str,
base_key_schema: &[KeySchemaElement],
attr_defs: &[AttributeDefinition],
indexes: &[IndexMeta],
old_item: Option<&Item>,
new_item: Option<&Item>,
system_default_delay: u64,
) {
for idx in indexes {
let delay = effective_delay(idx, system_default_delay);
if delay == 0 {
continue; // Sync — already handled in transaction.
}
if idx.index_type == "LSI" {
continue; // LSIs are always synchronous — already handled in transaction.
}
gsi_queue
.enqueue(
pk_hash,
account_id,
table_name,
table_id,
base_key_schema,
attr_defs,
&idx.index_name,
&idx.index_id,
&idx.key_schema,
&idx.projection,
old_item,
new_item,
delay,
)
.await;
}
}

/// Compute a hash of the partition key text for queue partitioning.
/// Uses crc32 for stability across Rust versions (DefaultHasher is not stable).
pub(crate) fn pk_hash(pk_text: &str) -> u64 {
u64::from(crc32fast::hash(pk_text.as_bytes()))
/// Returns `true` if at least one GSI has a non-zero effective delay,
/// meaning the write should insert a row into `gsi_pending`.
pub(crate) fn has_async_indexes(indexes: &[IndexMeta], system_default_delay: u64) -> bool {
indexes
.iter()
.any(|idx| idx.index_type != "LSI" && effective_delay(idx, system_default_delay) != 0)
}

/// Delete a row from an index table using base table key columns.
Expand Down
59 changes: 28 additions & 31 deletions crates/storage-postgres/src/data/put_item.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,11 +9,12 @@ use extenddb_storage::StreamCapture;
use extenddb_storage::error::StorageError;
use extenddb_storage::util::{composite_pk_to_text, parse_sk, pk_to_text, sk_column, sk_info};

use super::index::{enqueue_async_indexes, fetch_indexes_for_table, pk_hash, sync_indexes};
use super::index::{fetch_indexes_for_table, has_async_indexes, sync_indexes};
use super::query::check_condition;
use super::tx_helpers::write_stream_record_in_tx;
use super::{data_table_name, json_to_item};
use crate::PostgresEngine;
use crate::gsi_queue::enqueue_gsi_pending;

impl PostgresEngine {
/// Implementation of `DataEngine::put_item`.
Expand Down Expand Up @@ -146,26 +147,24 @@ impl PostgresEngine {
)
.await?;
}
tx.commit()
.await
.map_err(|e| StorageError::Internal(e.to_string()))?;

// Enqueue async GSI updates after commit (D-4).
if let Some(ref q) = self.gsi_queue {
enqueue_async_indexes(
q,
pk_hash(pk_text.as_str()),
&key_info.account_id,
&key_info.table_name,
// Persist async GSI work inside the same transaction.
if has_async_indexes(&indexes, sys_delay) {
enqueue_gsi_pending(
&mut tx,
&key_info.table_id,
&key_info.key_schema,
&key_info.attribute_definitions,
&indexes,
old_item_for_idx.as_ref(),
Some(&item),
sys_delay,
)
.await;
.await?;
}

tx.commit()
.await
.map_err(|e| StorageError::Internal(e.to_string()))?;

if let Some(ref q) = self.gsi_queue {
q.notify_workers();
}

if return_old {
Expand Down Expand Up @@ -291,26 +290,24 @@ impl PostgresEngine {
)
.await?;
}
tx.commit()
.await
.map_err(|e| StorageError::Internal(e.to_string()))?;

// Enqueue async GSI updates after commit (D-4).
if let Some(ref q) = self.gsi_queue {
enqueue_async_indexes(
q,
pk_hash(pk_text.as_str()),
&key_info.account_id,
&key_info.table_name,
// Persist async GSI work inside the same transaction.
if has_async_indexes(&indexes, sys_delay) {
enqueue_gsi_pending(
&mut tx,
&key_info.table_id,
&key_info.key_schema,
&key_info.attribute_definitions,
&indexes,
old_item_for_idx.as_ref(),
Some(&item),
sys_delay,
)
.await;
.await?;
}

tx.commit()
.await
.map_err(|e| StorageError::Internal(e.to_string()))?;

if let Some(ref q) = self.gsi_queue {
q.notify_workers();
}

if return_old {
Expand Down
77 changes: 33 additions & 44 deletions crates/storage-postgres/src/data/transactions.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,17 +11,15 @@ use extenddb_core::types::{
};
use extenddb_core::validation;
use extenddb_storage::error::StorageError;
use extenddb_storage::util::pk_to_text;
use extenddb_storage::{TransactGetOp, TransactWriteOp};

use super::index::{
IndexMeta, enqueue_async_indexes, fetch_indexes_for_table, pk_hash, sync_indexes,
};
use super::index::{IndexMeta, fetch_indexes_for_table, has_async_indexes, sync_indexes};
use super::tx_helpers::{
check_idempotency_token_in_tx, delete_item_in_tx, fetch_item_for_update, fetch_item_in_tx,
upsert_item_in_tx, write_stream_record_in_tx,
};
use crate::PostgresEngine;
use crate::gsi_queue::enqueue_gsi_pending;

impl PostgresEngine {
/// Implementation of `DataEngine::transact_get_items`.
Expand Down Expand Up @@ -164,50 +162,41 @@ impl PostgresEngine {
}
}

// Persist async GSI work inside the transaction.
let mut needs_notify = false;
for (op, (old_item, new_item)) in ops.iter().zip(op_items.iter()) {
let indexes = &table_indexes[transact_op_table_name(op)];
if !has_async_indexes(indexes, sys_delay) {
continue;
}
let key_info = match op {
TransactWriteOp::Put { key_info, .. }
| TransactWriteOp::Delete { key_info, .. }
| TransactWriteOp::Update { key_info, .. }
| TransactWriteOp::ConditionCheck { key_info, .. } => key_info,
};
// ConditionCheck — no index changes.
if old_item.is_none() && new_item.is_none() {
continue;
}
enqueue_gsi_pending(
&mut tx,
&key_info.table_id,
old_item.as_ref(),
new_item.as_ref(),
sys_delay,
)
.await?;
needs_notify = true;
}

tx.commit()
.await
.map_err(|e| StorageError::Internal(e.to_string()))?;

// D-4: Enqueue async GSI updates after commit using the old/new items
// collected during transaction execution (M-3 fix).
if let Some(ref q) = self.gsi_queue {
for (op, (old_item, new_item)) in ops.iter().zip(op_items.iter()) {
let indexes = &table_indexes[transact_op_table_name(op)];
if indexes.is_empty() {
continue;
}
let key_info = match op {
TransactWriteOp::Put { key_info, .. }
| TransactWriteOp::Delete { key_info, .. }
| TransactWriteOp::Update { key_info, .. }
| TransactWriteOp::ConditionCheck { key_info, .. } => key_info,
};
let pk_name = &key_info.key_schema[0].attribute_name;
// Derive pk_text from whichever item is available.
let pk_item = new_item.as_ref().or(old_item.as_ref());
let Some(pk_item) = pk_item else { continue }; // ConditionCheck — no index changes
let pk_value = match pk_item.get(pk_name) {
Some(v) => v,
None => continue,
};
let pk_text = match pk_to_text(pk_value) {
Ok(t) => t,
Err(_) => continue,
};
enqueue_async_indexes(
q,
pk_hash(&pk_text),
&key_info.account_id,
&key_info.table_name,
&key_info.table_id,
&key_info.key_schema,
&key_info.attribute_definitions,
indexes,
old_item.as_ref(),
new_item.as_ref(),
sys_delay,
)
.await;
if needs_notify {
if let Some(ref q) = self.gsi_queue {
q.notify_workers();
}
}

Expand Down
Loading
Loading