diff --git a/differential-dataflow/examples/columnar.rs b/differential-dataflow/examples/columnar.rs index b5af06ef2..ef4c56871 100644 --- a/differential-dataflow/examples/columnar.rs +++ b/differential-dataflow/examples/columnar.rs @@ -44,8 +44,9 @@ fn main() { let data_pact = ExchangeCore::,_>::new_core(|x: &((&str,()),&u64,&i64)| (x.0).0.as_bytes().iter().map(|x| *x as u64).sum::() as u64); let keys_pact = ExchangeCore::,_>::new_core(|x: &((&str,()),&u64,&i64)| (x.0).0.as_bytes().iter().map(|x| *x as u64).sum::() as u64); - let data = arrange_core::<_,_,Col2KeyBatcher<_,_,_>, ColKeyBuilder<_,_,_>, ColKeySpine<_,_,_>>(&data, data_pact, "Data"); - let keys = arrange_core::<_,_,Col2KeyBatcher<_,_,_>, ColKeyBuilder<_,_,_>, ColKeySpine<_,_,_>>(&keys, keys_pact, "Keys"); + use crate::batcher::Col2ValChunker; + let data = arrange_core::<_,_,_,Col2ValChunker,Col2KeyBatcher<_,_,_>, ColKeyBuilder<_,_,_>, ColKeySpine<_,_,_>>(&data, data_pact, "Data"); + let keys = arrange_core::<_,_,_,Col2ValChunker,Col2KeyBatcher<_,_,_>, ColKeyBuilder<_,_,_>, ColKeySpine<_,_,_>>(&keys, keys_pact, "Keys"); keys.join_core(&data, |_k, &(), &()| Option::<()>::None) .probe_with(&mut probe); @@ -373,7 +374,8 @@ pub mod batcher { use differential_dataflow::trace::implementations::merge_batcher::MergeBatcher; /// A batcher for columnar storage. - pub type Col2ValBatcher = MergeBatcher, Chunker>, merger::ColumnMerger<(K,V),T,R>>; + pub type Col2ValChunker = Chunker>; + pub type Col2ValBatcher = MergeBatcher>; pub type Col2KeyBatcher = Col2ValBatcher; // First draft: build a "chunker" and a "merger". @@ -408,11 +410,11 @@ pub mod batcher { impl<'a, D, T, R, C2> PushInto<&'a mut Column<(D, T, R)>> for Chunker where - D: for<'b> Columnar, + D: Columnar, for<'b> columnar::Ref<'b, D>: Ord, - T: for<'b> Columnar, + T: Columnar, for<'b> columnar::Ref<'b, T>: Ord, - R: for<'b> Columnar + for<'b> Semigroup>, + R: Columnar + for<'b> Semigroup>, for<'b> columnar::Ref<'b, R>: Ord, C2: Container + for<'b, 'c> PushInto<(columnar::Ref<'b, D>, columnar::Ref<'b, T>, &'c R)>, { diff --git a/differential-dataflow/examples/spines.rs b/differential-dataflow/examples/spines.rs index 3fada3d5f..e36623d51 100644 --- a/differential-dataflow/examples/spines.rs +++ b/differential-dataflow/examples/spines.rs @@ -28,23 +28,26 @@ fn main() { match mode.as_str() { "new" => { + use differential_dataflow::trace::implementations::ColumnationChunker; use differential_dataflow::trace::implementations::ord_neu::{ColKeyBatcher, ColKeyBuilder, ColKeySpine}; - let data = data.arrange::, ColKeyBuilder<_,_,_>, ColKeySpine<_,_,_>>(); - let keys = keys.arrange::, ColKeyBuilder<_,_,_>, ColKeySpine<_,_,_>>(); + let data = data.arrange::,ColKeyBatcher<_,_,_>, ColKeyBuilder<_,_,_>, ColKeySpine<_,_,_>>(); + let keys = keys.arrange::,ColKeyBatcher<_,_,_>, ColKeyBuilder<_,_,_>, ColKeySpine<_,_,_>>(); keys.join_core(&data, |_k, &(), &()| Option::<()>::None) .probe_with(&mut probe); }, "old" => { + use differential_dataflow::trace::implementations::VecChunker; use differential_dataflow::trace::implementations::ord_neu::{OrdKeyBatcher, RcOrdKeyBuilder, OrdKeySpine}; - let data = data.arrange::, RcOrdKeyBuilder<_,_,_>, OrdKeySpine<_,_,_>>(); - let keys = keys.arrange::, RcOrdKeyBuilder<_,_,_>, OrdKeySpine<_,_,_>>(); + let data = data.arrange::,OrdKeyBatcher<_,_,_>, RcOrdKeyBuilder<_,_,_>, OrdKeySpine<_,_,_>>(); + let keys = keys.arrange::,OrdKeyBatcher<_,_,_>, RcOrdKeyBuilder<_,_,_>, OrdKeySpine<_,_,_>>(); keys.join_core(&data, |_k, &(), &()| Option::<()>::None) .probe_with(&mut probe); }, "rhh" => { + use differential_dataflow::trace::implementations::VecChunker; use differential_dataflow::trace::implementations::rhh::{HashWrapper, VecBatcher, VecBuilder, VecSpine}; - let data = data.map(|x| HashWrapper { inner: x }).arrange::, VecBuilder<_,(),_,_>, VecSpine<_,(),_,_>>(); - let keys = keys.map(|x| HashWrapper { inner: x }).arrange::, VecBuilder<_,(),_,_>, VecSpine<_,(),_,_>>(); + let data = data.map(|x| HashWrapper { inner: x }).arrange::,VecBatcher<_,(),_,_>, VecBuilder<_,(),_,_>, VecSpine<_,(),_,_>>(); + let keys = keys.map(|x| HashWrapper { inner: x }).arrange::,VecBatcher<_,(),_,_>, VecBuilder<_,(),_,_>, VecSpine<_,(),_,_>>(); keys.join_core(&data, |_k, &(), &()| Option::<()>::None) .probe_with(&mut probe); }, diff --git a/differential-dataflow/src/operators/arrange/arrangement.rs b/differential-dataflow/src/operators/arrange/arrangement.rs index 05cab5328..00b229e79 100644 --- a/differential-dataflow/src/operators/arrange/arrangement.rs +++ b/differential-dataflow/src/operators/arrange/arrangement.rs @@ -30,7 +30,7 @@ use crate::{Data, ExchangeData, Collection, AsCollection, Hashable}; use crate::difference::Semigroup; use crate::lattice::Lattice; use crate::trace::{self, Trace, TraceReader, BatchReader, Batcher, Builder, Cursor}; -use crate::trace::implementations::{KeyBatcher, KeyBuilder, KeySpine, ValBatcher, ValBuilder, ValSpine}; +use crate::trace::implementations::{KeyBatcher, KeyBuilder, KeySpine, ValBatcher, ValBuilder, ValSpine, VecChunker}; use trace::wrappers::enter::{TraceEnter, BatchEnter,}; use trace::wrappers::enter_at::TraceEnter as TraceEnterAt; @@ -76,7 +76,7 @@ where use ::timely::dataflow::scopes::Child; use ::timely::progress::timestamp::Refines; use timely::Container; -use timely::container::PushInto; +use timely::container::{ContainerBuilder, PushInto}; impl Arranged where @@ -348,20 +348,22 @@ where G: Scope, { /// Arranges updates into a shared trace. - fn arrange(&self) -> Arranged> + fn arrange(&self) -> Arranged> where - Ba: Batcher + 'static, - Bu: Builder, + Chu: ContainerBuilder + for<'a> PushInto<&'a mut C>, + Ba: Batcher + 'static, + Bu: Builder, Tr: Trace + 'static, { - self.arrange_named::("Arrange") + self.arrange_named::("Arrange") } /// Arranges updates into a shared trace, with a supplied name. - fn arrange_named(&self, name: &str) -> Arranged> + fn arrange_named(&self, name: &str) -> Arranged> where - Ba: Batcher + 'static, - Bu: Builder, + Chu: ContainerBuilder + for<'a> PushInto<&'a mut C>, + Ba: Batcher + 'static, + Bu: Builder, Tr: Trace + 'static, ; } @@ -373,14 +375,15 @@ where V: ExchangeData, R: ExchangeData + Semigroup, { - fn arrange_named(&self, name: &str) -> Arranged> + fn arrange_named(&self, name: &str) -> Arranged> where - Ba: Batcher, Time=G::Timestamp> + 'static, - Bu: Builder, + Chu: ContainerBuilder + for<'a> PushInto<&'a mut Vec<((K, V), G::Timestamp, R)>>, + Ba: Batcher + 'static, + Bu: Builder, Tr: Trace + 'static, { let exchange = Exchange::new(move |update: &((K,V),G::Timestamp,R)| (update.0).0.hashed().into()); - arrange_core::<_, _, Ba, Bu, _>(&self.inner, exchange, name) + arrange_core::<_, _, _, Chu, Ba, Bu, _>(&self.inner, exchange, name) } } @@ -389,12 +392,14 @@ where /// This operator arranges a stream of values into a shared trace, whose contents it maintains. /// It uses the supplied parallelization contract to distribute the data, which does not need to /// be consistently by key (though this is the most common). -pub fn arrange_core(stream: &StreamCore, pact: P, name: &str) -> Arranged> +pub fn arrange_core(stream: &StreamCore, pact: P, name: &str) -> Arranged> where G: Scope, - P: ParallelizationContract, - Ba: Batcher + 'static, - Bu: Builder, + P: ParallelizationContract, + C: Container + Clone + 'static, + Chu: ContainerBuilder + for<'a> PushInto<&'a mut C>, + Ba: Batcher + 'static, + Bu: Builder, Tr: Trace+'static, { // The `Arrange` operator is tasked with reacting to an advancing input @@ -443,6 +448,8 @@ where // Initialize to the minimal input frontier. let mut prev_frontier = Antichain::from_elem(::minimum()); + let mut chunker = Chu::default(); + move |input, output| { // As we receive data, we need to (i) stash the data and (ii) keep *enough* capabilities. @@ -451,7 +458,11 @@ where input.for_each(|cap, data| { capabilities.insert(cap.retain()); - batcher.push_container(data); + chunker.push_into(data); + while let Some(chunk) = chunker.extract() { + let chunk = std::mem::take(chunk); + batcher.push_into(chunk); + } }); // The frontier may have advanced by multiple elements, which is an issue because @@ -481,6 +492,11 @@ where // If there is at least one capability not in advance of the input frontier ... if capabilities.elements().iter().any(|c| !input.frontier().less_equal(c.time())) { + while let Some(chunk) = chunker.finish() { + let chunk = std::mem::take(chunk); + batcher.push_into(chunk); + } + let mut upper = Antichain::new(); // re-used allocation for sealing batches. // For each capability not in advance of the input frontier ... @@ -547,14 +563,15 @@ impl Arrange, { - fn arrange_named(&self, name: &str) -> Arranged> + fn arrange_named(&self, name: &str) -> Arranged> where - Ba: Batcher, Time=G::Timestamp> + 'static, - Bu: Builder, + Chu: ContainerBuilder + for<'a> PushInto<&'a mut Vec<((K, ()), G::Timestamp, R)>>, + Ba: Batcher + 'static, + Bu: Builder, Tr: Trace + 'static, { let exchange = Exchange::new(move |update: &((K,()),G::Timestamp,R)| (update.0).0.hashed().into()); - arrange_core::<_,_,Ba,Bu,_>(&self.map(|k| (k, ())).inner, exchange, name) + arrange_core::<_,_,_,Chu,Ba,Bu,_>(&self.map(|k| (k, ())).inner, exchange, name) } } @@ -587,7 +604,7 @@ where } fn arrange_by_key_named(&self, name: &str) -> Arranged>> { - self.arrange_named::,ValBuilder<_,_,_,_>,_>(name) + self.arrange_named::, ValBatcher<_,_,_,_>,ValBuilder<_,_,_,_>,_>(name) } } @@ -622,6 +639,6 @@ where fn arrange_by_self_named(&self, name: &str) -> Arranged>> { self.map(|k| (k, ())) - .arrange_named::,KeyBuilder<_,_,_>,_>(name) + .arrange_named::, KeyBatcher<_,_,_>,KeyBuilder<_,_,_>,_>(name) } } diff --git a/differential-dataflow/src/operators/consolidate.rs b/differential-dataflow/src/operators/consolidate.rs index 00ad3f4b9..1430ecfa2 100644 --- a/differential-dataflow/src/operators/consolidate.rs +++ b/differential-dataflow/src/operators/consolidate.rs @@ -6,6 +6,7 @@ //! underlying system can more clearly see that no work must be done in the later case, and we can //! drop out of, e.g. iterative computations. +use timely::container::{ContainerBuilder, PushInto}; use timely::dataflow::Scope; use crate::{Collection, ExchangeData, Hashable}; @@ -44,22 +45,23 @@ where /// }); /// ``` pub fn consolidate(&self) -> Self { - use crate::trace::implementations::{KeyBatcher, KeyBuilder, KeySpine}; - self.consolidate_named::,KeyBuilder<_,_,_>, KeySpine<_,_,_>,_>("Consolidate", |key,&()| key.clone()) + use crate::trace::implementations::{VecChunker, KeyBatcher, KeyBuilder, KeySpine}; + self.consolidate_named::,KeyBatcher<_, _, _>,KeyBuilder<_,_,_>, KeySpine<_,_,_>,_>("Consolidate", |key,&()| key.clone()) } /// As `consolidate` but with the ability to name the operator, specify the trace type, /// and provide the function `reify` to produce owned keys and values.. - pub fn consolidate_named(&self, name: &str, reify: F) -> Self + pub fn consolidate_named(&self, name: &str, reify: F) -> Self where - Ba: Batcher, Time=G::Timestamp> + 'static, + Chu: ContainerBuilder + for<'a> PushInto<&'a mut Vec<((D, ()), G::Timestamp, R)>>, + Ba: Batcher + 'static, Tr: for<'a> crate::trace::Trace+'static, - Bu: Builder, + Bu: Builder, F: Fn(Tr::Key<'_>, Tr::Val<'_>) -> D + 'static, { use crate::operators::arrange::arrangement::Arrange; self.map(|k| (k, ())) - .arrange_named::(name) + .arrange_named::(name) .as_collection(reify) } diff --git a/differential-dataflow/src/trace/implementations/merge_batcher.rs b/differential-dataflow/src/trace/implementations/merge_batcher.rs index 90a74f662..daf53e174 100644 --- a/differential-dataflow/src/trace/implementations/merge_batcher.rs +++ b/differential-dataflow/src/trace/implementations/merge_batcher.rs @@ -10,23 +10,16 @@ //! Implementations of `MergeBatcher` can be instantiated through the choice of both //! the chunker and the merger, provided their respective output and input types align. -use std::marker::PhantomData; - use timely::progress::frontier::AntichainRef; use timely::progress::{frontier::Antichain, Timestamp}; use timely::Container; -use timely::container::{ContainerBuilder, PushInto}; +use timely::container::PushInto; use crate::logging::{BatcherEvent, Logger}; use crate::trace::{Batcher, Builder, Description}; /// Creates batches from containers of unordered tuples. -/// -/// To implement `Batcher`, the container builder `C` must accept `&mut Input` as inputs, -/// and must produce outputs of type `M::Chunk`. -pub struct MergeBatcher { - /// Transforms input streams to chunks of sorted, consolidated data. - chunker: C, +pub struct MergeBatcher { /// A sequence of power-of-two length lists of sorted, consolidated containers. /// /// Do not push/pop directly but use the corresponding functions ([`Self::chain_push`]/[`Self::chain_pop`]). @@ -43,40 +36,24 @@ pub struct MergeBatcher { logger: Option, /// Timely operator ID. operator_id: usize, - /// The `Input` type needs to be called out as the type of container accepted, but it is not otherwise present. - _marker: PhantomData, } -impl Batcher for MergeBatcher +impl Batcher for MergeBatcher where - C: ContainerBuilder + Default + for<'a> PushInto<&'a mut Input>, M: Merger, { - type Input = Input; type Time = M::Time; - type Output = M::Chunk; + type Container = M::Chunk; fn new(logger: Option, operator_id: usize) -> Self { Self { logger, operator_id, - chunker: C::default(), merger: M::default(), chains: Vec::new(), stash: Vec::new(), frontier: Antichain::new(), lower: Antichain::from_elem(M::Time::minimum()), - _marker: PhantomData, - } - } - - /// Push a container of data into this merge batcher. Updates the internal chain structure if - /// needed. - fn push_container(&mut self, container: &mut Input) { - self.chunker.push_into(container); - while let Some(chunk) = self.chunker.extract() { - let chunk = std::mem::take(chunk); - self.insert_chain(vec![chunk]); } } @@ -84,13 +61,7 @@ where // in `upper`. All updates must have time greater or equal to the previously used `upper`, // which we call `lower`, by assumption that after sealing a batcher we receive no more // updates with times not greater or equal to `upper`. - fn seal>(&mut self, upper: Antichain) -> B::Output { - // Finish - while let Some(chunk) = self.chunker.finish() { - let chunk = std::mem::take(chunk); - self.insert_chain(vec![chunk]); - } - + fn seal>(&mut self, upper: Antichain) -> B::Output { // Merge all remaining chains into a single chain. while self.chains.len() > 1 { let list1 = self.chain_pop().unwrap(); @@ -125,8 +96,16 @@ where self.frontier.borrow() } } +impl PushInto for MergeBatcher +where + M: Merger, +{ + fn push_into(&mut self, item: M::Chunk) { + self.insert_chain(vec![item]); + } +} -impl MergeBatcher { +impl MergeBatcher { /// Insert a chain and maintain chain properties: Chains are geometrically sized and ordered /// by decreasing length. fn insert_chain(&mut self, chain: Vec) { @@ -190,7 +169,7 @@ impl MergeBatcher { } } -impl Drop for MergeBatcher { +impl Drop for MergeBatcher { fn drop(&mut self) { // Cleanup chain to retract accounting information. while self.chain_pop().is_some() {} diff --git a/differential-dataflow/src/trace/implementations/mod.rs b/differential-dataflow/src/trace/implementations/mod.rs index c9c15206a..0ab2370c2 100644 --- a/differential-dataflow/src/trace/implementations/mod.rs +++ b/differential-dataflow/src/trace/implementations/mod.rs @@ -47,6 +47,7 @@ pub mod huffman_container; pub mod chunker; // Opinionated takes on default spines. +pub use self::chunker::{ColumnationChunker, VecChunker}; pub use self::ord_neu::OrdValSpine as ValSpine; pub use self::ord_neu::OrdValBatcher as ValBatcher; pub use self::ord_neu::RcOrdValBuilder as ValBuilder; diff --git a/differential-dataflow/src/trace/implementations/ord_neu.rs b/differential-dataflow/src/trace/implementations/ord_neu.rs index 218fc612a..6aab8c168 100644 --- a/differential-dataflow/src/trace/implementations/ord_neu.rs +++ b/differential-dataflow/src/trace/implementations/ord_neu.rs @@ -11,7 +11,6 @@ use std::rc::Rc; use crate::containers::TimelyStack; -use crate::trace::implementations::chunker::{ColumnationChunker, VecChunker}; use crate::trace::implementations::spine_fueled::Spine; use crate::trace::implementations::merge_batcher::{MergeBatcher, VecMerger, ColMerger}; use crate::trace::rc_blanket_impls::RcBuilder; @@ -24,7 +23,7 @@ pub use self::key_batch::{OrdKeyBatch, OrdKeyBuilder}; /// A trace implementation using a spine of ordered lists. pub type OrdValSpine = Spine>>>; /// A batcher using ordered lists. -pub type OrdValBatcher = MergeBatcher, VecChunker<((K,V),T,R)>, VecMerger<(K, V), T, R>>; +pub type OrdValBatcher = MergeBatcher>; /// A builder using ordered lists. pub type RcOrdValBuilder = RcBuilder, Vec<((K,V),T,R)>>>; @@ -34,14 +33,14 @@ pub type RcOrdValBuilder = RcBuilder = Spine>>>; /// A batcher for columnar storage. -pub type ColValBatcher = MergeBatcher, ColumnationChunker<((K,V),T,R)>, ColMerger<(K,V),T,R>>; +pub type ColValBatcher = MergeBatcher>; /// A builder for columnar storage. pub type ColValBuilder = RcBuilder, TimelyStack<((K,V),T,R)>>>; /// A trace implementation using a spine of ordered lists. pub type OrdKeySpine = Spine>>>; /// A batcher for ordered lists. -pub type OrdKeyBatcher = MergeBatcher, VecChunker<((K,()),T,R)>, VecMerger<(K, ()), T, R>>; +pub type OrdKeyBatcher = MergeBatcher>; /// A builder for ordered lists. pub type RcOrdKeyBuilder = RcBuilder, Vec<((K,()),T,R)>>>; @@ -51,7 +50,7 @@ pub type RcOrdKeyBuilder = RcBuilder /// A trace implementation backed by columnar storage. pub type ColKeySpine = Spine>>>; /// A batcher for columnar storage -pub type ColKeyBatcher = MergeBatcher, ColumnationChunker<((K,()),T,R)>, ColMerger<(K,()),T,R>>; +pub type ColKeyBatcher = MergeBatcher>; /// A builder for columnar storage pub type ColKeyBuilder = RcBuilder, TimelyStack<((K,()),T,R)>>>; diff --git a/differential-dataflow/src/trace/implementations/rhh.rs b/differential-dataflow/src/trace/implementations/rhh.rs index 71d20b69d..ee2a1f0d1 100644 --- a/differential-dataflow/src/trace/implementations/rhh.rs +++ b/differential-dataflow/src/trace/implementations/rhh.rs @@ -12,7 +12,6 @@ use serde::{Deserialize, Serialize}; use crate::Hashable; use crate::containers::TimelyStack; -use crate::trace::implementations::chunker::{ColumnationChunker, VecChunker}; use crate::trace::implementations::merge_batcher::{MergeBatcher, VecMerger, ColMerger}; use crate::trace::implementations::spine_fueled::Spine; use crate::trace::rc_blanket_impls::RcBuilder; @@ -24,7 +23,7 @@ use self::val_batch::{RhhValBatch, RhhValBuilder}; /// A trace implementation using a spine of ordered lists. pub type VecSpine = Spine>>>; /// A batcher for ordered lists. -pub type VecBatcher = MergeBatcher, VecChunker<((K,V),T,R)>, VecMerger<(K, V), T, R>>; +pub type VecBatcher = MergeBatcher>; /// A builder for ordered lists. pub type VecBuilder = RcBuilder, Vec<((K,V),T,R)>>>; @@ -34,7 +33,7 @@ pub type VecBuilder = RcBuilder, Vec< /// A trace implementation backed by columnar storage. pub type ColSpine = Spine>>>; /// A batcher for columnar storage. -pub type ColBatcher = MergeBatcher, ColumnationChunker<((K,V),T,R)>, ColMerger<(K,V),T,R>>; +pub type ColBatcher = MergeBatcher>; /// A builder for columnar storage. pub type ColBuilder = RcBuilder, TimelyStack<((K,V),T,R)>>>; diff --git a/differential-dataflow/src/trace/mod.rs b/differential-dataflow/src/trace/mod.rs index 0197e09c0..7edfa0e9b 100644 --- a/differential-dataflow/src/trace/mod.rs +++ b/differential-dataflow/src/trace/mod.rs @@ -12,6 +12,7 @@ pub mod description; pub mod implementations; pub mod wrappers; +use timely::container::PushInto; use timely::progress::{Antichain, frontier::AntichainRef}; use timely::progress::Timestamp; @@ -299,20 +300,17 @@ pub trait Batch : BatchReader + Sized { fn empty(lower: Antichain, upper: Antichain) -> Self; } -/// Functionality for collecting and batching updates. -pub trait Batcher { - /// Type pushed into the batcher. - type Input; +/// Functionality for collecting and batching updates. Accepts chunks and transforms them into +/// chains of chunks. The chunks have type `Container` and must be sorted. +pub trait Batcher: PushInto { /// Type produced by the batcher. - type Output; + type Container: Default; /// Times at which batches are formed. type Time: Timestamp; /// Allocates a new empty batcher. fn new(logger: Option, operator_id: usize) -> Self; - /// Adds an unordered container of elements to the batcher. - fn push_container(&mut self, batch: &mut Self::Input); /// Returns all updates not greater or equal to an element of `upper`. - fn seal>(&mut self, upper: Antichain) -> B::Output; + fn seal>(&mut self, upper: Antichain) -> B::Output; /// Returns the lower envelope of contained update times. fn frontier(&mut self) -> AntichainRef<'_, Self::Time>; } diff --git a/differential-dataflow/tests/trace.rs b/differential-dataflow/tests/trace.rs index 54f111a7d..75911e5bc 100644 --- a/differential-dataflow/tests/trace.rs +++ b/differential-dataflow/tests/trace.rs @@ -1,3 +1,4 @@ +use timely::container::PushInto; use timely::dataflow::operators::generic::OperatorInfo; use timely::progress::{Antichain, frontier::AntichainRef}; @@ -14,7 +15,7 @@ fn get_trace() -> ValSpine { { let mut batcher = ValBatcher::::new(None, 0); - batcher.push_container(&mut vec![ + batcher.push_into(vec![ ((1, 2), 0, 1), ((2, 3), 1, 1), ((2, 3), 2, -1), diff --git a/experiments/src/bin/deals.rs b/experiments/src/bin/deals.rs index f9776d17a..0132f2512 100644 --- a/experiments/src/bin/deals.rs +++ b/experiments/src/bin/deals.rs @@ -6,7 +6,7 @@ use differential_dataflow::input::Input; use differential_dataflow::Collection; use differential_dataflow::operators::*; -use differential_dataflow::trace::implementations::{ValSpine, KeySpine, KeyBatcher, KeyBuilder, ValBatcher, ValBuilder}; +use differential_dataflow::trace::implementations::{ValSpine, KeySpine, KeyBatcher, KeyBuilder, ValBatcher, ValBuilder, VecChunker}; use differential_dataflow::operators::arrange::TraceAgent; use differential_dataflow::operators::arrange::Arranged; use differential_dataflow::operators::arrange::Arrange; @@ -41,7 +41,7 @@ fn main() { let (input, graph) = scope.new_collection(); // each edge should exist in both directions. - let graph = graph.arrange::, ValBuilder<_,_,_,_>, ValSpine<_,_,_,_>>(); + let graph = graph.arrange::, ValBatcher<_,_,_,_>, ValBuilder<_,_,_,_>, ValSpine<_,_,_,_>>(); match program.as_str() { "tc" => tc(&graph).filter(move |_| inspect).map(|_| ()).consolidate().inspect(|x| println!("tc count: {:?}", x)).probe(), @@ -94,10 +94,10 @@ fn tc>(edges: &EdgeArranged) -> C let result = inner .map(|(x,y)| (y,x)) - .arrange::, ValBuilder<_,_,_,_>, ValSpine<_,_,_,_>>() + .arrange::,ValBatcher<_,_,_,_>, ValBuilder<_,_,_,_>, ValSpine<_,_,_,_>>() .join_core(&edges, |_y,&x,&z| Some((x, z))) .concat(&edges.as_collection(|&k,&v| (k,v))) - .arrange::, KeyBuilder<_,_,_>, KeySpine<_,_,_>>() + .arrange::,KeyBatcher<_,_,_>, KeyBuilder<_,_,_>, KeySpine<_,_,_>>() .threshold_semigroup(|_,_,x| if x.is_none() { Some(Present) } else { None }) ; @@ -121,12 +121,12 @@ fn sg>(edges: &EdgeArranged) -> C let result = inner - .arrange::, ValBuilder<_,_,_,_>, ValSpine<_,_,_,_>>() + .arrange::,ValBatcher<_,_,_,_>, ValBuilder<_,_,_,_>, ValSpine<_,_,_,_>>() .join_core(&edges, |_,&x,&z| Some((x, z))) - .arrange::, ValBuilder<_,_,_,_>, ValSpine<_,_,_,_>>() + .arrange::,ValBatcher<_,_,_,_>, ValBuilder<_,_,_,_>, ValSpine<_,_,_,_>>() .join_core(&edges, |_,&x,&z| Some((x, z))) .concat(&peers) - .arrange::, KeyBuilder<_,_,_>, KeySpine<_,_,_>>() + .arrange::,KeyBatcher<_,_,_>, KeyBuilder<_,_,_>, KeySpine<_,_,_>>() .threshold_semigroup(|_,_,x| if x.is_none() { Some(Present) } else { None }) ; diff --git a/experiments/src/bin/graspan1.rs b/experiments/src/bin/graspan1.rs index e93bb5381..eee0c2940 100644 --- a/experiments/src/bin/graspan1.rs +++ b/experiments/src/bin/graspan1.rs @@ -6,7 +6,7 @@ use timely::order::Product; use differential_dataflow::difference::Present; use differential_dataflow::input::Input; -use differential_dataflow::trace::implementations::{ValBatcher, ValBuilder, ValSpine}; +use differential_dataflow::trace::implementations::{ValBatcher, ValBuilder, ValSpine, VecChunker}; use differential_dataflow::operators::*; use differential_dataflow::operators::arrange::Arrange; use differential_dataflow::operators::iterate::SemigroupVariable; @@ -31,7 +31,7 @@ fn main() { let (n_handle, nodes) = scope.new_collection(); let (e_handle, edges) = scope.new_collection(); - let edges = edges.arrange::, ValBuilder<_,_,_,_>, ValSpine<_,_,_,_>>(); + let edges = edges.arrange::,ValBatcher<_,_,_,_>, ValBuilder<_,_,_,_>, ValSpine<_,_,_,_>>(); // a N c <- a N b && b E c // N(a,c) <- N(a,b), E(b, c) @@ -46,7 +46,7 @@ fn main() { let next = labels.join_core(&edges, |_b, a, c| Some((*c, *a))) .concat(&nodes) - .arrange::, ValBuilder<_,_,_,_>, ValSpine<_,_,_,_>>() + .arrange::,ValBatcher<_,_,_,_>, ValBuilder<_,_,_,_>, ValSpine<_,_,_,_>>() // .distinct_total_core::(); .threshold_semigroup(|_,_,x| if x.is_none() { Some(Present) } else { None }); diff --git a/experiments/src/bin/graspan2.rs b/experiments/src/bin/graspan2.rs index 042e9f486..bbc03137d 100644 --- a/experiments/src/bin/graspan2.rs +++ b/experiments/src/bin/graspan2.rs @@ -10,7 +10,7 @@ use differential_dataflow::Collection; use differential_dataflow::input::Input; use differential_dataflow::operators::*; use differential_dataflow::operators::arrange::Arrange; -use differential_dataflow::trace::implementations::{ValSpine, KeySpine, ValBatcher, KeyBatcher, ValBuilder, KeyBuilder}; +use differential_dataflow::trace::implementations::{ValSpine, KeySpine, ValBatcher, KeyBatcher, ValBuilder, KeyBuilder, VecChunker}; use differential_dataflow::difference::Present; type Node = u32; @@ -47,7 +47,7 @@ fn unoptimized() { .flat_map(|(a,b)| vec![a,b]) .concat(&dereference.flat_map(|(a,b)| vec![a,b])); - let dereference = dereference.arrange::, ValBuilder<_,_,_,_>, ValSpine<_,_,_,_>>(); + let dereference = dereference.arrange::,ValBatcher<_,_,_,_>, ValBuilder<_,_,_,_>, ValSpine<_,_,_,_>>(); let (value_flow, memory_alias, value_alias) = scope @@ -60,14 +60,14 @@ fn unoptimized() { let value_flow = SemigroupVariable::new(scope, Product::new(Default::default(), 1)); let memory_alias = SemigroupVariable::new(scope, Product::new(Default::default(), 1)); - let value_flow_arranged = value_flow.arrange::, ValBuilder<_,_,_,_>, ValSpine<_,_,_,_>>(); - let memory_alias_arranged = memory_alias.arrange::, ValBuilder<_,_,_,_>, ValSpine<_,_,_,_>>(); + let value_flow_arranged = value_flow.arrange::,ValBatcher<_,_,_,_>, ValBuilder<_,_,_,_>, ValSpine<_,_,_,_>>(); + let memory_alias_arranged = memory_alias.arrange::,ValBatcher<_,_,_,_>, ValBuilder<_,_,_,_>, ValSpine<_,_,_,_>>(); // VA(a,b) <- VF(x,a),VF(x,b) // VA(a,b) <- VF(x,a),MA(x,y),VF(y,b) let value_alias_next = value_flow_arranged.join_core(&value_flow_arranged, |_,&a,&b| Some((a,b))); let value_alias_next = value_flow_arranged.join_core(&memory_alias_arranged, |_,&a,&b| Some((b,a))) - .arrange::, ValBuilder<_,_,_,_>, ValSpine<_,_,_,_>>() + .arrange::,ValBatcher<_,_,_,_>, ValBuilder<_,_,_,_>, ValSpine<_,_,_,_>>() .join_core(&value_flow_arranged, |_,&a,&b| Some((a,b))) .concat(&value_alias_next); @@ -77,16 +77,16 @@ fn unoptimized() { let value_flow_next = assignment .map(|(a,b)| (b,a)) - .arrange::, ValBuilder<_,_,_,_>, ValSpine<_,_,_,_>>() + .arrange::,ValBatcher<_,_,_,_>, ValBuilder<_,_,_,_>, ValSpine<_,_,_,_>>() .join_core(&memory_alias_arranged, |_,&a,&b| Some((b,a))) .concat(&assignment.map(|(a,b)| (b,a))) - .arrange::, ValBuilder<_,_,_,_>, ValSpine<_,_,_,_>>() + .arrange::,ValBatcher<_,_,_,_>, ValBuilder<_,_,_,_>, ValSpine<_,_,_,_>>() .join_core(&value_flow_arranged, |_,&a,&b| Some((a,b))) .concat(&nodes.map(|n| (n,n))); let value_flow_next = value_flow_next - .arrange::, KeyBuilder<_,_,_>, KeySpine<_,_,_>>() + .arrange::,KeyBatcher<_,_,_>, KeyBuilder<_,_,_>, KeySpine<_,_,_>>() // .distinct_total_core::() .threshold_semigroup(|_,_,x| if x.is_none() { Some(Present) } else { None }) ; @@ -95,12 +95,12 @@ fn unoptimized() { let memory_alias_next: Collection<_,_,Present> = value_alias_next .join_core(&dereference, |_x,&y,&a| Some((y,a))) - .arrange::, ValBuilder<_,_,_,_>, ValSpine<_,_,_,_>>() + .arrange::,ValBatcher<_,_,_,_>, ValBuilder<_,_,_,_>, ValSpine<_,_,_,_>>() .join_core(&dereference, |_y,&a,&b| Some((a,b))); let memory_alias_next: Collection<_,_,Present> = memory_alias_next - .arrange::, KeyBuilder<_,_,_>, KeySpine<_,_,_>>() + .arrange::,KeyBatcher<_,_,_>, KeyBuilder<_,_,_>, KeySpine<_,_,_>>() // .distinct_total_core::() .threshold_semigroup(|_,_,x| if x.is_none() { Some(Present) } else { None }) ; @@ -172,7 +172,7 @@ fn optimized() { .flat_map(|(a,b)| vec![a,b]) .concat(&dereference.flat_map(|(a,b)| vec![a,b])); - let dereference = dereference.arrange::, ValBuilder<_,_,_,_>, ValSpine<_,_,_,_>>(); + let dereference = dereference.arrange::,ValBatcher<_,_,_,_>, ValBuilder<_,_,_,_>, ValSpine<_,_,_,_>>(); let (value_flow, memory_alias) = scope @@ -185,8 +185,8 @@ fn optimized() { let value_flow = SemigroupVariable::new(scope, Product::new(Default::default(), 1)); let memory_alias = SemigroupVariable::new(scope, Product::new(Default::default(), 1)); - let value_flow_arranged = value_flow.arrange::, ValBuilder<_,_,_,_>, ValSpine<_,_,_,_>>(); - let memory_alias_arranged = memory_alias.arrange::, ValBuilder<_,_,_,_>, ValSpine<_,_,_,_>>(); + let value_flow_arranged = value_flow.arrange::,ValBatcher<_,_,_,_>, ValBuilder<_,_,_,_>, ValSpine<_,_,_,_>>(); + let memory_alias_arranged = memory_alias.arrange::,ValBatcher<_,_,_,_>, ValBuilder<_,_,_,_>, ValSpine<_,_,_,_>>(); // VF(a,a) <- // VF(a,b) <- A(a,x),VF(x,b) @@ -194,13 +194,13 @@ fn optimized() { let value_flow_next = assignment .map(|(a,b)| (b,a)) - .arrange::, ValBuilder<_,_,_,_>, ValSpine<_,_,_,_>>() + .arrange::,ValBatcher<_,_,_,_>, ValBuilder<_,_,_,_>, ValSpine<_,_,_,_>>() .join_core(&memory_alias_arranged, |_,&a,&b| Some((b,a))) .concat(&assignment.map(|(a,b)| (b,a))) - .arrange::, ValBuilder<_,_,_,_>, ValSpine<_,_,_,_>>() + .arrange::,ValBatcher<_,_,_,_>, ValBuilder<_,_,_,_>, ValSpine<_,_,_,_>>() .join_core(&value_flow_arranged, |_,&a,&b| Some((a,b))) .concat(&nodes.map(|n| (n,n))) - .arrange::, KeyBuilder<_,_,_>, KeySpine<_,_,_>>() + .arrange::,KeyBatcher<_,_,_>, KeyBuilder<_,_,_>, KeySpine<_,_,_>>() // .distinct_total_core::() .threshold_semigroup(|_,_,x| if x.is_none() { Some(Present) } else { None }) ; @@ -209,9 +209,9 @@ fn optimized() { let value_flow_deref = value_flow .map(|(a,b)| (b,a)) - .arrange::, ValBuilder<_,_,_,_>, ValSpine<_,_,_,_>>() + .arrange::,ValBatcher<_,_,_,_>, ValBuilder<_,_,_,_>, ValSpine<_,_,_,_>>() .join_core(&dereference, |_x,&a,&b| Some((a,b))) - .arrange::, ValBuilder<_,_,_,_>, ValSpine<_,_,_,_>>(); + .arrange::,ValBatcher<_,_,_,_>, ValBuilder<_,_,_,_>, ValSpine<_,_,_,_>>(); // MA(a,b) <- VFD(x,a),VFD(y,b) // MA(a,b) <- VFD(x,a),MA(x,y),VFD(y,b) @@ -222,10 +222,10 @@ fn optimized() { let memory_alias_next = memory_alias_arranged .join_core(&value_flow_deref, |_x,&y,&a| Some((y,a))) - .arrange::, ValBuilder<_,_,_,_>, ValSpine<_,_,_,_>>() + .arrange::,ValBatcher<_,_,_,_>, ValBuilder<_,_,_,_>, ValSpine<_,_,_,_>>() .join_core(&value_flow_deref, |_y,&a,&b| Some((a,b))) .concat(&memory_alias_next) - .arrange::, KeyBuilder<_,_,_>, KeySpine<_,_,_>>() + .arrange::,KeyBatcher<_,_,_>, KeyBuilder<_,_,_>, KeySpine<_,_,_>>() // .distinct_total_core::() .threshold_semigroup(|_,_,x| if x.is_none() { Some(Present) } else { None }) ;