Skip to content

Commit

Permalink
compiles
Browse files Browse the repository at this point in the history
  • Loading branch information
frankmcsherry committed Nov 29, 2023
1 parent 3182200 commit 9176261
Show file tree
Hide file tree
Showing 16 changed files with 265 additions and 228 deletions.
1 change: 1 addition & 0 deletions examples/cursors.rs
Original file line number Diff line number Diff line change
Expand Up @@ -139,6 +139,7 @@ where
Tr::Val: Debug + Clone,
Tr::Time: Debug + Clone,
Tr::Diff: Debug + Clone,
<Tr::Cursor as Cursor>::ValOwned: Debug,
{
let (mut cursor, storage) = trace.cursor();
for ((k, v), diffs) in cursor.to_vec(&storage).iter() {
Expand Down
23 changes: 9 additions & 14 deletions src/operators/join.rs
Original file line number Diff line number Diff line change
Expand Up @@ -666,8 +666,6 @@ where
R: Semigroup,
C1: Cursor<Time=T>,
C2: Cursor<Key=C1::Key, Time=T>,
C1::Val: Ord,
C2::Val: Ord,
C1::Diff: Semigroup,
C2::Diff: Semigroup,
D: Ord+Clone+Data,
Expand All @@ -686,8 +684,6 @@ where
C1::Key: Ord+Eq,
C1: Cursor<Time=T>,
C2: Cursor<Key=C1::Key, Time=T>,
C1::Val: Ord,
C2::Val: Ord,
C1::Diff: Semigroup,
C2::Diff: Semigroup,
T: Timestamp+Lattice+Ord,
Expand All @@ -713,7 +709,10 @@ where
/// Process keys until at least `fuel` output tuples produced, or the work is exhausted.
#[inline(never)]
fn work<L, I>(&mut self, output: &mut OutputHandle<T, (D, T, R), Tee<T, (D, T, R)>>, mut logic: L, fuel: &mut usize)
where I: IntoIterator<Item=(D, T, R)>, L: FnMut(&C1::Key, &C1::Val, &C2::Val, &T, &C1::Diff, &C2::Diff)->I {
where
I: IntoIterator<Item=(D, T, R)>,
L: for<'a> FnMut(&C1::Key, C1::Val<'a>, C2::Val<'a>, &T, &C1::Diff, &C2::Diff)->I,
{

let meet = self.capability.time();

Expand Down Expand Up @@ -781,8 +780,6 @@ struct JoinThinker<'a, C1, C2>
where
C1: Cursor,
C2: Cursor<Time = C1::Time>,
C1::Val: Ord,
C2::Val: Ord,
C1::Time: Lattice+Ord+Clone,
C1::Diff: Semigroup,
C2::Diff: Semigroup,
Expand All @@ -795,8 +792,6 @@ impl<'a, C1, C2> JoinThinker<'a, C1, C2>
where
C1: Cursor,
C2: Cursor<Time = C1::Time>,
C1::Val: Ord,
C2::Val: Ord,
C1::Time: Lattice+Ord+Clone,
C1::Diff: Semigroup,
C2::Diff: Semigroup,
Expand All @@ -808,7 +803,7 @@ where
}
}

fn think<F: FnMut(&C1::Val,&C2::Val,C1::Time,&C1::Diff,&C2::Diff)>(&mut self, mut results: F) {
fn think<'b, F: FnMut(C1::Val<'a>,C2::Val<'a>,C1::Time,&C1::Diff,&C2::Diff)>(&'b mut self, mut results: F) {

// for reasonably sized edits, do the dead-simple thing.
if self.history1.edits.len() < 10 || self.history2.edits.len() < 10 {
Expand All @@ -833,15 +828,15 @@ where

if replay1.time().unwrap().cmp(&replay2.time().unwrap()) == ::std::cmp::Ordering::Less {
replay2.advance_buffer_by(replay1.meet().unwrap());
for &((ref val2, ref time2), ref diff2) in replay2.buffer().iter() {
for &((val2, ref time2), ref diff2) in replay2.buffer().iter() {
let (val1, time1, diff1) = replay1.edit().unwrap();
results(val1, val2, time1.join(time2), diff1, diff2);
}
replay1.step();
}
else {
replay1.advance_buffer_by(replay2.meet().unwrap());
for &((ref val1, ref time1), ref diff1) in replay1.buffer().iter() {
for &((val1, ref time1), ref diff1) in replay1.buffer().iter() {
let (val2, time2, diff2) = replay2.edit().unwrap();
results(val1, val2, time1.join(time2), diff1, diff2);
}
Expand All @@ -851,15 +846,15 @@ where

while !replay1.is_done() {
replay2.advance_buffer_by(replay1.meet().unwrap());
for &((ref val2, ref time2), ref diff2) in replay2.buffer().iter() {
for &((val2, ref time2), ref diff2) in replay2.buffer().iter() {
let (val1, time1, diff1) = replay1.edit().unwrap();
results(val1, val2, time1.join(time2), diff1, diff2);
}
replay1.step();
}
while !replay2.is_done() {
replay1.advance_buffer_by(replay2.meet().unwrap());
for &((ref val1, ref time1), ref diff1) in replay1.buffer().iter() {
for &((val1, ref time1), ref diff1) in replay1.buffer().iter() {
let (val2, time2, diff2) = replay2.edit().unwrap();
results(val1, val2, time1.join(time2), diff1, diff2);
}
Expand Down
22 changes: 11 additions & 11 deletions src/operators/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ use trace::Cursor;

/// An accumulation of (value, time, diff) updates.
struct EditList<'a, C: Cursor> where C::Time: Sized, C::Diff: Sized {
values: Vec<(&'a C::Val, usize)>,
values: Vec<(C::Val<'a>, usize)>,
edits: Vec<(C::Time, C::Diff)>,
}

Expand Down Expand Up @@ -64,19 +64,19 @@ impl<'a, C: Cursor> EditList<'a, C> where C::Time: Ord+Clone, C::Diff: Semigroup
}
/// Associates all edits pushed since the previous `seal_value` call with `value`.
#[inline]
fn seal(&mut self, value: &'a C::Val) {
fn seal(&mut self, value: C::Val<'a>) {
let prev = self.values.last().map(|x| x.1).unwrap_or(0);
crate::consolidation::consolidate_from(&mut self.edits, prev);
if self.edits.len() > prev {
self.values.push((value, self.edits.len()));
}
}
fn map<F: FnMut(&C::Val, &C::Time, &C::Diff)>(&self, mut logic: F) {
fn map<F: FnMut(C::Val<'a>, &C::Time, &C::Diff)>(&self, mut logic: F) {
for index in 0 .. self.values.len() {
let lower = if index == 0 { 0 } else { self.values[index-1].1 };
let upper = self.values[index].1;
for edit in lower .. upper {
logic(&self.values[index].0, &self.edits[edit].0, &self.edits[edit].1);
logic(self.values[index].0, &self.edits[edit].0, &self.edits[edit].1);
}
}
}
Expand All @@ -86,12 +86,12 @@ struct ValueHistory<'storage, C: Cursor> where C::Time: Sized, C::Diff: Sized {

edits: EditList<'storage, C>,
history: Vec<(C::Time, C::Time, usize, usize)>, // (time, meet, value_index, edit_offset)
buffer: Vec<((&'storage C::Val, C::Time), C::Diff)>, // where we accumulate / collapse updates.
buffer: Vec<((C::Val<'storage>, C::Time), C::Diff)>, // where we accumulate / collapse updates.
}

impl<'storage, C: Cursor> ValueHistory<'storage, C>
where
C::Val: Ord+'storage,
// C::Val: Ord+'storage,
C::Time: Lattice+Ord+Clone,
C::Diff: Semigroup,
{
Expand Down Expand Up @@ -137,7 +137,7 @@ where
}

/// Organizes history based on current contents of edits.
fn replay<'history>(&'history mut self) -> HistoryReplay<'storage, 'history, C> {
fn replay<'history>(&'history mut self) -> HistoryReplay<'storage, 'history, C> where 'storage: 'history {

self.buffer.clear();
self.history.clear();
Expand Down Expand Up @@ -165,7 +165,7 @@ struct HistoryReplay<'storage, 'history, C>
where
'storage: 'history,
C: Cursor,
C::Val: Ord+'storage,
// C::Val: Ord+'storage,
C::Time: Lattice+Ord+Clone+'history,
C::Diff: Semigroup+'history,
{
Expand All @@ -176,17 +176,17 @@ impl<'storage, 'history, C> HistoryReplay<'storage, 'history, C>
where
'storage: 'history,
C: Cursor,
C::Val: Ord+'storage,
// C::Val: Ord+'storage,
C::Time: Lattice+Ord+Clone+'history,
C::Diff: Semigroup+'history,
{
fn time(&self) -> Option<&C::Time> { self.replay.history.last().map(|x| &x.0) }
fn meet(&self) -> Option<&C::Time> { self.replay.history.last().map(|x| &x.1) }
fn edit(&self) -> Option<(&C::Val, &C::Time, &C::Diff)> {
fn edit<'s>(&'s self) -> Option<(C::Val<'storage>, &'s C::Time, &'s C::Diff)> {
self.replay.history.last().map(|&(ref t, _, v, e)| (self.replay.edits.values[v].0, t, &self.replay.edits.edits[e].1))
}

fn buffer(&self) -> &[((&'storage C::Val, C::Time), C::Diff)] {
fn buffer(&self) -> &[((C::Val<'storage>, C::Time), C::Diff)] {
&self.replay.buffer[..]
}

Expand Down
75 changes: 38 additions & 37 deletions src/operators/reduce.rs
Original file line number Diff line number Diff line change
Expand Up @@ -274,10 +274,12 @@ pub trait ReduceCore<G: Scope, K: ToOwned + ?Sized, V: ToOwned + ?Sized, R: Semi
where
T2: Trace+TraceReader<Key=K, Time=G::Timestamp>+'static,
T2::Val: Data,
T2::Val: Ord + ToOwned<Owned = <T2::Cursor as Cursor>::ValOwned>,
<T2::Val as ToOwned>::Owned: Data,
T2::Diff: Abelian,
T2::Batch: Batch,
T2::Builder: Builder<Output=T2::Batch, Item = ((K::Owned, T2::Val), G::Timestamp, T2::Diff)>,
L: FnMut(&K, &[(&V, R)], &mut Vec<(T2::Val, T2::Diff)>)+'static,
T2::Builder: Builder<Output=T2::Batch, Item = ((K::Owned, <T2::Val as ToOwned>::Owned), T2::Time, T2::Diff)>,
L: FnMut(&K, &[(&V, R)], &mut Vec<(<T2::Cursor as Cursor>::ValOwned, T2::Diff)>)+'static,
{
self.reduce_core::<_,T2>(name, move |key, input, output, change| {
if !input.is_empty() {
Expand All @@ -297,10 +299,12 @@ pub trait ReduceCore<G: Scope, K: ToOwned + ?Sized, V: ToOwned + ?Sized, R: Semi
where
T2: Trace+TraceReader<Key=K, Time=G::Timestamp>+'static,
T2::Val: Data,
T2::Val: Ord + ToOwned<Owned = <T2::Cursor as Cursor>::ValOwned>,
<T2::Val as ToOwned>::Owned: Data,
T2::Diff: Semigroup,
T2::Batch: Batch,
T2::Builder: Builder<Output=T2::Batch, Item = ((K::Owned, T2::Val), G::Timestamp, T2::Diff)>,
L: FnMut(&K, &[(&V, R)], &mut Vec<(T2::Val,T2::Diff)>, &mut Vec<(T2::Val,T2::Diff)>)+'static
T2::Builder: Builder<Output=T2::Batch, Item = ((K::Owned, <T2::Val as ToOwned>::Owned), T2::Time, T2::Diff)>,
L: FnMut(&K, &[(&V, R)], &mut Vec<(<T2::Cursor as Cursor>::ValOwned,T2::Diff)>, &mut Vec<(<T2::Cursor as Cursor>::ValOwned, T2::Diff)>)+'static,
;
}

Expand All @@ -316,11 +320,13 @@ where
fn reduce_core<L, T2>(&self, name: &str, logic: L) -> Arranged<G, TraceAgent<T2>>
where
T2::Val: Data,
T2::Val: Ord + ToOwned<Owned = <T2::Cursor as Cursor>::ValOwned>,
<T2::Val as ToOwned>::Owned: Data,
T2::Diff: Semigroup,
T2: Trace+TraceReader<Key=K, Time=G::Timestamp>+'static,
T2::Batch: Batch,
T2::Builder: Builder<Output=T2::Batch, Item = ((K, T2::Val), G::Timestamp, T2::Diff)>,
L: FnMut(&K, &[(&V, R)], &mut Vec<(T2::Val,T2::Diff)>, &mut Vec<(T2::Val, T2::Diff)>)+'static
T2::Builder: Builder<Output=T2::Batch, Item = ((K::Owned, <T2::Val as ToOwned>::Owned), T2::Time, T2::Diff)>,
L: FnMut(&K, &[(&V, R)], &mut Vec<(<T2::Cursor as Cursor>::ValOwned,T2::Diff)>, &mut Vec<(<T2::Cursor as Cursor>::ValOwned, T2::Diff)>)+'static,
{
self.arrange_by_key_named(&format!("Arrange: {}", name))
.reduce_core(name, logic)
Expand All @@ -338,12 +344,12 @@ where
fn reduce_core<L, T2>(&self, name: &str, logic: L) -> Arranged<G, TraceAgent<T2>>
where
T2: Trace+TraceReader<Key=K, Time=G::Timestamp>+'static,
T2::Val: Ord + ToOwned,
T2::Val: Ord + ToOwned<Owned = <T2::Cursor as Cursor>::ValOwned>,
<T2::Val as ToOwned>::Owned: Data,
T2::Diff: Semigroup,
T2::Batch: Batch,
T2::Builder: Builder<Output=T2::Batch, Item = ((K::Owned, <T2::Val as ToOwned>::Owned), T2::Time, T2::Diff)>,
L: FnMut(&K, &[(&V, R)], &mut Vec<(<T2::Val as ToOwned>::Owned,T2::Diff)>, &mut Vec<(<T2::Val as ToOwned>::Owned, T2::Diff)>)+'static,
L: FnMut(&K, &[(&V, R)], &mut Vec<(<T2::Cursor as Cursor>::ValOwned,T2::Diff)>, &mut Vec<(<T2::Cursor as Cursor>::ValOwned, T2::Diff)>)+'static,
{
reduce_trace(self, name, logic)
}
Expand All @@ -359,12 +365,12 @@ where
T1::Val: Ord,
T1::Diff: Semigroup,
T2: Trace+TraceReader<Key=T1::Key, Time=G::Timestamp> + 'static,
T2::Val: Ord + ToOwned,
T2::Val: Ord + ToOwned<Owned = <T2::Cursor as Cursor>::ValOwned>,
<T2::Val as ToOwned>::Owned: Data,
T2::Diff: Semigroup,
T2::Batch: Batch,
T2::Builder: Builder<Output=T2::Batch, Item = ((<T1::Key as ToOwned>::Owned, <T2::Val as ToOwned>::Owned), T2::Time, T2::Diff)>,
L: FnMut(&T1::Key, &[(&T1::Val, T1::Diff)], &mut Vec<(<T2::Val as ToOwned>::Owned,T2::Diff)>, &mut Vec<(<T2::Val as ToOwned>::Owned, T2::Diff)>)+'static,
L: FnMut(&T1::Key, &[(&T1::Val, T1::Diff)], &mut Vec<(<T2::Cursor as Cursor>::ValOwned,T2::Diff)>, &mut Vec<(<T2::Cursor as Cursor>::ValOwned, T2::Diff)>)+'static,
{
let mut result_trace = None;

Expand Down Expand Up @@ -498,7 +504,7 @@ where
//
// TODO: It would be better if all updates went into one batch, but timely dataflow prevents
// this as long as it requires that there is only one capability for each message.
let mut buffers = Vec::<(G::Timestamp, Vec<(<T2::Val as ToOwned>::Owned, G::Timestamp, T2::Diff)>)>::new();
let mut buffers = Vec::<(G::Timestamp, Vec<(<T2::Cursor as Cursor>::ValOwned, G::Timestamp, T2::Diff)>)>::new();
let mut builders = Vec::new();
for i in 0 .. capabilities.len() {
buffers.push((capabilities[i].time().clone(), Vec::new()));
Expand Down Expand Up @@ -678,10 +684,8 @@ trait PerKeyCompute<'a, C1, C2, C3>
where
C1: Cursor,
C2: Cursor<Key = C1::Key, Time = C1::Time>,
C3: Cursor<Key = C1::Key, Val = C1::Val, Time = C1::Time, Diff = C1::Diff>,
C1::Val: Ord,
C2::Val: Ord + ToOwned,
<C2::Val as ToOwned>::Owned: Ord + Clone,
C3: Cursor<Key = C1::Key, Val<'a> = C1::Val<'a>, Time = C1::Time, Diff = C1::Diff>,
C2::ValOwned: Ord + Clone,
C1::Time: Lattice+Ord+Clone,
C1::Diff: Semigroup,
C2::Diff: Semigroup,
Expand All @@ -696,14 +700,14 @@ where
times: &mut Vec<C1::Time>,
logic: &mut L,
upper_limit: &Antichain<C1::Time>,
outputs: &mut [(C2::Time, Vec<(<C2::Val as ToOwned>::Owned, C2::Time, C2::Diff)>)],
outputs: &mut [(C2::Time, Vec<(C2::ValOwned, C2::Time, C2::Diff)>)],
new_interesting: &mut Vec<C1::Time>) -> (usize, usize)
where
C1::Key: Eq,
L: FnMut(
&C1::Key, &[(&C1::Val, C1::Diff)],
&mut Vec<(<C2::Val as ToOwned>::Owned, C2::Diff)>,
&mut Vec<(<C2::Val as ToOwned>::Owned, C2::Diff)>,
&C1::Key, &[(C1::Val<'a>, C1::Diff)],
&mut Vec<(C2::ValOwned, C2::Diff)>,
&mut Vec<(C2::ValOwned, C2::Diff)>,
);
}

Expand All @@ -727,21 +731,19 @@ mod history_replay {
where
C1: Cursor,
C2: Cursor<Key = C1::Key, Time = C1::Time>,
C3: Cursor<Key = C1::Key, Val = C1::Val, Time = C1::Time, Diff = C1::Diff>,
C1::Val: Ord,
C2::Val: Ord + ToOwned,
<C2::Val as ToOwned>::Owned: Ord + Clone,
C3: Cursor<Key = C1::Key, Val<'a> = C1::Val<'a>, Time = C1::Time, Diff = C1::Diff>,
C2::ValOwned: Ord + Clone,
C1::Time: Lattice+Ord+Clone,
C1::Diff: Semigroup,
C2::Diff: Semigroup,
{
input_history: ValueHistory<'a, C1>,
output_history: ValueHistory<'a, C2>,
batch_history: ValueHistory<'a, C3>,
input_buffer: Vec<(&'a C1::Val, C1::Diff)>,
output_buffer: Vec<(<C2::Val as ToOwned>::Owned, C2::Diff)>,
update_buffer: Vec<(<C2::Val as ToOwned>::Owned, C2::Diff)>,
output_produced: Vec<((<C2::Val as ToOwned>::Owned, C2::Time), C2::Diff)>,
input_buffer: Vec<(C1::Val<'a>, C1::Diff)>,
output_buffer: Vec<(C2::ValOwned, C2::Diff)>,
update_buffer: Vec<(C2::ValOwned, C2::Diff)>,
output_produced: Vec<((C2::ValOwned, C2::Time), C2::Diff)>,
synth_times: Vec<C1::Time>,
meets: Vec<C1::Time>,
times_current: Vec<C1::Time>,
Expand All @@ -752,10 +754,8 @@ mod history_replay {
where
C1: Cursor,
C2: Cursor<Key = C1::Key, Time = C1::Time>,
C3: Cursor<Key = C1::Key, Val = C1::Val, Time = C1::Time, Diff = C1::Diff>,
C1::Val: Ord,
C2::Val: Ord + ToOwned,
<C2::Val as ToOwned>::Owned: Ord + Clone,
C3: Cursor<Key = C1::Key, Val<'a> = C1::Val<'a>, Time = C1::Time, Diff = C1::Diff>,
C2::ValOwned: Ord + Clone,
C1::Time: Lattice+Ord+Clone,
C1::Diff: Semigroup,
C2::Diff: Semigroup,
Expand Down Expand Up @@ -785,14 +785,14 @@ mod history_replay {
times: &mut Vec<C1::Time>,
logic: &mut L,
upper_limit: &Antichain<C1::Time>,
outputs: &mut [(C2::Time, Vec<(<C2::Val as ToOwned>::Owned, C2::Time, C2::Diff)>)],
outputs: &mut [(C2::Time, Vec<(C2::ValOwned, C2::Time, C2::Diff)>)],
new_interesting: &mut Vec<C1::Time>) -> (usize, usize)
where
C1::Key: Eq,
L: FnMut(
&C1::Key, &[(&C1::Val, C1::Diff)],
&mut Vec<(<C2::Val as ToOwned>::Owned, C2::Diff)>,
&mut Vec<(<C2::Val as ToOwned>::Owned, C2::Diff)>,
&C1::Key, &[(C1::Val<'a>, C1::Diff)],
&mut Vec<(C2::ValOwned, C2::Diff)>,
&mut Vec<(C2::ValOwned, C2::Diff)>,
)
{

Expand Down Expand Up @@ -953,9 +953,10 @@ mod history_replay {
crate::consolidation::consolidate(&mut self.input_buffer);

meet.as_ref().map(|meet| output_replay.advance_buffer_by(&meet));
for &((ref value, ref time), ref diff) in output_replay.buffer().iter() {
for &((value, ref time), ref diff) in output_replay.buffer().iter() {
if time.less_equal(&next_time) {
self.output_buffer.push(((*value).to_owned(), diff.clone()));
use trace::cursor::MyTrait;
self.output_buffer.push((<_ as MyTrait>::to_owned(value), diff.clone()));
}
else {
self.temporary.push(next_time.join(time));
Expand Down
Loading

0 comments on commit 9176261

Please sign in to comment.