Skip to content

Commit

Permalink
refactor(hydro_deploy)!: make Host trait use &self interior mutab…
Browse files Browse the repository at this point in the history
…ility to remove `RwLock` wrappings #430 (#1347)

Depends on #1346
  • Loading branch information
MingweiSamuel authored Jul 20, 2024
1 parent f536ecc commit c5a8de2
Show file tree
Hide file tree
Showing 16 changed files with 184 additions and 255 deletions.
92 changes: 43 additions & 49 deletions hydro_deploy/core/src/azure.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
use std::collections::HashMap;
use std::sync::Arc;
use std::sync::{Arc, Mutex, OnceLock};

use anyhow::Result;
use async_trait::async_trait;
Expand All @@ -12,6 +12,7 @@ use super::{
ServerStrategy,
};
use crate::ssh::LaunchedSshHost;
use crate::HostStrategyGetter;

pub struct LaunchedVirtualMachine {
resource_result: Arc<ResourceResult>,
Expand Down Expand Up @@ -50,8 +51,8 @@ pub struct AzureHost {
pub image: Option<HashMap<String, String>>,
pub region: String,
pub user: Option<String>,
pub launched: Option<Arc<LaunchedVirtualMachine>>,
external_ports: Vec<u16>,
pub launched: OnceLock<Arc<LaunchedVirtualMachine>>,
external_ports: Mutex<Vec<u16>>,
}

impl AzureHost {
Expand All @@ -72,8 +73,8 @@ impl AzureHost {
image,
region,
user,
launched: None,
external_ports: vec![],
launched: OnceLock::new(),
external_ports: Mutex::new(Vec::new()),
}
}
}
Expand All @@ -84,17 +85,17 @@ impl Host for AzureHost {
HostTargetType::Linux
}

fn request_port(&mut self, bind_type: &ServerStrategy) {
fn request_port(&self, bind_type: &ServerStrategy) {
match bind_type {
ServerStrategy::UnixSocket => {}
ServerStrategy::InternalTcpPort => {}
ServerStrategy::ExternalTcpPort(port) => {
if !self.external_ports.contains(port) {
if self.launched.is_some() {
let mut external_ports = self.external_ports.lock().unwrap();
if !external_ports.contains(port) {
if self.launched.get().is_some() {
todo!("Cannot adjust firewall after host has been launched");
}

self.external_ports.push(*port);
external_ports.push(*port);
}
}
ServerStrategy::Demux(demux) => {
Expand All @@ -114,7 +115,7 @@ impl Host for AzureHost {
}
}

fn request_custom_binary(&mut self) {
fn request_custom_binary(&self) {
self.request_port(&ServerStrategy::ExternalTcpPort(22));
}

Expand All @@ -126,12 +127,8 @@ impl Host for AzureHost {
self
}

fn as_any_mut(&mut self) -> &mut dyn std::any::Any {
self
}

fn collect_resources(&self, resource_batch: &mut ResourceBatch) {
if self.launched.is_some() {
if self.launched.get().is_some() {
return;
}

Expand Down Expand Up @@ -398,46 +395,43 @@ impl Host for AzureHost {

fn launched(&self) -> Option<Arc<dyn LaunchedHost>> {
self.launched
.as_ref()
.get()
.map(|a| a.clone() as Arc<dyn LaunchedHost>)
}

fn provision(&mut self, resource_result: &Arc<ResourceResult>) -> Arc<dyn LaunchedHost> {
if self.launched.is_none() {
let id = self.id;

let internal_ip = resource_result
.terraform
.outputs
.get(&format!("vm-instance-{id}-internal-ip"))
.unwrap()
.value
.clone();

let external_ip = resource_result
.terraform
.outputs
.get(&format!("vm-instance-{id}-public-ip"))
.map(|v| v.value.clone());

self.launched = Some(Arc::new(LaunchedVirtualMachine {
resource_result: resource_result.clone(),
user: self.user.as_ref().cloned().unwrap_or("hydro".to_string()),
internal_ip,
external_ip,
}))
}

self.launched.as_ref().unwrap().clone()
fn provision(&self, resource_result: &Arc<ResourceResult>) -> Arc<dyn LaunchedHost> {
self.launched
.get_or_init(|| {
let id = self.id;

let internal_ip = resource_result
.terraform
.outputs
.get(&format!("vm-instance-{id}-internal-ip"))
.unwrap()
.value
.clone();

let external_ip = resource_result
.terraform
.outputs
.get(&format!("vm-instance-{id}-public-ip"))
.map(|v| v.value.clone());

Arc::new(LaunchedVirtualMachine {
resource_result: resource_result.clone(),
user: self.user.as_ref().cloned().unwrap_or("hydro".to_string()),
internal_ip,
external_ip,
})
})
.clone()
}

fn strategy_as_server<'a>(
&'a self,
client_host: &dyn Host,
) -> Result<(
ClientStrategy<'a>,
Box<dyn FnOnce(&mut dyn std::any::Any) -> ServerStrategy>,
)> {
) -> Result<(ClientStrategy<'a>, HostStrategyGetter)> {
if client_host.can_connect_to(ClientStrategy::UnixSocket(self.id)) {
Ok((
ClientStrategy::UnixSocket(self.id),
Expand All @@ -452,7 +446,7 @@ impl Host for AzureHost {
Ok((
ClientStrategy::ForwardedTcpPort(self),
Box::new(|me| {
me.downcast_mut::<AzureHost>()
me.downcast_ref::<AzureHost>()
.unwrap()
.request_port(&ServerStrategy::ExternalTcpPort(22)); // needed to forward
ServerStrategy::InternalTcpPort
Expand Down
27 changes: 10 additions & 17 deletions hydro_deploy/core/src/custom_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ use crate::hydroflow_crate::ports::ReverseSinkInstantiator;
/// Represents an unknown, third-party service that is not part of the Hydroflow ecosystem.
pub struct CustomService {
_id: usize,
on: Arc<RwLock<dyn Host>>,
on: Arc<dyn Host>,

/// The ports that the service wishes to expose to the public internet.
external_ports: Vec<u16>,
Expand All @@ -25,7 +25,7 @@ pub struct CustomService {
}

impl CustomService {
pub fn new(id: usize, on: Arc<RwLock<dyn Host>>, external_ports: Vec<u16>) -> Self {
pub fn new(id: usize, on: Arc<dyn Host>, external_ports: Vec<u16>) -> Self {
Self {
_id: id,
on,
Expand All @@ -46,10 +46,7 @@ impl Service for CustomService {
return;
}

let mut host = self
.on
.try_write()
.expect("No one should be reading/writing the host while resources are collected");
let host = &self.on;

for port in self.external_ports.iter() {
host.request_port(&ServerStrategy::ExternalTcpPort(*port));
Expand All @@ -61,8 +58,8 @@ impl Service for CustomService {
return Ok(());
}

let mut host_write = self.on.write().await;
let launched = host_write.provision(resource_result);
let host = &self.on;
let launched = host.provision(resource_result);
self.launched_host = Some(launched);
Ok(())
}
Expand Down Expand Up @@ -118,7 +115,7 @@ impl HydroflowSource for CustomClientPort {
SourcePath::Direct(self.on.upgrade().unwrap().try_read().unwrap().on.clone())
}

fn host(&self) -> Arc<RwLock<dyn Host>> {
fn host(&self) -> Arc<dyn Host> {
panic!("Custom services cannot be used as the server")
}

Expand Down Expand Up @@ -149,28 +146,24 @@ impl HydroflowSink for CustomClientPort {

fn instantiate_reverse(
&self,
server_host: &Arc<RwLock<dyn Host>>,
server_host: &Arc<dyn Host>,
server_sink: Arc<dyn HydroflowServer>,
wrap_client_port: &dyn Fn(ServerConfig) -> ServerConfig,
) -> Result<ReverseSinkInstantiator> {
let client = self.on.upgrade().unwrap();
let client_read = client.try_read().unwrap();

let server_host_clone = server_host.clone();
let server_host = server_host_clone.try_read().unwrap();
let server_host = server_host.clone();

let (conn_type, bind_type) =
server_host.strategy_as_server(client_read.on.try_read().unwrap().deref())?;
let (conn_type, bind_type) = server_host.strategy_as_server(client_read.on.deref())?;

let client_port = wrap_client_port(ServerConfig::from_strategy(&conn_type, server_sink));

let server_host_clone = server_host_clone.clone();
Ok(Box::new(move |me| {
let mut server_host = server_host_clone.try_write().unwrap();
me.downcast_ref::<CustomClientPort>()
.unwrap()
.record_server_config(client_port);
bind_type(server_host.as_any_mut())
bind_type(server_host.as_any())
}))
}
}
38 changes: 14 additions & 24 deletions hydro_deploy/core/src/deployment.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ use crate::ServiceBuilder;

#[derive(Default)]
pub struct Deployment {
pub hosts: Vec<Arc<RwLock<dyn Host>>>,
pub hosts: Vec<Arc<dyn Host>>,
pub services: Vec<Weak<RwLock<dyn Service>>>,
pub resource_pool: ResourcePool,
last_resource_result: Option<Arc<ResourceResult>>,
Expand All @@ -27,7 +27,7 @@ impl Deployment {
}

#[allow(non_snake_case)]
pub fn Localhost(&mut self) -> Arc<RwLock<LocalhostHost>> {
pub fn Localhost(&mut self) -> Arc<LocalhostHost> {
self.add_host(LocalhostHost::new)
}

Expand All @@ -40,7 +40,7 @@ impl Deployment {
region: impl Into<String>,
network: Arc<RwLock<GcpNetwork>>,
user: Option<String>,
) -> Arc<RwLock<GcpComputeEngineHost>> {
) -> Arc<GcpComputeEngineHost> {
self.add_host(|id| {
GcpComputeEngineHost::new(id, project, machine_type, image, region, network, user)
})
Expand All @@ -49,7 +49,7 @@ impl Deployment {
#[allow(non_snake_case)]
pub fn CustomService(
&mut self,
on: Arc<RwLock<dyn Host>>,
on: Arc<dyn Host>,
external_ports: Vec<u16>,
) -> Arc<RwLock<CustomService>> {
self.add_service(|id| CustomService::new(id, on, external_ports))
Expand All @@ -76,29 +76,22 @@ impl Deployment {
}

for host in self.hosts.iter_mut() {
host.write().await.collect_resources(&mut resource_batch);
host.collect_resources(&mut resource_batch);
}

let result = Arc::new(
let resource_result = Arc::new(
progress::ProgressTracker::with_group("provision", None, || async {
resource_batch
.provision(&mut self.resource_pool, self.last_resource_result.clone())
.await
})
.await?,
);
self.last_resource_result = Some(result.clone());

progress::ProgressTracker::with_group("provision", None, || {
let hosts_provisioned =
self.hosts
.iter_mut()
.map(|host: &mut Arc<RwLock<dyn Host>>| async {
host.write().await.provision(&result);
});
futures::future::join_all(hosts_provisioned)
})
.await;
self.last_resource_result = Some(resource_result.clone());

for host in self.hosts.iter() {
host.provision(&resource_result);
}

progress::ProgressTracker::with_group("deploy", None, || {
let services_future = self
Expand All @@ -110,7 +103,7 @@ impl Deployment {
.unwrap()
.write()
.await
.deploy(&result)
.deploy(&resource_result)
.await
})
.collect::<Vec<_>>();
Expand Down Expand Up @@ -163,11 +156,8 @@ impl Deployment {
Ok(())
}

pub fn add_host<T: Host + 'static, F: FnOnce(usize) -> T>(
&mut self,
host: F,
) -> Arc<RwLock<T>> {
let arc = Arc::new(RwLock::new(host(self.next_host_id)));
pub fn add_host<T: Host + 'static, F: FnOnce(usize) -> T>(&mut self, host: F) -> Arc<T> {
let arc = Arc::new(host(self.next_host_id));
self.next_host_id += 1;

self.hosts.push(arc.clone());
Expand Down
Loading

0 comments on commit c5a8de2

Please sign in to comment.