Skip to content

Commit

Permalink
Apply suggestions
Browse files Browse the repository at this point in the history
Signed-off-by: Nicolas Belouin <[email protected]>
  • Loading branch information
diconico07 committed Apr 26, 2024
1 parent d109b36 commit 39d346b
Show file tree
Hide file tree
Showing 12 changed files with 72 additions and 357 deletions.
5 changes: 0 additions & 5 deletions agent/build.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,9 +8,4 @@ fn main() {
&["./proto"],
)
.expect("failed to compile protos");
tonic_build::configure()
.build_client(false)
.out_dir("./src/plugin_manager")
.compile(&["./proto/pluginregistration.proto"], &["./proto"])
.expect("failed to compile protos");
}
50 changes: 0 additions & 50 deletions agent/proto/pluginregistration.proto

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -127,7 +127,7 @@ async fn solve_value_from_secret(
mod tests {
use std::collections::BTreeMap;

use akri_shared::k8s::crud::MockApi;
use akri_shared::k8s::api::MockApi;
use k8s_openapi::ByteString;

use crate::discovery_handler_manager::mock::MockDiscoveryManagerKubeInterface;
Expand Down
4 changes: 2 additions & 2 deletions agent/src/discovery_handler_manager/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ mod registration_socket;

use std::{collections::HashMap, sync::Arc};

use akri_shared::{akri::configuration::Configuration, k8s::crud::IntoApi};
use akri_shared::{akri::configuration::Configuration, k8s::api::IntoApi};
use k8s_openapi::api::core::v1::{ConfigMap, Secret};

use kube_runtime::reflector::ObjectRef;
Expand Down Expand Up @@ -58,7 +58,7 @@ impl<T: IntoApi<Secret> + IntoApi<ConfigMap>> DiscoveryManagerKubeInterface for
#[cfg(test)]
mod mock {

use akri_shared::k8s::crud::{Api, IntoApi, MockIntoApi};
use akri_shared::k8s::api::{Api, IntoApi, MockIntoApi};
use k8s_openapi::api::core::v1::{ConfigMap, Secret};
#[derive(Default)]
pub struct MockDiscoveryManagerKubeInterface {
Expand Down
6 changes: 3 additions & 3 deletions agent/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ async fn main() -> Result<(), Box<dyn std::error::Error + Send + Sync + 'static>
run_metrics_server().await.unwrap();
}));

let (device_notifier, discovery_handler_registry, conf_notifier) =
let (device_notifier, discovery_handler_registry, config_notifier) =
discovery_handler_manager::new_registry(kube_client.clone());

let dh_registry = Arc::new(discovery_handler_registry);
Expand Down Expand Up @@ -85,15 +85,15 @@ async fn main() -> Result<(), Box<dyn std::error::Error + Send + Sync + 'static>
instances_cache,
dh_registry,
client: kube_client.clone(),
agent_instance_name: node_name.clone(),
agent_identifier: node_name.clone(),
error_backoffs: Mutex::new(HashMap::new()),
},
);

tasks.push(tokio::spawn(async {
util::discovery_configuration_controller::start_controller(
config_controller_context,
conf_notifier,
config_notifier,
)
.await;
}));
Expand Down
31 changes: 24 additions & 7 deletions agent/src/plugin_manager/device_plugin_instance_controller.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ use std::fmt::Display;
use std::str::FromStr;
use std::{collections::HashMap, sync::Arc, time::Duration};

use akri_shared::{akri::instance::Instance, k8s::crud::IntoApi};
use akri_shared::{akri::instance::Instance, k8s::api::IntoApi};
use async_trait::async_trait;
use futures::StreamExt;
use itertools::Itertools;
Expand Down Expand Up @@ -718,8 +718,11 @@ pub struct DevicePluginManager {
node_name: String,
kube_client: Arc<dyn IntoApi<Instance>>,
device_manager: Arc<dyn DeviceManager>,
error_backoffs: std::sync::Mutex<HashMap<String, Duration>>,
}

const SUCCESS_REQUEUE: Duration = Duration::from_secs(600);

impl DevicePluginManager {
pub fn new(
node_name: String,
Expand All @@ -732,6 +735,7 @@ impl DevicePluginManager {
node_name,
kube_client,
device_manager,
error_backoffs: std::sync::Mutex::new(HashMap::default()),
}
}

Expand Down Expand Up @@ -885,20 +889,33 @@ pub async fn reconcile(
.add_plugin(instance.name_any(), instance_plugin)
.await;
}
Ok(Action::requeue(Duration::from_secs(300)))
ctx.error_backoffs
.lock()
.unwrap()
.remove(&instance.name_any());
Ok(Action::requeue(SUCCESS_REQUEUE))
}

pub fn error_policy(
dc: Arc<Instance>,
error: &DevicePluginError,
_ctx: Arc<DevicePluginManager>,
ctx: Arc<DevicePluginManager>,
) -> Action {
error!(
"Error during reconciliation of Instance {}: {:?}",
let mut error_backoffs = ctx.error_backoffs.lock().unwrap();
let previous_duration = error_backoffs
.get(&dc.name_any())
.cloned()
.unwrap_or(Duration::from_millis(500));
let next_duration = previous_duration * 2;
warn!(
"Error during reconciliation of Instance {:?}::{}, retrying in {}s: {:?}",
dc.namespace(),
dc.name_any(),
next_duration.as_secs_f32(),
error
);
Action::requeue(Duration::from_secs(60))
error_backoffs.insert(dc.name_any(), next_duration);
Action::requeue(next_duration)
}

#[cfg(test)]
Expand All @@ -907,7 +924,7 @@ mod tests {

use akri_shared::{
akri::instance::InstanceSpec,
k8s::crud::{MockApi, MockIntoApi},
k8s::api::{MockApi, MockIntoApi},
};
use tokio_stream::StreamExt;

Expand Down
30 changes: 20 additions & 10 deletions agent/src/plugin_manager/device_plugin_slot_reclaimer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@ use super::{

/// Path of the Kubelet registry socket
pub const KUBELET_SOCKET: &str = "/var/lib/kubelet/pod-resources/kubelet.sock";
const SLOT_GRACE_PERIOD: Duration = Duration::from_secs(20);
const SLOT_RECLAIM_INTERVAL: Duration = Duration::from_secs(10);

async fn get_used_slots() -> Result<HashSet<String>, anyhow::Error> {
// We will ignore this dummy uri because UDS does not use it.
Expand Down Expand Up @@ -73,25 +75,33 @@ pub async fn start_reclaimer(dp_manager: Arc<DevicePluginManager>) {
let theoretical_slots = dp_manager.get_used_slots().await;
trace!("theoretical slots: {:?}", theoretical_slots);
let mut new_stalled_slots: HashMap<String, Instant> = HashMap::new();
let now = Instant::now();
for slot in theoretical_slots.difference(&used_slots) {
if let Some(at) = stalled_slots.get(slot) {
if now.saturating_duration_since(*at) >= Duration::from_secs(20) {
trace!("freeing slot: {}", slot);
if dp_manager.free_slot(slot.to_string()).await.is_err() {
new_stalled_slots.insert(slot.to_string(), at.to_owned());
let reclaim_iteration_start = Instant::now();
for slot_to_reclaim in theoretical_slots.difference(&used_slots) {
// See if slot was already stalled at previous iteration
if let Some(at) = stalled_slots.get(slot_to_reclaim) {
if reclaim_iteration_start.saturating_duration_since(*at) >= SLOT_GRACE_PERIOD {
// Slot is stalled for more than grace period, free it
trace!("freeing slot: {}", slot_to_reclaim);
if dp_manager
.free_slot(slot_to_reclaim.to_string())
.await
.is_err()
{
new_stalled_slots.insert(slot_to_reclaim.to_string(), at.to_owned());
};
} else {
new_stalled_slots.insert(slot.to_string(), at.to_owned());
// Keep slot as stall
new_stalled_slots.insert(slot_to_reclaim.to_string(), at.to_owned());
}
} else {
new_stalled_slots.insert(slot.to_string(), now);
// Mark slot as stall
new_stalled_slots.insert(slot_to_reclaim.to_string(), reclaim_iteration_start);
}
}
stalled_slots = new_stalled_slots;
}
tokio::select! {
_ = tokio::time::sleep(Duration::from_secs(10)) => {},
_ = tokio::time::sleep(SLOT_RECLAIM_INTERVAL) => {},
_ = signal.recv() => return,
};
}
Expand Down
1 change: 0 additions & 1 deletion agent/src/plugin_manager/mod.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
pub mod pluginregistration; // Pros generated pluginregistration module
pub mod v1; // Prost generated podresources module
pub mod v1beta1; // Prost generated pluginapi module

Expand Down
Loading

0 comments on commit 39d346b

Please sign in to comment.