@@ -17,14 +17,15 @@ use std::rc::Rc;
1717
1818use differential_dataflow:: hashable:: Hashable ;
1919use differential_dataflow:: lattice:: Lattice ;
20- use differential_dataflow:: operators:: arrange:: Arrange ;
20+ use differential_dataflow:: operators:: arrange:: { Arrange , Arranged , TraceAgent } ;
2121use differential_dataflow:: operators:: reduce:: ReduceCore ;
2222use differential_dataflow:: operators:: Consolidate ;
2323use differential_dataflow:: trace:: implementations:: ord:: OrdValSpine ;
2424use differential_dataflow:: AsCollection ;
2525use differential_dataflow:: Collection ;
26- use timely:: dataflow:: channels:: pact:: { Exchange , Pipeline } ;
26+ use timely:: dataflow:: channels:: pact:: { Exchange , ParallelizationContract , Pipeline } ;
2727use timely:: dataflow:: operators:: Operator ;
28+ use timely:: dataflow:: stream:: Stream ;
2829use timely:: dataflow:: Scope ;
2930
3031use mz_compute_client:: plan:: top_k:: {
@@ -303,11 +304,6 @@ where
303304 G : Scope ,
304305 G :: Timestamp : Lattice ,
305306 {
306- let shared = Rc :: new ( RefCell :: new ( monoids:: Top1MonoidShared {
307- order_key,
308- left : DatumVec :: new ( ) ,
309- right : DatumVec :: new ( ) ,
310- } ) ) ;
311307 // We can place our rows directly into the diff field, and only keep the relevant one
312308 // corresponding to evaluating our aggregate, instead of having to do a hierarchical
313309 // reduction.
@@ -327,43 +323,36 @@ where
327323 }
328324 } ) ;
329325
330- // We arrange the inputs ourself to force it into a leaner structure because we know we
331- // won't care about values.
332- //
333- // TODO: Could we use explode here? We'd lose the diff>0 assert and we'd have to impl Mul
334- // for the monoid, unclear if it's worth it.
335- let mut buffer = Default :: default ( ) ;
336- let partial: Collection < G , ( Row , ( ) ) , monoids:: Top1Monoid > = collection
337- . inner
338- . unary (
339- Exchange :: new ( move |( ( key, _) , _, _) : & ( ( Row , _ ) , _ , _ ) | key. hashed ( ) ) ,
340- "Top1MonotonicPrepare" ,
341- move |_cap, _op| {
342- move |input, output| {
343- while let Some ( ( cap, data) ) = input. next ( ) {
344- data. swap ( & mut buffer) ;
345- output. session ( & cap) . give_iterator ( buffer. drain ( ..) . map (
346- |( ( group_key, row) , time, diff) | {
347- assert ! ( diff > 0 ) ;
348- // NB: Top1 can throw out the diff since we've asserted that it's > 0. A more
349- // general TopK monoid would have to account for diff.
350- (
351- ( group_key, ( ) ) ,
352- time,
353- monoids:: Top1Monoid {
354- row,
355- shared : Rc :: clone ( & shared) ,
356- } ,
357- )
358- } ,
359- ) )
360- }
361- }
362- } ,
363- )
364- . as_collection ( ) ;
326+ // Firstly, we render a pipelined step for pre-aggregation per worker, mapping rows to diffs
365327 let arranged =
366- partial. arrange_core :: < _ , RowSpine < _ , _ , _ , _ > > ( Pipeline , "Top1MonotonicArrange" ) ;
328+ render_top1_preaggregation ( collection. inner , order_key. clone ( ) , Pipeline ) ;
329+
330+ // Secondly, we map rows back from the diff field in preparation for another step
331+ // Note that we do not use explode here since it's unclear whether we'd like to implement Multiply in the Top1Monoid
332+ let mut buffer = Default :: default ( ) ;
333+ let preaggregated = arranged. as_collection ( |row, ( ) | row. clone ( ) ) . inner . unary (
334+ Pipeline ,
335+ "Top1MonotonicMapToRow" ,
336+ move |_cap, _op| {
337+ move |input, output| {
338+ input. for_each ( |time, data| {
339+ data. swap ( & mut buffer) ;
340+ output. session ( & time) . give_iterator (
341+ buffer. drain ( ..) . map ( |x| ( ( x. 0 , x. 2 . into_row ( ) ) , x. 1 , 1 ) ) ,
342+ ) ;
343+ } )
344+ }
345+ } ,
346+ ) ;
347+
348+ // Thirdly, we render an exchange step to correctly shuffle among workers
349+ let arranged = render_top1_preaggregation (
350+ preaggregated,
351+ order_key,
352+ Exchange :: new ( move |( ( group_key, _) , _, _) : & ( ( Row , _ ) , _ , _ ) | group_key. hashed ( ) ) ,
353+ ) ;
354+
355+ // Lastly, we compute the result by a final reduction
367356 let result = arranged. reduce_abelian :: < _ , OrdValSpine < _ , _ , _ , _ > > ( "Top1Monotonic" , {
368357 move |_key, input, output| {
369358 let accum = & input[ 0 ] . 1 ;
@@ -374,6 +363,56 @@ where
374363 result. as_collection ( |_k, v| v. clone ( ) )
375364 }
376365
366+ fn render_top1_preaggregation < G , P > (
367+ input : Stream < G , ( ( Row , Row ) , G :: Timestamp , Diff ) > ,
368+ order_key : Vec < mz_expr:: ColumnOrder > ,
369+ pact : P ,
370+ ) -> Arranged < G , TraceAgent < RowSpine < Row , ( ) , G :: Timestamp , monoids:: Top1Monoid > > >
371+ where
372+ G : Scope ,
373+ G :: Timestamp : Lattice ,
374+ P : ParallelizationContract < G :: Timestamp , ( ( Row , Row ) , G :: Timestamp , Diff ) > ,
375+ {
376+ // We allocate a single reference-counted object to represent the desired ordering
377+ let shared = Rc :: new ( RefCell :: new ( monoids:: Top1MonoidShared {
378+ order_key,
379+ left : DatumVec :: new ( ) ,
380+ right : DatumVec :: new ( ) ,
381+ } ) ) ;
382+
383+ // We arrange the input ourselves to force it into a leaner structure because we know we
384+ // won't care about values.
385+ //
386+ // TODO: Could we use explode here? We'd lose the diff>0 assert and we'd have to impl Mul
387+ // for the monoid, unclear if it's worth it.
388+ let mut buffer = Default :: default ( ) ;
389+ let prepared: Collection < G , ( Row , ( ) ) , monoids:: Top1Monoid > = input
390+ . unary ( pact, "Top1MonotonicPrepare" , move |_cap, _op| {
391+ move |input, output| {
392+ while let Some ( ( cap, data) ) = input. next ( ) {
393+ data. swap ( & mut buffer) ;
394+ output. session ( & cap) . give_iterator ( buffer. drain ( ..) . map (
395+ |( ( group_key, row) , time, diff) | {
396+ assert ! ( diff > 0 ) ;
397+ // NB: Top1 can throw out the diff since we've asserted that it's > 0. A more
398+ // general TopK monoid would have to account for diff.
399+ (
400+ ( group_key, ( ) ) ,
401+ time,
402+ monoids:: Top1Monoid {
403+ row,
404+ shared : Rc :: clone ( & shared) ,
405+ } ,
406+ )
407+ } ,
408+ ) )
409+ }
410+ }
411+ } )
412+ . as_collection ( ) ;
413+ prepared. arrange_core :: < _ , RowSpine < _ , _ , _ , _ > > ( Pipeline , "Top1MonotonicArrange" )
414+ }
415+
377416 fn render_intra_ts_thinning < G > (
378417 collection : Collection < G , ( Row , Row ) , Diff > ,
379418 order_key : Vec < mz_expr:: ColumnOrder > ,
0 commit comments