Skip to content

Commit

Permalink
Merge pull request #3128 from autonomys/farmer-anyhow
Browse files Browse the repository at this point in the history
Use `anyhow` more widely in farmer code
  • Loading branch information
nazar-pc authored Oct 15, 2024
2 parents 7e7188b + 911868e commit 6e2e4b1
Show file tree
Hide file tree
Showing 17 changed files with 96 additions and 126 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions crates/subspace-farmer-components/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ include = [
bench = false

[dependencies]
anyhow = "1.0.89"
async-lock = "3.4.0"
async-trait = "0.1.83"
backoff = { version = "0.4.0", features = ["futures", "tokio"] }
Expand Down
16 changes: 3 additions & 13 deletions crates/subspace-farmer-components/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,6 @@ use async_trait::async_trait;
use parity_scale_codec::{Decode, Encode};
use serde::{Deserialize, Serialize};
use static_assertions::const_assert;
use std::error::Error;
use std::fs::File;
use std::future::Future;
use std::io;
Expand All @@ -40,31 +39,22 @@ use subspace_core_primitives::segments::{ArchivedHistorySegment, HistorySize};
#[async_trait]
pub trait PieceGetter {
/// Get piece by index
async fn get_piece(
&self,
piece_index: PieceIndex,
) -> Result<Option<Piece>, Box<dyn Error + Send + Sync + 'static>>;
async fn get_piece(&self, piece_index: PieceIndex) -> anyhow::Result<Option<Piece>>;
}

#[async_trait]
impl<T> PieceGetter for Arc<T>
where
T: PieceGetter + Send + Sync,
{
async fn get_piece(
&self,
piece_index: PieceIndex,
) -> Result<Option<Piece>, Box<dyn Error + Send + Sync + 'static>> {
async fn get_piece(&self, piece_index: PieceIndex) -> anyhow::Result<Option<Piece>> {
self.as_ref().get_piece(piece_index).await
}
}

#[async_trait]
impl PieceGetter for ArchivedHistorySegment {
async fn get_piece(
&self,
piece_index: PieceIndex,
) -> Result<Option<Piece>, Box<dyn Error + Send + Sync + 'static>> {
async fn get_piece(&self, piece_index: PieceIndex) -> anyhow::Result<Option<Piece>> {
let position = usize::try_from(u64::from(piece_index))?;

Ok(self.pieces().nth(position))
Expand Down
19 changes: 9 additions & 10 deletions crates/subspace-farmer-components/src/plotting.rs
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ pub enum PlottingError {
#[error("Records encoder error: {error}")]
RecordsEncoderError {
/// Lower-level error
error: Box<dyn std::error::Error + Send + Sync + 'static>,
error: anyhow::Error,
},
/// Bad sector output size
#[error("Bad sector output size: provided {provided}, expected {expected}")]
Expand Down Expand Up @@ -97,7 +97,7 @@ pub enum PlottingError {
/// Piece index
piece_index: PieceIndex,
/// Lower-level error
error: Box<dyn std::error::Error + Send + Sync + 'static>,
error: anyhow::Error,
},
/// Failed to acquire permit
#[error("Failed to acquire permit: {error}")]
Expand Down Expand Up @@ -338,7 +338,7 @@ pub trait RecordsEncoder {
sector_id: &SectorId,
records: &mut [Record],
abort_early: &AtomicBool,
) -> Result<SectorContentsMap, Box<dyn std::error::Error + Send + Sync + 'static>>;
) -> anyhow::Result<SectorContentsMap>;
}

/// CPU implementation of [`RecordsEncoder`]
Expand All @@ -361,24 +361,23 @@ where
sector_id: &SectorId,
records: &mut [Record],
abort_early: &AtomicBool,
) -> Result<SectorContentsMap, Box<dyn std::error::Error + Send + Sync + 'static>> {
) -> anyhow::Result<SectorContentsMap> {
if self.erasure_coding.max_shards() < Record::NUM_S_BUCKETS {
return Err(format!(
return Err(anyhow::anyhow!(
"Invalid erasure coding instance: {} shards needed, {} supported",
Record::NUM_S_BUCKETS,
self.erasure_coding.max_shards()
)
.into());
));
}

if self.table_generators.is_empty() {
return Err("No table generators".into());
return Err(anyhow::anyhow!("No table generators"));
}

let pieces_in_sector = records
.len()
.try_into()
.map_err(|error| format!("Failed to convert pieces in sector: {error}"))?;
.map_err(|error| anyhow::anyhow!("Failed to convert pieces in sector: {error}"))?;
let mut sector_contents_map = SectorContentsMap::new(pieces_in_sector);

{
Expand Down Expand Up @@ -751,7 +750,7 @@ async fn download_sector_internal<PG: PieceGetter>(
let _permit = match recovery_semaphore.acquire().await {
Ok(permit) => permit,
Err(error) => {
let error = format!("Recovery semaphore was closed: {error}").into();
let error = anyhow::anyhow!("Recovery semaphore was closed: {error}");
return Err(PlottingError::FailedToRetrievePiece { piece_index, error });
}
};
Expand Down
30 changes: 13 additions & 17 deletions crates/subspace-farmer/src/cluster/controller.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,15 +12,14 @@ use crate::cluster::nats_client::{
};
use crate::farm::{PieceCacheId, PieceCacheOffset};
use crate::farmer_cache::FarmerCache;
use crate::node_client::{Error as NodeClientError, NodeClient};
use crate::node_client::NodeClient;
use anyhow::anyhow;
use async_lock::Semaphore;
use async_nats::HeaderValue;
use async_trait::async_trait;
use futures::{select, FutureExt, Stream, StreamExt};
use parity_scale_codec::{Decode, Encode};
use parking_lot::Mutex;
use std::error::Error;
use std::num::NonZeroUsize;
use std::pin::Pin;
use std::sync::Arc;
Expand Down Expand Up @@ -171,10 +170,7 @@ pub struct ClusterPieceGetter {

#[async_trait]
impl PieceGetter for ClusterPieceGetter {
async fn get_piece(
&self,
piece_index: PieceIndex,
) -> Result<Option<Piece>, Box<dyn Error + Send + Sync + 'static>> {
async fn get_piece(&self, piece_index: PieceIndex) -> anyhow::Result<Option<Piece>> {
let _guard = self.request_semaphore.acquire().await;

if let Some((piece_cache_id, piece_cache_offset)) = self
Expand Down Expand Up @@ -286,16 +282,17 @@ impl ClusterNodeClient {

#[async_trait]
impl NodeClient for ClusterNodeClient {
async fn farmer_app_info(&self) -> Result<FarmerAppInfo, NodeClientError> {
async fn farmer_app_info(&self) -> anyhow::Result<FarmerAppInfo> {
Ok(self
.nats_client
.request(&ClusterControllerFarmerAppInfoRequest, None)
.await??)
.await?
.map_err(anyhow::Error::msg)?)
}

async fn subscribe_slot_info(
&self,
) -> Result<Pin<Box<dyn Stream<Item = SlotInfo> + Send + 'static>>, NodeClientError> {
) -> anyhow::Result<Pin<Box<dyn Stream<Item = SlotInfo> + Send + 'static>>> {
let subscription = self
.nats_client
.subscribe_to_broadcasts::<ClusterControllerSlotInfoBroadcast>(None, None)
Expand Down Expand Up @@ -328,7 +325,7 @@ impl NodeClient for ClusterNodeClient {
async fn submit_solution_response(
&self,
solution_response: SolutionResponse,
) -> Result<(), NodeClientError> {
) -> anyhow::Result<()> {
let last_slot_info_instance = self.last_slot_info_instance.lock().clone();
Ok(self
.nats_client
Expand All @@ -341,8 +338,7 @@ impl NodeClient for ClusterNodeClient {

async fn subscribe_reward_signing(
&self,
) -> Result<Pin<Box<dyn Stream<Item = RewardSigningInfo> + Send + 'static>>, NodeClientError>
{
) -> anyhow::Result<Pin<Box<dyn Stream<Item = RewardSigningInfo> + Send + 'static>>> {
let subscription = self
.nats_client
.subscribe_to_broadcasts::<ClusterControllerRewardSigningBroadcast>(None, None)
Expand All @@ -356,7 +352,7 @@ impl NodeClient for ClusterNodeClient {
async fn submit_reward_signature(
&self,
reward_signature: RewardSignatureResponse,
) -> Result<(), NodeClientError> {
) -> anyhow::Result<()> {
Ok(self
.nats_client
.notification(
Expand All @@ -368,7 +364,7 @@ impl NodeClient for ClusterNodeClient {

async fn subscribe_archived_segment_headers(
&self,
) -> Result<Pin<Box<dyn Stream<Item = SegmentHeader> + Send + 'static>>, NodeClientError> {
) -> anyhow::Result<Pin<Box<dyn Stream<Item = SegmentHeader> + Send + 'static>>> {
let subscription = self
.nats_client
.subscribe_to_broadcasts::<ClusterControllerArchivedSegmentHeaderBroadcast>(None, None)
Expand Down Expand Up @@ -401,7 +397,7 @@ impl NodeClient for ClusterNodeClient {
async fn segment_headers(
&self,
segment_indices: Vec<SegmentIndex>,
) -> Result<Vec<Option<SegmentHeader>>, NodeClientError> {
) -> anyhow::Result<Vec<Option<SegmentHeader>>> {
Ok(self
.nats_client
.request(
Expand All @@ -411,7 +407,7 @@ impl NodeClient for ClusterNodeClient {
.await?)
}

async fn piece(&self, piece_index: PieceIndex) -> Result<Option<Piece>, NodeClientError> {
async fn piece(&self, piece_index: PieceIndex) -> anyhow::Result<Option<Piece>> {
Ok(self
.nats_client
.request(&ClusterControllerPieceRequest { piece_index }, None)
Expand All @@ -421,7 +417,7 @@ impl NodeClient for ClusterNodeClient {
async fn acknowledge_archived_segment_header(
&self,
_segment_index: SegmentIndex,
) -> Result<(), NodeClientError> {
) -> anyhow::Result<()> {
// Acknowledgement is unnecessary/unsupported
Ok(())
}
Expand Down
5 changes: 2 additions & 3 deletions crates/subspace-farmer/src/farm.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@
//! way). This crate provides a few of such implementations, but more can be created externally as
//! well if needed without modifying the library itself.

use crate::node_client;
use async_trait::async_trait;
use derive_more::{Display, From};
use futures::Stream;
Expand Down Expand Up @@ -278,13 +277,13 @@ pub enum FarmingError {
#[error("Failed to subscribe to slot info notifications: {error}")]
FailedToSubscribeSlotInfo {
/// Lower-level error
error: node_client::Error,
error: anyhow::Error,
},
/// Failed to retrieve farmer info
#[error("Failed to retrieve farmer info: {error}")]
FailedToGetFarmerInfo {
/// Lower-level error
error: node_client::Error,
error: anyhow::Error,
},
/// Slot info notification stream ended
#[error("Slot info notification stream ended")]
Expand Down
25 changes: 11 additions & 14 deletions crates/subspace-farmer/src/farmer_cache/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@

use crate::disk_piece_cache::DiskPieceCache;
use crate::farmer_cache::{decode_piece_index_from_record_key, FarmerCache};
use crate::node_client::{Error, NodeClient};
use crate::node_client::NodeClient;
use async_trait::async_trait;
use futures::channel::{mpsc, oneshot};
use futures::{SinkExt, Stream, StreamExt};
Expand Down Expand Up @@ -44,7 +44,7 @@ struct MockNodeClient {

#[async_trait]
impl NodeClient for MockNodeClient {
async fn farmer_app_info(&self) -> Result<FarmerAppInfo, Error> {
async fn farmer_app_info(&self) -> anyhow::Result<FarmerAppInfo> {
// Most of these values make no sense, but they are not used by piece cache anyway
Ok(FarmerAppInfo {
genesis_hash: [0; 32],
Expand All @@ -68,33 +68,33 @@ impl NodeClient for MockNodeClient {

async fn subscribe_slot_info(
&self,
) -> Result<Pin<Box<dyn Stream<Item = SlotInfo> + Send + 'static>>, Error> {
) -> anyhow::Result<Pin<Box<dyn Stream<Item = SlotInfo> + Send + 'static>>> {
unimplemented!()
}

async fn submit_solution_response(
&self,
_solution_response: SolutionResponse,
) -> Result<(), Error> {
) -> anyhow::Result<()> {
unimplemented!()
}

async fn subscribe_reward_signing(
&self,
) -> Result<Pin<Box<dyn Stream<Item = RewardSigningInfo> + Send + 'static>>, Error> {
) -> anyhow::Result<Pin<Box<dyn Stream<Item = RewardSigningInfo> + Send + 'static>>> {
unimplemented!()
}

async fn submit_reward_signature(
&self,
_reward_signature: RewardSignatureResponse,
) -> Result<(), Error> {
) -> anyhow::Result<()> {
unimplemented!()
}

async fn subscribe_archived_segment_headers(
&self,
) -> Result<Pin<Box<dyn Stream<Item = SegmentHeader> + Send + 'static>>, Error> {
) -> anyhow::Result<Pin<Box<dyn Stream<Item = SegmentHeader> + Send + 'static>>> {
let (tx, rx) = oneshot::channel();
self.archived_segment_headers_stream_request_sender
.clone()
Expand All @@ -109,11 +109,11 @@ impl NodeClient for MockNodeClient {
async fn segment_headers(
&self,
_segment_indexes: Vec<SegmentIndex>,
) -> Result<Vec<Option<SegmentHeader>>, Error> {
) -> anyhow::Result<Vec<Option<SegmentHeader>>> {
unimplemented!()
}

async fn piece(&self, piece_index: PieceIndex) -> Result<Option<Piece>, Error> {
async fn piece(&self, piece_index: PieceIndex) -> anyhow::Result<Option<Piece>> {
Ok(Some(
self.pieces
.lock()
Expand All @@ -130,7 +130,7 @@ impl NodeClient for MockNodeClient {
async fn acknowledge_archived_segment_header(
&self,
segment_index: SegmentIndex,
) -> Result<(), Error> {
) -> anyhow::Result<()> {
self.acknowledge_archived_segment_header_sender
.clone()
.send(segment_index)
Expand All @@ -147,10 +147,7 @@ struct MockPieceGetter {

#[async_trait]
impl PieceGetter for MockPieceGetter {
async fn get_piece(
&self,
piece_index: PieceIndex,
) -> Result<Option<Piece>, Box<dyn std::error::Error + Send + Sync + 'static>> {
async fn get_piece(&self, piece_index: PieceIndex) -> anyhow::Result<Option<Piece>> {
Ok(Some(
self.pieces
.lock()
Expand Down
11 changes: 2 additions & 9 deletions crates/subspace-farmer/src/farmer_piece_getter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@ use backoff::future::retry;
use backoff::ExponentialBackoff;
use parking_lot::Mutex;
use std::collections::HashMap;
use std::error::Error;
use std::fmt;
use std::hash::Hash;
use std::num::NonZeroUsize;
Expand Down Expand Up @@ -383,10 +382,7 @@ where
PV: PieceValidator + Send + 'static,
NC: NodeClient,
{
async fn get_piece(
&self,
piece_index: PieceIndex,
) -> Result<Option<Piece>, Box<dyn Error + Send + Sync + 'static>> {
async fn get_piece(&self, piece_index: PieceIndex) -> anyhow::Result<Option<Piece>> {
let _guard = self.inner.request_semaphore.acquire().await;

match InProgressPiece::new(piece_index, &self.inner.in_progress_pieces) {
Expand Down Expand Up @@ -453,10 +449,7 @@ where
PV: PieceValidator + Send + 'static,
NC: NodeClient,
{
async fn get_piece(
&self,
piece_index: PieceIndex,
) -> Result<Option<Piece>, Box<dyn Error + Send + Sync + 'static>> {
async fn get_piece(&self, piece_index: PieceIndex) -> anyhow::Result<Option<Piece>> {
let Some(piece_getter) = self.upgrade() else {
debug!("Farmer piece getter upgrade didn't succeed");
return Ok(None);
Expand Down
Loading

0 comments on commit 6e2e4b1

Please sign in to comment.