From 350ebb02cf1c36e312c35abfdb28c3140ce160ee Mon Sep 17 00:00:00 2001 From: "Muhammad-Jibril B.A. (Khalifa MBA)" Date: Sat, 16 Mar 2024 20:57:15 +0800 Subject: [PATCH] Add Aggregator --- blockchain/aggregator/Cargo.toml | 16 ++ blockchain/aggregator/README.md | 7 + blockchain/aggregator/TODO.md | 56 ++++++ blockchain/aggregator/aggregator.rs | 295 ++++++++++++++++++++++++++++ 4 files changed, 374 insertions(+) create mode 100644 blockchain/aggregator/Cargo.toml create mode 100644 blockchain/aggregator/README.md create mode 100644 blockchain/aggregator/TODO.md create mode 100644 blockchain/aggregator/aggregator.rs diff --git a/blockchain/aggregator/Cargo.toml b/blockchain/aggregator/Cargo.toml new file mode 100644 index 00000000..c7002639 --- /dev/null +++ b/blockchain/aggregator/Cargo.toml @@ -0,0 +1,16 @@ +[package] +name = "aggregator" +version = "0.6.0" +authors.workspace = true +edition.workspace = true +homepage.workspace = true +repository.workspace = true + +[dependencies] +aleph-bft-rmc = { workspace = true } +aleph-bft-types = { workspace = true } +async-trait = { workspace = true } +futures = { workspace = true } +log = { workspace = true } +parity-scale-codec = { workspace = true, features = ["derive"] } +tokio = { workspace = true, features = [ "sync", "macros", "time", "rt-multi-thread" ] } diff --git a/blockchain/aggregator/README.md b/blockchain/aggregator/README.md new file mode 100644 index 00000000..a8c74b0d --- /dev/null +++ b/blockchain/aggregator/README.md @@ -0,0 +1,7 @@ +# Aggregator + +## Overview + +This crate provides an AlephBFT Block Signature Aggregator + +Synchronize with [Aleph Aggregator Clique](https://github.com/Cardinal-Cryptography/aleph-node/tree/main/aggregator) diff --git a/blockchain/aggregator/TODO.md b/blockchain/aggregator/TODO.md new file mode 100644 index 00000000..fb25575a --- /dev/null +++ b/blockchain/aggregator/TODO.md @@ -0,0 +1,56 @@ +# To-Do List + +This list contains all TODOs in the Repo + + + +- [To-Do List](#to-do-list) + - [1. Guidelines](#1-guidelines) + - [2. Contribution](#2-contribution) + - [3. Lists](#3-lists) + - [4. Tasks](#4-tasks) + + + +## 1. Guidelines + +Note: Before you write a ToDo in this repo, please read the below guidelines carefully. + +Whenever you write a ToDo, you need to follow this standard syntax + +```rust +//TODO:[file_name:task_number] - task_details +``` + +for example: + +```rust +//TODO:[TODO.md:0] - Add Todo Guidelines +``` + +Note > the `//TODO:[filename:task_number] - ` is what we call the `task_prefix`. + +Whenever adding/writing a Task/ToDo, you need to describe the task on this list. Whenever you write a TODO in any file, add a reference to it here. Please make sure the task reference here is titled correctly and as detailed as possible\. + +Whenever you `complete` a task/TODO from any file, please tick/complete its reference here and make sure you do it in the same `commit` that completes the task. + +Whenever a task is cancelled (discontinued or not needed for w/e reason), please note in the details why it is cancelled, make sure you do it in the same `commit` that removes/cancels the TODO, and add this `-C` as a suffix to its `file_name` in the list here, for example: + +```rust +//TODO:[TODO.md-C:0] - Add Todo Guidelines +``` + +## 2. Contribution + +You can contribute to this list by completing tasks or by adding tasks(TODOs) that are currently in the repo but not on the list. You can also contribute by updating old tasks to the new Standard. + +## 3. Lists + +Each package/module/directory has its own `TODO.md`. + +## 4. Tasks + +These tasks are just for this file specifically. + +- [x] [[TODO.md:0] - Add TODO.md File](TODO.md): Add a TODO.md file to organise TODOs in the repo. +- [x] [[TODO.md:1] - Add a `task_title`](/TODO.md/#tasks): Adda `task_title`. diff --git a/blockchain/aggregator/aggregator.rs b/blockchain/aggregator/aggregator.rs new file mode 100644 index 00000000..649cd425 --- /dev/null +++ b/blockchain/aggregator/aggregator.rs @@ -0,0 +1,295 @@ +// بِسْمِ اللَّهِ الرَّحْمَنِ الرَّحِيم + +// This file is part of Setheum. + +// Copyright (C) 2019-Present Setheum Labs. +// SPDX-License-Identifier: GPL-3.0-or-later WITH Classpath-exception-2.0 + +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. + +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. + +// You should have received a copy of the GNU General Public License +// along with this program. If not, see . + +use std::{ + collections::{HashMap, HashSet, VecDeque}, + fmt::Debug, + time::Instant, +}; + +use aleph_bft_rmc::{DoublingDelayScheduler, MultiKeychain, Multisigned, Service as RmcService}; +use aleph_bft_types::Recipient; +use log::{debug, info, trace, warn}; + +use crate::{Hash, ProtocolSink, RmcNetworkData, SignableHash}; + +#[derive(Debug, PartialEq, Eq)] +pub enum AggregatorError { + NoHashFound, + DuplicateHash, +} + +pub enum IOError { + NetworkChannelClosed, +} + +pub type AggregatorResult = Result; +pub type IOResult = Result<(), IOError>; +type Rmc = + RmcService, MK, DoublingDelayScheduler>>; + +/// A wrapper around an `rmc::Multicast` returning the signed hashes in the order of the [`Multicast::start_multicast`] calls. +pub struct BlockSignatureAggregator { + signatures: HashMap, + hash_queue: VecDeque, + started_hashes: HashSet, + last_change: Instant, +} + +impl Default for BlockSignatureAggregator { + fn default() -> Self { + Self::new() + } +} + +impl BlockSignatureAggregator { + pub fn new() -> Self { + BlockSignatureAggregator { + signatures: HashMap::new(), + hash_queue: VecDeque::new(), + started_hashes: HashSet::new(), + last_change: Instant::now(), + } + } + + fn on_start(&mut self, hash: H) -> AggregatorResult<()> { + if !self.started_hashes.insert(hash) { + return Err(AggregatorError::DuplicateHash); + } + if self.hash_queue.is_empty() { + self.last_change = Instant::now(); + } + self.hash_queue.push_back(hash); + + Ok(()) + } + + fn on_multisigned_hash(&mut self, hash: H, signature: PMS) { + debug!(target: "aleph-aggregator", "New multisigned_hash {:?}.", hash); + self.signatures.insert(hash, signature); + } + + fn try_pop_hash(&mut self) -> AggregatorResult<(H, PMS)> { + match self.hash_queue.pop_front() { + Some(hash) => { + if let Some(multisignature) = self.signatures.remove(&hash) { + self.last_change = Instant::now(); + Ok((hash, multisignature)) + } else { + self.hash_queue.push_front(hash); + Err(AggregatorError::NoHashFound) + } + } + None => Err(AggregatorError::NoHashFound), + } + } + + pub fn status_report(&self) { + let mut status = String::from("Block Signature Aggregator status report: "); + + status.push_str(&format!( + "started hashes - {:?}; ", + self.started_hashes.len() + )); + + status.push_str(&format!( + "collected signatures - {:?}; ", + self.signatures.len() + )); + + status.push_str(&format!("hashes in queue - {:?}; ", self.hash_queue.len())); + + if let Some(hash) = self.hash_queue.front() { + status.push_str(&format!( + "front of hash queue - {} for - {:.2} s; ", + hash, + Instant::now() + .saturating_duration_since(self.last_change) + .as_secs_f64() + )); + } + + info!(target: "aleph-aggregator", "{}", status); + } +} + +pub struct IO< + H: Hash + Copy, + N: ProtocolSink>, + MK: MultiKeychain, +> { + network: N, + rmc_service: Rmc, + aggregator: BlockSignatureAggregator, + multisigned_events: VecDeque, MK>>, +} + +impl< + H: Copy + Hash, + N: ProtocolSink>, + MK: MultiKeychain, + > IO +{ + pub fn new( + network: N, + rmc_service: Rmc, + aggregator: BlockSignatureAggregator, + ) -> Self { + IO { + network, + rmc_service, + aggregator, + multisigned_events: VecDeque::new(), + } + } + + pub fn status_report(&self) { + self.aggregator.status_report() + } + + pub async fn start_aggregation(&mut self, hash: H) { + debug!(target: "aleph-aggregator", "Started aggregation for block hash {:?}", hash); + if let Err(AggregatorError::DuplicateHash) = self.aggregator.on_start(hash) { + debug!(target: "aleph-aggregator", "Aggregation already started for block hash {:?}, ignoring.", hash); + return; + } + if let Some(multisigned) = self.rmc_service.start_rmc(SignableHash::new(hash)) { + self.multisigned_events.push_back(multisigned); + } + } + + async fn wait_for_next_signature(&mut self) -> IOResult { + loop { + if let Some(multisigned) = self.multisigned_events.pop_front() { + let unchecked = multisigned.into_unchecked(); + let signature = unchecked.signature(); + self.aggregator + .on_multisigned_hash(unchecked.into_signable().get_hash(), signature); + return Ok(()); + } + tokio::select! { + message_from_rmc = self.rmc_service.next_message() => { + trace!(target: "aleph-aggregator", "Our rmc message {:?}.", message_from_rmc); + if let Err(e) = self.network.send(message_from_rmc, Recipient::Everyone) { + warn!(target: "aleph-aggregator", "failed broadcasting a message from rmc: {:?}", e); + } + } + message_from_network = self.network.next() => match message_from_network { + Some(message) => { + trace!(target: "aleph-aggregator", "Received message for rmc: {:?}", message); + if let Some(multisigned) = self.rmc_service.process_message(message) { + self.multisigned_events.push_back(multisigned); + } + }, + None => { + // In case the network is down we can terminate (?). + return Err(IOError::NetworkChannelClosed); + } + } + } + } + } + + pub async fn next_multisigned_hash(&mut self) -> Option<(H, MK::PartialMultisignature)> { + loop { + trace!(target: "aleph-aggregator", "Entering next_multisigned_hash loop."); + match self.aggregator.try_pop_hash() { + Ok(res) => { + return Some(res); + } + Err(AggregatorError::NoHashFound) => { /* ignored */ } + Err(AggregatorError::DuplicateHash) => { + warn!( + target: "aleph-aggregator", + "Unexpected aggregator exception in IO: DuplicateHash", + ) + } + } + + if self.wait_for_next_signature().await.is_err() { + warn!(target: "aleph-aggregator", "the network channel closed"); + return None; + } + } + } +} + +#[cfg(test)] +mod tests { + use std::{ + fmt::{Debug, Display, Formatter}, + hash::Hash, + }; + + use parity_scale_codec::{Decode, Encode}; + + use crate::aggregator::{AggregatorError, BlockSignatureAggregator}; + + #[derive(Hash, PartialEq, Eq, Clone, Copy, Encode, Decode, Debug)] + struct MockHash(pub [u8; 32]); + + impl AsRef<[u8]> for MockHash { + fn as_ref(&self) -> &[u8] { + &self.0 + } + } + + impl Display for MockHash { + fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { + std::fmt::Debug::fmt(&self.0, f) + } + } + type TestMultisignature = usize; + const TEST_SIGNATURE: TestMultisignature = 42; + + fn build_aggregator() -> BlockSignatureAggregator { + BlockSignatureAggregator::new() + } + + fn build_hash(b0: u8) -> MockHash { + let mut bytes = [0u8; 32]; + bytes[0] = b0; + MockHash(bytes) + } + + #[test] + fn returns_with_matching_multisigned_hash() { + let mut aggregator = build_aggregator(); + let res = aggregator.on_start(build_hash(0)); + assert!(res.is_ok()); + + aggregator.on_multisigned_hash(build_hash(0), TEST_SIGNATURE); + + let res = aggregator.try_pop_hash(); + assert!(res.is_ok()); + } + + #[test] + fn doesnt_return_without_matching_multisigned_hash() { + let mut aggregator = build_aggregator(); + let res = aggregator.on_start(build_hash(0)); + assert!(res.is_ok()); + + aggregator.on_multisigned_hash(build_hash(1), TEST_SIGNATURE); + + let res = aggregator.try_pop_hash(); + assert_eq!(res, Err(AggregatorError::NoHashFound)); + } +}