diff --git a/.github/workflows/deploy.yml b/.github/workflows/deploy.yml
index babc088a8..41c27604b 100644
--- a/.github/workflows/deploy.yml
+++ b/.github/workflows/deploy.yml
@@ -9,7 +9,7 @@ jobs:
   deploy:
     runs-on: ubuntu-22.04
     steps:
-    - uses: actions/checkout@v3
+    - uses: actions/checkout@v4
    - run: cargo install mdbook --version 0.4.31
    - run: cd mdbook && mdbook build
    - uses: JamesIves/github-pages-deploy-action@v4
diff --git a/.github/workflows/test.yml b/.github/workflows/test.yml
index e4643725d..e3c9b6087 100644
--- a/.github/workflows/test.yml
+++ b/.github/workflows/test.yml
@@ -16,7 +16,7 @@ jobs:
        toolchain:
        - stable
        - 1.72
-    name: cargo test on ${{ matrix.os }}
+    name: cargo test on ${{ matrix.os }}, rust ${{ matrix.toolchain }}
     runs-on: ${{ matrix.os }}-latest
     steps:
     - uses: actions/checkout@v4
@@ -24,7 +24,7 @@ jobs:
      with:
        toolchain: ${{ matrix.toolchain }}
    - name: Cargo test
-      run: cargo test
+      run: cargo test --workspace --all-targets

 # Check formatting with rustfmt
 mdbook:
diff --git a/doop/src/main.rs b/doop/src/main.rs
index 6628a7e30..d6ad19c19 100644
--- a/doop/src/main.rs
+++ b/doop/src/main.rs
@@ -1,4 +1,4 @@
-#![allow(non_snake_case)]
+#![allow(non_snake_case, dead_code)]

 use std::collections::HashMap;
 use std::rc::Rc;
@@ -145,7 +145,7 @@ fn load<'a>(filename: &str, interner: Rc>) -> impl Itera
     })
 }

-fn load1<'a>(index: usize, prefix: &str, filename: &str, interner: Rc>) -> impl Iterator+'a {
+fn load1<'a>(index: usize, prefix: &str, filename: &str, interner: Rc>) -> impl Iterator+'a {
     read_file(&format!("{}{}", prefix, filename))
         .filter(move |_| index == 0)
         .map(move |line| {
@@ -791,7 +791,7 @@ fn main() {
     let SupertypeOf = SupertypeOf.enter(scope);

     // Required by all
-    let mut Reachable = Relation::<_,(Method)>::new(scope);
+    let mut Reachable = Relation::<_,Method>::new(scope);

     // NOTE: Common subexpression.
     let Reachable_Invocation =
@@ -805,7 +805,7 @@ fn main() {
     // let Reachable = ReachableFinal.clone();

     // Class initialization
-    let mut InitializedClass = Relation::<_,(Type)>::new(scope);
+    let mut InitializedClass = Relation::<_,Type>::new(scope);

     // ClassInitializer(?type, ?method) :- basic.MethodImplemented("", "void()", ?type, ?method).
     let temp1 = interner.borrow_mut().intern("");
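The `Relation::<_,(Method)>` → `Relation::<_,Method>` changes above are lint fixes: in Rust, parentheses around a single type are redundant and trip the `unused_parens` warning; only a trailing comma makes a one-element tuple. A standalone illustration (hypothetical values, not from the doop example):

```rust
fn main() {
    // `(u32)` is exactly the type `u32`; the parentheses are redundant
    // and rustc warns under `unused_parens`.
    let plain: (u32) = 1;
    // `(u32,)` (note the comma) is a genuine one-element tuple.
    let tuple: (u32,) = (1,);
    println!("{} {}", plain, tuple.0);
}
```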
diff --git a/examples/accumulate.rs b/examples/accumulate.rs
index 4693431e4..0fa792926 100644
--- a/examples/accumulate.rs
+++ b/examples/accumulate.rs
@@ -16,7 +16,14 @@ fn main() {

         let mut input = worker.dataflow::<(), _, _>(|scope| {
             let (input, data) = scope.new_collection::<_, isize>();
-            data.consolidate();
+
+            use timely::dataflow::Scope;
+            scope.iterative::(|inner| {
+                data.enter_at(inner, |_| 0)
+                    .consolidate()
+                    .leave()
+            });
+
             input
         });
diff --git a/experiments/src/bin/graphs-interactive-alt.rs b/experiments/src/bin/graphs-interactive-alt.rs
index 55c8a8cf6..18dc0923f 100644
--- a/experiments/src/bin/graphs-interactive-alt.rs
+++ b/experiments/src/bin/graphs-interactive-alt.rs
@@ -1,3 +1,5 @@
+#![allow(dead_code)]
+
 use rand::{Rng, SeedableRng, StdRng};

 use timely::dataflow::*;
@@ -389,4 +391,4 @@ where G::Timestamp: Lattice {
         .concat(&prop)
         .reduce(|_, s, t| { t.push((*s[0].0, 1)); })
     })
-}
\ No newline at end of file
+}
diff --git a/experiments/src/bin/graphs-interactive-neu.rs b/experiments/src/bin/graphs-interactive-neu.rs
index a01f23d97..6210113ed 100644
--- a/experiments/src/bin/graphs-interactive-neu.rs
+++ b/experiments/src/bin/graphs-interactive-neu.rs
@@ -1,3 +1,5 @@
+#![allow(dead_code)]
+
 use rand::{Rng, SeedableRng, StdRng};

 use timely::dataflow::*;
@@ -392,4 +394,4 @@ where G::Timestamp: Lattice+Ord {

         reached.leave()
     })
-}
\ No newline at end of file
+}
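For orientation, the accumulate.rs change above routes `consolidate` through a nested iterative scope: updates enter the child scope at round 0, are consolidated there, and `leave` back into the outer scope. A minimal sketch of the same pattern; the turbofish on `iterative` and the collection contents are assumptions, since the generic parameters were lost in the diff above:

```rust
use timely::dataflow::Scope;
use differential_dataflow::input::Input;

fn main() {
    timely::execute_from_args(std::env::args(), |worker| {
        let mut input = worker.dataflow::<(), _, _>(|scope| {
            let (input, data) = scope.new_collection::<_, isize>();
            // Enter an iterative subscope at round 0, consolidate the
            // updates there, and leave back into the surrounding scope.
            scope.iterative::<u64, _, _>(|inner| {
                data.enter_at(inner, |_| 0)
                    .consolidate()
                    .leave()
            });
            input
        });
        input.insert("hello".to_string());
    }).expect("timely computation failed");
}
```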
diff --git a/interactive/src/command.rs b/interactive/src/command.rs
index c31963afe..d795ec13a 100644
--- a/interactive/src/command.rs
+++ b/interactive/src/command.rs
@@ -151,7 +151,7 @@ where
                 println!("\tTimely logging connection {} of {}", index, number);
                 let socket = listener.incoming().next().unwrap().unwrap();
                 socket.set_nonblocking(true).expect("failed to set nonblocking");
-                streams.push(EventReader::::new(socket));
+                streams.push(EventReader::,_>::new(socket));
             }

             println!("\tAll logging connections established");
@@ -174,7 +174,7 @@ where
             for _ in 0 .. number {
                 let socket = listener.incoming().next().unwrap().unwrap();
                 socket.set_nonblocking(true).expect("failed to set nonblocking");
-                streams.push(EventReader::::new(socket));
+                streams.push(EventReader::,_>::new(socket));
             }
         }
         crate::logging::publish_differential_logging(manager, worker, granularity, &name_as, streams);
@@ -195,4 +195,4 @@ where
     pub fn serialize_into(&self, writer: W) {
         bincode::serialize_into(writer, self).expect("bincode: serialization failed");
     }
-}
\ No newline at end of file
+}
diff --git a/interactive/src/logging.rs b/interactive/src/logging.rs
index cfc524660..be07fa4b6 100644
--- a/interactive/src/logging.rs
+++ b/interactive/src/logging.rs
@@ -30,7 +30,7 @@ where
    V: ExchangeData+Hash+LoggingValue+Datum,
    A: Allocate,
    I : IntoIterator,
-    ::Item: EventIterator+'static
+    ::Item: EventIterator>+'static
 {
     let (operates, channels, schedule, messages, shutdown, park, text) = worker.dataflow(move |scope| {
@@ -217,7 +217,7 @@ where
    V: ExchangeData+Hash+LoggingValue+Datum,
    A: Allocate,
    I : IntoIterator,
-    ::Item: EventIterator+'static
+    ::Item: EventIterator>+'static
 {
     let (merge,batch) = worker.dataflow(move |scope| {
@@ -280,4 +280,4 @@ where

     manager.traces.set_unkeyed(&Plan::Source(format!("logs/{}/differential/arrange/batch", name)), &batch);
     manager.traces.set_unkeyed(&Plan::Source(format!("logs/{}/differential/arrange/merge", name)), &merge);
-}
\ No newline at end of file
+}
diff --git a/src/collection.rs b/src/collection.rs
index aeb6aad38..2e82c7e62 100644
--- a/src/collection.rs
+++ b/src/collection.rs
@@ -322,20 +322,16 @@ impl Collection where G::Timestamp: Da
     /// data.assert_eq(&result);
     /// });
     /// ```
-    pub fn enter_at<'a, T, F>(&self, child: &Iterative<'a, G, T>, initial: F) -> Collection, D, R>
+    pub fn enter_at<'a, T, F>(&self, child: &Iterative<'a, G, T>, mut initial: F) -> Collection, D, R>
     where
         T: Timestamp+Hash,
         F: FnMut(&D) -> T + Clone + 'static,
         G::Timestamp: Hash,
     {
-
-        let mut initial1 = initial.clone();
-        let mut initial2 = initial.clone();
-
         self.inner
-            .enter_at(child, move |x| initial1(&x.0))
+            .enter(child)
             .map(move |(data, time, diff)| {
-                let new_time = Product::new(time, initial2(&data));
+                let new_time = Product::new(time, initial(&data));
                 (data, new_time, diff)
             })
             .as_collection()
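The `enter_at` rewrite above drops the double-clone workaround: the closure is now taken as `mut` and invoked inside a single `map` after a plain `enter`. Its doc example (whose tail, `data.assert_eq(&result);`, is visible in the hunk above) exercises the method roughly like this; the element type and the `as u64` cast are illustrative choices:

```rust
use timely::dataflow::Scope;
use differential_dataflow::input::Input;

fn main() {
    timely::example(|scope| {
        let data = scope.new_collection_from(1 .. 10).1;

        let result = scope.iterative::<u64, _, _>(|child| {
            // Each record enters the child scope at an iteration count
            // derived from the record itself, then immediately leaves.
            data.enter_at(child, |x| *x as u64)
                .leave()
        });

        data.assert_eq(&result);
    });
}
```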
diff --git a/src/operators/arrange/arrangement.rs b/src/operators/arrange/arrangement.rs
index 031e587b9..9856e38e0 100644
--- a/src/operators/arrange/arrangement.rs
+++ b/src/operators/arrange/arrangement.rs
@@ -19,7 +19,7 @@
 use timely::dataflow::operators::{Enter, Map};

 use timely::order::PartialOrder;
-use timely::dataflow::{Scope, Stream};
+use timely::dataflow::{Scope, Stream, StreamCore};
 use timely::dataflow::operators::generic::Operator;
 use timely::dataflow::channels::pact::{ParallelizationContract, Pipeline, Exchange};
 use timely::progress::Timestamp;
@@ -444,7 +444,7 @@ where
        T2::ValOwned: Data,
        T2::Diff: Abelian,
        T2::Batch: Batch,
-        T2::Builder: Builder,
+        T2::Builder: Builder,
        L: FnMut(T1::Key<'_>, &[(T1::Val<'_>, T1::Diff)], &mut Vec<(::ValOwned, T2::Diff)>)+'static,
    {
        self.reduce_core::<_,T2>(name, move |key, input, output, change| {
@@ -462,7 +462,7 @@ where
        T2: for<'a> Trace=T1::Key<'a>, Time=T1::Time>+'static,
        T2::ValOwned: Data,
        T2::Batch: Batch,
-        T2::Builder: Builder,
+        T2::Builder: Builder,
        L: FnMut(T1::Key<'_>, &[(T1::Val<'_>, T1::Diff)], &mut Vec<(::ValOwned,T2::Diff)>, &mut Vec<(::ValOwned, T2::Diff)>)+'static,
    {
        use crate::operators::reduce::reduce_trace;
@@ -489,249 +489,252 @@ where
     }
 }

-/// A type that can be arranged as if a collection of updates shaped as `((K,V),G::Timestamp,R)`.
-///
-/// This trait is primarily implemented by `Collection`.
-///
-/// The resulting arrangements may not present as `((K,V),T,R)`, as their output types are unconstrained.
-/// This allows e.g. for `Vec` inputs to present as `&[u8]` when read, but that relationship is not
-/// constrained by this trait.
-pub trait Arrange
+/// A type that can be arranged as if a collection of updates.
+pub trait Arrange
 where
     G: Scope,
     G::Timestamp: Lattice,
 {
-    /// Arranges a stream of `(Key, Val)` updates by `Key`.
-    ///
-    /// This operator arranges a stream of values into a shared trace, whose contents it maintains.
+    /// Arranges updates into a shared trace.
     fn arrange(&self) -> Arranged>
     where
         Tr: Trace + 'static,
-        K: ExchangeData + Hashable,
-        V: ExchangeData,
-        R: ExchangeData,
         Tr::Batch: Batch,
-        Tr::Batcher: Batcher,
-        Tr::Builder: Builder,
+        Tr::Batcher: Batcher,
     {
         self.arrange_named("Arrange")
     }

-    /// Arranges a stream of `(Key, Val)` updates by `Key`, and presents with a `name` argument.
-    ///
-    /// This operator arranges a stream of values into a shared trace, whose contents it maintains.
+    /// Arranges updates into a shared trace, with a supplied name.
     fn arrange_named(&self, name: &str) -> Arranged>
     where
         Tr: Trace + 'static,
-        K: ExchangeData + Hashable,
-        V: ExchangeData,
-        R: ExchangeData,
         Tr::Batch: Batch,
-        Tr::Batcher: Batcher,
-        Tr::Builder: Builder,
-    {
-        let exchange = Exchange::new(move |update: &((K,V),G::Timestamp,R)| (update.0).0.hashed().into());
-        self.arrange_core(exchange, name)
-    }
+        Tr::Batcher: Batcher,
+    ;

-    /// Arranges a stream of `(Key, Val)` updates by `Key`, configured with a name and a parallelization contract.
-    ///
-    /// This operator arranges a stream of values into a shared trace, whose contents it maintains.
-    /// It uses the supplied parallelization contract to distribute the data, which does not need to
-    /// be consistently by key (though this is the most common).
+    /// Arranges updates into a shared trace, using a supplied parallelization contract, with a supplied name.
     fn arrange_core(&self, pact: P, name: &str) -> Arranged>
     where
-        P: ParallelizationContract,
-        K: Clone,
-        V: Clone,
-        R: Clone,
+        P: ParallelizationContract,
         Tr: Trace+'static,
         Tr::Batch: Batch,
-        Tr::Batcher: Batcher,
-        Tr::Builder: Builder,
+        Tr::Batcher: Batcher,
     ;
 }

-impl Arrange for Collection
+impl Arrange> for Collection
 where
     G: Scope,
     G::Timestamp: Lattice,
-    K: Clone + 'static,
-    V: Clone + 'static,
-    R: Semigroup,
+    K: ExchangeData + Hashable,
+    V: ExchangeData,
+    R: ExchangeData + Semigroup,
 {
+    fn arrange_named(&self, name: &str) -> Arranged>
+    where
+        Tr: Trace + 'static,
+        Tr::Batch: Batch,
+        Tr::Batcher: Batcher>,
+    {
+        let exchange = Exchange::new(move |update: &((K,V),G::Timestamp,R)| (update.0).0.hashed().into());
+        self.arrange_core(exchange, name)
+    }
+
     fn arrange_core(&self, pact: P, name: &str) -> Arranged>
     where
-        P: ParallelizationContract,
+        P: ParallelizationContract>,
         Tr: Trace+'static,
         Tr::Batch: Batch,
-        Tr::Batcher: Batcher,
-        Tr::Builder: Builder,
+        Tr::Batcher: Batcher>,
     {
-        // The `Arrange` operator is tasked with reacting to an advancing input
-        // frontier by producing the sequence of batches whose lower and upper
-        // bounds are those frontiers, containing updates at times greater or
-        // equal to lower and not greater or equal to upper.
-        //
-        // The operator uses its batch type's `Batcher`, which accepts update
-        // triples and responds to requests to "seal" batches (presented as new
-        // upper frontiers).
-        //
-        // Each sealed batch is presented to the trace, and if at all possible
-        // transmitted along the outgoing channel. Empty batches may not have
-        // a corresponding capability, as they are only retained for actual data
-        // held by the batcher, which may prevents the operator from sending an
-        // empty batch.
-
-        let mut reader: Option> = None;
-
-        // fabricate a data-parallel operator using the `unary_notify` pattern.
-        let stream = {
-
-            let reader = &mut reader;
-
-            self.inner.unary_frontier(pact, name, move |_capability, info| {
-
-                // Acquire a logger for arrange events.
-                let logger = {
-                    let scope = self.scope();
-                    let register = scope.log_register();
-                    register.get::("differential/arrange")
-                };
-
-                // Where we will deposit received updates, and from which we extract batches.
-                let mut batcher = Tr::Batcher::new(logger.clone(), info.global_id);
-
-                // Capabilities for the lower envelope of updates in `batcher`.
-                let mut capabilities = Antichain::>::new();
-
-                let activator = Some(self.scope().activator_for(&info.address[..]));
-                let mut empty_trace = Tr::new(info.clone(), logger.clone(), activator);
-                // If there is default exertion logic set, install it.
-                if let Some(exert_logic) = self.inner.scope().config().get::("differential/default_exert_logic").cloned() {
-                    empty_trace.set_exert_logic(exert_logic);
-                }
-
-                let (reader_local, mut writer) = TraceAgent::new(empty_trace, info, logger);
-
-                *reader = Some(reader_local);
-
-                // Initialize to the minimal input frontier.
-                let mut prev_frontier = Antichain::from_elem(::minimum());
-
-                move |input, output| {
+        arrange_core(&self.inner, pact, name)
+    }
+}

-                    // As we receive data, we need to (i) stash the data and (ii) keep *enough* capabilities.
-                    // We don't have to keep all capabilities, but we need to be able to form output messages
-                    // when we realize that time intervals are complete.
+/// Arranges a stream of updates by a key, configured with a name and a parallelization contract.
+///
+/// This operator arranges a stream of values into a shared trace, whose contents it maintains.
+/// It uses the supplied parallelization contract to distribute the data, which does not need to
+/// be consistently by key (though this is the most common).
+fn arrange_core(stream: &StreamCore::Input>, pact: P, name: &str) -> Arranged>
+where
+    G: Scope,
+    G::Timestamp: Lattice,
+    P: ParallelizationContract::Input>,
+    Tr: Trace+'static,
+    Tr::Batch: Batch,
+    ::Input: timely::Container,
+{
+    // The `Arrange` operator is tasked with reacting to an advancing input
+    // frontier by producing the sequence of batches whose lower and upper
+    // bounds are those frontiers, containing updates at times greater or
+    // equal to lower and not greater or equal to upper.
+    //
+    // The operator uses its batch type's `Batcher`, which accepts update
+    // triples and responds to requests to "seal" batches (presented as new
+    // upper frontiers).
+    //
+    // Each sealed batch is presented to the trace, and if at all possible
+    // transmitted along the outgoing channel. Empty batches may not have
+    // a corresponding capability, as they are only retained for actual data
+    // held by the batcher, which may prevents the operator from sending an
+    // empty batch.
+
+    let mut reader: Option> = None;
+
+    // fabricate a data-parallel operator using the `unary_notify` pattern.
+    let reader_ref = &mut reader;
+    let scope = stream.scope();
+
+    let stream = stream.unary_frontier(pact, name, move |_capability, info| {
+
+        // Acquire a logger for arrange events.
+        let logger = {
+            let register = scope.log_register();
+            register.get::("differential/arrange")
+        };
-                    input.for_each(|cap, data| {
-                        capabilities.insert(cap.retain());
-                        batcher.push_batch(data);
-                    });
+        // Where we will deposit received updates, and from which we extract batches.
+        let mut batcher = Tr::Batcher::new(logger.clone(), info.global_id);
-                    // The frontier may have advanced by multiple elements, which is an issue because
-                    // timely dataflow currently only allows one capability per message. This means we
-                    // must pretend to process the frontier advances one element at a time, batching
-                    // and sending smaller bites than we might have otherwise done.
+        // Capabilities for the lower envelope of updates in `batcher`.
+        let mut capabilities = Antichain::>::new();
-                    // Assert that the frontier never regresses.
-                    assert!(PartialOrder::less_equal(&prev_frontier.borrow(), &input.frontier().frontier()));
+        let activator = Some(scope.activator_for(&info.address[..]));
+        let mut empty_trace = Tr::new(info.clone(), logger.clone(), activator);
+        // If there is default exertion logic set, install it.
+        if let Some(exert_logic) = scope.config().get::("differential/default_exert_logic").cloned() {
+            empty_trace.set_exert_logic(exert_logic);
+        }
-                    // Test to see if strict progress has occurred, which happens whenever the new
-                    // frontier isn't equal to the previous. It is only in this case that we have any
-                    // data processing to do.
-                    if prev_frontier.borrow() != input.frontier().frontier() {
-                        // There are two cases to handle with some care:
-                        //
-                        // 1. If any held capabilities are not in advance of the new input frontier,
-                        //    we must carve out updates now in advance of the new input frontier and
-                        //    transmit them as batches, which requires appropriate *single* capabilites;
-                        //    Until timely dataflow supports multiple capabilities on messages, at least.
-                        //
-                        // 2. If there are no held capabilities in advance of the new input frontier,
-                        //    then there are no updates not in advance of the new input frontier and
-                        //    we can simply create an empty input batch with the new upper frontier
-                        //    and feed this to the trace agent (but not along the timely output).
+        let (reader_local, mut writer) = TraceAgent::new(empty_trace, info, logger);
-                        // If there is at least one capability not in advance of the input frontier ...
-                        if capabilities.elements().iter().any(|c| !input.frontier().less_equal(c.time())) {
+        *reader_ref = Some(reader_local);
-                            let mut upper = Antichain::new();   // re-used allocation for sealing batches.
+        // Initialize to the minimal input frontier.
+        let mut prev_frontier = Antichain::from_elem(::minimum());
-                            // For each capability not in advance of the input frontier ...
-                            for (index, capability) in capabilities.elements().iter().enumerate() {
+        move |input, output| {
-                                if !input.frontier().less_equal(capability.time()) {
+            // As we receive data, we need to (i) stash the data and (ii) keep *enough* capabilities.
+            // We don't have to keep all capabilities, but we need to be able to form output messages
+            // when we realize that time intervals are complete.
-                                    // Assemble the upper bound on times we can commit with this capabilities.
-                                    // We must respect the input frontier, and *subsequent* capabilities, as
-                                    // we are pretending to retire the capability changes one by one.
-                                    upper.clear();
-                                    for time in input.frontier().frontier().iter() {
-                                        upper.insert(time.clone());
-                                    }
-                                    for other_capability in &capabilities.elements()[(index + 1) .. ] {
-                                        upper.insert(other_capability.time().clone());
-                                    }
+            input.for_each(|cap, data| {
+                capabilities.insert(cap.retain());
+                batcher.push_batch(data);
+            });
-                                    // Extract updates not in advance of `upper`.
-                                    let batch = batcher.seal::(upper.clone());
+            // The frontier may have advanced by multiple elements, which is an issue because
+            // timely dataflow currently only allows one capability per message. This means we
+            // must pretend to process the frontier advances one element at a time, batching
+            // and sending smaller bites than we might have otherwise done.
+
+            // Assert that the frontier never regresses.
+            assert!(PartialOrder::less_equal(&prev_frontier.borrow(), &input.frontier().frontier()));
+
+            // Test to see if strict progress has occurred, which happens whenever the new
+            // frontier isn't equal to the previous. It is only in this case that we have any
+            // data processing to do.
+            if prev_frontier.borrow() != input.frontier().frontier() {
+                // There are two cases to handle with some care:
+                //
+                // 1. If any held capabilities are not in advance of the new input frontier,
+                //    we must carve out updates now in advance of the new input frontier and
+                //    transmit them as batches, which requires appropriate *single* capabilites;
+                //    Until timely dataflow supports multiple capabilities on messages, at least.
+                //
+                // 2. If there are no held capabilities in advance of the new input frontier,
+                //    then there are no updates not in advance of the new input frontier and
+                //    we can simply create an empty input batch with the new upper frontier
+                //    and feed this to the trace agent (but not along the timely output).
+
+                // If there is at least one capability not in advance of the input frontier ...
+                if capabilities.elements().iter().any(|c| !input.frontier().less_equal(c.time())) {
+
+                    let mut upper = Antichain::new();   // re-used allocation for sealing batches.
+
+                    // For each capability not in advance of the input frontier ...
+                    for (index, capability) in capabilities.elements().iter().enumerate() {
+
+                        if !input.frontier().less_equal(capability.time()) {
+
+                            // Assemble the upper bound on times we can commit with this capabilities.
+                            // We must respect the input frontier, and *subsequent* capabilities, as
+                            // we are pretending to retire the capability changes one by one.
+                            upper.clear();
+                            for time in input.frontier().frontier().iter() {
+                                upper.insert(time.clone());
+                            }
+                            for other_capability in &capabilities.elements()[(index + 1) .. ] {
+                                upper.insert(other_capability.time().clone());
+                            }
-                                    writer.insert(batch.clone(), Some(capability.time().clone()));
+                            // Extract updates not in advance of `upper`.
+                            let batch = batcher.seal::(upper.clone());
-                                    // send the batch to downstream consumers, empty or not.
-                                    output.session(&capabilities.elements()[index]).give(batch);
-                                }
-                            }
+                            writer.insert(batch.clone(), Some(capability.time().clone()));
-                            // Having extracted and sent batches between each capability and the input frontier,
-                            // we should downgrade all capabilities to match the batcher's lower update frontier.
-                            // This may involve discarding capabilities, which is fine as any new updates arrive
-                            // in messages with new capabilities.
+                            // send the batch to downstream consumers, empty or not.
+                            output.session(&capabilities.elements()[index]).give(batch);
+                        }
+                    }
-                            let mut new_capabilities = Antichain::new();
-                            for time in batcher.frontier().iter() {
-                                if let Some(capability) = capabilities.elements().iter().find(|c| c.time().less_equal(time)) {
-                                    new_capabilities.insert(capability.delayed(time));
-                                }
-                                else {
-                                    panic!("failed to find capability");
-                                }
-                            }
+                    // Having extracted and sent batches between each capability and the input frontier,
+                    // we should downgrade all capabilities to match the batcher's lower update frontier.
+                    // This may involve discarding capabilities, which is fine as any new updates arrive
+                    // in messages with new capabilities.
-                            capabilities = new_capabilities;
+                    let mut new_capabilities = Antichain::new();
+                    for time in batcher.frontier().iter() {
+                        if let Some(capability) = capabilities.elements().iter().find(|c| c.time().less_equal(time)) {
+                            new_capabilities.insert(capability.delayed(time));
                         }
                         else {
-                            // Announce progress updates, even without data.
-                            let _batch = batcher.seal::(input.frontier().frontier().to_owned());
-                            writer.seal(input.frontier().frontier().to_owned());
+                            panic!("failed to find capability");
                         }
-
-                        prev_frontier.clear();
-                        prev_frontier.extend(input.frontier().frontier().iter().cloned());
                     }
-                    writer.exert();
+                    capabilities = new_capabilities;
+                }
+                else {
+                    // Announce progress updates, even without data.
+                    let _batch = batcher.seal::(input.frontier().frontier().to_owned());
+                    writer.seal(input.frontier().frontier().to_owned());
                 }
-            })
-        };
-        Arranged { stream, trace: reader.unwrap() }
-    }
+                prev_frontier.clear();
+                prev_frontier.extend(input.frontier().frontier().iter().cloned());
+            }
+
+            writer.exert();
+        }
+    });
+
+    Arranged { stream, trace: reader.unwrap() }
 }

-impl Arrange for Collection
+impl Arrange> for Collection
 where
     G::Timestamp: Lattice+Ord,
 {
+    fn arrange_named(&self, name: &str) -> Arranged>
+    where
+        Tr: Trace + 'static,
+        Tr::Batch: Batch,
+        Tr::Batcher: Batcher>,
+    {
+        let exchange = Exchange::new(move |update: &((K,()),G::Timestamp,R)| (update.0).0.hashed().into());
+        self.arrange_core(exchange, name)
+    }
+
     fn arrange_core(&self, pact: P, name: &str) -> Arranged>
     where
-        P: ParallelizationContract,
+        P: ParallelizationContract>,
         Tr: Trace+'static,
         Tr::Batch: Batch,
-        Tr::Batcher: Batcher,
-        Tr::Builder: Builder,
+        Tr::Batcher: Batcher>,
     {
         self.map(|k| (k, ()))
             .arrange_core(pact, name)
diff --git a/src/operators/arrange/upsert.rs b/src/operators/arrange/upsert.rs
index e9d1744ff..4ca9dc9b3 100644
--- a/src/operators/arrange/upsert.rs
+++ b/src/operators/arrange/upsert.rs
@@ -136,7 +136,7 @@ where
    Tr::ValOwned: ExchangeData,
    Tr::Time: TotalOrder+ExchangeData,
    Tr::Batch: Batch,
-    Tr::Builder: Builder,
+    Tr::Builder: Builder,
 {
     let mut reader: Option> = None;
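The net effect of the trait rework above: `Arrange` is parameterized by the batcher's input container rather than by `K, V, R` directly, and the key-hashed `Exchange` moves into the `Collection` impls' `arrange_named`. The common entry points are unchanged. A usage sketch with illustrative types, using the high-level `arrange_by_key` wrapper rather than `arrange_core`:

```rust
use differential_dataflow::input::Input;
use differential_dataflow::operators::arrange::ArrangeByKey;
use differential_dataflow::operators::JoinCore;

fn main() {
    timely::example(|scope| {
        let left = scope.new_collection_from(vec![(1u32, 10u32), (2, 20)]).1;
        let right = scope.new_collection_from(vec![(1u32, 100u32)]).1;

        // Each arrangement maintains a shared trace that any number of
        // downstream operators may read.
        let left_arranged = left.arrange_by_key();
        let right_arranged = right.arrange_by_key();

        left_arranged
            .join_core(&right_arranged, |key, lval, rval| Some((*key, *lval, *rval)))
            .inspect(|x| println!("joined: {:?}", x));
    });
}
```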
diff --git a/src/operators/consolidate.rs b/src/operators/consolidate.rs
index 5339a54aa..b67c1c4b8 100644
--- a/src/operators/consolidate.rs
+++ b/src/operators/consolidate.rs
@@ -13,7 +13,7 @@
 use crate::difference::Semigroup;
 use crate::Data;
 use crate::lattice::Lattice;
-use crate::trace::{Batcher, Builder};
+use crate::trace::Batcher;

 /// Methods which require data be arrangeable.
 impl Collection
@@ -53,8 +53,7 @@ where
     where
         Tr: crate::trace::Trace+'static,
         Tr::Batch: crate::trace::Batch,
-        Tr::Batcher: Batcher,
-        Tr::Builder: Builder,
+        Tr::Batcher: Batcher>,
     {
         use crate::operators::arrange::arrangement::Arrange;
         use crate::trace::cursor::MyTrait;
diff --git a/src/operators/iterate.rs b/src/operators/iterate.rs
index 8a60da4bb..6d0e27144 100644
--- a/src/operators/iterate.rs
+++ b/src/operators/iterate.rs
@@ -154,7 +154,7 @@ impl Iterate for G {
 pub struct Variable
 where G::Timestamp: Lattice {
     collection: Collection,
-    feedback: Handle,
+    feedback: Handle>,
     source: Option>,
     step: ::Summary,
 }
@@ -225,7 +225,7 @@ impl Deref for Variable where G::Timesta
 pub struct SemigroupVariable
 where G::Timestamp: Lattice {
     collection: Collection,
-    feedback: Handle,
+    feedback: Handle>,
     step: ::Summary,
 }
diff --git a/src/operators/join.rs b/src/operators/join.rs
index ee1c28724..a31e682a5 100644
--- a/src/operators/join.rs
+++ b/src/operators/join.rs
@@ -617,7 +617,7 @@ where

     /// Process keys until at least `fuel` output tuples produced, or the work is exhausted.
     #[inline(never)]
-    fn work(&mut self, output: &mut OutputHandle>, mut logic: L, fuel: &mut usize)
+    fn work(&mut self, output: &mut OutputHandle>>, mut logic: L, fuel: &mut usize)
     where
         I: IntoIterator,
         L: for<'a> FnMut(C1::Key<'a>, C1::Val<'a>, C2::Val<'a>, &T, &C1::Diff, &C2::Diff)->I,
diff --git a/src/operators/reduce.rs b/src/operators/reduce.rs
index 7f322c00e..37d98d8ca 100644
--- a/src/operators/reduce.rs
+++ b/src/operators/reduce.rs
@@ -251,7 +251,7 @@ pub trait ReduceCore
-        T2::Builder: Builder,
+        T2::Builder: Builder,
         L: FnMut(&K, &[(&V, R)], &mut Vec<(::ValOwned, T2::Diff)>)+'static,
     {
         self.reduce_core::<_,T2>(name, move |key, input, output, change| {
@@ -273,7 +273,7 @@ pub trait ReduceCore
        T2: for<'a> Trace=&'a K, Time=G::Timestamp>+'static,
        T2::ValOwned: Data,
        T2::Batch: Batch,
-        T2::Builder: Builder,
+        T2::Builder: Builder,
        L: FnMut(&K, &[(&V, R)], &mut Vec<(::ValOwned,T2::Diff)>, &mut Vec<(::ValOwned, T2::Diff)>)+'static,
    ;
 }
@@ -292,7 +292,7 @@ where
    T2::ValOwned: Data,
    T2: for<'a> Trace=&'a K, Time=G::Timestamp>+'static,
    T2::Batch: Batch,
-    T2::Builder: Builder,
+    T2::Builder: Builder,
    L: FnMut(&K, &[(&V, R)], &mut Vec<(::ValOwned,T2::Diff)>, &mut Vec<(::ValOwned, T2::Diff)>)+'static,
 {
     self.arrange_by_key_named(&format!("Arrange: {}", name))
@@ -310,7 +310,7 @@ where
    T2: for<'a> Trace=T1::Key<'a>, Time=T1::Time> + 'static,
    T2::ValOwned: Data,
    T2::Batch: Batch,
-    T2::Builder: Builder,
+    T2::Builder: Builder,
    L: FnMut(T1::Key<'_>, &[(T1::Val<'_>, T1::Diff)], &mut Vec<(T2::ValOwned,T2::Diff)>, &mut Vec<(T2::ValOwned, T2::Diff)>)+'static,
 {
     let mut result_trace = None;
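Downstream of these `Builder` bound changes, user-facing reduction is unaffected. For reference, a small `reduce` in the style of the library's doc examples (the min-per-key logic is illustrative):

```rust
use differential_dataflow::input::Input;
use differential_dataflow::operators::Reduce;

fn main() {
    timely::example(|scope| {
        scope.new_collection_from(vec![(0u32, 3u32), (0, 1), (1, 5)]).1
            .reduce(|_key, input, output| {
                // `input` lists distinct values in sorted order with their
                // accumulated counts; keep only the smallest per key.
                output.push((*input[0].0, 1));
            })
            .inspect(|x| println!("minimum: {:?}", x));
    });
}
```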
diff --git a/src/trace/implementations/merge_batcher.rs b/src/trace/implementations/merge_batcher.rs
index 680aa5306..2cf27ed1b 100644
--- a/src/trace/implementations/merge_batcher.rs
+++ b/src/trace/implementations/merge_batcher.rs
@@ -25,7 +25,8 @@ where
    T: Timestamp,
    D: Semigroup,
 {
-    type Item = ((K,V),T,D);
+    type Input = Vec<((K,V),T,D)>;
+    type Output = ((K,V),T,D);
     type Time = T;

     fn new(logger: Option>, operator_id: usize) -> Self {
@@ -37,7 +38,7 @@ where
     }

     #[inline(never)]
-    fn push_batch(&mut self, batch: RefOrMut>) {
+    fn push_batch(&mut self, batch: RefOrMut) {
         // `batch` is either a shared reference or an owned allocations.
         match batch {
             RefOrMut::Ref(reference) => {
@@ -58,7 +59,7 @@ where
     // which we call `lower`, by assumption that after sealing a batcher we receive no more
     // updates with times not greater or equal to `upper`.
     #[inline(never)]
-    fn seal>(&mut self, upper: Antichain) -> B::Output {
+    fn seal>(&mut self, upper: Antichain) -> B::Output {

         let mut merged = Vec::new();
         self.sorter.finish_into(&mut merged);
diff --git a/src/trace/implementations/merge_batcher_col.rs b/src/trace/implementations/merge_batcher_col.rs
index 9c2faab12..21416170d 100644
--- a/src/trace/implementations/merge_batcher_col.rs
+++ b/src/trace/implementations/merge_batcher_col.rs
@@ -31,7 +31,8 @@ where
    T: Columnation + Timestamp + 'static,
    D: Columnation + Semigroup + 'static,
 {
-    type Item = ((K,V),T,D);
+    type Input = Vec<((K,V),T,D)>;
+    type Output = ((K,V),T,D);
     type Time = T;

     fn new(logger: Option>, operator_id: usize) -> Self {
@@ -43,7 +44,7 @@ where
     }

     #[inline]
-    fn push_batch(&mut self, batch: RefOrMut>) {
+    fn push_batch(&mut self, batch: RefOrMut) {
         // `batch` is either a shared reference or an owned allocations.
         match batch {
             RefOrMut::Ref(reference) => {
@@ -62,7 +63,7 @@ where
     // which we call `lower`, by assumption that after sealing a batcher we receive no more
     // updates with times not greater or equal to `upper`.
     #[inline]
-    fn seal>(&mut self, upper: Antichain) -> B::Output {
+    fn seal>(&mut self, upper: Antichain) -> B::Output {

         let mut merged = Default::default();
         self.sorter.finish_into(&mut merged);
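The substance of the batcher change, restated: `Batcher::Item` splits into an `Input` container type (what `push_batch` receives from the data stream) and an `Output` item type (what sealed batches are assembled from). A paraphrased trait shape for orientation — not the verbatim trait, whose generics are garbled in the hunks above:

```rust
use timely::progress::Antichain;

/// Paraphrase of the revised batcher interface: the pushed container
/// type is decoupled from the item type that builders consume.
pub trait BatcherSketch {
    /// Container accepted from the input stream, e.g. `Vec<((K, V), T, D)>`.
    type Input;
    /// Update item that sealed batches are assembled from, e.g. `((K, V), T, D)`.
    type Output;
    /// Timestamps of the contained updates.
    type Time;

    /// Absorb a container of updates.
    fn push_batch(&mut self, batch: Self::Input);
    /// Seal and return all updates at times not beyond `upper`.
    fn seal(&mut self, upper: Antichain<Self::Time>) -> Vec<Self::Output>;
}
```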
diff --git a/src/trace/implementations/ord_neu.rs b/src/trace/implementations/ord_neu.rs
index bf4dd9544..ce0c6532a 100644
--- a/src/trace/implementations/ord_neu.rs
+++ b/src/trace/implementations/ord_neu.rs
@@ -540,7 +540,7 @@ mod val_batch {
     impl Builder for OrdValBuilder {

-        type Item = ((::Key, ::Val), ::Time, ::Diff);
+        type Input = ((::Key, ::Val), ::Time, ::Diff);
         type Time = ::Time;
         type Output = OrdValBatch;
@@ -560,7 +560,7 @@ mod val_batch {
         }

         #[inline]
-        fn push(&mut self, ((key, val), time, diff): Self::Item) {
+        fn push(&mut self, ((key, val), time, diff): Self::Input) {

             // Perhaps this is a continuation of an already received key.
             if self.result.keys.last().map(|k| k.equals(&key)).unwrap_or(false) {
@@ -586,7 +586,7 @@ mod val_batch {
         }

         #[inline]
-        fn copy(&mut self, ((key, val), time, diff): &Self::Item) {
+        fn copy(&mut self, ((key, val), time, diff): &Self::Input) {

             // Perhaps this is a continuation of an already received key.
             if self.result.keys.last().map(|k| k.equals(key)).unwrap_or(false) {
@@ -1006,7 +1006,7 @@ mod key_batch {
     impl Builder for OrdKeyBuilder {

-        type Item = ((::Key, ()), ::Time, ::Diff);
+        type Input = ((::Key, ()), ::Time, ::Diff);
         type Time = ::Time;
         type Output = OrdKeyBatch;
@@ -1024,7 +1024,7 @@ mod key_batch {
         }

         #[inline]
-        fn push(&mut self, ((key, ()), time, diff): Self::Item) {
+        fn push(&mut self, ((key, ()), time, diff): Self::Input) {

             // Perhaps this is a continuation of an already received key.
             if self.result.keys.last().map(|k| k.equals(&key)).unwrap_or(false) {
@@ -1040,7 +1040,7 @@ mod key_batch {
         }

         #[inline]
-        fn copy(&mut self, ((key, ()), time, diff): &Self::Item) {
+        fn copy(&mut self, ((key, ()), time, diff): &Self::Input) {

             // Perhaps this is a continuation of an already received key.
             if self.result.keys.last().map(|k| k.equals(key)).unwrap_or(false) {
diff --git a/src/trace/implementations/rhh.rs b/src/trace/implementations/rhh.rs
index 94ed3b95b..f29d39a8b 100644
--- a/src/trace/implementations/rhh.rs
+++ b/src/trace/implementations/rhh.rs
@@ -731,7 +731,7 @@
        ::Key: Default + HashOrdered,
        // RhhValBatch: Batch::Key, Val=::Val, Time=::Time, Diff=::Diff>,
     {
-        type Item = ((::Key, ::Val), ::Time, ::Diff);
+        type Input = ((::Key, ::Val), ::Time, ::Diff);
         type Time = ::Time;
         type Output = RhhValBatch;
@@ -763,7 +763,7 @@
         }

         #[inline]
-        fn push(&mut self, ((key, val), time, diff): Self::Item) {
+        fn push(&mut self, ((key, val), time, diff): Self::Input) {

             // Perhaps this is a continuation of an already received key.
             if self.result.keys.last().map(|k| k.equals(&key)).unwrap_or(false) {
@@ -790,7 +790,7 @@
         }

         #[inline]
-        fn copy(&mut self, ((key, val), time, diff): &Self::Item) {
+        fn copy(&mut self, ((key, val), time, diff): &Self::Input) {

             // Perhaps this is a continuation of an already received key.
             if self.result.keys.last().map(|k| k.equals(key)).unwrap_or(false) {
diff --git a/src/trace/implementations/spine_fueled.rs b/src/trace/implementations/spine_fueled.rs
index fd3af3b76..c616e2b7d 100644
--- a/src/trace/implementations/spine_fueled.rs
+++ b/src/trace/implementations/spine_fueled.rs
@@ -255,7 +255,7 @@ impl Trace for Spine
 where
    B: Batch+Clone+'static,
    BA: Batcher