Skip to content

Commit

Permalink
List seller test timeout
Browse files Browse the repository at this point in the history
  • Loading branch information
rishflab committed Oct 12, 2021
1 parent b9d2e9c commit 036dff3
Show file tree
Hide file tree
Showing 14 changed files with 249 additions and 214 deletions.
303 changes: 150 additions & 153 deletions Cargo.lock

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion swap/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ ed25519-dalek = "1"
futures = { version = "0.3", default-features = false }
hex = "0.4"
itertools = "0.10"
libp2p = { git = "https://github.com/comit-network/rust-libp2p", branch = "rendezvous", default-features = false, features = [ "tcp-tokio", "yamux", "mplex", "dns-tokio", "noise", "request-response", "websocket", "ping", "rendezvous" ] }
libp2p = { git = "https://github.com/libp2p/rust-libp2p.git", revision = "7718d1de387c63786cffdcbfcfdf1c4df40bc21a", default-features = false, features = [ "tcp-tokio", "yamux", "mplex", "dns-tokio", "noise", "request-response", "websocket", "ping", "rendezvous" ] }
monero = { version = "0.12", features = [ "serde_support" ] }
monero-rpc = { path = "../monero-rpc" }
pem = "1.0"
Expand Down
8 changes: 4 additions & 4 deletions swap/src/asb/event_loop.rs
Original file line number Diff line number Diff line change
Expand Up @@ -249,11 +249,11 @@ where
channel
}.boxed());
}
SwarmEvent::Behaviour(OutEvent::Rendezvous(libp2p::rendezvous::Event::Registered { .. })) => {
SwarmEvent::Behaviour(OutEvent::Rendezvous(libp2p::rendezvous::client::Event::Registered { .. })) => {
tracing::info!("Successfully registered with rendezvous node");
}
SwarmEvent::Behaviour(OutEvent::Rendezvous(libp2p::rendezvous::Event::RegisterFailed(error))) => {
tracing::error!("Registration with rendezvous node failed: {:#}", error);
SwarmEvent::Behaviour(OutEvent::Rendezvous(libp2p::rendezvous::client::Event::RegisterFailed(error))) => {
tracing::error!("Registration with rendezvous node failed: {:?}", error);
}
SwarmEvent::Behaviour(OutEvent::Failure {peer, error}) => {
tracing::error!(
Expand Down Expand Up @@ -281,7 +281,7 @@ where
SwarmEvent::ConnectionClosed { peer_id: peer, num_established, endpoint, cause: None } if num_established == 0 => {
tracing::info!(%peer, address = %endpoint.get_remote_address(), "Successfully closed connection");
}
SwarmEvent::NewListenAddr(address) => {
SwarmEvent::NewListenAddr{address, ..} => {
tracing::info!(%address, "New listen address reported");
}
_ => {}
Expand Down
43 changes: 22 additions & 21 deletions swap/src/asb/network.rs
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ pub mod behaviour {
channel: ResponseChannel<()>,
peer: PeerId,
},
Rendezvous(libp2p::rendezvous::Event),
Rendezvous(libp2p::rendezvous::client::Event),
Failure {
peer: PeerId,
error: Error,
Expand Down Expand Up @@ -163,8 +163,8 @@ pub mod behaviour {
}
}

impl From<libp2p::rendezvous::Event> for OutEvent {
fn from(event: libp2p::rendezvous::Event) -> Self {
impl From<libp2p::rendezvous::client::Event> for OutEvent {
fn from(event: libp2p::rendezvous::client::Event) -> Self {
OutEvent::Rendezvous(event)
}
}
Expand All @@ -173,6 +173,7 @@ pub mod behaviour {
pub mod rendezous {
use super::*;
use std::pin::Pin;
use libp2p::swarm::DialError;

#[derive(PartialEq)]
enum ConnectionStatus {
Expand All @@ -190,7 +191,7 @@ pub mod rendezous {
}

pub struct Behaviour {
inner: libp2p::rendezvous::Rendezvous,
inner: libp2p::rendezvous::client::Behaviour,
rendezvous_point: Multiaddr,
rendezvous_peer_id: PeerId,
namespace: XmrBtcNamespace,
Expand All @@ -208,10 +209,7 @@ pub mod rendezous {
registration_ttl: Option<u64>,
) -> Self {
Self {
inner: libp2p::rendezvous::Rendezvous::new(
identity,
libp2p::rendezvous::Config::default(),
),
inner: libp2p::rendezvous::client::Behaviour::new(identity),
rendezvous_point: rendezvous_address,
rendezvous_peer_id,
namespace,
Expand All @@ -232,8 +230,8 @@ pub mod rendezous {

impl NetworkBehaviour for Behaviour {
type ProtocolsHandler =
<libp2p::rendezvous::Rendezvous as NetworkBehaviour>::ProtocolsHandler;
type OutEvent = libp2p::rendezvous::Event;
<libp2p::rendezvous::client::Behaviour as NetworkBehaviour>::ProtocolsHandler;
type OutEvent = libp2p::rendezvous::client::Event;

fn new_handler(&mut self) -> Self::ProtocolsHandler {
self.inner.new_handler()
Expand Down Expand Up @@ -277,14 +275,19 @@ pub mod rendezous {
self.inner.inject_event(peer_id, connection, event)
}

fn inject_dial_failure(&mut self, peer_id: &PeerId) {
fn inject_dial_failure(
&mut self,
peer_id: &PeerId,
_handler: Self::ProtocolsHandler,
_error: DialError,
) {
if peer_id == &self.rendezvous_peer_id {
self.connection_status = ConnectionStatus::Disconnected;
}
}

#[allow(clippy::type_complexity)]
fn poll(&mut self, cx: &mut std::task::Context<'_>, params: &mut impl PollParameters) -> Poll<NetworkBehaviourAction<<<Self::ProtocolsHandler as IntoProtocolsHandler>::Handler as ProtocolsHandler>::InEvent, Self::OutEvent>>{
fn poll(&mut self, cx: &mut std::task::Context<'_>, params: &mut impl PollParameters) -> Poll<NetworkBehaviourAction<Self::OutEvent, Self::ProtocolsHandler>>{
match &mut self.registration_status {
RegistrationStatus::RegisterOnNextConnection => match self.connection_status {
ConnectionStatus::Disconnected => {
Expand All @@ -293,6 +296,7 @@ pub mod rendezous {
return Poll::Ready(NetworkBehaviourAction::DialPeer {
peer_id: self.rendezvous_peer_id,
condition: DialPeerCondition::Disconnected,
handler: Self::ProtocolsHandler::new(Duration::from_secs(30)),
});
}
ConnectionStatus::Dialling => {}
Expand All @@ -315,6 +319,7 @@ pub mod rendezous {
return Poll::Ready(NetworkBehaviourAction::DialPeer {
peer_id: self.rendezvous_peer_id,
condition: DialPeerCondition::Disconnected,
handler: Self::ProtocolsHandler::new(Duration::from_secs(30))
});
}
ConnectionStatus::Dialling => {}
Expand All @@ -328,7 +333,7 @@ pub mod rendezous {

// reset the timer if we successfully registered
if let Poll::Ready(NetworkBehaviourAction::GenerateEvent(
libp2p::rendezvous::Event::Registered { ttl, .. },
libp2p::rendezvous::client::Event::Registered { ttl, .. },
)) = &inner_poll
{
let half_of_ttl = Duration::from_secs(*ttl) / 2;
Expand All @@ -353,7 +358,7 @@ pub mod rendezous {
async fn given_no_initial_connection_when_constructed_asb_connects_and_registers_with_rendezvous_node(
) {
let mut rendezvous_node = new_swarm(|_, identity| {
libp2p::rendezvous::Rendezvous::new(identity, libp2p::rendezvous::Config::default())
libp2p::rendezvous::client::Behaviour::new(identity)
});
let rendezvous_address = rendezvous_node.listen_on_random_memory_address().await;

Expand All @@ -375,7 +380,7 @@ pub mod rendezous {
});
let asb_registered = tokio::spawn(async move {
loop {
if let SwarmEvent::Behaviour(libp2p::rendezvous::Event::Registered { .. }) =
if let SwarmEvent::Behaviour(libp2p::rendezvous::client::Event::Registered { .. }) =
asb.select_next_some().await
{
break;
Expand All @@ -391,12 +396,8 @@ pub mod rendezous {

#[tokio::test]
async fn asb_automatically_re_registers() {
let min_ttl = 5;
let mut rendezvous_node = new_swarm(|_, identity| {
libp2p::rendezvous::Rendezvous::new(
identity,
libp2p::rendezvous::Config::default().with_min_ttl(min_ttl),
)
libp2p::rendezvous::client::Behaviour::new(identity)
});
let rendezvous_address = rendezvous_node.listen_on_random_memory_address().await;

Expand All @@ -420,7 +421,7 @@ pub mod rendezous {
let mut number_of_registrations = 0;

loop {
if let SwarmEvent::Behaviour(libp2p::rendezvous::Event::Registered { .. }) =
if let SwarmEvent::Behaviour(libp2p::rendezvous::client::Event::Registered { .. }) =
asb.select_next_some().await
{
number_of_registrations += 1
Expand Down
58 changes: 48 additions & 10 deletions swap/src/cli.rs
Original file line number Diff line number Diff line change
Expand Up @@ -59,10 +59,9 @@ mod tests {
}

async fn setup_rendezvous_point() -> (Multiaddr, PeerId) {
let mut rendezvous_node = new_swarm(|_, identity| RendezvousPointBehaviour {
rendezvous: libp2p::rendezvous::Rendezvous::new(
identity,
libp2p::rendezvous::Config::default(),
let mut rendezvous_node = new_swarm(|_, _| RendezvousPointBehaviour {
rendezvous: libp2p::rendezvous::server::Behaviour::new(
libp2p::rendezvous::server::Config::default(),
),
ping: Default::default(),
});
Expand Down Expand Up @@ -126,7 +125,11 @@ mod tests {
}
}

#[derive(Debug)]
struct StaticQuoteAsbEvent;

#[derive(libp2p::NetworkBehaviour)]
#[behaviour(out_event = "StaticQuoteAsbEvent")]
struct StaticQuoteAsbBehaviour {
rendezvous: asb::rendezous::Behaviour,
// Support `Ping` as a workaround until https://github.com/libp2p/rust-libp2p/issues/2109 is fixed.
Expand All @@ -139,13 +142,32 @@ mod tests {
registered: bool,
}

impl NetworkBehaviourEventProcess<libp2p::rendezvous::Event> for StaticQuoteAsbBehaviour {
fn inject_event(&mut self, event: libp2p::rendezvous::Event) {
if let libp2p::rendezvous::Event::Registered { .. } = event {
impl From<RequestResponseEvent<(), quote::BidQuote>> for StaticQuoteAsbEvent {
fn from(_: RequestResponseEvent<(), BidQuote>) -> Self {
StaticQuoteAsbEvent
}
}

impl From<libp2p::ping::Event> for StaticQuoteAsbEvent {
fn from(_: libp2p::ping::Event) -> Self {
StaticQuoteAsbEvent
}
}

impl From<libp2p::rendezvous::client::Event> for StaticQuoteAsbEvent {
fn from(_: libp2p::rendezvous::client::Event) -> Self {
StaticQuoteAsbEvent
}
}

impl NetworkBehaviourEventProcess<libp2p::rendezvous::client::Event> for StaticQuoteAsbBehaviour {
fn inject_event(&mut self, event: libp2p::rendezvous::client::Event) {
if let libp2p::rendezvous::client::Event::Registered { .. } = event {
self.registered = true;
}
}
}

impl NetworkBehaviourEventProcess<libp2p::ping::PingEvent> for StaticQuoteAsbBehaviour {
fn inject_event(&mut self, _: libp2p::ping::PingEvent) {}
}
Expand All @@ -163,17 +185,33 @@ mod tests {
}
}

#[derive(Debug)]
struct RendezvousPointEvent;

#[derive(libp2p::NetworkBehaviour)]
#[behaviour(out_event = "RendezvousPointEvent")]
struct RendezvousPointBehaviour {
rendezvous: libp2p::rendezvous::Rendezvous,
rendezvous: libp2p::rendezvous::server::Behaviour,
// Support `Ping` as a workaround until https://github.com/libp2p/rust-libp2p/issues/2109 is fixed.
ping: libp2p::ping::Ping,
}

impl NetworkBehaviourEventProcess<libp2p::rendezvous::Event> for RendezvousPointBehaviour {
fn inject_event(&mut self, _: libp2p::rendezvous::Event) {}
impl NetworkBehaviourEventProcess<libp2p::rendezvous::server::Event> for RendezvousPointBehaviour {
fn inject_event(&mut self, _: libp2p::rendezvous::server::Event) {}
}
impl NetworkBehaviourEventProcess<libp2p::ping::PingEvent> for RendezvousPointBehaviour {
fn inject_event(&mut self, _: libp2p::ping::PingEvent) {}
}

impl From<libp2p::ping::Event> for RendezvousPointEvent {
fn from(_: libp2p::ping::Event) -> Self {
RendezvousPointEvent
}
}

impl From<libp2p::rendezvous::server::Event> for RendezvousPointEvent {
fn from(_: libp2p::rendezvous::server::Event) -> Self {
RendezvousPointEvent
}
}
}
14 changes: 7 additions & 7 deletions swap/src/cli/list_sellers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ use anyhow::{Context, Result};
use futures::StreamExt;
use libp2p::multiaddr::Protocol;
use libp2p::ping::{Ping, PingConfig, PingEvent};
use libp2p::rendezvous::{Namespace, Rendezvous};
use libp2p::rendezvous::Namespace;
use libp2p::request_response::{RequestResponseEvent, RequestResponseMessage};
use libp2p::swarm::SwarmEvent;
use libp2p::{identity, rendezvous, Multiaddr, PeerId, Swarm};
Expand All @@ -29,7 +29,7 @@ pub async fn list_sellers(
identity: identity::Keypair,
) -> Result<Vec<Seller>> {
let behaviour = Behaviour {
rendezvous: Rendezvous::new(identity.clone(), rendezvous::Config::default()),
rendezvous: libp2p::rendezvous::client::Behaviour::new(identity.clone()),
quote: quote::cli(),
ping: Ping::new(
PingConfig::new()
Expand Down Expand Up @@ -74,13 +74,13 @@ pub enum Status {

#[derive(Debug)]
enum OutEvent {
Rendezvous(rendezvous::Event),
Rendezvous(rendezvous::client::Event),
Quote(quote::OutEvent),
Ping(PingEvent),
}

impl From<rendezvous::Event> for OutEvent {
fn from(event: rendezvous::Event) -> Self {
impl From<rendezvous::client::Event> for OutEvent {
fn from(event: rendezvous::client::Event) -> Self {
OutEvent::Rendezvous(event)
}
}
Expand All @@ -95,7 +95,7 @@ impl From<quote::OutEvent> for OutEvent {
#[behaviour(event_process = false)]
#[behaviour(out_event = "OutEvent")]
struct Behaviour {
rendezvous: Rendezvous,
rendezvous: rendezvous::client::Behaviour,
quote: quote::Behaviour,
ping: Ping,
}
Expand Down Expand Up @@ -194,7 +194,7 @@ impl EventLoop {
}
}
SwarmEvent::Behaviour(OutEvent::Rendezvous(
rendezvous::Event::Discovered { registrations, .. },
libp2p::rendezvous::client::Event::Discovered { registrations, .. },
)) => {
self.state = State::WaitForQuoteCompletion;

Expand Down
12 changes: 4 additions & 8 deletions swap/src/network/cbor_request_response.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
use async_trait::async_trait;
use futures::prelude::*;
use libp2p::core::upgrade;
use libp2p::core::upgrade::ReadOneError;
use libp2p::request_response::{ProtocolName, RequestResponseCodec};
use serde::de::DeserializeOwned;
use serde::Serialize;
Expand Down Expand Up @@ -40,10 +39,7 @@ where
where
T: AsyncRead + Unpin + Send,
{
let message = upgrade::read_one(io, BUF_SIZE).await.map_err(|e| match e {
ReadOneError::Io(err) => err,
e => io::Error::new(io::ErrorKind::Other, e),
})?;
let message = upgrade::read_length_prefixed(io, BUF_SIZE).await.map_err(|e| io::Error::new(io::ErrorKind::Other, e))?;
let mut de = serde_cbor::Deserializer::from_slice(&message);
let msg = Req::deserialize(&mut de)
.map_err(|error| io::Error::new(io::ErrorKind::Other, error))?;
Expand All @@ -59,7 +55,7 @@ where
where
T: AsyncRead + Unpin + Send,
{
let message = upgrade::read_one(io, BUF_SIZE)
let message = upgrade::read_length_prefixed(io, BUF_SIZE)
.await
.map_err(|e| io::Error::new(io::ErrorKind::InvalidData, e))?;
let mut de = serde_cbor::Deserializer::from_slice(&message);
Expand All @@ -81,7 +77,7 @@ where
let bytes =
serde_cbor::to_vec(&req).map_err(|e| io::Error::new(io::ErrorKind::InvalidData, e))?;

upgrade::write_one(io, &bytes).await?;
upgrade::write_length_prefixed(io, &bytes).await?;

Ok(())
}
Expand All @@ -97,7 +93,7 @@ where
{
let bytes = serde_cbor::to_vec(&res)
.map_err(|error| io::Error::new(io::ErrorKind::InvalidData, error))?;
upgrade::write_one(io, &bytes).await?;
upgrade::write_length_prefixed(io, &bytes).await?;

Ok(())
}
Expand Down
4 changes: 2 additions & 2 deletions swap/src/network/json_pull_codec.rs
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ where
where
T: AsyncRead + Unpin + Send,
{
let message = upgrade::read_one(io, BUF_SIZE)
let message = upgrade::read_length_prefixed(io, BUF_SIZE)
.await
.map_err(|e| io::Error::new(io::ErrorKind::InvalidData, e))?;
let mut de = serde_json::Deserializer::from_slice(&message);
Expand Down Expand Up @@ -88,7 +88,7 @@ where
{
let bytes = serde_json::to_vec(&res)
.map_err(|error| io::Error::new(io::ErrorKind::InvalidData, error))?;
upgrade::write_one(io, &bytes).await?;
upgrade::write_length_prefixed(io, &bytes).await?;

Ok(())
}
Expand Down
Loading

0 comments on commit 036dff3

Please sign in to comment.