diff --git a/hydroflow/examples/kvs_replicated_v2/client.rs b/hydroflow/examples/kvs_replicated_v2/client.rs
new file mode 100644
index 000000000000..023cd75139c1
--- /dev/null
+++ b/hydroflow/examples/kvs_replicated_v2/client.rs
@@ -0,0 +1,35 @@
+use hydroflow::hydroflow_syntax;
+use hydroflow::util::{UdpSink, UdpStream};
+
+use crate::helpers::parse_command;
+use crate::protocol::ServerResp;
+use crate::Opts;
+
+pub(crate) async fn run_client(outbound: UdpSink, inbound: UdpStream, opts: Opts) {
+    println!("Client live!");
+
+    let server_addr = opts.server_addr.unwrap();
+    let mut hf = hydroflow_syntax! {
+        // set up channels
+        outbound_chan = dest_sink_serde(outbound);
+        inbound_chan = source_stream_serde(inbound) -> map(Result::unwrap);
+
+        // read in commands from stdin and forward to server
+        source_stdin()
+            -> filter_map(|line| parse_command(line.unwrap()))
+            -> map(|msg| { (msg, server_addr) })
+            -> outbound_chan;
+
+        // print inbound msgs
+        inbound_chan -> for_each(|(response, addr): (ServerResp, _)| println!("Got a Response: {:?} from: {:?}", response, addr));
+    };
+
+    if let Some(graph) = opts.graph {
+        let serde_graph = hf
+            .meta_graph()
+            .expect("No graph found, maybe failed to parse.");
+        serde_graph.open_graph(graph, opts.write_config).unwrap();
+    }
+
+    hf.run_async().await.unwrap();
+}
diff --git a/hydroflow/examples/kvs_replicated_v2/helpers.rs b/hydroflow/examples/kvs_replicated_v2/helpers.rs
new file mode 100644
index 000000000000..07396427e2af
--- /dev/null
+++ b/hydroflow/examples/kvs_replicated_v2/helpers.rs
@@ -0,0 +1,25 @@
+use regex::Regex;
+
+use crate::protocol::ServerReq;
+
+pub fn parse_command(line: String) -> Option<ServerReq> {
+    let re = Regex::new(r"([A-z]+)\s+(.+)").unwrap();
+    let caps = re.captures(line.as_str())?;
+
+    let binding = caps.get(1).unwrap().as_str().to_uppercase();
+    let cmdstr = binding.as_str();
+    let args = caps.get(2).unwrap().as_str();
+    match cmdstr {
+        "PUT" => {
+            let kv = args.split_once(',')?;
+            Some(ServerReq::ClientPut {
+                key: kv.0.trim().to_string(),
+                value: kv.1.trim().to_string(),
+            })
+        }
+        "GET" => Some(ServerReq::ClientGet {
+            key: args.trim().to_string(),
+        }),
+        _ => None,
+    }
+}
diff --git a/hydroflow/examples/kvs_replicated_v2/left_outer_join.hf b/hydroflow/examples/kvs_replicated_v2/left_outer_join.hf
new file mode 100644
index 000000000000..766d49caaff2
--- /dev/null
+++ b/hydroflow/examples/kvs_replicated_v2/left_outer_join.hf
@@ -0,0 +1,16 @@
+lhs = mod[0] -> tee();
+rhs = mod[1] -> tee();
+
+lhs -> [0]joined;
+rhs -> [1]joined;
+
+joined = join_multiset() -> map(|(k, (lhs, rhs))| (k, (lhs, Some(rhs)))) -> combined;
+
+lhs -> [pos]missed;
+rhs -> map(|(k, _v)| k) -> [neg]missed;
+
+missed = anti_join()
+    -> map(|(k, v)| (k, (v, None)))
+    -> combined;
+
+combined = union() -> mod;
diff --git a/hydroflow/examples/kvs_replicated_v2/main.rs b/hydroflow/examples/kvs_replicated_v2/main.rs
new file mode 100644
index 000000000000..c948ff0a3e4d
--- /dev/null
+++ b/hydroflow/examples/kvs_replicated_v2/main.rs
@@ -0,0 +1,223 @@
+use std::net::SocketAddr;
+use std::pin::Pin;
+
+use bytes::{Bytes, BytesMut};
+use clap::{Parser, ValueEnum};
+use client::run_client;
+use futures::stream::{SplitSink, SplitStream};
+use futures::task::noop_waker;
+use futures::SinkExt;
+use hydroflow::lang::graph::{WriteConfig, WriteGraphType};
+use hydroflow::tokio;
+use hydroflow::util::{bind_udp_bytes, ipv4_resolve, TcpFramedStream};
+use multiplatform_test::multiplatform_test;
+use server::run_server;
+use tokio_util::codec::LengthDelimitedCodec;
+use tokio_util::udp::UdpFramed;
+
+use crate::protocol::{ServerReq, ServerResp};
+
+mod client;
+mod helpers;
+mod protocol;
+mod server;
+
+#[derive(Clone, ValueEnum, Debug)]
+enum Role {
+    Client,
+    Server,
+}
+
+#[derive(Parser, Debug)]
+struct Opts {
+    #[clap(value_enum, long)]
+    role: Role,
+    #[clap(long, value_parser = ipv4_resolve)]
+    addr: SocketAddr,
+    #[clap(long, value_parser = ipv4_resolve)]
+    server_addr: Option<SocketAddr>,
+    #[clap(long)]
+    graph: Option<WriteGraphType>,
+    #[clap(flatten)]
+    write_config: Option<WriteConfig>,
+}
+
+#[hydroflow::main]
+async fn main() {
+    let opts = Opts::parse();
+    let addr = opts.addr;
+
+    match opts.role {
+        Role::Client => {
+            let (outbound, inbound, _) = bind_udp_bytes(addr).await;
+            println!("Client is bound to {:?}", addr);
+            println!("Attempting to connect to server at {:?}", opts.server_addr);
+            run_client(outbound, inbound, opts).await;
+        }
+        Role::Server => {
+            run_server(opts.addr).await;
+        }
+    }
+}
+
+async fn send(
+    outbound: &mut SplitSink<UdpFramed<LengthDelimitedCodec>, (Bytes, SocketAddr)>,
+    x: ServerReq,
+    addr: SocketAddr,
+) {
+    outbound
+        .send((hydroflow::util::serialize_to_bytes(x), addr))
+        .await
+        .unwrap();
+}
+
+async fn read(inbound: &mut SplitStream<UdpFramed<LengthDelimitedCodec>>) -> ServerResp {
+    use futures::StreamExt;
+
+    let Some(Ok((bytes, src))) = inbound.next().await else {
+        panic!()
+    };
+
+    hydroflow::util::deserialize_from_bytes(bytes).unwrap()
+}
+
+// #[multiplatform_test(hydroflow, env_tracing)]
+#[hydroflow::test]
+async fn test_server() {
+    let server_addr_1 = "127.0.0.1:2098".parse().unwrap();
+    let server_addr_2: SocketAddr = "127.0.0.1:2096".parse().unwrap();
+
+    tokio::task::spawn_local(run_server(server_addr_1));
+    // tokio::task::spawn_local(run_server(server_addr_2));
+    tokio::time::sleep(std::time::Duration::from_millis(100)).await;
+
+    let (mut outbound, mut inbound, _) = bind_udp_bytes("127.0.0.1:0".parse().unwrap()).await;
+
+    send(
+        &mut outbound,
+        ServerReq::AddNode {
+            node_id: server_addr_1,
+        },
+        server_addr_1,
+    )
+    .await;
+    tokio::time::sleep(std::time::Duration::from_millis(100)).await;
+
+    send(
+        &mut outbound,
+        ServerReq::AddNode {
+            node_id: server_addr_2,
+        },
+        server_addr_1,
+    )
+    .await;
+    tokio::time::sleep(std::time::Duration::from_millis(100)).await;
+
+    // send(
+    //     &mut outbound,
+    //     ServerReq::RemoveNode {
+    //         node_id: server_addr_2,
+    //     },
+    //     server_addr_1,
+    // )
+    // .await;
+    // tokio::time::sleep(std::time::Duration::from_millis(100)).await;
+
+    send(
+        &mut outbound,
+        ServerReq::ClientPut {
+            key: "mykey".to_owned(),
+            value: "myval".to_owned(),
+        },
+        server_addr_1,
+    )
+    .await;
+    tokio::time::sleep(std::time::Duration::from_millis(100)).await;
+
+    // send(
+    //     &mut outbound,
+    //     ServerReq::ClientPut {
+    //         key: "mykey".to_owned(),
+    //         value: "myval2".to_owned(),
+    //     },
+    //     server_addr_1,
+    // )
+    // .await;
+    // tokio::time::sleep(std::time::Duration::from_millis(100)).await;
+
+    send(
+        &mut outbound,
+        ServerReq::ClientGet {
+            key: "mykey".to_owned(),
+        },
+        server_addr_1,
+    )
+    .await;
+    tokio::time::sleep(std::time::Duration::from_millis(100)).await;
+
+    println!("received from srv: {:?}", read(&mut inbound).await);
+
+    // send(
+    //     &mut outbound,
+    //     ServerReq::ClientGet {
+    //         key: "mykey2".to_owned(),
+    //     },
+    //     server_addr_1,
+    // )
+    // .await;
+    // tokio::time::sleep(std::time::Duration::from_millis(100)).await;
+
+    // Need this sleep otherwise the last sent messages won't get processed before the whole process terminates.
+ tokio::time::sleep(std::time::Duration::from_millis(1000)).await; +} + +#[test] +fn test() { + use std::io::Write; + + use hydroflow::util::{run_cargo_example, wait_for_process_output}; + + let (_server_1, _, mut server_1_stdout) = + run_cargo_example("kvs_replicated_v2", "--role server --addr 127.0.0.1:2051"); + + let mut server_1_output = String::new(); + wait_for_process_output(&mut server_1_output, &mut server_1_stdout, "Server live!"); + + let (_client_1, mut client_1_stdin, mut client_1_stdout) = run_cargo_example( + "kvs_replicated_v2", + "--role client --addr 127.0.0.1:2052 --server-addr 127.0.0.1:2051", + ); + + let mut client_1_output = String::new(); + wait_for_process_output(&mut client_1_output, &mut client_1_stdout, "Client live!"); + + client_1_stdin.write_all(b"PUT a,7\n").unwrap(); + + // let (_server_2, _, mut server_2_stdout) = run_cargo_example( + // "kvs_replicated_v2", + // "--role server --addr 127.0.0.1:2053 --server-addr 127.0.0.1:2051", + // ); + + // let (_client_2, mut client_2_stdin, mut client_2_stdout) = run_cargo_example( + // "kvs_replicated_v2", + // "--role client --addr 127.0.0.1:2054 --server-addr 127.0.0.1:2053", + // ); + + // let mut server_2_output = String::new(); + // wait_for_process_output(&mut server_2_output, &mut server_2_stdout, "Server live!"); + // wait_for_process_output( + // &mut server_2_output, + // &mut server_2_stdout, + // r#"Message received PeerGossip \{ key: "a", value: "7" \} from 127\.0\.0\.1:2051"#, + // ); + + // let mut client_2_output = String::new(); + // wait_for_process_output(&mut client_2_output, &mut client_2_stdout, "Client live!"); + + // client_2_stdin.write_all(b"GET a\n").unwrap(); + // wait_for_process_output( + // &mut client_2_output, + // &mut client_2_stdout, + // r#"Got a Response: ServerResponse \{ key: "a", value: "7" \}"#, + // ); +} diff --git a/hydroflow/examples/kvs_replicated_v2/protocol.rs b/hydroflow/examples/kvs_replicated_v2/protocol.rs new file mode 100644 index 000000000000..58d49a8f4fc7 --- /dev/null +++ b/hydroflow/examples/kvs_replicated_v2/protocol.rs @@ -0,0 +1,146 @@ +use std::collections::HashSet; +use std::net::SocketAddr; + +use hydroflow_macro::DemuxEnum; +use lattices::map_union::{MapUnionBTreeMap, MapUnionHashMap}; +// use lattices::map_union_with_tombstones:: +use lattices::set_union::SetUnionHashSet; +use lattices::{DomPair, IsBot, LatticeFrom, Max, Merge}; +use serde::{Deserialize, Serialize}; + +pub type Key = String; +pub type Value = String; +pub type NodeId = SocketAddr; + +pub type VClock = MapUnionHashMap>; +// pub type Anna = MapUnionHashMap>>; + +pub type Anna = MapUnionHashMap>; + +#[derive(Clone, Debug)] +pub enum AnnaValue { + Value(SetUnionHashSet), + HashRing(MapUnionBTreeMap>), +} + +impl Merge for AnnaValue { + fn merge(&mut self, other: AnnaValue) -> bool { + match (self, other) { + (AnnaValue::Value(x), AnnaValue::Value(y)) => x.merge(y), + (AnnaValue::HashRing(x), AnnaValue::HashRing(y)) => x.merge(y), + _ => panic!(), + } + } +} + +impl IsBot for AnnaValue { + fn is_bot(&self) -> bool { + match self { + AnnaValue::Value(x) => x.is_bot(), + AnnaValue::HashRing(x) => x.is_bot(), + } + } +} + +impl LatticeFrom for AnnaValue { + fn lattice_from(other: AnnaValue) -> Self { + other + } +} + +#[derive(Clone, Debug, Serialize, Deserialize, DemuxEnum)] +pub enum ServerReq { + ClientPut { key: String, value: String }, + ClientGet { key: String }, + AddNode { node_id: NodeId }, + RemoveNode { node_id: NodeId }, +} + +#[derive(Clone, Debug, Serialize, Deserialize, 
DemuxEnum)] +pub enum ServerReqWithSrc { + ClientPut { + src: NodeId, + key: String, + value: String, + }, + ClientGet { + src: NodeId, + key: String, + }, + AddNode { + src: NodeId, + node_id: NodeId, + }, + RemoveNode { + src: NodeId, + node_id: NodeId, + }, +} + +impl ServerReqWithSrc { + pub fn from_server_req(server_req: ServerReq, src: NodeId) -> Self { + match server_req { + ServerReq::ClientPut { key, value } => Self::ClientPut { src, key, value }, + ServerReq::ClientGet { key } => Self::ClientGet { src, key }, + ServerReq::AddNode { node_id } => Self::AddNode { src, node_id }, + ServerReq::RemoveNode { node_id } => Self::RemoveNode { src, node_id }, + } + } +} + +#[derive(Clone, Debug, Serialize, Deserialize, DemuxEnum)] +pub enum ServerResp { + ServerResponse { + key: String, + value: Option>, + }, + NotOwned { + hash_ring: MapUnionBTreeMap>, + }, +} + +// #[derive(Clone, Debug, Serialize, Deserialize, DemuxEnum)] +// pub enum KvsMessage { +// ServerResponse { key: String, value: String }, +// ClientPut { key: String, value: String }, +// ClientGet { key: String }, +// PeerGossip { key: String, value: String }, +// PeerJoin, +// } + +// #[derive(Clone, Debug, DemuxEnum)] +// pub enum KvsMessageWithAddr { +// ServerResponse { +// key: String, +// value: String, +// addr: SocketAddr, +// }, +// ClientPut { +// key: String, +// value: String, +// addr: SocketAddr, +// }, +// ClientGet { +// key: String, +// addr: SocketAddr, +// }, +// PeerGossip { +// key: String, +// value: String, +// addr: SocketAddr, +// }, +// PeerJoin { +// addr: SocketAddr, +// }, +// } +// impl KvsMessageWithAddr { +// pub fn from_message(message: KvsMessage, addr: SocketAddr) -> Self { +// match message { +// KvsMessage::ServerResponse { key, value } => Self::ServerResponse { key, value, addr }, +// KvsMessage::ClientPut { key, value } => Self::ClientPut { key, value, addr }, +// KvsMessage::ClientGet { key } => Self::ClientGet { key, addr }, +// KvsMessage::PeerGossip { key, value } => Self::PeerGossip { key, value, addr }, +// KvsMessage::PeerJoin => Self::PeerJoin { addr }, +// } +// } +// } diff --git a/hydroflow/examples/kvs_replicated_v2/right_outer_join.hf b/hydroflow/examples/kvs_replicated_v2/right_outer_join.hf new file mode 100644 index 000000000000..e90597b39a6d --- /dev/null +++ b/hydroflow/examples/kvs_replicated_v2/right_outer_join.hf @@ -0,0 +1,6 @@ +// flip the lhs and rhs +mod[0] -> [1]left_outer_join; +mod[1] -> [0]left_outer_join; + +// flip them back +left_outer_join = import!("left_outer_join.hf") -> map(|(k, (v1, v2))| (k, (v2, v1))) -> mod; diff --git a/hydroflow/examples/kvs_replicated_v2/server.rs b/hydroflow/examples/kvs_replicated_v2/server.rs new file mode 100644 index 000000000000..7b3542192791 --- /dev/null +++ b/hydroflow/examples/kvs_replicated_v2/server.rs @@ -0,0 +1,335 @@ +use std::cell::RefCell; +use std::hash::Hasher; +use std::net::SocketAddr; + +use hydroflow::hydroflow_syntax; +use hydroflow::scheduled::graph::Hydroflow; +use hydroflow::util::bind_udp_bytes; +use hydroflow_lang::graph::WriteConfig; +use hydroflow_macro::DemuxEnum; +use lattices::map_union::{MapUnionBTreeMap, MapUnionHashMap, MapUnionSingletonMap}; +use lattices::set_union::{SetUnionHashSet, SetUnionSingletonSet}; +use lattices::{Atomize, DomPair, Max}; +use rustc_hash::FxHasher; + +use crate::protocol::{Anna, AnnaValue, Key, NodeId, ServerReqWithSrc, ServerResp, Value}; +use crate::Opts; + +const HASH_RING: &str = "HASHRING"; + +// #[derive(Clone, Debug, Hash, Eq, PartialEq, DemuxEnum)] +// enum 
LookupType { +// Get(NodeId), +// Put(NodeId, Value), +// HashRingGet(NodeId, Key), +// HashRingPut(NodeId, Key, Value), +// } + +fn lookup_in_hash_ring( + ring: &MapUnionBTreeMap>, + hash: u64, +) -> &SetUnionHashSet { + if let Some((_keyhash, nid)) = ring.as_reveal_ref().range(hash..).next() { + return &nid; + } + + if let Some((_keyhash, nid)) = ring.as_reveal_ref().range(..hash).next() { + return &nid; + } + + panic!(); // probably the hash ring is empty... +} + +pub(crate) async fn run_server(self_node_id: NodeId) { + let (outbound, inbound, _) = bind_udp_bytes(self_node_id).await; + println!("Server live! id: {self_node_id:?}"); + + let current_version_clock = &*Box::leak(Box::new(RefCell::new(0usize))); + + let mut hf: Hydroflow = hydroflow_syntax! { + network_recv = source_stream_serde(inbound) + -> map(Result::unwrap) + // -> inspect(|(msg, addr)| println!("Message received {:?} from {:?}", msg, addr)) + -> map(|(msg, addr)| ServerReqWithSrc::from_server_req(msg, addr)) + -> demux_enum::(); + + network_recv[RemoveNode] + -> null(); + + // Request to add a node to the hash ring. + network_recv[AddNode] + -> map(|(_src, node_id)| { + let mut current_version = current_version_clock.borrow_mut(); + *current_version += 1; + let new_version = *current_version; + + let mut hasher = FxHasher::default(); + std::hash::Hash::hash(&node_id, &mut hasher); + let hash = hasher.finish(); + + println!("Adding Node: {} with hash: {}", node_id, hash); + + MapUnionSingletonMap::new_from(( + HASH_RING.to_owned(), + DomPair::new( + MapUnionSingletonMap::new_from((self_node_id, Max::new(new_version))), + AnnaValue::HashRing(MapUnionBTreeMap::new_from([(hash, SetUnionHashSet::new_from([node_id]))])), + )), + ) + }) + -> _upcast(Some(Delta)) + -> lattice_updates; + + lattice_updates = union() + -> persist() + -> lattice_fold(Anna::default) // This is where all the anna state lives, hash ring and writes. This should be gossiped. + -> identity::() + // -> inspect(|x| println!("{}, after_lattice_fold. x: {x:?}", context.current_tick())) + -> flat_map(|x: Anna| { + // should be able to atomize this thing without it being so manual. + // Not sure what atomizing a dompair means tho. + x.into_reveal().into_iter().map(|(k, v)| { + (k.clone(), v.into_reveal().1) + }) + }) // probably this is where the gossping should be hooked in, everything after this is internal representation for querying. + -> demux(|(k, v): (String, AnnaValue), var_args!(hash_ring_updates, puts)| { + eprintln!("k: {k}, v: {v:?}"); + if k.as_str() == HASH_RING { + hash_ring_updates.give(v); + } else { + puts.give((k, v)); + } + }); + + lattice_updates[hash_ring_updates] + // -> inspect(|x| println!("{}, lattice_updates[hash_ring_updates] output. x: {x:?}", context.current_tick())) + -> [0]hash_ring_lookup; + + network_recv[ClientGet] + -> map(|(src, key): (SocketAddr, String)| ServerReqWithSrc::ClientGet{src, key}) + -> hash_ring_lookup_union; + + network_recv[ClientPut] + // -> inspect(|x| println!("network_recv[ClientPut] output. x: {x:?}")) + -> map(|(src, key, value): (SocketAddr, String, String)| ServerReqWithSrc::ClientPut{src, key, value}) + -> hash_ring_lookup_union; + + hash_ring_lookup_union = union() + -> [1]hash_ring_lookup; + + hash_ring_lookup = cross_join_multiset() + // -> inspect(|(x, y)| println!("hash_ring_lookup output. 
x: {x:?}, y: {y:?}")) + -> filter_map(|(hash_ring, req)| { + let key = match &req { + ServerReqWithSrc::ClientPut { src, key, value } => key, + ServerReqWithSrc::ClientGet { src, key } => key, + ServerReqWithSrc::AddNode { src, node_id } => panic!(), + ServerReqWithSrc::RemoveNode { src, node_id } => panic!(), + }; + + let mut hasher = FxHasher::default(); + std::hash::Hash::hash(&key, &mut hasher); + let hash = hasher.finish(); + + let AnnaValue::HashRing(hash_ring) = hash_ring else { + panic!(); + }; + + if lookup_in_hash_ring(&hash_ring, hash).as_reveal_ref().contains(&self_node_id) { + Some(req) + } else { + eprintln!("key: {key}, not owned by node: {self_node_id}, keyhash: {hash}, hash_ring: {hash_ring:?}"); + None + } + }) + -> defer_tick() // TODO: to mitigate some persist() -> fold issue causing the graph to tick endlessly even with no data flowing. + -> demux_enum::(); + + hash_ring_lookup[ClientGet] + -> map(|(src, k)| (k, src)) + // -> inspect(|(key, src)| println!("hash_ring_lookup[ClientGet]. key: {key}, src: {src}")) + -> [1]puts_and_gets; + hash_ring_lookup[ClientPut] + -> map(|(src, k, v)| { + let mut current_version = current_version_clock.borrow_mut(); + *current_version += 1; + let new_version = *current_version; + + MapUnionSingletonMap::new_from(( + k, + DomPair::new( + MapUnionSingletonMap::new_from((self_node_id, Max::new(new_version))), + AnnaValue::Value(SetUnionHashSet::new_from([v])), + )), + ) + }) + -> _upcast(Some(Delta)) + -> lattice_updates; + + hash_ring_lookup[AddNode] + -> inspect(|_| panic!()) + -> null(); + + hash_ring_lookup[RemoveNode] + -> inspect(|_| panic!()) + -> null(); + + lattice_updates[puts] + -> map(|(key, value)| { + let AnnaValue::Value(value) = value else { + panic!(); + }; + + (key, value) + }) + -> [0]puts_and_gets; + + puts_and_gets = import!("right_outer_join.hf") + -> inspect(|(key, (x, y))| println!("puts_and_gets output. key: {key}, x: {x:?}, y: {y}")) + -> map(|(key, (value, src))| (ServerResp::ServerResponse{ key, value: value.map(|x| x.into_reveal()) }, src)) + -> dest_sink_serde(outbound); + + + + + // -> [0]puts_and_gets; + + // demux(|(k, v): (String, AnnaValue), var_args!(hash_ring_updates, puts)| { + + // }); + + // hash_ring_join = join_multiset() + // -> null(); + + // upgrade request to lattice delta + // For a put, first we have to figure out if we are responsible for this key or not. + + + // into_lookup = union() + // -> [1]puts_and_gets; + + // network_recv[ClientPut] + // -> persist() // Although lattice_fold complains about it, we need this here. 
+ // -> map(|(_src, k, v): (SocketAddr, String, String)| { + // let mut current_version = current_version_clock.borrow_mut(); + // *current_version += 1; + // let new_version = *current_version; + + // MapUnionSingletonMap::new_from(( + // k, + // DomPair::new( + // MapUnionSingletonMap::new_from((node_id, Max::new(new_version))), + // AnnaValue::Value(SetUnionHashSet::new_from([v])), + // )), + // ) + // }) -> store_writes; + + // puts_and_gets = import!("right_outer_join.hf") + // -> filter_map(|(key, (ring, action))| { + + // match action { + // LookupType::HashRingGet(src, key) => { + // let mut hasher = FxHasher::default(); + // std::hash::Hash::hash(&key, &mut hasher); + // let hash = hasher.finish(); + + // eprintln!("key: {} -> {}", key, hash); + + // match ring { + // Some(AnnaValue::HashRing(ring)) => { + // if lookup_in_hash_ring(&ring, hash).as_reveal_ref().contains(&self_node_id) { + // Some((key.clone(), LookupType::Get(src))) + // } else { + // eprintln!("key: {key}, not owned by node: {self_node_id}"); + // None + // } + // }, + // _ => panic!(), + // } + // }, + // LookupType::HashRingPut(src, key, value) => { + // let mut hasher = FxHasher::default(); + // std::hash::Hash::hash(&key, &mut hasher); + // let hash = hasher.finish(); + + // eprintln!("key: {} -> {}", key, hash); + + // match ring { + // Some(AnnaValue::HashRing(ring)) => { + // if lookup_in_hash_ring(&ring, hash).as_reveal_ref().contains(&self_node_id) { + // Some((key.clone(), LookupType::Put(src, value))) + // } else { + // eprintln!("key: {key}, not owned by node: {self_node_id}"); + // None + // } + // }, + // _ => panic!(), + // } + // }, + // LookupType::Get(src) => { + // println!("get. key: {key}"); + // None + // }, + // LookupType::Put(src, v) => { + // println!("put. key: {key}, value: {v}"); + // None + // }, + // } + // }) + // -> inspect(|x| eprintln!("{:?}", x)) + // -> into_lookup; + + // // Join PUTs and GETs by key + // writes = union() -> tee(); + // writes -> map(|(key, value, _addr)| (key, value)) -> writes_store; + // writes_store = persist() -> tee(); + // writes_store -> [0]lookup; + // gets -> [1]lookup; + // lookup = join(); + + // network_send = union() -> dest_sink_serde(outbound); + + // // Send GET responses back to the client address. + // lookup[1] + // -> inspect(|tup| println!("Found a match: {:?}", tup)) + // -> map(|(key, (value, client_addr))| (KvsMessage::ServerResponse { key, value }, client_addr)) + // -> network_send; + + // // Join as a peer if peer_server is set. + // source_iter_delta(peer_server) -> map(|peer_addr| (KvsMessage::PeerJoin, peer_addr)) -> network_send; + + // // Peers: When a new peer joins, send them all data. + // writes_store -> [0]peer_join; + // peers -> [1]peer_join; + // peer_join = cross_join() + // -> map(|((key, value), peer_addr)| (KvsMessage::PeerGossip { key, value }, peer_addr)) + // -> network_send; + + // // Outbound gossip. Send updates to peers. + // peers -> peer_store; + // source_iter_delta(peer_server) -> peer_store; + // peer_store = union() -> persist(); + // writes -> [0]outbound_gossip; + // peer_store -> [1]outbound_gossip; + // outbound_gossip = cross_join() + // // Don't send gossip back to the sender. 
+        //     -> filter(|((_key, _value, writer_addr), peer_addr)| writer_addr != peer_addr)
+        //     -> map(|((key, value, _writer_addr), peer_addr)| (KvsMessage::PeerGossip { key, value }, peer_addr))
+        //     -> network_send;
+    };
+
+    // let serde_graph = hf
+    //     .meta_graph()
+    //     .expect("No graph found, maybe failed to parse.");
+    // serde_graph
+    //     .open_graph(
+    //         hydroflow_lang::graph::WriteGraphType::Mermaid,
+    //         Some(WriteConfig {
+    //             op_short_text: true,
+    //             ..Default::default()
+    //         }),
+    //     )
+    //     .unwrap();
+
+    hf.run_async().await.unwrap();
+}
diff --git a/hydroflow/src/scheduled/graph.rs b/hydroflow/src/scheduled/graph.rs
index 6bb16372e6bc..a21a4947783c 100644
--- a/hydroflow/src/scheduled/graph.rs
+++ b/hydroflow/src/scheduled/graph.rs
@@ -202,7 +202,11 @@ impl<'a> Hydroflow<'a> {
             let sg_data = &mut self.subgraphs[sg_id.0];
             // This must be true for the subgraph to be enqueued.
             assert!(sg_data.is_scheduled.take());
-            tracing::trace!(sg_id = sg_id.0, "Running subgraph.");
+            tracing::trace!(
+                sg_id = sg_id.0,
+                sg_name = &*sg_data.name,
+                "Running subgraph."
+            );

             self.context.subgraph_id = sg_id;
             self.context.subgraph_last_tick_run_in = sg_data.last_tick_run_in;
@@ -220,6 +224,11 @@ impl<'a> Hydroflow<'a> {
                     // If we have sent data to the next tick, then we can start the next tick.
                     if succ_sg_data.stratum < self.context.current_stratum && !sg_data.is_lazy {
                         self.can_start_tick = true;
+                        tracing::trace!(
+                            sg_id = succ_id.0,
+                            sg_name = &*succ_sg_data.name,
+                            "successor subgraph scheduled"
+                        );
                     }
                     // Add subgraph to stratum queue if it is not already scheduled.
                     if !succ_sg_data.is_scheduled.replace(true) {
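
The left_outer_join.hf and right_outer_join.hf modules above assemble an outer join from join_multiset, anti_join, and union. As a rough standalone model of the relation they compute (my own sketch, not part of the diff; plain Vec/HashMap stand in for Hydroflow streams):

// Sketch only: models the (key, (left, Option<right>)) relation produced by
// left_outer_join.hf, with right_outer_join.hf obtained by flipping the inputs
// and swapping the payload back.
use std::collections::HashMap;

fn left_outer_join<K, L, R>(lhs: Vec<(K, L)>, rhs: Vec<(K, R)>) -> Vec<(K, (L, Option<R>))>
where
    K: std::hash::Hash + Eq + Clone,
    L: Clone,
    R: Clone,
{
    // Index the right side by key (multiset: keep every row).
    let mut rhs_by_key: HashMap<K, Vec<R>> = HashMap::new();
    for (k, r) in rhs {
        rhs_by_key.entry(k).or_default().push(r);
    }
    let mut out = Vec::new();
    for (k, l) in lhs {
        match rhs_by_key.get(&k) {
            // Matched rows: one output per right-hand match, like join_multiset().
            Some(rs) => {
                for r in rs {
                    out.push((k.clone(), (l.clone(), Some(r.clone()))));
                }
            }
            // Unmatched rows: kept with None, like the anti_join() branch.
            None => out.push((k, (l, None))),
        }
    }
    out
}

fn main() {
    let puts = vec![("a", "7")];
    let gets = vec![("a", "client-1"), ("b", "client-2")];

    // right_outer_join.hf: flip the inputs into the left outer join, then swap the payload back,
    // so every GET survives and matched PUTs attach a value.
    let joined: Vec<_> = left_outer_join(gets, puts)
        .into_iter()
        .map(|(k, (get_src, put_val))| (k, (put_val, get_src)))
        .collect();

    println!("{joined:?}");
    // [("a", (Some("7"), "client-1")), ("b", (None, "client-2"))]
}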
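
protocol.rs pairs each key's value set with a vector clock via DomPair, which is why ServerResp::ServerResponse carries an optional set of values rather than a single value: writes stamped with concurrent clocks are kept as siblings. A minimal sketch of that merge against the lattices crate, assuming &'static str node ids in place of SocketAddr and a bare SetUnionHashSet in place of the AnnaValue enum (both simplifications of mine):

// Sketch only: two concurrent ClientPuts to the same key merge into sibling values.
use lattices::map_union::MapUnionHashMap;
use lattices::set_union::SetUnionHashSet;
use lattices::{DomPair, Max, Merge};

type Clock = MapUnionHashMap<&'static str, Max<usize>>;
type Entry = DomPair<Clock, SetUnionHashSet<&'static str>>;
type Store = MapUnionHashMap<&'static str, Entry>;

fn main() {
    // Write {"mykey" -> "v1"} stamped with clock {node-a: 1}.
    let mut store = Store::new_from([(
        "mykey",
        DomPair::new(
            Clock::new_from([("node-a", Max::new(1))]),
            SetUnionHashSet::new_from(["v1"]),
        ),
    )]);

    // A concurrent write {"mykey" -> "v2"} stamped with clock {node-b: 1}.
    let concurrent_update = Store::new_from([(
        "mykey",
        DomPair::new(
            Clock::new_from([("node-b", Max::new(1))]),
            SetUnionHashSet::new_from(["v2"]),
        ),
    )]);

    // Neither clock dominates the other, so DomPair merges both values as siblings.
    store.merge(concurrent_update);

    let (clock, values) = store.into_reveal().remove("mykey").unwrap().into_reveal();
    println!("clock: {clock:?}");
    println!("values: {values:?}"); // both "v1" and "v2" survive
}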
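
lookup_in_hash_ring in server.rs picks the first ring position at or after the key's hash and wraps around to the lowest position when the hash falls past the end of the ring. A self-contained sketch of that wrap-around lookup (my simplification: a plain BTreeMap<u64, &str> stands in for the MapUnionBTreeMap of node-id sets, and the ring positions are hard-coded instead of being FxHasher output):

// Sketch only: consistent-hashing owner lookup with wrap-around, mirroring
// range(hash..) followed by range(..hash) in lookup_in_hash_ring.
use std::collections::BTreeMap;

fn owner_for_hash(ring: &BTreeMap<u64, &'static str>, hash: u64) -> &'static str {
    // First node whose ring position is at or after the key's hash.
    if let Some((_pos, &node)) = ring.range(hash..).next() {
        return node;
    }
    // Otherwise wrap around to the lowest position on the ring.
    if let Some((_pos, &node)) = ring.range(..hash).next() {
        return node;
    }
    panic!("hash ring is empty");
}

fn main() {
    let mut ring = BTreeMap::new();
    ring.insert(100, "node-a");
    ring.insert(10_000, "node-b");

    assert_eq!(owner_for_hash(&ring, 50), "node-a"); // before both positions
    assert_eq!(owner_for_hash(&ring, 5_000), "node-b"); // between the two positions
    assert_eq!(owner_for_hash(&ring, 20_000), "node-a"); // past the end: wraps around
    println!("ok");
}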