Merge pull request #2734 from subspace/optimize-idle-farmer-memory-usage
Optimize idle farmer memory usage
nazar-pc authored May 3, 2024
2 parents 5b2d897 + 0edfe9c commit 83f86cd
Showing 1 changed file with 11 additions and 29 deletions.
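
This commit lowers idle memory usage by no longer caching proof-of-space table generators inside `CpuPlotter`: the `table_generators: Arc<Mutex<Vec<Vec<...>>>>` field (and the constructor's pre-allocation of one generator set per thread pool pair) is removed, and each plotting call now builds its own short-lived `Vec` of generators sized by `record_encoding_concurrency`. The following is a minimal sketch of that pattern, not the crate's actual API; `Generator` and `make_generator` are hypothetical stand-ins for the real `Table` / `PosTable::generator()` machinery.

```rust
use std::num::NonZeroUsize;

// Hypothetical stand-in for the crate's proof-of-space table generator; the
// point is that each instance owns sizeable scratch buffers.
struct Generator {
    scratch: Vec<u8>,
}

fn make_generator() -> Generator {
    Generator {
        scratch: vec![0u8; 1024 * 1024],
    }
}

struct Plotter {
    // After the change the plotter only remembers *how many* generators a
    // plotting call needs, not the generators themselves.
    record_encoding_concurrency: NonZeroUsize,
}

impl Plotter {
    fn plot_sector(&self) {
        // Generators are created on demand for the duration of one call...
        let mut table_generators: Vec<Generator> = (0..self.record_encoding_concurrency.get())
            .map(|_| make_generator())
            .collect();

        // ...used while encoding the sector...
        for generator in &mut table_generators {
            generator.scratch[0] ^= 1;
        }

        // ...and dropped when the call returns, so an idle plotter holds no
        // generator memory at all.
    }
}

fn main() {
    let plotter = Plotter {
        record_encoding_concurrency: NonZeroUsize::new(4).expect("4 is non-zero"),
    };
    plotter.plot_sector();
}
```

The trade-off is a fresh allocation of the generator set for every plotted sector instead of reuse across sectors, in exchange for holding no generator state while the farmer is idle.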
40 changes: 11 additions & 29 deletions crates/subspace-farmer/src/plotter/cpu.rs
@@ -7,9 +7,9 @@ use event_listener_primitives::{Bag, HandlerId};
use futures::channel::mpsc;
use futures::stream::FuturesUnordered;
use futures::{select, FutureExt, Sink, SinkExt, StreamExt};
use parking_lot::Mutex;
use std::error::Error;
use std::future::pending;
use std::marker::PhantomData;
use std::num::NonZeroUsize;
use std::pin::pin;
use std::sync::atomic::{AtomicBool, Ordering};
@@ -36,29 +36,22 @@ struct Handlers {
}

/// CPU plotter
pub struct CpuPlotter<PG, PosTable>
where
PosTable: Table,
{
pub struct CpuPlotter<PG, PosTable> {
piece_getter: PG,
downloading_semaphore: Arc<Semaphore>,
// TODO: It is ugly that thread pool manager and table generators are both having independent
// mutexes, they should be combined
plotting_thread_pool_manager: PlottingThreadPoolManager,
table_generators: Arc<Mutex<Vec<Vec<<PosTable as Table>::Generator>>>>,
record_encoding_concurrency: NonZeroUsize,
global_mutex: Arc<AsyncMutex<()>>,
kzg: Kzg,
erasure_coding: ErasureCoding,
handlers: Arc<Handlers>,
tasks_sender: mpsc::Sender<AsyncJoinOnDrop<()>>,
_background_tasks: AsyncJoinOnDrop<()>,
abort_early: Arc<AtomicBool>,
_phantom: PhantomData<PosTable>,
}

impl<PG, PosTable> Drop for CpuPlotter<PG, PosTable>
where
PosTable: Table,
{
impl<PG, PosTable> Drop for CpuPlotter<PG, PosTable> {
fn drop(&mut self) {
self.abort_early.store(true, Ordering::Release);
self.tasks_sender.close_channel();
@@ -117,7 +110,7 @@
let plotting_fut = {
let piece_getter = self.piece_getter.clone();
let plotting_thread_pool_manager = self.plotting_thread_pool_manager.clone();
let table_generators = Arc::clone(&self.table_generators);
let record_encoding_concurrency = self.record_encoding_concurrency;
let global_mutex = Arc::clone(&self.global_mutex);
let kzg = self.kzg.clone();
let erasure_coding = self.erasure_coding.clone();
@@ -185,10 +178,6 @@
let (sector, sector_metadata, plotted_sector) = {
let thread_pools = plotting_thread_pool_manager.get_thread_pools().await;

let mut local_table_generators = table_generators.lock().pop().expect(
"Number of table generators is the same as number of thread pools; qed",
);

let plotting_fn = || {
tokio::task::block_in_place(|| {
let mut sector = Vec::new();
@@ -202,7 +191,9 @@
pieces_in_sector,
sector_output: &mut sector,
sector_metadata_output: &mut sector_metadata,
table_generators: &mut local_table_generators,
table_generators: &mut (0..record_encoding_concurrency.get())
.map(|_| PosTable::generator())
.collect::<Vec<_>>(),
abort_early: &abort_early,
global_mutex: &global_mutex,
},
@@ -235,8 +226,6 @@

let plotting_result = thread_pool.install(plotting_fn);

table_generators.lock().push(local_table_generators);

match plotting_result {
Ok(plotting_result) => {
if !progress_updater
@@ -316,14 +305,6 @@
kzg: Kzg,
erasure_coding: ErasureCoding,
) -> Self {
let table_generators = (0..plotting_thread_pool_manager.thread_pool_pairs().get())
.map(|_| {
(0..record_encoding_concurrency.get())
.map(|_| PosTable::generator())
.collect::<Vec<_>>()
})
.collect::<Vec<_>>();

let (tasks_sender, mut tasks_receiver) = mpsc::channel(1);

// Basically runs plotting tasks in the background and allows to abort on drop
@@ -358,14 +339,15 @@
piece_getter,
downloading_semaphore,
plotting_thread_pool_manager,
table_generators: Arc::new(Mutex::new(table_generators)),
record_encoding_concurrency,
global_mutex,
kzg,
erasure_coding,
handlers: Arc::default(),
tasks_sender,
_background_tasks: background_tasks,
abort_early,
_phantom: PhantomData,
}
}

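A side effect of dropping the cached generators is visible in the struct definition: no field names `<PosTable as Table>::Generator` any more, so the `where PosTable: Table` bound disappears from `CpuPlotter` and its `Drop` impl, with `PhantomData<PosTable>` keeping the type parameter in use. Below is a sketch of that general Rust pattern under simplified, illustrative names (the `Table` trait here is reduced to one associated type, and `PlaceholderTable` is not a real crate type).

```rust
use std::marker::PhantomData;

// Simplified stand-in for the crate's `Table` trait and its `Generator`
// associated type.
trait Table {
    type Generator: Default;
}

// Before: a field of type `Vec<T::Generator>` forces `T: Table` onto the
// struct definition itself, and every impl (including `Drop`) must repeat it.
#[allow(dead_code)]
struct CachedPlotter<T: Table> {
    table_generators: Vec<T::Generator>,
}

// After: no field mentions the associated type, so the struct needs no bound;
// `PhantomData<T>` keeps the otherwise-unused type parameter.
struct OnDemandPlotter<T> {
    record_encoding_concurrency: usize,
    _phantom: PhantomData<T>,
}

impl<T> OnDemandPlotter<T>
where
    T: Table,
{
    // The bound now lives only where generators are actually produced.
    fn table_generators(&self) -> Vec<T::Generator> {
        (0..self.record_encoding_concurrency)
            .map(|_| T::Generator::default())
            .collect()
    }
}

// Placeholder implementor, standing in for a real proof-of-space table type.
struct PlaceholderTable;

impl Table for PlaceholderTable {
    type Generator = u64;
}

fn main() {
    let plotter = OnDemandPlotter::<PlaceholderTable> {
        record_encoding_concurrency: 4,
        _phantom: PhantomData,
    };
    assert_eq!(plotter.table_generators().len(), 4);
}
```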
