From 960b93124fe980833805389c27121e15d8b16baf Mon Sep 17 00:00:00 2001 From: LightQuantum Date: Wed, 19 Jun 2024 19:33:22 +0800 Subject: [PATCH] feat(fetcher): support socks5 proxy --- Cargo.lock | 13 +++++++++ rsync-fetcher/Cargo.toml | 1 + rsync-fetcher/src/rsync.rs | 57 +++++++++++++++++++++++++++++++++++--- 3 files changed, 67 insertions(+), 4 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index bb14c04..2d1a858 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3348,6 +3348,7 @@ dependencies = [ "tempfile", "test-strategy", "tokio", + "tokio-socks", "tokio-util", "tracing", "unix_mode", @@ -4419,6 +4420,18 @@ dependencies = [ "tokio", ] +[[package]] +name = "tokio-socks" +version = "0.5.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "51165dfa029d2a65969413a6cc96f354b86b464498702f174a4efa13608fd8c0" +dependencies = [ + "either", + "futures-util", + "thiserror", + "tokio", +] + [[package]] name = "tokio-stream" version = "0.1.14" diff --git a/rsync-fetcher/Cargo.toml b/rsync-fetcher/Cargo.toml index 91b4224..d316f88 100644 --- a/rsync-fetcher/Cargo.toml +++ b/rsync-fetcher/Cargo.toml @@ -33,6 +33,7 @@ sqlx = { version = "0.7", features = ["runtime-tokio-rustls", "postgres", "chron tap = "1.0" tempfile = "3.3" tokio = { version = "1.25", features = ["full"] } +tokio-socks = "0.5" tokio-util = { version = "0.7", features = ["compat"] } tracing = "0.1" unix_mode = "0.1" diff --git a/rsync-fetcher/src/rsync.rs b/rsync-fetcher/src/rsync.rs index f4e98f3..483a8a2 100644 --- a/rsync-fetcher/src/rsync.rs +++ b/rsync-fetcher/src/rsync.rs @@ -1,6 +1,8 @@ -use eyre::{Context, ContextCompat, Result}; +use eyre::{bail, Context, ContextCompat, Result}; use tokio::io::{AsyncRead, AsyncWrite, AsyncWriteExt}; use tokio::net::TcpStream; +use tokio_socks::tcp::Socks5Stream; +use tracing::info; use url::Url; use crate::rsync::downloader::Downloader; @@ -36,19 +38,66 @@ pub struct TaskBuilders { pub progress: ProgressDisplay, } +async fn connect_with_proxy(target: &str) -> Result { + let proxy = std::env::var("SOCKS5_PROXY") + .ok() + .and_then(|s| (!s.is_empty()).then_some(s)) + .or_else(|| { + std::env::var("socks5_proxy") + .ok() + .and_then(|s| (!s.is_empty()).then_some(s)) + }); + + if let Some(proxy) = proxy { + let proxy = Url::parse(&proxy).context("invalid proxy URL")?; + if proxy.scheme().to_lowercase() != "socks5" { + bail!("unsupported proxy scheme: {}", proxy.scheme()); + } + let proxy_addr = proxy.host_str().context("missing proxy host")?; + let proxy_port = proxy.port().unwrap_or(1080); + let proxy_username = proxy.username(); + let proxy_password = proxy.password().unwrap_or_default(); + + let stream = if proxy_username.is_empty() { + info!("connecting to {} via SOCKS5 proxy {}", target, proxy); + Socks5Stream::connect((proxy_addr, proxy_port), target) + .await + .context("proxy or rsync server refused connection. Are they running?")? + } else { + info!( + "connecting to {} via SOCKS5 proxy {} as {}", + target, proxy, proxy_username + ); + Socks5Stream::connect_with_password( + (proxy_addr, proxy_port), + target, + proxy_username, + proxy_password, + ) + .await + .context("proxy or rsync server refused connection. Are they running?")? + }; + + Ok(stream.into_inner()) + } else { + TcpStream::connect(target) + .await + .context("rsync server refused connection. Is it running?") + } +} + pub async fn start_handshake(url: &Url) -> Result { let port = url.port().unwrap_or(873); let path = url.path().trim_start_matches('/'); let auth = Auth::from_url_and_env(url); let module = path.split('/').next().context("empty remote path")?; - let stream = TcpStream::connect(format!( + let stream = connect_with_proxy(&format!( "{}:{}", url.host_str().context("missing remote host")?, port )) - .await - .context("rsync server refused connection. Is it running?")?; + .await?; let mut handshake = HandshakeConn::new(stream); handshake.start_inband_exchange(module, path, auth).await?;