From d0a1cabe12c76b58e6c79bbb2fcf16123c06ebcc Mon Sep 17 00:00:00 2001
From: Moritz Hoffmann
Date: Thu, 13 Jun 2024 16:06:22 -0400
Subject: [PATCH] Merge batcher input generic over containers (#494)

Enable flatcontainers in merge batchers.

---------

Signed-off-by: Moritz Hoffmann
---
 examples/spines.rs                              |   4 +-
 src/consolidation.rs                            | 194 +++++++++-
 src/trace/implementations/chunker.rs            | 364 ++++++++++++++++++
 src/trace/implementations/merge_batcher.rs      | 127 ++----
 .../implementations/merge_batcher_col.rs        |  87 +----
 .../implementations/merge_batcher_flat.rs       | 332 ++++++++++++++++
 src/trace/implementations/mod.rs                | 161 ++++----
 src/trace/implementations/ord_neu.rs            |  41 +-
 src/trace/implementations/rhh.rs                |   7 +-
 tests/bfs.rs                                    | 154 ++++++--
 10 files changed, 1157 insertions(+), 314 deletions(-)
 create mode 100644 src/trace/implementations/chunker.rs
 create mode 100644 src/trace/implementations/merge_batcher_flat.rs

diff --git a/examples/spines.rs b/examples/spines.rs
index 9fa407977..6720575fe 100644
--- a/examples/spines.rs
+++ b/examples/spines.rs
@@ -66,8 +66,8 @@ fn main() {
             },
             "flat" => {
                 use differential_dataflow::trace::implementations::ord_neu::FlatKeySpine;
-                let data = data.arrange::>();
-                let keys = keys.arrange::>();
+                let data = data.arrange::>();
+                let keys = keys.arrange::>();
                 keys.join_core(&data, |_k, (), ()| Option::<()>::None)
                     .probe_with(&mut probe);
             }
diff --git a/src/consolidation.rs b/src/consolidation.rs
index 47573eed8..b9495d104 100644
--- a/src/consolidation.rs
+++ b/src/consolidation.rs
@@ -10,10 +10,15 @@
 //! you need specific behavior, it may be best to defensively copy, paste, and maintain the
 //! specific behavior you require.
 
+use std::cmp::Ordering;
 use std::collections::VecDeque;
+use timely::Container;
 use timely::container::{ContainerBuilder, PushInto, SizableContainer};
+use timely::container::flatcontainer::{FlatStack, Push, Region};
+use timely::container::flatcontainer::impls::tuple::{TupleABCRegion, TupleABRegion};
 use crate::Data;
-use crate::difference::Semigroup;
+use crate::difference::{IsZero, Semigroup};
+use crate::trace::cursor::IntoOwned;
 
 /// Sorts and consolidates `vec`.
 ///
@@ -218,6 +223,136 @@ where
     }
 }
 
+/// Layout of containers and their read items to be consolidated.
+///
+/// This trait specifies behavior to extract keys and diffs from a container's read
+/// items. Consolidation accumulates the diffs per key.
+///
+/// The trait requires `Container` to have access to its `Item` GAT.
+pub trait ConsolidateLayout: Container {
+    /// Key portion of data, essentially everything minus the diff.
+    type Key<'a>: Eq where Self: 'a;
+
+    /// GAT diff type.
+    type Diff<'a>: IntoOwned<'a, Owned = Self::DiffOwned> where Self: 'a;
+
+    /// Owned diff type.
+    type DiffOwned: for<'a> Semigroup<Self::Diff<'a>>;
+
+    /// Deconstruct an item into key and diff. Must be cheap.
+    fn into_parts(item: Self::Item<'_>) -> (Self::Key<'_>, Self::Diff<'_>);
+
+    /// Push an element to a compatible container.
+    ///
+    /// This function is odd to have, so let's explain why it exists. Ideally, the container
+    /// would accept a `(key, diff)` pair and we wouldn't need this function. However, we
+    /// might never be in a position where this is true: vectors can push any `T`, which would
+    /// collide with a specific implementation for pushing tuples of mixed GATs and owned types.
+    ///
+    /// For this reason, we expose a function here that takes a GAT key and an owned diff, and
+    /// leave it to the implementation to "patch" a suitable item that can be pushed into `self`.
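+    ///
+    /// For example, the `Vec<(D, T, R)>` implementation below simply reassembles the
+    /// triple and pushes it; a sketch of the pattern (mirroring that implementation):
+    ///
+    /// ```ignore
+    /// fn push_with_diff(&mut self, (data, time): Self::Key<'_>, diff: Self::DiffOwned) {
+    ///     self.push((data, time, diff));
+    /// }
+    /// ```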
+    fn push_with_diff(&mut self, key: Self::Key<'_>, diff: Self::DiffOwned);
+
+    /// Compare two items by key to sort containers.
+    fn cmp(item1: &Self::Item<'_>, item2: &Self::Item<'_>) -> Ordering;
+}
+
+impl<D, T, R> ConsolidateLayout for Vec<(D, T, R)>
+where
+    D: Ord + Clone + 'static,
+    T: Ord + Clone + 'static,
+    for<'a> R: Semigroup + IntoOwned<'a, Owned = R> + Clone + 'static,
+{
+    type Key<'a> = (D, T) where Self: 'a;
+    type Diff<'a> = R where Self: 'a;
+    type DiffOwned = R;
+
+    fn into_parts((data, time, diff): Self::Item<'_>) -> (Self::Key<'_>, Self::Diff<'_>) {
+        ((data, time), diff)
+    }
+
+    fn cmp(item1: &Self::Item<'_>, item2: &Self::Item<'_>) -> Ordering {
+        (&item1.0, &item1.1).cmp(&(&item2.0, &item2.1))
+    }
+
+    fn push_with_diff(&mut self, (data, time): Self::Key<'_>, diff: Self::DiffOwned) {
+        self.push((data, time, diff));
+    }
+}
+
+impl<K, V, T, R> ConsolidateLayout for FlatStack<TupleABCRegion<TupleABRegion<K, V>, T, R>>
+where
+    for<'a> K: Region + Push<<K as Region>::ReadItem<'a>> + Clone + 'static,
+    for<'a> K::ReadItem<'a>: Ord + Copy,
+    for<'a> V: Region + Push<<V as Region>::ReadItem<'a>> + Clone + 'static,
+    for<'a> V::ReadItem<'a>: Ord + Copy,
+    for<'a> T: Region + Push<<T as Region>::ReadItem<'a>> + Clone + 'static,
+    for<'a> T::ReadItem<'a>: Ord + Copy,
+    R: Region + Push<<R as Region>::Owned> + Clone + 'static,
+    for<'a> R::Owned: Semigroup<R::ReadItem<'a>>,
+{
+    type Key<'a> = (K::ReadItem<'a>, V::ReadItem<'a>, T::ReadItem<'a>) where Self: 'a;
+    type Diff<'a> = R::ReadItem<'a> where Self: 'a;
+    type DiffOwned = R::Owned;
+
+    fn into_parts(((key, val), time, diff): Self::Item<'_>) -> (Self::Key<'_>, Self::Diff<'_>) {
+        ((key, val, time), diff)
+    }
+
+    fn cmp(((key1, val1), time1, _diff1): &Self::Item<'_>, ((key2, val2), time2, _diff2): &Self::Item<'_>) -> Ordering {
+        (K::reborrow(*key1), V::reborrow(*val1), T::reborrow(*time1)).cmp(&(K::reborrow(*key2), V::reborrow(*val2), T::reborrow(*time2)))
+    }
+
+    fn push_with_diff(&mut self, (key, value, time): Self::Key<'_>, diff: Self::DiffOwned) {
+        self.copy(((key, value), time, diff));
+    }
+}
+
+/// Consolidate the supplied container.
+pub fn consolidate_container<C: ConsolidateLayout>(container: &mut C, target: &mut C) {
+    // Sort input data
+    let mut permutation = Vec::new();
+    permutation.extend(container.drain());
+    permutation.sort_by(|a, b| C::cmp(a, b));
+
+    // Consolidate sorted data.
+    let mut previous: Option<(C::Key<'_>, C::DiffOwned)> = None;
+    // TODO: We should ensure that `target` has sufficient capacity, but `Container` doesn't
+    // offer a suitable API.
+    for item in permutation.drain(..) {
+        let (key, diff) = C::into_parts(item);
+        match &mut previous {
+            // Initial iteration, remember key and diff.
+            // TODO: Opportunity for GatCow for diff.
+            None => previous = Some((key, diff.into_owned())),
+            Some((prevkey, d)) => {
+                // Second and following iterations, compare and accumulate or emit.
+                if key == *prevkey {
+                    // Keys match, keep accumulating.
+                    d.plus_equals(&diff);
+                } else {
+                    // Keys don't match, write down the result if non-zero.
+                    if !d.is_zero() {
+                        // Unwrap because we checked for `Some` above.
+                        let (prevkey, diff) = previous.take().unwrap();
+                        target.push_with_diff(prevkey, diff);
+                    }
+                    // Remember current key and diff as `previous`.
+                    previous = Some((key, diff.into_owned()));
+                }
+            }
+        }
+    }
+    // Write any residual data, if non-zero.
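+    // The loop above emits a key only once it encounters the key that follows it, so the
+    // last accumulated (key, diff) pair is still held in `previous` and must be flushed here.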
+    if let Some((previtem, d)) = previous {
+        if !d.is_zero() {
+            target.push_with_diff(previtem, d);
+        }
+    }
+}
+
+
 #[cfg(test)]
 mod tests {
     use super::*;
@@ -308,4 +443,61 @@ mod tests {
             assert_eq!((i, 0, 1), collected[i]);
         }
     }
+
+    #[test]
+    fn test_consolidate_container() {
+        let mut data = vec![(1, 1, 1), (2, 1, 1), (1, 1, -1)];
+        let mut target = Vec::default();
+        data.sort();
+        consolidate_container(&mut data, &mut target);
+        assert_eq!(target, [(2, 1, 1)]);
+    }
+
+    #[cfg(not(debug_assertions))]
+    const LEN: usize = 256 << 10;
+    #[cfg(not(debug_assertions))]
+    const REPS: usize = 10 << 10;
+
+    #[cfg(debug_assertions)]
+    const LEN: usize = 256 << 1;
+    #[cfg(debug_assertions)]
+    const REPS: usize = 10 << 1;
+
+    #[test]
+    fn test_consolidator_duration() {
+        let mut data = Vec::with_capacity(LEN);
+        let mut data2 = Vec::with_capacity(LEN);
+        let mut target = Vec::new();
+        let mut duration = std::time::Duration::default();
+        for _ in 0..REPS {
+            data.clear();
+            data2.clear();
+            target.clear();
+            data.extend((0..LEN).map(|i| (i/4, 1, -2isize + ((i % 4) as isize))));
+            data2.extend((0..LEN).map(|i| (i/4, 1, -2isize + ((i % 4) as isize))));
+            data.sort_by(|x,y| x.0.cmp(&y.0));
+            let start = std::time::Instant::now();
+            consolidate_container(&mut data, &mut target);
+            duration += start.elapsed();
+
+            consolidate_updates(&mut data2);
+            assert_eq!(target, data2);
+        }
+        println!("elapsed consolidator {duration:?}");
+    }
+
+    #[test]
+    fn test_consolidator_duration_vec() {
+        let mut data = Vec::with_capacity(LEN);
+        let mut duration = std::time::Duration::default();
+        for _ in 0..REPS {
+            data.clear();
+            data.extend((0..LEN).map(|i| (i/4, 1, -2isize + ((i % 4) as isize))));
+            data.sort_by(|x,y| x.0.cmp(&y.0));
+            let start = std::time::Instant::now();
+            consolidate_updates(&mut data);
+            duration += start.elapsed();
+        }
+        println!("elapsed vec {duration:?}");
+    }
 }
diff --git a/src/trace/implementations/chunker.rs b/src/trace/implementations/chunker.rs
new file mode 100644
index 000000000..527a614d0
--- /dev/null
+++ b/src/trace/implementations/chunker.rs
@@ -0,0 +1,364 @@
+//! Organize streams of data into sorted chunks.
+
+use std::collections::VecDeque;
+use timely::communication::message::RefOrMut;
+use timely::Container;
+use timely::container::columnation::{Columnation, TimelyStack};
+use timely::container::{ContainerBuilder, PushInto, SizableContainer};
+use crate::consolidation::{consolidate_updates, consolidate_container, ConsolidateLayout};
+use crate::difference::Semigroup;
+
+/// Chunk a stream of vectors into chains of vectors.
+pub struct VecChunker<T> {
+    pending: Vec<T>,
+    ready: VecDeque<Vec<T>>,
+    empty: Option<Vec<T>>,
+}
+
+impl<T> Default for VecChunker<T> {
+    fn default() -> Self {
+        Self {
+            pending: Vec::default(),
+            ready: VecDeque::default(),
+            empty: None,
+        }
+    }
+}
+
+impl<K, V, T, R> VecChunker<((K, V), T, R)>
+where
+    K: Ord,
+    V: Ord,
+    T: Ord,
+    R: Semigroup,
+{
+    const BUFFER_SIZE_BYTES: usize = 8 << 10;
+    fn chunk_capacity() -> usize {
+        let size = ::std::mem::size_of::<((K, V), T, R)>();
+        if size == 0 {
+            Self::BUFFER_SIZE_BYTES
+        } else if size <= Self::BUFFER_SIZE_BYTES {
+            Self::BUFFER_SIZE_BYTES / size
+        } else {
+            1
+        }
+    }
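+
+    // Illustrative arithmetic (hypothetical update type, not from this file): for
+    // `((u64, u64), u64, i64)` an update occupies 32 bytes, so `chunk_capacity()`
+    // yields (8 << 10) / 32 = 256 updates per chunk.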
+
+    /// Form chunks out of pending data, if needed. This function is meant to be applied to
+    /// potentially full buffers, and ensures that if the buffer was full when called it is at most
+    /// half full when the function returns.
+    ///
+    /// `form_chunk` does the following:
+    /// * If pending is full, consolidate.
+    /// * If after consolidation it's more than half full, peel off chunks,
+    ///   leaving behind any partial chunk in pending.
+    fn form_chunk(&mut self) {
+        consolidate_updates(&mut self.pending);
+        if self.pending.len() >= Self::chunk_capacity() {
+            while self.pending.len() > Self::chunk_capacity() {
+                let mut chunk = Vec::with_capacity(Self::chunk_capacity());
+                chunk.extend(self.pending.drain(..chunk.capacity()));
+                self.ready.push_back(chunk);
+            }
+        }
+    }
+}
+
+impl<'a, K, V, T, R> PushInto<RefOrMut<'a, Vec<((K, V), T, R)>>> for VecChunker<((K, V), T, R)>
+where
+    K: Ord + Clone,
+    V: Ord + Clone,
+    T: Ord + Clone,
+    R: Semigroup + Clone,
+{
+    fn push_into(&mut self, container: RefOrMut<'a, Vec<((K, V), T, R)>>) {
+        // Ensure `self.pending` has the desired capacity. We should never have a larger capacity
+        // because we don't write more than capacity elements into the buffer.
+        // Important: Consolidation requires `pending` to have twice the chunk capacity to
+        // amortize its cost. Otherwise, it risks doing quadratic work.
+        if self.pending.capacity() < Self::chunk_capacity() * 2 {
+            self.pending.reserve(Self::chunk_capacity() * 2 - self.pending.len());
+        }
+
+        // `container` is either a shared reference or an owned allocation.
+        match container {
+            RefOrMut::Ref(vec) => {
+                let mut slice = &vec[..];
+                while !slice.is_empty() {
+                    let (head, tail) = slice.split_at(std::cmp::min(self.pending.capacity() - self.pending.len(), slice.len()));
+                    slice = tail;
+                    self.pending.extend_from_slice(head);
+                    if self.pending.len() == self.pending.capacity() {
+                        self.form_chunk();
+                    }
+                }
+            }
+            RefOrMut::Mut(vec) => {
+                let mut drain = vec.drain(..).peekable();
+                while drain.peek().is_some() {
+                    self.pending.extend((&mut drain).take(self.pending.capacity() - self.pending.len()));
+                    if self.pending.len() == self.pending.capacity() {
+                        self.form_chunk();
+                    }
+                }
+            }
+        }
+    }
+}
+
+impl<K, V, T, R> ContainerBuilder for VecChunker<((K, V), T, R)>
+where
+    K: Ord + Clone + 'static,
+    V: Ord + Clone + 'static,
+    T: Ord + Clone + 'static,
+    R: Semigroup + Clone + 'static,
+{
+    type Container = Vec<((K, V), T, R)>;
+
+    fn extract(&mut self) -> Option<&mut Self::Container> {
+        if let Some(ready) = self.ready.pop_front() {
+            self.empty = Some(ready);
+            self.empty.as_mut()
+        } else {
+            None
+        }
+    }
+
+    fn finish(&mut self) -> Option<&mut Self::Container> {
+        if !self.pending.is_empty() {
+            consolidate_updates(&mut self.pending);
+            while !self.pending.is_empty() {
+                let mut chunk = Vec::with_capacity(Self::chunk_capacity());
+                chunk.extend(self.pending.drain(..std::cmp::min(self.pending.len(), chunk.capacity())));
+                self.ready.push_back(chunk);
+            }
+        }
+        self.empty = self.ready.pop_front();
+        self.empty.as_mut()
+    }
+}
+
+/// Chunk a stream of vectors into chains of timely stacks.
+pub struct ColumnationChunker<T: Columnation> {
+    pending: Vec<T>,
+    ready: VecDeque<TimelyStack<T>>,
+    empty: Option<TimelyStack<T>>,
+}
+
+impl<T: Columnation> Default for ColumnationChunker<T> {
+    fn default() -> Self {
+        Self {
+            pending: Vec::default(),
+            ready: VecDeque::default(),
+            empty: None,
+        }
+    }
+}
+
+impl<K, V, T, R> ColumnationChunker<((K, V), T, R)>
+where
+    K: Columnation + Ord,
+    V: Columnation + Ord,
+    T: Columnation + Ord,
+    R: Columnation + Semigroup,
+{
+    const BUFFER_SIZE_BYTES: usize = 64 << 10;
+    fn chunk_capacity() -> usize {
+        let size = ::std::mem::size_of::<((K, V), T, R)>();
+        if size == 0 {
+            Self::BUFFER_SIZE_BYTES
+        } else if size <= Self::BUFFER_SIZE_BYTES {
+            Self::BUFFER_SIZE_BYTES / size
+        } else {
+            1
+        }
+    }
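+
+    // Unlike `VecChunker`, ready chunks are `TimelyStack`s: consolidated updates are
+    // copied into region-allocated columnar storage (`chunk.copy(..)`) rather than moved.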
+
+    /// Form chunks out of pending data, if needed. This function is meant to be applied to
+    /// potentially full buffers, and ensures that if the buffer was full when called it is at most
+    /// half full when the function returns.
+    ///
+    /// `form_chunk` does the following:
+    /// * If pending is full, consolidate.
+    /// * If after consolidation it's more than half full, peel off chunks,
+    ///   leaving behind any partial chunk in pending.
+    fn form_chunk(&mut self) {
+        consolidate_updates(&mut self.pending);
+        if self.pending.len() >= Self::chunk_capacity() {
+            while self.pending.len() > Self::chunk_capacity() {
+                let mut chunk = TimelyStack::with_capacity(Self::chunk_capacity());
+                for item in self.pending.drain(..chunk.capacity()) {
+                    chunk.copy(&item);
+                }
+                self.ready.push_back(chunk);
+            }
+        }
+    }
+}
+
+impl<'a, K, V, T, R> PushInto<RefOrMut<'a, Vec<((K, V), T, R)>>> for ColumnationChunker<((K, V), T, R)>
+where
+    K: Columnation + Ord + Clone,
+    V: Columnation + Ord + Clone,
+    T: Columnation + Ord + Clone,
+    R: Columnation + Semigroup + Clone,
+{
+    fn push_into(&mut self, container: RefOrMut<'a, Vec<((K, V), T, R)>>) {
+        // Ensure `self.pending` has the desired capacity. We should never have a larger capacity
+        // because we don't write more than capacity elements into the buffer.
+        if self.pending.capacity() < Self::chunk_capacity() * 2 {
+            self.pending.reserve(Self::chunk_capacity() * 2 - self.pending.len());
+        }
+
+        // `container` is either a shared reference or an owned allocation.
+        match container {
+            RefOrMut::Ref(vec) => {
+                let mut slice = &vec[..];
+                while !slice.is_empty() {
+                    let (head, tail) = slice.split_at(std::cmp::min(self.pending.capacity() - self.pending.len(), slice.len()));
+                    slice = tail;
+                    self.pending.extend_from_slice(head);
+                    if self.pending.len() == self.pending.capacity() {
+                        self.form_chunk();
+                    }
+                }
+            }
+            RefOrMut::Mut(vec) => {
+                let mut drain = vec.drain(..).peekable();
+                while drain.peek().is_some() {
+                    self.pending.extend((&mut drain).take(self.pending.capacity() - self.pending.len()));
+                    if self.pending.len() == self.pending.capacity() {
+                        self.form_chunk();
+                    }
+                }
+            }
+        }
+    }
+}
+
+impl<K, V, T, R> ContainerBuilder for ColumnationChunker<((K, V), T, R)>
+where
+    K: Columnation + Ord + Clone + 'static,
+    V: Columnation + Ord + Clone + 'static,
+    T: Columnation + Ord + Clone + 'static,
+    R: Columnation + Semigroup + Clone + 'static,
+{
+    type Container = TimelyStack<((K,V),T,R)>;
+
+    fn extract(&mut self) -> Option<&mut Self::Container> {
+        if let Some(ready) = self.ready.pop_front() {
+            self.empty = Some(ready);
+            self.empty.as_mut()
+        } else {
+            None
+        }
+    }
+
+    fn finish(&mut self) -> Option<&mut Self::Container> {
+        consolidate_updates(&mut self.pending);
+        while !self.pending.is_empty() {
+            let mut chunk = TimelyStack::with_capacity(Self::chunk_capacity());
+            for item in self.pending.drain(..std::cmp::min(self.pending.len(), chunk.capacity())) {
+                chunk.copy(&item);
+            }
+            self.ready.push_back(chunk);
+        }
+        self.empty = self.ready.pop_front();
+        self.empty.as_mut()
+    }
+}
+
+/// Chunk a stream of containers into chains of output containers.
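+///
+/// Items accumulate in a `pending` output container that is consolidated in place whenever
+/// it fills up; if it remains more than half full after consolidation, the whole container
+/// moves to the ready queue. Unlike the chunkers above, emitted containers may be less than
+/// full, because partial data cannot be extracted from `pending`.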
+pub struct ContainerChunker<Output> {
+    pending: Output,
+    ready: VecDeque<Output>,
+    empty: Output,
+}
+
+impl<Output> Default for ContainerChunker<Output>
+where
+    Output: Default,
+{
+    fn default() -> Self {
+        Self {
+            pending: Output::default(),
+            ready: VecDeque::default(),
+            empty: Output::default(),
+        }
+    }
+}
+
+impl<'a, Input, Output> PushInto<RefOrMut<'a, Input>> for ContainerChunker<Output>
+where
+    Input: Container,
+    Output: SizableContainer
+        + ConsolidateLayout
+        + PushInto<Input::Item<'a>>
+        + PushInto<Input::ItemRef<'a>>,
+{
+    fn push_into(&mut self, container: RefOrMut<'a, Input>) {
+        if self.pending.capacity() < Output::preferred_capacity() {
+            self.pending.reserve(Output::preferred_capacity() - self.pending.len());
+        }
+        let form_batch = |this: &mut Self| {
+            if this.pending.len() == this.pending.capacity() {
+                consolidate_container(&mut this.pending, &mut this.empty);
+                std::mem::swap(&mut this.pending, &mut this.empty);
+                this.empty.clear();
+                if this.pending.len() > this.pending.capacity() / 2 {
+                    // Note that we're pushing non-full containers, which is a deviation from
+                    // other implementations. The reason for this is that we cannot extract
+                    // partial data from `this.pending`. We should revisit this in the future.
+                    this.ready.push_back(std::mem::take(&mut this.pending));
+                }
+            }
+        };
+        match container {
+            RefOrMut::Ref(container) => {
+                for item in container.iter() {
+                    self.pending.push(item);
+                    form_batch(self);
+                }
+            }
+            RefOrMut::Mut(container) => {
+                for item in container.drain() {
+                    self.pending.push(item);
+                    form_batch(self);
+                }
+            }
+        }
+    }
+}
+
+impl<Output> ContainerBuilder for ContainerChunker<Output>
+where
+    Output: SizableContainer + ConsolidateLayout,
+{
+    type Container = Output;
+
+    fn extract(&mut self) -> Option<&mut Self::Container> {
+        if let Some(ready) = self.ready.pop_front() {
+            self.empty = ready;
+            Some(&mut self.empty)
+        } else {
+            None
+        }
+    }
+
+    fn finish(&mut self) -> Option<&mut Self::Container> {
+        if !self.pending.is_empty() {
+            consolidate_container(&mut self.pending, &mut self.empty);
+            std::mem::swap(&mut self.pending, &mut self.empty);
+            self.empty.clear();
+            if !self.pending.is_empty() {
+                self.ready.push_back(std::mem::take(&mut self.pending));
+            }
+        }
+        if let Some(ready) = self.ready.pop_front() {
+            self.empty = ready;
+            Some(&mut self.empty)
+        } else {
+            None
+        }
+    }
+}
diff --git a/src/trace/implementations/merge_batcher.rs b/src/trace/implementations/merge_batcher.rs
index bb13cf650..cd4e7e72a 100644
--- a/src/trace/implementations/merge_batcher.rs
+++ b/src/trace/implementations/merge_batcher.rs
@@ -1,6 +1,7 @@
 //! A general purpose `Batcher` implementation based on radix sort.
 
 use std::collections::VecDeque;
+use std::marker::PhantomData;
 
 use timely::communication::message::RefOrMut;
 use timely::logging::WorkerIdentifier;
@@ -8,17 +9,18 @@ use timely::logging_core::Logger;
 use timely::progress::frontier::AntichainRef;
 use timely::progress::{frontier::Antichain, Timestamp};
 use timely::{Container, PartialOrder};
+use timely::container::{ContainerBuilder, PushInto};
 
-use crate::consolidation::consolidate_updates;
 use crate::difference::Semigroup;
 use crate::logging::{BatcherEvent, DifferentialEvent};
 use crate::trace::{Batcher, Builder};
 use crate::Data;
 
 /// Creates batches from unordered tuples.
-pub struct MergeBatcher
+pub struct MergeBatcher
 where
-    M: Merger,
+    C: ContainerBuilder + Default,
+    M: Merger