-
Notifications
You must be signed in to change notification settings - Fork 11
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
feat(runtime): concurrent ip allocation (#151)
Previously, krata runtime allowed a single permit when performing operations. This was necessary because the only IP allocation storage was xenstore, and the commit of xenstore data happens after allocation. This commit introduces IpVendor, a service which vends IPv4 and IPv6 addresses to guests using a linear address strategy within an IP network space. The IpVendor table is initialized from xenstore, and from there on out, the in-memory table is the source of truth. This implementation is not perfect, but it will allow us to lift the single permit limit, allowing guests to start concurrently.
- Loading branch information
Showing
4 changed files
with
400 additions
and
87 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,331 @@ | ||
use std::{ | ||
collections::HashMap, | ||
net::{Ipv4Addr, Ipv6Addr}, | ||
str::FromStr, | ||
sync::Arc, | ||
}; | ||
|
||
use anyhow::{anyhow, Result}; | ||
use ipnetwork::{Ipv4Network, Ipv6Network}; | ||
use log::error; | ||
use tokio::sync::RwLock; | ||
use uuid::Uuid; | ||
use xenstore::{XsdClient, XsdInterface}; | ||
|
||
#[derive(Default, Clone)] | ||
pub struct IpVendorState { | ||
pub ipv4: HashMap<Ipv4Addr, Uuid>, | ||
pub ipv6: HashMap<Ipv6Addr, Uuid>, | ||
pub pending_ipv4: HashMap<Ipv4Addr, Uuid>, | ||
pub pending_ipv6: HashMap<Ipv6Addr, Uuid>, | ||
} | ||
|
||
#[derive(Clone)] | ||
pub struct IpVendor { | ||
store: XsdClient, | ||
host_uuid: Uuid, | ||
ipv4_network: Ipv4Network, | ||
ipv6_network: Ipv6Network, | ||
gateway_ipv4: Ipv4Addr, | ||
gateway_ipv6: Ipv6Addr, | ||
state: Arc<RwLock<IpVendorState>>, | ||
} | ||
|
||
pub struct IpAssignment { | ||
vendor: IpVendor, | ||
pub uuid: Uuid, | ||
pub ipv4: Ipv4Addr, | ||
pub ipv6: Ipv6Addr, | ||
pub ipv4_prefix: u8, | ||
pub ipv6_prefix: u8, | ||
pub gateway_ipv4: Ipv4Addr, | ||
pub gateway_ipv6: Ipv6Addr, | ||
pub committed: bool, | ||
} | ||
|
||
impl IpAssignment { | ||
pub async fn commit(&mut self) -> Result<()> { | ||
self.vendor.commit(self).await?; | ||
self.committed = true; | ||
Ok(()) | ||
} | ||
} | ||
|
||
impl Drop for IpAssignment { | ||
fn drop(&mut self) { | ||
if !self.committed { | ||
let ipv4 = self.ipv4; | ||
let ipv6 = self.ipv6; | ||
let uuid = self.uuid; | ||
let vendor = self.vendor.clone(); | ||
tokio::task::spawn(async move { | ||
let _ = vendor.recall_raw(ipv4, ipv6, uuid, true).await; | ||
}); | ||
} | ||
} | ||
} | ||
|
||
impl IpVendor { | ||
pub async fn new( | ||
store: XsdClient, | ||
host_uuid: Uuid, | ||
ipv4_network: Ipv4Network, | ||
ipv6_network: Ipv6Network, | ||
) -> Result<Self> { | ||
let mut state = IpVendor::fetch_stored_state(&store).await?; | ||
let (gateway_ipv4, gateway_ipv6) = | ||
IpVendor::allocate_ipset(&mut state, host_uuid, ipv4_network, ipv6_network)?; | ||
let vend = IpVendor { | ||
store, | ||
host_uuid, | ||
ipv4_network, | ||
ipv6_network, | ||
gateway_ipv4, | ||
gateway_ipv6, | ||
state: Arc::new(RwLock::new(state)), | ||
}; | ||
Ok(vend) | ||
} | ||
|
||
async fn fetch_stored_state(store: &XsdClient) -> Result<IpVendorState> { | ||
let mut state = IpVendorState::default(); | ||
for domid_candidate in store.list("/local/domain").await? { | ||
let dom_path = format!("/local/domain/{}", domid_candidate); | ||
let Some(uuid) = store | ||
.read_string(format!("{}/krata/uuid", dom_path)) | ||
.await? | ||
.and_then(|x| Uuid::from_str(&x).ok()) | ||
else { | ||
continue; | ||
}; | ||
let assigned_ipv4 = store | ||
.read_string(format!("{}/krata/network/guest/ipv4", dom_path)) | ||
.await? | ||
.and_then(|x| Ipv4Network::from_str(&x).ok()); | ||
let assigned_ipv6 = store | ||
.read_string(format!("{}/krata/network/guest/ipv6", dom_path)) | ||
.await? | ||
.and_then(|x| Ipv6Network::from_str(&x).ok()); | ||
|
||
if let Some(existing_ipv4) = assigned_ipv4 { | ||
if let Some(previous) = state.ipv4.insert(existing_ipv4.ip(), uuid) { | ||
error!("ipv4 conflict detected: guest {} owned {} but {} also claimed to own it, giving it to {}", previous, existing_ipv4.ip(), uuid, uuid); | ||
} | ||
} | ||
|
||
if let Some(existing_ipv6) = assigned_ipv6 { | ||
if let Some(previous) = state.ipv6.insert(existing_ipv6.ip(), uuid) { | ||
error!("ipv6 conflict detected: guest {} owned {} but {} also claimed to own it, giving it to {}", previous, existing_ipv6.ip(), uuid, uuid); | ||
} | ||
} | ||
} | ||
Ok(state) | ||
} | ||
|
||
fn allocate_ipset( | ||
state: &mut IpVendorState, | ||
uuid: Uuid, | ||
ipv4_network: Ipv4Network, | ||
ipv6_network: Ipv6Network, | ||
) -> Result<(Ipv4Addr, Ipv6Addr)> { | ||
let mut found_ipv4: Option<Ipv4Addr> = None; | ||
for ip in ipv4_network.iter() { | ||
if ip.is_loopback() || ip.is_multicast() || ip.is_broadcast() { | ||
continue; | ||
} | ||
|
||
if !ip.is_private() { | ||
continue; | ||
} | ||
|
||
let last = ip.octets()[3]; | ||
if last == 0 || last > 250 { | ||
continue; | ||
} | ||
|
||
if state.ipv4.contains_key(&ip) { | ||
continue; | ||
} | ||
found_ipv4 = Some(ip); | ||
break; | ||
} | ||
|
||
let mut found_ipv6: Option<Ipv6Addr> = None; | ||
for ip in ipv6_network.iter() { | ||
if ip.is_loopback() || ip.is_multicast() { | ||
continue; | ||
} | ||
|
||
if state.ipv6.contains_key(&ip) { | ||
continue; | ||
} | ||
found_ipv6 = Some(ip); | ||
break; | ||
} | ||
|
||
let Some(ipv4) = found_ipv4 else { | ||
return Err(anyhow!( | ||
"unable to allocate ipv4 address, assigned network is exhausted" | ||
)); | ||
}; | ||
|
||
let Some(ipv6) = found_ipv6 else { | ||
return Err(anyhow!( | ||
"unable to allocate ipv6 address, assigned network is exhausted" | ||
)); | ||
}; | ||
|
||
state.ipv4.insert(ipv4, uuid); | ||
state.ipv6.insert(ipv6, uuid); | ||
|
||
Ok((ipv4, ipv6)) | ||
} | ||
|
||
pub async fn assign(&self, uuid: Uuid) -> Result<IpAssignment> { | ||
let mut state = self.state.write().await; | ||
let (ipv4, ipv6) = | ||
IpVendor::allocate_ipset(&mut state, uuid, self.ipv4_network, self.ipv6_network)?; | ||
state.pending_ipv4.insert(ipv4, uuid); | ||
state.pending_ipv6.insert(ipv6, uuid); | ||
Ok(IpAssignment { | ||
vendor: self.clone(), | ||
uuid, | ||
ipv4, | ||
ipv6, | ||
ipv4_prefix: self.ipv4_network.prefix(), | ||
ipv6_prefix: self.ipv6_network.prefix(), | ||
gateway_ipv4: self.gateway_ipv4, | ||
gateway_ipv6: self.gateway_ipv6, | ||
committed: false, | ||
}) | ||
} | ||
|
||
pub async fn commit(&self, assignment: &IpAssignment) -> Result<()> { | ||
let mut state = self.state.write().await; | ||
if state.pending_ipv4.remove(&assignment.ipv4) != Some(assignment.uuid) { | ||
return Err(anyhow!("matching pending ipv4 assignment was not found")); | ||
} | ||
if state.pending_ipv6.remove(&assignment.ipv6) != Some(assignment.uuid) { | ||
return Err(anyhow!("matching pending ipv6 assignment was not found")); | ||
} | ||
Ok(()) | ||
} | ||
|
||
async fn recall_raw( | ||
&self, | ||
ipv4: Ipv4Addr, | ||
ipv6: Ipv6Addr, | ||
uuid: Uuid, | ||
pending: bool, | ||
) -> Result<()> { | ||
let mut state = self.state.write().await; | ||
if pending { | ||
if state.pending_ipv4.remove(&ipv4) != Some(uuid) { | ||
return Err(anyhow!("matching pending ipv4 assignment was not found")); | ||
} | ||
if state.pending_ipv6.remove(&ipv6) != Some(uuid) { | ||
return Err(anyhow!("matching pending ipv6 assignment was not found")); | ||
} | ||
} | ||
|
||
if state.ipv4.remove(&ipv4) != Some(uuid) { | ||
return Err(anyhow!("matching allocated ipv4 assignment was not found")); | ||
} | ||
|
||
if state.ipv6.remove(&ipv6) != Some(uuid) { | ||
return Err(anyhow!("matching allocated ipv6 assignment was not found")); | ||
} | ||
Ok(()) | ||
} | ||
|
||
pub async fn recall(&self, assignment: &IpAssignment) -> Result<()> { | ||
self.recall_raw(assignment.ipv4, assignment.ipv6, assignment.uuid, false) | ||
.await?; | ||
Ok(()) | ||
} | ||
|
||
pub async fn reload(&self) -> Result<()> { | ||
let mut state = self.state.write().await; | ||
let mut intermediate = IpVendor::fetch_stored_state(&self.store).await?; | ||
intermediate.ipv4.insert(self.gateway_ipv4, self.host_uuid); | ||
intermediate.ipv6.insert(self.gateway_ipv6, self.host_uuid); | ||
for (ipv4, uuid) in &state.pending_ipv4 { | ||
if let Some(previous) = intermediate.ipv4.insert(*ipv4, *uuid) { | ||
error!("ipv4 conflict detected: guest {} owned (pending) {} but {} also claimed to own it, giving it to {}", previous, ipv4, uuid, uuid); | ||
} | ||
intermediate.pending_ipv4.insert(*ipv4, *uuid); | ||
} | ||
for (ipv6, uuid) in &state.pending_ipv6 { | ||
if let Some(previous) = intermediate.ipv6.insert(*ipv6, *uuid) { | ||
error!("ipv6 conflict detected: guest {} owned (pending) {} but {} also claimed to own it, giving it to {}", previous, ipv6, uuid, uuid); | ||
} | ||
intermediate.pending_ipv6.insert(*ipv6, *uuid); | ||
} | ||
*state = intermediate; | ||
Ok(()) | ||
} | ||
|
||
pub async fn read_domain_assignment( | ||
&self, | ||
uuid: Uuid, | ||
domid: u32, | ||
) -> Result<Option<IpAssignment>> { | ||
let dom_path = format!("/local/domain/{}", domid); | ||
let Some(guest_ipv4) = self | ||
.store | ||
.read_string(format!("{}/krata/network/guest/ipv4", dom_path)) | ||
.await? | ||
else { | ||
return Ok(None); | ||
}; | ||
let Some(guest_ipv6) = self | ||
.store | ||
.read_string(format!("{}/krata/network/guest/ipv6", dom_path)) | ||
.await? | ||
else { | ||
return Ok(None); | ||
}; | ||
let Some(gateway_ipv4) = self | ||
.store | ||
.read_string(format!("{}/krata/network/gateway/ipv4", dom_path)) | ||
.await? | ||
else { | ||
return Ok(None); | ||
}; | ||
let Some(gateway_ipv6) = self | ||
.store | ||
.read_string(format!("{}/krata/network/gateway/ipv6", dom_path)) | ||
.await? | ||
else { | ||
return Ok(None); | ||
}; | ||
|
||
let Some(guest_ipv4) = Ipv4Network::from_str(&guest_ipv4).ok() else { | ||
return Ok(None); | ||
}; | ||
let Some(guest_ipv6) = Ipv6Network::from_str(&guest_ipv6).ok() else { | ||
return Ok(None); | ||
}; | ||
let Some(gateway_ipv4) = Ipv4Network::from_str(&gateway_ipv4).ok() else { | ||
return Ok(None); | ||
}; | ||
let Some(gateway_ipv6) = Ipv6Network::from_str(&gateway_ipv6).ok() else { | ||
return Ok(None); | ||
}; | ||
Ok(Some(IpAssignment { | ||
vendor: self.clone(), | ||
uuid, | ||
ipv4: guest_ipv4.ip(), | ||
ipv4_prefix: guest_ipv4.prefix(), | ||
ipv6: guest_ipv6.ip(), | ||
ipv6_prefix: guest_ipv6.prefix(), | ||
gateway_ipv4: gateway_ipv4.ip(), | ||
gateway_ipv6: gateway_ipv6.ip(), | ||
committed: true, | ||
})) | ||
} | ||
|
||
pub async fn read(&self) -> Result<IpVendorState> { | ||
Ok(self.state.read().await.clone()) | ||
} | ||
} |
Oops, something went wrong.