Skip to content

Commit

Permalink
Improve RPC performance for farmer
Browse files Browse the repository at this point in the history
  • Loading branch information
nazar-pc committed May 22, 2024
1 parent 9741144 commit 1ab1cdf
Show file tree
Hide file tree
Showing 2 changed files with 11 additions and 26 deletions.
18 changes: 10 additions & 8 deletions crates/sc-consensus-subspace/src/archiver.rs
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ use crate::slot_worker::SubspaceSyncOracle;
use crate::{SubspaceLink, SubspaceNotificationSender};
use codec::{Decode, Encode};
use futures::StreamExt;
use parking_lot::Mutex;
use parking_lot::RwLock;
use rand::prelude::*;
use rand_chacha::ChaCha8Rng;
use rayon::prelude::*;
Expand Down Expand Up @@ -99,7 +99,7 @@ struct SegmentHeadersStoreInner<AS> {
aux_store: Arc<AS>,
next_key_index: AtomicU16,
/// In-memory cache of segment headers
cache: Mutex<Vec<SegmentHeader>>,
cache: RwLock<Vec<SegmentHeader>>,
}

/// Persistent storage of segment headers.
Expand Down Expand Up @@ -163,15 +163,15 @@ where
inner: Arc::new(SegmentHeadersStoreInner {
aux_store,
next_key_index: AtomicU16::new(next_key_index),
cache: Mutex::new(cache),
cache: RwLock::new(cache),
}),
confirmation_depth_k,
})
}

/// Returns last observed segment index
pub fn max_segment_index(&self) -> Option<SegmentIndex> {
let segment_index = self.inner.cache.lock().len().checked_sub(1)? as u64;
let segment_index = self.inner.cache.read().len().checked_sub(1)? as u64;
Some(SegmentIndex::from(segment_index))
}

Expand Down Expand Up @@ -235,7 +235,7 @@ where

self.inner.aux_store.insert_aux(&insert_data, &[])?;
}
self.inner.cache.lock().extend(segment_headers_to_store);
self.inner.cache.write().extend(segment_headers_to_store);

Ok(())
}
Expand All @@ -244,7 +244,7 @@ where
pub fn get_segment_header(&self, segment_index: SegmentIndex) -> Option<SegmentHeader> {
self.inner
.cache
.lock()
.read()
.get(u64::from(segment_index) as usize)
.copied()
}
Expand Down Expand Up @@ -314,11 +314,13 @@ where
break;
}
} else {
return Vec::new(); // no segment headers required
// No segment headers required
return Vec::new();
}
}

Vec::new() // no segment headers required
// No segment headers required
Vec::new()
}
}

Expand Down
19 changes: 1 addition & 18 deletions crates/subspace-farmer/src/node_client/node_rpc_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,43 +6,27 @@ use jsonrpsee::rpc_params;
use jsonrpsee::ws_client::{WsClient, WsClientBuilder};
use std::pin::Pin;
use std::sync::Arc;
use std::time::Duration;
use subspace_core_primitives::{Piece, PieceIndex, SegmentHeader, SegmentIndex};
use subspace_rpc_primitives::{
FarmerAppInfo, RewardSignatureResponse, RewardSigningInfo, SlotInfo, SolutionResponse,
};
use tokio::sync::Semaphore;

/// Defines max_concurrent_requests constant in the node rpc client
const RPC_MAX_CONCURRENT_REQUESTS: usize = 1_000_000;
/// Node is having a hard time responding for many piece requests
// TODO: Remove this once https://github.com/paritytech/jsonrpsee/issues/1189 is resolved
const MAX_CONCURRENT_PIECE_REQUESTS: usize = 10;
const REQUEST_TIMEOUT: Duration = Duration::from_mins(5);

/// `WsClient` wrapper.
#[derive(Debug, Clone)]
pub struct NodeRpcClient {
client: Arc<WsClient>,
piece_request_semaphore: Arc<Semaphore>,
}

impl NodeRpcClient {
/// Create a new instance of [`NodeClient`].
pub async fn new(url: &str) -> Result<Self, JsonError> {
let client = Arc::new(
WsClientBuilder::default()
.max_concurrent_requests(RPC_MAX_CONCURRENT_REQUESTS)
.max_request_size(20 * 1024 * 1024)
.request_timeout(REQUEST_TIMEOUT)
.build(url)
.await?,
);
let piece_request_semaphore = Arc::new(Semaphore::new(MAX_CONCURRENT_PIECE_REQUESTS));
Ok(Self {
client,
piece_request_semaphore,
})
Ok(Self { client })
}
}

Expand Down Expand Up @@ -144,7 +128,6 @@ impl NodeClient for NodeRpcClient {
}

async fn piece(&self, piece_index: PieceIndex) -> Result<Option<Piece>, RpcError> {
let _permit = self.piece_request_semaphore.acquire().await?;
let result: Option<Vec<u8>> = self
.client
.request("subspace_piece", rpc_params![&piece_index])
Expand Down

0 comments on commit 1ab1cdf

Please sign in to comment.