Skip to content

Commit

Permalink
feat: add lattice_reduce and lattice_fold (hydro-project#803)
Browse files Browse the repository at this point in the history
* feat: add lattice_reduce and lattice_fold

* address comments

* simplify lattice fold a bit

* address comments
  • Loading branch information
zzlk authored Jun 30, 2023
1 parent 69abd97 commit 047e4aa
Show file tree
Hide file tree
Showing 19 changed files with 264 additions and 27 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ use hydroflow::hydroflow_syntax;
fn main() {
let mut df = hydroflow_syntax! {
source_iter([1,2,3,4,5])
-> lattice_merge::<'static, usize>()
-> lattice_fold::<'static, usize>()
-> for_each(|x| println!("Least upper bound: {:?}", x));
};
df.run_available();
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
error[E0277]: the trait bound `usize: Merge<usize>` is not satisfied
--> tests/compile-fail/surface_lattice_merge_badgeneric.rs:6:41
--> tests/compile-fail/surface_lattice_fold_badgeneric.rs:6:41
|
6 | -> lattice_merge::<'static, usize>()
6 | -> lattice_fold::<'static, usize>()
| ^^^^^ the trait `Merge<usize>` is not implemented for `usize`
|
= help: the following other types implement trait `Merge<Other>`:
Expand All @@ -15,22 +15,22 @@ error[E0277]: the trait bound `usize: Merge<usize>` is not satisfied
<SetUnion<SetSelf> as Merge<SetUnion<SetOther>>>
and $N others
note: required by a bound in `check_inputs`
--> tests/compile-fail/surface_lattice_merge_badgeneric.rs:4:18
--> tests/compile-fail/surface_lattice_fold_badgeneric.rs:4:18
|
4 | let mut df = hydroflow_syntax! {
| __________________^
5 | | source_iter([1,2,3,4,5])
6 | | -> lattice_merge::<'static, usize>()
6 | | -> lattice_fold::<'static, usize>()
7 | | -> for_each(|x| println!("Least upper bound: {:?}", x));
8 | | };
| |_____^ required by this bound in `check_inputs`
= note: this error originates in the macro `hydroflow_syntax` (in Nightly builds, run with -Z macro-backtrace for more info)

error[E0277]: the trait bound `usize: Merge<usize>` is not satisfied
--> tests/compile-fail/surface_lattice_merge_badgeneric.rs:5:9
--> tests/compile-fail/surface_lattice_fold_badgeneric.rs:5:9
|
5 | / source_iter([1,2,3,4,5])
6 | | -> lattice_merge::<'static, usize>()
6 | | -> lattice_fold::<'static, usize>()
| |________________________________________________^ the trait `Merge<usize>` is not implemented for `usize`
|
= help: the following other types implement trait `Merge<Other>`:
Expand All @@ -45,12 +45,12 @@ error[E0277]: the trait bound `usize: Merge<usize>` is not satisfied
and $N others

error[E0277]: the trait bound `usize: Merge<usize>` is not satisfied
--> tests/compile-fail/surface_lattice_merge_badgeneric.rs:4:18
--> tests/compile-fail/surface_lattice_fold_badgeneric.rs:4:18
|
4 | let mut df = hydroflow_syntax! {
| __________________^
5 | | source_iter([1,2,3,4,5])
6 | | -> lattice_merge::<'static, usize>()
6 | | -> lattice_fold::<'static, usize>()
7 | | -> for_each(|x| println!("Least upper bound: {:?}", x));
8 | | };
| |_____^ the trait `Merge<usize>` is not implemented for `usize`
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ use hydroflow::hydroflow_syntax;
fn main() {
let mut df = hydroflow_syntax! {
source_iter([1,2,3,4,5])
-> lattice_merge::<'static>()
-> lattice_fold::<'static>()
-> for_each(|x| println!("Least upper bound: {:?}", x));
};
df.run_available();
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
error: `lattice_fold` should have exactly 1 generic type arguments, actually has 0.
--> tests/compile-fail/surface_lattice_fold_nogeneric.rs:6:31
|
6 | -> lattice_fold::<'static>()
| ^^^^^^^
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ use hydroflow::hydroflow_syntax;
fn main() {
let mut df = hydroflow_syntax! {
source_iter([1,2,3,4,5])
-> lattice_merge::<'static, hydroflow::lattices::set_union::SetUnionHashSet<u32>>()
-> lattice_fold::<'static, hydroflow::lattices::set_union::SetUnionHashSet<u32>>()
-> for_each(|x| println!("Least upper bound: {:?}", x));
};
df.run_available();
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
error[E0277]: the trait bound `SetUnion<HashSet<u32>>: hydroflow::lattices::Merge<{integer}>` is not satisfied
--> tests/compile-fail/surface_lattice_fold_wronggeneric.rs:5:9
|
5 | / source_iter([1,2,3,4,5])
6 | | -> lattice_fold::<'static, hydroflow::lattices::set_union::SetUnionHashSet<u32>>()
| |______________________________________________________________________________________________^ the trait `hydroflow::lattices::Merge<{integer}>` is not implemented for `SetUnion<HashSet<u32>>`
|
= help: the trait `hydroflow::lattices::Merge<SetUnion<SetOther>>` is implemented for `SetUnion<SetSelf>`

error[E0308]: mismatched types
--> tests/compile-fail/surface_lattice_fold_wronggeneric.rs:4:18
|
4 | let mut df = hydroflow_syntax! {
| __________________^
5 | | source_iter([1,2,3,4,5])
6 | | -> lattice_fold::<'static, hydroflow::lattices::set_union::SetUnionHashSet<u32>>()
7 | | -> for_each(|x| println!("Least upper bound: {:?}", x));
8 | | };
| |_____^ expected integer, found `SetUnion<_>`
|
= note: expected type `{integer}`
found struct `SetUnion<_>`
= note: this error originates in the macro `hydroflow_syntax` (in Nightly builds, run with -Z macro-backtrace for more info)

This file was deleted.

Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
use hydroflow::hydroflow_syntax;

fn main() {
let mut df = hydroflow_syntax! {
source_iter([1,2,3,4,5])
-> lattice_reduce::<'static, usize>()
-> for_each(|x| println!("Least upper bound: {:?}", x));
};
df.run_available();
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
error[E0277]: the trait bound `usize: Merge<usize>` is not satisfied
--> tests/compile-fail/surface_lattice_reduce_badgeneric.rs:6:41
|
6 | -> lattice_reduce::<'static, usize>()
| ^^^^^ the trait `Merge<usize>` is not implemented for `usize`
|
= help: the following other types implement trait `Merge<Other>`:
<WithBot<Inner> as Merge<WithBot<Other>>>
<DomPair<KeySelf, ValSelf> as Merge<DomPair<KeyOther, ValOther>>>
<MapUnion<MapSelf> as Merge<MapUnion<MapOther>>>
<Max<T> as Merge<Max<T>>>
<Min<T> as Merge<Min<T>>>
<Pair<LatASelf, LatBSelf> as Merge<Pair<LatAOther, LatBOther>>>
<Point<T> as Merge<Point<O>>>
<SetUnion<SetSelf> as Merge<SetUnion<SetOther>>>
and $N others
note: required by a bound in `check_inputs`
--> tests/compile-fail/surface_lattice_reduce_badgeneric.rs:4:18
|
4 | let mut df = hydroflow_syntax! {
| __________________^
5 | | source_iter([1,2,3,4,5])
6 | | -> lattice_reduce::<'static, usize>()
7 | | -> for_each(|x| println!("Least upper bound: {:?}", x));
8 | | };
| |_____^ required by this bound in `check_inputs`
= note: this error originates in the macro `hydroflow_syntax` (in Nightly builds, run with -Z macro-backtrace for more info)

error[E0277]: the trait bound `usize: Merge<usize>` is not satisfied
--> tests/compile-fail/surface_lattice_reduce_badgeneric.rs:5:9
|
5 | / source_iter([1,2,3,4,5])
6 | | -> lattice_reduce::<'static, usize>()
| |________________________________________________^ the trait `Merge<usize>` is not implemented for `usize`
|
= help: the following other types implement trait `Merge<Other>`:
<WithBot<Inner> as Merge<WithBot<Other>>>
<DomPair<KeySelf, ValSelf> as Merge<DomPair<KeyOther, ValOther>>>
<MapUnion<MapSelf> as Merge<MapUnion<MapOther>>>
<Max<T> as Merge<Max<T>>>
<Min<T> as Merge<Min<T>>>
<Pair<LatASelf, LatBSelf> as Merge<Pair<LatAOther, LatBOther>>>
<Point<T> as Merge<Point<O>>>
<SetUnion<SetSelf> as Merge<SetUnion<SetOther>>>
and $N others

error[E0277]: the trait bound `usize: Merge<usize>` is not satisfied
--> tests/compile-fail/surface_lattice_reduce_badgeneric.rs:4:18
|
4 | let mut df = hydroflow_syntax! {
| __________________^
5 | | source_iter([1,2,3,4,5])
6 | | -> lattice_reduce::<'static, usize>()
7 | | -> for_each(|x| println!("Least upper bound: {:?}", x));
8 | | };
| |_____^ the trait `Merge<usize>` is not implemented for `usize`
|
= help: the following other types implement trait `Merge<Other>`:
<WithBot<Inner> as Merge<WithBot<Other>>>
<DomPair<KeySelf, ValSelf> as Merge<DomPair<KeyOther, ValOther>>>
<MapUnion<MapSelf> as Merge<MapUnion<MapOther>>>
<Max<T> as Merge<Max<T>>>
<Min<T> as Merge<Min<T>>>
<Pair<LatASelf, LatBSelf> as Merge<Pair<LatAOther, LatBOther>>>
<Point<T> as Merge<Point<O>>>
<SetUnion<SetSelf> as Merge<SetUnion<SetOther>>>
and $N others
= note: this error originates in the macro `hydroflow_syntax` (in Nightly builds, run with -Z macro-backtrace for more info)
10 changes: 10 additions & 0 deletions hydroflow/tests/compile-fail/surface_lattice_reduce_nogeneric.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
use hydroflow::hydroflow_syntax;

fn main() {
let mut df = hydroflow_syntax! {
source_iter([1,2,3,4,5])
-> lattice_reduce::<'static>()
-> for_each(|x| println!("Least upper bound: {:?}", x));
};
df.run_available();
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
error: `lattice_reduce` should have exactly 1 generic type arguments, actually has 0.
--> tests/compile-fail/surface_lattice_reduce_nogeneric.rs:6:33
|
6 | -> lattice_reduce::<'static>()
| ^^^^^^^
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
use hydroflow::hydroflow_syntax;

fn main() {
let mut df = hydroflow_syntax! {
source_iter([1,2,3,4,5])
-> lattice_reduce::<'static, hydroflow::lattices::set_union::SetUnionHashSet<u32>>()
-> for_each(|x| println!("Least upper bound: {:?}", x));
};
df.run_available();
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
error[E0271]: expected `Drain<'_, {integer}>` to be an iterator that yields `SetUnion<HashSet<u32>>`, but it yields `{integer}`
--> tests/compile-fail/surface_lattice_reduce_wronggeneric.rs:5:9
|
5 | source_iter([1,2,3,4,5])
| ^^^^^^^^^^^^^^^^^^^^^^^^ expected `SetUnion<HashSet<u32>>`, found integer
6 | -> lattice_reduce::<'static, hydroflow::lattices::set_union::SetUnionHashSet<u32>>()
| ---------------------------------------------------- required by a bound introduced by this call
|
= note: expected struct `SetUnion<HashSet<u32>>`
found type `{integer}`
note: required by a bound in `check_inputs`
--> tests/compile-fail/surface_lattice_reduce_wronggeneric.rs:6:42
|
6 | -> lattice_reduce::<'static, hydroflow::lattices::set_union::SetUnionHashSet<u32>>()
| ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ required by this bound in `check_inputs`
Original file line number Diff line number Diff line change
@@ -1,15 +1,15 @@
error[E0271]: expected `Drain<'_, {integer}>` to be an iterator that yields `SetUnion<HashSet<u32>>`, but it yields `{integer}`
--> tests/compile-fail/surface_lattice_merge_wronggeneric.rs:5:9
--> tests/compile-fail/surface_lattice_reduce_wronggeneric.rs:5:9
|
5 | source_iter([1,2,3,4,5])
| ^^^^^^^^^^^^^^^^^^^^^^^^ expected `SetUnion<HashSet<u32>>`, found integer
6 | -> lattice_merge::<'static, hydroflow::lattices::set_union::SetUnionHashSet<u32>>()
6 | -> lattice_reduce::<'static, hydroflow::lattices::set_union::SetUnionHashSet<u32>>()
| ---------------------------------------------------- required by a bound introduced by this call
|
= note: expected struct `SetUnion<HashSet<u32>>`
found type `{integer}`
note: required by a bound in `check_inputs`
--> tests/compile-fail/surface_lattice_merge_wronggeneric.rs:6:41
--> tests/compile-fail/surface_lattice_reduce_wronggeneric.rs:6:41
|
6 | -> lattice_merge::<'static, hydroflow::lattices::set_union::SetUnionHashSet<u32>>()
6 | -> lattice_reduce::<'static, hydroflow::lattices::set_union::SetUnionHashSet<u32>>()
| ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ required by this bound in `check_inputs`
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ fn test_basic() {
let mut df = hydroflow_syntax! {
source_iter([1,2,3,4,5])
-> map(Max::new)
-> lattice_merge::<'static, Max<u32>>()
-> lattice_fold::<'static, Max<u32>>()
-> for_each(|x: Max<u32>| println!("Least upper bound: {:?}", x));
};
df.run_available();
Expand Down
13 changes: 13 additions & 0 deletions hydroflow/tests/surface_lattice_reduce.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
use hydroflow::hydroflow_syntax;
use hydroflow::lattices::Max;

#[test]
fn test_basic() {
let mut df = hydroflow_syntax! {
source_iter([1,2,3,4,5])
-> map(Max::new)
-> lattice_reduce::<'static, Max<u32>>()
-> for_each(|x: Max<u32>| println!("Least upper bound: {:?}", x));
};
df.run_available();
}
79 changes: 79 additions & 0 deletions hydroflow_lang/src/graph/ops/lattice_fold.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
use syn::parse_quote_spanned;
use syn::spanned::Spanned;

use super::{
DelayType, FlowProperties, FlowPropertyVal, OpInstGenerics, OperatorCategory,
OperatorConstraints, OperatorInstance, WriteContextArgs, RANGE_1,
};

/// > 1 input stream, 1 output stream
///
/// > Generic parameters: A `Lattice` type, must implement [`Merge<Self>`](https://hydro-project.github.io/hydroflow/doc/lattices/trait.Merge.html)
/// type.
///
/// A specialized operator for merging lattices together into a accumulated value. Like [`fold()`](#fold)
/// but specialized for lattice types. `lattice_fold::<MyLattice>()` is equivalent to `fold(MyLattice::default(), hydroflow::lattices::Merge::merge_owned)`.
///
/// `lattice_fold` can also be provided with one generic lifetime persistence argument, either
/// `'tick` or `'static`, to specify how data persists. With `'tick`, values will only be collected
/// within the same tick. With `'static`, values will be remembered across ticks and will be
/// aggregated with pairs arriving in later ticks. When not explicitly specified persistence
/// defaults to `'static`.
///
/// `lattice_fold` is differentiated from `lattice_reduce` in that `lattice_fold` can accumulate into a different type from its input.
/// But it also means that the accumulating type must implement `Default`
///
/// ```hydroflow
/// source_iter([hydroflow::lattices::set_union::SetUnionSingletonSet::new_from(7)])
/// -> lattice_fold::<'static, hydroflow::lattices::set_union::SetUnionHashSet<usize>>()
/// -> assert([hydroflow::lattices::set_union::SetUnionHashSet::new_from([7])]);
/// ```
pub const LATTICE_FOLD: OperatorConstraints = OperatorConstraints {
name: "lattice_fold",
categories: &[OperatorCategory::LatticeFold],
hard_range_inn: RANGE_1,
soft_range_inn: RANGE_1,
hard_range_out: RANGE_1,
soft_range_out: RANGE_1,
num_args: 0,
persistence_args: &(0..=1),
type_args: RANGE_1,
is_external_input: false,
ports_inn: None,
ports_out: None,
properties: FlowProperties {
deterministic: FlowPropertyVal::Preserve,
monotonic: FlowPropertyVal::Yes,
inconsistency_tainted: false,
},
input_delaytype_fn: |_| Some(DelayType::Stratum),
write_fn: |wc @ &WriteContextArgs {
root,
is_pull,
op_inst:
op_inst @ OperatorInstance {
generics: OpInstGenerics { type_args, .. },
..
},
..
},
diagnostics| {
assert!(is_pull);

let lat_type = &type_args[0];

let arguments = parse_quote_spanned! {lat_type.span()=> // Uses `lat_type.span()`!
<#lat_type as ::std::default::Default>::default(), #root::lattices::Merge::merge_owned
};

let wc = WriteContextArgs {
op_inst: &OperatorInstance {
arguments,
..op_inst.clone()
},
..wc.clone()
};

(super::fold::FOLD.write_fn)(&wc, diagnostics)
},
};
Original file line number Diff line number Diff line change
Expand Up @@ -14,22 +14,25 @@ use crate::graph::ops::OperatorWriteOutput;
/// type.
///
/// A specialized operator for merging lattices together into a accumulated value. Like [`reduce()`](#reduce)
/// but specialized for lattice types. `lattice_merge::<MyLattice>()` is equivalent to `reduce(hydroflow::lattices::Merge::merge_owned)`.
/// but specialized for lattice types. `lattice_reduce::<MyLattice>()` is equivalent to `reduce(hydroflow::lattices::Merge::merge_owned)`.
///
/// `lattice_merge` can also be provided with one generic lifetime persistence argument, either
/// `lattice_reduce` can also be provided with one generic lifetime persistence argument, either
/// `'tick` or `'static`, to specify how data persists. With `'tick`, values will only be collected
/// within the same tick. With `'static`, values will be remembered across ticks and will be
/// aggregated with pairs arriving in later ticks. When not explicitly specified persistence
/// defaults to `'static`.
///
/// `lattice_reduce` is differentiated from `lattice_fold` in that `lattice_reduce` the accumulating type does not need to implement `Default`.
/// But it also means that the accumulating function inputs and the accumulating type must be the same.
///
/// ```hydroflow
/// source_iter([1,2,3,4,5])
/// -> map(hydroflow::lattices::Max::new)
/// -> lattice_merge::<'static, hydroflow::lattices::Max<usize>>()
/// -> lattice_reduce::<'static, hydroflow::lattices::Max<usize>>()
/// -> assert([hydroflow::lattices::Max::new(5)]);
/// ```
pub const LATTICE_MERGE: OperatorConstraints = OperatorConstraints {
name: "lattice_merge",
pub const LATTICE_REDUCE: OperatorConstraints = OperatorConstraints {
name: "lattice_reduce",
categories: &[OperatorCategory::LatticeFold],
hard_range_inn: RANGE_1,
soft_range_inn: RANGE_1,
Expand Down
Loading

0 comments on commit 047e4aa

Please sign in to comment.