-
Notifications
You must be signed in to change notification settings - Fork 242
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
- Loading branch information
Showing
16 changed files
with
5,491 additions
and
26 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,4 +1,5 @@ | ||
pub(crate) mod benchmark; | ||
pub(crate) mod cluster; | ||
pub(crate) mod farm; | ||
mod info; | ||
mod scrub; | ||
|
122 changes: 122 additions & 0 deletions
122
crates/subspace-farmer/src/bin/subspace-farmer/commands/cluster.rs
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,122 @@ | ||
mod cache; | ||
mod controller; | ||
mod farmer; | ||
mod plotter; | ||
|
||
use crate::commands::cluster::cache::{cache, CacheArgs}; | ||
use crate::commands::cluster::controller::{controller, ControllerArgs}; | ||
use crate::commands::cluster::farmer::{farmer, FarmerArgs}; | ||
use crate::commands::cluster::plotter::{plotter, PlotterArgs}; | ||
use crate::utils::shutdown_signal; | ||
use anyhow::anyhow; | ||
use async_nats::ServerAddr; | ||
use backoff::ExponentialBackoff; | ||
use clap::{Parser, Subcommand}; | ||
use futures::stream::FuturesUnordered; | ||
use futures::{select, FutureExt, StreamExt}; | ||
use prometheus_client::registry::Registry; | ||
use std::net::SocketAddr; | ||
use subspace_farmer::cluster::nats_client::NatsClient; | ||
use subspace_farmer::utils::AsyncJoinOnDrop; | ||
use subspace_metrics::{start_prometheus_metrics_server, RegistryAdapter}; | ||
use subspace_proof_of_space::Table; | ||
|
||
/// Arguments for cluster | ||
#[derive(Debug, Parser)] | ||
pub(crate) struct ClusterArgs { | ||
/// Shared arguments for all subcommands | ||
#[clap(flatten)] | ||
shared_args: SharedArgs, | ||
/// Cluster subcommands | ||
#[clap(subcommand)] | ||
subcommand: ClusterSubcommand, | ||
} | ||
|
||
/// Shared arguments | ||
// Flattened into `ClusterArgs` next to the subcommand selector; `cluster` reads both fields.
#[derive(Debug, Parser)] | ||
struct SharedArgs { | ||
// Passed as-is to `NatsClient::new` in `cluster`; `required = true` means at least one address
// has to be given on the command line.
/// NATS server address, typically in `nats://server1:port1` format, can be specified multiple | ||
/// times. | ||
/// | ||
/// NOTE: NATS must be configured for message sizes of 2MiB or larger (1MiB is the default), | ||
/// which can be done by starting NATS server with config file containing `max_payload = 2MB`. | ||
#[arg(long, alias = "nats-server", required = true)] | ||
nats_servers: Vec<ServerAddr>, | ||
// `cluster` only calls `start_prometheus_metrics_server` when this list is non-empty.
/// Defines endpoints for the prometheus metrics server. It doesn't start without at least | ||
/// one specified endpoint. Format: 127.0.0.1:8080 | ||
#[arg(long, aliases = ["metrics-endpoint", "metrics-endpoints"])] | ||
prometheus_listen_on: Vec<SocketAddr>, | ||
} | ||
|
||
/// Cluster subcommands | ||
// `cluster` matches on this enum and starts exactly one component per invocation: each variant
// carries the argument struct of the module with the same name (`controller`, `farmer`,
// `plotter`, `cache`).
#[derive(Debug, Subcommand)] | ||
enum ClusterSubcommand { | ||
/// Farming cluster controller | ||
Controller(ControllerArgs), | ||
/// Farming cluster farmer | ||
Farmer(FarmerArgs), | ||
/// Farming cluster plotter | ||
Plotter(PlotterArgs), | ||
/// Farming cluster cache | ||
Cache(CacheArgs), | ||
} | ||
|
||
pub(crate) async fn cluster<PosTable>(cluster_args: ClusterArgs) -> anyhow::Result<()> | ||
where | ||
PosTable: Table, | ||
{ | ||
let signal = shutdown_signal(); | ||
|
||
let nats_client = NatsClient::new( | ||
cluster_args.shared_args.nats_servers, | ||
ExponentialBackoff { | ||
max_elapsed_time: None, | ||
..ExponentialBackoff::default() | ||
}, | ||
) | ||
.await | ||
.map_err(|error| anyhow!("Failed to connect to NATS server: {error}"))?; | ||
let mut registry = Registry::default(); | ||
|
||
let mut tasks = FuturesUnordered::new(); | ||
|
||
// TODO: Support running multiple components at once | ||
tasks.push(match cluster_args.subcommand { | ||
ClusterSubcommand::Controller(controller_args) => { | ||
controller(nats_client, &mut registry, controller_args).await? | ||
} | ||
ClusterSubcommand::Farmer(farmer_args) => { | ||
farmer::<PosTable>(nats_client, &mut registry, farmer_args).await? | ||
} | ||
ClusterSubcommand::Plotter(plotter_args) => { | ||
plotter::<PosTable>(nats_client, &mut registry, plotter_args).await? | ||
} | ||
ClusterSubcommand::Cache(cache_args) => { | ||
cache(nats_client, &mut registry, cache_args).await? | ||
} | ||
}); | ||
|
||
if !cluster_args.shared_args.prometheus_listen_on.is_empty() { | ||
let prometheus_task = start_prometheus_metrics_server( | ||
cluster_args.shared_args.prometheus_listen_on, | ||
RegistryAdapter::PrometheusClient(registry), | ||
)?; | ||
|
||
let join_handle = tokio::spawn(prometheus_task); | ||
tasks.push(Box::pin(async move { | ||
Ok(AsyncJoinOnDrop::new(join_handle, true).await??) | ||
})); | ||
} | ||
|
||
select! { | ||
// Signal future | ||
_ = signal.fuse() => { | ||
Ok(()) | ||
}, | ||
|
||
// Run future | ||
result = tasks.next() => { | ||
result.expect("List of tasks is not empty; qed") | ||
}, | ||
} | ||
} |
175 changes: 175 additions & 0 deletions
175
crates/subspace-farmer/src/bin/subspace-farmer/commands/cluster/cache.rs
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,175 @@ | ||
use anyhow::anyhow; | ||
use bytesize::ByteSize; | ||
use clap::Parser; | ||
use prometheus_client::registry::Registry; | ||
use std::fs; | ||
use std::future::Future; | ||
use std::path::PathBuf; | ||
use std::pin::Pin; | ||
use std::str::FromStr; | ||
use std::time::Duration; | ||
use subspace_farmer::cluster::cache::cache_service; | ||
use subspace_farmer::cluster::nats_client::NatsClient; | ||
use subspace_farmer::piece_cache::PieceCache; | ||
|
||
/// Interval between cache self-identification broadcast messages | ||
// Passed as the last argument to `cache_service` by `cache` in this file; `pub(super)` makes it
// visible to the parent `cluster` module as well.
pub(super) const CACHE_IDENTIFICATION_BROADCAST_INTERVAL: Duration = Duration::from_secs(5); | ||
|
||
/// One on-disk cache location together with its size budget.
#[derive(Clone, Debug)]
struct DiskCache {
    /// Directory in which the cache is stored
    directory: PathBuf,
    /// Number of bytes the cache is allowed to use
    allocated_space: u64,
}
|
||
impl FromStr for DiskCache { | ||
type Err = String; | ||
|
||
fn from_str(s: &str) -> anyhow::Result<Self, Self::Err> { | ||
let parts = s.split(',').collect::<Vec<_>>(); | ||
if parts.len() != 2 { | ||
return Err("Must contain 2 coma-separated components".to_string()); | ||
} | ||
|
||
let mut plot_directory = None; | ||
let mut allocated_space = None; | ||
|
||
for part in parts { | ||
let part = part.splitn(2, '=').collect::<Vec<_>>(); | ||
if part.len() != 2 { | ||
return Err("Each component must contain = separating key from value".to_string()); | ||
} | ||
|
||
let key = *part.first().expect("Length checked above; qed"); | ||
let value = *part.get(1).expect("Length checked above; qed"); | ||
|
||
match key { | ||
"path" => { | ||
plot_directory.replace(PathBuf::from(value)); | ||
} | ||
"size" => { | ||
allocated_space.replace( | ||
value | ||
.parse::<ByteSize>() | ||
.map_err(|error| { | ||
format!("Failed to parse `size` \"{value}\": {error}") | ||
})? | ||
.as_u64(), | ||
); | ||
} | ||
key => { | ||
return Err(format!( | ||
"Key \"{key}\" is not supported, only `path` or `size`" | ||
)); | ||
} | ||
} | ||
} | ||
|
||
Ok(DiskCache { | ||
directory: plot_directory.ok_or( | ||
"`path` key is required with path to directory where cache will be stored", | ||
)?, | ||
allocated_space: allocated_space | ||
.ok_or("`size` key is required with allocated amount of disk space")?, | ||
}) | ||
} | ||
} | ||
|
||
/// Arguments for cache | ||
#[derive(Debug, Parser)] | ||
pub(super) struct CacheArgs { | ||
/// One or more caches located at specified path, each with its own allocated space. | ||
/// | ||
/// Format for each cache is coma-separated list of strings like this: | ||
/// | ||
/// path=/path/to/directory,size=5T | ||
/// | ||
/// `size` is max allocated size in human-readable format (e.g. 10GB, 2TiB) or just bytes that | ||
/// cache will make sure to not exceed (and will pre-allocated all the space on startup to | ||
/// ensure it will not run out of space in runtime). | ||
disk_caches: Vec<DiskCache>, | ||
/// Run temporary cache with specified farm size in human-readable format (e.g. 10GB, 2TiB) or | ||
/// just bytes (e.g. 4096), this will create a temporary directory that will be deleted at the | ||
/// end of the process. | ||
#[arg(long, conflicts_with = "disk_caches")] | ||
tmp: Option<ByteSize>, | ||
/// Cache group to use, the same cache group must be also specified on corresponding controller | ||
#[arg(long, default_value = "default")] | ||
cache_group: String, | ||
} | ||
|
||
pub(super) async fn cache( | ||
nats_client: NatsClient, | ||
_registry: &mut Registry, | ||
cache_args: CacheArgs, | ||
) -> anyhow::Result<Pin<Box<dyn Future<Output = anyhow::Result<()>>>>> { | ||
let CacheArgs { | ||
mut disk_caches, | ||
tmp, | ||
cache_group, | ||
} = cache_args; | ||
|
||
let tmp_directory = if let Some(plot_size) = tmp { | ||
let tmp_directory = tempfile::Builder::new() | ||
.prefix("subspace-cache-") | ||
.tempdir()?; | ||
|
||
disk_caches = vec![DiskCache { | ||
directory: tmp_directory.as_ref().to_path_buf(), | ||
allocated_space: plot_size.as_u64(), | ||
}]; | ||
|
||
Some(tmp_directory) | ||
} else { | ||
if disk_caches.is_empty() { | ||
return Err(anyhow!("There must be at least one disk cache provided")); | ||
} | ||
|
||
for cache in &disk_caches { | ||
if !cache.directory.exists() { | ||
if let Err(error) = fs::create_dir(&cache.directory) { | ||
return Err(anyhow!( | ||
"Directory {} doesn't exist and can't be created: {}", | ||
cache.directory.display(), | ||
error | ||
)); | ||
} | ||
} | ||
} | ||
None | ||
}; | ||
|
||
// TODO: Metrics | ||
|
||
let caches = disk_caches | ||
.iter() | ||
.map(|disk_cache| { | ||
PieceCache::open( | ||
&disk_cache.directory, | ||
u32::try_from(disk_cache.allocated_space / PieceCache::element_size() as u64) | ||
.unwrap_or(u32::MAX), | ||
) | ||
.map_err(|error| { | ||
anyhow!( | ||
"Failed to open piece cache at {}: {error}", | ||
disk_cache.directory.display() | ||
) | ||
}) | ||
}) | ||
.collect::<Result<Vec<_>, _>>()?; | ||
|
||
Ok(Box::pin(async move { | ||
cache_service( | ||
nats_client, | ||
&caches, | ||
&cache_group, | ||
CACHE_IDENTIFICATION_BROADCAST_INTERVAL, | ||
) | ||
.await?; | ||
|
||
drop(tmp_directory); | ||
|
||
Ok(()) | ||
})) | ||
} |
Oops, something went wrong.