Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 1 addition & 2 deletions engine/packages/depot-client/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -12,9 +12,9 @@ crate-type = ["lib"]

[dependencies]
anyhow.workspace = true
async-trait.workspace = true
crossbeam-channel = "0.5"
libsqlite3-sys = { version = "0.30", features = ["bundled"] }
rivet-envoy-client = { workspace = true, features = ["native-transport"] }
tokio.workspace = true
tracing.workspace = true
getrandom = "0.2"
Expand All @@ -25,7 +25,6 @@ moka = { version = "0.12", default-features = false, features = ["sync"] }
parking_lot.workspace = true

[dev-dependencies]
async-trait.workspace = true
depot = { workspace = true, features = ["test-faults"] }
futures-util.workspace = true
gas.workspace = true
Expand Down
45 changes: 16 additions & 29 deletions engine/packages/depot-client/src/database.rs
Original file line number Diff line number Diff line change
@@ -1,14 +1,15 @@
use std::sync::Arc;

use anyhow::{Result, anyhow};
use rivet_envoy_client::handle::EnvoyHandle;
use tokio::runtime::Handle;

use crate::{
query::{BindParam, ExecResult, ExecuteResult, QueryResult},
transport::EmbeddedDepotSqliteTransport,
vfs::{
NativeVfsHandle, SqliteTransport, SqliteVfs, SqliteVfsMetrics, SqliteVfsMetricsSnapshot,
VfsConfig, VfsPreloadHintSnapshot, fetch_initial_main_page_for_registration,
NativeVfsHandle, SqliteTransportHandle, SqliteVfs, SqliteVfsMetrics,
SqliteVfsMetricsSnapshot, VfsConfig, VfsPreloadHintSnapshot,
fetch_initial_main_page_for_registration,
},
worker::SqliteWorkerHandle,
};
Expand All @@ -23,16 +24,15 @@ pub fn vfs_name_for_actor_database(actor_id: &str, generation: u64) -> String {
format!("envoy-sqlite-{actor_id}-g{generation}")
}

pub async fn open_database_from_envoy(
handle: EnvoyHandle,
pub async fn open_database_from_transport(
transport: SqliteTransportHandle,
actor_id: String,
generation: u64,
rt_handle: Handle,
metrics: Option<Arc<dyn SqliteVfsMetrics>>,
) -> Result<NativeDatabaseHandle> {
let vfs_name = vfs_name_for_actor_database(&actor_id, generation);
let transport = SqliteTransport::from_envoy(handle);
let initial_main_page = fetch_initial_main_page_for_registration(&transport, &actor_id)
let initial_main_page = fetch_initial_main_page_for_registration(transport.clone(), &actor_id)
.await
.map_err(|e| anyhow!("failed to preload sqlite main page: {e}"))?;
let vfs = Arc::new(
Expand All @@ -53,34 +53,21 @@ pub async fn open_database_from_envoy(
Ok(native_db)
}

pub async fn open_database_from_conveyer(
pub async fn open_database_from_embedded_depot(
db: Arc<depot::conveyer::Db>,
actor_id: String,
generation: u64,
rt_handle: Handle,
metrics: Option<Arc<dyn SqliteVfsMetrics>>,
) -> Result<NativeDatabaseHandle> {
let vfs_name = vfs_name_for_actor_database(&actor_id, generation);
let transport = SqliteTransport::from_conveyer(db);
let initial_main_page = fetch_initial_main_page_for_registration(&transport, &actor_id)
.await
.map_err(|e| anyhow!("failed to preload sqlite main page: {e}"))?;
let vfs = Arc::new(
SqliteVfs::register_with_transport_and_initial_page(
&vfs_name,
transport,
actor_id.clone(),
rt_handle,
VfsConfig::default(),
initial_main_page,
metrics.clone(),
)
.map_err(|e| anyhow!("failed to register sqlite VFS: {e}"))?,
);

let native_db = NativeDatabaseHandle::new_with_metrics(vfs, actor_id, metrics)?;
native_db.initialize().await?;
Ok(native_db)
open_database_from_transport(
Arc::new(EmbeddedDepotSqliteTransport::new(db)),
actor_id,
generation,
rt_handle,
metrics,
)
.await
}

impl NativeDatabaseHandle {
Expand Down
3 changes: 3 additions & 0 deletions engine/packages/depot-client/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,9 @@ pub mod optimization_flags;
/// SQLite query execution helpers.
pub mod query;

/// SQLite transport adapters for same-process Depot usage.
pub mod transport;

pub use depot_client_types as types;

/// Custom SQLite VFS for actor-side depot transport.
Expand Down
86 changes: 86 additions & 0 deletions engine/packages/depot-client/src/transport.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,86 @@
//! SQLite transport adapters.
//!
//! `EmbeddedDepotSqliteTransport` is for deployments where the SQLite VFS runs in the
//! same process or server as the Depot backend. It calls `depot::conveyer::Db`
//! directly instead of routing page operations through an actor Envoy transport.

use std::sync::Arc;

use anyhow::Result;
use async_trait::async_trait;
use rivet_envoy_protocol as protocol;

use crate::vfs::SqliteTransport;

pub struct EmbeddedDepotSqliteTransport {
db: Arc<depot::conveyer::Db>,
}

impl EmbeddedDepotSqliteTransport {
pub fn new(db: Arc<depot::conveyer::Db>) -> Self {
Self { db }
}
}

#[async_trait]
impl SqliteTransport for EmbeddedDepotSqliteTransport {
async fn get_pages(
&self,
request: protocol::SqliteGetPagesRequest,
) -> Result<protocol::SqliteGetPagesResponse> {
match self.db.get_pages(request.pgnos).await {
Ok(pages) => Ok(protocol::SqliteGetPagesResponse::SqliteGetPagesOk(
protocol::SqliteGetPagesOk {
pages: pages
.into_iter()
.map(|page| protocol::SqliteFetchedPage {
pgno: page.pgno,
bytes: page.bytes,
})
.collect(),
},
)),
Err(err) => Ok(protocol::SqliteGetPagesResponse::SqliteErrorResponse(
protocol::SqliteErrorResponse {
message: sqlite_error_reason(&err),
},
)),
}
}

async fn commit(
&self,
request: protocol::SqliteCommitRequest,
) -> Result<protocol::SqliteCommitResponse> {
match self
.db
.commit(
request
.dirty_pages
.into_iter()
.map(|page| depot::types::DirtyPage {
pgno: page.pgno,
bytes: page.bytes,
})
.collect(),
request.db_size_pages,
request.now_ms,
)
.await
{
Ok(()) => Ok(protocol::SqliteCommitResponse::SqliteCommitOk),
Err(err) => Ok(protocol::SqliteCommitResponse::SqliteErrorResponse(
protocol::SqliteErrorResponse {
message: sqlite_error_reason(&err),
},
)),
}
}
}

fn sqlite_error_reason(err: &anyhow::Error) -> String {
err.chain()
.map(ToString::to_string)
.collect::<Vec<_>>()
.join(": ")
}
Loading
Loading