diff --git a/crates/subspace-farmer/src/bin/subspace-farmer/commands/cluster/controller/farms.rs b/crates/subspace-farmer/src/bin/subspace-farmer/commands/cluster/controller/farms.rs index a85c43b384..76d5d7050f 100644 --- a/crates/subspace-farmer/src/bin/subspace-farmer/commands/cluster/controller/farms.rs +++ b/crates/subspace-farmer/src/bin/subspace-farmer/commands/cluster/controller/farms.rs @@ -28,7 +28,7 @@ use tokio::time::MissedTickBehavior; use tracing::{error, info, trace, warn}; type AddRemoveFuture<'a> = - Pin, Box)>> + 'a>>; + Pin, ClusterFarm)>> + 'a>>; pub(super) type FarmIndex = u16; @@ -319,7 +319,7 @@ fn process_farm_identify_message<'a>( ); } - Some((farm_index, expired_receiver, Box::new(farm) as Box<_>)) + Some((farm_index, expired_receiver, farm)) } Err(error) => { warn!( diff --git a/crates/subspace-farmer/src/bin/subspace-farmer/commands/farm.rs b/crates/subspace-farmer/src/bin/subspace-farmer/commands/farm.rs index ce07e9d06b..bc271e6cd0 100644 --- a/crates/subspace-farmer/src/bin/subspace-farmer/commands/farm.rs +++ b/crates/subspace-farmer/src/bin/subspace-farmer/commands/farm.rs @@ -23,7 +23,8 @@ use subspace_core_primitives::crypto::kzg::{embedded_kzg_settings, Kzg}; use subspace_core_primitives::{PublicKey, Record}; use subspace_erasure_coding::ErasureCoding; use subspace_farmer::farm::{ - Farm, FarmingNotification, SectorExpirationDetails, SectorPlottingDetails, SectorUpdate, + FarmingNotification, PlottedSectors, SectorExpirationDetails, SectorPlottingDetails, + SectorUpdate, }; use subspace_farmer::farmer_cache::FarmerCache; use subspace_farmer::node_client::node_rpc_client::NodeRpcClient; @@ -606,7 +607,7 @@ where info!(" Directory: {}", disk_farm.directory.display()); } - (farm_index, Ok(Box::new(farm) as Box)) + (farm_index, Ok(farm)) } .instrument(info_span!("", %farm_index)) }) @@ -656,9 +657,15 @@ where } farmer_cache .replace_backing_caches( - farms.iter().map(|farm| farm.piece_cache()).collect(), + farms + .iter() + .map(|farm| Arc::new(farm.piece_cache()) as Arc<_>) + .collect(), if plot_cache { - farms.iter().map(|farm| farm.plot_cache()).collect() + farms + .iter() + .map(|farm| Arc::new(farm.plot_cache()) as Arc<_>) + .collect() } else { Vec::new() }, @@ -680,7 +687,7 @@ where ) })?; - plotted_pieces.add_farm(farm_index, farm.piece_reader()); + plotted_pieces.add_farm(farm_index, Arc::new(farm.piece_reader())); let total_sectors_count = farm.total_sectors_count(); let mut plotted_sectors_count = 0; diff --git a/crates/subspace-farmer/src/cluster/farmer.rs b/crates/subspace-farmer/src/cluster/farmer.rs index 6bf51ab333..fb2bd91adf 100644 --- a/crates/subspace-farmer/src/cluster/farmer.rs +++ b/crates/subspace-farmer/src/cluster/farmer.rs @@ -12,8 +12,8 @@ use crate::cluster::nats_client::{ GenericBroadcast, GenericRequest, GenericStreamRequest, NatsClient, StreamRequest, }; use crate::farm::{ - Farm, FarmError, FarmId, FarmingNotification, HandlerFn, HandlerId, MaybePieceStoredResult, - PieceCache, PieceCacheOffset, PieceReader, PlotCache, PlottedSectors, SectorUpdate, + Farm, FarmError, FarmId, FarmingNotification, HandlerFn, HandlerId, PieceReader, + PlottedSectors, SectorUpdate, }; use crate::utils::AsyncJoinOnDrop; use anyhow::anyhow; @@ -29,9 +29,8 @@ use std::pin::{pin, Pin}; use std::sync::Arc; use std::time::{Duration, Instant}; use subspace_core_primitives::crypto::blake3_hash_list; -use subspace_core_primitives::{Blake3Hash, Piece, PieceIndex, PieceOffset, SectorIndex}; +use subspace_core_primitives::{Blake3Hash, Piece, PieceOffset, SectorIndex}; use subspace_farmer_components::plotting::PlottedSector; -use subspace_networking::libp2p::kad::RecordKey; use subspace_rpc_primitives::SolutionResponse; use tokio::time::MissedTickBehavior; use tracing::{debug, error, trace, warn}; @@ -144,80 +143,6 @@ impl PlottedSectors for ClusterPlottedSectors { } } -#[derive(Debug)] -struct DummyPieceCache; - -#[async_trait] -impl PieceCache for DummyPieceCache { - #[inline] - fn max_num_elements(&self) -> u32 { - 0 - } - - #[inline] - async fn contents( - &self, - ) -> Result< - Box< - dyn Stream), FarmError>> - + Unpin - + Send - + '_, - >, - FarmError, - > { - Ok(Box::new(stream::empty())) - } - - #[inline] - async fn write_piece( - &self, - _offset: PieceCacheOffset, - _piece_index: PieceIndex, - _piece: &Piece, - ) -> Result<(), FarmError> { - Err("Can't write pieces into empty cache".into()) - } - - #[inline] - async fn read_piece_index( - &self, - _offset: PieceCacheOffset, - ) -> Result, FarmError> { - Ok(None) - } - - #[inline] - async fn read_piece(&self, _offset: PieceCacheOffset) -> Result, FarmError> { - Ok(None) - } -} - -#[derive(Debug)] -struct DummyPlotCache; - -#[async_trait] -impl PlotCache for DummyPlotCache { - async fn is_piece_maybe_stored( - &self, - _key: &RecordKey, - ) -> Result { - Ok(MaybePieceStoredResult::No) - } - - async fn try_store_piece( - &self, - _piece_index: PieceIndex, - _piece: &Piece, - ) -> Result { - Ok(false) - } - - async fn read_piece(&self, _key: &RecordKey) -> Result, FarmError> { - Ok(None) - } -} - #[derive(Debug)] struct ClusterPieceReader { farm_id_string: String, @@ -279,14 +204,6 @@ impl Farm for ClusterFarm { }) } - fn piece_cache(&self) -> Arc { - Arc::new(DummyPieceCache) - } - - fn plot_cache(&self) -> Arc { - Arc::new(DummyPlotCache) - } - fn piece_reader(&self) -> Arc { Arc::new(ClusterPieceReader { farm_id_string: self.farm_id_string.clone(), @@ -313,7 +230,7 @@ impl Farm for ClusterFarm { } fn run(self: Box) -> Pin> + Send>> { - Box::pin(async move { Ok(self.background_tasks.await?) }) + Box::pin((*self).run()) } } @@ -410,6 +327,11 @@ impl ClusterFarm { background_tasks: AsyncJoinOnDrop::new(tokio::spawn(background_tasks), true), }) } + + /// Run and wait for background tasks to exit or return an error + pub async fn run(self) -> anyhow::Result<()> { + Ok(self.background_tasks.await?) + } } #[derive(Debug)] diff --git a/crates/subspace-farmer/src/farm.rs b/crates/subspace-farmer/src/farm.rs index 399c7f11d5..fe81670bd0 100644 --- a/crates/subspace-farmer/src/farm.rs +++ b/crates/subspace-farmer/src/farm.rs @@ -425,12 +425,6 @@ pub trait Farm { /// Get plotted sectors instance fn plotted_sectors(&self) -> Arc; - /// Get piece cache instance - fn piece_cache(&self) -> Arc; - - /// Get plot cache instance - fn plot_cache(&self) -> Arc; - /// Get piece reader to read plotted pieces later fn piece_reader(&self) -> Arc; @@ -473,16 +467,6 @@ where self.as_ref().plotted_sectors() } - #[inline] - fn piece_cache(&self) -> Arc { - self.as_ref().piece_cache() - } - - #[inline] - fn plot_cache(&self) -> Arc { - self.as_ref().plot_cache() - } - #[inline] fn piece_reader(&self) -> Arc { self.as_ref().piece_reader() diff --git a/crates/subspace-farmer/src/single_disk_farm.rs b/crates/subspace-farmer/src/single_disk_farm.rs index ae85468848..988605144a 100644 --- a/crates/subspace-farmer/src/single_disk_farm.rs +++ b/crates/subspace-farmer/src/single_disk_farm.rs @@ -6,7 +6,7 @@ mod plotted_sectors; mod plotting; pub mod unbuffered_io_file_windows; -use crate::farm::{Farm, FarmId, HandlerFn, PieceReader, PlotCache, PlottedSectors, SectorUpdate}; +use crate::farm::{Farm, FarmId, HandlerFn, PieceReader, PlottedSectors, SectorUpdate}; pub use crate::farm::{FarmingError, FarmingNotification}; use crate::identity::{Identity, IdentityError}; use crate::node_client::NodeClient; @@ -626,14 +626,6 @@ impl Farm for SingleDiskFarm { Arc::new(self.plotted_sectors()) } - fn piece_cache(&self) -> Arc { - Arc::new(self.piece_cache()) - } - - fn plot_cache(&self) -> Arc { - Arc::new(self.plot_cache()) - } - fn piece_reader(&self) -> Arc { Arc::new(self.piece_reader()) }