Skip to content

Commit

Permalink
sim-all: Allow running the simulator without it writing on disk
Browse files Browse the repository at this point in the history
It is rather annoying to run the simulator and having to delete the output files
when we are doing testing/dev. This allows us to do so without the file being created.
  • Loading branch information
sr-gi committed Oct 20, 2023
1 parent b09d3c0 commit 3c34db8
Show file tree
Hide file tree
Showing 2 changed files with 38 additions and 24 deletions.
4 changes: 4 additions & 0 deletions sim-cli/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,9 @@ struct Cli {
/// Multiplier of the overall network capacity used by the random activity generator
#[clap(long, short, default_value_t = ACTIVITY_MULTIPLIER)]
capacity_multiplier: f64,
/// Do not create an output file containing the simulations results
#[clap(long, default_value_t = false)]
no_results: bool,
}

#[tokio::main]
Expand Down Expand Up @@ -164,6 +167,7 @@ async fn main() -> anyhow::Result<()> {
cli.print_batch_size,
cli.expected_pmt_amt,
cli.capacity_multiplier,
cli.no_results,
);
let sim2 = sim.clone();

Expand Down
58 changes: 34 additions & 24 deletions sim-lib/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -328,6 +328,8 @@ pub struct Simulation {
/// The number of times that the network sends its total capacity in a month of operation when generating random
/// activity.
activity_multiplier: f64,
/// Whether we want the simulation not to produce and result file. Useful for developing, defaults to false.
no_results: bool,
}

impl Simulation {
Expand All @@ -338,6 +340,7 @@ impl Simulation {
print_batch_size: u32,
expected_payment_msat: u64,
activity_multiplier: f64,
no_results: bool,
) -> Self {
let (shutdown_trigger, shutdown_listener) = triggered::trigger();
Self {
Expand All @@ -349,6 +352,7 @@ impl Simulation {
print_batch_size,
expected_payment_msat,
activity_multiplier,
no_results,
}
}

Expand Down Expand Up @@ -528,7 +532,6 @@ impl Simulation {
tasks: &mut JoinSet<()>,
) {
let listener = self.shutdown_listener.clone();
let print_batch_size = self.print_batch_size;
log::debug!("Setting up simulator data collection.");

// Create a sender/receiver pair that will be used to report final results of simulation.
Expand All @@ -544,7 +547,8 @@ impl Simulation {
tasks.spawn(consume_simulation_results(
results_receiver,
listener,
print_batch_size,
self.print_batch_size,
self.no_results,
));
log::debug!("Simulator data collection set up.");
}
Expand Down Expand Up @@ -864,10 +868,11 @@ async fn consume_simulation_results(
receiver: Receiver<(Payment, PaymentResult)>,
listener: Listener,
print_batch_size: u32,
no_results: bool,
) {
log::debug!("Simulation results consumer started.");

if let Err(e) = write_payment_results(receiver, listener, print_batch_size).await {
if let Err(e) = write_payment_results(receiver, listener, print_batch_size, no_results).await {
log::error!("Error while reporting payment results: {:?}.", e);
}

Expand All @@ -878,46 +883,51 @@ async fn write_payment_results(
mut receiver: Receiver<(Payment, PaymentResult)>,
listener: Listener,
print_batch_size: u32,
no_results: bool,
) -> Result<(), SimulationError> {
let mut writer = WriterBuilder::new().from_path(format!(
"simulation_{:?}.csv",
SystemTime::now()
.duration_since(SystemTime::UNIX_EPOCH)
.unwrap()
.as_secs()
))?;
let mut writer = if !no_results {
Some(WriterBuilder::new().from_path(format!(
"simulation_{:?}.csv",
SystemTime::now()
.duration_since(SystemTime::UNIX_EPOCH)
.unwrap()
.as_secs()
))?)
} else {
None
};

let mut result_logger = PaymentResultLogger::new();

let mut counter = 1;
let mut counter = 0;
loop {
tokio::select! {
biased;
_ = listener.clone() => {
log::debug!("Simulation results consumer received shutdown signal.");
break writer.flush().map_err(|_| SimulationError::FileError)
return writer.map(|ref mut w| w.flush().map_err(|_| SimulationError::FileError)).unwrap_or(Ok(()));

},
payment_report = receiver.recv() => {
match payment_report {
Some((details, result)) => {
result_logger.report_result(&details, &result);
log::trace!("Resolved dispatched payment: {} with: {}.", details, result);

writer.serialize((details, result)).map_err(|e| {
let _ = writer.flush();
SimulationError::CsvError(e)
})?;
if let Some(ref mut w) = writer {
w.serialize((details, result)).map_err(|e| {
let _ = w.flush();
SimulationError::CsvError(e)
})?;

if print_batch_size == counter {
writer.flush().map_err(|_| SimulationError::FileError)?;
counter = 1;
} else {
counter += 1;
counter = counter % print_batch_size + 1;
if print_batch_size == counter {
w.flush().map_err(|_| SimulationError::FileError)?;
}
}
continue;
},
None => {
break writer.flush().map_err(|_| SimulationError::FileError)
return writer.map(|ref mut w| w.flush().map_err(|_| SimulationError::FileError)).unwrap_or(Ok(()));

}
}
}
Expand Down

0 comments on commit 3c34db8

Please sign in to comment.