Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(lattices)!: Add reveal methods, make fields private (#789) #789

Merged
merged 1 commit into from
Jun 27, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion hydroflow/examples/hello_world/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ pub fn main() {
source_iter([1,2,3,4,5])
-> map(hydroflow::lattices::Max::new)
-> lattice_merge::<'static, hydroflow::lattices::Max<usize>>()
-> assert([hydroflow::lattices::Max(5)]);
-> assert([hydroflow::lattices::Max::new(5)]);
};
df.run_available();
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ impl<'a, const SIZE: usize> Serialize for MapUnionHashMapWrapper<'a, SIZE> {
{
use serde::ser::SerializeMap;

let inner_map = &self.0 .0;
let inner_map = self.0.as_reveal_ref();

let mut map_serializer = serializer.serialize_map(Some(inner_map.len()))?;

Expand Down Expand Up @@ -64,7 +64,7 @@ impl<'de, const SIZE: usize> DeserializeSeed<'de> for MapUnionHashMapDeserialize
let k: Option<u64> = map.next_key()?;

if let Some(k) = k {
inner_map.0.insert(
inner_map.as_reveal_mut().insert(
k,
map.next_value_seed(MyLastWriteWinsDeserializer {
collector: self.collector.clone(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,9 @@ impl<'a, const SIZE: usize> Serialize for MyLastWriteWinsWrapper<'a, SIZE> {

let mut struct_serializer = serializer.serialize_struct("DomPair", 2)?;

struct_serializer.serialize_field("key", &self.0.key)?;
struct_serializer.serialize_field("val", &WithBotWrapper(&self.0.val))?;
let (key, val) = self.0.as_reveal_ref();
struct_serializer.serialize_field("key", key)?;
struct_serializer.serialize_field("val", &WithBotWrapper(val))?;

struct_serializer.end()
}
Expand Down Expand Up @@ -58,7 +59,7 @@ impl<'de, const SIZE: usize> DeserializeSeed<'de> for MyLastWriteWinsDeserialize
})?
.unwrap();

Ok(Self::Value { key, val })
Ok(Self::Value::new(key, val))
}

fn visit_map<A>(self, mut map: A) -> Result<Self::Value, A::Error>
Expand Down Expand Up @@ -90,7 +91,7 @@ impl<'de, const SIZE: usize> DeserializeSeed<'de> for MyLastWriteWinsDeserialize
let key = key.unwrap();
let val = val.unwrap();

Ok(Self::Value { key, val })
Ok(Self::Value::new(key, val))
}
}

Expand Down
2 changes: 1 addition & 1 deletion hydroflow/examples/kvs_bench/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -198,7 +198,7 @@ pub fn run_server<RX>(
)));
},
KvsRequest::Gossip {map} => {
for (key, reg) in map.0 {
for (key, reg) in map.into_reveal() {
store.give((key, reg));
}
},
Expand Down
4 changes: 2 additions & 2 deletions hydroflow/examples/lamport_clock/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ use crate::{GraphType, Opts};
pub(crate) async fn run_client(outbound: UdpSink, inbound: UdpStream, opts: Opts) {
// server_addr is required for client
let server_addr = opts.server_addr.expect("Client requires a server address");
let bot: Max<usize> = Max(0);
let bot: Max<usize> = Max::new(0);

println!("Client live!");

Expand All @@ -30,7 +30,7 @@ pub(crate) async fn run_client(outbound: UdpSink, inbound: UdpStream, opts: Opts
mergevc = union() -> fold::<'static>(
bot,
|mut old: Max<usize>, lamport_clock: Max<usize>| {
let bump = Max(old.0 + 1);
let bump = Max::new(old.into_reveal() + 1);
old.merge(bump);
old.merge(lamport_clock);
old
Expand Down
4 changes: 2 additions & 2 deletions hydroflow/examples/lamport_clock/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ use hydroflow::util::{UdpSink, UdpStream};
use crate::protocol::EchoMsg;

pub(crate) async fn run_server(outbound: UdpSink, inbound: UdpStream) {
let bot: Max<usize> = Max(0);
let bot: Max<usize> = Max::new(0);
println!("Server live!");

let mut flow: Hydroflow = hydroflow_syntax! {
Expand All @@ -25,7 +25,7 @@ pub(crate) async fn run_server(outbound: UdpSink, inbound: UdpStream) {
mergevc = fold::<'static>(
bot,
|mut old: Max<usize>, lamport_clock: Max<usize>| {
let bump = Max(old.0 + 1);
let bump = Max::new(old.into_reveal() + 1);
old.merge(bump);
old.merge(lamport_clock);
old
Expand Down
2 changes: 1 addition & 1 deletion hydroflow/examples/vector_clock/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ pub(crate) async fn run_client(
inbound_chan[merge] -> map(|(msg, _sender): (EchoMsg, SocketAddr)| msg.vc) -> [net]mergevc;
mergevc = union() -> fold::<'static> (VecClock::default(), |mut old, vc| {
let my_addr = format!("{:?}", addr);
let bump = MapUnionSingletonMap::new_from((my_addr.clone(), Max::new(old.0[&my_addr].0 + 1)));
let bump = MapUnionSingletonMap::new_from((my_addr.clone(), Max::new(old.as_reveal_ref()[&my_addr].into_reveal() + 1)));
old.merge(bump);
old.merge(vc);
old
Expand Down
2 changes: 1 addition & 1 deletion hydroflow/examples/vector_clock/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ pub(crate) async fn run_server(outbound: UdpSink, inbound: UdpStream, opts: crat
inbound_chan[merge] -> map(|(msg, _addr): (EchoMsg, SocketAddr)| msg.vc) -> mergevc;
mergevc = fold::<'static> (VecClock::default(), |mut old, vc| {
let my_addr = format!("{:?}", opts.addr.unwrap());
let bump = MapUnionSingletonMap::new_from((my_addr.clone(), Max::new(old.0[&my_addr].0 + 1)));
let bump = MapUnionSingletonMap::new_from((my_addr.clone(), Max::new(old.as_reveal_ref()[&my_addr].into_reveal() + 1)));
old.merge(bump);
old.merge(vc);
old
Expand Down
8 changes: 4 additions & 4 deletions hydroflow/src/compiled/pull/symmetric_hash_join_lattice.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ impl<K, Lattice> Default for HalfJoinStateLattice<K, Lattice> {

impl<K, Lattice> Clear for HalfJoinStateLattice<K, Lattice> {
fn clear(&mut self) {
self.table.0.clear()
self.table.as_reveal_mut().clear()
}
}

Expand All @@ -33,7 +33,7 @@ where
where
Lattice: Merge<LatticeDelta> + LatticeFrom<LatticeDelta>,
{
let entry = self.table.0.entry(k);
let entry = self.table.as_reveal_mut().entry(k);

match entry {
Entry::Occupied(mut e) => e.get_mut().merge(v),
Expand Down Expand Up @@ -69,8 +69,8 @@ where

fn next(&mut self) -> Option<Self::Item> {
if let Some(key) = self.updated_keys.next() {
if let Some(lhs) = self.state.0.table.0.get(&key) {
if let Some(rhs) = self.state.1.table.0.get(&key) {
if let Some(lhs) = self.state.0.table.as_reveal_ref().get(&key) {
if let Some(rhs) = self.state.1.table.as_reveal_ref().get(&key) {
return Some((key, (lhs.clone(), rhs.clone())));
}
}
Expand Down
2 changes: 1 addition & 1 deletion hydroflow_lang/src/graph/ops/lattice_merge.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ use crate::graph::ops::OperatorWriteOutput;
/// source_iter([1,2,3,4,5])
/// -> map(hydroflow::lattices::Max::new)
/// -> lattice_merge::<'static, hydroflow::lattices::Max<usize>>()
/// -> assert([hydroflow::lattices::Max(5)]);
/// -> assert([hydroflow::lattices::Max::new(5)]);
MingweiSamuel marked this conversation as resolved.
Show resolved Hide resolved
/// ```
pub const LATTICE_MERGE: OperatorConstraints = OperatorConstraints {
name: "lattice_merge",
Expand Down
17 changes: 16 additions & 1 deletion lattices/src/conflict.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ use crate::{IsTop, LatticeFrom, LatticeOrd, Merge};
#[repr(transparent)]
#[derive(Copy, Clone, Debug, Default, Eq)]
#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
pub struct Conflict<T>(pub Option<T>);
pub struct Conflict<T>(Option<T>);
impl<T> Conflict<T> {
/// Create a new `Conflict` lattice instance from a value.
pub fn new(val: Option<T>) -> Self {
Expand All @@ -25,6 +25,21 @@ impl<T> Conflict<T> {
pub fn new_from(val: impl Into<Option<T>>) -> Self {
Self::new(val.into())
}

/// Reveal the inner value as a shared reference.
pub fn as_reveal_ref(&self) -> Option<&T> {
self.0.as_ref()
}

/// Reveal the inner value as an exclusive reference.
pub fn as_reveal_mut(&mut self) -> Option<&mut T> {
self.0.as_mut()
}

/// Gets the inner by value, consuming self.
pub fn into_reveal(self) -> Option<T> {
self.0
}
}

impl<T, O> Merge<Conflict<O>> for Conflict<T>
Expand Down
21 changes: 19 additions & 2 deletions lattices/src/dom_pair.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,12 @@ use crate::{IsBot, IsTop, LatticeFrom, LatticeOrd, Merge};
#[derive(Copy, Clone, Debug, Default, Eq)]
#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
pub struct DomPair<Key, Val> {
/// The `Key` of the dominating pair lattice, usually a timestamp
/// The `Key` of the dominating pair lattice, usually a timestamp.
///
/// This field is public as it is always monotonically increasing in its lattice.
pub key: Key,
/// The `Val` of the dominating pair lattice.
pub val: Val,
val: Val,
}

impl<Key, Val> DomPair<Key, Val> {
Expand All @@ -33,6 +35,21 @@ impl<Key, Val> DomPair<Key, Val> {
pub fn new_from(key: impl Into<Key>, val: impl Into<Val>) -> Self {
Self::new(key.into(), val.into())
}

/// Reveal the inner value as a shared reference.
pub fn as_reveal_ref(&self) -> (&Key, &Val) {
(&self.key, &self.val)
}

/// Reveal the inner value as an exclusive reference.
pub fn as_reveal_mut(&mut self) -> (&mut Key, &mut Val) {
(&mut self.key, &mut self.val)
}

/// Gets the inner by value, consuming self.
pub fn into_reveal(self) -> (Key, Val) {
(self.key, self.val)
}
}

impl<KeySelf, KeyOther, ValSelf, ValOther> Merge<DomPair<KeyOther, ValOther>>
Expand Down
17 changes: 16 additions & 1 deletion lattices/src/map_union.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ use crate::{IsBot, LatticeFrom, LatticeOrd, Merge};
#[repr(transparent)]
#[derive(Copy, Clone, Debug, Default)]
#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
pub struct MapUnion<Map>(pub Map);
pub struct MapUnion<Map>(Map);
impl<Map> MapUnion<Map> {
/// Create a new `MapUnion` from a `Map`.
pub fn new(val: Map) -> Self {
Expand All @@ -28,6 +28,21 @@ impl<Map> MapUnion<Map> {
pub fn new_from(val: impl Into<Map>) -> Self {
Self::new(val.into())
}

/// Reveal the inner value as a shared reference.
pub fn as_reveal_ref(&self) -> &Map {
&self.0
}

/// Reveal the inner value as an exclusive reference.
pub fn as_reveal_mut(&mut self) -> &mut Map {
&mut self.0
}

/// Gets the inner by value, consuming self.
pub fn into_reveal(self) -> Map {
self.0
}
}

impl<MapSelf, MapOther, K, ValSelf, ValOther> Merge<MapUnion<MapOther>> for MapUnion<MapSelf>
Expand Down
34 changes: 32 additions & 2 deletions lattices/src/ord.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ use crate::{LatticeFrom, LatticeOrd, Merge};
#[repr(transparent)]
#[derive(Copy, Clone, Debug, Default, PartialOrd, Ord, PartialEq, Eq)]
#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
pub struct Max<T>(pub T);
pub struct Max<T>(T);
impl<T> Max<T> {
/// Create a new `Max` lattice instance from a `T`.
pub fn new(val: T) -> Self {
Expand All @@ -17,6 +17,21 @@ impl<T> Max<T> {
pub fn from(val: impl Into<T>) -> Self {
Self::new(val.into())
}

/// Reveal the inner value as a shared reference.
pub fn as_reveal_ref(&self) -> &T {
&self.0
}

/// Reveal the inner value as an exclusive reference.
pub fn as_reveal_mut(&mut self) -> &mut T {
&mut self.0
}

/// Gets the inner by value, consuming self.
pub fn into_reveal(self) -> T {
self.0
}
}

impl<T> Merge<Max<T>> for Max<T>
Expand Down Expand Up @@ -48,7 +63,7 @@ impl<T> LatticeOrd<Self> for Max<T> where Self: PartialOrd<Self> {}
#[repr(transparent)]
#[derive(Copy, Clone, Debug, Default, PartialEq, Eq)]
#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
pub struct Min<T>(pub T);
pub struct Min<T>(T);
impl<T> Min<T> {
/// Create a new `Min` lattice instance from a `T`.
pub fn new(val: T) -> Self {
Expand All @@ -59,6 +74,21 @@ impl<T> Min<T> {
pub fn new_from(val: impl Into<T>) -> Self {
Self::new(val.into())
}

/// Reveal the inner value as a shared reference.
pub fn as_reveal_ref(&self) -> &T {
&self.0
}

/// Reveal the inner value as an exclusive reference.
pub fn as_reveal_mut(&mut self) -> &mut T {
&mut self.0
}

/// Gets the inner by value, consuming self.
pub fn into_reveal(self) -> T {
self.0
}
}

impl<T> Merge<Min<T>> for Min<T>
Expand Down
15 changes: 15 additions & 0 deletions lattices/src/pair.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,21 @@ impl<LatA, LatB> Pair<LatA, LatB> {
pub fn new_from(a: impl Into<LatA>, b: impl Into<LatB>) -> Self {
Self::new(a.into(), b.into())
}

/// Reveal the inner value as a shared reference.
pub fn as_reveal_ref(&self) -> (&LatA, &LatB) {
(&self.a, &self.b)
}

/// Reveal the inner value as an exclusive reference.
pub fn as_reveal_mut(&mut self) -> (&mut LatA, &mut LatB) {
(&mut self.a, &mut self.b)
}

/// Gets the inner by value, consuming self.
pub fn into_reveal(self) -> (LatA, LatB) {
(self.a, self.b)
}
}

impl<LatASelf, LatAOther, LatBSelf, LatBOther> Merge<Pair<LatAOther, LatBOther>>
Expand Down
15 changes: 15 additions & 0 deletions lattices/src/seq.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,21 @@ impl<Lat> Seq<Lat> {
pub fn new_from(seq: impl Into<Vec<Lat>>) -> Self {
Self::new(seq.into())
}

/// Reveal the inner value as a shared reference.
pub fn as_reveal_ref(&self) -> &Vec<Lat> {
&self.seq
}

/// Reveal the inner value as an exclusive reference.
pub fn as_reveal_mut(&mut self) -> &mut Vec<Lat> {
&mut self.seq
}

/// Gets the inner by value, consuming self.
pub fn into_reveal(self) -> Vec<Lat> {
self.seq
}
}

impl<Lat> Default for Seq<Lat> {
Expand Down
15 changes: 15 additions & 0 deletions lattices/src/set_union.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,21 @@ impl<Set> SetUnion<Set> {
pub fn new_from(val: impl Into<Set>) -> Self {
Self::new(val.into())
}

/// Reveal the inner value as a shared reference.
pub fn as_reveal_ref(&self) -> &Set {
&self.0
}

/// Reveal the inner value as an exclusive reference.
pub fn as_reveal_mut(&mut self) -> &mut Set {
&mut self.0
}

/// Gets the inner by value, consuming self.
pub fn into_reveal(self) -> Set {
self.0
}
}

impl<SetSelf, SetOther, Item> Merge<SetUnion<SetOther>> for SetUnion<SetSelf>
Expand Down
Loading
Loading