Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -18,25 +18,15 @@ use std::ops::Bound;

use async_trait::async_trait;
use datafusion::error::Result as DFResult;
use quickwit_metastore::{
ListParquetSplitsQuery, ListParquetSplitsRequestExt, ListParquetSplitsResponseExt,
};
use quickwit_metastore::{ListParquetSplitsQuery, list_parquet_splits_paginated};
use quickwit_parquet_engine::split::{ParquetSplitKind, ParquetSplitMetadata};
use quickwit_proto::metastore::{
ListMetricsSplitsRequest, ListSketchSplitsRequest, MetastoreService, MetastoreServiceClient,
};
use quickwit_proto::metastore::MetastoreServiceClient;
use quickwit_proto::types::IndexUid;
use tracing::{debug, instrument};

use super::predicate::MetricsSplitQuery;
use super::table_provider::MetricsSplitProvider;

/// Per-page split count for metastore split pagination.
///
/// The list split RPCs are unary, so we need to page client side.
/// TODO: Use streaming RPCs for listing Parquet splits.
const SPLIT_PAGE_SIZE: usize = 200;

/// `MetricsSplitProvider` backed by the Quickwit metastore RPC.
#[derive(Debug, Clone)]
pub struct MetastoreSplitProvider {
Expand Down Expand Up @@ -73,74 +63,19 @@ impl MetricsSplitProvider for MetastoreSplitProvider {
)
)]
async fn list_splits(&self, query: &MetricsSplitQuery) -> DFResult<Vec<ParquetSplitMetadata>> {
let mut metastore_query = to_metastore_query(&self.index_uid, query);
metastore_query.limit = Some(SPLIT_PAGE_SIZE);

let mut splits: Vec<ParquetSplitMetadata> = Vec::new();
let mut num_pages: usize = 0;

loop {
let records = match self.split_kind {
ParquetSplitKind::Metrics => {
let request = ListMetricsSplitsRequest::try_from_query(
self.index_uid.clone(),
&metastore_query,
)
.map_err(|err| datafusion::error::DataFusionError::External(Box::new(err)))?;

self.metastore
.clone()
.list_metrics_splits(request)
.await
.map_err(|err| datafusion::error::DataFusionError::External(Box::new(err)))?
.deserialize_splits()
.map_err(|err| {
datafusion::error::DataFusionError::External(Box::new(err))
})?
}
ParquetSplitKind::Sketches => {
let request = ListSketchSplitsRequest::try_from_query(
self.index_uid.clone(),
&metastore_query,
)
.map_err(|err| datafusion::error::DataFusionError::External(Box::new(err)))?;

self.metastore
.clone()
.list_sketch_splits(request)
.await
.map_err(|err| datafusion::error::DataFusionError::External(Box::new(err)))?
.deserialize_splits()
.map_err(|err| {
datafusion::error::DataFusionError::External(Box::new(err))
})?
}
};

num_pages += 1;
let page_len = records.len();

// The metastore guarantees only Published splits are returned because
// `to_metastore_query` sets `split_states = vec![Published]`. No
// client-side re-filter is needed here.
splits.extend(records.into_iter().map(|record| record.metadata));

// A short page (fewer rows than we asked for) means we've drained
// the result set. The Postgres backend orders by `split_id ASC`
// and applies `split_id > $after_split_id` for the cursor, so the
// last metadata's split_id is the correct next cursor.
if page_len < SPLIT_PAGE_SIZE {
break;
}
let Some(last) = splits.last() else { break };
metastore_query.after_split_id = Some(last.split_id.as_str().to_string());
}
let metastore_query = to_metastore_query(&self.index_uid, query);
let records = list_parquet_splits_paginated(
self.metastore.clone(),
self.split_kind,
metastore_query,
)
.await
.map_err(|err| datafusion::error::DataFusionError::External(Box::new(err)))?;
let splits: Vec<ParquetSplitMetadata> =
records.into_iter().map(|record| record.metadata).collect();

tracing::Span::current().record("num_splits", splits.len());
debug!(
num_splits = splits.len(),
num_pages, "metastore returned splits"
);
debug!(num_splits = splits.len(), "metastore returned splits");

Ok(splits)
}
Expand Down
164 changes: 65 additions & 99 deletions quickwit/quickwit-index-management/src/parquet_garbage_collection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,12 +20,12 @@ use std::time::Duration;
use anyhow::Context;
use quickwit_common::{Progress, is_sketches_index};
use quickwit_metastore::{
ListParquetSplitsQuery, ListParquetSplitsRequestExt, ListParquetSplitsResponseExt,
ParquetSplitRecord, SplitState,
ListParquetSplitsQuery, PARQUET_SPLITS_PAGE_SIZE, ParquetSplitRecord, SplitState,
list_parquet_splits_page, list_parquet_splits_paginated,
};
use quickwit_parquet_engine::split::ParquetSplitKind;
use quickwit_proto::metastore::{
DeleteMetricsSplitsRequest, DeleteSketchSplitsRequest, ListMetricsSplitsRequest,
ListSketchSplitsRequest, MarkMetricsSplitsForDeletionRequest,
DeleteMetricsSplitsRequest, DeleteSketchSplitsRequest, MarkMetricsSplitsForDeletionRequest,
MarkSketchSplitsForDeletionRequest, MetastoreService, MetastoreServiceClient,
};
use quickwit_proto::types::IndexUid;
Expand Down Expand Up @@ -69,9 +69,6 @@ impl ParquetSplitRemovalInfo {
}
}

/// Maximum number of parquet splits to process per paginated query.
const DELETE_PARQUET_SPLITS_BATCH_SIZE: usize = 10_000;

/// Runs garbage collection for parquet splits.
#[instrument(skip_all, fields(num_indexes=%indexes.len()))]
pub async fn run_parquet_garbage_collect(
Expand Down Expand Up @@ -177,22 +174,13 @@ async fn list_parquet_splits(
let query = ListParquetSplitsQuery::for_index(index_uid.clone())
.with_split_states(states)
.with_update_timestamp_lte(cutoff);

if is_sketches_index(&index_uid.index_id) {
let request = ListSketchSplitsRequest::try_from_query(index_uid.clone(), &query)
.context("failed to build list sketch splits request")?;
protect_future(progress_opt, metastore.list_sketch_splits(request))
.await?
.deserialize_splits()
.context("failed to deserialize sketch splits")
} else {
let request = ListMetricsSplitsRequest::try_from_query(index_uid.clone(), &query)
.context("failed to build list metrics splits request")?;
protect_future(progress_opt, metastore.list_metrics_splits(request))
.await?
.deserialize_splits()
.context("failed to deserialize metrics splits")
}
let kind = parquet_split_kind_for_index(index_uid);
protect_future(
progress_opt,
list_parquet_splits_paginated(metastore.clone(), kind, query),
)
.await
.context("failed to list parquet splits")
}

/// Marks the given splits for deletion in the metastore, grouped by index.
Expand All @@ -216,26 +204,32 @@ async fn mark_splits_for_deletion(

for (index_uid_str, split_ids) in splits_by_index {
let index_uid: IndexUid = index_uid_str.parse()?;
info!(index_uid=%index_uid, count=%split_ids.len(), "marking stale staged parquet splits for deletion");

if is_sketches_index(&index_uid.index_id) {
protect_future(
progress_opt,
metastore.mark_sketch_splits_for_deletion(MarkSketchSplitsForDeletionRequest {
index_uid: Some(index_uid),
split_ids,
}),
)
.await?;
} else {
protect_future(
progress_opt,
metastore.mark_metrics_splits_for_deletion(MarkMetricsSplitsForDeletionRequest {
index_uid: Some(index_uid),
split_ids,
}),
)
.await?;
let is_sketch = is_sketches_index(&index_uid.index_id);
for split_ids_chunk in split_ids.chunks(PARQUET_SPLITS_PAGE_SIZE) {
let split_ids = split_ids_chunk.to_vec();
info!(index_uid=%index_uid, count=%split_ids.len(), "marking stale staged parquet splits for deletion");

if is_sketch {
protect_future(
progress_opt,
metastore.mark_sketch_splits_for_deletion(MarkSketchSplitsForDeletionRequest {
index_uid: Some(index_uid.clone()),
split_ids,
}),
)
.await?;
} else {
protect_future(
progress_opt,
metastore.mark_metrics_splits_for_deletion(
MarkMetricsSplitsForDeletionRequest {
index_uid: Some(index_uid.clone()),
split_ids,
},
),
)
.await?;
}
}
}

Expand All @@ -255,75 +249,39 @@ async fn delete_marked_parquet_splits(

let mut query = ListParquetSplitsQuery::for_index(index_uid.clone())
.with_split_states(vec![SplitState::MarkedForDeletion])
.with_update_timestamp_lte(deletion_cutoff)
.with_limit(DELETE_PARQUET_SPLITS_BATCH_SIZE);
.with_update_timestamp_lte(deletion_cutoff);

let is_sketch = is_sketches_index(&index_uid.index_id);
let kind = parquet_split_kind_for_index(index_uid);

loop {
let sleep_duration = if let Some(max_rate) = get_maximum_split_deletion_rate_per_sec() {
Duration::from_secs(DELETE_PARQUET_SPLITS_BATCH_SIZE.div_ceil(max_rate) as u64)
Duration::from_secs(PARQUET_SPLITS_PAGE_SIZE.div_ceil(max_rate) as u64)
} else {
Duration::default()
};
let sleep_future = tokio::time::sleep(sleep_duration);

let splits: Vec<ParquetSplitRecord> = if is_sketch {
let request = match ListSketchSplitsRequest::try_from_query(index_uid.clone(), &query) {
Ok(req) => req,
Err(err) => {
error!(index_uid=%index_uid, error=?err, "failed to build list sketch splits request");
break;
}
};
match protect_future(progress_opt, metastore.list_sketch_splits(request)).await {
Ok(resp) => match resp.deserialize_splits() {
Ok(splits) => splits,
Err(err) => {
error!(index_uid=%index_uid, error=?err, "failed to deserialize sketch splits");
break;
}
},
Err(err) => {
error!(index_uid=%index_uid, error=?err, "failed to list sketch splits");
break;
}
}
} else {
let request = match ListMetricsSplitsRequest::try_from_query(index_uid.clone(), &query)
{
Ok(req) => req,
Err(err) => {
error!(index_uid=%index_uid, error=?err, "failed to build list metrics splits request");
break;
}
};
match protect_future(progress_opt, metastore.list_metrics_splits(request)).await {
Ok(resp) => match resp.deserialize_splits() {
Ok(splits) => splits,
Err(err) => {
error!(index_uid=%index_uid, error=?err, "failed to deserialize metrics splits");
break;
}
},
Err(err) => {
error!(index_uid=%index_uid, error=?err, "failed to list metrics splits");
break;
}
let page = match protect_future(
progress_opt,
list_parquet_splits_page(metastore, kind, &mut query),
)
.await
{
Ok(page) => page,
Err(err) => {
error!(index_uid=%index_uid, error=?err, "failed to list parquet splits");
break;
}
};
let splits = page.splits;

// We page through the list of splits to delete using a limit and a `search_after` trick.
// To detect if this is the last page, we check if the number of splits is less than the
// limit.
assert!(splits.len() <= DELETE_PARQUET_SPLITS_BATCH_SIZE);
let splits_to_delete_possibly_remaining = splits.len() == DELETE_PARQUET_SPLITS_BATCH_SIZE;
// The metastore helper advanced the cursor when the page was full.
assert!(splits.len() <= PARQUET_SPLITS_PAGE_SIZE);
let splits_to_delete_possibly_remaining = page.has_next_page;

// Set split after which to search for the next loop.
let Some(last_split) = splits.last() else {
if splits.is_empty() {
break;
};
query = query.with_after_split_id(last_split.metadata.split_id.to_string());
}

let (batch_succeeded, batch_failed) = delete_parquet_splits_from_storage_and_metastore(
metastore,
Expand All @@ -342,14 +300,22 @@ async fn delete_marked_parquet_splits(
sleep_future.await;
} else {
// Stop the GC if this was the last batch.
// We are guaranteed to make progress due to .with_after_split_id().
// The paginator advanced the cursor before this batch was processed.
break;
}
}

Ok(removal_info)
}

fn parquet_split_kind_for_index(index_uid: &IndexUid) -> ParquetSplitKind {
if is_sketches_index(&index_uid.index_id) {
ParquetSplitKind::Sketches
} else {
ParquetSplitKind::Metrics
}
}

/// Deletes a single batch of parquet splits from storage and metastore.
/// Returns (succeeded, failed).
async fn delete_parquet_splits_from_storage_and_metastore(
Expand Down
4 changes: 4 additions & 0 deletions quickwit/quickwit-indexing/src/actors/indexing_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -722,6 +722,10 @@ impl IndexingService {
merge_scheduler_service: self.merge_scheduler_service.clone(),
max_concurrent_split_uploads: self.max_concurrent_split_uploads,
event_broker: self.event_broker.clone(),
skip_initial_seed: quickwit_common::get_bool_from_env(
super::metrics_pipeline::PARQUET_MERGE_SKIP_INITIAL_SEED_ENV_KEY,
false,
),
writer_config,
};

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
//! ```

mod indexing_service_impl;
mod parquet_compaction_metrics;
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P1 Badge Add the declared compaction metrics module

In the reviewed commit, this new module declaration has no corresponding parquet_compaction_metrics.rs or parquet_compaction_metrics/mod.rs under quickwit-indexing/src/actors/metrics_pipeline (rg --files quickwit/quickwit-indexing/src/actors/metrics_pipeline | rg parquet_compaction_metrics returns nothing). Any build that includes quickwit-indexing will fail with Rust's missing-module error before reaching the pagination changes.

Useful? React with 👍 / 👎.

mod parquet_doc_processor;
mod parquet_indexer;
mod parquet_merge_executor;
Expand Down Expand Up @@ -55,7 +56,9 @@ pub use parquet_doc_processor::{
pub use parquet_indexer::{ParquetIndexer, ParquetIndexerCounters, ParquetSplitBatch};
pub use parquet_merge_executor::ParquetMergeExecutor;
pub use parquet_merge_messages::{ParquetMergeScratch, ParquetMergeTask, ParquetNewSplits};
pub use parquet_merge_pipeline::{ParquetMergePipeline, ParquetMergePipelineParams};
pub use parquet_merge_pipeline::{
PARQUET_MERGE_SKIP_INITIAL_SEED_ENV_KEY, ParquetMergePipeline, ParquetMergePipelineParams,
};
pub use parquet_merge_planner::ParquetMergePlanner;
pub use parquet_merge_split_downloader::ParquetMergeSplitDownloader;
pub use parquet_packager::{ParquetBatchForPackager, ParquetPackager, ParquetPackagerCounters};
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -584,7 +584,7 @@ impl Actor for ParquetIndexer {
}

fn queue_capacity(&self) -> QueueCapacity {
QueueCapacity::Bounded(10)
QueueCapacity::Bounded(5)
}

fn name(&self) -> String {
Expand Down
Loading
Loading