Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
128 changes: 96 additions & 32 deletions datafusion/execution/src/cache/list_files_cache.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
// specific language governing permissions and limitations
// under the License.

use std::mem::size_of;
use std::{
sync::{Arc, Mutex},
time::Duration,
Expand All @@ -25,6 +26,19 @@ use object_store::{ObjectMeta, path::Path};

use crate::cache::{CacheAccessor, cache_manager::ListFilesCache, lru_queue::LruQueue};

pub trait TimeProvider: Send + Sync + 'static {
fn now(&self) -> Instant;
}

#[derive(Debug, Default)]
pub struct SystemTimeProvider;

impl TimeProvider for SystemTimeProvider {
fn now(&self) -> Instant {
Instant::now()
}
}

/// Default implementation of [`ListFilesCache`]
///
/// Caches file metadata for file listing operations.
Expand All @@ -41,9 +55,15 @@ use crate::cache::{CacheAccessor, cache_manager::ListFilesCache, lru_queue::LruQ
/// Users should use the [`Self::get`] and [`Self::put`] methods. The
/// [`Self::get_with_extra`] and [`Self::put_with_extra`] methods simply call
/// `get` and `put`, respectively.
#[derive(Default)]
pub struct DefaultListFilesCache {
state: Mutex<DefaultListFilesCacheState>,
time_provider: Arc<dyn TimeProvider>,
}

impl Default for DefaultListFilesCache {
fn default() -> Self {
Self::new(DEFAULT_LIST_FILES_CACHE_MEMORY_LIMIT, None)
}
}

impl DefaultListFilesCache {
Expand All @@ -55,9 +75,16 @@ impl DefaultListFilesCache {
pub fn new(memory_limit: usize, ttl: Option<Duration>) -> Self {
Self {
state: Mutex::new(DefaultListFilesCacheState::new(memory_limit, ttl)),
time_provider: Arc::new(SystemTimeProvider),
}
}

#[cfg(test)]
pub(crate) fn with_time_provider(mut self, provider: Arc<dyn TimeProvider>) -> Self {
self.time_provider = provider;
self
}

/// Returns the cache's memory limit in bytes.
pub fn cache_limit(&self) -> usize {
self.state.lock().unwrap().memory_limit
Expand All @@ -83,14 +110,18 @@ struct ListFilesEntry {
}

impl ListFilesEntry {
fn try_new(metas: Arc<Vec<ObjectMeta>>, ttl: Option<Duration>) -> Option<Self> {
fn try_new(
metas: Arc<Vec<ObjectMeta>>,
ttl: Option<Duration>,
now: Instant,
) -> Option<Self> {
let size_bytes = (metas.capacity() * size_of::<ObjectMeta>())
+ metas.iter().map(meta_heap_bytes).reduce(|acc, b| acc + b)?;

Some(Self {
metas,
size_bytes,
expires: ttl.map(|t| Instant::now() + t),
expires: ttl.map(|t| now + t),
})
}
}
Expand Down Expand Up @@ -141,47 +172,54 @@ impl DefaultListFilesCacheState {
}
}

/// Returns the respective entry from the cache, if it exists and the entry has not expired.
/// Returns the respective entry from the cache, if it exists and the entry
/// has not expired by `now`.
///
/// If the entry exists it becomes the most recently used. If the entry has expired it is
/// removed from the cache
fn get(&mut self, key: &Path) -> Option<Arc<Vec<ObjectMeta>>> {
fn get(&mut self, key: &Path, now: Instant) -> Option<Arc<Vec<ObjectMeta>>> {
let entry = self.lru_queue.get(key)?;

match entry.expires {
Some(exp) if Instant::now() > exp => {
Some(exp) if now > exp => {
self.remove(key);
None
}
_ => Some(Arc::clone(&entry.metas)),
}
}

/// Checks if the respective entry is currently cached. If the entry has expired it is removed
/// from the cache.
/// Checks if the respective entry is currently cached.
///
/// If the entry has expired by `now` it is removed from the cache.
///
/// The LRU queue is not updated.
fn contains_key(&mut self, k: &Path) -> bool {
fn contains_key(&mut self, k: &Path, now: Instant) -> bool {
let Some(entry) = self.lru_queue.peek(k) else {
return false;
};

match entry.expires {
Some(exp) if Instant::now() > exp => {
Some(exp) if now > exp => {
self.remove(k);
false
}
_ => true,
}
}

/// Adds a new key-value pair to cache, meaning LRU entries might be evicted if required.
/// Adds a new key-value pair to cache expiring at `now` + the TTL.
///
/// This means that LRU entries might be evicted if required.
/// If the key is already in the cache, the previous entry is returned.
/// If the size of the entry is greater than the `memory_limit`, the value is not inserted.
fn put(
&mut self,
key: &Path,
value: Arc<Vec<ObjectMeta>>,
now: Instant,
) -> Option<Arc<Vec<ObjectMeta>>> {
let entry = ListFilesEntry::try_new(value, self.ttl)?;
let entry = ListFilesEntry::try_new(value, self.ttl, now)?;
let entry_size = entry.size_bytes;

// no point in trying to add this value to the cache if it cannot fit entirely
Expand Down Expand Up @@ -263,7 +301,8 @@ impl CacheAccessor<Path, Arc<Vec<ObjectMeta>>> for DefaultListFilesCache {

fn get(&self, k: &Path) -> Option<Arc<Vec<ObjectMeta>>> {
let mut state = self.state.lock().unwrap();
state.get(k)
let now = self.time_provider.now();
state.get(k, now)
}

fn get_with_extra(&self, k: &Path, _e: &Self::Extra) -> Option<Arc<Vec<ObjectMeta>>> {
Expand All @@ -276,7 +315,8 @@ impl CacheAccessor<Path, Arc<Vec<ObjectMeta>>> for DefaultListFilesCache {
value: Arc<Vec<ObjectMeta>>,
) -> Option<Arc<Vec<ObjectMeta>>> {
let mut state = self.state.lock().unwrap();
state.put(key, value)
let now = self.time_provider.now();
state.put(key, value, now)
}

fn put_with_extra(
Expand All @@ -295,7 +335,8 @@ impl CacheAccessor<Path, Arc<Vec<ObjectMeta>>> for DefaultListFilesCache {

fn contains_key(&self, k: &Path) -> bool {
let mut state = self.state.lock().unwrap();
state.contains_key(k)
let now = self.time_provider.now();
state.contains_key(k, now)
}

fn len(&self) -> usize {
Expand All @@ -319,6 +360,31 @@ mod tests {
use chrono::DateTime;
use std::thread;

struct MockTimeProvider {
base: Instant,
offset: Mutex<Duration>,
}

impl MockTimeProvider {
fn new() -> Self {
Self {
base: Instant::now(),
offset: Mutex::new(Duration::ZERO),
}
}

fn inc(&self, duration: Duration) {
let mut offset = self.offset.lock().unwrap();
*offset += duration;
}
}

impl TimeProvider for MockTimeProvider {
fn now(&self) -> Instant {
self.base + *self.offset.lock().unwrap()
}
}

/// Helper function to create a test ObjectMeta with a specific path and location string size
fn create_test_object_meta(path: &str, location_size: usize) -> ObjectMeta {
// Create a location string of the desired size by padding with zeros
Expand Down Expand Up @@ -565,9 +631,6 @@ mod tests {
}

#[test]
// Ignored due to flakiness in CI. See
// https://github.com/apache/datafusion/issues/19114
#[ignore]
fn test_cache_with_ttl() {
let ttl = Duration::from_millis(100);
let cache = DefaultListFilesCache::new(10000, Some(ttl));
Expand Down Expand Up @@ -596,32 +659,32 @@ mod tests {
}

#[test]
// Ignored due to flakiness in CI. See
// https://github.com/apache/datafusion/issues/19114
#[ignore]
fn test_cache_with_ttl_and_lru() {
let ttl = Duration::from_millis(200);
let cache = DefaultListFilesCache::new(1000, Some(ttl));

let mock_time = Arc::new(MockTimeProvider::new());
let cache = DefaultListFilesCache::new(1000, Some(ttl))
.with_time_provider(Arc::clone(&mock_time) as Arc<dyn TimeProvider>);

let (path1, value1, _) = create_test_list_files_entry("path1", 1, 400);
let (path2, value2, _) = create_test_list_files_entry("path2", 1, 400);
let (path3, value3, _) = create_test_list_files_entry("path3", 1, 400);

cache.put(&path1, value1);
thread::sleep(Duration::from_millis(50));
mock_time.inc(Duration::from_millis(50));
cache.put(&path2, value2);
thread::sleep(Duration::from_millis(50));
mock_time.inc(Duration::from_millis(50));

// path3 should evict path1 due to size limit
cache.put(&path3, value3);
assert!(!cache.contains_key(&path1)); // Evicted by LRU
assert!(cache.contains_key(&path2));
assert!(cache.contains_key(&path3));

// Wait for path2 to expire
thread::sleep(Duration::from_millis(150));
mock_time.inc(Duration::from_millis(151));

assert!(!cache.contains_key(&path2)); // Expired
assert!(cache.contains_key(&path3)); // Still valid
assert!(cache.contains_key(&path3)); // Still valid
}

#[test]
Expand Down Expand Up @@ -671,15 +734,16 @@ mod tests {
fn test_entry_creation() {
// Test with empty vector
let empty_vec: Arc<Vec<ObjectMeta>> = Arc::new(vec![]);
let entry = ListFilesEntry::try_new(empty_vec, None);
let now = Instant::now();
let entry = ListFilesEntry::try_new(empty_vec, None, now);
assert!(entry.is_none());

// Validate entry size
let metas: Vec<ObjectMeta> = (0..5)
.map(|i| create_test_object_meta(&format!("file{i}"), 30))
.collect();
let metas = Arc::new(metas);
let entry = ListFilesEntry::try_new(metas, None).unwrap();
let entry = ListFilesEntry::try_new(metas, None, now).unwrap();
assert_eq!(entry.metas.len(), 5);
// Size should be: capacity * sizeof(ObjectMeta) + (5 * 30) for heap bytes
let expected_size =
Expand All @@ -689,9 +753,9 @@ mod tests {
// Test with TTL
let meta = create_test_object_meta("file", 50);
let ttl = Duration::from_secs(10);
let entry = ListFilesEntry::try_new(Arc::new(vec![meta]), Some(ttl)).unwrap();
let created = Instant::now();
assert!(entry.expires.unwrap() > created);
let entry =
ListFilesEntry::try_new(Arc::new(vec![meta]), Some(ttl), now).unwrap();
assert!(entry.expires.unwrap() > now);
}

#[test]
Expand Down