Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
37 changes: 29 additions & 8 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 2 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ members = [
"crates/physical-plan",
"crates/primitives",
"crates/query",
"crates/runtime",
"crates/sats",
"crates/schema",
"crates/smoketests",
Expand Down Expand Up @@ -139,6 +140,7 @@ spacetimedb-pg = { path = "crates/pg", version = "=2.2.0" }
spacetimedb-physical-plan = { path = "crates/physical-plan", version = "=2.2.0" }
spacetimedb-primitives = { path = "crates/primitives", version = "=2.2.0" }
spacetimedb-query = { path = "crates/query", version = "=2.2.0" }
spacetimedb-runtime = { path = "crates/runtime", version = "=2.2.0" }
spacetimedb-sats = { path = "crates/sats", version = "=2.2.0" }
spacetimedb-schema = { path = "crates/schema", version = "=2.2.0" }
spacetimedb-standalone = { path = "crates/standalone", version = "=2.2.0" }
Expand Down
1 change: 1 addition & 0 deletions crates/core/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ spacetimedb-primitives.workspace = true
spacetimedb-paths.workspace = true
spacetimedb-physical-plan.workspace = true
spacetimedb-query.workspace = true
spacetimedb-runtime = { workspace = true, features = ["tokio"] }
spacetimedb-sats = { workspace = true, features = ["serde"] }
spacetimedb-schema.workspace = true
spacetimedb-table.workspace = true
Expand Down
12 changes: 6 additions & 6 deletions crates/core/src/db/durability.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,9 +9,9 @@ use spacetimedb_datastore::{execution_context::ReducerContext, traits::TxData};
use spacetimedb_durability::Transaction;
use spacetimedb_lib::Identity;
use spacetimedb_sats::ProductValue;
use tokio::{runtime, time::timeout};

use crate::db::persistence::Durability;
use spacetimedb_runtime::Runtime;

pub(super) fn request_durability(
durability: &Durability,
Expand All @@ -32,11 +32,11 @@ pub(super) fn request_durability(
}));
}

pub(super) fn spawn_close(durability: Arc<Durability>, runtime: &runtime::Handle, database_identity: Identity) {
let rt = runtime.clone();
rt.spawn(async move {
let label = format!("[{database_identity}]");
match timeout(Duration::from_secs(10), durability.close()).await {
pub(super) fn spawn_close(durability: Arc<Durability>, runtime: &Runtime, database_identity: Identity) {
let label = format!("[{database_identity}]");
let runtime = runtime.clone();
runtime.clone().spawn(async move {
match runtime.timeout(Duration::from_secs(10), durability.close()).await {
Err(_elapsed) => {
error!("{label} timeout waiting for durability shutdown");
}
Expand Down
24 changes: 18 additions & 6 deletions crates/core/src/db/persistence.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ use spacetimedb_paths::server::ServerDataDir;
use spacetimedb_snapshot::DynSnapshotRepo;

use crate::{messages::control_db::Database, util::asyncify};
use spacetimedb_runtime::Runtime;

use super::{
relational_db::{self, Txdata},
Expand Down Expand Up @@ -41,8 +42,8 @@ pub struct Persistence {
/// persistent (as opposed to in-memory) databases. This is enforced by
/// this type.
pub snapshots: Option<SnapshotWorker>,
/// The tokio runtime onto which durability-related tasks shall be spawned.
pub runtime: tokio::runtime::Handle,
/// Runtime onto which durability-related tasks shall be spawned.
pub runtime: Runtime,
}

impl Persistence {
Expand All @@ -52,6 +53,15 @@ impl Persistence {
disk_size: impl Fn() -> io::Result<SizeOnDisk> + Send + Sync + 'static,
snapshots: Option<SnapshotWorker>,
runtime: tokio::runtime::Handle,
) -> Self {
Self::new_with_runtime(durability, disk_size, snapshots, Runtime::tokio(runtime))
}

pub fn new_with_runtime(
durability: impl spacetimedb_durability::Durability<TxData = Txdata> + 'static,
disk_size: impl Fn() -> io::Result<SizeOnDisk> + Send + Sync + 'static,
snapshots: Option<SnapshotWorker>,
runtime: Runtime,
) -> Self {
Self {
durability: Arc::new(durability),
Expand Down Expand Up @@ -91,7 +101,7 @@ impl Persistence {
Option<Arc<Durability>>,
Option<DiskSizeFn>,
Option<SnapshotWorker>,
Option<tokio::runtime::Handle>,
Option<Runtime>,
) {
this.map(
|Self {
Expand Down Expand Up @@ -143,13 +153,15 @@ impl PersistenceProvider for LocalPersistenceProvider {
async fn persistence(&self, database: &Database, replica_id: u64) -> anyhow::Result<Persistence> {
let replica_dir = self.data_dir.replica(replica_id);
let snapshot_dir = replica_dir.snapshots();
let runtime = Runtime::tokio_current();

let database_identity = database.database_identity;
let snapshot_worker =
asyncify(move || relational_db::open_snapshot_repo(snapshot_dir, database_identity, replica_id))
.await
.map(|repo| SnapshotWorker::new(repo, snapshot::Compression::Enabled))?;
let (durability, disk_size) = relational_db::local_durability(replica_dir, Some(&snapshot_worker)).await?;
.map(|repo| SnapshotWorker::new(repo, snapshot::Compression::Enabled, runtime.clone()))?;
let (durability, disk_size) =
relational_db::local_durability(replica_dir, runtime.clone(), Some(&snapshot_worker)).await?;

tokio::spawn(relational_db::snapshot_watching_commitlog_compressor(
snapshot_worker.subscribe(),
Expand All @@ -162,7 +174,7 @@ impl PersistenceProvider for LocalPersistenceProvider {
durability,
disk_size,
snapshots: Some(snapshot_worker),
runtime: tokio::runtime::Handle::current(),
runtime,
})
}
}
33 changes: 22 additions & 11 deletions crates/core/src/db/relational_db.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,8 +40,11 @@ use spacetimedb_lib::db::raw_def::v9::{btree, RawModuleDefV9Builder, RawSql};
use spacetimedb_lib::st_var::StVarValue;
use spacetimedb_lib::ConnectionId;
use spacetimedb_lib::Identity;
#[cfg(test)]
use spacetimedb_paths::server::SnapshotDirPath;
use spacetimedb_paths::server::{ReplicaDir, SnapshotsPath};
use spacetimedb_primitives::*;
use spacetimedb_runtime::Runtime;
use spacetimedb_sats::memory_usage::MemoryUsage;
use spacetimedb_sats::raw_identifier::RawIdentifier;
use spacetimedb_sats::{AlgebraicType, AlgebraicValue, ProductType, ProductValue};
Expand Down Expand Up @@ -101,7 +104,7 @@ pub struct RelationalDB {

inner: Locking,
durability: Option<Arc<Durability>>,
durability_runtime: Option<tokio::runtime::Handle>,
durability_runtime: Option<Runtime>,
snapshot_worker: Option<SnapshotWorker>,

row_count_fn: RowCountFn,
Expand Down Expand Up @@ -135,6 +138,7 @@ impl std::fmt::Debug for RelationalDB {

impl Drop for RelationalDB {
fn drop(&mut self) {
log::info!("starting drop");
// Attempt to flush the outstanding transactions.
if let (Some(durability), Some(runtime)) = (self.durability.take(), self.durability_runtime.take()) {
spawn_durability_close(durability, &runtime, self.database_identity);
Expand Down Expand Up @@ -1676,9 +1680,9 @@ const COMMITLOG_COMPRESSION_FORCE_SEGMENT_BACKLOG: usize = 8;
/// of the commitlog.
pub async fn local_durability(
replica_dir: ReplicaDir,
runtime: Runtime,
snapshot_worker: Option<&SnapshotWorker>,
) -> Result<(LocalDurability, DiskSizeFn), DBError> {
let rt = tokio::runtime::Handle::current();
let on_new_segment = snapshot_worker.map(|snapshot_worker| {
let snapshot_worker = snapshot_worker.clone();
Arc::new(move || {
Expand All @@ -1690,7 +1694,7 @@ pub async fn local_durability(
let local = asyncify(move || {
durability::Local::open(
replica_dir.clone(),
rt,
runtime,
<_>::default(),
// Give the durability a handle to request a new snapshot run,
// which it will send down whenever we rotate commitlog segments.
Expand Down Expand Up @@ -2030,6 +2034,7 @@ pub mod tests_utils {
use spacetimedb_fs_utils::compression::CompressType;
use spacetimedb_lib::{bsatn::to_vec, ser::Serialize};
use spacetimedb_paths::server::ReplicaDir;
use spacetimedb_paths::server::SnapshotDirPath;
use spacetimedb_paths::FromPathUnchecked;
use tempfile::TempDir;

Expand Down Expand Up @@ -2176,19 +2181,22 @@ pub mod tests_utils {
) -> Result<(RelationalDB, Arc<durability::Local<ProductValue>>), DBError> {
let snapshots = want_snapshot_repo
.then(|| {
open_snapshot_repo(root.snapshots(), db_identity, replica_id)
.map(|repo| SnapshotWorker::new(repo, snapshot::Compression::Disabled))
open_snapshot_repo(root.snapshots(), db_identity, replica_id).map(|repo| {
SnapshotWorker::new(repo, snapshot::Compression::Disabled, Runtime::tokio(rt.clone()))
})
})
.transpose()?;

let (local, disk_size_fn) = rt.block_on(local_durability(root.clone(), snapshots.as_ref()))?;
let runtime = Runtime::tokio(rt.clone());
let (local, disk_size_fn) =
rt.block_on(local_durability(root.clone(), runtime.clone(), snapshots.as_ref()))?;
let history = local.as_history();

let persistence = Persistence {
durability: local.clone(),
disk_size: disk_size_fn,
snapshots,
runtime: rt,
runtime,
};

let (db, _) = RelationalDB::open(
Expand Down Expand Up @@ -2301,17 +2309,20 @@ pub mod tests_utils {
) -> Result<(RelationalDB, Arc<durability::Local<ProductValue>>), DBError> {
let snapshots = want_snapshot_repo
.then(|| {
open_snapshot_repo(root.snapshots(), Identity::ZERO, 0)
.map(|repo| SnapshotWorker::new(repo, snapshot::Compression::Disabled))
open_snapshot_repo(root.snapshots(), Identity::ZERO, 0).map(|repo| {
SnapshotWorker::new(repo, snapshot::Compression::Enabled, Runtime::tokio(rt.clone()))
})
})
.transpose()?;
let (local, disk_size_fn) = rt.block_on(local_durability(root.clone(), snapshots.as_ref()))?;
let runtime = Runtime::tokio(rt.clone());
let (local, disk_size_fn) =
rt.block_on(local_durability(root.clone(), runtime.clone(), snapshots.as_ref()))?;
let history = local.as_history();
let persistence = Persistence {
durability: local.clone(),
disk_size: disk_size_fn,
snapshots,
runtime: rt,
runtime,
};
let db = Self::open_db(history, Some(persistence), None, 0)?;

Expand Down
Loading
Loading