Skip to content

Commit

Permalink
Remove Farm::piece_cache and Farm::plot_cache since those are not…
Browse files Browse the repository at this point in the history
… strictly necessary to be there
  • Loading branch information
nazar-pc committed May 22, 2024
1 parent 9741144 commit c3df829
Show file tree
Hide file tree
Showing 5 changed files with 24 additions and 119 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ use tokio::time::MissedTickBehavior;
use tracing::{error, info, trace, warn};

type AddRemoveFuture<'a> =
Pin<Box<dyn Future<Output = Option<(FarmIndex, oneshot::Receiver<()>, Box<dyn Farm>)>> + 'a>>;
Pin<Box<dyn Future<Output = Option<(FarmIndex, oneshot::Receiver<()>, ClusterFarm)>> + 'a>>;

pub(super) type FarmIndex = u16;

Expand Down Expand Up @@ -319,7 +319,7 @@ fn process_farm_identify_message<'a>(
);
}

Some((farm_index, expired_receiver, Box::new(farm) as Box<_>))
Some((farm_index, expired_receiver, farm))
}
Err(error) => {
warn!(
Expand Down
17 changes: 12 additions & 5 deletions crates/subspace-farmer/src/bin/subspace-farmer/commands/farm.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,8 @@ use subspace_core_primitives::crypto::kzg::{embedded_kzg_settings, Kzg};
use subspace_core_primitives::{PublicKey, Record};
use subspace_erasure_coding::ErasureCoding;
use subspace_farmer::farm::{
Farm, FarmingNotification, SectorExpirationDetails, SectorPlottingDetails, SectorUpdate,
FarmingNotification, PlottedSectors, SectorExpirationDetails, SectorPlottingDetails,
SectorUpdate,
};
use subspace_farmer::farmer_cache::FarmerCache;
use subspace_farmer::node_client::node_rpc_client::NodeRpcClient;
Expand Down Expand Up @@ -606,7 +607,7 @@ where
info!(" Directory: {}", disk_farm.directory.display());
}

(farm_index, Ok(Box::new(farm) as Box<dyn Farm>))
(farm_index, Ok(farm))
}
.instrument(info_span!("", %farm_index))
})
Expand Down Expand Up @@ -656,9 +657,15 @@ where
}
farmer_cache
.replace_backing_caches(
farms.iter().map(|farm| farm.piece_cache()).collect(),
farms
.iter()
.map(|farm| Arc::new(farm.piece_cache()) as Arc<_>)
.collect(),
if plot_cache {
farms.iter().map(|farm| farm.plot_cache()).collect()
farms
.iter()
.map(|farm| Arc::new(farm.plot_cache()) as Arc<_>)
.collect()
} else {
Vec::new()
},
Expand All @@ -680,7 +687,7 @@ where
)
})?;

plotted_pieces.add_farm(farm_index, farm.piece_reader());
plotted_pieces.add_farm(farm_index, Arc::new(farm.piece_reader()));

let total_sectors_count = farm.total_sectors_count();
let mut plotted_sectors_count = 0;
Expand Down
96 changes: 9 additions & 87 deletions crates/subspace-farmer/src/cluster/farmer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,8 @@ use crate::cluster::nats_client::{
GenericBroadcast, GenericRequest, GenericStreamRequest, NatsClient, StreamRequest,
};
use crate::farm::{
Farm, FarmError, FarmId, FarmingNotification, HandlerFn, HandlerId, MaybePieceStoredResult,
PieceCache, PieceCacheOffset, PieceReader, PlotCache, PlottedSectors, SectorUpdate,
Farm, FarmError, FarmId, FarmingNotification, HandlerFn, HandlerId, PieceReader,
PlottedSectors, SectorUpdate,
};
use crate::utils::AsyncJoinOnDrop;
use anyhow::anyhow;
Expand All @@ -29,9 +29,8 @@ use std::pin::{pin, Pin};
use std::sync::Arc;
use std::time::{Duration, Instant};
use subspace_core_primitives::crypto::blake3_hash_list;
use subspace_core_primitives::{Blake3Hash, Piece, PieceIndex, PieceOffset, SectorIndex};
use subspace_core_primitives::{Blake3Hash, Piece, PieceOffset, SectorIndex};
use subspace_farmer_components::plotting::PlottedSector;
use subspace_networking::libp2p::kad::RecordKey;
use subspace_rpc_primitives::SolutionResponse;
use tokio::time::MissedTickBehavior;
use tracing::{debug, error, trace, warn};
Expand Down Expand Up @@ -144,80 +143,6 @@ impl PlottedSectors for ClusterPlottedSectors {
}
}

#[derive(Debug)]
struct DummyPieceCache;

#[async_trait]
impl PieceCache for DummyPieceCache {
#[inline]
fn max_num_elements(&self) -> u32 {
0
}

#[inline]
async fn contents(
&self,
) -> Result<
Box<
dyn Stream<Item = Result<(PieceCacheOffset, Option<PieceIndex>), FarmError>>
+ Unpin
+ Send
+ '_,
>,
FarmError,
> {
Ok(Box::new(stream::empty()))
}

#[inline]
async fn write_piece(
&self,
_offset: PieceCacheOffset,
_piece_index: PieceIndex,
_piece: &Piece,
) -> Result<(), FarmError> {
Err("Can't write pieces into empty cache".into())
}

#[inline]
async fn read_piece_index(
&self,
_offset: PieceCacheOffset,
) -> Result<Option<PieceIndex>, FarmError> {
Ok(None)
}

#[inline]
async fn read_piece(&self, _offset: PieceCacheOffset) -> Result<Option<Piece>, FarmError> {
Ok(None)
}
}

#[derive(Debug)]
struct DummyPlotCache;

#[async_trait]
impl PlotCache for DummyPlotCache {
async fn is_piece_maybe_stored(
&self,
_key: &RecordKey,
) -> Result<MaybePieceStoredResult, FarmError> {
Ok(MaybePieceStoredResult::No)
}

async fn try_store_piece(
&self,
_piece_index: PieceIndex,
_piece: &Piece,
) -> Result<bool, FarmError> {
Ok(false)
}

async fn read_piece(&self, _key: &RecordKey) -> Result<Option<Piece>, FarmError> {
Ok(None)
}
}

#[derive(Debug)]
struct ClusterPieceReader {
farm_id_string: String,
Expand Down Expand Up @@ -279,14 +204,6 @@ impl Farm for ClusterFarm {
})
}

fn piece_cache(&self) -> Arc<dyn PieceCache + 'static> {
Arc::new(DummyPieceCache)
}

fn plot_cache(&self) -> Arc<dyn PlotCache + 'static> {
Arc::new(DummyPlotCache)
}

fn piece_reader(&self) -> Arc<dyn PieceReader + 'static> {
Arc::new(ClusterPieceReader {
farm_id_string: self.farm_id_string.clone(),
Expand All @@ -313,7 +230,7 @@ impl Farm for ClusterFarm {
}

fn run(self: Box<Self>) -> Pin<Box<dyn Future<Output = anyhow::Result<()>> + Send>> {
Box::pin(async move { Ok(self.background_tasks.await?) })
Box::pin((*self).run())
}
}

Expand Down Expand Up @@ -410,6 +327,11 @@ impl ClusterFarm {
background_tasks: AsyncJoinOnDrop::new(tokio::spawn(background_tasks), true),
})
}

/// Run and wait for background tasks to exit or return an error
pub async fn run(self) -> anyhow::Result<()> {
Ok(self.background_tasks.await?)
}
}

#[derive(Debug)]
Expand Down
16 changes: 0 additions & 16 deletions crates/subspace-farmer/src/farm.rs
Original file line number Diff line number Diff line change
Expand Up @@ -425,12 +425,6 @@ pub trait Farm {
/// Get plotted sectors instance
fn plotted_sectors(&self) -> Arc<dyn PlottedSectors + 'static>;

/// Get piece cache instance
fn piece_cache(&self) -> Arc<dyn PieceCache + 'static>;

/// Get plot cache instance
fn plot_cache(&self) -> Arc<dyn PlotCache + 'static>;

/// Get piece reader to read plotted pieces later
fn piece_reader(&self) -> Arc<dyn PieceReader + 'static>;

Expand Down Expand Up @@ -473,16 +467,6 @@ where
self.as_ref().plotted_sectors()
}

#[inline]
fn piece_cache(&self) -> Arc<dyn PieceCache + 'static> {
self.as_ref().piece_cache()
}

#[inline]
fn plot_cache(&self) -> Arc<dyn PlotCache + 'static> {
self.as_ref().plot_cache()
}

#[inline]
fn piece_reader(&self) -> Arc<dyn PieceReader + 'static> {
self.as_ref().piece_reader()
Expand Down
10 changes: 1 addition & 9 deletions crates/subspace-farmer/src/single_disk_farm.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ mod plotted_sectors;
mod plotting;
pub mod unbuffered_io_file_windows;

use crate::farm::{Farm, FarmId, HandlerFn, PieceReader, PlotCache, PlottedSectors, SectorUpdate};
use crate::farm::{Farm, FarmId, HandlerFn, PieceReader, PlottedSectors, SectorUpdate};
pub use crate::farm::{FarmingError, FarmingNotification};
use crate::identity::{Identity, IdentityError};
use crate::node_client::NodeClient;
Expand Down Expand Up @@ -626,14 +626,6 @@ impl Farm for SingleDiskFarm {
Arc::new(self.plotted_sectors())
}

fn piece_cache(&self) -> Arc<dyn farm::PieceCache + 'static> {
Arc::new(self.piece_cache())
}

fn plot_cache(&self) -> Arc<dyn PlotCache + 'static> {
Arc::new(self.plot_cache())
}

fn piece_reader(&self) -> Arc<dyn PieceReader + 'static> {
Arc::new(self.piece_reader())
}
Expand Down

0 comments on commit c3df829

Please sign in to comment.