Skip to content

Commit

Permalink
Add Aggregator
Browse files Browse the repository at this point in the history
  • Loading branch information
balqaasem committed Mar 16, 2024
1 parent 06d7ade commit 350ebb0
Show file tree
Hide file tree
Showing 4 changed files with 374 additions and 0 deletions.
16 changes: 16 additions & 0 deletions blockchain/aggregator/Cargo.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
[package]
name = "aggregator"
version = "0.6.0"
authors.workspace = true
edition.workspace = true
homepage.workspace = true
repository.workspace = true

[dependencies]
aleph-bft-rmc = { workspace = true }
aleph-bft-types = { workspace = true }
async-trait = { workspace = true }
futures = { workspace = true }
log = { workspace = true }
parity-scale-codec = { workspace = true, features = ["derive"] }
tokio = { workspace = true, features = [ "sync", "macros", "time", "rt-multi-thread" ] }
7 changes: 7 additions & 0 deletions blockchain/aggregator/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
# Aggregator

## Overview

This crate provides an AlephBFT Block Signature Aggregator

Synchronize with [Aleph Aggregator Clique](https://github.com/Cardinal-Cryptography/aleph-node/tree/main/aggregator)
56 changes: 56 additions & 0 deletions blockchain/aggregator/TODO.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
# To-Do List

This list contains all TODOs in the Repo


<!-- TOC -->
- [To-Do List](#to-do-list)
- [1. Guidelines](#1-guidelines)
- [2. Contribution](#2-contribution)
- [3. Lists](#3-lists)
- [4. Tasks](#4-tasks)
<!-- /TOC -->


## 1. Guidelines

Note: Before you write a ToDo in this repo, please read the below guidelines carefully.

Whenever you write a ToDo, you need to follow this standard syntax

```rust
//TODO:[file_name:task_number] - task_details
```

for example:

```rust
//TODO:[TODO.md:0] - Add Todo Guidelines
```

Note > the `//TODO:[filename:task_number] - ` is what we call the `task_prefix`.

Whenever adding/writing a Task/ToDo, you need to describe the task on this list. Whenever you write a TODO in any file, add a reference to it here. Please make sure the task reference here is titled correctly and as detailed as possible\.

Whenever you `complete` a task/TODO from any file, please tick/complete its reference here and make sure you do it in the same `commit` that completes the task.

Whenever a task is cancelled (discontinued or not needed for w/e reason), please note in the details why it is cancelled, make sure you do it in the same `commit` that removes/cancels the TODO, and add this `-C` as a suffix to its `file_name` in the list here, for example:

```rust
//TODO:[TODO.md-C:0] - Add Todo Guidelines
```

## 2. Contribution

You can contribute to this list by completing tasks or by adding tasks(TODOs) that are currently in the repo but not on the list. You can also contribute by updating old tasks to the new Standard.

## 3. Lists

Each package/module/directory has its own `TODO.md`.

## 4. Tasks

These tasks are just for this file specifically.

- [x] [[TODO.md:0] - Add TODO.md File](TODO.md): Add a TODO.md file to organise TODOs in the repo.
- [x] [[TODO.md:1] - Add a `task_title`](/TODO.md/#tasks): Adda `task_title`.
295 changes: 295 additions & 0 deletions blockchain/aggregator/aggregator.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,295 @@
// بِسْمِ اللَّهِ الرَّحْمَنِ الرَّحِيم

// This file is part of Setheum.

// Copyright (C) 2019-Present Setheum Labs.
// SPDX-License-Identifier: GPL-3.0-or-later WITH Classpath-exception-2.0

// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.

// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.

// You should have received a copy of the GNU General Public License
// along with this program. If not, see <https://www.gnu.org/licenses/>.

use std::{
collections::{HashMap, HashSet, VecDeque},
fmt::Debug,
time::Instant,
};

use aleph_bft_rmc::{DoublingDelayScheduler, MultiKeychain, Multisigned, Service as RmcService};
use aleph_bft_types::Recipient;
use log::{debug, info, trace, warn};

use crate::{Hash, ProtocolSink, RmcNetworkData, SignableHash};

#[derive(Debug, PartialEq, Eq)]
pub enum AggregatorError {
NoHashFound,
DuplicateHash,
}

pub enum IOError {
NetworkChannelClosed,
}

pub type AggregatorResult<R> = Result<R, AggregatorError>;
pub type IOResult = Result<(), IOError>;
type Rmc<H, MK, S, PMS> =
RmcService<SignableHash<H>, MK, DoublingDelayScheduler<RmcNetworkData<H, S, PMS>>>;

/// A wrapper around an `rmc::Multicast` returning the signed hashes in the order of the [`Multicast::start_multicast`] calls.
pub struct BlockSignatureAggregator<H: Hash + Copy, PMS> {
signatures: HashMap<H, PMS>,
hash_queue: VecDeque<H>,
started_hashes: HashSet<H>,
last_change: Instant,
}

impl<H: Copy + Hash, PMS> Default for BlockSignatureAggregator<H, PMS> {
fn default() -> Self {
Self::new()
}
}

impl<H: Copy + Hash, PMS> BlockSignatureAggregator<H, PMS> {
pub fn new() -> Self {
BlockSignatureAggregator {
signatures: HashMap::new(),
hash_queue: VecDeque::new(),
started_hashes: HashSet::new(),
last_change: Instant::now(),
}
}

fn on_start(&mut self, hash: H) -> AggregatorResult<()> {
if !self.started_hashes.insert(hash) {
return Err(AggregatorError::DuplicateHash);
}
if self.hash_queue.is_empty() {
self.last_change = Instant::now();
}
self.hash_queue.push_back(hash);

Ok(())
}

fn on_multisigned_hash(&mut self, hash: H, signature: PMS) {
debug!(target: "aleph-aggregator", "New multisigned_hash {:?}.", hash);
self.signatures.insert(hash, signature);
}

fn try_pop_hash(&mut self) -> AggregatorResult<(H, PMS)> {
match self.hash_queue.pop_front() {
Some(hash) => {
if let Some(multisignature) = self.signatures.remove(&hash) {
self.last_change = Instant::now();
Ok((hash, multisignature))
} else {
self.hash_queue.push_front(hash);
Err(AggregatorError::NoHashFound)
}
}
None => Err(AggregatorError::NoHashFound),
}
}

pub fn status_report(&self) {
let mut status = String::from("Block Signature Aggregator status report: ");

status.push_str(&format!(
"started hashes - {:?}; ",
self.started_hashes.len()
));

status.push_str(&format!(
"collected signatures - {:?}; ",
self.signatures.len()
));

status.push_str(&format!("hashes in queue - {:?}; ", self.hash_queue.len()));

if let Some(hash) = self.hash_queue.front() {
status.push_str(&format!(
"front of hash queue - {} for - {:.2} s; ",
hash,
Instant::now()
.saturating_duration_since(self.last_change)
.as_secs_f64()
));
}

info!(target: "aleph-aggregator", "{}", status);
}
}

pub struct IO<
H: Hash + Copy,
N: ProtocolSink<RmcNetworkData<H, MK::Signature, MK::PartialMultisignature>>,
MK: MultiKeychain,
> {
network: N,
rmc_service: Rmc<H, MK, MK::Signature, MK::PartialMultisignature>,
aggregator: BlockSignatureAggregator<H, MK::PartialMultisignature>,
multisigned_events: VecDeque<Multisigned<SignableHash<H>, MK>>,
}

impl<
H: Copy + Hash,
N: ProtocolSink<RmcNetworkData<H, MK::Signature, MK::PartialMultisignature>>,
MK: MultiKeychain,
> IO<H, N, MK>
{
pub fn new(
network: N,
rmc_service: Rmc<H, MK, MK::Signature, MK::PartialMultisignature>,
aggregator: BlockSignatureAggregator<H, MK::PartialMultisignature>,
) -> Self {
IO {
network,
rmc_service,
aggregator,
multisigned_events: VecDeque::new(),
}
}

pub fn status_report(&self) {
self.aggregator.status_report()
}

pub async fn start_aggregation(&mut self, hash: H) {
debug!(target: "aleph-aggregator", "Started aggregation for block hash {:?}", hash);
if let Err(AggregatorError::DuplicateHash) = self.aggregator.on_start(hash) {
debug!(target: "aleph-aggregator", "Aggregation already started for block hash {:?}, ignoring.", hash);
return;
}
if let Some(multisigned) = self.rmc_service.start_rmc(SignableHash::new(hash)) {
self.multisigned_events.push_back(multisigned);
}
}

async fn wait_for_next_signature(&mut self) -> IOResult {
loop {
if let Some(multisigned) = self.multisigned_events.pop_front() {
let unchecked = multisigned.into_unchecked();
let signature = unchecked.signature();
self.aggregator
.on_multisigned_hash(unchecked.into_signable().get_hash(), signature);
return Ok(());
}
tokio::select! {
message_from_rmc = self.rmc_service.next_message() => {
trace!(target: "aleph-aggregator", "Our rmc message {:?}.", message_from_rmc);
if let Err(e) = self.network.send(message_from_rmc, Recipient::Everyone) {
warn!(target: "aleph-aggregator", "failed broadcasting a message from rmc: {:?}", e);
}
}
message_from_network = self.network.next() => match message_from_network {
Some(message) => {
trace!(target: "aleph-aggregator", "Received message for rmc: {:?}", message);
if let Some(multisigned) = self.rmc_service.process_message(message) {
self.multisigned_events.push_back(multisigned);
}
},
None => {
// In case the network is down we can terminate (?).
return Err(IOError::NetworkChannelClosed);
}
}
}
}
}

pub async fn next_multisigned_hash(&mut self) -> Option<(H, MK::PartialMultisignature)> {
loop {
trace!(target: "aleph-aggregator", "Entering next_multisigned_hash loop.");
match self.aggregator.try_pop_hash() {
Ok(res) => {
return Some(res);
}
Err(AggregatorError::NoHashFound) => { /* ignored */ }
Err(AggregatorError::DuplicateHash) => {
warn!(
target: "aleph-aggregator",
"Unexpected aggregator exception in IO: DuplicateHash",
)
}
}

if self.wait_for_next_signature().await.is_err() {
warn!(target: "aleph-aggregator", "the network channel closed");
return None;
}
}
}
}

#[cfg(test)]
mod tests {
use std::{
fmt::{Debug, Display, Formatter},
hash::Hash,
};

use parity_scale_codec::{Decode, Encode};

use crate::aggregator::{AggregatorError, BlockSignatureAggregator};

#[derive(Hash, PartialEq, Eq, Clone, Copy, Encode, Decode, Debug)]
struct MockHash(pub [u8; 32]);

impl AsRef<[u8]> for MockHash {
fn as_ref(&self) -> &[u8] {
&self.0
}
}

impl Display for MockHash {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
std::fmt::Debug::fmt(&self.0, f)
}
}
type TestMultisignature = usize;
const TEST_SIGNATURE: TestMultisignature = 42;

fn build_aggregator() -> BlockSignatureAggregator<MockHash, TestMultisignature> {
BlockSignatureAggregator::new()
}

fn build_hash(b0: u8) -> MockHash {
let mut bytes = [0u8; 32];
bytes[0] = b0;
MockHash(bytes)
}

#[test]
fn returns_with_matching_multisigned_hash() {
let mut aggregator = build_aggregator();
let res = aggregator.on_start(build_hash(0));
assert!(res.is_ok());

aggregator.on_multisigned_hash(build_hash(0), TEST_SIGNATURE);

let res = aggregator.try_pop_hash();
assert!(res.is_ok());
}

#[test]
fn doesnt_return_without_matching_multisigned_hash() {
let mut aggregator = build_aggregator();
let res = aggregator.on_start(build_hash(0));
assert!(res.is_ok());

aggregator.on_multisigned_hash(build_hash(1), TEST_SIGNATURE);

let res = aggregator.try_pop_hash();
assert_eq!(res, Err(AggregatorError::NoHashFound));
}
}

0 comments on commit 350ebb0

Please sign in to comment.