Skip to content

Commit

Permalink
Refine logging
Browse files Browse the repository at this point in the history
  • Loading branch information
octol committed Oct 3, 2023
1 parent 0c9f58e commit 9ba2175
Show file tree
Hide file tree
Showing 2 changed files with 57 additions and 86 deletions.
116 changes: 50 additions & 66 deletions common/wireguard/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,11 +27,15 @@ mod error;
mod event;
mod tun;

// const WG_ADDRESS: &str = "0.0.0.0:51820";
const WG_ADDRESS: &str = "0.0.0.0:51822";

// The wireguard UDP listener
const WG_ADDRESS: &str = "0.0.0.0";
const WG_PORT: u16 = 51822;

// The interface used to route traffic
const TUN_BASE_NAME: &str = "nymtun";
const TUN_DEVICE_ADDRESS: &str = "10.0.0.1";
const TUN_DEVICE_NETMASK: &str = "255.255.255.0";

// The private key of the listener
// Corresponding public key: "WM8s8bYegwMa0TJ+xIwhk+dImk2IpDUKslDBCZPizlE="
const PRIVATE_KEY: &str = "AEqXrLFT4qjYq3wmX0456iv94uM6nDj5ugp6Jedcflg=";
Expand All @@ -44,10 +48,6 @@ const PEERS: &[&str; 1] = &[
// "mxV/mw7WZTe+0Msa0kvJHMHERDA/cSskiZWQce+TdEs=",
];

const TUN_BASE_NAME: &str = "nymtun";
const TUN_DEVICE_ADDRESS: &str = "10.0.0.1";
const TUN_DEVICE_NETMASK: &str = "255.255.255.0";

const MAX_PACKET: usize = 65535;

type ActivePeers = DashMap<SocketAddr, mpsc::UnboundedSender<Event>>;
Expand Down Expand Up @@ -107,13 +107,10 @@ fn setup_tokio_tun_device(name: &str, address: Ipv4Addr, netmask: Ipv4Addr) -> t
.expect("Failed to setup tun device, do you have permission?")
}

fn start_tun_device(active_peers: Arc<ActivePeers>) -> UnboundedSender<Vec<u8>> {
fn start_tun_device(_active_peers: Arc<ActivePeers>) -> UnboundedSender<Vec<u8>> {
let tun = setup_tokio_tun_device(
format!("{}%d", TUN_BASE_NAME).as_str(),
// "nymtun%d",
// Ipv4Addr::new(10, 0, 0, 1),
TUN_DEVICE_ADDRESS.parse().unwrap(),
// Ipv4Addr::new(255, 255, 255, 0),
TUN_DEVICE_NETMASK.parse().unwrap(),
);
log::info!("Created TUN device: {}", tun.name());
Expand All @@ -128,59 +125,45 @@ fn start_tun_device(active_peers: Arc<ActivePeers>) -> UnboundedSender<Vec<u8>>
loop {
tokio::select! {
// Reading from the TUN device
// TODO: handle Err(_)
Ok(len) = tun_device_rx.read(&mut buf) => {
log::info!("tun device: read {} bytes from tun", len);
log::info!("tun device: sending data to peers");

// Figure out the peer it's meant for.
let packet = &buf[..len];

let boringtun_parsed_dst_address = boringtun::noise::Tunn::dst_address(packet).unwrap();
log::info!("borintun parsed dst address: {boringtun_parsed_dst_address}");

let headers = SlicedPacket::from_ip(packet).unwrap();
let peer = match headers.ip.unwrap() {
InternetSlice::Ipv4(ip, _) => {
let source_addr = ip.source_addr();
let destination_addr = ip.destination_addr();
log::info!("IPv4: {source_addr} -> {destination_addr}");

log::info!("Mapping to peer using: {destination_addr}:{WG_PORT}");
// NOTE: I think we should probably use the allowed IP here instead to
// route the packet to the correct peer
// Consier using ip_network_table crate
dbg!(&active_peers);
active_peers.get(&SocketAddr::new(
std::net::IpAddr::V4(destination_addr),
WG_PORT,
))
},
InternetSlice::Ipv6(ip, _) => {
let source_addr = ip.source_addr();
let destination_addr = ip.destination_addr();
log::info!("IPv6: {source_addr} -> {destination_addr} (NOT IMPLEMENTED)");
continue;
},
};

let Some(peer) = peer else {
log::warn!("No matching peers connected, dropping packet");
continue;
};

// Forward to the peer
log::info!("Forward to peer: {len}");
peer.send(Event::IpPacket(buf[..len].to_vec().into())).unwrap();
len = tun_device_rx.read(&mut buf) => match len {
Ok(len) => {
let packet = &buf[..len];
let dst_addr = boringtun::noise::Tunn::dst_address(packet).unwrap();

let headers = SlicedPacket::from_ip(packet).unwrap();
let src_addr = match headers.ip.unwrap() {
InternetSlice::Ipv4(ip, _) => ip.source_addr().to_string(),
InternetSlice::Ipv6(ip, _) => ip.source_addr().to_string(),
};
log::info!("iface: read Packet({src_addr} -> {dst_addr}, {len} bytes)");

// TODO: route packet to the correct peer.
log::info!("...forward packet to the correct peer (NOT YET IMPLEMENTED)");
},
Err(err) => {
log::info!("iface: read error: {err}");
break;
}
},

// Writing to the TUN device
Some(data) = tun_task_rx.recv() => {
log::info!("tun device: writing {} bytes to tun", data.len());
let headers = SlicedPacket::from_ip(&data).unwrap();
let (source_addr, destination_addr) = match headers.ip.unwrap() {
InternetSlice::Ipv4(ip, _) => (ip.source_addr(), ip.destination_addr()),
InternetSlice::Ipv6(_, _) => unimplemented!(),
};

log::info!(
"iface: write Packet({source_addr} -> {destination_addr}, {} bytes)",
data.len()
);
// log::info!("iface: writing {} bytes", data.len());
tun_device_tx.write_all(&data).await.unwrap();
}
}
log::info!("tun device: shutting down");
}
log::info!("TUN device shutting down");
});
tun_task_tx
}
Expand All @@ -190,8 +173,9 @@ async fn start_udp_listener(
active_peers: Arc<ActivePeers>,
mut task_client: TaskClient,
) -> Result<(), Box<dyn std::error::Error + Send + Sync + 'static>> {
log::info!("Starting wireguard udp listener on {}", WG_ADDRESS);
let udp_socket = Arc::new(UdpSocket::bind(WG_ADDRESS).await?);
let wg_address = SocketAddr::new(WG_ADDRESS.parse().unwrap(), WG_PORT);
log::info!("Starting wireguard UDP listener on {wg_address}");
let udp_socket = Arc::new(UdpSocket::bind(wg_address).await?);

// Setup some static keys for development
let (static_private, peer_static_public) = init_static_dev_keys();
Expand All @@ -205,32 +189,32 @@ async fn start_udp_listener(
while !task_client.is_shutdown() {
tokio::select! {
_ = task_client.recv() => {
log::trace!("WireGuard listener: received shutdown");
log::trace!("WireGuard UDP listener: received shutdown");
break;
}
// Handle tunnel closing
Some(addr) = active_peers_task_handles.next() => {
match addr {
Ok(addr) => {
info!("Removing peer: {addr:?}");
log::info!("Removing peer: {addr:?}");
active_peers.remove(&addr);
}
Err(err) => {
error!("WireGuard listener: error receiving shutdown from peer: {err}");
error!("WireGuard UDP listener: error receiving shutdown from peer: {err}");
}
}
},
// Handle incoming packets
Ok((len, addr)) = udp_socket.recv_from(&mut buf) => {
log::info!("Received {} bytes from {}", len, addr);
log::trace!("udp: received {} bytes from {}", len, addr);

if let Some(peer_tx) = active_peers.get_mut(&addr) {
log::info!("WireGuard listener: received packet from known peer");
log::info!("udp: received {len} bytes from {addr} from known peer");
peer_tx.send(Event::WgPacket(buf[..len].to_vec().into()))
.tap_err(|err| log::error!("{err}"))
.unwrap();
} else {
log::info!("WireGuard listener: received packet from unknown peer, starting tunnel");
log::info!("udp: received {len} bytes from {addr} from unknown peer, starting tunnel");
let (join_handle, peer_tx) = start_wg_tunnel(
addr,
udp_socket.clone(),
Expand All @@ -243,7 +227,7 @@ async fn start_udp_listener(
.unwrap();

// WIP(JON): active peers should probably be keyed by peer_static_public
// instead. Does this current setup lead to any vulnerabilities?
// instead. Does this current setup lead to any issues?
log::info!("Adding peer: {addr}");
active_peers.insert(addr, peer_tx);
active_peers_task_handles.push(join_handle);
Expand Down
27 changes: 7 additions & 20 deletions common/wireguard/src/tun.rs
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,10 @@ impl WireGuardTunnel {
peer_static_public: x25519::PublicKey,
tunnel_tx: mpsc::UnboundedSender<Vec<u8>>,
) -> (Self, mpsc::UnboundedSender<Event>) {
log::info!("New wg tunnel: endpoint: {endpoint}, udp: {udp:?}");
let local_addr = udp.local_addr().unwrap();
let peer_addr = udp.peer_addr();
log::info!("New wg tunnel: endpoint: {endpoint}, local_addr: {local_addr}, peer_addr: {peer_addr:?}");

let preshared_key = None;
let persistent_keepalive = None;
let index = 0;
Expand Down Expand Up @@ -158,11 +161,7 @@ impl WireGuardTunnel {
let mut tunnel = self.wg_tunnel_lock().await?;
match tunnel.decapsulate(None, data, &mut send_buf) {
TunnResult::WriteToNetwork(packet) => {
log::info!(
"consume_wg: write to network: endpoint: {}: {}",
self.endpoint,
packet.len()
);
log::info!("udp: send {} bytes to {}", packet.len(), self.endpoint);
if let Err(err) = self.udp.send_to(packet, self.endpoint).await {
error!("Failed to send decapsulation-instructed packet to WireGuard endpoint: {err:?}");
};
Expand All @@ -171,11 +170,7 @@ impl WireGuardTunnel {
let mut send_buf = [0u8; MAX_PACKET];
match tunnel.decapsulate(None, &[], &mut send_buf) {
TunnResult::WriteToNetwork(packet) => {
log::info!(
"consume_wg: write to network: endpoint: {}: {}",
self.endpoint,
packet.len()
);
log::info!("udp: send {} bytes to {}", packet.len(), self.endpoint);
if let Err(err) = self.udp.send_to(packet, self.endpoint).await {
error!("Failed to send decapsulation-instructed packet to WireGuard endpoint: {err:?}");
break;
Expand All @@ -188,19 +183,11 @@ impl WireGuardTunnel {
}
}
TunnResult::WriteToTunnelV4(packet, _) | TunnResult::WriteToTunnelV6(packet, _) => {
log::info!("consume_wg: write to tunnel: {}", packet.len());

// Parse the `packet` and inspect it's contents.
let headers = SlicedPacket::from_ip(packet).unwrap();
let (source_addr, destination_addr) = match headers.ip.unwrap() {
let (source_addr, _destination_addr) = match headers.ip.unwrap() {
InternetSlice::Ipv4(ip, _) => (ip.source_addr(), ip.destination_addr()),
InternetSlice::Ipv6(_, _) => unimplemented!(),
};

log::info!(
"{source_addr} -> {destination_addr}: {} bytes",
packet.len()
);
self.set_source_addr(source_addr);
self.tun_task_tx.send(packet.to_vec()).unwrap();
}
Expand Down

0 comments on commit 9ba2175

Please sign in to comment.