Skip to content

Commit

Permalink
feat(hydroflow_plus)!: strongly-typed runtime cluster IDs
Browse files Browse the repository at this point in the history
Instead of `u32`s everywhere, we now have a `ClusterId<C>` type that ensures that cluster IDs are not misused.
  • Loading branch information
shadaj committed Oct 4, 2024
1 parent a064d60 commit db7ebd7
Show file tree
Hide file tree
Showing 20 changed files with 703 additions and 515 deletions.
59 changes: 46 additions & 13 deletions hydroflow_plus/src/builder/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,8 @@ use stageleft::*;

use crate::ir::HfPlusLeaf;
use crate::location::{Cluster, ExternalProcess, Process};
use crate::RuntimeContext;
use crate::staging_util::get_this_crate;
use crate::{ClusterId, RuntimeContext};

pub mod built;
pub mod deploy;
Expand All @@ -31,13 +32,23 @@ pub struct FlowStateInner {

pub type FlowState = Rc<RefCell<FlowStateInner>>;

#[derive(Copy, Clone)]
pub struct ClusterIds<'a> {
pub struct ClusterIds<'a, C> {
pub(crate) id: usize,
pub(crate) _phantom: PhantomData<&'a mut &'a Vec<u32>>,
pub(crate) _phantom: PhantomData<&'a mut &'a C>,
}

impl<'a> FreeVariable<&'a Vec<u32>> for ClusterIds<'a> {
impl<'a, C> Clone for ClusterIds<'a, C> {
fn clone(&self) -> Self {
ClusterIds {
id: self.id,
_phantom: PhantomData,
}
}
}

impl<'a, C> Copy for ClusterIds<'a, C> {}

impl<'a, C> FreeVariable<&'a Vec<ClusterId<C>>> for ClusterIds<'a, C> {
fn to_tokens(self) -> (Option<TokenStream>, Option<TokenStream>)
where
Self: Sized,
Expand All @@ -46,19 +57,36 @@ impl<'a> FreeVariable<&'a Vec<u32>> for ClusterIds<'a> {
&format!("__hydroflow_plus_cluster_ids_{}", self.id),
Span::call_site(),
);
(None, Some(quote! { #ident }))
let root = get_this_crate();
let c_type = quote_type::<C>();
(
None,
Some(
quote! { unsafe { ::std::mem::transmute::<_, &::std::vec::Vec<#root::ClusterId<#c_type>>>(#ident) } },
),
)
}
}

impl<'a> Quoted<'a, &'a Vec<u32>> for ClusterIds<'a> {}
impl<'a, C> Quoted<'a, &'a Vec<ClusterId<C>>> for ClusterIds<'a, C> {}

#[derive(Copy, Clone)]
pub(crate) struct ClusterSelfId<'a> {
pub(crate) struct ClusterSelfId<'a, C> {
pub(crate) id: usize,
pub(crate) _phantom: PhantomData<&'a mut &'a u32>,
pub(crate) _phantom: PhantomData<&'a mut &'a C>,
}

impl<'a> FreeVariable<u32> for ClusterSelfId<'a> {
impl<'a, C> Clone for ClusterSelfId<'a, C> {
fn clone(&self) -> Self {
ClusterSelfId {
id: self.id,
_phantom: PhantomData,
}
}
}

impl<'a, C> Copy for ClusterSelfId<'a, C> {}

impl<'a, C> FreeVariable<ClusterId<C>> for ClusterSelfId<'a, C> {
fn to_tokens(self) -> (Option<TokenStream>, Option<TokenStream>)
where
Self: Sized,
Expand All @@ -67,11 +95,16 @@ impl<'a> FreeVariable<u32> for ClusterSelfId<'a> {
&format!("__hydroflow_plus_cluster_self_id_{}", self.id),
Span::call_site(),
);
(None, Some(quote! { #ident }))
let root = get_this_crate();
let c_type: syn::Type = quote_type::<C>();
(
None,
Some(quote! { #root::ClusterId::<#c_type>::from_raw(#ident) }),
)
}
}

impl<'a> Quoted<'a, u32> for ClusterSelfId<'a> {}
impl<'a, C> Quoted<'a, ClusterId<C>> for ClusterSelfId<'a, C> {}

pub struct FlowBuilder<'a> {
flow_state: FlowState,
Expand Down
3 changes: 1 addition & 2 deletions hydroflow_plus/src/deploy/deploy_graph.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ use hydro_deploy::hydroflow_crate::tracing_options::TracingOptions;
use hydro_deploy::hydroflow_crate::HydroflowCrateService;
use hydro_deploy::{CustomService, Deployment, Host, HydroflowCrate};
use hydroflow::futures::StreamExt;
use hydroflow::util::deploy::ConnectedSource;
use hydroflow::util::deploy::{ConnectedSink, ConnectedSource};
use nameof::name_of;
use serde::de::DeserializeOwned;
use serde::Serialize;
Expand All @@ -29,7 +29,6 @@ use super::trybuild::{compile_graph_trybuild, create_trybuild};
use super::{ClusterSpec, Deploy, ExternalSpec, Node, ProcessSpec, RegisterPort};
use crate::futures::SinkExt;
use crate::lang::graph::HydroflowGraph;
use crate::util::deploy::ConnectedSink;

pub struct HydroDeploy {}

Expand Down
7 changes: 3 additions & 4 deletions hydroflow_plus/src/deploy/deploy_runtime.rs
Original file line number Diff line number Diff line change
@@ -1,11 +1,10 @@
use std::collections::HashMap;

use serde::{Deserialize, Serialize};
use stageleft::{q, Quoted, RuntimeData};

use crate::util::deploy::{
use hydroflow::util::deploy::{
ConnectedDemux, ConnectedDirect, ConnectedSink, ConnectedSource, ConnectedTagged, DeployPorts,
};
use serde::{Deserialize, Serialize};
use stageleft::{q, Quoted, RuntimeData};

#[derive(Default, Serialize, Deserialize)]
pub struct HydroflowPlusMeta {
Expand Down
2 changes: 1 addition & 1 deletion hydroflow_plus/src/deploy/macro_runtime.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,12 @@ use std::cell::RefCell;
use std::pin::Pin;
use std::rc::Rc;

use hydroflow::util::deploy::DeployPorts;
use stageleft::{Quoted, RuntimeData};

use super::HydroflowPlusMeta;
use crate::deploy::{ClusterSpec, Deploy, ExternalSpec, Node, ProcessSpec, RegisterPort};
use crate::lang::graph::HydroflowGraph;
use crate::util::deploy::DeployPorts;

pub struct DeployRuntime {}

Expand Down
4 changes: 3 additions & 1 deletion hydroflow_plus/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ pub mod singleton;
pub use singleton::{Optional, Singleton};

pub mod location;
pub use location::{Cluster, Location, Process};
pub use location::{Cluster, ClusterId, Location, Process};

pub mod deploy;
pub use deploy::{ClusterSpec, Deploy, ProcessSpec};
Expand All @@ -43,6 +43,8 @@ pub mod profiler;

pub mod properties;

pub(crate) mod staging_util;

#[derive(Clone)]
pub struct RuntimeContext<'a> {
_phantom: PhantomData<&'a mut &'a ()>,
Expand Down
137 changes: 117 additions & 20 deletions hydroflow_plus/src/location.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
use std::fmt::{Debug, Display};
use std::hash::Hash;
use std::marker::PhantomData;
use std::time::Duration;

Expand All @@ -6,8 +8,8 @@ use hydroflow::futures::stream::Stream as FuturesStream;
use hydroflow::{tokio, tokio_stream};
use proc_macro2::Span;
use serde::de::DeserializeOwned;
use serde::Serialize;
use stageleft::{q, Quoted};
use serde::{Deserialize, Serialize};
use stageleft::{q, quote_type, Quoted};

use super::builder::{ClusterIds, ClusterSelfId, FlowState};
use crate::cycle::{CycleCollection, CycleCollectionWithInitial};
Expand Down Expand Up @@ -366,7 +368,7 @@ impl<'a, P> ExternalProcess<'a, P> {
to_key: None,
serialize_pipeline: None,
instantiate_fn: crate::ir::DebugInstantiate::Building(),
deserialize_pipeline: Some(crate::stream::deserialize_bincode::<T>(false)),
deserialize_pipeline: Some(crate::stream::deserialize_bincode::<T>(None)),
input: Box::new(HfPlusNode::Source {
source: HfPlusSource::ExternalNetwork(),
location_kind: LocationId::ExternalProcess(self.id),
Expand Down Expand Up @@ -403,21 +405,116 @@ impl<'a, P> Location<'a> for Process<'a, P> {
}
}

#[repr(transparent)]
pub struct ClusterId<C> {
pub id: u32,
pub(crate) _phantom: PhantomData<C>,
}

impl<C> Debug for ClusterId<C> {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(
f,
"ClusterId::<{}>({})",
std::any::type_name::<C>(),
self.id
)
}
}

impl<C> Display for ClusterId<C> {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(
f,
"ClusterId::<{}>({})",
std::any::type_name::<C>(),
self.id
)
}
}

impl<C> Clone for ClusterId<C> {
fn clone(&self) -> Self {
ClusterId {
id: self.id,
_phantom: PhantomData,
}
}
}

impl<C> Copy for ClusterId<C> {}

impl<C> Serialize for ClusterId<C> {
fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
where
S: serde::ser::Serializer,
{
self.id.serialize(serializer)
}
}

impl<'de, C> Deserialize<'de> for ClusterId<C> {
fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
where
D: serde::de::Deserializer<'de>,
{
u32::deserialize(deserializer).map(|id| ClusterId {
id,
_phantom: PhantomData,
})
}
}

impl<C> PartialEq for ClusterId<C> {
fn eq(&self, other: &Self) -> bool {
self.id == other.id
}
}

impl<C> Eq for ClusterId<C> {}

impl<C> PartialOrd for ClusterId<C> {
fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
self.id.partial_cmp(&other.id)
}
}

impl<C> Ord for ClusterId<C> {
fn cmp(&self, other: &Self) -> std::cmp::Ordering {
self.id.cmp(&other.id)
}
}

impl<C> Hash for ClusterId<C> {
fn hash<H: std::hash::Hasher>(&self, state: &mut H) {
self.id.hash(state)
}
}

impl<C> ClusterId<C> {
pub fn from_raw(id: u32) -> Self {
ClusterId {
id,
_phantom: PhantomData,
}
}
}

pub struct Cluster<'a, C> {
pub(crate) id: usize,
pub(crate) flow_state: FlowState,
pub(crate) _phantom: PhantomData<&'a &'a mut C>,
}

impl<'a, C> Cluster<'a, C> {
pub fn self_id(&self) -> impl Quoted<'a, u32> + Copy + 'a {
pub fn self_id(&self) -> impl Quoted<'a, ClusterId<C>> + Copy + 'a {
ClusterSelfId {
id: self.id,
_phantom: PhantomData,
}
}

pub fn members(&self) -> impl Quoted<'a, &'a Vec<u32>> + Copy + 'a {
pub fn members(&self) -> impl Quoted<'a, &'a Vec<ClusterId<C>>> + Copy + 'a {
ClusterIds {
id: self.id,
_phantom: PhantomData,
Expand Down Expand Up @@ -450,7 +547,7 @@ pub trait CanSend<'a, To: Location<'a>>: Location<'a> {
type Out<T>;

fn is_demux() -> bool;
fn is_tagged() -> bool;
fn tagged_type() -> Option<syn::Type>;
}

impl<'a, P1, P2> CanSend<'a, Process<'a, P2>> for Process<'a, P1> {
Expand All @@ -461,47 +558,47 @@ impl<'a, P1, P2> CanSend<'a, Process<'a, P2>> for Process<'a, P1> {
false
}

fn is_tagged() -> bool {
false
fn tagged_type() -> Option<syn::Type> {
None
}
}

impl<'a, P1, C2> CanSend<'a, Cluster<'a, C2>> for Process<'a, P1> {
type In<T> = (u32, T);
type In<T> = (ClusterId<C2>, T);
type Out<T> = T;

fn is_demux() -> bool {
true
}

fn is_tagged() -> bool {
false
fn tagged_type() -> Option<syn::Type> {
None
}
}

impl<'a, C1, P2> CanSend<'a, Process<'a, P2>> for Cluster<'a, C1> {
type In<T> = T;
type Out<T> = (u32, T);
type Out<T> = (ClusterId<C1>, T);

fn is_demux() -> bool {
false
}

fn is_tagged() -> bool {
true
fn tagged_type() -> Option<syn::Type> {
Some(quote_type::<C1>())
}
}

impl<'a, C1, C2> CanSend<'a, Cluster<'a, C2>> for Cluster<'a, C1> {
type In<T> = (u32, T);
type Out<T> = (u32, T);
type In<T> = (ClusterId<C2>, T);
type Out<T> = (ClusterId<C1>, T);

fn is_demux() -> bool {
true
}

fn is_tagged() -> bool {
true
fn tagged_type() -> Option<syn::Type> {
Some(quote_type::<C1>())
}
}

Expand All @@ -513,7 +610,7 @@ impl<'a, P1, E2> CanSend<'a, ExternalProcess<'a, E2>> for Process<'a, P1> {
false
}

fn is_tagged() -> bool {
false
fn tagged_type() -> Option<syn::Type> {
None
}
}
Loading

0 comments on commit db7ebd7

Please sign in to comment.