Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Dvir/papyrus consensus context #2111

Merged
merged 3 commits into from
Jun 17, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 2 additions & 0 deletions crates/papyrus_protobuf/src/consensus.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,15 @@ use starknet_api::block::BlockHash;
use starknet_api::core::ContractAddress;
use starknet_api::transaction::Transaction;

#[derive(Debug, Clone, Eq, PartialEq)]
pub struct Proposal {
pub height: u64,
pub proposer: ContractAddress,
pub transactions: Vec<Transaction>,
pub block_hash: BlockHash,
}

#[derive(Debug, Clone, Eq, PartialEq)]
pub enum ConsensusMessage {
Proposal(Proposal),
}
3 changes: 3 additions & 0 deletions crates/sequencing/papyrus_consensus/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ async-trait.workspace = true
futures.workspace = true
papyrus_network = { path = "../../papyrus_network", version = "0.4.0-dev.2" }
papyrus_protobuf = { path = "../../papyrus_protobuf", version = "0.4.0-dev.2" }
papyrus_storage = { path = "../../papyrus_storage", version = "0.4.0-dev.2" }
starknet_api.workspace = true
starknet-types-core.workspace = true
thiserror.workspace = true
Expand All @@ -19,3 +20,5 @@ tracing.workspace = true

[dev-dependencies]
mockall.workspace = true
papyrus_storage = { path = "../../papyrus_storage", features = ["testing"] }
test_utils = { path = "../../test_utils" }
6 changes: 4 additions & 2 deletions crates/sequencing/papyrus_consensus/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ use tracing::info;
use types::{ConsensusBlock, ConsensusContext, ConsensusError, ProposalInit, ValidatorId};

// TODO(matan): Remove dead code allowance at the end of milestone 1.
pub mod papyrus_consensus_context;
#[allow(dead_code)]
pub mod single_height_consensus;
#[cfg(test)]
Expand All @@ -28,8 +29,9 @@ where
{
let mut current_height = start_height;
loop {
info!("Starting consensus for height {start_height}");
let mut shc = SingleHeightConsensus::new(start_height, context.clone(), validator_id).await;
info!("Starting consensus for height {current_height}");
let mut shc =
SingleHeightConsensus::new(current_height, context.clone(), validator_id).await;

let block = if let Some(block) = shc.start().await? {
info!("Proposer flow height {current_height}");
Expand Down
199 changes: 199 additions & 0 deletions crates/sequencing/papyrus_consensus/src/papyrus_consensus_context.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,199 @@
#[cfg(test)]
#[path = "papyrus_consensus_context_test.rs"]
mod papyrus_consensus_context_test;

use core::panic;
use std::sync::Arc;
use std::time::Duration;

use async_trait::async_trait;
use futures::channel::{mpsc, oneshot};
use futures::StreamExt;
use papyrus_protobuf::consensus::Proposal;
use papyrus_storage::body::BodyStorageReader;
use papyrus_storage::header::HeaderStorageReader;
use papyrus_storage::{StorageError, StorageReader};
use starknet_api::block::{BlockHash, BlockNumber};
use starknet_api::block_hash;
use starknet_api::transaction::Transaction;
use tokio::sync::Mutex;
use tracing::debug;

use crate::types::{ConsensusBlock, ConsensusContext, ConsensusError, ProposalInit, ValidatorId};

// TODO: add debug messages and span to the tasks.

#[derive(Debug, PartialEq, Eq, Clone)]
struct PapyrusConsensusBlock {
content: Vec<Transaction>,
id: BlockHash,
}

impl ConsensusBlock for PapyrusConsensusBlock {
type ProposalChunk = Transaction;
type ProposalIter = std::vec::IntoIter<Transaction>;

fn id(&self) -> BlockHash {
self.id
}

fn proposal_iter(&self) -> Self::ProposalIter {
self.content.clone().into_iter()
}
}

struct PapyrusConsensusContext {
storage_reader: StorageReader,
broadcast_sender: Arc<Mutex<mpsc::Sender<Proposal>>>,
}

impl PapyrusConsensusContext {
// TODO(dvir): remove the dead code attribute after we will use this function.
#[allow(dead_code)]
pub fn new(storage_reader: StorageReader, broadcast_sender: mpsc::Sender<Proposal>) -> Self {
Self { storage_reader, broadcast_sender: Arc::new(Mutex::new(broadcast_sender)) }
}
}

const CHANNEL_SIZE: usize = 5000;

#[async_trait]
impl ConsensusContext for PapyrusConsensusContext {
type Block = PapyrusConsensusBlock;

async fn build_proposal(
&self,
height: BlockNumber,
) -> (mpsc::Receiver<Transaction>, oneshot::Receiver<PapyrusConsensusBlock>) {
let (mut sender, receiver) = mpsc::channel(CHANNEL_SIZE);
let (fin_sender, fin_receiver) = oneshot::channel();

let storage_reader = self.storage_reader.clone();
tokio::spawn(async move {
// TODO(dvir): consider fix this for the case of reverts. If between the check that the
// block in storage and to getting the transaction was a revert this flow will fail.
wait_for_block(&storage_reader, height).await.expect("Failed to wait to block");

let txn = storage_reader.begin_ro_txn().expect("Failed to begin ro txn");
let transactions = txn
.get_block_transactions(height)
.expect("Get transactions from storage failed")
.expect(&format!(
"Block in {height} was not found in storage despite waiting for it"
));

for tx in transactions.clone() {
sender.try_send(tx).expect("Send should succeed");
}
sender.close_channel();

let block_hash = txn
.get_block_header(height)
.expect("Get header from storage failed")
.expect(&format!(
"Block in {height} was not found in storage despite waiting for it"
))
.block_hash;
fin_sender
.send(PapyrusConsensusBlock { content: transactions, id: block_hash })
.expect("Send should succeed");
});

(receiver, fin_receiver)
}

async fn validate_proposal(
&self,
height: BlockNumber,
mut content: mpsc::Receiver<Transaction>,
) -> oneshot::Receiver<PapyrusConsensusBlock> {
let (fin_sender, fin_receiver) = oneshot::channel();

let storage_reader = self.storage_reader.clone();
tokio::spawn(async move {
// TODO(dvir): consider fix this for the case of reverts. If between the check that the
// block in storage and to getting the transaction was a revert this flow will fail.
wait_for_block(&storage_reader, height).await.expect("Failed to wait to block");

let txn = storage_reader.begin_ro_txn().expect("Failed to begin ro txn");
let transactions = txn
.get_block_transactions(height)
.expect("Get transactions from storage failed")
.expect(&format!(
"Block in {height} was not found in storage despite waiting for it"
));

for tx in transactions.iter() {
let received_tx = content
.next()
.await
.expect(&format!("Not received transaction equals to {tx:?}"));
if tx != &received_tx {
panic!("Transactions are not equal. In storage: {tx:?}, : {received_tx:?}");
}
}

let block_hash = txn
.get_block_header(height)
.expect("Get header from storage failed")
.expect(&format!(
"Block in {height} was not found in storage despite waiting for it"
))
.block_hash;
fin_sender
.send(PapyrusConsensusBlock { content: transactions, id: block_hash })
.expect("Send should succeed");
});

fin_receiver
}

async fn validators(&self, _height: BlockNumber) -> Vec<ValidatorId> {
vec![0u8.into(), 1u8.into(), 2u8.into()]
}

fn proposer(&self, _validators: &Vec<ValidatorId>, _height: BlockNumber) -> ValidatorId {
0u8.into()
}

async fn propose(
&self,
init: ProposalInit,
mut content_receiver: mpsc::Receiver<Transaction>,
fin_receiver: oneshot::Receiver<BlockHash>,
) -> Result<(), ConsensusError> {
let broadcast_sender = self.broadcast_sender.clone();

tokio::spawn(async move {
let mut transactions = Vec::new();
while let Some(tx) = content_receiver.next().await {
transactions.push(tx);
}

let block_hash =
fin_receiver.await.expect("Failed to get block hash from fin receiver");
let proposal = Proposal {
height: init.height.0,
proposer: init.proposer,
transactions,
block_hash,
};

broadcast_sender.lock().await.try_send(proposal).expect("Failed to send proposal");
});
Ok(())
}
}

const SLEEP_BETWEEN_CHECK_FOR_BLOCK: Duration = Duration::from_secs(10);

async fn wait_for_block(
storage_reader: &StorageReader,
height: BlockNumber,
) -> Result<(), StorageError> {
while storage_reader.begin_ro_txn()?.get_body_marker()? <= height {
debug!("Waiting for block {height:?} to continue consensus");
tokio::time::sleep(SLEEP_BETWEEN_CHECK_FOR_BLOCK).await;
}
Ok(())
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,116 @@
use futures::channel::{mpsc, oneshot};
use futures::StreamExt;
use papyrus_protobuf::consensus::Proposal;
use papyrus_storage::body::BodyStorageWriter;
use papyrus_storage::header::HeaderStorageWriter;
use papyrus_storage::test_utils::get_test_storage;
use starknet_api::block::Block;
use starknet_api::core::ContractAddress;
use starknet_api::transaction::Transaction;
use test_utils::get_test_block;

use crate::papyrus_consensus_context::PapyrusConsensusContext;
use crate::types::{ConsensusBlock, ConsensusContext, ProposalInit};

// TODO(dvir): consider adding tests for times, i.e, the calls are returned immediately and nothing
// happen until it should (for example, not creating a block before we have it in storage).

const TEST_CHANNEL_SIZE: usize = 10;

#[tokio::test]
async fn build_proposal() {
let (block, papyrus_context, _network_receiver) = test_setup();
let block_number = block.header.block_number;

let (mut proposal_receiver, fin_receiver) = papyrus_context.build_proposal(block_number).await;

let mut transactions = Vec::new();
while let Some(tx) = proposal_receiver.next().await {
transactions.push(tx);
}
assert_eq!(transactions, block.body.transactions);

let fin = fin_receiver.await.unwrap();
assert_eq!(fin.id(), block.header.block_hash);
assert_eq!(fin.proposal_iter().collect::<Vec::<Transaction>>(), block.body.transactions);
}

#[tokio::test]
async fn validate_proposal_success() {
let (block, papyrus_context, _network_receiver) = test_setup();
let block_number = block.header.block_number;

let (mut validate_sender, validate_receiver) = mpsc::channel(TEST_CHANNEL_SIZE);
for tx in block.body.transactions.clone() {
validate_sender.try_send(tx).unwrap();
}
validate_sender.close_channel();

let fin =
papyrus_context.validate_proposal(block_number, validate_receiver).await.await.unwrap();

assert_eq!(fin.id(), block.header.block_hash);
assert_eq!(fin.proposal_iter().collect::<Vec::<Transaction>>(), block.body.transactions);
}

#[tokio::test]
async fn validate_proposal_fail() {
let (block, papyrus_context, _network_receiver) = test_setup();
let block_number = block.header.block_number;

let different_block = get_test_block(4, None, None, None);
let (mut validate_sender, validate_receiver) = mpsc::channel(5000);
for tx in different_block.body.transactions.clone() {
validate_sender.try_send(tx).unwrap();
}
validate_sender.close_channel();

let fin = papyrus_context.validate_proposal(block_number, validate_receiver).await.await;
assert_eq!(fin, Err(oneshot::Canceled));
}

#[tokio::test]
async fn propose() {
let (block, papyrus_context, mut network_receiver) = test_setup();
let block_number = block.header.block_number;

let (mut content_sender, content_receiver) = mpsc::channel(TEST_CHANNEL_SIZE);
for tx in block.body.transactions.clone() {
content_sender.try_send(tx).unwrap();
}
content_sender.close_channel();

let (fin_sender, fin_receiver) = oneshot::channel();
fin_sender.send(block.header.block_hash).unwrap();

let proposal_init = ProposalInit { height: block_number, proposer: ContractAddress::default() };
papyrus_context.propose(proposal_init.clone(), content_receiver, fin_receiver).await.unwrap();

let expected_proposal = Proposal {
height: proposal_init.height.0,
proposer: proposal_init.proposer,
transactions: block.body.transactions,
block_hash: block.header.block_hash,
};

assert_eq!(network_receiver.next().await.unwrap(), expected_proposal);
}

fn test_setup() -> (Block, PapyrusConsensusContext, mpsc::Receiver<Proposal>) {
let ((storage_reader, mut storage_writer), _temp_dir) = get_test_storage();
let block = get_test_block(5, None, None, None);
let block_number = block.header.block_number;
storage_writer
.begin_rw_txn()
.unwrap()
.append_header(block_number, &block.header)
.unwrap()
.append_body(block_number, block.body.clone())
.unwrap()
.commit()
.unwrap();

let (network_sender, network_reciver) = mpsc::channel(TEST_CHANNEL_SIZE);
let papyrus_context = PapyrusConsensusContext::new(storage_reader.clone(), network_sender);
(block, papyrus_context, network_reciver)
}
2 changes: 1 addition & 1 deletion crates/sequencing/papyrus_consensus/src/types.rs
Original file line number Diff line number Diff line change
Expand Up @@ -140,7 +140,7 @@ pub trait ConsensusContext: Send + Sync {
) -> Result<(), ConsensusError>;
}

#[derive(PartialEq, Debug)]
#[derive(PartialEq, Debug, Clone)]
pub struct ProposalInit {
pub height: BlockNumber,
pub proposer: ValidatorId,
Expand Down
Loading