From 047e4aaf4bfe3e9605234c4cb1077204b7fffce6 Mon Sep 17 00:00:00 2001 From: zzlk <2418897+zzlk@users.noreply.github.com> Date: Fri, 30 Jun 2023 16:07:20 -0700 Subject: [PATCH] feat: add lattice_reduce and lattice_fold (#803) * feat: add lattice_reduce and lattice_fold * address comments * simplify lattice fold a bit * address comments --- ...surface_lattice_fold_badgeneric.rs.ignore} | 2 +- ...=> surface_lattice_fold_badgeneric.stderr} | 16 ++-- ...c.rs => surface_lattice_fold_nogeneric.rs} | 2 +- .../surface_lattice_fold_nogeneric.stderr | 5 ++ ...s => surface_lattice_fold_wronggeneric.rs} | 2 +- .../surface_lattice_fold_wronggeneric.stderr | 23 ++++++ .../surface_lattice_merge_nogeneric.stderr | 5 -- ...urface_lattice_reduce_badgeneric.rs.ignore | 10 +++ .../surface_lattice_reduce_badgeneric.stderr | 68 ++++++++++++++++ .../surface_lattice_reduce_nogeneric.rs | 10 +++ .../surface_lattice_reduce_nogeneric.stderr | 5 ++ .../surface_lattice_reduce_wronggeneric.rs | 10 +++ ...surface_lattice_reduce_wronggeneric.stderr | 15 ++++ ...surface_latticereduce_wronggeneric.stderr} | 8 +- ...ttice_merge.rs => surface_lattice_fold.rs} | 2 +- hydroflow/tests/surface_lattice_reduce.rs | 13 +++ hydroflow_lang/src/graph/ops/lattice_fold.rs | 79 +++++++++++++++++++ .../{lattice_merge.rs => lattice_reduce.rs} | 13 +-- hydroflow_lang/src/graph/ops/mod.rs | 3 +- 19 files changed, 264 insertions(+), 27 deletions(-) rename hydroflow/tests/compile-fail/{surface_lattice_merge_badgeneric.rs.ignore => surface_lattice_fold_badgeneric.rs.ignore} (81%) rename hydroflow/tests/compile-fail/{surface_lattice_merge_badgeneric.stderr => surface_lattice_fold_badgeneric.stderr} (85%) rename hydroflow/tests/compile-fail/{surface_lattice_merge_nogeneric.rs => surface_lattice_fold_nogeneric.rs} (83%) create mode 100644 hydroflow/tests/compile-fail/surface_lattice_fold_nogeneric.stderr rename hydroflow/tests/compile-fail/{surface_lattice_merge_wronggeneric.rs => surface_lattice_fold_wronggeneric.rs} (69%) create mode 100644 hydroflow/tests/compile-fail/surface_lattice_fold_wronggeneric.stderr delete mode 100644 hydroflow/tests/compile-fail/surface_lattice_merge_nogeneric.stderr create mode 100644 hydroflow/tests/compile-fail/surface_lattice_reduce_badgeneric.rs.ignore create mode 100644 hydroflow/tests/compile-fail/surface_lattice_reduce_badgeneric.stderr create mode 100644 hydroflow/tests/compile-fail/surface_lattice_reduce_nogeneric.rs create mode 100644 hydroflow/tests/compile-fail/surface_lattice_reduce_nogeneric.stderr create mode 100644 hydroflow/tests/compile-fail/surface_lattice_reduce_wronggeneric.rs create mode 100644 hydroflow/tests/compile-fail/surface_lattice_reduce_wronggeneric.stderr rename hydroflow/tests/compile-fail/{surface_lattice_merge_wronggeneric.stderr => surface_latticereduce_wronggeneric.stderr} (66%) rename hydroflow/tests/{surface_lattice_merge.rs => surface_lattice_fold.rs} (85%) create mode 100644 hydroflow/tests/surface_lattice_reduce.rs create mode 100644 hydroflow_lang/src/graph/ops/lattice_fold.rs rename hydroflow_lang/src/graph/ops/{lattice_merge.rs => lattice_reduce.rs} (84%) diff --git a/hydroflow/tests/compile-fail/surface_lattice_merge_badgeneric.rs.ignore b/hydroflow/tests/compile-fail/surface_lattice_fold_badgeneric.rs.ignore similarity index 81% rename from hydroflow/tests/compile-fail/surface_lattice_merge_badgeneric.rs.ignore rename to hydroflow/tests/compile-fail/surface_lattice_fold_badgeneric.rs.ignore index 638cdd2b244e..051e7f6f6a71 100644 --- a/hydroflow/tests/compile-fail/surface_lattice_merge_badgeneric.rs.ignore +++ b/hydroflow/tests/compile-fail/surface_lattice_fold_badgeneric.rs.ignore @@ -3,7 +3,7 @@ use hydroflow::hydroflow_syntax; fn main() { let mut df = hydroflow_syntax! { source_iter([1,2,3,4,5]) - -> lattice_merge::<'static, usize>() + -> lattice_fold::<'static, usize>() -> for_each(|x| println!("Least upper bound: {:?}", x)); }; df.run_available(); diff --git a/hydroflow/tests/compile-fail/surface_lattice_merge_badgeneric.stderr b/hydroflow/tests/compile-fail/surface_lattice_fold_badgeneric.stderr similarity index 85% rename from hydroflow/tests/compile-fail/surface_lattice_merge_badgeneric.stderr rename to hydroflow/tests/compile-fail/surface_lattice_fold_badgeneric.stderr index f082da77b67f..a92cdebecbb7 100644 --- a/hydroflow/tests/compile-fail/surface_lattice_merge_badgeneric.stderr +++ b/hydroflow/tests/compile-fail/surface_lattice_fold_badgeneric.stderr @@ -1,7 +1,7 @@ error[E0277]: the trait bound `usize: Merge` is not satisfied - --> tests/compile-fail/surface_lattice_merge_badgeneric.rs:6:41 + --> tests/compile-fail/surface_lattice_fold_badgeneric.rs:6:41 | -6 | -> lattice_merge::<'static, usize>() +6 | -> lattice_fold::<'static, usize>() | ^^^^^ the trait `Merge` is not implemented for `usize` | = help: the following other types implement trait `Merge`: @@ -15,22 +15,22 @@ error[E0277]: the trait bound `usize: Merge` is not satisfied as Merge>> and $N others note: required by a bound in `check_inputs` - --> tests/compile-fail/surface_lattice_merge_badgeneric.rs:4:18 + --> tests/compile-fail/surface_lattice_fold_badgeneric.rs:4:18 | 4 | let mut df = hydroflow_syntax! { | __________________^ 5 | | source_iter([1,2,3,4,5]) -6 | | -> lattice_merge::<'static, usize>() +6 | | -> lattice_fold::<'static, usize>() 7 | | -> for_each(|x| println!("Least upper bound: {:?}", x)); 8 | | }; | |_____^ required by this bound in `check_inputs` = note: this error originates in the macro `hydroflow_syntax` (in Nightly builds, run with -Z macro-backtrace for more info) error[E0277]: the trait bound `usize: Merge` is not satisfied - --> tests/compile-fail/surface_lattice_merge_badgeneric.rs:5:9 + --> tests/compile-fail/surface_lattice_fold_badgeneric.rs:5:9 | 5 | / source_iter([1,2,3,4,5]) -6 | | -> lattice_merge::<'static, usize>() +6 | | -> lattice_fold::<'static, usize>() | |________________________________________________^ the trait `Merge` is not implemented for `usize` | = help: the following other types implement trait `Merge`: @@ -45,12 +45,12 @@ error[E0277]: the trait bound `usize: Merge` is not satisfied and $N others error[E0277]: the trait bound `usize: Merge` is not satisfied - --> tests/compile-fail/surface_lattice_merge_badgeneric.rs:4:18 + --> tests/compile-fail/surface_lattice_fold_badgeneric.rs:4:18 | 4 | let mut df = hydroflow_syntax! { | __________________^ 5 | | source_iter([1,2,3,4,5]) -6 | | -> lattice_merge::<'static, usize>() +6 | | -> lattice_fold::<'static, usize>() 7 | | -> for_each(|x| println!("Least upper bound: {:?}", x)); 8 | | }; | |_____^ the trait `Merge` is not implemented for `usize` diff --git a/hydroflow/tests/compile-fail/surface_lattice_merge_nogeneric.rs b/hydroflow/tests/compile-fail/surface_lattice_fold_nogeneric.rs similarity index 83% rename from hydroflow/tests/compile-fail/surface_lattice_merge_nogeneric.rs rename to hydroflow/tests/compile-fail/surface_lattice_fold_nogeneric.rs index 0f249cd30483..e5954bbcd183 100644 --- a/hydroflow/tests/compile-fail/surface_lattice_merge_nogeneric.rs +++ b/hydroflow/tests/compile-fail/surface_lattice_fold_nogeneric.rs @@ -3,7 +3,7 @@ use hydroflow::hydroflow_syntax; fn main() { let mut df = hydroflow_syntax! { source_iter([1,2,3,4,5]) - -> lattice_merge::<'static>() + -> lattice_fold::<'static>() -> for_each(|x| println!("Least upper bound: {:?}", x)); }; df.run_available(); diff --git a/hydroflow/tests/compile-fail/surface_lattice_fold_nogeneric.stderr b/hydroflow/tests/compile-fail/surface_lattice_fold_nogeneric.stderr new file mode 100644 index 000000000000..636a8c3e9cb5 --- /dev/null +++ b/hydroflow/tests/compile-fail/surface_lattice_fold_nogeneric.stderr @@ -0,0 +1,5 @@ +error: `lattice_fold` should have exactly 1 generic type arguments, actually has 0. + --> tests/compile-fail/surface_lattice_fold_nogeneric.rs:6:31 + | +6 | -> lattice_fold::<'static>() + | ^^^^^^^ diff --git a/hydroflow/tests/compile-fail/surface_lattice_merge_wronggeneric.rs b/hydroflow/tests/compile-fail/surface_lattice_fold_wronggeneric.rs similarity index 69% rename from hydroflow/tests/compile-fail/surface_lattice_merge_wronggeneric.rs rename to hydroflow/tests/compile-fail/surface_lattice_fold_wronggeneric.rs index 1edc48f08e54..87e82dfaa97d 100644 --- a/hydroflow/tests/compile-fail/surface_lattice_merge_wronggeneric.rs +++ b/hydroflow/tests/compile-fail/surface_lattice_fold_wronggeneric.rs @@ -3,7 +3,7 @@ use hydroflow::hydroflow_syntax; fn main() { let mut df = hydroflow_syntax! { source_iter([1,2,3,4,5]) - -> lattice_merge::<'static, hydroflow::lattices::set_union::SetUnionHashSet>() + -> lattice_fold::<'static, hydroflow::lattices::set_union::SetUnionHashSet>() -> for_each(|x| println!("Least upper bound: {:?}", x)); }; df.run_available(); diff --git a/hydroflow/tests/compile-fail/surface_lattice_fold_wronggeneric.stderr b/hydroflow/tests/compile-fail/surface_lattice_fold_wronggeneric.stderr new file mode 100644 index 000000000000..0bebcccbdcf1 --- /dev/null +++ b/hydroflow/tests/compile-fail/surface_lattice_fold_wronggeneric.stderr @@ -0,0 +1,23 @@ +error[E0277]: the trait bound `SetUnion>: hydroflow::lattices::Merge<{integer}>` is not satisfied + --> tests/compile-fail/surface_lattice_fold_wronggeneric.rs:5:9 + | +5 | / source_iter([1,2,3,4,5]) +6 | | -> lattice_fold::<'static, hydroflow::lattices::set_union::SetUnionHashSet>() + | |______________________________________________________________________________________________^ the trait `hydroflow::lattices::Merge<{integer}>` is not implemented for `SetUnion>` + | + = help: the trait `hydroflow::lattices::Merge>` is implemented for `SetUnion` + +error[E0308]: mismatched types + --> tests/compile-fail/surface_lattice_fold_wronggeneric.rs:4:18 + | +4 | let mut df = hydroflow_syntax! { + | __________________^ +5 | | source_iter([1,2,3,4,5]) +6 | | -> lattice_fold::<'static, hydroflow::lattices::set_union::SetUnionHashSet>() +7 | | -> for_each(|x| println!("Least upper bound: {:?}", x)); +8 | | }; + | |_____^ expected integer, found `SetUnion<_>` + | + = note: expected type `{integer}` + found struct `SetUnion<_>` + = note: this error originates in the macro `hydroflow_syntax` (in Nightly builds, run with -Z macro-backtrace for more info) diff --git a/hydroflow/tests/compile-fail/surface_lattice_merge_nogeneric.stderr b/hydroflow/tests/compile-fail/surface_lattice_merge_nogeneric.stderr deleted file mode 100644 index fb0b00b138ce..000000000000 --- a/hydroflow/tests/compile-fail/surface_lattice_merge_nogeneric.stderr +++ /dev/null @@ -1,5 +0,0 @@ -error: `lattice_merge` should have exactly 1 generic type arguments, actually has 0. - --> tests/compile-fail/surface_lattice_merge_nogeneric.rs:6:32 - | -6 | -> lattice_merge::<'static>() - | ^^^^^^^ diff --git a/hydroflow/tests/compile-fail/surface_lattice_reduce_badgeneric.rs.ignore b/hydroflow/tests/compile-fail/surface_lattice_reduce_badgeneric.rs.ignore new file mode 100644 index 000000000000..1fad32e06857 --- /dev/null +++ b/hydroflow/tests/compile-fail/surface_lattice_reduce_badgeneric.rs.ignore @@ -0,0 +1,10 @@ +use hydroflow::hydroflow_syntax; + +fn main() { + let mut df = hydroflow_syntax! { + source_iter([1,2,3,4,5]) + -> lattice_reduce::<'static, usize>() + -> for_each(|x| println!("Least upper bound: {:?}", x)); + }; + df.run_available(); +} diff --git a/hydroflow/tests/compile-fail/surface_lattice_reduce_badgeneric.stderr b/hydroflow/tests/compile-fail/surface_lattice_reduce_badgeneric.stderr new file mode 100644 index 000000000000..29bb4627a538 --- /dev/null +++ b/hydroflow/tests/compile-fail/surface_lattice_reduce_badgeneric.stderr @@ -0,0 +1,68 @@ +error[E0277]: the trait bound `usize: Merge` is not satisfied + --> tests/compile-fail/surface_lattice_reduce_badgeneric.rs:6:41 + | +6 | -> lattice_reduce::<'static, usize>() + | ^^^^^ the trait `Merge` is not implemented for `usize` + | + = help: the following other types implement trait `Merge`: + as Merge>> + as Merge>> + as Merge>> + as Merge>> + as Merge>> + as Merge>> + as Merge>> + as Merge>> + and $N others +note: required by a bound in `check_inputs` + --> tests/compile-fail/surface_lattice_reduce_badgeneric.rs:4:18 + | +4 | let mut df = hydroflow_syntax! { + | __________________^ +5 | | source_iter([1,2,3,4,5]) +6 | | -> lattice_reduce::<'static, usize>() +7 | | -> for_each(|x| println!("Least upper bound: {:?}", x)); +8 | | }; + | |_____^ required by this bound in `check_inputs` + = note: this error originates in the macro `hydroflow_syntax` (in Nightly builds, run with -Z macro-backtrace for more info) + +error[E0277]: the trait bound `usize: Merge` is not satisfied + --> tests/compile-fail/surface_lattice_reduce_badgeneric.rs:5:9 + | +5 | / source_iter([1,2,3,4,5]) +6 | | -> lattice_reduce::<'static, usize>() + | |________________________________________________^ the trait `Merge` is not implemented for `usize` + | + = help: the following other types implement trait `Merge`: + as Merge>> + as Merge>> + as Merge>> + as Merge>> + as Merge>> + as Merge>> + as Merge>> + as Merge>> + and $N others + +error[E0277]: the trait bound `usize: Merge` is not satisfied + --> tests/compile-fail/surface_lattice_reduce_badgeneric.rs:4:18 + | +4 | let mut df = hydroflow_syntax! { + | __________________^ +5 | | source_iter([1,2,3,4,5]) +6 | | -> lattice_reduce::<'static, usize>() +7 | | -> for_each(|x| println!("Least upper bound: {:?}", x)); +8 | | }; + | |_____^ the trait `Merge` is not implemented for `usize` + | + = help: the following other types implement trait `Merge`: + as Merge>> + as Merge>> + as Merge>> + as Merge>> + as Merge>> + as Merge>> + as Merge>> + as Merge>> + and $N others + = note: this error originates in the macro `hydroflow_syntax` (in Nightly builds, run with -Z macro-backtrace for more info) diff --git a/hydroflow/tests/compile-fail/surface_lattice_reduce_nogeneric.rs b/hydroflow/tests/compile-fail/surface_lattice_reduce_nogeneric.rs new file mode 100644 index 000000000000..63fdfdeaa7a0 --- /dev/null +++ b/hydroflow/tests/compile-fail/surface_lattice_reduce_nogeneric.rs @@ -0,0 +1,10 @@ +use hydroflow::hydroflow_syntax; + +fn main() { + let mut df = hydroflow_syntax! { + source_iter([1,2,3,4,5]) + -> lattice_reduce::<'static>() + -> for_each(|x| println!("Least upper bound: {:?}", x)); + }; + df.run_available(); +} diff --git a/hydroflow/tests/compile-fail/surface_lattice_reduce_nogeneric.stderr b/hydroflow/tests/compile-fail/surface_lattice_reduce_nogeneric.stderr new file mode 100644 index 000000000000..2e10a7fb1253 --- /dev/null +++ b/hydroflow/tests/compile-fail/surface_lattice_reduce_nogeneric.stderr @@ -0,0 +1,5 @@ +error: `lattice_reduce` should have exactly 1 generic type arguments, actually has 0. + --> tests/compile-fail/surface_lattice_reduce_nogeneric.rs:6:33 + | +6 | -> lattice_reduce::<'static>() + | ^^^^^^^ diff --git a/hydroflow/tests/compile-fail/surface_lattice_reduce_wronggeneric.rs b/hydroflow/tests/compile-fail/surface_lattice_reduce_wronggeneric.rs new file mode 100644 index 000000000000..08306055ea8b --- /dev/null +++ b/hydroflow/tests/compile-fail/surface_lattice_reduce_wronggeneric.rs @@ -0,0 +1,10 @@ +use hydroflow::hydroflow_syntax; + +fn main() { + let mut df = hydroflow_syntax! { + source_iter([1,2,3,4,5]) + -> lattice_reduce::<'static, hydroflow::lattices::set_union::SetUnionHashSet>() + -> for_each(|x| println!("Least upper bound: {:?}", x)); + }; + df.run_available(); +} diff --git a/hydroflow/tests/compile-fail/surface_lattice_reduce_wronggeneric.stderr b/hydroflow/tests/compile-fail/surface_lattice_reduce_wronggeneric.stderr new file mode 100644 index 000000000000..34a5df9bcd8f --- /dev/null +++ b/hydroflow/tests/compile-fail/surface_lattice_reduce_wronggeneric.stderr @@ -0,0 +1,15 @@ +error[E0271]: expected `Drain<'_, {integer}>` to be an iterator that yields `SetUnion>`, but it yields `{integer}` + --> tests/compile-fail/surface_lattice_reduce_wronggeneric.rs:5:9 + | +5 | source_iter([1,2,3,4,5]) + | ^^^^^^^^^^^^^^^^^^^^^^^^ expected `SetUnion>`, found integer +6 | -> lattice_reduce::<'static, hydroflow::lattices::set_union::SetUnionHashSet>() + | ---------------------------------------------------- required by a bound introduced by this call + | + = note: expected struct `SetUnion>` + found type `{integer}` +note: required by a bound in `check_inputs` + --> tests/compile-fail/surface_lattice_reduce_wronggeneric.rs:6:42 + | +6 | -> lattice_reduce::<'static, hydroflow::lattices::set_union::SetUnionHashSet>() + | ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ required by this bound in `check_inputs` diff --git a/hydroflow/tests/compile-fail/surface_lattice_merge_wronggeneric.stderr b/hydroflow/tests/compile-fail/surface_latticereduce_wronggeneric.stderr similarity index 66% rename from hydroflow/tests/compile-fail/surface_lattice_merge_wronggeneric.stderr rename to hydroflow/tests/compile-fail/surface_latticereduce_wronggeneric.stderr index fed054512c1c..714a5d8013ff 100644 --- a/hydroflow/tests/compile-fail/surface_lattice_merge_wronggeneric.stderr +++ b/hydroflow/tests/compile-fail/surface_latticereduce_wronggeneric.stderr @@ -1,15 +1,15 @@ error[E0271]: expected `Drain<'_, {integer}>` to be an iterator that yields `SetUnion>`, but it yields `{integer}` - --> tests/compile-fail/surface_lattice_merge_wronggeneric.rs:5:9 + --> tests/compile-fail/surface_lattice_reduce_wronggeneric.rs:5:9 | 5 | source_iter([1,2,3,4,5]) | ^^^^^^^^^^^^^^^^^^^^^^^^ expected `SetUnion>`, found integer -6 | -> lattice_merge::<'static, hydroflow::lattices::set_union::SetUnionHashSet>() +6 | -> lattice_reduce::<'static, hydroflow::lattices::set_union::SetUnionHashSet>() | ---------------------------------------------------- required by a bound introduced by this call | = note: expected struct `SetUnion>` found type `{integer}` note: required by a bound in `check_inputs` - --> tests/compile-fail/surface_lattice_merge_wronggeneric.rs:6:41 + --> tests/compile-fail/surface_lattice_reduce_wronggeneric.rs:6:41 | -6 | -> lattice_merge::<'static, hydroflow::lattices::set_union::SetUnionHashSet>() +6 | -> lattice_reduce::<'static, hydroflow::lattices::set_union::SetUnionHashSet>() | ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ required by this bound in `check_inputs` diff --git a/hydroflow/tests/surface_lattice_merge.rs b/hydroflow/tests/surface_lattice_fold.rs similarity index 85% rename from hydroflow/tests/surface_lattice_merge.rs rename to hydroflow/tests/surface_lattice_fold.rs index a307c1dc57f8..d7fd6ced5b39 100644 --- a/hydroflow/tests/surface_lattice_merge.rs +++ b/hydroflow/tests/surface_lattice_fold.rs @@ -6,7 +6,7 @@ fn test_basic() { let mut df = hydroflow_syntax! { source_iter([1,2,3,4,5]) -> map(Max::new) - -> lattice_merge::<'static, Max>() + -> lattice_fold::<'static, Max>() -> for_each(|x: Max| println!("Least upper bound: {:?}", x)); }; df.run_available(); diff --git a/hydroflow/tests/surface_lattice_reduce.rs b/hydroflow/tests/surface_lattice_reduce.rs new file mode 100644 index 000000000000..70fd4e9d312e --- /dev/null +++ b/hydroflow/tests/surface_lattice_reduce.rs @@ -0,0 +1,13 @@ +use hydroflow::hydroflow_syntax; +use hydroflow::lattices::Max; + +#[test] +fn test_basic() { + let mut df = hydroflow_syntax! { + source_iter([1,2,3,4,5]) + -> map(Max::new) + -> lattice_reduce::<'static, Max>() + -> for_each(|x: Max| println!("Least upper bound: {:?}", x)); + }; + df.run_available(); +} diff --git a/hydroflow_lang/src/graph/ops/lattice_fold.rs b/hydroflow_lang/src/graph/ops/lattice_fold.rs new file mode 100644 index 000000000000..272bef6aa0cb --- /dev/null +++ b/hydroflow_lang/src/graph/ops/lattice_fold.rs @@ -0,0 +1,79 @@ +use syn::parse_quote_spanned; +use syn::spanned::Spanned; + +use super::{ + DelayType, FlowProperties, FlowPropertyVal, OpInstGenerics, OperatorCategory, + OperatorConstraints, OperatorInstance, WriteContextArgs, RANGE_1, +}; + +/// > 1 input stream, 1 output stream +/// +/// > Generic parameters: A `Lattice` type, must implement [`Merge`](https://hydro-project.github.io/hydroflow/doc/lattices/trait.Merge.html) +/// type. +/// +/// A specialized operator for merging lattices together into a accumulated value. Like [`fold()`](#fold) +/// but specialized for lattice types. `lattice_fold::()` is equivalent to `fold(MyLattice::default(), hydroflow::lattices::Merge::merge_owned)`. +/// +/// `lattice_fold` can also be provided with one generic lifetime persistence argument, either +/// `'tick` or `'static`, to specify how data persists. With `'tick`, values will only be collected +/// within the same tick. With `'static`, values will be remembered across ticks and will be +/// aggregated with pairs arriving in later ticks. When not explicitly specified persistence +/// defaults to `'static`. +/// +/// `lattice_fold` is differentiated from `lattice_reduce` in that `lattice_fold` can accumulate into a different type from its input. +/// But it also means that the accumulating type must implement `Default` +/// +/// ```hydroflow +/// source_iter([hydroflow::lattices::set_union::SetUnionSingletonSet::new_from(7)]) +/// -> lattice_fold::<'static, hydroflow::lattices::set_union::SetUnionHashSet>() +/// -> assert([hydroflow::lattices::set_union::SetUnionHashSet::new_from([7])]); +/// ``` +pub const LATTICE_FOLD: OperatorConstraints = OperatorConstraints { + name: "lattice_fold", + categories: &[OperatorCategory::LatticeFold], + hard_range_inn: RANGE_1, + soft_range_inn: RANGE_1, + hard_range_out: RANGE_1, + soft_range_out: RANGE_1, + num_args: 0, + persistence_args: &(0..=1), + type_args: RANGE_1, + is_external_input: false, + ports_inn: None, + ports_out: None, + properties: FlowProperties { + deterministic: FlowPropertyVal::Preserve, + monotonic: FlowPropertyVal::Yes, + inconsistency_tainted: false, + }, + input_delaytype_fn: |_| Some(DelayType::Stratum), + write_fn: |wc @ &WriteContextArgs { + root, + is_pull, + op_inst: + op_inst @ OperatorInstance { + generics: OpInstGenerics { type_args, .. }, + .. + }, + .. + }, + diagnostics| { + assert!(is_pull); + + let lat_type = &type_args[0]; + + let arguments = parse_quote_spanned! {lat_type.span()=> // Uses `lat_type.span()`! + <#lat_type as ::std::default::Default>::default(), #root::lattices::Merge::merge_owned + }; + + let wc = WriteContextArgs { + op_inst: &OperatorInstance { + arguments, + ..op_inst.clone() + }, + ..wc.clone() + }; + + (super::fold::FOLD.write_fn)(&wc, diagnostics) + }, +}; diff --git a/hydroflow_lang/src/graph/ops/lattice_merge.rs b/hydroflow_lang/src/graph/ops/lattice_reduce.rs similarity index 84% rename from hydroflow_lang/src/graph/ops/lattice_merge.rs rename to hydroflow_lang/src/graph/ops/lattice_reduce.rs index 02b30954ccf4..49cc6da026f7 100644 --- a/hydroflow_lang/src/graph/ops/lattice_merge.rs +++ b/hydroflow_lang/src/graph/ops/lattice_reduce.rs @@ -14,22 +14,25 @@ use crate::graph::ops::OperatorWriteOutput; /// type. /// /// A specialized operator for merging lattices together into a accumulated value. Like [`reduce()`](#reduce) -/// but specialized for lattice types. `lattice_merge::()` is equivalent to `reduce(hydroflow::lattices::Merge::merge_owned)`. +/// but specialized for lattice types. `lattice_reduce::()` is equivalent to `reduce(hydroflow::lattices::Merge::merge_owned)`. /// -/// `lattice_merge` can also be provided with one generic lifetime persistence argument, either +/// `lattice_reduce` can also be provided with one generic lifetime persistence argument, either /// `'tick` or `'static`, to specify how data persists. With `'tick`, values will only be collected /// within the same tick. With `'static`, values will be remembered across ticks and will be /// aggregated with pairs arriving in later ticks. When not explicitly specified persistence /// defaults to `'static`. /// +/// `lattice_reduce` is differentiated from `lattice_fold` in that `lattice_reduce` the accumulating type does not need to implement `Default`. +/// But it also means that the accumulating function inputs and the accumulating type must be the same. +/// /// ```hydroflow /// source_iter([1,2,3,4,5]) /// -> map(hydroflow::lattices::Max::new) -/// -> lattice_merge::<'static, hydroflow::lattices::Max>() +/// -> lattice_reduce::<'static, hydroflow::lattices::Max>() /// -> assert([hydroflow::lattices::Max::new(5)]); /// ``` -pub const LATTICE_MERGE: OperatorConstraints = OperatorConstraints { - name: "lattice_merge", +pub const LATTICE_REDUCE: OperatorConstraints = OperatorConstraints { + name: "lattice_reduce", categories: &[OperatorCategory::LatticeFold], hard_range_inn: RANGE_1, soft_range_inn: RANGE_1, diff --git a/hydroflow_lang/src/graph/ops/mod.rs b/hydroflow_lang/src/graph/ops/mod.rs index 19c2110f8505..59497d290b75 100644 --- a/hydroflow_lang/src/graph/ops/mod.rs +++ b/hydroflow_lang/src/graph/ops/mod.rs @@ -252,8 +252,9 @@ declare_ops![ fold_keyed::FOLD_KEYED, reduce_keyed::REDUCE_KEYED, lattice_batch::LATTICE_BATCH, + lattice_fold::LATTICE_FOLD, lattice_join::LATTICE_JOIN, - lattice_merge::LATTICE_MERGE, + lattice_reduce::LATTICE_REDUCE, map::MAP, union::UNION, multiset_delta::MULTISET_DELTA,