diff --git a/hydroflow/examples/hello_world/main.rs b/hydroflow/examples/hello_world/main.rs index 50793a1a6f78..98dc502c7487 100644 --- a/hydroflow/examples/hello_world/main.rs +++ b/hydroflow/examples/hello_world/main.rs @@ -5,7 +5,7 @@ pub fn main() { source_iter([1,2,3,4,5]) -> map(hydroflow::lattices::Max::new) -> lattice_merge::<'static, hydroflow::lattices::Max>() - -> assert([hydroflow::lattices::Max(5)]); + -> assert([hydroflow::lattices::Max::new(5)]); }; df.run_available(); } diff --git a/hydroflow/examples/kvs_bench/protocol/serialization/lattices/map_union.rs b/hydroflow/examples/kvs_bench/protocol/serialization/lattices/map_union.rs index 04bd5b350987..1be9a9c4e9b8 100644 --- a/hydroflow/examples/kvs_bench/protocol/serialization/lattices/map_union.rs +++ b/hydroflow/examples/kvs_bench/protocol/serialization/lattices/map_union.rs @@ -22,7 +22,7 @@ impl<'a, const SIZE: usize> Serialize for MapUnionHashMapWrapper<'a, SIZE> { { use serde::ser::SerializeMap; - let inner_map = &self.0 .0; + let inner_map = self.0.as_reveal_ref(); let mut map_serializer = serializer.serialize_map(Some(inner_map.len()))?; @@ -64,7 +64,7 @@ impl<'de, const SIZE: usize> DeserializeSeed<'de> for MapUnionHashMapDeserialize let k: Option = map.next_key()?; if let Some(k) = k { - inner_map.0.insert( + inner_map.as_reveal_mut().insert( k, map.next_value_seed(MyLastWriteWinsDeserializer { collector: self.collector.clone(), diff --git a/hydroflow/examples/kvs_bench/protocol/serialization/lattices/my_last_write_wins.rs b/hydroflow/examples/kvs_bench/protocol/serialization/lattices/my_last_write_wins.rs index d25954741d02..91fc1b71dfb4 100644 --- a/hydroflow/examples/kvs_bench/protocol/serialization/lattices/my_last_write_wins.rs +++ b/hydroflow/examples/kvs_bench/protocol/serialization/lattices/my_last_write_wins.rs @@ -20,8 +20,9 @@ impl<'a, const SIZE: usize> Serialize for MyLastWriteWinsWrapper<'a, SIZE> { let mut struct_serializer = serializer.serialize_struct("DomPair", 2)?; - struct_serializer.serialize_field("key", &self.0.key)?; - struct_serializer.serialize_field("val", &WithBotWrapper(&self.0.val))?; + let (key, val) = self.0.as_reveal_ref(); + struct_serializer.serialize_field("key", key)?; + struct_serializer.serialize_field("val", &WithBotWrapper(val))?; struct_serializer.end() } @@ -58,7 +59,7 @@ impl<'de, const SIZE: usize> DeserializeSeed<'de> for MyLastWriteWinsDeserialize })? .unwrap(); - Ok(Self::Value { key, val }) + Ok(Self::Value::new(key, val)) } fn visit_map(self, mut map: A) -> Result @@ -90,7 +91,7 @@ impl<'de, const SIZE: usize> DeserializeSeed<'de> for MyLastWriteWinsDeserialize let key = key.unwrap(); let val = val.unwrap(); - Ok(Self::Value { key, val }) + Ok(Self::Value::new(key, val)) } } diff --git a/hydroflow/examples/kvs_bench/server.rs b/hydroflow/examples/kvs_bench/server.rs index 78cfa0d7a5cc..8f84a657305d 100644 --- a/hydroflow/examples/kvs_bench/server.rs +++ b/hydroflow/examples/kvs_bench/server.rs @@ -198,7 +198,7 @@ pub fn run_server( ))); }, KvsRequest::Gossip {map} => { - for (key, reg) in map.0 { + for (key, reg) in map.into_reveal() { store.give((key, reg)); } }, diff --git a/hydroflow/examples/lamport_clock/client.rs b/hydroflow/examples/lamport_clock/client.rs index b9a8f92ee477..64ac37b870a0 100644 --- a/hydroflow/examples/lamport_clock/client.rs +++ b/hydroflow/examples/lamport_clock/client.rs @@ -11,7 +11,7 @@ use crate::{GraphType, Opts}; pub(crate) async fn run_client(outbound: UdpSink, inbound: UdpStream, opts: Opts) { // server_addr is required for client let server_addr = opts.server_addr.expect("Client requires a server address"); - let bot: Max = Max(0); + let bot: Max = Max::new(0); println!("Client live!"); @@ -30,7 +30,7 @@ pub(crate) async fn run_client(outbound: UdpSink, inbound: UdpStream, opts: Opts mergevc = union() -> fold::<'static>( bot, |mut old: Max, lamport_clock: Max| { - let bump = Max(old.0 + 1); + let bump = Max::new(old.into_reveal() + 1); old.merge(bump); old.merge(lamport_clock); old diff --git a/hydroflow/examples/lamport_clock/server.rs b/hydroflow/examples/lamport_clock/server.rs index b800f383a938..d559691bad31 100644 --- a/hydroflow/examples/lamport_clock/server.rs +++ b/hydroflow/examples/lamport_clock/server.rs @@ -9,7 +9,7 @@ use hydroflow::util::{UdpSink, UdpStream}; use crate::protocol::EchoMsg; pub(crate) async fn run_server(outbound: UdpSink, inbound: UdpStream) { - let bot: Max = Max(0); + let bot: Max = Max::new(0); println!("Server live!"); let mut flow: Hydroflow = hydroflow_syntax! { @@ -25,7 +25,7 @@ pub(crate) async fn run_server(outbound: UdpSink, inbound: UdpStream) { mergevc = fold::<'static>( bot, |mut old: Max, lamport_clock: Max| { - let bump = Max(old.0 + 1); + let bump = Max::new(old.into_reveal() + 1); old.merge(bump); old.merge(lamport_clock); old diff --git a/hydroflow/examples/vector_clock/client.rs b/hydroflow/examples/vector_clock/client.rs index 63234fa25511..f621716f8aae 100644 --- a/hydroflow/examples/vector_clock/client.rs +++ b/hydroflow/examples/vector_clock/client.rs @@ -34,7 +34,7 @@ pub(crate) async fn run_client( inbound_chan[merge] -> map(|(msg, _sender): (EchoMsg, SocketAddr)| msg.vc) -> [net]mergevc; mergevc = union() -> fold::<'static> (VecClock::default(), |mut old, vc| { let my_addr = format!("{:?}", addr); - let bump = MapUnionSingletonMap::new_from((my_addr.clone(), Max::new(old.0[&my_addr].0 + 1))); + let bump = MapUnionSingletonMap::new_from((my_addr.clone(), Max::new(old.as_reveal_ref()[&my_addr].into_reveal() + 1))); old.merge(bump); old.merge(vc); old diff --git a/hydroflow/examples/vector_clock/server.rs b/hydroflow/examples/vector_clock/server.rs index 27c3e2e9a5b8..bb3a44de11ac 100644 --- a/hydroflow/examples/vector_clock/server.rs +++ b/hydroflow/examples/vector_clock/server.rs @@ -24,7 +24,7 @@ pub(crate) async fn run_server(outbound: UdpSink, inbound: UdpStream, opts: crat inbound_chan[merge] -> map(|(msg, _addr): (EchoMsg, SocketAddr)| msg.vc) -> mergevc; mergevc = fold::<'static> (VecClock::default(), |mut old, vc| { let my_addr = format!("{:?}", opts.addr.unwrap()); - let bump = MapUnionSingletonMap::new_from((my_addr.clone(), Max::new(old.0[&my_addr].0 + 1))); + let bump = MapUnionSingletonMap::new_from((my_addr.clone(), Max::new(old.as_reveal_ref()[&my_addr].into_reveal() + 1))); old.merge(bump); old.merge(vc); old diff --git a/hydroflow/src/compiled/pull/symmetric_hash_join_lattice.rs b/hydroflow/src/compiled/pull/symmetric_hash_join_lattice.rs index 9464376d890b..58ba834b3341 100644 --- a/hydroflow/src/compiled/pull/symmetric_hash_join_lattice.rs +++ b/hydroflow/src/compiled/pull/symmetric_hash_join_lattice.rs @@ -21,7 +21,7 @@ impl Default for HalfJoinStateLattice { impl Clear for HalfJoinStateLattice { fn clear(&mut self) { - self.table.0.clear() + self.table.as_reveal_mut().clear() } } @@ -33,7 +33,7 @@ where where Lattice: Merge + LatticeFrom, { - let entry = self.table.0.entry(k); + let entry = self.table.as_reveal_mut().entry(k); match entry { Entry::Occupied(mut e) => e.get_mut().merge(v), @@ -69,8 +69,8 @@ where fn next(&mut self) -> Option { if let Some(key) = self.updated_keys.next() { - if let Some(lhs) = self.state.0.table.0.get(&key) { - if let Some(rhs) = self.state.1.table.0.get(&key) { + if let Some(lhs) = self.state.0.table.as_reveal_ref().get(&key) { + if let Some(rhs) = self.state.1.table.as_reveal_ref().get(&key) { return Some((key, (lhs.clone(), rhs.clone()))); } } diff --git a/hydroflow_lang/src/graph/ops/lattice_merge.rs b/hydroflow_lang/src/graph/ops/lattice_merge.rs index 1fda39d67065..02b30954ccf4 100644 --- a/hydroflow_lang/src/graph/ops/lattice_merge.rs +++ b/hydroflow_lang/src/graph/ops/lattice_merge.rs @@ -26,7 +26,7 @@ use crate::graph::ops::OperatorWriteOutput; /// source_iter([1,2,3,4,5]) /// -> map(hydroflow::lattices::Max::new) /// -> lattice_merge::<'static, hydroflow::lattices::Max>() -/// -> assert([hydroflow::lattices::Max(5)]); +/// -> assert([hydroflow::lattices::Max::new(5)]); /// ``` pub const LATTICE_MERGE: OperatorConstraints = OperatorConstraints { name: "lattice_merge", diff --git a/lattices/src/conflict.rs b/lattices/src/conflict.rs index 5ff2f6804903..107b74c317ca 100644 --- a/lattices/src/conflict.rs +++ b/lattices/src/conflict.rs @@ -14,7 +14,7 @@ use crate::{IsTop, LatticeFrom, LatticeOrd, Merge}; #[repr(transparent)] #[derive(Copy, Clone, Debug, Default, Eq)] #[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))] -pub struct Conflict(pub Option); +pub struct Conflict(Option); impl Conflict { /// Create a new `Conflict` lattice instance from a value. pub fn new(val: Option) -> Self { @@ -25,6 +25,21 @@ impl Conflict { pub fn new_from(val: impl Into>) -> Self { Self::new(val.into()) } + + /// Reveal the inner value as a shared reference. + pub fn as_reveal_ref(&self) -> Option<&T> { + self.0.as_ref() + } + + /// Reveal the inner value as an exclusive reference. + pub fn as_reveal_mut(&mut self) -> Option<&mut T> { + self.0.as_mut() + } + + /// Gets the inner by value, consuming self. + pub fn into_reveal(self) -> Option { + self.0 + } } impl Merge> for Conflict diff --git a/lattices/src/dom_pair.rs b/lattices/src/dom_pair.rs index 2d9a14f853b3..78f27f5cf065 100644 --- a/lattices/src/dom_pair.rs +++ b/lattices/src/dom_pair.rs @@ -17,10 +17,12 @@ use crate::{IsBot, IsTop, LatticeFrom, LatticeOrd, Merge}; #[derive(Copy, Clone, Debug, Default, Eq)] #[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))] pub struct DomPair { - /// The `Key` of the dominating pair lattice, usually a timestamp + /// The `Key` of the dominating pair lattice, usually a timestamp. + /// + /// This field is public as it is always monotonically increasing in its lattice. pub key: Key, /// The `Val` of the dominating pair lattice. - pub val: Val, + val: Val, } impl DomPair { @@ -33,6 +35,21 @@ impl DomPair { pub fn new_from(key: impl Into, val: impl Into) -> Self { Self::new(key.into(), val.into()) } + + /// Reveal the inner value as a shared reference. + pub fn as_reveal_ref(&self) -> (&Key, &Val) { + (&self.key, &self.val) + } + + /// Reveal the inner value as an exclusive reference. + pub fn as_reveal_mut(&mut self) -> (&mut Key, &mut Val) { + (&mut self.key, &mut self.val) + } + + /// Gets the inner by value, consuming self. + pub fn into_reveal(self) -> (Key, Val) { + (self.key, self.val) + } } impl Merge> diff --git a/lattices/src/map_union.rs b/lattices/src/map_union.rs index 5523278940f5..e4437d5db235 100644 --- a/lattices/src/map_union.rs +++ b/lattices/src/map_union.rs @@ -17,7 +17,7 @@ use crate::{IsBot, LatticeFrom, LatticeOrd, Merge}; #[repr(transparent)] #[derive(Copy, Clone, Debug, Default)] #[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))] -pub struct MapUnion(pub Map); +pub struct MapUnion(Map); impl MapUnion { /// Create a new `MapUnion` from a `Map`. pub fn new(val: Map) -> Self { @@ -28,6 +28,21 @@ impl MapUnion { pub fn new_from(val: impl Into) -> Self { Self::new(val.into()) } + + /// Reveal the inner value as a shared reference. + pub fn as_reveal_ref(&self) -> &Map { + &self.0 + } + + /// Reveal the inner value as an exclusive reference. + pub fn as_reveal_mut(&mut self) -> &mut Map { + &mut self.0 + } + + /// Gets the inner by value, consuming self. + pub fn into_reveal(self) -> Map { + self.0 + } } impl Merge> for MapUnion diff --git a/lattices/src/ord.rs b/lattices/src/ord.rs index 4e19044c2b76..d8e5dd3dca2a 100644 --- a/lattices/src/ord.rs +++ b/lattices/src/ord.rs @@ -6,7 +6,7 @@ use crate::{LatticeFrom, LatticeOrd, Merge}; #[repr(transparent)] #[derive(Copy, Clone, Debug, Default, PartialOrd, Ord, PartialEq, Eq)] #[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))] -pub struct Max(pub T); +pub struct Max(T); impl Max { /// Create a new `Max` lattice instance from a `T`. pub fn new(val: T) -> Self { @@ -17,6 +17,21 @@ impl Max { pub fn from(val: impl Into) -> Self { Self::new(val.into()) } + + /// Reveal the inner value as a shared reference. + pub fn as_reveal_ref(&self) -> &T { + &self.0 + } + + /// Reveal the inner value as an exclusive reference. + pub fn as_reveal_mut(&mut self) -> &mut T { + &mut self.0 + } + + /// Gets the inner by value, consuming self. + pub fn into_reveal(self) -> T { + self.0 + } } impl Merge> for Max @@ -48,7 +63,7 @@ impl LatticeOrd for Max where Self: PartialOrd {} #[repr(transparent)] #[derive(Copy, Clone, Debug, Default, PartialEq, Eq)] #[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))] -pub struct Min(pub T); +pub struct Min(T); impl Min { /// Create a new `Min` lattice instance from a `T`. pub fn new(val: T) -> Self { @@ -59,6 +74,21 @@ impl Min { pub fn new_from(val: impl Into) -> Self { Self::new(val.into()) } + + /// Reveal the inner value as a shared reference. + pub fn as_reveal_ref(&self) -> &T { + &self.0 + } + + /// Reveal the inner value as an exclusive reference. + pub fn as_reveal_mut(&mut self) -> &mut T { + &mut self.0 + } + + /// Gets the inner by value, consuming self. + pub fn into_reveal(self) -> T { + self.0 + } } impl Merge> for Min diff --git a/lattices/src/pair.rs b/lattices/src/pair.rs index 78f612fb61f6..a74b0050cd57 100644 --- a/lattices/src/pair.rs +++ b/lattices/src/pair.rs @@ -24,6 +24,21 @@ impl Pair { pub fn new_from(a: impl Into, b: impl Into) -> Self { Self::new(a.into(), b.into()) } + + /// Reveal the inner value as a shared reference. + pub fn as_reveal_ref(&self) -> (&LatA, &LatB) { + (&self.a, &self.b) + } + + /// Reveal the inner value as an exclusive reference. + pub fn as_reveal_mut(&mut self) -> (&mut LatA, &mut LatB) { + (&mut self.a, &mut self.b) + } + + /// Gets the inner by value, consuming self. + pub fn into_reveal(self) -> (LatA, LatB) { + (self.a, self.b) + } } impl Merge> diff --git a/lattices/src/seq.rs b/lattices/src/seq.rs index 4675eb78e4f0..4931be48454b 100644 --- a/lattices/src/seq.rs +++ b/lattices/src/seq.rs @@ -27,6 +27,21 @@ impl Seq { pub fn new_from(seq: impl Into>) -> Self { Self::new(seq.into()) } + + /// Reveal the inner value as a shared reference. + pub fn as_reveal_ref(&self) -> &Vec { + &self.seq + } + + /// Reveal the inner value as an exclusive reference. + pub fn as_reveal_mut(&mut self) -> &mut Vec { + &mut self.seq + } + + /// Gets the inner by value, consuming self. + pub fn into_reveal(self) -> Vec { + self.seq + } } impl Default for Seq { diff --git a/lattices/src/set_union.rs b/lattices/src/set_union.rs index 205184babedf..37054af45cdc 100644 --- a/lattices/src/set_union.rs +++ b/lattices/src/set_union.rs @@ -24,6 +24,21 @@ impl SetUnion { pub fn new_from(val: impl Into) -> Self { Self::new(val.into()) } + + /// Reveal the inner value as a shared reference. + pub fn as_reveal_ref(&self) -> &Set { + &self.0 + } + + /// Reveal the inner value as an exclusive reference. + pub fn as_reveal_mut(&mut self) -> &mut Set { + &mut self.0 + } + + /// Gets the inner by value, consuming self. + pub fn into_reveal(self) -> Set { + self.0 + } } impl Merge> for SetUnion diff --git a/lattices/src/with_bot.rs b/lattices/src/with_bot.rs index 1fa973288263..0bea2e2c2960 100644 --- a/lattices/src/with_bot.rs +++ b/lattices/src/with_bot.rs @@ -21,6 +21,21 @@ impl WithBot { pub fn new_from(val: impl Into>) -> Self { Self::new(val.into()) } + + /// Reveal the inner value as a shared reference. + pub fn as_reveal_ref(&self) -> Option<&Inner> { + self.0.as_ref() + } + + /// Reveal the inner value as an exclusive reference. + pub fn as_reveal_mut(&mut self) -> Option<&mut Inner> { + self.0.as_mut() + } + + /// Gets the inner by value, consuming self. + pub fn into_reveal(self) -> Option { + self.0 + } } // Cannot auto derive because the generated implementation has the wrong trait bounds. diff --git a/lattices/src/with_top.rs b/lattices/src/with_top.rs index a09397425682..f962e043628c 100644 --- a/lattices/src/with_top.rs +++ b/lattices/src/with_top.rs @@ -21,6 +21,21 @@ impl WithTop { pub fn new_from(val: impl Into>) -> Self { Self::new(val.into()) } + + /// Reveal the inner value as a shared reference. + pub fn as_reveal_ref(&self) -> Option<&Inner> { + self.0.as_ref() + } + + /// Reveal the inner value as an exclusive reference. + pub fn as_reveal_mut(&mut self) -> Option<&mut Inner> { + self.0.as_mut() + } + + /// Gets the inner by value, consuming self. + pub fn into_reveal(self) -> Option { + self.0 + } } impl Merge> for WithTop