Skip to content

Commit

Permalink
sim-all: logs aliases and pks if both are known
Browse files Browse the repository at this point in the history
This commit redefines what data about a node is passed along to the simulator on
activities. Previously, we used to pass only the src/dst `PublicKey`. However, by doing
this we had no way of logging (nor accessing) any other information about them. This is
mainly because all this mapping is only available on `sim-cli::main.rs`, and also because
the data is mostly logged by the producers/consumers functions, which are not even part
or the `Simulator` class.

Therefore, the alternative we are left with are passing the information to the simulator.
This can be done in several ways, I've gone with the one that has a better balance between
codediff and usefulness, which is passing `NodeInfo` along. This, however, means having to
obtain the destination node features on `main` instead of on `validation`.
  • Loading branch information
sr-gi committed Oct 4, 2023
1 parent 5220fa8 commit 20e1967
Show file tree
Hide file tree
Showing 4 changed files with 116 additions and 109 deletions.
72 changes: 52 additions & 20 deletions sim-cli/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ async fn main() -> anyhow::Result<()> {
} = serde_json::from_str(&config_str)?;

let mut clients: HashMap<PublicKey, Arc<Mutex<dyn LightningNode + Send>>> = HashMap::new();
let mut pk_node_map = HashMap::new();
let mut alias_node_map = HashMap::new();

for connection in nodes {
Expand Down Expand Up @@ -77,36 +78,67 @@ async fn main() -> anyhow::Result<()> {
)));
}

alias_node_map.insert(node_info.alias.clone(), node_info.pubkey);
clients.insert(node_info.pubkey, node);
pk_node_map.insert(node_info.pubkey, node_info.clone());
alias_node_map.insert(node_info.alias.clone(), node_info);
}

let mut validated_activities = vec![];
// Make all the activities identifiable by PK internally
for act in activity.iter_mut() {
// We can only map aliases to nodes we control, so if either the source or destination alias
// is not in alias_node_map, we fail
if let NodeId::Alias(a) = &act.source {
if let Some(pk) = alias_node_map.get(a) {
act.source = NodeId::PublicKey(*pk);
} else {
anyhow::bail!(LightningError::ValidationError(format!(
"activity source alias {} not found in nodes.",
act.source
)));
let source = if let Some(source) = match &act.source {
NodeId::PublicKey(pk) => pk_node_map.get(pk),
NodeId::Alias(a) => alias_node_map.get(a),
} {
source.clone()
} else {
anyhow::bail!(LightningError::ValidationError(format!(
"activity source {} not found in nodes.",
act.source
)));
};

let destination = match &act.destination {
NodeId::Alias(a) => {
if let Some(info) = alias_node_map.get(a) {
info.clone()
} else {
anyhow::bail!(LightningError::ValidationError(format!(
"unknown activity destination: {}.",
act.destination
)));
}
}
}
if let NodeId::Alias(a) = &act.destination {
if let Some(pk) = alias_node_map.get(a) {
act.destination = NodeId::PublicKey(*pk);
} else {
anyhow::bail!(LightningError::ValidationError(format!(
"unknown activity destination: {}.",
act.destination
)));
NodeId::PublicKey(pk) => {
if let Some(info) = pk_node_map.get(pk) {
info.clone()
} else {
clients
.get(&source.pubkey)
.unwrap()
.lock()
.await
.get_node_info(pk)
.await
.map_err(|e| {
log::debug!("{}", e);
LightningError::ValidationError(format!(
"Destination node unknown or invalid: {}.",
pk,
))
})?
}
}
}
validated_activities.push(ActivityDefinition::try_from(act)?);
};

validated_activities.push(ActivityDefinition {
source,
destination,
interval_secs: act.interval_secs,
amount_msat: act.amount_msat,
});
}

let sim = Simulation::new(
Expand Down
32 changes: 16 additions & 16 deletions sim-lib/src/cln.rs
Original file line number Diff line number Diff line change
Expand Up @@ -230,31 +230,31 @@ impl LightningNode for ClnNode {
}
}

async fn get_node_features(
&mut self,
node: &PublicKey,
) -> Result<NodeFeatures, LightningError> {
let node_id = node.serialize().to_vec();
let nodes: Vec<cln_grpc::pb::ListnodesNodes> = self
async fn get_node_info(&mut self, node_id: &PublicKey) -> Result<NodeInfo, LightningError> {
let mut nodes: Vec<cln_grpc::pb::ListnodesNodes> = self
.client
.list_nodes(ListnodesRequest {
id: Some(node_id.clone()),
id: Some(node_id.serialize().to_vec()),
})
.await
.map_err(|err| LightningError::GetNodeInfoError(err.to_string()))?
.into_inner()
.nodes;

// We are filtering `list_nodes` to a single node, so we should get either an empty vector or one with a single element
if let Some(node) = nodes.first() {
Ok(node
.features
.clone()
.map_or(NodeFeatures::empty(), |mut f| {
// We need to reverse this given it has the CLN wire encoding which is BE
f.reverse();
NodeFeatures::from_le_bytes(f)
}))
if let Some(node) = nodes.pop() {
Ok(NodeInfo {
pubkey: *node_id,
alias: node.alias.unwrap_or(String::new()),
features: node
.features
.clone()
.map_or(NodeFeatures::empty(), |mut f| {
// We need to reverse this given it has the CLN wire encoding which is BE
f.reverse();
NodeFeatures::from_le_bytes(f)
}),
})
} else {
Err(LightningError::GetNodeInfoError(
"Node not found".to_string(),
Expand Down
106 changes: 41 additions & 65 deletions sim-lib/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -106,37 +106,18 @@ pub struct ActivityParser {
pub amount_msat: u64,
}

#[derive(Debug, Clone, Serialize, Deserialize)]
#[derive(Debug, Clone)]
pub struct ActivityDefinition {
// The source of the payment.
pub source: PublicKey,
pub source: NodeInfo,
// The destination of the payment.
pub destination: PublicKey,
pub destination: NodeInfo,
// The interval of the event, as in every how many seconds the payment is performed.
pub interval_secs: u16,
// The amount of m_sat to used in this payment.
pub amount_msat: u64,
}

impl TryFrom<&mut ActivityParser> for ActivityDefinition {
type Error = LightningError;

fn try_from(a: &mut ActivityParser) -> Result<Self, Self::Error> {
let source = *a.source.get_pk().map_err(Self::Error::ValidationError)?;
let destination = *a
.destination
.get_pk()
.map_err(Self::Error::ValidationError)?;

Ok(Self {
source,
destination,
interval_secs: a.interval_secs,
amount_msat: a.amount_msat,
})
}
}

#[derive(Debug, Error)]
pub enum SimulationError {
#[error("Lightning Error: {0:?}")]
Expand Down Expand Up @@ -181,6 +162,18 @@ pub struct NodeInfo {
pub features: NodeFeatures,
}

impl Display for NodeInfo {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
let pk = self.pubkey.to_string();
let pk_summary = format!("{}...{}", &pk[..6], &pk[pk.len() - 6..]);
if self.alias.is_empty() {
write!(f, "{}", pk_summary)
} else {
write!(f, "{}({})", self.alias, pk_summary)
}
}
}

/// LightningNode represents the functionality that is required to execute events on a lightning node.
#[async_trait]
pub trait LightningNode {
Expand All @@ -200,9 +193,8 @@ pub trait LightningNode {
hash: PaymentHash,
shutdown: Listener,
) -> Result<PaymentResult, LightningError>;
/// Gets the list of features of a given node
async fn get_node_features(&mut self, node: &PublicKey)
-> Result<NodeFeatures, LightningError>;
/// Gets information on a specific node
async fn get_node_info(&mut self, node_id: &PublicKey) -> Result<NodeInfo, LightningError>;
/// Lists all channels, at present only returns a vector of channel capacities in msat because no further
/// information is required.
async fn list_channels(&mut self) -> Result<Vec<u64>, LightningError>;
Expand All @@ -222,6 +214,14 @@ pub trait PaymentGenerator {
fn payment_amount(&self, destination_capacity: u64) -> Result<u64, SimulationError>;
}

/// SimulationEvent describes the set of actions that the simulator can run on nodes that it has execution permissions
/// on.
#[derive(Clone)]
enum SimulationEvent {
// Dispatch a payment of the specified amount to the public key provided.
SendPayment(NodeInfo, u64),
}

#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct PaymentResult {
pub htlc_count: usize,
Expand Down Expand Up @@ -301,15 +301,6 @@ impl Display for Payment {
}
}

/// SimulationEvent describes the set of actions that the simulator can run on nodes that it has execution permissions
/// on.
#[derive(Clone, Copy)]
enum SimulationEvent {
/// Dispatch a payment of the specified amount to the public key provided.
/// Results in `SimulationOutput::SendPaymentSuccess` or `SimulationOutput::SendPaymentFailure`.
SendPayment(PublicKey, u64),
}

/// SimulationOutput provides the output of a simulation event.
enum SimulationOutput {
/// Intermediate output for when simulator has successfully dispatched a payment.
Expand Down Expand Up @@ -375,30 +366,16 @@ impl Simulation {
for payment_flow in self.activity.iter() {
// We need every source node that is configured to execute some activity to be included in our set of
// nodes so that we can execute events on it.
let source_node =
self.nodes
.get(&payment_flow.source)
.ok_or(LightningError::ValidationError(format!(
"Source node not found, {}",
payment_flow.source,
)))?;
self.nodes
.get(&payment_flow.source.pubkey)
.ok_or(LightningError::ValidationError(format!(
"Source node not found, {}",
payment_flow.source,
)))?;

// Destinations must support keysend to be able to receive payments.
// Note: validation should be update with a different check if an event is not a payment.
let features = source_node
.lock()
.await
.get_node_features(&payment_flow.destination)
.await
.map_err(|e| {
log::debug!("{}", e);
LightningError::ValidationError(format!(
"Destination node unknown or invalid, {}",
payment_flow.destination,
))
})?;

if !features.supports_keysend() {
if !payment_flow.destination.features.supports_keysend() {
return Err(LightningError::ValidationError(format!(
"Destination node does not support keysend, {}",
payment_flow.destination,
Expand Down Expand Up @@ -475,7 +452,7 @@ impl Simulation {
let collecting_nodes = if !self.activity.is_empty() {
self.activity
.iter()
.map(|activity| activity.source)
.map(|activity| activity.source.pubkey)
.collect()
} else {
random_activity_nodes.extend(self.random_activity_nodes().await?);
Expand Down Expand Up @@ -627,7 +604,7 @@ impl Simulation {
tasks: &mut JoinSet<()>,
) {
for description in self.activity.iter() {
let sender_chan = producer_channels.get(&description.source).unwrap();
let sender_chan = producer_channels.get(&description.source.pubkey).unwrap();
tasks.spawn(produce_events(
description.clone(),
sender_chan.clone(),
Expand Down Expand Up @@ -695,8 +672,8 @@ async fn consume_events(
sender: Sender<SimulationOutput>,
shutdown: Trigger,
) {
let node_id = node.lock().await.get_info().pubkey;
log::debug!("Started consumer for {}.", node_id);
let node_info = node.lock().await.get_info().clone();
log::debug!("Started consumer for {}.", node_info);

while let Some(event) = receiver.recv().await {
match event {
Expand All @@ -707,15 +684,15 @@ async fn consume_events(
source: node.get_info().pubkey,
hash: None,
amount_msat: amt_msat,
destination: dest,
destination: dest.pubkey,
dispatch_time: SystemTime::now(),
};

let outcome = match node.send_payment(dest, amt_msat).await {
let outcome = match node.send_payment(dest.pubkey, amt_msat).await {
Ok(payment_hash) => {
log::debug!(
"Send payment: {} -> {}: ({}).",
node_id,
node_info,
dest,
hex::encode(payment_hash.0)
);
Expand All @@ -724,7 +701,7 @@ async fn consume_events(
SimulationOutput::SendPaymentSuccess(payment)
}
Err(e) => {
log::error!("Error while sending payment {} -> {}.", node_id, dest);
log::error!("Error while sending payment {} -> {}.", node_info, dest);

match e {
LightningError::PermanentError(s) => {
Expand Down Expand Up @@ -760,7 +737,6 @@ async fn produce_events(
shutdown: Trigger,
listener: Listener,
) {
let e: SimulationEvent = SimulationEvent::SendPayment(act.destination, act.amount_msat);
let interval = time::Duration::from_secs(act.interval_secs as u64);

log::debug!(
Expand All @@ -776,7 +752,7 @@ async fn produce_events(
biased;
_ = time::sleep(interval) => {
// Consumer was dropped
if sender.send(e).await.is_err() {
if sender.send(SimulationEvent::SendPayment(act.destination.clone(), act.amount_msat)).await.is_err() {
log::debug!(
"Stopped producer for {}: {} -> {}. Consumer cannot be reached.",
act.amount_msat,
Expand Down
15 changes: 7 additions & 8 deletions sim-lib/src/lnd.rs
Original file line number Diff line number Diff line change
Expand Up @@ -229,25 +229,24 @@ impl LightningNode for LndNode {
}
}

async fn get_node_features(
&mut self,
node: &PublicKey,
) -> Result<NodeFeatures, LightningError> {
async fn get_node_info(&mut self, node_id: &PublicKey) -> Result<NodeInfo, LightningError> {
let node_info = self
.client
.lightning()
.get_node_info(NodeInfoRequest {
pub_key: node.to_string(),
pub_key: node_id.to_string(),
include_channels: false,
})
.await
.map_err(|err| LightningError::GetNodeInfoError(err.to_string()))?
.into_inner();

if let Some(node_info) = node_info.node {
Ok(parse_node_features(
node_info.features.keys().cloned().collect(),
))
Ok(NodeInfo {
pubkey: *node_id,
alias: node_info.alias,
features: parse_node_features(node_info.features.keys().cloned().collect()),
})
} else {
Err(LightningError::GetNodeInfoError(
"Node not found".to_string(),
Expand Down

0 comments on commit 20e1967

Please sign in to comment.