Skip to content

Commit

Permalink
[feature] #3355: Standardize block API (#3884)
Browse files Browse the repository at this point in the history
Co-authored-by: Shanin Roman <[email protected]>
[fix] #3890: Fix validate topology on block sync

Signed-off-by: Marin Veršić <[email protected]>
  • Loading branch information
mversic authored Sep 19, 2023
1 parent d111396 commit 5db5e32
Show file tree
Hide file tree
Showing 72 changed files with 2,220 additions and 2,604 deletions.
369 changes: 182 additions & 187 deletions Cargo.lock

Large diffs are not rendered by default.

8 changes: 3 additions & 5 deletions cli/src/event.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ use warp::ws::WebSocket;
use crate::stream::{self, Sink, Stream};

/// Type of Stream error
pub type StreamError = stream::Error<<WebSocket as Stream<VersionedEventSubscriptionRequest>>::Err>;
pub type StreamError = stream::Error<<WebSocket as Stream<EventSubscriptionRequest>>::Err>;

/// Type of error for `Consumer`
#[derive(thiserror::Error, Debug)]
Expand Down Expand Up @@ -57,9 +57,7 @@ impl Consumer {
/// Can fail due to timeout or without message at websocket or during decoding request
#[iroha_futures::telemetry_future]
pub async fn new(mut stream: WebSocket) -> Result<Self> {
let subscription_request: VersionedEventSubscriptionRequest = stream.recv().await?;
let EventSubscriptionRequest(filter) = subscription_request.into_v1();

let EventSubscriptionRequest(filter) = stream.recv().await?;
Ok(Consumer { stream, filter })
}

Expand All @@ -74,7 +72,7 @@ impl Consumer {
}

self.stream
.send(VersionedEventMessage::from(EventMessage(event)))
.send(EventMessage(event))
.await
.map_err(Into::into)
}
Expand Down
4 changes: 2 additions & 2 deletions cli/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -160,9 +160,9 @@ impl NetworkRelay {

match msg {
SumeragiPacket(data) => {
self.sumeragi.incoming_message(data.into_v1());
self.sumeragi.incoming_message(*data);
}
BlockSync(data) => self.block_sync.message(data.into_v1()).await,
BlockSync(data) => self.block_sync.message(*data).await,
TransactionGossiper(data) => self.gossiper.gossip(*data).await,
Health => {}
}
Expand Down
24 changes: 10 additions & 14 deletions cli/src/stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ use core::{result::Result, time::Duration};

use futures::{SinkExt, StreamExt};
use iroha_version::prelude::*;
use parity_scale_codec::DecodeAll;

#[cfg(test)]
const TIMEOUT: Duration = Duration::from_millis(10_000);
Expand Down Expand Up @@ -34,7 +35,7 @@ where
/// Unexpected non-binary message received
NonBinaryMessage,
/// Error during versioned message decoding
IrohaVersion(#[from] iroha_version::error::Error),
Decode(#[from] parity_scale_codec::Error),
}

/// Represents message used by the stream
Expand All @@ -56,7 +57,7 @@ pub trait StreamMessage {
#[async_trait::async_trait]
pub trait Sink<S>: SinkExt<Self::Message, Error = Self::Err> + Unpin
where
S: EncodeVersioned + Send + Sync + 'static,
S: Encode + Send + Sync + 'static,
{
/// Error type returned by the sink
type Err: std::error::Error + Send + Sync + 'static;
Expand All @@ -68,10 +69,7 @@ where
async fn send(&mut self, message: S) -> Result<(), Error<Self::Err>> {
tokio::time::timeout(
TIMEOUT,
<Self as SinkExt<Self::Message>>::send(
self,
Self::Message::binary(message.encode_versioned()),
),
<Self as SinkExt<Self::Message>>::send(self, Self::Message::binary(message.encode())),
)
.await
.map_err(|_err| Error::SendTimeout)?
Expand All @@ -81,7 +79,7 @@ where

/// Trait for reading custom messages from stream
#[async_trait::async_trait]
pub trait Stream<R: DecodeVersioned>:
pub trait Stream<R: DecodeAll>:
StreamExt<Item = std::result::Result<Self::Message, Self::Err>> + Unpin
{
/// Error type returned by the stream
Expand All @@ -106,9 +104,7 @@ pub trait Stream<R: DecodeVersioned>:
return Err(Error::NonBinaryMessage);
}

Ok(R::decode_all_versioned(
subscription_request_message.as_bytes(),
)?)
Ok(R::decode_all(&mut subscription_request_message.as_bytes())?)
}
}

Expand All @@ -133,14 +129,14 @@ impl StreamMessage for warp::ws::Message {
#[async_trait::async_trait]
impl<M> Sink<M> for warp::ws::WebSocket
where
M: EncodeVersioned + Send + Sync + 'static,
M: Encode + Send + Sync + 'static,
{
type Err = warp::Error;
type Message = warp::ws::Message;
}

#[async_trait::async_trait]
impl<M: DecodeVersioned> Stream<M> for warp::ws::WebSocket {
impl<M: DecodeAll> Stream<M> for warp::ws::WebSocket {
type Err = warp::Error;
type Message = warp::ws::Message;
}
Expand All @@ -152,14 +148,14 @@ mod ws_client {
use super::*;

#[async_trait::async_trait]
impl<M: DecodeVersioned> Stream<M> for WsClient {
impl<M: DecodeAll> Stream<M> for WsClient {
type Err = warp::test::WsError;
type Message = warp::ws::Message;
}
#[async_trait::async_trait]
impl<M> Sink<M> for WsClient
where
M: EncodeVersioned + Send + Sync + 'static,
M: Encode + Send + Sync + 'static,
{
type Err = warp::test::WsError;
type Message = warp::ws::Message;
Expand Down
16 changes: 5 additions & 11 deletions cli/src/torii/routing.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,11 +25,8 @@ use iroha_core::{
};
use iroha_data_model::{
block::{
stream::{
BlockMessage, BlockSubscriptionRequest, VersionedBlockMessage,
VersionedBlockSubscriptionRequest,
},
VersionedCommittedBlock,
stream::{BlockMessage, BlockSubscriptionRequest},
VersionedSignedBlock,
},
http::{BatchedResponse, VersionedBatchedResponse},
prelude::*,
Expand Down Expand Up @@ -278,8 +275,7 @@ async fn handle_post_configuration(

#[iroha_futures::telemetry_future]
async fn handle_blocks_stream(kura: Arc<Kura>, mut stream: WebSocket) -> eyre::Result<()> {
let subscription_request: VersionedBlockSubscriptionRequest = stream.recv().await?;
let BlockSubscriptionRequest(mut from_height) = subscription_request.into_v1();
let BlockSubscriptionRequest(mut from_height) = stream.recv().await?;

let mut interval = tokio::time::interval(std::time::Duration::from_millis(10));
loop {
Expand Down Expand Up @@ -307,10 +303,8 @@ async fn handle_blocks_stream(kura: Arc<Kura>, mut stream: WebSocket) -> eyre::R
_ = interval.tick() => {
if let Some(block) = kura.get_block_by_height(from_height.get()) {
stream
// TODO: to avoid clone `VersionedBlockMessage` could be split into sending and receiving parts
.send(VersionedBlockMessage::from(
BlockMessage(VersionedCommittedBlock::clone(&block)),
))
// TODO: to avoid clone `BlockMessage` could be split into sending and receiving parts
.send(BlockMessage(VersionedSignedBlock::clone(&block)))
.await?;
from_height = from_height.checked_add(1).expect("Maximum block height is achieved.");
}
Expand Down
8 changes: 4 additions & 4 deletions client/benches/tps/utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -132,14 +132,15 @@ impl Config {
let block = blocks
.next()
.expect("The block is not yet in WSV. Need more sleep?");
let block = block.as_v1();
(
block
.payload()
.transactions
.iter()
.filter(|tx| tx.error.is_none())
.count(),
block
.payload()
.transactions
.iter()
.filter(|tx| tx.error.is_some())
Expand Down Expand Up @@ -173,7 +174,7 @@ impl MeasurerUnit {
let keypair = iroha_crypto::KeyPair::generate().expect("Failed to generate KeyPair.");

let account_id = account_id(self.name);
let alice_id = <Account as Identifiable>::Id::from_str("alice@wonderland")?;
let alice_id = AccountId::from_str("alice@wonderland")?;
let asset_id = asset_id(self.name);

let register_me = RegisterBox::new(Account::new(
Expand Down Expand Up @@ -238,8 +239,7 @@ impl MeasurerUnit {
let submitter = self.client.clone();
let interval_us_per_tx = self.config.interval_us_per_tx;
let instructions = self.instructions();
let alice_id = <Account as Identifiable>::Id::from_str("alice@wonderland")
.expect("Failed to parse account id");
let alice_id = AccountId::from_str("alice@wonderland").expect("Failed to parse account id");

let mut nonce = NonZeroU32::new(1).expect("Valid");

Expand Down
28 changes: 10 additions & 18 deletions client/src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ use http_default::{AsyncWebSocketStream, WebSocketStream};
use iroha_config::{client::Configuration, torii::uri, GetConfiguration, PostConfiguration};
use iroha_crypto::{HashOf, KeyPair};
use iroha_data_model::{
block::VersionedCommittedBlock,
block::VersionedSignedBlock,
http::VersionedBatchedResponse,
isi::Instruction,
predicate::PredicateBox,
Expand Down Expand Up @@ -654,7 +654,7 @@ impl Client {
PipelineStatus::Rejected(ref reason) => {
return Err(reason.clone().into());
}
PipelineStatus::Committed => return Ok(hash.transmute()),
PipelineStatus::Committed => return Ok(hash),
}
}
}
Expand Down Expand Up @@ -1036,7 +1036,7 @@ impl Client {
pub fn listen_for_blocks(
&self,
height: NonZeroU64,
) -> Result<impl Iterator<Item = Result<VersionedCommittedBlock>>> {
) -> Result<impl Iterator<Item = Result<VersionedSignedBlock>>> {
blocks_api::BlockIterator::new(self.blocks_handler(height)?)
}

Expand Down Expand Up @@ -1448,10 +1448,7 @@ pub mod events_api {
url,
} = self;

let msg =
VersionedEventSubscriptionRequest::from(EventSubscriptionRequest::new(filter))
.encode_versioned();

let msg = EventSubscriptionRequest::new(filter).encode();
InitData::new(R::new(HttpMethod::GET, url).headers(headers), msg, Events)
}
}
Expand All @@ -1464,8 +1461,7 @@ pub mod events_api {
type Event = iroha_data_model::prelude::Event;

fn message(&self, message: Vec<u8>) -> Result<Self::Event> {
let event_socket_message =
VersionedEventMessage::decode_all_versioned(&message)?.into_v1();
let event_socket_message = EventMessage::decode_all(&mut message.as_slice())?;
Ok(event_socket_message.into())
}
}
Expand Down Expand Up @@ -1532,10 +1528,7 @@ mod blocks_api {
url,
} = self;

let msg =
VersionedBlockSubscriptionRequest::from(BlockSubscriptionRequest::new(height))
.encode_versioned();

let msg = BlockSubscriptionRequest::new(height).encode();
InitData::new(R::new(HttpMethod::GET, url).headers(headers), msg, Events)
}
}
Expand All @@ -1545,11 +1538,10 @@ mod blocks_api {
pub struct Events;

impl FlowEvents for Events {
type Event = iroha_data_model::block::VersionedCommittedBlock;
type Event = iroha_data_model::block::VersionedSignedBlock;

fn message(&self, message: Vec<u8>) -> Result<Self::Event> {
let block_msg = VersionedBlockMessage::decode_all_versioned(&message)?.into_v1();
Ok(block_msg.into())
Ok(BlockMessage::decode_all(&mut message.as_slice()).map(Into::into)?)
}
}
}
Expand Down Expand Up @@ -1610,7 +1602,7 @@ pub mod asset {
}

/// Construct a query to get an asset by its id
pub fn by_id(asset_id: impl Into<EvaluatesTo<<Asset as Identifiable>::Id>>) -> FindAssetById {
pub fn by_id(asset_id: impl Into<EvaluatesTo<AssetId>>) -> FindAssetById {
FindAssetById::new(asset_id)
}
}
Expand All @@ -1632,7 +1624,7 @@ pub mod block {

/// Construct a query to find block header by hash
pub fn header_by_hash(
hash: impl Into<EvaluatesTo<HashOf<VersionedCommittedBlock>>>,
hash: impl Into<EvaluatesTo<HashOf<VersionedSignedBlock>>>,
) -> FindBlockHeaderByHash {
FindBlockHeaderByHash::new(hash)
}
Expand Down
10 changes: 5 additions & 5 deletions client/tests/integration/asset.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ use super::Configuration;

#[test]
fn client_register_asset_should_add_asset_once_but_not_twice() -> Result<()> {
let (_rt, _peer, mut test_client) = <PeerBuilder>::new().with_port(10_620).start_with_runtime();
let (_rt, _peer, test_client) = <PeerBuilder>::new().with_port(10_620).start_with_runtime();
wait_for_genesis_committed(&[test_client.clone()], 0);

// Given
Expand Down Expand Up @@ -48,7 +48,7 @@ fn client_register_asset_should_add_asset_once_but_not_twice() -> Result<()> {

#[test]
fn unregister_asset_should_remove_asset_from_account() -> Result<()> {
let (_rt, _peer, mut test_client) = <PeerBuilder>::new().with_port(10_555).start_with_runtime();
let (_rt, _peer, test_client) = <PeerBuilder>::new().with_port(10_555).start_with_runtime();
wait_for_genesis_committed(&[test_client.clone()], 0);

// Given
Expand Down Expand Up @@ -87,7 +87,7 @@ fn unregister_asset_should_remove_asset_from_account() -> Result<()> {

#[test]
fn client_add_asset_quantity_to_existing_asset_should_increase_asset_amount() -> Result<()> {
let (_rt, _peer, mut test_client) = <PeerBuilder>::new().with_port(10_000).start_with_runtime();
let (_rt, _peer, test_client) = <PeerBuilder>::new().with_port(10_000).start_with_runtime();
wait_for_genesis_committed(&[test_client.clone()], 0);

// Given
Expand Down Expand Up @@ -120,7 +120,7 @@ fn client_add_asset_quantity_to_existing_asset_should_increase_asset_amount() ->

#[test]
fn client_add_big_asset_quantity_to_existing_asset_should_increase_asset_amount() -> Result<()> {
let (_rt, _peer, mut test_client) = <PeerBuilder>::new().with_port(10_510).start_with_runtime();
let (_rt, _peer, test_client) = <PeerBuilder>::new().with_port(10_510).start_with_runtime();
wait_for_genesis_committed(&[test_client.clone()], 0);

// Given
Expand Down Expand Up @@ -153,7 +153,7 @@ fn client_add_big_asset_quantity_to_existing_asset_should_increase_asset_amount(

#[test]
fn client_add_asset_with_decimal_should_increase_asset_amount() -> Result<()> {
let (_rt, _peer, mut test_client) = <PeerBuilder>::new().with_port(10_515).start_with_runtime();
let (_rt, _peer, test_client) = <PeerBuilder>::new().with_port(10_515).start_with_runtime();
wait_for_genesis_committed(&[test_client.clone()], 0);

// Given
Expand Down
Loading

0 comments on commit 5db5e32

Please sign in to comment.