Skip to content
Merged
128 changes: 120 additions & 8 deletions objectstore-service/src/backend/changelog.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@
use std::collections::HashMap;
use std::fmt;
use std::sync::{Arc, Mutex};
use std::time::Duration;
use std::time::{Duration, SystemTime};

use tokio_util::task::TaskTracker;
use tokio_util::task::task_tracker::TaskTrackerToken;
Expand Down Expand Up @@ -76,6 +76,10 @@ pub struct Change {
///
/// Needs cleanup on success (the CAS committed and the old blob is unreferenced).
pub old: Option<ObjectId>,
/// Earliest time at which this entry becomes eligible for cleanup.
///
/// [`ChangeLog::scan`] omits the entry until this deadline has passed. `None` means there is no
/// deadline; the in-memory log returns such entries from every scan.
pub cleanup_after: Option<SystemTime>,
}

/// Manager for multi-step storage changes, including backends and durable log.
Expand Down Expand Up @@ -134,6 +138,28 @@ impl ChangeManager {
Ok(ChangeGuard { state: Some(state) })
}

/// Records the change to the log and returns a guard in the `Assembling` state.
///
/// Behaves like [`Self::record`], except that the guard is created in the `Assembling` state.
/// Unlike other states, this guard does nothing on drop, leaving the burden of cleaning up to
/// the [`ChangeLog`].
///
/// # Errors
///
/// Propagates any error returned while recording the change in the changelog. No guard is
/// created in that case.
pub async fn record_assembling(self: Arc<Self>, change: Change) -> Result<ChangeGuard> {
    // Acquire the tracker token up front; it is stored in the guard state below and is simply
    // dropped again if recording fails.
    let token = self.tracker.token();

    let id = ChangeId::new();
    self.changelog.record(&id, &change).await?;

    // `self` is an owned `Arc` that is not needed past this point, so it is moved into the
    // state instead of being cloned.
    let state = ChangeState {
        id,
        change,
        phase: ChangePhase::Assembling,
        manager: self,
        _token: token,
    };

    Ok(ChangeGuard { state: Some(state) })
}

/// Scans the changelog for outstanding entries and runs cleanup for each.
///
/// Spawn this into a background task at startup to recover from any orphaned objects after a
Expand Down Expand Up @@ -226,15 +252,31 @@ impl ChangeLog for InMemoryChangeLog {
}

/// Returns a snapshot of every entry that is currently eligible for cleanup.
///
/// Entries whose `cleanup_after` deadline lies in the future are left out; entries without a
/// deadline, or whose deadline has been reached, are cloned into the result.
async fn scan(&self) -> Result<Vec<(ChangeId, Change)>> {
    // Sample the clock once so that every entry is compared against the same instant.
    let now = SystemTime::now();
    let entries = self.entries.lock().expect("lock poisoned");

    let mut due = Vec::new();
    for (id, change) in entries.iter() {
        // An entry is deferred only while its deadline is strictly in the future.
        let deferred = matches!(change.cleanup_after, Some(deadline) if deadline > now);
        if !deferred {
            due.push((id.clone(), change.clone()));
        }
    }

    Ok(due)
}
}

#[cfg(test)]
impl InMemoryChangeLog {
    /// Rewrites [`Change::cleanup_after`] of every stored entry to a deadline in the past.
    ///
    /// After this call, a subsequent [`ChangeLog::scan`] returns all entries, including those
    /// that were previously deferred.
    pub fn expire_all(&self) {
        // The Unix epoch is used as the already-passed deadline.
        let long_ago = Some(SystemTime::UNIX_EPOCH);
        self.entries
            .lock()
            .expect("lock poisoned")
            .values_mut()
            .for_each(|change| change.cleanup_after = long_ago);
    }
}

/// [`ChangeLog`] implementation that discards all entries.
///
/// Used as the default when no durable log is configured. Provides no
Expand Down Expand Up @@ -265,6 +307,12 @@ pub enum ChangePhase {
Recovered,
/// The change is recorded in the log and LT upload has started.
Recorded,
/// The LT blob originated from a multipart upload and is being assembled.
///
/// Multipart upload completion can fail, and we want the client to be able to retry it
/// without the change cleanup process racing to delete the LT blob.
/// Therefore, cleanup of changes in this phase is deferred.
Assembling,
/// LT upload has succeeded and the tombstone is being updated.
Written,
/// The tombstone update failed due to a conflict.
Expand Down Expand Up @@ -310,7 +358,7 @@ impl ChangeState {
ChangePhase::Written => self.read_tombstone().await,
ChangePhase::Lost => self.change.old.clone(),
ChangePhase::Updated => self.change.new.clone(),
ChangePhase::Completed => return, // unreachable
ChangePhase::Assembling | ChangePhase::Completed => return, // unreachable
};

if current != self.change.old
Expand Down Expand Up @@ -373,12 +421,21 @@ impl ChangeState {

impl Drop for ChangeState {
fn drop(&mut self) {
if self.phase != ChangePhase::Completed {
objectstore_log::error!(
change = ?self.change,
phase = ?self.phase,
"Operation dropped without completing cleanup"
);
match self.phase {
ChangePhase::Completed => {}
ChangePhase::Assembling => {
objectstore_log::warn!(
change = ?self.change,
"Operation dropped in Assembling state, cleanup deferred to ChangeLog recovery"
);
}
_ => {
objectstore_log::error!(
change = ?self.change,
phase = ?self.phase,
"Operation dropped without completing cleanup"
);
}
}
}
}
Expand All @@ -405,6 +462,7 @@ impl ChangeGuard {
impl Drop for ChangeGuard {
fn drop(&mut self) {
if let Some(state) = self.state.take()
&& state.phase != ChangePhase::Assembling
&& state.phase != ChangePhase::Completed
&& let Ok(handle) = tokio::runtime::Handle::try_current()
{
Expand Down Expand Up @@ -440,6 +498,7 @@ mod tests {
id: make_id("object-key"),
new: Some(make_id("object-key/rev1")),
old: None,
cleanup_after: None,
};

log.record(&id, &change).await.unwrap();
Expand All @@ -457,6 +516,7 @@ mod tests {
id: make_id("object-key"),
new: None,
old: Some(make_id("object-key/rev1")),
cleanup_after: None,
};

log.record(&id, &change).await.unwrap();
Expand Down Expand Up @@ -495,6 +555,7 @@ mod tests {
id: make_id("crash-test"),
new: Some(make_id("crash-test/rev")),
old: None,
cleanup_after: None,
}))
.unwrap()
// Runtime drops here while `guard` is still alive outside it.
Expand All @@ -508,4 +569,55 @@ mod tests {
let entries = rt.block_on(log.scan()).unwrap();
assert_eq!(entries.len(), 1, "log entry must persist");
}

#[tokio::test]
async fn scan_filters_by_cleanup_after() {
    let log = InMemoryChangeLog::default();

    // Three entries: no deadline, a deadline that already passed, and one still in the future.
    let past = SystemTime::now() - Duration::from_secs(1);
    let future = SystemTime::now() + Duration::from_hours(24);
    let cases = [
        ("ready", "ready/rev", None),
        ("expired", "expired/rev", Some(past)),
        ("deferred", "deferred/rev", Some(future)),
    ];

    // Record the entries in table order and remember their ids by position.
    let mut recorded = Vec::with_capacity(cases.len());
    for (key, revision, cleanup_after) in cases {
        let change_id = ChangeId::new();
        let change = Change {
            id: make_id(key),
            new: Some(make_id(revision)),
            old: None,
            cleanup_after,
        };
        log.record(&change_id, &change).await.unwrap();
        recorded.push(change_id);
    }

    // Only the first two entries are eligible; the deferred one must be filtered out.
    let entries = log.scan().await.unwrap();
    assert_eq!(entries.len(), 2);

    let returned: Vec<_> = entries.iter().map(|(id, _)| id).collect();
    assert!(returned.contains(&&recorded[0]));
    assert!(returned.contains(&&recorded[1]));
}
}
Loading
Loading