Skip to content

Commit

Permalink
Refactor cluster app info and segment headers responses with concurrency and full segment headers cache on controller
Browse files Browse the repository at this point in the history
  • Loading branch information
nazar-pc committed May 13, 2024
1 parent 7257373 commit 6575cb2
Show file tree
Hide file tree
Showing 2 changed files with 189 additions and 81 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -181,7 +181,14 @@ pub(super) async fn controller(
let instance = instance.clone();

move || async move {
controller_service(&nats_client, &node_client, &piece_getter, &instance).await
controller_service(
&nats_client,
&node_client,
&piece_getter,
&instance,
&AsyncRwLock::default(),
)
.await
}
},
"controller-service".to_string(),
Expand Down
261 changes: 181 additions & 80 deletions crates/subspace-farmer/src/cluster/controller.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ use crate::cluster::nats_client::{
};
use crate::node_client::{Error as NodeClientError, NodeClient};
use anyhow::anyhow;
use async_lock::Semaphore;
use async_lock::{Mutex as AsyncMutex, RwLock as AsyncRwLock, Semaphore};
use async_nats::{HeaderValue, Message};
use async_trait::async_trait;
use futures::stream::FuturesUnordered;
Expand All @@ -28,8 +28,9 @@ use subspace_core_primitives::{Piece, PieceIndex, SegmentHeader, SegmentIndex};
use subspace_farmer_components::PieceGetter;
use subspace_rpc_primitives::{
FarmerAppInfo, RewardSignatureResponse, RewardSigningInfo, SlotInfo, SolutionResponse,
MAX_SEGMENT_HEADERS_PER_REQUEST,
};
use tracing::{debug, trace, warn};
use tracing::{debug, info, trace, warn};

const FARMER_APP_INFO_DEDUPLICATION_WINDOW: Duration = Duration::from_secs(1);

Expand Down Expand Up @@ -324,6 +325,7 @@ pub async fn controller_service<NC, PG>(
node_client: &NC,
piece_getter: &PG,
instance: &str,
segment_headers: &AsyncRwLock<Vec<SegmentHeader>>,
) -> anyhow::Result<()>
where
NC: NodeClient,
Expand All @@ -336,7 +338,7 @@ where
result = reward_signing_broadcaster(nats_client, node_client, instance).fuse() => {
result
},
result = archived_segment_headers_broadcaster(nats_client, node_client, instance).fuse() => {
result = archived_segment_headers_broadcaster(nats_client, node_client, instance, segment_headers).fuse() => {
result
},
result = solution_response_forwarder(nats_client, node_client, instance).fuse() => {
Expand All @@ -348,7 +350,7 @@ where
result = farmer_app_info_responder(nats_client, node_client).fuse() => {
result
},
result = segment_headers_responder(nats_client, node_client).fuse() => {
result = segment_headers_responder(nats_client, segment_headers).fuse() => {
result
},
result = piece_responder(nats_client, piece_getter).fuse() => {
Expand Down Expand Up @@ -428,6 +430,7 @@ async fn archived_segment_headers_broadcaster<NC>(
nats_client: &NatsClient,
node_client: &NC,
instance: &str,
segment_headers: &AsyncRwLock<Vec<SegmentHeader>>,
) -> anyhow::Result<()>
where
NC: NodeClient,
Expand All @@ -439,6 +442,39 @@ where
anyhow!("Failed to subscribe to archived segment header notifications: {error}")
})?;

info!("Downloading all segment headers from node...");
{
let mut segment_headers = segment_headers.write().await;
let mut segment_index_offset = SegmentIndex::from(segment_headers.len() as u64);
let segment_index_step = SegmentIndex::from(MAX_SEGMENT_HEADERS_PER_REQUEST as u64);

'outer: loop {
let from = segment_index_offset;
let to = segment_index_offset + segment_index_step;
trace!(%from, %to, "Requesting segment headers");

for maybe_segment_header in node_client
.segment_headers((from..to).collect())
.await
.map_err(|error| {
anyhow!("Failed to download segment headers {from}..{to} from node: {error}")
})?
{
let Some(segment_header) = maybe_segment_header else {
// Reached non-existent segment header
break 'outer;
};

if segment_headers.len() == u64::from(segment_header.segment_index()) as usize {
segment_headers.push(segment_header);
}
}

segment_index_offset += segment_index_step;
}
}
info!("Downloaded all segment headers from node successfully");

while let Some(archived_segment_header) = archived_segments_notifications.next().await {
trace!(
?archived_segment_header,
Expand All @@ -461,6 +497,11 @@ where
{
warn!(%error, "Failed to broadcast archived segment header info");
}

let mut segment_headers = segment_headers.write().await;
if segment_headers.len() == u64::from(archived_segment_header.segment_index()) as usize {
segment_headers.push(archived_segment_header);
}
}

Ok(())
Expand Down Expand Up @@ -535,121 +576,181 @@ async fn farmer_app_info_responder<NC>(
where
NC: NodeClient,
{
let mut subscription = nats_client
let farmer_app_info: <ClusterControllerFarmerAppInfoRequest as GenericRequest>::Response =
node_client
.farmer_app_info()
.await
.map_err(|error| anyhow!("Failed to get farmer app info: {error}"))?;
let last_farmer_app_info = AsyncMutex::new((farmer_app_info, Instant::now()));

// Initialize with pending future so it never ends
let mut processing = FuturesUnordered::<Pin<Box<dyn Future<Output = ()> + Send>>>::from_iter([
Box::pin(pending()) as Pin<Box<_>>,
]);

let subscription = nats_client
.queue_subscribe(
ClusterControllerFarmerAppInfoRequest::SUBJECT,
"subspace.controller".to_string(),
)
.await
.map_err(|error| anyhow!("Failed to subscribe to farmer app info requests: {error}"))?;
debug!(?subscription, "Farmer app info requests subscription");
let mut subscription = subscription.fuse();

let mut last_farmer_app_info: <ClusterControllerFarmerAppInfoRequest as GenericRequest>::Response = node_client
.farmer_app_info()
.await
.map_err(|error| anyhow!("Failed to get farmer app info: {error}"))?;
let mut last_farmer_app_info_request = Instant::now();
loop {
select! {
maybe_message = subscription.next() => {
let Some(message) = maybe_message else {
break;
};

while let Some(message) = subscription.next().await {
trace!("Farmer app info request");
// Create background task for concurrent processing
processing.push(Box::pin(process_farmer_app_info_request(
nats_client,
node_client,
message,
&last_farmer_app_info,
)));
}
_ = processing.next() => {
// Nothing to do here
}
}
}

let Some(reply_subject) = message.reply else {
continue;
};
Ok(())
}

async fn process_farmer_app_info_request<NC>(
nats_client: &NatsClient,
node_client: &NC,
message: Message,
last_farmer_app_info: &AsyncMutex<(FarmerAppInfo, Instant)>,
) where
NC: NodeClient,
{
let Some(reply_subject) = message.reply else {
return;
};

trace!("Farmer app info request");

let farmer_app_info = {
let (last_farmer_app_info, last_farmer_app_info_request) =
&mut *last_farmer_app_info.lock().await;

if last_farmer_app_info_request.elapsed() > FARMER_APP_INFO_DEDUPLICATION_WINDOW {
match node_client.farmer_app_info().await {
let farmer_app_info: Result<
<ClusterControllerFarmerAppInfoRequest as GenericRequest>::Response,
_,
> = node_client.farmer_app_info().await;
match farmer_app_info {
Ok(new_last_farmer_app_info) => {
last_farmer_app_info = new_last_farmer_app_info;
last_farmer_app_info_request = Instant::now();
*last_farmer_app_info = new_last_farmer_app_info;
*last_farmer_app_info_request = Instant::now();
}
Err(error) => {
warn!(%error, "Failed to get farmer app info");
}
}
}

if let Err(error) = nats_client
.publish(reply_subject, last_farmer_app_info.encode().into())
.await
{
warn!(%error, "Failed to send farmer app info response");
}
}
last_farmer_app_info.clone()
};

Ok(())
if let Err(error) = nats_client
.publish(reply_subject, farmer_app_info.encode().into())
.await
{
warn!(%error, "Failed to send farmer app info response");
}
}

async fn segment_headers_responder<NC>(
async fn segment_headers_responder(
nats_client: &NatsClient,
node_client: &NC,
) -> anyhow::Result<()>
where
NC: NodeClient,
{
let mut subscription = nats_client
segment_headers: &AsyncRwLock<Vec<SegmentHeader>>,
) -> anyhow::Result<()> {
// Initialize with pending future so it never ends
let mut processing = FuturesUnordered::<Pin<Box<dyn Future<Output = ()> + Send>>>::from_iter([
Box::pin(pending()) as Pin<Box<_>>,
]);

let subscription = nats_client
.queue_subscribe(
ClusterControllerSegmentHeadersRequest::SUBJECT,
"subspace.controller".to_string(),
)
.await
.map_err(|error| anyhow!("Failed to subscribe to segment headers requests: {error}"))?;
debug!(?subscription, "Segment headers requests subscription");
let mut subscription = subscription.fuse();

let mut last_request_response = None::<(
ClusterControllerSegmentHeadersRequest,
<ClusterControllerSegmentHeadersRequest as GenericRequest>::Response,
)>;
loop {
select! {
maybe_message = subscription.next() => {
let Some(message) = maybe_message else {
break;
};

while let Some(message) = subscription.next().await {
let Some(reply_subject) = message.reply else {
continue;
};
// Create background task for concurrent processing
processing.push(Box::pin(process_segment_headers_request(
nats_client,
segment_headers,
message,
)));
}
_ = processing.next() => {
// Nothing to do here
}
}
}
Ok(())
}

let request =
match ClusterControllerSegmentHeadersRequest::decode(&mut message.payload.as_ref()) {
Ok(request) => request,
Err(error) => {
warn!(
%error,
message = %hex::encode(message.payload),
"Failed to decode segment headers request"
);
continue;
}
};
trace!(?request, "Segment headers request");
async fn process_segment_headers_request(
nats_client: &NatsClient,
segment_headers: &AsyncRwLock<Vec<SegmentHeader>>,
message: Message,
) {
let Some(reply_subject) = message.reply else {
return;
};

let response = if let Some((last_request, response)) = &last_request_response
&& last_request.segment_indices == request.segment_indices
{
response
} else {
match node_client
.segment_headers(request.segment_indices.clone())
.await
{
Ok(segment_headers) => &last_request_response.insert((request, segment_headers)).1,
Err(error) => {
warn!(
%error,
segment_indices = ?request.segment_indices,
"Failed to get segment headers"
);
continue;
}
let request =
match ClusterControllerSegmentHeadersRequest::decode(&mut message.payload.as_ref()) {
Ok(request) => request,
Err(error) => {
warn!(
%error,
message = %hex::encode(message.payload),
"Failed to decode segment headers request"
);
return;
}
};
trace!(?request, "Segment headers request");

let response: <ClusterControllerSegmentHeadersRequest as GenericRequest>::Response = {
let segment_headers = segment_headers.read().await;

request
.segment_indices
.into_iter()
.map(|segment_index| {
segment_headers
.get(u64::from(segment_index) as usize)
.copied()
})
.collect()
};

if let Err(error) = nats_client
.publish(reply_subject, response.encode().into())
.await
{
warn!(%error, "Failed to send farmer app info response");
}
if let Err(error) = nats_client
.publish(reply_subject, response.encode().into())
.await
{
warn!(%error, "Failed to send segment headers response");
}

Ok(())
}

async fn piece_responder<PG>(nats_client: &NatsClient, piece_getter: &PG) -> anyhow::Result<()>
Expand Down

0 comments on commit 6575cb2

Please sign in to comment.