Skip to content

Commit

Permalink
refactor(hydro_deploy): remove unneeded Arc<RwLock< wrapping of `la…
Browse files Browse the repository at this point in the history
…unch_binary` return value (1/3) (#1338)

> Curious if there was any intention behind why it was `Arc<RwLock<`?

> I think before some refactors we took the I/O handles instead of using broadcast channels.
  • Loading branch information
MingweiSamuel authored Jul 15, 2024
1 parent 947ebc1 commit 8470238
Show file tree
Hide file tree
Showing 7 changed files with 30 additions and 66 deletions.
4 changes: 2 additions & 2 deletions hydro_deploy/core/src/azure.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ use super::{
ClientStrategy, Host, HostTargetType, LaunchedHost, ResourceBatch, ResourceResult,
ServerStrategy,
};
use crate::ssh::LaunchedSshHost;
use crate::ssh::LaunchedSSHHost;

pub struct LaunchedVirtualMachine {
resource_result: Arc<ResourceResult>,
Expand All @@ -20,7 +20,7 @@ pub struct LaunchedVirtualMachine {
pub external_ip: Option<String>,
}

impl LaunchedSshHost for LaunchedVirtualMachine {
impl LaunchedSSHHost for LaunchedVirtualMachine {
fn get_external_ip(&self) -> Option<String> {
self.external_ip.clone()
}
Expand Down
4 changes: 2 additions & 2 deletions hydro_deploy/core/src/gcp.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ use super::{
ClientStrategy, Host, HostTargetType, LaunchedHost, ResourceBatch, ResourceResult,
ServerStrategy,
};
use crate::ssh::LaunchedSshHost;
use crate::ssh::LaunchedSSHHost;

pub struct LaunchedComputeEngine {
resource_result: Arc<ResourceResult>,
Expand All @@ -21,7 +21,7 @@ pub struct LaunchedComputeEngine {
pub external_ip: Option<String>,
}

impl LaunchedSshHost for LaunchedComputeEngine {
impl LaunchedSSHHost for LaunchedComputeEngine {
fn get_external_ip(&self) -> Option<String> {
self.external_ip.clone()
}
Expand Down
50 changes: 9 additions & 41 deletions hydro_deploy/core/src/hydroflow_crate/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ pub struct HydroflowCrateService {
/// in `server_ports`.
pub(super) server_defns: Arc<RwLock<HashMap<String, ServerPort>>>,

launched_binary: Option<Arc<RwLock<dyn LaunchedBinary>>>,
launched_binary: Option<Box<dyn LaunchedBinary>>,
started: bool,
}

Expand Down Expand Up @@ -140,33 +140,15 @@ impl HydroflowCrateService {
}

pub async fn stdout(&self) -> Receiver<String> {
self.launched_binary
.as_ref()
.unwrap()
.read()
.await
.stdout()
.await
self.launched_binary.as_deref().unwrap().stdout().await
}

pub async fn stderr(&self) -> Receiver<String> {
self.launched_binary
.as_ref()
.unwrap()
.read()
.await
.stderr()
.await
self.launched_binary.as_deref().unwrap().stderr().await
}

pub async fn exit_code(&self) -> Option<i32> {
self.launched_binary
.as_ref()
.unwrap()
.read()
.await
.exit_code()
.await
self.launched_binary.as_deref().unwrap().exit_code().await
}

fn build(&self) -> impl Future<Output = Result<&'static BuildOutput, BuildError>> {
Expand Down Expand Up @@ -262,11 +244,9 @@ impl Service for HydroflowCrateService {
serde_json::to_string::<InitConfig>(&(bind_config, self.meta.clone())).unwrap();

// request stdout before sending config so we don't miss the "ready" response
let stdout_receiver = binary.write().await.cli_stdout().await;
let stdout_receiver = binary.cli_stdout().await;

binary
.write()
.await
.stdin()
.await
.send(format!("{formatted_bind_config}\n"))
Expand Down Expand Up @@ -306,18 +286,14 @@ impl Service for HydroflowCrateService {

let stdout_receiver = self
.launched_binary
.as_mut()
.as_deref_mut()
.unwrap()
.write()
.await
.cli_stdout()
.await;

self.launched_binary
.as_mut()
.as_deref_mut()
.unwrap()
.write()
.await
.stdin()
.await
.send(format!("start: {formatted_defns}\n"))
Expand All @@ -339,22 +315,14 @@ impl Service for HydroflowCrateService {

async fn stop(&mut self) -> Result<()> {
self.launched_binary
.as_mut()
.as_deref_mut()
.unwrap()
.write()
.await
.stdin()
.await
.send("stop\n".to_string())
.await?;

self.launched_binary
.as_mut()
.unwrap()
.write()
.await
.wait()
.await;
self.launched_binary.as_deref_mut().unwrap().wait().await;

Ok(())
}
Expand Down
3 changes: 1 addition & 2 deletions hydro_deploy/core/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@ use anyhow::Result;
use async_channel::{Receiver, Sender};
use async_trait::async_trait;
use hydroflow_cli_integration::ServerBindConfig;
use tokio::sync::RwLock;

pub mod deployment;
pub use deployment::Deployment;
Expand Down Expand Up @@ -103,7 +102,7 @@ pub trait LaunchedHost: Send + Sync {
binary: &BuildOutput,
args: &[String],
perf: Option<PathBuf>,
) -> Result<Arc<RwLock<dyn LaunchedBinary>>>;
) -> Result<Box<dyn LaunchedBinary>>;

async fn forward_port(&self, addr: &SocketAddr) -> Result<SocketAddr>;
}
Expand Down
7 changes: 2 additions & 5 deletions hydro_deploy/core/src/localhost/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@ use anyhow::{Context, Result};
use async_process::{Command, Stdio};
use async_trait::async_trait;
use hydroflow_cli_integration::ServerBindConfig;
use tokio::sync::RwLock;

use super::{
ClientStrategy, Host, HostTargetType, LaunchedBinary, LaunchedHost, ResourceBatch,
Expand Down Expand Up @@ -158,7 +157,7 @@ impl LaunchedHost for LaunchedLocalhost {
binary: &BuildOutput,
args: &[String],
perf: Option<PathBuf>,
) -> Result<Arc<RwLock<dyn LaunchedBinary>>> {
) -> Result<Box<dyn LaunchedBinary>> {
let mut command = if let Some(perf) = perf {
println!("Profiling binary with perf");
let mut tmp = Command::new("perf");
Expand Down Expand Up @@ -186,9 +185,7 @@ impl LaunchedHost for LaunchedLocalhost {
.spawn()
.with_context(|| format!("Failed to execute command: {:?}", command))?;

Ok(Arc::new(RwLock::new(LaunchedLocalhostBinary::new(
child, id,
))))
Ok(Box::new(LaunchedLocalhostBinary::new(child, id)))
}

async fn forward_port(&self, addr: &SocketAddr) -> Result<SocketAddr> {
Expand Down
26 changes: 13 additions & 13 deletions hydro_deploy/core/src/ssh.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ use super::{LaunchedBinary, LaunchedHost, ResourceResult, ServerStrategy};
use crate::hydroflow_crate::build::BuildOutput;
use crate::util::prioritized_broadcast;

struct LaunchedSshBinary {
struct LaunchedSSHBinary {
_resource_result: Arc<ResourceResult>,
session: Option<AsyncSession<TcpStream>>,
channel: AsyncChannel<TcpStream>,
Expand All @@ -34,7 +34,7 @@ struct LaunchedSshBinary {
}

#[async_trait]
impl LaunchedBinary for LaunchedSshBinary {
impl LaunchedBinary for LaunchedSSHBinary {
async fn stdin(&self) -> Sender<String> {
self.stdin_sender.clone()
}
Expand Down Expand Up @@ -82,7 +82,7 @@ impl LaunchedBinary for LaunchedSshBinary {
}
}

impl Drop for LaunchedSshBinary {
impl Drop for LaunchedSSHBinary {
fn drop(&mut self) {
let session = self.session.take().unwrap();
std::thread::scope(|s| {
Expand All @@ -99,7 +99,7 @@ impl Drop for LaunchedSshBinary {
}

#[async_trait]
pub trait LaunchedSshHost: Send + Sync {
pub trait LaunchedSSHHost: Send + Sync {
fn get_internal_ip(&self) -> String;
fn get_external_ip(&self) -> Option<String>;
fn get_cloud_provider(&self) -> String;
Expand Down Expand Up @@ -127,21 +127,21 @@ pub trait LaunchedSshHost: Send + Sync {
ServerStrategy::Demux(demux) => {
let mut config_map = HashMap::new();
for (key, underlying) in demux {
config_map.insert(*key, LaunchedSshHost::server_config(self, underlying));
config_map.insert(*key, LaunchedSSHHost::server_config(self, underlying));
}

ServerBindConfig::Demux(config_map)
}
ServerStrategy::Merge(merge) => {
let mut configs = vec![];
for underlying in merge {
configs.push(LaunchedSshHost::server_config(self, underlying));
configs.push(LaunchedSSHHost::server_config(self, underlying));
}

ServerBindConfig::Merge(configs)
}
ServerStrategy::Tagged(underlying, id) => ServerBindConfig::Tagged(
Box::new(LaunchedSshHost::server_config(self, underlying)),
Box::new(LaunchedSSHHost::server_config(self, underlying)),
*id,
),
ServerStrategy::Null => ServerBindConfig::Null,
Expand Down Expand Up @@ -198,9 +198,9 @@ pub trait LaunchedSshHost: Send + Sync {
}

#[async_trait]
impl<T: LaunchedSshHost> LaunchedHost for T {
impl<T: LaunchedSSHHost> LaunchedHost for T {
fn server_config(&self, bind_type: &ServerStrategy) -> ServerBindConfig {
LaunchedSshHost::server_config(self, bind_type)
LaunchedSSHHost::server_config(self, bind_type)
}

async fn copy_binary(&self, binary: &BuildOutput) -> Result<()> {
Expand Down Expand Up @@ -277,7 +277,7 @@ impl<T: LaunchedSshHost> LaunchedHost for T {
binary: &BuildOutput,
args: &[String],
perf: Option<PathBuf>,
) -> Result<Arc<RwLock<dyn LaunchedBinary>>> {
) -> Result<Box<dyn LaunchedBinary>> {
let session = self.open_ssh_session().await?;

let unique_name = &binary.unique_id;
Expand Down Expand Up @@ -310,7 +310,7 @@ impl<T: LaunchedSshHost> LaunchedHost for T {
.exec(&format!("{binary_path_string}{args_string}"))
.await?;
if perf.is_some() {
todo!("Perf profiling on remote machines is not supported");
todo!("Profiling on remote machines is not (yet) supported");
}

anyhow::Ok(channel)
Expand Down Expand Up @@ -340,15 +340,15 @@ impl<T: LaunchedSshHost> LaunchedHost for T {
eprintln!("[{id}] {s}")
});

Ok(Arc::new(RwLock::new(LaunchedSshBinary {
Ok(Box::new(LaunchedSSHBinary {
_resource_result: self.resource_result().clone(),
session: Some(session),
channel,
stdin_sender,
stdout_cli_receivers,
stdout_receivers,
stderr_receivers,
})))
}))
}

async fn forward_port(&self, addr: &SocketAddr) -> Result<SocketAddr> {
Expand Down
2 changes: 1 addition & 1 deletion hydro_deploy/hydro_cli/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ use tokio::sync::RwLock;

mod cli;
use hydro_deploy as core;
use hydro_deploy::ssh::LaunchedSshHost;
use hydro_deploy::ssh::LaunchedSSHHost;

static TOKIO_RUNTIME: std::sync::RwLock<Option<tokio::runtime::Runtime>> =
std::sync::RwLock::new(None);
Expand Down

0 comments on commit 8470238

Please sign in to comment.