diff --git a/differential-dataflow/examples/columnar.rs b/differential-dataflow/examples/columnar.rs index b5af06ef2..e35ffa69c 100644 --- a/differential-dataflow/examples/columnar.rs +++ b/differential-dataflow/examples/columnar.rs @@ -7,8 +7,6 @@ use { timely::dataflow::ProbeHandle, }; -use differential_dataflow::trace::implementations::ord_neu::ColKeySpine; - use differential_dataflow::operators::arrange::arrangement::arrange_core; fn main() { @@ -44,10 +42,10 @@ fn main() { let data_pact = ExchangeCore::,_>::new_core(|x: &((&str,()),&u64,&i64)| (x.0).0.as_bytes().iter().map(|x| *x as u64).sum::() as u64); let keys_pact = ExchangeCore::,_>::new_core(|x: &((&str,()),&u64,&i64)| (x.0).0.as_bytes().iter().map(|x| *x as u64).sum::() as u64); - let data = arrange_core::<_,_,Col2KeyBatcher<_,_,_>, ColKeyBuilder<_,_,_>, ColKeySpine<_,_,_>>(&data, data_pact, "Data"); - let keys = arrange_core::<_,_,Col2KeyBatcher<_,_,_>, ColKeyBuilder<_,_,_>, ColKeySpine<_,_,_>>(&keys, keys_pact, "Keys"); + let data = arrange_core::<_,_,Col2ValBatcher<_,_,_,_>, ColValBuilder<_,_,_,_>, ColValSpine<_,_,_,_>>(&data, data_pact, "Data"); + let keys = arrange_core::<_,_,Col2ValBatcher<_,_,_,_>, ColValBuilder<_,_,_,_>, ColValSpine<_,_,_,_>>(&keys, keys_pact, "Keys"); - keys.join_core(&data, |_k, &(), &()| Option::<()>::None) + keys.join_core(&data, |_k, (), ()| Option::<()>::None) .probe_with(&mut probe); }); @@ -159,6 +157,7 @@ mod container { type BorrowedOf<'a, C> = <::Container as columnar::Container>::Borrowed<'a>; impl Column { + #[inline(always)] pub fn borrow(&self) -> BorrowedOf<'_, C> { match self { Column::Typed(t) => t.borrow(), @@ -166,6 +165,7 @@ mod container { Column::Align(a) => as FromBytes>::from_bytes(&mut Indexed::decode(a)), } } + #[inline(always)] pub fn get(&self, index: usize) -> columnar::Ref<'_, C> { self.borrow().get(index) } @@ -173,6 +173,7 @@ mod container { use timely::Container; impl Container for Column { + #[inline(always)] fn len(&self) -> usize { match self { Column::Typed(t) => t.len(), @@ -181,6 +182,7 @@ mod container { } } // This sets the `Bytes` variant to be an empty `Typed` variant, appropriate for pushing into. + #[inline(always)] fn clear(&mut self) { match self { Column::Typed(t) => t.clear(), @@ -191,6 +193,7 @@ mod container { type ItemRef<'a> = columnar::Ref<'a, C>; type Iter<'a> = IterOwn>; + #[inline(always)] fn iter<'a>(&'a self) -> Self::Iter<'a> { match self { Column::Typed(t) => t.borrow().into_index_iter(), @@ -201,6 +204,7 @@ mod container { type Item<'a> = columnar::Ref<'a, C>; type DrainIter<'a> = IterOwn>; + #[inline(always)] fn drain<'a>(&'a mut self) -> Self::DrainIter<'a> { match self { Column::Typed(t) => t.borrow().into_index_iter(), @@ -212,6 +216,7 @@ mod container { use timely::container::SizableContainer; impl SizableContainer for Column { + #[inline(always)] fn at_capacity(&self) -> bool { match self { Self::Typed(t) => { @@ -222,6 +227,7 @@ mod container { Self::Align(_) => true, } } + #[inline(always)] fn ensure_capacity(&mut self, _stash: &mut Option) { } } @@ -242,6 +248,8 @@ mod container { } use timely::dataflow::channels::ContainerBytes; + use differential_dataflow::trace::implementations::BatchContainer; + impl ContainerBytes for Column { fn from_bytes(bytes: timely::bytes::arc::Bytes) -> Self { // Our expectation / hope is that `bytes` is `u64` aligned and sized. @@ -259,6 +267,7 @@ mod container { } } + #[inline(always)] fn length_in_bytes(&self) -> usize { match self { // We'll need one u64 for the length, then the length rounded up to a multiple of 8. @@ -276,6 +285,64 @@ mod container { } } } + + impl BatchContainer for Column + where + for<'a> columnar::Ref<'a, T>: Ord, + { + type Owned = T; + type ReadItem<'a> = columnar::Ref<'a, T>; + + #[inline(always)] + fn into_owned<'a>(item: Self::ReadItem<'a>) -> Self::Owned { + T::into_owned(item) + } + + #[inline(always)] + fn clone_onto<'a>(item: Self::ReadItem<'a>, other: &mut Self::Owned) { + other.copy_from(item); + } + + #[inline(always)] + fn push_ref(&mut self, item: Self::ReadItem<'_>) { + self.push(item); + } + + #[inline(always)] + fn push_own(&mut self, item: &Self::Owned) { + self.push(item); + } + + #[inline(always)] + fn clear(&mut self) { + Container::clear(self); + } + + #[inline(always)] + fn with_capacity(_size: usize) -> Self { + Self::default() + } + + #[inline(always)] + fn merge_capacity(cont1: &Self, cont2: &Self) -> Self { + Self::Typed(T::Container::with_capacity_for([cont1.borrow(), cont2.borrow()].into_iter())) + } + + #[inline(always)] + fn reborrow<'b, 'a: 'b>(item: Self::ReadItem<'a>) -> Self::ReadItem<'b> { + T::reborrow(item) + } + + #[inline(always)] + fn index(&self, index: usize) -> Self::ReadItem<'_> { + self.get(index) + } + + #[inline(always)] + fn len(&self) -> usize { + Container::len(self) + } + } } @@ -358,7 +425,7 @@ mod builder { impl> LengthPreservingContainerBuilder for ColumnBuilder { } } -use batcher::Col2KeyBatcher; +use batcher::{Col2KeyBatcher, Col2ValBatcher}; /// Types for consolidating, merging, and extracting columnar update collections. pub mod batcher { @@ -573,27 +640,56 @@ pub mod batcher { } -use dd_builder::ColKeyBuilder; +use dd_builder::{ColKeyBuilder, ColValBuilder, ColKeySpine, ColValSpine}; pub mod dd_builder { - + use std::rc::Rc; use columnar::Columnar; use differential_dataflow::trace::Builder; use differential_dataflow::trace::Description; - use differential_dataflow::trace::implementations::Layout; + use differential_dataflow::trace::implementations::{Layout, Update}; use differential_dataflow::trace::implementations::layout; use differential_dataflow::trace::implementations::BatchContainer; use differential_dataflow::trace::implementations::ord_neu::{OrdValBatch, val_batch::OrdValStorage, OrdKeyBatch, Vals, Upds, layers::UpdsBuilder}; use differential_dataflow::trace::implementations::ord_neu::key_batch::OrdKeyStorage; + use differential_dataflow::trace::implementations::spine_fueled::Spine; use crate::Column; use differential_dataflow::trace::rc_blanket_impls::RcBuilder; - use differential_dataflow::trace::implementations::TStack; - pub type ColValBuilder = RcBuilder>>; - pub type ColKeyBuilder = RcBuilder>>; + pub type ColValBuilder = RcBuilder>>; + pub type ColKeyBuilder = RcBuilder>>; + pub type ColKeySpine = Spine>>>; + pub type ColValSpine = Spine>>>; + + + /// A layout based on columns. + pub struct CStack { + phantom: std::marker::PhantomData, + } + + impl Layout for CStack + where + U: Update< + Key: Columnar, + Val: Columnar, + Time: Columnar, + Diff: Columnar + Ord, + >, + for<'a> columnar::Ref<'a, U::Key>: Ord, + for<'a> columnar::Ref<'a, U::Val>: Ord, + for<'a> columnar::Ref<'a, U::Time>: Ord, + for<'a> columnar::Ref<'a, U::Diff>: Ord, + { + type KeyContainer = Column; + type ValContainer = Column; + type TimeContainer = Column; + type DiffContainer = Column; + type OffsetContainer = differential_dataflow::trace::implementations::OffsetList; + } + /// A builder for creating layers from unsorted update tuples. pub struct OrdValBuilder { @@ -640,11 +736,25 @@ pub mod dd_builder { let mut key_con = L::KeyContainer::with_capacity(1); let mut val_con = L::ValContainer::with_capacity(1); + let mut owned_key = None; + let mut owned_val = None; for ((key,val),time,diff) in chunk.drain() { // It would be great to avoid. - let key = as Columnar>::into_owned(key); - let val = as Columnar>::into_owned(val); + let key = if let Some(owned_key) = &mut owned_key { + Columnar::copy_from(owned_key, key); + &*owned_key + } else { + owned_key = Some( as Columnar>::into_owned(key)); + owned_key.as_ref().unwrap() + }; + let val = if let Some(owned_val) = &mut owned_val { + Columnar::copy_from(owned_val, val); + &*owned_val + } else { + owned_val = Some( as Columnar>::into_owned(val)); + owned_val.as_ref().unwrap() + }; // These feel fine (wrt the other versions) let time = as Columnar>::into_owned(time); let diff = as Columnar>::into_owned(diff);