Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions glidefs/src/block/api.rs
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,11 @@ pub struct PutExportRequest {
/// If set (with manifest_name), fork from this specific snapshot sequence.
#[serde(default)]
pub snapshot_sequence: Option<u64>,
/// Cooldown compaction window in flush cycles (0/unset = disabled). Defers
/// dead-ratio compaction of a chunk until it has been idle this many cycles;
/// cuts S3 PUT write-amp on overwrite-heavy DB volumes. Typical value: 8.
#[serde(default)]
pub compaction_cooldown: Option<u64>,
}

/// Optional request body for POST /api/exports/{name}/snapshot.
Expand Down Expand Up @@ -427,6 +432,7 @@ where
flush_threshold: put_req.flush_threshold,
flush_mode: put_req.flush_mode,
transport: put_req.transport,
compaction_cooldown: put_req.compaction_cooldown,
};

let t_handler = Instant::now();
Expand Down
44 changes: 44 additions & 0 deletions glidefs/src/block/flush_scheduler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
//! Manifest sync happens after every successful pack upload so that flushed packs
//! are immediately discoverable on cross-host recovery (host death without drain).

use std::collections::{HashMap, HashSet};
use std::sync::Arc;
use std::time::Duration;

Expand Down Expand Up @@ -42,6 +43,9 @@ struct FlushResult {
/// Another host owns this export's manifest (ETag conflict).
/// When true, the scheduler should stop flushing entirely.
manifest_conflict: bool,
/// Chunk indices written this cycle. Feeds the per-chunk idle-age map used
/// by cooldown compaction.
touched_chunks: HashSet<u32>,
}

/// Execute the atomic flush+manifest+checkpoint cycle on the cache.
Expand Down Expand Up @@ -96,6 +100,7 @@ async fn flush_and_sync(
Some(FlushResult {
packs_uploaded: stats.packs_uploaded,
manifest_conflict: false,
touched_chunks: stats.touched_chunks,
})
}
Err(e) if e.is_manifest_conflict() => {
Expand All @@ -107,6 +112,7 @@ async fn flush_and_sync(
Some(FlushResult {
packs_uploaded: 0,
manifest_conflict: true,
touched_chunks: HashSet::new(),
})
}
Err(e) => {
Expand Down Expand Up @@ -149,11 +155,19 @@ pub async fn flush_scheduler(
metrics: Arc<ExportMetrics>,
flush_semaphore: Option<Arc<tokio::sync::Semaphore>>,
flush_threshold: usize,
compaction_cooldown: u64,
) {
info!("flush scheduler started");

const CHECKPOINT_INTERVAL: Duration = Duration::from_secs(5);

// Per-chunk idle age (flush cycles since last written) for cooldown
// compaction. Ephemeral: empty on (re)start — every chunk then looks
// freshly written (age 0) and dead-ratio compaction defers for up to
// `compaction_cooldown` cycles, which is the safe direction. Unused when
// `compaction_cooldown == 0`. Bounded to multi-pack chunks (pruned below).
let mut chunk_idle_age: HashMap<u32, u64> = HashMap::new();

// Backoff state: when flush fails (e.g., S3 down), wait before retrying
// to avoid a tight spin of failed flush_packs calls.
let mut flush_backoff = Duration::ZERO;
Expand Down Expand Up @@ -253,6 +267,7 @@ pub async fn flush_scheduler(

let start = std::time::Instant::now();
let mut packs_uploaded = 0usize;
let mut touched_chunks: HashSet<u32> = HashSet::new();

// Acquire global flush semaphore to limit how many exports
// prepare + upload pack data simultaneously (memory bound).
Expand Down Expand Up @@ -288,6 +303,7 @@ pub async fn flush_scheduler(
).await {
metrics.record_s3_put_latency(start.elapsed());
packs_uploaded = result.packs_uploaded;
touched_chunks = result.touched_chunks;
if result.manifest_conflict {
return;
}
Expand All @@ -307,12 +323,31 @@ pub async fn flush_scheduler(
}
} // compaction runs without holding the flush lock.

// Advance per-chunk idle ages for cooldown compaction: every
// tracked chunk ages one cycle, chunks written this cycle reset
// to 0, then prune chunks no longer multi-pack (compacted away or
// never a candidate) to bound the map to the churning set. No-op
// when cooldown is disabled.
if compaction_cooldown > 0 && packs_uploaded > 0 {
for age in chunk_idle_age.values_mut() {
*age += 1;
}
for &c in &touched_chunks {
chunk_idle_age.insert(c, 0);
}
let vm = volume_manifest.read();
chunk_idle_age
.retain(|idx, _| vm.chunks.get(idx).is_some_and(|e| e.packs.len() >= 2));
}

// Compaction runs outside the flush lock so it doesn't block
// concurrent drain/snapshot/flush operations.
if packs_uploaded > 0 {
match crate::block::write_cache::compact::compact_if_needed(
crate::block::write_cache::compact::DEFAULT_COMPACTION_THRESHOLD,
crate::block::write_cache::compact::DEFAULT_DEAD_RATIO_THRESHOLD,
compaction_cooldown,
&chunk_idle_age,
&content_store,
&pack_index_cache,
&volume_manifest,
Expand Down Expand Up @@ -707,6 +742,7 @@ mod tests {
metrics,
None,
0, // flush_threshold (manual-mode-equivalent for tests that fire their own notify)
0, // compaction_cooldown (disabled in scheduler tests)
)
.await;
});
Expand Down Expand Up @@ -763,6 +799,7 @@ mod tests {
metrics,
None,
0, // flush_threshold (manual-mode-equivalent for tests that fire their own notify)
0, // compaction_cooldown (disabled in scheduler tests)
)
.await;
});
Expand Down Expand Up @@ -845,6 +882,7 @@ mod tests {
metrics,
None,
DEFAULT_FLUSH_THRESHOLD, // auto-flush mode — startup must self-trigger
0, // compaction_cooldown (disabled in scheduler tests)
)
.await;
});
Expand Down Expand Up @@ -916,6 +954,7 @@ mod tests {
metrics,
None,
0, // manual mode — scheduler must NOT auto-fire flush_notify
0, // compaction_cooldown (disabled in scheduler tests)
)
.await;
});
Expand Down Expand Up @@ -977,6 +1016,7 @@ mod tests {
metrics,
None,
0, // flush_threshold (manual-mode-equivalent for tests that fire their own notify)
0, // compaction_cooldown (disabled in scheduler tests)
)
.await;
});
Expand Down Expand Up @@ -1066,6 +1106,7 @@ mod tests {
metrics,
None,
0, // flush_threshold (manual-mode-equivalent for tests that fire their own notify)
0, // compaction_cooldown (disabled in scheduler tests)
)
.await;
});
Expand Down Expand Up @@ -1151,6 +1192,7 @@ mod tests {
metrics,
None,
0, // flush_threshold (manual-mode-equivalent for tests that fire their own notify)
0, // compaction_cooldown (disabled in scheduler tests)
)
.await;
});
Expand Down Expand Up @@ -1228,6 +1270,7 @@ mod tests {
metrics,
None,
0, // flush_threshold (manual-mode-equivalent for tests that fire their own notify)
0, // compaction_cooldown (disabled in scheduler tests)
)
.await;
});
Expand Down Expand Up @@ -1324,6 +1367,7 @@ mod tests {
metrics,
Some(sem),
0, // flush_threshold (manual-mode-equivalent for tests that fire their own notify)
0, // compaction_cooldown (disabled in scheduler tests)
)
.await;
});
Expand Down
Loading
Loading