Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 12 additions & 2 deletions engine/packages/depot-client/src/database.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,8 @@ use crate::{
},
vfs::{
NativeVfsHandle, SqliteVfs, SqliteVfsMetrics, VfsConfig, VfsPreloadHintSnapshot,
configure_connection_for_database, verify_batch_atomic_writes,
configure_connection_for_database, fetch_initial_main_page_from_envoy,
verify_batch_atomic_writes,
},
};

Expand Down Expand Up @@ -42,12 +43,17 @@ pub async fn open_database_from_envoy(
metrics: Option<Arc<dyn SqliteVfsMetrics>>,
) -> Result<NativeDatabaseHandle> {
let vfs_name = vfs_name_for_actor_database(&actor_id, generation);
let initial_main_page = fetch_initial_main_page_from_envoy(&handle, &actor_id)
.await
.map_err(|e| anyhow!("failed to fetch sqlite initial main page: {e}"))?;
let mut vfs_config = VfsConfig::default();
vfs_config.initial_main_page = initial_main_page;
let vfs = Arc::new(SqliteVfs::register(
&vfs_name,
handle,
actor_id.clone(),
rt_handle,
VfsConfig::default(),
vfs_config,
metrics.clone(),
)
.map_err(|e| anyhow!("failed to register sqlite VFS: {e}"))?);
Expand Down Expand Up @@ -162,6 +168,10 @@ impl NativeDatabaseHandle {
self.vfs.take_last_error()
}

pub fn sqlite_vfs_metrics(&self) -> crate::vfs::SqliteVfsMetricsSnapshot {
self.vfs.sqlite_vfs_metrics()
}

pub fn snapshot_preload_hints(&self) -> VfsPreloadHintSnapshot {
self.vfs.snapshot_preload_hints()
}
Expand Down
2 changes: 1 addition & 1 deletion engine/packages/depot-client/src/optimization_flags.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ pub const DEFAULT_STARTUP_PRELOAD_MAX_BYTES: usize = 1024 * 1024;
pub const MAX_STARTUP_PRELOAD_MAX_BYTES: usize = 8 * 1024 * 1024;
pub const DEFAULT_STARTUP_PRELOAD_FIRST_PAGE_COUNT: u32 = 1;
pub const MAX_STARTUP_PRELOAD_FIRST_PAGE_COUNT: u32 = 256;
pub const DEFAULT_VFS_PAGE_CACHE_CAPACITY_PAGES: u64 = 50_000;
pub const DEFAULT_VFS_PAGE_CACHE_CAPACITY_PAGES: u64 = 4_096;
pub const MAX_VFS_PAGE_CACHE_CAPACITY_PAGES: u64 = 500_000;
pub const DEFAULT_VFS_PROTECTED_CACHE_PAGES: usize = 512;
pub const MAX_VFS_PROTECTED_CACHE_PAGES: usize = 8_192;
Expand Down
168 changes: 131 additions & 37 deletions engine/packages/depot-client/src/vfs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,10 @@ use rivet_envoy_client::handle::EnvoyHandle;
use rivet_envoy_protocol as protocol;
use tokio::runtime::Handle;

use crate::optimization_flags::{SqliteOptimizationFlags, sqlite_optimization_flags};
use crate::optimization_flags::{
SqliteOptimizationFlags, SqliteVfsPageCacheMode, sqlite_optimization_flags,
};

const DEFAULT_CACHE_CAPACITY_PAGES: u64 = 50_000;
const DEFAULT_PREFETCH_DEPTH: usize = 64;
const LEGACY_PREFETCH_DEPTH: usize = 16;
const DEFAULT_MAX_PREFETCH_BYTES: usize = 256 * 1024;
Expand Down Expand Up @@ -230,7 +231,9 @@ fn sqlite_now_ms() -> Result<i64> {

#[derive(Debug, Clone)]
pub struct VfsConfig {
pub page_cache_mode: SqliteVfsPageCacheMode,
pub cache_capacity_pages: u64,
pub initial_main_page: Option<Vec<u8>>,
pub prefetch_depth: usize,
pub adaptive_prefetch_depth: usize,
pub max_prefetch_bytes: usize,
Expand All @@ -254,35 +257,50 @@ impl Default for VfsConfig {
impl VfsConfig {
pub fn from_optimization_flags(flags: SqliteOptimizationFlags) -> Self {
Self {
cache_capacity_pages: DEFAULT_CACHE_CAPACITY_PAGES,
page_cache_mode: flags.vfs_page_cache_mode,
cache_capacity_pages: flags.vfs_page_cache_capacity_pages,
initial_main_page: None,
prefetch_depth: if flags.read_ahead {
DEFAULT_PREFETCH_DEPTH
} else {
LEGACY_PREFETCH_DEPTH
},
adaptive_prefetch_depth: DEFAULT_ADAPTIVE_PREFETCH_DEPTH,
max_prefetch_bytes: DEFAULT_MAX_PREFETCH_BYTES,
adaptive_max_prefetch_bytes: DEFAULT_ADAPTIVE_MAX_PREFETCH_BYTES,
max_pages_per_stage: DEFAULT_MAX_PAGES_PER_STAGE,
recent_hint_page_budget: if flags.recent_page_hints {
DEFAULT_RECENT_HINT_PAGE_BUDGET
} else {
0
},
recent_hint_range_budget: if flags.recent_page_hints {
DEFAULT_RECENT_HINT_RANGE_BUDGET
} else {
0
},
cache_hit_predictor_training: flags.cache_hit_predictor_training,
recent_page_hints: flags.recent_page_hints,
adaptive_read_ahead: flags.adaptive_read_ahead,
#[cfg(test)]
assert_batch_atomic: true,
}
adaptive_prefetch_depth: DEFAULT_ADAPTIVE_PREFETCH_DEPTH,
max_prefetch_bytes: DEFAULT_MAX_PREFETCH_BYTES,
adaptive_max_prefetch_bytes: DEFAULT_ADAPTIVE_MAX_PREFETCH_BYTES,
max_pages_per_stage: DEFAULT_MAX_PAGES_PER_STAGE,
recent_hint_page_budget: if flags.recent_page_hints {
DEFAULT_RECENT_HINT_PAGE_BUDGET
} else {
0
},
recent_hint_range_budget: if flags.recent_page_hints {
DEFAULT_RECENT_HINT_RANGE_BUDGET
} else {
0
},
cache_hit_predictor_training: flags.cache_hit_predictor_training,
recent_page_hints: flags.recent_page_hints,
adaptive_read_ahead: flags.adaptive_read_ahead,
#[cfg(test)]
assert_batch_atomic: true,
}
}

fn caches_target_pages(&self) -> bool {
self.page_cache_mode.caches_target_pages() && self.cache_capacity_pages > 0
}

fn caches_prefetched_pages(&self) -> bool {
self.page_cache_mode.caches_prefetched_pages() && self.cache_capacity_pages > 0
}

#[cfg(test)]
fn caches_startup_preloaded_pages(&self) -> bool {
self.page_cache_mode.caches_startup_preloaded_pages() && self.cache_capacity_pages > 0
}
}

#[derive(Debug, Clone, PartialEq, Eq)]
pub struct VfsPreloadHintRange {
pub start_pgno: u32,
Expand Down Expand Up @@ -329,6 +347,11 @@ pub struct SqliteVfsMetricsSnapshot {
pub state_update_ns: u64,
pub total_ns: u64,
pub commit_count: u64,
pub page_cache_entries: u64,
pub page_cache_weighted_size: u64,
pub page_cache_capacity_pages: u64,
pub write_buffer_dirty_pages: u64,
pub db_size_pages: u64,
}

pub trait SqliteVfsMetrics: Send + Sync {
Expand Down Expand Up @@ -861,7 +884,7 @@ fn push_coalesced_range(ranges: &mut VecDeque<VfsPreloadHintRange>, range: VfsPr
impl VfsState {
fn new(config: &VfsConfig) -> Self {
let page_cache = Cache::builder()
.max_capacity(config.cache_capacity_pages)
.max_capacity(config.cache_capacity_pages.max(1))
.build();
page_cache.insert(1, empty_db_page());

Expand Down Expand Up @@ -889,6 +912,22 @@ impl VfsState {
}
self.page_cache.insert(1, page);
}

fn evict_pages_after_eof(&mut self) {
let db_size_pages = self.db_size_pages;
let stale_pgnos = self
.page_cache
.iter()
.filter_map(|entry| {
let pgno = *entry.0;
(pgno > db_size_pages).then_some(pgno)
})
.collect::<Vec<_>>();
for pgno in stale_pgnos {
self.page_cache.invalidate(&pgno);
}
self.page_cache.run_pending_tasks();
}
}

impl VfsContext {
Expand All @@ -901,10 +940,15 @@ impl VfsContext {
metrics: Option<Arc<dyn SqliteVfsMetrics>>,
) -> std::result::Result<Self, String> {
let mut state = VfsState::new(&config);
if let Some(page) = config.initial_main_page.clone() {
state.seed_main_page(page);
}
#[cfg(test)]
if let SqliteTransportInner::Direct(storage) = &*transport.inner {
if storage.is_strict_mode() {
if let Some(page) = fetch_initial_main_page(&transport, &runtime, &actor_id)? {
if config.initial_main_page.is_none()
&& let Some(page) = fetch_initial_main_page(&transport, &runtime, &actor_id)?
{
state.seed_main_page(page);
}
} else {
Expand All @@ -913,15 +957,13 @@ impl VfsContext {
state.db_size_pages = snapshot.db_size_pages;
state.page_cache.invalidate_all();
for (pgno, bytes) in snapshot.pages {
state.page_cache.insert(pgno, bytes);
if pgno == 1 || config.caches_startup_preloaded_pages() {
state.page_cache.insert(pgno, bytes);
}
}
}
}
}
#[cfg(not(test))]
if let Some(page) = fetch_initial_main_page(&transport, &runtime, &actor_id)? {
state.seed_main_page(page);
}

Ok(Self {
actor_id,
Expand Down Expand Up @@ -997,13 +1039,19 @@ impl VfsContext {
}

fn sqlite_vfs_metrics(&self) -> SqliteVfsMetricsSnapshot {
let state = self.state.read();
SqliteVfsMetricsSnapshot {
request_build_ns: self.commit_request_build_ns.load(Ordering::Relaxed),
serialize_ns: self.commit_serialize_ns.load(Ordering::Relaxed),
transport_ns: self.commit_transport_ns.load(Ordering::Relaxed),
state_update_ns: self.commit_state_update_ns.load(Ordering::Relaxed),
total_ns: self.commit_duration_ns_total.load(Ordering::Relaxed),
commit_count: self.commit_total.load(Ordering::Relaxed),
page_cache_entries: state.page_cache.entry_count(),
page_cache_weighted_size: state.page_cache.weighted_size(),
page_cache_capacity_pages: self.config.cache_capacity_pages,
write_buffer_dirty_pages: state.write_buffer.dirty.len() as u64,
db_size_pages: state.db_size_pages as u64,
}
}

Expand Down Expand Up @@ -1248,10 +1296,18 @@ impl VfsContext {

match response {
protocol::SqliteGetPagesResponse::SqliteGetPagesOk(ok) => {
let target_missing = missing.iter().copied().collect::<HashSet<_>>();
let page_cache = { self.state.read().page_cache.clone() };
for fetched in ok.pages {
if let Some(bytes) = &fetched.bytes {
page_cache.insert(fetched.pgno, bytes.clone());
let should_cache = if target_missing.contains(&fetched.pgno) {
self.config.caches_target_pages()
} else {
self.config.caches_prefetched_pages()
};
if should_cache {
page_cache.insert(fetched.pgno, bytes.clone());
}
}
resolved.insert(fetched.pgno, fetched.bytes);
}
Expand Down Expand Up @@ -1346,10 +1402,15 @@ impl VfsContext {
let mut state = self.state.write();
state.db_size_pages = request.new_db_size_pages;
for dirty_page in &request.dirty_pages {
state
.page_cache
.insert(dirty_page.pgno, dirty_page.bytes.clone());
if self.config.caches_target_pages() {
state
.page_cache
.insert(dirty_page.pgno, dirty_page.bytes.clone());
} else {
state.page_cache.invalidate(&dirty_page.pgno);
}
}
state.evict_pages_after_eof();
state.write_buffer.dirty.clear();
let state_update_ns = state_update_start.elapsed().as_nanos() as u64;
self.add_commit_phase_metrics(
Expand Down Expand Up @@ -1445,10 +1506,15 @@ impl VfsContext {
let mut state = self.state.write();
state.db_size_pages = request.new_db_size_pages;
for dirty_page in &request.dirty_pages {
state
.page_cache
.insert(dirty_page.pgno, dirty_page.bytes.clone());
if self.config.caches_target_pages() {
state
.page_cache
.insert(dirty_page.pgno, dirty_page.bytes.clone());
} else {
state.page_cache.invalidate(&dirty_page.pgno);
}
}
state.evict_pages_after_eof();
state.write_buffer.dirty.clear();
state.write_buffer.in_atomic_write = false;
let state_update_ns = state_update_start.elapsed().as_nanos() as u64;
Expand All @@ -1471,6 +1537,7 @@ impl VfsContext {
.dirty
.retain(|pgno, _| *pgno <= truncated_pages);
state.page_cache.invalidate_all();
state.page_cache.run_pending_tasks();
}
}

Expand Down Expand Up @@ -1544,6 +1611,7 @@ fn mark_dead_from_fence_commit_error(ctx: &VfsContext, err: &CommitBufferError)
}
}

#[cfg(test)]
fn fetch_initial_main_page(
transport: &SqliteTransport,
runtime: &Handle,
Expand All @@ -1556,6 +1624,28 @@ fn fetch_initial_main_page(
expected_head_txid: None,
}));

initial_main_page_from_response(actor_id, response)
}

pub async fn fetch_initial_main_page_from_envoy(
handle: &EnvoyHandle,
actor_id: &str,
) -> std::result::Result<Option<Vec<u8>>, String> {
let response = handle
.sqlite_get_pages(protocol::SqliteGetPagesRequest {
actor_id: actor_id.to_string(),
pgnos: vec![1],
expected_generation: None,
expected_head_txid: None,
})
.await;
initial_main_page_from_response(actor_id, response)
}

fn initial_main_page_from_response(
actor_id: &str,
response: anyhow::Result<protocol::SqliteGetPagesResponse>,
) -> std::result::Result<Option<Vec<u8>>, String> {
match response {
Ok(protocol::SqliteGetPagesResponse::SqliteGetPagesOk(ok)) => Ok(ok
.pages
Expand Down Expand Up @@ -2490,6 +2580,10 @@ impl SqliteVfs {
self.ctx.snapshot_preload_hints()
}

pub(crate) fn sqlite_vfs_metrics(&self) -> SqliteVfsMetricsSnapshot {
self.ctx.sqlite_vfs_metrics()
}

fn register_with_transport(
name: &str,
transport: SqliteTransport,
Expand Down
Loading
Loading