diff --git a/crates/bitmex-stream/src/lib.rs b/crates/bitmex-stream/src/lib.rs index 0d81369b4..5179a73b4 100644 --- a/crates/bitmex-stream/src/lib.rs +++ b/crates/bitmex-stream/src/lib.rs @@ -9,6 +9,7 @@ use serde::Serialize; use serde_json::to_string; use std::ops::Add; use std::time::Duration; +use std::time::Instant; use std::time::SystemTime; use std::time::UNIX_EPOCH; use tokio_tungstenite::tungstenite; @@ -80,18 +81,32 @@ fn subscribe_impl( ))?) .await; + let mut last_bitmex_message = Instant::now(); loop { tokio::select! { _ = tokio::time::sleep(Duration::from_secs(5)) => { + if last_bitmex_message.elapsed() > Duration::from_secs(20) { + yield Err(anyhow::anyhow!("BitMex websocket timed out")); + return; + } + let span = tracing::trace_span!("Ping BitMex"); span.in_scope(|| tracing::trace!("No message from BitMex in the last 5 seconds, pinging")); - let _ = connection + + let res = connection .send(tungstenite::Message::Ping([0u8; 32].to_vec())) .instrument(span) .await; + + if let Err(e) = res { + yield Err(anyhow::anyhow!(e)); + return; + } }, msg = connection.next() => { + last_bitmex_message = Instant::now(); + let msg = match msg { Some(Ok(msg)) => { msg