Skip to content

Commit

Permalink
Add timeout to BitMex websocket and fail upward if exceeded
Browse files Browse the repository at this point in the history
This is to prevent the websocket from getting stuck if disconnected, as pings will still return Ok(()) even if the link layer fails
  • Loading branch information
Restioson committed Jul 27, 2023
1 parent c770ac8 commit fc98e5f
Showing 1 changed file with 16 additions and 1 deletion.
17 changes: 16 additions & 1 deletion crates/bitmex-stream/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ use serde::Serialize;
use serde_json::to_string;
use std::ops::Add;
use std::time::Duration;
use std::time::Instant;
use std::time::SystemTime;
use std::time::UNIX_EPOCH;
use tokio_tungstenite::tungstenite;
Expand Down Expand Up @@ -80,18 +81,32 @@ fn subscribe_impl<const N: usize>(
))?)
.await;

let mut last_bitmex_message = Instant::now();

loop {
tokio::select! {
_ = tokio::time::sleep(Duration::from_secs(5)) => {
if last_bitmex_message.elapsed() > Duration::from_secs(20) {
yield Err(anyhow::anyhow!("BitMex websocket timed out"));
return;
}

let span = tracing::trace_span!("Ping BitMex");
span.in_scope(|| tracing::trace!("No message from BitMex in the last 5 seconds, pinging"));
let _ = connection

let res = connection
.send(tungstenite::Message::Ping([0u8; 32].to_vec()))
.instrument(span)
.await;

if let Err(e) = res {
yield Err(anyhow::anyhow!(e));
return;
}
},
msg = connection.next() => {
last_bitmex_message = Instant::now();

let msg = match msg {
Some(Ok(msg)) => {
msg
Expand Down

0 comments on commit fc98e5f

Please sign in to comment.