Skip to content

Commit

Permalink
Introduce container for default values (TimelyDataflow#452)
Browse files Browse the repository at this point in the history
  • Loading branch information
frankmcsherry authored Jan 3, 2024
1 parent 621340d commit 66be417
Show file tree
Hide file tree
Showing 2 changed files with 166 additions and 7 deletions.
22 changes: 15 additions & 7 deletions src/trace/implementations/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ pub use self::merge_batcher::MergeBatcher as Batcher;
pub mod ord_neu;
pub mod rhh;
pub mod huffman_container;
pub mod option_container;

// Opinionated takes on default spines.
pub use self::ord_neu::OrdValSpine as ValSpine;
Expand Down Expand Up @@ -320,7 +321,7 @@ pub mod containers {

use timely::container::columnation::{Columnation, TimelyStack};

use std::borrow::{Borrow, ToOwned};
use std::borrow::ToOwned;
use crate::trace::MyTrait;

/// A general-purpose container resembling `Vec<T>`.
Expand All @@ -332,13 +333,21 @@ pub mod containers {
/// The type that can be read back out of the container.
type ReadItem<'a>: Copy + MyTrait<'a, Owned = Self::PushItem> + for<'b> PartialOrd<Self::ReadItem<'b>>;
/// Inserts an owned item.
fn push(&mut self, item: Self::PushItem);
fn push(&mut self, item: Self::PushItem) {
self.copy_push(&item);
}
/// Inserts an owned item.
fn copy_push(&mut self, item: &Self::PushItem);
fn copy_push(&mut self, item: &Self::PushItem) {
self.copy(MyTrait::borrow_as(item));
}
/// Inserts a borrowed item.
fn copy(&mut self, item: Self::ReadItem<'_>);
/// Extends from a range of items in another`Self`.
fn copy_range(&mut self, other: &Self, start: usize, end: usize);
fn copy_range(&mut self, other: &Self, start: usize, end: usize) {
for index in start .. end {
self.copy(other.index(index));
}
}
/// Creates a new container with sufficient capacity.
fn with_capacity(size: usize) -> Self;
/// Creates a new container with sufficient capacity.
Expand All @@ -357,6 +366,8 @@ pub mod containers {
None
}
}
/// Indicates if the length is zero.
fn is_empty(&self) -> bool { self.len() == 0 }

/// Reports the number of elements satisfing the predicate.
///
Expand Down Expand Up @@ -441,9 +452,6 @@ pub mod containers {
type PushItem = T;
type ReadItem<'a> = &'a Self::PushItem;

fn push(&mut self, item: Self::PushItem) {
self.copy(item.borrow());
}
fn copy_push(&mut self, item: &Self::PushItem) {
self.copy(item);
}
Expand Down
151 changes: 151 additions & 0 deletions src/trace/implementations/option_container.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,151 @@
//! A container optimized for identical contents.
use crate::trace::cursor::MyTrait;
use crate::trace::implementations::BatchContainer;

/// A container that effectively represents default values.
///
/// This container is meant to be a minimal non-trivial container,
/// and may be useful in unifying `OrdVal` and `OrdKey` spines.
pub struct OptionContainer<C> {
/// Number of default items pushed.
defaults: usize,
/// Spill-over for non-empty rows.
container: C,
}

impl<C> BatchContainer for OptionContainer<C>
where
C: BatchContainer,
C::PushItem: Default + Ord,
{
type PushItem = C::PushItem;
type ReadItem<'a> = OptionWrapper<'a, C>;

fn push(&mut self, item: Self::PushItem) {
if item == Default::default() && self.container.is_empty() {
self.defaults += 1;
}
else {
self.container.push(item)
}
}
fn copy<'a>(&mut self, item: Self::ReadItem<'a>) {
if item.equals(&Default::default()) && self.container.is_empty() {
self.defaults += 1;
}
else {
if let Some(item) = item.inner {
self.container.copy(item);
}
else {
self.container.push(Default::default());
}
}
}
fn with_capacity(size: usize) -> Self {
Self {
defaults: 0,
container: C::with_capacity(size),
}
}
fn merge_capacity(cont1: &Self, cont2: &Self) -> Self {
Self {
defaults: 0,
container: C::merge_capacity(&cont1.container, &cont2.container),
}
}
fn index(&self, index: usize) -> Self::ReadItem<'_> {
if index < self.defaults {
OptionWrapper { inner: None }
}
else {
OptionWrapper { inner: Some(self.container.index(index - self.defaults))}
}
}
fn len(&self) -> usize {
self.container.len() + self.defaults
}
}

/// A read wrapper capable of cheaply representing a default value.
pub struct OptionWrapper<'a, C: BatchContainer> {
inner: Option<C::ReadItem<'a>>,
}

impl<'a, C: BatchContainer> Copy for OptionWrapper<'a, C> { }
impl<'a, C: BatchContainer> Clone for OptionWrapper<'a, C> {
fn clone(&self) -> Self { *self }
}


use std::cmp::Ordering;
impl<'a, 'b, C: BatchContainer> PartialEq<OptionWrapper<'a, C>> for OptionWrapper<'b, C>
where
C::PushItem: Default + Ord,
{
fn eq(&self, other: &OptionWrapper<'a, C>) -> bool {
match (&self.inner, &other.inner) {
(None, None) => true,
(None, Some(item2)) => item2.equals(&Default::default()),
(Some(item1), None) => item1.equals(&Default::default()),
(Some(item1), Some(item2)) => item1.eq(item2)
}
}
}
impl<'a, C: BatchContainer> Eq for OptionWrapper<'a, C> where
C::PushItem: Default + Ord
{ }
impl<'a, 'b, C: BatchContainer> PartialOrd<OptionWrapper<'a, C>> for OptionWrapper<'b, C> where
C::PushItem: Default + Ord,
{
fn partial_cmp(&self, other: &OptionWrapper<'a, C>) -> Option<Ordering> {
let default = Default::default();
match (&self.inner, &other.inner) {
(None, None) => Some(Ordering::Equal),
(None, Some(item2)) => item2.partial_cmp(&C::ReadItem::<'_>::borrow_as(&default)).map(|x| x.reverse()),
(Some(item1), None) => item1.partial_cmp(&C::ReadItem::<'_>::borrow_as(&default)),
(Some(item1), Some(item2)) => item1.partial_cmp(item2)
}
}
}
impl<'a, C: BatchContainer> Ord for OptionWrapper<'a, C> where
C::PushItem: Default + Ord,
{
fn cmp(&self, other: &Self) -> Ordering {
self.partial_cmp(other).unwrap()
}
}


impl<'a, C: BatchContainer> MyTrait<'a> for OptionWrapper<'a, C>
where
C::PushItem : Default + Ord,
{
type Owned = C::PushItem;

fn into_owned(self) -> Self::Owned {
self.inner.map(|r| r.into_owned()).unwrap_or_else(Default::default)
}
fn clone_onto(&self, other: &mut Self::Owned) {
if let Some(item) = &self.inner {
item.clone_onto(other)
}
else {
*other = Default::default();
}
}
fn compare(&self, other: &Self::Owned) -> std::cmp::Ordering {
if let Some(item) = &self.inner {
item.compare(other)
}
else {
<C::PushItem>::default().cmp(other)
}
}
fn borrow_as(other: &'a Self::Owned) -> Self {
Self {
inner: Some(<_>::borrow_as(other))
}
}
}

0 comments on commit 66be417

Please sign in to comment.