Skip to content
This repository has been archived by the owner on Oct 30, 2024. It is now read-only.

Commit

Permalink
Further clean-up (TimelyDataflow#473)
Browse files Browse the repository at this point in the history
  • Loading branch information
frankmcsherry authored Apr 13, 2024
1 parent 37c505d commit c564e8f
Show file tree
Hide file tree
Showing 10 changed files with 41 additions and 46 deletions.
4 changes: 2 additions & 2 deletions src/operators/arrange/arrangement.rs
Original file line number Diff line number Diff line change
Expand Up @@ -444,7 +444,7 @@ where
T2::ValOwned: Data,
T2::Diff: Abelian,
T2::Batch: Batch,
T2::Builder: Builder<Output=T2::Batch, Item = ((T1::KeyOwned, T2::ValOwned), T2::Time, T2::Diff)>,
T2::Builder: Builder<Input = ((T1::KeyOwned, T2::ValOwned), T2::Time, T2::Diff)>,
L: FnMut(T1::Key<'_>, &[(T1::Val<'_>, T1::Diff)], &mut Vec<(<T2::Cursor as Cursor>::ValOwned, T2::Diff)>)+'static,
{
self.reduce_core::<_,T2>(name, move |key, input, output, change| {
Expand All @@ -462,7 +462,7 @@ where
T2: for<'a> Trace<Key<'a>=T1::Key<'a>, Time=T1::Time>+'static,
T2::ValOwned: Data,
T2::Batch: Batch,
T2::Builder: Builder<Output=T2::Batch, Item = ((T1::KeyOwned, T2::ValOwned), T2::Time, T2::Diff)>,
T2::Builder: Builder<Input = ((T1::KeyOwned, T2::ValOwned), T2::Time, T2::Diff)>,
L: FnMut(T1::Key<'_>, &[(T1::Val<'_>, T1::Diff)], &mut Vec<(<T2::Cursor as Cursor>::ValOwned,T2::Diff)>, &mut Vec<(<T2::Cursor as Cursor>::ValOwned, T2::Diff)>)+'static,
{
use crate::operators::reduce::reduce_trace;
Expand Down
2 changes: 1 addition & 1 deletion src/operators/arrange/upsert.rs
Original file line number Diff line number Diff line change
Expand Up @@ -136,7 +136,7 @@ where
Tr::ValOwned: ExchangeData,
Tr::Time: TotalOrder+ExchangeData,
Tr::Batch: Batch,
Tr::Builder: Builder<Item = ((Tr::KeyOwned, Tr::ValOwned), Tr::Time, Tr::Diff)>,
Tr::Builder: Builder<Input = ((Tr::KeyOwned, Tr::ValOwned), Tr::Time, Tr::Diff)>,
{
let mut reader: Option<TraceAgent<Tr>> = None;

Expand Down
5 changes: 2 additions & 3 deletions src/operators/consolidate.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ use crate::difference::Semigroup;

use crate::Data;
use crate::lattice::Lattice;
use crate::trace::{Batcher, Builder};
use crate::trace::Batcher;

/// Methods which require data be arrangeable.
impl<G, D, R> Collection<G, D, R>
Expand Down Expand Up @@ -53,8 +53,7 @@ where
where
Tr: crate::trace::Trace<KeyOwned = D,ValOwned = (),Time=G::Timestamp,Diff=R>+'static,
Tr::Batch: crate::trace::Batch,
Tr::Batcher: Batcher<Input=Vec<((D,()),G::Timestamp,R)>, Item = ((D,()),G::Timestamp,R), Time = G::Timestamp>,
Tr::Builder: Builder<Item = ((D,()),G::Timestamp,R), Time = G::Timestamp>,
Tr::Batcher: Batcher<Input=Vec<((D,()),G::Timestamp,R)>>,
{
use crate::operators::arrange::arrangement::Arrange;
use crate::trace::cursor::MyTrait;
Expand Down
8 changes: 4 additions & 4 deletions src/operators/reduce.rs
Original file line number Diff line number Diff line change
Expand Up @@ -251,7 +251,7 @@ pub trait ReduceCore<G: Scope, K: ToOwned + ?Sized, V: ToOwned + ?Sized, R: Semi
T2::ValOwned: Data,
T2::Diff: Abelian,
T2::Batch: Batch,
T2::Builder: Builder<Output=T2::Batch, Item = ((K::Owned, T2::ValOwned), T2::Time, T2::Diff)>,
T2::Builder: Builder<Input = ((K::Owned, T2::ValOwned), T2::Time, T2::Diff)>,
L: FnMut(&K, &[(&V, R)], &mut Vec<(<T2::Cursor as Cursor>::ValOwned, T2::Diff)>)+'static,
{
self.reduce_core::<_,T2>(name, move |key, input, output, change| {
Expand All @@ -273,7 +273,7 @@ pub trait ReduceCore<G: Scope, K: ToOwned + ?Sized, V: ToOwned + ?Sized, R: Semi
T2: for<'a> Trace<Key<'a>=&'a K, Time=G::Timestamp>+'static,
T2::ValOwned: Data,
T2::Batch: Batch,
T2::Builder: Builder<Output=T2::Batch, Item = ((K::Owned, T2::ValOwned), T2::Time, T2::Diff)>,
T2::Builder: Builder<Input = ((K::Owned, T2::ValOwned), T2::Time, T2::Diff)>,
L: FnMut(&K, &[(&V, R)], &mut Vec<(<T2::Cursor as Cursor>::ValOwned,T2::Diff)>, &mut Vec<(<T2::Cursor as Cursor>::ValOwned, T2::Diff)>)+'static,
;
}
Expand All @@ -292,7 +292,7 @@ where
T2::ValOwned: Data,
T2: for<'a> Trace<Key<'a>=&'a K, Time=G::Timestamp>+'static,
T2::Batch: Batch,
T2::Builder: Builder<Output=T2::Batch, Item = ((K::Owned, T2::ValOwned), T2::Time, T2::Diff)>,
T2::Builder: Builder<Input = ((K::Owned, T2::ValOwned), T2::Time, T2::Diff)>,
L: FnMut(&K, &[(&V, R)], &mut Vec<(<T2::Cursor as Cursor>::ValOwned,T2::Diff)>, &mut Vec<(<T2::Cursor as Cursor>::ValOwned, T2::Diff)>)+'static,
{
self.arrange_by_key_named(&format!("Arrange: {}", name))
Expand All @@ -310,7 +310,7 @@ where
T2: for<'a> Trace<Key<'a>=T1::Key<'a>, Time=T1::Time> + 'static,
T2::ValOwned: Data,
T2::Batch: Batch,
T2::Builder: Builder<Output=T2::Batch, Item = ((T1::KeyOwned, T2::ValOwned), T2::Time, T2::Diff)>,
T2::Builder: Builder<Input = ((T1::KeyOwned, T2::ValOwned), T2::Time, T2::Diff)>,
L: FnMut(T1::Key<'_>, &[(T1::Val<'_>, T1::Diff)], &mut Vec<(T2::ValOwned,T2::Diff)>, &mut Vec<(T2::ValOwned, T2::Diff)>)+'static,
{
let mut result_trace = None;
Expand Down
8 changes: 4 additions & 4 deletions src/trace/implementations/merge_batcher.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,8 @@ where
T: Timestamp,
D: Semigroup,
{
type Input = Vec<Self::Item>;
type Item = ((K,V),T,D);
type Input = Vec<((K,V),T,D)>;
type Output = ((K,V),T,D);
type Time = T;

fn new(logger: Option<Logger<DifferentialEvent, WorkerIdentifier>>, operator_id: usize) -> Self {
Expand All @@ -38,7 +38,7 @@ where
}

#[inline(never)]
fn push_batch(&mut self, batch: RefOrMut<Vec<Self::Item>>) {
fn push_batch(&mut self, batch: RefOrMut<Self::Input>) {
// `batch` is either a shared reference or an owned allocations.
match batch {
RefOrMut::Ref(reference) => {
Expand All @@ -59,7 +59,7 @@ where
// which we call `lower`, by assumption that after sealing a batcher we receive no more
// updates with times not greater or equal to `upper`.
#[inline(never)]
fn seal<B: Builder<Item=Self::Item, Time=Self::Time>>(&mut self, upper: Antichain<T>) -> B::Output {
fn seal<B: Builder<Input=Self::Output, Time=Self::Time>>(&mut self, upper: Antichain<T>) -> B::Output {

let mut merged = Vec::new();
self.sorter.finish_into(&mut merged);
Expand Down
8 changes: 4 additions & 4 deletions src/trace/implementations/merge_batcher_col.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,8 +31,8 @@ where
T: Columnation + Timestamp + 'static,
D: Columnation + Semigroup + 'static,
{
type Input = Vec<Self::Item>;
type Item = ((K,V),T,D);
type Input = Vec<((K,V),T,D)>;
type Output = ((K,V),T,D);
type Time = T;

fn new(logger: Option<Logger<DifferentialEvent, WorkerIdentifier>>, operator_id: usize) -> Self {
Expand All @@ -44,7 +44,7 @@ where
}

#[inline]
fn push_batch(&mut self, batch: RefOrMut<Vec<Self::Item>>) {
fn push_batch(&mut self, batch: RefOrMut<Self::Input>) {
// `batch` is either a shared reference or an owned allocations.
match batch {
RefOrMut::Ref(reference) => {
Expand All @@ -63,7 +63,7 @@ where
// which we call `lower`, by assumption that after sealing a batcher we receive no more
// updates with times not greater or equal to `upper`.
#[inline]
fn seal<B: Builder<Item=Self::Item, Time=Self::Time>>(&mut self, upper: Antichain<T>) -> B::Output {
fn seal<B: Builder<Input=Self::Output, Time=Self::Time>>(&mut self, upper: Antichain<T>) -> B::Output {

let mut merged = Default::default();
self.sorter.finish_into(&mut merged);
Expand Down
12 changes: 6 additions & 6 deletions src/trace/implementations/ord_neu.rs
Original file line number Diff line number Diff line change
Expand Up @@ -540,7 +540,7 @@ mod val_batch {

impl<L: Layout> Builder for OrdValBuilder<L> {

type Item = ((<L::Target as Update>::Key, <L::Target as Update>::Val), <L::Target as Update>::Time, <L::Target as Update>::Diff);
type Input = ((<L::Target as Update>::Key, <L::Target as Update>::Val), <L::Target as Update>::Time, <L::Target as Update>::Diff);
type Time = <L::Target as Update>::Time;
type Output = OrdValBatch<L>;

Expand All @@ -560,7 +560,7 @@ mod val_batch {
}

#[inline]
fn push(&mut self, ((key, val), time, diff): Self::Item) {
fn push(&mut self, ((key, val), time, diff): Self::Input) {

// Perhaps this is a continuation of an already received key.
if self.result.keys.last().map(|k| k.equals(&key)).unwrap_or(false) {
Expand All @@ -586,7 +586,7 @@ mod val_batch {
}

#[inline]
fn copy(&mut self, ((key, val), time, diff): &Self::Item) {
fn copy(&mut self, ((key, val), time, diff): &Self::Input) {

// Perhaps this is a continuation of an already received key.
if self.result.keys.last().map(|k| k.equals(key)).unwrap_or(false) {
Expand Down Expand Up @@ -1006,7 +1006,7 @@ mod key_batch {

impl<L: Layout> Builder for OrdKeyBuilder<L> {

type Item = ((<L::Target as Update>::Key, ()), <L::Target as Update>::Time, <L::Target as Update>::Diff);
type Input = ((<L::Target as Update>::Key, ()), <L::Target as Update>::Time, <L::Target as Update>::Diff);
type Time = <L::Target as Update>::Time;
type Output = OrdKeyBatch<L>;

Expand All @@ -1024,7 +1024,7 @@ mod key_batch {
}

#[inline]
fn push(&mut self, ((key, ()), time, diff): Self::Item) {
fn push(&mut self, ((key, ()), time, diff): Self::Input) {

// Perhaps this is a continuation of an already received key.
if self.result.keys.last().map(|k| k.equals(&key)).unwrap_or(false) {
Expand All @@ -1040,7 +1040,7 @@ mod key_batch {
}

#[inline]
fn copy(&mut self, ((key, ()), time, diff): &Self::Item) {
fn copy(&mut self, ((key, ()), time, diff): &Self::Input) {

// Perhaps this is a continuation of an already received key.
if self.result.keys.last().map(|k| k.equals(key)).unwrap_or(false) {
Expand Down
6 changes: 3 additions & 3 deletions src/trace/implementations/rhh.rs
Original file line number Diff line number Diff line change
Expand Up @@ -731,7 +731,7 @@ mod val_batch {
<L::Target as Update>::Key: Default + HashOrdered,
// RhhValBatch<L>: Batch<Key=<L::Target as Update>::Key, Val=<L::Target as Update>::Val, Time=<L::Target as Update>::Time, Diff=<L::Target as Update>::Diff>,
{
type Item = ((<L::Target as Update>::Key, <L::Target as Update>::Val), <L::Target as Update>::Time, <L::Target as Update>::Diff);
type Input = ((<L::Target as Update>::Key, <L::Target as Update>::Val), <L::Target as Update>::Time, <L::Target as Update>::Diff);
type Time = <L::Target as Update>::Time;
type Output = RhhValBatch<L>;

Expand Down Expand Up @@ -763,7 +763,7 @@ mod val_batch {
}

#[inline]
fn push(&mut self, ((key, val), time, diff): Self::Item) {
fn push(&mut self, ((key, val), time, diff): Self::Input) {

// Perhaps this is a continuation of an already received key.
if self.result.keys.last().map(|k| k.equals(&key)).unwrap_or(false) {
Expand All @@ -790,7 +790,7 @@ mod val_batch {
}

#[inline]
fn copy(&mut self, ((key, val), time, diff): &Self::Item) {
fn copy(&mut self, ((key, val), time, diff): &Self::Input) {

// Perhaps this is a continuation of an already received key.
if self.result.keys.last().map(|k| k.equals(key)).unwrap_or(false) {
Expand Down
2 changes: 1 addition & 1 deletion src/trace/implementations/spine_fueled.rs
Original file line number Diff line number Diff line change
Expand Up @@ -255,7 +255,7 @@ impl<B, BA, BU> Trace for Spine<B, BA, BU>
where
B: Batch+Clone+'static,
BA: Batcher<Time = B::Time>,
BU: Builder<Item=BA::Item, Time=BA::Time, Output = B>,
BU: Builder<Input=BA::Output, Time=BA::Time, Output = B>,
{
/// A type used to assemble batches from disordered updates.
type Batcher = BA;
Expand Down
32 changes: 14 additions & 18 deletions src/trace/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -213,7 +213,7 @@ where <Self as TraceReader>::Batch: Batch {
/// A type used to assemble batches from disordered updates.
type Batcher: Batcher<Time = Self::Time>;
/// A type used to assemble batches from ordered update sequences.
type Builder: Builder<Item=<Self::Batcher as Batcher>::Item, Time=Self::Time, Output = Self::Batch>;
type Builder: Builder<Input=<Self::Batcher as Batcher>::Output, Time=Self::Time, Output = Self::Batch>;

/// Allocates a new empty trace.
fn new(
Expand Down Expand Up @@ -306,26 +306,26 @@ pub trait Batch : BatchReader where Self: ::std::marker::Sized {

/// Functionality for collecting and batching updates.
pub trait Batcher {
/// Type of update pushed into the batcher.
/// Type pushed into the batcher.
type Input;
/// Type of update pushed into the builder.
type Item;
/// Type produced by the batcher.
type Output;
/// Times at which batches are formed.
type Time: Timestamp;
/// Allocates a new empty batcher.
fn new(logger: Option<Logger<DifferentialEvent, WorkerIdentifier>>, operator_id: usize) -> Self;
/// Adds an unordered batch of elements to the batcher.
fn push_batch(&mut self, batch: RefOrMut<Self::Input>);
/// Returns all updates not greater or equal to an element of `upper`.
fn seal<B: Builder<Item=Self::Item, Time=Self::Time>>(&mut self, upper: Antichain<Self::Time>) -> B::Output;
fn seal<B: Builder<Input=Self::Output, Time=Self::Time>>(&mut self, upper: Antichain<Self::Time>) -> B::Output;
/// Returns the lower envelope of contained update times.
fn frontier(&mut self) -> timely::progress::frontier::AntichainRef<Self::Time>;
}

/// Functionality for building batches from ordered update sequences.
pub trait Builder: Sized {
/// Input item type.
type Item;
type Input;
/// Timestamp type.
type Time: Timestamp;
/// Output batch type.
Expand All @@ -344,15 +344,11 @@ pub trait Builder: Sized {
///
/// The default implementation uses `self.copy` with references to the owned arguments.
/// One should override it if the builder can take advantage of owned arguments.
fn push(&mut self, element: Self::Item) {
fn push(&mut self, element: Self::Input) {
self.copy(&element);
}
/// Adds an element to the batch.
fn copy(&mut self, element: &Self::Item);
/// Adds an ordered sequence of elements to the batch.
fn extend<I: Iterator<Item=Self::Item>>(&mut self, iter: I) {
for item in iter { self.push(item); }
}
fn copy(&mut self, element: &Self::Input);
/// Completes building and returns the batch.
fn done(self, lower: Antichain<Self::Time>, upper: Antichain<Self::Time>, since: Antichain<Self::Time>) -> Self::Output;
}
Expand Down Expand Up @@ -460,12 +456,12 @@ pub mod rc_blanket_impls {

/// Functionality for building batches from ordered update sequences.
impl<B: Builder> Builder for RcBuilder<B> {
type Item = B::Item;
type Input = B::Input;
type Time = B::Time;
type Output = Rc<B::Output>;
fn with_capacity(keys: usize, vals: usize, upds: usize) -> Self { RcBuilder { builder: B::with_capacity(keys, vals, upds) } }
fn push(&mut self, element: Self::Item) { self.builder.push(element) }
fn copy(&mut self, element: &Self::Item) { self.builder.copy(element) }
fn push(&mut self, element: Self::Input) { self.builder.push(element) }
fn copy(&mut self, element: &Self::Input) { self.builder.copy(element) }
fn done(self, lower: Antichain<Self::Time>, upper: Antichain<Self::Time>, since: Antichain<Self::Time>) -> Rc<B::Output> { Rc::new(self.builder.done(lower, upper, since)) }
}

Expand Down Expand Up @@ -569,12 +565,12 @@ pub mod abomonated_blanket_impls {
where
B::Output: Abomonation,
{
type Item = B::Item;
type Input = B::Input;
type Time = B::Time;
type Output = Abomonated<B::Output, Vec<u8>>;
fn with_capacity(keys: usize, vals: usize, upds: usize) -> Self { AbomonatedBuilder { builder: B::with_capacity(keys, vals, upds) } }
fn push(&mut self, element: Self::Item) { self.builder.push(element) }
fn copy(&mut self, element: &Self::Item) { self.builder.copy(element) }
fn push(&mut self, element: Self::Input) { self.builder.push(element) }
fn copy(&mut self, element: &Self::Input) { self.builder.copy(element) }
fn done(self, lower: Antichain<Self::Time>, upper: Antichain<Self::Time>, since: Antichain<Self::Time>) -> Self::Output {
let batch = self.builder.done(lower, upper, since);
let mut bytes = Vec::with_capacity(measure(&batch));
Expand Down

0 comments on commit c564e8f

Please sign in to comment.