-
Notifications
You must be signed in to change notification settings - Fork 541
Improvements from live testing #6404
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Merged
Merged
Changes from all commits
Commits
File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.
Oops, something went wrong.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -36,21 +36,18 @@ use super::compaction_state::CompactionState; | |
| use super::index_config_metastore::{IndexConfigMetastore, IndexEntry}; | ||
| use crate::planner::metrics::COMPACTION_PLANNER_METRICS; | ||
|
|
||
| /// Cap on splits fetched per tick. Every tick, the planner re-scans the immature published set, | ||
| /// sorted by `maturity_timestamp` ASC so the most-urgent splits are processed first when a backlog | ||
| /// exists. Splits beyond this cap aren't lost -- they bubble into range as the front of the queue | ||
| /// is merged off. | ||
| const SCAN_PAGE_SIZE: usize = 5_000; | ||
| #[derive(Debug)] | ||
| pub struct CompactionPlanner { | ||
| state: CompactionState, | ||
| index_config_metastore: IndexConfigMetastore, | ||
| cursor: i64, | ||
| metastore: MetastoreServiceClient, | ||
| } | ||
|
|
||
| impl Debug for CompactionPlanner { | ||
| fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { | ||
| f.debug_struct("CompactionPlanner") | ||
| .field("cursor", &self.cursor) | ||
| .finish() | ||
| } | ||
| } | ||
|
|
||
| const SCAN_AND_PLAN_INTERVAL: Duration = Duration::from_secs(5); | ||
| /// On initialization, we want to wait for two intervals to allow any in-progress workers to report | ||
| /// their progress, preventing us from frivolously rescheduling work. | ||
|
|
@@ -112,15 +109,11 @@ impl Handler<ReportStatusRequest> for CompactionPlanner { | |
| } | ||
| } | ||
|
|
||
| const STARTUP_LOOKBACK: Duration = Duration::from_secs(24 * 60 * 60); | ||
|
|
||
| impl CompactionPlanner { | ||
| pub fn new(metastore: MetastoreServiceClient) -> Self { | ||
| let cursor = OffsetDateTime::now_utc().unix_timestamp() - STARTUP_LOOKBACK.as_secs() as i64; | ||
| CompactionPlanner { | ||
| state: CompactionState::new(), | ||
| index_config_metastore: IndexConfigMetastore::new(metastore.clone()), | ||
| cursor, | ||
| metastore, | ||
| } | ||
| } | ||
|
|
@@ -141,8 +134,6 @@ impl CompactionPlanner { | |
| if index_entry.is_split_mature(&split.split_metadata) { | ||
| continue; | ||
| } | ||
| self.cursor = self.cursor.max(split.update_timestamp); | ||
| info!(max_timestamp=%self.cursor, "[compaction planner] update metastore cursor min_timestamp cursor"); | ||
| self.state.track_split(split.split_metadata); | ||
| } | ||
| } | ||
|
|
@@ -151,7 +142,8 @@ impl CompactionPlanner { | |
| let query = ListSplitsQuery::for_all_indexes() | ||
| .with_split_state(SplitState::Published) | ||
| .retain_immature(OffsetDateTime::now_utc()) | ||
| .with_update_timestamp_gte(self.cursor); | ||
| .sort_by_maturity_timestamp() | ||
|
Collaborator
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. investigate sending NOT IN in flight merges |
||
| .with_limit(SCAN_PAGE_SIZE); | ||
| let request = ListSplitsRequest::try_from_list_splits_query(&query)?; | ||
| let splits = self | ||
| .metastore | ||
|
|
@@ -237,7 +229,7 @@ fn emit_metastore_scan_metrics(new_splits: &[Split]) { | |
| COMPACTION_PLANNER_METRICS | ||
| .new_splits_scanned | ||
| .with_label_values([&index_uid.to_string()]) | ||
| .set(count as i64); | ||
| .inc_by(count as u64); | ||
| } | ||
| } | ||
|
|
||
|
|
@@ -270,6 +262,7 @@ fn build_task_assignment( | |
|
|
||
| #[cfg(test)] | ||
| mod tests { | ||
| use std::ops::Bound; | ||
| use std::time::Duration; | ||
|
|
||
| use quickwit_common::ServiceStream; | ||
|
|
@@ -278,14 +271,15 @@ mod tests { | |
| ConstWriteAmplificationMergePolicyConfig, MergePolicyConfig, | ||
| }; | ||
| use quickwit_metastore::{ | ||
| IndexMetadata, IndexMetadataResponseExt, ListSplitsResponseExt, Split, SplitMaturity, | ||
| SplitMetadata, SplitState, | ||
| IndexMetadata, IndexMetadataResponseExt, ListSplitsRequestExt, ListSplitsResponseExt, | ||
| SortBy, Split, SplitMaturity, SplitMetadata, SplitState, | ||
| }; | ||
| use quickwit_proto::compaction::CompactionSuccess; | ||
| use quickwit_proto::metastore::{ | ||
| IndexMetadataResponse, ListSplitsResponse, MetastoreError, MockMetastoreService, | ||
| }; | ||
| use quickwit_proto::types::IndexUid; | ||
| use time::OffsetDateTime; | ||
|
|
||
| use super::*; | ||
|
|
||
|
|
@@ -334,30 +328,47 @@ mod tests { | |
| } | ||
|
|
||
| #[tokio::test] | ||
| async fn test_scan_metastore() { | ||
| async fn test_scan_metastore_query_shape_and_passthrough() { | ||
| let index_uid = IndexUid::for_test("test-index", 0); | ||
| let splits = vec![ | ||
| test_split("split-1", &index_uid, 1000), | ||
| test_split("split-2", &index_uid, 2000), | ||
| let returned_splits = vec![ | ||
| test_split("a", &index_uid, 1000), | ||
| test_split("b", &index_uid, 2000), | ||
| ]; | ||
| let splits_clone = splits.clone(); | ||
| let returned_clone = returned_splits.clone(); | ||
| let scan_started_at = OffsetDateTime::now_utc().unix_timestamp(); | ||
|
|
||
| let mut mock = MockMetastoreService::new(); | ||
| mock.expect_list_splits().returning(move |_| { | ||
| let response = ListSplitsResponse::try_from_splits(splits_clone.clone()).unwrap(); | ||
| mock.expect_list_splits().returning(move |req| { | ||
| let query = req.deserialize_list_splits_query().unwrap(); | ||
|
|
||
| assert_eq!(query.split_states, vec![SplitState::Published]); | ||
| assert_eq!(query.limit, Some(SCAN_PAGE_SIZE)); | ||
| assert_eq!(query.sort_by, SortBy::MaturityTimestamp); | ||
|
|
||
| let Bound::Excluded(mature_at) = query.mature else { | ||
| panic!("expected Excluded mature bound, got {:?}", query.mature); | ||
| }; | ||
| let now_secs = OffsetDateTime::now_utc().unix_timestamp(); | ||
| assert!(mature_at.unix_timestamp() >= scan_started_at); | ||
| assert!(mature_at.unix_timestamp() <= now_secs); | ||
|
|
||
| assert_eq!(query.update_timestamp.start, Bound::Unbounded); | ||
| assert_eq!(query.update_timestamp.end, Bound::Unbounded); | ||
|
|
||
| let response = ListSplitsResponse::try_from_splits(returned_clone.clone()).unwrap(); | ||
| Ok(ServiceStream::from(vec![Ok(response)])) | ||
| }); | ||
|
|
||
| let planner = CompactionPlanner::new(MetastoreServiceClient::from_mock(mock)); | ||
| let result = planner.scan_metastore().await.unwrap(); | ||
|
|
||
| assert_eq!(result.len(), 2); | ||
| assert_eq!(result[0].split_metadata.split_id, "split-1"); | ||
| assert_eq!(result[1].split_metadata.split_id, "split-2"); | ||
| assert_eq!(result[0].split_metadata.split_id, "a"); | ||
| assert_eq!(result[1].split_metadata.split_id, "b"); | ||
| } | ||
|
|
||
| #[tokio::test] | ||
| async fn test_ingest_splits_dedup_maturity_and_cursor() { | ||
| async fn test_ingest_splits_dedups_and_skips_mature() { | ||
| let index_metadata = test_index_metadata(); | ||
| let response = test_index_metadata_response(&index_metadata); | ||
| let index_uid = index_metadata.index_uid.clone(); | ||
|
|
@@ -367,7 +378,6 @@ mod tests { | |
| .returning(move |_| Ok(response.clone())); | ||
|
|
||
| let mut planner = CompactionPlanner::new(MetastoreServiceClient::from_mock(mock)); | ||
| planner.cursor = 0; | ||
|
|
||
| // Pre-populate: "in-flight" is already being compacted. | ||
| planner.state.track_split(SplitMetadata { | ||
|
|
@@ -390,11 +400,10 @@ mod tests { | |
| assert!(planner.state.is_split_tracked("fresh")); | ||
| assert!(planner.state.is_split_tracked("in-flight")); | ||
| assert!(!planner.state.is_split_tracked("mature")); | ||
| assert_eq!(planner.cursor, 3000); | ||
| } | ||
|
|
||
| #[tokio::test] | ||
| async fn test_scan_and_plan_metastore_error() { | ||
| async fn test_scan_and_plan_propagates_metastore_error() { | ||
| let mut mock = MockMetastoreService::new(); | ||
| mock.expect_list_splits().returning(|_| { | ||
| Err(MetastoreError::Internal { | ||
|
|
@@ -404,11 +413,7 @@ mod tests { | |
| }); | ||
|
|
||
| let mut planner = CompactionPlanner::new(MetastoreServiceClient::from_mock(mock)); | ||
| let original_cursor = planner.cursor; | ||
|
|
||
| let result = planner.scan_and_plan().await; | ||
| assert!(result.is_err()); | ||
| assert_eq!(planner.cursor, original_cursor); | ||
| assert!(planner.scan_and_plan().await.is_err()); | ||
| } | ||
|
|
||
| #[tokio::test] | ||
|
|
@@ -425,7 +430,6 @@ mod tests { | |
| }); | ||
|
|
||
| let mut planner = CompactionPlanner::new(MetastoreServiceClient::from_mock(mock)); | ||
| planner.cursor = 0; | ||
| planner.ingest_splits(splits).await; | ||
|
|
||
| assert!(!planner.state.is_split_tracked("orphan")); | ||
|
|
@@ -452,12 +456,60 @@ mod tests { | |
| .returning(move |_| Ok(index_metadata_response.clone())); | ||
|
|
||
| let mut planner = CompactionPlanner::new(MetastoreServiceClient::from_mock(mock)); | ||
| planner.cursor = 0; | ||
| planner.scan_and_plan().await.unwrap(); | ||
|
|
||
| assert!(planner.state.is_split_tracked("s1")); | ||
| assert!(planner.state.is_split_tracked("s2")); | ||
| assert_eq!(planner.cursor, 6000); | ||
| } | ||
|
|
||
| #[tokio::test] | ||
| async fn test_failed_task_is_retracked_on_next_scan() { | ||
| // After a worker reports failure (or times out), planner-local | ||
| // tracking is cleared. Because there is no cursor, the next scan | ||
| // rediscovers the still-Published, still-immature splits and | ||
| // re-tracks them. | ||
| let index_metadata = test_index_metadata_with_merge_factor_2(); | ||
| let index_metadata_response = test_index_metadata_response(&index_metadata); | ||
| let index_uid = index_metadata.index_uid.clone(); | ||
|
|
||
| let splits = vec![ | ||
| test_split("s1", &index_uid, 1000), | ||
| test_split("s2", &index_uid, 2000), | ||
| ]; | ||
| let splits_clone = splits.clone(); | ||
|
|
||
| let mut mock = MockMetastoreService::new(); | ||
| mock.expect_list_splits().returning(move |_| { | ||
| let response = ListSplitsResponse::try_from_splits(splits_clone.clone()).unwrap(); | ||
| Ok(ServiceStream::from(vec![Ok(response)])) | ||
| }); | ||
| mock.expect_index_metadata() | ||
| .returning(move |_| Ok(index_metadata_response.clone())); | ||
|
|
||
| let mut planner = CompactionPlanner::new(MetastoreServiceClient::from_mock(mock)); | ||
| let node_id = NodeId::from("worker-1"); | ||
|
|
||
| planner.scan_and_plan().await.unwrap(); | ||
| let assignments = planner.assign_tasks(&node_id, 10); | ||
| assert_eq!(assignments.len(), 1); | ||
| let task_id = assignments[0].task_id.clone(); | ||
| assert!(planner.state.is_split_tracked("s1")); | ||
| assert!(planner.state.is_split_tracked("s2")); | ||
|
|
||
| // Worker reports failure; planner forgets the splits. | ||
| planner | ||
| .state | ||
| .process_failures(&[quickwit_proto::compaction::CompactionFailure { | ||
| task_id, | ||
| error_message: "boom".to_string(), | ||
| }]); | ||
| assert!(!planner.state.is_split_tracked("s1")); | ||
| assert!(!planner.state.is_split_tracked("s2")); | ||
|
|
||
| // Next scan rediscovers them and re-tracks them. | ||
| planner.scan_and_plan().await.unwrap(); | ||
| assert!(planner.state.is_split_tracked("s1")); | ||
| assert!(planner.state.is_split_tracked("s2")); | ||
| } | ||
|
|
||
| /// Helper: creates a planner with merge_factor=2, ingests the given splits, | ||
|
|
@@ -477,7 +529,6 @@ mod tests { | |
| }); | ||
|
|
||
| let mut planner = CompactionPlanner::new(MetastoreServiceClient::from_mock(mock)); | ||
| planner.cursor = 0; | ||
|
|
||
| let splits: Vec<Split> = split_ids | ||
| .iter() | ||
|
|
||
Oops, something went wrong.
Oops, something went wrong.
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
Uh oh!
There was an error while loading. Please reload this page.