Skip to content
61 changes: 45 additions & 16 deletions crates/commitlog/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -153,15 +153,22 @@ impl Options {
}
}

/// The canonical commitlog, backed by on-disk log files.
/// The canonical commitlog API over a repository backend `R`.
///
/// The default backend is the on-disk filesystem repository
/// [`repo::Fs`], but tests may supply another [`Repo`]
/// implementation.
///
/// Records in the log are of type `T`, which canonically is instantiated to
/// [`payload::Txdata`].
pub struct Commitlog<T> {
inner: RwLock<commitlog::Generic<repo::Fs, T>>,
pub struct Commitlog<T, R = repo::Fs>
where
R: Repo,
{
inner: RwLock<commitlog::Generic<R, T>>,
}

impl<T> Commitlog<T> {
impl<T> Commitlog<T, repo::Fs> {
/// Open the log at root directory `root` with [`Options`].
///
/// The root directory must already exist.
Expand All @@ -180,7 +187,26 @@ impl<T> Commitlog<T> {
root.display()
);
}
let inner = commitlog::Generic::open(repo::Fs::new(root, on_new_segment)?, opts)?;
Self::open_with_repo(repo::Fs::new(root, on_new_segment)?, opts)
}

/// Determine the size on disk of this commitlog.
pub fn size_on_disk(&self) -> io::Result<SizeOnDisk> {
let inner = self.inner.read().unwrap();
inner.repo.size_on_disk()
}
}

impl<T, R> Commitlog<T, R>
where
R: Repo,
{
/// Open the log in `repo` with [`Options`].
///
/// This is useful for tests which provide a repository
/// implementation other than [`repo::Fs`].
pub fn open_with_repo(repo: R, opts: Options) -> io::Result<Self> {
let inner = commitlog::Generic::open(repo, opts)?;

Ok(Self {
inner: RwLock::new(inner),
Expand Down Expand Up @@ -309,7 +335,7 @@ impl<T> Commitlog<T> {
/// This means that, when this iterator yields an `Err` value, the consumer
/// may want to check if the iterator is exhausted (by calling `next()`)
/// before treating the `Err` value as an application error.
pub fn commits(&self) -> impl Iterator<Item = Result<StoredCommit, error::Traversal>> + use<T> {
pub fn commits(&self) -> impl Iterator<Item = Result<StoredCommit, error::Traversal>> + use<T, R> {
self.commits_from(0)
}

Expand All @@ -322,7 +348,10 @@ impl<T> Commitlog<T> {
/// Note that the first [`StoredCommit`] yielded is the first commit
/// containing the given transaction offset, i.e. its `min_tx_offset` may be
/// smaller than `offset`.
pub fn commits_from(&self, offset: u64) -> impl Iterator<Item = Result<StoredCommit, error::Traversal>> + use<T> {
pub fn commits_from(
&self,
offset: u64,
) -> impl Iterator<Item = Result<StoredCommit, error::Traversal>> + use<T, R> {
self.inner.read().unwrap().commits_from(offset)
}

Expand Down Expand Up @@ -401,15 +430,13 @@ impl<T> Commitlog<T> {
inner: RwLock::new(inner),
})
}

/// Determine the size on disk of this commitlog.
pub fn size_on_disk(&self) -> io::Result<SizeOnDisk> {
let inner = self.inner.read().unwrap();
inner.repo.size_on_disk()
}
}

impl<T: Encode> Commitlog<T> {
impl<T, R> Commitlog<T, R>
where
T: Encode,
R: Repo,
Comment thread
Shubham8287 marked this conversation as resolved.
{
/// Write `transactions` to the log.
///
/// This will store all `transactions` as a single [Commit]
Expand Down Expand Up @@ -479,10 +506,11 @@ impl<T: Encode> Commitlog<T> {
pub fn transactions<'a, D>(
&self,
de: &'a D,
) -> impl Iterator<Item = Result<Transaction<T>, D::Error>> + 'a + use<'a, D, T>
) -> impl Iterator<Item = Result<Transaction<T>, D::Error>> + 'a + use<'a, D, T, R>
where
D: Decoder<Record = T>,
D::Error: From<error::Traversal>,
R: 'a,
T: 'a,
{
self.transactions_from(0, de)
Expand All @@ -498,10 +526,11 @@ impl<T: Encode> Commitlog<T> {
&self,
offset: u64,
de: &'a D,
) -> impl Iterator<Item = Result<Transaction<T>, D::Error>> + 'a + use<'a, D, T>
) -> impl Iterator<Item = Result<Transaction<T>, D::Error>> + 'a + use<'a, D, T, R>
where
D: Decoder<Record = T>,
D::Error: From<error::Traversal>,
R: 'a,
T: 'a,
{
self.inner.read().unwrap().transactions_from(offset, de)
Expand Down
12 changes: 12 additions & 0 deletions crates/commitlog/src/repo/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -152,6 +152,18 @@ pub trait Repo: Clone + fmt::Display {
}
}

/// Marker for repos that do not require an external lock file.
///
/// Durability implementations can use this to expose repo-backed opening
/// only for storage backends where skipping the filesystem `db.lock` cannot
/// violate single-writer safety.
pub trait RepoWithoutLockFile: Repo {}

impl<T: RepoWithoutLockFile> RepoWithoutLockFile for &T {}

#[cfg(any(test, feature = "test"))]
impl RepoWithoutLockFile for Memory {}

impl<T: Repo> Repo for &T {
type SegmentWriter = T::SegmentWriter;
type SegmentReader = T::SegmentReader;
Expand Down
8 changes: 4 additions & 4 deletions crates/core/src/db/persistence.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ use async_trait::async_trait;
use spacetimedb_commitlog::SizeOnDisk;
use spacetimedb_durability::{DurabilityExited, TxOffset};
use spacetimedb_paths::server::ServerDataDir;
use spacetimedb_snapshot::SnapshotRepository;
use spacetimedb_snapshot::DynSnapshotRepo;

use crate::{messages::control_db::Database, util::asyncify};

Expand Down Expand Up @@ -61,9 +61,9 @@ impl Persistence {
}
}

/// If snapshots are enabled, get the [SnapshotRepository] they are stored in.
pub fn snapshot_repo(&self) -> Option<&SnapshotRepository> {
self.snapshots.as_ref().map(|worker| worker.repo())
/// If snapshots are enabled, get the [SnapshotRepo] they are stored in.
pub fn snapshot_repo(&self) -> Option<Arc<DynSnapshotRepo>> {
self.snapshots.as_ref().map(|worker| worker.snapshot_repo())
}

/// Get the [TxOffset] reported as durable by the [Durability] impl.
Expand Down
72 changes: 52 additions & 20 deletions crates/core/src/db/relational_db.rs
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ use spacetimedb_schema::schema::{
ColumnSchema, IndexSchema, RowLevelSecuritySchema, Schema, SequenceSchema, TableSchema,
};
use spacetimedb_schema::table_name::TableName;
use spacetimedb_snapshot::{ReconstructedSnapshot, SnapshotError, SnapshotRepository};
use spacetimedb_snapshot::{DynSnapshotRepo, ReconstructedSnapshot, SnapshotError, SnapshotRepository};
use spacetimedb_table::indexes::RowPointer;
use spacetimedb_table::page_pool::PagePool;
use spacetimedb_table::table::{RowRef, TableScanIter};
Expand Down Expand Up @@ -235,7 +235,7 @@ impl RelationalDB {
///
/// - `snapshot_repo`
///
/// The [`SnapshotRepository`] which stores snapshots of this database.
/// The [`SnapshotRepo`] which stores snapshots of this database.
/// This is only meaningful if `history` and `durability` are also supplied.
/// If restoring from an existing database, the `snapshot_repo` must
/// store views of the same sequence of TXes as the `history`.
Expand Down Expand Up @@ -278,9 +278,10 @@ impl RelationalDB {

let start_time = std::time::Instant::now();

let snapshot_repo = persistence.as_ref().and_then(|p| p.snapshot_repo());
let inner = Self::restore_from_snapshot_or_bootstrap(
database_identity,
persistence.as_ref().and_then(|p| p.snapshot_repo()),
snapshot_repo.as_deref(),
durable_tx_offset,
min_commitlog_offset,
page_pool,
Expand All @@ -292,7 +293,7 @@ impl RelationalDB {
.snapshot_repo()
.map(|repo| repo.database_identity() == database_identity)
.unwrap_or(true),
"snapshot repository does not match database identity",
"snapshot repo does not match database identity",
);
persistence.set_snapshot_state(inner.committed_state.clone());
}
Expand Down Expand Up @@ -471,15 +472,15 @@ impl RelationalDB {

fn restore_from_snapshot_or_bootstrap(
database_identity: Identity,
snapshot_repo: Option<&SnapshotRepository>,
snapshot_repo: Option<&DynSnapshotRepo>,
durable_tx_offset: Option<TxOffset>,
min_commitlog_offset: TxOffset,
page_pool: PagePool,
) -> Result<Locking, RestoreSnapshotError> {
// Try to load the `ReconstructedSnapshot` at `snapshot_offset`.
fn try_load_snapshot(
database_identity: &Identity,
snapshot_repo: &SnapshotRepository,
snapshot_repo: &DynSnapshotRepo,
snapshot_offset: TxOffset,
page_pool: &PagePool,
) -> Result<ReconstructedSnapshot, Box<SnapshotError>> {
Expand Down Expand Up @@ -592,11 +593,12 @@ impl RelationalDB {
// Invalidate the snapshot if the error is permanent.
// Newly created snapshots should not depend on it.
if !is_transient_error(&e) {
let path = snapshot_repo.snapshot_dir_path(snapshot_offset);
log::info!("invalidating bad snapshot at {}", path.display());
path.rename_invalid().map_err(|e| RestoreSnapshotError::Invalidate {
offset: snapshot_offset,
source: Box::new(e.into()),
log::info!("invalidating bad snapshot at {snapshot_offset}");
snapshot_repo.invalidate_snapshot(snapshot_offset).map_err(|e| {
RestoreSnapshotError::Invalidate {
offset: snapshot_offset,
source: Box::new(e),
}
})?;
}
// Try the next older one if the error was transient.
Expand All @@ -612,7 +614,7 @@ impl RelationalDB {
}
}
}
log::info!("[{database_identity}] DATABASE: no usable snapshot on disk");
log::info!("[{database_identity}] DATABASE: no usable snapshot in snapshot repo");

// If we didn't find a snapshot and the commitlog doesn't start at the
// zero-th commit (e.g. due to archiving), there is no way to restore
Expand Down Expand Up @@ -769,6 +771,19 @@ impl RelationalDB {
r
}

#[cfg(any(feature = "test", test))]
#[tracing::instrument(level = "trace", skip_all)]
pub fn try_begin_mut_tx(&self, isolation_level: IsolationLevel, workload: Workload) -> Option<MutTx> {
Comment thread
kim marked this conversation as resolved.
log::trace!("TRY BEGIN MUT TX");
let r = self.inner.try_begin_mut_tx(isolation_level, workload);
if r.is_some() {
log::trace!("ACQUIRED MUT TX");
} else {
log::trace!("MUT TX CONTENDED");
}
r
}

#[tracing::instrument(level = "trace", skip_all)]
pub fn begin_tx(&self, workload: Workload) -> Tx {
log::trace!("BEGIN TX");
Expand Down Expand Up @@ -1007,7 +1022,7 @@ impl RelationalDB {
Ok(self.inner.alter_table_row_type_mut_tx(tx, table_id, column_schemas)?)
}

pub(crate) fn add_columns_to_table(
pub(crate) fn add_columns_to_table_mut_tx(
Comment thread
kim marked this conversation as resolved.
&self,
tx: &mut MutTx,
table_id: TableId,
Expand All @@ -1019,6 +1034,17 @@ impl RelationalDB {
.add_columns_to_table_mut_tx(tx, table_id, column_schemas, default_values)?)
}

#[cfg(any(feature = "test", test))]
pub fn add_columns_to_table(
Comment thread
kim marked this conversation as resolved.
&self,
tx: &mut MutTx,
table_id: TableId,
column_schemas: Vec<ColumnSchema>,
default_values: Vec<AlgebraicValue>,
) -> Result<TableId, DBError> {
self.add_columns_to_table_mut_tx(tx, table_id, column_schemas, default_values)
}

/// Reports the `TxMetrics`s passed.
///
/// Should only be called after the tx lock has been fully released.
Expand Down Expand Up @@ -1771,13 +1797,12 @@ pub mod tests_utils {

use super::*;
use core::ops::Deref;
use durability::EmptyHistory;
use durability::{Durability, EmptyHistory};
use spacetimedb_datastore::locking_tx_datastore::MutTxId;
use spacetimedb_datastore::locking_tx_datastore::TxId;
use spacetimedb_fs_utils::compression::CompressType;
use spacetimedb_lib::{bsatn::to_vec, ser::Serialize};
use spacetimedb_paths::server::ReplicaDir;
use spacetimedb_paths::server::SnapshotDirPath;
use spacetimedb_paths::FromPathUnchecked;
use tempfile::TempDir;

Expand Down Expand Up @@ -1980,11 +2005,13 @@ pub mod tests_utils {
drop(self.db);

if let Some(DurableState {
durability: _,
durability,
rt,
replica_dir,
}) = self.durable
{
rt.block_on(durability.close());
drop(durability);
// Enter the runtime so that `Self::durable_internal` can spawn a `SnapshotWorker`.
let _rt = rt.enter();
let (db, handle) = Self::durable_internal(&replica_dir, rt.handle().clone(), self.want_snapshot_repo)?;
Expand Down Expand Up @@ -2091,7 +2118,7 @@ pub mod tests_utils {
Arc::new(|_, _| i64::MAX)
}

pub fn take_snapshot(&self, repo: &SnapshotRepository) -> Result<Option<SnapshotDirPath>, DBError> {
pub fn take_snapshot(&self, repo: &DynSnapshotRepo) -> Result<Option<TxOffset>, DBError> {
Ok(self.inner.take_snapshot(repo)?)
}
}
Expand Down Expand Up @@ -3661,7 +3688,7 @@ mod tests {
let repo = open_snapshot_repo(dir, Identity::ZERO, 0)?;
RelationalDB::restore_from_snapshot_or_bootstrap(
Identity::ZERO,
Some(&repo),
Some(repo.as_ref()),
Some(last_compress),
0,
PagePool::new_for_test(),
Expand Down Expand Up @@ -3689,8 +3716,13 @@ mod tests {
);

let last = repo.latest_snapshot()?;
let stdb =
RelationalDB::restore_from_snapshot_or_bootstrap(identity, Some(&repo), last, 0, PagePool::new_for_test())?;
let stdb = RelationalDB::restore_from_snapshot_or_bootstrap(
identity,
Some(repo.as_ref()),
last,
0,
PagePool::new_for_test(),
)?;

let out = TempDir::with_prefix("snapshot_test")?;
let dir = SnapshotsPath::from_path_unchecked(out.path());
Expand Down
Loading
Loading