From 8470238bdd4fb0d6aa9b4dcf6064dfab29ce5af6 Mon Sep 17 00:00:00 2001 From: Mingwei Samuel Date: Sun, 14 Jul 2024 18:52:03 -0700 Subject: [PATCH] refactor(hydro_deploy): remove unneeded `Arc Curious if there was any intention behind why it was `Arc I think before some refactors we took the I/O handles instead of using broadcast channels. --- hydro_deploy/core/src/azure.rs | 4 +- hydro_deploy/core/src/gcp.rs | 4 +- .../core/src/hydroflow_crate/service.rs | 50 ++++--------------- hydro_deploy/core/src/lib.rs | 3 +- hydro_deploy/core/src/localhost/mod.rs | 7 +-- hydro_deploy/core/src/ssh.rs | 26 +++++----- hydro_deploy/hydro_cli/src/lib.rs | 2 +- 7 files changed, 30 insertions(+), 66 deletions(-) diff --git a/hydro_deploy/core/src/azure.rs b/hydro_deploy/core/src/azure.rs index 554d4675ad63..978360abc7b8 100644 --- a/hydro_deploy/core/src/azure.rs +++ b/hydro_deploy/core/src/azure.rs @@ -11,7 +11,7 @@ use super::{ ClientStrategy, Host, HostTargetType, LaunchedHost, ResourceBatch, ResourceResult, ServerStrategy, }; -use crate::ssh::LaunchedSshHost; +use crate::ssh::LaunchedSSHHost; pub struct LaunchedVirtualMachine { resource_result: Arc, @@ -20,7 +20,7 @@ pub struct LaunchedVirtualMachine { pub external_ip: Option, } -impl LaunchedSshHost for LaunchedVirtualMachine { +impl LaunchedSSHHost for LaunchedVirtualMachine { fn get_external_ip(&self) -> Option { self.external_ip.clone() } diff --git a/hydro_deploy/core/src/gcp.rs b/hydro_deploy/core/src/gcp.rs index 2bd34ce6d851..86ab009f137b 100644 --- a/hydro_deploy/core/src/gcp.rs +++ b/hydro_deploy/core/src/gcp.rs @@ -12,7 +12,7 @@ use super::{ ClientStrategy, Host, HostTargetType, LaunchedHost, ResourceBatch, ResourceResult, ServerStrategy, }; -use crate::ssh::LaunchedSshHost; +use crate::ssh::LaunchedSSHHost; pub struct LaunchedComputeEngine { resource_result: Arc, @@ -21,7 +21,7 @@ pub struct LaunchedComputeEngine { pub external_ip: Option, } -impl LaunchedSshHost for LaunchedComputeEngine { +impl LaunchedSSHHost for LaunchedComputeEngine { fn get_external_ip(&self) -> Option { self.external_ip.clone() } diff --git a/hydro_deploy/core/src/hydroflow_crate/service.rs b/hydro_deploy/core/src/hydroflow_crate/service.rs index bcb9398d1f78..71531265b89e 100644 --- a/hydro_deploy/core/src/hydroflow_crate/service.rs +++ b/hydro_deploy/core/src/hydroflow_crate/service.rs @@ -42,7 +42,7 @@ pub struct HydroflowCrateService { /// in `server_ports`. pub(super) server_defns: Arc>>, - launched_binary: Option>>, + launched_binary: Option>, started: bool, } @@ -140,33 +140,15 @@ impl HydroflowCrateService { } pub async fn stdout(&self) -> Receiver { - self.launched_binary - .as_ref() - .unwrap() - .read() - .await - .stdout() - .await + self.launched_binary.as_deref().unwrap().stdout().await } pub async fn stderr(&self) -> Receiver { - self.launched_binary - .as_ref() - .unwrap() - .read() - .await - .stderr() - .await + self.launched_binary.as_deref().unwrap().stderr().await } pub async fn exit_code(&self) -> Option { - self.launched_binary - .as_ref() - .unwrap() - .read() - .await - .exit_code() - .await + self.launched_binary.as_deref().unwrap().exit_code().await } fn build(&self) -> impl Future> { @@ -262,11 +244,9 @@ impl Service for HydroflowCrateService { serde_json::to_string::(&(bind_config, self.meta.clone())).unwrap(); // request stdout before sending config so we don't miss the "ready" response - let stdout_receiver = binary.write().await.cli_stdout().await; + let stdout_receiver = binary.cli_stdout().await; binary - .write() - .await .stdin() .await .send(format!("{formatted_bind_config}\n")) @@ -306,18 +286,14 @@ impl Service for HydroflowCrateService { let stdout_receiver = self .launched_binary - .as_mut() + .as_deref_mut() .unwrap() - .write() - .await .cli_stdout() .await; self.launched_binary - .as_mut() + .as_deref_mut() .unwrap() - .write() - .await .stdin() .await .send(format!("start: {formatted_defns}\n")) @@ -339,22 +315,14 @@ impl Service for HydroflowCrateService { async fn stop(&mut self) -> Result<()> { self.launched_binary - .as_mut() + .as_deref_mut() .unwrap() - .write() - .await .stdin() .await .send("stop\n".to_string()) .await?; - self.launched_binary - .as_mut() - .unwrap() - .write() - .await - .wait() - .await; + self.launched_binary.as_deref_mut().unwrap().wait().await; Ok(()) } diff --git a/hydro_deploy/core/src/lib.rs b/hydro_deploy/core/src/lib.rs index 54134f9ef32c..12b8114003f6 100644 --- a/hydro_deploy/core/src/lib.rs +++ b/hydro_deploy/core/src/lib.rs @@ -7,7 +7,6 @@ use anyhow::Result; use async_channel::{Receiver, Sender}; use async_trait::async_trait; use hydroflow_cli_integration::ServerBindConfig; -use tokio::sync::RwLock; pub mod deployment; pub use deployment::Deployment; @@ -103,7 +102,7 @@ pub trait LaunchedHost: Send + Sync { binary: &BuildOutput, args: &[String], perf: Option, - ) -> Result>>; + ) -> Result>; async fn forward_port(&self, addr: &SocketAddr) -> Result; } diff --git a/hydro_deploy/core/src/localhost/mod.rs b/hydro_deploy/core/src/localhost/mod.rs index 2b4c00b818a3..ec27b1901375 100644 --- a/hydro_deploy/core/src/localhost/mod.rs +++ b/hydro_deploy/core/src/localhost/mod.rs @@ -7,7 +7,6 @@ use anyhow::{Context, Result}; use async_process::{Command, Stdio}; use async_trait::async_trait; use hydroflow_cli_integration::ServerBindConfig; -use tokio::sync::RwLock; use super::{ ClientStrategy, Host, HostTargetType, LaunchedBinary, LaunchedHost, ResourceBatch, @@ -158,7 +157,7 @@ impl LaunchedHost for LaunchedLocalhost { binary: &BuildOutput, args: &[String], perf: Option, - ) -> Result>> { + ) -> Result> { let mut command = if let Some(perf) = perf { println!("Profiling binary with perf"); let mut tmp = Command::new("perf"); @@ -186,9 +185,7 @@ impl LaunchedHost for LaunchedLocalhost { .spawn() .with_context(|| format!("Failed to execute command: {:?}", command))?; - Ok(Arc::new(RwLock::new(LaunchedLocalhostBinary::new( - child, id, - )))) + Ok(Box::new(LaunchedLocalhostBinary::new(child, id))) } async fn forward_port(&self, addr: &SocketAddr) -> Result { diff --git a/hydro_deploy/core/src/ssh.rs b/hydro_deploy/core/src/ssh.rs index 5895d5242f06..abbd8ff5a865 100644 --- a/hydro_deploy/core/src/ssh.rs +++ b/hydro_deploy/core/src/ssh.rs @@ -23,7 +23,7 @@ use super::{LaunchedBinary, LaunchedHost, ResourceResult, ServerStrategy}; use crate::hydroflow_crate::build::BuildOutput; use crate::util::prioritized_broadcast; -struct LaunchedSshBinary { +struct LaunchedSSHBinary { _resource_result: Arc, session: Option>, channel: AsyncChannel, @@ -34,7 +34,7 @@ struct LaunchedSshBinary { } #[async_trait] -impl LaunchedBinary for LaunchedSshBinary { +impl LaunchedBinary for LaunchedSSHBinary { async fn stdin(&self) -> Sender { self.stdin_sender.clone() } @@ -82,7 +82,7 @@ impl LaunchedBinary for LaunchedSshBinary { } } -impl Drop for LaunchedSshBinary { +impl Drop for LaunchedSSHBinary { fn drop(&mut self) { let session = self.session.take().unwrap(); std::thread::scope(|s| { @@ -99,7 +99,7 @@ impl Drop for LaunchedSshBinary { } #[async_trait] -pub trait LaunchedSshHost: Send + Sync { +pub trait LaunchedSSHHost: Send + Sync { fn get_internal_ip(&self) -> String; fn get_external_ip(&self) -> Option; fn get_cloud_provider(&self) -> String; @@ -127,7 +127,7 @@ pub trait LaunchedSshHost: Send + Sync { ServerStrategy::Demux(demux) => { let mut config_map = HashMap::new(); for (key, underlying) in demux { - config_map.insert(*key, LaunchedSshHost::server_config(self, underlying)); + config_map.insert(*key, LaunchedSSHHost::server_config(self, underlying)); } ServerBindConfig::Demux(config_map) @@ -135,13 +135,13 @@ pub trait LaunchedSshHost: Send + Sync { ServerStrategy::Merge(merge) => { let mut configs = vec![]; for underlying in merge { - configs.push(LaunchedSshHost::server_config(self, underlying)); + configs.push(LaunchedSSHHost::server_config(self, underlying)); } ServerBindConfig::Merge(configs) } ServerStrategy::Tagged(underlying, id) => ServerBindConfig::Tagged( - Box::new(LaunchedSshHost::server_config(self, underlying)), + Box::new(LaunchedSSHHost::server_config(self, underlying)), *id, ), ServerStrategy::Null => ServerBindConfig::Null, @@ -198,9 +198,9 @@ pub trait LaunchedSshHost: Send + Sync { } #[async_trait] -impl LaunchedHost for T { +impl LaunchedHost for T { fn server_config(&self, bind_type: &ServerStrategy) -> ServerBindConfig { - LaunchedSshHost::server_config(self, bind_type) + LaunchedSSHHost::server_config(self, bind_type) } async fn copy_binary(&self, binary: &BuildOutput) -> Result<()> { @@ -277,7 +277,7 @@ impl LaunchedHost for T { binary: &BuildOutput, args: &[String], perf: Option, - ) -> Result>> { + ) -> Result> { let session = self.open_ssh_session().await?; let unique_name = &binary.unique_id; @@ -310,7 +310,7 @@ impl LaunchedHost for T { .exec(&format!("{binary_path_string}{args_string}")) .await?; if perf.is_some() { - todo!("Perf profiling on remote machines is not supported"); + todo!("Profiling on remote machines is not (yet) supported"); } anyhow::Ok(channel) @@ -340,7 +340,7 @@ impl LaunchedHost for T { eprintln!("[{id}] {s}") }); - Ok(Arc::new(RwLock::new(LaunchedSshBinary { + Ok(Box::new(LaunchedSSHBinary { _resource_result: self.resource_result().clone(), session: Some(session), channel, @@ -348,7 +348,7 @@ impl LaunchedHost for T { stdout_cli_receivers, stdout_receivers, stderr_receivers, - }))) + })) } async fn forward_port(&self, addr: &SocketAddr) -> Result { diff --git a/hydro_deploy/hydro_cli/src/lib.rs b/hydro_deploy/hydro_cli/src/lib.rs index 1d659e7a2ab4..9c2cf769953d 100644 --- a/hydro_deploy/hydro_cli/src/lib.rs +++ b/hydro_deploy/hydro_cli/src/lib.rs @@ -26,7 +26,7 @@ use tokio::sync::RwLock; mod cli; use hydro_deploy as core; -use hydro_deploy::ssh::LaunchedSshHost; +use hydro_deploy::ssh::LaunchedSSHHost; static TOKIO_RUNTIME: std::sync::RwLock> = std::sync::RwLock::new(None);