Skip to content

Commit

Permalink
feat: implement udp and tcp proxies
Browse files Browse the repository at this point in the history
  • Loading branch information
junkurihara committed Oct 25, 2023
1 parent ebc1ca3 commit faea387
Show file tree
Hide file tree
Showing 17 changed files with 570 additions and 47 deletions.
7 changes: 1 addition & 6 deletions dap-lib/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -61,10 +61,5 @@ trust-dns-resolver = { version = "0.23.2", default-features = false, features =

# authentication
auth-client = { git = "https://github.com/junkurihara/rust-token-server", package = "rust-token-server-client", branch = "develop" }
jwt-simple = "0.11.7"
chrono = "0.4.31"
serde = { version = "1.0.189", features = ["derive"] }
serde_json = "1.0.107"
p256 = { version = "0.13.2", features = ["jwk", "pem"] }
jwt-compact = { version = "0.8.0", features = ["p256", "ed25519-compact"] }
hot_reload = "0.1.4"
socket2 = "0.5.5"
8 changes: 7 additions & 1 deletion dap-lib/src/bootstrap.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,9 @@
use crate::{error::*, globals::BootstrapDns, log::*, ResolveIpResponse, ResolveIps};
use crate::{
error::*,
globals::BootstrapDns,
log::*,
trait_resolve_ips::{ResolveIpResponse, ResolveIps},
};
use async_trait::async_trait;
use reqwest::Url;
use std::net::SocketAddr;
Expand All @@ -10,6 +15,7 @@ use trust_dns_resolver::{

/// stub resolver using bootstrap DNS resolver
pub struct BootstrapDnsResolver {
/// wrapper of trust-dns-resolver
pub inner: AsyncResolver<GenericConnector<TokioRuntimeProvider>>,
}

Expand Down
5 changes: 4 additions & 1 deletion dap-lib/src/constants.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,11 @@
// Cannot override by config.toml
pub const UDP_BUFFER_SIZE: usize = 2048; // TODO: バッファサイズめちゃ適当
pub const UDP_CHANNEL_CAPACITY: usize = 1024; // TODO: channelキャパシティめちゃ適当
pub const UDP_TIMEOUT_SEC: u64 = 10;
pub const TCP_LISTEN_BACKLOG: u32 = 1024;

pub const MAX_CONNECTIONS: usize = 128; // TODO: 最大接続数(UDP+TCP)めちゃ適当
pub const TIMEOUT_SEC: u64 = 10;
pub const HTTP_TIMEOUT_SEC: u64 = 10;

pub const MIN_TTL: u32 = 10; // TTL for overridden records (plugin)

Expand Down
21 changes: 21 additions & 0 deletions dap-lib/src/doh_client/doh_client_main.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
use crate::{error::*, globals::Globals, http_client::HttpClientInner};
use std::sync::Arc;
use tokio::sync::RwLock;

#[derive(Debug)]
/// DoH, ODoH, MODoH client
pub struct DoHClient {
inner: Arc<RwLock<HttpClientInner>>,
}

impl DoHClient {
/// Create a new DoH client
pub fn new(inner: Arc<RwLock<HttpClientInner>>) -> Self {
Self { inner }
}

/// Make DoH query
pub async fn make_doh_query(&self, packet_buf: &[u8], globals: &Arc<Globals>) -> Result<Vec<u8>> {
Ok(vec![])
}
}
4 changes: 4 additions & 0 deletions dap-lib/src/client/mod.rs → dap-lib/src/doh_client/mod.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,7 @@
mod doh_client_main;

pub use doh_client_main::DoHClient;

#[derive(PartialEq, Eq, Debug, Clone)]
pub enum DoHMethod {
Get,
Expand Down
13 changes: 13 additions & 0 deletions dap-lib/src/error.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
pub use anyhow::{anyhow, bail, ensure, Context};
use std::net::SocketAddr;
use thiserror::Error;

pub type Result<T> = std::result::Result<T, DapError>;
Expand All @@ -22,6 +23,18 @@ pub enum DapError {
HttpClientError(#[from] reqwest::Error),
#[error("HttpClient build error")]
HttpClientBuildError,
#[error("Io Error: {0}")]
Io(#[from] std::io::Error),
#[error("Null TCP stream")]
NullTcpStream,
#[error("Udp channel send error")]
UdpChannelSendError(#[from] tokio::sync::mpsc::error::SendError<(Vec<u8>, SocketAddr)>),
#[error("Invalid DNS response size")]
InvalidDnsResponseSize,
#[error("Too many connections")]
TooManyConnections,
#[error("Failed to make DoH query")]
FailedToMakeDohQuery,

#[error(transparent)]
Other(#[from] anyhow::Error),
Expand Down
40 changes: 30 additions & 10 deletions dap-lib/src/globals.rs
Original file line number Diff line number Diff line change
@@ -1,21 +1,34 @@
use crate::{client::DoHMethod, constants::*, http_client::HttpClient};
use crate::{
constants::*,
doh_client::{DoHClient, DoHMethod},
http_client::HttpClient,
};
use auth_client::AuthenticationConfig;
use std::{
net::{IpAddr, SocketAddr},
sync::{Arc, RwLock},
sync::Arc,
};
use tokio::{
sync::{Notify, RwLock},
time::Duration,
};
use tokio::{sync::Notify, time::Duration};
use url::Url;

#[derive(Debug, Clone)]
#[derive(Debug)]
/// Global objects containing shared resources
pub struct Globals {
// pub cache: Arc<Cache>,
// pub counter: ConnCounter,
pub http_client: Arc<RwLock<HttpClient>>,
/// HTTP client shared by DoH client and authentication client, etc.
pub http_client: Arc<HttpClient>,

/// proxy configuration
pub proxy_config: ProxyConfig,

/// tokio runtime handler
pub runtime_handle: tokio::runtime::Handle,

/// notifier for termination at spawned tokio tasks
pub term_notify: Option<Arc<Notify>>,
// pub cache: Arc<Cache>,
}

#[derive(PartialEq, Eq, Debug, Clone)]
Expand All @@ -27,10 +40,14 @@ pub struct ProxyConfig {
/// bootstrap DNS
pub bootstrap_dns: BootstrapDns,

// udp proxy setting
// udp and tcp proxy setting
pub udp_buffer_size: usize,
pub udp_channel_capacity: usize,
pub timeout_sec: Duration,
pub udp_timeout_sec: Duration,
pub tcp_listen_backlog: u32,

/// timeout for HTTP requests (DoH, ODoH, and authentication requests)
pub http_timeout_sec: Duration,

// doh, odoh, modoh target settings
pub target_config: TargetConfig,
Expand Down Expand Up @@ -103,7 +120,10 @@ impl Default for ProxyConfig {

udp_buffer_size: UDP_BUFFER_SIZE,
udp_channel_capacity: UDP_CHANNEL_CAPACITY,
timeout_sec: Duration::from_secs(TIMEOUT_SEC),
udp_timeout_sec: Duration::from_secs(UDP_TIMEOUT_SEC),
tcp_listen_backlog: TCP_LISTEN_BACKLOG,

http_timeout_sec: Duration::from_secs(HTTP_TIMEOUT_SEC),

target_config: TargetConfig::default(),
nexthop_relay_config: None,
Expand Down
5 changes: 4 additions & 1 deletion dap-lib/src/http_client/http_client_main.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,9 @@
use std::sync::Arc;

use crate::{error::*, ResolveIpResponse, ResolveIps};
use crate::{
error::*,
trait_resolve_ips::{ResolveIpResponse, ResolveIps},
};
use futures::future::join_all;
use reqwest::{header::HeaderMap, Client, IntoUrl, RequestBuilder, Url};
use tokio::{sync::RwLock, time::Duration};
Expand Down
2 changes: 1 addition & 1 deletion dap-lib/src/http_client/http_client_service.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
use super::HttpClient;
use crate::{error::*, log::*, ResolveIps};
use crate::{error::*, log::*, trait_resolve_ips::ResolveIps};
use std::sync::Arc;

impl HttpClient {
Expand Down
61 changes: 34 additions & 27 deletions dap-lib/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,33 +1,23 @@
mod auth;
mod bootstrap;
mod client;
mod constants;
mod doh_client;
mod error;
mod globals;
mod http_client;
mod log;
mod proxy;
mod trait_resolve_ips;

use crate::{error::*, globals::Globals, http_client::HttpClient, log::info};
use async_trait::async_trait;
use std::{net::SocketAddr, sync::Arc};
use url::Url;
use crate::{doh_client::DoHClient, error::*, globals::Globals, http_client::HttpClient, log::*, proxy::Proxy};
use futures::future::select_all;
use std::sync::Arc;

pub use auth_client::AuthenticationConfig;
pub use client::DoHMethod;
pub use doh_client::DoHMethod;
pub use globals::{NextHopRelayConfig, ProxyConfig, SubseqRelayConfig, TargetConfig};

#[async_trait]
/// Trait that resolves ip addresses from a given url.
/// This will be used both for bootstrap DNS resolver and MODoH resolver itself.
pub trait ResolveIps {
async fn resolve_ips(&self, target_url: &Url) -> Result<ResolveIpResponse>;
}
pub struct ResolveIpResponse {
pub hostname: String,
pub addresses: Vec<SocketAddr>,
}

/// entrypoint of DoH w/ Auth Proxy
pub async fn entrypoint(
proxy_config: &ProxyConfig,
runtime_handle: &tokio::runtime::Handle,
Expand All @@ -51,36 +41,53 @@ pub async fn entrypoint(
}
let http_client = HttpClient::new(
&endpoint_candidates,
proxy_config.timeout_sec,
proxy_config.http_timeout_sec,
None,
bootstrap_dns_resolver,
)
.await?;

// spawn authentication service
let term_notify_clone = term_notify.clone();
if let Some(auth_config) = &proxy_config.authentication_config {
let authenticator = auth::Authenticator::new(auth_config, http_client.inner()).await?;
runtime_handle.spawn(async move {
authenticator
.start_service(term_notify.clone())
.start_service(term_notify_clone)
.await
.with_context(|| "auth service got down")
});
}

tokio::time::sleep(tokio::time::Duration::from_secs(600)).await;

// TODO: services
// - Authentication refresh/re-login service loop (Done)
// - HTTP client update service loop, changing DNS resolver to the self when it works
// - Health check service checking every path, flag unreachable patterns as unhealthy

// // build global
// let globals = Arc::new(Globals {
// proxy_config: proxy_config.clone(),
// runtime_handle: runtime_handle.clone(),
// term_notify: term_notify.clone(),
// });
// build doh_client
let doh_client = Arc::new(DoHClient::new(http_client.inner()));

// TODO: doh_clientにResolveIps traitを実装、http client ip updateサービスをここでspawn

// build global
let globals = Arc::new(Globals {
http_client: Arc::new(http_client),
proxy_config: proxy_config.clone(),
runtime_handle: runtime_handle.clone(),
term_notify,
});

// Start proxy for each listen address
let addresses = globals.proxy_config.listen_addresses.clone();
let futures = select_all(addresses.into_iter().map(|addr| {
let proxy = Proxy::new(globals.clone(), &addr, &doh_client);
globals.runtime_handle.spawn(proxy.start())
}));

// wait for all future
if let (Ok(Err(e)), _, _) = futures.await {
error!("Some proxy services are down: {:?}", e);
};

Ok(())
}
93 changes: 93 additions & 0 deletions dap-lib/src/proxy/counter.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,93 @@
use crate::log::*;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;

#[derive(Debug, Clone)]
pub enum CounterType {
Tcp,
Udp,
}
impl CounterType {
fn as_str(&self) -> &'static str {
match self {
CounterType::Tcp => "TCP",
CounterType::Udp => "UDP",
}
}
}

#[derive(Debug, Clone, Default)]
/// Connection counter
pub struct ConnCounter {
pub cnt_total: Arc<AtomicUsize>,
pub cnt_udp: Arc<AtomicUsize>,
pub cnt_tcp: Arc<AtomicUsize>,
}

impl ConnCounter {
pub fn get_current_total(&self) -> usize {
self.cnt_total.load(Ordering::Relaxed)
}

pub fn get_current(&self, ctype: CounterType) -> usize {
match ctype {
CounterType::Tcp => self.cnt_tcp.load(Ordering::Relaxed),
CounterType::Udp => self.cnt_udp.load(Ordering::Relaxed),
}
}

pub fn increment(&self, ctype: CounterType) -> usize {
self.cnt_total.fetch_add(1, Ordering::Relaxed);
let c = match ctype {
CounterType::Tcp => self.cnt_tcp.fetch_add(1, Ordering::Relaxed),
CounterType::Udp => self.cnt_udp.fetch_add(1, Ordering::Relaxed),
};

debug!(
"{} connection count++: {} (total = {})",
&ctype.as_str(),
self.get_current(ctype),
self.get_current_total()
);
c
}

pub fn decrement(&self, ctype: CounterType) {
let cnt;
match ctype {
CounterType::Tcp => {
let res = {
cnt = self.cnt_tcp.load(Ordering::Relaxed);
cnt > 0
&& self
.cnt_tcp
.compare_exchange(cnt, cnt - 1, Ordering::Relaxed, Ordering::Relaxed)
!= Ok(cnt)
};
if res {}
}
CounterType::Udp => {
let res = {
cnt = self.cnt_udp.load(Ordering::Relaxed);
cnt > 0
&& self
.cnt_udp
.compare_exchange(cnt, cnt - 1, Ordering::Relaxed, Ordering::Relaxed)
!= Ok(cnt)
};
if res {}
}
};
self.cnt_total.store(
self.cnt_udp.load(Ordering::Relaxed) + self.cnt_tcp.load(Ordering::Relaxed),
Ordering::Relaxed,
);

debug!(
"{} connection count--: {} (total = {})",
&ctype.as_str(),
self.get_current(ctype),
self.get_current_total()
);
}
}
7 changes: 7 additions & 0 deletions dap-lib/src/proxy/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
mod counter;
mod proxy_main;
mod proxy_tcp;
mod proxy_udp;
mod socket;

pub use proxy_main::Proxy;
Loading

0 comments on commit faea387

Please sign in to comment.