Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 13 additions & 3 deletions quickwit/quickwit-datafusion/src/object_store_registry.rs
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ use datafusion::common::{DataFusionError, Result as DFResult};
use datafusion::execution::object_store::{DefaultObjectStoreRegistry, ObjectStoreRegistry};
use object_store::ObjectStore;
use quickwit_common::uri::Uri;
use quickwit_storage::StorageResolver;
use quickwit_storage::{StorageCache, StorageResolver};
use url::Url;

use crate::storage_bridge::QuickwitObjectStore;
Expand All @@ -69,6 +69,7 @@ pub struct QuickwitObjectStoreRegistry {
/// fallback.
default: DefaultObjectStoreRegistry,
storage_resolver: StorageResolver,
storage_cache: Option<Arc<dyn StorageCache>>,
/// Lazy wrappers constructed by `get_store` on demand, keyed by
/// `scheme://authority`. Plain `RwLock<HashMap>` is fine — contention
/// is negligible because the write lock is only taken once per unique
Expand All @@ -92,10 +93,16 @@ impl QuickwitObjectStoreRegistry {
Self {
default: DefaultObjectStoreRegistry::new(),
storage_resolver,
storage_cache: None,
lazy_stores: RwLock::new(HashMap::new()),
}
}

pub fn with_storage_cache(mut self, storage_cache: Arc<dyn StorageCache>) -> Self {
self.storage_cache = Some(storage_cache);
self
}

/// Canonical cache key mirroring DataFusion's `DefaultObjectStoreRegistry`:
/// `scheme://authority`. Preserves the authority so indexes in
/// different buckets stay distinct; paths within an authority share
Expand Down Expand Up @@ -138,8 +145,11 @@ impl ObjectStoreRegistry for QuickwitObjectStoreRegistry {
"failed to build Quickwit URI from `{key}`: {err}"
))))
})?;
let store: Arc<dyn ObjectStore> =
Arc::new(QuickwitObjectStore::new(uri, self.storage_resolver.clone()));
let mut quickwit_store = QuickwitObjectStore::new(uri, self.storage_resolver.clone());
if let Some(storage_cache) = &self.storage_cache {
quickwit_store = quickwit_store.with_storage_cache(Arc::clone(storage_cache));
}
let store: Arc<dyn ObjectStore> = Arc::new(quickwit_store);
let mut write = self
.lazy_stores
.write()
Expand Down
2 changes: 2 additions & 0 deletions quickwit/quickwit-datafusion/src/sources/metrics/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ pub(crate) mod factory;
pub(crate) mod index_resolver;
pub(crate) mod metastore_provider;
pub(crate) mod optimizer;
pub(crate) mod parquet_cache_metrics;
pub(crate) mod predicate;
pub(crate) mod sketch_udf;
pub(crate) mod table_provider;
Expand All @@ -50,6 +51,7 @@ use quickwit_proto::metastore::{MetastoreError, MetastoreServiceClient};
use self::factory::{METRICS_FILE_TYPE, MetricsTableProviderFactory, SKETCHES_FILE_TYPE};
use self::index_resolver::{MetastoreIndexResolver, MetricsIndexResolver};
use self::optimizer::SortedSeriesStreamingAggregateRule;
pub use self::parquet_cache_metrics::instrument_parquet_range_cache_metrics;
use self::sketch_udf::{create_dd_quantile_udf, create_dd_sketch_udaf};
use self::table_provider::MetricsTableProvider;

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,227 @@
// Copyright 2021-Present Datadog, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

//! Per-query cache metrics for metrics parquet scans.

use std::ops::Range;
use std::sync::Arc;

use bytes::Bytes;
use datafusion::common::tree_node::{Transformed, TreeNode};
use datafusion::datasource::source::DataSourceExec;
use datafusion::error::Result as DFResult;
use datafusion::parquet;
use datafusion::parquet::arrow::arrow_reader::ArrowReaderOptions;
use datafusion::parquet::arrow::async_reader::AsyncFileReader;
use datafusion::parquet::file::metadata::ParquetMetaData;
use datafusion::physical_plan::ExecutionPlan;
use datafusion_datasource::PartitionedFile;
use datafusion_datasource::file_scan_config::FileScanConfigBuilder;
use datafusion_datasource_parquet::ParquetFileReaderFactory;
use datafusion_datasource_parquet::source::ParquetSource;
use datafusion_physical_plan::metrics::{
Count, ExecutionPlanMetricsSet, MetricBuilder, MetricType,
};
use futures::FutureExt;
use futures::future::BoxFuture;
use quickwit_storage::{
StorageCacheMetrics, StorageCacheMetricsSnapshot, with_storage_cache_metrics,
};
use tracing::warn;

/// Wraps a [`ParquetFileReaderFactory`] so every reader it produces reports
/// storage-cache hit/miss activity into per-partition plan metrics.
pub(super) fn instrument_parquet_file_reader_factory(
    inner: Arc<dyn ParquetFileReaderFactory>,
) -> Arc<dyn ParquetFileReaderFactory> {
    let instrumented = InstrumentedParquetFileReaderFactory { inner };
    Arc::new(instrumented)
}

/// Installs parquet storage-cache metrics instrumentation on every parquet
/// scan found in `plan`, returning the rewritten plan.
///
/// Instrumentation is best-effort: if the plan rewrite fails, a warning is
/// logged and the original plan is returned unchanged so the query still runs.
pub fn instrument_parquet_range_cache_metrics(
    plan: Arc<dyn ExecutionPlan>,
) -> Arc<dyn ExecutionPlan> {
    let rewrite_result = Arc::clone(&plan).transform_up(|node| {
        Ok(match instrument_parquet_scan(&node) {
            Some(instrumented) => Transformed::yes(instrumented),
            None => Transformed::no(node),
        })
    });
    match rewrite_result {
        Ok(transformed) => transformed.data,
        Err(error) => {
            warn!(%error, "failed to install parquet cache metrics on worker plan");
            plan
        }
    }
}

/// Rewrites a single plan node when it is a parquet `DataSourceExec`, wrapping
/// its file reader factory with cache-metrics instrumentation.
///
/// Returns `None` when the node is not a parquet scan or when the scan exposes
/// no reader factory to wrap.
fn instrument_parquet_scan(plan: &Arc<dyn ExecutionPlan>) -> Option<Arc<dyn ExecutionPlan>> {
    let exec = plan.as_any().downcast_ref::<DataSourceExec>()?;
    let (scan_config, source) = exec.downcast_to_file_source::<ParquetSource>()?;
    let factory = source.parquet_file_reader_factory()?.clone();

    let instrumented_source = source
        .clone()
        .with_parquet_file_reader_factory(instrument_parquet_file_reader_factory(factory));
    let new_config = FileScanConfigBuilder::from(scan_config.clone())
        .with_source(Arc::new(instrumented_source))
        .build();
    let rewritten: Arc<dyn ExecutionPlan> = DataSourceExec::from_data_source(new_config);
    Some(rewritten)
}

/// A [`ParquetFileReaderFactory`] decorator: every reader built by `inner`
/// is wrapped in a [`StorageCacheObservedReader`] so its storage-cache
/// activity is recorded as per-partition plan metrics.
#[derive(Debug)]
struct InstrumentedParquetFileReaderFactory {
// The factory that actually builds the parquet readers.
inner: Arc<dyn ParquetFileReaderFactory>,
}

impl ParquetFileReaderFactory for InstrumentedParquetFileReaderFactory {
    /// Builds a reader via the wrapped factory and returns it wrapped in a
    /// [`StorageCacheObservedReader`] that folds cache activity into the
    /// plan's metrics for `partition_index`.
    fn create_reader(
        &self,
        partition_index: usize,
        partitioned_file: PartitionedFile,
        metadata_size_hint: Option<usize>,
        metrics: &ExecutionPlanMetricsSet,
    ) -> DFResult<Box<dyn AsyncFileReader + Send>> {
        // Register the counters before delegating, mirroring the original
        // ordering so they exist in the metrics set even if creation fails.
        let cache_metrics = Arc::new(StorageCacheMetrics::default());
        let counters = StorageCacheMetricCounters::new(partition_index, metrics);
        let inner_reader = self.inner.create_reader(
            partition_index,
            partitioned_file,
            metadata_size_hint,
            metrics,
        )?;
        Ok(Box::new(StorageCacheObservedReader {
            inner: inner_reader,
            cache_metrics,
            counters,
        }))
    }
}

/// Per-partition DataFusion metric counters mirroring the storage-cache
/// activity observed while reading one parquet partition.
#[derive(Clone)]
struct StorageCacheMetricCounters {
// Bytes served from / missed by the range cache.
range_hit_bytes: Count,
range_miss_bytes: Count,
// Number of range requests that hit / missed the cache.
range_hit_count: Count,
range_miss_count: Count,
// Footer (metadata) fetches classified as cached vs. fetched from storage.
footer_hit_count: Count,
footer_miss_count: Count,
}

impl StorageCacheMetricCounters {
    /// Registers the six cache counters for `partition_index` on `metrics`,
    /// all tagged as summary-type metrics.
    fn new(partition_index: usize, metrics: &ExecutionPlanMetricsSet) -> Self {
        let make_counter = |name: &'static str| {
            MetricBuilder::new(metrics)
                .with_type(MetricType::SUMMARY)
                .counter(name, partition_index)
        };
        Self {
            range_hit_bytes: make_counter("parquet_range_cache_hit_bytes"),
            range_miss_bytes: make_counter("parquet_range_cache_miss_bytes"),
            range_hit_count: make_counter("parquet_range_cache_hit_count"),
            range_miss_count: make_counter("parquet_range_cache_miss_count"),
            footer_hit_count: make_counter("parquet_footer_cache_hit_count"),
            footer_miss_count: make_counter("parquet_footer_cache_miss_count"),
        }
    }

    /// Adds a hit/miss snapshot delta into the four range counters.
    fn record_range_delta(&self, delta: StorageCacheMetricsSnapshot) {
        let updates = [
            (&self.range_hit_bytes, delta.hit_bytes),
            (&self.range_miss_bytes, delta.miss_bytes),
            (&self.range_hit_count, delta.hit_count),
            (&self.range_miss_count, delta.miss_count),
        ];
        for (counter, amount) in updates {
            counter.add(amount);
        }
    }

    /// Classifies one footer (metadata) fetch: a fetch that triggered no
    /// cache activity at all is counted as a footer-cache hit, anything
    /// else as a miss.
    fn record_footer_observation(&self, delta: StorageCacheMetricsSnapshot) {
        let served_without_storage_reads = delta.hit_count == 0 && delta.miss_count == 0;
        if served_without_storage_reads {
            self.footer_hit_count.add(1);
        } else {
            self.footer_miss_count.add(1);
        }
    }
}

/// An [`AsyncFileReader`] wrapper that runs each inner read with
/// `cache_metrics` installed (via `with_storage_cache_metrics`) and records
/// the resulting hit/miss deltas into `counters`.
struct StorageCacheObservedReader {
// The underlying reader performing the actual I/O.
inner: Box<dyn AsyncFileReader + Send>,
// Metrics sink installed around every inner future; deltas between
// snapshots taken before/after each read yield the per-call activity.
cache_metrics: Arc<StorageCacheMetrics>,
// Plan-level counters the deltas are recorded into.
counters: StorageCacheMetricCounters,
}

/// Drives `future` with `cache_metrics` installed as the active storage-cache
/// metrics sink, then records the hit/miss delta accrued since `before` into
/// the per-partition range counters. The inner result is passed through
/// untouched.
fn observe_storage_cache_activity<'a, T: Send + 'a>(
    cache_metrics: Arc<StorageCacheMetrics>,
    counters: StorageCacheMetricCounters,
    before: StorageCacheMetricsSnapshot,
    future: BoxFuture<'a, parquet::errors::Result<T>>,
) -> BoxFuture<'a, parquet::errors::Result<T>> {
    Box::pin(async move {
        let outcome = with_storage_cache_metrics(Arc::clone(&cache_metrics), future).await;
        let delta = cache_metrics.snapshot().saturating_delta_since(before);
        counters.record_range_delta(delta);
        outcome
    })
}

/// Like [`observe_storage_cache_activity`], but additionally classifies the
/// call as a footer-cache hit or miss — only when the metadata fetch
/// succeeded, so failed fetches do not skew the footer counters.
fn observe_footer_cache_activity<'a>(
    cache_metrics: Arc<StorageCacheMetrics>,
    counters: StorageCacheMetricCounters,
    before: StorageCacheMetricsSnapshot,
    future: BoxFuture<'a, parquet::errors::Result<Arc<ParquetMetaData>>>,
) -> BoxFuture<'a, parquet::errors::Result<Arc<ParquetMetaData>>> {
    Box::pin(async move {
        let outcome = with_storage_cache_metrics(Arc::clone(&cache_metrics), future).await;
        let delta = cache_metrics.snapshot().saturating_delta_since(before);
        counters.record_range_delta(delta);
        if outcome.is_ok() {
            counters.record_footer_observation(delta);
        }
        outcome
    })
}

impl AsyncFileReader for StorageCacheObservedReader {
    /// Reads one byte range, recording the cache activity it triggers.
    fn get_bytes(&mut self, range: Range<u64>) -> BoxFuture<'_, parquet::errors::Result<Bytes>> {
        // Snapshot BEFORE creating the inner future so any activity it
        // triggers is attributed to this call.
        let snapshot_before = self.cache_metrics.snapshot();
        let metrics_handle = Arc::clone(&self.cache_metrics);
        let counter_set = self.counters.clone();
        let inner_future = self.inner.get_bytes(range);
        observe_storage_cache_activity(metrics_handle, counter_set, snapshot_before, inner_future)
    }

    /// Reads several byte ranges at once, recording the cache activity they
    /// trigger as a single delta.
    fn get_byte_ranges(
        &mut self,
        ranges: Vec<Range<u64>>,
    ) -> BoxFuture<'_, parquet::errors::Result<Vec<Bytes>>>
    where
        Self: Send,
    {
        let snapshot_before = self.cache_metrics.snapshot();
        let metrics_handle = Arc::clone(&self.cache_metrics);
        let counter_set = self.counters.clone();
        let inner_future = self.inner.get_byte_ranges(ranges);
        observe_storage_cache_activity(metrics_handle, counter_set, snapshot_before, inner_future)
    }

    /// Fetches the parquet footer metadata, recording both the range-cache
    /// delta and a footer hit/miss observation.
    fn get_metadata<'a>(
        &'a mut self,
        options: Option<&'a ArrowReaderOptions>,
    ) -> BoxFuture<'a, parquet::errors::Result<Arc<ParquetMetaData>>> {
        let snapshot_before = self.cache_metrics.snapshot();
        let metrics_handle = Arc::clone(&self.cache_metrics);
        let counter_set = self.counters.clone();
        let inner_future = self.inner.get_metadata(options);
        observe_footer_cache_activity(metrics_handle, counter_set, snapshot_before, inner_future)
    }
}
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ use regex_automata::dfa::{Automaton, dense};
use regex_automata::{Anchored, Input};
use tracing::debug;

use super::parquet_cache_metrics::instrument_parquet_file_reader_factory;
use super::predicate;

const METRICS_SORT_ORDER: &[&str] = &[
Expand Down Expand Up @@ -278,9 +279,8 @@ impl TableProvider for MetricsTableProvider {
let object_store = state
.runtime_env()
.object_store(self.object_store_url.clone())?;
let reader_factory = Arc::new(CachedParquetFileReaderFactory::new(
object_store,
metadata_cache,
let reader_factory = instrument_parquet_file_reader_factory(Arc::new(
CachedParquetFileReaderFactory::new(object_store, metadata_cache),
));
let parquet_source = ParquetSource::new(table_schema)
.with_bloom_filter_on_read(true)
Expand Down Expand Up @@ -663,9 +663,8 @@ mod tests {
ParquetSplitId, TAG_ENV, TAG_HOST, TAG_SERVICE, TimeRange,
};

use crate::sources::metrics::test_utils::TestSplitProvider;

use super::*;
use crate::sources::metrics::test_utils::TestSplitProvider;

fn schema_with_columns(columns: &[&str]) -> SchemaRef {
let fields = columns
Expand Down
Loading
Loading