From fc18bc6a18e5821336fb6b9f3419148577401248 Mon Sep 17 00:00:00 2001 From: Alex Zenla Date: Tue, 14 May 2024 11:29:12 -0700 Subject: [PATCH] feat(runtime): concurrent ip allocation (#151) Previously, krata runtime allowed a single permit when performing operations. This was necessary because the only IP allocation storage was xenstore, and the commit of xenstore data happens after allocation. This commit introduces IpVendor, a service which vends IPv4 and IPv6 addresses to guests using a linear address strategy within an IP network space. The IpVendor table is initialized from xenstore, and from there on out, the in-memory table is the source of truth. This implementation is not perfect, but it will allow us to lift the single permit limit, allowing guests to start concurrently. --- crates/daemon/src/lib.rs | 2 +- crates/runtime/src/ip.rs | 331 +++++++++++++++++++++++++++++++++++ crates/runtime/src/launch.rs | 114 ++++-------- crates/runtime/src/lib.rs | 40 ++++- 4 files changed, 400 insertions(+), 87 deletions(-) create mode 100644 crates/runtime/src/ip.rs diff --git a/crates/daemon/src/lib.rs b/crates/daemon/src/lib.rs index e06d1a75..2f31b210 100644 --- a/crates/daemon/src/lib.rs +++ b/crates/daemon/src/lib.rs @@ -93,7 +93,7 @@ impl Daemon { let addons_path = detect_guest_path(&store, "addons.squashfs")?; let packer = OciPackerService::new(None, &image_cache_dir, OciPlatform::current()).await?; - let runtime = Runtime::new().await?; + let runtime = Runtime::new(host_uuid).await?; let glt = GuestLookupTable::new(0, host_uuid); let guests_db_path = format!("{}/guests.db", store); let guests = GuestStore::open(&PathBuf::from(guests_db_path))?; diff --git a/crates/runtime/src/ip.rs b/crates/runtime/src/ip.rs new file mode 100644 index 00000000..9fedd89c --- /dev/null +++ b/crates/runtime/src/ip.rs @@ -0,0 +1,331 @@ +use std::{ + collections::HashMap, + net::{Ipv4Addr, Ipv6Addr}, + str::FromStr, + sync::Arc, +}; + +use anyhow::{anyhow, Result}; +use ipnetwork::{Ipv4Network, Ipv6Network}; +use log::error; +use tokio::sync::RwLock; +use uuid::Uuid; +use xenstore::{XsdClient, XsdInterface}; + +#[derive(Default, Clone)] +pub struct IpVendorState { + pub ipv4: HashMap, + pub ipv6: HashMap, + pub pending_ipv4: HashMap, + pub pending_ipv6: HashMap, +} + +#[derive(Clone)] +pub struct IpVendor { + store: XsdClient, + host_uuid: Uuid, + ipv4_network: Ipv4Network, + ipv6_network: Ipv6Network, + gateway_ipv4: Ipv4Addr, + gateway_ipv6: Ipv6Addr, + state: Arc>, +} + +pub struct IpAssignment { + vendor: IpVendor, + pub uuid: Uuid, + pub ipv4: Ipv4Addr, + pub ipv6: Ipv6Addr, + pub ipv4_prefix: u8, + pub ipv6_prefix: u8, + pub gateway_ipv4: Ipv4Addr, + pub gateway_ipv6: Ipv6Addr, + pub committed: bool, +} + +impl IpAssignment { + pub async fn commit(&mut self) -> Result<()> { + self.vendor.commit(self).await?; + self.committed = true; + Ok(()) + } +} + +impl Drop for IpAssignment { + fn drop(&mut self) { + if !self.committed { + let ipv4 = self.ipv4; + let ipv6 = self.ipv6; + let uuid = self.uuid; + let vendor = self.vendor.clone(); + tokio::task::spawn(async move { + let _ = vendor.recall_raw(ipv4, ipv6, uuid, true).await; + }); + } + } +} + +impl IpVendor { + pub async fn new( + store: XsdClient, + host_uuid: Uuid, + ipv4_network: Ipv4Network, + ipv6_network: Ipv6Network, + ) -> Result { + let mut state = IpVendor::fetch_stored_state(&store).await?; + let (gateway_ipv4, gateway_ipv6) = + IpVendor::allocate_ipset(&mut state, host_uuid, ipv4_network, ipv6_network)?; + let vend = IpVendor { + store, + host_uuid, + ipv4_network, + ipv6_network, + gateway_ipv4, + gateway_ipv6, + state: Arc::new(RwLock::new(state)), + }; + Ok(vend) + } + + async fn fetch_stored_state(store: &XsdClient) -> Result { + let mut state = IpVendorState::default(); + for domid_candidate in store.list("/local/domain").await? { + let dom_path = format!("/local/domain/{}", domid_candidate); + let Some(uuid) = store + .read_string(format!("{}/krata/uuid", dom_path)) + .await? + .and_then(|x| Uuid::from_str(&x).ok()) + else { + continue; + }; + let assigned_ipv4 = store + .read_string(format!("{}/krata/network/guest/ipv4", dom_path)) + .await? + .and_then(|x| Ipv4Network::from_str(&x).ok()); + let assigned_ipv6 = store + .read_string(format!("{}/krata/network/guest/ipv6", dom_path)) + .await? + .and_then(|x| Ipv6Network::from_str(&x).ok()); + + if let Some(existing_ipv4) = assigned_ipv4 { + if let Some(previous) = state.ipv4.insert(existing_ipv4.ip(), uuid) { + error!("ipv4 conflict detected: guest {} owned {} but {} also claimed to own it, giving it to {}", previous, existing_ipv4.ip(), uuid, uuid); + } + } + + if let Some(existing_ipv6) = assigned_ipv6 { + if let Some(previous) = state.ipv6.insert(existing_ipv6.ip(), uuid) { + error!("ipv6 conflict detected: guest {} owned {} but {} also claimed to own it, giving it to {}", previous, existing_ipv6.ip(), uuid, uuid); + } + } + } + Ok(state) + } + + fn allocate_ipset( + state: &mut IpVendorState, + uuid: Uuid, + ipv4_network: Ipv4Network, + ipv6_network: Ipv6Network, + ) -> Result<(Ipv4Addr, Ipv6Addr)> { + let mut found_ipv4: Option = None; + for ip in ipv4_network.iter() { + if ip.is_loopback() || ip.is_multicast() || ip.is_broadcast() { + continue; + } + + if !ip.is_private() { + continue; + } + + let last = ip.octets()[3]; + if last == 0 || last > 250 { + continue; + } + + if state.ipv4.contains_key(&ip) { + continue; + } + found_ipv4 = Some(ip); + break; + } + + let mut found_ipv6: Option = None; + for ip in ipv6_network.iter() { + if ip.is_loopback() || ip.is_multicast() { + continue; + } + + if state.ipv6.contains_key(&ip) { + continue; + } + found_ipv6 = Some(ip); + break; + } + + let Some(ipv4) = found_ipv4 else { + return Err(anyhow!( + "unable to allocate ipv4 address, assigned network is exhausted" + )); + }; + + let Some(ipv6) = found_ipv6 else { + return Err(anyhow!( + "unable to allocate ipv6 address, assigned network is exhausted" + )); + }; + + state.ipv4.insert(ipv4, uuid); + state.ipv6.insert(ipv6, uuid); + + Ok((ipv4, ipv6)) + } + + pub async fn assign(&self, uuid: Uuid) -> Result { + let mut state = self.state.write().await; + let (ipv4, ipv6) = + IpVendor::allocate_ipset(&mut state, uuid, self.ipv4_network, self.ipv6_network)?; + state.pending_ipv4.insert(ipv4, uuid); + state.pending_ipv6.insert(ipv6, uuid); + Ok(IpAssignment { + vendor: self.clone(), + uuid, + ipv4, + ipv6, + ipv4_prefix: self.ipv4_network.prefix(), + ipv6_prefix: self.ipv6_network.prefix(), + gateway_ipv4: self.gateway_ipv4, + gateway_ipv6: self.gateway_ipv6, + committed: false, + }) + } + + pub async fn commit(&self, assignment: &IpAssignment) -> Result<()> { + let mut state = self.state.write().await; + if state.pending_ipv4.remove(&assignment.ipv4) != Some(assignment.uuid) { + return Err(anyhow!("matching pending ipv4 assignment was not found")); + } + if state.pending_ipv6.remove(&assignment.ipv6) != Some(assignment.uuid) { + return Err(anyhow!("matching pending ipv6 assignment was not found")); + } + Ok(()) + } + + async fn recall_raw( + &self, + ipv4: Ipv4Addr, + ipv6: Ipv6Addr, + uuid: Uuid, + pending: bool, + ) -> Result<()> { + let mut state = self.state.write().await; + if pending { + if state.pending_ipv4.remove(&ipv4) != Some(uuid) { + return Err(anyhow!("matching pending ipv4 assignment was not found")); + } + if state.pending_ipv6.remove(&ipv6) != Some(uuid) { + return Err(anyhow!("matching pending ipv6 assignment was not found")); + } + } + + if state.ipv4.remove(&ipv4) != Some(uuid) { + return Err(anyhow!("matching allocated ipv4 assignment was not found")); + } + + if state.ipv6.remove(&ipv6) != Some(uuid) { + return Err(anyhow!("matching allocated ipv6 assignment was not found")); + } + Ok(()) + } + + pub async fn recall(&self, assignment: &IpAssignment) -> Result<()> { + self.recall_raw(assignment.ipv4, assignment.ipv6, assignment.uuid, false) + .await?; + Ok(()) + } + + pub async fn reload(&self) -> Result<()> { + let mut state = self.state.write().await; + let mut intermediate = IpVendor::fetch_stored_state(&self.store).await?; + intermediate.ipv4.insert(self.gateway_ipv4, self.host_uuid); + intermediate.ipv6.insert(self.gateway_ipv6, self.host_uuid); + for (ipv4, uuid) in &state.pending_ipv4 { + if let Some(previous) = intermediate.ipv4.insert(*ipv4, *uuid) { + error!("ipv4 conflict detected: guest {} owned (pending) {} but {} also claimed to own it, giving it to {}", previous, ipv4, uuid, uuid); + } + intermediate.pending_ipv4.insert(*ipv4, *uuid); + } + for (ipv6, uuid) in &state.pending_ipv6 { + if let Some(previous) = intermediate.ipv6.insert(*ipv6, *uuid) { + error!("ipv6 conflict detected: guest {} owned (pending) {} but {} also claimed to own it, giving it to {}", previous, ipv6, uuid, uuid); + } + intermediate.pending_ipv6.insert(*ipv6, *uuid); + } + *state = intermediate; + Ok(()) + } + + pub async fn read_domain_assignment( + &self, + uuid: Uuid, + domid: u32, + ) -> Result> { + let dom_path = format!("/local/domain/{}", domid); + let Some(guest_ipv4) = self + .store + .read_string(format!("{}/krata/network/guest/ipv4", dom_path)) + .await? + else { + return Ok(None); + }; + let Some(guest_ipv6) = self + .store + .read_string(format!("{}/krata/network/guest/ipv6", dom_path)) + .await? + else { + return Ok(None); + }; + let Some(gateway_ipv4) = self + .store + .read_string(format!("{}/krata/network/gateway/ipv4", dom_path)) + .await? + else { + return Ok(None); + }; + let Some(gateway_ipv6) = self + .store + .read_string(format!("{}/krata/network/gateway/ipv6", dom_path)) + .await? + else { + return Ok(None); + }; + + let Some(guest_ipv4) = Ipv4Network::from_str(&guest_ipv4).ok() else { + return Ok(None); + }; + let Some(guest_ipv6) = Ipv6Network::from_str(&guest_ipv6).ok() else { + return Ok(None); + }; + let Some(gateway_ipv4) = Ipv4Network::from_str(&gateway_ipv4).ok() else { + return Ok(None); + }; + let Some(gateway_ipv6) = Ipv6Network::from_str(&gateway_ipv6).ok() else { + return Ok(None); + }; + Ok(Some(IpAssignment { + vendor: self.clone(), + uuid, + ipv4: guest_ipv4.ip(), + ipv4_prefix: guest_ipv4.prefix(), + ipv6: guest_ipv6.ip(), + ipv6_prefix: guest_ipv6.prefix(), + gateway_ipv4: gateway_ipv4.ip(), + gateway_ipv6: gateway_ipv6.ip(), + committed: true, + })) + } + + pub async fn read(&self) -> Result { + Ok(self.state.read().await.clone()) + } +} diff --git a/crates/runtime/src/launch.rs b/crates/runtime/src/launch.rs index 10e72865..8c19a454 100644 --- a/crates/runtime/src/launch.rs +++ b/crates/runtime/src/launch.rs @@ -1,12 +1,12 @@ use std::collections::HashMap; -use std::net::{IpAddr, Ipv6Addr}; +use std::fs; +use std::net::IpAddr; use std::path::PathBuf; use std::sync::Arc; -use std::{fs, net::Ipv4Addr, str::FromStr}; use advmac::MacAddr6; use anyhow::{anyhow, Result}; -use ipnetwork::{IpNetwork, Ipv4Network}; +use ipnetwork::IpNetwork; use krata::launchcfg::{ LaunchInfo, LaunchNetwork, LaunchNetworkIpv4, LaunchNetworkIpv6, LaunchNetworkResolver, LaunchPackedFormat, LaunchRoot, @@ -15,7 +15,6 @@ use krataoci::packer::OciPackedImage; use tokio::sync::Semaphore; use uuid::Uuid; use xenclient::{DomainChannel, DomainConfig, DomainDisk, DomainNetworkInterface}; -use xenstore::XsdInterface; use crate::cfgblk::ConfigBlock; use crate::RuntimeContext; @@ -66,13 +65,7 @@ impl GuestLauncher { container_mac.set_multicast(false); let _launch_permit = self.launch_semaphore.acquire().await?; - let guest_ipv4 = self.allocate_ipv4(context).await?; - let guest_ipv6 = container_mac.to_link_local_ipv6(); - let gateway_ipv4 = "10.75.70.1"; - let gateway_ipv6 = "fe80::1"; - let ipv4_network_mask: u32 = 16; - let ipv6_network_mask: u32 = 10; - + let mut ip = context.ipvendor.assign(uuid).await?; let launch_config = LaunchInfo { root: LaunchRoot { format: request.format.clone(), @@ -87,12 +80,12 @@ impl GuestLauncher { network: Some(LaunchNetwork { link: "eth0".to_string(), ipv4: LaunchNetworkIpv4 { - address: format!("{}/{}", guest_ipv4, ipv4_network_mask), - gateway: gateway_ipv4.to_string(), + address: format!("{}/{}", ip.ipv4, ip.ipv4_prefix), + gateway: ip.gateway_ipv4.to_string(), }, ipv6: LaunchNetworkIpv6 { - address: format!("{}/{}", guest_ipv6, ipv6_network_mask), - gateway: gateway_ipv6.to_string(), + address: format!("{}/{}", ip.ipv6, ip.ipv6_prefix), + gateway: ip.gateway_ipv6.to_string(), }, resolver: LaunchNetworkResolver { nameservers: vec![ @@ -198,11 +191,11 @@ impl GuestLauncher { ("krata/loops".to_string(), loops.join(",")), ( "krata/network/guest/ipv4".to_string(), - format!("{}/{}", guest_ipv4, ipv4_network_mask), + format!("{}/{}", ip.ipv4, ip.ipv4_prefix), ), ( "krata/network/guest/ipv6".to_string(), - format!("{}/{}", guest_ipv6, ipv6_network_mask), + format!("{}/{}", ip.ipv6, ip.ipv6_prefix), ), ( "krata/network/guest/mac".to_string(), @@ -210,11 +203,11 @@ impl GuestLauncher { ), ( "krata/network/gateway/ipv4".to_string(), - format!("{}/{}", gateway_ipv4, ipv4_network_mask), + format!("{}/{}", ip.gateway_ipv4, ip.ipv4_prefix), ), ( "krata/network/gateway/ipv6".to_string(), - format!("{}/{}", gateway_ipv6, ipv6_network_mask), + format!("{}/{}", ip.gateway_ipv6, ip.ipv6_prefix), ), ( "krata/network/gateway/mac".to_string(), @@ -253,32 +246,29 @@ impl GuestLauncher { extra_rw_paths: vec!["krata/guest".to_string()], }; match context.xen.create(&config).await { - Ok(created) => Ok(GuestInfo { - name: request.name.as_ref().map(|x| x.to_string()), - uuid, - domid: created.domid, - image: request.image.digest, - loops: vec![], - guest_ipv4: Some(IpNetwork::new( - IpAddr::V4(guest_ipv4), - ipv4_network_mask as u8, - )?), - guest_ipv6: Some(IpNetwork::new( - IpAddr::V6(guest_ipv6), - ipv6_network_mask as u8, - )?), - guest_mac: Some(guest_mac_string.clone()), - gateway_ipv4: Some(IpNetwork::new( - IpAddr::V4(Ipv4Addr::from_str(gateway_ipv4)?), - ipv4_network_mask as u8, - )?), - gateway_ipv6: Some(IpNetwork::new( - IpAddr::V6(Ipv6Addr::from_str(gateway_ipv6)?), - ipv6_network_mask as u8, - )?), - gateway_mac: Some(gateway_mac_string.clone()), - state: GuestState { exit_code: None }, - }), + Ok(created) => { + ip.commit().await?; + Ok(GuestInfo { + name: request.name.as_ref().map(|x| x.to_string()), + uuid, + domid: created.domid, + image: request.image.digest, + loops: vec![], + guest_ipv4: Some(IpNetwork::new(IpAddr::V4(ip.ipv4), ip.ipv4_prefix)?), + guest_ipv6: Some(IpNetwork::new(IpAddr::V6(ip.ipv6), ip.ipv6_prefix)?), + guest_mac: Some(guest_mac_string.clone()), + gateway_ipv4: Some(IpNetwork::new( + IpAddr::V4(ip.gateway_ipv4), + ip.ipv4_prefix, + )?), + gateway_ipv6: Some(IpNetwork::new( + IpAddr::V6(ip.gateway_ipv6), + ip.ipv6_prefix, + )?), + gateway_mac: Some(gateway_mac_string.clone()), + state: GuestState { exit_code: None }, + }) + } Err(error) => { let _ = context.autoloop.unloop(&image_squashfs_loop.path).await; let _ = context.autoloop.unloop(&cfgblk_squashfs_loop.path).await; @@ -287,38 +277,4 @@ impl GuestLauncher { } } } - - async fn allocate_ipv4(&self, context: &RuntimeContext) -> Result { - let network = Ipv4Network::new(Ipv4Addr::new(10, 75, 80, 0), 24)?; - let mut used: Vec = vec![]; - for domid_candidate in context.xen.store.list("/local/domain").await? { - let dom_path = format!("/local/domain/{}", domid_candidate); - let ip_path = format!("{}/krata/network/guest/ipv4", dom_path); - let existing_ip = context.xen.store.read_string(&ip_path).await?; - if let Some(existing_ip) = existing_ip { - let ipv4_network = Ipv4Network::from_str(&existing_ip)?; - used.push(ipv4_network.ip()); - } - } - - let mut found: Option = None; - for ip in network.iter() { - let last = ip.octets()[3]; - if last == 0 || last == 255 { - continue; - } - if !used.contains(&ip) { - found = Some(ip); - break; - } - } - - if found.is_none() { - return Err(anyhow!( - "unable to find ipv4 to allocate to guest, ipv4 addresses are exhausted" - )); - } - - Ok(found.unwrap()) - } } diff --git a/crates/runtime/src/lib.rs b/crates/runtime/src/lib.rs index 288da314..8bab3e42 100644 --- a/crates/runtime/src/lib.rs +++ b/crates/runtime/src/lib.rs @@ -1,7 +1,9 @@ -use std::{fs, path::PathBuf, str::FromStr, sync::Arc}; +use std::{fs, net::Ipv4Addr, path::PathBuf, str::FromStr, sync::Arc}; use anyhow::{anyhow, Result}; -use ipnetwork::IpNetwork; +use ip::IpVendor; +use ipnetwork::{IpNetwork, Ipv4Network, Ipv6Network}; +use log::error; use loopdev::LoopControl; use tokio::sync::Semaphore; use uuid::Uuid; @@ -16,6 +18,7 @@ use self::{ pub mod autoloop; pub mod cfgblk; pub mod channel; +pub mod ip; pub mod launch; pub struct GuestLoopInfo { @@ -47,14 +50,20 @@ pub struct GuestInfo { pub struct RuntimeContext { pub autoloop: AutoLoop, pub xen: XenClient, + pub ipvendor: IpVendor, } impl RuntimeContext { - pub async fn new() -> Result { + pub async fn new(host_uuid: Uuid) -> Result { let xen = XenClient::open(0).await?; + let ipv4_network = Ipv4Network::new(Ipv4Addr::new(10, 75, 80, 0), 24)?; + let ipv6_network = Ipv6Network::from_str("fdd4:1476:6c7e::/48")?; + let ipvend = + IpVendor::new(xen.store.clone(), host_uuid, ipv4_network, ipv6_network).await?; Ok(RuntimeContext { autoloop: AutoLoop::new(LoopControl::open()?), xen, + ipvendor: ipvend, }) } @@ -217,16 +226,18 @@ impl RuntimeContext { #[derive(Clone)] pub struct Runtime { + host_uuid: Uuid, context: RuntimeContext, launch_semaphore: Arc, } impl Runtime { - pub async fn new() -> Result { - let context = RuntimeContext::new().await?; + pub async fn new(host_uuid: Uuid) -> Result { + let context = RuntimeContext::new(host_uuid).await?; Ok(Self { + host_uuid, context, - launch_semaphore: Arc::new(Semaphore::new(1)), + launch_semaphore: Arc::new(Semaphore::new(10)), }) } @@ -260,6 +271,11 @@ impl Runtime { return Err(anyhow!("unable to find krata uuid based on the domain",)); } let uuid = Uuid::parse_str(&uuid)?; + let ip = self + .context + .ipvendor + .read_domain_assignment(uuid, domid) + .await?; let loops = store .read_string(format!("{}/krata/loops", dom_path).as_str()) .await?; @@ -279,6 +295,16 @@ impl Runtime { } } } + + if let Some(ip) = ip { + if let Err(error) = self.context.ipvendor.recall(&ip).await { + error!( + "failed to recall ip assignment for guest {}: {}", + uuid, error + ); + } + } + Ok(uuid) } @@ -287,6 +313,6 @@ impl Runtime { } pub async fn dupe(&self) -> Result { - Runtime::new().await + Runtime::new(self.host_uuid).await } }