diff --git a/crates/networking/p2p/Cargo.toml b/crates/networking/p2p/Cargo.toml index 128cff7e7..55661aa8b 100644 --- a/crates/networking/p2p/Cargo.toml +++ b/crates/networking/p2p/Cargo.toml @@ -16,6 +16,7 @@ bytes.workspace = true hex.workspace = true thiserror.workspace = true lazy_static.workspace = true +snap.workspace = true k256 = { version = "0.13.3", features = ["ecdh"] } sha3 = "0.10.8" @@ -26,7 +27,6 @@ hmac = "0.12.1" aes = "0.8.4" ctr = "0.9.2" rand = "0.8.5" -snap = "1.1.1" [dev-dependencies] hex-literal = "0.4.1" diff --git a/crates/networking/p2p/net.rs b/crates/networking/p2p/net.rs index b37f6650c..9b027fa92 100644 --- a/crates/networking/p2p/net.rs +++ b/crates/networking/p2p/net.rs @@ -18,7 +18,7 @@ use k256::{ }; use kademlia::{bucket_number, KademliaTable, MAX_NODES_PER_BUCKET}; use rand::rngs::OsRng; -use rlpx::{connection::RLPxConnection, message::Message as RLPxMessage}; +use rlpx::{connection::RLPxConnection, error::RLPxError, message::Message as RLPxMessage}; use tokio::{ net::{TcpSocket, TcpStream, UdpSocket}, sync::{broadcast, Mutex}, @@ -809,23 +809,32 @@ async fn handle_peer_as_initiator( } async fn handle_peer(mut conn: RLPxConnection, table: Arc>) { - match conn.handshake().await { - Ok(_) => match conn.handle_peer().await { - Ok(_) => unreachable!(), - Err(e) => info!("Error during RLPx connection: ({e})"), - }, - Err(e) => { - if let Ok(node_id) = conn.get_remote_node_id() { - // Discard peer from kademlia table - info!("Handshake failed: ({e}), discarding peer {node_id}"); - table.lock().await.replace_peer(node_id); - } else { - info!("Handshake failed: ({e}), unknown peer"); - } + // Perform handshake + if let Err(e) = conn.handshake().await { + peer_conn_failed("Handshake failed", e, conn, table).await; + } else { + // Handshake OK: handle connection + if let Err(e) = conn.handle_peer_conn().await { + peer_conn_failed("Error during RLPx connection", e, conn, table).await; } } } +async fn peer_conn_failed( + error_text: &str, + error: RLPxError, + conn: RLPxConnection, + table: Arc>, +) { + if let Ok(node_id) = conn.get_remote_node_id() { + // Discard peer from kademlia table + info!("{error_text}: ({error}), discarding peer {node_id}"); + table.lock().await.replace_peer(node_id); + } else { + info!("{error_text}: ({error}), unknown peer") + } +} + pub fn node_id_from_signing_key(signer: &SigningKey) -> H512 { let public_key = PublicKey::from(signer.verifying_key()); let encoded = public_key.to_encoded_point(false); diff --git a/crates/networking/p2p/rlpx/connection.rs b/crates/networking/p2p/rlpx/connection.rs index 0ebdf1118..b319f09c8 100644 --- a/crates/networking/p2p/rlpx/connection.rs +++ b/crates/networking/p2p/rlpx/connection.rs @@ -9,7 +9,7 @@ use crate::{ }, handshake::encode_ack_message, message::Message, - p2p, + p2p::{self, PingMessage, PongMessage}, utils::id2pubkey, }, snap::{ @@ -39,13 +39,15 @@ use sha3::{Digest, Keccak256}; use tokio::{ io::{AsyncRead, AsyncReadExt, AsyncWrite, AsyncWriteExt}, sync::broadcast::{self, error::RecvError}, - task::Id, + task, + time::{sleep, Instant}, }; use tracing::{error, info}; const CAP_P2P: (Capability, u8) = (Capability::P2p, 5); const CAP_ETH: (Capability, u8) = (Capability::Eth, 68); const CAP_SNAP: (Capability, u8) = (Capability::Snap, 1); const SUPPORTED_CAPABILITIES: [(Capability, u8); 3] = [CAP_P2P, CAP_ETH, CAP_SNAP]; +const PERIODIC_TASKS_CHECK_INTERVAL: std::time::Duration = std::time::Duration::from_secs(15); pub(crate) type Aes256Ctr64BE = ctr::Ctr64BE; @@ -56,6 +58,7 @@ pub(crate) struct RLPxConnection { stream: S, storage: Store, capabilities: Vec<(Capability, u8)>, + next_periodic_task_check: Instant, /// Send end of the channel used to broadcast messages /// to other connected peers, is ok to have it here, /// since internally it's an Arc. @@ -64,7 +67,7 @@ pub(crate) struct RLPxConnection { /// messages from other connections (sent from other peers). /// The receive end is instantiated after the handshake is completed /// under `handle_peer`. - connection_broadcast_send: broadcast::Sender<(tokio::task::Id, Arc)>, + connection_broadcast_send: broadcast::Sender<(task::Id, Arc)>, } impl RLPxConnection { @@ -73,7 +76,7 @@ impl RLPxConnection { stream: S, state: RLPxConnectionState, storage: Store, - connection_broadcast: broadcast::Sender<(tokio::task::Id, Arc)>, + connection_broadcast: broadcast::Sender<(task::Id, Arc)>, ) -> Self { Self { signer, @@ -81,7 +84,8 @@ impl RLPxConnection { stream, storage, capabilities: vec![], - connection_broadcast_send: connection_broadcast, + next_periodic_task_check: Instant::now() + PERIODIC_TASKS_CHECK_INTERVAL, + connection_broadcast_send: connection_broadcast.clone(), } } @@ -89,7 +93,7 @@ impl RLPxConnection { signer: SigningKey, stream: S, storage: Store, - connection_broadcast: broadcast::Sender<(tokio::task::Id, Arc)>, + connection_broadcast: broadcast::Sender<(task::Id, Arc)>, ) -> Self { let mut rng = rand::thread_rng(); Self::new( @@ -109,7 +113,7 @@ impl RLPxConnection { msg: &[u8], stream: S, storage: Store, - connection_broadcast_send: broadcast::Sender<(tokio::task::Id, Arc)>, + connection_broadcast_send: broadcast::Sender<(task::Id, Arc)>, ) -> Result { let mut rng = rand::thread_rng(); let digest = Keccak256::digest(msg.get(65..).ok_or(RLPxError::InvalidMessageLength())?); @@ -166,135 +170,74 @@ impl RLPxConnection { self.send(hello_msg).await?; // Receive Hello message - match self.receive().await? { - Message::Hello(hello_message) => { - self.capabilities = hello_message.capabilities; - - // Check if we have any capability in common - for cap in self.capabilities.clone() { - if SUPPORTED_CAPABILITIES.contains(&cap) { - return Ok(()); - } + if let Message::Hello(hello_message) = self.receive().await? { + self.capabilities = hello_message.capabilities; + + // Check if we have any capability in common + for cap in self.capabilities.clone() { + if SUPPORTED_CAPABILITIES.contains(&cap) { + return Ok(()); } - // Return error if not - Err(RLPxError::HandshakeError( - "No matching capabilities".to_string(), - )) - } - _ => { - // Fail if it is not a hello message - Err(RLPxError::HandshakeError( - "Expected Hello message".to_string(), - )) } + // Return error if not + Err(RLPxError::HandshakeError( + "No matching capabilities".to_string(), + )) + } else { + // Fail if it is not a hello message + Err(RLPxError::HandshakeError( + "Expected Hello message".to_string(), + )) } } - pub async fn handle_peer(&mut self) -> Result<(), RLPxError> { - self.start_capabilities().await?; - match &self.state { - RLPxConnectionState::Established(_) => { - info!("Started peer main loop"); - // Wait for eth status message or timeout. - let mut broadcaster_receive = { - if self.capabilities.contains(&CAP_ETH) { - Some(self.connection_broadcast_send.subscribe()) - } else { - None + pub async fn handle_peer_conn(&mut self) -> Result<(), RLPxError> { + if let RLPxConnectionState::Established(_) = &self.state { + self.init_peer_conn().await?; + info!("Started peer main loop"); + // Wait for eth status message or timeout. + let mut broadcaster_receive = { + if self.capabilities.contains(&CAP_ETH) { + Some(self.connection_broadcast_send.subscribe()) + } else { + None + } + }; + + // Status message received, start listening for connections, + // and subscribe this connection to the broadcasting. + loop { + tokio::select! { + // TODO check if this is cancel safe, and fix it if not. + message = self.receive() => { + self.handle_message(message?).await?; } - }; - // Status message received, start listening for connections, - // and subscribe this connection to the broadcasting. - loop { - let peer_supports_eth = self.capabilities.contains(&CAP_ETH); - tokio::select! { - received_msg = self.receive() => { - match received_msg? { - Message::Disconnect(_) => info!("Received Disconnect"), - Message::Ping(_) => info!("Received Ping"), - Message::Pong(_) => info!("Received Pong"), - Message::Status(_) if !peer_supports_eth => { - info!("Received Status"); - // TODO: Check peer's status message. - broadcaster_receive = Some(self.connection_broadcast_send.subscribe()); - self.capabilities.push(CAP_ETH); - } - // TODO: implement handlers for each message type - Message::GetAccountRange(req) => { - let response = - process_account_range_request(req, self.storage.clone())?; - self.send(Message::AccountRange(response)).await? - } - // TODO(#1129) Add the transaction to the mempool once received. - txs_msg @ Message::Transactions(_) if peer_supports_eth => { - self.broadcast_message(txs_msg).await?; - } - Message::GetBlockHeaders(msg_data) if peer_supports_eth => { - let response = BlockHeaders { - id: msg_data.id, - block_headers: msg_data.fetch_headers(&self.storage), - }; - self.send(Message::BlockHeaders(response)).await?; - } - Message::GetBlockBodies(msg_data) if peer_supports_eth => { - let response = BlockBodies { - id: msg_data.id, - block_bodies: msg_data.fetch_blocks(&self.storage), - }; - self.send(Message::BlockBodies(response)).await?; - } - Message::GetStorageRanges(req) => { - let response = - process_storage_ranges_request(req, self.storage.clone())?; - self.send(Message::StorageRanges(response)).await? - } - Message::GetByteCodes(req) => { - let response = process_byte_codes_request(req, self.storage.clone())?; - self.send(Message::ByteCodes(response)).await? - } - Message::GetTrieNodes(req) => { - let response = process_trie_nodes_request(req, self.storage.clone())?; - self.send(Message::TrieNodes(response)).await? - } - // TODO: Add new message types and handlers as they are implemented - message => return Err(RLPxError::MessageNotHandled(format!("{message}"))), - } - } - // This is not ideal, but using the receiver without - // this function call, causes the loop to take ownwership - // of the variable and the compiler will complain about it, - // with this function, we avoid that. - // If the broadcaster is Some (i.e. we're connected to a peer that supports an eth protocol), - // we'll receive broadcasted messages from another connections through a channel, otherwise - // the function below will yield immediately but the select will not match and - // ignore the returned value. - Some(Ok((id, broadcasted_msg))) = Self::maybe_wait_for_broadcaster(&mut broadcaster_receive) => { - if id != tokio::task::id() { - match broadcasted_msg.as_ref() { - Message::Transactions(ref txs) => { - // TODO(#1131): Avoid cloning this vector. - let cloned = txs.transactions.clone(); - let new_msg = Message::Transactions(Transactions { transactions: cloned }); - self.send(new_msg).await?; - } - msg => { - error!("Unsupported message was broadcasted: {msg}"); - return Err(RLPxError::BroadcastError(format!("Non-supported message broadcasted {}", msg))) - } - } - - } - } + // This is not ideal, but using the receiver without + // this function call, causes the loop to take ownwership + // of the variable and the compiler will complain about it, + // with this function, we avoid that. + // If the broadcaster is Some (i.e. we're connected to a peer that supports an eth protocol), + // we'll receive broadcasted messages from another connections through a channel, otherwise + // the function below will yield immediately but the select will not match and + // ignore the returned value. + Some(broadcasted_msg) = Self::maybe_wait_for_broadcaster(&mut broadcaster_receive) => { + self.handle_broadcast(broadcasted_msg?).await? + } + _ = sleep(PERIODIC_TASKS_CHECK_INTERVAL) => { + // no progress on other tasks, yield control to check + // periodic tasks } } + self.check_periodic_tasks().await?; } - _ => Err(RLPxError::InvalidState()), + } else { + Err(RLPxError::InvalidState()) } } async fn maybe_wait_for_broadcaster( - receiver: &mut Option)>>, - ) -> Option), RecvError>> { + receiver: &mut Option)>>, + ) -> Option), RecvError>> { match receiver { None => None, Some(rec) => Some(rec.recv().await), @@ -302,15 +245,114 @@ impl RLPxConnection { } pub fn get_remote_node_id(&self) -> Result { - match &self.state { - RLPxConnectionState::Established(state) => Ok(state.remote_node_id), - _ => Err(RLPxError::InvalidState()), + if let RLPxConnectionState::Established(state) = &self.state { + Ok(state.remote_node_id) + } else { + Err(RLPxError::InvalidState()) + } + } + + async fn check_periodic_tasks(&mut self) -> Result<(), RLPxError> { + if Instant::now() >= self.next_periodic_task_check { + self.send(Message::Ping(PingMessage {})).await?; + info!("Ping sent"); + self.next_periodic_task_check = Instant::now() + PERIODIC_TASKS_CHECK_INTERVAL; + }; + Ok(()) + } + + async fn handle_message(&mut self, message: Message) -> Result<(), RLPxError> { + let peer_supports_eth = self.capabilities.contains(&CAP_ETH); + match message { + Message::Disconnect(msg_data) => { + info!("Received Disconnect: {:?}", msg_data.reason); + // Returning a Disonnect error to be handled later at the call stack + return Err(RLPxError::Disconnect()); + } + Message::Ping(_) => { + info!("Received Ping"); + self.send(Message::Pong(PongMessage {})).await?; + info!("Pong sent"); + } + Message::Pong(_) => { + // We ignore received Pong messages + } + // Implmenent Status vaidations + // https://github.com/lambdaclass/lambda_ethereum_rust/issues/420 + Message::Status(_) if !peer_supports_eth => { + info!("Received Status"); + // TODO: Check peer's status message. + } + // TODO: implement handlers for each message type + Message::GetAccountRange(req) => { + let response = process_account_range_request(req, self.storage.clone())?; + self.send(Message::AccountRange(response)).await? + } + // TODO(#1129) Add the transaction to the mempool once received. + txs_msg @ Message::Transactions(_) if peer_supports_eth => { + self.broadcast_message(txs_msg).await?; + } + Message::GetBlockHeaders(msg_data) if peer_supports_eth => { + let response = BlockHeaders { + id: msg_data.id, + block_headers: msg_data.fetch_headers(&self.storage), + }; + self.send(Message::BlockHeaders(response)).await?; + } + Message::GetBlockBodies(msg_data) if peer_supports_eth => { + let response = BlockBodies { + id: msg_data.id, + block_bodies: msg_data.fetch_blocks(&self.storage), + }; + self.send(Message::BlockBodies(response)).await?; + } + Message::GetStorageRanges(req) => { + let response = process_storage_ranges_request(req, self.storage.clone())?; + self.send(Message::StorageRanges(response)).await? + } + Message::GetByteCodes(req) => { + let response = process_byte_codes_request(req, self.storage.clone())?; + self.send(Message::ByteCodes(response)).await? + } + Message::GetTrieNodes(req) => { + let response = process_trie_nodes_request(req, self.storage.clone())?; + self.send(Message::TrieNodes(response)).await? + } + // TODO: Add new message types and handlers as they are implemented + message => return Err(RLPxError::MessageNotHandled(format!("{message}"))), + }; + Ok(()) + } + + async fn handle_broadcast( + &mut self, + (id, broadcasted_msg): (task::Id, Arc), + ) -> Result<(), RLPxError> { + if id != tokio::task::id() { + match broadcasted_msg.as_ref() { + Message::Transactions(ref txs) => { + // TODO(#1131): Avoid cloning this vector. + let cloned = txs.transactions.clone(); + let new_msg = Message::Transactions(Transactions { + transactions: cloned, + }); + self.send(new_msg).await?; + } + msg => { + error!("Unsupported message was broadcasted: {msg}"); + return Err(RLPxError::BroadcastError(format!( + "Non-supported message broadcasted {}", + msg + ))); + } + } } + Ok(()) } - async fn start_capabilities(&mut self) -> Result<(), RLPxError> { + async fn init_peer_conn(&mut self) -> Result<(), RLPxError> { // Sending eth Status if peer supports it - if let Some(cap_index) = self.capabilities.iter().position(|cap| cap == &CAP_ETH) { + if self.capabilities.contains(&CAP_ETH) { let status = backend::get_status(&self.storage)?; self.send(Message::Status(status)).await?; // The next immediate message in the ETH protocol is the @@ -323,11 +365,10 @@ impl RLPxConnection { Message::Status(_) => { // TODO: Check message status is correct. } - msg => { - error!( - "Peer established eth capability but sent message: {msg} instead of status" - ); - self.capabilities.remove(cap_index); + _msg => { + return Err(RLPxError::HandshakeError( + "Expected a Status message".to_string(), + )) } } } @@ -336,117 +377,111 @@ impl RLPxConnection { } async fn send_auth(&mut self) -> Result<(), RLPxError> { - match &self.state { - RLPxConnectionState::Initiator(initiator_state) => { - let secret_key: SecretKey = self.signer.clone().into(); - let peer_pk = - id2pubkey(initiator_state.remote_node_id).ok_or(RLPxError::InvalidPeerId())?; - - // Clonning previous state to avoid ownership issues - let previous_state = initiator_state.clone(); - - let msg = encode_auth_message( - &secret_key, - previous_state.nonce, - &peer_pk, - &previous_state.ephemeral_key, - )?; - - self.send_handshake_msg(&msg).await?; - - self.state = - RLPxConnectionState::InitiatedAuth(InitiatedAuth::new(previous_state, msg)); - Ok(()) - } - _ => Err(RLPxError::InvalidState()), + if let RLPxConnectionState::Initiator(initiator_state) = &self.state { + let secret_key: SecretKey = self.signer.clone().into(); + let peer_pk = + id2pubkey(initiator_state.remote_node_id).ok_or(RLPxError::InvalidPeerId())?; + + // Clonning previous state to avoid ownership issues + let previous_state = initiator_state.clone(); + + let msg = encode_auth_message( + &secret_key, + previous_state.nonce, + &peer_pk, + &previous_state.ephemeral_key, + )?; + + self.send_handshake_msg(&msg).await?; + + self.state = + RLPxConnectionState::InitiatedAuth(InitiatedAuth::new(previous_state, msg)); + Ok(()) + } else { + Err(RLPxError::InvalidState()) } } async fn send_ack(&mut self) -> Result<(), RLPxError> { - match &self.state { - RLPxConnectionState::ReceivedAuth(received_auth_state) => { - let peer_pk = id2pubkey(received_auth_state.remote_node_id) - .ok_or(RLPxError::InvalidPeerId())?; - - // Clonning previous state to avoid ownership issues - let previous_state = received_auth_state.clone(); - - let msg = encode_ack_message( - &previous_state.local_ephemeral_key, - previous_state.local_nonce, - &peer_pk, - )?; - - self.send_handshake_msg(&msg).await?; - - self.state = RLPxConnectionState::Established(Box::new(Established::for_receiver( - previous_state, - msg, - ))); - Ok(()) - } - _ => Err(RLPxError::InvalidState()), + if let RLPxConnectionState::ReceivedAuth(received_auth_state) = &self.state { + let peer_pk = + id2pubkey(received_auth_state.remote_node_id).ok_or(RLPxError::InvalidPeerId())?; + + // Clonning previous state to avoid ownership issues + let previous_state = received_auth_state.clone(); + + let msg = encode_ack_message( + &previous_state.local_ephemeral_key, + previous_state.local_nonce, + &peer_pk, + )?; + + self.send_handshake_msg(&msg).await?; + + self.state = RLPxConnectionState::Established(Box::new(Established::for_receiver( + previous_state, + msg, + ))); + Ok(()) + } else { + Err(RLPxError::InvalidState()) } } async fn receive_auth(&mut self) -> Result<(), RLPxError> { - match &self.state { - RLPxConnectionState::Receiver(receiver_state) => { - let secret_key: SecretKey = self.signer.clone().into(); - // Clonning previous state to avoid ownership issues - let previous_state = receiver_state.clone(); - let msg_bytes = self.receive_handshake_msg().await?; - let size_data = &msg_bytes - .get(..2) - .ok_or(RLPxError::InvalidMessageLength())?; - let msg = &msg_bytes - .get(2..) - .ok_or(RLPxError::InvalidMessageLength())?; - let (auth, remote_ephemeral_key) = - decode_auth_message(&secret_key, msg, size_data)?; - - // Build next state - self.state = RLPxConnectionState::ReceivedAuth(ReceivedAuth::new( - previous_state, - auth.node_id, - msg_bytes.to_owned(), - auth.nonce, - remote_ephemeral_key, - )); - Ok(()) - } - _ => Err(RLPxError::InvalidState()), + if let RLPxConnectionState::Receiver(receiver_state) = &self.state { + let secret_key: SecretKey = self.signer.clone().into(); + // Clonning previous state to avoid ownership issues + let previous_state = receiver_state.clone(); + let msg_bytes = self.receive_handshake_msg().await?; + let size_data = &msg_bytes + .get(..2) + .ok_or(RLPxError::InvalidMessageLength())?; + let msg = &msg_bytes + .get(2..) + .ok_or(RLPxError::InvalidMessageLength())?; + let (auth, remote_ephemeral_key) = decode_auth_message(&secret_key, msg, size_data)?; + + // Build next state + self.state = RLPxConnectionState::ReceivedAuth(ReceivedAuth::new( + previous_state, + auth.node_id, + msg_bytes.to_owned(), + auth.nonce, + remote_ephemeral_key, + )); + Ok(()) + } else { + Err(RLPxError::InvalidState()) } } async fn receive_ack(&mut self) -> Result<(), RLPxError> { - match &self.state { - RLPxConnectionState::InitiatedAuth(initiated_auth_state) => { - let secret_key: SecretKey = self.signer.clone().into(); - // Clonning previous state to avoid ownership issues - let previous_state = initiated_auth_state.clone(); - let msg_bytes = self.receive_handshake_msg().await?; - let size_data = &msg_bytes - .get(..2) - .ok_or(RLPxError::InvalidMessageLength())?; - let msg = &msg_bytes - .get(2..) - .ok_or(RLPxError::InvalidMessageLength())?; - let ack = decode_ack_message(&secret_key, msg, size_data)?; - let remote_ephemeral_key = ack - .get_ephemeral_pubkey() - .ok_or(RLPxError::NotFound("Remote ephemeral key".to_string()))?; - // Build next state - self.state = - RLPxConnectionState::Established(Box::new(Established::for_initiator( - previous_state, - msg_bytes.to_owned(), - ack.nonce, - remote_ephemeral_key, - ))); - Ok(()) - } - _ => Err(RLPxError::InvalidState()), + if let RLPxConnectionState::InitiatedAuth(initiated_auth_state) = &self.state { + let secret_key: SecretKey = self.signer.clone().into(); + // Clonning previous state to avoid ownership issues + let previous_state = initiated_auth_state.clone(); + let msg_bytes = self.receive_handshake_msg().await?; + let size_data = &msg_bytes + .get(..2) + .ok_or(RLPxError::InvalidMessageLength())?; + let msg = &msg_bytes + .get(2..) + .ok_or(RLPxError::InvalidMessageLength())?; + let ack = decode_ack_message(&secret_key, msg, size_data)?; + let remote_ephemeral_key = ack + .get_ephemeral_pubkey() + .ok_or(RLPxError::NotFound("Remote ephemeral key".to_string()))?; + // Build next state + self.state = RLPxConnectionState::Established(Box::new(Established::for_initiator( + previous_state, + msg_bytes.to_owned(), + ack.nonce, + remote_ephemeral_key, + ))); + Ok(()) + } else { + Err(RLPxError::InvalidState()) } } @@ -479,25 +514,23 @@ impl RLPxConnection { } async fn send(&mut self, message: rlpx::Message) -> Result<(), RLPxError> { - match &mut self.state { - RLPxConnectionState::Established(state) => { - let mut frame_buffer = vec![]; - message.encode(&mut frame_buffer)?; - frame::write(frame_buffer, state, &mut self.stream).await?; - Ok(()) - } - _ => Err(RLPxError::InvalidState()), + if let RLPxConnectionState::Established(state) = &mut self.state { + let mut frame_buffer = vec![]; + message.encode(&mut frame_buffer)?; + frame::write(frame_buffer, state, &mut self.stream).await?; + Ok(()) + } else { + Err(RLPxError::InvalidState()) } } async fn receive(&mut self) -> Result { - match &mut self.state { - RLPxConnectionState::Established(state) => { - let frame_data = frame::read(state, &mut self.stream).await?; - let (msg_id, msg_data): (u8, _) = RLPDecode::decode_unfinished(&frame_data)?; - Ok(rlpx::Message::decode(msg_id, msg_data)?) - } - _ => Err(RLPxError::InvalidState()), + if let RLPxConnectionState::Established(state) = &mut self.state { + let frame_data = frame::read(state, &mut self.stream).await?; + let (msg_id, msg_data): (u8, _) = RLPDecode::decode_unfinished(&frame_data)?; + Ok(rlpx::Message::decode(msg_id, msg_data)?) + } else { + Err(RLPxError::InvalidState()) } } diff --git a/crates/networking/p2p/rlpx/error.rs b/crates/networking/p2p/rlpx/error.rs index d87019432..a3b6efa46 100644 --- a/crates/networking/p2p/rlpx/error.rs +++ b/crates/networking/p2p/rlpx/error.rs @@ -1,6 +1,7 @@ use ethereum_rust_rlp::error::{RLPDecodeError, RLPEncodeError}; use ethereum_rust_storage::error::StoreError; use thiserror::Error; +use tokio::sync::broadcast::error::RecvError; // TODO improve errors #[derive(Debug, Error)] @@ -11,6 +12,8 @@ pub(crate) enum RLPxError { ConnectionError(String), #[error("Invalid connection state")] InvalidState(), + #[error("Disconnect received")] + Disconnect(), #[error("Not Found: {0}")] NotFound(String), #[error("Invalid peer id")] @@ -33,6 +36,8 @@ pub(crate) enum RLPxError { CryptographyError(String), #[error("Failed to broadcast msg: {0}")] BroadcastError(String), + #[error(transparent)] + RecvError(#[from] RecvError), } // Grouping all cryptographic related errors in a single CryptographicError variant diff --git a/crates/networking/p2p/rlpx/p2p.rs b/crates/networking/p2p/rlpx/p2p.rs index 41f7f6432..70573c021 100644 --- a/crates/networking/p2p/rlpx/p2p.rs +++ b/crates/networking/p2p/rlpx/p2p.rs @@ -102,7 +102,7 @@ impl RLPxMessage for HelloMessage { #[derive(Debug)] pub(crate) struct DisconnectMessage { - reason: Option, + pub(crate) reason: Option, } impl DisconnectMessage {