Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ use datafusion::physical_plan::ExecutionPlan;
use datafusion_datasource::PartitionedFile;
use datafusion_datasource::file_groups::FileGroup;
use datafusion_datasource::file_scan_config::FileScanConfigBuilder;
use datafusion_datasource_parquet::CachedParquetFileReaderFactory;
use datafusion_datasource_parquet::source::ParquetSource;
use datafusion_physical_plan::expressions::Column;
use quickwit_common::uri::Uri;
Expand Down Expand Up @@ -164,7 +165,7 @@ impl TableProvider for MetricsTableProvider {

async fn scan(
&self,
_state: &dyn Session,
state: &dyn Session,
projection: Option<&Vec<usize>>,
filters: &[Expr],
limit: Option<usize>,
Expand Down Expand Up @@ -200,11 +201,20 @@ impl TableProvider for MetricsTableProvider {

// Configure ParquetSource with bloom filters + pushdown enabled
let table_schema: datafusion_datasource::TableSchema = self.schema.clone().into();
let metadata_cache = state.runtime_env().cache_manager.get_file_metadata_cache();
let object_store = state
.runtime_env()
.object_store(self.object_store_url.clone())?;
let reader_factory = Arc::new(CachedParquetFileReaderFactory::new(
object_store,
metadata_cache,
));
let parquet_source = ParquetSource::new(table_schema)
.with_bloom_filter_on_read(true)
.with_pushdown_filters(true)
.with_reorder_filters(true)
.with_enable_page_index(true);
.with_enable_page_index(true)
.with_parquet_file_reader_factory(reader_factory);

// Build the FileScanConfig
let mut builder =
Expand Down Expand Up @@ -348,8 +358,11 @@ fn splits_have_default_metrics_sort(splits: &[ParquetSplitMetadata]) -> bool {
#[cfg(test)]
mod tests {
use arrow::datatypes::{DataType, Field, Schema};
use datafusion::prelude::SessionContext;
use quickwit_parquet_engine::split::TimeRange;

use crate::sources::metrics::test_utils::TestSplitProvider;

use super::*;

fn schema_with_columns(columns: &[&str]) -> SchemaRef {
Expand Down Expand Up @@ -390,6 +403,31 @@ mod tests {
.build()
}

/// Verifies that `scan` wires a parquet file reader factory into the
/// resulting `ParquetSource`, so footer metadata can be served from the
/// runtime's metadata cache instead of being re-read per query.
#[tokio::test]
async fn scan_installs_cached_parquet_reader_factory() {
    let ctx = SessionContext::new();
    let session_state = ctx.state();
    let schema = schema_with_columns(&["metric_name", "timestamp_secs", "value"]);
    let splits = Arc::new(TestSplitProvider::new(Vec::new()));
    let table =
        MetricsTableProvider::new(schema, splits, Uri::for_test("file:///metrics")).unwrap();

    let plan = table.scan(&session_state, None, &[], None).await.unwrap();
    let exec = plan
        .as_any()
        .downcast_ref::<DataSourceExec>()
        .expect("metrics scan should produce DataSourceExec");
    let (_file_scan, source) = exec
        .downcast_to_file_source::<ParquetSource>()
        .expect("metrics scan should use ParquetSource");

    let has_factory = source.parquet_file_reader_factory().is_some();
    assert!(
        has_factory,
        "metrics scans should use DataFusion's metadata-caching parquet reader"
    );
}

#[test]
fn metrics_output_ordering_stops_at_first_missing_sort_key() {
let schema = schema_with_columns(&["metric_name", "service", "timestamp_secs"]);
Expand Down
44 changes: 42 additions & 2 deletions quickwit/quickwit-df-core/src/session.rs
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,10 @@ use crate::task_estimator::DataSourceExecPartitionEstimator;
type CatalogProviderFactory = Arc<dyn Fn() -> Arc<dyn CatalogProvider> + Send + Sync>;
type SchemaProviderFactory = Arc<dyn Fn() -> Arc<dyn SchemaProvider> + Send + Sync>;

/// Default per-node DataFusion cache budget for file-embedded metadata such
/// as Parquet footers and page indexes (4 GiB).
pub const DEFAULT_FILE_METADATA_CACHE_LIMIT_BYTES: usize = 4 * 1024 * 1024 * 1024;

#[derive(Clone)]
pub(crate) struct CatalogRegistration {
name: String,
Expand All @@ -80,6 +84,7 @@ pub struct DataFusionSessionBuilder {
task_estimator: Arc<dyn TaskEstimator + Send + Sync>,
memory_pool: Option<Arc<dyn MemoryPool>>,
object_store_registry: Option<Arc<dyn ObjectStoreRegistry>>,
file_metadata_cache_limit_bytes: usize,
runtime: Arc<RuntimeEnv>,
}

Expand All @@ -91,6 +96,10 @@ impl std::fmt::Debug for DataFusionSessionBuilder {
.field("num_catalogs", &self.catalog_registrations.len())
.field("num_schemas", &self.schema_registrations.len())
.field("distributed", &self.worker_resolver.is_some())
.field(
"file_metadata_cache_limit_bytes",
&self.file_metadata_cache_limit_bytes,
)
.finish()
}
}
Expand All @@ -103,6 +112,10 @@ impl Default for DataFusionSessionBuilder {

impl DataFusionSessionBuilder {
pub fn new() -> Self {
let runtime = RuntimeEnvBuilder::new()
.with_metadata_cache_limit(DEFAULT_FILE_METADATA_CACHE_LIMIT_BYTES)
.build_arc()
.expect("default DataFusion runtime should build");
Self {
runtime_plugins: Vec::new(),
substrait_extensions: Vec::new(),
Expand All @@ -112,12 +125,14 @@ impl DataFusionSessionBuilder {
task_estimator: Arc::new(DataSourceExecPartitionEstimator),
memory_pool: None,
object_store_registry: None,
runtime: Arc::new(RuntimeEnv::default()),
file_metadata_cache_limit_bytes: DEFAULT_FILE_METADATA_CACHE_LIMIT_BYTES,
runtime,
}
}

fn rebuild_runtime(&mut self) -> DFResult<()> {
let mut builder = RuntimeEnvBuilder::new();
let mut builder = RuntimeEnvBuilder::new()
.with_metadata_cache_limit(self.file_metadata_cache_limit_bytes);
if let Some(memory_pool) = &self.memory_pool {
builder = builder.with_memory_pool(Arc::clone(memory_pool));
}
Expand Down Expand Up @@ -176,6 +191,12 @@ impl DataFusionSessionBuilder {
Ok(self)
}

/// Overrides the file-metadata cache budget (in bytes) and rebuilds the
/// DataFusion runtime so the new limit takes effect immediately.
///
/// # Errors
/// Returns an error if the runtime environment fails to rebuild.
pub fn with_file_metadata_cache_limit(mut self, bytes: usize) -> DFResult<Self> {
    self.file_metadata_cache_limit_bytes = bytes;
    self.rebuild_runtime().map(|()| self)
}

pub fn with_object_store_registry(
mut self,
registry: Arc<dyn ObjectStoreRegistry>,
Expand Down Expand Up @@ -411,6 +432,21 @@ mod tests {
}
}

/// The builder should start with the default metadata-cache budget and honor
/// an explicit override via `with_file_metadata_cache_limit`.
#[test]
fn file_metadata_cache_limit_defaults_to_four_gib_and_can_be_overridden() {
    let fresh = DataFusionSessionBuilder::new();
    let default_limit = fresh.runtime().cache_manager.get_metadata_cache_limit();
    assert_eq!(default_limit, DEFAULT_FILE_METADATA_CACHE_LIMIT_BYTES);

    let overridden = fresh.with_file_metadata_cache_limit(8 * 1024).unwrap();
    let new_limit = overridden
        .runtime()
        .cache_manager
        .get_metadata_cache_limit();
    assert_eq!(new_limit, 8 * 1024);
}

#[test]
fn runtime_settings_compose_and_reinitialize_sources() {
let source_url = Url::parse("test://source").unwrap();
Expand Down Expand Up @@ -442,6 +478,10 @@ mod tests {
.and_then(|entry| entry.value),
Some("2K".to_string())
);
assert_eq!(
builder.runtime().cache_manager.get_metadata_cache_limit(),
DEFAULT_FILE_METADATA_CACHE_LIMIT_BYTES
);
assert!(
builder
.runtime()
Expand Down