Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
85 commits
Select commit Hold shift + click to select a range
f6809a1
Add a default FileStatisticsCache implementation for the ListingTable
mkleen Jan 18, 2026
0ef1b06
fixup! Add a default FileStatisticsCache implementation for the Listi…
mkleen Jan 28, 2026
994aecd
Adapt memory usage when removing entries
mkleen Feb 4, 2026
5cf0b11
Adapt heapsize for &str
mkleen Feb 4, 2026
d8372f8
Fix formatting
mkleen Feb 4, 2026
de9789d
Adapt heapsize for &str and add another scalarvalue
mkleen Feb 4, 2026
e9cdb5f
Add better error message
mkleen Feb 10, 2026
f859150
Add todo to add heapsize for ordering in CachedFileMetadata
mkleen Feb 10, 2026
c7274d8
Fix comment/docs on DefaultFileStatisticsCache
mkleen Feb 10, 2026
24d91cb
Simplify test data generation
mkleen Feb 10, 2026
5adc600
Remove potential stale entry, if entry is too large
mkleen Feb 10, 2026
9a8b538
Fix typo in sql logic test comment
mkleen Feb 10, 2026
40587c7
Fix comment about default behaviour in cache manager
mkleen Feb 10, 2026
38b45f7
Fix variable name in test
mkleen Feb 10, 2026
b4c7b90
Fix variable name in test
mkleen Feb 10, 2026
e900ddb
Disable cache for sql logic test
mkleen Feb 10, 2026
e6c05b9
Include key into memory estimation
mkleen Feb 11, 2026
2fa8a61
Fix fmt
mkleen Feb 11, 2026
e4dea07
Fix clippy
mkleen Feb 11, 2026
241480d
minor
mkleen Feb 11, 2026
af56b06
Add more key memory accounting
mkleen Feb 12, 2026
43f7d64
Fix Formatting
mkleen Feb 12, 2026
47a5853
Account path as string and remove dependency to object_store
mkleen Feb 12, 2026
49478cc
Improve error handling
mkleen Feb 12, 2026
0dc67ea
Fix fmt
mkleen Feb 12, 2026
9ed4bce
Remove path.clone
mkleen Feb 12, 2026
69483d1
Simplify accounting for statistics
mkleen Feb 12, 2026
101a801
Adapt offset buffer
mkleen Feb 12, 2026
ec35784
Fix heap size for Arc
mkleen Feb 12, 2026
838708f
Adapt estimate in test
mkleen Feb 12, 2026
7f6f92a
Fix sql logic test
mkleen Feb 12, 2026
df3fff3
Register cache from cachemanager at listing table
mkleen Apr 8, 2026
d1b3a05
Revert slt
mkleen Apr 8, 2026
a6a4b2f
Add tablescoping for file stats cache
mkleen Feb 18, 2026
8874a76
Adapt slt
mkleen Apr 9, 2026
d37e7a7
Fix linter
mkleen Apr 9, 2026
9f054c4
Remove uneeded clone
mkleen Apr 9, 2026
376bd44
Rename cache_unit to file_statistics_cache
mkleen Apr 9, 2026
8177a7b
Simplify heap size accounting
mkleen Apr 9, 2026
a8d2c53
Adapt comments in test
mkleen Apr 10, 2026
58da87f
Seperate drop table clean-ups
mkleen Apr 10, 2026
6693819
fixup! Seperate drop table clean-ups
mkleen Apr 10, 2026
1f3812b
Increase default limit to 10 mb
mkleen Apr 15, 2026
2034e62
Increase default limit to 20 mb
mkleen Apr 15, 2026
f48d223
Fix comment
mkleen Apr 15, 2026
ab39f39
Fix deregister logic
mkleen Apr 15, 2026
9bb35db
Fix slt
mkleen Apr 15, 2026
8b543e5
Add table reference to FileStatisticsCacheEntry
mkleen Apr 15, 2026
5144ef5
fixup! Add table reference to FileStatisticsCacheEntry
mkleen Apr 15, 2026
ac06906
Fix comment
mkleen Apr 15, 2026
3c834b3
Fix runtime_env entry
mkleen Apr 19, 2026
ae34df2
Add cache for all benchmark runs
mkleen Apr 21, 2026
c60fb8c
Add cache to listing table creation
mkleen Apr 21, 2026
71fd2dd
fixup! Add cache to listing table creation
mkleen Apr 21, 2026
3fede3b
Adapt limit to 20M in configs.md
mkleen Apr 22, 2026
274e055
fixup! Adapt limit to 20M in configs.md
mkleen Apr 22, 2026
8b13a1a
Fix linter
mkleen Apr 22, 2026
66f8a5b
Add cache to listing table in _read_type()
mkleen Apr 22, 2026
aa0350d
Add ListView and LargeListView to heapsize
mkleen Apr 22, 2026
d072b7d
fixup! Add ListView and LargeListView to heapsize
mkleen Apr 22, 2026
fbd1d55
Remove array.slt
mkleen Apr 22, 2026
dd90888
Add table ref to ListingTableUrl
mkleen Apr 23, 2026
987ce88
Add heapsize for table-scoped-path
mkleen Apr 23, 2026
f3c39a9
Make list_entries table-scoped
mkleen Apr 23, 2026
630b44e
fixup! Make list_entries table-scoped
mkleen Apr 23, 2026
4a52cb9
fixup! fixup! Make list_entries table-scoped
mkleen Apr 23, 2026
ddf135a
Improve heap size estimation for Arc
mkleen Apr 26, 2026
0c6356e
fixup! Improve heap size estimation for Arc
mkleen Apr 27, 2026
3995e4e
Update migration guide
mkleen Apr 27, 2026
30da2c7
fixup! Update migration guide
mkleen Apr 27, 2026
e9c0ec9
Improve heapsize estimation for TableReference
mkleen Apr 29, 2026
326698b
Improve memory handling when inserting
mkleen Apr 29, 2026
e5a1049
Fix comments in Cache Manager
mkleen Apr 29, 2026
bd9d05c
Improve upgrade guide
mkleen Apr 29, 2026
2ec39ff
Fix upgrade guide
mkleen Apr 29, 2026
c56eb5f
Return stale entries from cache
mkleen May 4, 2026
ef64cdc
Fix upgrade guide
mkleen May 4, 2026
78575c1
Fix Arc<str> heapsize test
mkleen May 5, 2026
cf7be58
Remove const i32 cast from heapsize estimation
mkleen May 5, 2026
fb25a1a
Fix heapsize estimation for Arc<T>
mkleen May 5, 2026
7dabd91
Fix comment in cache_manager
mkleen May 5, 2026
26ed54c
Fix linter + clippy
mkleen May 5, 2026
457032a
Adapt test acording to heapsize estimation changes
mkleen May 5, 2026
953cf60
Merge branch 'main' into file-stats-cache
mkleen May 5, 2026
14a8c74
Merge branch 'main' into file-stats-cache
alamb May 6, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion benchmarks/src/bin/external_aggr.rs
Original file line number Diff line number Diff line change
Expand Up @@ -326,7 +326,9 @@ impl ExternalAggrConfig {
let config = ListingTableConfig::new(table_path).with_listing_options(options);
let config = config.infer_schema(&state).await?;

Ok(Arc::new(ListingTable::try_new(config)?))
Ok(Arc::new(ListingTable::try_new(config)?.with_cache(
ctx.runtime_env().cache_manager.get_file_statistic_cache(),
)))
}

fn iterations(&self) -> usize {
Expand Down
4 changes: 3 additions & 1 deletion benchmarks/src/imdb/run.rs
Original file line number Diff line number Diff line change
Expand Up @@ -470,7 +470,9 @@ impl RunOpt {
_ => unreachable!(),
};

Ok(Arc::new(ListingTable::try_new(config)?))
Ok(Arc::new(ListingTable::try_new(config)?.with_cache(
ctx.runtime_env().cache_manager.get_file_statistic_cache(),
)))
}

fn iterations(&self) -> usize {
Expand Down
4 changes: 3 additions & 1 deletion benchmarks/src/sort_pushdown.rs
Original file line number Diff line number Diff line change
Expand Up @@ -273,7 +273,9 @@ impl RunOpt {
.with_listing_options(options)
.with_schema(schema);

Ok(Arc::new(ListingTable::try_new(config)?))
Ok(Arc::new(ListingTable::try_new(config)?.with_cache(
ctx.runtime_env().cache_manager.get_file_statistic_cache(),
)))
}

fn iterations(&self) -> usize {
Expand Down
4 changes: 3 additions & 1 deletion benchmarks/src/sort_tpch.rs
Original file line number Diff line number Diff line change
Expand Up @@ -351,7 +351,9 @@ impl RunOpt {
.with_listing_options(options)
.with_schema(schema);

Ok(Arc::new(ListingTable::try_new(config)?))
Ok(Arc::new(ListingTable::try_new(config)?.with_cache(
ctx.runtime_env().cache_manager.get_file_statistic_cache(),
)))
}

fn iterations(&self) -> usize {
Expand Down
4 changes: 3 additions & 1 deletion benchmarks/src/tpcds/run.rs
Original file line number Diff line number Diff line change
Expand Up @@ -347,7 +347,9 @@ impl RunOpt {
.with_listing_options(options)
.with_schema(schema);

Ok(Arc::new(ListingTable::try_new(config)?))
Ok(Arc::new(ListingTable::try_new(config)?.with_cache(
ctx.runtime_env().cache_manager.get_file_statistic_cache(),
)))
}

fn iterations(&self) -> usize {
Expand Down
4 changes: 3 additions & 1 deletion benchmarks/src/tpch/run.rs
Original file line number Diff line number Diff line change
Expand Up @@ -342,7 +342,9 @@ impl RunOpt {
.with_listing_options(options)
.with_schema(schema);

Ok(Arc::new(ListingTable::try_new(config)?))
Ok(Arc::new(ListingTable::try_new(config)?.with_cache(
ctx.runtime_env().cache_manager.get_file_statistic_cache(),
)))
}

fn iterations(&self) -> usize {
Expand Down
58 changes: 1 addition & 57 deletions datafusion-cli/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -443,10 +443,7 @@ mod tests {
use super::*;
use datafusion::{
common::test_util::batches_to_string,
execution::cache::{
DefaultListFilesCache, cache_manager::CacheManagerConfig,
cache_unit::DefaultFileStatisticsCache,
},
execution::cache::{DefaultListFilesCache, cache_manager::CacheManagerConfig},
prelude::{ParquetReadOptions, col, lit, split_part},
};
use insta::assert_snapshot;
Expand Down Expand Up @@ -656,8 +653,6 @@ mod tests {
Ok(())
}

/// Shows that the statistics cache is not enabled by default yet
/// See https://github.com/apache/datafusion/issues/19217
#[tokio::test]
async fn test_statistics_cache_default() -> Result<(), DataFusionError> {
let ctx = SessionContext::new();
Expand Down Expand Up @@ -687,57 +682,6 @@ mod tests {
.await?;
}

// When the cache manager creates a StatisticsCache by default,
// the contents will show up here
let sql = "SELECT split_part(path, '/', -1) as filename, file_size_bytes, num_rows, num_columns, table_size_bytes from statistics_cache() order by filename";
let df = ctx.sql(sql).await?;
let rbs = df.collect().await?;
assert_snapshot!(batches_to_string(&rbs),@r"
++
++
");

Ok(())
}

// Can be removed when https://github.com/apache/datafusion/issues/19217 is resolved
#[tokio::test]
async fn test_statistics_cache_override() -> Result<(), DataFusionError> {
// Install a specific StatisticsCache implementation
let file_statistics_cache = Arc::new(DefaultFileStatisticsCache::default());
let cache_config = CacheManagerConfig::default()
.with_files_statistics_cache(Some(file_statistics_cache.clone()));
let runtime = RuntimeEnvBuilder::new()
.with_cache_manager(cache_config)
.build()?;
let config = SessionConfig::new().with_collect_statistics(true);
let ctx = SessionContext::new_with_config_rt(config, Arc::new(runtime));

ctx.register_udtf(
"statistics_cache",
Arc::new(StatisticsCacheFunc::new(
ctx.task_ctx().runtime_env().cache_manager.clone(),
)),
);

for filename in [
"alltypes_plain",
"alltypes_tiny_pages",
"lz4_raw_compressed_larger",
] {
ctx.sql(
format!(
"create external table {filename}
stored as parquet
location '../parquet-testing/data/{filename}.parquet'",
)
.as_str(),
)
.await?
.collect()
.await?;
}

let sql = "SELECT split_part(path, '/', -1) as filename, file_size_bytes, num_rows, num_columns, table_size_bytes from statistics_cache() order by filename";
let df = ctx.sql(sql).await?;
let rbs = df.collect().await?;
Expand Down
1 change: 1 addition & 0 deletions datafusion/catalog-listing/src/helpers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -382,6 +382,7 @@ fn try_into_partitioned_file(

let mut pf: PartitionedFile = object_meta.into();
pf.partition_values = partition_values;
pf.table_reference.clone_from(table_path.get_table_ref());

Ok(Some(pf))
}
Expand Down
35 changes: 19 additions & 16 deletions datafusion/catalog-listing/src/table.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,6 @@ use datafusion_datasource::{
};
use datafusion_execution::cache::TableScopedPath;
use datafusion_execution::cache::cache_manager::FileStatisticsCache;
use datafusion_execution::cache::cache_unit::DefaultFileStatisticsCache;
use datafusion_expr::dml::InsertOp;
use datafusion_expr::execution_props::ExecutionProps;
use datafusion_expr::{Expr, TableProviderFilterPushDown, TableType};
Expand Down Expand Up @@ -187,7 +186,7 @@ pub struct ListingTable {
/// The SQL definition for this table, if any
definition: Option<String>,
/// Cache for collected file statistics
collected_statistics: Arc<dyn FileStatisticsCache>,
collected_statistics: Option<Arc<dyn FileStatisticsCache>>,
/// Constraints applied to this table
constraints: Constraints,
/// Column default expressions for columns that are not physically present in the data files
Expand Down Expand Up @@ -231,7 +230,7 @@ impl ListingTable {
schema_source,
options,
definition: None,
collected_statistics: Arc::new(DefaultFileStatisticsCache::default()),
collected_statistics: None,
constraints: Constraints::default(),
column_defaults: HashMap::new(),
expr_adapter_factory: config.expr_adapter_factory,
Expand Down Expand Up @@ -260,10 +259,8 @@ impl ListingTable {
/// Setting a statistics cache on the `SessionContext` can avoid refetching statistics
/// multiple times in the same session.
///
/// If `None`, creates a new [`DefaultFileStatisticsCache`] scoped to this query.
pub fn with_cache(mut self, cache: Option<Arc<dyn FileStatisticsCache>>) -> Self {
self.collected_statistics =
cache.unwrap_or_else(|| Arc::new(DefaultFileStatisticsCache::default()));
self.collected_statistics = cache;
self
}

Expand Down Expand Up @@ -802,11 +799,15 @@ impl ListingTable {
) -> datafusion_common::Result<(Arc<Statistics>, Option<LexOrdering>)> {
use datafusion_execution::cache::cache_manager::CachedFileMetadata;

let path = &part_file.object_meta.location;
let path = TableScopedPath {
table: part_file.table_reference.clone(),
path: part_file.object_meta.location.clone(),
};
let meta = &part_file.object_meta;

// Check cache first - if we have valid cached statistics and ordering
if let Some(cached) = self.collected_statistics.get(path)
if let Some(cache) = &self.collected_statistics
&& let Some(cached) = cache.get(&path)
&& cached.is_valid_for(meta)
{
// Return cached statistics and ordering
Expand All @@ -823,14 +824,16 @@ impl ListingTable {
let statistics = Arc::new(file_meta.statistics);

// Store in cache
self.collected_statistics.put(
path,
CachedFileMetadata::new(
meta.clone(),
Arc::clone(&statistics),
file_meta.ordering.clone(),
),
);
if let Some(cache) = &self.collected_statistics {
cache.put(
&path,
CachedFileMetadata::new(
meta.clone(),
Arc::clone(&statistics),
file_meta.ordering.clone(),
),
);
}

Ok((statistics, file_meta.ordering))
}
Expand Down
Loading
Loading