Skip to content

Commit

Permalink
SQUASH-ME: fixes
Browse files Browse the repository at this point in the history
  • Loading branch information
sr-gi committed Oct 5, 2023
1 parent 20e1967 commit 3710351
Show file tree
Hide file tree
Showing 3 changed files with 56 additions and 43 deletions.
63 changes: 38 additions & 25 deletions sim-lib/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -203,7 +203,7 @@ pub trait LightningNode {
pub trait NetworkGenerator {
// sample_node_by_capacity randomly picks a node within the network weighted by its capacity deployed to the
// network in channels. It returns the node's public key and its capacity in millisatoshis.
fn sample_node_by_capacity(&self, source: PublicKey) -> (PublicKey, u64);
fn sample_node_by_capacity(&self, source: PublicKey) -> (NodeInfo, u64);
}

pub trait PaymentGenerator {
Expand All @@ -214,14 +214,6 @@ pub trait PaymentGenerator {
fn payment_amount(&self, destination_capacity: u64) -> Result<u64, SimulationError>;
}

/// SimulationEvent describes the set of actions that the simulator can run on nodes that it has execution permissions
/// on.
#[derive(Clone)]
enum SimulationEvent {
// Dispatch a payment of the specified amount to the public key provided.
SendPayment(NodeInfo, u64),
}

#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct PaymentResult {
pub htlc_count: usize,
Expand Down Expand Up @@ -301,6 +293,15 @@ impl Display for Payment {
}
}

/// SimulationEvent describes the set of actions that the simulator can run on nodes that it has execution permissions
/// on.
#[derive(Clone)]
enum SimulationEvent {
/// Dispatch a payment of the specified amount to the public key provided.
/// Results in `SimulationOutput::SendPaymentSuccess` or `SimulationOutput::SendPaymentFailure`.
SendPayment(NodeInfo, u64),
}

/// SimulationOutput provides the output of a simulation event.
enum SimulationOutput {
/// Intermediate output for when simulator has successfully dispatched a payment.
Expand Down Expand Up @@ -343,7 +344,7 @@ impl Simulation {
let (shutdown_trigger, shutdown_listener) = triggered::trigger();
Self {
nodes,
activity: activity,
activity,
shutdown_trigger,
shutdown_listener,
total_time: total_time.map(|x| Duration::from_secs(x as u64)),
Expand Down Expand Up @@ -542,7 +543,9 @@ impl Simulation {

/// Returns the list of nodes that are eligible for generating random activity on. This is the subset of nodes
/// that have sufficient capacity to generate payments of our expected payment amount.
async fn random_activity_nodes(&self) -> Result<HashMap<PublicKey, u64>, SimulationError> {
async fn random_activity_nodes(
&self,
) -> Result<HashMap<PublicKey, (NodeInfo, u64)>, SimulationError> {
// Collect capacity of each node from its view of its own channels. Total capacity is divided by two to
// avoid double counting capacity (as each node has a counterparty in the channel).
let mut node_capacities = HashMap::new();
Expand All @@ -557,7 +560,13 @@ impl Simulation {
continue;
}

node_capacities.insert(*pk, chan_capacity / 2);
node_capacities.insert(
*pk,
(
node.lock().await.get_node_info(pk).await?,
chan_capacity / 2,
),
);
}

Ok(node_capacities)
Expand Down Expand Up @@ -618,21 +627,22 @@ impl Simulation {
/// provided for each node represented in producer channels.
async fn dispatch_random_producers(
&self,
node_capacities: HashMap<PublicKey, u64>,
node_capacities: HashMap<PublicKey, (NodeInfo, u64)>,
producer_channels: HashMap<PublicKey, Sender<SimulationEvent>>,
tasks: &mut JoinSet<()>,
) -> Result<(), SimulationError> {
let network_generator =
Arc::new(Mutex::new(NetworkGraphView::new(node_capacities.clone())?));
let network_generator = Arc::new(Mutex::new(NetworkGraphView::new(
node_capacities.values().cloned().collect(),
)?));

log::info!(
"Created network generator: {}.",
network_generator.lock().await
);

for (pk, sender) in producer_channels.into_iter() {
let source_capacity = match node_capacities.get(&pk) {
Some(capacity) => *capacity,
let (info, source_capacity) = match node_capacities.get(&pk) {
Some((info, capacity)) => (info.clone(), *capacity),
None => {
return Err(SimulationError::RandomActivityError(format!(
"Random activity generator run for: {} with unknown capacity.",
Expand All @@ -648,7 +658,7 @@ impl Simulation {
)?;

tasks.spawn(produce_random_events(
pk,
info,
network_generator.clone(),
node_generator,
sender.clone(),
Expand All @@ -672,8 +682,7 @@ async fn consume_events(
sender: Sender<SimulationOutput>,
shutdown: Trigger,
) {
let node_info = node.lock().await.get_info().clone();
log::debug!("Started consumer for {}.", node_info);
log::debug!("Started consumer for {}.", node.lock().await.get_info());

while let Some(event) = receiver.recv().await {
match event {
Expand All @@ -692,7 +701,7 @@ async fn consume_events(
Ok(payment_hash) => {
log::debug!(
"Send payment: {} -> {}: ({}).",
node_info,
node.get_info(),
dest,
hex::encode(payment_hash.0)
);
Expand All @@ -701,7 +710,11 @@ async fn consume_events(
SimulationOutput::SendPaymentSuccess(payment)
}
Err(e) => {
log::error!("Error while sending payment {} -> {}.", node_info, dest);
log::error!(
"Error while sending payment {} -> {}.",
node.get_info(),
dest
);

match e {
LightningError::PermanentError(s) => {
Expand Down Expand Up @@ -780,7 +793,7 @@ async fn produce_events(
}

async fn produce_random_events<N: NetworkGenerator, A: PaymentGenerator + Display>(
source: PublicKey,
source: NodeInfo,
network_generator: Arc<Mutex<N>>,
node_generator: A,
sender: Sender<SimulationEvent>,
Expand All @@ -802,7 +815,7 @@ async fn produce_random_events<N: NetworkGenerator, A: PaymentGenerator + Displa
// Wait until our time to next payment has elapsed then execute a random amount payment to a random
// destination.
_ = time::sleep(wait) => {
let destination = network_generator.lock().await.sample_node_by_capacity(source);
let destination = network_generator.lock().await.sample_node_by_capacity(source.pubkey);

// Only proceed with a payment if the amount is non-zero, otherwise skip this round. If we can't get
// a payment amount something has gone wrong (because we should have validated that we can always
Expand All @@ -824,7 +837,7 @@ async fn produce_random_events<N: NetworkGenerator, A: PaymentGenerator + Displa
log::debug!("Generated random payment: {source} -> {}: {amount} msat.", destination.0);

// Send the payment, exiting if we can no longer send to the consumer.
let event = SimulationEvent::SendPayment(destination.0, amount);
let event = SimulationEvent::SendPayment(destination.0.clone(), amount);
if let Err(e) = sender.send(event).await {
log::debug!(
"Stopped random producer for {amount}: {source} -> {}. Consumer error: {e}.", destination.0,
Expand Down
10 changes: 6 additions & 4 deletions sim-lib/src/lnd.rs
Original file line number Diff line number Diff line change
Expand Up @@ -109,12 +109,14 @@ impl LightningNode for LndNode {
.into_inner();

if info.chains.is_empty() {
return Err(LightningError::ValidationError(
"LND node is not connected any chain".to_string(),
));
return Err(LightningError::ValidationError(format!(
"{} is not connected any chain",
self.get_info()
)));
} else if info.chains.len() > 1 {
return Err(LightningError::ValidationError(format!(
"LND node is connected to more than one chain: {:?}",
"{} is connected to more than one chain: {:?}",
self.get_info(),
info.chains.iter().map(|c| c.chain.to_string())
)));
}
Expand Down
26 changes: 12 additions & 14 deletions sim-lib/src/random_activity.rs
Original file line number Diff line number Diff line change
@@ -1,12 +1,11 @@
use core::fmt;
use std::collections::HashMap;
use std::fmt::Display;

use bitcoin::secp256k1::PublicKey;
use rand_distr::{Distribution, Exp, LogNormal, WeightedIndex};
use std::time::Duration;

use crate::{NetworkGenerator, PaymentGenerator, SimulationError};
use crate::{NetworkGenerator, NodeInfo, PaymentGenerator, SimulationError};

const HOURS_PER_MONTH: u64 = 30 * 24;
const SECONDS_PER_MONTH: u64 = HOURS_PER_MONTH * 60 * 60;
Expand All @@ -17,33 +16,31 @@ const SECONDS_PER_MONTH: u64 = HOURS_PER_MONTH * 60 * 60;
/// which has a view of the full network except for itself).
pub struct NetworkGraphView {
node_picker: WeightedIndex<u64>,
nodes: Vec<(PublicKey, u64)>,
nodes: Vec<(NodeInfo, u64)>,
}

impl NetworkGraphView {
// Creates a network view for the map of node public keys to capacity (in millisatoshis) provided. Returns an error
// if any node's capacity is zero (the node cannot receive), or there are not at least two nodes (one node can't
// send to itself).
pub fn new(node_capacities: HashMap<PublicKey, u64>) -> Result<Self, SimulationError> {
if node_capacities.len() < 2 {
pub fn new(nodes: Vec<(NodeInfo, u64)>) -> Result<Self, SimulationError> {
if nodes.len() < 2 {
return Err(SimulationError::RandomActivityError(
"at least two nodes required for activity generation".to_string(),
));
}

if node_capacities.values().any(|v| *v == 0) {
if nodes.iter().any(|(_, v)| *v == 0) {
return Err(SimulationError::RandomActivityError(
"network generator created with zero capacity node".to_string(),
));
}

// To create a weighted index we're going to need a vector of nodes that we index and weights that are set
// by their deployed capacity. To efficiently store our view of nodes capacity, we're also going to store
// capacity along with the node pubkey because we query the two at the same time. Zero capacity nodes are
// capacity along with the node info because we query the two at the same time. Zero capacity nodes are
// filtered out because they have no chance of being selected (and wont' be able to receive payments).
let nodes = node_capacities.iter().map(|(k, v)| (*k, *v)).collect();

let node_picker = WeightedIndex::new(node_capacities.into_values().collect::<Vec<u64>>())
let node_picker = WeightedIndex::new(nodes.iter().map(|(_, v)| *v).collect::<Vec<u64>>())
.map_err(|e| SimulationError::RandomActivityError(e.to_string()))?;

Ok(NetworkGraphView { node_picker, nodes })
Expand All @@ -54,7 +51,7 @@ impl NetworkGenerator for NetworkGraphView {
/// Randomly samples the network for a node, weighted by capacity. Using a single graph view means that it's
/// possible for a source node to select itself. After sufficient retries, this is highly improbable (even with
/// very small graphs, or those with one node significantly more capitalized than others).
fn sample_node_by_capacity(&self, source: PublicKey) -> (PublicKey, u64) {
fn sample_node_by_capacity(&self, source: PublicKey) -> (NodeInfo, u64) {
let mut rng = rand::thread_rng();

// While it's very unlikely that we can't pick a destination that is not our source, it's possible that there's
Expand All @@ -63,10 +60,11 @@ impl NetworkGenerator for NetworkGraphView {
let mut i = 1;
loop {
let index = self.node_picker.sample(&mut rng);
let destination = self.nodes[index];
// Unwrapping is safe given `NetworkGraphView` has the same amount of elements for `nodes` and `node_picker`
let destination = self.nodes.get(index).unwrap();

if destination.0 != source {
return destination;
if destination.0.pubkey != source {
return destination.clone();
}

if i % 50 == 0 {
Expand Down

0 comments on commit 3710351

Please sign in to comment.