Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

8 changes: 4 additions & 4 deletions objectstore-server/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ use secrecy::{CloneableSecret, SecretBox, SerializableSecret, zeroize::Zeroize};
use serde::{Deserialize, Serialize};

pub use objectstore_log::{LevelFilter, LogFormat, LoggingConfig};
pub use objectstore_service::backend::StorageConfig;
pub use objectstore_service::backend::{MultipartUploadStorageConfig, StorageConfig};

use crate::killswitches::Killswitches;
use crate::rate_limits::RateLimits;
Expand Down Expand Up @@ -625,7 +625,7 @@ impl Config {
mod tests {
use std::io::Write;

use objectstore_service::backend::HighVolumeStorageConfig;
use objectstore_service::backend::{HighVolumeStorageConfig, MultipartUploadStorageConfig};
use secrecy::ExposeSecret;

use crate::killswitches::Killswitch;
Expand Down Expand Up @@ -771,7 +771,7 @@ mod tests {
};
let HighVolumeStorageConfig::BigTable(hv) = &c.high_volume;
assert_eq!(hv.project_id, "my-project");
let StorageConfig::Gcs(lt) = c.long_term.as_ref() else {
let MultipartUploadStorageConfig::Gcs(lt) = &c.long_term else {
panic!("expected gcs long_term");
};
assert_eq!(lt.bucket, "my-objectstore-bucket");
Expand Down Expand Up @@ -800,7 +800,7 @@ mod tests {
assert_eq!(hv.project_id, "my-project");
assert_eq!(hv.instance_name, "my-instance");
assert_eq!(hv.table_name, "my-table");
let StorageConfig::FileSystem(lt) = c.long_term.as_ref() else {
let MultipartUploadStorageConfig::FileSystem(lt) = &c.long_term else {
panic!("expected filesystem long_term");
};
assert_eq!(lt.path, Path::new("/data/lt"));
Expand Down
1 change: 1 addition & 0 deletions objectstore-service/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ publish = false
[dependencies]
anyhow = { workspace = true }
async-trait = { workspace = true }
base64 = "0.22"
bigtable_rs = { git = "https://github.com/getsentry/bigtable_rs.git", rev = "4cb75bc5e5f87204363973f6302107768e64972e" }
chrono = "0.4"
bytes = { workspace = true }
Expand Down
134 changes: 123 additions & 11 deletions objectstore-service/src/backend/changelog.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,12 +22,12 @@
use std::collections::HashMap;
use std::fmt;
use std::sync::{Arc, Mutex};
use std::time::Duration;
use std::time::{Duration, SystemTime};

use tokio_util::task::TaskTracker;
use tokio_util::task::task_tracker::TaskTrackerToken;

use crate::backend::common::{Backend, HighVolumeBackend, TieredMetadata};
use crate::backend::common::{HighVolumeBackend, MultipartUploadBackend, TieredMetadata};
Comment thread
lcian marked this conversation as resolved.
use crate::error::Result;
use crate::id::ObjectId;

Expand Down Expand Up @@ -76,6 +76,10 @@ pub struct Change {
///
/// Needs cleanup on success (the CAS committed and the old blob is unreferenced).
pub old: Option<ObjectId>,
/// Earliest time at which this entry becomes eligible for cleanup.
///
/// [`ChangeLog::scan`] omits the entry until this deadline has passed. Entries with `None`
/// are not deferred and are returned right away.
pub cleanup_after: Option<SystemTime>,
}

/// Manager for multi-step storage changes, including backends and durable log.
Expand All @@ -88,7 +92,7 @@ pub struct ChangeManager {
/// The backend for small objects (≤ 1 MiB).
pub(crate) high_volume: Box<dyn HighVolumeBackend>,
/// The backend for large objects (> 1 MiB).
pub(crate) long_term: Box<dyn Backend>,
pub(crate) long_term: Box<dyn MultipartUploadBackend>,
/// Durable write-ahead log for multi-step changes.
pub(crate) changelog: Box<dyn ChangeLog>,
/// Tracks outstanding background cleanup operations for graceful shutdown.
Expand All @@ -99,7 +103,7 @@ impl ChangeManager {
/// Creates a new `ChangeManager` with the given backends and changelog.
pub fn new(
high_volume: Box<dyn HighVolumeBackend>,
long_term: Box<dyn Backend>,
long_term: Box<dyn MultipartUploadBackend>,
changelog: Box<dyn ChangeLog>,
) -> Arc<Self> {
Arc::new(Self {
Expand Down Expand Up @@ -134,6 +138,28 @@ impl ChangeManager {
Ok(ChangeGuard { state: Some(state) })
}

/// Writes `change` to the log and hands back a guard that starts in the `Assembling` phase.
///
/// Works like [`Self::record`], with the only difference being the initial phase of the
/// returned guard. A guard dropped while still `Assembling` does not trigger cleanup itself;
/// the entry stays in the [`ChangeLog`], which is responsible for cleaning it up later.
pub async fn record_assembling(self: Arc<Self>, change: Change) -> Result<ChangeGuard> {
    // Acquire the tracker token first, then persist the entry under a fresh id.
    let token = self.tracker.token();
    let id = ChangeId::new();
    self.changelog.record(&id, &change).await?;

    Ok(ChangeGuard {
        state: Some(ChangeState {
            id,
            change,
            phase: ChangePhase::Assembling,
            manager: Arc::clone(&self),
            _token: token,
        }),
    })
}

/// Scans the changelog for outstanding entries and runs cleanup for each.
///
/// Spawn this into a background task at startup to recover from any orphaned objects after a
Expand Down Expand Up @@ -226,15 +252,31 @@ impl ChangeLog for InMemoryChangeLog {
}

/// Returns a snapshot of all entries that are currently eligible for cleanup.
///
/// Entries without a `cleanup_after` deadline are always included. Entries with a deadline
/// are included only once the system clock has reached it.
async fn scan(&self) -> Result<Vec<(ChangeId, Change)>> {
    // Read the clock once so every entry is compared against the same instant.
    let now = SystemTime::now();
    let entries = self.entries.lock().expect("lock poisoned");

    let eligible = entries
        .iter()
        .filter(|(_, change)| change.cleanup_after.is_none_or(|deadline| deadline <= now))
        .map(|(id, change)| (id.clone(), change.clone()))
        .collect();

    Ok(eligible)
}
}

#[cfg(test)]
impl InMemoryChangeLog {
    /// Moves the [`Change::cleanup_after`] deadline of every stored entry to the Unix epoch.
    ///
    /// With the deadline in the past, the next [`ChangeLog::scan`] returns all entries.
    pub fn expire_all(&self) {
        let expired = Some(SystemTime::UNIX_EPOCH);
        self.entries
            .lock()
            .expect("lock poisoned")
            .values_mut()
            .for_each(|change| change.cleanup_after = expired);
    }
}

/// [`ChangeLog`] implementation that discards all entries.
///
/// Used as the default when no durable log is configured. Provides no
Expand Down Expand Up @@ -265,6 +307,12 @@ pub enum ChangePhase {
Recovered,
/// The change is recorded in the log and LT upload has started.
Recorded,
/// The LT blob originated from a multipart upload and is being assembled.
///
/// Multipart upload completion can fail, and we want the client to be able to retry it
/// without the change cleanup process racing to delete the LT blob.
/// Therefore, cleanup of changes in this phase is deferred.
Assembling,
/// LT upload has succeeded and the tombstone is being updated.
Written,
/// The tombstone update failed due to a conflict.
Expand Down Expand Up @@ -310,7 +358,7 @@ impl ChangeState {
ChangePhase::Written => self.read_tombstone().await,
ChangePhase::Lost => self.change.old.clone(),
ChangePhase::Updated => self.change.new.clone(),
ChangePhase::Completed => return, // unreachable
ChangePhase::Assembling | ChangePhase::Completed => return, // unreachable
};

if current != self.change.old
Expand Down Expand Up @@ -373,12 +421,21 @@ impl ChangeState {

impl Drop for ChangeState {
fn drop(&mut self) {
if self.phase != ChangePhase::Completed {
objectstore_log::error!(
change = ?self.change,
phase = ?self.phase,
"Operation dropped without completing cleanup"
);
match self.phase {
ChangePhase::Completed => {}
ChangePhase::Assembling => {
objectstore_log::warn!(
change = ?self.change,
"Operation dropped in Assembling state, cleanup deferred to ChangeLog recovery"
);
}
_ => {
objectstore_log::error!(
change = ?self.change,
phase = ?self.phase,
"Operation dropped without completing cleanup"
);
}
}
}
}
Expand All @@ -405,6 +462,7 @@ impl ChangeGuard {
impl Drop for ChangeGuard {
fn drop(&mut self) {
if let Some(state) = self.state.take()
&& state.phase != ChangePhase::Assembling
&& state.phase != ChangePhase::Completed
&& let Ok(handle) = tokio::runtime::Handle::try_current()
{
Expand Down Expand Up @@ -440,6 +498,7 @@ mod tests {
id: make_id("object-key"),
new: Some(make_id("object-key/rev1")),
old: None,
cleanup_after: None,
};

log.record(&id, &change).await.unwrap();
Expand All @@ -457,6 +516,7 @@ mod tests {
id: make_id("object-key"),
new: None,
old: Some(make_id("object-key/rev1")),
cleanup_after: None,
};

log.record(&id, &change).await.unwrap();
Expand Down Expand Up @@ -495,6 +555,7 @@ mod tests {
id: make_id("crash-test"),
new: Some(make_id("crash-test/rev")),
old: None,
cleanup_after: None,
}))
.unwrap()
// Runtime drops here while `guard` is still alive outside it.
Expand All @@ -508,4 +569,55 @@ mod tests {
let entries = rt.block_on(log.scan()).unwrap();
assert_eq!(entries.len(), 1, "log entry must persist");
}

/// `scan` must return entries with no deadline or an elapsed deadline, and hold back entries
/// whose deadline is still in the future.
#[tokio::test]
async fn scan_filters_by_cleanup_after() {
    let log = InMemoryChangeLog::default();

    // No deadline: eligible immediately.
    let ready_id = ChangeId::new();
    let ready = Change {
        id: make_id("ready"),
        new: Some(make_id("ready/rev")),
        old: None,
        cleanup_after: None,
    };
    log.record(&ready_id, &ready).await.unwrap();

    // Deadline one second in the past: eligible.
    let expired_id = ChangeId::new();
    let expired = Change {
        id: make_id("expired"),
        new: Some(make_id("expired/rev")),
        old: None,
        cleanup_after: Some(SystemTime::now() - Duration::from_secs(1)),
    };
    log.record(&expired_id, &expired).await.unwrap();

    // Deadline a day ahead: must be held back.
    let deferred_id = ChangeId::new();
    let deferred = Change {
        id: make_id("deferred"),
        new: Some(make_id("deferred/rev")),
        old: None,
        cleanup_after: Some(SystemTime::now() + Duration::from_hours(24)),
    };
    log.record(&deferred_id, &deferred).await.unwrap();

    let entries = log.scan().await.unwrap();
    assert_eq!(entries.len(), 2);

    for wanted in [&ready_id, &expired_id] {
        assert!(entries.iter().any(|(id, _)| id == wanted));
    }
}
}
6 changes: 6 additions & 0 deletions objectstore-service/src/backend/common.rs
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,12 @@ pub trait MultipartUploadBackend: Backend + fmt::Debug + Send + Sync + 'static {

/// Finalizes the upload identified by `(id, upload_id)` with the given
/// ordered list of parts.
///
/// Note that this returns `Result<Option<CompleteMultipartError>>`.
/// It's therefore possible to get `Ok(Some(err))`, meaning that at the server level this will
/// translate to HTTP `200 OK` with an error contained in the response body.
/// We need to do it this way to mirror backends that also behave like this (namely S3 and
/// GCS).
async fn complete_multipart(
&self,
id: &ObjectId,
Expand Down
27 changes: 26 additions & 1 deletion objectstore-service/src/backend/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ pub async fn from_config(config: StorageConfig) -> Result<Box<dyn common::Backen
Ok(match config {
StorageConfig::Tiered(c) => {
let hv = hv_from_config(c.high_volume).await?;
let lt = from_leaf_config(*c.long_term).await?;
let lt = lt_from_config(c.long_term).await?;
let log = Box::new(changelog::NoopChangeLog);
Box::new(tiered::TieredStorage::new(hv, lt, log))
}
Expand Down Expand Up @@ -104,3 +104,28 @@ async fn hv_from_config(
HighVolumeStorageConfig::BigTable(c) => Box::new(bigtable::BigTableBackend::new(c).await?),
})
}

/// Configuration for the long-term backend in a [`tiered::TieredStorageConfig`].
///
/// Only backends that implement [`common::MultipartUploadBackend`] are valid here.
///
/// The enum is (de)serialized with an internal `type` tag. The tag value is the lowercased
/// variant name, and the remaining fields belong to the selected backend's own config.
#[derive(Debug, Clone, Deserialize, Serialize)]
#[serde(tag = "type", rename_all = "lowercase")]
pub enum MultipartUploadStorageConfig {
/// Local filesystem storage backend (type `"filesystem"`).
FileSystem(local_fs::FileSystemConfig),

/// [Google Cloud Storage] backend (type `"gcs"`).
///
/// [Google Cloud Storage]: https://cloud.google.com/storage
Gcs(gcs::GcsConfig),
}

/// Constructs a type-erased [`common::MultipartUploadBackend`] from the given config.
async fn lt_from_config(
Comment thread
lcian marked this conversation as resolved.
config: MultipartUploadStorageConfig,
) -> anyhow::Result<Box<dyn common::MultipartUploadBackend>> {
Ok(match config {
MultipartUploadStorageConfig::FileSystem(c) => Box::new(local_fs::LocalFsBackend::new(c)),
MultipartUploadStorageConfig::Gcs(c) => Box::new(gcs::GcsBackend::new(c).await?),
})
}
Loading
Loading