Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 14 additions & 2 deletions src/compute-types/src/explain/text.rs
Original file line number Diff line number Diff line change
Expand Up @@ -290,6 +290,7 @@ impl Plan {
key_val_plan,
plan,
mfp_after,
input_strategy: _,
} => {
ctx.indent.set();
if !mfp_after.expressions.is_empty() || !mfp_after.predicates.is_empty() {
Expand Down Expand Up @@ -374,7 +375,11 @@ impl Plan {

ctx.indent.reset();
}
TopK { input, top_k_plan } => {
TopK {
input,
top_k_plan,
input_strategy: _,
} => {
use crate::plan::top_k::TopKPlan;
match top_k_plan {
TopKPlan::MonotonicTop1(plan) => {
Expand Down Expand Up @@ -469,6 +474,7 @@ impl Plan {
Union {
inputs,
consolidate_output,
input_strategies: _,
} => {
write!(f, "{}→", ctx.indent)?;
if *consolidate_output {
Expand Down Expand Up @@ -739,6 +745,7 @@ impl Plan {
key_val_plan,
plan,
mfp_after,
input_strategy: _,
} => {
use crate::plan::reduce::ReducePlan;
match plan {
Expand Down Expand Up @@ -790,7 +797,11 @@ impl Plan {
input.fmt_text(f, ctx)
})?;
}
TopK { input, top_k_plan } => {
TopK {
input,
top_k_plan,
input_strategy: _,
} => {
use crate::plan::top_k::TopKPlan;
match top_k_plan {
TopKPlan::MonotonicTop1(plan) => {
Expand Down Expand Up @@ -876,6 +887,7 @@ impl Plan {
Union {
inputs,
consolidate_output,
input_strategies: _,
} => {
if *consolidate_output {
writeln!(
Expand Down
30 changes: 24 additions & 6 deletions src/compute-types/src/plan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -121,12 +121,13 @@ impl AvailableCollections {
Serialize
)]
pub enum ArrangementStrategy {
/// Form arrangements directly from the input collection.
/// Form arrangements directly from the input collection. For `Union`, feed the input
/// straight into the concatenation without inserting temporal bucketing.
Direct,
/// Insert temporal bucketing in front of the arrangement, to delay future-stamped
/// updates (e.g., from `mz_now()` MFPs) until their bucket boundary releases them.
/// Honoured only when `ENABLE_COMPUTE_TEMPORAL_BUCKETING` is set; otherwise behaves like
/// `Direct`.
/// Insert temporal bucketing in front of the arrangement (for `ArrangeBy`) or the
/// downstream consolidate (for `Union`), to delay future-stamped updates (e.g., from
/// `mz_now()` MFPs) until their bucket boundary releases them. Honoured only when
/// `ENABLE_COMPUTE_TEMPORAL_BUCKETING` is set; otherwise behaves like `Direct`.
TemporalBucketing,
}

Expand Down Expand Up @@ -308,6 +309,11 @@ pub enum PlanNode {
/// predicates so that it can be readily evaluated.
/// TODO(ggevay): should we wrap this in [`mz_expr::SafeMfpPlan`]?
mfp_after: MapFilterProject,
/// Rendering strategy for the input collection. Consulted by the renderer when the
/// reduce performs a pre-aggregation consolidation (currently: monotonic hierarchical
/// reductions with `must_consolidate` set), to decide whether to insert temporal
/// bucketing before that consolidation. Ignored otherwise.
input_strategy: ArrangementStrategy,
},
/// Key-based "Top K" operator, retaining the first K records in each group.
TopK {
Expand All @@ -319,6 +325,11 @@ pub enum PlanNode {
/// on the properties of the reduction, and the input itself. Please check
/// out the documentation for this type for more detail.
top_k_plan: TopKPlan,
/// Rendering strategy for the input collection. Consulted by the renderer when the
/// top-k performs a pre-aggregation consolidation (currently: monotonic top-k variants
/// with `must_consolidate` set), to decide whether to insert temporal bucketing before
/// that consolidation. Ignored otherwise.
input_strategy: ArrangementStrategy,
},
/// Inverts the sign of each update.
Negate {
Expand Down Expand Up @@ -350,6 +361,10 @@ pub enum PlanNode {
inputs: Vec<Plan>,
/// Whether to consolidate the output, e.g., cancel negated records.
consolidate_output: bool,
/// Per-input rendering strategy, aligned with `inputs`. Consulted by the renderer
/// when `consolidate_output` is set, to decide whether to insert temporal bucketing
/// on that input before the downstream consolidation. Ignored otherwise.
input_strategies: Vec<ArrangementStrategy>,
},
/// The `input` plan, but with additional arrangements.
///
Expand Down Expand Up @@ -649,7 +664,7 @@ impl Plan {
PlanNode::Union {
inputs,
consolidate_output,
..
input_strategies: _,
} => {
if inputs
.iter()
Expand Down Expand Up @@ -775,6 +790,7 @@ impl CollectionPlan for PlanNode {
| PlanNode::Union {
inputs,
consolidate_output: _,
input_strategies: _,
} => {
for input in inputs {
input.depends_on_into(out);
Expand Down Expand Up @@ -805,10 +821,12 @@ impl CollectionPlan for PlanNode {
key_val_plan: _,
plan: _,
mfp_after: _,
input_strategy: _,
}
| PlanNode::TopK {
input,
top_k_plan: _,
input_strategy: _,
}
| PlanNode::Negate { input }
| PlanNode::Threshold {
Expand Down
16 changes: 14 additions & 2 deletions src/compute-types/src/plan/interpret/api.rs
Original file line number Diff line number Diff line change
Expand Up @@ -393,6 +393,7 @@ where
key_val_plan,
plan,
mfp_after,
input_strategy: _,
} => {
// Descend recursively into all children.
let input = self.apply_rec(input, rg)?;
Expand All @@ -406,7 +407,11 @@ where
mfp_after,
))
}
TopK { input, top_k_plan } => {
TopK {
input,
top_k_plan,
input_strategy: _,
} => {
// Descend recursively into all children.
let input = self.apply_rec(input, rg)?;
// Interpret the current node.
Expand All @@ -430,6 +435,7 @@ where
Union {
inputs,
consolidate_output,
input_strategies: _,
} => {
// Descend recursively into all children.
let inputs = inputs
Expand Down Expand Up @@ -676,6 +682,7 @@ where
key_val_plan,
plan,
mfp_after,
input_strategy: _,
} => {
// Descend recursively into all children.
let input = self.apply_rec(input, rg)?;
Expand All @@ -693,7 +700,11 @@ where
// Pass the interpretation result up.
Ok(result)
}
TopK { input, top_k_plan } => {
TopK {
input,
top_k_plan,
input_strategy: _,
} => {
// Descend recursively into all children.
let input = self.apply_rec(input, rg)?;
// Interpret the current node.
Expand Down Expand Up @@ -731,6 +742,7 @@ where
Union {
inputs,
consolidate_output,
input_strategies: _,
} => {
// Descend recursively into all children.
let inputs: Vec<_> = inputs
Expand Down
7 changes: 7 additions & 0 deletions src/compute-types/src/plan/lowering.rs
Original file line number Diff line number Diff line change
Expand Up @@ -863,6 +863,7 @@ This is not expected to cause incorrect results, but could indicate a performanc
plan: PlanNode::TopK {
input: Box::new(input),
top_k_plan,
input_strategy: strategy_from_future(input_future),
}
.as_plan(lir_id),
keys: AvailableCollections::new_raw(),
Expand Down Expand Up @@ -946,6 +947,10 @@ This is not expected to cause incorrect results, but could indicate a performanc
lowered_inputs.push(self.lower_mir_expr(input)?);
}
let any_future = lowered_inputs.iter().any(|l| l.has_future_updates);
let input_strategies: Vec<ArrangementStrategy> = lowered_inputs
.iter()
.map(|l| strategy_from_future(l.has_future_updates))
.collect();
let plans = lowered_inputs
.into_iter()
.map(
Expand Down Expand Up @@ -976,6 +981,7 @@ This is not expected to cause incorrect results, but could indicate a performanc
plan: PlanNode::Union {
inputs: plans,
consolidate_output: false,
input_strategies,
}
.as_plan(lir_id),
keys: AvailableCollections::new_raw(),
Expand Down Expand Up @@ -1213,6 +1219,7 @@ This is not expected to cause incorrect results, but could indicate a performanc
key_val_plan,
plan: reduce_plan,
mfp_after,
input_strategy: strategy_from_future(input_future),
}
.as_plan(lir_id),
keys: output_keys,
Expand Down
26 changes: 25 additions & 1 deletion src/compute-types/src/plan/render_plan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -215,6 +215,10 @@ pub enum Expr {
/// the key for the reduction; otherwise, the results become undefined. Additionally, the
/// MFP must be free from temporal predicates so that it can be readily evaluated.
mfp_after: MapFilterProject,
/// Rendering strategy for the input collection. Consulted by the renderer when the
/// reduce performs a pre-aggregation consolidation, to decide whether to insert temporal
/// bucketing before that consolidation. Ignored otherwise.
input_strategy: ArrangementStrategy,
},
/// Key-based "Top K" operator, retaining the first K records in each group.
TopK {
Expand All @@ -226,6 +230,10 @@ pub enum Expr {
/// the Top-K, and the input itself. Please check out the documentation for this type for
/// more detail.
top_k_plan: TopKPlan,
/// Rendering strategy for the input collection. Consulted by the renderer when the
/// top-k performs a pre-aggregation consolidation, to decide whether to insert temporal
/// bucketing before that consolidation. Ignored otherwise.
input_strategy: ArrangementStrategy,
},
/// Inverts the sign of each update.
Negate {
Expand Down Expand Up @@ -256,6 +264,10 @@ pub enum Expr {
inputs: Vec<LirId>,
/// Whether to consolidate the output, e.g., cancel negated records.
consolidate_output: bool,
/// Per-input rendering strategy, aligned with `inputs`. Consulted by the renderer
/// when `consolidate_output` is set, to decide whether to insert temporal bucketing
/// on that input before the downstream consolidation. Ignored otherwise.
input_strategies: Vec<ArrangementStrategy>,
},
/// The `input` plan, but with additional arrangements.
///
Expand Down Expand Up @@ -438,22 +450,29 @@ impl TryFrom<Plan> for LetFreePlan {
key_val_plan,
plan,
mfp_after,
input_strategy,
} => {
let expr = Reduce {
input_key,
input: input.lir_id,
key_val_plan,
plan,
mfp_after,
input_strategy,
};
insert_node(lir_id, parent, expr, nesting);

todo.push((*input, Some(lir_id), nesting.saturating_add(1)));
}
PlanNode::TopK { input, top_k_plan } => {
PlanNode::TopK {
input,
top_k_plan,
input_strategy,
} => {
let expr = TopK {
input: input.lir_id,
top_k_plan,
input_strategy,
};
insert_node(lir_id, parent, expr, nesting);

Expand Down Expand Up @@ -482,10 +501,12 @@ impl TryFrom<Plan> for LetFreePlan {
PlanNode::Union {
inputs,
consolidate_output,
input_strategies,
} => {
let expr = Union {
inputs: inputs.iter().map(|i| i.lir_id).collect(),
consolidate_output,
input_strategies,
};
insert_node(lir_id, parent, expr, nesting);

Expand Down Expand Up @@ -911,6 +932,7 @@ impl<'a> std::fmt::Display for RenderPlanExprHumanizer<'a> {
key_val_plan: _key_val_plan,
plan,
mfp_after: _mfp_after,
input_strategy: _,
} => match plan {
ReducePlan::Distinct => write!(f, "Distinct GroupAggregate"),
ReducePlan::Accumulable(..) => write!(f, "Accumulable GroupAggregate"),
Expand Down Expand Up @@ -939,6 +961,7 @@ impl<'a> std::fmt::Display for RenderPlanExprHumanizer<'a> {
TopK {
input: _,
top_k_plan,
input_strategy: _,
} => {
match top_k_plan {
TopKPlan::MonotonicTop1(..) => write!(f, "Monotonic Top1")?,
Expand All @@ -965,6 +988,7 @@ impl<'a> std::fmt::Display for RenderPlanExprHumanizer<'a> {
Union {
inputs: _,
consolidate_output,
input_strategies: _,
} => {
if *consolidate_output {
write!(f, "Consolidating ")?;
Expand Down
Loading
Loading