From 1a05edb9b21bb2b16a1f83d5e71b9fab5e80c9c4 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Aleksandar=20Terenti=C4=87?= Date: Tue, 29 Oct 2024 10:11:28 +0100 Subject: [PATCH] Use tokio primitives for stream timeouts --- core/src/network/rpc/client.rs | 182 ++++++-------------------- core/src/network/rpc/subscriptions.rs | 2 +- 2 files changed, 43 insertions(+), 141 deletions(-) diff --git a/core/src/network/rpc/client.rs b/core/src/network/rpc/client.rs index 8cf42e161..22f24fb3d 100644 --- a/core/src/network/rpc/client.rs +++ b/core/src/network/rpc/client.rs @@ -15,21 +15,11 @@ use color_eyre::{ Report, Result, }; use futures::{Stream, TryStreamExt}; -use rand::Rng; -use std::{ - iter::Iterator, - pin::Pin, - sync::Arc, - task::{Context, Poll}, - time::Duration, -}; -use tokio::{ - sync::RwLock, - time::{timeout, Instant}, -}; +use std::{iter::Iterator, pin::Pin, sync::Arc, time::Duration}; +use tokio::sync::RwLock; use tokio_retry::Retry; -use tokio_stream::StreamExt; -use tracing::{debug, error, info, warn}; +use tokio_stream::{Elapsed, StreamExt}; +use tracing::{error, info, warn}; use super::{configuration::RetryConfig, Node, Nodes, Subscription, WrappedProof}; use crate::{ @@ -122,96 +112,7 @@ impl GenesisHash { } type SubscriptionStream = - Pin> + Send>>; - -// Custom type for merged subscription streams -struct MergedSubscriptions { - headers: SubscriptionStream, - justifications: SubscriptionStream, - last_header_at: Instant, - last_justification_at: Instant, - timeout_in: Duration, -} - -enum StreamType { - Headers, - Justifications, -} - -impl StreamType { - fn label(&self) -> &str { - match self { - StreamType::Headers => "Avail Headers", - StreamType::Justifications => "Grandpa Justifications", - } - } -} - -impl MergedSubscriptions { - fn streams(&mut self, headers_first: bool) -> Vec<(&mut SubscriptionStream, StreamType)> { - let mut streams = vec![ - (&mut self.headers, StreamType::Headers), - (&mut self.justifications, StreamType::Justifications), - ]; - if !headers_first { - streams.reverse(); - } - streams - } -} - -impl Stream for MergedSubscriptions { - type Item = Result; - - fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { - // Randomly decide which stream to poll first - let poll_headers_first = rand::thread_rng().gen_bool(0.5); - - for (stream, stream_type) in self.streams(poll_headers_first) { - let label = stream_type.label(); - - let result = match (*stream).as_mut().poll_next(cx) { - Poll::Ready(None) => { - debug!("{label} stream ended, terminating merged stream"); - Poll::Ready(None) - }, - Poll::Ready(Some(Ok(item))) => { - info!("Received {label} item"); - - match stream_type { - StreamType::Headers => self.last_header_at = Instant::now(), - StreamType::Justifications => self.last_justification_at = Instant::now(), - } - - Poll::Ready(Some(Ok(item))) - }, - Poll::Ready(Some(Err(e))) => { - error!("Error in {label} stream: {:?}", e); - Poll::Ready(Some(Err(e))) - }, - Poll::Pending => continue, - }; - - return result; - } - - let now = Instant::now(); - - if now.duration_since(self.last_header_at) > self.timeout_in { - return Poll::Ready(Some(Err(subxt::Error::Other( - "Headers stream timeout".to_string(), - )))); - } - - if now.duration_since(self.last_justification_at) > self.timeout_in { - return Poll::Ready(Some(Err(subxt::Error::Other( - "Justifications stream timeout".to_string(), - )))); - } - - Poll::Pending - } -} + Pin, Elapsed>> + Send>>; #[derive(Clone)] pub struct Client { @@ -486,24 +387,23 @@ impl Client { } pub async fn subscription_stream(self) -> impl Stream> { - let timeout_in = Duration::from_secs(60); async_stream::stream! { loop { match self.with_retries(Self::create_rpc_subscriptions).await { Ok(mut stream) => { loop { - match timeout(timeout_in, stream.next()).await { - Ok(Some(Ok(item))) => { + match stream.next().await { + Some(Ok(Ok(item))) => { yield Ok(item); continue; }, - Ok(Some(Err(error))) => warn!(%error, "Received error on RPC Subscription stream. Creating new connection."), - Ok(None) => {}, - Err(error) => warn!(%error, "Received timeout on RPC Subscription stream. Creating new connection."), + Some(Ok(Err(error))) => warn!(%error, "Received error on RPC Subscription stream. Creating new connection."), + Some(Err(error)) => warn!(%error, "Received error on RPC Subscription stream. Creating new connection."), + None => warn!("RPC Subscription Stream exhausted. Creating new connection."), } break; } - warn!("RPC Subscription Stream exhausted. Creating new connection."); + } Err(err) => { warn!(error = %err, "Failed to create RPC Subscription stream."); @@ -519,38 +419,40 @@ impl Client { self.subxt_client.read().await.clone() } - async fn create_rpc_subscriptions( - client: SDK, - ) -> Result>> { + async fn create_rpc_subscriptions(client: SDK) -> Result { + let timeout_in = Duration::from_secs(30); + // Create fused Avail Header subscription - let headers = client - .api - .backend() - .stream_finalized_block_headers() - .await? - .map_ok(|(header, _)| Subscription::Header(header)) - .fuse(); + let headers: SubscriptionStream = Box::pin( + client + .api + .backend() + .stream_finalized_block_headers() + .await? + .map_ok(|(header, _)| Subscription::Header(header)) + .inspect_ok(|_| info!("Received header on the stream")) + .timeout(timeout_in) + .fuse(), + ); // Create fused GrandpaJustification subscription - let justifications = client - .rpc - .client - .subscribe( - "grandpa_subscribeJustifications", - rpc_params![], - "grandpa_unsubscribeJustifications", - ) - .await? - .map_ok(Subscription::Justification) - .fuse(); - - Ok(MergedSubscriptions { - headers: Box::pin(headers), - justifications: Box::pin(justifications), - last_header_at: Instant::now(), - last_justification_at: Instant::now(), - timeout_in: Duration::from_secs(60), - }) + let justifications: SubscriptionStream = Box::pin( + client + .rpc + .client + .subscribe( + "grandpa_subscribeJustifications", + rpc_params![], + "grandpa_unsubscribeJustifications", + ) + .await? + .map_ok(Subscription::Justification) + .inspect_ok(|_| info!("Received justification on the stream")) + .timeout(timeout_in) + .fuse(), + ); + + Ok(Box::pin(headers.merge(justifications))) } pub async fn get_block_hash(&self, block_number: u32) -> Result { diff --git a/core/src/network/rpc/subscriptions.rs b/core/src/network/rpc/subscriptions.rs index 50f451471..9df1fc235 100644 --- a/core/src/network/rpc/subscriptions.rs +++ b/core/src/network/rpc/subscriptions.rs @@ -93,7 +93,7 @@ impl SubscriptionLoop { match subscriptions.next().await { Some(Ok(sub)) => self.handle_new_subscription(sub).await, Some(Err(error)) => return Err(eyre!(error)), - None => return Err(eyre!("Subscriptions ended")), + None => return Err(eyre!("No available subscriptions")), } } }