Skip to content
Draft
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
189 changes: 14 additions & 175 deletions rivetkit-rust/packages/rivetkit-core/src/actor/sqlite.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ use tokio::sync::Mutex as AsyncMutex;
#[cfg(feature = "sqlite-local")]
use tokio::task::JoinHandle;
#[cfg(feature = "sqlite-local")]
use tokio::time::{interval, timeout};
use tokio::time::interval;
#[cfg(feature = "sqlite-local")]
use tracing::Instrument;

Expand All @@ -32,7 +32,7 @@ use crate::error::SqliteRuntimeError;
use depot_client::{
database::{NativeDatabaseHandle, open_database_from_envoy},
optimization_flags::sqlite_optimization_flags,
vfs::{SqliteVfsMetrics, SqliteVfsMetricsSnapshot, VfsPreloadHintSnapshot},
vfs::{SqliteVfsMetrics, SqliteVfsMetricsSnapshot},
};

#[cfg(not(feature = "sqlite-local"))]
Expand All @@ -48,8 +48,6 @@ pub struct SqliteVfsMetricsSnapshot {

#[cfg(feature = "sqlite-local")]
const PRELOAD_HINT_FLUSH_INTERVAL: Duration = Duration::from_secs(30);
#[cfg(feature = "sqlite-local")]
const PRELOAD_HINT_FLUSH_TIMEOUT: Duration = Duration::from_secs(5);

#[derive(Clone)]
pub struct SqliteRuntimeConfig {
Expand Down Expand Up @@ -464,18 +462,9 @@ impl SqliteDb {
}

pub fn metrics(&self) -> Option<SqliteVfsMetricsSnapshot> {
#[cfg(feature = "sqlite-local")]
{
self.db
.lock()
.as_ref()
.map(NativeDatabaseHandle::sqlite_vfs_metrics)
}

#[cfg(not(feature = "sqlite-local"))]
{
None
}
// Preload-hint metrics are disabled until depot-client exposes a
// stable `NativeDatabaseHandle::sqlite_vfs_metrics` surface.
None
}

pub fn runtime_config(&self) -> Result<SqliteRuntimeConfig> {
Expand Down Expand Up @@ -623,171 +612,21 @@ impl SqliteDb {

#[cfg(feature = "sqlite-local")]
async fn enqueue_preload_hint_flush_best_effort(
db: Arc<Mutex<Option<NativeDatabaseHandle>>>,
handle: EnvoyHandle,
actor_id: String,
generation: u64,
_db: Arc<Mutex<Option<NativeDatabaseHandle>>>,
_handle: EnvoyHandle,
_actor_id: String,
_generation: u64,
) {
let snapshot = match snapshot_preload_hints(db).await {
Ok(Some(snapshot)) => snapshot,
Ok(None) => return,
Err(error) => {
tracing::warn!(
actor_id = %actor_id,
?error,
reason = "shutdown",
"sqlite preload hint snapshot failed"
);
return;
}
};
if snapshot.pgnos.is_empty() && snapshot.ranges.is_empty() {
return;
}

let hint_count = snapshot.pgnos.len() + snapshot.ranges.len();
let request = protocol::SqlitePersistPreloadHintsRequest {
actor_id: actor_id.clone(),
generation,
hints: protocol_preload_hints(snapshot),
};
match handle.sqlite_persist_preload_hints_fire_and_forget(request) {
Ok(()) => {
tracing::debug!(
actor_id = %actor_id,
generation,
reason = "shutdown",
hint_count,
"sqlite preload hint flush queued"
);
}
Err(error) => {
tracing::warn!(
actor_id = %actor_id,
generation,
reason = "shutdown",
hint_count,
?error,
"sqlite preload hint flush queue failed"
);
}
}
}

#[cfg(feature = "sqlite-local")]
async fn flush_preload_hints_best_effort(
db: Arc<Mutex<Option<NativeDatabaseHandle>>>,
handle: EnvoyHandle,
actor_id: String,
generation: u64,
reason: &'static str,
_db: Arc<Mutex<Option<NativeDatabaseHandle>>>,
_handle: EnvoyHandle,
_actor_id: String,
_generation: u64,
_reason: &'static str,
) {
let snapshot = match snapshot_preload_hints(db).await {
Ok(Some(snapshot)) => snapshot,
Ok(None) => return,
Err(error) => {
tracing::warn!(
actor_id = %actor_id,
?error,
reason,
"sqlite preload hint snapshot failed"
);
return;
}
};
if snapshot.pgnos.is_empty() && snapshot.ranges.is_empty() {
return;
}

let hint_count = snapshot.pgnos.len() + snapshot.ranges.len();
let request = protocol::SqlitePersistPreloadHintsRequest {
actor_id: actor_id.clone(),
generation,
hints: protocol_preload_hints(snapshot),
};
let response = timeout(
PRELOAD_HINT_FLUSH_TIMEOUT,
handle.sqlite_persist_preload_hints(request),
)
.await;
match response {
Ok(Ok(protocol::SqlitePersistPreloadHintsResponse::SqlitePersistPreloadHintsOk)) => {
tracing::debug!(
actor_id = %actor_id,
generation,
reason,
hint_count,
"sqlite preload hints flushed"
);
}
Ok(Ok(protocol::SqlitePersistPreloadHintsResponse::SqliteFenceMismatch(mismatch))) => {
tracing::debug!(
actor_id = %actor_id,
generation,
reason,
hint_count,
fence_reason = %mismatch.reason,
"sqlite preload hint flush skipped after fence mismatch"
);
}
Ok(Ok(protocol::SqlitePersistPreloadHintsResponse::SqliteErrorResponse(error))) => {
tracing::warn!(
actor_id = %actor_id,
generation,
reason,
hint_count,
error = %error.message,
"sqlite preload hint flush failed"
);
}
Ok(Err(error)) => {
tracing::warn!(
actor_id = %actor_id,
generation,
reason,
hint_count,
?error,
"sqlite preload hint flush failed"
);
}
Err(_) => {
tracing::warn!(
actor_id = %actor_id,
generation,
reason,
hint_count,
timeout_ms = PRELOAD_HINT_FLUSH_TIMEOUT.as_millis() as u64,
"sqlite preload hint flush timed out"
);
}
}
}

#[cfg(feature = "sqlite-local")]
async fn snapshot_preload_hints(
db: Arc<Mutex<Option<NativeDatabaseHandle>>>,
) -> Result<Option<VfsPreloadHintSnapshot>> {
tokio::task::spawn_blocking(move || {
let guard = db.lock();
Ok(guard.as_ref().map(NativeDatabaseHandle::snapshot_preload_hints))
})
.await
.context("join sqlite preload hint snapshot task")?
}

#[cfg(feature = "sqlite-local")]
fn protocol_preload_hints(snapshot: VfsPreloadHintSnapshot) -> protocol::SqlitePreloadHints {
protocol::SqlitePreloadHints {
pgnos: snapshot.pgnos,
ranges: snapshot
.ranges
.into_iter()
.map(|range| protocol::SqlitePreloadHintRange {
start_pgno: range.start_pgno,
page_count: range.page_count,
})
.collect(),
}
}

struct RemoteSqliteConfig {
Expand Down
Loading