From 3082e502578a69c27f52426d3527b90d3620a3d1 Mon Sep 17 00:00:00 2001 From: Kate Goldenring Date: Wed, 2 Oct 2024 11:14:02 -0700 Subject: [PATCH] Cargo fmt code and apply lint suggestions Signed-off-by: Kate Goldenring --- Cross.toml | 11 +- .../device_plugin_instance_controller.rs | 2 +- agent/src/plugin_manager/v1.rs | 131 +++---- agent/src/plugin_manager/v1beta1.rs | 361 ++++++------------ controller/src/util/node_watcher.rs | 10 +- controller/src/util/pod_watcher.rs | 50 +-- .../onvif/src/credential_store.rs | 4 +- .../onvif/src/discovery_utils.rs | 6 +- .../opcua/src/discovery_handler.rs | 2 +- .../udev/src/discovery_handler.rs | 2 +- discovery-handlers/udev/src/discovery_impl.rs | 6 +- discovery-utils/src/discovery/v0.rs | 176 ++++----- .../udev-video-broker/src/util/camera.rs | 63 ++- shared/src/k8s/api.rs | 1 + shared/src/k8s/job.rs | 12 +- shared/src/k8s/pod.rs | 254 ++++++------ shared/src/k8s/service.rs | 354 ++++++++--------- shared/src/os/env_var.rs | 4 +- 18 files changed, 605 insertions(+), 844 deletions(-) diff --git a/Cross.toml b/Cross.toml index 9db2e1525..8adc2fa45 100644 --- a/Cross.toml +++ b/Cross.toml @@ -3,4 +3,13 @@ passthrough = [ "PKG_CONFIG_ALLOW_CROSS", "RUST_LOG", "CARGO_INCREMENTAL", -] \ No newline at end of file +] + +[target.x86_64-unknown-linux-gnu] +image = "ghcr.io/project-akri/akri/rust-crossbuild:x86_64-unknown-linux-gnu-0.1.16-0.0.7" + +[target.armv7-unknown-linux-gnueabihf] +image = "ghcr.io/project-akri/akri/rust-crossbuild:armv7-unknown-linux-gnueabihf-0.1.16-0.0.7" + +[target.aarch64-unknown-linux-gnu] +image = "ghcr.io/project-akri/akri/rust-crossbuild:aarch64-unknown-linux-gnu-0.1.16-0.0.7" \ No newline at end of file diff --git a/agent/src/plugin_manager/device_plugin_instance_controller.rs b/agent/src/plugin_manager/device_plugin_instance_controller.rs index 45a26b8b2..097321b3d 100644 --- a/agent/src/plugin_manager/device_plugin_instance_controller.rs +++ b/agent/src/plugin_manager/device_plugin_instance_controller.rs @@ -174,7 +174,7 @@ impl InstanceDevicePlugin { let mut modified = false; for (k, v) in new_slots.iter() { if current[*k] != *v { - current[*k] = v.to_owned(); + v.clone_into(&mut current[*k]); modified = true; } } diff --git a/agent/src/plugin_manager/v1.rs b/agent/src/plugin_manager/v1.rs index f016fb20b..7f3183f3f 100644 --- a/agent/src/plugin_manager/v1.rs +++ b/agent/src/plugin_manager/v1.rs @@ -87,8 +87,8 @@ pub struct NumaNode { /// Generated client implementations. pub mod pod_resources_lister_client { #![allow(unused_variables, dead_code, missing_docs, clippy::let_unit_value)] - use tonic::codegen::*; use tonic::codegen::http::Uri; + use tonic::codegen::*; /// PodResourcesLister is a service provided by the kubelet that provides information about the /// node resources consumed by pods and containers on the node #[derive(Debug, Clone)] @@ -134,9 +134,8 @@ pub mod pod_resources_lister_client { >::ResponseBody, >, >, - , - >>::Error: Into + Send + Sync, + >>::Error: + Into + Send + Sync, { PodResourcesListerClient::new(InterceptedService::new(inner, interceptor)) } @@ -174,23 +173,16 @@ pub mod pod_resources_lister_client { pub async fn list( &mut self, request: impl tonic::IntoRequest, - ) -> std::result::Result< - tonic::Response, - tonic::Status, - > { - self.inner - .ready() - .await - .map_err(|e| { - tonic::Status::new( - tonic::Code::Unknown, - format!("Service was not ready: {}", e.into()), - ) - })?; + ) -> std::result::Result, tonic::Status> + { + self.inner.ready().await.map_err(|e| { + tonic::Status::new( + tonic::Code::Unknown, + format!("Service was not ready: {}", e.into()), + ) + })?; let codec = tonic::codec::ProstCodec::default(); - let path = http::uri::PathAndQuery::from_static( - "/v1.PodResourcesLister/List", - ); + let path = http::uri::PathAndQuery::from_static("/v1.PodResourcesLister/List"); let mut req = request.into_request(); req.extensions_mut() .insert(GrpcMethod::new("v1.PodResourcesLister", "List")); @@ -199,28 +191,23 @@ pub mod pod_resources_lister_client { pub async fn get_allocatable_resources( &mut self, request: impl tonic::IntoRequest, - ) -> std::result::Result< - tonic::Response, - tonic::Status, - > { - self.inner - .ready() - .await - .map_err(|e| { - tonic::Status::new( - tonic::Code::Unknown, - format!("Service was not ready: {}", e.into()), - ) - })?; + ) -> std::result::Result, tonic::Status> + { + self.inner.ready().await.map_err(|e| { + tonic::Status::new( + tonic::Code::Unknown, + format!("Service was not ready: {}", e.into()), + ) + })?; let codec = tonic::codec::ProstCodec::default(); let path = http::uri::PathAndQuery::from_static( "/v1.PodResourcesLister/GetAllocatableResources", ); let mut req = request.into_request(); - req.extensions_mut() - .insert( - GrpcMethod::new("v1.PodResourcesLister", "GetAllocatableResources"), - ); + req.extensions_mut().insert(GrpcMethod::new( + "v1.PodResourcesLister", + "GetAllocatableResources", + )); self.inner.unary(req, path, codec).await } } @@ -235,17 +222,11 @@ pub mod pod_resources_lister_server { async fn list( &self, request: tonic::Request, - ) -> std::result::Result< - tonic::Response, - tonic::Status, - >; + ) -> std::result::Result, tonic::Status>; async fn get_allocatable_resources( &self, request: tonic::Request, - ) -> std::result::Result< - tonic::Response, - tonic::Status, - >; + ) -> std::result::Result, tonic::Status>; } /// PodResourcesLister is a service provided by the kubelet that provides information about the /// node resources consumed by pods and containers on the node @@ -272,10 +253,7 @@ pub mod pod_resources_lister_server { max_encoding_message_size: None, } } - pub fn with_interceptor( - inner: T, - interceptor: F, - ) -> InterceptedService + pub fn with_interceptor(inner: T, interceptor: F) -> InterceptedService where F: tonic::service::Interceptor, { @@ -331,15 +309,11 @@ pub mod pod_resources_lister_server { "/v1.PodResourcesLister/List" => { #[allow(non_camel_case_types)] struct ListSvc(pub Arc); - impl< - T: PodResourcesLister, - > tonic::server::UnaryService - for ListSvc { + impl + tonic::server::UnaryService for ListSvc + { type Response = super::ListPodResourcesResponse; - type Future = BoxFuture< - tonic::Response, - tonic::Status, - >; + type Future = BoxFuture, tonic::Status>; fn call( &mut self, request: tonic::Request, @@ -377,15 +351,12 @@ pub mod pod_resources_lister_server { "/v1.PodResourcesLister/GetAllocatableResources" => { #[allow(non_camel_case_types)] struct GetAllocatableResourcesSvc(pub Arc); - impl< - T: PodResourcesLister, - > tonic::server::UnaryService - for GetAllocatableResourcesSvc { + impl + tonic::server::UnaryService + for GetAllocatableResourcesSvc + { type Response = super::AllocatableResourcesResponse; - type Future = BoxFuture< - tonic::Response, - tonic::Status, - >; + type Future = BoxFuture, tonic::Status>; fn call( &mut self, request: tonic::Request, @@ -393,10 +364,9 @@ pub mod pod_resources_lister_server { let inner = Arc::clone(&self.0); let fut = async move { ::get_allocatable_resources( - &inner, - request, - ) - .await + &inner, request, + ) + .await }; Box::pin(fut) } @@ -424,18 +394,14 @@ pub mod pod_resources_lister_server { }; Box::pin(fut) } - _ => { - Box::pin(async move { - Ok( - http::Response::builder() - .status(200) - .header("grpc-status", "12") - .header("content-type", "application/grpc") - .body(empty_body()) - .unwrap(), - ) - }) - } + _ => Box::pin(async move { + Ok(http::Response::builder() + .status(200) + .header("grpc-status", "12") + .header("content-type", "application/grpc") + .body(empty_body()) + .unwrap()) + }), } } } @@ -461,8 +427,7 @@ pub mod pod_resources_lister_server { write!(f, "{:?}", self.0) } } - impl tonic::server::NamedService - for PodResourcesListerServer { + impl tonic::server::NamedService for PodResourcesListerServer { const NAME: &'static str = "v1.PodResourcesLister"; } } diff --git a/agent/src/plugin_manager/v1beta1.rs b/agent/src/plugin_manager/v1beta1.rs index 7b068ecab..a88570f4d 100644 --- a/agent/src/plugin_manager/v1beta1.rs +++ b/agent/src/plugin_manager/v1beta1.rs @@ -96,9 +96,7 @@ pub struct PreStartContainerResponse {} #[derive(Clone, PartialEq, ::prost::Message)] pub struct PreferredAllocationRequest { #[prost(message, repeated, tag = "1")] - pub container_requests: ::prost::alloc::vec::Vec< - ContainerPreferredAllocationRequest, - >, + pub container_requests: ::prost::alloc::vec::Vec, } #[allow(clippy::derive_partial_eq_without_eq)] #[derive(Clone, PartialEq, ::prost::Message)] @@ -108,9 +106,7 @@ pub struct ContainerPreferredAllocationRequest { pub available_device_i_ds: ::prost::alloc::vec::Vec<::prost::alloc::string::String>, /// List of deviceIDs that must be included in the preferred allocation #[prost(string, repeated, tag = "2")] - pub must_include_device_i_ds: ::prost::alloc::vec::Vec< - ::prost::alloc::string::String, - >, + pub must_include_device_i_ds: ::prost::alloc::vec::Vec<::prost::alloc::string::String>, /// Number of devices to include in the preferred allocation #[prost(int32, tag = "3")] pub allocation_size: i32, @@ -121,9 +117,7 @@ pub struct ContainerPreferredAllocationRequest { #[derive(Clone, PartialEq, ::prost::Message)] pub struct PreferredAllocationResponse { #[prost(message, repeated, tag = "1")] - pub container_responses: ::prost::alloc::vec::Vec< - ContainerPreferredAllocationResponse, - >, + pub container_responses: ::prost::alloc::vec::Vec, } #[allow(clippy::derive_partial_eq_without_eq)] #[derive(Clone, PartialEq, ::prost::Message)] @@ -168,10 +162,8 @@ pub struct AllocateResponse { pub struct ContainerAllocateResponse { /// List of environment variable to be set in the container to access one of more devices. #[prost(map = "string, string", tag = "1")] - pub envs: ::std::collections::HashMap< - ::prost::alloc::string::String, - ::prost::alloc::string::String, - >, + pub envs: + ::std::collections::HashMap<::prost::alloc::string::String, ::prost::alloc::string::String>, /// Mounts for the container. #[prost(message, repeated, tag = "2")] pub mounts: ::prost::alloc::vec::Vec, @@ -180,10 +172,8 @@ pub struct ContainerAllocateResponse { pub devices: ::prost::alloc::vec::Vec, /// Container annotations to pass to the container runtime #[prost(map = "string, string", tag = "4")] - pub annotations: ::std::collections::HashMap< - ::prost::alloc::string::String, - ::prost::alloc::string::String, - >, + pub annotations: + ::std::collections::HashMap<::prost::alloc::string::String, ::prost::alloc::string::String>, } /// Mount specifies a host volume to mount into a container. /// where device library or tools are installed on host and container @@ -220,8 +210,8 @@ pub struct DeviceSpec { /// Generated client implementations. pub mod registration_client { #![allow(unused_variables, dead_code, missing_docs, clippy::let_unit_value)] - use tonic::codegen::*; use tonic::codegen::http::Uri; + use tonic::codegen::*; /// Registration is the service advertised by the Kubelet /// Only when Kubelet answers with a success code to a Register Request /// may Device Plugins start their service @@ -271,9 +261,8 @@ pub mod registration_client { >::ResponseBody, >, >, - , - >>::Error: Into + Send + Sync, + >>::Error: + Into + Send + Sync, { RegistrationClient::new(InterceptedService::new(inner, interceptor)) } @@ -312,19 +301,14 @@ pub mod registration_client { &mut self, request: impl tonic::IntoRequest, ) -> std::result::Result, tonic::Status> { - self.inner - .ready() - .await - .map_err(|e| { - tonic::Status::new( - tonic::Code::Unknown, - format!("Service was not ready: {}", e.into()), - ) - })?; + self.inner.ready().await.map_err(|e| { + tonic::Status::new( + tonic::Code::Unknown, + format!("Service was not ready: {}", e.into()), + ) + })?; let codec = tonic::codec::ProstCodec::default(); - let path = http::uri::PathAndQuery::from_static( - "/v1beta1.Registration/Register", - ); + let path = http::uri::PathAndQuery::from_static("/v1beta1.Registration/Register"); let mut req = request.into_request(); req.extensions_mut() .insert(GrpcMethod::new("v1beta1.Registration", "Register")); @@ -335,8 +319,8 @@ pub mod registration_client { /// Generated client implementations. pub mod device_plugin_client { #![allow(unused_variables, dead_code, missing_docs, clippy::let_unit_value)] - use tonic::codegen::*; use tonic::codegen::http::Uri; + use tonic::codegen::*; /// DevicePlugin is the service advertised by Device Plugins #[derive(Debug, Clone)] pub struct DevicePluginClient { @@ -381,9 +365,8 @@ pub mod device_plugin_client { >::ResponseBody, >, >, - , - >>::Error: Into + Send + Sync, + >>::Error: + Into + Send + Sync, { DevicePluginClient::new(InterceptedService::new(inner, interceptor)) } @@ -423,28 +406,23 @@ pub mod device_plugin_client { pub async fn get_device_plugin_options( &mut self, request: impl tonic::IntoRequest, - ) -> std::result::Result< - tonic::Response, - tonic::Status, - > { - self.inner - .ready() - .await - .map_err(|e| { - tonic::Status::new( - tonic::Code::Unknown, - format!("Service was not ready: {}", e.into()), - ) - })?; + ) -> std::result::Result, tonic::Status> + { + self.inner.ready().await.map_err(|e| { + tonic::Status::new( + tonic::Code::Unknown, + format!("Service was not ready: {}", e.into()), + ) + })?; let codec = tonic::codec::ProstCodec::default(); let path = http::uri::PathAndQuery::from_static( "/v1beta1.DevicePlugin/GetDevicePluginOptions", ); let mut req = request.into_request(); - req.extensions_mut() - .insert( - GrpcMethod::new("v1beta1.DevicePlugin", "GetDevicePluginOptions"), - ); + req.extensions_mut().insert(GrpcMethod::new( + "v1beta1.DevicePlugin", + "GetDevicePluginOptions", + )); self.inner.unary(req, path, codec).await } /// ListAndWatch returns a stream of List of Devices @@ -457,19 +435,14 @@ pub mod device_plugin_client { tonic::Response>, tonic::Status, > { - self.inner - .ready() - .await - .map_err(|e| { - tonic::Status::new( - tonic::Code::Unknown, - format!("Service was not ready: {}", e.into()), - ) - })?; + self.inner.ready().await.map_err(|e| { + tonic::Status::new( + tonic::Code::Unknown, + format!("Service was not ready: {}", e.into()), + ) + })?; let codec = tonic::codec::ProstCodec::default(); - let path = http::uri::PathAndQuery::from_static( - "/v1beta1.DevicePlugin/ListAndWatch", - ); + let path = http::uri::PathAndQuery::from_static("/v1beta1.DevicePlugin/ListAndWatch"); let mut req = request.into_request(); req.extensions_mut() .insert(GrpcMethod::new("v1beta1.DevicePlugin", "ListAndWatch")); @@ -483,28 +456,23 @@ pub mod device_plugin_client { pub async fn get_preferred_allocation( &mut self, request: impl tonic::IntoRequest, - ) -> std::result::Result< - tonic::Response, - tonic::Status, - > { - self.inner - .ready() - .await - .map_err(|e| { - tonic::Status::new( - tonic::Code::Unknown, - format!("Service was not ready: {}", e.into()), - ) - })?; + ) -> std::result::Result, tonic::Status> + { + self.inner.ready().await.map_err(|e| { + tonic::Status::new( + tonic::Code::Unknown, + format!("Service was not ready: {}", e.into()), + ) + })?; let codec = tonic::codec::ProstCodec::default(); let path = http::uri::PathAndQuery::from_static( "/v1beta1.DevicePlugin/GetPreferredAllocation", ); let mut req = request.into_request(); - req.extensions_mut() - .insert( - GrpcMethod::new("v1beta1.DevicePlugin", "GetPreferredAllocation"), - ); + req.extensions_mut().insert(GrpcMethod::new( + "v1beta1.DevicePlugin", + "GetPreferredAllocation", + )); self.inner.unary(req, path, codec).await } /// Allocate is called during container creation so that the Device @@ -513,23 +481,15 @@ pub mod device_plugin_client { pub async fn allocate( &mut self, request: impl tonic::IntoRequest, - ) -> std::result::Result< - tonic::Response, - tonic::Status, - > { - self.inner - .ready() - .await - .map_err(|e| { - tonic::Status::new( - tonic::Code::Unknown, - format!("Service was not ready: {}", e.into()), - ) - })?; + ) -> std::result::Result, tonic::Status> { + self.inner.ready().await.map_err(|e| { + tonic::Status::new( + tonic::Code::Unknown, + format!("Service was not ready: {}", e.into()), + ) + })?; let codec = tonic::codec::ProstCodec::default(); - let path = http::uri::PathAndQuery::from_static( - "/v1beta1.DevicePlugin/Allocate", - ); + let path = http::uri::PathAndQuery::from_static("/v1beta1.DevicePlugin/Allocate"); let mut req = request.into_request(); req.extensions_mut() .insert(GrpcMethod::new("v1beta1.DevicePlugin", "Allocate")); @@ -541,23 +501,17 @@ pub mod device_plugin_client { pub async fn pre_start_container( &mut self, request: impl tonic::IntoRequest, - ) -> std::result::Result< - tonic::Response, - tonic::Status, - > { - self.inner - .ready() - .await - .map_err(|e| { - tonic::Status::new( - tonic::Code::Unknown, - format!("Service was not ready: {}", e.into()), - ) - })?; + ) -> std::result::Result, tonic::Status> + { + self.inner.ready().await.map_err(|e| { + tonic::Status::new( + tonic::Code::Unknown, + format!("Service was not ready: {}", e.into()), + ) + })?; let codec = tonic::codec::ProstCodec::default(); - let path = http::uri::PathAndQuery::from_static( - "/v1beta1.DevicePlugin/PreStartContainer", - ); + let path = + http::uri::PathAndQuery::from_static("/v1beta1.DevicePlugin/PreStartContainer"); let mut req = request.into_request(); req.extensions_mut() .insert(GrpcMethod::new("v1beta1.DevicePlugin", "PreStartContainer")); @@ -606,10 +560,7 @@ pub mod registration_server { max_encoding_message_size: None, } } - pub fn with_interceptor( - inner: T, - interceptor: F, - ) -> InterceptedService + pub fn with_interceptor(inner: T, interceptor: F) -> InterceptedService where F: tonic::service::Interceptor, { @@ -665,23 +616,16 @@ pub mod registration_server { "/v1beta1.Registration/Register" => { #[allow(non_camel_case_types)] struct RegisterSvc(pub Arc); - impl< - T: Registration, - > tonic::server::UnaryService - for RegisterSvc { + impl tonic::server::UnaryService for RegisterSvc { type Response = super::Empty; - type Future = BoxFuture< - tonic::Response, - tonic::Status, - >; + type Future = BoxFuture, tonic::Status>; fn call( &mut self, request: tonic::Request, ) -> Self::Future { let inner = Arc::clone(&self.0); - let fut = async move { - ::register(&inner, request).await - }; + let fut = + async move { ::register(&inner, request).await }; Box::pin(fut) } } @@ -708,18 +652,14 @@ pub mod registration_server { }; Box::pin(fut) } - _ => { - Box::pin(async move { - Ok( - http::Response::builder() - .status(200) - .header("grpc-status", "12") - .header("content-type", "application/grpc") - .body(empty_body()) - .unwrap(), - ) - }) - } + _ => Box::pin(async move { + Ok(http::Response::builder() + .status(200) + .header("grpc-status", "12") + .header("content-type", "application/grpc") + .body(empty_body()) + .unwrap()) + }), } } } @@ -761,15 +701,11 @@ pub mod device_plugin_server { async fn get_device_plugin_options( &self, request: tonic::Request, - ) -> std::result::Result< - tonic::Response, - tonic::Status, - >; + ) -> std::result::Result, tonic::Status>; /// Server streaming response type for the ListAndWatch method. type ListAndWatchStream: tonic::codegen::tokio_stream::Stream< Item = std::result::Result, - > - + Send + > + Send + 'static; /// ListAndWatch returns a stream of List of Devices /// Whenever a Device state change or a Device disappears, ListAndWatch @@ -777,10 +713,7 @@ pub mod device_plugin_server { async fn list_and_watch( &self, request: tonic::Request, - ) -> std::result::Result< - tonic::Response, - tonic::Status, - >; + ) -> std::result::Result, tonic::Status>; /// GetPreferredAllocation returns a preferred set of devices to allocate /// from a list of available ones. The resulting preferred allocation is not /// guaranteed to be the allocation ultimately performed by the @@ -789,30 +722,21 @@ pub mod device_plugin_server { async fn get_preferred_allocation( &self, request: tonic::Request, - ) -> std::result::Result< - tonic::Response, - tonic::Status, - >; + ) -> std::result::Result, tonic::Status>; /// Allocate is called during container creation so that the Device /// Plugin can run device specific operations and instruct Kubelet /// of the steps to make the Device available in the container async fn allocate( &self, request: tonic::Request, - ) -> std::result::Result< - tonic::Response, - tonic::Status, - >; + ) -> std::result::Result, tonic::Status>; /// PreStartContainer is called, if indicated by Device Plugin during registeration phase, /// before each container start. Device plugin can run device specific operations /// such as resetting the device before making devices available to the container async fn pre_start_container( &self, request: tonic::Request, - ) -> std::result::Result< - tonic::Response, - tonic::Status, - >; + ) -> std::result::Result, tonic::Status>; } /// DevicePlugin is the service advertised by Device Plugins #[derive(Debug)] @@ -838,10 +762,7 @@ pub mod device_plugin_server { max_encoding_message_size: None, } } - pub fn with_interceptor( - inner: T, - interceptor: F, - ) -> InterceptedService + pub fn with_interceptor(inner: T, interceptor: F) -> InterceptedService where F: tonic::service::Interceptor, { @@ -897,23 +818,13 @@ pub mod device_plugin_server { "/v1beta1.DevicePlugin/GetDevicePluginOptions" => { #[allow(non_camel_case_types)] struct GetDevicePluginOptionsSvc(pub Arc); - impl tonic::server::UnaryService - for GetDevicePluginOptionsSvc { + impl tonic::server::UnaryService for GetDevicePluginOptionsSvc { type Response = super::DevicePluginOptions; - type Future = BoxFuture< - tonic::Response, - tonic::Status, - >; - fn call( - &mut self, - request: tonic::Request, - ) -> Self::Future { + type Future = BoxFuture, tonic::Status>; + fn call(&mut self, request: tonic::Request) -> Self::Future { let inner = Arc::clone(&self.0); let fut = async move { - ::get_device_plugin_options( - &inner, - request, - ) + ::get_device_plugin_options(&inner, request) .await }; Box::pin(fut) @@ -945,20 +856,12 @@ pub mod device_plugin_server { "/v1beta1.DevicePlugin/ListAndWatch" => { #[allow(non_camel_case_types)] struct ListAndWatchSvc(pub Arc); - impl< - T: DevicePlugin, - > tonic::server::ServerStreamingService - for ListAndWatchSvc { + impl tonic::server::ServerStreamingService for ListAndWatchSvc { type Response = super::ListAndWatchResponse; type ResponseStream = T::ListAndWatchStream; - type Future = BoxFuture< - tonic::Response, - tonic::Status, - >; - fn call( - &mut self, - request: tonic::Request, - ) -> Self::Future { + type Future = + BoxFuture, tonic::Status>; + fn call(&mut self, request: tonic::Request) -> Self::Future { let inner = Arc::clone(&self.0); let fut = async move { ::list_and_watch(&inner, request).await @@ -992,26 +895,19 @@ pub mod device_plugin_server { "/v1beta1.DevicePlugin/GetPreferredAllocation" => { #[allow(non_camel_case_types)] struct GetPreferredAllocationSvc(pub Arc); - impl< - T: DevicePlugin, - > tonic::server::UnaryService - for GetPreferredAllocationSvc { + impl + tonic::server::UnaryService + for GetPreferredAllocationSvc + { type Response = super::PreferredAllocationResponse; - type Future = BoxFuture< - tonic::Response, - tonic::Status, - >; + type Future = BoxFuture, tonic::Status>; fn call( &mut self, request: tonic::Request, ) -> Self::Future { let inner = Arc::clone(&self.0); let fut = async move { - ::get_preferred_allocation( - &inner, - request, - ) - .await + ::get_preferred_allocation(&inner, request).await }; Box::pin(fut) } @@ -1042,23 +938,16 @@ pub mod device_plugin_server { "/v1beta1.DevicePlugin/Allocate" => { #[allow(non_camel_case_types)] struct AllocateSvc(pub Arc); - impl< - T: DevicePlugin, - > tonic::server::UnaryService - for AllocateSvc { + impl tonic::server::UnaryService for AllocateSvc { type Response = super::AllocateResponse; - type Future = BoxFuture< - tonic::Response, - tonic::Status, - >; + type Future = BoxFuture, tonic::Status>; fn call( &mut self, request: tonic::Request, ) -> Self::Future { let inner = Arc::clone(&self.0); - let fut = async move { - ::allocate(&inner, request).await - }; + let fut = + async move { ::allocate(&inner, request).await }; Box::pin(fut) } } @@ -1088,23 +977,19 @@ pub mod device_plugin_server { "/v1beta1.DevicePlugin/PreStartContainer" => { #[allow(non_camel_case_types)] struct PreStartContainerSvc(pub Arc); - impl< - T: DevicePlugin, - > tonic::server::UnaryService - for PreStartContainerSvc { + impl + tonic::server::UnaryService + for PreStartContainerSvc + { type Response = super::PreStartContainerResponse; - type Future = BoxFuture< - tonic::Response, - tonic::Status, - >; + type Future = BoxFuture, tonic::Status>; fn call( &mut self, request: tonic::Request, ) -> Self::Future { let inner = Arc::clone(&self.0); let fut = async move { - ::pre_start_container(&inner, request) - .await + ::pre_start_container(&inner, request).await }; Box::pin(fut) } @@ -1132,18 +1017,14 @@ pub mod device_plugin_server { }; Box::pin(fut) } - _ => { - Box::pin(async move { - Ok( - http::Response::builder() - .status(200) - .header("grpc-status", "12") - .header("content-type", "application/grpc") - .body(empty_body()) - .unwrap(), - ) - }) - } + _ => Box::pin(async move { + Ok(http::Response::builder() + .status(200) + .header("grpc-status", "12") + .header("content-type", "application/grpc") + .body(empty_body()) + .unwrap()) + }), } } } diff --git a/controller/src/util/node_watcher.rs b/controller/src/util/node_watcher.rs index 93f53baa5..2b94b3d54 100644 --- a/controller/src/util/node_watcher.rs +++ b/controller/src/util/node_watcher.rs @@ -419,7 +419,7 @@ mod tests { assert_eq!( &NodeState::Known, - node_watcher.known_nodes.get(&"node-a".to_string()).unwrap() + node_watcher.known_nodes.get("node-a").unwrap() ) } @@ -439,7 +439,7 @@ mod tests { assert_eq!( &NodeState::Running, - node_watcher.known_nodes.get(&"node-a".to_string()).unwrap() + node_watcher.known_nodes.get("node-a").unwrap() ) } @@ -485,7 +485,7 @@ mod tests { assert_eq!( &NodeState::InstancesCleaned, - node_watcher.known_nodes.get(&"node-b".to_string()).unwrap() + node_watcher.known_nodes.get("node-b").unwrap() ) } @@ -507,7 +507,7 @@ mod tests { assert_eq!( &NodeState::Running, - node_watcher.known_nodes.get(&"node-b".to_string()).unwrap() + node_watcher.known_nodes.get("node-b").unwrap() ) } @@ -551,7 +551,7 @@ mod tests { assert_eq!( &NodeState::InstancesCleaned, - node_watcher.known_nodes.get(&"node-b".to_string()).unwrap() + node_watcher.known_nodes.get("node-b").unwrap() ) } diff --git a/controller/src/util/pod_watcher.rs b/controller/src/util/pod_watcher.rs index b71b3fc4e..9ebf4a241 100644 --- a/controller/src/util/pod_watcher.rs +++ b/controller/src/util/pod_watcher.rs @@ -834,10 +834,7 @@ mod tests { assert_eq!(1, pod_watcher.known_pods.len()); assert_eq!( &PodState::Pending, - pod_watcher - .known_pods - .get(&"config-a-b494b6-pod".to_string()) - .unwrap() + pod_watcher.known_pods.get("config-a-b494b6-pod").unwrap() ) } } @@ -868,10 +865,7 @@ mod tests { assert_eq!(1, pod_watcher.known_pods.len()); assert_eq!( &PodState::Pending, - pod_watcher - .known_pods - .get(&"config-a-b494b6-pod".to_string()) - .unwrap() + pod_watcher.known_pods.get("config-a-b494b6-pod").unwrap() ) } } @@ -927,10 +921,7 @@ mod tests { assert_eq!(1, pod_watcher.known_pods.len()); assert_eq!( &PodState::Running, - pod_watcher - .known_pods - .get(&"config-a-b494b6-pod".to_string()) - .unwrap() + pod_watcher.known_pods.get("config-a-b494b6-pod").unwrap() ) } @@ -985,10 +976,7 @@ mod tests { assert_eq!(1, pod_watcher.known_pods.len()); assert_eq!( &PodState::Running, - pod_watcher - .known_pods - .get(&"config-a-b494b6-pod".to_string()) - .unwrap() + pod_watcher.known_pods.get("config-a-b494b6-pod").unwrap() ) } @@ -1047,10 +1035,7 @@ mod tests { assert_eq!(1, pod_watcher.known_pods.len()); assert_eq!( &PodState::Ended, - pod_watcher - .known_pods - .get(&"config-a-b494b6-pod".to_string()) - .unwrap() + pod_watcher.known_pods.get("config-a-b494b6-pod").unwrap() ) } @@ -1109,10 +1094,7 @@ mod tests { assert_eq!(1, pod_watcher.known_pods.len()); assert_eq!( &PodState::Deleted, - pod_watcher - .known_pods - .get(&"config-a-b494b6-pod".to_string()) - .unwrap() + pod_watcher.known_pods.get("config-a-b494b6-pod").unwrap() ) } @@ -1171,10 +1153,7 @@ mod tests { assert_eq!(1, pod_watcher.known_pods.len()); assert_eq!( &PodState::Ended, - pod_watcher - .known_pods - .get(&"config-a-b494b6-pod".to_string()) - .unwrap() + pod_watcher.known_pods.get("config-a-b494b6-pod").unwrap() ) } @@ -1247,10 +1226,7 @@ mod tests { assert_eq!(1, pod_watcher.known_pods.len()); assert_eq!( &PodState::Running, - pod_watcher - .known_pods - .get(&"config-a-b494b6-pod".to_string()) - .unwrap() + pod_watcher.known_pods.get("config-a-b494b6-pod").unwrap() ) } @@ -1274,10 +1250,7 @@ mod tests { assert_eq!(1, pod_watcher.known_pods.len()); assert_eq!( &PodState::Ended, - pod_watcher - .known_pods - .get(&"config-a-b494b6-pod".to_string()) - .unwrap() + pod_watcher.known_pods.get("config-a-b494b6-pod").unwrap() ) } @@ -1301,10 +1274,7 @@ mod tests { assert_eq!(1, pod_watcher.known_pods.len()); assert_eq!( &PodState::Deleted, - pod_watcher - .known_pods - .get(&"config-a-b494b6-pod".to_string()) - .unwrap() + pod_watcher.known_pods.get("config-a-b494b6-pod").unwrap() ) } diff --git a/discovery-handlers/onvif/src/credential_store.rs b/discovery-handlers/onvif/src/credential_store.rs index e31fa0722..5f44d95c4 100644 --- a/discovery-handlers/onvif/src/credential_store.rs +++ b/discovery-handlers/onvif/src/credential_store.rs @@ -305,7 +305,7 @@ mod tests { #[test] fn test_credential_store_non_utf8_username() { let _ = env_logger::builder().is_test(true).try_init(); - let test_data = vec![("deviceid-1", vec![200u8, 200u8, 200u8], "password_1")]; + let test_data = [("deviceid-1", vec![200u8, 200u8, 200u8], "password_1")]; let test_entries = test_data .iter() .map(|(id, uname, pwd)| DeviceCredentialData { @@ -323,7 +323,7 @@ mod tests { #[test] fn test_credential_store_non_utf8_password() { let _ = env_logger::builder().is_test(true).try_init(); - let test_data = vec![("deviceid-1", "username_1", vec![200u8, 200u8, 200u8])]; + let test_data = [("deviceid-1", "username_1", vec![200u8, 200u8, 200u8])]; let expected_result = test_data .iter() .map(|(id, uname, pwd)| { diff --git a/discovery-handlers/onvif/src/discovery_utils.rs b/discovery-handlers/onvif/src/discovery_utils.rs index 5080825bf..db6c91da8 100644 --- a/discovery-handlers/onvif/src/discovery_utils.rs +++ b/discovery-handlers/onvif/src/discovery_utils.rs @@ -608,11 +608,9 @@ mod tests { async fn test_inner_get_device_profile_streaming_uri() { let _ = env_logger::builder().is_test(true).try_init(); - let expected_result = vec![ - "rtsp://192.168.0.36:554/user=admin_password=tlJwpbo6_channel=1_stream=0.sdp?real_stream".to_string(), + let expected_result = ["rtsp://192.168.0.36:554/user=admin_password=tlJwpbo6_channel=1_stream=0.sdp?real_stream".to_string(), "rtsp://192.168.1.36:554/user=admin_password=tlJwpbo6_channel=1_stream=0.sdp?real_stream".to_string(), - "rtsp://192.168.2.36:554/user=admin_password=tlJwpbo6_channel=1_stream=0.sdp?real_stream".to_string() - ]; + "rtsp://192.168.2.36:554/user=admin_password=tlJwpbo6_channel=1_stream=0.sdp?real_stream".to_string()]; for (i, expected_uri) in expected_result.iter().enumerate().take(3) { let mut mock = MockHttp::new(); diff --git a/discovery-handlers/opcua/src/discovery_handler.rs b/discovery-handlers/opcua/src/discovery_handler.rs index 511571c42..dffbbebfe 100644 --- a/discovery-handlers/opcua/src/discovery_handler.rs +++ b/discovery-handlers/opcua/src/discovery_handler.rs @@ -142,7 +142,7 @@ impl DiscoveryHandler for DiscoveryHandlerImpl { || matching_device_count != previously_discovered_devices.len() { trace!("discover - for OPC UA, sending updated device list"); - previously_discovered_devices = discovered_devices.clone(); + previously_discovered_devices.clone_from(&discovered_devices); if let Err(e) = discovered_devices_sender .send(Ok(DiscoverResponse { devices: discovered_devices, diff --git a/discovery-handlers/udev/src/discovery_handler.rs b/discovery-handlers/udev/src/discovery_handler.rs index 037ff5d89..343a0343a 100644 --- a/discovery-handlers/udev/src/discovery_handler.rs +++ b/discovery-handlers/udev/src/discovery_handler.rs @@ -135,7 +135,7 @@ impl DiscoveryHandler for DiscoveryHandlerImpl { || matching_device_count != previously_discovered_devices.len() { info!("discover - sending updated device list"); - previously_discovered_devices = discovered_devices.clone(); + previously_discovered_devices.clone_from(&discovered_devices); if let Err(e) = discovered_devices_sender .send(Ok(DiscoverResponse { devices: discovered_devices, diff --git a/discovery-handlers/udev/src/discovery_impl.rs b/discovery-handlers/udev/src/discovery_impl.rs index 14bbc1810..e50d2f559 100644 --- a/discovery-handlers/udev/src/discovery_impl.rs +++ b/discovery-handlers/udev/src/discovery_impl.rs @@ -1331,11 +1331,11 @@ mod discovery_tests { "/devices/pci0/usb0/0-1/0-1.1/0-1.1:1.0/video4linux/video1".to_string(), "/devices/pci0/usb0/0-1/0-1.1/0-1.1:1.1".to_string(), ]; - let unrelated_paths = vec![ + let unrelated_paths = [ "/devices/pci0/usb0/0-1/0-1.2/0-1.2:1.1".to_string(), "/devices/pci0/usb1/0-1/0-1.2/0-1.2:1.1".to_string(), ]; - let parent_path = vec!["/devices/pci0/usb0".to_string()]; + let parent_path = ["/devices/pci0/usb0".to_string()]; let empty: Vec = Vec::new(); // Test with children devices @@ -1366,7 +1366,7 @@ mod discovery_tests { #[test] fn test_insert_device_with_relatives() { let mut devpaths: HashMap> = HashMap::default(); - let related_devices = vec![ + let related_devices = [ ("/sys/device/parent".to_string(), None), ( "/sys/device/parent/child1".to_string(), diff --git a/discovery-utils/src/discovery/v0.rs b/discovery-utils/src/discovery/v0.rs index a20490c38..08781a68e 100644 --- a/discovery-utils/src/discovery/v0.rs +++ b/discovery-utils/src/discovery/v0.rs @@ -9,7 +9,10 @@ pub struct RegisterDiscoveryHandlerRequest { /// Endpoint for the registering `DiscoveryHandler` #[prost(string, tag = "2")] pub endpoint: ::prost::alloc::string::String, - #[prost(enumeration = "register_discovery_handler_request::EndpointType", tag = "3")] + #[prost( + enumeration = "register_discovery_handler_request::EndpointType", + tag = "3" + )] pub endpoint_type: i32, /// Specifies whether this device could be used by multiple nodes (e.g. an IP camera) /// or can only be ever be discovered by a single node (e.g. a local USB device) @@ -19,17 +22,7 @@ pub struct RegisterDiscoveryHandlerRequest { /// Nested message and enum types in `RegisterDiscoveryHandlerRequest`. pub mod register_discovery_handler_request { /// Specifies the type of endpoint. - #[derive( - Clone, - Copy, - Debug, - PartialEq, - Eq, - Hash, - PartialOrd, - Ord, - ::prost::Enumeration - )] + #[derive(Clone, Copy, Debug, PartialEq, Eq, Hash, PartialOrd, Ord, ::prost::Enumeration)] #[repr(i32)] pub enum EndpointType { Uds = 0, @@ -75,10 +68,7 @@ pub struct DiscoverRequest { /// list of Key-value pairs containing additional information /// for the 'DiscoveryHandler' to discover devices #[prost(map = "string, message", tag = "2")] - pub discovery_properties: ::std::collections::HashMap< - ::prost::alloc::string::String, - ByteData, - >, + pub discovery_properties: ::std::collections::HashMap<::prost::alloc::string::String, ByteData>, } #[allow(clippy::derive_partial_eq_without_eq)] #[derive(Clone, PartialEq, ::prost::Message)] @@ -97,10 +87,8 @@ pub struct Device { /// and set as environment variables in the device's broker Pods. May be information /// about where to find the device such as an RTSP URL or a device node (e.g. `/dev/video1`) #[prost(map = "string, string", tag = "2")] - pub properties: ::std::collections::HashMap< - ::prost::alloc::string::String, - ::prost::alloc::string::String, - >, + pub properties: + ::std::collections::HashMap<::prost::alloc::string::String, ::prost::alloc::string::String>, /// Optionally specify mounts for Pods that request this device as a resource #[prost(message, repeated, tag = "3")] pub mounts: ::prost::alloc::vec::Vec, @@ -145,8 +133,8 @@ pub struct DeviceSpec { /// Generated client implementations. pub mod registration_client { #![allow(unused_variables, dead_code, missing_docs, clippy::let_unit_value)] - use tonic::codegen::*; use tonic::codegen::http::Uri; + use tonic::codegen::*; /// Registration is the service advertised by the Akri Agent. /// Any `DiscoveryHandler` can register with the Akri Agent. #[derive(Debug, Clone)] @@ -192,9 +180,8 @@ pub mod registration_client { >::ResponseBody, >, >, - , - >>::Error: Into + Send + Sync, + >>::Error: + Into + Send + Sync, { RegistrationClient::new(InterceptedService::new(inner, interceptor)) } @@ -233,22 +220,20 @@ pub mod registration_client { &mut self, request: impl tonic::IntoRequest, ) -> std::result::Result, tonic::Status> { - self.inner - .ready() - .await - .map_err(|e| { - tonic::Status::new( - tonic::Code::Unknown, - format!("Service was not ready: {}", e.into()), - ) - })?; + self.inner.ready().await.map_err(|e| { + tonic::Status::new( + tonic::Code::Unknown, + format!("Service was not ready: {}", e.into()), + ) + })?; let codec = tonic::codec::ProstCodec::default(); - let path = http::uri::PathAndQuery::from_static( - "/v0.Registration/RegisterDiscoveryHandler", - ); + let path = + http::uri::PathAndQuery::from_static("/v0.Registration/RegisterDiscoveryHandler"); let mut req = request.into_request(); - req.extensions_mut() - .insert(GrpcMethod::new("v0.Registration", "RegisterDiscoveryHandler")); + req.extensions_mut().insert(GrpcMethod::new( + "v0.Registration", + "RegisterDiscoveryHandler", + )); self.inner.unary(req, path, codec).await } } @@ -256,8 +241,8 @@ pub mod registration_client { /// Generated client implementations. pub mod discovery_handler_client { #![allow(unused_variables, dead_code, missing_docs, clippy::let_unit_value)] - use tonic::codegen::*; use tonic::codegen::http::Uri; + use tonic::codegen::*; #[derive(Debug, Clone)] pub struct DiscoveryHandlerClient { inner: tonic::client::Grpc, @@ -301,9 +286,8 @@ pub mod discovery_handler_client { >::ResponseBody, >, >, - , - >>::Error: Into + Send + Sync, + >>::Error: + Into + Send + Sync, { DiscoveryHandlerClient::new(InterceptedService::new(inner, interceptor)) } @@ -345,19 +329,14 @@ pub mod discovery_handler_client { tonic::Response>, tonic::Status, > { - self.inner - .ready() - .await - .map_err(|e| { - tonic::Status::new( - tonic::Code::Unknown, - format!("Service was not ready: {}", e.into()), - ) - })?; + self.inner.ready().await.map_err(|e| { + tonic::Status::new( + tonic::Code::Unknown, + format!("Service was not ready: {}", e.into()), + ) + })?; let codec = tonic::codec::ProstCodec::default(); - let path = http::uri::PathAndQuery::from_static( - "/v0.DiscoveryHandler/Discover", - ); + let path = http::uri::PathAndQuery::from_static("/v0.DiscoveryHandler/Discover"); let mut req = request.into_request(); req.extensions_mut() .insert(GrpcMethod::new("v0.DiscoveryHandler", "Discover")); @@ -402,10 +381,7 @@ pub mod registration_server { max_encoding_message_size: None, } } - pub fn with_interceptor( - inner: T, - interceptor: F, - ) -> InterceptedService + pub fn with_interceptor(inner: T, interceptor: F) -> InterceptedService where F: tonic::service::Interceptor, { @@ -461,27 +437,19 @@ pub mod registration_server { "/v0.Registration/RegisterDiscoveryHandler" => { #[allow(non_camel_case_types)] struct RegisterDiscoveryHandlerSvc(pub Arc); - impl< - T: Registration, - > tonic::server::UnaryService - for RegisterDiscoveryHandlerSvc { + impl + tonic::server::UnaryService + for RegisterDiscoveryHandlerSvc + { type Response = super::Empty; - type Future = BoxFuture< - tonic::Response, - tonic::Status, - >; + type Future = BoxFuture, tonic::Status>; fn call( &mut self, - request: tonic::Request< - super::RegisterDiscoveryHandlerRequest, - >, + request: tonic::Request, ) -> Self::Future { let inner = Arc::clone(&self.0); let fut = async move { - ::register_discovery_handler( - &inner, - request, - ) + ::register_discovery_handler(&inner, request) .await }; Box::pin(fut) @@ -510,18 +478,14 @@ pub mod registration_server { }; Box::pin(fut) } - _ => { - Box::pin(async move { - Ok( - http::Response::builder() - .status(200) - .header("grpc-status", "12") - .header("content-type", "application/grpc") - .body(empty_body()) - .unwrap(), - ) - }) - } + _ => Box::pin(async move { + Ok(http::Response::builder() + .status(200) + .header("grpc-status", "12") + .header("content-type", "application/grpc") + .body(empty_body()) + .unwrap()) + }), } } } @@ -561,8 +525,7 @@ pub mod discovery_handler_server { /// Server streaming response type for the Discover method. type DiscoverStream: tonic::codegen::tokio_stream::Stream< Item = std::result::Result, - > - + Send + > + Send + 'static; async fn discover( &self, @@ -592,10 +555,7 @@ pub mod discovery_handler_server { max_encoding_message_size: None, } } - pub fn with_interceptor( - inner: T, - interceptor: F, - ) -> InterceptedService + pub fn with_interceptor(inner: T, interceptor: F) -> InterceptedService where F: tonic::service::Interceptor, { @@ -651,16 +611,14 @@ pub mod discovery_handler_server { "/v0.DiscoveryHandler/Discover" => { #[allow(non_camel_case_types)] struct DiscoverSvc(pub Arc); - impl< - T: DiscoveryHandler, - > tonic::server::ServerStreamingService - for DiscoverSvc { + impl + tonic::server::ServerStreamingService + for DiscoverSvc + { type Response = super::DiscoverResponse; type ResponseStream = T::DiscoverStream; - type Future = BoxFuture< - tonic::Response, - tonic::Status, - >; + type Future = + BoxFuture, tonic::Status>; fn call( &mut self, request: tonic::Request, @@ -695,18 +653,14 @@ pub mod discovery_handler_server { }; Box::pin(fut) } - _ => { - Box::pin(async move { - Ok( - http::Response::builder() - .status(200) - .header("grpc-status", "12") - .header("content-type", "application/grpc") - .body(empty_body()) - .unwrap(), - ) - }) - } + _ => Box::pin(async move { + Ok(http::Response::builder() + .status(200) + .header("grpc-status", "12") + .header("content-type", "application/grpc") + .body(empty_body()) + .unwrap()) + }), } } } diff --git a/samples/brokers/udev-video-broker/src/util/camera.rs b/samples/brokers/udev-video-broker/src/util/camera.rs index 4bd208251..437b09cd8 100644 --- a/samples/brokers/udev-video-broker/src/util/camera.rs +++ b/samples/brokers/udev-video-broker/src/util/camera.rs @@ -13,8 +13,8 @@ pub struct NotifyResponse { /// Generated client implementations. pub mod camera_client { #![allow(unused_variables, dead_code, missing_docs, clippy::let_unit_value)] - use tonic::codegen::*; use tonic::codegen::http::Uri; + use tonic::codegen::*; #[derive(Debug, Clone)] pub struct CameraClient { inner: tonic::client::Grpc, @@ -58,9 +58,8 @@ pub mod camera_client { >::ResponseBody, >, >, - , - >>::Error: Into + Send + Sync, + >>::Error: + Into + Send + Sync, { CameraClient::new(InterceptedService::new(inner, interceptor)) } @@ -99,19 +98,17 @@ pub mod camera_client { &mut self, request: impl tonic::IntoRequest, ) -> std::result::Result, tonic::Status> { - self.inner - .ready() - .await - .map_err(|e| { - tonic::Status::new( - tonic::Code::Unknown, - format!("Service was not ready: {}", e.into()), - ) - })?; + self.inner.ready().await.map_err(|e| { + tonic::Status::new( + tonic::Code::Unknown, + format!("Service was not ready: {}", e.into()), + ) + })?; let codec = tonic::codec::ProstCodec::default(); let path = http::uri::PathAndQuery::from_static("/camera.Camera/GetFrame"); let mut req = request.into_request(); - req.extensions_mut().insert(GrpcMethod::new("camera.Camera", "GetFrame")); + req.extensions_mut() + .insert(GrpcMethod::new("camera.Camera", "GetFrame")); self.inner.unary(req, path, codec).await } } @@ -151,10 +148,7 @@ pub mod camera_server { max_encoding_message_size: None, } } - pub fn with_interceptor( - inner: T, - interceptor: F, - ) -> InterceptedService + pub fn with_interceptor(inner: T, interceptor: F) -> InterceptedService where F: tonic::service::Interceptor, { @@ -210,21 +204,16 @@ pub mod camera_server { "/camera.Camera/GetFrame" => { #[allow(non_camel_case_types)] struct GetFrameSvc(pub Arc); - impl tonic::server::UnaryService - for GetFrameSvc { + impl tonic::server::UnaryService for GetFrameSvc { type Response = super::NotifyResponse; - type Future = BoxFuture< - tonic::Response, - tonic::Status, - >; + type Future = BoxFuture, tonic::Status>; fn call( &mut self, request: tonic::Request, ) -> Self::Future { let inner = Arc::clone(&self.0); - let fut = async move { - ::get_frame(&inner, request).await - }; + let fut = + async move { ::get_frame(&inner, request).await }; Box::pin(fut) } } @@ -251,18 +240,14 @@ pub mod camera_server { }; Box::pin(fut) } - _ => { - Box::pin(async move { - Ok( - http::Response::builder() - .status(200) - .header("grpc-status", "12") - .header("content-type", "application/grpc") - .body(empty_body()) - .unwrap(), - ) - }) - } + _ => Box::pin(async move { + Ok(http::Response::builder() + .status(200) + .header("grpc-status", "12") + .header("content-type", "application/grpc") + .body(empty_body()) + .unwrap()) + }), } } } diff --git a/shared/src/k8s/api.rs b/shared/src/k8s/api.rs index dd90f43d7..88ec79a0f 100644 --- a/shared/src/k8s/api.rs +++ b/shared/src/k8s/api.rs @@ -106,6 +106,7 @@ where } #[automock] +#[allow(clippy::multiple_bound_locations)] pub trait IntoApi: Send + Sync { fn all(&self) -> Box>; fn namespaced(&self, namespace: &str) -> Box> diff --git a/shared/src/k8s/job.rs b/shared/src/k8s/job.rs index 58baf37fa..bcf5c238a 100644 --- a/shared/src/k8s/job.rs +++ b/shared/src/k8s/job.rs @@ -455,7 +455,7 @@ mod broker_jobspec_tests { .owner_references .as_ref() .unwrap() - .get(0) + .first() .unwrap() .name ); @@ -465,7 +465,7 @@ mod broker_jobspec_tests { .owner_references .as_ref() .unwrap() - .get(0) + .first() .unwrap() .uid ); @@ -475,7 +475,7 @@ mod broker_jobspec_tests { .owner_references .as_ref() .unwrap() - .get(0) + .first() .unwrap() .kind ); @@ -485,7 +485,7 @@ mod broker_jobspec_tests { .owner_references .as_ref() .unwrap() - .get(0) + .first() .unwrap() .api_version ); @@ -494,7 +494,7 @@ mod broker_jobspec_tests { .owner_references .as_ref() .unwrap() - .get(0) + .first() .unwrap() .controller .unwrap()); @@ -503,7 +503,7 @@ mod broker_jobspec_tests { .owner_references .as_ref() .unwrap() - .get(0) + .first() .unwrap() .block_owner_deletion .unwrap()); diff --git a/shared/src/k8s/pod.rs b/shared/src/k8s/pod.rs index aa8d03e43..7123701a1 100644 --- a/shared/src/k8s/pod.rs +++ b/shared/src/k8s/pod.rs @@ -280,6 +280,118 @@ pub fn modify_pod_spec( } } +/// Create Kubernetes Pod +/// +/// Example: +/// +/// ```no_run +/// use akri_shared::k8s::pod; +/// use kube::client::Client; +/// use kube::config; +/// use k8s_openapi::api::core::v1::Pod; +/// +/// # #[tokio::main] +/// # async fn main() { +/// let api_client = Client::try_default().await.unwrap(); +/// pod::create_pod(&Pod::default(), "pod_namespace", api_client).await.unwrap(); +/// # } +/// ``` +pub async fn create_pod( + pod_to_create: &Pod, + namespace: &str, + kube_client: Client, +) -> Result<(), anyhow::Error> { + trace!("create_pod enter"); + let pods: Api = Api::namespaced(kube_client, namespace); + info!("create_pod pods.create(...).await?:"); + match pods.create(&PostParams::default(), pod_to_create).await { + Ok(created_pod) => { + info!( + "create_pod pods.create return: {:?}", + created_pod.metadata.name + ); + Ok(()) + } + Err(kube::Error::Api(ae)) => { + if ae.code == ERROR_CONFLICT { + trace!("create_pod - pod already exists"); + Ok(()) + } else { + error!( + "create_pod pods.create [{:?}] returned kube error: {:?}", + serde_json::to_string(&pod_to_create), + ae + ); + Err(anyhow::anyhow!(ae)) + } + } + Err(e) => { + error!( + "create_pod pods.create [{:?}] error: {:?}", + serde_json::to_string(&pod_to_create), + e + ); + Err(anyhow::anyhow!(e)) + } + } +} + +/// Remove Kubernetes Pod +/// +/// Example: +/// +/// ```no_run +/// use akri_shared::k8s::pod; +/// use kube::client::Client; +/// use kube::config; +/// +/// # #[tokio::main] +/// # async fn main() { +/// let api_client = Client::try_default().await.unwrap(); +/// pod::remove_pod("pod_to_remove", "pod_namespace", api_client).await.unwrap(); +/// # } +/// ``` +pub async fn remove_pod( + pod_to_remove: &str, + namespace: &str, + kube_client: Client, +) -> Result<(), anyhow::Error> { + trace!("remove_pod enter"); + let pods: Api = Api::namespaced(kube_client, namespace); + info!("remove_pod pods.delete(...).await?:"); + match pods.delete(pod_to_remove, &DeleteParams::default()).await { + Ok(deleted_pod) => match deleted_pod { + Either::Left(spec) => { + info!("remove_pod pods.delete return: {:?}", &spec.metadata.name); + Ok(()) + } + Either::Right(status) => { + info!("remove_pod pods.delete return: {:?}", &status.status); + Ok(()) + } + }, + Err(kube::Error::Api(ae)) => { + if ae.code == ERROR_NOT_FOUND { + trace!("remove_pod - pod already removed"); + Ok(()) + } else { + error!( + "remove_pod pods.delete [{:?}] returned kube error: {:?}", + &pod_to_remove, ae + ); + Err(anyhow::anyhow!(ae)) + } + } + Err(e) => { + error!( + "remove_pod pods.delete [{:?}] error: {:?}", + &pod_to_remove, e + ); + Err(anyhow::anyhow!(e)) + } + } +} + #[cfg(test)] mod broker_podspec_tests { use super::super::super::akri::API_VERSION; @@ -548,7 +660,7 @@ mod broker_podspec_tests { .clone() .owner_references .unwrap() - .get(0) + .first() .unwrap() .name ); @@ -558,7 +670,7 @@ mod broker_podspec_tests { .clone() .owner_references .unwrap() - .get(0) + .first() .unwrap() .uid ); @@ -568,7 +680,7 @@ mod broker_podspec_tests { .clone() .owner_references .unwrap() - .get(0) + .first() .unwrap() .kind ); @@ -578,7 +690,7 @@ mod broker_podspec_tests { .clone() .owner_references .unwrap() - .get(0) + .first() .unwrap() .api_version ); @@ -587,7 +699,7 @@ mod broker_podspec_tests { .clone() .owner_references .unwrap() - .get(0) + .first() .unwrap() .controller .unwrap()); @@ -596,7 +708,7 @@ mod broker_podspec_tests { .clone() .owner_references .unwrap() - .get(0) + .first() .unwrap() .block_owner_deletion .unwrap()); @@ -630,12 +742,12 @@ mod broker_podspec_tests { .required_during_scheduling_ignored_during_execution .unwrap() .node_selector_terms - .get(0) + .first() .unwrap() .match_fields .as_ref() .unwrap() - .get(0) + .first() .unwrap() .key ); @@ -651,12 +763,12 @@ mod broker_podspec_tests { .required_during_scheduling_ignored_during_execution .unwrap() .node_selector_terms - .get(0) + .first() .unwrap() .match_fields .as_ref() .unwrap() - .get(0) + .first() .unwrap() .operator ); @@ -672,12 +784,12 @@ mod broker_podspec_tests { .required_during_scheduling_ignored_during_execution .unwrap() .node_selector_terms - .get(0) + .first() .unwrap() .match_fields .as_ref() .unwrap() - .get(0) + .first() .unwrap() .values .as_ref() @@ -702,7 +814,7 @@ mod broker_podspec_tests { .match_fields .as_ref() .unwrap() - .get(0) + .first() .unwrap() .key ); @@ -723,7 +835,7 @@ mod broker_podspec_tests { .match_fields .as_ref() .unwrap() - .get(0) + .first() .unwrap() .operator ); @@ -744,7 +856,7 @@ mod broker_podspec_tests { .match_fields .as_ref() .unwrap() - .get(0) + .first() .unwrap() .values .as_ref() @@ -989,115 +1101,3 @@ mod broker_podspec_tests { } } } - -/// Create Kubernetes Pod -/// -/// Example: -/// -/// ```no_run -/// use akri_shared::k8s::pod; -/// use kube::client::Client; -/// use kube::config; -/// use k8s_openapi::api::core::v1::Pod; -/// -/// # #[tokio::main] -/// # async fn main() { -/// let api_client = Client::try_default().await.unwrap(); -/// pod::create_pod(&Pod::default(), "pod_namespace", api_client).await.unwrap(); -/// # } -/// ``` -pub async fn create_pod( - pod_to_create: &Pod, - namespace: &str, - kube_client: Client, -) -> Result<(), anyhow::Error> { - trace!("create_pod enter"); - let pods: Api = Api::namespaced(kube_client, namespace); - info!("create_pod pods.create(...).await?:"); - match pods.create(&PostParams::default(), pod_to_create).await { - Ok(created_pod) => { - info!( - "create_pod pods.create return: {:?}", - created_pod.metadata.name - ); - Ok(()) - } - Err(kube::Error::Api(ae)) => { - if ae.code == ERROR_CONFLICT { - trace!("create_pod - pod already exists"); - Ok(()) - } else { - error!( - "create_pod pods.create [{:?}] returned kube error: {:?}", - serde_json::to_string(&pod_to_create), - ae - ); - Err(anyhow::anyhow!(ae)) - } - } - Err(e) => { - error!( - "create_pod pods.create [{:?}] error: {:?}", - serde_json::to_string(&pod_to_create), - e - ); - Err(anyhow::anyhow!(e)) - } - } -} - -/// Remove Kubernetes Pod -/// -/// Example: -/// -/// ```no_run -/// use akri_shared::k8s::pod; -/// use kube::client::Client; -/// use kube::config; -/// -/// # #[tokio::main] -/// # async fn main() { -/// let api_client = Client::try_default().await.unwrap(); -/// pod::remove_pod("pod_to_remove", "pod_namespace", api_client).await.unwrap(); -/// # } -/// ``` -pub async fn remove_pod( - pod_to_remove: &str, - namespace: &str, - kube_client: Client, -) -> Result<(), anyhow::Error> { - trace!("remove_pod enter"); - let pods: Api = Api::namespaced(kube_client, namespace); - info!("remove_pod pods.delete(...).await?:"); - match pods.delete(pod_to_remove, &DeleteParams::default()).await { - Ok(deleted_pod) => match deleted_pod { - Either::Left(spec) => { - info!("remove_pod pods.delete return: {:?}", &spec.metadata.name); - Ok(()) - } - Either::Right(status) => { - info!("remove_pod pods.delete return: {:?}", &status.status); - Ok(()) - } - }, - Err(kube::Error::Api(ae)) => { - if ae.code == ERROR_NOT_FOUND { - trace!("remove_pod - pod already removed"); - Ok(()) - } else { - error!( - "remove_pod pods.delete [{:?}] returned kube error: {:?}", - &pod_to_remove, ae - ); - Err(anyhow::anyhow!(ae)) - } - } - Err(e) => { - error!( - "remove_pod pods.delete [{:?}] error: {:?}", - &pod_to_remove, e - ); - Err(anyhow::anyhow!(e)) - } - } -} diff --git a/shared/src/k8s/service.rs b/shared/src/k8s/service.rs index 1766f4922..90f9849a2 100644 --- a/shared/src/k8s/service.rs +++ b/shared/src/k8s/service.rs @@ -242,6 +242,177 @@ pub fn update_ownership( Ok(()) } +/// Create Kubernetes Service +/// +/// Example: +/// +/// ```no_run +/// use akri_shared::k8s::service; +/// use kube::client::Client; +/// use kube::config; +/// use k8s_openapi::api::core::v1::Service; +/// +/// # #[tokio::main] +/// # async fn main() { +/// let api_client = Client::try_default().await.unwrap(); +/// service::create_service(&Service::default(), "svc_namespace", api_client).await.unwrap(); +/// # } +/// ``` +pub async fn create_service( + svc_to_create: &Service, + namespace: &str, + kube_client: Client, +) -> Result<(), anyhow::Error> { + trace!("create_service enter"); + let services: Api = Api::namespaced(kube_client, namespace); + info!("create_service svcs.create(...).await?:"); + match services.create(&PostParams::default(), svc_to_create).await { + Ok(created_svc) => { + info!( + "create_service services.create return: {:?}", + created_svc.metadata.name + ); + Ok(()) + } + Err(kube::Error::Api(ae)) => { + error!( + "create_service services.create [{:?}] returned kube error: {:?}", + serde_json::to_string(&svc_to_create), + ae + ); + Ok(()) + } + Err(e) => { + error!( + "create_service services.create [{:?}] error: {:?}", + serde_json::to_string(&svc_to_create), + e + ); + Err(anyhow::anyhow!(e)) + } + } +} + +/// Remove Kubernetes Service +/// +/// Example: +/// +/// ```no_run +/// use akri_shared::k8s::service; +/// use kube::client::Client; +/// use kube::config; +/// +/// # #[tokio::main] +/// # async fn main() { +/// let api_client = Client::try_default().await.unwrap(); +/// service::remove_service("svc_to_remove", "svc_namespace", api_client).await.unwrap(); +/// # } +/// ``` +pub async fn remove_service( + svc_to_remove: &str, + namespace: &str, + kube_client: Client, +) -> Result<(), anyhow::Error> { + trace!("remove_service enter"); + let svcs: Api = Api::namespaced(kube_client, namespace); + info!("remove_service svcs.create(...).await?:"); + match svcs.delete(svc_to_remove, &DeleteParams::default()).await { + Ok(deleted_svc) => match deleted_svc { + Either::Left(spec) => { + info!( + "remove_service svcs.delete return: {:?}", + &spec.metadata.name + ); + Ok(()) + } + Either::Right(status) => { + info!("remove_service svcs.delete return: {:?}", &status.status); + Ok(()) + } + }, + Err(kube::Error::Api(ae)) => { + if ae.code == ERROR_NOT_FOUND { + trace!("remove_service - service already deleted"); + Ok(()) + } else { + error!( + "remove_service svcs.delete [{:?}] returned kube error: {:?}", + &svc_to_remove, ae + ); + Err(anyhow::anyhow!(ae)) + } + } + Err(e) => { + error!( + "remove_service svcs.delete [{:?}] error: {:?}", + &svc_to_remove, e + ); + Err(anyhow::anyhow!(e)) + } + } +} + +/// Update Kubernetes Service +/// +/// Example: +/// +/// ```no_run +/// use akri_shared::k8s::service; +/// use kube::client::Client; +/// use kube::config; +/// +/// # #[tokio::main] +/// # async fn main() { +/// let selector = "environment=production,app=nginx"; +/// let api_client = Client::try_default().await.unwrap(); +/// for svc in service::find_services_with_selector(&selector, api_client).await.unwrap() { +/// let svc_name = &svc.metadata.name.clone().unwrap(); +/// let svc_namespace = &svc.metadata.namespace.as_ref().unwrap().clone(); +/// let loop_api_client = Client::try_default().await.unwrap(); +/// let updated_svc = service::update_service( +/// &svc, +/// &svc_name, +/// &svc_namespace, +/// loop_api_client).await.unwrap(); +/// } +/// # } +/// ``` +pub async fn update_service( + svc_to_update: &Service, + name: &str, + namespace: &str, + kube_client: Client, +) -> Result<(), anyhow::Error> { + trace!( + "update_service enter name:{} namespace: {}", + &name, + &namespace + ); + let svcs: Api = Api::namespaced(kube_client, namespace); + + info!("remove_service svcs.patch(...).await?:"); + match svcs + .patch(name, &PatchParams::default(), &Patch::Merge(&svc_to_update)) + .await + { + Ok(_service_modified) => { + log::trace!("update_service return"); + Ok(()) + } + Err(kube::Error::Api(ae)) => { + log::trace!( + "update_service kube_client.request returned kube error: {:?}", + ae + ); + Err(anyhow::anyhow!(ae)) + } + Err(e) => { + log::trace!("update_service kube_client.request error: {:?}", e); + Err(anyhow::anyhow!(e)) + } + } +} + #[cfg(test)] mod svcspec_tests { use super::super::OwnershipType; @@ -509,7 +680,7 @@ mod svcspec_tests { .owner_references .as_ref() .unwrap() - .get(0) + .first() .unwrap() .name ); @@ -520,7 +691,7 @@ mod svcspec_tests { .owner_references .as_ref() .unwrap() - .get(0) + .first() .unwrap() .uid ); @@ -531,7 +702,7 @@ mod svcspec_tests { .owner_references .as_ref() .unwrap() - .get(0) + .first() .unwrap() .kind ); @@ -541,7 +712,7 @@ mod svcspec_tests { .clone() .owner_references .unwrap() - .get(0) + .first() .unwrap() .api_version ); @@ -550,7 +721,7 @@ mod svcspec_tests { .clone() .owner_references .unwrap() - .get(0) + .first() .unwrap() .controller .unwrap()); @@ -559,7 +730,7 @@ mod svcspec_tests { .clone() .owner_references .unwrap() - .get(0) + .first() .unwrap() .block_owner_deletion .unwrap()); @@ -616,174 +787,3 @@ mod svcspec_tests { } } } - -/// Create Kubernetes Service -/// -/// Example: -/// -/// ```no_run -/// use akri_shared::k8s::service; -/// use kube::client::Client; -/// use kube::config; -/// use k8s_openapi::api::core::v1::Service; -/// -/// # #[tokio::main] -/// # async fn main() { -/// let api_client = Client::try_default().await.unwrap(); -/// service::create_service(&Service::default(), "svc_namespace", api_client).await.unwrap(); -/// # } -/// ``` -pub async fn create_service( - svc_to_create: &Service, - namespace: &str, - kube_client: Client, -) -> Result<(), anyhow::Error> { - trace!("create_service enter"); - let services: Api = Api::namespaced(kube_client, namespace); - info!("create_service svcs.create(...).await?:"); - match services.create(&PostParams::default(), svc_to_create).await { - Ok(created_svc) => { - info!( - "create_service services.create return: {:?}", - created_svc.metadata.name - ); - Ok(()) - } - Err(kube::Error::Api(ae)) => { - error!( - "create_service services.create [{:?}] returned kube error: {:?}", - serde_json::to_string(&svc_to_create), - ae - ); - Ok(()) - } - Err(e) => { - error!( - "create_service services.create [{:?}] error: {:?}", - serde_json::to_string(&svc_to_create), - e - ); - Err(anyhow::anyhow!(e)) - } - } -} - -/// Remove Kubernetes Service -/// -/// Example: -/// -/// ```no_run -/// use akri_shared::k8s::service; -/// use kube::client::Client; -/// use kube::config; -/// -/// # #[tokio::main] -/// # async fn main() { -/// let api_client = Client::try_default().await.unwrap(); -/// service::remove_service("svc_to_remove", "svc_namespace", api_client).await.unwrap(); -/// # } -/// ``` -pub async fn remove_service( - svc_to_remove: &str, - namespace: &str, - kube_client: Client, -) -> Result<(), anyhow::Error> { - trace!("remove_service enter"); - let svcs: Api = Api::namespaced(kube_client, namespace); - info!("remove_service svcs.create(...).await?:"); - match svcs.delete(svc_to_remove, &DeleteParams::default()).await { - Ok(deleted_svc) => match deleted_svc { - Either::Left(spec) => { - info!( - "remove_service svcs.delete return: {:?}", - &spec.metadata.name - ); - Ok(()) - } - Either::Right(status) => { - info!("remove_service svcs.delete return: {:?}", &status.status); - Ok(()) - } - }, - Err(kube::Error::Api(ae)) => { - if ae.code == ERROR_NOT_FOUND { - trace!("remove_service - service already deleted"); - Ok(()) - } else { - error!( - "remove_service svcs.delete [{:?}] returned kube error: {:?}", - &svc_to_remove, ae - ); - Err(anyhow::anyhow!(ae)) - } - } - Err(e) => { - error!( - "remove_service svcs.delete [{:?}] error: {:?}", - &svc_to_remove, e - ); - Err(anyhow::anyhow!(e)) - } - } -} - -/// Update Kubernetes Service -/// -/// Example: -/// -/// ```no_run -/// use akri_shared::k8s::service; -/// use kube::client::Client; -/// use kube::config; -/// -/// # #[tokio::main] -/// # async fn main() { -/// let selector = "environment=production,app=nginx"; -/// let api_client = Client::try_default().await.unwrap(); -/// for svc in service::find_services_with_selector(&selector, api_client).await.unwrap() { -/// let svc_name = &svc.metadata.name.clone().unwrap(); -/// let svc_namespace = &svc.metadata.namespace.as_ref().unwrap().clone(); -/// let loop_api_client = Client::try_default().await.unwrap(); -/// let updated_svc = service::update_service( -/// &svc, -/// &svc_name, -/// &svc_namespace, -/// loop_api_client).await.unwrap(); -/// } -/// # } -/// ``` -pub async fn update_service( - svc_to_update: &Service, - name: &str, - namespace: &str, - kube_client: Client, -) -> Result<(), anyhow::Error> { - trace!( - "update_service enter name:{} namespace: {}", - &name, - &namespace - ); - let svcs: Api = Api::namespaced(kube_client, namespace); - - info!("remove_service svcs.patch(...).await?:"); - match svcs - .patch(name, &PatchParams::default(), &Patch::Merge(&svc_to_update)) - .await - { - Ok(_service_modified) => { - log::trace!("update_service return"); - Ok(()) - } - Err(kube::Error::Api(ae)) => { - log::trace!( - "update_service kube_client.request returned kube error: {:?}", - ae - ); - Err(anyhow::anyhow!(ae)) - } - Err(e) => { - log::trace!("update_service kube_client.request error: {:?}", e); - Err(anyhow::anyhow!(e)) - } - } -} diff --git a/shared/src/os/env_var.rs b/shared/src/os/env_var.rs index f09c5de79..5bf8e5e33 100644 --- a/shared/src/os/env_var.rs +++ b/shared/src/os/env_var.rs @@ -27,8 +27,6 @@ impl EnvVarQuery for ActualEnvVarQuery { } fn get_env_vars(&self) -> Vec<(String, String)> { - env::vars() - .map(|(n, v)| (n, v)) - .collect::>() + env::vars().collect::>() } }