diff --git a/Cargo.lock b/Cargo.lock index ba116e178..dade00d52 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -236,19 +236,19 @@ dependencies = [ "async-stream", "async-trait", "blake2", - "env_logger", + "env_logger 0.10.2", "futures", - "hyper", + "hyper 0.14.30", "itertools", - "k8s-openapi 0.20.0", - "kube 0.87.2", + "k8s-openapi 0.22.0", + "kube 0.91.0", "kube-runtime", "lazy_static", "log", "mock_instant", "mockall", "mockall_double", - "prometheus", + "prometheus 0.12.0", "prost", "serde", "serde_derive", @@ -336,9 +336,9 @@ dependencies = [ "base64 0.13.1", "bytes", "chrono", - "env_logger", + "env_logger 0.10.2", "futures-util", - "hyper", + "hyper 0.14.30", "log", "mockall", "serde", @@ -381,12 +381,12 @@ dependencies = [ "anyhow", "async-trait", "either", - "env_logger", - "k8s-openapi 0.20.0", - "kube 0.87.2", + "env_logger 0.10.2", + "k8s-openapi 0.22.0", + "kube 0.91.0", "log", "mockall", - "prometheus", + "prometheus 0.12.0", "rand", "schemars", "serde", @@ -406,7 +406,7 @@ dependencies = [ "akri-discovery-utils", "anyhow", "async-trait", - "env_logger", + "env_logger 0.10.2", "log", "mockall", "pest", @@ -512,6 +512,18 @@ version = "1.0.89" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "86fdf8605db99b54d3cd748a44c6d04df638eb5dafb219b135d0149bd0db01f6" +[[package]] +name = "async-broadcast" +version = "0.7.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "20cd0e2e25ea8e5f7e9df04578dc6cf5c83577fd09b1a46aaf5c85e1c33f2a7e" +dependencies = [ + "event-listener 5.3.1", + "event-listener-strategy", + "futures-core", + "pin-project-lite", +] + [[package]] name = "async-channel" version = "1.9.0" @@ -682,8 +694,8 @@ dependencies = [ "bytes", "futures-util", "http 0.2.12", - "http-body", - "hyper", + "http-body 0.4.6", + "hyper 0.14.30", "itoa", "matchit", "memchr", @@ -708,7 +720,7 @@ dependencies = [ "bytes", "futures-util", "http 0.2.12", - "http-body", + "http-body 0.4.6", "mime", "rustversion", "tower-layer", @@ -940,16 +952,17 @@ dependencies = [ "anyhow", "async-std", "chrono", - "env_logger", + "either", + "env_logger 0.11.5", "futures", - "k8s-openapi 0.20.0", - "kube 0.87.2", - "kube-runtime", + "k8s-openapi 0.22.0", + "kube 0.91.0", "lazy_static", "log", "mockall", - "prometheus", + "prometheus 0.13.4", "serde_json", + "thiserror", "tokio", ] @@ -1112,7 +1125,7 @@ version = "0.13.3" dependencies = [ "akri-debug-echo", "akri-discovery-utils", - "env_logger", + "env_logger 0.10.2", "log", "tokio", ] @@ -1217,6 +1230,16 @@ dependencies = [ "cfg-if", ] +[[package]] +name = "env_filter" +version = "0.1.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4f2c92ceda6ceec50f43169f9ee8424fe2db276791afde7b2cd8bc084cb376ab" +dependencies = [ + "log", + "regex", +] + [[package]] name = "env_logger" version = "0.10.2" @@ -1230,6 +1253,19 @@ dependencies = [ "termcolor", ] +[[package]] +name = "env_logger" +version = "0.11.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e13fa619b91fb2381732789fc5de83b45675e882f66623b7d8cb4f643017018d" +dependencies = [ + "anstream", + "anstyle", + "env_filter", + "humantime", + "log", +] + [[package]] name = "equivalent" version = "1.0.1" @@ -1621,6 +1657,29 @@ dependencies = [ "pin-project-lite", ] +[[package]] +name = "http-body" +version = "1.0.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1efedce1fb8e6913f23e0c92de8e62cd5b772a67e7b3946df930a62566c93184" +dependencies = [ + "bytes", + "http 1.1.0", +] + +[[package]] +name = "http-body-util" +version = "0.1.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "793429d76616a256bcb62c2a2ec2bed781c8307e797e2598c50010f2bee2544f" +dependencies = [ + "bytes", + "futures-util", + "http 1.1.0", + "http-body 1.0.1", + "pin-project-lite", +] + [[package]] name = "http-range-header" version = "0.3.1" @@ -1657,7 +1716,7 @@ dependencies = [ "futures-util", "h2", "http 0.2.12", - "http-body", + "http-body 0.4.6", "httparse", "httpdate", "itoa", @@ -1669,6 +1728,25 @@ dependencies = [ "want", ] +[[package]] +name = "hyper" +version = "1.4.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "50dfd22e0e76d0f662d429a5f80fcaf3855009297eab6a0a9f8543834744ba05" +dependencies = [ + "bytes", + "futures-channel", + "futures-util", + "http 1.1.0", + "http-body 1.0.1", + "httparse", + "itoa", + "pin-project-lite", + "smallvec", + "tokio", + "want", +] + [[package]] name = "hyper-openssl" version = "0.9.2" @@ -1676,7 +1754,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "d6ee5d7a8f718585d1c3c61dfde28ef5b0bb14734b4db13f5ada856cdc6c612b" dependencies = [ "http 0.2.12", - "hyper", + "hyper 0.14.30", "linked_hash_set", "once_cell", "openssl", @@ -1689,18 +1767,21 @@ dependencies = [ [[package]] name = "hyper-rustls" -version = "0.24.2" +version = "0.27.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ec3efd23720e2049821a693cbc7e65ea87c72f1c58ff2f9522ff332b1491e590" +checksum = "08afdbb5c31130e3034af566421053ab03787c640246a446327f550d11bcb333" dependencies = [ "futures-util", - "http 0.2.12", - "hyper", + "http 1.1.0", + "hyper 1.4.1", + "hyper-util", "log", - "rustls", + "rustls 0.23.14", "rustls-native-certs", + "rustls-pki-types", "tokio", - "tokio-rustls", + "tokio-rustls 0.26.0", + "tower-service", ] [[package]] @@ -1709,12 +1790,44 @@ version = "0.4.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "bbb958482e8c7be4bc3cf272a766a2b0bf1a6755e7a6ae777f017a31d11b13b1" dependencies = [ - "hyper", + "hyper 0.14.30", "pin-project-lite", "tokio", "tokio-io-timeout", ] +[[package]] +name = "hyper-timeout" +version = "0.5.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3203a961e5c83b6f5498933e78b6b263e208c197b63e9c6c53cc82ffd3f63793" +dependencies = [ + "hyper 1.4.1", + "hyper-util", + "pin-project-lite", + "tokio", + "tower-service", +] + +[[package]] +name = "hyper-util" +version = "0.1.9" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "41296eb09f183ac68eec06e03cdbea2e759633d4067b2f6552fc2e009bcad08b" +dependencies = [ + "bytes", + "futures-channel", + "futures-util", + "http 1.1.0", + "http-body 1.0.1", + "hyper 1.4.1", + "pin-project-lite", + "socket2", + "tokio", + "tower-service", + "tracing", +] + [[package]] name = "iana-time-zone" version = "0.1.61" @@ -1869,10 +1982,12 @@ dependencies = [ [[package]] name = "jsonpath-rust" -version = "0.3.5" +version = "0.5.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "06cc127b7c3d270be504572364f9569761a180b981919dd0d87693a7f5fb7829" +checksum = "19d8fe85bd70ff715f31ce8c739194b423d79811a19602115d611a3ec85d6200" dependencies = [ + "lazy_static", + "once_cell", "pest", "pest_derive", "regex", @@ -1908,12 +2023,11 @@ dependencies = [ [[package]] name = "k8s-openapi" -version = "0.20.0" +version = "0.22.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "edc3606fd16aca7989db2f84bb25684d0270c6d6fa1dbcd0025af7b4130523a6" +checksum = "19501afb943ae5806548bc3ebd7f3374153ca057a38f480ef30adfde5ef09755" dependencies = [ - "base64 0.21.7", - "bytes", + "base64 0.22.1", "chrono", "schemars", "serde", @@ -1935,14 +2049,15 @@ dependencies = [ [[package]] name = "kube" -version = "0.87.2" +version = "0.91.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "3499c8d60c763246c7a213f51caac1e9033f46026904cb89bc8951ae8601f26e" +checksum = "264461a7ebf4fb0fcf23e4c7e4f9387c5696ee61d003de207d9b5a895ff37bfa" dependencies = [ - "k8s-openapi 0.20.0", - "kube-client 0.87.2", - "kube-core 0.87.2", - "kube-derive 0.87.2", + "k8s-openapi 0.22.0", + "kube-client 0.91.0", + "kube-core 0.91.0", + "kube-derive 0.91.0", + "kube-runtime", ] [[package]] @@ -1958,10 +2073,10 @@ dependencies = [ "either", "futures", "http 0.2.12", - "http-body", - "hyper", + "http-body 0.4.6", + "hyper 0.14.30", "hyper-openssl", - "hyper-timeout", + "hyper-timeout 0.4.1", "jsonpath_lib", "k8s-openapi 0.17.0", "kube-core 0.80.0", @@ -1982,28 +2097,29 @@ dependencies = [ [[package]] name = "kube-client" -version = "0.87.2" +version = "0.91.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "033450dfa0762130565890dadf2f8835faedf749376ca13345bcd8ecd6b5f29f" +checksum = "47164ad6c47398ee4bdf90509c7b44026229721cb1377eb4623a1ec2a00a85e9" dependencies = [ - "base64 0.21.7", + "base64 0.22.1", "bytes", "chrono", "either", "futures", "home", - "http 0.2.12", - "http-body", - "hyper", + "http 1.1.0", + "http-body 1.0.1", + "http-body-util", + "hyper 1.4.1", "hyper-rustls", - "hyper-timeout", + "hyper-timeout 0.5.1", + "hyper-util", "jsonpath-rust", - "k8s-openapi 0.20.0", - "kube-core 0.87.2", + "k8s-openapi 0.22.0", + "kube-core 0.91.0", "pem 3.0.4", - "pin-project", - "rustls", - "rustls-pemfile", + "rustls 0.23.14", + "rustls-pemfile 2.2.0", "secrecy", "serde", "serde_json", @@ -2012,7 +2128,7 @@ dependencies = [ "tokio", "tokio-util", "tower", - "tower-http 0.4.4", + "tower-http 0.5.2", "tracing", ] @@ -2035,16 +2151,15 @@ dependencies = [ [[package]] name = "kube-core" -version = "0.87.2" +version = "0.91.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b5bba93d054786eba7994d03ce522f368ef7d48c88a1826faa28478d85fb63ae" +checksum = "2797d3044a238825432129cd9537e12c2a6dacbbb5352381af5ea55e1505ed4f" dependencies = [ "chrono", "form_urlencoded", - "http 0.2.12", + "http 1.1.0", "json-patch", - "k8s-openapi 0.20.0", - "once_cell", + "k8s-openapi 0.22.0", "schemars", "serde", "serde_json", @@ -2066,9 +2181,9 @@ dependencies = [ [[package]] name = "kube-derive" -version = "0.87.2" +version = "0.91.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "91e98dd5e5767c7b894c1f0e41fd628b145f808e981feb8b08ed66455d47f1a4" +checksum = "fcf837edaa0c478f85e9a3cddb17fa80d58a57c1afa722b3a9e55753ea162f41" dependencies = [ "darling 0.20.10", "proc-macro2", @@ -2079,19 +2194,21 @@ dependencies = [ [[package]] name = "kube-runtime" -version = "0.87.2" +version = "0.91.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "2d8893eb18fbf6bb6c80ef6ee7dd11ec32b1dc3c034c988ac1b3a84d46a230ae" +checksum = "e463e89a1fb222c65a5469b568803153d1bf13d084a8dd42b659e6cca66edc6e" dependencies = [ "ahash", + "async-broadcast", + "async-stream", "async-trait", "backoff", "derivative", "futures", "hashbrown 0.14.5", "json-patch", - "k8s-openapi 0.20.0", - "kube-client 0.87.2", + "k8s-openapi 0.22.0", + "kube-client 0.91.0", "parking_lot 0.12.3", "pin-project", "serde", @@ -2374,7 +2491,7 @@ version = "0.13.3" dependencies = [ "akri-discovery-utils", "akri-onvif", - "env_logger", + "env_logger 0.10.2", "log", "tokio", ] @@ -2423,7 +2540,7 @@ version = "0.13.3" dependencies = [ "akri-discovery-utils", "akri-opcua", - "env_logger", + "env_logger 0.10.2", "log", "tokio", ] @@ -2789,6 +2906,29 @@ dependencies = [ "libc", ] +[[package]] +name = "procfs" +version = "0.16.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "731e0d9356b0c25f16f33b5be79b1c57b562f141ebfcdb0ad8ac2c13a24293b4" +dependencies = [ + "bitflags 2.6.0", + "hex", + "lazy_static", + "procfs-core", + "rustix", +] + +[[package]] +name = "procfs-core" +version = "0.16.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2d3554923a69f4ce04c4a754260c338f505ce22642d3830e049a399fc2059a29" +dependencies = [ + "bitflags 2.6.0", + "hex", +] + [[package]] name = "prometheus" version = "0.12.0" @@ -2801,7 +2941,24 @@ dependencies = [ "libc", "memchr", "parking_lot 0.11.2", - "procfs", + "procfs 0.9.1", + "protobuf", + "thiserror", +] + +[[package]] +name = "prometheus" +version = "0.13.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3d33c28a30771f7f96db69893f78b857f7450d7e0237e9c8fc6427a81bae7ed1" +dependencies = [ + "cfg-if", + "fnv", + "lazy_static", + "libc", + "memchr", + "parking_lot 0.12.3", + "procfs 0.16.0", "protobuf", "thiserror", ] @@ -2987,8 +3144,8 @@ dependencies = [ "futures-util", "h2", "http 0.2.12", - "http-body", - "hyper", + "http-body 0.4.6", + "hyper 0.14.30", "ipnet", "js-sys", "log", @@ -3071,18 +3228,34 @@ checksum = "3f56a14d1f48b391359b22f731fd4bd7e43c97f3c50eee276f3aa09c94784d3e" dependencies = [ "log", "ring", - "rustls-webpki", + "rustls-webpki 0.101.7", "sct", ] +[[package]] +name = "rustls" +version = "0.23.14" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "415d9944693cb90382053259f89fbb077ea730ad7273047ec63b19bc9b160ba8" +dependencies = [ + "log", + "once_cell", + "ring", + "rustls-pki-types", + "rustls-webpki 0.102.8", + "subtle", + "zeroize", +] + [[package]] name = "rustls-native-certs" -version = "0.6.3" +version = "0.8.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a9aace74cb666635c918e9c12bc0d348266037aa8eb599b5cba565709a8dff00" +checksum = "fcaf18a4f2be7326cd874a5fa579fae794320a0f388d365dca7e480e55f83f8a" dependencies = [ "openssl-probe", - "rustls-pemfile", + "rustls-pemfile 2.2.0", + "rustls-pki-types", "schannel", "security-framework", ] @@ -3096,6 +3269,21 @@ dependencies = [ "base64 0.21.7", ] +[[package]] +name = "rustls-pemfile" +version = "2.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "dce314e5fee3f39953d46bb63bb8a46d40c2f8fb7cc5a3b6cab2bde9721d6e50" +dependencies = [ + "rustls-pki-types", +] + +[[package]] +name = "rustls-pki-types" +version = "1.9.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0e696e35370c65c9c541198af4543ccd580cf17fc25d8e05c5a242b202488c55" + [[package]] name = "rustls-webpki" version = "0.101.7" @@ -3106,6 +3294,17 @@ dependencies = [ "untrusted", ] +[[package]] +name = "rustls-webpki" +version = "0.102.8" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "64ca1bc8749bd4cf37b5ce386cc146580777b4e8572c7b97baf22c83f444bee9" +dependencies = [ + "ring", + "rustls-pki-types", + "untrusted", +] + [[package]] name = "rustversion" version = "1.0.17" @@ -3630,7 +3829,18 @@ version = "0.24.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "c28327cf380ac148141087fbfb9de9d7bd4e84ab5d2c28fbc911d753de8a7081" dependencies = [ - "rustls", + "rustls 0.21.12", + "tokio", +] + +[[package]] +name = "tokio-rustls" +version = "0.26.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0c7bc40d0e5a97695bb96e27995cd3a08538541b0a846f65bba7a359f36700d4" +dependencies = [ + "rustls 0.23.14", + "rustls-pki-types", "tokio", ] @@ -3685,16 +3895,16 @@ dependencies = [ "bytes", "h2", "http 0.2.12", - "http-body", - "hyper", - "hyper-timeout", + "http-body 0.4.6", + "hyper 0.14.30", + "hyper-timeout 0.4.1", "percent-encoding 2.3.1", "pin-project", "prost", - "rustls", - "rustls-pemfile", + "rustls 0.21.12", + "rustls-pemfile 1.0.4", "tokio", - "tokio-rustls", + "tokio-rustls 0.24.1", "tokio-stream", "tower", "tower-layer", @@ -3747,7 +3957,7 @@ dependencies = [ "futures-core", "futures-util", "http 0.2.12", - "http-body", + "http-body 0.4.6", "http-range-header", "pin-project-lite", "tower-layer", @@ -3757,18 +3967,16 @@ dependencies = [ [[package]] name = "tower-http" -version = "0.4.4" +version = "0.5.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "61c5bb1d698276a2443e5ecfabc1008bf15a36c12e6a7176e7bf089ea9131140" +checksum = "1e9cd434a998747dd2c4276bc96ee2e0c7a2eadf3cae88e52be55a05fa9053f5" dependencies = [ "base64 0.21.7", "bitflags 2.6.0", "bytes", - "futures-core", - "futures-util", - "http 0.2.12", - "http-body", - "http-range-header", + "http 1.1.0", + "http-body 1.0.1", + "http-body-util", "mime", "pin-project-lite", "tower-layer", @@ -3879,7 +4087,7 @@ version = "0.13.3" dependencies = [ "akri-discovery-utils", "akri-udev", - "env_logger", + "env_logger 0.10.2", "log", "tokio", ] @@ -3889,10 +4097,10 @@ name = "udev-video-broker" version = "0.13.3" dependencies = [ "akri-shared", - "env_logger", + "env_logger 0.10.2", "lazy_static", "log", - "prometheus", + "prometheus 0.12.0", "prost", "regex", "rscam", @@ -4039,7 +4247,7 @@ dependencies = [ "futures-util", "headers", "http 0.2.12", - "hyper", + "hyper 0.14.30", "log", "mime", "mime_guess", diff --git a/agent/Cargo.toml b/agent/Cargo.toml index 90d22ad28..6ea6c1e51 100644 --- a/agent/Cargo.toml +++ b/agent/Cargo.toml @@ -25,9 +25,9 @@ env_logger = "0.10.0" futures = { version = "0.3.1", package = "futures" } hyper = "0.14.2" itertools = "0.12.0" -k8s-openapi = { version = "0.20.0", default-features = false, features = ["schemars", "v1_23"] } -kube = { version = "0.87.1", features = ["derive"] } -kube-runtime = { version = "0.87.1", features = ["unstable-runtime-reconcile-on"] } +k8s-openapi = { version = "0.22.0", default-features = false, features = ["schemars", "v1_25"] } +kube = { version = "0.91.0", features = [ "derive", "runtime"] } +kube-runtime = { version = "0.91.0", features = ["unstable-runtime-reconcile-on"] } lazy_static = "1.4" log = "0.4" mockall_double = "0.3.1" diff --git a/agent/src/discovery_handler_manager/discovery_handler_registry.rs b/agent/src/discovery_handler_manager/discovery_handler_registry.rs index 27c34466e..cc4e9706a 100644 --- a/agent/src/discovery_handler_manager/discovery_handler_registry.rs +++ b/agent/src/discovery_handler_manager/discovery_handler_registry.rs @@ -34,7 +34,7 @@ use futures::future::try_join_all; use futures::FutureExt; use itertools::Itertools; use kube::core::ObjectMeta; -use kube_runtime::reflector::ObjectRef; +use kube::runtime::reflector::ObjectRef; use tokio::select; use tokio::sync::mpsc; use tokio::sync::watch; diff --git a/agent/src/discovery_handler_manager/mod.rs b/agent/src/discovery_handler_manager/mod.rs index 6a4c1434d..e4317774e 100644 --- a/agent/src/discovery_handler_manager/mod.rs +++ b/agent/src/discovery_handler_manager/mod.rs @@ -9,7 +9,7 @@ use std::{collections::HashMap, sync::Arc}; use akri_shared::{akri::configuration::Configuration, k8s::api::IntoApi}; use k8s_openapi::api::core::v1::{ConfigMap, Secret}; -use kube_runtime::reflector::ObjectRef; +use kube::runtime::reflector::ObjectRef; use thiserror::Error; use tokio::sync::{mpsc, watch}; diff --git a/agent/src/plugin_manager/v1.rs b/agent/src/plugin_manager/v1.rs index 7f3183f3f..f016fb20b 100644 --- a/agent/src/plugin_manager/v1.rs +++ b/agent/src/plugin_manager/v1.rs @@ -87,8 +87,8 @@ pub struct NumaNode { /// Generated client implementations. pub mod pod_resources_lister_client { #![allow(unused_variables, dead_code, missing_docs, clippy::let_unit_value)] - use tonic::codegen::http::Uri; use tonic::codegen::*; + use tonic::codegen::http::Uri; /// PodResourcesLister is a service provided by the kubelet that provides information about the /// node resources consumed by pods and containers on the node #[derive(Debug, Clone)] @@ -134,8 +134,9 @@ pub mod pod_resources_lister_client { >::ResponseBody, >, >, - >>::Error: - Into + Send + Sync, + , + >>::Error: Into + Send + Sync, { PodResourcesListerClient::new(InterceptedService::new(inner, interceptor)) } @@ -173,16 +174,23 @@ pub mod pod_resources_lister_client { pub async fn list( &mut self, request: impl tonic::IntoRequest, - ) -> std::result::Result, tonic::Status> - { - self.inner.ready().await.map_err(|e| { - tonic::Status::new( - tonic::Code::Unknown, - format!("Service was not ready: {}", e.into()), - ) - })?; + ) -> std::result::Result< + tonic::Response, + tonic::Status, + > { + self.inner + .ready() + .await + .map_err(|e| { + tonic::Status::new( + tonic::Code::Unknown, + format!("Service was not ready: {}", e.into()), + ) + })?; let codec = tonic::codec::ProstCodec::default(); - let path = http::uri::PathAndQuery::from_static("/v1.PodResourcesLister/List"); + let path = http::uri::PathAndQuery::from_static( + "/v1.PodResourcesLister/List", + ); let mut req = request.into_request(); req.extensions_mut() .insert(GrpcMethod::new("v1.PodResourcesLister", "List")); @@ -191,23 +199,28 @@ pub mod pod_resources_lister_client { pub async fn get_allocatable_resources( &mut self, request: impl tonic::IntoRequest, - ) -> std::result::Result, tonic::Status> - { - self.inner.ready().await.map_err(|e| { - tonic::Status::new( - tonic::Code::Unknown, - format!("Service was not ready: {}", e.into()), - ) - })?; + ) -> std::result::Result< + tonic::Response, + tonic::Status, + > { + self.inner + .ready() + .await + .map_err(|e| { + tonic::Status::new( + tonic::Code::Unknown, + format!("Service was not ready: {}", e.into()), + ) + })?; let codec = tonic::codec::ProstCodec::default(); let path = http::uri::PathAndQuery::from_static( "/v1.PodResourcesLister/GetAllocatableResources", ); let mut req = request.into_request(); - req.extensions_mut().insert(GrpcMethod::new( - "v1.PodResourcesLister", - "GetAllocatableResources", - )); + req.extensions_mut() + .insert( + GrpcMethod::new("v1.PodResourcesLister", "GetAllocatableResources"), + ); self.inner.unary(req, path, codec).await } } @@ -222,11 +235,17 @@ pub mod pod_resources_lister_server { async fn list( &self, request: tonic::Request, - ) -> std::result::Result, tonic::Status>; + ) -> std::result::Result< + tonic::Response, + tonic::Status, + >; async fn get_allocatable_resources( &self, request: tonic::Request, - ) -> std::result::Result, tonic::Status>; + ) -> std::result::Result< + tonic::Response, + tonic::Status, + >; } /// PodResourcesLister is a service provided by the kubelet that provides information about the /// node resources consumed by pods and containers on the node @@ -253,7 +272,10 @@ pub mod pod_resources_lister_server { max_encoding_message_size: None, } } - pub fn with_interceptor(inner: T, interceptor: F) -> InterceptedService + pub fn with_interceptor( + inner: T, + interceptor: F, + ) -> InterceptedService where F: tonic::service::Interceptor, { @@ -309,11 +331,15 @@ pub mod pod_resources_lister_server { "/v1.PodResourcesLister/List" => { #[allow(non_camel_case_types)] struct ListSvc(pub Arc); - impl - tonic::server::UnaryService for ListSvc - { + impl< + T: PodResourcesLister, + > tonic::server::UnaryService + for ListSvc { type Response = super::ListPodResourcesResponse; - type Future = BoxFuture, tonic::Status>; + type Future = BoxFuture< + tonic::Response, + tonic::Status, + >; fn call( &mut self, request: tonic::Request, @@ -351,12 +377,15 @@ pub mod pod_resources_lister_server { "/v1.PodResourcesLister/GetAllocatableResources" => { #[allow(non_camel_case_types)] struct GetAllocatableResourcesSvc(pub Arc); - impl - tonic::server::UnaryService - for GetAllocatableResourcesSvc - { + impl< + T: PodResourcesLister, + > tonic::server::UnaryService + for GetAllocatableResourcesSvc { type Response = super::AllocatableResourcesResponse; - type Future = BoxFuture, tonic::Status>; + type Future = BoxFuture< + tonic::Response, + tonic::Status, + >; fn call( &mut self, request: tonic::Request, @@ -364,9 +393,10 @@ pub mod pod_resources_lister_server { let inner = Arc::clone(&self.0); let fut = async move { ::get_allocatable_resources( - &inner, request, - ) - .await + &inner, + request, + ) + .await }; Box::pin(fut) } @@ -394,14 +424,18 @@ pub mod pod_resources_lister_server { }; Box::pin(fut) } - _ => Box::pin(async move { - Ok(http::Response::builder() - .status(200) - .header("grpc-status", "12") - .header("content-type", "application/grpc") - .body(empty_body()) - .unwrap()) - }), + _ => { + Box::pin(async move { + Ok( + http::Response::builder() + .status(200) + .header("grpc-status", "12") + .header("content-type", "application/grpc") + .body(empty_body()) + .unwrap(), + ) + }) + } } } } @@ -427,7 +461,8 @@ pub mod pod_resources_lister_server { write!(f, "{:?}", self.0) } } - impl tonic::server::NamedService for PodResourcesListerServer { + impl tonic::server::NamedService + for PodResourcesListerServer { const NAME: &'static str = "v1.PodResourcesLister"; } } diff --git a/agent/src/plugin_manager/v1beta1.rs b/agent/src/plugin_manager/v1beta1.rs index a88570f4d..7b068ecab 100644 --- a/agent/src/plugin_manager/v1beta1.rs +++ b/agent/src/plugin_manager/v1beta1.rs @@ -96,7 +96,9 @@ pub struct PreStartContainerResponse {} #[derive(Clone, PartialEq, ::prost::Message)] pub struct PreferredAllocationRequest { #[prost(message, repeated, tag = "1")] - pub container_requests: ::prost::alloc::vec::Vec, + pub container_requests: ::prost::alloc::vec::Vec< + ContainerPreferredAllocationRequest, + >, } #[allow(clippy::derive_partial_eq_without_eq)] #[derive(Clone, PartialEq, ::prost::Message)] @@ -106,7 +108,9 @@ pub struct ContainerPreferredAllocationRequest { pub available_device_i_ds: ::prost::alloc::vec::Vec<::prost::alloc::string::String>, /// List of deviceIDs that must be included in the preferred allocation #[prost(string, repeated, tag = "2")] - pub must_include_device_i_ds: ::prost::alloc::vec::Vec<::prost::alloc::string::String>, + pub must_include_device_i_ds: ::prost::alloc::vec::Vec< + ::prost::alloc::string::String, + >, /// Number of devices to include in the preferred allocation #[prost(int32, tag = "3")] pub allocation_size: i32, @@ -117,7 +121,9 @@ pub struct ContainerPreferredAllocationRequest { #[derive(Clone, PartialEq, ::prost::Message)] pub struct PreferredAllocationResponse { #[prost(message, repeated, tag = "1")] - pub container_responses: ::prost::alloc::vec::Vec, + pub container_responses: ::prost::alloc::vec::Vec< + ContainerPreferredAllocationResponse, + >, } #[allow(clippy::derive_partial_eq_without_eq)] #[derive(Clone, PartialEq, ::prost::Message)] @@ -162,8 +168,10 @@ pub struct AllocateResponse { pub struct ContainerAllocateResponse { /// List of environment variable to be set in the container to access one of more devices. #[prost(map = "string, string", tag = "1")] - pub envs: - ::std::collections::HashMap<::prost::alloc::string::String, ::prost::alloc::string::String>, + pub envs: ::std::collections::HashMap< + ::prost::alloc::string::String, + ::prost::alloc::string::String, + >, /// Mounts for the container. #[prost(message, repeated, tag = "2")] pub mounts: ::prost::alloc::vec::Vec, @@ -172,8 +180,10 @@ pub struct ContainerAllocateResponse { pub devices: ::prost::alloc::vec::Vec, /// Container annotations to pass to the container runtime #[prost(map = "string, string", tag = "4")] - pub annotations: - ::std::collections::HashMap<::prost::alloc::string::String, ::prost::alloc::string::String>, + pub annotations: ::std::collections::HashMap< + ::prost::alloc::string::String, + ::prost::alloc::string::String, + >, } /// Mount specifies a host volume to mount into a container. /// where device library or tools are installed on host and container @@ -210,8 +220,8 @@ pub struct DeviceSpec { /// Generated client implementations. pub mod registration_client { #![allow(unused_variables, dead_code, missing_docs, clippy::let_unit_value)] - use tonic::codegen::http::Uri; use tonic::codegen::*; + use tonic::codegen::http::Uri; /// Registration is the service advertised by the Kubelet /// Only when Kubelet answers with a success code to a Register Request /// may Device Plugins start their service @@ -261,8 +271,9 @@ pub mod registration_client { >::ResponseBody, >, >, - >>::Error: - Into + Send + Sync, + , + >>::Error: Into + Send + Sync, { RegistrationClient::new(InterceptedService::new(inner, interceptor)) } @@ -301,14 +312,19 @@ pub mod registration_client { &mut self, request: impl tonic::IntoRequest, ) -> std::result::Result, tonic::Status> { - self.inner.ready().await.map_err(|e| { - tonic::Status::new( - tonic::Code::Unknown, - format!("Service was not ready: {}", e.into()), - ) - })?; + self.inner + .ready() + .await + .map_err(|e| { + tonic::Status::new( + tonic::Code::Unknown, + format!("Service was not ready: {}", e.into()), + ) + })?; let codec = tonic::codec::ProstCodec::default(); - let path = http::uri::PathAndQuery::from_static("/v1beta1.Registration/Register"); + let path = http::uri::PathAndQuery::from_static( + "/v1beta1.Registration/Register", + ); let mut req = request.into_request(); req.extensions_mut() .insert(GrpcMethod::new("v1beta1.Registration", "Register")); @@ -319,8 +335,8 @@ pub mod registration_client { /// Generated client implementations. pub mod device_plugin_client { #![allow(unused_variables, dead_code, missing_docs, clippy::let_unit_value)] - use tonic::codegen::http::Uri; use tonic::codegen::*; + use tonic::codegen::http::Uri; /// DevicePlugin is the service advertised by Device Plugins #[derive(Debug, Clone)] pub struct DevicePluginClient { @@ -365,8 +381,9 @@ pub mod device_plugin_client { >::ResponseBody, >, >, - >>::Error: - Into + Send + Sync, + , + >>::Error: Into + Send + Sync, { DevicePluginClient::new(InterceptedService::new(inner, interceptor)) } @@ -406,23 +423,28 @@ pub mod device_plugin_client { pub async fn get_device_plugin_options( &mut self, request: impl tonic::IntoRequest, - ) -> std::result::Result, tonic::Status> - { - self.inner.ready().await.map_err(|e| { - tonic::Status::new( - tonic::Code::Unknown, - format!("Service was not ready: {}", e.into()), - ) - })?; + ) -> std::result::Result< + tonic::Response, + tonic::Status, + > { + self.inner + .ready() + .await + .map_err(|e| { + tonic::Status::new( + tonic::Code::Unknown, + format!("Service was not ready: {}", e.into()), + ) + })?; let codec = tonic::codec::ProstCodec::default(); let path = http::uri::PathAndQuery::from_static( "/v1beta1.DevicePlugin/GetDevicePluginOptions", ); let mut req = request.into_request(); - req.extensions_mut().insert(GrpcMethod::new( - "v1beta1.DevicePlugin", - "GetDevicePluginOptions", - )); + req.extensions_mut() + .insert( + GrpcMethod::new("v1beta1.DevicePlugin", "GetDevicePluginOptions"), + ); self.inner.unary(req, path, codec).await } /// ListAndWatch returns a stream of List of Devices @@ -435,14 +457,19 @@ pub mod device_plugin_client { tonic::Response>, tonic::Status, > { - self.inner.ready().await.map_err(|e| { - tonic::Status::new( - tonic::Code::Unknown, - format!("Service was not ready: {}", e.into()), - ) - })?; + self.inner + .ready() + .await + .map_err(|e| { + tonic::Status::new( + tonic::Code::Unknown, + format!("Service was not ready: {}", e.into()), + ) + })?; let codec = tonic::codec::ProstCodec::default(); - let path = http::uri::PathAndQuery::from_static("/v1beta1.DevicePlugin/ListAndWatch"); + let path = http::uri::PathAndQuery::from_static( + "/v1beta1.DevicePlugin/ListAndWatch", + ); let mut req = request.into_request(); req.extensions_mut() .insert(GrpcMethod::new("v1beta1.DevicePlugin", "ListAndWatch")); @@ -456,23 +483,28 @@ pub mod device_plugin_client { pub async fn get_preferred_allocation( &mut self, request: impl tonic::IntoRequest, - ) -> std::result::Result, tonic::Status> - { - self.inner.ready().await.map_err(|e| { - tonic::Status::new( - tonic::Code::Unknown, - format!("Service was not ready: {}", e.into()), - ) - })?; + ) -> std::result::Result< + tonic::Response, + tonic::Status, + > { + self.inner + .ready() + .await + .map_err(|e| { + tonic::Status::new( + tonic::Code::Unknown, + format!("Service was not ready: {}", e.into()), + ) + })?; let codec = tonic::codec::ProstCodec::default(); let path = http::uri::PathAndQuery::from_static( "/v1beta1.DevicePlugin/GetPreferredAllocation", ); let mut req = request.into_request(); - req.extensions_mut().insert(GrpcMethod::new( - "v1beta1.DevicePlugin", - "GetPreferredAllocation", - )); + req.extensions_mut() + .insert( + GrpcMethod::new("v1beta1.DevicePlugin", "GetPreferredAllocation"), + ); self.inner.unary(req, path, codec).await } /// Allocate is called during container creation so that the Device @@ -481,15 +513,23 @@ pub mod device_plugin_client { pub async fn allocate( &mut self, request: impl tonic::IntoRequest, - ) -> std::result::Result, tonic::Status> { - self.inner.ready().await.map_err(|e| { - tonic::Status::new( - tonic::Code::Unknown, - format!("Service was not ready: {}", e.into()), - ) - })?; + ) -> std::result::Result< + tonic::Response, + tonic::Status, + > { + self.inner + .ready() + .await + .map_err(|e| { + tonic::Status::new( + tonic::Code::Unknown, + format!("Service was not ready: {}", e.into()), + ) + })?; let codec = tonic::codec::ProstCodec::default(); - let path = http::uri::PathAndQuery::from_static("/v1beta1.DevicePlugin/Allocate"); + let path = http::uri::PathAndQuery::from_static( + "/v1beta1.DevicePlugin/Allocate", + ); let mut req = request.into_request(); req.extensions_mut() .insert(GrpcMethod::new("v1beta1.DevicePlugin", "Allocate")); @@ -501,17 +541,23 @@ pub mod device_plugin_client { pub async fn pre_start_container( &mut self, request: impl tonic::IntoRequest, - ) -> std::result::Result, tonic::Status> - { - self.inner.ready().await.map_err(|e| { - tonic::Status::new( - tonic::Code::Unknown, - format!("Service was not ready: {}", e.into()), - ) - })?; + ) -> std::result::Result< + tonic::Response, + tonic::Status, + > { + self.inner + .ready() + .await + .map_err(|e| { + tonic::Status::new( + tonic::Code::Unknown, + format!("Service was not ready: {}", e.into()), + ) + })?; let codec = tonic::codec::ProstCodec::default(); - let path = - http::uri::PathAndQuery::from_static("/v1beta1.DevicePlugin/PreStartContainer"); + let path = http::uri::PathAndQuery::from_static( + "/v1beta1.DevicePlugin/PreStartContainer", + ); let mut req = request.into_request(); req.extensions_mut() .insert(GrpcMethod::new("v1beta1.DevicePlugin", "PreStartContainer")); @@ -560,7 +606,10 @@ pub mod registration_server { max_encoding_message_size: None, } } - pub fn with_interceptor(inner: T, interceptor: F) -> InterceptedService + pub fn with_interceptor( + inner: T, + interceptor: F, + ) -> InterceptedService where F: tonic::service::Interceptor, { @@ -616,16 +665,23 @@ pub mod registration_server { "/v1beta1.Registration/Register" => { #[allow(non_camel_case_types)] struct RegisterSvc(pub Arc); - impl tonic::server::UnaryService for RegisterSvc { + impl< + T: Registration, + > tonic::server::UnaryService + for RegisterSvc { type Response = super::Empty; - type Future = BoxFuture, tonic::Status>; + type Future = BoxFuture< + tonic::Response, + tonic::Status, + >; fn call( &mut self, request: tonic::Request, ) -> Self::Future { let inner = Arc::clone(&self.0); - let fut = - async move { ::register(&inner, request).await }; + let fut = async move { + ::register(&inner, request).await + }; Box::pin(fut) } } @@ -652,14 +708,18 @@ pub mod registration_server { }; Box::pin(fut) } - _ => Box::pin(async move { - Ok(http::Response::builder() - .status(200) - .header("grpc-status", "12") - .header("content-type", "application/grpc") - .body(empty_body()) - .unwrap()) - }), + _ => { + Box::pin(async move { + Ok( + http::Response::builder() + .status(200) + .header("grpc-status", "12") + .header("content-type", "application/grpc") + .body(empty_body()) + .unwrap(), + ) + }) + } } } } @@ -701,11 +761,15 @@ pub mod device_plugin_server { async fn get_device_plugin_options( &self, request: tonic::Request, - ) -> std::result::Result, tonic::Status>; + ) -> std::result::Result< + tonic::Response, + tonic::Status, + >; /// Server streaming response type for the ListAndWatch method. type ListAndWatchStream: tonic::codegen::tokio_stream::Stream< Item = std::result::Result, - > + Send + > + + Send + 'static; /// ListAndWatch returns a stream of List of Devices /// Whenever a Device state change or a Device disappears, ListAndWatch @@ -713,7 +777,10 @@ pub mod device_plugin_server { async fn list_and_watch( &self, request: tonic::Request, - ) -> std::result::Result, tonic::Status>; + ) -> std::result::Result< + tonic::Response, + tonic::Status, + >; /// GetPreferredAllocation returns a preferred set of devices to allocate /// from a list of available ones. The resulting preferred allocation is not /// guaranteed to be the allocation ultimately performed by the @@ -722,21 +789,30 @@ pub mod device_plugin_server { async fn get_preferred_allocation( &self, request: tonic::Request, - ) -> std::result::Result, tonic::Status>; + ) -> std::result::Result< + tonic::Response, + tonic::Status, + >; /// Allocate is called during container creation so that the Device /// Plugin can run device specific operations and instruct Kubelet /// of the steps to make the Device available in the container async fn allocate( &self, request: tonic::Request, - ) -> std::result::Result, tonic::Status>; + ) -> std::result::Result< + tonic::Response, + tonic::Status, + >; /// PreStartContainer is called, if indicated by Device Plugin during registeration phase, /// before each container start. Device plugin can run device specific operations /// such as resetting the device before making devices available to the container async fn pre_start_container( &self, request: tonic::Request, - ) -> std::result::Result, tonic::Status>; + ) -> std::result::Result< + tonic::Response, + tonic::Status, + >; } /// DevicePlugin is the service advertised by Device Plugins #[derive(Debug)] @@ -762,7 +838,10 @@ pub mod device_plugin_server { max_encoding_message_size: None, } } - pub fn with_interceptor(inner: T, interceptor: F) -> InterceptedService + pub fn with_interceptor( + inner: T, + interceptor: F, + ) -> InterceptedService where F: tonic::service::Interceptor, { @@ -818,13 +897,23 @@ pub mod device_plugin_server { "/v1beta1.DevicePlugin/GetDevicePluginOptions" => { #[allow(non_camel_case_types)] struct GetDevicePluginOptionsSvc(pub Arc); - impl tonic::server::UnaryService for GetDevicePluginOptionsSvc { + impl tonic::server::UnaryService + for GetDevicePluginOptionsSvc { type Response = super::DevicePluginOptions; - type Future = BoxFuture, tonic::Status>; - fn call(&mut self, request: tonic::Request) -> Self::Future { + type Future = BoxFuture< + tonic::Response, + tonic::Status, + >; + fn call( + &mut self, + request: tonic::Request, + ) -> Self::Future { let inner = Arc::clone(&self.0); let fut = async move { - ::get_device_plugin_options(&inner, request) + ::get_device_plugin_options( + &inner, + request, + ) .await }; Box::pin(fut) @@ -856,12 +945,20 @@ pub mod device_plugin_server { "/v1beta1.DevicePlugin/ListAndWatch" => { #[allow(non_camel_case_types)] struct ListAndWatchSvc(pub Arc); - impl tonic::server::ServerStreamingService for ListAndWatchSvc { + impl< + T: DevicePlugin, + > tonic::server::ServerStreamingService + for ListAndWatchSvc { type Response = super::ListAndWatchResponse; type ResponseStream = T::ListAndWatchStream; - type Future = - BoxFuture, tonic::Status>; - fn call(&mut self, request: tonic::Request) -> Self::Future { + type Future = BoxFuture< + tonic::Response, + tonic::Status, + >; + fn call( + &mut self, + request: tonic::Request, + ) -> Self::Future { let inner = Arc::clone(&self.0); let fut = async move { ::list_and_watch(&inner, request).await @@ -895,19 +992,26 @@ pub mod device_plugin_server { "/v1beta1.DevicePlugin/GetPreferredAllocation" => { #[allow(non_camel_case_types)] struct GetPreferredAllocationSvc(pub Arc); - impl - tonic::server::UnaryService - for GetPreferredAllocationSvc - { + impl< + T: DevicePlugin, + > tonic::server::UnaryService + for GetPreferredAllocationSvc { type Response = super::PreferredAllocationResponse; - type Future = BoxFuture, tonic::Status>; + type Future = BoxFuture< + tonic::Response, + tonic::Status, + >; fn call( &mut self, request: tonic::Request, ) -> Self::Future { let inner = Arc::clone(&self.0); let fut = async move { - ::get_preferred_allocation(&inner, request).await + ::get_preferred_allocation( + &inner, + request, + ) + .await }; Box::pin(fut) } @@ -938,16 +1042,23 @@ pub mod device_plugin_server { "/v1beta1.DevicePlugin/Allocate" => { #[allow(non_camel_case_types)] struct AllocateSvc(pub Arc); - impl tonic::server::UnaryService for AllocateSvc { + impl< + T: DevicePlugin, + > tonic::server::UnaryService + for AllocateSvc { type Response = super::AllocateResponse; - type Future = BoxFuture, tonic::Status>; + type Future = BoxFuture< + tonic::Response, + tonic::Status, + >; fn call( &mut self, request: tonic::Request, ) -> Self::Future { let inner = Arc::clone(&self.0); - let fut = - async move { ::allocate(&inner, request).await }; + let fut = async move { + ::allocate(&inner, request).await + }; Box::pin(fut) } } @@ -977,19 +1088,23 @@ pub mod device_plugin_server { "/v1beta1.DevicePlugin/PreStartContainer" => { #[allow(non_camel_case_types)] struct PreStartContainerSvc(pub Arc); - impl - tonic::server::UnaryService - for PreStartContainerSvc - { + impl< + T: DevicePlugin, + > tonic::server::UnaryService + for PreStartContainerSvc { type Response = super::PreStartContainerResponse; - type Future = BoxFuture, tonic::Status>; + type Future = BoxFuture< + tonic::Response, + tonic::Status, + >; fn call( &mut self, request: tonic::Request, ) -> Self::Future { let inner = Arc::clone(&self.0); let fut = async move { - ::pre_start_container(&inner, request).await + ::pre_start_container(&inner, request) + .await }; Box::pin(fut) } @@ -1017,14 +1132,18 @@ pub mod device_plugin_server { }; Box::pin(fut) } - _ => Box::pin(async move { - Ok(http::Response::builder() - .status(200) - .header("grpc-status", "12") - .header("content-type", "application/grpc") - .body(empty_body()) - .unwrap()) - }), + _ => { + Box::pin(async move { + Ok( + http::Response::builder() + .status(200) + .header("grpc-status", "12") + .header("content-type", "application/grpc") + .body(empty_body()) + .unwrap(), + ) + }) + } } } } diff --git a/agent/src/util/discovery_configuration_controller.rs b/agent/src/util/discovery_configuration_controller.rs index 496a783fa..8998e97cc 100644 --- a/agent/src/util/discovery_configuration_controller.rs +++ b/agent/src/util/discovery_configuration_controller.rs @@ -18,12 +18,12 @@ use crate::discovery_handler_manager::{ discovery_handler_registry::DiscoveryHandlerRegistry, DiscoveryError, }; -use kube::{Resource, ResourceExt}; -use kube_runtime::{ +use kube::runtime::{ controller::Action, reflector::{ObjectRef, Store}, Controller, }; +use kube::{Resource, ResourceExt}; use thiserror::Error; #[derive(Debug, Error)] diff --git a/controller/Cargo.toml b/controller/Cargo.toml index 1bee29183..fc063ac76 100644 --- a/controller/Cargo.toml +++ b/controller/Cargo.toml @@ -13,14 +13,17 @@ akri-shared = { path = "../shared" } anyhow = "1.0.38" async-std = "1.5.0" chrono = "0.4.10" -env_logger = "0.10.0" +either = "1.13" +env_logger = "0.11.5" futures = "0.3.1" -k8s-openapi = { version = "0.20.0", default-features = false, features = ["schemars", "v1_23"] } -kube = { version = "0.87.1", features = ["derive"] } -kube-runtime = "0.87.1" +k8s-openapi = { version = "0.22.0", default-features = false, features = ["schemars", "v1_25"] } +kube = { version = "0.91.0", features = ["runtime", "client", "derive" ] } lazy_static = "1.4" log = "0.4" -prometheus = { version = "0.12.0", features = ["process"] } +prometheus = { version = "0.13.4", features = ["process"] } +# Used for patch API +serde_json = "1.0.45" +thiserror = "1" tokio = { version = "1.0.2", features = ["full"] } [dev-dependencies] diff --git a/controller/src/main.rs b/controller/src/main.rs index 82d6d0c35..63181d8b7 100644 --- a/controller/src/main.rs +++ b/controller/src/main.rs @@ -2,11 +2,18 @@ extern crate lazy_static; mod util; -use akri_shared::akri::{metrics::run_metrics_server, API_NAMESPACE}; -use async_std::sync::Mutex; +use akri_shared::{ + akri::{metrics::run_metrics_server, API_NAMESPACE}, + k8s::AKRI_CONFIGURATION_LABEL_NAME, +}; +use futures::StreamExt; +use kube::runtime::{watcher::Config, Controller}; use prometheus::IntGaugeVec; use std::sync::Arc; -use util::{instance_action, node_watcher, pod_watcher}; +use util::{ + context::{InstanceControllerContext, NodeWatcherContext, PodWatcherContext}, + instance_action, node_watcher, pod_watcher, +}; /// Length of time to sleep between controller system validation checks pub const SYSTEM_CHECK_DELAY_SECS: u64 = 30; @@ -33,45 +40,44 @@ async fn main() -> Result<(), Box log::info!("{} Controller logging started", API_NAMESPACE); - let synchronization = Arc::new(Mutex::new(())); - let instance_watch_synchronization = synchronization.clone(); - let mut tasks = Vec::new(); - // Start server for prometheus metrics - tasks.push(tokio::spawn(async move { - run_metrics_server().await.unwrap(); - })); + tokio::spawn(run_metrics_server()); + let client = Arc::new(kube::Client::try_default().await?); + let node_watcher_ctx = Arc::new(NodeWatcherContext::new(client.clone())); + let pod_watcher_ctx = Arc::new(PodWatcherContext::new(client.clone())); + + node_watcher::check(client.clone()).await?; + let node_controller = Controller::new( + node_watcher_ctx.client.all().as_inner(), + Config::default().any_semantic(), + ) + .shutdown_on_signal() + .run( + node_watcher::reconcile, + node_watcher::error_policy, + node_watcher_ctx, + ) + .filter_map(|x| async move { std::result::Result::ok(x) }) + .for_each(|_| futures::future::ready(())); - // Handle existing instances - tasks.push(tokio::spawn({ - async move { - instance_action::handle_existing_instances().await.unwrap(); - } - })); - // Handle instance changes - tasks.push(tokio::spawn({ - async move { - instance_action::do_instance_watch(instance_watch_synchronization) - .await - .unwrap(); - } - })); - // Watch for node disappearance - tasks.push(tokio::spawn({ - async move { - let mut node_watcher = node_watcher::NodeWatcher::new(); - node_watcher.watch().await.unwrap(); - } - })); - // Watch for broker Pod state changes - tasks.push(tokio::spawn({ - async move { - let mut broker_pod_watcher = pod_watcher::BrokerPodWatcher::new(); - broker_pod_watcher.watch().await.unwrap(); - } - })); + pod_watcher::check(client.clone()).await?; + let pod_controller = Controller::new( + pod_watcher_ctx.client.all().as_inner(), + Config::default().labels(AKRI_CONFIGURATION_LABEL_NAME), + ) + .shutdown_on_signal() + .run( + pod_watcher::reconcile, + pod_watcher::error_policy, + pod_watcher_ctx, + ) + .filter_map(|x| async move { std::result::Result::ok(x) }) + .for_each(|_| futures::future::ready(())); - futures::future::try_join_all(tasks).await?; + tokio::select! { + _ = futures::future::join(node_controller, pod_controller) => {}, + _ = instance_action::run(Arc::new(InstanceControllerContext::new(client))) => {} + } log::info!("{} Controller end", API_NAMESPACE); Ok(()) diff --git a/controller/src/util/context.rs b/controller/src/util/context.rs new file mode 100644 index 000000000..b28b7a8e4 --- /dev/null +++ b/controller/src/util/context.rs @@ -0,0 +1,124 @@ +use std::collections::HashMap; +use std::sync::Arc; + +use akri_shared::akri::configuration::Configuration; +use akri_shared::akri::instance::Instance; +use akri_shared::k8s::api::IntoApi; + +use k8s_openapi::api::batch::v1::Job; +use k8s_openapi::api::core::v1::{Node, Pod, Service}; + +use tokio::sync::RwLock; + +/// Pod states that BrokerPodWatcher is interested in +/// +/// PodState describes the various states that the controller can +/// react to for Pods. +#[derive(Clone, Debug, PartialEq)] +pub enum PodState { + /// Pod is in Pending state and no action is needed. + Pending, + /// Pod is in Running state and needs to ensure that + /// instance and configuration services are running + Running, + /// Pod is in Failed/Completed/Succeeded state and + /// needs to remove any instance and configuration + /// services that are not supported by other Running + /// Pods. Also, at this point, if an Instance still + /// exists, instance_action::handle_instance_change + /// needs to be called to ensure that Pods are + /// restarted + Ended, + /// Pod is in Deleted state and needs to remove any + /// instance and configuration services that are not + /// supported by other Running Pods. Also, at this + /// point, if an Instance still exists, and the Pod is + /// owned by the Instance, + /// instance_action::handle_instance_change needs to be + /// called to ensure that Pods are restarted. Akri + /// places an Instance OwnerReference on all the Pods it + /// deploys. This declares that the Instance owns that + /// Pod and Akri's Controller explicitly manages its + /// deployment. However, if the Pod is not owned by the + /// Instance, Akri should not assume retry logic and + /// should cease action. The owning object (ie Job) will + /// handle retries as necessary. + Deleted, +} + +/// Node states that NodeWatcher is interested in +/// +/// NodeState describes the various states that the controller can +/// react to for Nodes. +#[derive(Clone, Debug, PartialEq)] +pub enum NodeState { + /// Node has been seen, but not Running yet + Known, + /// Node has been seen Running + Running, + /// A previously Running Node has been seen as not Running + /// and the Instances have been cleaned of references to that + /// vanished Node + InstancesCleaned, +} + +pub trait ControllerKubeClient: + IntoApi + + IntoApi + + IntoApi + + IntoApi + + IntoApi + + IntoApi +{ +} + +impl< + T: IntoApi + + IntoApi + + IntoApi + + IntoApi + + IntoApi + + IntoApi, + > ControllerKubeClient for T +{ +} + +pub struct NodeWatcherContext { + /// Kubernetes client + pub client: Arc, + pub known_nodes: Arc>>, +} + +impl NodeWatcherContext { + pub fn new(client: Arc) -> Self { + NodeWatcherContext { + client, + known_nodes: Arc::new(RwLock::new(HashMap::new())), + } + } +} + +pub struct PodWatcherContext { + /// Kubernetes client + pub client: Arc, + pub known_pods: Arc>>, +} + +impl PodWatcherContext { + pub fn new(client: Arc) -> Self { + PodWatcherContext { + client, + known_pods: Arc::new(RwLock::new(HashMap::new())), + } + } +} + +pub struct InstanceControllerContext(Arc); +impl InstanceControllerContext { + pub fn new(client: Arc) -> Self { + InstanceControllerContext(client) + } + pub fn client(&self) -> Arc { + Arc::clone(&self.0) + } +} diff --git a/controller/src/util/instance_action.rs b/controller/src/util/instance_action.rs index 412769c9a..7c6477b6c 100644 --- a/controller/src/util/instance_action.rs +++ b/controller/src/util/instance_action.rs @@ -1,159 +1,107 @@ use super::super::BROKER_POD_COUNT_METRIC; use super::{pod_action::PodAction, pod_action::PodActionInfo}; +use crate::util::context::{ControllerKubeClient, InstanceControllerContext}; +use crate::util::{ControllerError, Result}; +use akri_shared::akri::configuration::Configuration; +use akri_shared::k8s::api::Api; use akri_shared::{ akri::{configuration::BrokerSpec, instance::Instance, AKRI_PREFIX}, k8s::{ - self, job, pod, - pod::{AKRI_INSTANCE_LABEL_NAME, AKRI_TARGET_NODE_LABEL_NAME}, - KubeInterface, OwnershipInfo, OwnershipType, + job, pod, OwnershipInfo, OwnershipType, AKRI_INSTANCE_LABEL_NAME, + AKRI_TARGET_NODE_LABEL_NAME, }, }; -use async_std::sync::Mutex; -use futures::{StreamExt, TryStreamExt}; -use k8s_openapi::api::batch::v1::JobSpec; +use anyhow::Context; +use futures::StreamExt; +use k8s_openapi::api::batch::v1::{Job, JobSpec}; use k8s_openapi::api::core::v1::{Pod, PodSpec}; -use kube::api::Api; -use kube_runtime::watcher::{watcher, Config, Event}; -use kube_runtime::WatchStreamExt; -use log::{error, info, trace}; + +use kube::{ + api::{ListParams, ResourceExt}, + runtime::{ + controller::{Action, Controller}, + finalizer::{finalizer, Event}, + watcher::Config, + }, +}; +use log::{error, trace}; use std::collections::HashMap; use std::sync::Arc; + /// Length of time a Pod can be pending before we give up and retry pub const PENDING_POD_GRACE_PERIOD_MINUTES: i64 = 5; /// Length of time a Pod can be in an error state before we retry pub const FAILED_POD_GRACE_PERIOD_MINUTES: i64 = 0; -/// Instance action types +pub static INSTANCE_FINALIZER: &str = "instances.kube.rs"; + +/// Initialize the instance controller +/// TODO: consider passing state that is shared among controllers such as a metrics exporter +pub async fn run(ctx: Arc) { + let api = ctx.client().all().as_inner(); + if let Err(e) = api.list(&ListParams::default().limit(1)).await { + error!("Instance CRD is not queryable; {e:?}. Is the CRD installed?"); + std::process::exit(1); + } + Controller::new(api, Config::default().any_semantic()) + .shutdown_on_signal() + .run(reconcile, error_policy, ctx.clone()) + .filter_map(|x| async move { std::result::Result::ok(x) }) + .for_each(|_| futures::future::ready(())) + .await; +} + +fn error_policy( + _instance: Arc, + error: &ControllerError, + _ctx: Arc, +) -> Action { + log::warn!("reconcile failed: {:?}", error); + Action::requeue(std::time::Duration::from_secs(5 * 60)) +} + +/// Instance event types /// /// Instance actions describe the types of actions the Controller can /// react to for Instances. /// /// This will determine what broker management actions to take (if any) /// -/// | --> InstanceAction::Add -/// | --> No broker => Do nothing -/// | --> => Deploy a Job -/// | --> => Deploy Pod to each Node on Instance's `nodes` list (up to `capacity` total) -/// | --> InstanceAction::Remove +/// | --> Instance Applied /// | --> No broker => Do nothing -/// | --> => Delete all Jobs labeled with the Instance name -/// | --> => Delete all Pods labeled with the Instance name -/// | --> InstanceAction::Update -/// | --> No broker => Do nothing -/// | --> => No nothing -/// | --> => Ensure that each Node on Instance's `nodes` list (up to `capacity` total) have a Pod -/// -#[derive(Clone, Debug, PartialEq)] -pub enum InstanceAction { - /// An Instance is added - Add, - /// An Instance is removed - Remove, - /// An Instance is updated - Update, -} - -/// This invokes an internal method that watches for Instance events -pub async fn handle_existing_instances( -) -> Result<(), Box> { - internal_handle_existing_instances(&k8s::KubeImpl::new().await?).await +/// | --> => Deploy a Job if one does not exist +/// | --> => Ensure that each Node on Instance's `nodes` list (up to `capacity` total) have a Pod. +/// Deploy Pods as necessary + +/// This function is the main Reconcile function for Instance resources +/// This will get called every time an Instance gets added or is changed, it will also be called for every existing instance on startup. +pub async fn reconcile( + instance: Arc, + ctx: Arc, +) -> Result { + let ns = instance.namespace().unwrap(); // instance has namespace scope + trace!("Reconciling {} in {}", instance.name_any(), ns); + finalizer( + &ctx.client().all().as_inner(), + INSTANCE_FINALIZER, + instance, + |event| reconcile_inner(event, ctx.client()), + ) + .await + .map_err(|e| ControllerError::FinalizerError(Box::new(e))) } -/// This invokes an internal method that watches for Instance events -pub async fn do_instance_watch( - synchronization: Arc>, -) -> Result<(), Box> { - // Watch for instance changes - internal_do_instance_watch(&synchronization, &k8s::KubeImpl::new().await?).await -} - -/// This invokes an internal method that watches for Instance events -async fn internal_handle_existing_instances( - kube_interface: &impl KubeInterface, -) -> Result<(), Box> { - let mut tasks = Vec::new(); - - // Handle existing instances - let pre_existing_instances = kube_interface.get_instances().await?; - for instance in pre_existing_instances { - tasks.push(tokio::spawn(async move { - let inner_kube_interface = k8s::KubeImpl::new().await.unwrap(); - handle_instance_change(&instance, &InstanceAction::Update, &inner_kube_interface) - .await - .unwrap(); - })); - } - futures::future::try_join_all(tasks).await?; - Ok(()) -} - -async fn internal_do_instance_watch( - synchronization: &Arc>, - kube_interface: &impl KubeInterface, -) -> Result<(), Box> { - trace!("internal_do_instance_watch - enter"); - let resource = Api::::all(kube_interface.get_kube_client()); - let watcher = watcher(resource, Config::default()).default_backoff(); - let mut informer = watcher.boxed(); - let mut first_event = true; - // Currently, this does not handle None except to break the loop. - loop { - let event = match informer.try_next().await { - Err(e) => { - error!("Error during watch: {}", e); - continue; - } - Ok(None) => break, - Ok(Some(event)) => event, - }; - // Aquire lock to ensure cleanup_instance_and_configuration_svcs and the - // inner loop handle_instance call in internal_do_instance_watch - // cannot execute at the same time. - let _lock = synchronization.lock().await; - trace!("internal_do_instance_watch - aquired sync lock"); - handle_instance(event, kube_interface, &mut first_event).await?; - } - Ok(()) -} - -/// This takes an event off the Instance stream and delegates it to the -/// correct function based on the event type. -async fn handle_instance( +async fn reconcile_inner( event: Event, - kube_interface: &impl KubeInterface, - first_event: &mut bool, -) -> anyhow::Result<()> { - trace!("handle_instance - enter"); + client: Arc, +) -> Result { match event { - Event::Applied(instance) => { - info!( - "handle_instance - added or modified Akri Instance {:?}: {:?}", - instance.metadata.name, instance.spec - ); - // TODO: consider renaming `InstanceAction::Add` to `InstanceAction::AddOrUpdate` - // to reflect that this could also be an Update event. Or as we do more specific - // inspection in future, delineation may be useful. - handle_instance_change(&instance, &InstanceAction::Add, kube_interface).await?; - } - Event::Deleted(instance) => { - info!( - "handle_instance - deleted Akri Instance {:?}: {:?}", - instance.metadata.name, instance.spec - ); - handle_instance_change(&instance, &InstanceAction::Remove, kube_interface).await?; - } - Event::Restarted(_instances) => { - if *first_event { - info!("handle_instance - watcher started"); - } else { - return Err(anyhow::anyhow!( - "Instance watcher restarted - throwing error to restart controller" - )); - } + Event::Apply(instance) => handle_instance_change(&instance, client).await, + Event::Cleanup(_) => { + // Do nothing. OwnerReferences are attached to Jobs and Pods to automate cleanup + Ok(default_requeue_action()) } } - *first_event = false; - Ok(()) } /// PodContext stores a set of details required to track/create/delete broker @@ -163,7 +111,6 @@ async fn handle_instance( /// specific Node's protocol broker Pod. /// /// * the node is described by node_name -/// * the protocol (or capability) is described by instance_name and namespace /// * what to do with the broker Pod is described by action // TODO: add Pod name so does not need to be // generated on deletes and remove Option wrappers. @@ -175,24 +122,21 @@ pub(crate) struct PodContext { } pub(crate) fn create_pod_context(k8s_pod: &Pod, action: PodAction) -> anyhow::Result { - let pod_name = k8s_pod.metadata.name.as_ref().unwrap(); - let labels = &k8s_pod - .metadata - .labels - .as_ref() - .ok_or_else(|| anyhow::anyhow!("no labels found for Pod {:?}", pod_name))?; // Early exits above ensure unwrap will not panic - let node_to_run_pod_on = labels.get(AKRI_TARGET_NODE_LABEL_NAME).ok_or_else(|| { - anyhow::anyhow!( - "no {} label found for {:?}", - AKRI_TARGET_NODE_LABEL_NAME, - pod_name - ) - })?; + let node_to_run_pod_on = &k8s_pod + .labels() + .get(AKRI_TARGET_NODE_LABEL_NAME) + .ok_or_else(|| { + anyhow::anyhow!( + "no {} label found for {:?}", + AKRI_TARGET_NODE_LABEL_NAME, + k8s_pod.name_unchecked() + ) + })?; Ok(PodContext { node_name: Some(node_to_run_pod_on.to_string()), - namespace: k8s_pod.metadata.namespace.clone(), + namespace: k8s_pod.namespace(), action, }) } @@ -202,10 +146,9 @@ pub(crate) fn create_pod_context(k8s_pod: &Pod, action: PodAction) -> anyhow::Re /// it will update the nodes_to_act_on map with the required action. fn determine_action_for_pod( k8s_pod: &Pod, - action: &InstanceAction, nodes_to_act_on: &mut HashMap, ) -> anyhow::Result<()> { - let pod_name = k8s_pod.metadata.name.as_ref().unwrap(); + let pod_name = k8s_pod.name_unchecked(); let pod_phase = k8s_pod .status .as_ref() @@ -214,36 +157,34 @@ fn determine_action_for_pod( .as_ref() .ok_or_else(|| anyhow::anyhow!("No pod phase found for Pod {:?}", pod_name))?; - let mut update_pod_context = create_pod_context(k8s_pod, PodAction::NoAction)?; - let node_to_run_pod_on = update_pod_context.node_name.as_ref().unwrap(); + let mut ctx = create_pod_context(k8s_pod, PodAction::NoAction)?; // Early exits above ensure unwrap will not panic let pod_start_time = k8s_pod.status.as_ref().unwrap().start_time.clone(); + let node_to_run_pod_on = ctx.node_name.as_ref().unwrap(); let pod_action_info = PodActionInfo { pending_grace_time_in_minutes: PENDING_POD_GRACE_PERIOD_MINUTES, ended_grace_time_in_minutes: FAILED_POD_GRACE_PERIOD_MINUTES, phase: pod_phase.to_string(), - instance_action: action.clone(), status_start_time: pod_start_time, unknown_node: !nodes_to_act_on.contains_key(node_to_run_pod_on), - trace_node_name: k8s_pod.metadata.name.clone().unwrap(), + trace_pod_name: k8s_pod.name_unchecked(), }; - update_pod_context.action = pod_action_info.select_pod_action()?; - nodes_to_act_on.insert(node_to_run_pod_on.to_string(), update_pod_context); + ctx.action = pod_action_info.select_pod_action()?; + nodes_to_act_on.insert(node_to_run_pod_on.to_string(), ctx); Ok(()) } -/// This handles Instance deletion event by deleting the -/// broker Pod, the broker Service (if there are no remaining broker Pods), -/// and the capability Service (if there are no remaining capability Pods). +/// This deliberately deletes the broker Pod, the broker Service (if there are no remaining broker Pods), and the configuration service (if there are no remaining capability Pods). +/// This is done before recreating the broker Pod and svcs async fn handle_deletion_work( instance_name: &str, configuration_name: &str, instance_shared: bool, node_to_delete_pod: &str, context: &PodContext, - kube_interface: &impl KubeInterface, + api: &dyn Api, ) -> anyhow::Result<()> { let context_node_name = context.node_name.as_ref().ok_or_else(|| { anyhow::anyhow!( @@ -252,18 +193,11 @@ async fn handle_deletion_work( context ) })?; - let context_namespace = context.namespace.as_ref().ok_or_else(|| { - anyhow::anyhow!( - "handle_deletion_work - Context namespace is missing for {}: {:?}", - node_to_delete_pod, - context - ) - })?; trace!( "handle_deletion_work - pod::create_broker_app_name({:?}, {:?}, {:?}, {:?})", &instance_name, - context_node_name, + context.node_name, instance_shared, "pod" ); @@ -276,111 +210,34 @@ async fn handle_deletion_work( trace!( "handle_deletion_work - pod::remove_pod name={:?}, namespace={:?}", &pod_app_name, - &context_namespace + &context.namespace ); - kube_interface - .remove_pod(&pod_app_name, context_namespace) - .await?; - trace!("handle_deletion_work - pod::remove_pod succeeded",); + api.delete(&pod_app_name).await?; + trace!("handle_deletion_work - pod::remove_pod succeeded"); BROKER_POD_COUNT_METRIC .with_label_values(&[configuration_name, context_node_name]) .dec(); Ok(()) } -#[cfg(test)] -mod handle_deletion_work_tests { - use super::*; - use akri_shared::k8s::MockKubeInterface; - - #[tokio::test] - async fn test_handle_deletion_work_with_no_node_name() { - let _ = env_logger::builder().is_test(true).try_init(); - - let context = PodContext { - node_name: None, - namespace: Some("namespace".into()), - action: PodAction::NoAction, - }; - - assert!(handle_deletion_work( - "instance_name", - "configuration_name", - true, - "node_to_delete_pod", - &context, - &MockKubeInterface::new(), - ) - .await - .is_err()); - } - - #[tokio::test] - async fn test_handle_deletion_work_with_no_namespace() { - let _ = env_logger::builder().is_test(true).try_init(); - - let context = PodContext { - node_name: Some("node-a".into()), - namespace: None, - action: PodAction::NoAction, - }; - - assert!(handle_deletion_work( - "instance_name", - "configuration_name", - true, - "node_to_delete_pod", - &context, - &MockKubeInterface::new(), - ) - .await - .is_err()); - } -} - /// This handles Instance addition event by creating the -/// broker Pod, the broker Service, and the capability Service. -/// TODO: reduce parameters by passing Instance object instead of -/// individual fields -#[allow(clippy::too_many_arguments)] +/// broker Pod. async fn handle_addition_work( - instance_name: &str, - instance_uid: &str, - instance_namespace: &str, - instance_class_name: &str, - instance_shared: bool, + api: &dyn Api, + pod: Pod, + configuration_name: &str, new_node: &str, - podspec: &PodSpec, - kube_interface: &impl KubeInterface, ) -> anyhow::Result<()> { trace!( "handle_addition_work - Create new Pod for Node={:?}", new_node ); - let capability_id = format!("{}/{}", AKRI_PREFIX, instance_name); - let new_pod = pod::create_new_pod_from_spec( - instance_namespace, - instance_name, - instance_class_name, - OwnershipInfo::new( - OwnershipType::Instance, - instance_name.to_string(), - instance_uid.to_string(), - ), - &capability_id, - new_node, - instance_shared, - podspec, - )?; - - trace!("handle_addition_work - New pod spec={:?}", new_pod); - kube_interface - .create_pod(&new_pod, instance_namespace) - .await?; + trace!("handle_addition_work - New pod spec={:?}", pod); + api.apply(pod, INSTANCE_FINALIZER).await?; trace!("handle_addition_work - pod::create_pod succeeded",); BROKER_POD_COUNT_METRIC - .with_label_values(&[instance_class_name, new_node]) + .with_label_values(&[configuration_name, new_node]) .inc(); Ok(()) @@ -391,150 +248,96 @@ async fn handle_addition_work( /// 2) calling the appropriate handler depending on the broker type (Pod or Job) if any pub async fn handle_instance_change( instance: &Instance, - action: &InstanceAction, - kube_interface: &impl KubeInterface, -) -> anyhow::Result<()> { - trace!("handle_instance_change - enter {:?}", action); - let instance_name = instance.metadata.name.clone().unwrap(); - let instance_namespace = - instance.metadata.namespace.as_ref().ok_or_else(|| { - anyhow::anyhow!("Namespace not found for instance: {}", &instance_name) - })?; - let configuration = match kube_interface - .find_configuration(&instance.spec.configuration_name, instance_namespace) - .await - { - Ok(config) => config, - _ => { - if action != &InstanceAction::Remove { - // In this scenario, a configuration has been deleted without the Akri Agent deleting the associated Instances. - // Furthermore, Akri Agent is still modifying the Instances. This should not happen beacuse Agent - // is designed to shutdown when it's Configuration watcher fails. - error!( - "handle_instance_change - no configuration found for {:?} yet instance {:?} exists - check that device plugin is running properly", - &instance.spec.configuration_name, &instance.metadata.name + client: Arc, +) -> Result { + trace!("handle_instance_change - enter"); + let instance_namespace = instance.namespace().unwrap(); + let api: Box> = client.namespaced(&instance_namespace); + let Ok(Some(configuration)) = api.get(&instance.spec.configuration_name).await else { + // In this scenario, a configuration has been deleted without the Akri Agent deleting the associated Instances. + // Furthermore, Akri Agent is still modifying the Instances. This should not happen beacuse Agent + // is designed to shutdown when it's Configuration watcher fails. + error!("handle_instance_change - no configuration found for {:?} yet instance {:?} exists - check that device plugin is running properly", + &instance.spec.configuration_name, &instance.name_unchecked() ); - } - return Ok(()); - } + + return Ok(default_requeue_action()); }; - if let Some(broker_spec) = &configuration.spec.broker_spec { - let instance_change_result = match broker_spec { - BrokerSpec::BrokerPodSpec(p) => { - handle_instance_change_pod(instance, p, action, kube_interface).await - } - BrokerSpec::BrokerJobSpec(j) => { - handle_instance_change_job( - instance, - *configuration.metadata.generation.as_ref().unwrap(), - j, - action, - kube_interface, - ) - .await - } - }; - if let Err(e) = instance_change_result { - error!("Unable to handle Broker action: {:?}", e); + let Some(broker_spec) = &configuration.spec.broker_spec else { + return Ok(default_requeue_action()); + }; + let res = match broker_spec { + BrokerSpec::BrokerPodSpec(p) => handle_instance_change_pod(instance, p, client).await, + BrokerSpec::BrokerJobSpec(j) => { + handle_instance_change_job( + instance, + *configuration.metadata.generation.as_ref().unwrap(), + j, + client.clone(), + ) + .await } + }; + if let Err(e) = res { + error!("Unable to handle Broker action: {:?}", e); } - Ok(()) + Ok(default_requeue_action()) } /// Called when an Instance has changed that requires a Job broker. Action determined by InstanceAction. -/// InstanceAction::Add => Deploy a Job with JobSpec from Configuration. Label with Instance name. -/// InstanceAction::Remove => Delete all Jobs labeled with the Instance name -/// InstanceAction::Update => No nothing +/// First check if a job with the instance name exists. If it does, do nothing. Otherwise, deploy a Job +/// with JobSpec from Configuration and label with Instance name. pub async fn handle_instance_change_job( instance: &Instance, config_generation: i64, job_spec: &JobSpec, - action: &InstanceAction, - kube_interface: &impl KubeInterface, + client: Arc, ) -> anyhow::Result<()> { - trace!("handle_instance_change_job - enter {:?}", action); + trace!("handle_instance_change_job - enter"); + let api: Box> = client.namespaced(&instance.namespace().unwrap()); + if api.get(&instance.name_unchecked()).await?.is_some() { + // Job already exists, do nothing + return Ok(()); + } + let instance_name = instance.name_unchecked(); // Create name for Job. Includes Configuration generation in the suffix // to track what version of the Configuration the Job is associated with. let job_name = pod::create_broker_app_name( - instance.metadata.name.as_ref().unwrap(), + &instance_name, None, instance.spec.shared, &format!("{}-job", config_generation), ); - let instance_name = instance.metadata.name.as_ref().unwrap(); - let instance_namespace = instance.metadata.namespace.as_ref().unwrap(); - let instance_uid = instance - .metadata - .uid - .as_ref() - .ok_or_else(|| anyhow::anyhow!("UID not found for instance: {}", &instance_name))?; - match action { - InstanceAction::Add => { - trace!("handle_instance_change_job - instance added"); - let capability_id = format!("{}/{}", AKRI_PREFIX, instance_name); - let new_job = job::create_new_job_from_spec( - instance, - OwnershipInfo::new( - OwnershipType::Instance, - instance_name.to_string(), - instance_uid.to_string(), - ), - &capability_id, - job_spec, - &job_name, - )?; - kube_interface - .create_job(&new_job, instance_namespace) - .await?; - } - InstanceAction::Remove => { - trace!("handle_instance_change_job - instance removed"); - // Find all jobs with the label - let instance_jobs = kube_interface - .find_jobs_with_label(&format!("{}={}", AKRI_INSTANCE_LABEL_NAME, instance_name)) - .await?; - let delete_tasks = instance_jobs.into_iter().map(|j| async move { - kube_interface - .remove_job( - j.metadata.name.as_ref().unwrap(), - j.metadata.namespace.as_ref().unwrap(), - ) - .await - }); - - futures::future::try_join_all(delete_tasks).await?; - } - InstanceAction::Update => { - trace!("handle_instance_change_job - instance updated"); - // TODO: Broker could have encountered unexpected admission error and need to be removed and added - } - } + trace!("handle_instance_change_job - instance added"); + let capability_id = format!("{}/{}", AKRI_PREFIX, instance_name); + let new_job = job::create_new_job_from_spec( + instance, + OwnershipInfo::new( + OwnershipType::Instance, + instance_name, + instance.uid().unwrap(), + ), + &capability_id, + job_spec, + &job_name, + )?; + let api: Box> = client.namespaced(&instance.namespace().unwrap()); + // TODO: Consider using server side apply instead of create + api.create(&new_job).await?; Ok(()) } /// Called when an Instance has changed that requires a Pod broker. -/// Action determined by InstanceAction and changes to the Instance's `nodes` list. -/// Starts broker Pods that are missing and stops Pods that are no longer needed. -/// InstanceAction::Add => Deploy Pod to each Node on Instance's `nodes` list (up to `capacity` total) -/// InstanceAction::Remove => Delete all Pods labeled with the Instance name -/// InstanceAction::Update => Ensure that each Node on Instance's `nodes` list (up to `capacity` total) have a Pod +/// Ensures that each Node on Instance's `nodes` list (up to `capacity` total) has a running Pod pub async fn handle_instance_change_pod( instance: &Instance, podspec: &PodSpec, - action: &InstanceAction, - kube_interface: &impl KubeInterface, + client: Arc, ) -> anyhow::Result<()> { - trace!("handle_instance_change_pod - enter {:?}", action); - - let instance_name = instance.metadata.name.clone().unwrap(); - - // If InstanceAction::Remove, assume all nodes require PodAction::NoAction (reflect that there is no running Pod unless we find one) - // Otherwise, assume all nodes require PodAction::Add (reflect that there is no running Pod, unless we find one) - let default_action = match action { - InstanceAction::Remove => PodAction::NoAction, - _ => PodAction::Add, - }; + trace!("handle_instance_change_pod - enter"); + // Assume all nodes require PodAction::Add (reflect that there is no running Pod, unless we find one) + let default_action = PodAction::Add; let mut nodes_to_act_on: HashMap = instance .spec .nodes @@ -550,38 +353,31 @@ pub async fn handle_instance_change_pod( ) }) .collect(); - trace!( - "handle_instance_change - nodes tracked from instance={:?}", - nodes_to_act_on - ); - trace!( - "handle_instance_change - find all pods that have {}={}", + let lp = ListParams::default().labels(&format!( + "{}={}", AKRI_INSTANCE_LABEL_NAME, - instance_name - ); - let instance_pods = kube_interface - .find_pods_with_label(&format!("{}={}", AKRI_INSTANCE_LABEL_NAME, instance_name)) - .await?; + instance.name_unchecked() + )); + let api = client.namespaced(&instance.namespace().context("no namespace")?); + let instance_pods = api.list(&lp).await?; trace!( "handle_instance_change - found {} pods", instance_pods.items.len() ); - - trace!("handle_instance_change - update actions based on the existing pods"); // By default, assume any pod tracked by the instance need to be added. // Query the existing pods to see if some of these are already added, or // need to be removed instance_pods .items .iter() - .try_for_each(|x| determine_action_for_pod(x, action, &mut nodes_to_act_on))?; + .try_for_each(|x| determine_action_for_pod(x, &mut nodes_to_act_on))?; trace!( "handle_instance_change - nodes tracked after querying existing pods={:?}", nodes_to_act_on ); - do_pod_action_for_nodes(nodes_to_act_on, instance, podspec, kube_interface).await?; + do_pod_action_for_nodes(nodes_to_act_on, instance, podspec, api).await?; trace!("handle_instance_change - exit"); Ok(()) @@ -591,7 +387,7 @@ pub(crate) async fn do_pod_action_for_nodes( nodes_to_act_on: HashMap, instance: &Instance, podspec: &PodSpec, - kube_interface: &impl KubeInterface, + api: Box>, ) -> anyhow::Result<()> { trace!("do_pod_action_for_nodes - enter"); // Iterate over nodes_to_act_on where value == (PodAction::Remove | PodAction::RemoveAndAdd) @@ -599,12 +395,12 @@ pub(crate) async fn do_pod_action_for_nodes( ((v.action) == PodAction::Remove) | ((v.action) == PodAction::RemoveAndAdd) }) { handle_deletion_work( - instance.metadata.name.as_ref().unwrap(), + &instance.name_unchecked(), &instance.spec.configuration_name, instance.spec.shared, node_to_delete_pod, context, - kube_interface, + api.as_ref(), ) .await? } @@ -622,118 +418,54 @@ pub(crate) async fn do_pod_action_for_nodes( .collect::>(); // Iterate over nodes_to_act_on where value == (PodAction::Add | PodAction::RemoveAndAdd) + let instance_name = instance.name_unchecked(); + let capability_id = format!("{}/{}", AKRI_PREFIX, instance_name); for new_node in nodes_to_add { - handle_addition_work( - instance.metadata.name.as_ref().unwrap(), - instance.metadata.uid.as_ref().unwrap(), - instance.metadata.namespace.as_ref().unwrap(), + let new_pod = pod::create_new_pod_from_spec( + &instance.namespace().unwrap(), + &instance_name, &instance.spec.configuration_name, - instance.spec.shared, + OwnershipInfo::new( + OwnershipType::Instance, + instance_name.clone(), + instance.uid().unwrap(), + ), + &capability_id, &new_node, + instance.spec.shared, podspec, - kube_interface, + )?; + handle_addition_work( + api.as_ref(), + new_pod, + &instance.spec.configuration_name, + &new_node, ) .await?; } Ok(()) } +// Default action for finalizers for the instance controller +fn default_requeue_action() -> Action { + Action::await_change() +} + #[cfg(test)] mod handle_instance_tests { - use super::super::shared_test_utils::config_for_tests; - use super::super::shared_test_utils::config_for_tests::PodList; + use crate::util::shared_test_utils::mock_client::MockControllerKubeClient; + + use super::super::shared_test_utils::config_for_tests::*; use super::*; use akri_shared::{ akri::instance::Instance, - k8s::{pod::AKRI_INSTANCE_LABEL_NAME, MockKubeInterface}, + k8s::{api::MockApi, pod::AKRI_INSTANCE_LABEL_NAME}, os::file, }; use chrono::prelude::*; use chrono::Utc; use mockall::predicate::*; - fn configure_find_pods_with_phase( - mock: &mut MockKubeInterface, - pod_selector: &'static str, - result_file: &'static str, - specified_phase: &'static str, - ) { - trace!( - "mock.expect_find_pods_with_label pod_selector:{}", - pod_selector - ); - mock.expect_find_pods_with_label() - .times(1) - .withf(move |selector| selector == pod_selector) - .returning(move |_| { - let pods_json = file::read_file_to_string(result_file); - let phase_adjusted_json = pods_json.replace( - "\"phase\": \"Running\"", - &format!("\"phase\": \"{}\"", specified_phase), - ); - let pods: PodList = serde_json::from_str(&phase_adjusted_json).unwrap(); - Ok(pods) - }); - } - - fn configure_find_pods_with_phase_and_start_time( - mock: &mut MockKubeInterface, - pod_selector: &'static str, - result_file: &'static str, - specified_phase: &'static str, - start_time: DateTime, - ) { - trace!( - "mock.expect_find_pods_with_label pod_selector:{}", - pod_selector - ); - mock.expect_find_pods_with_label() - .times(1) - .withf(move |selector| selector == pod_selector) - .returning(move |_| { - let pods_json = file::read_file_to_string(result_file); - let phase_adjusted_json = pods_json.replace( - "\"phase\": \"Running\"", - &format!("\"phase\": \"{}\"", specified_phase), - ); - let start_time_adjusted_json = phase_adjusted_json.replace( - "\"startTime\": \"2020-02-25T20:48:03Z\"", - &format!( - "\"startTime\": \"{}\"", - start_time.format("%Y-%m-%dT%H:%M:%SZ") - ), - ); - let pods: PodList = serde_json::from_str(&start_time_adjusted_json).unwrap(); - Ok(pods) - }); - } - - fn configure_find_pods_with_phase_and_no_start_time( - mock: &mut MockKubeInterface, - pod_selector: &'static str, - result_file: &'static str, - specified_phase: &'static str, - ) { - trace!( - "mock.expect_find_pods_with_label pod_selector:{}", - pod_selector - ); - mock.expect_find_pods_with_label() - .times(1) - .withf(move |selector| selector == pod_selector) - .returning(move |_| { - let pods_json = file::read_file_to_string(result_file); - let phase_adjusted_json = pods_json.replace( - "\"phase\": \"Running\"", - &format!("\"phase\": \"{}\"", specified_phase), - ); - let start_time_adjusted_json = - phase_adjusted_json.replace("\"startTime\": \"2020-02-25T20:48:03Z\",", ""); - let pods: PodList = serde_json::from_str(&start_time_adjusted_json).unwrap(); - Ok(pods) - }); - } - #[derive(Clone)] struct HandleInstanceWork { find_pods_selector: &'static str, @@ -754,10 +486,11 @@ mod handle_instance_tests { } fn configure_for_handle_instance_change( - mock: &mut MockKubeInterface, + mock: &mut MockControllerKubeClient, work: &HandleInstanceWork, ) { - config_for_tests::configure_find_config( + let mut mock_pod_api: MockApi = MockApi::new(); + configure_find_config( mock, work.config_work.find_config_name, work.config_work.find_config_namespace, @@ -767,7 +500,7 @@ mod handle_instance_tests { if let Some(phase) = work.find_pods_phase { if let Some(start_time) = work.find_pods_start_time { configure_find_pods_with_phase_and_start_time( - mock, + &mut mock_pod_api, work.find_pods_selector, work.find_pods_result, phase, @@ -775,35 +508,39 @@ mod handle_instance_tests { ); } else if work.find_pods_delete_start_time { configure_find_pods_with_phase_and_no_start_time( - mock, + &mut mock_pod_api, work.find_pods_selector, work.find_pods_result, phase, ); } else { configure_find_pods_with_phase( - mock, + &mut mock_pod_api, work.find_pods_selector, work.find_pods_result, phase, ); } } else { - config_for_tests::configure_find_pods( - mock, + configure_find_pods( + &mut mock_pod_api, work.find_pods_selector, + work.config_work.find_config_namespace, work.find_pods_result, false, ); } if let Some(deletion_work) = &work.deletion_work { - configure_for_handle_deletion_work(mock, deletion_work); + configure_for_handle_deletion_work(&mut mock_pod_api, deletion_work); } if let Some(addition_work) = &work.addition_work { - configure_for_handle_addition_work(mock, addition_work); + configure_for_handle_addition_work(&mut mock_pod_api, addition_work); } + mock.pod + .expect_namespaced() + .return_once(move |_| Box::new(mock_pod_api)); } #[derive(Clone)] @@ -821,20 +558,12 @@ mod handle_instance_tests { } } - fn configure_deletion_work_for_config_a_b494b6() -> HandleDeletionWork { - HandleDeletionWork { - broker_pod_names: vec!["config-a-b494b6-pod"], - // instance_svc_names: vec!["config-a-b494b6-svc"], - cleanup_namespaces: vec!["config-a-namespace"], - } - } - - fn configure_for_handle_deletion_work(mock: &mut MockKubeInterface, work: &HandleDeletionWork) { + fn configure_for_handle_deletion_work(mock: &mut MockApi, work: &HandleDeletionWork) { for i in 0..work.broker_pod_names.len() { let broker_pod_name = work.broker_pod_names[i]; let cleanup_namespace = work.cleanup_namespaces[i]; - config_for_tests::configure_remove_pod(mock, broker_pod_name, cleanup_namespace); + configure_remove_pod(mock, broker_pod_name, cleanup_namespace); } } @@ -870,10 +599,10 @@ mod handle_instance_tests { } } - fn configure_for_handle_addition_work(mock: &mut MockKubeInterface, work: &HandleAdditionWork) { + fn configure_for_handle_addition_work(mock_api: &mut MockApi, work: &HandleAdditionWork) { for i in 0..work.new_pod_names.len() { - config_for_tests::configure_add_pod( - mock, + configure_add_pod( + mock_api, work.new_pod_names[i], work.new_pod_namespaces[i], AKRI_INSTANCE_LABEL_NAME, @@ -884,62 +613,23 @@ mod handle_instance_tests { } async fn run_handle_instance_change_test( - mock: &mut MockKubeInterface, + client: Arc, instance_file: &'static str, - action: &'static InstanceAction, ) { trace!("run_handle_instance_change_test enter"); let instance_json = file::read_file_to_string(instance_file); let instance: Instance = serde_json::from_str(&instance_json).unwrap(); - handle_instance( - match action { - InstanceAction::Add | InstanceAction::Update => Event::Applied(instance), - InstanceAction::Remove => Event::Deleted(instance), - }, - mock, - &mut false, - ) - .await - .unwrap(); + reconcile_inner(Event::Apply(Arc::new(instance)), client) + .await + .unwrap(); trace!("run_handle_instance_change_test exit"); } - // Test that watcher errors on restarts unless it is the first restart (aka initial startup) - #[tokio::test] - async fn test_handle_watcher_restart() { - let _ = env_logger::builder().is_test(true).try_init(); - let mut first_event = true; - assert!(handle_instance( - Event::Restarted(Vec::new()), - &MockKubeInterface::new(), - &mut first_event - ) - .await - .is_ok()); - first_event = false; - assert!(handle_instance( - Event::Restarted(Vec::new()), - &MockKubeInterface::new(), - &mut first_event - ) - .await - .is_err()); - } - - #[tokio::test] - async fn test_internal_handle_existing_instances_no_instances() { - let _ = env_logger::builder().is_test(true).try_init(); - - let mut mock = MockKubeInterface::new(); - config_for_tests::configure_get_instances(&mut mock, "../test/json/empty-list.json", false); - internal_handle_existing_instances(&mock).await.unwrap(); - } - #[tokio::test] async fn test_handle_instance_change_for_add_new_local_instance() { let _ = env_logger::builder().is_test(true).try_init(); - let mut mock = MockKubeInterface::new(); + let mut mock = MockControllerKubeClient::default(); configure_for_handle_instance_change( &mut mock, &HandleInstanceWork { @@ -953,19 +643,14 @@ mod handle_instance_tests { addition_work: Some(configure_add_local_config_a_b494b6(false)), }, ); - run_handle_instance_change_test( - &mut mock, - "../test/json/local-instance.json", - &InstanceAction::Add, - ) - .await; + run_handle_instance_change_test(Arc::new(mock), "../test/json/local-instance.json").await; } #[tokio::test] async fn test_handle_instance_change_for_add_new_local_instance_error() { let _ = env_logger::builder().is_test(true).try_init(); - let mut mock = MockKubeInterface::new(); + let mut mock = MockControllerKubeClient::default(); configure_for_handle_instance_change( &mut mock, &HandleInstanceWork { @@ -979,45 +664,14 @@ mod handle_instance_tests { addition_work: Some(configure_add_local_config_a_b494b6(true)), }, ); - run_handle_instance_change_test( - &mut mock, - "../test/json/local-instance.json", - &InstanceAction::Add, - ) - .await; - } - - #[tokio::test] - async fn test_handle_instance_change_for_remove_running_local_instance() { - let _ = env_logger::builder().is_test(true).try_init(); - - let mut mock = MockKubeInterface::new(); - configure_for_handle_instance_change( - &mut mock, - &HandleInstanceWork { - find_pods_selector: "akri.sh/instance=config-a-b494b6", - find_pods_result: "../test/json/running-pod-list-for-config-a-local.json", - find_pods_phase: None, - find_pods_start_time: None, - find_pods_delete_start_time: false, - config_work: get_config_work(), - deletion_work: Some(configure_deletion_work_for_config_a_b494b6()), - addition_work: None, - }, - ); - run_handle_instance_change_test( - &mut mock, - "../test/json/local-instance.json", - &InstanceAction::Remove, - ) - .await; + run_handle_instance_change_test(Arc::new(mock), "../test/json/local-instance.json").await; } #[tokio::test] async fn test_handle_instance_change_for_add_new_shared_instance() { let _ = env_logger::builder().is_test(true).try_init(); - let mut mock = MockKubeInterface::new(); + let mut mock = MockControllerKubeClient::default(); configure_for_handle_instance_change( &mut mock, &HandleInstanceWork { @@ -1033,45 +687,14 @@ mod handle_instance_tests { )), }, ); - run_handle_instance_change_test( - &mut mock, - "../test/json/shared-instance.json", - &InstanceAction::Add, - ) - .await; - } - - #[tokio::test] - async fn test_handle_instance_change_for_remove_running_shared_instance() { - let _ = env_logger::builder().is_test(true).try_init(); - - let mut mock = MockKubeInterface::new(); - configure_for_handle_instance_change( - &mut mock, - &HandleInstanceWork { - find_pods_selector: "akri.sh/instance=config-a-359973", - find_pods_result: "../test/json/running-pod-list-for-config-a-shared.json", - find_pods_phase: None, - find_pods_start_time: None, - find_pods_delete_start_time: false, - config_work: get_config_work(), - deletion_work: Some(configure_deletion_work_for_config_a_359973()), - addition_work: None, - }, - ); - run_handle_instance_change_test( - &mut mock, - "../test/json/shared-instance.json", - &InstanceAction::Remove, - ) - .await; + run_handle_instance_change_test(Arc::new(mock), "../test/json/shared-instance.json").await; } #[tokio::test] async fn test_handle_instance_change_for_update_active_shared_instance() { let _ = env_logger::builder().is_test(true).try_init(); - let mut mock = MockKubeInterface::new(); + let mut mock = MockControllerKubeClient::default(); configure_for_handle_instance_change( &mut mock, &HandleInstanceWork { @@ -1087,12 +710,8 @@ mod handle_instance_tests { )), }, ); - run_handle_instance_change_test( - &mut mock, - "../test/json/shared-instance-update.json", - &InstanceAction::Update, - ) - .await; + run_handle_instance_change_test(Arc::new(mock), "../test/json/shared-instance-update.json") + .await; } #[tokio::test] @@ -1127,7 +746,7 @@ mod handle_instance_tests { }) .collect::>(); - let mut mock = MockKubeInterface::new(); + let mut mock = MockControllerKubeClient::default(); configure_for_handle_instance_change( &mut mock, &HandleInstanceWork { @@ -1143,12 +762,11 @@ mod handle_instance_tests { )), }, ); - run_handle_instance_change_test(&mut mock, instance_file, &InstanceAction::Update).await; + run_handle_instance_change_test(Arc::new(mock), instance_file).await; } /// Checks that the BROKER_POD_COUNT_METRIC is appropriately incremented - /// and decremented when an instance is added and deleted (and pods are - /// created and deleted). Cannot be run in parallel with other tests + /// instance is added and pods are created. Cannot be run in parallel with other tests /// due to the metric being a global variable and modified unpredictably by /// other tests. /// Run with: cargo test -- test_broker_pod_count_metric --ignored @@ -1160,7 +778,7 @@ mod handle_instance_tests { .with_label_values(&["config-a", "node-a"]) .set(0); - let mut mock = MockKubeInterface::new(); + let mut mock = MockControllerKubeClient::default(); configure_for_handle_instance_change( &mut mock, &HandleInstanceWork { @@ -1174,12 +792,7 @@ mod handle_instance_tests { addition_work: Some(configure_add_local_config_a_b494b6(false)), }, ); - run_handle_instance_change_test( - &mut mock, - "../test/json/local-instance.json", - &InstanceAction::Add, - ) - .await; + run_handle_instance_change_test(Arc::new(mock), "../test/json/local-instance.json").await; // Check that broker pod count metric has been incremented to include new pod for this instance assert_eq!( @@ -1188,33 +801,5 @@ mod handle_instance_tests { .get(), 1 ); - - configure_for_handle_instance_change( - &mut mock, - &HandleInstanceWork { - find_pods_selector: "akri.sh/instance=config-a-b494b6", - find_pods_result: "../test/json/running-pod-list-for-config-a-local.json", - find_pods_phase: None, - find_pods_start_time: None, - find_pods_delete_start_time: false, - config_work: get_config_work(), - deletion_work: Some(configure_deletion_work_for_config_a_b494b6()), - addition_work: None, - }, - ); - run_handle_instance_change_test( - &mut mock, - "../test/json/local-instance.json", - &InstanceAction::Remove, - ) - .await; - - // Check that broker pod count metric has been decremented to reflect deleted instance and pod - assert_eq!( - BROKER_POD_COUNT_METRIC - .with_label_values(&["config-a", "node-a"]) - .get(), - 0 - ); } } diff --git a/controller/src/util/mod.rs b/controller/src/util/mod.rs index 4c6953c2d..6fb37f63d 100644 --- a/controller/src/util/mod.rs +++ b/controller/src/util/mod.rs @@ -1,5 +1,27 @@ +pub(crate) mod context; pub mod instance_action; pub mod node_watcher; mod pod_action; pub mod pod_watcher; mod shared_test_utils; + +use thiserror::Error; + +#[derive(Error, Debug)] +pub enum ControllerError { + #[error(transparent)] + KubeError(#[from] kube::Error), + + #[error("Finalizer Error: {0}")] + // NB: awkward type because finalizer::Error embeds the reconciler error (which is this) + // so boxing this error to break cycles + FinalizerError(#[source] Box>), + + #[error("Watcher Error: {0}")] + WatcherError(#[from] kube::runtime::watcher::Error), + + #[error(transparent)] + Other(#[from] anyhow::Error), +} + +pub type Result = std::result::Result; diff --git a/controller/src/util/node_watcher.rs b/controller/src/util/node_watcher.rs index 2b94b3d54..af082eaa7 100644 --- a/controller/src/util/node_watcher.rs +++ b/controller/src/util/node_watcher.rs @@ -1,679 +1,475 @@ -use akri_shared::{ - akri::{ - instance::device_usage::NodeUsage, - instance::{Instance, InstanceSpec}, - retry::{random_delay, MAX_INSTANCE_UPDATE_TRIES}, - }, - k8s, - k8s::KubeInterface, +//! This is used to handle Nodes disappearing. +//! +//! When a Node disapears, make sure that any Instance that +//! references the Node is cleaned. This means that the +//! Instance.nodes property no longer contains the node and +//! that the Instance.deviceUsage property no longer contains +//! slots that are occupied by the node. +use crate::util::{ + context::{NodeState, NodeWatcherContext}, + ControllerError, Result, }; -use futures::{StreamExt, TryStreamExt}; +use akri_shared::akri::instance::{device_usage::NodeUsage, Instance}; +use akri_shared::k8s::api::Api; +use anyhow::Context; use k8s_openapi::api::core::v1::{Node, NodeStatus}; -use kube::api::Api; -use kube_runtime::watcher::{watcher, Config, Event}; -use kube_runtime::WatchStreamExt; -use log::{error, info, trace}; -use std::collections::HashMap; +use kube::{ + api::{ + ListParams, NotUsed, Object, ObjectList, ObjectMeta, Patch, PatchParams, ResourceExt, + TypeMeta, + }, + runtime::{ + controller::Action, + finalizer::{finalizer, Event}, + reflector::Lookup, + }, +}; +use log::{info, trace}; use std::str::FromStr; +use std::{collections::HashMap, sync::Arc}; -/// Node states that NodeWatcher is interested in -/// -/// NodeState describes the various states that the controller can -/// react to for Nodes. -#[derive(Clone, Debug, PartialEq)] -enum NodeState { - /// Node has been seen, but not Running yet - Known, - /// Node has been seen Running - Running, - /// A previously Running Node has been seen as not Running - /// and the Instances have been cleaned of references to that - /// vanished Node - InstancesCleaned, -} +use super::context::ControllerKubeClient; -/// This is used to handle Nodes disappearing. -/// -/// When a Node disapears, make sure that any Instance that -/// references the Node is cleaned. This means that the -/// Instance.nodes property no longer contains the node and -/// that the Instance.deviceUsage property no longer contains -/// slots that are occupied by the node. -pub struct NodeWatcher { - known_nodes: HashMap, -} +pub static NODE_FINALIZER: &str = "akri-node-watcher.kube.rs"; -impl NodeWatcher { - /// Create new instance of BrokerPodWatcher - pub fn new() -> Self { - NodeWatcher { - known_nodes: HashMap::new(), - } +pub async fn check(client: Arc) -> anyhow::Result<()> { + let api: Box> = client.all(); + if let Err(e) = api.list(&ListParams::default().limit(1)).await { + anyhow::bail!("Nodes are not queryable; {e:?}") } + Ok(()) +} - /// This watches for Node events - pub async fn watch( - &mut self, - ) -> Result<(), Box> { - trace!("watch - enter"); - let kube_interface = k8s::KubeImpl::new().await?; - let resource = Api::::all(kube_interface.get_kube_client()); - let watcher = watcher(resource, Config::default()).default_backoff(); - let mut informer = watcher.boxed(); - let mut first_event = true; - - // Currently, this does not handle None except to break the loop. - loop { - let event = match informer.try_next().await { - Err(e) => { - error!("Error during watch: {}", e); - continue; - } - Ok(None) => break, - Ok(Some(event)) => event, - }; - self.handle_node(event, &kube_interface, &mut first_event) - .await?; - } +pub fn error_policy( + _node: Arc, + error: &ControllerError, + _ctx: Arc, +) -> Action { + log::warn!("reconcile failed: {:?}", error); + Action::requeue(std::time::Duration::from_secs(5 * 60)) +} - Ok(()) - } +/// This function is the main Reconcile function for Node resources +/// This will get called every time an Node is added, deleted, or changed, it will also be called for every existing Node on startup. +/// +/// Nodes are constantly updated. Cleanup work for our services only +/// needs to be called once. +/// +/// To achieve this, store each Node's state as either Known (Node has +/// been seen, but not Running), Running (Node has been seen as Running), +/// and InstanceCleaned (previously Running Node has been seen as not +/// Running). +/// +/// When a Node is in the Known state, it is not Running. If it has +/// never been seen as Running, it is likely being created and there is +/// no need to clean any Instance. +/// +/// Once a Node moves through the Running state into a non Running +/// state, it becomes important to clean Instances referencing the +/// non-Running Node. +pub async fn reconcile(node: Arc, ctx: Arc) -> Result { + trace!("Reconciling node {}", node.name_any()); + finalizer( + &ctx.client.clone().all().as_inner(), + NODE_FINALIZER, + node, + |event| reconcile_inner(event, ctx.clone()), + ) + .await + // .map_err(|_e| anyhow!("todo")) + .map_err(|e| ControllerError::FinalizerError(Box::new(e))) +} - /// This takes an event off the Node stream and if a Node is no longer - /// available, it calls handle_node_disappearance. - /// - /// Nodes are constantly updated. Cleanup work for our services only - /// needs to be called once. - /// - /// To achieve this, store each Node's state as either Known (Node has - /// been seen, but not Running), Running (Node has been seen as Running), - /// and InstanceCleaned (previously Running Node has been seen as not - /// Running). - /// - /// When a Node is in the Known state, it is not Running. If it has - /// never been seen as Running, it is likely being created and there is - /// no need to clean any Instance. - /// - /// Once a Node moves through the Running state into a non Running - /// state, it becomes important to clean Instances referencing the - /// non-Running Node. - async fn handle_node( - &mut self, - event: Event, - kube_interface: &impl KubeInterface, - first_event: &mut bool, - ) -> anyhow::Result<()> { - trace!("handle_node - enter"); - match event { - Event::Applied(node) => { - let node_name = node.metadata.name.clone().unwrap(); - info!("handle_node - Added or modified: {}", node_name); - if self.is_node_ready(&node) { - self.known_nodes.insert(node_name, NodeState::Running); - } else if let std::collections::hash_map::Entry::Vacant(e) = - self.known_nodes.entry(node_name) - { +async fn reconcile_inner(event: Event, ctx: Arc) -> Result { + match event { + Event::Apply(node) => { + let node_name = node.name_unchecked(); + info!("handle_node - Added or modified: {}", node_name); + if is_node_ready(&node) { + ctx.known_nodes + .write() + .await + .insert(node_name, NodeState::Running); + } else { + let mut guard = ctx.known_nodes.write().await; + if let std::collections::hash_map::Entry::Vacant(e) = guard.entry(node_name) { e.insert(NodeState::Known); } else { // Node Modified - self.call_handle_node_disappearance_if_needed(&node, kube_interface) - .await?; - } - } - Event::Deleted(node) => { - info!("handle_node - Deleted: {:?}", &node.metadata.name); - self.call_handle_node_disappearance_if_needed(&node, kube_interface) - .await?; - } - Event::Restarted(_nodes) => { - if *first_event { - info!("handle_node - watcher started"); - } else { - return Err(anyhow::anyhow!( - "Node watcher restarted - throwing error to restart controller" - )); + drop(guard); + handle_node_disappearance(&node, ctx.clone()).await?; } } - }; - *first_event = false; - Ok(()) - } - - /// This should be called for Nodes that are either !Ready or Deleted. - /// This function ensures that handle_node_disappearance is called - /// only once for any Node as it disappears. - async fn call_handle_node_disappearance_if_needed( - &mut self, - node: &Node, - kube_interface: &impl KubeInterface, - ) -> anyhow::Result<()> { - let node_name = node.metadata.name.clone().unwrap(); - trace!( - "call_handle_node_disappearance_if_needed - enter: {:?}", - &node.metadata.name - ); - let last_known_state = self - .known_nodes - .get(&node_name) - .unwrap_or(&NodeState::Running); - trace!( - "call_handle_node_disappearance_if_needed - last_known_state: {:?}", - &last_known_state - ); - // Nodes are updated roughly once a minute ... try to only call - // handle_node_disappearance once for a node that disappears. - // - // Also, there is no need to call handle_node_disappearance if a - // Node has never been in the Running state. - if last_known_state == &NodeState::Running { - trace!( - "call_handle_node_disappearance_if_needed - call handle_node_disappearance: {:?}", - &node.metadata.name - ); - self.handle_node_disappearance(&node_name, kube_interface) - .await?; - self.known_nodes - .insert(node_name, NodeState::InstancesCleaned); + Ok(Action::await_change()) + } + Event::Cleanup(node) => { + info!("handle_node - Deleted: {:?}", &node.name_unchecked()); + handle_node_disappearance(&node, ctx.clone()).await?; + ctx.known_nodes.write().await.remove(&node.name_unchecked()); + Ok(Action::await_change()) } - Ok(()) - } - - /// This determines if a node is in the Ready state. - fn is_node_ready(&self, k8s_node: &Node) -> bool { - trace!("is_node_ready - for node {:?}", k8s_node.metadata.name); - k8s_node - .status - .as_ref() - .unwrap_or(&NodeStatus::default()) - .conditions - .as_ref() - .unwrap_or(&Vec::new()) - .iter() - .filter_map(|condition| { - if condition.type_ == "Ready" { - Some(condition.status == "True") - } else { - None - } - }) - .collect::>() - .last() - .unwrap_or(&false) - == &true } +} - /// This handles when a node disappears by clearing nodes from - /// the nodes list and deviceUsage map and then trying 5 times to - /// update the Instance. - async fn handle_node_disappearance( - &self, - vanished_node_name: &str, - kube_interface: &impl KubeInterface, - ) -> anyhow::Result<()> { - trace!( - "handle_node_disappearance - enter vanished_node_name={:?}", - vanished_node_name, - ); - - let instances = kube_interface.get_instances().await?; +/// This should be called for Nodes that are either !Ready or Deleted. +/// This function will clean up any Instances that reference a Node that +/// was previously Running. +async fn handle_node_disappearance( + node: &Node, + ctx: Arc, +) -> anyhow::Result<()> { + let node_name = node.name_unchecked(); + trace!( + "handle_node_disappearance - enter: {:?}", + &node.metadata.name + ); + let last_known_state = ctx + .known_nodes + .read() + .await + .get(&node_name) + .unwrap_or(&NodeState::Running) + .clone(); + trace!( + "handle_node_disappearance - last_known_state: {:?}", + &last_known_state + ); + + // If the node was running and no longer is, clear the node from + // each instance's nodes list and deviceUsage map. + if last_known_state == NodeState::Running { + let api = ctx.client.all(); + let instances: ObjectList = api.list(&ListParams::default()).await?; trace!( "handle_node_disappearance - found {:?} instances", instances.items.len() ); for instance in instances.items { - let instance_name = instance.metadata.name.clone().unwrap(); - let instance_namespace = instance.metadata.namespace.as_ref().ok_or_else(|| { - anyhow::anyhow!("Namespace not found for instance: {}", instance_name) - })?; - - trace!( - "handle_node_disappearance - make sure node is not referenced here: {:?}", - &instance_name - ); - - // Try up to MAX_INSTANCE_UPDATE_TRIES times to update/create/get instance - for x in 0..MAX_INSTANCE_UPDATE_TRIES { - match if x == 0 { - self.try_remove_nodes_from_instance( - vanished_node_name, - &instance_name, - instance_namespace, - &instance, - kube_interface, - ) - .await - } else { - let retry_instance = kube_interface - .find_instance(&instance_name, instance_namespace) - .await?; - self.try_remove_nodes_from_instance( - vanished_node_name, - &instance_name, - instance_namespace, - &retry_instance, - kube_interface, - ) - .await - } { - Ok(_) => break, - Err(e) => { - if x == (MAX_INSTANCE_UPDATE_TRIES - 1) { - return Err(e); - } - random_delay().await; - } - } - } + let instance_name = instance.name_unchecked(); + try_remove_nodes_from_instance(&node_name, &instance_name, &instance, api.as_ref()) + .await?; + api.remove_finalizer(&instance, &node_name).await?; } - - trace!("handle_node_disappearance - exit"); - Ok(()) + ctx.known_nodes + .write() + .await + .insert(node_name.to_string(), NodeState::InstancesCleaned); } + Ok(()) +} - /// This attempts to remove nodes from the nodes list and deviceUsage - /// map in an Instance. An attempt is made to update - /// the instance in etcd, any failure is returned. - async fn try_remove_nodes_from_instance( - &self, - vanished_node_name: &str, - instance_name: &str, - instance_namespace: &str, - instance: &Instance, - kube_interface: &impl KubeInterface, - ) -> Result<(), anyhow::Error> { - trace!( - "try_remove_nodes_from_instance - vanished_node_name: {:?}", - &vanished_node_name - ); - let modified_nodes = instance - .spec - .nodes - .iter() - .filter(|node| &vanished_node_name != node) - .map(|node| node.into()) - .collect::>(); - // Remove nodes from instance.deviceusage - let modified_device_usage = instance - .spec - .device_usage - .iter() - .map(|(slot, usage)| { - let same_node_name = match NodeUsage::from_str(usage) { - Ok(node_usage) => node_usage.is_same_node(vanished_node_name), - Err(_) => false, - }; - - ( - slot.to_string(), - if same_node_name { - NodeUsage::default().to_string() - } else { - usage.into() - }, - ) - }) - .collect::>(); - - // Save the instance - let modified_instance = InstanceSpec { - cdi_name: instance.spec.cdi_name.clone(), - capacity: instance.spec.capacity, - configuration_name: instance.spec.configuration_name.clone(), - broker_properties: instance.spec.broker_properties.clone(), - shared: instance.spec.shared, - device_usage: modified_device_usage, - nodes: modified_nodes, - }; - - trace!( - "handle_node_disappearance - kube_interface.update_instance name: {}, namespace: {}, {:?}", - &instance_name, - &instance_namespace, - &modified_instance - ); +/// This determines if a node is in the Ready state. +fn is_node_ready(k8s_node: &Node) -> bool { + trace!("is_node_ready - for node {:?}", k8s_node.metadata.name); + k8s_node + .status + .as_ref() + .unwrap_or(&NodeStatus::default()) + .conditions + .as_ref() + .unwrap_or(&Vec::new()) + .last() + .map_or(false, |condition| { + condition.type_ == "Ready" && condition.status == "True" + }) +} - kube_interface - .update_instance(&modified_instance, instance_name, instance_namespace) - .await - } +/// This attempts to remove nodes from the nodes list and deviceUsage +/// map in an Instance. An attempt is made to update +/// the instance in etcd, any failure is returned. +async fn try_remove_nodes_from_instance( + vanished_node_name: &str, + instance_name: &str, + instance: &Instance, + api: &dyn Api, +) -> Result<(), anyhow::Error> { + trace!( + "try_remove_nodes_from_instance - vanished_node_name: {:?}", + &vanished_node_name + ); + let modified_nodes = instance + .spec + .nodes + .iter() + .filter(|node| &vanished_node_name != node) + .map(|node| node.into()) + .collect::>(); + // Remove nodes from instance.deviceusage + let modified_device_usage = instance + .spec + .device_usage + .iter() + .map(|(slot, usage)| match NodeUsage::from_str(usage) { + Ok(node_usage) if node_usage.is_same_node(vanished_node_name) => { + (slot.to_owned(), NodeUsage::default().to_string()) + } + Ok(_) => (slot.to_owned(), usage.into()), + Err(_) => (slot.to_owned(), usage.into()), + }) + .collect::>(); + let mut modified_spec = instance.spec.clone(); + modified_spec.nodes = modified_nodes; + modified_spec.device_usage = modified_device_usage; + let patch = Patch::Merge( + serde_json::to_value(Object { + types: Some(TypeMeta { + api_version: Instance::api_version(&()).to_string(), + kind: Instance::kind(&()).to_string(), + }), + status: None::, + spec: modified_spec, + metadata: ObjectMeta { + name: Some(instance_name.to_string()), + ..Default::default() + }, + }) + .context("Could not create instance patch")?, + ); + api.raw_patch(instance_name, &patch, &PatchParams::default()) + .await?; + Ok(()) } #[cfg(test)] mod tests { - use super::super::shared_test_utils::config_for_tests; + use super::super::shared_test_utils::mock_client::MockControllerKubeClient; use super::*; - use akri_shared::{akri::instance::InstanceList, k8s::MockKubeInterface, os::file}; - - #[derive(Clone)] - struct UpdateInstance { - instance_to_update: InstanceSpec, - instance_name: &'static str, - instance_namespace: &'static str, - } - - #[derive(Clone)] - struct HandleNodeDisappearance { - get_instances_result_file: &'static str, - get_instances_result_listify: bool, - update_instance: Option, - } - - fn configure_for_handle_node_disappearance( - mock: &mut MockKubeInterface, - work: &HandleNodeDisappearance, - ) { - config_for_tests::configure_get_instances( - mock, - work.get_instances_result_file, - work.get_instances_result_listify, - ); - - if let Some(update_instance) = &work.update_instance { - config_for_tests::configure_update_instance( - mock, - update_instance.instance_to_update.clone(), - update_instance.instance_name, - update_instance.instance_namespace, - false, - ); - } - } - - // Test that watcher errors on restarts unless it is the first restart (aka initial startup) - #[tokio::test] - async fn test_handle_watcher_restart() { - let _ = env_logger::builder().is_test(true).try_init(); - let mut pod_watcher = NodeWatcher::new(); - let mut first_event = true; - assert!(pod_watcher - .handle_node( - Event::Restarted(Vec::new()), - &MockKubeInterface::new(), - &mut first_event - ) - .await - .is_ok()); - first_event = false; - assert!(pod_watcher - .handle_node( - Event::Restarted(Vec::new()), - &MockKubeInterface::new(), - &mut first_event - ) - .await - .is_err()); - } - - #[tokio::test] - async fn test_handle_node_added_unready() { - let _ = env_logger::builder().is_test(true).try_init(); - let node_json = file::read_file_to_string("../test/json/node-a-not-ready.json"); - let node: Node = serde_json::from_str(&node_json).unwrap(); - let mut node_watcher = NodeWatcher::new(); - node_watcher - .handle_node(Event::Applied(node), &MockKubeInterface::new(), &mut false) - .await - .unwrap(); - - assert_eq!(1, node_watcher.known_nodes.len()); + use akri_shared::{akri::instance::InstanceSpec, k8s::api::MockApi, os::file}; - assert_eq!( - &NodeState::Known, - node_watcher.known_nodes.get("node-a").unwrap() - ) + fn instances_list( + instance_name: &str, + instance_namespace: &str, + ) -> kube::Result> { + let list = serde_json::json!({ + "apiVersion": "v1", + "kind": "List", + "metadata": { + "resourceVersion": "", + "selfLink": "" + }, + "items": [ + { + "apiVersion": "akri.sh/v0", + "kind": "Instance", + "metadata": { + "name": instance_name, + "namespace": instance_namespace, + "uid": "abcdegfh-ijkl-mnop-qrst-uvwxyz012345" + }, + "spec": { + "configurationName": "config-a", + "capacity": 5, + "cdiName": "akri.sh/config-a=359973", + "deviceUsage": { + format!("{instance_name}-0"): "node-b", + format!("{instance_name}-1"): "node-a", + format!("{instance_name}-2"): "node-b", + format!("{instance_name}-3"): "node-a", + format!("{instance_name}-4"): "node-c", + format!("{instance_name}-5"): "" + }, + "nodes": [ "node-a", "node-b", "node-c" ], + "shared": true + } + } + ] + }); + Ok(serde_json::from_value(list).unwrap()) } #[tokio::test] - async fn test_handle_node_added_ready() { + async fn test_reconcile_node_apply_ready() { let _ = env_logger::builder().is_test(true).try_init(); - let node_json = file::read_file_to_string("../test/json/node-a.json"); let node: Node = serde_json::from_str(&node_json).unwrap(); - let mut node_watcher = NodeWatcher::new(); - node_watcher - .handle_node(Event::Applied(node), &MockKubeInterface::new(), &mut false) + let node_name = node.name_unchecked(); + let mut mock = MockControllerKubeClient::default(); + mock.node + .expect_all() + .return_once(|| Box::new(MockApi::new())); + let ctx = Arc::new(NodeWatcherContext::new(Arc::new(mock))); + reconcile_inner(Event::Apply(Arc::new(node)), ctx.clone()) .await .unwrap(); - assert_eq!(1, node_watcher.known_nodes.len()); - assert_eq!( &NodeState::Running, - node_watcher.known_nodes.get("node-a").unwrap() - ) + ctx.known_nodes.read().await.get(&node_name).unwrap() + ); } #[tokio::test] - async fn test_handle_node_modified_unready_unknown() { + async fn test_reconcile_node_apply_unready_unknown() { let _ = env_logger::builder().is_test(true).try_init(); - - let node_json = file::read_file_to_string("../test/json/node-b-not-ready.json"); + let node_json = file::read_file_to_string("../test/json/node-a-not-ready.json"); let node: Node = serde_json::from_str(&node_json).unwrap(); - let mut node_watcher = NodeWatcher::new(); - let instance_file = "../test/json/shared-instance-update.json"; - let instance_json = file::read_file_to_string(instance_file); - let kube_object_instance: Instance = serde_json::from_str(&instance_json).unwrap(); - let mut instance = kube_object_instance.spec; - instance.nodes.clear(); - instance - .device_usage - .insert("config-a-359973-2".to_string(), "".to_string()); - - let mut mock = MockKubeInterface::new(); - configure_for_handle_node_disappearance( - &mut mock, - &HandleNodeDisappearance { - get_instances_result_file: "../test/json/shared-instance-update.json", - get_instances_result_listify: true, - update_instance: Some(UpdateInstance { - instance_to_update: instance, - instance_name: "config-a-359973", - instance_namespace: "config-a-namespace", - }), - }, - ); - // Insert node into list of known_nodes to mock being previously applied - node_watcher - .known_nodes - .insert(node.metadata.name.clone().unwrap(), NodeState::Running); - node_watcher - .handle_node(Event::Applied(node), &mock, &mut false) + let node_name = node.name_unchecked(); + let mut mock = MockControllerKubeClient::default(); + mock.node + .expect_all() + .return_once(|| Box::new(MockApi::new())); + let ctx = Arc::new(NodeWatcherContext::new(Arc::new(mock))); + reconcile_inner(Event::Apply(Arc::new(node)), ctx.clone()) .await .unwrap(); - assert_eq!(1, node_watcher.known_nodes.len()); - assert_eq!( - &NodeState::InstancesCleaned, - node_watcher.known_nodes.get("node-b").unwrap() - ) + &NodeState::Known, + ctx.known_nodes.read().await.get(&node_name).unwrap() + ); } - + // If a known node is modified and is still not ready, it should remain in the known state #[tokio::test] - async fn test_handle_node_modified_ready_unknown() { + async fn test_reconcile_node_apply_unready_known() { let _ = env_logger::builder().is_test(true).try_init(); - - let node_json = file::read_file_to_string("../test/json/node-b.json"); + let node_json = file::read_file_to_string("../test/json/node-a-not-ready.json"); let node: Node = serde_json::from_str(&node_json).unwrap(); - let mut node_watcher = NodeWatcher::new(); - - let mock = MockKubeInterface::new(); - node_watcher - .handle_node(Event::Applied(node), &mock, &mut false) + let node_name = node.name_unchecked(); + let mut mock = MockControllerKubeClient::default(); + mock.node + .expect_all() + .return_once(|| Box::new(MockApi::new())); + let ctx = Arc::new(NodeWatcherContext::new(Arc::new(mock))); + ctx.known_nodes + .write() + .await + .insert(node_name.clone(), NodeState::Known); + reconcile_inner(Event::Apply(Arc::new(node)), ctx.clone()) .await .unwrap(); - assert_eq!(1, node_watcher.known_nodes.len()); - assert_eq!( - &NodeState::Running, - node_watcher.known_nodes.get("node-b").unwrap() - ) + &NodeState::Known, + ctx.known_nodes.read().await.get(&node_name).unwrap() + ); } + // If previously running node is modified and is not ready, it should remove the node from the instances' node lists #[tokio::test] - async fn test_handle_node_deleted_unready_unknown() { + async fn test_reconcile_node_apply_unready_previously_running() { let _ = env_logger::builder().is_test(true).try_init(); - - let node_json = file::read_file_to_string("../test/json/node-b-not-ready.json"); + let node_json = file::read_file_to_string("../test/json/node-a-not-ready.json"); let node: Node = serde_json::from_str(&node_json).unwrap(); - let mut node_watcher = NodeWatcher::new(); - - let instance_file = "../test/json/shared-instance-update.json"; - let instance_json = file::read_file_to_string(instance_file); - let kube_object_instance: Instance = serde_json::from_str(&instance_json).unwrap(); - let mut instance = kube_object_instance.spec; - instance.nodes.clear(); - instance - .device_usage - .insert("config-a-359973-2".to_string(), "".to_string()); - - let mut mock = MockKubeInterface::new(); - configure_for_handle_node_disappearance( - &mut mock, - &HandleNodeDisappearance { - get_instances_result_file: "../test/json/shared-instance-update.json", - get_instances_result_listify: true, - update_instance: Some(UpdateInstance { - instance_to_update: instance, - instance_name: "config-a-359973", - instance_namespace: "config-a-namespace", - }), - }, - ); - - node_watcher - .handle_node(Event::Deleted(node), &mock, &mut false) + let node_name = node.name_unchecked(); + let mut mock = MockControllerKubeClient::default(); + mock.node + .expect_all() + .return_once(|| Box::new(MockApi::new())); + let mut instance_api_mock: MockApi = MockApi::new(); + let instance_name = "config-a-359973"; + instance_api_mock + .expect_list() + .return_once(|_| instances_list(instance_name, "unused")); + instance_api_mock + .expect_raw_patch() + .return_once(|_, _, _| Ok(Instance::new("unused", InstanceSpec::default()))) + .withf(|_, patch, _| match patch { + Patch::Merge(v) => { + let instance: Instance = serde_json::from_value(v.clone()).unwrap(); + !instance.spec.nodes.contains(&"node-a".to_owned()) + } + _ => false, + }); + instance_api_mock + .expect_remove_finalizer() + .returning(|_, _| Ok(())); + mock.instance + .expect_all() + .return_once(move || Box::new(instance_api_mock)); + let ctx = Arc::new(NodeWatcherContext::new(Arc::new(mock))); + ctx.known_nodes + .write() + .await + .insert(node_name.clone(), NodeState::Running); + reconcile_inner(Event::Apply(Arc::new(node)), ctx.clone()) .await .unwrap(); - - assert_eq!(1, node_watcher.known_nodes.len()); - assert_eq!( &NodeState::InstancesCleaned, - node_watcher.known_nodes.get("node-b").unwrap() - ) - } - - const LIST_PREFIX: &str = r#" -{ - "apiVersion": "v1", - "items": ["#; - const LIST_SUFFIX: &str = r#" - ], - "kind": "List", - "metadata": { - "resourceVersion": "", - "selfLink": "" - } -}"#; - fn listify_node(node_json: &str) -> String { - format!("{}\n{}\n{}", LIST_PREFIX, node_json, LIST_SUFFIX) + ctx.known_nodes.read().await.get(&node_name).unwrap() + ); } + // If previously running node enters the cleanup state, it should remove the node from the instances' node lists + // and ensure that the node is removed from the known_nodes #[tokio::test] - async fn test_handle_node_disappearance_update_failure_retries() { + async fn test_reconcile_node_cleanup() { let _ = env_logger::builder().is_test(true).try_init(); - - let mut mock = MockKubeInterface::new(); - mock.expect_get_instances().times(1).returning(move || { - let instance_file = "../test/json/shared-instance-update.json"; - let instance_json = file::read_file_to_string(instance_file); - let instance_list_json = listify_node(&instance_json); - let list: InstanceList = serde_json::from_str(&instance_list_json).unwrap(); - Ok(list) - }); - mock.expect_update_instance() - .times(MAX_INSTANCE_UPDATE_TRIES as usize) - .withf(move |_instance, n, ns| n == "config-a-359973" && ns == "config-a-namespace") - .returning(move |_, _, _| Err(None.ok_or_else(|| anyhow::anyhow!("failure"))?)); - mock.expect_find_instance() - .times((MAX_INSTANCE_UPDATE_TRIES - 1) as usize) - .withf(move |n, ns| n == "config-a-359973" && ns == "config-a-namespace") - .returning(move |_, _| { - let instance_file = "../test/json/shared-instance-update.json"; - let instance_json = file::read_file_to_string(instance_file); - let instance: Instance = serde_json::from_str(&instance_json).unwrap(); - Ok(instance) + let node_json = file::read_file_to_string("../test/json/node-a-not-ready.json"); + let node: Node = serde_json::from_str(&node_json).unwrap(); + let node_name = node.name_unchecked(); + let mut mock = MockControllerKubeClient::default(); + mock.node + .expect_all() + .return_once(|| Box::new(MockApi::new())); + let mut instance_api_mock: MockApi = MockApi::new(); + let instance_name = "config-a-359973"; + instance_api_mock + .expect_list() + .return_once(|_| instances_list(instance_name, "unused")); + instance_api_mock + .expect_raw_patch() + .return_once(|_, _, _| Ok(Instance::new("unused", InstanceSpec::default()))) + .withf(|_, patch, _| match patch { + Patch::Merge(v) => { + let instance: Instance = serde_json::from_value(v.clone()).unwrap(); + !instance.spec.nodes.contains(&"node-a".to_owned()) + } + _ => false, }); - - let node_watcher = NodeWatcher::new(); - assert!(node_watcher - .handle_node_disappearance("foo-a", &mock) + instance_api_mock + .expect_remove_finalizer() + .returning(|_, _| Ok(())); + mock.instance + .expect_all() + .return_once(move || Box::new(instance_api_mock)); + let ctx = Arc::new(NodeWatcherContext::new(Arc::new(mock))); + ctx.known_nodes + .write() + .await + .insert(node_name.clone(), NodeState::Running); + reconcile_inner(Event::Cleanup(Arc::new(node)), ctx.clone()) .await - .is_err()); + .unwrap(); + assert!(ctx.known_nodes.read().await.get(&node_name).is_none()); } + // If unknown node is deleted, it should remove the node from the instances' node lists #[tokio::test] - async fn test_try_remove_nodes_from_instance() { + async fn test_reconcile_node_cleanup_unknown() { let _ = env_logger::builder().is_test(true).try_init(); - - let instance_file = "../test/json/shared-instance-update.json"; - let instance_json = file::read_file_to_string(instance_file); - let kube_object_instance: Instance = serde_json::from_str(&instance_json).unwrap(); - - let mut mock = MockKubeInterface::new(); - mock.expect_update_instance() - .times(1) - .withf(move |ins, n, ns| { - n == "config-a" - && ns == "config-a-namespace" - && !ins.nodes.contains(&"node-b".to_string()) - && ins - .device_usage - .iter() - .filter_map(|(_slot, value)| { - if value == &"node-b".to_string() { - Some(value.to_string()) - } else { - None - } - }) - .collect::>() - .first() - .is_none() - }) - .returning(move |_, _, _| Ok(())); - - let node_watcher = NodeWatcher::new(); - assert!(node_watcher - .try_remove_nodes_from_instance( - "node-b", - "config-a", - "config-a-namespace", - &kube_object_instance, - &mock, - ) + let node_json = file::read_file_to_string("../test/json/node-a-not-ready.json"); + let node: Node = serde_json::from_str(&node_json).unwrap(); + let node_name = node.name_unchecked(); + let mut mock = MockControllerKubeClient::default(); + mock.node + .expect_all() + .return_once(|| Box::new(MockApi::new())); + let mut instance_api_mock: MockApi = MockApi::new(); + let instance_name = "config-a-359973"; + instance_api_mock + .expect_list() + .return_once(|_| instances_list(instance_name, "unused")); + instance_api_mock + .expect_raw_patch() + .return_once(|_, _, _| Ok(Instance::new("unused", InstanceSpec::default()))) + .withf(|_, patch, _| match patch { + Patch::Merge(v) => { + let instance: Instance = serde_json::from_value(v.clone()).unwrap(); + !instance.spec.nodes.contains(&"node-a".to_owned()) + } + _ => false, + }); + instance_api_mock + .expect_remove_finalizer() + .returning(|_, _| Ok(())); + mock.instance + .expect_all() + .return_once(move || Box::new(instance_api_mock)); + let ctx = Arc::new(NodeWatcherContext::new(Arc::new(mock))); + reconcile_inner(Event::Cleanup(Arc::new(node)), ctx.clone()) .await - .is_ok()); - } - - #[test] - fn test_is_node_ready_ready() { - let _ = env_logger::builder().is_test(true).try_init(); - - let tests = [ - ("../test/json/node-a.json", true), - ("../test/json/node-a-not-ready.json", false), - ("../test/json/node-a-no-conditions.json", false), - ("../test/json/node-a-no-ready-condition.json", false), - ]; - - for (node_file, result) in tests.iter() { - trace!( - "Testing {} should reflect node is ready={}", - node_file, - result - ); - - let node_json = file::read_file_to_string(node_file); - let kube_object_node: Node = serde_json::from_str(&node_json).unwrap(); - - let node_watcher = NodeWatcher::new(); - assert_eq!( - result.clone(), - node_watcher.is_node_ready(&kube_object_node) - ); - } + .unwrap(); + assert!(ctx.known_nodes.read().await.get(&node_name).is_none()); } } diff --git a/controller/src/util/pod_action.rs b/controller/src/util/pod_action.rs index 71b5172b6..ad2777622 100644 --- a/controller/src/util/pod_action.rs +++ b/controller/src/util/pod_action.rs @@ -1,4 +1,3 @@ -use super::instance_action::InstanceAction; use chrono::Utc; use k8s_openapi::apimachinery::pkg::apis::meta::v1::Time; @@ -23,7 +22,6 @@ pub enum PodAction { /// a broker Pod. /// /// The action to take is based on several factors: -/// 1. what the InstanceAction is (Add, Delete, Modify) /// 1. what phase the Pod is in (Running, Pending, etc) /// 1. when the Pod started /// 1. the relevant grace time @@ -32,10 +30,9 @@ pub struct PodActionInfo { pub pending_grace_time_in_minutes: i64, pub ended_grace_time_in_minutes: i64, pub phase: String, - pub instance_action: InstanceAction, pub status_start_time: Option