diff --git a/.github/workflows/nightly-nym-wallet-build.yml b/.github/workflows/nightly-nym-wallet-build.yml index 33df95ba451..aa684e62d93 100644 --- a/.github/workflows/nightly-nym-wallet-build.yml +++ b/.github/workflows/nightly-nym-wallet-build.yml @@ -14,7 +14,7 @@ jobs: strategy: fail-fast: false matrix: - os: [custom-linux, macos-latest, windows10] + os: [custom-ubuntu-20.04, macos-latest, windows10] runs-on: ${{ matrix.os }} env: CARGO_TERM_COLOR: always diff --git a/.github/workflows/publish-nym-binaries.yml b/.github/workflows/publish-nym-binaries.yml index 7c9592ae174..f93f39524d0 100644 --- a/.github/workflows/publish-nym-binaries.yml +++ b/.github/workflows/publish-nym-binaries.yml @@ -20,7 +20,7 @@ jobs: strategy: fail-fast: false matrix: - platform: [custom-runner-linux] + platform: [custom-ubuntu-20.04] runs-on: ${{ matrix.platform }} outputs: diff --git a/.github/workflows/publish-nym-connect-ubuntu.yml b/.github/workflows/publish-nym-connect-ubuntu.yml index 436f397e8de..c542f5fc872 100644 --- a/.github/workflows/publish-nym-connect-ubuntu.yml +++ b/.github/workflows/publish-nym-connect-ubuntu.yml @@ -14,7 +14,7 @@ jobs: strategy: fail-fast: false matrix: - platform: [custom-runner-linux] + platform: [custom-ubuntu-20.04] runs-on: ${{ matrix.platform }} outputs: diff --git a/.github/workflows/publish-nym-contracts.yml b/.github/workflows/publish-nym-contracts.yml index fa15926d290..f1cd855e020 100644 --- a/.github/workflows/publish-nym-contracts.yml +++ b/.github/workflows/publish-nym-contracts.yml @@ -7,7 +7,7 @@ on: jobs: build: if: ${{ (startsWith(github.ref, 'refs/tags/nym-contracts-') && github.event_name == 'release') || github.event_name == 'workflow_dispatch' }} - runs-on: [self-hosted, custom-runner-linux] + runs-on: [self-hosted, custom-ubuntu-20.04] steps: - uses: actions/checkout@v2 diff --git a/.github/workflows/publish-nym-wallet-ubuntu.yml b/.github/workflows/publish-nym-wallet-ubuntu.yml index cdf714e4e3b..5f71862a452 100644 --- a/.github/workflows/publish-nym-wallet-ubuntu.yml +++ b/.github/workflows/publish-nym-wallet-ubuntu.yml @@ -14,7 +14,7 @@ jobs: strategy: fail-fast: false matrix: - platform: [custom-runner-linux] + platform: [custom-ubuntu-20.04] runs-on: ${{ matrix.platform }} outputs: diff --git a/.github/workflows/publish-nyms5-android-apk.yml b/.github/workflows/publish-nyms5-android-apk.yml index ca8179eb16f..1dcbcdb1719 100644 --- a/.github/workflows/publish-nyms5-android-apk.yml +++ b/.github/workflows/publish-nyms5-android-apk.yml @@ -12,7 +12,7 @@ on: jobs: build: name: Build APK - runs-on: custom-runner-linux + runs-on: custom-ubuntu-20.04 env: ANDROID_HOME: ${{ github.workspace }}/android-sdk NDK_VERSION: 25.2.9519653 diff --git a/CHANGELOG.md b/CHANGELOG.md index fe08ef04e3b..0db3dfaf1fc 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -3,6 +3,9 @@ Post 1.0.0 release, the changelog format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html). ## [Unreleased] +- add client registry to Gateway ([#3955]) +- add HTTP API to Gateway ([#3955]) +- add `/client/`, `clients` and `register` routes to the gateway ([#3955]) ## [2023.1-milka] (2023-09-24) diff --git a/Cargo.lock b/Cargo.lock index bddbffb5537..aa98e9ab952 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -766,7 +766,11 @@ dependencies = [ "pin-project-lite 0.2.12", "rustversion", "serde", + "serde_json", + "serde_path_to_error", + "serde_urlencoded", "sync_wrapper", + "tokio", "tower", "tower-layer", "tower-service", @@ -789,6 +793,18 @@ dependencies = [ "tower-service", ] +[[package]] +name = "axum-macros" +version = "0.3.8" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "cdca6a10ecad987bda04e95606ef85a5417dcaac1a78455242d72e031e2b6b62" +dependencies = [ + "heck 0.4.1", + "proc-macro2", + "quote", + "syn 2.0.28", +] + [[package]] name = "backtrace" version = "0.3.68" @@ -868,7 +884,7 @@ dependencies = [ "pbkdf2", "rand_core 0.6.4", "ripemd", - "sha2 0.10.7", + "sha2 0.10.8", "subtle 2.4.1", "zeroize", ] @@ -1076,7 +1092,7 @@ version = "0.5.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "f5353f36341f7451062466f0b755b96ac3a9547e4d7f6b70d603fc721a7d7896" dependencies = [ - "sha2 0.10.7", + "sha2 0.10.8", ] [[package]] @@ -1739,7 +1755,7 @@ dependencies = [ "schemars", "serde", "serde-json-wasm", - "sha2 0.10.7", + "sha2 0.10.8", "thiserror", ] @@ -4365,7 +4381,7 @@ dependencies = [ "cfg-if", "ecdsa 0.14.8", "elliptic-curve 0.12.3", - "sha2 0.10.7", + "sha2 0.10.8", ] [[package]] @@ -4378,7 +4394,7 @@ dependencies = [ "ecdsa 0.16.8", "elliptic-curve 0.13.5", "once_cell", - "sha2 0.10.7", + "sha2 0.10.8", "signature 2.1.0", ] @@ -4612,7 +4628,7 @@ dependencies = [ "rand 0.8.5", "rw-stream-sink 0.3.0 (git+https://github.com/ChainSafe/rust-libp2p.git?rev=e3440d25681df380c9f0f8cfdcfd5ecc0a4f2fb6)", "sec1 0.3.0", - "sha2 0.10.7", + "sha2 0.10.8", "smallvec", "thiserror", "unsigned-varint", @@ -4698,7 +4714,7 @@ dependencies = [ "prost-codec", "rand 0.8.5", "regex", - "sha2 0.10.7", + "sha2 0.10.8", "smallvec", "thiserror", "unsigned-varint", @@ -4730,7 +4746,7 @@ dependencies = [ "rand 0.8.5", "regex", "serde", - "sha2 0.10.7", + "sha2 0.10.8", "smallvec", "thiserror", "unsigned-varint", @@ -4773,7 +4789,7 @@ dependencies = [ "quick-protobuf", "rand 0.8.5", "serde", - "sha2 0.10.7", + "sha2 0.10.8", "thiserror", "zeroize", ] @@ -4799,7 +4815,7 @@ dependencies = [ "quick-protobuf", "rand 0.8.5", "serde", - "sha2 0.10.7", + "sha2 0.10.8", "smallvec", "thiserror", "uint", @@ -4904,7 +4920,7 @@ dependencies = [ "prost", "prost-build", "rand 0.8.5", - "sha2 0.10.7", + "sha2 0.10.8", "snow", "static_assertions", "thiserror", @@ -4927,7 +4943,7 @@ dependencies = [ "once_cell", "quick-protobuf", "rand 0.8.5", - "sha2 0.10.7", + "sha2 0.10.8", "snow", "static_assertions", "thiserror", @@ -5616,7 +5632,7 @@ dependencies = [ "multihash-derive", "serde", "serde-big-array", - "sha2 0.10.7", + "sha2 0.10.8", "unsigned-varint", ] @@ -6193,7 +6209,7 @@ dependencies = [ "reqwest", "serde", "serde_json", - "sha2 0.10.7", + "sha2 0.10.8", "sqlx 0.6.3", "tap", "tempfile", @@ -6463,6 +6479,9 @@ dependencies = [ "anyhow", "async-trait", "atty", + "axum", + "axum-macros", + "base64 0.21.4", "bip39", "bs58 0.4.0", "clap 4.3.21", @@ -6470,8 +6489,11 @@ dependencies = [ "dashmap", "dirs 4.0.0", "dotenvy", + "fastrand 2.0.0", "futures", + "hmac 0.12.1", "humantime-serde", + "hyper", "lazy_static", "log", "nym-api-requests", @@ -6495,8 +6517,10 @@ dependencies = [ "once_cell", "pretty_env_logger", "rand 0.7.3", + "rand 0.8.5", "serde", "serde_json", + "sha2 0.10.8", "sqlx 0.5.11", "subtle-encoding", "thiserror", @@ -6504,7 +6528,9 @@ dependencies = [ "tokio-stream", "tokio-tungstenite", "tokio-util", + "tower", "url", + "x25519-dalek 2.0.0", "zeroize", ] @@ -7444,6 +7470,8 @@ dependencies = [ "dashmap", "etherparse", "futures", + "ip_network", + "ip_network_table", "log", "nym-task", "tap", @@ -7694,7 +7722,7 @@ checksum = "51f44edd08f51e2ade572f141051021c5af22677e42b7dd28a88155151c33594" dependencies = [ "ecdsa 0.14.8", "elliptic-curve 0.12.3", - "sha2 0.10.7", + "sha2 0.10.8", ] [[package]] @@ -7705,7 +7733,7 @@ checksum = "dfc8c5bf642dde52bb9e87c0ecd8ca5a76faac2eeed98dedb7c717997e1080aa" dependencies = [ "ecdsa 0.14.8", "elliptic-curve 0.12.3", - "sha2 0.10.7", + "sha2 0.10.8", ] [[package]] @@ -7939,7 +7967,7 @@ checksum = "56af0a30af74d0445c0bf6d9d051c979b516a1a5af790d251daee76005420a48" dependencies = [ "once_cell", "pest", - "sha2 0.10.7", + "sha2 0.10.8", ] [[package]] @@ -9075,7 +9103,7 @@ version = "7.5.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "512b0ab6853f7e14e3c8754acb43d6f748bb9ced66aa5915a6553ac8213f7731" dependencies = [ - "sha2 0.10.7", + "sha2 0.10.8", "walkdir", ] @@ -9569,6 +9597,16 @@ dependencies = [ "serde", ] +[[package]] +name = "serde_path_to_error" +version = "0.1.14" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4beec8bce849d58d06238cb50db2e1c417cfeafa4c63f692b15c82b7c80f8335" +dependencies = [ + "itoa", + "serde", +] + [[package]] name = "serde_repr" version = "0.1.16" @@ -9640,9 +9678,9 @@ dependencies = [ [[package]] name = "sha2" -version = "0.10.7" +version = "0.10.8" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "479fb9d862239e610720565ca91403019f2f00410f1864c5aa7479b950a76ed8" +checksum = "793db75ad2bcafc3ffa7c68b215fee268f537982cd901d132f89c6343f3a3dc8" dependencies = [ "cfg-if", "cpufeatures", @@ -9796,7 +9834,7 @@ dependencies = [ "rand_core 0.6.4", "ring", "rustc_version 0.4.0", - "sha2 0.10.7", + "sha2 0.10.8", "subtle 2.4.1", ] @@ -9982,7 +10020,7 @@ dependencies = [ "paste", "percent-encoding", "rustls 0.19.1", - "sha2 0.10.7", + "sha2 0.10.8", "smallvec", "sqlformat 0.1.8", "sqlx-rt 0.5.13", @@ -10030,7 +10068,7 @@ dependencies = [ "percent-encoding", "rustls 0.20.8", "rustls-pemfile", - "sha2 0.10.7", + "sha2 0.10.8", "smallvec", "sqlformat 0.2.1", "sqlx-rt 0.6.3", @@ -10053,7 +10091,7 @@ dependencies = [ "once_cell", "proc-macro2", "quote", - "sha2 0.10.7", + "sha2 0.10.8", "sqlx-core 0.5.13", "sqlx-rt 0.5.13", "syn 1.0.109", @@ -10072,7 +10110,7 @@ dependencies = [ "once_cell", "proc-macro2", "quote", - "sha2 0.10.7", + "sha2 0.10.8", "sqlx-core 0.6.3", "sqlx-rt 0.6.3", "syn 1.0.109", @@ -10374,7 +10412,7 @@ dependencies = [ "serde_bytes", "serde_json", "serde_repr", - "sha2 0.10.7", + "sha2 0.10.8", "signature 2.1.0", "subtle 2.4.1", "subtle-encoding", @@ -11762,7 +11800,7 @@ dependencies = [ "sdp", "serde", "serde_json", - "sha2 0.10.7", + "sha2 0.10.8", "stun", "thiserror", "time", @@ -11825,7 +11863,7 @@ dependencies = [ "sec1 0.3.0", "serde", "sha1", - "sha2 0.10.7", + "sha2 0.10.8", "signature 1.6.4", "subtle 2.4.1", "thiserror", diff --git a/common/client-libs/gateway-client/src/socket_state.rs b/common/client-libs/gateway-client/src/socket_state.rs index 6f82ef4939a..6f325b3520d 100644 --- a/common/client-libs/gateway-client/src/socket_state.rs +++ b/common/client-libs/gateway-client/src/socket_state.rs @@ -74,7 +74,7 @@ impl PartiallyDelegated { fn route_socket_messages( ws_msgs: Vec, - packet_router: &mut PacketRouter, + packet_router: &PacketRouter, shared_key: &SharedKeys, ) -> Result<(), GatewayClientError> { let plaintexts = Self::recover_received_plaintexts(ws_msgs, shared_key); @@ -97,7 +97,6 @@ impl PartiallyDelegated { let mixnet_receiver_future = async move { let mut notify_receiver = notify_receiver; let mut chunk_stream = (&mut stream).ready_chunks(8); - let mut packet_router = packet_router; let ret_err = loop { tokio::select! { @@ -115,7 +114,7 @@ impl PartiallyDelegated { Ok(msgs) => msgs }; - if let Err(err) = Self::route_socket_messages(ws_msgs, &mut packet_router, shared_key.as_ref()) { + if let Err(err) = Self::route_socket_messages(ws_msgs, &packet_router, shared_key.as_ref()) { log::warn!("Route socket messages failed: {err}"); } } diff --git a/common/client-libs/gateway-client/src/traits.rs b/common/client-libs/gateway-client/src/traits.rs index 54b53ffc814..cb0a3f4d221 100644 --- a/common/client-libs/gateway-client/src/traits.rs +++ b/common/client-libs/gateway-client/src/traits.rs @@ -42,7 +42,9 @@ pub trait GatewayPacketRouter { } n if n - == PacketSize::OutfoxRegularPacket.plaintext_size() - outfox_ack_overhead => + == PacketSize::OutfoxRegularPacket + .plaintext_size() + .saturating_sub(outfox_ack_overhead) => { trace!("received regular outfox packet"); received_messages.push(received_packet); diff --git a/common/client-libs/validator-client/Cargo.toml b/common/client-libs/validator-client/Cargo.toml index 0ee5c8b3af0..4b3533ef7e8 100644 --- a/common/client-libs/validator-client/Cargo.toml +++ b/common/client-libs/validator-client/Cargo.toml @@ -86,4 +86,5 @@ required-features = ["http-client"] default = ["http-client"] http-client = ["cosmrs/rpc", "openssl"] generate-ts = [] +contract-testing = ["nym-mixnet-contract-common/contract-testing"] diff --git a/common/client-libs/validator-client/src/nyxd/contract_traits/mixnet_signing_client.rs b/common/client-libs/validator-client/src/nyxd/contract_traits/mixnet_signing_client.rs index f8b5521e0d3..9b5761bed63 100644 --- a/common/client-libs/validator-client/src/nyxd/contract_traits/mixnet_signing_client.rs +++ b/common/client-libs/validator-client/src/nyxd/contract_traits/mixnet_signing_client.rs @@ -683,13 +683,14 @@ pub trait MixnetSigningClient { .await } - #[cfg(feature = "nym_mixnet_contract_common/contract-testing")] + #[cfg(feature = "contract-testing")] async fn testing_resolve_all_pending_events( + &self, fee: Option, ) -> Result { self.execute_mixnet_contract( fee, - MixnetExecuteMsg::TestingResolveAllPendingEvents {}, + MixnetExecuteMsg::TestingResolveAllPendingEvents { limit: None }, vec![], ) .await @@ -928,8 +929,8 @@ mod tests { .withdraw_delegator_reward_on_behalf(owner.parse().unwrap(), mix_id, None) .ignore(), - #[cfg(feature = "nym_mixnet_contract_common/contract-testing")] - MixnetExecuteMsg::TestingResolveAllPendingEvents {} => { + #[cfg(feature = "contract-testing")] + MixnetExecuteMsg::TestingResolveAllPendingEvents { .. } => { client.testing_resolve_all_pending_events(None).ignore() } }; diff --git a/common/wireguard/Cargo.toml b/common/wireguard/Cargo.toml index b20fd260361..a78920f7f58 100644 --- a/common/wireguard/Cargo.toml +++ b/common/wireguard/Cargo.toml @@ -22,6 +22,8 @@ bytes = "1.5.0" dashmap = "5.5.3" etherparse = "0.13.0" futures = "0.3.28" +ip_network = "0.4.1" +ip_network_table = "0.2.0" log.workspace = true nym-task = { path = "../task" } tap.workspace = true diff --git a/common/wireguard/src/event.rs b/common/wireguard/src/event.rs index 7d425cd5d6e..9b62bf2b4e0 100644 --- a/common/wireguard/src/event.rs +++ b/common/wireguard/src/event.rs @@ -3,23 +3,29 @@ use std::fmt::{Display, Formatter}; use bytes::Bytes; #[allow(unused)] -#[derive(Debug, Clone)] +#[derive(Debug)] pub enum Event { /// IP packet received from the WireGuard tunnel that should be passed through to the corresponding virtual device/internet. /// Original implementation also has protocol here since it understands it, but we'll have to infer it downstream - WgPacket(Bytes), + Wg(Bytes), + /// IP packet received from the UDP listener that was verified as part of the handshake + WgVerified(Bytes), /// IP packet to be sent through the WireGuard tunnel as crafted by the virtual device. - IpPacket(Bytes), + Ip(Bytes), } impl Display for Event { fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { match self { - Event::WgPacket(data) => { + Event::Wg(data) => { let size = data.len(); write!(f, "WgPacket{{ size={size} }}") } - Event::IpPacket(data) => { + Event::WgVerified(data) => { + let size = data.len(); + write!(f, "WgVerifiedPacket{{ size={size} }}") + } + Event::Ip(data) => { let size = data.len(); write!(f, "IpPacket{{ size={size} }}") } diff --git a/common/wireguard/src/lib.rs b/common/wireguard/src/lib.rs index 15383eb335c..a21d7f6035a 100644 --- a/common/wireguard/src/lib.rs +++ b/common/wireguard/src/lib.rs @@ -1,40 +1,48 @@ #![cfg_attr(not(target_os = "linux"), allow(dead_code))] -use nym_task::TaskClient; - mod error; mod event; +mod network_table; mod platform; mod setup; -mod tun; mod udp_listener; +mod wg_tunnel; // Currently the module related to setting up the virtual network device is platform specific. #[cfg(target_os = "linux")] use platform::linux::tun_device; -type ActivePeers = - dashmap::DashMap>; +#[derive(Clone)] +struct TunTaskTx(tokio::sync::mpsc::UnboundedSender>); + +impl TunTaskTx { + fn send(&self, packet: Vec) -> Result<(), tokio::sync::mpsc::error::SendError>> { + self.0.send(packet) + } +} #[cfg(target_os = "linux")] pub async fn start_wireguard( - task_client: TaskClient, + task_client: nym_task::TaskClient, ) -> Result<(), Box> { + use std::sync::Arc; + // The set of active tunnels indexed by the peer's address - let active_peers = std::sync::Arc::new(ActivePeers::new()); + let active_peers = Arc::new(udp_listener::ActivePeers::new()); + let peers_by_ip = Arc::new(std::sync::Mutex::new(network_table::NetworkTable::new())); // Start the tun device that is used to relay traffic outbound - let tun_task_tx = tun_device::start_tun_device(active_peers.clone()); + let tun_task_tx = tun_device::start_tun_device(peers_by_ip.clone()); // Start the UDP listener that clients connect to - udp_listener::start_udp_listener(tun_task_tx, active_peers, task_client).await?; + udp_listener::start_udp_listener(tun_task_tx, active_peers, peers_by_ip, task_client).await?; Ok(()) } #[cfg(not(target_os = "linux"))] pub async fn start_wireguard( - _task_client: TaskClient, + _task_client: nym_task::TaskClient, ) -> Result<(), Box> { todo!("WireGuard is currently only supported on Linux") } diff --git a/common/wireguard/src/network_table.rs b/common/wireguard/src/network_table.rs new file mode 100644 index 00000000000..83008c5632d --- /dev/null +++ b/common/wireguard/src/network_table.rs @@ -0,0 +1,25 @@ +use std::net::IpAddr; + +use ip_network::IpNetwork; +use ip_network_table::IpNetworkTable; + +#[derive(Default)] +pub(crate) struct NetworkTable { + ips: IpNetworkTable, +} + +impl NetworkTable { + pub(crate) fn new() -> Self { + Self { + ips: IpNetworkTable::new(), + } + } + + pub fn insert>(&mut self, network: N, data: T) -> Option { + self.ips.insert(network, data) + } + + pub fn longest_match>(&self, ip: I) -> Option<(IpNetwork, &T)> { + self.ips.longest_match(ip) + } +} diff --git a/common/wireguard/src/platform/linux/tun_device.rs b/common/wireguard/src/platform/linux/tun_device.rs index 9d7aef79b2e..96b8ffeaa18 100644 --- a/common/wireguard/src/platform/linux/tun_device.rs +++ b/common/wireguard/src/platform/linux/tun_device.rs @@ -1,14 +1,17 @@ use std::{net::Ipv4Addr, sync::Arc}; use etherparse::{InternetSlice, SlicedPacket}; +use tap::TapFallible; use tokio::{ io::{AsyncReadExt, AsyncWriteExt}, - sync::mpsc::{self, UnboundedSender}, + sync::mpsc::{self}, }; use crate::{ + event::Event, setup::{TUN_BASE_NAME, TUN_DEVICE_ADDRESS, TUN_DEVICE_NETMASK}, - ActivePeers, + udp_listener::PeersByIp, + TunTaskTx, }; fn setup_tokio_tun_device(name: &str, address: Ipv4Addr, netmask: Ipv4Addr) -> tokio_tun::Tun { @@ -25,7 +28,7 @@ fn setup_tokio_tun_device(name: &str, address: Ipv4Addr, netmask: Ipv4Addr) -> t .expect("Failed to setup tun device, do you have permission?") } -pub fn start_tun_device(_active_peers: Arc) -> UnboundedSender> { +pub(crate) fn start_tun_device(peers_by_ip: Arc>) -> TunTaskTx { let tun = setup_tokio_tun_device( format!("{}%d", TUN_BASE_NAME).as_str(), TUN_DEVICE_ADDRESS.parse().unwrap(), @@ -37,6 +40,7 @@ pub fn start_tun_device(_active_peers: Arc) -> UnboundedSender>(); + let tun_task_tx = TunTaskTx(tun_task_tx); tokio::spawn(async move { let mut buf = [0u8; 1024]; @@ -55,8 +59,16 @@ pub fn start_tun_device(_active_peers: Arc) -> UnboundedSender {dst_addr}, {len} bytes)"); - // TODO: route packet to the correct peer. - log::info!("...forward packet to the correct peer (NOT YET IMPLEMENTED)"); + // Route packet to the correct peer. + if let Some(peer_tx) = peers_by_ip.lock().unwrap().longest_match(dst_addr).map(|(_, tx)| tx) { + log::info!("Forward packet to wg tunnel"); + peer_tx + .send(Event::Ip(packet.to_vec().into())) + .tap_err(|err| log::error!("{err}")) + .unwrap(); + } else { + log::info!("No peer found, packet dropped"); + } }, Err(err) => { log::info!("iface: read error: {err}"); diff --git a/common/wireguard/src/setup.rs b/common/wireguard/src/setup.rs index 1df9b559769..3fd948f487e 100644 --- a/common/wireguard/src/setup.rs +++ b/common/wireguard/src/setup.rs @@ -1,3 +1,5 @@ +use std::net::IpAddr; + use base64::{engine::general_purpose, Engine as _}; use boringtun::x25519; use log::info; @@ -15,35 +17,47 @@ pub const TUN_DEVICE_NETMASK: &str = "255.255.255.0"; // Corresponding public key: "WM8s8bYegwMa0TJ+xIwhk+dImk2IpDUKslDBCZPizlE=" const PRIVATE_KEY: &str = "AEqXrLFT4qjYq3wmX0456iv94uM6nDj5ugp6Jedcflg="; -// The public keys of the registered peers (clients) -const PEERS: &[&str; 1] = &[ - // Corresponding private key: "ILeN6gEh6vJ3Ju8RJ3HVswz+sPgkcKtAYTqzQRhTtlo=" - "NCIhkgiqxFx1ckKl3Zuh595DzIFl8mxju1Vg995EZhI=", - // Another key - // "mxV/mw7WZTe+0Msa0kvJHMHERDA/cSskiZWQce+TdEs=", -]; +// The public keys of the registered peer (clients) +// Corresponding private key: "ILeN6gEh6vJ3Ju8RJ3HVswz+sPgkcKtAYTqzQRhTtlo=" +const PEER: &str = "NCIhkgiqxFx1ckKl3Zuh595DzIFl8mxju1Vg995EZhI="; -pub fn init_static_dev_keys() -> (x25519::StaticSecret, x25519::PublicKey) { - // TODO: this is a temporary solution for development - let static_private_bytes: [u8; 32] = general_purpose::STANDARD - .decode(PRIVATE_KEY) +// The AllowedIPs for the connected peer, which is one a single IP and the same as the IP that the +// peer has configured on their side. +const ALLOWED_IPS: &str = "10.0.0.2"; + +fn decode_base64_key(base64_key: &str) -> [u8; 32] { + general_purpose::STANDARD + .decode(base64_key) .unwrap() .try_into() - .unwrap(); + .unwrap() +} + +pub fn server_static_private_key() -> x25519::StaticSecret { + // TODO: this is a temporary solution for development + let static_private_bytes: [u8; 32] = decode_base64_key(PRIVATE_KEY); let static_private = x25519::StaticSecret::try_from(static_private_bytes).unwrap(); let static_public = x25519::PublicKey::from(&static_private); info!( "wg public key: {}", general_purpose::STANDARD.encode(static_public) ); + static_private +} - // TODO: A single static public key is used for all peers during development - let peer_static_public_bytes: [u8; 32] = general_purpose::STANDARD - .decode(PEERS[0]) - .unwrap() - .try_into() - .unwrap(); +pub fn peer_static_public_key() -> x25519::PublicKey { + // A single static public key is used during development + let peer_static_public_bytes: [u8; 32] = decode_base64_key(PEER); let peer_static_public = x25519::PublicKey::try_from(peer_static_public_bytes).unwrap(); + info!( + "peer public key: {}", + general_purpose::STANDARD.encode(peer_static_public) + ); + peer_static_public +} - (static_private, peer_static_public) +pub fn peer_allowed_ips() -> ip_network::IpNetwork { + let key: IpAddr = ALLOWED_IPS.parse().unwrap(); + let cidr = 0u8; + ip_network::IpNetwork::new_truncate(key, cidr).unwrap() } diff --git a/common/wireguard/src/udp_listener.rs b/common/wireguard/src/udp_listener.rs index ed1440894df..7539b343a71 100644 --- a/common/wireguard/src/udp_listener.rs +++ b/common/wireguard/src/udp_listener.rs @@ -1,36 +1,81 @@ -use std::{net::SocketAddr, sync::Arc}; +use std::{collections::HashMap, net::SocketAddr, sync::Arc}; +use boringtun::{ + noise::{handshake::parse_handshake_anon, rate_limiter::RateLimiter, TunnResult}, + x25519, +}; +use dashmap::DashMap; use futures::StreamExt; +use ip_network::IpNetwork; use log::error; use nym_task::TaskClient; use tap::TapFallible; -use tokio::{net::UdpSocket, sync::mpsc::UnboundedSender}; +use tokio::{ + net::UdpSocket, + sync::mpsc::{self}, +}; use crate::{ event::Event, - setup::{WG_ADDRESS, WG_PORT}, - ActivePeers, + network_table::NetworkTable, + setup::{self, WG_ADDRESS, WG_PORT}, + TunTaskTx, }; const MAX_PACKET: usize = 65535; -pub async fn start_udp_listener( - tun_task_tx: UnboundedSender>, +pub(crate) type PeerIdx = u32; +pub(crate) type ActivePeers = DashMap>; +pub(crate) type PeersByIp = NetworkTable>; + +#[derive(Debug)] +struct RegisteredPeer { + public_key: x25519::PublicKey, + allowed_ips: IpNetwork, + // endpoint: SocketAddr, +} + +pub(crate) async fn start_udp_listener( + tun_task_tx: TunTaskTx, active_peers: Arc, + peers_by_ip: Arc>, mut task_client: TaskClient, ) -> Result<(), Box> { let wg_address = SocketAddr::new(WG_ADDRESS.parse().unwrap(), WG_PORT); log::info!("Starting wireguard UDP listener on {wg_address}"); let udp_socket = Arc::new(UdpSocket::bind(wg_address).await?); - // Setup some static keys for development - let (static_private, peer_static_public) = crate::setup::init_static_dev_keys(); + // Setup our own keys + let static_private = setup::server_static_private_key(); + let static_public = x25519::PublicKey::from(&static_private); + let handshake_max_rate = 100u64; + let rate_limiter = RateLimiter::new(&static_public, handshake_max_rate); + + // Test peer + let peer_static_public = setup::peer_static_public_key(); + let peer_allowed_ips = setup::peer_allowed_ips(); + let peer_index = 0; + let test_peer = Arc::new(tokio::sync::Mutex::new(RegisteredPeer { + public_key: peer_static_public, + allowed_ips: peer_allowed_ips, + })); + + type PeerIdx = u32; + let mut registered_peers: HashMap>> = + HashMap::new(); + let mut registered_peers_by_idx: HashMap>> = + HashMap::new(); + + registered_peers.insert(peer_static_public, Arc::clone(&test_peer)); + registered_peers_by_idx.insert(0, test_peer); tokio::spawn(async move { // Each tunnel is run in its own task, and the task handle is stored here so we can remove // it from `active_peers` when the tunnel is closed let mut active_peers_task_handles = futures::stream::FuturesUnordered::new(); + let mut buf = [0u8; MAX_PACKET]; + let mut dst_buf = [0u8; MAX_PACKET]; while !task_client.is_shutdown() { tokio::select! { @@ -39,11 +84,12 @@ pub async fn start_udp_listener( break; } // Handle tunnel closing - Some(addr) = active_peers_task_handles.next() => { - match addr { - Ok(addr) => { - log::info!("Removing peer: {addr:?}"); - active_peers.remove(&addr); + Some(public_key) = active_peers_task_handles.next() => { + match public_key { + Ok(public_key) => { + log::info!("Removing peer: {public_key:?}"); + active_peers.remove(&public_key); + // TODO: remove from peers_by_ip } Err(err) => { error!("WireGuard UDP listener: error receiving shutdown from peer: {err}"); @@ -54,28 +100,78 @@ pub async fn start_udp_listener( Ok((len, addr)) = udp_socket.recv_from(&mut buf) => { log::trace!("udp: received {} bytes from {}", len, addr); - if let Some(peer_tx) = active_peers.get_mut(&addr) { + // Verify the incoming packet + let verified_packet = match rate_limiter.verify_packet(Some(addr.ip()), &buf[..len], &mut dst_buf) { + Ok(packet) => packet, + Err(TunnResult::WriteToNetwork(cookie)) => { + log::info!("WireGuard UDP listener: send back cookie"); + udp_socket.send_to(cookie, addr).await.unwrap(); + continue; + } + Err(err) => { + log::warn!("{err:?}"); + continue; + } + }; + + // Check if this is a registered peer, if not, just skip + let registered_peer = match verified_packet { + boringtun::noise::Packet::HandshakeInit(ref packet) => { + let Ok(handshake) = parse_handshake_anon(&static_private, &static_public, packet) else { + log::warn!("Handshake failed"); + continue; + }; + registered_peers.get(&x25519::PublicKey::from(handshake.peer_static_public)) + }, + boringtun::noise::Packet::HandshakeResponse(packet) => { + let peer_idx = packet.receiver_idx >> 8; + registered_peers_by_idx.get(&peer_idx) + }, + boringtun::noise::Packet::PacketCookieReply(packet) => { + let peer_idx = packet.receiver_idx >> 8; + registered_peers_by_idx.get(&peer_idx) + }, + boringtun::noise::Packet::PacketData(packet) => { + let peer_idx = packet.receiver_idx >> 8; + registered_peers_by_idx.get(&peer_idx) + }, + }; + + let registered_peer = if let Some(registered_peer) = registered_peer { + registered_peer.lock().await + } else { + log::warn!("Peer not registered"); + continue; + }; + + // Look up if the peer is already connected + if let Some(peer_tx) = active_peers.get_mut(®istered_peer.public_key) { + // If it is, send it the packet to deal with log::info!("udp: received {len} bytes from {addr} from known peer"); - peer_tx.send(Event::WgPacket(buf[..len].to_vec().into())) + peer_tx.send(Event::WgVerified(buf[..len].to_vec().into())) .tap_err(|err| log::error!("{err}")) .unwrap(); } else { + // If it isn't, start a new tunnel log::info!("udp: received {len} bytes from {addr} from unknown peer, starting tunnel"); - let (join_handle, peer_tx) = crate::tun::start_wg_tunnel( + let (join_handle, peer_tx) = crate::wg_tunnel::start_wg_tunnel( addr, udp_socket.clone(), static_private.clone(), - peer_static_public, + registered_peer.public_key, + registered_peer.allowed_ips, + peer_index, tun_task_tx.clone(), ); - peer_tx.send(Event::WgPacket(buf[..len].to_vec().into())) + + peers_by_ip.lock().unwrap().insert(registered_peer.allowed_ips, peer_tx.clone()); + + peer_tx.send(Event::Wg(buf[..len].to_vec().into())) .tap_err(|err| log::error!("{err}")) .unwrap(); - // WIP(JON): active peers should probably be keyed by peer_static_public - // instead. Does this current setup lead to any issues? log::info!("Adding peer: {addr}"); - active_peers.insert(addr, peer_tx); + active_peers.insert(registered_peer.public_key, peer_tx); active_peers_task_handles.push(join_handle); } }, diff --git a/common/wireguard/src/tun.rs b/common/wireguard/src/wg_tunnel.rs similarity index 68% rename from common/wireguard/src/tun.rs rename to common/wireguard/src/wg_tunnel.rs index 3dbafecf322..11f8485cf38 100644 --- a/common/wireguard/src/tun.rs +++ b/common/wireguard/src/wg_tunnel.rs @@ -2,11 +2,10 @@ use std::{net::SocketAddr, sync::Arc, time::Duration}; use async_recursion::async_recursion; use boringtun::{ - noise::{errors::WireGuardError, Tunn, TunnResult}, + noise::{errors::WireGuardError, rate_limiter::RateLimiter, Tunn, TunnResult}, x25519, }; use bytes::Bytes; -use etherparse::{InternetSlice, SlicedPacket}; use log::{debug, error, info, warn}; use tap::TapFallible; use tokio::{ @@ -15,7 +14,11 @@ use tokio::{ time::timeout, }; -use crate::{error::WgError, event::Event}; +use crate::{ + error::WgError, event::Event, network_table::NetworkTable, udp_listener::PeerIdx, TunTaskTx, +}; + +const HANDSHAKE_MAX_RATE: u64 = 10; const MAX_PACKET: usize = 65535; @@ -27,10 +30,10 @@ pub struct WireGuardTunnel { udp: Arc, // Peer endpoint - endpoint: SocketAddr, + endpoint: Arc>, - // The source address of the last packet received from the peer - source_addr: Arc>>, + // AllowedIPs for this peer + allowed_ips: NetworkTable<()>, // `boringtun` tunnel, used for crypto & WG protocol wg_tunnel: Arc>, @@ -40,7 +43,7 @@ pub struct WireGuardTunnel { close_rx: broadcast::Receiver<()>, // Send data to the task that handles sending data through the tun device - tun_task_tx: mpsc::UnboundedSender>, + tun_task_tx: TunTaskTx, } impl Drop for WireGuardTunnel { @@ -51,12 +54,15 @@ impl Drop for WireGuardTunnel { } impl WireGuardTunnel { - pub fn new( + pub(crate) fn new( udp: Arc, endpoint: SocketAddr, static_private: x25519::StaticSecret, peer_static_public: x25519::PublicKey, - tunnel_tx: mpsc::UnboundedSender>, + peer_allowed_ips: ip_network::IpNetwork, + index: PeerIdx, + // rate_limiter: Option, + tunnel_tx: TunTaskTx, ) -> (Self, mpsc::UnboundedSender) { let local_addr = udp.local_addr().unwrap(); let peer_addr = udp.peer_addr(); @@ -64,8 +70,12 @@ impl WireGuardTunnel { let preshared_key = None; let persistent_keepalive = None; - let index = 0; - let rate_limiter = None; + + let static_public = x25519::PublicKey::from(&static_private); + let rate_limiter = Some(Arc::new(RateLimiter::new( + &static_public, + HANDSHAKE_MAX_RATE, + ))); let wg_tunnel = Arc::new(tokio::sync::Mutex::new( Tunn::new( @@ -85,11 +95,14 @@ impl WireGuardTunnel { // Signal close tunnel let (close_tx, close_rx) = broadcast::channel(1); + let mut allowed_ips = NetworkTable::new(); + allowed_ips.insert(peer_allowed_ips, ()); + let tunnel = WireGuardTunnel { peer_rx, udp, - endpoint, - source_addr: Default::default(), + endpoint: Arc::new(tokio::sync::RwLock::new(endpoint)), + allowed_ips, wg_tunnel, close_tx, close_rx, @@ -114,12 +127,17 @@ impl WireGuardTunnel { Some(packet) => { info!("event loop: {packet}"); match packet { - Event::WgPacket(data) => { + Event::Wg(data) => { let _ = self.consume_wg(&data) .await .tap_err(|err| error!("WireGuard tunnel: consume_wg error: {err}")); }, - Event::IpPacket(data) => self.consume_eth(&data).await, + Event::WgVerified(data) => { + let _ = self.consume_verified_wg(&data) + .await + .tap_err(|err| error!("WireGuard tunnel: consume_verified_wg error: {err}")); + } + Event::Ip(data) => self.consume_eth(&data).await, } }, None => { @@ -134,7 +152,7 @@ impl WireGuardTunnel { }, } } - info!("WireGuard tunnel ({}): closed", self.endpoint); + info!("WireGuard tunnel ({}): closed", self.endpoint.read().await); } async fn wg_tunnel_lock(&self) -> Result, WgError> { @@ -143,16 +161,11 @@ impl WireGuardTunnel { .map_err(|_| WgError::UnableToGetTunnel) } - fn set_source_addr(&self, source_addr: std::net::Ipv4Addr) { - let to_update = { - let stored_source_addr = self.source_addr.read().unwrap(); - stored_source_addr - .map(|sa| sa != source_addr) - .unwrap_or(true) - }; - if to_update { - log::info!("wg tunnel set_source_addr: {source_addr}"); - *self.source_addr.write().unwrap() = Some(source_addr); + #[allow(unused)] + async fn set_endpoint(&self, addr: SocketAddr) { + if *self.endpoint.read().await != addr { + log::info!("wg tunnel update endpoint: {addr}"); + *self.endpoint.write().await = addr; } } @@ -161,8 +174,9 @@ impl WireGuardTunnel { let mut tunnel = self.wg_tunnel_lock().await?; match tunnel.decapsulate(None, data, &mut send_buf) { TunnResult::WriteToNetwork(packet) => { - log::info!("udp: send {} bytes to {}", packet.len(), self.endpoint); - if let Err(err) = self.udp.send_to(packet, self.endpoint).await { + let endpoint = self.endpoint.read().await; + log::info!("udp: send {} bytes to {}", packet.len(), *endpoint); + if let Err(err) = self.udp.send_to(packet, *endpoint).await { error!("Failed to send decapsulation-instructed packet to WireGuard endpoint: {err:?}"); }; // Flush pending queue @@ -170,8 +184,8 @@ impl WireGuardTunnel { let mut send_buf = [0u8; MAX_PACKET]; match tunnel.decapsulate(None, &[], &mut send_buf) { TunnResult::WriteToNetwork(packet) => { - log::info!("udp: send {} bytes to {}", packet.len(), self.endpoint); - if let Err(err) = self.udp.send_to(packet, self.endpoint).await { + log::info!("udp: send {} bytes to {}", packet.len(), *endpoint); + if let Err(err) = self.udp.send_to(packet, *endpoint).await { error!("Failed to send decapsulation-instructed packet to WireGuard endpoint: {err:?}"); break; }; @@ -182,14 +196,19 @@ impl WireGuardTunnel { } } } - TunnResult::WriteToTunnelV4(packet, _) | TunnResult::WriteToTunnelV6(packet, _) => { - let headers = SlicedPacket::from_ip(packet).unwrap(); - let (source_addr, _destination_addr) = match headers.ip.unwrap() { - InternetSlice::Ipv4(ip, _) => (ip.source_addr(), ip.destination_addr()), - InternetSlice::Ipv6(_, _) => unimplemented!(), - }; - self.set_source_addr(source_addr); - self.tun_task_tx.send(packet.to_vec()).unwrap(); + TunnResult::WriteToTunnelV4(packet, addr) => { + if self.allowed_ips.longest_match(addr).is_some() { + self.tun_task_tx.send(packet.to_vec()).unwrap(); + } else { + warn!("Packet from {addr} not in allowed_ips"); + } + } + TunnResult::WriteToTunnelV6(packet, addr) => { + if self.allowed_ips.longest_match(addr).is_some() { + self.tun_task_tx.send(packet.to_vec()).unwrap(); + } else { + warn!("Packet (v6) from {addr} not in allowed_ips"); + } } TunnResult::Done => { debug!("WireGuard: decapsulate done"); @@ -201,6 +220,13 @@ impl WireGuardTunnel { Ok(()) } + async fn consume_verified_wg(&mut self, data: &[u8]) -> Result<(), WgError> { + // Potentially we could take some shortcuts here in the name of performance, but currently + // I don't see that the needed functions in boringtun is exposed in the public API. + // TODO: make sure we don't put double pressure on the rate limiter! + self.consume_wg(data).await + } + async fn consume_eth(&self, data: &Bytes) { info!("consume_eth: raw packet size: {}", data.len()); let encapsulated_packet = self.encapsulate_packet(data).await; @@ -209,9 +235,10 @@ impl WireGuardTunnel { encapsulated_packet.len() ); - info!("consume_eth: send to {}: {}", self.endpoint, data.len()); + let endpoint = self.endpoint.read().await; + info!("consume_eth: send to {}: {}", *endpoint, data.len()); self.udp - .send_to(&encapsulated_packet, self.endpoint) + .send_to(&encapsulated_packet, *endpoint) .await .unwrap(); } @@ -244,12 +271,9 @@ impl WireGuardTunnel { async fn handle_routine_tun_result<'a: 'async_recursion>(&self, result: TunnResult<'a>) { match result { TunnResult::WriteToNetwork(packet) => { - log::info!( - "routine: write to network: {}: {}", - self.endpoint, - packet.len() - ); - if let Err(err) = self.udp.send_to(packet, self.endpoint).await { + let endpoint = self.endpoint.read().await; + log::info!("routine: write to network: {}: {}", endpoint, packet.len()); + if let Err(err) = self.udp.send_to(packet, *endpoint).await { error!("routine: failed to send packet: {err:?}"); }; } @@ -276,21 +300,30 @@ impl WireGuardTunnel { } } -pub fn start_wg_tunnel( +pub(crate) fn start_wg_tunnel( endpoint: SocketAddr, udp: Arc, static_private: x25519::StaticSecret, peer_static_public: x25519::PublicKey, - tunnel_tx: mpsc::UnboundedSender>, + peer_allowed_ips: ip_network::IpNetwork, + peer_index: PeerIdx, + tunnel_tx: TunTaskTx, ) -> ( - tokio::task::JoinHandle, + tokio::task::JoinHandle, mpsc::UnboundedSender, ) { - let (mut tunnel, peer_tx) = - WireGuardTunnel::new(udp, endpoint, static_private, peer_static_public, tunnel_tx); + let (mut tunnel, peer_tx) = WireGuardTunnel::new( + udp, + endpoint, + static_private, + peer_static_public, + peer_allowed_ips, + peer_index, + tunnel_tx, + ); let join_handle = tokio::spawn(async move { tunnel.spin_off().await; - endpoint + peer_static_public }); (join_handle, peer_tx) } diff --git a/documentation/operators/src/faq/smoosh-faq.md b/documentation/operators/src/faq/smoosh-faq.md index 5284b901a0c..d68f7c764ee 100644 --- a/documentation/operators/src/faq/smoosh-faq.md +++ b/documentation/operators/src/faq/smoosh-faq.md @@ -40,7 +40,7 @@ Yes, to run a mix node only is an option. However it will be less rewarded as no ### What are the incentives for the node operator? -In the original setup there were no incentives to run a `network-requester`. After the transition all the users will buy multiple tickets of zkNyms credentials and use those as [anonymous e-cash](https://arxiv.org/abs/2303.08221) to pay for their data traffic (`[Nym API](https://github.com/nymtech/nym/tree/master/nym-api)` will do the do cryptographical checks to prevent double-spending). All collected fees get distributed to all active nodes proportionally to their work by the end of each epoch. +In the original setup there were no incentives to run a `network-requester`. After the transition all the users will buy multiple tickets of zkNyms credentials and use those as [anonymous e-cash](https://arxiv.org/abs/2303.08221) to pay for their data traffic ([`Nym API`](https://github.com/nymtech/nym/tree/master/nym-api) will do the do cryptographical checks to prevent double-spending). All collected fees get distributed to all active nodes proportionally to their work by the end of each epoch. ### How does this change the token economics? diff --git a/ephemera/src/core/api_cmd.rs b/ephemera/src/core/api_cmd.rs index 2895d8a79a3..a2473054123 100644 --- a/ephemera/src/core/api_cmd.rs +++ b/ephemera/src/core/api_cmd.rs @@ -101,7 +101,7 @@ impl ApiCmdProcessor { } fn ephemera_config( - ephemera: &mut Ephemera, + ephemera: &Ephemera, reply: Sender>, ) { let node_info = ephemera.node_info.clone(); diff --git a/ephemera/src/core/builder.rs b/ephemera/src/core/builder.rs index 0c89f9ecfd3..31d466420eb 100644 --- a/ephemera/src/core/builder.rs +++ b/ephemera/src/core/builder.rs @@ -191,10 +191,10 @@ impl EphemeraStarterWithApplication { let block_manager = self.init_block_manager(&mut storage)?; - let (mut shutdown_manager, shutdown_handle) = ShutdownManager::init(); + let (shutdown_manager, shutdown_handle) = ShutdownManager::init(); let mut service_data = ServiceInfo::default(); - let services = self.init_services(&mut service_data, &mut shutdown_manager, provider)?; + let services = self.init_services(&mut service_data, &shutdown_manager, provider)?; Ok(EphemeraStarterWithProvider { with_application: self, @@ -237,7 +237,7 @@ impl EphemeraStarterWithApplication { >( &mut self, service_data: &mut ServiceInfo, - shutdown_manager: &mut ShutdownManager, + shutdown_manager: &ShutdownManager, provider: P, ) -> anyhow::Result>>> { let services = vec![ diff --git a/gateway/Cargo.toml b/gateway/Cargo.toml index 7439ab83367..e6046f93ca8 100644 --- a/gateway/Cargo.toml +++ b/gateway/Cargo.toml @@ -15,6 +15,13 @@ rust-version = "1.56" # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html [dependencies] +axum = "0.6.20" +sha2 = "0.10.8" +hmac = "0.12.1" +axum-macros = "0.3.8" # Useful for debugging axum Handler trait errors +fastrand = "2" +x25519-dalek = { version = "2.0.0", features = ["static_secrets"] } +base64 = "0.21.4" anyhow = { workspace = true } async-trait = { workspace = true } atty = "0.2" @@ -34,10 +41,21 @@ pretty_env_logger = "0.4" rand = "0.7" serde = { workspace = true, features = ["derive"] } serde_json = { workspace = true } -sqlx = { version = "0.5", features = [ "runtime-tokio-rustls", "sqlite", "macros", "migrate", ] } +sqlx = { version = "0.5", features = [ + "runtime-tokio-rustls", + "sqlite", + "macros", + "migrate", +] } subtle-encoding = { version = "0.5", features = ["bech32-preview"] } thiserror = "1" -tokio = { workspace = true, features = [ "rt-multi-thread", "net", "signal", "fs", "time" ] } +tokio = { workspace = true, features = [ + "rt-multi-thread", + "net", + "signal", + "fs", + "time", +] } tokio-stream = { version = "0.1.11", features = ["fs"] } tokio-tungstenite = { version = "0.20.1" } tokio-util = { version = "0.7.4", features = ["codec"] } @@ -64,6 +82,11 @@ nym-types = { path = "../common/types" } nym-validator-client = { path = "../common/client-libs/validator-client" } nym-wireguard = { path = "../common/wireguard", optional = true } +[dev-dependencies] +tower = "0.4.13" +rand = "0.8.5" +hyper = "0.14.27" + [build-dependencies] tokio = { version = "1.24.1", features = ["rt-multi-thread", "macros"] } sqlx = { version = "0.5", features = [ diff --git a/gateway/src/error.rs b/gateway/src/error.rs index dde06909792..80fee0a7fa1 100644 --- a/gateway/src/error.rs +++ b/gateway/src/error.rs @@ -106,6 +106,16 @@ pub(crate) enum GatewayError { #[from] source: NyxdError, }, + #[error("Error verifying hmac digest")] + HmacDigestError { + #[from] + source: hmac::digest::MacError, + }, + #[error("Invalid hmac length")] + HmacInvalidLength { + #[from] + source: hmac::digest::InvalidLength, + }, } impl From for GatewayError { diff --git a/gateway/src/node/client_handling/client_registration.rs b/gateway/src/node/client_handling/client_registration.rs new file mode 100644 index 00000000000..b8fa5a1cbea --- /dev/null +++ b/gateway/src/node/client_handling/client_registration.rs @@ -0,0 +1,181 @@ +use std::{ + collections::HashMap, + fmt, + hash::{Hash, Hasher}, + net::SocketAddr, + ops::Deref, + str::FromStr, +}; + +use base64::{engine::general_purpose, Engine}; +use hmac::{Hmac, Mac}; +use nym_crypto::asymmetric::encryption::PrivateKey; +use serde::{Deserialize, Serialize}; +use sha2::Sha256; +use x25519_dalek::StaticSecret; + +use crate::error::GatewayError; + +#[derive(Debug, Clone, Deserialize, Serialize)] +pub(crate) enum ClientMessage { + Init(InitMessage), + Final(Client), +} + +#[derive(Debug, Clone, Deserialize, Serialize)] +pub(crate) struct InitMessage { + pub_key: ClientPublicKey, +} + +impl InitMessage { + pub fn pub_key(&self) -> &ClientPublicKey { + &self.pub_key + } + #[allow(dead_code)] + pub fn new(pub_key: ClientPublicKey) -> Self { + InitMessage { pub_key } + } +} + +// Client that wants to register sends its PublicKey and SocketAddr bytes mac digest encrypted with a DH shared secret. +// Gateway can then verify pub_key payload using the sme process +#[derive(Debug, Clone, Deserialize, Serialize)] +pub(crate) struct Client { + // base64 encoded public key, using x25519-dalek for impl + pub(crate) pub_key: ClientPublicKey, + pub(crate) socket: SocketAddr, + pub(crate) mac: ClientMac, +} + +pub type HmacSha256 = Hmac; + +impl Client { + // Reusable secret should be gateways Wireguard PK + // Client should perform this step when generating its payload, using its own WG PK + pub fn verify(&self, gateway_key: &PrivateKey, nonce: u64) -> Result<(), GatewayError> { + #[allow(clippy::expect_used)] + let static_secret = + StaticSecret::try_from(gateway_key.to_bytes()).expect("This is infalliable"); + let dh = static_secret.diffie_hellman(&self.pub_key); + let mut mac = HmacSha256::new_from_slice(dh.as_bytes())?; + mac.update(self.pub_key.as_bytes()); + mac.update(self.socket.ip().to_string().as_bytes()); + mac.update(self.socket.port().to_string().as_bytes()); + mac.update(&nonce.to_le_bytes()); + Ok(mac.verify_slice(&self.mac)?) + } + + pub fn pub_key(&self) -> &ClientPublicKey { + &self.pub_key + } + + pub fn socket(&self) -> SocketAddr { + self.socket + } +} + +// This should go into nym-wireguard crate +#[derive(Debug, Clone, Eq, PartialEq)] +pub struct ClientPublicKey(x25519_dalek::PublicKey); +#[derive(Debug, Clone)] +pub(crate) struct ClientMac(Vec); + +impl ClientMac { + #[allow(dead_code)] + pub fn new(mac: Vec) -> Self { + ClientMac(mac) + } +} + +impl Deref for ClientMac { + type Target = Vec; + + fn deref(&self) -> &Self::Target { + &self.0 + } +} + +impl fmt::Display for ClientPublicKey { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + write!(f, "{}", general_purpose::STANDARD.encode(self.0.as_bytes())) + } +} + +impl Hash for ClientPublicKey { + fn hash(&self, state: &mut H) { + self.0.as_bytes().hash(state) + } +} + +impl FromStr for ClientMac { + type Err = String; + + fn from_str(s: &str) -> Result { + let mac_bytes: Vec = general_purpose::STANDARD + .decode(s) + .map_err(|_| "Could not base64 decode public key mac representation".to_string())?; + Ok(ClientMac(mac_bytes)) + } +} + +impl Serialize for ClientMac { + fn serialize(&self, serializer: S) -> Result { + let encoded_key = general_purpose::STANDARD.encode(self.0.clone()); + serializer.serialize_str(&encoded_key) + } +} + +impl<'de> Deserialize<'de> for ClientMac { + fn deserialize>(deserializer: D) -> Result { + let encoded_key = String::deserialize(deserializer)?; + ClientMac::from_str(&encoded_key).map_err(serde::de::Error::custom) + } +} + +impl ClientPublicKey { + #[allow(dead_code)] + pub fn new(key: x25519_dalek::PublicKey) -> Self { + ClientPublicKey(key) + } + + pub fn as_bytes(&self) -> &[u8] { + self.0.as_bytes() + } +} + +impl Deref for ClientPublicKey { + type Target = x25519_dalek::PublicKey; + + fn deref(&self) -> &Self::Target { + &self.0 + } +} + +impl FromStr for ClientPublicKey { + type Err = String; + + fn from_str(s: &str) -> Result { + let key_bytes: [u8; 32] = general_purpose::STANDARD + .decode(s) + .map_err(|_| "Could not base64 decode public key representation".to_string())? + .try_into() + .map_err(|_| "Invalid key length".to_string())?; + Ok(ClientPublicKey(x25519_dalek::PublicKey::from(key_bytes))) + } +} + +impl Serialize for ClientPublicKey { + fn serialize(&self, serializer: S) -> Result { + let encoded_key = general_purpose::STANDARD.encode(self.0.as_bytes()); + serializer.serialize_str(&encoded_key) + } +} + +impl<'de> Deserialize<'de> for ClientPublicKey { + fn deserialize>(deserializer: D) -> Result { + let encoded_key = String::deserialize(deserializer)?; + Ok(ClientPublicKey::from_str(&encoded_key).map_err(serde::de::Error::custom))? + } +} + +pub(crate) type ClientRegistry = HashMap; diff --git a/gateway/src/node/client_handling/mod.rs b/gateway/src/node/client_handling/mod.rs index bc63605d58c..a5712842f90 100644 --- a/gateway/src/node/client_handling/mod.rs +++ b/gateway/src/node/client_handling/mod.rs @@ -3,6 +3,7 @@ pub(crate) mod active_clients; mod bandwidth; +pub(crate) mod client_registration; pub(crate) mod embedded_network_requester; pub(crate) mod websocket; diff --git a/gateway/src/node/client_handling/websocket/connection_handler/coconut.rs b/gateway/src/node/client_handling/websocket/connection_handler/coconut.rs index 356a884161b..32806da3850 100644 --- a/gateway/src/node/client_handling/websocket/connection_handler/coconut.rs +++ b/gateway/src/node/client_handling/websocket/connection_handler/coconut.rs @@ -91,7 +91,7 @@ impl CoconutVerifier { let req = nym_api_requests::coconut::VerifyCredentialBody::new( credential.clone(), proposal_id, - self.nyxd_client.address().clone(), + self.nyxd_client.address(), ); for client in api_clients { self.nyxd_client diff --git a/gateway/src/node/http/api/mod.rs b/gateway/src/node/http/api/mod.rs new file mode 100644 index 00000000000..c0753a12f84 --- /dev/null +++ b/gateway/src/node/http/api/mod.rs @@ -0,0 +1 @@ +pub(crate) mod v1; diff --git a/gateway/src/node/http/api/v1/client_registry.rs b/gateway/src/node/http/api/v1/client_registry.rs new file mode 100644 index 00000000000..8e0a63b03a3 --- /dev/null +++ b/gateway/src/node/http/api/v1/client_registry.rs @@ -0,0 +1,105 @@ +use axum::extract::Path; +use std::sync::Arc; + +use axum::http::StatusCode; +use axum::{extract::State, Json}; +use std::str::FromStr; + +// use axum_macros::debug_handler; + +use crate::node::client_handling::client_registration::{ + Client, ClientMessage, ClientPublicKey, InitMessage, +}; +use crate::node::http::ApiState; + +async fn process_final_message(client: Client, state: Arc) -> StatusCode { + let preshared_nonce = { + let in_progress_ro = state.registration_in_progress.read().await; + if let Some(nonce) = in_progress_ro.get(client.pub_key()) { + *nonce + } else { + return StatusCode::BAD_REQUEST; + } + }; + + if client + .verify(state.sphinx_key_pair.private_key(), preshared_nonce) + .is_ok() + { + { + let mut in_progress_rw = state.registration_in_progress.write().await; + in_progress_rw.remove(client.pub_key()); + } + { + let mut registry_rw = state.client_registry.write().await; + registry_rw.insert(client.socket(), client); + } + return StatusCode::OK; + } + + StatusCode::BAD_REQUEST +} + +async fn process_init_message(init_message: InitMessage, state: Arc) -> u64 { + let nonce: u64 = fastrand::u64(..); + let mut registry_rw = state.registration_in_progress.write().await; + registry_rw.insert(init_message.pub_key().clone(), nonce); + nonce +} + +// #[debug_handler] +pub(crate) async fn register_client( + State(state): State>, + Json(payload): Json, +) -> (StatusCode, Json>) { + match payload { + ClientMessage::Init(i) => ( + StatusCode::OK, + Json(Some(process_init_message(i, Arc::clone(&state)).await)), + ), + ClientMessage::Final(client) => ( + process_final_message(client, Arc::clone(&state)).await, + Json(None), + ), + } +} + +pub(crate) async fn get_all_clients( + State(state): State>, +) -> (StatusCode, Json>) { + let registry_ro = state.client_registry.read().await; + ( + StatusCode::OK, + Json( + registry_ro + .values() + .map(|c| c.pub_key().clone()) + .collect::>(), + ), + ) +} + +pub(crate) async fn get_client( + Path(pub_key): Path, + State(state): State>, +) -> (StatusCode, Json>) { + let pub_key = match ClientPublicKey::from_str(&pub_key) { + Ok(pub_key) => pub_key, + Err(_) => return (StatusCode::BAD_REQUEST, Json(vec![])), + }; + let registry_ro = state.client_registry.read().await; + let clients = registry_ro + .iter() + .filter_map(|(_, c)| { + if c.pub_key() == &pub_key { + Some(c.clone()) + } else { + None + } + }) + .collect::>(); + if clients.is_empty() { + return (StatusCode::NOT_FOUND, Json(clients)); + } + (StatusCode::OK, Json(clients)) +} diff --git a/gateway/src/node/http/api/v1/mod.rs b/gateway/src/node/http/api/v1/mod.rs new file mode 100644 index 00000000000..082c8dd9549 --- /dev/null +++ b/gateway/src/node/http/api/v1/mod.rs @@ -0,0 +1 @@ +pub(crate) mod client_registry; diff --git a/gateway/src/node/http/mod.rs b/gateway/src/node/http/mod.rs new file mode 100644 index 00000000000..9d091f232fe --- /dev/null +++ b/gateway/src/node/http/mod.rs @@ -0,0 +1,206 @@ +use std::{collections::HashMap, sync::Arc}; + +use axum::{ + routing::{get, post}, + Router, +}; +use log::info; +use nym_crypto::asymmetric::encryption; +use tokio::sync::RwLock; + +mod api; +use api::v1::client_registry::*; + +use super::client_handling::client_registration::{ClientPublicKey, ClientRegistry}; + +const ROUTE_PREFIX: &str = "/api/v1/gateway/client-interfaces/wireguard"; + +pub struct ApiState { + client_registry: Arc>, + sphinx_key_pair: Arc, + registration_in_progress: Arc>>, +} + +fn router_with_state(state: Arc) -> Router { + Router::new() + .route(&format!("{}/clients", ROUTE_PREFIX), get(get_all_clients)) + .route(&format!("{}/client", ROUTE_PREFIX), post(register_client)) + .route( + &format!("{}/client/:pub_key", ROUTE_PREFIX), + get(get_client), + ) + .with_state(state) +} + +pub(crate) async fn start_http_api( + client_registry: Arc>, + sphinx_key_pair: Arc, +) { + // Port should be 80 post smoosh + let port = 8000; + + info!("Started HTTP API on port {}", port); + + let client_registry = Arc::clone(&client_registry); + + let state = Arc::new(ApiState { + client_registry, + sphinx_key_pair, + registration_in_progress: Arc::new(RwLock::new(HashMap::new())), + }); + + let routes = router_with_state(state); + + #[allow(clippy::unwrap_used)] + axum::Server::bind(&format!("0.0.0.0:{}", port).parse().unwrap()) + .serve(routes.into_make_service()) + .await + .unwrap(); +} + +#[cfg(test)] +mod test { + use std::net::SocketAddr; + use std::str::FromStr; + use std::{collections::HashMap, sync::Arc}; + + use axum::body::Body; + use axum::http::Request; + use axum::http::StatusCode; + use hmac::Mac; + use tower::Service; + use tower::ServiceExt; + + use nym_crypto::asymmetric::encryption; + use tokio::sync::RwLock; + use x25519_dalek::{PublicKey, StaticSecret}; + + use crate::node::client_handling::client_registration::{ + Client, ClientMac, ClientMessage, InitMessage, + }; + use crate::node::client_handling::client_registration::{ClientPublicKey, HmacSha256}; + use crate::node::http::{router_with_state, ApiState, ROUTE_PREFIX}; + + #[tokio::test] + async fn registration() { + // 1. Provision random keys for gateway and client + // 2. Generate DH shared secret + // 3. Client submits its public key to the gateway to start the handshake process, gateway responds with nonce + // 4. Client generates mac digest using DH shared secret, its own public key, socket address and port, and nonce + // 5. Client sends its public key, socket address and port, nonce and mac digest to the gateway + // 6. Gateway verifies mac digest and nonce, and stores client's public key and socket address and port + + let mut rng = rand::thread_rng(); + + let gateway_key_pair = encryption::KeyPair::new(&mut rng); + let client_key_pair = encryption::KeyPair::new(&mut rng); + + let gateway_static_public = + PublicKey::try_from(gateway_key_pair.public_key().to_bytes()).unwrap(); + + let client_static_private = + StaticSecret::try_from(client_key_pair.private_key().to_bytes()).unwrap(); + let client_static_public = + PublicKey::try_from(client_key_pair.public_key().to_bytes()).unwrap(); + + let client_dh = client_static_private.diffie_hellman(&gateway_static_public); + + let registration_in_progress = Arc::new(RwLock::new(HashMap::new())); + let client_registry = Arc::new(RwLock::new(HashMap::new())); + + let state = Arc::new(ApiState { + client_registry: Arc::clone(&client_registry), + sphinx_key_pair: Arc::new(gateway_key_pair), + registration_in_progress: Arc::clone(®istration_in_progress), + }); + + // `Router` implements `tower::Service>` so we can + // call it like any tower service, no need to run an HTTP server. + let mut app = router_with_state(state); + + let init_message = + ClientMessage::Init(InitMessage::new(ClientPublicKey::new(client_static_public))); + + let init_request = Request::builder() + .method("POST") + .uri(format!("{}/client", ROUTE_PREFIX)) + .header("Content-type", "application/json") + .body(Body::from(serde_json::to_vec(&init_message).unwrap())) + .unwrap(); + + let response = ServiceExt::>::ready(&mut app) + .await + .unwrap() + .call(init_request) + .await + .unwrap(); + + assert_eq!(response.status(), StatusCode::OK); + assert!(!registration_in_progress.read().await.is_empty()); + + let nonce: Option = + serde_json::from_slice(&hyper::body::to_bytes(response.into_body()).await.unwrap()) + .unwrap(); + assert!(nonce.is_some()); + + let mut mac = HmacSha256::new_from_slice(client_dh.as_bytes()).unwrap(); + mac.update(client_static_public.as_bytes()); + mac.update("127.0.0.1".as_bytes()); + mac.update("8080".as_bytes()); + mac.update(&nonce.unwrap().to_le_bytes()); + let mac = mac.finalize().into_bytes(); + + let finalized_message = ClientMessage::Final(Client { + pub_key: ClientPublicKey::new(client_static_public), + socket: SocketAddr::from_str("127.0.0.1:8080").unwrap(), + mac: ClientMac::new(mac.as_slice().to_vec()), + }); + + let final_request = Request::builder() + .method("POST") + .uri(format!("{}/client", ROUTE_PREFIX)) + .header("Content-type", "application/json") + .body(Body::from(serde_json::to_vec(&finalized_message).unwrap())) + .unwrap(); + + let response = ServiceExt::>::ready(&mut app) + .await + .unwrap() + .call(final_request) + .await + .unwrap(); + + assert_eq!(response.status(), StatusCode::OK); + assert!(!client_registry.read().await.is_empty()); + + let clients_request = Request::builder() + .method("GET") + .uri(format!("{}/clients", ROUTE_PREFIX)) + .body(Body::empty()) + .unwrap(); + + let response = ServiceExt::>::ready(&mut app) + .await + .unwrap() + .call(clients_request) + .await + .unwrap(); + + assert_eq!(response.status(), StatusCode::OK); + + let clients: Vec = + serde_json::from_slice(&hyper::body::to_bytes(response.into_body()).await.unwrap()) + .unwrap(); + + assert!(!clients.is_empty()); + + let ro_clients = client_registry.read().await.clone(); + assert_eq!( + ro_clients + .values() + .map(|c| c.pub_key().clone()) + .collect::>(), + clients + ) + } +} diff --git a/gateway/src/node/mod.rs b/gateway/src/node/mod.rs index 6a59b1e3e7f..132671c8a10 100644 --- a/gateway/src/node/mod.rs +++ b/gateway/src/node/mod.rs @@ -1,6 +1,7 @@ // Copyright 2020-2023 - Nym Technologies SA // SPDX-License-Identifier: Apache-2.0 +use self::client_handling::client_registration::ClientRegistry; use self::storage::PersistentStorage; use crate::commands::helpers::{override_network_requester_config, OverrideNetworkRequesterConfig}; use crate::config::Config; @@ -12,6 +13,7 @@ use crate::node::client_handling::embedded_network_requester::{ use crate::node::client_handling::websocket; use crate::node::client_handling::websocket::connection_handler::coconut::CoconutVerifier; use crate::node::helpers::{initialise_main_storage, load_network_requester_config}; +use crate::node::http::start_http_api; use crate::node::mixnet_handling::receiver::connection_handler::ConnectionHandler; use crate::node::statistics::collector::GatewayStatisticsCollector; use crate::node::storage::Storage; @@ -27,13 +29,16 @@ use nym_task::{TaskClient, TaskManager}; use nym_validator_client::{nyxd, DirectSigningHttpRpcNyxdClient}; use rand::seq::SliceRandom; use rand::thread_rng; +use std::collections::HashMap; use std::error::Error; use std::net::SocketAddr; use std::path::PathBuf; use std::sync::Arc; +use tokio::sync::RwLock; pub(crate) mod client_handling; pub(crate) mod helpers; +pub(crate) mod http; pub(crate) mod mixnet_handling; pub(crate) mod statistics; pub(crate) mod storage; @@ -85,6 +90,8 @@ pub(crate) struct Gateway { /// x25519 keypair used for Diffie-Hellman. Currently only used for sphinx key derivation. sphinx_keypair: Arc, storage: St, + + client_registry: Arc>, } impl Gateway { @@ -100,6 +107,7 @@ impl Gateway { sphinx_keypair: Arc::new(helpers::load_sphinx_keys(&config)?), config, network_requester_opts, + client_registry: Arc::new(RwLock::new(HashMap::new())), }) } @@ -117,6 +125,7 @@ impl Gateway { identity_keypair: Arc::new(identity_keypair), sphinx_keypair: Arc::new(sphinx_keypair), storage, + client_registry: Arc::new(RwLock::new(HashMap::new())), } } @@ -375,6 +384,12 @@ impl Gateway { bail!("{err}") } + // This should likely be wireguard feature gated, but its easier to test if it hangs in here + tokio::spawn(start_http_api( + Arc::clone(&self.client_registry), + Arc::clone(&self.sphinx_keypair), + )); + info!("Finished nym gateway startup procedure - it should now be able to receive mix and client traffic!"); if let Err(err) = Self::wait_for_interrupt(shutdown).await { diff --git a/sdk/rust/nym-sdk/examples/libp2p_ping/main.rs b/sdk/rust/nym-sdk/examples/libp2p_ping/main.rs index 24ea089c102..1a787afb1b7 100644 --- a/sdk/rust/nym-sdk/examples/libp2p_ping/main.rs +++ b/sdk/rust/nym-sdk/examples/libp2p_ping/main.rs @@ -45,7 +45,6 @@ use libp2p::ping::Success; use libp2p::swarm::{keep_alive, NetworkBehaviour, SwarmEvent}; use libp2p::{identity, ping, Multiaddr, PeerId}; use log::{debug, info, LevelFilter}; -use nym_sdk::mixnet::MixnetClient; use std::error::Error; use std::time::Duration; @@ -70,7 +69,7 @@ async fn main() -> Result<(), Box> { use libp2p::swarm::SwarmBuilder; use rust_libp2p_nym::transport::NymTransport; - let client = MixnetClient::connect_new().await.unwrap(); + let client = nym_sdk::mixnet::MixnetClient::connect_new().await.unwrap(); let transport = NymTransport::new(client, local_key.clone()).await?; SwarmBuilder::with_tokio_executor( diff --git a/sdk/rust/nym-sdk/examples/libp2p_shared/transport.rs b/sdk/rust/nym-sdk/examples/libp2p_shared/transport.rs index 88142cc37f7..79c3de5cbbf 100644 --- a/sdk/rust/nym-sdk/examples/libp2p_shared/transport.rs +++ b/sdk/rust/nym-sdk/examples/libp2p_shared/transport.rs @@ -80,6 +80,7 @@ pub struct NymTransport { impl NymTransport { /// New transport. + #[allow(unused)] pub async fn new(client: MixnetClient, keypair: Keypair) -> Result { Self::new_maybe_with_notify_inbound(client, keypair, None, None).await } diff --git a/service-providers/network-requester/src/core.rs b/service-providers/network-requester/src/core.rs index 0bad4c36e76..39b35708fd8 100644 --- a/service-providers/network-requester/src/core.rs +++ b/service-providers/network-requester/src/core.rs @@ -34,7 +34,7 @@ use nym_socks5_requests::{ }; use nym_sphinx::addressing::clients::Recipient; use nym_sphinx::anonymous_replies::requests::AnonymousSenderTag; -use nym_sphinx::params::PacketSize; +use nym_sphinx::params::{PacketSize, PacketType}; use nym_sphinx::receiver::ReconstructedMessage; use nym_statistics_common::collector::StatisticsSender; use nym_task::connections::LaneQueueLengths; @@ -186,7 +186,7 @@ impl NRServiceProviderBuilder { allowed_hosts::HostsStore::new(&config.storage_paths.unknown_list_location); let outbound_request_filter = - OutboundRequestFilter::new(allowed_hosts.clone(), standard_list.clone(), unknown_hosts); + OutboundRequestFilter::new(allowed_hosts, standard_list, unknown_hosts); NRServiceProviderBuilder { config, @@ -321,6 +321,7 @@ impl NRServiceProviderBuilder { let stats_collector_clone = stats_collector.clone(); let mixnet_client_sender = mixnet_client.split_sender(); let self_address = *mixnet_client.nym_address(); + let packet_type = self.config.base.debug.traffic.packet_type; // start the listener for mix messages tokio::spawn(async move { @@ -328,6 +329,7 @@ impl NRServiceProviderBuilder { mixnet_client_sender, mix_input_receiver, stats_collector_clone, + packet_type, ) .await; }); @@ -420,6 +422,7 @@ impl NRServiceProvider { mixnet_client_sender: nym_sdk::mixnet::MixnetClientSender, mut mix_input_reader: MixProxyReader, stats_collector: Option, + packet_type: PacketType, ) { loop { tokio::select! { @@ -440,7 +443,7 @@ impl NRServiceProvider { } } - let response_message = msg.into_input_message(); + let response_message = msg.into_input_message(packet_type); mixnet_client_sender.send(response_message).await.unwrap(); } else { log::error!("Exiting: channel closed!"); diff --git a/service-providers/network-requester/src/reply.rs b/service-providers/network-requester/src/reply.rs index e3eac3a03c9..4a2c620eb6f 100644 --- a/service-providers/network-requester/src/reply.rs +++ b/service-providers/network-requester/src/reply.rs @@ -11,6 +11,7 @@ use nym_socks5_requests::{ }; use nym_sphinx::addressing::clients::Recipient; use nym_sphinx::anonymous_replies::requests::AnonymousSenderTag; +use nym_sphinx::params::PacketType; use nym_task::connections::TransmissionLane; use std::fmt::{Debug, Formatter}; @@ -148,8 +149,9 @@ impl MixnetMessage { self.data.len() } - pub(crate) fn into_input_message(self) -> InputMessage { - self.address.send_back_to(self.data, self.connection_id) + pub(crate) fn into_input_message(self, packet_type: PacketType) -> InputMessage { + self.address + .send_back_to(self.data, self.connection_id, packet_type) } } @@ -175,17 +177,28 @@ impl MixnetAddress { None } - pub(super) fn send_back_to(self, message: Vec, connection_id: u64) -> InputMessage { + pub(super) fn send_back_to( + self, + message: Vec, + connection_id: u64, + packet_type: PacketType, + ) -> InputMessage { match self { - MixnetAddress::Known(recipient) => InputMessage::Regular { - recipient: *recipient, - data: message, - lane: TransmissionLane::ConnectionId(connection_id), + MixnetAddress::Known(recipient) => InputMessage::MessageWrapper { + message: Box::new(InputMessage::Regular { + recipient: *recipient, + data: message, + lane: TransmissionLane::ConnectionId(connection_id), + }), + packet_type, }, - MixnetAddress::Anonymous(sender_tag) => InputMessage::Reply { - recipient_tag: sender_tag, - data: message, - lane: TransmissionLane::ConnectionId(connection_id), + MixnetAddress::Anonymous(sender_tag) => InputMessage::MessageWrapper { + message: Box::new(InputMessage::Reply { + recipient_tag: sender_tag, + data: message, + lane: TransmissionLane::ConnectionId(connection_id), + }), + packet_type, }, } }