Skip to content

Commit

Permalink
chore: bump kube-rs version
Browse files Browse the repository at this point in the history
  • Loading branch information
c-nixon committed Jul 15, 2024
1 parent 11236f5 commit 63814af
Show file tree
Hide file tree
Showing 20 changed files with 636 additions and 295 deletions.
708 changes: 513 additions & 195 deletions Cargo.lock

Large diffs are not rendered by default.

4 changes: 0 additions & 4 deletions api/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@ edition = "2018"

[dependencies]
http = { package = "mz-http", path = "../common/http" }
middleware = { package = "middleware", path = "../common/middleware" }

tokio = { package = "tokio", version = "1", features = ["macros", "process", "rt-multi-thread", "time"] }
futures = "0.3"
Expand All @@ -27,9 +26,6 @@ win32job = { package = "win32job", version = "1" }

[dev-dependencies]
env_logger = "0.9"
partial_io = { package = "partial-io", version = "0.5", features = ["tokio1"]}
tokio-test = "0.4"
tracing-test = "0.2"

[features]
default = []
Expand Down
15 changes: 9 additions & 6 deletions bin/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ consts = { package = "consts", path = "../common/consts" }
bytes = "1"
time = "0.3"
async-trait = "0.1"
kube = { version = "0.87", default-features = false, features = ["rustls-tls"] }
kube = { version = "0.92", default-features = false, features = ["rustls-tls"] }
env_logger = "0.9"
anyhow = "1"
futures = "0.3"
Expand All @@ -56,6 +56,7 @@ rand = "0.8.5"
shell-words = "1.0"
async-channel = "1.8"
once_cell = "1.10"
rustls = "0.23"

rlimit = "0.10"

Expand Down Expand Up @@ -90,14 +91,13 @@ dhat-ad-hoc = [] # if you are doing ad hoc profiling
[dev-dependencies]
assert_cmd = "2"
escargot = "0.5"
kube = { version = "0.87", default-features = false, features = ["runtime", "rustls-tls"] }
k8s-openapi = { version = "0.20.0", default_features = false, features = ["v1_22"] }
kube = { version = "0.92", default-features = false, features = ["runtime", "rustls-tls"] }
k8s-openapi = { version = "0.22", default-features = false, features = ["v1_24"] }
itertools = "0.10"
predicates = "3"
tempfile = "3"
rustls = "0.21"
rustls-pemfile = "1"
rcgen = "0.11"
rustls-pemfile = "2"
rcgen = "0.13"
logdna_mock_ingester = { package = "logdna-mock-ingester", path = "../common/test/mock-ingester" }
logdna-metrics-recorder = { package = "logdna-metrics-recorder", path = "../utils/metrics-recorder" }
test_types = { package = "types", path = "../common/test/types" }
Expand All @@ -113,9 +113,12 @@ serde_json = "1"
prometheus-parse = { git = "https://github.com/ccakes/prometheus-parse-rs", rev = "a4574e9" }
float-cmp = "0.9.0"
test-log = { version = "0.2", features = ["trace"] }
rustls-pki-types = "1"

[target.'cfg(target_os="linux")'.dev-dependencies]
pnet_datalink = "0.31"

[target.'cfg(integration_tests)'.dev-dependencies]
systemd = { package = "systemd", version = "0.10" }

[build-dependencies]
Expand Down
2 changes: 1 addition & 1 deletion bin/src/_main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -142,7 +142,7 @@ pub async fn _main(
Some(config.http.require_ssl),
concurrency_limit,
fo_state_handles,
));
)?);

if let Some(client) = Arc::get_mut(&mut client) {
client.set_timeout(config.http.timeout);
Expand Down
4 changes: 4 additions & 0 deletions bin/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,10 @@ fn main() -> anyhow::Result<()> {
#[cfg(feature = "dhat-heap")]
let _profiler = dhat::Profiler::new_heap();

rustls::crypto::ring::default_provider()
.install_default()
.expect("Failed to install rustls crypto provider");

// covert logdna env vars to mezmo ones
Config::process_logdna_env_vars();

Expand Down
27 changes: 18 additions & 9 deletions bin/tests/it/common.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ use core::time;
use rand::seq::IteratorRandom;

use std::collections::HashMap;
use std::convert::TryFrom;
use std::fs::OpenOptions;
use std::io::{BufRead, Write};
use std::path::Path;
Expand Down Expand Up @@ -381,15 +382,23 @@ pub fn self_signed_https_ingester(

let cert = generate_simple_self_signed(subject_alt_names).unwrap();

let cert_bytes = cert.serialize_pem().unwrap();
let cert_bytes = cert.cert.pem();
let certs = rustls_pemfile::certs(&mut std::io::BufReader::new(cert_bytes.as_bytes()))
.map(|certs| certs.into_iter().map(rustls::Certificate).collect())
.unwrap();

let key_bytes = cert.serialize_private_key_pem();
let keys: Vec<rustls::PrivateKey> =
.flat_map(|certs| {
certs
.into_iter()
.map(rustls_pki_types::CertificateDer::from)
})
.collect();

let key_bytes = cert.key_pair.serialize_pem();
let keys: Vec<rustls_pki_types::PrivateKeyDer> =
rustls_pemfile::pkcs8_private_keys(&mut std::io::BufReader::new(key_bytes.as_bytes()))
.map(|keys| keys.into_iter().map(rustls::PrivateKey).collect())
.flat_map(|keys| {
keys.into_iter()
.map(rustls_pki_types::PrivateKeyDer::try_from)
})
.collect::<Result<_, _>>()
.unwrap();

let port = get_available_port().expect("No ports free");
Expand All @@ -398,13 +407,13 @@ pub fn self_signed_https_ingester(

let mut cert_file = tempfile::NamedTempFile::new().expect("Couldn't create cert file");
cert_file
.write_all(cert.serialize_pem().unwrap().as_bytes())
.write_all(cert.cert.pem().as_bytes())
.expect("Couldn't write cert file");

let (server, received, shutdown_handle) = https_ingester_with_processors(
addr,
certs,
keys[0].clone(),
keys[0].clone_key(),
http_version,
req_fn.unwrap_or_else(|| Box::new(|_| None)),
process_fn.unwrap_or_else(|| Box::new(|_| None)),
Expand Down
6 changes: 4 additions & 2 deletions common/http/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ state = { package = "state", path = "../state", default-features=false }
rate-limit-macro = { package = "rate-limit-macro", path = "../misc/rate-limit/macro" }

#http
logdna-client = { git = "https://github.com/logdna/logdna-rust.git", branch="0.7.x", version = "0.7.4" }
logdna-client = { git = "https://github.com/logdna/logdna-rust.git", branch="0.8.x", version = "0.8.0" }

#io
tokio = { version = "1", features = ["fs", "io-util", "macros"] }
Expand All @@ -24,7 +24,9 @@ async-compat = "0.2.1"
tracing = "0.1"
bytes = "1"
crossbeam = "0.8"
hyper = { version = "0.14", features = ["http1", "server"] }
hyper = { version = "1", features = ["http1", "server"] }
http-body-util = "0.1"
hyper-util = { version = "0.1", features = ["client", "service"] }
uuid = { version = "1", features = ["v4"] }
serde = { version = "1", features = ["derive", "rc"] }
serde_json = "1"
Expand Down
10 changes: 5 additions & 5 deletions common/http/src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ pub struct Client {

pub enum SendStatus {
Sent,
Retry(hyper::Error),
Retry(hyper_util::client::legacy::Error),
RetryServerError(hyper::StatusCode, String),
RetryTimeout,
}
Expand Down Expand Up @@ -53,17 +53,17 @@ impl Client {
require_ssl: Option<bool>,
concurrency_limit: Option<usize>,
fo_state_handles: Option<(FileOffsetWriteHandle, FileOffsetFlushHandle)>,
) -> Self {
) -> Result<Self, std::io::Error> {
let (state_write, state_flush) = fo_state_handles
.map(|(sw, sf)| (Some(sw), Some(sf)))
.unwrap_or((None, None));
Self {
inner: HttpClient::new(template, require_ssl),
Ok(Self {
inner: HttpClient::new(template, require_ssl)?,
limiter: RateLimiter::new(concurrency_limit.unwrap_or(10)),
retry,
state_write,
state_flush,
}
})
}

pub async fn send<T>(
Expand Down
24 changes: 17 additions & 7 deletions common/http/src/metrics_endpoint.rs
Original file line number Diff line number Diff line change
@@ -1,30 +1,40 @@
use futures::TryFutureExt;
use hyper::{
body::{Bytes, Incoming},
header::CONTENT_TYPE,
service::{make_service_fn, service_fn},
Body, Request, Response, Server,
server::conn::http1,
service::service_fn,
Request, Response,
};

use http_body_util::Full;
use hyper_util::rt::TokioIo;
use prometheus::{Encoder, TextEncoder};
use std::net::{IpAddr, Ipv4Addr, SocketAddr};
use thiserror::Error;
use tokio::net::TcpListener;
use tracing::info;

#[derive(Debug, Error)]
pub enum Error {
#[error(transparent)]
Server(#[from] hyper::Error),
#[error(transparent)]
Io(#[from] std::io::Error),
}

pub async fn serve(port: &u16) -> Result<(), Error> {
let address = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(0, 0, 0, 0)), *port);
let serve_future = Server::bind(&address).serve(make_service_fn(|_| async {
Ok::<_, hyper::Error>(service_fn(serve_req))
}));
let listener = TcpListener::bind(address).await?;
let (stream, _) = listener.accept().await?;
let io = TokioIo::new(stream);

let serve_future = http1::Builder::new().serve_connection(io, service_fn(serve_req));
info!("Metrics server listening on http://{}", address);
serve_future.map_err(Error::Server).await
}

async fn serve_req(_req: Request<Body>) -> Result<Response<Body>, hyper::Error> {
async fn serve_req(_req: Request<Incoming>) -> Result<Response<Full<Bytes>>, hyper::Error> {
let encoder = TextEncoder::new();

let metric_families = prometheus::gather();
Expand All @@ -34,7 +44,7 @@ async fn serve_req(_req: Request<Body>) -> Result<Response<Body>, hyper::Error>
let response = Response::builder()
.status(200)
.header(CONTENT_TYPE, encoder.format_type())
.body(Body::from(buffer))
.body(Full::from(Bytes::from(buffer)))
.unwrap();

Ok(response)
Expand Down
13 changes: 7 additions & 6 deletions common/k8s/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,12 @@ chrono = { version = "0.4", features = ["serde"] }
config = { package = "config", path = "../config" }
rate-limit-macro = { package = "rate-limit-macro", path = "../misc/rate-limit/macro" }
anyhow = "1.0.57"
hyper = "0.14"
hyper = "1"
hyper-timeout = "0.4"
hyper-rustls = "0.24"
hyper-util = "0.1"
hyper-rustls = "0.27"
tower = "0.4"
tower-http = { version = "0.4", features = ["set-header", "decompression-gzip"] }
tower-http = { version = "0.5", features = ["set-header", "decompression-gzip"] }
tracing = "0.1"
humantime = "2"
crossbeam = "0.8"
Expand All @@ -30,8 +31,8 @@ lazy_static = "1"
tokio = { package = "tokio", version = "1", features = ["macros", "process", "signal", "rt-multi-thread", "time"] }
futures = "0.3"
thiserror = "1.0"
kube = { version = "0.87", default-features = false, features = ["rustls-tls", "client", "runtime", "gzip", "derive"] }
k8s-openapi = { version = "0.20.0", default_features = false, features = ["v1_22"] }
kube = { version = "0.92", default-features = false, features = ["rustls-tls", "client", "runtime", "gzip", "derive"] }
k8s-openapi = { version = "0.22", default-features = false, features = ["v1_24"] }
serde = { version = "1", features = ["derive"]}
serde_json = "1"
serde_with = "1.3.1"
Expand All @@ -42,5 +43,5 @@ arc-interner = "0.7.0"
async-channel = "1.8"

[dev-dependencies]
hyper_http = { package = "http", version = "0.2" }
hyper_http = { package = "http", version = "*" }
tokio = { version = "1", features = ["macros"] }
6 changes: 3 additions & 3 deletions common/k8s/src/event_source.rs
Original file line number Diff line number Diff line change
Expand Up @@ -297,15 +297,15 @@ impl K8sEventStream {
.labels(&format!("app.kubernetes.io/name={}", &pod_label));
let stream = metadata_watcher(pods.clone(), wc)
.skip_while(|e| {
let matched = matches!(
let matched = !matches!(
e,
Ok(watcher::Event::<PartialObjectMeta<Pod>>::Restarted(_))
Ok(watcher::Event::<PartialObjectMeta<Pod>>::Delete(_))
);
async move { matched }
})
.map({
move |e| match e {
Ok(watcher::Event::Deleted(e)) => {
Ok(watcher::Event::Delete(e)) => {
if let Some(name) = e.metadata.name {
info!("Agent Down {}", name);

Expand Down
1 change: 1 addition & 0 deletions common/k8s/src/kube_stats/pod_stats.rs
Original file line number Diff line number Diff line change
Expand Up @@ -243,6 +243,7 @@ mod tests {
node_name: Some("node_name".to_string()),
node_selector: None,
overhead: None,
os: None,
preemption_policy: None,
priority: Some(222),
priority_class_name: Some("p_class".to_string()),
Expand Down
24 changes: 7 additions & 17 deletions common/k8s/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,7 @@ extern crate lazy_static;
use std::env;

use errors::K8sError;
use hyper::{client::HttpConnector, Body};
use hyper_timeout::TimeoutConnector;
use hyper_util::rt::TokioExecutor;
use kube::client::ConfigExt;
use kube::{config::Config, Client};
use tower::ServiceBuilder;
Expand All @@ -31,21 +30,11 @@ fn create_k8s_client(
) -> Result<Client, kube::Error> {
let default_ns = config.default_namespace.clone();

let client: hyper::Client<_, Body> = {
let mut connector = HttpConnector::new();
connector.enforce_http(false);

let connector = hyper_rustls::HttpsConnector::from((
connector,
std::sync::Arc::new(config.rustls_client_config()?),
));

let mut connector = TimeoutConnector::new(connector);
connector.set_connect_timeout(config.connect_timeout);
connector.set_read_timeout(config.read_timeout);

hyper::Client::builder().build(connector)
};
let https = config.rustls_https_connector()?;
let client = ServiceBuilder::new()
.layer(config.base_uri_layer())
.option_layer(config.auth_layer()?)
.service(hyper_util::client::legacy::Client::builder(TokioExecutor::new()).build(https));

let stack = ServiceBuilder::new()
.layer(config.base_uri_layer())
Expand All @@ -67,6 +56,7 @@ fn create_k8s_client(
.option_layer(config.auth_layer()?)
.layer(config.extra_headers_layer()?)
.service(client);

Ok(Client::new(service, default_ns))
}

Expand Down
4 changes: 4 additions & 0 deletions common/k8s/src/metrics_stats_stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -512,6 +512,8 @@ mod tests {

use std::collections::HashMap;

use kube::api::TypeMeta;

use k8s_openapi::api::core::v1::{Node, Pod};
use kube::api::{ListMeta, ObjectList};
use serde_json::Value;
Expand Down Expand Up @@ -589,6 +591,7 @@ mod tests {
self_link: None,
},
items: Vec::new(),
types: TypeMeta::list::<Pod>(),
};

let mut controller_map: HashMap<String, ControllerStats> = HashMap::new();
Expand Down Expand Up @@ -618,6 +621,7 @@ mod tests {
self_link: None,
},
items: Vec::new(),
types: TypeMeta::list::<Node>(),
};

let mut node_pod_counts_map: HashMap<String, NodePodStats> = HashMap::new();
Expand Down
Loading

0 comments on commit 63814af

Please sign in to comment.