Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion src/compute-client/src/protocol/command.rs
Original file line number Diff line number Diff line change
Expand Up @@ -425,7 +425,8 @@ pub struct Peek<T = mz_repr::Timestamp> {
/// `map_filter_project`.
pub result_desc: RelationDesc,
/// If `Some`, then look up only the given keys from the collection (instead of a full scan).
/// The vector is never empty.
/// The vector may be empty, indicating that no keys are targeted on this worker
/// (the worker should respond with empty results immediately).
pub literal_constraints: Option<Vec<Row>>,
/// The identifier of this peek request.
///
Expand Down
27 changes: 25 additions & 2 deletions src/compute/src/command_channel.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,10 +25,12 @@
use std::sync::mpsc::{self, TryRecvError};
use std::sync::{Arc, Mutex};

use differential_dataflow::Hashable;
use itertools::Itertools;
use mz_compute_client::protocol::command::ComputeCommand;
use mz_compute_client::protocol::command::{ComputeCommand, PeekTarget};
use mz_compute_types::dataflows::{BuildDesc, DataflowDescription};
use mz_ore::cast::CastFrom;
use mz_repr::Row;
use mz_timely_util::scope_label::ScopeExt;
use timely::communication::Allocate;
use timely::dataflow::channels::pact::Exchange;
Expand Down Expand Up @@ -197,7 +199,28 @@ fn split_command(
})
.map(Box::new)
.map(ComputeCommand::CreateDataflow);
Either::Left(commands)
Either::Left(Either::Left(commands))
}
ComputeCommand::Peek(mut peek)
if peek.literal_constraints.is_some()
&& matches!(peek.target, PeekTarget::Index { .. }) =>
{
// Index peeks with literal constraints can be split per worker: each key
// hashes to exactly one worker's arrangement shard, so we route each
// constraint to the owning worker and give other workers an empty list
// (they must still respond, but can short-circuit immediately).
let constraints = peek.literal_constraints.take().unwrap();
let mut per_worker: Vec<Vec<Row>> = vec![Vec::new(); parts];
for row in constraints {
let target = usize::cast_from(row.hashed()) % parts;
per_worker[target].push(row);
}
let commands = per_worker.into_iter().map(move |wc| {
let mut wp = (*peek).clone();
wp.literal_constraints = Some(wc);
ComputeCommand::Peek(Box::new(wp))
});
Either::Left(Either::Right(commands))
}
command => {
let commands = std::iter::repeat_n(command, parts);
Expand Down
15 changes: 15 additions & 0 deletions src/compute/src/compute_state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -572,6 +572,21 @@ impl<'a, A: Allocate + 'static> ActiveComputeState<'a, A> {

#[mz_ore::instrument(level = "debug")]
fn handle_peek(&mut self, peek: Peek) {
// Empty literal constraints means no keys hash to this worker.
// Respond immediately with empty results.
if peek
.literal_constraints
.as_ref()
.is_some_and(|c| c.is_empty())
{
self.send_compute_response(ComputeResponse::PeekResponse(
peek.uuid,
PeekResponse::Rows(RowCollection::default()),
OpenTelemetryContext::obtain(),
));
return;
}

let pending = match &peek.target {
PeekTarget::Index { id } => {
// Acquire a copy of the trace suitable for fulfilling the peek.
Expand Down