Skip to content

Commit

Permalink
docs: improve multiset_delta() docs
Browse files Browse the repository at this point in the history
  • Loading branch information
MingweiSamuel committed Dec 22, 2023
1 parent 7e65a08 commit 845c2a5
Show file tree
Hide file tree
Showing 3 changed files with 64 additions and 26 deletions.
36 changes: 24 additions & 12 deletions hydroflow/tests/surface_multiset_delta.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,8 @@ use multiplatform_test::multiplatform_test;

#[multiplatform_test]
pub fn test_multiset_delta() {
let (input_send, input_recv) = hydroflow::util::unbounded_channel::<u32>();
let (result_send, mut result_recv) = hydroflow::util::unbounded_channel::<u32>();
let (input_send, input_recv) = hydroflow::util::unbounded_channel::<char>();
let (result_send, mut result_recv) = hydroflow::util::unbounded_channel::<char>();

let mut flow = hydroflow_syntax! {
source_stream(input_recv)
Expand All @@ -14,19 +14,31 @@ pub fn test_multiset_delta() {
};
assert_graphvis_snapshots!(flow);

input_send.send(3).unwrap();
input_send.send(4).unwrap();
input_send.send(3).unwrap();
input_send.send('a').unwrap();
input_send.send('b').unwrap();
input_send.send('a').unwrap();
flow.run_tick();
assert_eq!(&[3, 4, 3], &*collect_ready::<Vec<_>, _>(&mut result_recv));
// 'a', 'b', 'a'
assert_eq!(&['a', 'b', 'a'], &*collect_ready::<Vec<_>, _>(&mut result_recv));

input_send.send(3).unwrap();
input_send.send(5).unwrap();
input_send.send(3).unwrap();
input_send.send(3).unwrap();
input_send.send('a').unwrap();
input_send.send('c').unwrap();
input_send.send('a').unwrap();
input_send.send('a').unwrap();
flow.run_tick();
// 'c', 'a'
// First two 'a's are removed due to previous tick.
assert_eq!(&['c', 'a'], &*collect_ready::<Vec<_>, _>(&mut result_recv));

input_send.send('b').unwrap();
input_send.send('c').unwrap();
input_send.send('a').unwrap();
input_send.send('a').unwrap();
input_send.send('a').unwrap();
input_send.send('a').unwrap();
flow.run_tick();
// First two "3"s are removed due to previous tick.
assert_eq!(&[5, 3], &*collect_ready::<Vec<_>, _>(&mut result_recv));
// 3 'a's and the 'c' are removed due to previous tick.
assert_eq!(&['b', 'a'], &*collect_ready::<Vec<_>, _>(&mut result_recv));
}

#[multiplatform_test]
Expand Down
50 changes: 37 additions & 13 deletions hydroflow_lang/src/graph/ops/multiset_delta.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,30 +5,54 @@ use super::{
WriteContextArgs, RANGE_0, RANGE_1,
};

// TODO(mingwei): more doc
/// Multiset delta from the previous tick.
/// The multiset inverse of [`persist()`](#persist).
///
/// > 1 input stream of `T`, 1 output stream of `T`, where `T: Eq + Hash`
///
/// For set semantics, [`unique()`](#unique) can be thought of as a "delta" operator, the inverse
/// of [`persist()`](#persist). In `persist`, new items come in, and all items are repeatedly
/// released out. Conversely, `unique` take repeated items in, and only releases the new ones out.
///
/// This operator does a similar inversion but for multiset semantics, with some caveats. When it
/// receives duplicate items, instead of ignoring them, it "subtracts" them from the items received
/// in the previous tick: i.e. if we received `k` copies of an item in the previous tick, and we
/// receive `l > k` copies in the current tick, we output `l - k` copies of the item.
/// However unlike `unique`, this count is only maintained for the previous tick, not over all time.
///
/// In the example below, in the second tick two 'a's are removed because two 'a's were received in
/// the previous tick. The third 'a' is released though.
///
/// ```rustbook
/// let (input_send, input_recv) = hydroflow::util::unbounded_channel::<u32>();
/// let (input_send, input_recv) = hydroflow::util::unbounded_channel::<char>();
/// let mut flow = hydroflow::hydroflow_syntax! {
/// source_stream(input_recv)
/// -> multiset_delta()
/// -> for_each(|n| println!("{}", n));
/// };
///
/// input_send.send(3).unwrap();
/// input_send.send(4).unwrap();
/// input_send.send(3).unwrap();
/// input_send.send('a').unwrap();
/// input_send.send('b').unwrap();
/// input_send.send('a').unwrap();
/// flow.run_tick();
/// // 'a', 'b', 'a'
///
/// input_send.send('a').unwrap();
/// input_send.send('c').unwrap();
/// input_send.send('a').unwrap();
/// input_send.send('a').unwrap();
/// flow.run_tick();
/// // 3, 4,
/// // 'c', 'a'
/// // First two 'a's are removed due to previous tick.
///
/// input_send.send(3).unwrap();
/// input_send.send(5).unwrap();
/// input_send.send(3).unwrap();
/// input_send.send(3).unwrap();
/// input_send.send('b').unwrap();
/// input_send.send('c').unwrap();
/// input_send.send('a').unwrap();
/// input_send.send('a').unwrap();
/// input_send.send('a').unwrap();
/// input_send.send('a').unwrap();
/// flow.run_tick();
/// // 5, 3
/// // First two "3"s are removed due to previous tick.
/// // 'b', 'a'
/// // 3 'a's and the 'c' are removed due to previous tick.
/// ```
pub const MULTISET_DELTA: OperatorConstraints = OperatorConstraints {
name: "multiset_delta",
Expand Down
4 changes: 3 additions & 1 deletion hydroflow_lang/src/graph/ops/persist.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,9 @@ use super::{
use crate::diagnostic::{Diagnostic, Level};
use crate::graph::{FlowProps, LatticeFlowType};

/// Stores each item as it passes through, and replays all item every tick.
/// Stores each item as it passes through, and replays all items every tick.
///
/// > 1 input stream, 1 output stream
///
/// ```hydroflow
/// // Normally `source_iter(...)` only emits once, but `persist()` will replay the `"hello"`
Expand Down

0 comments on commit 845c2a5

Please sign in to comment.