From f7cb6670ea64016a99eb44cc0cbbfdbc6aebdbba Mon Sep 17 00:00:00 2001 From: Frank McSherry Date: Wed, 3 Jan 2024 13:05:42 -0500 Subject: [PATCH] Introduce container for default values --- src/trace/implementations/mod.rs | 22 ++- src/trace/implementations/option_container.rs | 151 ++++++++++++++++++ 2 files changed, 166 insertions(+), 7 deletions(-) create mode 100644 src/trace/implementations/option_container.rs diff --git a/src/trace/implementations/mod.rs b/src/trace/implementations/mod.rs index b6a50990c..759649206 100644 --- a/src/trace/implementations/mod.rs +++ b/src/trace/implementations/mod.rs @@ -48,6 +48,7 @@ pub use self::merge_batcher::MergeBatcher as Batcher; pub mod ord_neu; pub mod rhh; pub mod huffman_container; +pub mod option_container; // Opinionated takes on default spines. pub use self::ord_neu::OrdValSpine as ValSpine; @@ -320,7 +321,7 @@ pub mod containers { use timely::container::columnation::{Columnation, TimelyStack}; - use std::borrow::{Borrow, ToOwned}; + use std::borrow::ToOwned; use crate::trace::MyTrait; /// A general-purpose container resembling `Vec`. @@ -332,13 +333,21 @@ pub mod containers { /// The type that can be read back out of the container. type ReadItem<'a>: Copy + MyTrait<'a, Owned = Self::PushItem> + for<'b> PartialOrd>; /// Inserts an owned item. - fn push(&mut self, item: Self::PushItem); + fn push(&mut self, item: Self::PushItem) { + self.copy_push(&item); + } /// Inserts an owned item. - fn copy_push(&mut self, item: &Self::PushItem); + fn copy_push(&mut self, item: &Self::PushItem) { + self.copy(MyTrait::borrow_as(item)); + } /// Inserts a borrowed item. fn copy(&mut self, item: Self::ReadItem<'_>); /// Extends from a range of items in another`Self`. - fn copy_range(&mut self, other: &Self, start: usize, end: usize); + fn copy_range(&mut self, other: &Self, start: usize, end: usize) { + for index in start .. end { + self.copy(other.index(index)); + } + } /// Creates a new container with sufficient capacity. fn with_capacity(size: usize) -> Self; /// Creates a new container with sufficient capacity. @@ -357,6 +366,8 @@ pub mod containers { None } } + /// Indicates if the length is zero. + fn is_empty(&self) -> bool { self.len() == 0 } /// Reports the number of elements satisfing the predicate. /// @@ -441,9 +452,6 @@ pub mod containers { type PushItem = T; type ReadItem<'a> = &'a Self::PushItem; - fn push(&mut self, item: Self::PushItem) { - self.copy(item.borrow()); - } fn copy_push(&mut self, item: &Self::PushItem) { self.copy(item); } diff --git a/src/trace/implementations/option_container.rs b/src/trace/implementations/option_container.rs new file mode 100644 index 000000000..4d9ab2122 --- /dev/null +++ b/src/trace/implementations/option_container.rs @@ -0,0 +1,151 @@ +//! A container optimized for identical contents. + +use crate::trace::cursor::MyTrait; +use crate::trace::implementations::BatchContainer; + +/// A container that effectively represents default values. +/// +/// This container is meant to be a minimal non-trivial container, +/// and may be useful in unifying `OrdVal` and `OrdKey` spines. +struct OptionContainer { + /// Number of default items pushed. + defaults: usize, + /// Spill-over for non-empty rows. + container: C, +} + +impl BatchContainer for OptionContainer +where + C: BatchContainer, + C::PushItem: Default + Ord, +{ + type PushItem = C::PushItem; + type ReadItem<'a> = OptionWrapper<'a, C>; + + fn push(&mut self, item: Self::PushItem) { + if item == Default::default() && self.container.is_empty() { + self.defaults += 1; + } + else { + self.container.push(item) + } + } + fn copy<'a>(&mut self, item: Self::ReadItem<'a>) { + if item.equals(&Default::default()) && self.container.is_empty() { + self.defaults += 1; + } + else { + if let Some(item) = item.inner { + self.container.copy(item); + } + else { + self.container.push(Default::default()); + } + } + } + fn with_capacity(size: usize) -> Self { + Self { + defaults: 0, + container: C::with_capacity(size), + } + } + fn merge_capacity(cont1: &Self, cont2: &Self) -> Self { + Self { + defaults: 0, + container: C::merge_capacity(&cont1.container, &cont2.container), + } + } + fn index(&self, index: usize) -> Self::ReadItem<'_> { + if index < self.defaults { + OptionWrapper { inner: None } + } + else { + OptionWrapper { inner: Some(self.container.index(index - self.defaults))} + } + } + fn len(&self) -> usize { + self.container.len() + self.defaults + } +} + +/// A read wrapper that presents as +struct OptionWrapper<'a, C: BatchContainer> { + inner: Option>, +} + +impl<'a, C: BatchContainer> Copy for OptionWrapper<'a, C> { } +impl<'a, C: BatchContainer> Clone for OptionWrapper<'a, C> { + fn clone(&self) -> Self { *self } +} + + +use std::cmp::Ordering; +impl<'a, 'b, C: BatchContainer> PartialEq> for OptionWrapper<'b, C> +where + C::PushItem: Default + Ord, +{ + fn eq(&self, other: &OptionWrapper<'a, C>) -> bool { + match (&self.inner, &other.inner) { + (None, None) => true, + (None, Some(item2)) => item2.equals(&Default::default()), + (Some(item1), None) => item1.equals(&Default::default()), + (Some(item1), Some(item2)) => item1.eq(item2) + } + } +} +impl<'a, C: BatchContainer> Eq for OptionWrapper<'a, C> where +C::PushItem: Default + Ord +{ } +impl<'a, 'b, C: BatchContainer> PartialOrd> for OptionWrapper<'b, C> where +C::PushItem: Default + Ord, +{ + fn partial_cmp(&self, other: &OptionWrapper<'a, C>) -> Option { + let default = Default::default(); + match (&self.inner, &other.inner) { + (None, None) => Some(Ordering::Equal), + (None, Some(item2)) => item2.partial_cmp(&C::ReadItem::<'_>::borrow_as(&default)).map(|x| x.reverse()), + (Some(item1), None) => item1.partial_cmp(&C::ReadItem::<'_>::borrow_as(&default)), + (Some(item1), Some(item2)) => item1.partial_cmp(item2) + } + } +} +impl<'a, C: BatchContainer> Ord for OptionWrapper<'a, C> where +C::PushItem: Default + Ord, +{ + fn cmp(&self, other: &Self) -> Ordering { + self.partial_cmp(other).unwrap() + } +} + + +impl<'a, C: BatchContainer> MyTrait<'a> for OptionWrapper<'a, C> +where + C::PushItem : Default + Ord, +{ + type Owned = C::PushItem; + + fn into_owned(self) -> Self::Owned { + self.inner.map(|r| r.into_owned()).unwrap_or_else(Default::default) + } + fn clone_onto(&self, other: &mut Self::Owned) { + if let Some(item) = &self.inner { + item.clone_onto(other) + } + else { + *other = Default::default(); + } + } + fn compare(&self, other: &Self::Owned) -> std::cmp::Ordering { + if let Some(item) = &self.inner { + item.compare(other) + } + else { + ::default().cmp(other) + } + } + fn borrow_as(other: &'a Self::Owned) -> Self { + Self { + inner: Some(<_>::borrow_as(other)) + } + } +}