From c94c7e1a9892b2ba4ed5166ef9a31b159865c863 Mon Sep 17 00:00:00 2001 From: Emilia Hane Date: Sun, 14 Jan 2024 05:52:42 +0100 Subject: [PATCH 1/8] Skip unnecessary interaction with service task --- src/handler/mod.rs | 24 +++++++----------------- src/handler/nat_hole_punch/mod.rs | 2 +- 2 files changed, 8 insertions(+), 18 deletions(-) diff --git a/src/handler/mod.rs b/src/handler/mod.rs index 301e72344..6c0a859a1 100644 --- a/src/handler/mod.rs +++ b/src/handler/mod.rs @@ -1112,7 +1112,7 @@ impl Handler { debug!("peer {node_address} relayed a hole punch notification but we are not behind nat"); } _ => { - if let Err(e) = self.on_relay_msg(initr, timed_out_nonce).await { + if let Err(e) = self.on_relay_msg::

(initr, timed_out_nonce).await { warn!( "failed handling notification relayed from {node_address}, {e}" ); @@ -1590,7 +1590,7 @@ impl HolePunchNat for Handler { Ok(()) } - async fn on_relay_msg( + async fn on_relay_msg( &mut self, initr: Enr, timed_out_nonce: MessageNonce, @@ -1603,10 +1603,7 @@ impl HolePunchNat for Handler { // A session may already have been established. if self.sessions.cache.get(&initiator_node_address).is_some() { - trace!( - "Session already established with initiator: {}", - initiator_node_address - ); + trace!("Session already established with initiator: {initiator_node_address}"); return Ok(()); } // Possibly, an attempt to punch this hole, using another relay, is in progress. @@ -1615,22 +1612,15 @@ impl HolePunchNat for Handler { .get(&initiator_node_address) .is_some() { - trace!( - "WHOAREYOU packet already sent to initiator: {}", - initiator_node_address - ); + trace!("WHOAREYOU packet already sent to initiator: {initiator_node_address}"); return Ok(()); } + // If not hole punch attempts are in progress, spawn a WHOAREYOU event to punch a hole in // our NAT for initiator. let whoareyou_ref = WhoAreYouRef(initiator_node_address, timed_out_nonce); - if let Err(e) = self - .service_send - .send(HandlerOut::WhoAreYou(whoareyou_ref)) - .await - { - return Err(NatHolePunchError::Target(e.into())); - } + self.send_challenge::

(whoareyou_ref, None).await; + Ok(()) } diff --git a/src/handler/nat_hole_punch/mod.rs b/src/handler/nat_hole_punch/mod.rs index e59166bce..f2cd009f4 100644 --- a/src/handler/nat_hole_punch/mod.rs +++ b/src/handler/nat_hole_punch/mod.rs @@ -36,7 +36,7 @@ pub trait HolePunchNat { /// A RelayMsg notification is received over discv5 indicating this node is the target. Should /// trigger a WHOAREYOU to be sent to the initiator using the `nonce` in the RelayMsg. - async fn on_relay_msg( + async fn on_relay_msg( &mut self, initr: Enr, timed_out_nonce: MessageNonce, From 3620b37f79414bd94532f4d43f70fd2232d2790e Mon Sep 17 00:00:00 2001 From: Emilia Hane Date: Sun, 14 Jan 2024 05:53:25 +0100 Subject: [PATCH 2/8] Add test for target --- src/handler/tests.rs | 112 ++++++++++++++++++++++++++++++++++++++++++- 1 file changed, 110 insertions(+), 2 deletions(-) diff --git a/src/handler/tests.rs b/src/handler/tests.rs index 15b1946ed..850afe837 100644 --- a/src/handler/tests.rs +++ b/src/handler/tests.rs @@ -3,7 +3,7 @@ use super::*; use crate::{ handler::sessions::session::build_dummy_session, - packet::{DefaultProtocolId, PacketHeader, MAX_PACKET_SIZE}, + packet::{DefaultProtocolId, PacketHeader, MAX_PACKET_SIZE, MESSAGE_NONCE_LENGTH}, return_if_ipv6_is_not_supported, rpc::{Request, Response}, Discv5ConfigBuilder, IpMode, @@ -485,7 +485,7 @@ async fn remove_one_time_session() { } #[tokio::test(flavor = "multi_thread")] -async fn relay() { +async fn nat_hole_punch_relay() { init(); // Relay @@ -628,3 +628,111 @@ async fn relay() { _ => panic!("message should decode to a relay msg notification"), } } + +#[tokio::test(flavor = "multi_thread")] +async fn nat_hole_punch_target() { + init(); + + // Target + let listen_config = ListenConfig::default().with_ipv4(Ipv4Addr::LOCALHOST, 9902); + let (mut handler, mock_service) = + build_handler_with_listen_config::(listen_config).await; + let tgt_addr = handler.enr.read().udp4_socket().unwrap().into(); + let tgt_node_id = handler.enr.read().node_id(); + let dummy_session = build_dummy_session(); + handler.nat_hole_puncher.is_behind_nat = Some(true); + + // Relay + let relay_enr = { + let key = CombinedKey::generate_secp256k1(); + EnrBuilder::new("v4") + .ip4(Ipv4Addr::LOCALHOST) + .udp4(9022) + .build(&key) + .unwrap() + }; + let relay_addr = relay_enr.udp4_socket().unwrap().into(); + let relay_node_id = relay_enr.node_id(); + + let relay_node_address = NodeAddress::new(relay_addr, relay_node_id); + handler + .sessions + .cache + .insert(relay_node_address, dummy_session.clone()); + + let relay_socket = UdpSocket::bind(relay_addr) + .await + .expect("should bind to target socket"); + + // Initiator + let initr_enr = { + let key = CombinedKey::generate_secp256k1(); + EnrBuilder::new("v4") + .ip4(Ipv4Addr::LOCALHOST) + .udp4(9021) + .build(&key) + .unwrap() + }; + let initr_addr = initr_enr.udp4_socket().unwrap(); + let initr_node_id = initr_enr.node_id(); + let initr_nonce: MessageNonce = [1; MESSAGE_NONCE_LENGTH]; + + let initr_socket = UdpSocket::bind(initr_addr) + .await + .expect("should bind to initiator socket"); + + // Target handle + let tgt_handle = tokio::spawn(async move { handler.start::().await }); + + // Relay handle + let relay_msg_notif = Notification::RelayMsg(initr_enr.clone(), initr_nonce); + + let relay_handle = tokio::spawn(async move { + let mut session = build_dummy_session(); + let packet = session + .encrypt_session_message::(relay_node_id, &relay_msg_notif.encode()) + .expect("should encrypt notification"); + let encoded_packet = packet.encode::(&tgt_node_id); + + relay_socket + .send_to(&encoded_packet, tgt_addr) + .await + .expect("should relay init notification to relay") + }); + + // Initiator handle + let target_exit = mock_service.exit_tx; + let initr_handle = tokio::spawn(async move { + let mut buffer = [0; MAX_PACKET_SIZE]; + let res = initr_socket + .recv_from(&mut buffer) + .await + .expect("should read bytes from socket"); + + drop(target_exit); + + (res, buffer) + }); + + // Join all handles + let (tgt_res, relay_res, initr_res) = tokio::join!(tgt_handle, relay_handle, initr_handle); + + tgt_res.unwrap(); + relay_res.unwrap(); + + let ((length, src), buffer) = initr_res.unwrap(); + + assert_eq!(src, tgt_addr); + + let (packet, _aad) = Packet::decode::(&initr_node_id, &buffer[..length]) + .expect("should decode packet"); + let Packet { header, .. } = packet; + let PacketHeader { + kind, + message_nonce, + .. + } = header; + + assert!(kind.is_whoareyou()); + assert_eq!(message_nonce, initr_nonce) +} From b32e750736769d8f84143b1cb23b54c84f5d8d24 Mon Sep 17 00:00:00 2001 From: Emilia Hane Date: Sun, 14 Jan 2024 07:12:06 +0100 Subject: [PATCH 3/8] Remove unnecessary memory re-allocation for sessions cache --- src/handler/mod.rs | 22 +++++++++------------- src/service.rs | 16 ++++++---------- 2 files changed, 15 insertions(+), 23 deletions(-) diff --git a/src/handler/mod.rs b/src/handler/mod.rs index 6c0a859a1..d5ac1131c 100644 --- a/src/handler/mod.rs +++ b/src/handler/mod.rs @@ -117,8 +117,8 @@ pub enum HandlerIn { /// [`Notification::RelayMsg`] we intend to relay to that peer. HolePunchEnr(Enr, Notification), - /// Observed socket has been update. The old socket and the current socket. - SocketUpdate(Option, SocketAddr), + /// Observed socket has been update, contains the current socket. + SocketUpdate(SocketAddr), } /// Messages sent between a node on the network and `Handler`. @@ -377,19 +377,15 @@ impl Handler { warn!("Failed to relay. Error: {}", e); } } - HandlerIn::SocketUpdate(old_socket, socket) => { + HandlerIn::SocketUpdate(socket) => { let ip = socket.ip(); let port = socket.port(); - if old_socket.is_none() { - // This node goes from being unreachable to being reachable. Remove - // its sessions to trigger a WHOAREYOU from peers on next sent - // message. If the peer is running this implementation of - // discovery, this makes it possible for the local node to be - // inserted into its peers' kbuckets before the session they - // already had expires. Session duration, in this impl defaults to - // 24 hours. - self.sessions.cache.clear() - } + // This node goes from being unreachable to being reachable. + // Reasonably assuming all its peers are indexing sessions based on + // `node_id`, like this implementation, the first message sent in each + // session from here on will trigger a WHOAREYOU message from the peer + // (since the peer won't be able to find the decryption key for the + // session with the new node id as message's src id). self.nat_hole_puncher.set_is_behind_nat(self.listen_sockets.iter(), Some(ip), Some(port)); } } diff --git a/src/service.rs b/src/service.rs index 17258be7e..ea72e5339 100644 --- a/src/service.rs +++ b/src/service.rs @@ -855,11 +855,9 @@ impl Service { new_ip6, )); // Notify Handler of socket update - if let Err(e) = - self.handler_send.send(HandlerIn::SocketUpdate( - local_ip6_socket.map(SocketAddr::V6), - new_ip6, - )) + if let Err(e) = self + .handler_send + .send(HandlerIn::SocketUpdate(new_ip6)) { warn!("Failed to send socket update to handler: {}", e); }; @@ -883,11 +881,9 @@ impl Service { new_ip4, )); // Notify Handler of socket update - if let Err(e) = - self.handler_send.send(HandlerIn::SocketUpdate( - local_ip4_socket.map(SocketAddr::V4), - new_ip4, - )) + if let Err(e) = self + .handler_send + .send(HandlerIn::SocketUpdate(new_ip4)) { warn!("Failed to send socket update {}", e); }; From b94736464ad2dc639895b86d49a4658fcdeac56a Mon Sep 17 00:00:00 2001 From: Emilia Hane Date: Sun, 14 Jan 2024 07:40:48 +0100 Subject: [PATCH 4/8] Reset previous commit --- src/handler/mod.rs | 23 ++++++++++++++--------- 1 file changed, 14 insertions(+), 9 deletions(-) diff --git a/src/handler/mod.rs b/src/handler/mod.rs index d5ac1131c..2915377be 100644 --- a/src/handler/mod.rs +++ b/src/handler/mod.rs @@ -117,8 +117,8 @@ pub enum HandlerIn { /// [`Notification::RelayMsg`] we intend to relay to that peer. HolePunchEnr(Enr, Notification), - /// Observed socket has been update, contains the current socket. - SocketUpdate(SocketAddr), + /// Observed socket has been update. The old socket and the current socket. + SocketUpdate(Option, SocketAddr), } /// Messages sent between a node on the network and `Handler`. @@ -377,15 +377,20 @@ impl Handler { warn!("Failed to relay. Error: {}", e); } } - HandlerIn::SocketUpdate(socket) => { + HandlerIn::SocketUpdate(old_socket, socket) => { let ip = socket.ip(); let port = socket.port(); - // This node goes from being unreachable to being reachable. - // Reasonably assuming all its peers are indexing sessions based on - // `node_id`, like this implementation, the first message sent in each - // session from here on will trigger a WHOAREYOU message from the peer - // (since the peer won't be able to find the decryption key for the - // session with the new node id as message's src id). + if old_socket.is_none() { + // This node goes from being unreachable to being reachable, but + // keeps the same enr key (hence same node id). Remove its + // sessions to trigger a WHOAREYOU from peers on next sent + // message. If the peer is running this implementation of + // discovery, this makes it possible for the local node to be + // inserted into its peers' kbuckets before the session they + // already had expires. Session duration, in this impl defaults to + // 24 hours. + self.sessions.cache.clear() + } self.nat_hole_puncher.set_is_behind_nat(self.listen_sockets.iter(), Some(ip), Some(port)); } } From e0a2d4af5eecd142c79f7f80e5a3ff32dbae88d0 Mon Sep 17 00:00:00 2001 From: Emilia Hane Date: Sun, 14 Jan 2024 09:09:58 +0100 Subject: [PATCH 5/8] Improve type safety for notifications --- src/handler/mod.rs | 110 ++++++++--------- src/handler/nat_hole_punch/mod.rs | 19 ++- src/handler/tests.rs | 11 +- src/kbucket/entry.rs | 4 +- src/rpc.rs | 42 ++++--- src/rpc/notification.rs | 197 +++++++++++++++++++----------- src/service.rs | 53 +++++--- 7 files changed, 248 insertions(+), 188 deletions(-) diff --git a/src/handler/mod.rs b/src/handler/mod.rs index 2915377be..c1c05fba9 100644 --- a/src/handler/mod.rs +++ b/src/handler/mod.rs @@ -32,7 +32,8 @@ use crate::{ error::{Discv5Error, RequestError}, packet::{ChallengeData, IdNonce, MessageNonce, Packet, PacketKind, ProtocolIdentity}, rpc::{ - Message, Notification, Payload, Request, RequestBody, RequestId, Response, ResponseBody, + Message, Payload, RelayInitNotification, RelayMsgNotification, Request, RequestBody, + RequestId, Response, ResponseBody, }, socket, socket::{FilterConfig, Outbound, Socket}, @@ -114,8 +115,8 @@ pub enum HandlerIn { WhoAreYou(WhoAreYouRef, Option), /// A response to a [`HandlerOut::FindHolePunchEnr`]. Returns the ENR and the - /// [`Notification::RelayMsg`] we intend to relay to that peer. - HolePunchEnr(Enr, Notification), + /// [`RelayInitNotification`] from [`HandlerOut::FindHolePunchEnr`]. + HolePunchEnr(Enr, RelayInitNotification), /// Observed socket has been update. The old socket and the current socket. SocketUpdate(Option, SocketAddr), @@ -147,8 +148,8 @@ pub enum HandlerOut { RequestFailed(RequestId, RequestError), /// Look-up an ENR in k-buckets. Passes the node id of the peer to look up and the - /// [`Notification::RelayMsg`] we intend to send to it. - FindHolePunchEnr(NodeId, Notification), + /// [`RelayMsgNotification`] we intend to send to it. + FindHolePunchEnr(RelayInitNotification), } /// How we connected to the node. @@ -372,7 +373,10 @@ impl Handler { } HandlerIn::Response(dst, response) => self.send_response::

(dst, *response).await, HandlerIn::WhoAreYou(wru_ref, enr) => self.send_challenge::

(wru_ref, enr).await, - HandlerIn::HolePunchEnr(tgt_enr, relay_msg_notif) => { + HandlerIn::HolePunchEnr(tgt_enr, relay_init) => { + // Assemble the notification for the target + let (initr_enr, _tgt, timed_out_nonce) = relay_init.into(); + let relay_msg_notif = RelayMsgNotification::new(initr_enr, timed_out_nonce); if let Err(e) = self.send_relay_msg_notif::

(tgt_enr, relay_msg_notif).await { warn!("Failed to relay. Error: {}", e); } @@ -380,17 +384,12 @@ impl Handler { HandlerIn::SocketUpdate(old_socket, socket) => { let ip = socket.ip(); let port = socket.port(); - if old_socket.is_none() { - // This node goes from being unreachable to being reachable, but - // keeps the same enr key (hence same node id). Remove its - // sessions to trigger a WHOAREYOU from peers on next sent - // message. If the peer is running this implementation of - // discovery, this makes it possible for the local node to be - // inserted into its peers' kbuckets before the session they - // already had expires. Session duration, in this impl defaults to - // 24 hours. - self.sessions.cache.clear() - } + // This node goes from being unreachable to being reachable. + // Reasonably assuming all its peers are indexing sessions based on + // `node_id`, like this implementation, the first message sent in each + // session from here on will trigger a WHOAREYOU message from the peer + // (since the peer won't be able to find the decryption key for the + // session with the new node id as message's src id). self.nat_hole_puncher.set_is_behind_nat(self.listen_sockets.iter(), Some(ip), Some(port)); } } @@ -1089,39 +1088,35 @@ impl Handler { match message { Message::Response(response) => self.handle_response::

(node_address, response).await, - Message::Notification(notif) => match notif { - Notification::RelayInit(initr, tgt, timed_out_nonce) => { - let initr_node_id = initr.node_id(); - if initr_node_id != node_address.node_id { - warn!("peer {node_address} tried to initiate hole punch attempt for another node {initr_node_id}, banning peer {node_address}"); - self.fail_session(&node_address, RequestError::MaliciousRelayInit, true) - .await; - let ban_timeout = self - .nat_hole_puncher - .ban_duration - .map(|v| Instant::now() + v); - PERMIT_BAN_LIST.write().ban(node_address, ban_timeout); - } else if let Err(e) = self.on_relay_init(initr, tgt, timed_out_nonce).await { - warn!("failed handling notification to relay for {node_address}, {e}"); - } + Message::RelayInitNotification(notif) => { + let initr_node_id = notif.initiator_enr().node_id(); + if initr_node_id != node_address.node_id { + warn!("peer {node_address} tried to initiate hole punch attempt for another node {initr_node_id}, banning peer {node_address}"); + self.fail_session(&node_address, RequestError::MaliciousRelayInit, true) + .await; + let ban_timeout = self + .nat_hole_puncher + .ban_duration + .map(|v| Instant::now() + v); + PERMIT_BAN_LIST.write().ban(node_address, ban_timeout); + } else if let Err(e) = self.on_relay_init(notif).await { + warn!("failed handling notification to relay for {node_address}, {e}"); } - Notification::RelayMsg(initr, timed_out_nonce) => { - match self.nat_hole_puncher.is_behind_nat { - Some(false) => { - // initr may not be malicious and initiated a hole punch attempt when - // a request to this node timed out for another reason - debug!("peer {node_address} relayed a hole punch notification but we are not behind nat"); - } - _ => { - if let Err(e) = self.on_relay_msg::

(initr, timed_out_nonce).await { - warn!( - "failed handling notification relayed from {node_address}, {e}" - ); - } + } + Message::RelayMsgNotification(notif) => { + match self.nat_hole_puncher.is_behind_nat { + Some(false) => { + // initr may not be malicious and initiated a hole punch attempt when + // a request to this node timed out for another reason + debug!("peer {node_address} relayed a hole punch notification but we are not behind nat"); + } + _ => { + if let Err(e) = self.on_relay_msg::

(notif).await { + warn!("failed handling notification relayed from {node_address}, {e}"); } } } - }, + } Message::Request(_) => { warn!( "Peer sent message type {} that shouldn't be sent in packet type `Session Message`, {}", @@ -1200,7 +1195,7 @@ impl Handler { warn!("Received a response in a `Message` packet, should be sent in a `SessionMessage`"); self.handle_response::

(node_address, response).await } - Message::Notification(_) => { + Message::RelayInitNotification(_) | Message::RelayMsgNotification(_) => { warn!( "Peer sent message type {} that shouldn't be sent in packet type `Message`, {}", message.msg_type(), @@ -1542,7 +1537,7 @@ impl HolePunchNat for Handler { } if let Some(session) = self.sessions.cache.get_mut(&relay) { let relay_init_notif = - Notification::RelayInit(local_enr, target_node_address.node_id, timed_out_nonce); + RelayInitNotification::new(local_enr, target_node_address.node_id, timed_out_nonce); trace!( "Sending notif to relay {}. relay init: {}", relay.node_id, @@ -1573,17 +1568,12 @@ impl HolePunchNat for Handler { async fn on_relay_init( &mut self, - initr: Enr, - tgt: NodeId, - timed_out_nonce: MessageNonce, + relay_init: RelayInitNotification, ) -> Result<(), NatHolePunchError> { - // Assemble the notification for the target - let relay_msg_notif = Notification::RelayMsg(initr, timed_out_nonce); - // Check for target peer in our kbuckets otherwise drop notification. if let Err(e) = self .service_send - .send(HandlerOut::FindHolePunchEnr(tgt, relay_msg_notif)) + .send(HandlerOut::FindHolePunchEnr(relay_init)) .await { return Err(NatHolePunchError::Relay(e.into())); @@ -1593,11 +1583,11 @@ impl HolePunchNat for Handler { async fn on_relay_msg( &mut self, - initr: Enr, - timed_out_nonce: MessageNonce, + relay_msg: RelayMsgNotification, ) -> Result<(), NatHolePunchError> { + let (initr_enr, timed_out_msg_nonce) = relay_msg.into(); let initiator_node_address = - match NodeContact::try_from_enr(initr, self.nat_hole_puncher.ip_mode) { + match NodeContact::try_from_enr(initr_enr, self.nat_hole_puncher.ip_mode) { Ok(contact) => contact.node_address(), Err(e) => return Err(NatHolePunchError::Target(e.into())), }; @@ -1619,7 +1609,7 @@ impl HolePunchNat for Handler { // If not hole punch attempts are in progress, spawn a WHOAREYOU event to punch a hole in // our NAT for initiator. - let whoareyou_ref = WhoAreYouRef(initiator_node_address, timed_out_nonce); + let whoareyou_ref = WhoAreYouRef(initiator_node_address, timed_out_msg_nonce); self.send_challenge::

(whoareyou_ref, None).await; Ok(()) @@ -1628,7 +1618,7 @@ impl HolePunchNat for Handler { async fn send_relay_msg_notif( &mut self, tgt_enr: Enr, - relay_msg_notif: Notification, + relay_msg_notif: RelayMsgNotification, ) -> Result<(), NatHolePunchError> { let tgt_node_address = match NodeContact::try_from_enr(tgt_enr, self.nat_hole_puncher.ip_mode) { diff --git a/src/handler/nat_hole_punch/mod.rs b/src/handler/nat_hole_punch/mod.rs index f2cd009f4..1d1d26a14 100644 --- a/src/handler/nat_hole_punch/mod.rs +++ b/src/handler/nat_hole_punch/mod.rs @@ -1,9 +1,10 @@ use std::net::SocketAddr; -use enr::NodeId; - use crate::{ - node_info::NodeAddress, packet::MessageNonce, rpc::Notification, Enr, ProtocolIdentity, + node_info::NodeAddress, + packet::MessageNonce, + rpc::{RelayInitNotification, RelayMsgNotification}, + Enr, ProtocolIdentity, }; mod error; @@ -27,26 +28,20 @@ pub trait HolePunchNat { /// A RelayInit notification is received over discv5 indicating this node is the relay. Should /// trigger sending a RelayMsg to the target. - async fn on_relay_init( - &mut self, - initr: Enr, - tgt: NodeId, - timed_out_nonce: MessageNonce, - ) -> Result<(), Error>; + async fn on_relay_init(&mut self, relay_init: RelayInitNotification) -> Result<(), Error>; /// A RelayMsg notification is received over discv5 indicating this node is the target. Should /// trigger a WHOAREYOU to be sent to the initiator using the `nonce` in the RelayMsg. async fn on_relay_msg( &mut self, - initr: Enr, - timed_out_nonce: MessageNonce, + relay_msg: RelayMsgNotification, ) -> Result<(), Error>; /// Send a RELAYMSG notification. async fn send_relay_msg_notif( &mut self, tgt_enr: Enr, - relay_msg_notif: Notification, + relay_msg_notif: RelayMsgNotification, ) -> Result<(), Error>; /// A hole punched for a peer closes. Should trigger an empty packet to be sent to the diff --git a/src/handler/tests.rs b/src/handler/tests.rs index 850afe837..67d3b5f9c 100644 --- a/src/handler/tests.rs +++ b/src/handler/tests.rs @@ -550,8 +550,8 @@ async fn nat_hole_punch_relay() { let mock_service_handle = tokio::spawn(async move { let service_msg = rx.recv().await.expect("should receive service message"); match service_msg { - HandlerOut::FindHolePunchEnr(_tgt_node_id, relay_msg_notif) => tx - .send(HandlerIn::HolePunchEnr(tgt_enr_clone, relay_msg_notif)) + HandlerOut::FindHolePunchEnr(relay_init) => tx + .send(HandlerIn::HolePunchEnr(tgt_enr_clone, relay_init)) .expect("should send message to handler"), _ => panic!("service message should be 'find hole punch enr'"), } @@ -559,7 +559,7 @@ async fn nat_hole_punch_relay() { // Initiator handle let relay_init_notif = - Notification::RelayInit(initr_enr.clone(), tgt_node_id, MessageNonce::default()); + RelayInitNotification::new(initr_enr.clone(), tgt_node_id, MessageNonce::default()); let initr_handle = tokio::spawn(async move { let mut session = build_dummy_session(); @@ -622,7 +622,8 @@ async fn nat_hole_punch_relay() { .decrypt_message(message_nonce, &message, &aad) .expect("should decrypt message"); match Message::decode(&decrypted_message).expect("should decode message") { - Message::Notification(Notification::RelayMsg(enr, _nonce)) => { + Message::RelayMsgNotification(relay_msg) => { + let (enr, _) = relay_msg.into(); assert_eq!(initr_enr, enr) } _ => panic!("message should decode to a relay msg notification"), @@ -685,7 +686,7 @@ async fn nat_hole_punch_target() { let tgt_handle = tokio::spawn(async move { handler.start::().await }); // Relay handle - let relay_msg_notif = Notification::RelayMsg(initr_enr.clone(), initr_nonce); + let relay_msg_notif = RelayMsgNotification::new(initr_enr.clone(), initr_nonce); let relay_handle = tokio::spawn(async move { let mut session = build_dummy_session(); diff --git a/src/kbucket/entry.rs b/src/kbucket/entry.rs index 97be95de1..101d637c2 100644 --- a/src/kbucket/entry.rs +++ b/src/kbucket/entry.rs @@ -174,8 +174,8 @@ where PendingEntry(EntryRef { bucket, key }) } - /// Returns the value associated with the key. - pub fn value(&mut self) -> &mut TVal { + /// Returns mutable access value associated with the key. + pub fn value_mut(&mut self) -> &mut TVal { self.0 .bucket .pending_mut() diff --git a/src/rpc.rs b/src/rpc.rs index c26c690c2..974cb8d4b 100644 --- a/src/rpc.rs +++ b/src/rpc.rs @@ -6,7 +6,7 @@ mod notification; mod request; mod response; -pub use notification::Notification; +pub use notification::{RelayInitNotification, RelayMsgNotification}; pub use request::{Request, RequestBody, RequestId}; pub use response::{Response, ResponseBody}; @@ -60,12 +60,19 @@ pub enum Message { /// A request, which contains its [`RequestId`]. #[display(fmt = "{_0}")] Request(Request), + /// A Response, which contains the [`RequestId`] of its associated request. #[display(fmt = "{_0}")] Response(Response), - /// A unicast notification. + + /// Unicast notifications. + /// + /// A [`RelayInitNotification`]. #[display(fmt = "{_0}")] - Notification(Notification), + RelayInitNotification(RelayInitNotification), + /// A [`RelayMsgNotification`]. + #[display(fmt = "{_0}")] + RelayMsgNotification(RelayMsgNotification), } #[allow(dead_code)] @@ -74,7 +81,8 @@ impl Message { match self { Self::Request(request) => request.encode(), Self::Response(response) => response.encode(), - Self::Notification(notif) => notif.encode(), + Self::RelayInitNotification(notif) => notif.encode(), + Self::RelayMsgNotification(notif) => notif.encode(), } } @@ -93,17 +101,17 @@ impl Message { MessageType::Pong | MessageType::Nodes | MessageType::TalkResp => { Ok(Response::decode(msg_type, &rlp)?.into()) } - MessageType::RelayInit | MessageType::RelayMsg => { - Ok(Notification::decode(msg_type, &rlp)?.into()) - } + MessageType::RelayInit => Ok(RelayInitNotification::decode(msg_type, &rlp)?.into()), + MessageType::RelayMsg => Ok(RelayMsgNotification::decode(msg_type, &rlp)?.into()), } } pub fn msg_type(&self) -> String { match self { - Self::Notification(n) => format!("notification type {}", n.msg_type()), Self::Request(r) => format!("request type {}", r.msg_type()), Self::Response(r) => format!("response type {}", r.msg_type()), + Self::RelayInitNotification(n) => format!("notification type {}", n.msg_type()), + Self::RelayMsgNotification(n) => format!("notification type {}", n.msg_type()), } } } @@ -413,12 +421,13 @@ mod tests { let mut nonce = [0u8; MESSAGE_NONCE_LENGTH]; nonce[MESSAGE_NONCE_LENGTH - nonce_bytes.len()..].copy_from_slice(&nonce_bytes); - let notif = Message::Notification(Notification::RelayInit(inr_enr, tgt_node_id, nonce)); + let notif = RelayInitNotification::new(inr_enr, tgt_node_id, nonce); + let msg = Message::RelayInitNotification(notif); - let encoded_notif = notif.clone().encode(); - let decoded_notif = Message::decode(&encoded_notif).expect("Should decode"); + let encoded_msg = msg.clone().encode(); + let decoded_msg = Message::decode(&encoded_msg).expect("Should decode"); - assert_eq!(notif, decoded_notif); + assert_eq!(msg, decoded_msg); } #[test] @@ -432,11 +441,12 @@ mod tests { let mut nonce = [0u8; MESSAGE_NONCE_LENGTH]; nonce[MESSAGE_NONCE_LENGTH - nonce_bytes.len()..].copy_from_slice(&nonce_bytes); - let notif = Message::Notification(Notification::RelayMsg(inr_enr, nonce)); + let notif = RelayMsgNotification::new(inr_enr, nonce); + let msg = Message::RelayMsgNotification(notif); - let encoded_notif = notif.clone().encode(); - let decoded_notif = Message::decode(&encoded_notif).expect("Should decode"); + let encoded_msg = msg.clone().encode(); + let decoded_msg = Message::decode(&encoded_msg).expect("Should decode"); - assert_eq!(notif, decoded_notif); + assert_eq!(msg, decoded_msg); } } diff --git a/src/rpc/notification.rs b/src/rpc/notification.rs index dc29b2b22..cfae7d6b0 100644 --- a/src/rpc/notification.rs +++ b/src/rpc/notification.rs @@ -6,31 +6,41 @@ use crate::{ use derive_more::Display; use enr::NodeId; use rlp::{DecoderError, Rlp, RlpStream}; -use std::convert::TryInto; /// Nonce of request that triggered the initiation of this hole punching attempt. type NonceOfTimedOutMessage = MessageNonce; /// Node id length in bytes. pub const NODE_ID_LENGTH: usize = 32; -/// A unicast notification sent over discv5. +/// Unicast notifications [`RelayInitNotification`] and [`RelayMsgNotification`] sent over discv5. + +/// A notification to initialise a one-shot relay circuit for hole-punching. #[derive(Debug, Display, PartialEq, Eq, Clone)] -pub enum Notification { - /// A notification to initialise a one-shot relay circuit for hole-punching. - #[display(fmt = "Notification: RelayInit: Initiator: {_0}, Target: {_1}, Nonce: {_2:?}")] - RelayInit(Enr, NodeId, NonceOfTimedOutMessage), - /// The notification relayed to target of hole punch attempt. - #[display(fmt = "Notification: RelayMsg: Initiator: {_0}, Nonce: {_1:?}")] - RelayMsg(Enr, NonceOfTimedOutMessage), +#[display(fmt = "Notification: RelayInit: Initiator: {_0}, Target: {_1}, Nonce: {_2:?}")] +pub struct RelayInitNotification(Enr, NodeId, NonceOfTimedOutMessage); + +impl RelayInitNotification { + pub fn new( + initr_enr: Enr, + tgt_node_id: NodeId, + timed_out_msg_nonce: NonceOfTimedOutMessage, + ) -> Self { + Self(initr_enr, tgt_node_id, timed_out_msg_nonce) + } + + pub fn initiator_enr(&self) -> &Enr { + &self.0 + } + + pub fn target_node_id(&self) -> NodeId { + self.1 + } } -impl Payload for Notification { +impl Payload for RelayInitNotification { /// Matches a notification type to its message type id. fn msg_type(&self) -> u8 { - match self { - Self::RelayInit(..) => MessageType::RelayInit as u8, - Self::RelayMsg(..) => MessageType::RelayMsg as u8, - } + MessageType::RelayInit as u8 } /// Encodes a notification message to RLP-encoded bytes. @@ -39,71 +49,112 @@ impl Payload for Notification { let msg_type = self.msg_type(); buf.push(msg_type); let mut s = RlpStream::new(); - match self { - Self::RelayInit(initiator, target, nonce) => { - s.begin_list(3); - s.append(&initiator); - s.append(&(&target.raw() as &[u8])); - s.append(&(&nonce as &[u8])); - } - Self::RelayMsg(initiator, nonce) => { - s.begin_list(2); - s.append(&initiator); - s.append(&(&nonce as &[u8])); - } - } + let Self(initiator, target, nonce) = self; + + s.begin_list(3); + s.append(&initiator); + s.append(&(&target.raw() as &[u8])); + s.append(&(&nonce as &[u8])); + buf.extend_from_slice(&s.out()); buf } /// Decodes RLP-encoded bytes into a notification message. - fn decode(msg_type: u8, rlp: &Rlp<'_>) -> Result { - match msg_type.try_into()? { - MessageType::RelayInit => { - if rlp.item_count()? != 3 { - return Err(DecoderError::RlpIncorrectListLen); - } - let initiator = rlp.val_at::(0)?; - - let tgt_bytes = rlp.val_at::>(1)?; - if tgt_bytes.len() > NODE_ID_LENGTH { - return Err(DecoderError::RlpIsTooBig); - } - let mut tgt = [0u8; NODE_ID_LENGTH]; - tgt[NODE_ID_LENGTH - tgt_bytes.len()..].copy_from_slice(&tgt_bytes); - let tgt = NodeId::from(tgt); - - let nonce = { - let bytes = rlp.val_at::>(2)?; - if bytes.len() > MESSAGE_NONCE_LENGTH { - return Err(DecoderError::RlpIsTooBig); - } - let mut buf = [0u8; MESSAGE_NONCE_LENGTH]; - buf[MESSAGE_NONCE_LENGTH - bytes.len()..].copy_from_slice(&bytes); - buf - }; - - Ok(Notification::RelayInit(initiator, tgt, nonce)) - } - MessageType::RelayMsg => { - if rlp.item_count()? != 2 { - return Err(DecoderError::RlpIncorrectListLen); - } - let initiator = rlp.val_at::(0)?; - - let nonce = { - let bytes = rlp.val_at::>(1)?; - if bytes.len() > MESSAGE_NONCE_LENGTH { - return Err(DecoderError::RlpIsTooBig); - } - let mut buf = [0u8; MESSAGE_NONCE_LENGTH]; - buf[MESSAGE_NONCE_LENGTH - bytes.len()..].copy_from_slice(&bytes); - buf - }; - - Ok(Notification::RelayMsg(initiator, nonce)) + fn decode(_msg_type: u8, rlp: &Rlp<'_>) -> Result { + if rlp.item_count()? != 3 { + return Err(DecoderError::RlpIncorrectListLen); + } + let initiator = rlp.val_at::(0)?; + + let tgt_bytes = rlp.val_at::>(1)?; + if tgt_bytes.len() > NODE_ID_LENGTH { + return Err(DecoderError::RlpIsTooBig); + } + let mut tgt = [0u8; NODE_ID_LENGTH]; + tgt[NODE_ID_LENGTH - tgt_bytes.len()..].copy_from_slice(&tgt_bytes); + let tgt = NodeId::from(tgt); + + let nonce = { + let bytes = rlp.val_at::>(2)?; + if bytes.len() > MESSAGE_NONCE_LENGTH { + return Err(DecoderError::RlpIsTooBig); } - _ => unreachable!("Implementation does not adhere to wire protocol"), + let mut buf = [0u8; MESSAGE_NONCE_LENGTH]; + buf[MESSAGE_NONCE_LENGTH - bytes.len()..].copy_from_slice(&bytes); + buf + }; + + Ok(Self(initiator, tgt, nonce)) + } +} + +impl From for (Enr, NodeId, NonceOfTimedOutMessage) { + fn from(value: RelayInitNotification) -> Self { + let RelayInitNotification(initr_enr, tgt_node_id, timed_out_msg_nonce) = value; + + (initr_enr, tgt_node_id, timed_out_msg_nonce) + } +} + +/// The notification relayed to target of hole punch attempt. +#[derive(Debug, Display, PartialEq, Eq, Clone)] +#[display(fmt = "Notification: RelayMsg: Initiator: {_0}, Nonce: {_1:?}")] +pub struct RelayMsgNotification(Enr, NonceOfTimedOutMessage); + +impl RelayMsgNotification { + pub fn new(initr_enr: Enr, timed_out_msg_nonce: NonceOfTimedOutMessage) -> Self { + RelayMsgNotification(initr_enr, timed_out_msg_nonce) + } +} + +impl Payload for RelayMsgNotification { + /// Matches a notification type to its message type id. + fn msg_type(&self) -> u8 { + MessageType::RelayMsg as u8 + } + + /// Encodes a notification message to RLP-encoded bytes. + fn encode(self) -> Vec { + let mut buf = Vec::with_capacity(100); + let msg_type = self.msg_type(); + buf.push(msg_type); + let mut s = RlpStream::new(); + let Self(initiator, nonce) = self; + + s.begin_list(2); + s.append(&initiator); + s.append(&(&nonce as &[u8])); + + buf.extend_from_slice(&s.out()); + buf + } + + /// Decodes RLP-encoded bytes into a notification message. + fn decode(_msg_type: u8, rlp: &Rlp<'_>) -> Result { + if rlp.item_count()? != 2 { + return Err(DecoderError::RlpIncorrectListLen); } + let initiator = rlp.val_at::(0)?; + + let nonce = { + let bytes = rlp.val_at::>(1)?; + if bytes.len() > MESSAGE_NONCE_LENGTH { + return Err(DecoderError::RlpIsTooBig); + } + let mut buf = [0u8; MESSAGE_NONCE_LENGTH]; + buf[MESSAGE_NONCE_LENGTH - bytes.len()..].copy_from_slice(&bytes); + buf + }; + + Ok(Self(initiator, nonce)) + } +} + +impl From for (Enr, NonceOfTimedOutMessage) { + fn from(value: RelayMsgNotification) -> Self { + let RelayMsgNotification(initr_enr, timed_out_msg_nonce) = value; + + (initr_enr, timed_out_msg_nonce) } } diff --git a/src/service.rs b/src/service.rs index ea72e5339..0451ab0f5 100644 --- a/src/service.rs +++ b/src/service.rs @@ -409,19 +409,28 @@ impl Service { } self.rpc_failure(request_id, error); } - HandlerOut::FindHolePunchEnr(tgt_node_id, relay_msg_notif) => { - // check if we know this node id in our routing table, otherwise drop - // notification. - // todo(emhane): ban peers that ask us to relay to a peer we very - // unlikely could have sent to them in a NODES response. - let key = kbucket::Key::from(tgt_node_id); - if let kbucket::Entry::Present(entry, _) = self.kbuckets.write().entry(&key) { - let enr = entry.value().clone(); - if let Err(e) = self.handler_send.send(HandlerIn::HolePunchEnr(enr, relay_msg_notif)) { - warn!("Failed to send target enr to relay proccess, error: {}", e); + HandlerOut::FindHolePunchEnr(relay_init) => { + // check if we know the target node id in our routing table, otherwise + // drop relay attempt. + let tgt_node_id = relay_init.target_node_id(); + let tgt_key = kbucket::Key::from(tgt_node_id); + if let kbucket::Entry::Present(entry, _) = self.kbuckets.write().entry(&tgt_key) { + let tgt_enr = entry.value().clone(); + if let Err(e) = self.handler_send.send(HandlerIn::HolePunchEnr(tgt_enr, relay_init)) { + warn!( + "Failed to send target enr to relay process, error: {e}" + ); } } else { - warn!("Peer {tgt_node_id} requested relaying to a peer not in k-buckets, {relay_msg_notif}"); + // todo(emhane): ban peers that ask us to relay to a peer we very + // unlikely could have sent to them in a NODES response. + let inr_node_id = relay_init.initiator_enr().node_id(); + + warn!( + inr_node_id=%inr_node_id, + tgt_node_id=%tgt_node_id, + "Peer requested relaying to a peer not in k-buckets" + ); } } } @@ -596,8 +605,8 @@ impl Service { } } kbucket::Entry::Pending(ref mut entry, _) => { - if entry.value().seq() < enr_seq { - let enr = entry.value().clone(); + if entry.value_mut().seq() < enr_seq { + let enr = entry.value_mut().clone(); to_request_enr = Some(enr); } } @@ -855,9 +864,11 @@ impl Service { new_ip6, )); // Notify Handler of socket update - if let Err(e) = self - .handler_send - .send(HandlerIn::SocketUpdate(new_ip6)) + if let Err(e) = + self.handler_send.send(HandlerIn::SocketUpdate( + local_ip6_socket.map(SocketAddr::V6), + new_ip6, + )) { warn!("Failed to send socket update to handler: {}", e); }; @@ -881,9 +892,11 @@ impl Service { new_ip4, )); // Notify Handler of socket update - if let Err(e) = self - .handler_send - .send(HandlerIn::SocketUpdate(new_ip4)) + if let Err(e) = + self.handler_send.send(HandlerIn::SocketUpdate( + local_ip4_socket.map(SocketAddr::V4), + new_ip4, + )) { warn!("Failed to send socket update {}", e); }; @@ -1234,7 +1247,7 @@ impl Service { let must_update_enr = match self.kbuckets.write().entry(&key) { kbucket::Entry::Present(entry, _) => entry.value().seq() < enr.seq(), - kbucket::Entry::Pending(mut entry, _) => entry.value().seq() < enr.seq(), + kbucket::Entry::Pending(mut entry, _) => entry.value_mut().seq() < enr.seq(), _ => false, }; From 25077d4e919b9f5a061281f6d3c914811517a15e Mon Sep 17 00:00:00 2001 From: Emilia Hane Date: Sun, 14 Jan 2024 09:15:49 +0100 Subject: [PATCH 6/8] Reset commit b32e7507 and check sequence of initiator's enr against kbucket entry --- src/handler/mod.rs | 17 +++++++++++------ src/service.rs | 18 ++++++++++++++++++ 2 files changed, 29 insertions(+), 6 deletions(-) diff --git a/src/handler/mod.rs b/src/handler/mod.rs index c1c05fba9..05341669b 100644 --- a/src/handler/mod.rs +++ b/src/handler/mod.rs @@ -384,12 +384,17 @@ impl Handler { HandlerIn::SocketUpdate(old_socket, socket) => { let ip = socket.ip(); let port = socket.port(); - // This node goes from being unreachable to being reachable. - // Reasonably assuming all its peers are indexing sessions based on - // `node_id`, like this implementation, the first message sent in each - // session from here on will trigger a WHOAREYOU message from the peer - // (since the peer won't be able to find the decryption key for the - // session with the new node id as message's src id). + if old_socket.is_none() { + // This node goes from being unreachable to being reachable, but + // keeps the same enr key (hence same node id). Remove its + // sessions to trigger a WHOAREYOU from peers on next sent + // message. If the peer is running this implementation of + // discovery, this makes it possible for the local node to be + // inserted into its peers' kbuckets before the session they + // already had expires. Session duration, in this impl defaults to + // 24 hours. + self.sessions.cache.clear() + } self.nat_hole_puncher.set_is_behind_nat(self.listen_sockets.iter(), Some(ip), Some(port)); } } diff --git a/src/service.rs b/src/service.rs index 0451ab0f5..1bb246442 100644 --- a/src/service.rs +++ b/src/service.rs @@ -410,6 +410,24 @@ impl Service { self.rpc_failure(request_id, error); } HandlerOut::FindHolePunchEnr(relay_init) => { + // update initiator's enr if it's in kbuckets + let inr_enr = relay_init.initiator_enr(); + let inr_key = kbucket::Key::from(inr_enr.node_id()); + match self.kbuckets.write().entry(&inr_key) { + kbucket::Entry::Present(ref mut entry, _) => { + let enr = entry.value_mut(); + if enr.seq() < inr_enr.seq() { + *enr = inr_enr.clone(); + } + } + kbucket::Entry::Pending(ref mut entry, _) => { + let enr = entry.value_mut(); + if enr.seq() < inr_enr.seq() { + *enr = inr_enr.clone(); + } + } + _ => () + } // check if we know the target node id in our routing table, otherwise // drop relay attempt. let tgt_node_id = relay_init.target_node_id(); From 41f0ef64dbf6fa24745787d8e5ad889132d93ba5 Mon Sep 17 00:00:00 2001 From: Emilia Hane Date: Sun, 14 Jan 2024 09:16:30 +0100 Subject: [PATCH 7/8] Refactor initiator abbreviation --- src/handler/mod.rs | 16 +++++++------- src/handler/tests.rs | 52 ++++++++++++++++++++++---------------------- 2 files changed, 34 insertions(+), 34 deletions(-) diff --git a/src/handler/mod.rs b/src/handler/mod.rs index 05341669b..6c9f1e713 100644 --- a/src/handler/mod.rs +++ b/src/handler/mod.rs @@ -375,8 +375,8 @@ impl Handler { HandlerIn::WhoAreYou(wru_ref, enr) => self.send_challenge::

(wru_ref, enr).await, HandlerIn::HolePunchEnr(tgt_enr, relay_init) => { // Assemble the notification for the target - let (initr_enr, _tgt, timed_out_nonce) = relay_init.into(); - let relay_msg_notif = RelayMsgNotification::new(initr_enr, timed_out_nonce); + let (inr_enr, _tgt, timed_out_nonce) = relay_init.into(); + let relay_msg_notif = RelayMsgNotification::new(inr_enr, timed_out_nonce); if let Err(e) = self.send_relay_msg_notif::

(tgt_enr, relay_msg_notif).await { warn!("Failed to relay. Error: {}", e); } @@ -1094,9 +1094,9 @@ impl Handler { match message { Message::Response(response) => self.handle_response::

(node_address, response).await, Message::RelayInitNotification(notif) => { - let initr_node_id = notif.initiator_enr().node_id(); - if initr_node_id != node_address.node_id { - warn!("peer {node_address} tried to initiate hole punch attempt for another node {initr_node_id}, banning peer {node_address}"); + let inr_node_id = notif.initiator_enr().node_id(); + if inr_node_id != node_address.node_id { + warn!("peer {node_address} tried to initiate hole punch attempt for another node {inr_node_id}, banning peer {node_address}"); self.fail_session(&node_address, RequestError::MaliciousRelayInit, true) .await; let ban_timeout = self @@ -1111,7 +1111,7 @@ impl Handler { Message::RelayMsgNotification(notif) => { match self.nat_hole_puncher.is_behind_nat { Some(false) => { - // initr may not be malicious and initiated a hole punch attempt when + // inr may not be malicious and initiated a hole punch attempt when // a request to this node timed out for another reason debug!("peer {node_address} relayed a hole punch notification but we are not behind nat"); } @@ -1590,9 +1590,9 @@ impl HolePunchNat for Handler { &mut self, relay_msg: RelayMsgNotification, ) -> Result<(), NatHolePunchError> { - let (initr_enr, timed_out_msg_nonce) = relay_msg.into(); + let (inr_enr, timed_out_msg_nonce) = relay_msg.into(); let initiator_node_address = - match NodeContact::try_from_enr(initr_enr, self.nat_hole_puncher.ip_mode) { + match NodeContact::try_from_enr(inr_enr, self.nat_hole_puncher.ip_mode) { Ok(contact) => contact.node_address(), Err(e) => return Err(NatHolePunchError::Target(e.into())), }; diff --git a/src/handler/tests.rs b/src/handler/tests.rs index 67d3b5f9c..bc38ad694 100644 --- a/src/handler/tests.rs +++ b/src/handler/tests.rs @@ -497,7 +497,7 @@ async fn nat_hole_punch_relay() { let mut dummy_session = build_dummy_session(); // Initiator - let initr_enr = { + let inr_enr = { let key = CombinedKey::generate_secp256k1(); EnrBuilder::new("v4") .ip4(Ipv4Addr::LOCALHOST) @@ -505,16 +505,16 @@ async fn nat_hole_punch_relay() { .build(&key) .unwrap() }; - let initr_addr = initr_enr.udp4_socket().unwrap().into(); - let initr_node_id = initr_enr.node_id(); + let inr_addr = inr_enr.udp4_socket().unwrap().into(); + let inr_node_id = inr_enr.node_id(); - let initr_node_address = NodeAddress::new(initr_addr, initr_enr.node_id()); + let inr_node_address = NodeAddress::new(inr_addr, inr_enr.node_id()); handler .sessions .cache - .insert(initr_node_address, dummy_session.clone()); + .insert(inr_node_address, dummy_session.clone()); - let initr_socket = UdpSocket::bind(initr_addr) + let inr_socket = UdpSocket::bind(inr_addr) .await .expect("should bind to initiator socket"); @@ -559,16 +559,16 @@ async fn nat_hole_punch_relay() { // Initiator handle let relay_init_notif = - RelayInitNotification::new(initr_enr.clone(), tgt_node_id, MessageNonce::default()); + RelayInitNotification::new(inr_enr.clone(), tgt_node_id, MessageNonce::default()); - let initr_handle = tokio::spawn(async move { + let inr_handle = tokio::spawn(async move { let mut session = build_dummy_session(); let packet = session - .encrypt_session_message::(initr_node_id, &relay_init_notif.encode()) + .encrypt_session_message::(inr_node_id, &relay_init_notif.encode()) .expect("should encrypt notification"); let encoded_packet = packet.encode::(&relay_node_id); - initr_socket + inr_socket .send_to(&encoded_packet, relay_addr) .await .expect("should relay init notification to relay") @@ -589,10 +589,10 @@ async fn nat_hole_punch_relay() { }); // Join all handles - let (initr_res, relay_res, tgt_res, mock_service_res) = - tokio::join!(initr_handle, relay_handle, tgt_handle, mock_service_handle); + let (inr_res, relay_res, tgt_res, mock_service_res) = + tokio::join!(inr_handle, relay_handle, tgt_handle, mock_service_handle); - initr_res.unwrap(); + inr_res.unwrap(); relay_res.unwrap(); mock_service_res.unwrap(); @@ -624,7 +624,7 @@ async fn nat_hole_punch_relay() { match Message::decode(&decrypted_message).expect("should decode message") { Message::RelayMsgNotification(relay_msg) => { let (enr, _) = relay_msg.into(); - assert_eq!(initr_enr, enr) + assert_eq!(inr_enr, enr) } _ => panic!("message should decode to a relay msg notification"), } @@ -666,7 +666,7 @@ async fn nat_hole_punch_target() { .expect("should bind to target socket"); // Initiator - let initr_enr = { + let inr_enr = { let key = CombinedKey::generate_secp256k1(); EnrBuilder::new("v4") .ip4(Ipv4Addr::LOCALHOST) @@ -674,11 +674,11 @@ async fn nat_hole_punch_target() { .build(&key) .unwrap() }; - let initr_addr = initr_enr.udp4_socket().unwrap(); - let initr_node_id = initr_enr.node_id(); - let initr_nonce: MessageNonce = [1; MESSAGE_NONCE_LENGTH]; + let inr_addr = inr_enr.udp4_socket().unwrap(); + let inr_node_id = inr_enr.node_id(); + let inr_nonce: MessageNonce = [1; MESSAGE_NONCE_LENGTH]; - let initr_socket = UdpSocket::bind(initr_addr) + let inr_socket = UdpSocket::bind(inr_addr) .await .expect("should bind to initiator socket"); @@ -686,7 +686,7 @@ async fn nat_hole_punch_target() { let tgt_handle = tokio::spawn(async move { handler.start::().await }); // Relay handle - let relay_msg_notif = RelayMsgNotification::new(initr_enr.clone(), initr_nonce); + let relay_msg_notif = RelayMsgNotification::new(inr_enr.clone(), inr_nonce); let relay_handle = tokio::spawn(async move { let mut session = build_dummy_session(); @@ -703,9 +703,9 @@ async fn nat_hole_punch_target() { // Initiator handle let target_exit = mock_service.exit_tx; - let initr_handle = tokio::spawn(async move { + let inr_handle = tokio::spawn(async move { let mut buffer = [0; MAX_PACKET_SIZE]; - let res = initr_socket + let res = inr_socket .recv_from(&mut buffer) .await .expect("should read bytes from socket"); @@ -716,16 +716,16 @@ async fn nat_hole_punch_target() { }); // Join all handles - let (tgt_res, relay_res, initr_res) = tokio::join!(tgt_handle, relay_handle, initr_handle); + let (tgt_res, relay_res, inr_res) = tokio::join!(tgt_handle, relay_handle, inr_handle); tgt_res.unwrap(); relay_res.unwrap(); - let ((length, src), buffer) = initr_res.unwrap(); + let ((length, src), buffer) = inr_res.unwrap(); assert_eq!(src, tgt_addr); - let (packet, _aad) = Packet::decode::(&initr_node_id, &buffer[..length]) + let (packet, _aad) = Packet::decode::(&inr_node_id, &buffer[..length]) .expect("should decode packet"); let Packet { header, .. } = packet; let PacketHeader { @@ -735,5 +735,5 @@ async fn nat_hole_punch_target() { } = header; assert!(kind.is_whoareyou()); - assert_eq!(message_nonce, initr_nonce) + assert_eq!(message_nonce, inr_nonce) } From 61875972eeeb858b3c20a18032ada604d46fd9ee Mon Sep 17 00:00:00 2001 From: Emilia Hane Date: Mon, 15 Jan 2024 04:31:10 +0100 Subject: [PATCH 8/8] Trigger ping all peers on upgrade to reachable enr --- src/handler/mod.rs | 18 +++++++++++++++++- src/service.rs | 1 + 2 files changed, 18 insertions(+), 1 deletion(-) diff --git a/src/handler/mod.rs b/src/handler/mod.rs index 6c9f1e713..bcdf97042 100644 --- a/src/handler/mod.rs +++ b/src/handler/mod.rs @@ -150,6 +150,15 @@ pub enum HandlerOut { /// Look-up an ENR in k-buckets. Passes the node id of the peer to look up and the /// [`RelayMsgNotification`] we intend to send to it. FindHolePunchEnr(RelayInitNotification), + + /// Triggers a ping to all peers, outside of the regular ping interval. Needed to trigger + /// renewed session establishment after updating the local ENR from unreachable to reachable + /// and clearing all sessions. Only this way does the local node have a chance to make it into + /// its peers kbuckets before the session expires (defaults to 24 hours). This is the case + /// since its peers, running this implementation, will only respond to PINGs from nodes in its + /// kbucktes and unreachable ENRs don't make it into kbuckets upon [`HandlerOut::Established`] + /// event. + PingAllPeers, } /// How we connected to the node. @@ -393,7 +402,14 @@ impl Handler { // inserted into its peers' kbuckets before the session they // already had expires. Session duration, in this impl defaults to // 24 hours. - self.sessions.cache.clear() + self.sessions.cache.clear(); + if let Err(e) = self + .service_send + .send(HandlerOut::PingAllPeers) + .await + { + warn!("Failed to inform that request failed {}", e); + } } self.nat_hole_puncher.set_is_behind_nat(self.listen_sockets.iter(), Some(ip), Some(port)); } diff --git a/src/service.rs b/src/service.rs index 1bb246442..783d175ca 100644 --- a/src/service.rs +++ b/src/service.rs @@ -451,6 +451,7 @@ impl Service { ); } } + HandlerOut::PingAllPeers => self.ping_connected_peers() } } event = Service::bucket_maintenance_poll(&self.kbuckets) => {