From d4e20a01c8071da1e356a428e3753d36faa508cf Mon Sep 17 00:00:00 2001 From: Frank McSherry Date: Thu, 21 Mar 2024 12:53:23 -0400 Subject: [PATCH 1/7] Update to track TD changes (#467) --- src/operators/arrange/arrangement.rs | 6 +++--- src/operators/join.rs | 2 +- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/src/operators/arrange/arrangement.rs b/src/operators/arrange/arrangement.rs index 031e587b9..fe65a702d 100644 --- a/src/operators/arrange/arrangement.rs +++ b/src/operators/arrange/arrangement.rs @@ -541,7 +541,7 @@ where /// be consistently by key (though this is the most common). fn arrange_core(&self, pact: P, name: &str) -> Arranged> where - P: ParallelizationContract, + P: ParallelizationContract>, K: Clone, V: Clone, R: Clone, @@ -562,7 +562,7 @@ where { fn arrange_core(&self, pact: P, name: &str) -> Arranged> where - P: ParallelizationContract, + P: ParallelizationContract>, Tr: Trace+'static, Tr::Batch: Batch, Tr::Batcher: Batcher, @@ -727,7 +727,7 @@ where { fn arrange_core(&self, pact: P, name: &str) -> Arranged> where - P: ParallelizationContract, + P: ParallelizationContract>, Tr: Trace+'static, Tr::Batch: Batch, Tr::Batcher: Batcher, diff --git a/src/operators/join.rs b/src/operators/join.rs index ee1c28724..a31e682a5 100644 --- a/src/operators/join.rs +++ b/src/operators/join.rs @@ -617,7 +617,7 @@ where /// Process keys until at least `fuel` output tuples produced, or the work is exhausted. #[inline(never)] - fn work(&mut self, output: &mut OutputHandle>, mut logic: L, fuel: &mut usize) + fn work(&mut self, output: &mut OutputHandle>>, mut logic: L, fuel: &mut usize) where I: IntoIterator, L: for<'a> FnMut(C1::Key<'a>, C1::Val<'a>, C2::Val<'a>, &T, &C1::Diff, &C2::Diff)->I, From 84d0d4d6440971bfd6fefba112f631c98506dfc4 Mon Sep 17 00:00:00 2001 From: Frank McSherry Date: Thu, 21 Mar 2024 15:34:52 -0400 Subject: [PATCH 2/7] Name Handle's container type (#468) --- src/operators/iterate.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/operators/iterate.rs b/src/operators/iterate.rs index 8a60da4bb..6d0e27144 100644 --- a/src/operators/iterate.rs +++ b/src/operators/iterate.rs @@ -154,7 +154,7 @@ impl Iterate for G { pub struct Variable where G::Timestamp: Lattice { collection: Collection, - feedback: Handle, + feedback: Handle>, source: Option>, step: ::Summary, } @@ -225,7 +225,7 @@ impl Deref for Variable where G::Timesta pub struct SemigroupVariable where G::Timestamp: Lattice { collection: Collection, - feedback: Handle, + feedback: Handle>, step: ::Summary, } From dfc0062b9ae34c6515568b84b215d2d2744413d8 Mon Sep 17 00:00:00 2001 From: Frank McSherry Date: Thu, 21 Mar 2024 19:13:02 -0400 Subject: [PATCH 3/7] Prevent enter_at from using enter_at (#469) --- examples/accumulate.rs | 9 ++++++++- src/collection.rs | 10 +++------- 2 files changed, 11 insertions(+), 8 deletions(-) diff --git a/examples/accumulate.rs b/examples/accumulate.rs index 4693431e4..0fa792926 100644 --- a/examples/accumulate.rs +++ b/examples/accumulate.rs @@ -16,7 +16,14 @@ fn main() { let mut input = worker.dataflow::<(), _, _>(|scope| { let (input, data) = scope.new_collection::<_, isize>(); - data.consolidate(); + + use timely::dataflow::Scope; + scope.iterative::(|inner| { + data.enter_at(inner, |_| 0) + .consolidate() + .leave() + }); + input }); diff --git a/src/collection.rs b/src/collection.rs index aeb6aad38..2e82c7e62 100644 --- a/src/collection.rs +++ b/src/collection.rs @@ -322,20 +322,16 @@ impl Collection where G::Timestamp: Da /// data.assert_eq(&result); /// }); /// ``` - pub fn enter_at<'a, T, F>(&self, child: &Iterative<'a, G, T>, initial: F) -> Collection, D, R> + pub fn enter_at<'a, T, F>(&self, child: &Iterative<'a, G, T>, mut initial: F) -> Collection, D, R> where T: Timestamp+Hash, F: FnMut(&D) -> T + Clone + 'static, G::Timestamp: Hash, { - - let mut initial1 = initial.clone(); - let mut initial2 = initial.clone(); - self.inner - .enter_at(child, move |x| initial1(&x.0)) + .enter(child) .map(move |(data, time, diff)| { - let new_time = Product::new(time, initial2(&data)); + let new_time = Product::new(time, initial(&data)); (data, new_time, diff) }) .as_collection() From 1dee988cc90badf79bb6b1d7dad9d9e91e8e90a2 Mon Sep 17 00:00:00 2001 From: Frank McSherry Date: Sat, 23 Mar 2024 03:30:42 -0400 Subject: [PATCH 4/7] Track TD Capture changes (#470) --- tests/import.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/import.rs b/tests/import.rs index 8176f14af..1a15d3d7a 100644 --- a/tests/import.rs +++ b/tests/import.rs @@ -9,7 +9,7 @@ use differential_dataflow::operators::reduce::Reduce; use differential_dataflow::trace::TraceReader; use itertools::Itertools; -type Result = std::sync::mpsc::Receiver>; +type Result = std::sync::mpsc::Receiver>>; fn run_test(test: T, expected: Vec<(usize, Vec<((u64, i64), i64)>)>) -> () where T: FnOnce(Vec>)-> Result + ::std::panic::UnwindSafe From 51966ad62680bd7c674e786dd56d1c8daec8d9d7 Mon Sep 17 00:00:00 2001 From: Moritz Hoffmann Date: Fri, 12 Apr 2024 14:11:07 -0400 Subject: [PATCH 5/7] WIP: Extract batcher input to assoc type, arrange_core freestanding (#471) * Extract batcher input to assoc type, arrange_core freestanding Signed-off-by: Moritz Hoffmann * Fix warnings Signed-off-by: Moritz Hoffmann * documentation Signed-off-by: Moritz Hoffmann * formatting Signed-off-by: Moritz Hoffmann --------- Signed-off-by: Moritz Hoffmann --- .github/workflows/deploy.yml | 2 +- .github/workflows/test.yml | 4 +- doop/src/main.rs | 8 +- experiments/src/bin/graphs-interactive-alt.rs | 4 +- experiments/src/bin/graphs-interactive-neu.rs | 4 +- interactive/src/command.rs | 6 +- interactive/src/logging.rs | 6 +- src/operators/arrange/arrangement.rs | 284 +++++++++--------- src/operators/consolidate.rs | 2 +- src/trace/implementations/merge_batcher.rs | 1 + .../implementations/merge_batcher_col.rs | 1 + src/trace/mod.rs | 4 +- 12 files changed, 175 insertions(+), 151 deletions(-) diff --git a/.github/workflows/deploy.yml b/.github/workflows/deploy.yml index babc088a8..41c27604b 100644 --- a/.github/workflows/deploy.yml +++ b/.github/workflows/deploy.yml @@ -9,7 +9,7 @@ jobs: deploy: runs-on: ubuntu-22.04 steps: - - uses: actions/checkout@v3 + - uses: actions/checkout@v4 - run: cargo install mdbook --version 0.4.31 - run: cd mdbook && mdbook build - uses: JamesIves/github-pages-deploy-action@v4 diff --git a/.github/workflows/test.yml b/.github/workflows/test.yml index e4643725d..e3c9b6087 100644 --- a/.github/workflows/test.yml +++ b/.github/workflows/test.yml @@ -16,7 +16,7 @@ jobs: toolchain: - stable - 1.72 - name: cargo test on ${{ matrix.os }} + name: cargo test on ${{ matrix.os }}, rust ${{ matrix.toolchain }} runs-on: ${{ matrix.os }}-latest steps: - uses: actions/checkout@v4 @@ -24,7 +24,7 @@ jobs: with: toolchain: ${{ matrix.toolchain }} - name: Cargo test - run: cargo test + run: cargo test --workspace --all-targets # Check formatting with rustfmt mdbook: diff --git a/doop/src/main.rs b/doop/src/main.rs index 6628a7e30..d6ad19c19 100644 --- a/doop/src/main.rs +++ b/doop/src/main.rs @@ -1,4 +1,4 @@ -#![allow(non_snake_case)] +#![allow(non_snake_case, dead_code)] use std::collections::HashMap; use std::rc::Rc; @@ -145,7 +145,7 @@ fn load<'a>(filename: &str, interner: Rc>) -> impl Itera }) } -fn load1<'a>(index: usize, prefix: &str, filename: &str, interner: Rc>) -> impl Iterator+'a { +fn load1<'a>(index: usize, prefix: &str, filename: &str, interner: Rc>) -> impl Iterator+'a { read_file(&format!("{}{}", prefix, filename)) .filter(move |_| index == 0) .map(move |line| { @@ -791,7 +791,7 @@ fn main() { let SupertypeOf = SupertypeOf.enter(scope); // Required by all - let mut Reachable = Relation::<_,(Method)>::new(scope); + let mut Reachable = Relation::<_,Method>::new(scope); // NOTE: Common subexpression. let Reachable_Invocation = @@ -805,7 +805,7 @@ fn main() { // let Reachable = ReachableFinal.clone(); // Class initialization - let mut InitializedClass = Relation::<_,(Type)>::new(scope); + let mut InitializedClass = Relation::<_,Type>::new(scope); // ClassInitializer(?type, ?method) :- basic.MethodImplemented("", "void()", ?type, ?method). let temp1 = interner.borrow_mut().intern(""); diff --git a/experiments/src/bin/graphs-interactive-alt.rs b/experiments/src/bin/graphs-interactive-alt.rs index 55c8a8cf6..18dc0923f 100644 --- a/experiments/src/bin/graphs-interactive-alt.rs +++ b/experiments/src/bin/graphs-interactive-alt.rs @@ -1,3 +1,5 @@ +#![allow(dead_code)] + use rand::{Rng, SeedableRng, StdRng}; use timely::dataflow::*; @@ -389,4 +391,4 @@ where G::Timestamp: Lattice { .concat(&prop) .reduce(|_, s, t| { t.push((*s[0].0, 1)); }) }) -} \ No newline at end of file +} diff --git a/experiments/src/bin/graphs-interactive-neu.rs b/experiments/src/bin/graphs-interactive-neu.rs index a01f23d97..6210113ed 100644 --- a/experiments/src/bin/graphs-interactive-neu.rs +++ b/experiments/src/bin/graphs-interactive-neu.rs @@ -1,3 +1,5 @@ +#![allow(dead_code)] + use rand::{Rng, SeedableRng, StdRng}; use timely::dataflow::*; @@ -392,4 +394,4 @@ where G::Timestamp: Lattice+Ord { reached.leave() }) -} \ No newline at end of file +} diff --git a/interactive/src/command.rs b/interactive/src/command.rs index c31963afe..d795ec13a 100644 --- a/interactive/src/command.rs +++ b/interactive/src/command.rs @@ -151,7 +151,7 @@ where println!("\tTimely logging connection {} of {}", index, number); let socket = listener.incoming().next().unwrap().unwrap(); socket.set_nonblocking(true).expect("failed to set nonblocking"); - streams.push(EventReader::::new(socket)); + streams.push(EventReader::,_>::new(socket)); } println!("\tAll logging connections established"); @@ -174,7 +174,7 @@ where for _ in 0 .. number { let socket = listener.incoming().next().unwrap().unwrap(); socket.set_nonblocking(true).expect("failed to set nonblocking"); - streams.push(EventReader::::new(socket)); + streams.push(EventReader::,_>::new(socket)); } } crate::logging::publish_differential_logging(manager, worker, granularity, &name_as, streams); @@ -195,4 +195,4 @@ where pub fn serialize_into(&self, writer: W) { bincode::serialize_into(writer, self).expect("bincode: serialization failed"); } -} \ No newline at end of file +} diff --git a/interactive/src/logging.rs b/interactive/src/logging.rs index cfc524660..be07fa4b6 100644 --- a/interactive/src/logging.rs +++ b/interactive/src/logging.rs @@ -30,7 +30,7 @@ where V: ExchangeData+Hash+LoggingValue+Datum, A: Allocate, I : IntoIterator, - ::Item: EventIterator+'static + ::Item: EventIterator>+'static { let (operates, channels, schedule, messages, shutdown, park, text) = worker.dataflow(move |scope| { @@ -217,7 +217,7 @@ where V: ExchangeData+Hash+LoggingValue+Datum, A: Allocate, I : IntoIterator, - ::Item: EventIterator+'static + ::Item: EventIterator>+'static { let (merge,batch) = worker.dataflow(move |scope| { @@ -280,4 +280,4 @@ where manager.traces.set_unkeyed(&Plan::Source(format!("logs/{}/differential/arrange/batch", name)), &batch); manager.traces.set_unkeyed(&Plan::Source(format!("logs/{}/differential/arrange/merge", name)), &merge); -} \ No newline at end of file +} diff --git a/src/operators/arrange/arrangement.rs b/src/operators/arrange/arrangement.rs index fe65a702d..653ed1e87 100644 --- a/src/operators/arrange/arrangement.rs +++ b/src/operators/arrange/arrangement.rs @@ -19,7 +19,7 @@ use timely::dataflow::operators::{Enter, Map}; use timely::order::PartialOrder; -use timely::dataflow::{Scope, Stream}; +use timely::dataflow::{Scope, Stream, StreamCore}; use timely::dataflow::operators::generic::Operator; use timely::dataflow::channels::pact::{ParallelizationContract, Pipeline, Exchange}; use timely::progress::Timestamp; @@ -511,7 +511,7 @@ where V: ExchangeData, R: ExchangeData, Tr::Batch: Batch, - Tr::Batcher: Batcher, + Tr::Batcher: Batcher, Item = ((K,V),G::Timestamp,R), Time = G::Timestamp>, Tr::Builder: Builder, { self.arrange_named("Arrange") @@ -527,7 +527,7 @@ where V: ExchangeData, R: ExchangeData, Tr::Batch: Batch, - Tr::Batcher: Batcher, + Tr::Batcher: Batcher, Item = ((K,V),G::Timestamp,R), Time = G::Timestamp>, Tr::Builder: Builder, { let exchange = Exchange::new(move |update: &((K,V),G::Timestamp,R)| (update.0).0.hashed().into()); @@ -547,7 +547,7 @@ where R: Clone, Tr: Trace+'static, Tr::Batch: Batch, - Tr::Batcher: Batcher, + Tr::Batcher: Batcher, Item = ((K,V),G::Timestamp,R), Time = G::Timestamp>, Tr::Builder: Builder, ; } @@ -565,160 +565,176 @@ where P: ParallelizationContract>, Tr: Trace+'static, Tr::Batch: Batch, - Tr::Batcher: Batcher, + Tr::Batcher: Batcher, Item = ((K,V),G::Timestamp,R), Time = G::Timestamp>, Tr::Builder: Builder, { - // The `Arrange` operator is tasked with reacting to an advancing input - // frontier by producing the sequence of batches whose lower and upper - // bounds are those frontiers, containing updates at times greater or - // equal to lower and not greater or equal to upper. - // - // The operator uses its batch type's `Batcher`, which accepts update - // triples and responds to requests to "seal" batches (presented as new - // upper frontiers). - // - // Each sealed batch is presented to the trace, and if at all possible - // transmitted along the outgoing channel. Empty batches may not have - // a corresponding capability, as they are only retained for actual data - // held by the batcher, which may prevents the operator from sending an - // empty batch. - - let mut reader: Option> = None; - - // fabricate a data-parallel operator using the `unary_notify` pattern. - let stream = { - - let reader = &mut reader; - - self.inner.unary_frontier(pact, name, move |_capability, info| { - - // Acquire a logger for arrange events. - let logger = { - let scope = self.scope(); - let register = scope.log_register(); - register.get::("differential/arrange") - }; - - // Where we will deposit received updates, and from which we extract batches. - let mut batcher = Tr::Batcher::new(logger.clone(), info.global_id); - - // Capabilities for the lower envelope of updates in `batcher`. - let mut capabilities = Antichain::>::new(); - - let activator = Some(self.scope().activator_for(&info.address[..])); - let mut empty_trace = Tr::new(info.clone(), logger.clone(), activator); - // If there is default exertion logic set, install it. - if let Some(exert_logic) = self.inner.scope().config().get::("differential/default_exert_logic").cloned() { - empty_trace.set_exert_logic(exert_logic); - } - - let (reader_local, mut writer) = TraceAgent::new(empty_trace, info, logger); - - *reader = Some(reader_local); - - // Initialize to the minimal input frontier. - let mut prev_frontier = Antichain::from_elem(::minimum()); - - move |input, output| { + arrange_core(&self.inner, pact, name) + } +} - // As we receive data, we need to (i) stash the data and (ii) keep *enough* capabilities. - // We don't have to keep all capabilities, but we need to be able to form output messages - // when we realize that time intervals are complete. +/// Arranges a stream of updates by a key, configured with a name and a parallelization contract. +/// +/// This operator arranges a stream of values into a shared trace, whose contents it maintains. +/// It uses the supplied parallelization contract to distribute the data, which does not need to +/// be consistently by key (though this is the most common). +fn arrange_core(stream: &StreamCore::Input>, pact: P, name: &str) -> Arranged> +where + G: Scope, + G::Timestamp: Lattice, + P: ParallelizationContract::Input>, + Tr: Trace+'static, + Tr::Batch: Batch, + Tr::Batcher: Batcher