Skip to content
This repository has been archived by the owner on Oct 30, 2024. It is now read-only.

Pull upstream #10

Merged
merged 5 commits into from
May 6, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ members = [
# "advent_of_code_2017",
"dogsdogsdogs",
"experiments",
"interactive",
#"interactive",
"server",
"server/dataflows/degr_dist",
"server/dataflows/neighborhood",
Expand Down
6 changes: 3 additions & 3 deletions dogsdogsdogs/examples/delta_query.rs
Original file line number Diff line number Diff line change
Expand Up @@ -79,17 +79,17 @@ fn main() {

// Prior technology
// dQ/dE1 := dE1(a,b), E2(b,c), E3(a,c)
let changes1 = propose(&changes, forward_key_neu.clone(), key2.clone());
let changes1 = propose(&changes, forward_key_neu.clone(), key2.clone(), Clone::clone);
let changes1 = validate(&changes1, forward_self_neu.clone(), key1.clone());
let changes1 = changes1.map(|((a,b),c)| (a,b,c));

// dQ/dE2 := dE2(b,c), E1(a,b), E3(a,c)
let changes2 = propose(&changes, reverse_key_alt.clone(), key1.clone());
let changes2 = propose(&changes, reverse_key_alt.clone(), key1.clone(), Clone::clone);
let changes2 = validate(&changes2, reverse_self_neu.clone(), key2.clone());
let changes2 = changes2.map(|((b,c),a)| (a,b,c));

// dQ/dE3 := dE3(a,c), E1(a,b), E2(b,c)
let changes3 = propose(&changes, forward_key_alt.clone(), key1.clone());
let changes3 = propose(&changes, forward_key_alt.clone(), key1.clone(), Clone::clone);
let changes3 = validate(&changes3, reverse_self_alt.clone(), key2.clone());
let changes3 = changes3.map(|((a,c),b)| (a,b,c));

Expand Down
2 changes: 1 addition & 1 deletion dogsdogsdogs/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -199,7 +199,7 @@ where

fn propose(&mut self, prefixes: &Collection<G, P, R>) -> Collection<G, (P, V), R> {
let propose = self.indices.propose_trace.import(&prefixes.scope());
operators::propose::propose(prefixes, propose, self.key_selector.clone())
operators::propose::propose(prefixes, propose, self.key_selector.clone(), |x| x.clone())
}

fn validate(&mut self, extensions: &Collection<G, (P, V), R>) -> Collection<G, (P, V), R> {
Expand Down
2 changes: 1 addition & 1 deletion dogsdogsdogs/src/operators/count.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ pub fn count<G, Tr, R, F, P>(
) -> Collection<G, (P, usize, usize), R>
where
G: Scope<Timestamp=Tr::Time>,
Tr: TraceReader<ValOwned=(), Diff=isize>+Clone+'static,
Tr: TraceReader<Diff=isize>+Clone+'static,
Tr::KeyOwned: Hashable + Default,
R: Monoid+Multiply<Output = R>+ExchangeData,
F: Fn(&P)->Tr::KeyOwned+Clone+'static,
Expand Down
19 changes: 12 additions & 7 deletions dogsdogsdogs/src/operators/propose.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ use differential_dataflow::{ExchangeData, Collection, Hashable};
use differential_dataflow::difference::{Monoid, Multiply};
use differential_dataflow::operators::arrange::Arranged;
use differential_dataflow::trace::TraceReader;
use differential_dataflow::trace::cursor::MyTrait;

/// Proposes extensions to a prefix stream.
///
Expand All @@ -14,24 +13,27 @@ use differential_dataflow::trace::cursor::MyTrait;
/// create a join if the `prefixes` collection is also arranged and responds to changes that
/// `arrangement` undergoes. More complicated patterns are also appropriate, as in the case
/// of delta queries.
pub fn propose<G, Tr, F, P>(
pub fn propose<G, Tr, F, P, V, VF>(
prefixes: &Collection<G, P, Tr::Diff>,
arrangement: Arranged<G, Tr>,
key_selector: F,
) -> Collection<G, (P, Tr::ValOwned), Tr::Diff>
val_from: VF,
) -> Collection<G, (P, V), Tr::Diff>
where
G: Scope<Timestamp=Tr::Time>,
Tr: TraceReader+Clone+'static,
Tr::KeyOwned: Hashable + Default,
Tr::Diff: Monoid+Multiply<Output = Tr::Diff>+ExchangeData,
F: Fn(&P)->Tr::KeyOwned+Clone+'static,
P: ExchangeData,
V: Clone + 'static,
VF: Fn(Tr::Val<'_>) -> V + 'static,
{
crate::operators::lookup_map(
prefixes,
arrangement,
move |p: &P, k: &mut Tr::KeyOwned | { *k = key_selector(p); },
|prefix, diff, value, sum| ((prefix.clone(), value.into_owned()), diff.clone().multiply(sum)),
move |prefix, diff, value, sum| ((prefix.clone(), val_from(value)), diff.clone().multiply(sum)),
Default::default(),
Default::default(),
Default::default(),
Expand All @@ -43,24 +45,27 @@ where
/// Unlike `propose`, this method does not scale the multiplicity of matched
/// prefixes by the number of matches in `arrangement`. This can be useful to
/// avoid the need to prepare an arrangement of distinct extensions.
pub fn propose_distinct<G, Tr, F, P>(
pub fn propose_distinct<G, Tr, F, P, V, VF>(
prefixes: &Collection<G, P, Tr::Diff>,
arrangement: Arranged<G, Tr>,
key_selector: F,
) -> Collection<G, (P, Tr::ValOwned), Tr::Diff>
val_from: VF,
) -> Collection<G, (P, V), Tr::Diff>
where
G: Scope<Timestamp=Tr::Time>,
Tr: TraceReader+Clone+'static,
Tr::KeyOwned: Hashable + Default,
Tr::Diff: Monoid+Multiply<Output = Tr::Diff>+ExchangeData,
F: Fn(&P)->Tr::KeyOwned+Clone+'static,
P: ExchangeData,
V: Clone + 'static,
VF: Fn(Tr::Val<'_>) -> V + 'static,
{
crate::operators::lookup_map(
prefixes,
arrangement,
move |p: &P, k: &mut Tr::KeyOwned| { *k = key_selector(p); },
|prefix, diff, value, _sum| ((prefix.clone(), value.into_owned()), diff.clone()),
move |prefix, diff, value, _sum| ((prefix.clone(), val_from(value)), diff.clone()),
Default::default(),
Default::default(),
Default::default(),
Expand Down
2 changes: 1 addition & 1 deletion dogsdogsdogs/src/operators/validate.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ pub fn validate<G, K, V, Tr, F, P>(
) -> Collection<G, (P, V), Tr::Diff>
where
G: Scope<Timestamp=Tr::Time>,
Tr: TraceReader<KeyOwned=(K,V), ValOwned=()>+Clone+'static,
Tr: TraceReader<KeyOwned=(K,V)>+Clone+'static,
K: Ord+Hash+Clone+Default,
V: ExchangeData+Hash+Default,
Tr::Diff: Monoid+Multiply<Output = Tr::Diff>+ExchangeData,
Expand Down
19 changes: 1 addition & 18 deletions examples/cursors.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,6 @@
//! Final graph: {(2, 1): 1, (3, 2): 1, (3, 4): 1, (4, 3): 1}
//! ```

use std::fmt::Debug;
use std::collections::BTreeMap;

use timely::dataflow::operators::probe::Handle;
Expand Down Expand Up @@ -77,7 +76,6 @@ fn main() {
graph_trace.set_physical_compaction(AntichainRef::new(&[i]));
graph_trace.set_logical_compaction(AntichainRef::new(&[i]));
worker.step_while(|| probe.less_than(&i));
dump_cursor(i, worker.index(), &mut graph_trace);
}
} else {
/* Only worker 0 feeds inputs to the dataflow. */
Expand All @@ -92,13 +90,12 @@ fn main() {
graph_trace.set_physical_compaction(AntichainRef::new(&[i]));
graph_trace.set_logical_compaction(AntichainRef::new(&[i]));
worker.step_while(|| probe.less_than(graph.time()));
dump_cursor(i, worker.index(), &mut graph_trace);
}
}

/* Return trace content after the last round. */
let (mut cursor, storage) = graph_trace.cursor();
cursor.to_vec(&storage)
cursor.to_vec(Clone::clone, &storage)
})
.unwrap().join();

Expand Down Expand Up @@ -128,17 +125,3 @@ fn main() {
expected_graph_content.insert((rounds, rounds+1), 1);
assert_eq!(graph_content, expected_graph_content);
}

fn dump_cursor<Tr>(round: u32, index: usize, trace: &mut Tr)
where
Tr: TraceReader,
Tr::KeyOwned: Debug,
Tr::ValOwned: Debug,
Tr::Time: Debug,
Tr::Diff: Debug,
{
let (mut cursor, storage) = trace.cursor();
for ((k, v), diffs) in cursor.to_vec(&storage).iter() {
println!("round {}, w{} {:?}:{:?}: {:?}", round, index, *k, *v, diffs);
}
}
2 changes: 1 addition & 1 deletion examples/monoid-bfs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -145,7 +145,7 @@ where G::Timestamp: Lattice+Ord {
.join_map(&edges, |_k,&(),d| *d)
.concat(&roots)
.map(|x| (x,()))
.reduce_core::<_,KeySpine<_,_,_>>("Reduce", |_key, input, output, updates| {
.reduce_core::<_,_,KeySpine<_,_,_>>("Reduce", Clone::clone, |_key, input, output, updates| {
if output.is_empty() || input[0].1 < output[0].1 {
updates.push(((), input[0].1));
}
Expand Down
4 changes: 2 additions & 2 deletions examples/spines.rs
Original file line number Diff line number Diff line change
Expand Up @@ -55,11 +55,11 @@ fn main() {
let data =
data.map(|x| (x.clone().into_bytes(), x.into_bytes()))
.arrange::<PreferredSpine<[u8],[u8],_,_>>()
.reduce_abelian::<_, PreferredSpine<[u8],(),_,_>>("distinct", |_,_,output| output.push(((), 1)));
.reduce_abelian::<_, _, _, PreferredSpine<[u8],(),_,_>>("distinct", |_| (), |_,_,output| output.push(((), 1)));
let keys =
keys.map(|x| (x.clone().into_bytes(), 7))
.arrange::<PreferredSpine<[u8],u8,_,_>>()
.reduce_abelian::<_, PreferredSpine<[u8],(),_,_>>("distinct", |_,_,output| output.push(((), 1)));
.reduce_abelian::<_, _, _, PreferredSpine<[u8],(),_,_>>("distinct", |_| (), |_,_,output| output.push(((), 1)));

keys.join_core(&data, |k,_v1,_v2| {
println!("{:?}", k.text);
Expand Down
2 changes: 1 addition & 1 deletion server/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ path="../"
[dependencies]
rand="0.3.13"
libloading="*"
timely = { git = "https://github.com/TimelyDataflow/timely-dataflow" }
timely = { workspace = true }

#[workspace]
#members = [
Expand Down
2 changes: 1 addition & 1 deletion src/algorithms/graphs/propagate.rs
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,7 @@ where
let labels =
proposals
.concat(&nodes)
.reduce_abelian::<_,ValSpine<_,_,_,_>>("Propagate", |_, s, t| t.push((s[0].0.clone(), R::from(1_i8))));
.reduce_abelian::<_,_,ValSpine<_,_,_,_>>("Propagate", |v| v.clone(), |_, s, t| t.push((s[0].0.clone(), R::from(1_i8))));

let propagate: Collection<_, (N, L), R> =
labels
Expand Down
2 changes: 1 addition & 1 deletion src/capture.rs
Original file line number Diff line number Diff line change
Expand Up @@ -399,7 +399,7 @@ pub mod source {
for message in source.by_ref() {
match message {
Message::Updates(mut updates) => {
updates_session.give_vec(&mut updates);
updates_session.give_container(&mut updates);
}
Message::Progress(progress) => {
// We must send a copy of each progress message to all workers,
Expand Down
Loading
Loading