Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

195 changes: 148 additions & 47 deletions src/compute/src/render/top_k.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,27 +11,31 @@
//!
//! Consult [TopKPlan] documentation for details.

use std::cell::RefCell;
use std::collections::HashMap;
use std::rc::Rc;

use differential_dataflow::hashable::Hashable;
use differential_dataflow::lattice::Lattice;
use differential_dataflow::operators::arrange::ArrangeBySelf;
use differential_dataflow::operators::arrange::{Arrange, Arranged, TraceAgent};
use differential_dataflow::operators::reduce::ReduceCore;
use differential_dataflow::operators::Consolidate;
use differential_dataflow::trace::implementations::ord::OrdValSpine;
use differential_dataflow::AsCollection;
use differential_dataflow::Collection;
use timely::dataflow::channels::pact::Pipeline;
use timely::dataflow::channels::pact::{Exchange, ParallelizationContract, Pipeline};
use timely::dataflow::operators::Operator;
use timely::dataflow::stream::Stream;
use timely::dataflow::Scope;

use mz_compute_client::plan::top_k::{
BasicTopKPlan, MonotonicTop1Plan, MonotonicTopKPlan, TopKPlan,
};
use mz_repr::{Diff, Row};
use mz_repr::{DatumVec, Diff, Row};

use crate::render::context::CollectionBundle;
use crate::render::context::Context;
use crate::typedefs::RowSpine;

// The implementation requires integer timestamps to be able to delay feedback for monotonic inputs.
impl<G> Context<G, Row>
Expand Down Expand Up @@ -303,7 +307,6 @@ where
// We can place our rows directly into the diff field, and only keep the relevant one
// corresponding to evaluating our aggregate, instead of having to do a hierarchical
// reduction.
use timely::dataflow::operators::Map;

let collection = collection.map({
let mut datum_vec = mz_repr::DatumVec::new();
Expand All @@ -320,40 +323,94 @@ where
}
});

// We arrange the inputs ourself to force it into a leaner structure because we know we
// Firstly, we render a pipelined step for pre-aggregation per worker, mapping rows to diffs
let arranged =
render_top1_preaggregation(collection.inner, order_key.clone(), Pipeline);

// Secondly, we map rows back from the diff field in preparation for another step
// Note that we do not use explode here since it's unclear whether we'd like to implement Multiply in the Top1Monoid
let mut buffer = Default::default();
let preaggregated = arranged.as_collection(|row, ()| row.clone()).inner.unary(
Pipeline,
"Top1MonotonicMapToRow",
move |_cap, _op| {
move |input, output| {
input.for_each(|time, data| {
data.swap(&mut buffer);
output.session(&time).give_iterator(
buffer.drain(..).map(|x| ((x.0, x.2.into_row()), x.1, 1)),
);
})
}
},
);

// Thirdly, we render an exchange step to correctly shuffle among workers
let arranged = render_top1_preaggregation(
preaggregated,
order_key,
Exchange::new(move |((group_key, _), _, _): &((Row, _), _, _)| group_key.hashed()),
);

// Lastly, we compute the result by a final reduction
let result = arranged.reduce_abelian::<_, OrdValSpine<_, _, _, _>>("Top1Monotonic", {
move |_key, input, output| {
let accum = &input[0].1;
output.push((accum.row.clone(), 1));
}
});
// TODO(#7331): Here we discard the arranged output.
result.as_collection(|_k, v| v.clone())
}

fn render_top1_preaggregation<G, P>(
input: Stream<G, ((Row, Row), G::Timestamp, Diff)>,
order_key: Vec<mz_expr::ColumnOrder>,
pact: P,
) -> Arranged<G, TraceAgent<RowSpine<Row, (), G::Timestamp, monoids::Top1Monoid>>>
where
G: Scope,
G::Timestamp: Lattice,
P: ParallelizationContract<G::Timestamp, ((Row, Row), G::Timestamp, Diff)>,
{
// We allocate a single reference-counted object to represent the desired ordering
let shared = Rc::new(RefCell::new(monoids::Top1MonoidShared {
order_key,
left: DatumVec::new(),
right: DatumVec::new(),
}));

// We arrange the input ourselves to force it into a leaner structure because we know we
// won't care about values.
//
// TODO: Could we use explode here? We'd lose the diff>0 assert and we'd have to impl Mul
// for the monoid, unclear if it's worth it.
let partial: Collection<G, Row, monoids::Top1Monoid> = collection
// TODO(#16549): Use explicit arrangement
.consolidate()
.inner
.map(move |((group_key, row), time, diff)| {
assert!(diff > 0);
// NB: Top1 can throw out the diff since we've asserted that it's > 0. A more
// general TopK monoid would have to account for diff.
(
group_key,
time,
monoids::Top1Monoid {
row,
order_key: order_key.clone(),
},
)
let mut buffer = Default::default();
let prepared: Collection<G, (Row, ()), monoids::Top1Monoid> = input
.unary(pact, "Top1MonotonicPrepare", move |_cap, _op| {
move |input, output| {
while let Some((cap, data)) = input.next() {
data.swap(&mut buffer);
output.session(&cap).give_iterator(buffer.drain(..).map(
|((group_key, row), time, diff)| {
assert!(diff > 0);
// NB: Top1 can throw out the diff since we've asserted that it's > 0. A more
// general TopK monoid would have to account for diff.
(
(group_key, ()),
time,
monoids::Top1Monoid {
row,
shared: Rc::clone(&shared),
},
)
},
))
}
}
})
.as_collection();
let result = partial
// TODO(#16549): Use explicit arrangement
.arrange_by_self()
.reduce_abelian::<_, OrdValSpine<_, _, _, _>>("Top1Monotonic", {
move |_key, input, output| {
let accum = &input[0].1;
output.push((accum.row.clone(), 1));
}
});
// TODO(#7331): Here we discard the arranged output.
result.as_collection(|_k, v| v.clone())
prepared.arrange_core::<_, RowSpine<_, _, _, _>>(Pipeline, "Top1MonotonicArrange")
}

fn render_intra_ts_thinning<G>(
Expand All @@ -367,6 +424,11 @@ where
{
let mut aggregates = HashMap::new();
let mut vector = Vec::new();
let shared = Rc::new(RefCell::new(monoids::Top1MonoidShared {
order_key,
left: DatumVec::new(),
right: DatumVec::new(),
}));
collection
.inner
.unary_notify(
Expand All @@ -382,7 +444,7 @@ where
for ((grp_row, row), record_time, diff) in vector.drain(..) {
let monoid = monoids::Top1Monoid {
row,
order_key: order_key.clone(),
shared: Rc::clone(&shared),
};
let topk = agg_time.entry((grp_row, record_time)).or_insert_with(
move || {
Expand All @@ -401,7 +463,11 @@ where
let mut session = output.session(&time);
for ((grp_row, record_time), topk) in aggs {
session.give_iterator(topk.into_iter().map(|(monoid, diff)| {
((grp_row.clone(), monoid.row), record_time.clone(), diff)
(
(grp_row.clone(), monoid.into_row()),
record_time.clone(),
diff,
)
}))
}
}
Expand Down Expand Up @@ -508,32 +574,67 @@ pub mod topk_agg {

/// Monoids for in-place compaction of monotonic streams.
pub mod monoids {
use std::cell::RefCell;
use std::cmp::Ordering;
use std::hash::{Hash, Hasher};
use std::rc::Rc;

use differential_dataflow::difference::Semigroup;
use serde::{Deserialize, Serialize};

use mz_expr::ColumnOrder;
use mz_repr::Row;
use mz_repr::{DatumVec, Row};

/// A shared portion of a thread-local top-1 monoid implementation.
///
/// Holds the column ordering plus two reusable datum scratch buffers so that
/// `Top1Monoid::cmp` can unpack and compare two rows without allocating fresh
/// `Vec`s on every comparison. Intended to be wrapped in `Rc<RefCell<_>>` and
/// shared by all monoid instances of one worker (thread-local only — `Rc` is
/// not `Send`).
#[derive(Debug)]
pub struct Top1MonoidShared {
// Ordering specification applied by `mz_expr::compare_columns` in `cmp`.
pub order_key: Vec<ColumnOrder>,
// Scratch buffers reused to unpack the left/right rows during comparison.
pub left: DatumVec,
pub right: DatumVec,
}

/// A monoid containing a row and an ordering.
#[derive(Eq, PartialEq, Debug, Clone, Serialize, Deserialize, Hash)]
/// A monoid containing a row and a shared pointer to a shared structure.
/// Only suitable for thread-local aggregations.
#[derive(Debug, Clone)]
pub struct Top1Monoid {
pub row: Row,
pub order_key: Vec<ColumnOrder>,
pub shared: Rc<RefCell<Top1MonoidShared>>,
}

impl Top1Monoid {
/// Consumes the monoid and returns the retained row, dropping the
/// reference to the shared ordering state.
pub fn into_row(self) -> Row {
self.row
}
}

// Equality considers only the row payload; the `shared` pointer is deliberately
// excluded. The code debug-asserts in `Ord::cmp` that both sides point at the
// same shared state, so comparing it would be redundant.
impl PartialEq for Top1Monoid {
fn eq(&self, other: &Self) -> bool {
self.row.eq(&other.row)
}
}

impl Eq for Top1Monoid {}

// Hashing likewise covers only the row, keeping `Hash` consistent with the
// row-only `Eq` implementation above (equal values must hash equally).
impl Hash for Top1Monoid {
fn hash<H: Hasher>(&self, state: &mut H) {
self.row.hash(state);
}
}

impl Ord for Top1Monoid {
fn cmp(&self, other: &Self) -> Ordering {
debug_assert_eq!(self.order_key, other.order_key);

// It might be nice to cache this row decoding like the non-monotonic codepath, but we'd
// have to store the decoded Datums in the same struct as the Row, which gets tricky.
let left: Vec<_> = self.row.unpack();
let right: Vec<_> = other.row.unpack();
mz_expr::compare_columns(&self.order_key, &left, &right, || left.cmp(&right))
debug_assert!(Rc::ptr_eq(&self.shared, &other.shared));
let Top1MonoidShared {
left,
right,
order_key,
} = &mut *self.shared.borrow_mut();

let left = left.borrow_with(&self.row);
let right = right.borrow_with(&other.row);
mz_expr::compare_columns(order_key, &left, &right, || left.cmp(&right))
}
}

impl PartialOrd for Top1Monoid {
fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
Some(self.cmp(other))
Expand All @@ -545,7 +646,7 @@ pub mod monoids {
let cmp = (*self).cmp(rhs);
// NB: Reminder that TopK returns the _minimum_ K items.
if cmp == Ordering::Greater {
self.clone_from(rhs);
self.row.clone_from(&rhs.row);
}
}

Expand Down
4 changes: 4 additions & 0 deletions src/repr/src/row.rs
Original file line number Diff line number Diff line change
Expand Up @@ -136,6 +136,10 @@ impl Clone for Row {
data: SmallVec::from_slice(self.data.as_slice()),
}
}

/// Clones `source` into `self` by delegating to the inner buffer's
/// `clone_from`, which may reuse `self`'s existing allocation instead of
/// allocating afresh as the derived `clone` would.
fn clone_from(&mut self, source: &Self) {
self.data.clone_from(&source.data);
}
}

impl Row {
Expand Down