diff --git a/crates/subspace-farmer/src/plotter/cpu.rs b/crates/subspace-farmer/src/plotter/cpu.rs index 6f60aa7e70..c9689127ae 100644 --- a/crates/subspace-farmer/src/plotter/cpu.rs +++ b/crates/subspace-farmer/src/plotter/cpu.rs @@ -7,9 +7,9 @@ use event_listener_primitives::{Bag, HandlerId}; use futures::channel::mpsc; use futures::stream::FuturesUnordered; use futures::{select, FutureExt, Sink, SinkExt, StreamExt}; -use parking_lot::Mutex; use std::error::Error; use std::future::pending; +use std::marker::PhantomData; use std::num::NonZeroUsize; use std::pin::pin; use std::sync::atomic::{AtomicBool, Ordering}; @@ -36,16 +36,11 @@ struct Handlers { } /// CPU plotter -pub struct CpuPlotter -where - PosTable: Table, -{ +pub struct CpuPlotter { piece_getter: PG, downloading_semaphore: Arc, - // TODO: It is ugly that thread pool manager and table generators are both having independent - // mutexes, they should be combined plotting_thread_pool_manager: PlottingThreadPoolManager, - table_generators: Arc::Generator>>>>, + record_encoding_concurrency: NonZeroUsize, global_mutex: Arc>, kzg: Kzg, erasure_coding: ErasureCoding, @@ -53,12 +48,10 @@ where tasks_sender: mpsc::Sender>, _background_tasks: AsyncJoinOnDrop<()>, abort_early: Arc, + _phantom: PhantomData, } -impl Drop for CpuPlotter -where - PosTable: Table, -{ +impl Drop for CpuPlotter { fn drop(&mut self) { self.abort_early.store(true, Ordering::Release); self.tasks_sender.close_channel(); @@ -117,7 +110,7 @@ where let plotting_fut = { let piece_getter = self.piece_getter.clone(); let plotting_thread_pool_manager = self.plotting_thread_pool_manager.clone(); - let table_generators = Arc::clone(&self.table_generators); + let record_encoding_concurrency = self.record_encoding_concurrency; let global_mutex = Arc::clone(&self.global_mutex); let kzg = self.kzg.clone(); let erasure_coding = self.erasure_coding.clone(); @@ -185,10 +178,6 @@ where let (sector, sector_metadata, plotted_sector) = { let thread_pools = plotting_thread_pool_manager.get_thread_pools().await; - let mut local_table_generators = table_generators.lock().pop().expect( - "Number of table generators is the same as number of thread pools; qed", - ); - let plotting_fn = || { tokio::task::block_in_place(|| { let mut sector = Vec::new(); @@ -202,7 +191,9 @@ where pieces_in_sector, sector_output: &mut sector, sector_metadata_output: &mut sector_metadata, - table_generators: &mut local_table_generators, + table_generators: &mut (0..record_encoding_concurrency.get()) + .map(|_| PosTable::generator()) + .collect::>(), abort_early: &abort_early, global_mutex: &global_mutex, }, @@ -235,8 +226,6 @@ where let plotting_result = thread_pool.install(plotting_fn); - table_generators.lock().push(local_table_generators); - match plotting_result { Ok(plotting_result) => { if !progress_updater @@ -316,14 +305,6 @@ where kzg: Kzg, erasure_coding: ErasureCoding, ) -> Self { - let table_generators = (0..plotting_thread_pool_manager.thread_pool_pairs().get()) - .map(|_| { - (0..record_encoding_concurrency.get()) - .map(|_| PosTable::generator()) - .collect::>() - }) - .collect::>(); - let (tasks_sender, mut tasks_receiver) = mpsc::channel(1); // Basically runs plotting tasks in the background and allows to abort on drop @@ -358,7 +339,7 @@ where piece_getter, downloading_semaphore, plotting_thread_pool_manager, - table_generators: Arc::new(Mutex::new(table_generators)), + record_encoding_concurrency, global_mutex, kzg, erasure_coding, @@ -366,6 +347,7 @@ where tasks_sender, _background_tasks: background_tasks, abort_early, + _phantom: PhantomData, } }