Skip to content

Commit

Permalink
Merge branch 'main' of github.com:input-output-hk/ouroboros-leios
Browse files Browse the repository at this point in the history
  • Loading branch information
cjkoepke committed Nov 14, 2024
2 parents 2e460e9 + 0c4bd12 commit 1582d87
Show file tree
Hide file tree
Showing 11 changed files with 322 additions and 87 deletions.
2 changes: 2 additions & 0 deletions sim-rs/sim-cli/src/bin/gen-test-data/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,9 +37,11 @@ fn main() -> Result<()> {
links,
block_generation_probability: 0.05,
ib_generation_probability: 5.0,
eb_generation_probability: 5.0,
ib_shards: 8,
max_block_size: 90112,
stage_length: 2,
deliver_stage_count: 2,
uniform_ib_generation: true,
max_ib_requests_per_peer: 1,
max_ib_size: 327680,
Expand Down
99 changes: 78 additions & 21 deletions sim-rs/sim-cli/src/events.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ use sim_core::{
clock::Timestamp,
config::{NodeId, SimConfiguration},
events::Event,
model::{InputBlockId, TransactionId},
model::{EndorserBlockId, InputBlockId, TransactionId},
};
use tokio::{
fs::{self, File},
Expand All @@ -36,6 +36,8 @@ pub enum OutputFormat {
pub struct EventMonitor {
node_ids: Vec<NodeId>,
pool_ids: Vec<NodeId>,
stage_length: u64,
maximum_ib_age: u64,
events_source: mpsc::UnboundedReceiver<(Event, Timestamp)>,
output_path: Option<PathBuf>,
output_format: OutputFormat,
Expand All @@ -55,9 +57,13 @@ impl EventMonitor {
.filter(|p| p.stake > 0)
.map(|p| p.id)
.collect();
let stage_length = config.stage_length;
let maximum_ib_age = stage_length * config.deliver_stage_count;
Self {
node_ids,
pool_ids,
stage_length,
maximum_ib_age,
events_source,
output_path,
output_format: output_format.unwrap_or(OutputFormat::EventStream),
Expand All @@ -75,14 +81,19 @@ impl EventMonitor {
let mut seen_ibs: BTreeMap<NodeId, f64> = BTreeMap::new();
let mut txs_in_ib: BTreeMap<InputBlockId, f64> = BTreeMap::new();
let mut bytes_in_ib: BTreeMap<InputBlockId, f64> = BTreeMap::new();
let mut ibs_in_eb: BTreeMap<EndorserBlockId, f64> = BTreeMap::new();
let mut ibs_containing_tx: BTreeMap<TransactionId, f64> = BTreeMap::new();
let mut ebs_containing_ib: BTreeMap<InputBlockId, f64> = BTreeMap::new();
let mut pending_ibs: BTreeSet<InputBlockId> = BTreeSet::new();

let mut last_timestamp = Timestamp(Duration::from_secs(0));
let mut total_slots = 0u64;
let mut published_txs = 0u64;
let mut published_bytes = 0u64;
let mut generated_ibs = 0u64;
let mut empty_ibs = 0u64;
let mut expired_ibs = 0u64;
let mut generated_ebs = 0u64;

// Pretty print options for bytes
let pbo = Some(PrettyBytesOptions {
Expand Down Expand Up @@ -112,17 +123,28 @@ impl EventMonitor {
};
while let Some((event, time)) = self.events_source.recv().await {
last_timestamp = time;
if should_log_event(&event) {
let output_event = OutputEvent {
time,
message: event.clone(),
};
output.write(output_event).await?;
}
let output_event = OutputEvent {
time,
message: event.clone(),
};
output.write(output_event).await?;
match event {
Event::Slot { number } => {
info!("Slot {number} has begun.");
total_slots = number + 1;
if number % self.stage_length == 0 {
let Some(oldest_live_stage) = number.checked_sub(self.maximum_ib_age)
else {
continue;
};
pending_ibs.retain(|ib| {
if ib.slot < oldest_live_stage {
expired_ibs += 1;
return false;
}
true
});
}
}
Event::TransactionGenerated { id, bytes, .. } => {
txs.insert(
Expand Down Expand Up @@ -171,6 +193,10 @@ impl EventMonitor {
transactions,
} => {
generated_ibs += 1;
if transactions.is_empty() {
empty_ibs += 1;
}
pending_ibs.insert(header.id());
let mut ib_bytes = 0;
for tx_id in &transactions {
*txs_in_ib.entry(header.id()).or_default() += 1.;
Expand All @@ -184,20 +210,33 @@ impl EventMonitor {
}
*seen_ibs.entry(header.producer).or_default() += 1.;
info!(
"Pool {} generated an IB with {} transaction(s) in slot {} ({})",
"Pool {} generated an IB with {} transaction(s) in slot {} ({}).",
header.producer,
transactions.len(),
header.slot,
pretty_bytes(ib_bytes, pbo.clone()),
)
}
Event::EmptyInputBlockNotGenerated { .. } => {
empty_ibs += 1;
}
Event::InputBlockSent { .. } => {}
Event::InputBlockReceived { recipient, .. } => {
*seen_ibs.entry(recipient).or_default() += 1.;
}
Event::EndorserBlockGenerated { id, input_blocks } => {
generated_ebs += 1;
for ib_id in &input_blocks {
*ibs_in_eb.entry(id).or_default() += 1.0;
*ebs_containing_ib.entry(*ib_id).or_default() += 1.0;
pending_ibs.remove(ib_id);
}
info!(
"Pool {} generated an EB with {} IBs(s) in slot {}.",
id.producer,
input_blocks.len(),
id.slot,
)
}
Event::EndorserBlockSent { .. } => {}
Event::EndorserBlockReceived { .. } => {}
}
}

Expand Down Expand Up @@ -239,9 +278,13 @@ impl EventMonitor {
.values()
.filter(|tx| tx.included_in_ib.is_some())
.collect();
let empty_ebs = generated_ebs - ibs_in_eb.len() as u64;
let ibs_which_reached_eb = ebs_containing_ib.len();
let txs_per_ib = compute_stats(txs_in_ib.into_values());
let bytes_per_ib = compute_stats(bytes_in_ib.into_values());
let ibs_per_tx = compute_stats(ibs_containing_tx.into_values());
let ibs_per_eb = compute_stats(ebs_containing_ib.into_values());
let ebs_per_ib = compute_stats(ibs_in_eb.into_values());
let times_to_reach_ibs = compute_stats(txs_which_reached_ib.iter().map(|tx| {
let duration = tx.included_in_ib.unwrap() - tx.generated;
duration.as_secs_f64()
Expand All @@ -252,7 +295,7 @@ impl EventMonitor {
.map(|id| seen_ibs.get(id).copied().unwrap_or_default()),
);
info!(
"{generated_ibs} IB(s) were generated, and {empty_ibs} IB(s) were skipped because they were empty; on average there were {:.3} non-empty IB(s) per slot.",
"{generated_ibs} IB(s) were generated, on average {:.3} IB(s) per slot.",
generated_ibs as f64 / total_slots as f64
);
info!(
Expand All @@ -274,9 +317,10 @@ impl EventMonitor {
ibs_per_tx.mean, ibs_per_tx.std_dev,
);
info!(
"Each IB contained an average of {:.3} transaction(s) (stddev {:.3}) and an average of {} (stddev {:.3}).",
"Each IB contained an average of {:.3} transaction(s) (stddev {:.3}) and an average of {} (stddev {:.3}). {} IB(s) were empty.",
txs_per_ib.mean, txs_per_ib.std_dev,
pretty_bytes(bytes_per_ib.mean.trunc() as u64, pbo), bytes_per_ib.std_dev,
empty_ibs,
);
info!(
"Each transaction took an average of {:.3}s (stddev {:.3}) to be included in an IB.",
Expand All @@ -286,6 +330,26 @@ impl EventMonitor {
"Each node received an average of {:.3} IB(s) (stddev {:.3}).",
ibs_received.mean, ibs_received.std_dev,
);
info!(
"{generated_ebs} EB(s) were generated; on average there were {:.3} EB(s) per slot.",
generated_ebs as f64 / total_slots as f64
);
info!(
"Each EB contained an average of {:.3} IB(s) (stddev {:.3}). {} EB(s) were empty.",
ibs_per_eb.mean, ibs_per_eb.std_dev, empty_ebs
);
info!(
"Each IB was included in an average of {:.3} EB(s) (stddev {:.3}).",
ebs_per_ib.mean, ebs_per_ib.std_dev,
);
info!(
"{} out of {} IBs were included in at least one EB.",
ibs_which_reached_eb, generated_ibs,
);
info!(
"{} out of {} IBs expired before they reached an EB.",
expired_ibs, generated_ibs,
);
});
Ok(())
}
Expand All @@ -310,13 +374,6 @@ fn compute_stats<Iter: IntoIterator<Item = f64>>(data: Iter) -> Stats {
}
}

fn should_log_event(event: &Event) -> bool {
if matches!(event, Event::EmptyInputBlockNotGenerated { .. }) {
return false;
}
true
}

enum OutputTarget {
EventStream(BufWriter<File>),
SlotStream {
Expand Down
6 changes: 6 additions & 0 deletions sim-rs/sim-core/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -58,9 +58,11 @@ pub struct RawConfig {
pub links: Vec<RawLinkConfig>,
pub block_generation_probability: f64,
pub ib_generation_probability: f64,
pub eb_generation_probability: f64,
pub max_block_size: u64,
pub max_tx_size: u64,
pub stage_length: u64,
pub deliver_stage_count: u64,
pub uniform_ib_generation: bool,
pub max_ib_size: u64,
pub max_ib_requests_per_peer: usize,
Expand Down Expand Up @@ -114,9 +116,11 @@ impl From<RawConfig> for SimConfiguration {
links,
block_generation_probability: value.block_generation_probability,
ib_generation_probability: value.ib_generation_probability,
eb_generation_probability: value.eb_generation_probability,
max_block_size: value.max_block_size,
max_tx_size: value.max_tx_size,
stage_length: value.stage_length,
deliver_stage_count: value.deliver_stage_count,
uniform_ib_generation: value.uniform_ib_generation,
max_ib_size: value.max_ib_size,
max_ib_requests_per_peer: value.max_ib_requests_per_peer,
Expand All @@ -137,9 +141,11 @@ pub struct SimConfiguration {
pub links: Vec<LinkConfiguration>,
pub block_generation_probability: f64,
pub ib_generation_probability: f64,
pub eb_generation_probability: f64,
pub max_block_size: u64,
pub max_tx_size: u64,
pub stage_length: u64,
pub deliver_stage_count: u64,
pub uniform_ib_generation: bool,
pub max_ib_size: u64,
pub max_ib_requests_per_peer: usize,
Expand Down
55 changes: 44 additions & 11 deletions sim-rs/sim-core/src/events.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,10 @@ use tracing::warn;
use crate::{
clock::{Clock, Timestamp},
config::NodeId,
model::{Block, InputBlock, InputBlockHeader, InputBlockId, Transaction, TransactionId},
model::{
Block, EndorserBlock, EndorserBlockId, InputBlock, InputBlockHeader, InputBlockId,
Transaction, TransactionId,
},
};

#[derive(Debug, Clone, Serialize)]
Expand Down Expand Up @@ -50,10 +53,6 @@ pub enum Event {
header: InputBlockHeader,
transactions: Vec<TransactionId>,
},
EmptyInputBlockNotGenerated {
#[serde(flatten)]
header: InputBlockHeader,
},
InputBlockSent {
#[serde(flatten)]
id: InputBlockId,
Expand All @@ -66,6 +65,23 @@ pub enum Event {
sender: NodeId,
recipient: NodeId,
},
EndorserBlockGenerated {
#[serde(flatten)]
id: EndorserBlockId,
input_blocks: Vec<InputBlockId>,
},
EndorserBlockSent {
#[serde(flatten)]
id: EndorserBlockId,
sender: NodeId,
recipient: NodeId,
},
EndorserBlockReceived {
#[serde(flatten)]
id: EndorserBlockId,
sender: NodeId,
recipient: NodeId,
},
}

#[derive(Clone)]
Expand Down Expand Up @@ -139,12 +155,6 @@ impl EventTracker {
});
}

pub fn track_empty_ib_not_generated(&self, header: &InputBlockHeader) {
self.send(Event::EmptyInputBlockNotGenerated {
header: header.clone(),
});
}

pub fn track_ib_sent(&self, id: InputBlockId, sender: NodeId, recipient: NodeId) {
self.send(Event::InputBlockSent {
id,
Expand All @@ -161,6 +171,29 @@ impl EventTracker {
});
}

pub fn track_eb_generated(&self, block: &EndorserBlock) {
self.send(Event::EndorserBlockGenerated {
id: block.id(),
input_blocks: block.ibs.clone(),
});
}

pub fn track_eb_sent(&self, id: EndorserBlockId, sender: NodeId, recipient: NodeId) {
self.send(Event::EndorserBlockSent {
id,
sender,
recipient,
});
}

pub fn track_eb_received(&self, id: EndorserBlockId, sender: NodeId, recipient: NodeId) {
self.send(Event::EndorserBlockReceived {
id,
sender,
recipient,
});
}

fn send(&self, event: Event) {
if self.sender.send((event, self.clock.now())).is_err() {
warn!("tried sending event after aggregator finished");
Expand Down
22 changes: 22 additions & 0 deletions sim-rs/sim-core/src/model.rs
Original file line number Diff line number Diff line change
Expand Up @@ -75,3 +75,25 @@ impl InputBlock {
self.transactions.iter().map(|tx| tx.bytes).sum()
}
}

#[derive(Debug, Copy, Clone, PartialEq, Eq, Hash, PartialOrd, Ord, Serialize)]
pub struct EndorserBlockId {
pub slot: u64,
pub producer: NodeId,
}

#[derive(Debug)]
pub struct EndorserBlock {
pub slot: u64,
pub producer: NodeId,
// The real impl will store hashes
pub ibs: Vec<InputBlockId>,
}
impl EndorserBlock {
pub fn id(&self) -> EndorserBlockId {
EndorserBlockId {
slot: self.slot,
producer: self.producer,
}
}
}
Loading

0 comments on commit 1582d87

Please sign in to comment.