diff --git a/src/operators/arrange/arrangement.rs b/src/operators/arrange/arrangement.rs index 13bbb915b..d8fb9720d 100644 --- a/src/operators/arrange/arrangement.rs +++ b/src/operators/arrange/arrangement.rs @@ -75,6 +75,8 @@ where use ::timely::dataflow::scopes::Child; use ::timely::progress::timestamp::Refines; +use timely::Container; +use timely::container::PushInto; impl Arranged where @@ -292,7 +294,8 @@ where F: Fn(T2::Val<'_>) -> V + 'static, T2::Diff: Abelian, T2::Batch: Batch, - T2::Builder: Builder, + ::Input: Container, + ((T1::KeyOwned, V), T2::Time, T2::Diff): PushInto<::Input>, L: FnMut(T1::Key<'_>, &[(T1::Val<'_>, T1::Diff)], &mut Vec<(V, T2::Diff)>)+'static, { self.reduce_core::<_,V,F,T2>(name, from, move |key, input, output, change| { @@ -311,7 +314,8 @@ where V: Data, F: Fn(T2::Val<'_>) -> V + 'static, T2::Batch: Batch, - T2::Builder: Builder, + ::Input: Container, + ((T1::KeyOwned, V), T2::Time, T2::Diff): PushInto<::Input>, L: FnMut(T1::Key<'_>, &[(T1::Val<'_>, T1::Diff)], &mut Vec<(V, T2::Diff)>, &mut Vec<(V, T2::Diff)>)+'static, { use crate::operators::reduce::reduce_trace; diff --git a/src/operators/arrange/upsert.rs b/src/operators/arrange/upsert.rs index 758ec8df3..65cdc0b4b 100644 --- a/src/operators/arrange/upsert.rs +++ b/src/operators/arrange/upsert.rs @@ -138,7 +138,7 @@ where F: Fn(Tr::Val<'_>) -> V + 'static, Tr::Time: TotalOrder+ExchangeData, Tr::Batch: Batch, - Tr::Builder: Builder, + Tr::Builder: Builder>, { let mut reader: Option> = None; @@ -282,9 +282,7 @@ where } // Must insert updates in (key, val, time) order. updates.sort(); - for update in updates.drain(..) { - builder.push(update); - } + builder.push(&mut updates); } let batch = builder.done(prev_frontier.clone(), upper.clone(), Antichain::from_elem(G::Timestamp::minimum())); prev_frontier.clone_from(&upper); diff --git a/src/operators/reduce.rs b/src/operators/reduce.rs index dda549bca..8711e66a0 100644 --- a/src/operators/reduce.rs +++ b/src/operators/reduce.rs @@ -5,6 +5,8 @@ //! to the key and the list of values. //! The function is expected to populate a list of output values. +use timely::Container; +use timely::container::PushInto; use crate::hashable::Hashable; use crate::{Data, ExchangeData, Collection}; use crate::difference::{Semigroup, Abelian}; @@ -252,7 +254,7 @@ pub trait ReduceCore where F: Fn(T2::Val<'_>) -> V + 'static, T2::Diff: Abelian, T2::Batch: Batch, - T2::Builder: Builder, + T2::Builder: Builder>, L: FnMut(&K, &[(&V, R)], &mut Vec<(V, T2::Diff)>)+'static, { self.reduce_core::<_,_,T2>(name, from, move |key, input, output, change| { @@ -274,7 +276,7 @@ pub trait ReduceCore where T2: for<'a> Trace=&'a K, Time=G::Timestamp>+'static, F: Fn(T2::Val<'_>) -> V + 'static, T2::Batch: Batch, - T2::Builder: Builder, + T2::Builder: Builder>, L: FnMut(&K, &[(&V, R)], &mut Vec<(V,T2::Diff)>, &mut Vec<(V, T2::Diff)>)+'static, ; } @@ -293,7 +295,7 @@ where F: Fn(T2::Val<'_>) -> V + 'static, T2: for<'a> Trace=&'a K, Time=G::Timestamp>+'static, T2::Batch: Batch, - T2::Builder: Builder, + T2::Builder: Builder>, L: FnMut(&K, &[(&V, R)], &mut Vec<(V,T2::Diff)>, &mut Vec<(V, T2::Diff)>)+'static, { self.arrange_by_key_named(&format!("Arrange: {}", name)) @@ -312,7 +314,8 @@ where V: Data, F: Fn(T2::Val<'_>) -> V + 'static, T2::Batch: Batch, - T2::Builder: Builder, + ::Input: Container, + ((T1::KeyOwned, V), T2::Time, T2::Diff): PushInto<::Input>, L: FnMut(T1::Key<'_>, &[(T1::Val<'_>, T1::Diff)], &mut Vec<(V,T2::Diff)>, &mut Vec<(V, T2::Diff)>)+'static, { let mut result_trace = None; @@ -454,6 +457,8 @@ where builders.push(T2::Builder::new()); } + let mut buffer = Default::default(); + // cursors for navigating input and output traces. let (mut source_cursor, source_storage): (T1::Cursor, _) = source_trace.cursor_through(lower_limit.borrow()).expect("failed to acquire source cursor"); let source_storage = &source_storage; @@ -531,7 +536,9 @@ where for index in 0 .. buffers.len() { buffers[index].1.sort_by(|x,y| x.0.cmp(&y.0)); for (val, time, diff) in buffers[index].1.drain(..) { - builders[index].push(((key.into_owned(), val), time, diff)); + ((key.into_owned(), val), time, diff).push_into(&mut buffer); + builders[index].push(&mut buffer); + buffer.clear(); } } } @@ -648,7 +655,7 @@ where where F: Fn(C2::Val<'_>) -> V, L: FnMut( - C1::Key<'a>, + C1::Key<'a>, &[(C1::Val<'a>, C1::Diff)], &mut Vec<(V, C2::Diff)>, &mut Vec<(V, C2::Diff)>, @@ -728,7 +735,7 @@ mod history_replay { where F: Fn(C2::Val<'_>) -> V, L: FnMut( - C1::Key<'a>, + C1::Key<'a>, &[(C1::Val<'a>, C1::Diff)], &mut Vec<(V, C2::Diff)>, &mut Vec<(V, C2::Diff)>, @@ -1020,7 +1027,7 @@ mod history_replay { new_interesting.push(next_time.clone()); debug_assert!(outputs.iter().any(|(t,_)| t.less_equal(&next_time))) } - + // Update `meet` to track the meet of each source of times. meet = None;//T::maximum(); diff --git a/src/trace/implementations/merge_batcher.rs b/src/trace/implementations/merge_batcher.rs index 45e2a60f8..bb13cf650 100644 --- a/src/trace/implementations/merge_batcher.rs +++ b/src/trace/implementations/merge_batcher.rs @@ -282,7 +282,7 @@ where type Time = T; type Input = Vec<((K, V), T, R)>; type Chunk = Vec<((K, V), T, R)>; - type Output = ((K, V), T, R); + type Output = Vec<((K, V), T, R)>; fn accept(&mut self, container: RefOrMut, stash: &mut Vec) -> Vec { // Ensure `self.pending` has the desired capacity. We should never have a larger capacity @@ -497,8 +497,8 @@ where } let mut builder = B::with_capacity(keys, vals, upds); - for datum in chain.drain(..).flatten() { - builder.push(datum); + for mut chunk in chain.drain(..) { + builder.push(&mut chunk); } builder.done(lower.to_owned(), upper.to_owned(), since.to_owned()) diff --git a/src/trace/implementations/merge_batcher_col.rs b/src/trace/implementations/merge_batcher_col.rs index aed0039d8..265f2e649 100644 --- a/src/trace/implementations/merge_batcher_col.rs +++ b/src/trace/implementations/merge_batcher_col.rs @@ -67,7 +67,7 @@ where type Time = T; type Input = Vec<((K, V), T, R)>; type Chunk = TimelyStack<((K, V), T, R)>; - type Output = ((K, V), T, R); + type Output = TimelyStack<((K, V), T, R)>; fn accept(&mut self, container: RefOrMut, stash: &mut Vec) -> Vec { // Ensure `self.pending` has the desired capacity. We should never have a larger capacity @@ -290,11 +290,8 @@ where } } let mut builder = B::with_capacity(keys, vals, upds); - - for chunk in chain.drain(..) { - for datum in chunk.iter() { - builder.copy(datum); - } + for mut chunk in chain.drain(..) { + builder.push(&mut chunk); } builder.done(lower.to_owned(), upper.to_owned(), since.to_owned()) diff --git a/src/trace/implementations/mod.rs b/src/trace/implementations/mod.rs index 128ec0bf1..d0e4a459a 100644 --- a/src/trace/implementations/mod.rs +++ b/src/trace/implementations/mod.rs @@ -54,7 +54,10 @@ pub use self::ord_neu::OrdKeySpine as KeySpine; use std::borrow::{ToOwned}; use std::cmp::Ordering; +use timely::Container; use timely::container::columnation::{Columnation, TimelyStack}; +use timely::container::PushInto; +use timely::progress::Timestamp; use crate::lattice::Lattice; use crate::difference::Semigroup; @@ -138,7 +141,7 @@ where /// A type with a preferred container. /// -/// Examples include types that implement `Clone` who prefer +/// Examples include types that implement `Clone` who prefer pub trait PreferredContainer : ToOwned { /// The preferred container for the type. type Container: BatchContainer; @@ -161,8 +164,8 @@ impl Update for Preferred where K: ToOwned + ?Sized, K::Owned: Ord+Clone+'static, - V: ToOwned + ?Sized + 'static, - V::Owned: Ord+Clone, + V: ToOwned + ?Sized, + V::Owned: Ord+Clone+'static, T: Ord+Lattice+timely::progress::Timestamp+Clone, R: Semigroup+Clone, { @@ -177,8 +180,8 @@ where K: Ord+ToOwned+PreferredContainer + ?Sized, K::Owned: Ord+Clone+'static, // for<'a> K::Container: BatchContainer = &'a K>, - V: Ord+ToOwned+PreferredContainer + ?Sized + 'static, - V::Owned: Ord+Clone, + V: Ord+ToOwned+PreferredContainer + ?Sized, + V::Owned: Ord+Clone+'static, T: Ord+Lattice+timely::progress::Timestamp+Clone, D: Semigroup+Clone, { @@ -195,7 +198,7 @@ use abomonation_derive::Abomonation; use crate::trace::cursor::MyTrait; /// A list of unsigned integers that uses `u32` elements as long as they are small enough, and switches to `u64` once they are not. -#[derive(Eq, PartialEq, Ord, PartialOrd, Clone, Debug, Abomonation)] +#[derive(Eq, PartialEq, Ord, PartialOrd, Clone, Abomonation)] pub struct OffsetList { /// Length of a prefix of zero elements. pub zero_prefix: usize, @@ -205,6 +208,12 @@ pub struct OffsetList { pub chonk: Vec, } +impl std::fmt::Debug for OffsetList { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + f.debug_list().entries(self.into_iter()).finish() + } +} + impl OffsetList { /// Allocate a new list with a specified capacity. pub fn with_capacity(cap: usize) -> Self { @@ -222,7 +231,7 @@ impl OffsetList { else if self.chonk.is_empty() { if let Ok(smol) = offset.try_into() { self.smol.push(smol); - } + } else { self.chonk.push(offset.try_into().unwrap()) } @@ -249,6 +258,41 @@ impl OffsetList { } } +impl<'a> IntoIterator for &'a OffsetList { + type Item = usize; + type IntoIter = OffsetListIter<'a>; + + fn into_iter(self) -> Self::IntoIter { + OffsetListIter {list: self, index: 0 } + } +} + +/// An iterator for [`OffsetList`]. +pub struct OffsetListIter<'a> { + list: &'a OffsetList, + index: usize, +} + +impl<'a> Iterator for OffsetListIter<'a> { + type Item = usize; + + fn next(&mut self) -> Option { + if self.index < self.list.len() { + let res = Some(self.list.index(self.index)); + self.index += 1; + res + } else { + None + } + } +} + +impl PushInto for usize { + fn push_into(self, target: &mut OffsetList) { + target.push(self); + } +} + /// Helper struct to provide `MyTrait` for `Copy` types. #[derive(Eq, PartialEq, Ord, PartialOrd, Clone, Copy)] pub struct Wrapper(T); @@ -320,12 +364,111 @@ impl BatchContainer for OffsetList { } } +/// Behavior to split an update into principal components. +pub trait BuilderInput: Container { + /// Key portion + type Key<'a>: Ord; + /// Value portion + type Val<'a>: Ord; + /// Time + type Time; + /// Diff + type Diff; + + /// Split an item into separate parts. + fn into_parts<'a>(item: Self::Item<'a>) -> (Self::Key<'a>, Self::Val<'a>, Self::Time, Self::Diff); + + /// Test that the key equals a key in the layout's key container. + fn key_eq(this: &Self::Key<'_>, other: ::ReadItem<'_>) -> bool; + + /// Test that the value equals a key in the layout's value container. + fn val_eq(this: &Self::Val<'_>, other: ::ReadItem<'_>) -> bool; +} + +impl BuilderInput> for Vec<((K, V), T, R)> +where + K: Ord + Clone + 'static, + V: Ord + Clone + 'static, + T: Timestamp + Lattice + Clone + 'static, + R: Semigroup + Clone + 'static, +{ + type Key<'a> = K; + type Val<'a> = V; + type Time = T; + type Diff = R; + + fn into_parts<'a>(((key, val), time, diff): Self::Item<'a>) -> (Self::Key<'a>, Self::Val<'a>, Self::Time, Self::Diff) { + (key, val, time, diff) + } + + fn key_eq(this: &K, other: &K) -> bool { + this == other + } + + fn val_eq(this: &V, other: &V) -> bool { + this == other + } +} + +impl BuilderInput> for TimelyStack<((K, V), T, R)> +where + K: Ord + Columnation + Clone + 'static, + V: Ord + Columnation + Clone + 'static, + T: Timestamp + Lattice + Columnation + Clone + 'static, + R: Semigroup + Columnation + Clone + 'static, +{ + type Key<'a> = &'a K; + type Val<'a> = &'a V; + type Time = T; + type Diff = R; + + fn into_parts<'a>(((key, val), time, diff): Self::Item<'a>) -> (Self::Key<'a>, Self::Val<'a>, Self::Time, Self::Diff) { + (key, val, time.clone(), diff.clone()) + } + + fn key_eq(this: &&K, other: &K) -> bool { + *this == other + } + + fn val_eq(this: &&V, other: &V) -> bool { + *this == other + } +} + +impl BuilderInput> for TimelyStack<((::Owned, ::Owned), T, R)> +where + K: Ord+ToOwned+PreferredContainer + ?Sized, + K::Owned: Columnation + Ord+Clone+'static, + V: Ord+ToOwned+PreferredContainer + ?Sized, + V::Owned: Columnation + Ord+Clone+'static, + T: Columnation + Ord+Lattice+Timestamp+Clone, + R: Columnation + Semigroup+Clone, +{ + type Key<'a> = &'a K::Owned; + type Val<'a> = &'a V::Owned; + type Time = T; + type Diff = R; + + fn into_parts<'a>(((key, val), time, diff): Self::Item<'a>) -> (Self::Key<'a>, Self::Val<'a>, Self::Time, Self::Diff) { + (key, val, time.clone(), diff.clone()) + } + + fn key_eq(this: &&K::Owned, other: <::Container as BatchContainer>::ReadItem<'_>) -> bool { + other.equals(this) + } + + fn val_eq(this: &&V::Owned, other: <::Container as BatchContainer>::ReadItem<'_>) -> bool { + other.equals(this) + } +} + pub use self::containers::{BatchContainer, SliceContainer}; /// Containers for data that resemble `Vec`, with leaner implementations. pub mod containers { use timely::container::columnation::{Columnation, TimelyStack}; + use timely::container::PushInto; use std::borrow::ToOwned; use crate::trace::MyTrait; @@ -498,6 +641,18 @@ pub mod containers { inner: Vec, } + impl PushInto> for &[B] { + fn push_into(self, target: &mut SliceContainer) { + target.copy(self) + } + } + + impl PushInto> for &Vec { + fn push_into(self, target: &mut SliceContainer) { + target.copy(self) + } + } + impl BatchContainer for SliceContainer where B: Ord + Clone + Sized + 'static, diff --git a/src/trace/implementations/ord_neu.rs b/src/trace/implementations/ord_neu.rs index a5afee109..ddc8a4409 100644 --- a/src/trace/implementations/ord_neu.rs +++ b/src/trace/implementations/ord_neu.rs @@ -9,6 +9,7 @@ //! and should consume fewer resources (computation and memory) when it applies. use std::rc::Rc; +use timely::container::columnation::{TimelyStack}; use crate::trace::implementations::spine_fueled::Spine; use crate::trace::implementations::merge_batcher::{MergeBatcher, VecMerger}; @@ -24,7 +25,7 @@ pub use self::key_batch::{OrdKeyBatch, OrdKeyBuilder}; pub type OrdValSpine = Spine< Rc>>, MergeBatcher, T>, - RcBuilder>>, + RcBuilder, Vec<((K,V),T,R)>>>, >; // /// A trace implementation for empty values using a spine of ordered lists. // pub type OrdKeySpine = Spine>>>; @@ -33,14 +34,14 @@ pub type OrdValSpine = Spine< pub type ColValSpine = Spine< Rc>>, MergeBatcher, T>, - RcBuilder>>, + RcBuilder, TimelyStack<((K,V),T,R)>>>, >; /// A trace implementation using a spine of ordered lists. pub type OrdKeySpine = Spine< Rc>>, MergeBatcher, T>, - RcBuilder>>, + RcBuilder, Vec<((K,()),T,R)>>>, >; // /// A trace implementation for empty values using a spine of ordered lists. // pub type OrdKeySpine = Spine>>>; @@ -49,28 +50,30 @@ pub type OrdKeySpine = Spine< pub type ColKeySpine = Spine< Rc>>, MergeBatcher, T>, - RcBuilder>>, + RcBuilder, TimelyStack<((K,()),T,R)>>>, >; /// A trace implementation backed by columnar storage. pub type PreferredSpine = Spine< Rc>>, MergeBatcher::Owned,::Owned),T,R)>,T>, - RcBuilder>>, + RcBuilder, TimelyStack<((::Owned,::Owned),T,R)>>>, >; // /// A trace implementation backed by columnar storage. // pub type ColKeySpine = Spine>>>; + mod val_batch { use std::marker::PhantomData; use abomonation_derive::Abomonation; + use timely::container::PushInto; use timely::progress::{Antichain, frontier::AntichainRef}; use crate::trace::{Batch, BatchReader, Builder, Cursor, Description, Merger}; - use crate::trace::implementations::BatchContainer; + use crate::trace::implementations::{BatchContainer, BuilderInput}; use crate::trace::cursor::MyTrait; use super::{Layout, Update}; @@ -148,7 +151,7 @@ mod val_batch { OrdValCursor { key_cursor: 0, val_cursor: 0, - phantom: std::marker::PhantomData, + phantom: PhantomData, } } fn len(&self) -> usize { @@ -189,7 +192,7 @@ mod val_batch { impl Merger> for OrdValMerger where - OrdValBatch: Batch::Time> + OrdValBatch: Batch::Time>, { fn new(batch1: &OrdValBatch, batch2: &OrdValBatch, compaction_frontier: AntichainRef<::Time>) -> Self { @@ -498,7 +501,7 @@ mod val_batch { } /// A builder for creating layers from unsorted update tuples. - pub struct OrdValBuilder { + pub struct OrdValBuilder { result: OrdValStorage, singleton: Option<(::Time, ::Diff)>, /// Counts the number of singleton optimizations we performed. @@ -506,9 +509,10 @@ mod val_batch { /// This number allows us to correctly gauge the total number of updates reflected in a batch, /// even though `updates.len()` may be much shorter than this amount. singletons: usize, + _marker: PhantomData, } - impl OrdValBuilder { + impl OrdValBuilder { /// Pushes a single update, which may set `self.singleton` rather than push. /// /// This operation is meant to be equivalent to `self.results.updates.push((time, diff))`. @@ -536,9 +540,15 @@ mod val_batch { } } - impl Builder for OrdValBuilder { + impl Builder for OrdValBuilder + where + L: Layout, + CI: for<'a> BuilderInput::Time, Diff=::Diff>, + for<'a> CI::Key<'a>: PushInto, + for<'a> CI::Val<'a>: PushInto, + { - type Input = ((::Key, ::Val), ::Time, ::Diff); + type Input = CI; type Time = ::Time; type Output = OrdValBatch; @@ -554,62 +564,35 @@ mod val_batch { }, singleton: None, singletons: 0, + _marker: PhantomData, } } #[inline] - fn push(&mut self, ((key, val), time, diff): Self::Input) { - - // Perhaps this is a continuation of an already received key. - if self.result.keys.last().map(|k| k.equals(&key)).unwrap_or(false) { - // Perhaps this is a continuation of an already received value. - if self.result.vals.last().map(|v| v.equals(&val)).unwrap_or(false) { - self.push_update(time, diff); + fn push(&mut self, chunk: &mut Self::Input) { + for item in chunk.drain() { + let (key, val, time, diff) = CI::into_parts(item); + // Perhaps this is a continuation of an already received key. + if self.result.keys.last().map(|k| CI::key_eq(&key, k)).unwrap_or(false) { + // Perhaps this is a continuation of an already received value. + if self.result.vals.last().map(|v| CI::val_eq(&val, v)).unwrap_or(false) { + self.push_update(time, diff); + } else { + // New value; complete representation of prior value. + self.result.vals_offs.push(self.result.updates.len()); + if self.singleton.take().is_some() { self.singletons += 1; } + self.push_update(time, diff); + val.push_into(&mut self.result.vals); + } } else { - // New value; complete representation of prior value. + // New key; complete representation of prior key. self.result.vals_offs.push(self.result.updates.len()); if self.singleton.take().is_some() { self.singletons += 1; } + self.result.keys_offs.push(self.result.vals.len()); self.push_update(time, diff); - self.result.vals.push(val); - } - } else { - // New key; complete representation of prior key. - self.result.vals_offs.push(self.result.updates.len()); - if self.singleton.take().is_some() { self.singletons += 1; } - self.result.keys_offs.push(self.result.vals.len()); - self.push_update(time, diff); - self.result.vals.push(val); - self.result.keys.push(key); - } - } - - #[inline] - fn copy(&mut self, ((key, val), time, diff): &Self::Input) { - - // Perhaps this is a continuation of an already received key. - if self.result.keys.last().map(|k| k.equals(key)).unwrap_or(false) { - // Perhaps this is a continuation of an already received value. - if self.result.vals.last().map(|v| v.equals(val)).unwrap_or(false) { - // TODO: here we could look for repetition, and not push the update in that case. - // More logic (and state) would be required to correctly wrangle this. - self.push_update(time.clone(), diff.clone()); - } else { - // New value; complete representation of prior value. - self.result.vals_offs.push(self.result.updates.len()); - // Remove any pending singleton, and if it was set increment our count. - if self.singleton.take().is_some() { self.singletons += 1; } - self.push_update(time.clone(), diff.clone()); - self.result.vals.copy_push(val); + val.push_into(&mut self.result.vals); + key.push_into(&mut self.result.keys); } - } else { - // New key; complete representation of prior key. - self.result.vals_offs.push(self.result.updates.len()); - // Remove any pending singleton, and if it was set increment our count. - if self.singleton.take().is_some() { self.singletons += 1; } - self.result.keys_offs.push(self.result.vals.len()); - self.push_update(time.clone(), diff.clone()); - self.result.vals.copy_push(val); - self.result.keys.copy_push(key); } } @@ -634,10 +617,11 @@ mod key_batch { use std::marker::PhantomData; use abomonation_derive::Abomonation; + use timely::container::PushInto; use timely::progress::{Antichain, frontier::AntichainRef}; use crate::trace::{Batch, BatchReader, Builder, Cursor, Description, Merger}; - use crate::trace::implementations::BatchContainer; + use crate::trace::implementations::{BatchContainer, BuilderInput}; use crate::trace::cursor::MyTrait; use super::{Layout, Update}; @@ -962,7 +946,7 @@ mod key_batch { } /// A builder for creating layers from unsorted update tuples. - pub struct OrdKeyBuilder { + pub struct OrdKeyBuilder { result: OrdKeyStorage, singleton: Option<(::Time, ::Diff)>, /// Counts the number of singleton optimizations we performed. @@ -970,9 +954,10 @@ mod key_batch { /// This number allows us to correctly gauge the total number of updates reflected in a batch, /// even though `updates.len()` may be much shorter than this amount. singletons: usize, + _marker: PhantomData, } - impl OrdKeyBuilder { + impl OrdKeyBuilder { /// Pushes a single update, which may set `self.singleton` rather than push. /// /// This operation is meant to be equivalent to `self.results.updates.push((time, diff))`. @@ -1000,9 +985,14 @@ mod key_batch { } } - impl Builder for OrdKeyBuilder { + impl Builder for OrdKeyBuilder + where + L: Layout, + CI: for<'a> BuilderInput::Time, Diff=::Diff>, + for<'a> CI::Key<'a>: PushInto, + { - type Input = ((::Key, ()), ::Time, ::Diff); + type Input = CI; type Time = ::Time; type Output = OrdKeyBatch; @@ -1016,38 +1006,25 @@ mod key_batch { }, singleton: None, singletons: 0, + _marker: PhantomData, } } #[inline] - fn push(&mut self, ((key, ()), time, diff): Self::Input) { - - // Perhaps this is a continuation of an already received key. - if self.result.keys.last().map(|k| k.equals(&key)).unwrap_or(false) { - self.push_update(time, diff); - } else { - // New key; complete representation of prior key. - self.result.keys_offs.push(self.result.updates.len()); - // Remove any pending singleton, and if it was set increment our count. - if self.singleton.take().is_some() { self.singletons += 1; } - self.push_update(time, diff); - self.result.keys.push(key); - } - } - - #[inline] - fn copy(&mut self, ((key, ()), time, diff): &Self::Input) { - - // Perhaps this is a continuation of an already received key. - if self.result.keys.last().map(|k| k.equals(key)).unwrap_or(false) { - self.push_update(time.clone(), diff.clone()); - } else { - // New key; complete representation of prior key. - self.result.keys_offs.push(self.result.updates.len()); - // Remove any pending singleton, and if it was set increment our count. - if self.singleton.take().is_some() { self.singletons += 1; } - self.push_update(time.clone(), diff.clone()); - self.result.keys.copy_push(key); + fn push(&mut self, chunk: &mut Self::Input) { + for item in chunk.drain() { + let (key, _val, time, diff) = CI::into_parts(item); + // Perhaps this is a continuation of an already received key. + if self.result.keys.last().map(|k| CI::key_eq(&key, k)).unwrap_or(false) { + self.push_update(time, diff); + } else { + // New key; complete representation of prior key. + self.result.keys_offs.push(self.result.updates.len()); + // Remove any pending singleton, and if it was set increment our count. + if self.singleton.take().is_some() { self.singletons += 1; } + self.push_update(time, diff); + key.push_into(&mut self.result.keys); + } } } diff --git a/src/trace/implementations/rhh.rs b/src/trace/implementations/rhh.rs index 8389b62a6..60ed6afd4 100644 --- a/src/trace/implementations/rhh.rs +++ b/src/trace/implementations/rhh.rs @@ -9,6 +9,7 @@ use std::rc::Rc; use std::cmp::Ordering; use abomonation_derive::Abomonation; +use timely::container::columnation::TimelyStack; use crate::Hashable; use crate::trace::implementations::merge_batcher::{MergeBatcher, VecMerger}; @@ -24,7 +25,7 @@ use self::val_batch::{RhhValBatch, RhhValBuilder}; pub type VecSpine = Spine< Rc>>, MergeBatcher, T>, - RcBuilder>>, + RcBuilder, Vec<((K,V),T,R)>>>, >; // /// A trace implementation for empty values using a spine of ordered lists. // pub type OrdKeySpine = Spine>>>; @@ -33,7 +34,7 @@ pub type VecSpine = Spine< pub type ColSpine = Spine< Rc>>, MergeBatcher, T>, - RcBuilder>>, + RcBuilder, TimelyStack<((K,V),T,R)>>>, >; // /// A trace implementation backed by columnar storage. // pub type ColKeySpine = Spine>>>; @@ -86,12 +87,13 @@ mod val_batch { use std::convert::TryInto; use std::marker::PhantomData; use abomonation_derive::Abomonation; + use timely::container::PushInto; use timely::progress::{Antichain, frontier::AntichainRef}; use crate::hashable::Hashable; use crate::trace::{Batch, BatchReader, Builder, Cursor, Description, Merger}; - use crate::trace::implementations::BatchContainer; + use crate::trace::implementations::{BatchContainer, BuilderInput}; use crate::trace::cursor::MyTrait; use super::{Layout, Update, HashOrdered}; @@ -695,7 +697,7 @@ mod val_batch { } /// A builder for creating layers from unsorted update tuples. - pub struct RhhValBuilder + pub struct RhhValBuilder where ::Key: Default + HashOrdered, { @@ -706,9 +708,10 @@ mod val_batch { /// This number allows us to correctly gauge the total number of updates reflected in a batch, /// even though `updates.len()` may be much shorter than this amount. singletons: usize, + _marker: PhantomData, } - impl RhhValBuilder + impl RhhValBuilder where ::Key: Default + HashOrdered, { @@ -739,12 +742,14 @@ mod val_batch { } } - impl Builder for RhhValBuilder + impl Builder for RhhValBuilder where ::Key: Default + HashOrdered, // RhhValBatch: Batch::Key, Val=::Val, Time=::Time, Diff=::Diff>, + CI: for<'a> BuilderInput = ::Key, Time=::Time, Diff=::Diff>, + for<'a> CI::Val<'a>: PushInto, { - type Input = ((::Key, ::Val), ::Time, ::Diff); + type Input = CI; type Time = ::Time; type Output = RhhValBatch; @@ -772,64 +777,36 @@ mod val_batch { }, singleton: None, singletons: 0, + _marker: PhantomData, } } #[inline] - fn push(&mut self, ((key, val), time, diff): Self::Input) { - - // Perhaps this is a continuation of an already received key. - if self.result.keys.last().map(|k| k.equals(&key)).unwrap_or(false) { - // Perhaps this is a continuation of an already received value. - if self.result.vals.last().map(|v| v.equals(&val)).unwrap_or(false) { - self.push_update(time, diff); + fn push(&mut self, chunk: &mut Self::Input) { + for item in chunk.drain() { + let (key, val, time, diff) = CI::into_parts(item); + // Perhaps this is a continuation of an already received key. + if self.result.keys.last().map(|k| CI::key_eq(&key, k)).unwrap_or(false) { + // Perhaps this is a continuation of an already received value. + if self.result.vals.last().map(|v| CI::val_eq(&val, v)).unwrap_or(false) { + self.push_update(time, diff); + } else { + // New value; complete representation of prior value. + self.result.vals_offs.push(self.result.updates.len()); + if self.singleton.take().is_some() { self.singletons += 1; } + self.push_update(time, diff); + val.push_into(&mut self.result.vals); + } } else { - // New value; complete representation of prior value. + // New key; complete representation of prior key. self.result.vals_offs.push(self.result.updates.len()); if self.singleton.take().is_some() { self.singletons += 1; } + self.result.keys_offs.push(self.result.vals.len()); self.push_update(time, diff); - self.result.vals.push(val); + val.push_into(&mut self.result.vals); + // Insert the key, but with no specified offset. + self.result.insert_key(key.borrow(), None); } - } else { - // New key; complete representation of prior key. - self.result.vals_offs.push(self.result.updates.len()); - if self.singleton.take().is_some() { self.singletons += 1; } - self.result.keys_offs.push(self.result.vals.len()); - self.push_update(time, diff); - self.result.vals.push(val); - // Insert the key, but with no specified offset. - self.result.insert_key(key.borrow(), None); - } - } - - #[inline] - fn copy(&mut self, ((key, val), time, diff): &Self::Input) { - - // Perhaps this is a continuation of an already received key. - if self.result.keys.last().map(|k| k.equals(key)).unwrap_or(false) { - // Perhaps this is a continuation of an already received value. - if self.result.vals.last().map(|v| v.equals(val)).unwrap_or(false) { - // TODO: here we could look for repetition, and not push the update in that case. - // More logic (and state) would be required to correctly wrangle this. - self.push_update(time.clone(), diff.clone()); - } else { - // New value; complete representation of prior value. - self.result.vals_offs.push(self.result.updates.len()); - // Remove any pending singleton, and if it was set increment our count. - if self.singleton.take().is_some() { self.singletons += 1; } - self.push_update(time.clone(), diff.clone()); - self.result.vals.copy_push(val); - } - } else { - // New key; complete representation of prior key. - self.result.vals_offs.push(self.result.updates.len()); - // Remove any pending singleton, and if it was set increment our count. - if self.singleton.take().is_some() { self.singletons += 1; } - self.result.keys_offs.push(self.result.vals.len()); - self.push_update(time.clone(), diff.clone()); - self.result.vals.copy_push(val); - // Insert the key, but with no specified offset. - self.result.insert_key(key, None); } } diff --git a/src/trace/mod.rs b/src/trace/mod.rs index afee7c22e..00b72fc1d 100644 --- a/src/trace/mod.rs +++ b/src/trace/mod.rs @@ -336,15 +336,10 @@ pub trait Builder: Sized { /// /// They represent respectively the number of distinct `key`, `(key, val)`, and total updates. fn with_capacity(keys: usize, vals: usize, upds: usize) -> Self; - /// Adds an element to the batch. + /// Adds a chunk of elements to the batch. /// - /// The default implementation uses `self.copy` with references to the owned arguments. - /// One should override it if the builder can take advantage of owned arguments. - fn push(&mut self, element: Self::Input) { - self.copy(&element); - } - /// Adds an element to the batch. - fn copy(&mut self, element: &Self::Input); + /// Adds all elements from `chunk` to the builder and leaves `chunk` in an undefined state. + fn push(&mut self, chunk: &mut Self::Input); /// Completes building and returns the batch. fn done(self, lower: Antichain, upper: Antichain, since: Antichain) -> Self::Output; } @@ -454,8 +449,7 @@ pub mod rc_blanket_impls { type Time = B::Time; type Output = Rc; fn with_capacity(keys: usize, vals: usize, upds: usize) -> Self { RcBuilder { builder: B::with_capacity(keys, vals, upds) } } - fn push(&mut self, element: Self::Input) { self.builder.push(element) } - fn copy(&mut self, element: &Self::Input) { self.builder.copy(element) } + fn push(&mut self, input: &mut Self::Input) { self.builder.push(input) } fn done(self, lower: Antichain, upper: Antichain, since: Antichain) -> Rc { Rc::new(self.builder.done(lower, upper, since)) } } @@ -561,8 +555,7 @@ pub mod abomonated_blanket_impls { type Time = B::Time; type Output = Abomonated>; fn with_capacity(keys: usize, vals: usize, upds: usize) -> Self { AbomonatedBuilder { builder: B::with_capacity(keys, vals, upds) } } - fn push(&mut self, element: Self::Input) { self.builder.push(element) } - fn copy(&mut self, element: &Self::Input) { self.builder.copy(element) } + fn push(&mut self, input: &mut Self::Input) { self.builder.push(input) } fn done(self, lower: Antichain, upper: Antichain, since: Antichain) -> Self::Output { let batch = self.builder.done(lower, upper, since); let mut bytes = Vec::with_capacity(measure(&batch));