Skip to content

Commit

Permalink
Use tokio primitives for stream timeouts
Browse files Browse the repository at this point in the history
  • Loading branch information
aterentic-ethernal committed Oct 29, 2024
1 parent 0618881 commit 1a05edb
Show file tree
Hide file tree
Showing 2 changed files with 43 additions and 141 deletions.
182 changes: 42 additions & 140 deletions core/src/network/rpc/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,21 +15,11 @@ use color_eyre::{
Report, Result,
};
use futures::{Stream, TryStreamExt};
use rand::Rng;
use std::{
iter::Iterator,
pin::Pin,
sync::Arc,
task::{Context, Poll},
time::Duration,
};
use tokio::{
sync::RwLock,
time::{timeout, Instant},
};
use std::{iter::Iterator, pin::Pin, sync::Arc, time::Duration};
use tokio::sync::RwLock;
use tokio_retry::Retry;
use tokio_stream::StreamExt;
use tracing::{debug, error, info, warn};
use tokio_stream::{Elapsed, StreamExt};
use tracing::{error, info, warn};

use super::{configuration::RetryConfig, Node, Nodes, Subscription, WrappedProof};
use crate::{
Expand Down Expand Up @@ -122,96 +112,7 @@ impl GenesisHash {
}

type SubscriptionStream =
Pin<Box<dyn Stream<Item = Result<Subscription, subxt::error::Error>> + Send>>;

// Custom type for merged subscription streams
struct MergedSubscriptions {
headers: SubscriptionStream,
justifications: SubscriptionStream,
last_header_at: Instant,
last_justification_at: Instant,
timeout_in: Duration,
}

enum StreamType {
Headers,
Justifications,
}

impl StreamType {
fn label(&self) -> &str {
match self {
StreamType::Headers => "Avail Headers",
StreamType::Justifications => "Grandpa Justifications",
}
}
}

impl MergedSubscriptions {
fn streams(&mut self, headers_first: bool) -> Vec<(&mut SubscriptionStream, StreamType)> {
let mut streams = vec![
(&mut self.headers, StreamType::Headers),
(&mut self.justifications, StreamType::Justifications),
];
if !headers_first {
streams.reverse();
}
streams
}
}

impl Stream for MergedSubscriptions {
type Item = Result<Subscription, subxt::error::Error>;

fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
// Randomly decide which stream to poll first
let poll_headers_first = rand::thread_rng().gen_bool(0.5);

for (stream, stream_type) in self.streams(poll_headers_first) {
let label = stream_type.label();

let result = match (*stream).as_mut().poll_next(cx) {
Poll::Ready(None) => {
debug!("{label} stream ended, terminating merged stream");
Poll::Ready(None)
},
Poll::Ready(Some(Ok(item))) => {
info!("Received {label} item");

match stream_type {
StreamType::Headers => self.last_header_at = Instant::now(),
StreamType::Justifications => self.last_justification_at = Instant::now(),
}

Poll::Ready(Some(Ok(item)))
},
Poll::Ready(Some(Err(e))) => {
error!("Error in {label} stream: {:?}", e);
Poll::Ready(Some(Err(e)))
},
Poll::Pending => continue,
};

return result;
}

let now = Instant::now();

if now.duration_since(self.last_header_at) > self.timeout_in {
return Poll::Ready(Some(Err(subxt::Error::Other(
"Headers stream timeout".to_string(),
))));
}

if now.duration_since(self.last_justification_at) > self.timeout_in {
return Poll::Ready(Some(Err(subxt::Error::Other(
"Justifications stream timeout".to_string(),
))));
}

Poll::Pending
}
}
Pin<Box<dyn Stream<Item = Result<Result<Subscription, subxt::error::Error>, Elapsed>> + Send>>;

#[derive(Clone)]
pub struct Client<T: Database> {
Expand Down Expand Up @@ -486,24 +387,23 @@ impl<D: Database> Client<D> {
}

pub async fn subscription_stream(self) -> impl Stream<Item = Result<Subscription>> {
let timeout_in = Duration::from_secs(60);
async_stream::stream! {
loop {
match self.with_retries(Self::create_rpc_subscriptions).await {
Ok(mut stream) => {
loop {
match timeout(timeout_in, stream.next()).await {
Ok(Some(Ok(item))) => {
match stream.next().await {
Some(Ok(Ok(item))) => {
yield Ok(item);
continue;
},
Ok(Some(Err(error))) => warn!(%error, "Received error on RPC Subscription stream. Creating new connection."),
Ok(None) => {},
Err(error) => warn!(%error, "Received timeout on RPC Subscription stream. Creating new connection."),
Some(Ok(Err(error))) => warn!(%error, "Received error on RPC Subscription stream. Creating new connection."),
Some(Err(error)) => warn!(%error, "Received error on RPC Subscription stream. Creating new connection."),
None => warn!("RPC Subscription Stream exhausted. Creating new connection."),
}
break;
}
warn!("RPC Subscription Stream exhausted. Creating new connection.");

}
Err(err) => {
warn!(error = %err, "Failed to create RPC Subscription stream.");
Expand All @@ -519,38 +419,40 @@ impl<D: Database> Client<D> {
self.subxt_client.read().await.clone()
}

async fn create_rpc_subscriptions(
client: SDK,
) -> Result<impl Stream<Item = Result<Subscription, subxt::error::Error>>> {
async fn create_rpc_subscriptions(client: SDK) -> Result<SubscriptionStream> {
let timeout_in = Duration::from_secs(30);

// Create fused Avail Header subscription
let headers = client
.api
.backend()
.stream_finalized_block_headers()
.await?
.map_ok(|(header, _)| Subscription::Header(header))
.fuse();
let headers: SubscriptionStream = Box::pin(
client
.api
.backend()
.stream_finalized_block_headers()
.await?
.map_ok(|(header, _)| Subscription::Header(header))
.inspect_ok(|_| info!("Received header on the stream"))
.timeout(timeout_in)
.fuse(),
);

// Create fused GrandpaJustification subscription
let justifications = client
.rpc
.client
.subscribe(
"grandpa_subscribeJustifications",
rpc_params![],
"grandpa_unsubscribeJustifications",
)
.await?
.map_ok(Subscription::Justification)
.fuse();

Ok(MergedSubscriptions {
headers: Box::pin(headers),
justifications: Box::pin(justifications),
last_header_at: Instant::now(),
last_justification_at: Instant::now(),
timeout_in: Duration::from_secs(60),
})
let justifications: SubscriptionStream = Box::pin(
client
.rpc
.client
.subscribe(
"grandpa_subscribeJustifications",
rpc_params![],
"grandpa_unsubscribeJustifications",
)
.await?
.map_ok(Subscription::Justification)
.inspect_ok(|_| info!("Received justification on the stream"))
.timeout(timeout_in)
.fuse(),
);

Ok(Box::pin(headers.merge(justifications)))
}

pub async fn get_block_hash(&self, block_number: u32) -> Result<H256> {
Expand Down
2 changes: 1 addition & 1 deletion core/src/network/rpc/subscriptions.rs
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,7 @@ impl<T: Database + Clone> SubscriptionLoop<T> {
match subscriptions.next().await {
Some(Ok(sub)) => self.handle_new_subscription(sub).await,
Some(Err(error)) => return Err(eyre!(error)),
None => return Err(eyre!("Subscriptions ended")),
None => return Err(eyre!("No available subscriptions")),
}
}
}
Expand Down

0 comments on commit 1a05edb

Please sign in to comment.