2 changes: 1 addition & 1 deletion quickwit/quickwit-ingest/Cargo.toml
@@ -18,6 +18,7 @@ bytesize = { workspace = true }
fail = { workspace = true, optional = true }
futures = { workspace = true }
http = { workspace = true }
itertools = { workspace = true }
mockall = { workspace = true, optional = true }
mrecordlog = { workspace = true }
once_cell = { workspace = true }
@@ -43,7 +44,6 @@ quickwit-doc-mapper = { workspace = true, features = ["testsuite"] }
quickwit-proto = { workspace = true }

[dev-dependencies]
itertools = { workspace = true }
mockall = { workspace = true }
rand = { workspace = true }
rand_distr = { workspace = true }
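The `Cargo.toml` change promotes `itertools` from `[dev-dependencies]` to a regular dependency, presumably because the reworked ingest v2 code now uses it outside of tests. A minimal, hypothetical sketch of the kind of grouping such code might lean on (the index/shard strings below are placeholders, not the actual Quickwit call site):

```rust
use itertools::Itertools;

fn main() {
    // Persist failures can now report several shards at once, so grouping
    // them by index before updating routing state is a natural fit for
    // `Itertools::into_group_map`.
    let failed_shards = vec![
        ("index-1", "shard-01"),
        ("index-2", "shard-07"),
        ("index-1", "shard-03"),
    ];
    // Collects the (key, value) pairs into a HashMap<&str, Vec<&str>>.
    let per_index = failed_shards.into_iter().into_group_map();
    for (index, shards) in &per_index {
        println!("{index}: {shards:?}");
    }
}
```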
124 changes: 60 additions & 64 deletions quickwit/quickwit-ingest/src/ingest_v2/ingester.rs

Large diffs are not rendered by default.

88 changes: 25 additions & 63 deletions quickwit/quickwit-ingest/src/ingest_v2/publish_tracker.rs
@@ -62,16 +62,6 @@ impl PublishTracker {
}
}

pub fn register_requested_shards<'a>(
&'a self,
shard_ids: impl IntoIterator<Item = &'a ShardId>,
) {
let mut publish_states = self.state.lock().unwrap();
for shard_id in shard_ids {
publish_states.shard_tracked(shard_id.clone());
}
}

pub fn track_persisted_shard_position(&self, shard_id: ShardId, new_position: Position) {
let mut publish_states = self.state.lock().unwrap();
publish_states.position_persisted(&shard_id, &new_position)
@@ -91,13 +81,11 @@ impl PublishTracker {
}

enum PublishState {
/// The persist request for this shard has been sent
Tracked,
/// The persist request for this shard success response has been received
/// but the position has not yet been published
AwaitingPublish(Position),
/// The shard has been published up to this position (might happen before
/// the persist success is received)
/// The shard has been published up to this position (might happen before
/// the persist success is received via early publish event)
Published(Position),
}

@@ -108,68 +96,42 @@ struct ShardPublishStates {
}

impl ShardPublishStates {
fn shard_tracked(&mut self, shard_id: ShardId) {
self.states.entry(shard_id).or_insert(PublishState::Tracked);
}

fn position_published(
&mut self,
shard_id: &ShardId,
new_position: &Position,
publish_complete_notifier: &Notify,
) {
if let Some(publish_state) = self.states.get_mut(shard_id) {
match publish_state {
PublishState::AwaitingPublish(shard_position) if new_position >= shard_position => {
*publish_state = PublishState::Published(new_position.clone());
self.awaiting_count -= 1;
if self.awaiting_count == 0 {
// The notification is only relevant once
// `self.wait_publish_complete()` is called.
// Before that, `state.awaiting_publish` might
// still be re-populated.
publish_complete_notifier.notify_waiters();
}
}
PublishState::Published(current_position) if new_position > current_position => {
*current_position = new_position.clone();
}
PublishState::Tracked => {
*publish_state = PublishState::Published(new_position.clone());
}
PublishState::Published(_) => {
// looks like a duplicate or out-of-order event
}
PublishState::AwaitingPublish(_) => {
// the shard made some progress but we are waiting for more
let Some(publish_state) = self.states.get_mut(shard_id) else {
return;
};

match publish_state {
PublishState::AwaitingPublish(shard_position) if new_position >= shard_position => {
*publish_state = PublishState::Published(new_position.clone());
self.awaiting_count -= 1;
if self.awaiting_count == 0 {
publish_complete_notifier.notify_waiters();
}
}
PublishState::Published(current_position) if new_position > current_position => {
*current_position = new_position.clone();
}
PublishState::Published(_) | PublishState::AwaitingPublish(_) => {
// duplicate/out-of-order or not enough progress yet
}
}
// else: this shard is not being tracked here
}

fn position_persisted(&mut self, shard_id: &ShardId, new_position: &Position) {
if let Some(publish_state) = self.states.get_mut(shard_id) {
match publish_state {
PublishState::Published(current_position) if new_position <= current_position => {
// new position already published, no need to track it
}
PublishState::AwaitingPublish(old_position) => {
error!(
%old_position,
%new_position,
%shard_id,
"shard persisted positions should not be tracked multiple times"
);
}
PublishState::Tracked | PublishState::Published(_) => {
*publish_state = PublishState::AwaitingPublish(new_position.clone());
self.awaiting_count += 1;
}
}
} else {
error!(%shard_id, "requested shards should be registered before their position is tracked")
if self.states.contains_key(shard_id) {
error!(%shard_id, "shard persisted positions should not be tracked multiple times");
return;
}

self.states
.insert(shard_id.clone(), PublishState::AwaitingPublish(new_position.clone()));
self.awaiting_count += 1;
}
}

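Taken together, the `publish_tracker.rs` changes drop the explicit `Tracked` state along with the `register_requested_shards` pre-registration step: a shard now enters the map directly as `AwaitingPublish` when its persisted position is reported, and flips to `Published` once the publish event catches up. A self-contained sketch of that simplified state machine, using placeholder `ShardId`/`Position` types and a boolean return instead of the real `Notify`-based wakeup:

```rust
use std::collections::HashMap;

type ShardId = String;
type Position = u64;

enum PublishState {
    AwaitingPublish(Position),
    Published(Position),
}

#[derive(Default)]
struct ShardPublishStates {
    states: HashMap<ShardId, PublishState>,
    awaiting_count: usize,
}

impl ShardPublishStates {
    // Called when a persist response reports a newly persisted position.
    fn position_persisted(&mut self, shard_id: &ShardId, new_position: Position) {
        if self.states.contains_key(shard_id) {
            eprintln!("shard {shard_id} is already tracked");
            return;
        }
        self.states
            .insert(shard_id.clone(), PublishState::AwaitingPublish(new_position));
        self.awaiting_count += 1;
    }

    // Called when a publish event reports progress; returns true once nothing
    // is awaiting publication anymore (the real code notifies waiters instead).
    fn position_published(&mut self, shard_id: &ShardId, new_position: Position) -> bool {
        if let Some(state) = self.states.get_mut(shard_id) {
            match state {
                PublishState::AwaitingPublish(target) if new_position >= *target => {
                    *state = PublishState::Published(new_position);
                    self.awaiting_count -= 1;
                }
                PublishState::Published(current) if new_position > *current => {
                    *current = new_position;
                }
                _ => {} // duplicate, out-of-order, or not enough progress yet
            }
        }
        self.awaiting_count == 0
    }
}

fn main() {
    let shard = "shard-01".to_string();
    let mut states = ShardPublishStates::default();
    states.position_persisted(&shard, 42);
    assert!(!states.position_published(&shard, 10)); // not enough progress yet
    assert!(states.position_published(&shard, 42)); // published, nothing left awaiting
}
```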
41 changes: 16 additions & 25 deletions quickwit/quickwit-ingest/src/ingest_v2/router.rs
@@ -291,22 +291,22 @@ impl IngestRouter {

match persist_failure.reason() {
PersistFailureReason::ShardClosed => {
let shard_id = persist_failure.shard_id().clone();
let shard_ids = persist_failure.shard_ids.clone();
let index_uid: IndexUid = persist_failure.index_uid().clone();
let source_id: SourceId = persist_failure.source_id;
closed_shards
.entry((index_uid, source_id))
.or_default()
.push(shard_id);
.extend(shard_ids);
}
PersistFailureReason::ShardNotFound => {
let shard_id = persist_failure.shard_id().clone();
let shard_ids = persist_failure.shard_ids.clone();
let index_uid: IndexUid = persist_failure.index_uid().clone();
let source_id: SourceId = persist_failure.source_id;
deleted_shards
.entry((index_uid, source_id))
.or_default()
.push(shard_id);
.extend(shard_ids);
}
PersistFailureReason::WalFull
| PersistFailureReason::ShardRateLimited => {
@@ -315,8 +315,8 @@
//
// That way we will avoid retrying the persist request on the very
// same node.
let shard_id = persist_failure.shard_id().clone();
workbench.rate_limited_shards.insert(shard_id);
let shard_ids = persist_failure.shard_ids;
workbench.rate_limited_shards.extend(shard_ids);
}
_ => {}
}
@@ -377,32 +377,24 @@ impl IngestRouter {
let state_guard = self.state.lock().await;

for subrequest in pending_subrequests(&workbench.subworkbenches) {
let next_open_shard_res_opt = state_guard
let next_node_opt = state_guard
.routing_table
.find_entry(&subrequest.index_id, &subrequest.source_id)
.map(|entry| {
entry.next_open_shard_round_robin(&self.ingester_pool, rate_limited_shards)
});
let next_open_shard = match next_open_shard_res_opt {
Some(Ok(next_open_shard)) => next_open_shard,
Some(Err(NextOpenShardError::RateLimited)) => {
rate_limited_subrequest_ids.push(subrequest.subrequest_id);
continue;
}
Some(Err(NextOpenShardError::NoShardsAvailable)) | None => {
no_shards_available_subrequest_ids.push(subrequest.subrequest_id);
continue;
}
.map(|entry| entry.find_open_node(&self.ingester_pool))
.flatten();

let Some((node_id, index_uid, source_id)) = next_node_opt else {
rate_limited_subrequest_ids.push(subrequest.subrequest_id);
continue;
};
let persist_subrequest = PersistSubrequest {
subrequest_id: subrequest.subrequest_id,
index_uid: next_open_shard.index_uid.clone().into(),
source_id: next_open_shard.source_id.clone(),
shard_id: Some(next_open_shard.shard_id.clone()),
index_uid: Some(index_uid.clone()),
source_id: source_id.clone(),
doc_batch: subrequest.doc_batch.clone(),
};
per_leader_persist_subrequests
.entry(&next_open_shard.leader_id)
.entry(node_id)
.or_default()
.push(persist_subrequest);
}
@@ -427,7 +419,6 @@
subrequests,
commit_type: commit_type as i32,
};
workbench.record_persist_request(&persist_request);

let persist_future = async move {
let persist_result = tokio::time::timeout(
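The routing change on the persist path is the most behavioral part of this diff: instead of resolving a specific open shard per subrequest (`next_open_shard_round_robin`) and attaching its `shard_id`, the router now only resolves a target node (`find_open_node`), and `PersistSubrequest` carries just the index/source, which suggests shard selection moves to the ingester side. A condensed sketch of that grouping step, with simplified placeholder types standing in for the real routing table and ingester pool:

```rust
use std::collections::HashMap;

// Hypothetical stand-ins for the router's types, for illustration only.
type NodeId = String;
type IndexUid = String;
type SourceId = String;

struct Subrequest {
    subrequest_id: u32,
    index_id: String,
    source_id: SourceId,
    doc_batch: Vec<u8>,
}

#[derive(Debug)]
struct PersistSubrequest {
    subrequest_id: u32,
    index_uid: IndexUid,
    source_id: SourceId,
    doc_batch: Vec<u8>,
}

// Mimics the new `find_open_node` lookup: returns the node to send to, or
// `None` when no ingester currently serves this (index, source).
fn find_open_node(
    routing: &HashMap<(String, SourceId), (NodeId, IndexUid)>,
    index_id: &str,
    source_id: &SourceId,
) -> Option<(NodeId, IndexUid)> {
    routing.get(&(index_id.to_string(), source_id.clone())).cloned()
}

fn main() {
    let mut routing = HashMap::new();
    routing.insert(
        ("logs".to_string(), "source-a".to_string()),
        ("node-1".to_string(), "logs:0001".to_string()),
    );

    let pending = vec![Subrequest {
        subrequest_id: 0,
        index_id: "logs".to_string(),
        source_id: "source-a".to_string(),
        doc_batch: b"doc".to_vec(),
    }];

    // Group persist subrequests per leader node, as the router does before
    // fanning out one persist request per ingester.
    let mut per_leader: HashMap<NodeId, Vec<PersistSubrequest>> = HashMap::new();
    let mut unroutable: Vec<u32> = Vec::new();

    for subrequest in &pending {
        let Some((node_id, index_uid)) =
            find_open_node(&routing, &subrequest.index_id, &subrequest.source_id)
        else {
            unroutable.push(subrequest.subrequest_id);
            continue;
        };
        per_leader.entry(node_id).or_default().push(PersistSubrequest {
            subrequest_id: subrequest.subrequest_id,
            index_uid,
            source_id: subrequest.source_id.clone(),
            doc_batch: subrequest.doc_batch.clone(),
        });
    }

    println!("per-leader batches: {per_leader:#?}, unroutable: {unroutable:?}");
}
```

The sketch deliberately returns only a node and index UID from the lookup; whether the real `find_open_node` also filters on rate-limited shards is not visible in the rendered hunks.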