From faea387f278fd0472c608c65e36fb36a5ad60225 Mon Sep 17 00:00:00 2001 From: Jun Kurihara Date: Wed, 25 Oct 2023 13:11:55 +0900 Subject: [PATCH] feat: implement udp and tcp proxies --- dap-lib/Cargo.toml | 7 +- dap-lib/src/bootstrap.rs | 8 +- dap-lib/src/constants.rs | 5 +- dap-lib/src/doh_client/doh_client_main.rs | 21 +++ dap-lib/src/{client => doh_client}/mod.rs | 4 + dap-lib/src/error.rs | 13 ++ dap-lib/src/globals.rs | 40 +++-- dap-lib/src/http_client/http_client_main.rs | 5 +- .../src/http_client/http_client_service.rs | 2 +- dap-lib/src/lib.rs | 61 ++++---- dap-lib/src/proxy/counter.rs | 93 +++++++++++ dap-lib/src/proxy/mod.rs | 7 + dap-lib/src/proxy/proxy_main.rs | 54 +++++++ dap-lib/src/proxy/proxy_tcp.rs | 90 +++++++++++ dap-lib/src/proxy/proxy_udp.rs | 148 ++++++++++++++++++ dap-lib/src/proxy/socket.rs | 41 +++++ dap-lib/src/trait_resolve_ips.rs | 18 +++ 17 files changed, 570 insertions(+), 47 deletions(-) create mode 100644 dap-lib/src/doh_client/doh_client_main.rs rename dap-lib/src/{client => doh_client}/mod.rs (56%) create mode 100644 dap-lib/src/proxy/counter.rs create mode 100644 dap-lib/src/proxy/proxy_main.rs create mode 100644 dap-lib/src/proxy/proxy_tcp.rs create mode 100644 dap-lib/src/proxy/proxy_udp.rs create mode 100644 dap-lib/src/proxy/socket.rs create mode 100644 dap-lib/src/trait_resolve_ips.rs diff --git a/dap-lib/Cargo.toml b/dap-lib/Cargo.toml index 32af155..62ee936 100644 --- a/dap-lib/Cargo.toml +++ b/dap-lib/Cargo.toml @@ -61,10 +61,5 @@ trust-dns-resolver = { version = "0.23.2", default-features = false, features = # authentication auth-client = { git = "https://github.com/junkurihara/rust-token-server", package = "rust-token-server-client", branch = "develop" } -jwt-simple = "0.11.7" -chrono = "0.4.31" serde = { version = "1.0.189", features = ["derive"] } -serde_json = "1.0.107" -p256 = { version = "0.13.2", features = ["jwk", "pem"] } -jwt-compact = { version = "0.8.0", features = ["p256", "ed25519-compact"] } -hot_reload = "0.1.4" +socket2 = "0.5.5" diff --git a/dap-lib/src/bootstrap.rs b/dap-lib/src/bootstrap.rs index 268eba0..f3aa975 100644 --- a/dap-lib/src/bootstrap.rs +++ b/dap-lib/src/bootstrap.rs @@ -1,4 +1,9 @@ -use crate::{error::*, globals::BootstrapDns, log::*, ResolveIpResponse, ResolveIps}; +use crate::{ + error::*, + globals::BootstrapDns, + log::*, + trait_resolve_ips::{ResolveIpResponse, ResolveIps}, +}; use async_trait::async_trait; use reqwest::Url; use std::net::SocketAddr; @@ -10,6 +15,7 @@ use trust_dns_resolver::{ /// stub resolver using bootstrap DNS resolver pub struct BootstrapDnsResolver { + /// wrapper of trust-dns-resolver pub inner: AsyncResolver>, } diff --git a/dap-lib/src/constants.rs b/dap-lib/src/constants.rs index 8266822..5cd02e9 100644 --- a/dap-lib/src/constants.rs +++ b/dap-lib/src/constants.rs @@ -4,8 +4,11 @@ // Cannot override by config.toml pub const UDP_BUFFER_SIZE: usize = 2048; // TODO: バッファサイズめちゃ適当 pub const UDP_CHANNEL_CAPACITY: usize = 1024; // TODO: channelキャパシティめちゃ適当 +pub const UDP_TIMEOUT_SEC: u64 = 10; +pub const TCP_LISTEN_BACKLOG: u32 = 1024; + pub const MAX_CONNECTIONS: usize = 128; // TODO: 最大接続数(UDP+TCP)めちゃ適当 -pub const TIMEOUT_SEC: u64 = 10; +pub const HTTP_TIMEOUT_SEC: u64 = 10; pub const MIN_TTL: u32 = 10; // TTL for overridden records (plugin) diff --git a/dap-lib/src/doh_client/doh_client_main.rs b/dap-lib/src/doh_client/doh_client_main.rs new file mode 100644 index 0000000..d092e87 --- /dev/null +++ b/dap-lib/src/doh_client/doh_client_main.rs @@ -0,0 +1,21 @@ +use crate::{error::*, globals::Globals, http_client::HttpClientInner}; +use std::sync::Arc; +use tokio::sync::RwLock; + +#[derive(Debug)] +/// DoH, ODoH, MODoH client +pub struct DoHClient { + inner: Arc>, +} + +impl DoHClient { + /// Create a new DoH client + pub fn new(inner: Arc>) -> Self { + Self { inner } + } + + /// Make DoH query + pub async fn make_doh_query(&self, packet_buf: &[u8], globals: &Arc) -> Result> { + Ok(vec![]) + } +} diff --git a/dap-lib/src/client/mod.rs b/dap-lib/src/doh_client/mod.rs similarity index 56% rename from dap-lib/src/client/mod.rs rename to dap-lib/src/doh_client/mod.rs index 3ea628b..91e4cbc 100644 --- a/dap-lib/src/client/mod.rs +++ b/dap-lib/src/doh_client/mod.rs @@ -1,3 +1,7 @@ +mod doh_client_main; + +pub use doh_client_main::DoHClient; + #[derive(PartialEq, Eq, Debug, Clone)] pub enum DoHMethod { Get, diff --git a/dap-lib/src/error.rs b/dap-lib/src/error.rs index 52d2bcf..ae119a4 100644 --- a/dap-lib/src/error.rs +++ b/dap-lib/src/error.rs @@ -1,4 +1,5 @@ pub use anyhow::{anyhow, bail, ensure, Context}; +use std::net::SocketAddr; use thiserror::Error; pub type Result = std::result::Result; @@ -22,6 +23,18 @@ pub enum DapError { HttpClientError(#[from] reqwest::Error), #[error("HttpClient build error")] HttpClientBuildError, + #[error("Io Error: {0}")] + Io(#[from] std::io::Error), + #[error("Null TCP stream")] + NullTcpStream, + #[error("Udp channel send error")] + UdpChannelSendError(#[from] tokio::sync::mpsc::error::SendError<(Vec, SocketAddr)>), + #[error("Invalid DNS response size")] + InvalidDnsResponseSize, + #[error("Too many connections")] + TooManyConnections, + #[error("Failed to make DoH query")] + FailedToMakeDohQuery, #[error(transparent)] Other(#[from] anyhow::Error), diff --git a/dap-lib/src/globals.rs b/dap-lib/src/globals.rs index 1aa6900..24ee878 100644 --- a/dap-lib/src/globals.rs +++ b/dap-lib/src/globals.rs @@ -1,21 +1,34 @@ -use crate::{client::DoHMethod, constants::*, http_client::HttpClient}; +use crate::{ + constants::*, + doh_client::{DoHClient, DoHMethod}, + http_client::HttpClient, +}; use auth_client::AuthenticationConfig; use std::{ net::{IpAddr, SocketAddr}, - sync::{Arc, RwLock}, + sync::Arc, +}; +use tokio::{ + sync::{Notify, RwLock}, + time::Duration, }; -use tokio::{sync::Notify, time::Duration}; use url::Url; -#[derive(Debug, Clone)] +#[derive(Debug)] +/// Global objects containing shared resources pub struct Globals { - // pub cache: Arc, - // pub counter: ConnCounter, - pub http_client: Arc>, + /// HTTP client shared by DoH client and authentication client, etc. + pub http_client: Arc, + /// proxy configuration pub proxy_config: ProxyConfig, + + /// tokio runtime handler pub runtime_handle: tokio::runtime::Handle, + + /// notifier for termination at spawned tokio tasks pub term_notify: Option>, + // pub cache: Arc, } #[derive(PartialEq, Eq, Debug, Clone)] @@ -27,10 +40,14 @@ pub struct ProxyConfig { /// bootstrap DNS pub bootstrap_dns: BootstrapDns, - // udp proxy setting + // udp and tcp proxy setting pub udp_buffer_size: usize, pub udp_channel_capacity: usize, - pub timeout_sec: Duration, + pub udp_timeout_sec: Duration, + pub tcp_listen_backlog: u32, + + /// timeout for HTTP requests (DoH, ODoH, and authentication requests) + pub http_timeout_sec: Duration, // doh, odoh, modoh target settings pub target_config: TargetConfig, @@ -103,7 +120,10 @@ impl Default for ProxyConfig { udp_buffer_size: UDP_BUFFER_SIZE, udp_channel_capacity: UDP_CHANNEL_CAPACITY, - timeout_sec: Duration::from_secs(TIMEOUT_SEC), + udp_timeout_sec: Duration::from_secs(UDP_TIMEOUT_SEC), + tcp_listen_backlog: TCP_LISTEN_BACKLOG, + + http_timeout_sec: Duration::from_secs(HTTP_TIMEOUT_SEC), target_config: TargetConfig::default(), nexthop_relay_config: None, diff --git a/dap-lib/src/http_client/http_client_main.rs b/dap-lib/src/http_client/http_client_main.rs index 91fcfc9..9f5865a 100644 --- a/dap-lib/src/http_client/http_client_main.rs +++ b/dap-lib/src/http_client/http_client_main.rs @@ -1,6 +1,9 @@ use std::sync::Arc; -use crate::{error::*, ResolveIpResponse, ResolveIps}; +use crate::{ + error::*, + trait_resolve_ips::{ResolveIpResponse, ResolveIps}, +}; use futures::future::join_all; use reqwest::{header::HeaderMap, Client, IntoUrl, RequestBuilder, Url}; use tokio::{sync::RwLock, time::Duration}; diff --git a/dap-lib/src/http_client/http_client_service.rs b/dap-lib/src/http_client/http_client_service.rs index 74a470b..96492e4 100644 --- a/dap-lib/src/http_client/http_client_service.rs +++ b/dap-lib/src/http_client/http_client_service.rs @@ -1,5 +1,5 @@ use super::HttpClient; -use crate::{error::*, log::*, ResolveIps}; +use crate::{error::*, log::*, trait_resolve_ips::ResolveIps}; use std::sync::Arc; impl HttpClient { diff --git a/dap-lib/src/lib.rs b/dap-lib/src/lib.rs index 07773a3..e71bc1f 100644 --- a/dap-lib/src/lib.rs +++ b/dap-lib/src/lib.rs @@ -1,33 +1,23 @@ mod auth; mod bootstrap; -mod client; mod constants; +mod doh_client; mod error; mod globals; mod http_client; mod log; mod proxy; +mod trait_resolve_ips; -use crate::{error::*, globals::Globals, http_client::HttpClient, log::info}; -use async_trait::async_trait; -use std::{net::SocketAddr, sync::Arc}; -use url::Url; +use crate::{doh_client::DoHClient, error::*, globals::Globals, http_client::HttpClient, log::*, proxy::Proxy}; +use futures::future::select_all; +use std::sync::Arc; pub use auth_client::AuthenticationConfig; -pub use client::DoHMethod; +pub use doh_client::DoHMethod; pub use globals::{NextHopRelayConfig, ProxyConfig, SubseqRelayConfig, TargetConfig}; -#[async_trait] -/// Trait that resolves ip addresses from a given url. -/// This will be used both for bootstrap DNS resolver and MODoH resolver itself. -pub trait ResolveIps { - async fn resolve_ips(&self, target_url: &Url) -> Result; -} -pub struct ResolveIpResponse { - pub hostname: String, - pub addresses: Vec, -} - +/// entrypoint of DoH w/ Auth Proxy pub async fn entrypoint( proxy_config: &ProxyConfig, runtime_handle: &tokio::runtime::Handle, @@ -51,36 +41,53 @@ pub async fn entrypoint( } let http_client = HttpClient::new( &endpoint_candidates, - proxy_config.timeout_sec, + proxy_config.http_timeout_sec, None, bootstrap_dns_resolver, ) .await?; // spawn authentication service + let term_notify_clone = term_notify.clone(); if let Some(auth_config) = &proxy_config.authentication_config { let authenticator = auth::Authenticator::new(auth_config, http_client.inner()).await?; runtime_handle.spawn(async move { authenticator - .start_service(term_notify.clone()) + .start_service(term_notify_clone) .await .with_context(|| "auth service got down") }); } - tokio::time::sleep(tokio::time::Duration::from_secs(600)).await; - // TODO: services // - Authentication refresh/re-login service loop (Done) // - HTTP client update service loop, changing DNS resolver to the self when it works // - Health check service checking every path, flag unreachable patterns as unhealthy - // // build global - // let globals = Arc::new(Globals { - // proxy_config: proxy_config.clone(), - // runtime_handle: runtime_handle.clone(), - // term_notify: term_notify.clone(), - // }); + // build doh_client + let doh_client = Arc::new(DoHClient::new(http_client.inner())); + + // TODO: doh_clientにResolveIps traitを実装、http client ip updateサービスをここでspawn + + // build global + let globals = Arc::new(Globals { + http_client: Arc::new(http_client), + proxy_config: proxy_config.clone(), + runtime_handle: runtime_handle.clone(), + term_notify, + }); + + // Start proxy for each listen address + let addresses = globals.proxy_config.listen_addresses.clone(); + let futures = select_all(addresses.into_iter().map(|addr| { + let proxy = Proxy::new(globals.clone(), &addr, &doh_client); + globals.runtime_handle.spawn(proxy.start()) + })); + + // wait for all future + if let (Ok(Err(e)), _, _) = futures.await { + error!("Some proxy services are down: {:?}", e); + }; Ok(()) } diff --git a/dap-lib/src/proxy/counter.rs b/dap-lib/src/proxy/counter.rs new file mode 100644 index 0000000..df69554 --- /dev/null +++ b/dap-lib/src/proxy/counter.rs @@ -0,0 +1,93 @@ +use crate::log::*; +use std::sync::atomic::{AtomicUsize, Ordering}; +use std::sync::Arc; + +#[derive(Debug, Clone)] +pub enum CounterType { + Tcp, + Udp, +} +impl CounterType { + fn as_str(&self) -> &'static str { + match self { + CounterType::Tcp => "TCP", + CounterType::Udp => "UDP", + } + } +} + +#[derive(Debug, Clone, Default)] +/// Connection counter +pub struct ConnCounter { + pub cnt_total: Arc, + pub cnt_udp: Arc, + pub cnt_tcp: Arc, +} + +impl ConnCounter { + pub fn get_current_total(&self) -> usize { + self.cnt_total.load(Ordering::Relaxed) + } + + pub fn get_current(&self, ctype: CounterType) -> usize { + match ctype { + CounterType::Tcp => self.cnt_tcp.load(Ordering::Relaxed), + CounterType::Udp => self.cnt_udp.load(Ordering::Relaxed), + } + } + + pub fn increment(&self, ctype: CounterType) -> usize { + self.cnt_total.fetch_add(1, Ordering::Relaxed); + let c = match ctype { + CounterType::Tcp => self.cnt_tcp.fetch_add(1, Ordering::Relaxed), + CounterType::Udp => self.cnt_udp.fetch_add(1, Ordering::Relaxed), + }; + + debug!( + "{} connection count++: {} (total = {})", + &ctype.as_str(), + self.get_current(ctype), + self.get_current_total() + ); + c + } + + pub fn decrement(&self, ctype: CounterType) { + let cnt; + match ctype { + CounterType::Tcp => { + let res = { + cnt = self.cnt_tcp.load(Ordering::Relaxed); + cnt > 0 + && self + .cnt_tcp + .compare_exchange(cnt, cnt - 1, Ordering::Relaxed, Ordering::Relaxed) + != Ok(cnt) + }; + if res {} + } + CounterType::Udp => { + let res = { + cnt = self.cnt_udp.load(Ordering::Relaxed); + cnt > 0 + && self + .cnt_udp + .compare_exchange(cnt, cnt - 1, Ordering::Relaxed, Ordering::Relaxed) + != Ok(cnt) + }; + if res {} + } + }; + self.cnt_total.store( + self.cnt_udp.load(Ordering::Relaxed) + self.cnt_tcp.load(Ordering::Relaxed), + Ordering::Relaxed, + ); + + debug!( + "{} connection count--: {} (total = {})", + &ctype.as_str(), + self.get_current(ctype), + self.get_current_total() + ); + } +} diff --git a/dap-lib/src/proxy/mod.rs b/dap-lib/src/proxy/mod.rs index e69de29..f34f836 100644 --- a/dap-lib/src/proxy/mod.rs +++ b/dap-lib/src/proxy/mod.rs @@ -0,0 +1,7 @@ +mod counter; +mod proxy_main; +mod proxy_tcp; +mod proxy_udp; +mod socket; + +pub use proxy_main::Proxy; diff --git a/dap-lib/src/proxy/proxy_main.rs b/dap-lib/src/proxy/proxy_main.rs new file mode 100644 index 0000000..f8c31dc --- /dev/null +++ b/dap-lib/src/proxy/proxy_main.rs @@ -0,0 +1,54 @@ +use super::counter::ConnCounter; +use crate::{doh_client::DoHClient, error::*, globals::Globals, log::*}; +use std::{net::SocketAddr, sync::Arc}; + +/// Proxy object serving UDP and TCP queries +#[derive(Clone)] +pub struct Proxy { + pub(super) globals: Arc, + pub(super) counter: ConnCounter, + pub(super) doh_client: Arc, + pub(super) listening_on: SocketAddr, +} + +impl Proxy { + /// Create a new proxy object + pub fn new(globals: Arc, listening_on: &SocketAddr, doh_client: &Arc) -> Self { + Self { + globals, + counter: ConnCounter::default(), + doh_client: doh_client.clone(), + listening_on: *listening_on, + } + } + /// Start proxy for single port + pub async fn start(self) -> Result<()> { + let term_notify = self.globals.term_notify.clone(); + match term_notify { + Some(term) => { + tokio::select! { + _ = self.clone().start_udp_listener() => { + warn!("UDP listener service got down"); + } + _ = self.start_tcp_listener() => { + warn!("TCP listener service got down"); + } + _ = term.notified() => { + info!("Proxy receives term signal"); + } + } + } + None => { + tokio::select! { + _ = self.clone().start_udp_listener() => { + warn!("UDP listener service got down"); + } + _ = self.start_tcp_listener() => { + warn!("TCP listener service got down"); + } + } + } + } + Ok(()) + } +} diff --git a/dap-lib/src/proxy/proxy_tcp.rs b/dap-lib/src/proxy/proxy_tcp.rs new file mode 100644 index 0000000..73c12dd --- /dev/null +++ b/dap-lib/src/proxy/proxy_tcp.rs @@ -0,0 +1,90 @@ +use super::{counter::CounterType, proxy_main::Proxy, socket::bind_tcp_socket}; +use crate::{error::*, log::*}; +use std::net::SocketAddr; +use tokio::{ + io::{AsyncReadExt, AsyncWriteExt}, + net::TcpStream, +}; + +impl Proxy { + /// Start TCP listener + pub async fn start_tcp_listener(self) -> Result<()> { + let tcp_socket = bind_tcp_socket(&self.listening_on)?; + let tcp_listener = tcp_socket.listen(self.globals.proxy_config.tcp_listen_backlog)?; + info!("Listening on TCP: {:?}", tcp_listener.local_addr()?); + + // receive from src + let tcp_listener_service = async { + loop { + let (stream, src_addr) = match tcp_listener.accept().await { + Err(e) => { + error!("Error in TCP listener: {}", e); + continue; + } + Ok(res) => res, + }; + let self_clone = self.clone(); + self.globals.runtime_handle.spawn(async move { + if let Err(e) = self_clone.serve_tcp_query(stream, src_addr).await { + error!("Failed to handle TCP query: {}", e); + } + }); + } + }; + tcp_listener_service.await; + + Ok(()) + } + + /// Serve TCP query + pub async fn serve_tcp_query(self, mut stream: TcpStream, src_addr: SocketAddr) -> Result<()> { + debug!("handle tcp query from {:?}", src_addr); + let counter = self.counter.clone(); + if counter.increment(CounterType::Tcp) >= self.globals.proxy_config.max_connections { + error!( + "Too many connections: max = {} (udp+tcp)", + self.globals.proxy_config.max_connections + ); + counter.decrement(CounterType::Tcp); + return Err(DapError::TooManyConnections); + } + // let doh_client = self.context.get_random_client().await?; + + // read data from stream + // first 2bytes indicates the length of dns message following from the 3rd byte + let mut length_buf = [0u8; 2]; + stream.read_exact(&mut length_buf).await?; + let msg_length = u16::from_be_bytes(length_buf) as usize; + if msg_length == 0 { + return Err(DapError::NullTcpStream); + } + let mut packet_buf = vec![0u8; msg_length]; + stream.read_exact(&mut packet_buf).await?; + + // make DoH query + let res = tokio::time::timeout( + self.globals.proxy_config.http_timeout_sec + std::time::Duration::from_secs(1), + // serve tcp dns message here + self.doh_client.make_doh_query(&packet_buf, &self.globals), + ) + .await + .ok(); + // debug!("response from DoH server: {:?}", res); + + // send response via stream + counter.decrement(CounterType::Tcp); // decrement counter anyways + + if let Some(Ok(r)) = res { + if r.len() > (u16::MAX as usize) { + return Err(DapError::InvalidDnsResponseSize); + } + let length_buf = u16::to_be_bytes(r.len() as u16); + stream.write_all(&length_buf).await?; + stream.write_all(&r).await?; + } else { + return Err(DapError::FailedToMakeDohQuery); + } + + Ok(()) + } +} diff --git a/dap-lib/src/proxy/proxy_udp.rs b/dap-lib/src/proxy/proxy_udp.rs new file mode 100644 index 0000000..8453e4e --- /dev/null +++ b/dap-lib/src/proxy/proxy_udp.rs @@ -0,0 +1,148 @@ +use super::{counter::CounterType, proxy_main::Proxy, socket::bind_udp_socket}; +use crate::{error::*, log::*}; +use std::{ + net::{SocketAddr, UdpSocket}, + sync::Arc, +}; +use tokio::{ + sync::{mpsc, Notify}, + time::Duration, +}; + +impl Proxy { + /// Start UDP listener + pub async fn start_udp_listener(self) -> Result<()> { + // setup a channel for sending out responses + let (channel_sender, channel_receiver) = + mpsc::channel::<(Vec, SocketAddr)>(self.globals.proxy_config.udp_channel_capacity); + + let udp_socket = bind_udp_socket(&self.listening_on)?; + info!("Listening on UDP: {:?}", udp_socket.local_addr()?); + + let socket_sender = Arc::new(udp_socket); + let socket_receiver = socket_sender.clone(); + + // create sender thread that sends out response given through channel + self.globals.runtime_handle.spawn(Self::udp_responder_service( + socket_sender, + channel_receiver, + self.globals.term_notify.clone(), + )); + + // Setup buffer + let mut udp_buf = vec![0u8; self.globals.proxy_config.udp_buffer_size]; + + // receive from src + let udp_socket_service = async { + loop { + let (buf_size, src_addr) = match socket_receiver.recv_from(&mut udp_buf) { + Err(e) => { + error!("Error in UDP listener: {}", e); + continue; + } + Ok(res) => res, + }; + + let packet_buf = udp_buf[..buf_size].to_vec(); + let self_clone = self.clone(); + let channel_sender_clone = channel_sender.clone(); + self.globals.runtime_handle.spawn(async move { + if let Err(e) = self_clone + .serve_udp_query(packet_buf, src_addr, channel_sender_clone) + .await + { + error!("Failed to handle UDP query: {}", e); + } + }); + } + }; + udp_socket_service.await; + + Ok(()) + } + + /// Send response to source client + async fn udp_responder_service( + socket_sender: Arc, + mut channel_receiver: mpsc::Receiver<(Vec, std::net::SocketAddr)>, + term_notify: Option>, + ) { + let service = async { + loop { + let (bytes, addr) = match channel_receiver.recv().await { + None => { + error!("udp channel_receiver.recv()"); + continue; + } + Some(res) => res, + }; + match &socket_sender.send_to(&bytes, addr) { + Ok(len) => { + debug!("send_to source with response of {:?} bytes", len); + } + Err(e) => { + error!("send_to error: {:?}", e); + } + }; + } + }; + + match term_notify { + Some(term) => { + tokio::select! { + _ = service => { + warn!("Udp responder service got down"); + } + _ = term.notified() => { + info!("Udp responder service receives term signal"); + } + } + } + None => { + service.await; + warn!("Auth service got down"); + } + } + } + + /// Serve UDP query from source client + async fn serve_udp_query( + self, + packet_buf: Vec, + src_addr: SocketAddr, + res_sender: mpsc::Sender<(Vec, SocketAddr)>, + ) -> Result<()> { + debug!("handle udp query from {:?}", src_addr); + let counter = self.counter.clone(); + if counter.increment(CounterType::Udp) >= self.globals.proxy_config.max_connections { + error!( + "Too many connections: max = {} (udp+tcp)", + self.globals.proxy_config.max_connections + ); + counter.decrement(CounterType::Udp); + return Err(DapError::TooManyConnections); + } + + let res = tokio::time::timeout( + self.globals.proxy_config.http_timeout_sec + Duration::from_secs(1), + // serve udp dns message here + self.doh_client.make_doh_query(&packet_buf, &self.globals), + ) + .await + .ok(); + // debug!("response from DoH server: {:?}", res); + + // send response via channel to the dispatch socket + if let Some(Ok(r)) = res { + if let Err(e) = res_sender.send((r, src_addr)).await { + error!("res_sender on channel fail: {:?}", e); + return Err(DapError::UdpChannelSendError(e)); + } + } else { + return Err(DapError::FailedToMakeDohQuery); + } + counter.decrement(CounterType::Udp); + + Ok(()) + } +} diff --git a/dap-lib/src/proxy/socket.rs b/dap-lib/src/proxy/socket.rs new file mode 100644 index 0000000..6702a3f --- /dev/null +++ b/dap-lib/src/proxy/socket.rs @@ -0,0 +1,41 @@ +use crate::{error::*, log::*}; +use socket2::{Domain, Protocol, Socket, Type}; +use std::net::{SocketAddr, UdpSocket}; +use tokio::net::TcpSocket; + +/// Bind TCP socket to the given `SocketAddr`, and returns the TCP socket with `SO_REUSEADDR` and `SO_REUSEPORT` options. +/// This option is required to re-bind the socket address when the proxy instance is reconstructed. +pub(super) fn bind_tcp_socket(listening_on: &SocketAddr) -> Result { + let tcp_socket = if listening_on.is_ipv6() { + TcpSocket::new_v6() + } else { + TcpSocket::new_v4() + }?; + tcp_socket.set_reuseaddr(true)?; + tcp_socket.set_reuseport(true)?; + if let Err(e) = tcp_socket.bind(*listening_on) { + error!("Failed to bind TCP socket: {}", e); + return Err(DapError::Io(e)); + }; + Ok(tcp_socket) +} + +/// Bind UDP socket to the given `SocketAddr`, and returns the UDP socket with `SO_REUSEADDR` and `SO_REUSEPORT` options. +/// This option is required to re-bind the socket address when the proxy instance is reconstructed. +pub(super) fn bind_udp_socket(listening_on: &SocketAddr) -> Result { + let socket = if listening_on.is_ipv6() { + Socket::new(Domain::IPV6, Type::DGRAM, Some(Protocol::UDP)) + } else { + Socket::new(Domain::IPV4, Type::DGRAM, Some(Protocol::UDP)) + }?; + socket.set_reuse_address(true)?; // This isn't necessary? + socket.set_reuse_port(true)?; + + if let Err(e) = socket.bind(&(*listening_on).into()) { + error!("Failed to bind UDP socket: {}", e); + return Err(DapError::Io(e)); + }; + let udp_socket: UdpSocket = socket.into(); + + Ok(udp_socket) +} diff --git a/dap-lib/src/trait_resolve_ips.rs b/dap-lib/src/trait_resolve_ips.rs new file mode 100644 index 0000000..ee971ee --- /dev/null +++ b/dap-lib/src/trait_resolve_ips.rs @@ -0,0 +1,18 @@ +use crate::error::Result; +use async_trait::async_trait; +use std::net::SocketAddr; +use url::Url; + +#[async_trait] +/// Trait that resolves ip addresses from a given url. +/// This will be used both for bootstrap DNS resolver and MODoH resolver itself. +pub trait ResolveIps { + async fn resolve_ips(&self, target_url: &Url) -> Result; +} +/// Response of ResolveIps trait +pub struct ResolveIpResponse { + /// hostname of target url + pub hostname: String, + /// resolved ip addresses + pub addresses: Vec, +}