Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: add lattice_reduce and lattice_fold #803

Merged
merged 4 commits into from
Jun 30, 2023
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ use hydroflow::hydroflow_syntax;
fn main() {
let mut df = hydroflow_syntax! {
source_iter([1,2,3,4,5])
-> lattice_merge::<'static, usize>()
-> lattice_fold::<'static, usize>()
-> for_each(|x| println!("Least upper bound: {:?}", x));
};
df.run_available();
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
error[E0277]: the trait bound `usize: Merge<usize>` is not satisfied
--> tests/compile-fail/surface_lattice_merge_badgeneric.rs:6:41
--> tests/compile-fail/surface_lattice_fold_badgeneric.rs:6:41
|
6 | -> lattice_merge::<'static, usize>()
6 | -> lattice_fold::<'static, usize>()
| ^^^^^ the trait `Merge<usize>` is not implemented for `usize`
|
= help: the following other types implement trait `Merge<Other>`:
Expand All @@ -15,22 +15,22 @@ error[E0277]: the trait bound `usize: Merge<usize>` is not satisfied
<SetUnion<SetSelf> as Merge<SetUnion<SetOther>>>
and $N others
note: required by a bound in `check_inputs`
--> tests/compile-fail/surface_lattice_merge_badgeneric.rs:4:18
--> tests/compile-fail/surface_lattice_fold_badgeneric.rs:4:18
|
4 | let mut df = hydroflow_syntax! {
| __________________^
5 | | source_iter([1,2,3,4,5])
6 | | -> lattice_merge::<'static, usize>()
6 | | -> lattice_fold::<'static, usize>()
7 | | -> for_each(|x| println!("Least upper bound: {:?}", x));
8 | | };
| |_____^ required by this bound in `check_inputs`
= note: this error originates in the macro `hydroflow_syntax` (in Nightly builds, run with -Z macro-backtrace for more info)

error[E0277]: the trait bound `usize: Merge<usize>` is not satisfied
--> tests/compile-fail/surface_lattice_merge_badgeneric.rs:5:9
--> tests/compile-fail/surface_lattice_fold_badgeneric.rs:5:9
|
5 | / source_iter([1,2,3,4,5])
6 | | -> lattice_merge::<'static, usize>()
6 | | -> lattice_fold::<'static, usize>()
| |________________________________________________^ the trait `Merge<usize>` is not implemented for `usize`
|
= help: the following other types implement trait `Merge<Other>`:
Expand All @@ -45,12 +45,12 @@ error[E0277]: the trait bound `usize: Merge<usize>` is not satisfied
and $N others

error[E0277]: the trait bound `usize: Merge<usize>` is not satisfied
--> tests/compile-fail/surface_lattice_merge_badgeneric.rs:4:18
--> tests/compile-fail/surface_lattice_fold_badgeneric.rs:4:18
|
4 | let mut df = hydroflow_syntax! {
| __________________^
5 | | source_iter([1,2,3,4,5])
6 | | -> lattice_merge::<'static, usize>()
6 | | -> lattice_fold::<'static, usize>()
7 | | -> for_each(|x| println!("Least upper bound: {:?}", x));
8 | | };
| |_____^ the trait `Merge<usize>` is not implemented for `usize`
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ use hydroflow::hydroflow_syntax;
fn main() {
let mut df = hydroflow_syntax! {
source_iter([1,2,3,4,5])
-> lattice_merge::<'static>()
-> lattice_fold::<'static>()
-> for_each(|x| println!("Least upper bound: {:?}", x));
};
df.run_available();
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
error: `lattice_fold` should have exactly 1 generic type arguments, actually has 0.
--> tests/compile-fail/surface_lattice_fold_nogeneric.rs:6:31
|
6 | -> lattice_fold::<'static>()
| ^^^^^^^
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ use hydroflow::hydroflow_syntax;
fn main() {
let mut df = hydroflow_syntax! {
source_iter([1,2,3,4,5])
-> lattice_merge::<'static, hydroflow::lattices::set_union::SetUnionHashSet<u32>>()
-> lattice_fold::<'static, hydroflow::lattices::set_union::SetUnionHashSet<u32>>()
-> for_each(|x| println!("Least upper bound: {:?}", x));
};
df.run_available();
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
error[E0277]: the trait bound `SetUnion<HashSet<u32>>: hydroflow::lattices::Merge<{integer}>` is not satisfied
--> tests/compile-fail/surface_lattice_fold_wronggeneric.rs:5:9
|
5 | / source_iter([1,2,3,4,5])
6 | | -> lattice_fold::<'static, hydroflow::lattices::set_union::SetUnionHashSet<u32>>()
| |______________________________________________________________________________________________^ the trait `hydroflow::lattices::Merge<{integer}>` is not implemented for `SetUnion<HashSet<u32>>`
|
= help: the trait `hydroflow::lattices::Merge<SetUnion<SetOther>>` is implemented for `SetUnion<SetSelf>`

error[E0308]: mismatched types
--> tests/compile-fail/surface_lattice_fold_wronggeneric.rs:4:18
|
4 | let mut df = hydroflow_syntax! {
| __________________^
5 | | source_iter([1,2,3,4,5])
6 | | -> lattice_fold::<'static, hydroflow::lattices::set_union::SetUnionHashSet<u32>>()
7 | | -> for_each(|x| println!("Least upper bound: {:?}", x));
8 | | };
| |_____^ expected integer, found `SetUnion<_>`
|
= note: expected type `{integer}`
found struct `SetUnion<_>`
= note: this error originates in the macro `hydroflow_syntax` (in Nightly builds, run with -Z macro-backtrace for more info)

This file was deleted.

Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
use hydroflow::hydroflow_syntax;

fn main() {
let mut df = hydroflow_syntax! {
source_iter([1,2,3,4,5])
-> lattice_reduce::<'static, usize>()
-> for_each(|x| println!("Least upper bound: {:?}", x));
};
df.run_available();
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
error[E0277]: the trait bound `usize: Merge<usize>` is not satisfied
--> tests/compile-fail/surface_lattice_reduce_badgeneric.rs:6:41
|
6 | -> lattice_reduce::<'static, usize>()
| ^^^^^ the trait `Merge<usize>` is not implemented for `usize`
|
= help: the following other types implement trait `Merge<Other>`:
<WithBot<Inner> as Merge<WithBot<Other>>>
<DomPair<KeySelf, ValSelf> as Merge<DomPair<KeyOther, ValOther>>>
<MapUnion<MapSelf> as Merge<MapUnion<MapOther>>>
<Max<T> as Merge<Max<T>>>
<Min<T> as Merge<Min<T>>>
<Pair<LatASelf, LatBSelf> as Merge<Pair<LatAOther, LatBOther>>>
<Point<T> as Merge<Point<O>>>
<SetUnion<SetSelf> as Merge<SetUnion<SetOther>>>
and $N others
note: required by a bound in `check_inputs`
--> tests/compile-fail/surface_lattice_reduce_badgeneric.rs:4:18
|
4 | let mut df = hydroflow_syntax! {
| __________________^
5 | | source_iter([1,2,3,4,5])
6 | | -> lattice_reduce::<'static, usize>()
7 | | -> for_each(|x| println!("Least upper bound: {:?}", x));
8 | | };
| |_____^ required by this bound in `check_inputs`
= note: this error originates in the macro `hydroflow_syntax` (in Nightly builds, run with -Z macro-backtrace for more info)

error[E0277]: the trait bound `usize: Merge<usize>` is not satisfied
--> tests/compile-fail/surface_lattice_reduce_badgeneric.rs:5:9
|
5 | / source_iter([1,2,3,4,5])
6 | | -> lattice_reduce::<'static, usize>()
| |________________________________________________^ the trait `Merge<usize>` is not implemented for `usize`
|
= help: the following other types implement trait `Merge<Other>`:
<WithBot<Inner> as Merge<WithBot<Other>>>
<DomPair<KeySelf, ValSelf> as Merge<DomPair<KeyOther, ValOther>>>
<MapUnion<MapSelf> as Merge<MapUnion<MapOther>>>
<Max<T> as Merge<Max<T>>>
<Min<T> as Merge<Min<T>>>
<Pair<LatASelf, LatBSelf> as Merge<Pair<LatAOther, LatBOther>>>
<Point<T> as Merge<Point<O>>>
<SetUnion<SetSelf> as Merge<SetUnion<SetOther>>>
and $N others

error[E0277]: the trait bound `usize: Merge<usize>` is not satisfied
--> tests/compile-fail/surface_lattice_reduce_badgeneric.rs:4:18
|
4 | let mut df = hydroflow_syntax! {
| __________________^
5 | | source_iter([1,2,3,4,5])
6 | | -> lattice_reduce::<'static, usize>()
7 | | -> for_each(|x| println!("Least upper bound: {:?}", x));
8 | | };
| |_____^ the trait `Merge<usize>` is not implemented for `usize`
|
= help: the following other types implement trait `Merge<Other>`:
<WithBot<Inner> as Merge<WithBot<Other>>>
<DomPair<KeySelf, ValSelf> as Merge<DomPair<KeyOther, ValOther>>>
<MapUnion<MapSelf> as Merge<MapUnion<MapOther>>>
<Max<T> as Merge<Max<T>>>
<Min<T> as Merge<Min<T>>>
<Pair<LatASelf, LatBSelf> as Merge<Pair<LatAOther, LatBOther>>>
<Point<T> as Merge<Point<O>>>
<SetUnion<SetSelf> as Merge<SetUnion<SetOther>>>
and $N others
= note: this error originates in the macro `hydroflow_syntax` (in Nightly builds, run with -Z macro-backtrace for more info)
10 changes: 10 additions & 0 deletions hydroflow/tests/compile-fail/surface_lattice_reduce_nogeneric.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
use hydroflow::hydroflow_syntax;

fn main() {
let mut df = hydroflow_syntax! {
source_iter([1,2,3,4,5])
-> lattice_reduce::<'static>()
-> for_each(|x| println!("Least upper bound: {:?}", x));
};
df.run_available();
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
error: `lattice_reduce` should have exactly 1 generic type arguments, actually has 0.
--> tests/compile-fail/surface_lattice_reduce_nogeneric.rs:6:33
|
6 | -> lattice_reduce::<'static>()
| ^^^^^^^
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
use hydroflow::hydroflow_syntax;

fn main() {
let mut df = hydroflow_syntax! {
source_iter([1,2,3,4,5])
-> lattice_reduce::<'static, hydroflow::lattices::set_union::SetUnionHashSet<u32>>()
-> for_each(|x| println!("Least upper bound: {:?}", x));
};
df.run_available();
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
error[E0271]: expected `Drain<'_, {integer}>` to be an iterator that yields `SetUnion<HashSet<u32>>`, but it yields `{integer}`
--> tests/compile-fail/surface_lattice_reduce_wronggeneric.rs:5:9
|
5 | source_iter([1,2,3,4,5])
| ^^^^^^^^^^^^^^^^^^^^^^^^ expected `SetUnion<HashSet<u32>>`, found integer
6 | -> lattice_reduce::<'static, hydroflow::lattices::set_union::SetUnionHashSet<u32>>()
| ---------------------------------------------------- required by a bound introduced by this call
|
= note: expected struct `SetUnion<HashSet<u32>>`
found type `{integer}`
note: required by a bound in `check_inputs`
--> tests/compile-fail/surface_lattice_reduce_wronggeneric.rs:6:42
|
6 | -> lattice_reduce::<'static, hydroflow::lattices::set_union::SetUnionHashSet<u32>>()
| ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ required by this bound in `check_inputs`
Original file line number Diff line number Diff line change
@@ -1,15 +1,15 @@
error[E0271]: expected `Drain<'_, {integer}>` to be an iterator that yields `SetUnion<HashSet<u32>>`, but it yields `{integer}`
--> tests/compile-fail/surface_lattice_merge_wronggeneric.rs:5:9
--> tests/compile-fail/surface_lattice_reduce_wronggeneric.rs:5:9
|
5 | source_iter([1,2,3,4,5])
| ^^^^^^^^^^^^^^^^^^^^^^^^ expected `SetUnion<HashSet<u32>>`, found integer
6 | -> lattice_merge::<'static, hydroflow::lattices::set_union::SetUnionHashSet<u32>>()
6 | -> lattice_reduce::<'static, hydroflow::lattices::set_union::SetUnionHashSet<u32>>()
| ---------------------------------------------------- required by a bound introduced by this call
|
= note: expected struct `SetUnion<HashSet<u32>>`
found type `{integer}`
note: required by a bound in `check_inputs`
--> tests/compile-fail/surface_lattice_merge_wronggeneric.rs:6:41
--> tests/compile-fail/surface_lattice_reduce_wronggeneric.rs:6:41
|
6 | -> lattice_merge::<'static, hydroflow::lattices::set_union::SetUnionHashSet<u32>>()
6 | -> lattice_reduce::<'static, hydroflow::lattices::set_union::SetUnionHashSet<u32>>()
| ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ required by this bound in `check_inputs`
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ fn test_basic() {
let mut df = hydroflow_syntax! {
source_iter([1,2,3,4,5])
-> map(Max::new)
-> lattice_merge::<'static, Max<u32>>()
-> lattice_fold::<'static, Max<u32>>()
-> for_each(|x: Max<u32>| println!("Least upper bound: {:?}", x));
};
df.run_available();
Expand Down
13 changes: 13 additions & 0 deletions hydroflow/tests/surface_lattice_reduce.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
use hydroflow::hydroflow_syntax;
use hydroflow::lattices::Max;

#[test]
fn test_basic() {
let mut df = hydroflow_syntax! {
source_iter([1,2,3,4,5])
-> map(Max::new)
-> lattice_reduce::<'static, Max<u32>>()
-> for_each(|x: Max<u32>| println!("Least upper bound: {:?}", x));
};
df.run_available();
}
79 changes: 79 additions & 0 deletions hydroflow_lang/src/graph/ops/lattice_fold.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
use syn::parse_quote_spanned;
use syn::spanned::Spanned;

use super::{
DelayType, FlowProperties, FlowPropertyVal, OpInstGenerics, OperatorCategory,
OperatorConstraints, OperatorInstance, WriteContextArgs, RANGE_1,
};

/// > 1 input stream, 1 output stream
///
/// > Generic parameters: A `Lattice` type, must implement [`Merge<Self>`](https://hydro-project.github.io/hydroflow/doc/lattices/trait.Merge.html)
/// type.
///
/// A specialized operator for merging lattices together into a accumulated value. Like [`fold()`](#fold)
/// but specialized for lattice types. `lattice_fold::<MyLattice>()` is equivalent to `fold(MyLattice::default(), hydroflow::lattices::Merge::merge_owned)`.
///
/// `lattice_fold` can also be provided with one generic lifetime persistence argument, either
/// `'tick` or `'static`, to specify how data persists. With `'tick`, values will only be collected
/// within the same tick. With `'static`, values will be remembered across ticks and will be
/// aggregated with pairs arriving in later ticks. When not explicitly specified persistence
/// defaults to `'static`.
///
/// `lattice_fold` is differentiated from `lattice_reduce` in that `lattice_fold` can accumulate into a different type from its input.
/// But it also means that the accumulating type must have a sensible default value.
zzlk marked this conversation as resolved.
Show resolved Hide resolved
///
/// ```hydroflow
/// source_iter([hydroflow::lattices::set_union::SetUnionSingletonSet::new_from(7)])
/// -> lattice_fold::<'static, hydroflow::lattices::set_union::SetUnionHashSet<usize>>()
/// -> assert([hydroflow::lattices::set_union::SetUnionHashSet::new_from([7])]);
/// ```
pub const LATTICE_FOLD: OperatorConstraints = OperatorConstraints {
name: "lattice_fold",
categories: &[OperatorCategory::LatticeFold],
hard_range_inn: RANGE_1,
soft_range_inn: RANGE_1,
hard_range_out: RANGE_1,
soft_range_out: RANGE_1,
num_args: 0,
persistence_args: &(0..=1),
type_args: RANGE_1,
is_external_input: false,
ports_inn: None,
ports_out: None,
properties: FlowProperties {
deterministic: FlowPropertyVal::Preserve,
monotonic: FlowPropertyVal::Yes,
inconsistency_tainted: false,
},
input_delaytype_fn: |_| Some(DelayType::Stratum),
write_fn: |wc @ &WriteContextArgs {
root,
is_pull,
op_inst:
op_inst @ OperatorInstance {
generics: OpInstGenerics { type_args, .. },
..
},
..
},
diagnostics| {
assert!(is_pull);

let lat_type = &type_args[0];

let arguments = parse_quote_spanned! {lat_type.span()=> // Uses `lat_type.span()`!
<#lat_type>::default(), #root::lattices::Merge::merge_owned
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

<#lat_type as ::std::default::Default>::default()

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

hmm, I wonder what is the difference in this case?

};

let wc = WriteContextArgs {
op_inst: &OperatorInstance {
arguments,
..op_inst.clone()
},
..wc.clone()
};

(super::fold::FOLD.write_fn)(&wc, diagnostics)
},
};
Original file line number Diff line number Diff line change
Expand Up @@ -14,22 +14,25 @@ use crate::graph::ops::OperatorWriteOutput;
/// type.
///
/// A specialized operator for merging lattices together into a accumulated value. Like [`reduce()`](#reduce)
/// but specialized for lattice types. `lattice_merge::<MyLattice>()` is equivalent to `reduce(hydroflow::lattices::Merge::merge_owned)`.
/// but specialized for lattice types. `lattice_reduce::<MyLattice>()` is equivalent to `reduce(hydroflow::lattices::Merge::merge_owned)`.
///
/// `lattice_merge` can also be provided with one generic lifetime persistence argument, either
/// `lattice_reduce` can also be provided with one generic lifetime persistence argument, either
/// `'tick` or `'static`, to specify how data persists. With `'tick`, values will only be collected
/// within the same tick. With `'static`, values will be remembered across ticks and will be
/// aggregated with pairs arriving in later ticks. When not explicitly specified persistence
/// defaults to `'static`.
///
/// `lattice_reduce` is differentiated from `lattice_fold` in that `lattice_reduce` the accumulating type does not need to have a sensible default.
zzlk marked this conversation as resolved.
Show resolved Hide resolved
/// But it also means that the accumulating function inputs and the accumulating type must be the same.
///
/// ```hydroflow
/// source_iter([1,2,3,4,5])
/// -> map(hydroflow::lattices::Max::new)
/// -> lattice_merge::<'static, hydroflow::lattices::Max<usize>>()
/// -> lattice_reduce::<'static, hydroflow::lattices::Max<usize>>()
/// -> assert([hydroflow::lattices::Max::new(5)]);
/// ```
pub const LATTICE_MERGE: OperatorConstraints = OperatorConstraints {
name: "lattice_merge",
pub const LATTICE_REDUCE: OperatorConstraints = OperatorConstraints {
name: "lattice_reduce",
categories: &[OperatorCategory::LatticeFold],
hard_range_inn: RANGE_1,
soft_range_inn: RANGE_1,
Expand Down
Loading
Loading