From 2f1378f31616099da385e09d69905ec8881d4603 Mon Sep 17 00:00:00 2001
From: zzlk <2418897+zzlk@users.noreply.github.com>
Date: Fri, 30 Jun 2023 13:16:55 -0700
Subject: [PATCH 1/3] feat: add join_multiset()
also remove documentation about HalfJoinMultiset, the way to access
that now is to use join_multiset()
---
benches/benches/reachability.rs | 2 +-
.../quickstart/example_5_reachability.mdx | 8 ++-
hydroflow/examples/example_5_reachability.rs | 10 +--
.../examples/example_6_unreachability.rs | 14 ++--
...e_codegen__covid_tracing@graphvis_dot.snap | 2 +-
...degen__covid_tracing@graphvis_mermaid.snap | 2 +-
...face_examples__example_5_reachability.snap | 34 ++++-----
...ce_examples__example_6_unreachability.snap | 58 ++++++++--------
hydroflow/tests/surface_codegen.rs | 3 +-
hydroflow_lang/src/graph/ops/join.rs | 30 ++------
hydroflow_lang/src/graph/ops/join_multiset.rs | 69 +++++++++++++++++++
hydroflow_lang/src/graph/ops/mod.rs | 1 +
12 files changed, 139 insertions(+), 94 deletions(-)
create mode 100644 hydroflow_lang/src/graph/ops/join_multiset.rs
diff --git a/benches/benches/reachability.rs b/benches/benches/reachability.rs
index 1b9b9e50e1b0..2737d74796e8 100644
--- a/benches/benches/reachability.rs
+++ b/benches/benches/reachability.rs
@@ -336,7 +336,7 @@ fn benchmark_hydroflow_surface(c: &mut Criterion) {
hydroflow_syntax! {
origin = source_iter(vec![1]);
stream_of_edges = source_iter(edges);
- reached_vertices = union() -> unique();
+ reached_vertices = union();
origin -> reached_vertices;
my_join_tee = join() -> flat_map(|(src, ((), dst))| [src, dst]) -> tee();
diff --git a/docs/docs/hydroflow/quickstart/example_5_reachability.mdx b/docs/docs/hydroflow/quickstart/example_5_reachability.mdx
index 3cd62102b232..e9834ff8c8e3 100644
--- a/docs/docs/hydroflow/quickstart/example_5_reachability.mdx
+++ b/docs/docs/hydroflow/quickstart/example_5_reachability.mdx
@@ -67,10 +67,14 @@ We route the `origin` vertex into it as one input right away:
{getLines(exampleCode, 8, 12)}
+Note the square-bracket syntax for differentiating the multiple inputs to `union()`
+is the same as that of `join()` (except that union can have an unbounded number of inputs,
+whereas `join()` is defined to only have two.)
+
Now, `join()` is defined to only have one output. In our program, we want to copy the joined
output to two places: to the original `for_each` from above to print output, and *also*
-back to the `union` operator we called `reached_vertices`. This is also the reason why `reached_vertices` must now also contain a `unique()` because without it the data would cycle endlessly in the graph.
-We feed the `join()` output through a `flat_map()` as before, and then we feed the result into a [`tee()`](../syntax/surface_ops_gen.md#tee) operator,
+back to the `union` operator we called `reached_vertices`. We feed the `join()` output
+through a `flat_map()` as before, and then we feed the result into a [`tee()`](../syntax/surface_ops_gen.md#tee) operator,
which is the mirror image of `union()`: instead of merging many inputs to one output,
it copies one input to many different outputs. Each input element is _cloned_, in Rust terms, and
given to each of the outputs. The syntax for the outputs of `tee()` mirrors that of union: we *append*
diff --git a/hydroflow/examples/example_5_reachability.rs b/hydroflow/examples/example_5_reachability.rs
index 76bd6fdc6389..749ce0a2d635 100644
--- a/hydroflow/examples/example_5_reachability.rs
+++ b/hydroflow/examples/example_5_reachability.rs
@@ -8,17 +8,17 @@ pub fn main() {
// inputs: the origin vertex (vertex 0) and stream of input edges
origin = source_iter(vec![0]);
stream_of_edges = source_stream(edges_recv);
- origin -> reached_vertices;
- reached_vertices = union() -> unique();
+ origin -> [0]reached_vertices;
+ reached_vertices = union();
// the join
reached_vertices -> map(|v| (v, ())) -> [0]my_join_tee;
stream_of_edges -> [1]my_join_tee;
- my_join_tee = join() -> flat_map(|(src, ((), dst))| [src, dst]) -> unique() -> tee();
+ my_join_tee = join() -> flat_map(|(src, ((), dst))| [src, dst]) -> tee();
// the loop and the output
- my_join_tee[0] -> reached_vertices;
- my_join_tee[1] -> for_each(|x| println!("Reached: {}", x));
+ my_join_tee[0] -> [1]reached_vertices;
+ my_join_tee[1] -> unique() -> for_each(|x| println!("Reached: {}", x));
};
println!(
diff --git a/hydroflow/examples/example_6_unreachability.rs b/hydroflow/examples/example_6_unreachability.rs
index 11884b7928d8..6fd8a2100dbf 100644
--- a/hydroflow/examples/example_6_unreachability.rs
+++ b/hydroflow/examples/example_6_unreachability.rs
@@ -7,26 +7,26 @@ pub fn main() {
let mut flow = hydroflow_syntax! {
origin = source_iter(vec![0]);
stream_of_edges = source_stream(pairs_recv) -> tee();
- reached_vertices = union() -> unique() -> tee();
- origin -> reached_vertices;
+ reached_vertices = union()->tee();
+ origin -> [0]reached_vertices;
// the join for reachable vertices
my_join = join() -> flat_map(|(src, ((), dst))| [src, dst]);
- reached_vertices -> map(|v| (v, ())) -> [0]my_join;
+ reached_vertices[0] -> map(|v| (v, ())) -> [0]my_join;
stream_of_edges[1] -> [1]my_join;
// the loop
- my_join -> reached_vertices;
+ my_join -> [1]reached_vertices;
// the difference all_vertices - reached_vertices
all_vertices = stream_of_edges[0]
-> flat_map(|(src, dst)| [src, dst]) -> tee();
unreached_vertices = difference();
- all_vertices -> [pos]unreached_vertices;
- reached_vertices -> [neg]unreached_vertices;
+ all_vertices[0] -> [pos]unreached_vertices;
+ reached_vertices[1] -> [neg]unreached_vertices;
// the output
- all_vertices -> for_each(|v| println!("Received vertex: {}", v));
+ all_vertices[1] -> unique() -> for_each(|v| println!("Received vertex: {}", v));
unreached_vertices -> for_each(|v| println!("unreached_vertices vertex: {}", v));
};
diff --git a/hydroflow/tests/snapshots/surface_codegen__covid_tracing@graphvis_dot.snap b/hydroflow/tests/snapshots/surface_codegen__covid_tracing@graphvis_dot.snap
index 01d304ef872b..59f5ab2ea69d 100644
--- a/hydroflow/tests/snapshots/surface_codegen__covid_tracing@graphvis_dot.snap
+++ b/hydroflow/tests/snapshots/surface_codegen__covid_tracing@graphvis_dot.snap
@@ -12,7 +12,7 @@ digraph {
n4v1 [label="(n4v1) source_stream(diagnosed_recv)", fontname=Monaco, shape=invhouse, style = filled, color = "#0022ff", fontcolor = "#ffffff"]
n9v1 [label="(n9v1) map(|(pid, t)| (pid, (t, t + TRANSMISSIBLE_DURATION)))", fontname=Monaco, shape=invhouse, style = filled, color = "#0022ff", fontcolor = "#ffffff"]
n3v1 [label="(n3v1) union()", fontname=Monaco, shape=invhouse, style = filled, color = "#0022ff", fontcolor = "#ffffff"]
- n5v1 [label="(n5v1) join::()", fontname=Monaco, shape=invhouse, style = filled, color = "#0022ff", fontcolor = "#ffffff"]
+ n5v1 [label="(n5v1) join()", fontname=Monaco, shape=invhouse, style = filled, color = "#0022ff", fontcolor = "#ffffff"]
n6v1 [label="(n6v1) filter(|(_pid_a, ((_pid_b, t_contact), (t_from, t_to)))| {\l (t_from..=t_to).contains(&t_contact)\l})\l", fontname=Monaco, shape=invhouse, style = filled, color = "#0022ff", fontcolor = "#ffffff"]
n7v1 [label="(n7v1) map(|(_pid_a, (pid_b_t_contact, _t_from_to))| pid_b_t_contact)", fontname=Monaco, shape=invhouse, style = filled, color = "#0022ff", fontcolor = "#ffffff"]
n8v1 [label="(n8v1) tee()", fontname=Monaco, shape=house, style = filled, color = "#ffff00"]
diff --git a/hydroflow/tests/snapshots/surface_codegen__covid_tracing@graphvis_mermaid.snap b/hydroflow/tests/snapshots/surface_codegen__covid_tracing@graphvis_mermaid.snap
index 85f4bd81fb49..13da967d776c 100644
--- a/hydroflow/tests/snapshots/surface_codegen__covid_tracing@graphvis_mermaid.snap
+++ b/hydroflow/tests/snapshots/surface_codegen__covid_tracing@graphvis_mermaid.snap
@@ -13,7 +13,7 @@ subgraph sg_1v1 ["sg_1v1 stratum 0"]
4v1[\"(4v1) source_stream(diagnosed_recv)
"/]:::pullClass
9v1[\"(9v1) map(|(pid, t)| (pid, (t, t + TRANSMISSIBLE_DURATION)))
"/]:::pullClass
3v1[\"(3v1) union()
"/]:::pullClass
- 5v1[\"(5v1) join::<HalfSetJoinState>()
"/]:::pullClass
+ 5v1[\"(5v1) join()
"/]:::pullClass
6v1[\"(6v1)
filter(|(_pid_a, ((_pid_b, t_contact), (t_from, t_to)))| {
(t_from..=t_to).contains(&t_contact)
})
"/]:::pullClass
7v1[\"(7v1) map(|(_pid_a, (pid_b_t_contact, _t_from_to))| pid_b_t_contact)
"/]:::pullClass
8v1[/"(8v1) tee()
"\]:::pushClass
diff --git a/hydroflow/tests/snapshots/surface_examples__example_5_reachability.snap b/hydroflow/tests/snapshots/surface_examples__example_5_reachability.snap
index 1fe8e6fd5baa..c80718351812 100644
--- a/hydroflow/tests/snapshots/surface_examples__example_5_reachability.snap
+++ b/hydroflow/tests/snapshots/surface_examples__example_5_reachability.snap
@@ -11,37 +11,33 @@ subgraph sg_1v1 ["sg_1v1 stratum 0"]
1v1[\"(1v1) source_iter(vec![0])
"/]:::pullClass
2v1[\"(2v1) source_stream(edges_recv)
"/]:::pullClass
3v1[\"(3v1) union()
"/]:::pullClass
- 4v1[\"(4v1) unique()
"/]:::pullClass
- 5v1[\"(5v1) map(|v| (v, ()))
"/]:::pullClass
- 6v1[\"(6v1) join()
"/]:::pullClass
- 7v1[\"(7v1) flat_map(|(src, ((), dst))| [src, dst])
"/]:::pullClass
- 8v1[\"(8v1) unique()
"/]:::pullClass
- 9v1[/"(9v1) tee()
"\]:::pushClass
- 10v1[/"(10v1) for_each(|x| println!("Reached: {}", x))
"\]:::pushClass
- 11v1["(11v1) handoff
"]:::otherClass
- 11v1--->3v1
- 1v1--->3v1
- 2v1--1--->6v1
+ 4v1[\"(4v1) map(|v| (v, ()))
"/]:::pullClass
+ 5v1[\"(5v1) join()
"/]:::pullClass
+ 6v1[\"(6v1) flat_map(|(src, ((), dst))| [src, dst])
"/]:::pullClass
+ 7v1[/"(7v1) tee()
"\]:::pushClass
+ 8v1[/"(8v1) unique()
"\]:::pushClass
+ 9v1[/"(9v1) for_each(|x| println!("Reached: {}", x))
"\]:::pushClass
+ 10v1["(10v1) handoff
"]:::otherClass
+ 10v1--1--->3v1
+ 1v1--0--->3v1
+ 2v1--1--->5v1
3v1--->4v1
- 4v1--->5v1
- 5v1--0--->6v1
+ 4v1--0--->5v1
+ 5v1--->6v1
6v1--->7v1
- 7v1--->8v1
+ 7v1--0--->10v1
+ 7v1--1--->8v1
8v1--->9v1
- 9v1--0--->11v1
- 9v1--1--->10v1
subgraph sg_1v1_var_my_join_tee ["var my_join_tee"]
+ 5v1
6v1
7v1
- 8v1
- 9v1
end
subgraph sg_1v1_var_origin ["var origin"]
1v1
end
subgraph sg_1v1_var_reached_vertices ["var reached_vertices"]
3v1
- 4v1
end
subgraph sg_1v1_var_stream_of_edges ["var stream_of_edges"]
2v1
diff --git a/hydroflow/tests/snapshots/surface_examples__example_6_unreachability.snap b/hydroflow/tests/snapshots/surface_examples__example_6_unreachability.snap
index 64f559eecb9b..b1cda51e5c06 100644
--- a/hydroflow/tests/snapshots/surface_examples__example_6_unreachability.snap
+++ b/hydroflow/tests/snapshots/surface_examples__example_6_unreachability.snap
@@ -9,24 +9,22 @@ classDef pushClass fill:#ff8,stroke:#000,text-align:left,white-space:pre
linkStyle default stroke:#aaa,stroke-width:4px,color:red,font-size:1.5em;
subgraph sg_1v1 ["sg_1v1 stratum 0"]
1v1[\"(1v1) source_iter(vec![0])
"/]:::pullClass
- 9v1[\"(9v1) map(|v| (v, ()))
"/]:::pullClass
- 7v1[\"(7v1) join()
"/]:::pullClass
- 8v1[\"(8v1) flat_map(|(src, ((), dst))| [src, dst])
"/]:::pullClass
+ 8v1[\"(8v1) map(|v| (v, ()))
"/]:::pullClass
+ 6v1[\"(6v1) join()
"/]:::pullClass
+ 7v1[\"(7v1) flat_map(|(src, ((), dst))| [src, dst])
"/]:::pullClass
4v1[\"(4v1) union()
"/]:::pullClass
- 5v1[\"(5v1) unique()
"/]:::pullClass
- 6v1[/"(6v1) tee()
"\]:::pushClass
+ 5v1[/"(5v1) tee()
"\]:::pushClass
15v1["(15v1) handoff
"]:::otherClass
- 15v1--->9v1
- 1v1--->4v1
- 9v1--0--->7v1
- 7v1--->8v1
- 8v1--->4v1
+ 15v1--->8v1
+ 1v1--0--->4v1
+ 8v1--0--->6v1
+ 6v1--->7v1
+ 7v1--1--->4v1
4v1--->5v1
- 5v1--->6v1
- 6v1--->15v1
+ 5v1--0--->15v1
subgraph sg_1v1_var_my_join ["var my_join"]
+ 6v1
7v1
- 8v1
end
subgraph sg_1v1_var_origin ["var origin"]
1v1
@@ -34,22 +32,23 @@ subgraph sg_1v1 ["sg_1v1 stratum 0"]
subgraph sg_1v1_var_reached_vertices ["var reached_vertices"]
4v1
5v1
- 6v1
end
end
subgraph sg_2v1 ["sg_2v1 stratum 0"]
2v1[\"(2v1) source_stream(pairs_recv)
"/]:::pullClass
3v1[/"(3v1) tee()
"\]:::pushClass
- 10v1[/"(10v1) flat_map(|(src, dst)| [src, dst])
"\]:::pushClass
- 11v1[/"(11v1) tee()
"\]:::pushClass
+ 9v1[/"(9v1) flat_map(|(src, dst)| [src, dst])
"\]:::pushClass
+ 10v1[/"(10v1) tee()
"\]:::pushClass
+ 12v1[/"(12v1) unique()
"\]:::pushClass
13v1[/"(13v1) for_each(|v| println!("Received vertex: {}", v))
"\]:::pushClass
2v1--->3v1
- 3v1--0--->10v1
- 10v1--->11v1
- 11v1--->13v1
+ 3v1--0--->9v1
+ 9v1--->10v1
+ 10v1--1--->12v1
+ 12v1--->13v1
subgraph sg_2v1_var_all_vertices ["var all_vertices"]
+ 9v1
10v1
- 11v1
end
subgraph sg_2v1_var_stream_of_edges ["var stream_of_edges"]
2v1
@@ -57,31 +56,28 @@ subgraph sg_2v1 ["sg_2v1 stratum 0"]
end
end
subgraph sg_3v1 ["sg_3v1 stratum 1"]
- 12v1[\"(12v1) difference()
"/]:::pullClass
+ 11v1[\"(11v1) difference()
"/]:::pullClass
14v1[/"(14v1) for_each(|v| println!("unreached_vertices vertex: {}", v))
"\]:::pushClass
- 12v1--->14v1
+ 11v1--->14v1
subgraph sg_3v1_var_unreached_vertices ["var unreached_vertices"]
- 12v1
+ 11v1
end
end
3v1--1--->16v1
-6v1--->18v1
-11v1--->17v1
+5v1--1--->18v1
+10v1--0--->17v1
16v1["(16v1) handoff
"]:::otherClass
-16v1--1--->7v1
+16v1--1--->6v1
17v1["(17v1) handoff
"]:::otherClass
-17v1--pos--->12v1
+17v1--pos--->11v1
18v1["(18v1) handoff
"]:::otherClass
-18v1==neg===o12v1
+18v1==neg===o11v1
Received vertex: 5
Received vertex: 10
Received vertex: 0
Received vertex: 3
-Received vertex: 3
-Received vertex: 6
Received vertex: 6
-Received vertex: 5
Received vertex: 11
Received vertex: 12
unreached_vertices vertex: 11
diff --git a/hydroflow/tests/surface_codegen.rs b/hydroflow/tests/surface_codegen.rs
index b6386be13dfb..0991ba258615 100644
--- a/hydroflow/tests/surface_codegen.rs
+++ b/hydroflow/tests/surface_codegen.rs
@@ -1,6 +1,5 @@
use std::collections::HashSet;
-use hydroflow::compiled::pull::HalfSetJoinState;
use hydroflow::scheduled::graph::Hydroflow;
use hydroflow::util::collect_ready;
use hydroflow::{assert_graphvis_snapshots, hydroflow_syntax};
@@ -740,7 +739,7 @@ pub fn test_covid_tracing() {
source_stream(diagnosed_recv) -> [0]exposed;
new_exposed = (
- join::() ->
+ join() ->
filter(|(_pid_a, ((_pid_b, t_contact), (t_from, t_to)))| {
(t_from..=t_to).contains(&t_contact)
}) ->
diff --git a/hydroflow_lang/src/graph/ops/join.rs b/hydroflow_lang/src/graph/ops/join.rs
index 2e739ae24e9f..7d69935dcb68 100644
--- a/hydroflow_lang/src/graph/ops/join.rs
+++ b/hydroflow_lang/src/graph/ops/join.rs
@@ -13,10 +13,10 @@ use crate::graph::{OpInstGenerics, OperatorInstance};
/// Forms the equijoin of the tuples in the input streams by their first (key) attribute. Note that the result nests the 2nd input field (values) into a tuple in the 2nd output field.
///
/// ```hydroflow
-/// // should print `(hello, (world, cleveland))`
-/// source_iter(vec![("hello", "world"), ("stay", "gold")]) -> [0]my_join;
-/// source_iter(vec![("hello", "cleveland"), ("hello", "cleveland")]) -> [1]my_join;
-/// my_join = join() -> assert([("hello", ("world", "cleveland")), ("hello", ("world", "cleveland"))]);
+/// source_iter(vec![("hello", "world"), ("stay", "gold"), ("hello", "world")]) -> [0]my_join;
+/// source_iter(vec![("hello", "cleveland")]) -> [1]my_join;
+/// my_join = join()
+/// -> assert([("hello", ("world", "cleveland"))]);
/// ```
///
/// `join` can also be provided with one or two generic lifetime persistence arguments, either
@@ -43,26 +43,6 @@ use crate::graph::{OpInstGenerics, OperatorInstance};
/// // etc.
/// ```
///
-/// Join also accepts one type argument that controls how the join state is built up. This (currently) allows switching between a SetUnion and NonSetUnion implementation.
-/// The default is HalfMultisetJoinState
-/// For example:
-/// ```hydroflow
-/// lhs = source_iter([("a", 0), ("a", 0)]) -> tee();
-/// rhs = source_iter([("a", 0)]) -> tee();
-///
-/// lhs -> [0]default_join;
-/// rhs -> [1]default_join;
-/// default_join = join() -> assert([("a", (0, 0)), ("a", (0, 0))]);
-///
-/// lhs -> [0]multiset_join;
-/// rhs -> [1]multiset_join;
-/// multiset_join = join::() -> assert([("a", (0, 0)), ("a", (0, 0))]);
-///
-/// lhs -> [0]set_join;
-/// rhs -> [1]set_join;
-/// set_join = join::() -> assert([("a", (0, 0))]);
-/// ```
-///
/// ### Examples
///
/// ```rustbook
@@ -140,7 +120,7 @@ pub const JOIN: OperatorConstraints = OperatorConstraints {
.get(0)
.map(ToTokens::to_token_stream)
.unwrap_or(quote_spanned!(op_span=>
- #root::compiled::pull::HalfMultisetJoinState
+ #root::compiled::pull::HalfSetJoinState
));
// TODO: This is really bad.
diff --git a/hydroflow_lang/src/graph/ops/join_multiset.rs b/hydroflow_lang/src/graph/ops/join_multiset.rs
new file mode 100644
index 000000000000..ac83572af9b1
--- /dev/null
+++ b/hydroflow_lang/src/graph/ops/join_multiset.rs
@@ -0,0 +1,69 @@
+use syn::{parse_quote, parse_quote_spanned};
+
+use super::{
+ FlowProperties, FlowPropertyVal, OperatorCategory, OperatorConstraints, WriteContextArgs,
+ RANGE_0, RANGE_1,
+};
+use crate::graph::{OpInstGenerics, OperatorInstance};
+
+/// > 2 input streams of type <(K, V1)> and <(K, V2)>, 1 output stream of type <(K, (V1, V2))>
+///
+/// This operator is equivalent to `join` except that the LHS and RHS are collected into multisets rather than sets before joining.
+///
+/// For example:
+/// ```hydroflow
+/// lhs = source_iter([("a", 0), ("a", 0)]) -> tee();
+/// rhs = source_iter([("a", 0)]) -> tee();
+///
+/// lhs -> [0]multiset_join;
+/// rhs -> [1]multiset_join;
+/// multiset_join = join_multiset() -> assert([("a", (0, 0)), ("a", (0, 0))]);
+///
+/// lhs -> [0]set_join;
+/// rhs -> [1]set_join;
+/// set_join = join() -> assert([("a", (0, 0))]);
+/// ```
+pub const JOIN_MULTISET: OperatorConstraints = OperatorConstraints {
+ name: "join_multiset",
+ categories: &[OperatorCategory::MultiIn],
+ hard_range_inn: &(2..=2),
+ soft_range_inn: &(2..=2),
+ hard_range_out: RANGE_1,
+ soft_range_out: RANGE_1,
+ num_args: 0,
+ persistence_args: &(0..=2),
+ type_args: RANGE_0,
+ is_external_input: false,
+ ports_inn: Some(|| super::PortListSpec::Fixed(parse_quote! { 0, 1 })),
+ ports_out: None,
+ properties: FlowProperties {
+ deterministic: FlowPropertyVal::Preserve,
+ monotonic: FlowPropertyVal::Preserve,
+ inconsistency_tainted: false,
+ },
+ input_delaytype_fn: |_| None,
+ write_fn: |wc @ &WriteContextArgs {
+ root,
+ op_span,
+ op_inst: op_inst @ OperatorInstance { .. },
+ ..
+ },
+ diagnostics| {
+ let join_type = parse_quote_spanned! {op_span=> // Uses `lat_type.span()`!
+ #root::compiled::pull::HalfMultisetJoinState
+ };
+
+ let wc = WriteContextArgs {
+ op_inst: &OperatorInstance {
+ generics: OpInstGenerics {
+ type_args: vec![join_type],
+ ..wc.op_inst.generics.clone()
+ },
+ ..op_inst.clone()
+ },
+ ..wc.clone()
+ };
+
+ (super::join::JOIN.write_fn)(&wc, diagnostics)
+ },
+};
diff --git a/hydroflow_lang/src/graph/ops/mod.rs b/hydroflow_lang/src/graph/ops/mod.rs
index 1574251cfc08..19c2110f8505 100644
--- a/hydroflow_lang/src/graph/ops/mod.rs
+++ b/hydroflow_lang/src/graph/ops/mod.rs
@@ -248,6 +248,7 @@ declare_ops![
initialize::INITIALIZE,
inspect::INSPECT,
join::JOIN,
+ join_multiset::JOIN_MULTISET,
fold_keyed::FOLD_KEYED,
reduce_keyed::REDUCE_KEYED,
lattice_batch::LATTICE_BATCH,
From c88151bae79964f76d2217b9c018fcdca7303126 Mon Sep 17 00:00:00 2001
From: zzlk <2418897+zzlk@users.noreply.github.com>
Date: Fri, 30 Jun 2023 14:54:36 -0700
Subject: [PATCH 2/3] address comments
---
hydroflow_lang/src/graph/ops/join_multiset.rs | 2 +-
1 file changed, 1 insertion(+), 1 deletion(-)
diff --git a/hydroflow_lang/src/graph/ops/join_multiset.rs b/hydroflow_lang/src/graph/ops/join_multiset.rs
index ac83572af9b1..27793bc4149e 100644
--- a/hydroflow_lang/src/graph/ops/join_multiset.rs
+++ b/hydroflow_lang/src/graph/ops/join_multiset.rs
@@ -13,7 +13,7 @@ use crate::graph::{OpInstGenerics, OperatorInstance};
/// For example:
/// ```hydroflow
/// lhs = source_iter([("a", 0), ("a", 0)]) -> tee();
-/// rhs = source_iter([("a", 0)]) -> tee();
+/// rhs = source_iter([("a", "hydro")]) -> tee();
///
/// lhs -> [0]multiset_join;
/// rhs -> [1]multiset_join;
From b3902ebea3d4d16145d4b6e092b8825476177237 Mon Sep 17 00:00:00 2001
From: zzlk <2418897+zzlk@users.noreply.github.com>
Date: Fri, 30 Jun 2023 15:45:47 -0700
Subject: [PATCH 3/3] fix assert
---
hydroflow_lang/src/graph/ops/join_multiset.rs | 4 ++--
1 file changed, 2 insertions(+), 2 deletions(-)
diff --git a/hydroflow_lang/src/graph/ops/join_multiset.rs b/hydroflow_lang/src/graph/ops/join_multiset.rs
index 27793bc4149e..b01de28de545 100644
--- a/hydroflow_lang/src/graph/ops/join_multiset.rs
+++ b/hydroflow_lang/src/graph/ops/join_multiset.rs
@@ -17,11 +17,11 @@ use crate::graph::{OpInstGenerics, OperatorInstance};
///
/// lhs -> [0]multiset_join;
/// rhs -> [1]multiset_join;
-/// multiset_join = join_multiset() -> assert([("a", (0, 0)), ("a", (0, 0))]);
+/// multiset_join = join_multiset() -> assert([("a", (0, "hydro")), ("a", (0, "hydro"))]);
///
/// lhs -> [0]set_join;
/// rhs -> [1]set_join;
-/// set_join = join() -> assert([("a", (0, 0))]);
+/// set_join = join() -> assert([("a", (0, "hydro"))]);
/// ```
pub const JOIN_MULTISET: OperatorConstraints = OperatorConstraints {
name: "join_multiset",