Skip to content

Commit

Permalink
feat: wip healthcheck service. done framework.
Browse files Browse the repository at this point in the history
  • Loading branch information
junkurihara committed Oct 30, 2023
1 parent a5f3d79 commit da3135e
Show file tree
Hide file tree
Showing 11 changed files with 120 additions and 45 deletions.
25 changes: 17 additions & 8 deletions dap-bin/src/config/toml.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,6 @@
use super::utils_verifier::*;
use crate::{constants::*, error::*, log::*};
use doh_auth_proxy_lib::{
AuthenticationConfig, DoHMethod, NextHopRelayConfig, ProxyConfig, SubseqRelayConfig, TargetConfig,
};
use doh_auth_proxy_lib::{AuthenticationConfig, NextHopRelayConfig, ProxyConfig, SubseqRelayConfig};
use serde::Deserialize;
use std::{env, fs};
use tokio::time::Duration;
Expand All @@ -11,7 +9,8 @@ use tokio::time::Duration;
pub struct ConfigToml {
pub listen_addresses: Option<Vec<String>>,
pub bootstrap_dns: Option<Vec<String>>,
pub reboot_period: Option<usize>,
pub endoint_resolution_period: Option<usize>,
pub healthcheck_period: Option<usize>,
pub max_cache_size: Option<usize>,
pub target_urls: Option<Vec<String>>,
pub target_randomization: Option<bool>,
Expand Down Expand Up @@ -74,15 +73,25 @@ impl TryInto<ProxyConfig> for &ConfigToml {
info!("Bootstrap DNS: {:?}", proxy_config.bootstrap_dns.ips);

/////////////////////////////
// reboot period
if let Some(val) = self.reboot_period {
// endpoint re-resolution period
if let Some(val) = self.endoint_resolution_period {
proxy_config.endpoint_resolution_period_sec = Duration::from_secs((val as u64) * 60);
}
info!(
"Target DoH and auth server addresses are re-fetched every {:?} min via DoH itself or Bootsrap DNS",
"Nexthop nodes (DoH target or (MO)DoH next hop relay) and auth server addresses are re-resolved every {:?} min via DoH itself or Bootsrap DNS",
proxy_config.endpoint_resolution_period_sec.as_secs() / 60
);

/////////////////////////////
// health check period
if let Some(val) = self.healthcheck_period {
proxy_config.healthcheck_period_sec = Duration::from_secs((val as u64) * 60);
}
info!(
"Check for health of all possible path candidates and purge DNS cache every {:?} min",
proxy_config.healthcheck_period_sec.as_secs() / 60
);

/////////////////////////////
// cache size
if let Some(val) = self.max_cache_size {
Expand Down Expand Up @@ -115,7 +124,7 @@ impl TryInto<ProxyConfig> for &ConfigToml {
}
if let Some(val) = self.use_get_method {
if val {
proxy_config.target_config.doh_method = DoHMethod::Get;
proxy_config.target_config.use_get = true;
info!("Use GET method for query");
}
}
Expand Down
16 changes: 13 additions & 3 deletions dap-lib/src/constants.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,9 @@ pub const UDP_CHANNEL_CAPACITY: usize = 1024; // TODO: channelキャパシティ
pub const UDP_TIMEOUT_SEC: u64 = 10;
pub const TCP_LISTEN_BACKLOG: u32 = 1024;

pub const MAX_CONNECTIONS: usize = 128; // TODO: 最大接続数(UDP+TCP)めちゃ適当
/// Max connections via UPD and TCP (total) TODO: めちゃ適当
pub const MAX_CONNECTIONS: usize = 128;
/// Time out secs for HTTP requests
pub const HTTP_TIMEOUT_SEC: u64 = 10;

pub const MIN_TTL: u32 = 10; // TTL for overridden records (plugin)
Expand All @@ -28,9 +30,13 @@ pub const BOOTSTRAP_DNS_PORT: u16 = 53;
/// Endpoint resolution period in minutes
pub const ENDPOINT_RESOLUTION_PERIOD_MIN: u64 = 60;

/// Health check: Check for health of paths and purge cache for every 600 secs
pub const HEALTHCHECK_PERIOD_MIN: u64 = 10;

/// Default DoH target server
pub const DOH_TARGET_URL: &[&str] = &["https://dns.google/dns-query"];

/// Max cache size of DNS response messages
pub const MAX_CACHE_SIZE: usize = 16384;

///////////////////////////////
Expand Down Expand Up @@ -58,5 +64,9 @@ pub const TOKEN_RELOGIN_WAITING_SEC: u64 = 10;
/// relogin at most 5 times
pub const MAX_RELOGIN_ATTEMPTS: usize = 5;

pub const HEALTHCHECK_TARGET_FQDN: &str = "dns.google."; // client
pub const HEALTHCHECK_TARGET_ADDR: &str = "8.8.8.8"; // client
// Health check

/// Health check target FQDN
pub const HEALTHCHECK_TARGET_FQDN: &str = "dns.google.";
/// Health check target IP address for assertion
pub const HEALTHCHECK_TARGET_ADDR: &str = "8.8.8.8";
2 changes: 1 addition & 1 deletion dap-lib/src/doh_client/cache.rs
Original file line number Diff line number Diff line change
Expand Up @@ -148,7 +148,7 @@ impl Cache {
res
}

/// Purge expired entries
/// Purges expired entries, returns the number of purged entries
pub async fn purge_expired_entries(&self) -> usize {
let lru_cache_clone = self.cache.lock().await.clone();
let expired = lru_cache_clone.iter().filter(|(_, v)| v.expired()).clone();
Expand Down
18 changes: 16 additions & 2 deletions dap-lib/src/doh_client/doh_client_healthcheck.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,9 +26,23 @@ impl DoHClient {
Ok(())
}

/// Health check service periodically checks the health of the path and purge the cache
/// Health check service periodically executes
/// - health of every path;
/// - purge expired DNS cache
async fn healthcheck_service(&self) -> Result<()> {
//TODO:
// purge expired DNS cache
loop {
let cache_clone = self.cache.clone();
self.runtime_handle.spawn(async move {
let purged = cache_clone.purge_expired_entries().await;
debug!("Purged {} expired entries from cache", purged);
});

// TODO: health check for every path
// TODO: Health checkの時はキャッシュを無効化しないとダメなのでmake doh queryをいじる
tokio::time::sleep(self.healthcheck_period_sec).await;
}

Ok(())
}
}
22 changes: 20 additions & 2 deletions dap-lib/src/doh_client/doh_client_main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,13 +31,17 @@ pub struct DoHClient {
/// odoh config store
odoh_configs: Option<Arc<ODoHConfigStore>>,
/// DNS cache
cache: Arc<Cache>,
pub(super) cache: Arc<Cache>,
/// DoH type
doh_type: DoHType,
/// DoH method
doh_method: DoHMethod,
/// base headers
headers: header::HeaderMap,
/// runtime handle
pub(super) runtime_handle: tokio::runtime::Handle,
/// health check interval
pub(super) healthcheck_period_sec: tokio::time::Duration,
}

impl DoHClient {
Expand Down Expand Up @@ -92,13 +96,25 @@ impl DoHClient {

// doh method
let doh_method = match doh_type {
DoHType::Standard => globals.proxy_config.target_config.doh_method.clone(),
DoHType::Standard => {
if globals.proxy_config.target_config.use_get {
DoHMethod::Get
} else {
DoHMethod::Post
}
}
DoHType::Oblivious => DoHMethod::Post,
};

// cache
let cache = Arc::new(Cache::new(globals.proxy_config.max_cache_size));

// runtime handle
let runtime_handle = globals.runtime_handle.clone();

// health check period
let healthcheck_period_sec = globals.proxy_config.healthcheck_period_sec;

Ok(Self {
http_client,
auth_client,
Expand All @@ -108,6 +124,8 @@ impl DoHClient {
doh_type,
doh_method,
headers,
runtime_handle,
healthcheck_period_sec,
})
}

Expand Down
2 changes: 2 additions & 0 deletions dap-lib/src/doh_client/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,12 +9,14 @@ mod path_manage;
pub use doh_client_main::DoHClient;

#[derive(PartialEq, Eq, Debug, Clone)]
/// DoH method, GET or POST
pub enum DoHMethod {
Get,
Post,
}

#[derive(Debug, Clone)]
/// DoH type, Standard or Oblivious
pub(super) enum DoHType {
Standard,
Oblivious,
Expand Down
10 changes: 7 additions & 3 deletions dap-lib/src/globals.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use crate::{constants::*, doh_client::DoHMethod};
use crate::constants::*;
use auth_client::AuthenticationConfig;
use std::{
net::{IpAddr, SocketAddr},
Expand Down Expand Up @@ -29,7 +29,10 @@ pub struct ProxyConfig {

/// bootstrap DNS
pub bootstrap_dns: BootstrapDns,
/// endpoint resolution period
pub endpoint_resolution_period_sec: Duration,
/// health check period
pub healthcheck_period_sec: Duration,

// udp and tcp proxy setting
pub udp_buffer_size: usize,
Expand Down Expand Up @@ -65,7 +68,7 @@ pub struct BootstrapDns {
#[derive(PartialEq, Eq, Debug, Clone)]
/// doh, odoh, modoh target settings
pub struct TargetConfig {
pub doh_method: DoHMethod,
pub use_get: bool,
pub doh_target_urls: Vec<Url>,
pub target_randomization: bool,
}
Expand All @@ -87,7 +90,7 @@ pub struct SubseqRelayConfig {
impl Default for TargetConfig {
fn default() -> Self {
Self {
doh_method: DoHMethod::Post,
use_get: false,
doh_target_urls: DOH_TARGET_URL.iter().map(|v| v.parse().unwrap()).collect(),
target_randomization: true,
}
Expand All @@ -106,6 +109,7 @@ impl Default for ProxyConfig {
port: BOOTSTRAP_DNS_PORT,
},
endpoint_resolution_period_sec: Duration::from_secs(ENDPOINT_RESOLUTION_PERIOD_MIN * 60),
healthcheck_period_sec: Duration::from_secs(HEALTHCHECK_PERIOD_MIN * 60),

udp_buffer_size: UDP_BUFFER_SIZE,
udp_channel_capacity: UDP_CHANNEL_CAPACITY,
Expand Down
12 changes: 6 additions & 6 deletions dap-lib/src/http_client/http_client_main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,8 @@ pub struct HttpClient {
/// timeout for http request
timeout_sec: Duration,

/// rebootstrap period for endpoint ip resolution
rebootstrap_period_sec: Duration,
/// period for endpoint ip resolution, such as next hop relay
endpoint_resolution_period_sec: Duration,
}

impl HttpClient {
Expand All @@ -33,7 +33,7 @@ impl HttpClient {
timeout_sec: Duration,
default_headers: Option<&HeaderMap>,
resolver_ips: impl ResolveIps,
rebootstrap_period_sec: Duration,
endpoint_resolution_period_sec: Duration,
) -> Result<Self> {
let resolved_ips = resolve_ips(endpoints, resolver_ips).await?;
Ok(Self {
Expand All @@ -43,7 +43,7 @@ impl HttpClient {
default_headers: default_headers.cloned(),
timeout_sec,
endpoints: endpoints.to_vec(),
rebootstrap_period_sec,
endpoint_resolution_period_sec,
})
}

Expand All @@ -68,8 +68,8 @@ impl HttpClient {
}

/// Get rebootstrap period
pub fn rebootstrap_period_sec(&self) -> Duration {
self.rebootstrap_period_sec
pub fn endpoint_resolution_period_sec(&self) -> Duration {
self.endpoint_resolution_period_sec
}
}

Expand Down
2 changes: 1 addition & 1 deletion dap-lib/src/http_client/http_client_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ impl HttpClient {
) -> Result<()> {
let mut fail_cnt = 0;
loop {
sleep(self.rebootstrap_period_sec()).await;
sleep(self.endpoint_resolution_period_sec()).await;
let endpoints = self.endpoints();

let primary_res = resolve_ips(endpoints, primary_resolver.clone()).await;
Expand Down
46 changes: 29 additions & 17 deletions dap-lib/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,14 @@ use futures::{
use std::sync::Arc;

pub use auth_client::AuthenticationConfig;
pub use doh_client::DoHMethod;
pub use globals::{NextHopRelayConfig, ProxyConfig, SubseqRelayConfig, TargetConfig};

/// entrypoint of DoH w/ Auth Proxy
/// This spawns UDP and TCP listeners and spawns the following services
/// - Authentication refresh/re-login service loop (Done)
/// - HTTP client update service loop, changing DNS resolver to the self when it works (Done)
/// - Health check service checking every path, flag unreachable patterns as unhealthy (as individual service inside doh_client?),
/// which also needs ODoH config refresh.
pub async fn entrypoint(
proxy_config: &ProxyConfig,
runtime_handle: &tokio::runtime::Handle,
Expand Down Expand Up @@ -79,13 +83,9 @@ pub async fn entrypoint(
// build doh_client
let doh_client = Arc::new(DoHClient::new(globals.clone(), http_client.inner(), authenticator).await?);

// TODO: 3. spawn healthcheck for every possible path? too many?
// TODO: 4. cache purge service, simultaneously with healthcheck?
// TODO: 5. implement query plugins

// spawn endpoint ip update service with bootstrap dns resolver and doh_client
let doh_client_clone = doh_client.clone();
let term_notify_clone = term_notify;
let term_notify_clone = term_notify.clone();
let http_client_clone = http_client.clone();
let ip_resolution_service = runtime_handle.spawn(async move {
http_client_clone
Expand All @@ -94,6 +94,18 @@ pub async fn entrypoint(
.with_context(|| "endpoint ip update service got down")
});

// spawn health check service for checking every possible path and purging expired DNS cache
let doh_client_clone = doh_client.clone();
let term_notify_clone = term_notify.clone();
let healthcheck_service = runtime_handle.spawn(async move {
doh_client_clone
.start_healthcheck_service(term_notify_clone)
.await
.with_context(|| "health check service for path and dns cache got down")
});

// TODO: 5. implement query plugins

// Start proxy for each listen address
let addresses = globals.proxy_config.listen_addresses.clone();
let proxy_service = select_all(addresses.into_iter().map(|addr| {
Expand All @@ -104,14 +116,17 @@ pub async fn entrypoint(
// wait for all future
if let Some(auth_service) = auth_service {
select! {
_ = auth_service.fuse() => {
warn!("Auth service is down, or term notified");
}
_ = proxy_service.fuse() => {
warn!("Proxy services are down, or term notified");
},
_ = auth_service.fuse() => {
warn!("Auth services are down, or term notified");
}
_ = ip_resolution_service.fuse() => {
warn!("Ip resolution service are down, or term notified");
warn!("Ip resolution service is down, or term notified");
},
_ = healthcheck_service.fuse() => {
warn!("Health check service is down, or term notified");
}
}
} else {
Expand All @@ -120,16 +135,13 @@ pub async fn entrypoint(
warn!("Proxy services are down, or term notified");
},
_ = ip_resolution_service.fuse() => {
warn!("Ip resolution service are down, or term notified");
warn!("Ip resolution service is down, or term notified");
},
_ = healthcheck_service.fuse() => {
warn!("Health check service is down, or term notified");
}
}
}

// TODO: services
// - Authentication refresh/re-login service loop (Done)
// - HTTP client update service loop, changing DNS resolver to the self when it works (Done)
// - Health check service checking every path, flag unreachable patterns as unhealthy (as individual service inside doh_client?),
// which also needs ODoH config refresh.

Ok(())
}
Loading

0 comments on commit da3135e

Please sign in to comment.