Skip to content

Commit

Permalink
Merge pull request #146 from sr-gi/20231018-no-results
Browse files Browse the repository at this point in the history
sim-all: Allow running the simulator without it writing on disk
  • Loading branch information
sr-gi authored Oct 23, 2023
2 parents b09d3c0 + cf92f52 commit 0f3d292
Show file tree
Hide file tree
Showing 2 changed files with 36 additions and 26 deletions.
4 changes: 4 additions & 0 deletions sim-cli/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,9 @@ struct Cli {
/// Multiplier of the overall network capacity used by the random activity generator
#[clap(long, short, default_value_t = ACTIVITY_MULTIPLIER)]
capacity_multiplier: f64,
/// Do not create an output file containing the simulations results
#[clap(long, default_value_t = false)]
no_results: bool,
}

#[tokio::main]
Expand Down Expand Up @@ -164,6 +167,7 @@ async fn main() -> anyhow::Result<()> {
cli.print_batch_size,
cli.expected_pmt_amt,
cli.capacity_multiplier,
cli.no_results,
);
let sim2 = sim.clone();

Expand Down
58 changes: 32 additions & 26 deletions sim-lib/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -328,6 +328,8 @@ pub struct Simulation {
/// The number of times that the network sends its total capacity in a month of operation when generating random
/// activity.
activity_multiplier: f64,
/// Whether we want the simulation not to produce and result file. Useful for developing, defaults to false.
no_results: bool,
}

impl Simulation {
Expand All @@ -338,6 +340,7 @@ impl Simulation {
print_batch_size: u32,
expected_payment_msat: u64,
activity_multiplier: f64,
no_results: bool,
) -> Self {
let (shutdown_trigger, shutdown_listener) = triggered::trigger();
Self {
Expand All @@ -349,6 +352,7 @@ impl Simulation {
print_batch_size,
expected_payment_msat,
activity_multiplier,
no_results,
}
}

Expand Down Expand Up @@ -528,7 +532,6 @@ impl Simulation {
tasks: &mut JoinSet<()>,
) {
let listener = self.shutdown_listener.clone();
let print_batch_size = self.print_batch_size;
log::debug!("Setting up simulator data collection.");

// Create a sender/receiver pair that will be used to report final results of simulation.
Expand All @@ -544,7 +547,8 @@ impl Simulation {
tasks.spawn(consume_simulation_results(
results_receiver,
listener,
print_batch_size,
self.print_batch_size,
self.no_results,
));
log::debug!("Simulator data collection set up.");
}
Expand Down Expand Up @@ -864,10 +868,11 @@ async fn consume_simulation_results(
receiver: Receiver<(Payment, PaymentResult)>,
listener: Listener,
print_batch_size: u32,
no_results: bool,
) {
log::debug!("Simulation results consumer started.");

if let Err(e) = write_payment_results(receiver, listener, print_batch_size).await {
if let Err(e) = write_payment_results(receiver, listener, print_batch_size, no_results).await {
log::error!("Error while reporting payment results: {:?}.", e);
}

Expand All @@ -878,47 +883,48 @@ async fn write_payment_results(
mut receiver: Receiver<(Payment, PaymentResult)>,
listener: Listener,
print_batch_size: u32,
no_results: bool,
) -> Result<(), SimulationError> {
let mut writer = WriterBuilder::new().from_path(format!(
"simulation_{:?}.csv",
SystemTime::now()
.duration_since(SystemTime::UNIX_EPOCH)
.unwrap()
.as_secs()
))?;
let mut writer = if !no_results {
Some(WriterBuilder::new().from_path(format!(
"simulation_{:?}.csv",
SystemTime::now()
.duration_since(SystemTime::UNIX_EPOCH)
.unwrap()
.as_secs()
))?)
} else {
None
};

let mut result_logger = PaymentResultLogger::new();

let mut counter = 1;
let mut counter = 0;
loop {
tokio::select! {
biased;
_ = listener.clone() => {
log::debug!("Simulation results consumer received shutdown signal.");
break writer.flush().map_err(|_| SimulationError::FileError)
return writer.map_or(Ok(()), |ref mut w| w.flush().map_err(|_| SimulationError::FileError))
},
payment_report = receiver.recv() => {
match payment_report {
Some((details, result)) => {
result_logger.report_result(&details, &result);
log::trace!("Resolved dispatched payment: {} with: {}.", details, result);

writer.serialize((details, result)).map_err(|e| {
let _ = writer.flush();
SimulationError::CsvError(e)
})?;
if let Some(ref mut w) = writer {
w.serialize((details, result)).map_err(|e| {
let _ = w.flush();
SimulationError::CsvError(e)
})?;

if print_batch_size == counter {
writer.flush().map_err(|_| SimulationError::FileError)?;
counter = 1;
} else {
counter += 1;
counter = counter % print_batch_size + 1;
if print_batch_size == counter {
w.flush().map_err(|_| SimulationError::FileError)?;
}
}
continue;
},
None => {
break writer.flush().map_err(|_| SimulationError::FileError)
}
None => return writer.map(|ref mut w| w.flush().map_err(|_| SimulationError::FileError)).unwrap_or(Ok(())),
}
}
}
Expand Down

0 comments on commit 0f3d292

Please sign in to comment.