diff --git a/dogsdogsdogs/src/operators/half_join.rs b/dogsdogsdogs/src/operators/half_join.rs index 283284525..877b1399e 100644 --- a/dogsdogsdogs/src/operators/half_join.rs +++ b/dogsdogsdogs/src/operators/half_join.rs @@ -208,9 +208,9 @@ where // Use TOTAL ORDER to allow the release of `time`. yielded = yielded || yield_function(timer, work); if !yielded && !input2.frontier.frontier().iter().any(|t| comparison(t, initial)) { - use differential_dataflow::trace::cursor::MyTrait; - cursor.seek_key(&storage, MyTrait::borrow_as(key)); - if cursor.get_key(&storage) == Some(MyTrait::borrow_as(key)) { + use differential_dataflow::trace::cursor::IntoOwned; + cursor.seek_key(&storage, IntoOwned::borrow_as(key)); + if cursor.get_key(&storage) == Some(IntoOwned::borrow_as(key)) { while let Some(val2) = cursor.get_val(&storage) { cursor.map_times(&storage, |t, d| { if comparison(t, initial) { diff --git a/dogsdogsdogs/src/operators/lookup_map.rs b/dogsdogsdogs/src/operators/lookup_map.rs index e8ab279ff..f1369eab1 100644 --- a/dogsdogsdogs/src/operators/lookup_map.rs +++ b/dogsdogsdogs/src/operators/lookup_map.rs @@ -93,9 +93,9 @@ where for &mut (ref prefix, ref time, ref mut diff) in prefixes.iter_mut() { if !input2.frontier.less_equal(time) { logic2(prefix, &mut key1); - use differential_dataflow::trace::cursor::MyTrait; - cursor.seek_key(&storage, MyTrait::borrow_as(&key1)); - if cursor.get_key(&storage) == Some(MyTrait::borrow_as(&key1)) { + use differential_dataflow::trace::cursor::IntoOwned; + cursor.seek_key(&storage, IntoOwned::borrow_as(&key1)); + if cursor.get_key(&storage) == Some(IntoOwned::borrow_as(&key1)) { while let Some(value) = cursor.get_val(&storage) { let mut count = Tr::Diff::zero(); cursor.map_times(&storage, |t, d| { diff --git a/src/operators/arrange/upsert.rs b/src/operators/arrange/upsert.rs index 65cdc0b4b..a36781a5e 100644 --- a/src/operators/arrange/upsert.rs +++ b/src/operators/arrange/upsert.rs @@ -244,14 +244,14 @@ where let mut builder = Tr::Builder::new(); for (key, mut list) in to_process.drain(..) { - use trace::cursor::MyTrait; + use trace::cursor::IntoOwned; // The prior value associated with the key. let mut prev_value: Option = None; // Attempt to find the key in the trace. - trace_cursor.seek_key_owned(&trace_storage, &key); - if trace_cursor.get_key(&trace_storage).map(|k| k.equals(&key)).unwrap_or(false) { + trace_cursor.seek_key(&trace_storage, IntoOwned::borrow_as(&key)); + if trace_cursor.get_key(&trace_storage).map(|k| k.eq(&IntoOwned::borrow_as(&key))).unwrap_or(false) { // Determine the prior value associated with the key. while let Some(val) = trace_cursor.get_val(&trace_storage) { let mut count = 0; diff --git a/src/operators/consolidate.rs b/src/operators/consolidate.rs index 2235c4df9..86b7e729c 100644 --- a/src/operators/consolidate.rs +++ b/src/operators/consolidate.rs @@ -57,7 +57,7 @@ where Tr::Batcher: Batcher>, { use crate::operators::arrange::arrangement::Arrange; - use crate::trace::cursor::MyTrait; + use crate::trace::cursor::IntoOwned; self.map(|k| (k, ())) .arrange_named::(name) .as_collection(|d, _| d.into_owned()) diff --git a/src/operators/count.rs b/src/operators/count.rs index 0779bde06..42a030786 100644 --- a/src/operators/count.rs +++ b/src/operators/count.rs @@ -70,7 +70,7 @@ where move |input, output| { - use crate::trace::cursor::MyTrait; + use crate::trace::cursor::IntoOwned; input.for_each(|capability, batches| { batches.swap(&mut buffer); let mut session = output.session(&capability); diff --git a/src/operators/reduce.rs b/src/operators/reduce.rs index 8711e66a0..ffaf1652b 100644 --- a/src/operators/reduce.rs +++ b/src/operators/reduce.rs @@ -478,10 +478,10 @@ where while batch_cursor.key_valid(batch_storage) || exposed_position < exposed.len() { use std::borrow::Borrow; - use crate::trace::cursor::MyTrait; + use crate::trace::cursor::IntoOwned; // Determine the next key we will work on; could be synthetic, could be from a batch. - let key1 = exposed.get(exposed_position).map(|x| <_ as MyTrait>::borrow_as(&x.0)); + let key1 = exposed.get(exposed_position).map(|x| <_ as IntoOwned>::borrow_as(&x.0)); let key2 = batch_cursor.get_key(batch_storage); let key = match (key1, key2) { (Some(key1), Some(key2)) => ::std::cmp::min(key1, key2), @@ -497,7 +497,7 @@ where interesting_times.clear(); // Populate `interesting_times` with synthetic interesting times (below `upper_limit`) for this key. - while exposed.get(exposed_position).map(|x| x.0.borrow()).map(|k| key.equals(k)).unwrap_or(false) { + while exposed.get(exposed_position).map(|x| x.0.borrow()).map(|k| key.eq(& as IntoOwned>::borrow_as(&k))).unwrap_or(false) { interesting_times.push(exposed[exposed_position].1.clone()); exposed_position += 1; } diff --git a/src/trace/cursor/mod.rs b/src/trace/cursor/mod.rs index 927dba4b2..2631c2aea 100644 --- a/src/trace/cursor/mod.rs +++ b/src/trace/cursor/mod.rs @@ -14,56 +14,40 @@ pub mod cursor_list; pub use self::cursor_list::CursorList; use std::borrow::Borrow; -use std::cmp::Ordering; -/// A type that may be converted into and compared with another type. +/// A reference type corresponding to an owned type, supporting conversion in each direction. /// -/// The type must also be comparable with itself, and follow the same -/// order as if converting instances to `T` and comparing the results. -pub trait MyTrait<'a> : Ord { +/// This trait can be implemented by a GAT, and enables owned types to be borrowed as a GAT. +/// This trait is analogous to `ToOwned`, but not as prescriptive. Specifically, it avoids the +/// requirement that the other trait implement `Borrow`, for which a borrow must result in a +/// `&'self Borrowed`, which cannot move the lifetime into a GAT borrowed type. +pub trait IntoOwned<'a> { /// Owned type into which this type can be converted. type Owned; /// Conversion from an instance of this type to the owned type. fn into_owned(self) -> Self::Owned; - /// + /// Clones `self` onto an existing instance of the owned type. fn clone_onto(&self, other: &mut Self::Owned); - /// Indicates that `self <= other`; used for sorting. - fn compare(&self, other: &Self::Owned) -> Ordering; - /// `self <= other` - fn less_equals(&self, other: &Self::Owned) -> bool { - self.compare(other) != Ordering::Greater - } - /// `self == other` - fn equals(&self, other: &Self::Owned) -> bool { - self.compare(other) == Ordering::Equal - } - /// `self < other` - fn less_than(&self, other: &Self::Owned) -> bool { - self.compare(other) == Ordering::Less - } - /// Borrows an owned instance as onesself. - fn borrow_as(other: &'a Self::Owned) -> Self; + /// Borrows an owned instance as oneself. + fn borrow_as(owned: &'a Self::Owned) -> Self; } -impl<'a, T: Ord+ToOwned+?Sized> MyTrait<'a> for &'a T { +impl<'a, T: ToOwned+?Sized> IntoOwned<'a> for &'a T { type Owned = T::Owned; fn into_owned(self) -> Self::Owned { self.to_owned() } fn clone_onto(&self, other: &mut Self::Owned) { ::clone_into(self, other) } - fn compare(&self, other: &Self::Owned) -> Ordering { self.cmp(&other.borrow()) } - fn borrow_as(other: &'a Self::Owned) -> Self { - other.borrow() - } + fn borrow_as(owned: &'a Self::Owned) -> Self { owned.borrow() } } /// A cursor for navigating ordered `(key, val, time, diff)` updates. pub trait Cursor { /// Key by which updates are indexed. - type Key<'a>: Copy + Clone + MyTrait<'a, Owned = Self::KeyOwned>; + type Key<'a>: Copy + Clone + Ord + IntoOwned<'a, Owned = Self::KeyOwned>; /// Owned version of the above. type KeyOwned: Ord + Clone; /// Values associated with keys. - type Val<'a>: Copy + Clone + MyTrait<'a> + for<'b> PartialOrd>; + type Val<'a>: Copy + Clone + Ord + IntoOwned<'a> + for<'b> PartialOrd>; /// Timestamps associated with updates type Time: Timestamp + Lattice + Ord + Clone; /// Associated update. @@ -103,10 +87,6 @@ pub trait Cursor { fn step_key(&mut self, storage: &Self::Storage); /// Advances the cursor to the specified key. fn seek_key(&mut self, storage: &Self::Storage, key: Self::Key<'_>); - /// Convenience method to get access by reference to an owned key. - fn seek_key_owned(&mut self, storage: &Self::Storage, key: &Self::KeyOwned) { - self.seek_key(storage, MyTrait::borrow_as(key)); - } /// Advances the cursor to the next value. fn step_val(&mut self, storage: &Self::Storage); diff --git a/src/trace/implementations/huffman_container.rs b/src/trace/implementations/huffman_container.rs index 290b5d870..4fa67b2c7 100644 --- a/src/trace/implementations/huffman_container.rs +++ b/src/trace/implementations/huffman_container.rs @@ -32,12 +32,8 @@ where } } -impl BatchContainer for HuffmanContainer -where - B: Ord + Clone + Sized + 'static, -{ - type PushItem = Vec; - type ReadItem<'a> = Wrapped<'a, B>; +use crate::trace::implementations::containers::Push; +impl Push> for HuffmanContainer { fn push(&mut self, item: Vec) { for x in item.iter() { *self.stats.entry(x.clone()).or_insert(0) += 1; } match &mut self.inner { @@ -51,10 +47,12 @@ where } } } - fn copy_push(&mut self, item: &Vec) { - use crate::trace::MyTrait; - self.copy(<_ as MyTrait>::borrow_as(item)); - } +} + +impl BatchContainer for HuffmanContainer { + type OwnedItem = Vec; + type ReadItem<'a> = Wrapped<'a, B>; + fn copy(&mut self, item: Self::ReadItem<'_>) { match item.decode() { Ok(decoded) => { @@ -152,7 +150,7 @@ impl Default for HuffmanContainer { mod wrapper { - use crate::trace::MyTrait; + use crate::trace::IntoOwned; use super::Encoded; pub struct Wrapped<'a, B: Ord> { @@ -205,7 +203,7 @@ mod wrapper { self.partial_cmp(other).unwrap() } } - impl<'a, B: Ord+Clone> MyTrait<'a> for Wrapped<'a, B> { + impl<'a, B: Ord+Clone> IntoOwned<'a> for Wrapped<'a, B> { type Owned = Vec; fn into_owned(self) -> Self::Owned { match self.decode() { @@ -220,14 +218,8 @@ mod wrapper { Err(bytes) => other.extend_from_slice(bytes), } } - fn compare(&self, other: &Self::Owned) -> std::cmp::Ordering { - match self.decode() { - Ok(decode) => decode.partial_cmp(other.iter()).unwrap(), - Err(bytes) => bytes.cmp(&other[..]), - } - } - fn borrow_as(other: &'a Self::Owned) -> Self { - Self { inner: Err(&other[..]) } + fn borrow_as(owned: &'a Self::Owned) -> Self { + Self { inner: Err(&owned[..]) } } } } diff --git a/src/trace/implementations/mod.rs b/src/trace/implementations/mod.rs index d0e4a459a..ea14b09ce 100644 --- a/src/trace/implementations/mod.rs +++ b/src/trace/implementations/mod.rs @@ -52,7 +52,6 @@ pub use self::ord_neu::OrdValSpine as ValSpine; pub use self::ord_neu::OrdKeySpine as KeySpine; use std::borrow::{ToOwned}; -use std::cmp::Ordering; use timely::Container; use timely::container::columnation::{Columnation, TimelyStack}; @@ -86,21 +85,22 @@ where type Diff = R; } +use crate::trace::implementations::containers::Push; + /// A type with opinions on how updates should be laid out. pub trait Layout { /// The represented update. type Target: Update + ?Sized; /// Container for update keys. - type KeyContainer: - BatchContainer::Key>; + type KeyContainer: BatchContainer::Key> + Push<::Key>; /// Container for update vals. - type ValContainer: - BatchContainer::Val>; + type ValContainer: BatchContainer::Val> + Push<::Val>; /// Container for update vals. type UpdContainer: - for<'a> BatchContainer::Time, ::Diff), ReadItem<'a> = &'a (::Time, ::Diff)>; + Push<(::Time, ::Diff)> + + for<'a> BatchContainer = &'a (::Time, ::Diff), OwnedItem = (::Time, ::Diff)>; /// Container for offsets. - type OffsetContainer: BatchContainer; + type OffsetContainer: BatchContainer + Push; } /// A layout that uses vectors @@ -144,7 +144,7 @@ where /// Examples include types that implement `Clone` who prefer pub trait PreferredContainer : ToOwned { /// The preferred container for the type. - type Container: BatchContainer; + type Container: BatchContainer + Push; } impl PreferredContainer for T { @@ -195,7 +195,7 @@ where use std::convert::TryInto; use std::ops::Deref; use abomonation_derive::Abomonation; -use crate::trace::cursor::MyTrait; +use crate::trace::cursor::IntoOwned; /// A list of unsigned integers that uses `u32` elements as long as they are small enough, and switches to `u64` once they are not. #[derive(Eq, PartialEq, Ord, PartialOrd, Clone, Abomonation)] @@ -293,7 +293,7 @@ impl PushInto for usize { } } -/// Helper struct to provide `MyTrait` for `Copy` types. +/// Helper struct to provide `IntoOwned` for `Copy` types. #[derive(Eq, PartialEq, Ord, PartialOrd, Clone, Copy)] pub struct Wrapper(T); @@ -305,7 +305,7 @@ impl Deref for Wrapper { } } -impl<'a, T: Copy + Ord> MyTrait<'a> for Wrapper { +impl<'a, T: Copy + Ord> IntoOwned<'a> for Wrapper { type Owned = T; fn into_owned(self) -> Self::Owned { @@ -316,27 +316,21 @@ impl<'a, T: Copy + Ord> MyTrait<'a> for Wrapper { *other = self.0; } - fn compare(&self, other: &Self::Owned) -> Ordering { - self.0.cmp(other) + fn borrow_as(owned: &'a Self::Owned) -> Self { + Self(*owned) } +} - fn borrow_as(other: &'a Self::Owned) -> Self { - Self(*other) +impl Push for OffsetList { + fn push(&mut self, item: usize) { + self.push(item); } } impl BatchContainer for OffsetList { - type PushItem = usize; + type OwnedItem = usize; type ReadItem<'a> = Wrapper; - fn push(&mut self, item: Self::PushItem) { - self.push(item); - } - - fn copy_push(&mut self, item: &Self::PushItem) { - self.push(*item); - } - fn copy(&mut self, item: Self::ReadItem<'_>) { self.push(item.0); } @@ -454,11 +448,11 @@ where } fn key_eq(this: &&K::Owned, other: <::Container as BatchContainer>::ReadItem<'_>) -> bool { - other.equals(this) + other.eq(&<<::Container as BatchContainer>::ReadItem<'_> as IntoOwned>::borrow_as(this)) } fn val_eq(this: &&V::Owned, other: <::Container as BatchContainer>::ReadItem<'_>) -> bool { - other.equals(this) + other.eq(&<<::Container as BatchContainer>::ReadItem<'_> as IntoOwned>::borrow_as(this)) } } @@ -471,24 +465,27 @@ pub mod containers { use timely::container::PushInto; use std::borrow::ToOwned; - use crate::trace::MyTrait; + use crate::trace::IntoOwned; + + /// Supports the ability to receive an item of type `T`. + pub trait Push { + /// Pushes the item into `self`. + fn push(&mut self, item: T); + } + + impl Push for TimelyStack { + fn push(&mut self, item: T) { + self.copy(&item); + } + } /// A general-purpose container resembling `Vec`. pub trait BatchContainer: 'static { - /// The type of contained item. - /// - /// The container only supplies references to the item, so it needn't be sized. - type PushItem; + /// An type that all `Self::ReadItem<'_>` can be converted into. + type OwnedItem; /// The type that can be read back out of the container. - type ReadItem<'a>: Copy + MyTrait<'a, Owned = Self::PushItem> + for<'b> PartialOrd>; - /// Inserts an owned item. - fn push(&mut self, item: Self::PushItem) { - self.copy_push(&item); - } - /// Inserts an owned item. - fn copy_push(&mut self, item: &Self::PushItem) { - self.copy(MyTrait::borrow_as(item)); - } + type ReadItem<'a>: Copy + IntoOwned<'a, Owned = Self::OwnedItem> + Ord + for<'b> PartialOrd>; + /// Inserts a borrowed item. fn copy(&mut self, item: Self::ReadItem<'_>); /// Extends from a range of items in another`Self`. @@ -563,18 +560,18 @@ pub mod containers { } } + impl Push for Vec { + fn push(&mut self, item: T) { + self.push(item); + } + } + // All `T: Clone` also implement `ToOwned`, but without the constraint Rust // struggles to understand why the owned type must be `T` (i.e. the one blanket impl). impl BatchContainer for Vec { - type PushItem = T; - type ReadItem<'a> = &'a Self::PushItem; + type OwnedItem = T; + type ReadItem<'a> = &'a Self::OwnedItem; - fn push(&mut self, item: T) { - self.push(item); - } - fn copy_push(&mut self, item: &T) { - self.copy(item); - } fn copy(&mut self, item: &T) { self.push(item.clone()); } @@ -598,12 +595,9 @@ pub mod containers { // The `ToOwned` requirement exists to satisfy `self.reserve_items`, who must for now // be presented with the actual contained type, rather than a type that borrows into it. impl + 'static> BatchContainer for TimelyStack { - type PushItem = T; - type ReadItem<'a> = &'a Self::PushItem; + type OwnedItem = T; + type ReadItem<'a> = &'a Self::OwnedItem; - fn copy_push(&mut self, item: &Self::PushItem) { - self.copy(item); - } fn copy(&mut self, item: &T) { self.copy(item); } @@ -653,21 +647,22 @@ pub mod containers { } } - impl BatchContainer for SliceContainer - where - B: Ord + Clone + Sized + 'static, - { - type PushItem = Vec; - type ReadItem<'a> = &'a [B]; + impl Push> for SliceContainer { fn push(&mut self, item: Vec) { for x in item.into_iter() { self.inner.push(x); } self.offsets.push(self.inner.len()); } - fn copy_push(&mut self, item: &Vec) { - self.copy(&item[..]); - } + } + + impl BatchContainer for SliceContainer + where + B: Ord + Clone + Sized + 'static, + { + type OwnedItem = Vec; + type ReadItem<'a> = &'a [B]; + fn copy(&mut self, item: Self::ReadItem<'_>) { for x in item.iter() { self.inner.copy(x); diff --git a/src/trace/implementations/option_container.rs b/src/trace/implementations/option_container.rs index 18f93de74..a6d2eff03 100644 --- a/src/trace/implementations/option_container.rs +++ b/src/trace/implementations/option_container.rs @@ -1,6 +1,6 @@ //! A container optimized for identical contents. -use crate::trace::cursor::MyTrait; +use crate::trace::cursor::IntoOwned; use crate::trace::implementations::BatchContainer; /// A container that effectively represents default values. @@ -14,15 +14,13 @@ pub struct OptionContainer { container: C, } -impl BatchContainer for OptionContainer +use crate::trace::implementations::containers::Push; +impl Push for OptionContainer where - C: BatchContainer, - C::PushItem: Default + Ord, + C: BatchContainer + Push, + C::OwnedItem: Default + Ord, { - type PushItem = C::PushItem; - type ReadItem<'a> = OptionWrapper<'a, C>; - - fn push(&mut self, item: Self::PushItem) { + fn push(&mut self, item: C::OwnedItem) { if item == Default::default() && self.container.is_empty() { self.defaults += 1; } @@ -30,8 +28,18 @@ where self.container.push(item) } } +} + +impl BatchContainer for OptionContainer +where + C: BatchContainer , + C::OwnedItem: Default + Ord, +{ + type OwnedItem = C::OwnedItem; + type ReadItem<'a> = OptionWrapper<'a, C>; + fn copy<'a>(&mut self, item: Self::ReadItem<'a>) { - if item.equals(&Default::default()) && self.container.is_empty() { + if item.eq(&IntoOwned::borrow_as(&Default::default())) && self.container.is_empty() { self.defaults += 1; } else { @@ -39,7 +47,7 @@ where self.container.copy(item); } else { - self.container.push(Default::default()); + self.container.copy(IntoOwned::borrow_as(&Default::default())); } } } @@ -82,22 +90,22 @@ impl<'a, C: BatchContainer> Clone for OptionWrapper<'a, C> { use std::cmp::Ordering; impl<'a, 'b, C: BatchContainer> PartialEq> for OptionWrapper<'b, C> where - C::PushItem: Default + Ord, + C::OwnedItem: Default + Ord, { fn eq(&self, other: &OptionWrapper<'a, C>) -> bool { match (&self.inner, &other.inner) { (None, None) => true, - (None, Some(item2)) => item2.equals(&Default::default()), - (Some(item1), None) => item1.equals(&Default::default()), + (None, Some(item2)) => item2.eq(& as IntoOwned>::borrow_as(&Default::default())), + (Some(item1), None) => item1.eq(& as IntoOwned>::borrow_as(&Default::default())), (Some(item1), Some(item2)) => item1.eq(item2) } } } -impl<'a, C: BatchContainer> Eq for OptionWrapper<'a, C> where -C::PushItem: Default + Ord -{ } + +impl<'a, C: BatchContainer> Eq for OptionWrapper<'a, C> where C::OwnedItem: Default + Ord { } + impl<'a, 'b, C: BatchContainer> PartialOrd> for OptionWrapper<'b, C> where -C::PushItem: Default + Ord, +C::OwnedItem: Default + Ord, { fn partial_cmp(&self, other: &OptionWrapper<'a, C>) -> Option { let default = Default::default(); @@ -110,7 +118,7 @@ C::PushItem: Default + Ord, } } impl<'a, C: BatchContainer> Ord for OptionWrapper<'a, C> where -C::PushItem: Default + Ord, +C::OwnedItem: Default + Ord, { fn cmp(&self, other: &Self) -> Ordering { self.partial_cmp(other).unwrap() @@ -118,11 +126,11 @@ C::PushItem: Default + Ord, } -impl<'a, C: BatchContainer> MyTrait<'a> for OptionWrapper<'a, C> +impl<'a, C: BatchContainer> IntoOwned<'a> for OptionWrapper<'a, C> where - C::PushItem : Default + Ord, + C::OwnedItem : Default + Ord, { - type Owned = C::PushItem; + type Owned = C::OwnedItem; fn into_owned(self) -> Self::Owned { self.inner.map(|r| r.into_owned()).unwrap_or_else(Default::default) @@ -135,17 +143,9 @@ where *other = Default::default(); } } - fn compare(&self, other: &Self::Owned) -> std::cmp::Ordering { - if let Some(item) = &self.inner { - item.compare(other) - } - else { - ::default().cmp(other) - } - } - fn borrow_as(other: &'a Self::Owned) -> Self { + fn borrow_as(owned: &'a Self::Owned) -> Self { Self { - inner: Some(<_>::borrow_as(other)) + inner: Some(IntoOwned::borrow_as(owned)) } } } diff --git a/src/trace/implementations/ord_neu.rs b/src/trace/implementations/ord_neu.rs index ddc8a4409..a83cf728e 100644 --- a/src/trace/implementations/ord_neu.rs +++ b/src/trace/implementations/ord_neu.rs @@ -72,9 +72,11 @@ mod val_batch { use timely::container::PushInto; use timely::progress::{Antichain, frontier::AntichainRef}; + use crate::trace::implementations::containers::Push; + use crate::trace::{Batch, BatchReader, Builder, Cursor, Description, Merger}; use crate::trace::implementations::{BatchContainer, BuilderInput}; - use crate::trace::cursor::MyTrait; + use crate::trace::cursor::IntoOwned; use super::{Layout, Update}; @@ -416,7 +418,7 @@ mod val_batch { if !self.update_stash.is_empty() { // If there is a single element, equal to a just-prior recorded update, // we push nothing and report an unincremented offset to encode this case. - if self.update_stash.len() == 1 && self.result.updates.last().map(|ud| self.update_stash.last().unwrap().equals(ud)).unwrap_or(false) { + if self.update_stash.len() == 1 && self.result.updates.last().map(|ud| self.update_stash.last().unwrap().eq(IntoOwned::borrow_as(ud))).unwrap_or(false) { // Just clear out update_stash, as we won't drain it here. self.update_stash.clear(); self.singletons += 1; @@ -620,9 +622,10 @@ mod key_batch { use timely::container::PushInto; use timely::progress::{Antichain, frontier::AntichainRef}; + use crate::trace::implementations::containers::Push; use crate::trace::{Batch, BatchReader, Builder, Cursor, Description, Merger}; use crate::trace::implementations::{BatchContainer, BuilderInput}; - use crate::trace::cursor::MyTrait; + use crate::trace::cursor::IntoOwned; use super::{Layout, Update}; diff --git a/src/trace/implementations/rhh.rs b/src/trace/implementations/rhh.rs index 60ed6afd4..b5cc93ffa 100644 --- a/src/trace/implementations/rhh.rs +++ b/src/trace/implementations/rhh.rs @@ -83,7 +83,6 @@ impl Hashable for &HashWrapper { mod val_batch { - use std::borrow::Borrow; use std::convert::TryInto; use std::marker::PhantomData; use abomonation_derive::Abomonation; @@ -92,9 +91,11 @@ mod val_batch { use crate::hashable::Hashable; + use crate::trace::implementations::containers::Push; + use crate::trace::{Batch, BatchReader, Builder, Cursor, Description, Merger}; use crate::trace::implementations::{BatchContainer, BuilderInput}; - use crate::trace::cursor::MyTrait; + use crate::trace::cursor::IntoOwned; use super::{Layout, Update, HashOrdered}; @@ -183,8 +184,8 @@ mod val_batch { /// If `offset` is specified, we will insert it at the appropriate location. If it is not specified, /// we leave `keys_offs` ready to receive it as the next `push`. This is so that builders that may /// not know the final offset at the moment of key insertion can prepare for receiving the offset. - fn insert_key(&mut self, key: &::Key, offset: Option) { - let desired = self.desired_location(key); + fn insert_key(&mut self, key: ::Key, offset: Option) { + let desired = self.desired_location(&key); // Were we to push the key now, it would be at `self.keys.len()`, so while that is wrong, // push additional blank entries in. while self.keys.len() < desired { @@ -196,7 +197,7 @@ mod val_batch { // Now we insert the key. Even if it is no longer the desired location because of contention. // If an offset has been supplied we insert it, and otherwise leave it for future determination. - self.keys.copy_push(key); + self.keys.push(key); if let Some(offset) = offset { self.keys_offs.push(offset); } @@ -320,8 +321,6 @@ mod val_batch { /// description description: Description<::Time>, - /// Owned key for copying into. - key_owned: <::Key as ToOwned>::Owned, /// Local stash of updates, to use for consolidation. /// /// We could emulate a `ChangeBatch` here, with related compaction smarts. @@ -375,7 +374,6 @@ mod val_batch { key_cursor2: 0, result: storage, description, - key_owned: Default::default(), update_stash: Vec::new(), singletons: 0, } @@ -451,8 +449,7 @@ mod val_batch { // If we have pushed any values, copy the key as well. if self.result.vals.len() > init_vals { - source.keys.index(cursor).clone_onto(&mut self.key_owned); - self.result.insert_key(&self.key_owned, Some(self.result.vals.len())); + self.result.insert_key(source.keys.index(cursor).into_owned(), Some(self.result.vals.len())); } } /// Merge the next key in each of `source1` and `source2` into `self`, updating the appropriate cursors. @@ -472,8 +469,7 @@ mod val_batch { let (lower1, upper1) = source1.values_for_key(self.key_cursor1); let (lower2, upper2) = source2.values_for_key(self.key_cursor2); if let Some(off) = self.merge_vals((source1, lower1, upper1), (source2, lower2, upper2)) { - source1.keys.index(self.key_cursor1).clone_onto(&mut self.key_owned); - self.result.insert_key(&self.key_owned, Some(off)); + self.result.insert_key(source1.keys.index(self.key_cursor1).into_owned(), Some(off)); } // Increment cursors in either case; the keys are merged. self.key_cursor1 += 1; @@ -578,7 +574,7 @@ mod val_batch { if !self.update_stash.is_empty() { // If there is a single element, equal to a just-prior recorded update, // we push nothing and report an unincremented offset to encode this case. - if self.update_stash.len() == 1 && self.result.updates.last().map(|l| l.equals(self.update_stash.last().unwrap())).unwrap_or(false) { + if self.update_stash.len() == 1 && self.result.updates.last().map(|l| l.eq(IntoOwned::borrow_as(self.update_stash.last().unwrap()))).unwrap_or(false) { // Just clear out update_stash, as we won't drain it here. self.update_stash.clear(); self.singletons += 1; @@ -805,7 +801,7 @@ mod val_batch { self.push_update(time, diff); val.push_into(&mut self.result.vals); // Insert the key, but with no specified offset. - self.result.insert_key(key.borrow(), None); + self.result.insert_key(key, None); } } } diff --git a/src/trace/mod.rs b/src/trace/mod.rs index 00b72fc1d..f73730a23 100644 --- a/src/trace/mod.rs +++ b/src/trace/mod.rs @@ -19,7 +19,7 @@ use timely::progress::{Antichain, frontier::AntichainRef}; use timely::progress::Timestamp; use crate::logging::DifferentialEvent; -use crate::trace::cursor::MyTrait; +use crate::trace::cursor::IntoOwned; use crate::difference::Semigroup; use crate::lattice::Lattice; // use ::difference::Semigroup; @@ -52,11 +52,11 @@ pub type ExertionLogic = std::sync::Arc Fn(&'a [(usize, usize, usize pub trait TraceReader { /// Key by which updates are indexed. - type Key<'a>: Copy + Clone + MyTrait<'a, Owned = Self::KeyOwned>; + type Key<'a>: Copy + Clone + Ord + IntoOwned<'a, Owned = Self::KeyOwned>; /// Owned version of the above. type KeyOwned: Ord + Clone; /// Values associated with keys. - type Val<'a>: Copy + Clone + MyTrait<'a>; + type Val<'a>: Copy + Clone + IntoOwned<'a>; /// Timestamps associated with updates type Time: Timestamp + Lattice + Ord + Clone; /// Associated update. @@ -258,11 +258,11 @@ where Self: ::std::marker::Sized, { /// Key by which updates are indexed. - type Key<'a>: Copy + Clone + MyTrait<'a, Owned = Self::KeyOwned>; + type Key<'a>: Copy + Clone + Ord + IntoOwned<'a, Owned = Self::KeyOwned>; /// Owned version of the above. type KeyOwned: Ord + Clone; /// Values associated with keys. - type Val<'a>: Copy + Clone + MyTrait<'a>; + type Val<'a>: Copy + Clone + IntoOwned<'a>; /// Timestamps associated with updates type Time: Timestamp + Lattice + Ord + Clone; /// Associated update.