Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
27 changes: 1 addition & 26 deletions quickwit/quickwit-cli/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -150,7 +150,7 @@ mod tests {
};
use quickwit_cli::split::{DescribeSplitArgs, SplitCliCommand};
use quickwit_cli::tool::{
ExtractSplitArgs, GarbageCollectIndexArgs, LocalIngestDocsArgs, LocalSearchArgs, MergeArgs,
ExtractSplitArgs, GarbageCollectIndexArgs, LocalIngestDocsArgs, LocalSearchArgs,
ToolCliCommand,
};
use quickwit_common::uri::Uri;
Expand Down Expand Up @@ -740,31 +740,6 @@ mod tests {
Ok(())
}

/// Checks that a `tool merge` invocation is parsed into a
/// `ToolCliCommand::Merge` carrying the index and source IDs given on the
/// command line.
#[test]
fn test_parse_merge_args() -> anyhow::Result<()> {
    let cli_args = [
        "tool",
        "merge",
        "--index",
        "wikipedia",
        "--source",
        "ingest-source",
        "--config",
        "/config.yaml",
    ];
    let matches = build_cli()
        .no_binary_name(true)
        .try_get_matches_from(cli_args)?;
    // Only the index and source IDs are verified; the remaining fields of
    // `MergeArgs` are deliberately ignored.
    let is_expected_merge_command = match CliCommand::parse_cli_args(matches)? {
        CliCommand::Tool(ToolCliCommand::Merge(MergeArgs {
            index_id,
            source_id,
            ..
        })) => index_id == "wikipedia" && source_id == "ingest-source",
        _ => false,
    };
    assert!(is_expected_merge_command);
    Ok(())
}

#[test]
fn test_parse_no_color() {
// SAFETY: this test may not be entirely sound if not run with nextest or --test-threads=1
Expand Down
140 changes: 6 additions & 134 deletions quickwit/quickwit-cli/src/tool.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ use anyhow::{Context, bail};
use clap::{ArgMatches, Command, arg};
use colored::{ColoredString, Colorize};
use humantime::format_duration;
use quickwit_actors::{ActorExitStatus, ActorHandle, Mailbox, Universe};
use quickwit_actors::{ActorHandle, Universe};
use quickwit_cluster::{
ChannelTransport, Cluster, ClusterMember, FailureDetectorConfig, make_client_grpc_config,
};
Expand All @@ -34,28 +34,26 @@ use quickwit_common::uri::Uri;
use quickwit_config::service::QuickwitService;
use quickwit_config::{
CLI_SOURCE_ID, IndexerConfig, NodeConfig, SourceConfig, SourceInputFormat, SourceParams,
TransformConfig, VecSourceParams,
TransformConfig,
};
use quickwit_index_management::{IndexService, clear_cache_directory};
use quickwit_indexing::IndexingPipeline;
use quickwit_indexing::actors::{IndexingService, MergePipeline, MergeSchedulerService};
use quickwit_indexing::models::{
DetachIndexingPipeline, DetachMergePipeline, IndexingStatistics, SpawnPipeline,
};
use quickwit_indexing::actors::IndexingService;
use quickwit_indexing::models::{DetachIndexingPipeline, IndexingStatistics, SpawnPipeline};
use quickwit_ingest::IngesterPool;
use quickwit_metastore::IndexMetadataResponseExt;
use quickwit_proto::indexing::CpuCapacity;
use quickwit_proto::ingest::ingester::IngesterStatus;
use quickwit_proto::metastore::{IndexMetadataRequest, MetastoreService, MetastoreServiceClient};
use quickwit_proto::search::{CountHits, SearchResponse};
use quickwit_proto::types::{IndexId, PipelineUid, SourceId, SplitId};
use quickwit_proto::types::{IndexId, PipelineUid, SplitId};
use quickwit_search::{SearchResponseRest, single_node_search};
use quickwit_serve::{
BodyFormat, SearchRequestQueryString, SortBy, search_request_from_api_request,
};
use quickwit_storage::{BundleStorage, Storage};
use thousands::Separable;
use tracing::{debug, info};
use tracing::debug;

use crate::checklist::{GREEN_COLOR, RED_COLOR};
use crate::{
Expand Down Expand Up @@ -201,13 +199,6 @@ pub struct GarbageCollectIndexArgs {
pub dry_run: bool,
}

/// Arguments of the `tool merge` subcommand.
#[derive(Debug, Eq, PartialEq)]
pub struct MergeArgs {
/// URI of the node configuration, parsed from the `config` argument.
pub config_uri: Uri,
/// ID of the index, taken from the `index` argument.
pub index_id: IndexId,
/// ID of the source, taken from the `source` argument.
pub source_id: SourceId,
}

#[derive(Debug, Eq, PartialEq)]
pub struct ExtractSplitArgs {
pub config_uri: Uri,
Expand All @@ -221,7 +212,6 @@ pub enum ToolCliCommand {
GarbageCollect(GarbageCollectIndexArgs),
LocalIngest(LocalIngestDocsArgs),
LocalSearch(LocalSearchArgs),
Merge(MergeArgs),
ExtractSplit(ExtractSplitArgs),
}

Expand All @@ -234,7 +224,6 @@ impl ToolCliCommand {
"gc" => Self::parse_garbage_collect_args(submatches),
"local-ingest" => Self::parse_local_ingest_args(submatches),
"local-search" => Self::parse_local_search_args(submatches),
"merge" => Self::parse_merge_args(submatches),
"extract-split" => Self::parse_extract_split_args(submatches),
_ => bail!("unknown tool subcommand `{subcommand}`"),
}
Expand Down Expand Up @@ -324,24 +313,6 @@ impl ToolCliCommand {
}))
}

/// Builds a [`ToolCliCommand::Merge`] from the matches of the `merge`
/// subcommand.
///
/// # Errors
///
/// Returns an error if the `config` value cannot be parsed as a URI.
///
/// # Panics
///
/// Panics if any of the `config`, `index` or `source` arguments is absent
/// from `matches`.
fn parse_merge_args(mut matches: ArgMatches) -> anyhow::Result<Self> {
    let config_uri_str = matches
        .remove_one::<String>("config")
        .expect("`config` should be a required arg.");
    let config_uri = Uri::from_str(&config_uri_str)?;
    let index_id = matches
        .remove_one::<String>("index")
        .expect("'index-id' should be a required arg.");
    let source_id = matches
        .remove_one::<String>("source")
        .expect("'source-id' should be a required arg.");
    let merge_args = MergeArgs {
        config_uri,
        index_id,
        source_id,
    };
    Ok(Self::Merge(merge_args))
}

fn parse_garbage_collect_args(mut matches: ArgMatches) -> anyhow::Result<Self> {
let config_uri = matches
.get_one("config")
Expand Down Expand Up @@ -391,7 +362,6 @@ impl ToolCliCommand {
Self::GarbageCollect(args) => garbage_collect_index_cli(args).await,
Self::LocalIngest(args) => local_ingest_docs_cli(args).await,
Self::LocalSearch(args) => local_search_cli(args).await,
Self::Merge(args) => merge_cli(args).await,
Self::ExtractSplit(args) => extract_split_cli(args).await,
}
}
Expand Down Expand Up @@ -447,7 +417,6 @@ pub async fn local_ingest_docs_cli(args: LocalIngestDocsArgs) -> anyhow::Result<
&HashSet::from_iter([QuickwitService::Indexer]),
)?;
let universe = Universe::new();
let merge_scheduler_service_mailbox = universe.get_or_spawn_one();
let indexing_server = IndexingService::new(
config.node_id.clone(),
config.data_dir_path.clone(),
Expand All @@ -456,7 +425,6 @@ pub async fn local_ingest_docs_cli(args: LocalIngestDocsArgs) -> anyhow::Result<
cluster,
metastore,
None,
Some(merge_scheduler_service_mailbox),
IngesterPool::default(),
storage_resolver,
EventBroker::default(),
Expand All @@ -471,11 +439,6 @@ pub async fn local_ingest_docs_cli(args: LocalIngestDocsArgs) -> anyhow::Result<
pipeline_uid: PipelineUid::random(),
})
.await?;
let merge_pipeline_handle = indexing_server_mailbox
.ask_for_res(DetachMergePipeline {
pipeline_id: pipeline_id.merge_pipeline_id(),
})
.await?;
let indexing_pipeline_handle = indexing_server_mailbox
.ask_for_res(DetachIndexingPipeline { pipeline_id })
.await?;
Expand All @@ -493,11 +456,6 @@ pub async fn local_ingest_docs_cli(args: LocalIngestDocsArgs) -> anyhow::Result<
let statistics =
start_statistics_reporting_loop(indexing_pipeline_handle, args.input_path_opt.is_none())
.await?;
merge_pipeline_handle
.mailbox()
.ask(quickwit_indexing::FinishPendingMergesAndShutdownPipeline)
.await?;
merge_pipeline_handle.join().await;
// Shutdown the indexing server.
universe
.send_exit_with_success(&indexing_server_mailbox)
Expand Down Expand Up @@ -565,92 +523,6 @@ pub async fn local_search_cli(args: LocalSearchArgs) -> anyhow::Result<()> {
Ok(())
}

/// Runs the `tool merge` command.
///
/// Loads the node config, spawns a local indexing service, asks it for a
/// pipeline on the requested index/source, detaches the associated merge
/// pipeline and polls it once per second until it reports no ongoing merges
/// or has exited.
///
/// # Errors
///
/// Propagates any error from loading the config, resolving the metastore,
/// the index checklist, or spawning the services, and bails with the merge
/// pipeline exit status when it is neither `Success` nor `Quit`.
pub async fn merge_cli(args: MergeArgs) -> anyhow::Result<()> {
debug!(args=?args, "run-merge-operations");
println!("❯ Merging splits locally...");
let config = load_node_config(&args.config_uri).await?;
let (storage_resolver, metastore_resolver) =
get_resolvers(&config.storage_configs, &config.metastore_configs);
let mut metastore = metastore_resolver.resolve(&config.metastore_uri).await?;
run_index_checklist(&mut metastore, &storage_resolver, &args.index_id, None).await?;
// The indexing service needs to update its cluster chitchat state so that the control plane is
// aware of the running tasks. We thus create a fake cluster to instantiate the indexing service
// and avoid impacting potential control plane running on the cluster.
let cluster = create_empty_cluster(&config).await?;
let runtimes_config = RuntimesConfig::default();
start_actor_runtimes(
runtimes_config,
&HashSet::from_iter([QuickwitService::Indexer]),
)?;
// Note: a default indexer config is used here, not one read from `config`.
let indexer_config = IndexerConfig::default();
let universe = Universe::new();
let merge_scheduler_service: Mailbox<MergeSchedulerService> = universe.get_or_spawn_one();
let indexing_server = IndexingService::new(
config.node_id,
config.data_dir_path,
indexer_config,
runtimes_config.num_threads_blocking,
cluster,
metastore,
None,
Some(merge_scheduler_service),
IngesterPool::default(),
storage_resolver,
EventBroker::default(),
)
.await?;
let (indexing_service_mailbox, indexing_service_handle) =
universe.spawn_builder().spawn(indexing_server);
// The pipeline is spawned with default `Vec` source params; presumably this is an empty
// source so that the pipeline only serves to obtain its merge pipeline — TODO confirm.
let pipeline_id = indexing_service_mailbox
.ask_for_res(SpawnPipeline {
index_id: args.index_id,
source_config: SourceConfig {
source_id: args.source_id,
num_pipelines: NonZeroUsize::MIN,
enabled: true,
source_params: SourceParams::Vec(VecSourceParams::default()),
transform_config: None,
input_format: SourceInputFormat::Json,
},
pipeline_uid: PipelineUid::random(),
})
.await?;
// Take over the handle of the merge pipeline so it can be observed and stopped from here.
let pipeline_handle: ActorHandle<MergePipeline> = indexing_service_mailbox
.ask_for_res(DetachMergePipeline {
pipeline_id: pipeline_id.merge_pipeline_id(),
})
.await?;

// Poll the merge pipeline once per second.
// NOTE(review): the first tick of a tokio interval completes immediately, so the first
// check runs right after the pipeline is detached — confirm the pipeline has had a chance
// to report its merges by then.
let mut check_interval = tokio::time::interval(Duration::from_secs(1));
loop {
check_interval.tick().await;

pipeline_handle.refresh_observe();
let observation = pipeline_handle.last_observation();

if observation.num_ongoing_merges == 0 {
info!("merge pipeline has no more ongoing merges, exiting");
break;
}

if pipeline_handle.state().is_exit() {
info!("merge pipeline has exited, exiting");
break;
}
}

// Stop the merge pipeline first, then the indexing service that spawned it.
let (pipeline_exit_status, _pipeline_statistics) = pipeline_handle.quit().await;
indexing_service_handle.quit().await;
// Any exit status other than `Success` or `Quit` is reported as a failure.
if !matches!(
pipeline_exit_status,
ActorExitStatus::Success | ActorExitStatus::Quit
) {
bail!(pipeline_exit_status);
}
println!("{} Merge successful.", "✔".color(GREEN_COLOR));
Ok(())
}

pub async fn garbage_collect_index_cli(args: GarbageCollectIndexArgs) -> anyhow::Result<()> {
debug!(args=?args, "garbage-collect-index");
println!("❯ Garbage collecting index...");
Expand Down
43 changes: 33 additions & 10 deletions quickwit/quickwit-compaction/src/compaction_pipeline.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,9 @@
// limitations under the License.

use std::sync::Arc;
use std::time::Instant;

use quickwit_actors::{ActorHandle, Health, SpawnContext, Supervisable};
use quickwit_actors::{ActorHandle, HEARTBEAT, Health, SpawnContext, Supervisable};
use quickwit_common::KillSwitch;
use quickwit_common::io::{IoControls, Limiter};
use quickwit_common::pubsub::EventBroker;
Expand All @@ -31,6 +32,8 @@ use quickwit_proto::metastore::MetastoreServiceClient;
use quickwit_proto::types::{IndexUid, SourceId, SplitId};
use tracing::{debug, error, info};

use crate::TaskId;

#[derive(Clone, Debug, PartialEq)]
pub enum PipelineStatus {
InProgress,
Expand All @@ -39,11 +42,10 @@ pub enum PipelineStatus {
}

pub struct PipelineStatusUpdate {
pub task_id: String,
pub task_id: TaskId,
pub index_uid: IndexUid,
pub source_id: SourceId,
pub split_ids: Vec<SplitId>,
pub merged_split_id: SplitId,
pub status: PipelineStatus,
}

Expand All @@ -53,6 +55,22 @@ struct CompactionPipelineHandles {
merge_packager: ActorHandle<Packager>,
merge_uploader: ActorHandle<Uploader>,
merge_publisher: ActorHandle<Publisher>,
next_check_for_progress: Instant,
}

impl CompactionPipelineHandles {
    /// Tells the caller whether the progress-check deadline has passed.
    ///
    /// Returns `true` only when the current instant is strictly later than
    /// `next_check_for_progress`; in that case the deadline is pushed one
    /// `HEARTBEAT` into the future. Otherwise it returns `false` and leaves
    /// the deadline untouched, so callers polling more often than
    /// `HEARTBEAT` get at most one `true` per period.
    fn should_check_for_progress(&mut self) -> bool {
        let now = Instant::now();
        if now <= self.next_check_for_progress {
            return false;
        }
        // Deadline elapsed: schedule the next progress check.
        self.next_check_for_progress = now + *HEARTBEAT;
        true
    }
}

/// A single-use compaction pipeline. Processes one merge task and terminates.
Expand All @@ -61,7 +79,7 @@ struct CompactionPipelineHandles {
/// `check_actor_health()` to collect status updates. The pipeline manages
/// its own retry logic internally.
pub struct CompactionPipeline {
task_id: String,
task_id: TaskId,
merge_operation: MergeOperation,
pipeline_id: MergePipelineId,
status: PipelineStatus,
Expand All @@ -81,7 +99,7 @@ pub struct CompactionPipeline {
impl CompactionPipeline {
#[allow(clippy::too_many_arguments)]
pub fn new(
task_id: String,
task_id: TaskId,
scratch_directory: TempDirectory,
merge_operation: MergeOperation,
pipeline_id: MergePipelineId,
Expand Down Expand Up @@ -149,16 +167,21 @@ impl CompactionPipeline {
) {
return;
}
// Pipeline is not initialized yet.
if self.handles.is_none() {
let Some(handles) = self.handles.as_mut() else {
return;
}
};

// We check whether actors are alive on every tick (1s), but only
// check whether they are making progress once per HEARTBEAT (30s).
// This gives CPU-intensive actors like the packager time to finish
// without being falsely declared unhealthy.
let check_for_progress = handles.should_check_for_progress();

let mut has_healthy = false;
let mut failure_actor_names: Vec<String> = Vec::new();

for supervisable in self.supervisables() {
match supervisable.check_health(true) {
match supervisable.check_health(check_for_progress) {
Health::Healthy => {
has_healthy = true;
}
Expand Down Expand Up @@ -192,7 +215,6 @@ impl CompactionPipeline {
.iter()
.map(|split| split.split_id().to_string())
.collect(),
merged_split_id: self.merge_operation.merge_split_id.clone(),
status: self.status.clone(),
}
}
Expand Down Expand Up @@ -286,6 +308,7 @@ impl CompactionPipeline {
merge_packager: merge_packager_handle,
merge_uploader: merge_uploader_handle,
merge_publisher: merge_publisher_handle,
next_check_for_progress: Instant::now() + *HEARTBEAT,
});

Ok(())
Expand Down
Loading
Loading