Skip to content

Commit

Permalink
feat(consensus): add papyrus context
Browse files Browse the repository at this point in the history
  • Loading branch information
DvirYo-starkware committed Jun 16, 2024
1 parent 83e0ed6 commit fd3ecd3
Show file tree
Hide file tree
Showing 7 changed files with 332 additions and 1 deletion.
2 changes: 2 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 2 additions & 0 deletions crates/papyrus_protobuf/src/consensus.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,15 @@ use starknet_api::block::BlockHash;
use starknet_api::core::ContractAddress;
use starknet_api::transaction::Transaction;

#[derive(Debug, Clone, Eq, PartialEq)]
pub struct Proposal {
pub height: u64,
pub proposer: ContractAddress,
pub transactions: Vec<Transaction>,
pub block_hash: BlockHash,
}

#[derive(Debug, Clone, Eq, PartialEq)]
pub enum ConsensusMessage {
Proposal(Proposal),
}
3 changes: 3 additions & 0 deletions crates/sequencing/papyrus_consensus/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ async-trait.workspace = true
futures.workspace = true
papyrus_network = { path = "../../papyrus_network", version = "0.4.0-dev.2" }
papyrus_protobuf = { path = "../../papyrus_protobuf", version = "0.4.0-dev.2" }
papyrus_storage = { path = "../../papyrus_storage", version = "0.4.0-dev.2" }
starknet_api.workspace = true
starknet-types-core.workspace = true
thiserror.workspace = true
Expand All @@ -19,3 +20,5 @@ tracing.workspace = true

[dev-dependencies]
mockall.workspace = true
papyrus_storage = { path = "../../papyrus_storage", features = ["testing"] }
test_utils = { path = "../../test_utils" }
1 change: 1 addition & 0 deletions crates/sequencing/papyrus_consensus/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ pub mod single_height_consensus;
pub(crate) mod test_utils;
#[allow(dead_code)]
pub mod types;
pub mod papyrus_consensus_context;

// TODO(dvir): add test for this.
pub async fn run_consensus<BlockT: ConsensusBlock>(
Expand Down
204 changes: 204 additions & 0 deletions crates/sequencing/papyrus_consensus/src/papyrus_consensus_context.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,204 @@
#[cfg(test)]
#[path = "papyrus_context_test.rs"]
mod papyrus_context_test;

use core::panic;
use std::sync::Arc;
use std::time::Duration;

use async_trait::async_trait;
use futures::channel::{mpsc, oneshot};
use futures::StreamExt;
use papyrus_protobuf::consensus::Proposal;
use papyrus_storage::body::BodyStorageReader;
use papyrus_storage::header::HeaderStorageReader;
use papyrus_storage::{StorageError, StorageReader};
use starknet_api::block::{BlockHash, BlockNumber};
use starknet_api::transaction::Transaction;
use tokio::sync::Mutex;
use tracing::debug;

use crate::types::{ConsensusBlock, ConsensusContext, ConsensusError, ProposalInit, ValidatorId};

// TODO: add debug messages and span to the tasks.

#[derive(Debug, PartialEq, Eq, Clone)]
struct PapyrusConsensusBlock {
content: Vec<Transaction>,
id: BlockHash,
}

impl ConsensusBlock for PapyrusConsensusBlock {
type ProposalChunk = Transaction;
type ProposalIter = std::vec::IntoIter<Transaction>;

fn id(&self) -> BlockHash {
self.id
}

fn proposal_iter(&self) -> Self::ProposalIter {
self.content.clone().into_iter()
}
}

struct PapyrusConsensusContext {
storage_reader: StorageReader,
broadcast_sender: Arc<Mutex<mpsc::Sender<Proposal>>>,
}

impl PapyrusConsensusContext {
// TODO(dvir): remove the dead code attribute after we will use this function.
#[allow(dead_code)]
pub fn new(storage_reader: StorageReader, broadcast_sender: mpsc::Sender<Proposal>) -> Self {
Self { storage_reader, broadcast_sender: Arc::new(Mutex::new(broadcast_sender)) }
}
}

const CHANNEL_SIZE: usize = 5000;

#[async_trait]
impl ConsensusContext for PapyrusConsensusContext {
type Block = PapyrusConsensusBlock;

async fn build_proposal(
&self,
height: BlockNumber,
) -> (mpsc::Receiver<Transaction>, oneshot::Receiver<PapyrusConsensusBlock>) {
let (mut sender, receiver) = mpsc::channel(CHANNEL_SIZE);
let (fin_sender, fin_receiver) = oneshot::channel();

let storage_reader = self.storage_reader.clone();
tokio::spawn(async move {
// TODO(dvir): consider fix this for the case of reverts. If between the check that the
// block in storage and to getting the transaction was a revert this flow will fail.
wait_to_block(&storage_reader, height).await.expect("Failed to wait to block");

let txn = storage_reader.begin_ro_txn().expect("Failed to begin ro txn");
let transactions = txn
.get_block_transactions(height)
.expect("Get transactions from storage failed")
.expect(&format!(
"Block in {height} was no found in storage although waiting for it"
));

for tx in transactions.clone() {
sender.try_send(tx).expect("Send should succeed");
}
sender.close_channel();

fin_sender
.send(PapyrusConsensusBlock {
content: transactions,
id: txn
.get_block_header(height)
.expect("Get header from storage failed")
.expect(&format!(
"Block in {height} was no found in storage although waiting for it"
))
.block_hash,
})
.expect("Send should succeed");
});

(receiver, fin_receiver)
}

async fn validate_proposal(
&self,
height: BlockNumber,
mut content: mpsc::Receiver<Transaction>,
) -> oneshot::Receiver<PapyrusConsensusBlock> {
let (fin_sender, fin_receiver) = oneshot::channel();

let storage_reader = self.storage_reader.clone();
tokio::spawn(async move {
// TODO(dvir): consider fix this for the case of reverts. If between the check that the
// block in storage and to getting the transaction was a revert this flow will fail.
wait_to_block(&storage_reader, height).await.expect("Failed to wait to block");

let txn = storage_reader.begin_ro_txn().expect("Failed to begin ro txn");
let transactions = txn
.get_block_transactions(height)
.expect("Get transactions from storage failed")
.expect(&format!(
"Block in {height} was no found in storage although waiting for it"
));

for tx in transactions.iter() {
let recived_tx = content
.next()
.await
.expect(&format!("Not recived transaction equals to {tx:?}"));
if tx != &recived_tx {
panic!(
"Transactions are not equal. In storage: {tx:?}, recived: {recived_tx:?}"
);
}
}

fin_sender
.send(PapyrusConsensusBlock {
content: transactions,
id: txn
.get_block_header(height)
.expect("Get header from storage failed")
.expect(&format!(
"Block in {height} was no found in storage although waiting for it"
))
.block_hash,
})
.expect("Send should succeed");
});

fin_receiver
}

async fn validators(&self, _height: BlockNumber) -> Vec<ValidatorId> {
vec![0u8.into(), 1u8.into(), 2u8.into()]
}

fn proposer(&self, _validators: &Vec<ValidatorId>, _height: BlockNumber) -> ValidatorId {
0u8.into()
}

async fn propose(
&self,
init: ProposalInit,
mut content_receiver: mpsc::Receiver<Transaction>,
fin_receiver: oneshot::Receiver<BlockHash>,
) -> Result<(), ConsensusError> {
let broadcast_sender = self.broadcast_sender.clone();

tokio::spawn(async move {
let mut transactions = Vec::new();
while let Some(tx) = content_receiver.next().await {
transactions.push(tx);
}

let block_hash =
fin_receiver.await.expect("Failed to get block hash from fin receiver");
let proposal = Proposal {
height: init.height.0,
proposer: init.proposer,
transactions,
block_hash,
};

broadcast_sender.lock().await.try_send(proposal).expect("Failed to send proposal");
});
Ok(())
}
}

const SLEEP_BETWEEN_CHECK_FOR_BLOCK: Duration = Duration::from_secs(10);

async fn wait_to_block(
storage_reader: &StorageReader,
height: BlockNumber,
) -> Result<(), StorageError> {
while storage_reader.begin_ro_txn()?.get_body_marker()? <= height {
debug!("Waiting for block {height:?} to continue consensus");
tokio::time::sleep(SLEEP_BETWEEN_CHECK_FOR_BLOCK).await;
}
Ok(())
}
119 changes: 119 additions & 0 deletions crates/sequencing/papyrus_consensus/src/papyrus_context_test.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,119 @@
use futures::channel::{mpsc, oneshot};
use futures::StreamExt;
use papyrus_protobuf::consensus::Proposal;
use papyrus_storage::body::BodyStorageWriter;
use papyrus_storage::header::HeaderStorageWriter;
use papyrus_storage::test_utils::get_test_storage;
use papyrus_storage::StorageReader;
use starknet_api::block::Block;
use starknet_api::core::ContractAddress;
use starknet_api::transaction::Transaction;
use test_utils::get_test_block;

use crate::papyrus_consensus_context::PapyrusConsensusContext;
use crate::types::{ConsensusBlock, ConsensusContext, ProposalInit};

// TODO(dvir): consider adding tests for times, i.e, the calls are returned immediately and nothing
// happen until it should (for example, not creating a block before we have it in storage).

const TEST_CHANNEL_SIZE: usize = 10;

#[tokio::test]
async fn build_proposal() {
let (block, papyrus_context, _network_receiver) = get_storage_with_block_and_papyrus_context();
let block_number = block.header.block_number;

let (mut proposal_receiver, fin_receiver) = papyrus_context.build_proposal(block_number).await;

let mut transactions = Vec::new();
while let Some(tx) = proposal_receiver.next().await {
transactions.push(tx);
}
assert_eq!(transactions, block.body.transactions);

let fin = fin_receiver.await.unwrap();
assert_eq!(fin.id(), block.header.block_hash);
assert_eq!(fin.proposal_iter().collect::<Vec::<Transaction>>(), block.body.transactions);
}

#[tokio::test]
async fn validate_proposal_success() {
let (block, papyrus_context, _network_receiver) = get_storage_with_block_and_papyrus_context();
let block_number = block.header.block_number;

let (mut validate_sender, validate_receiver) = mpsc::channel(TEST_CHANNEL_SIZE);
for tx in block.body.transactions.clone() {
validate_sender.try_send(tx).unwrap();
}
validate_sender.close_channel();

let fin =
papyrus_context.validate_proposal(block_number, validate_receiver).await.await.unwrap();

assert_eq!(fin.id(), block.header.block_hash);
assert_eq!(fin.proposal_iter().collect::<Vec::<Transaction>>(), block.body.transactions);
}

#[tokio::test]
async fn validate_proposal_fail() {
let (block, papyrus_context, _network_receiver) = get_storage_with_block_and_papyrus_context();
let block_number = block.header.block_number;

let different_block = get_test_block(4, None, None, None);
let (mut validate_sender, validate_receiver) = mpsc::channel(5000);
for tx in different_block.body.transactions.clone() {
validate_sender.try_send(tx).unwrap();
}
validate_sender.close_channel();

let fin = papyrus_context.validate_proposal(block_number, validate_receiver).await.await;
assert_eq!(fin, Err(oneshot::Canceled));
}

#[tokio::test]
async fn propose() {
let (block, papyrus_context, mut network_receiver) =
get_storage_with_block_and_papyrus_context();
let block_number = block.header.block_number;

let (mut content_sender, content_receiver) = mpsc::channel(TEST_CHANNEL_SIZE);
for tx in block.body.transactions.clone() {
content_sender.try_send(tx).unwrap();
}
content_sender.close_channel();

let (fin_sender, fin_receiver) = oneshot::channel();
fin_sender.send(block.header.block_hash).unwrap();

let proposal_init = ProposalInit { height: block_number, proposer: ContractAddress::default() };
papyrus_context.propose(proposal_init.clone(), content_receiver, fin_receiver).await.unwrap();

let expected_proposal = Proposal {
height: proposal_init.height.0,
proposer: proposal_init.proposer,
transactions: block.body.transactions,
block_hash: block.header.block_hash,
};

assert_eq!(network_receiver.next().await.unwrap(), expected_proposal);
}

fn get_storage_with_block_and_papyrus_context()
-> (Block, PapyrusConsensusContext, mpsc::Receiver<Proposal>) {
let ((storage_reader, mut storage_writer), _temp_dir) = get_test_storage();
let block = get_test_block(5, None, None, None);
let block_number = block.header.block_number;
storage_writer
.begin_rw_txn()
.unwrap()
.append_header(block_number, &block.header)
.unwrap()
.append_body(block_number, block.body.clone())
.unwrap()
.commit()
.unwrap();

let (network_sender, network_reciver) = mpsc::channel(TEST_CHANNEL_SIZE);
let papyrus_context = PapyrusConsensusContext::new(storage_reader.clone(), network_sender);
(block, papyrus_context, network_reciver)
}
2 changes: 1 addition & 1 deletion crates/sequencing/papyrus_consensus/src/types.rs
Original file line number Diff line number Diff line change
Expand Up @@ -140,7 +140,7 @@ pub trait ConsensusContext: Send + Sync {
) -> Result<(), ConsensusError>;
}

#[derive(PartialEq, Debug)]
#[derive(PartialEq, Debug, Clone)]
pub struct ProposalInit {
pub height: BlockNumber,
pub proposer: ValidatorId,
Expand Down

0 comments on commit fd3ecd3

Please sign in to comment.