Skip to content
This repository has been archived by the owner on Apr 17, 2019. It is now read-only.

Commit

Permalink
Fix rejected status emitting (#2083)
Browse files Browse the repository at this point in the history
Signed-off-by: Andrei Lebedev <[email protected]>
  • Loading branch information
lebdron authored Feb 11, 2019
1 parent f30497f commit 228c49c
Show file tree
Hide file tree
Showing 10 changed files with 73 additions and 82 deletions.
2 changes: 1 addition & 1 deletion irohad/torii/impl/command_service_impl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,7 @@ namespace iroha {
shared_model::interface::StatefulFailedTxResponse,
shared_model::interface::CommittedTxResponse,
shared_model::interface::MstExpiredResponse,
shared_model::interface::RejectTxResponse>::value;
shared_model::interface::RejectedTxResponse>::value;

rxcpp::observable<
std::shared_ptr<shared_model::interface::TransactionResponse>>
Expand Down
41 changes: 23 additions & 18 deletions irohad/torii/processor/impl/transaction_processor_impl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,6 @@ namespace iroha {

// notify about failed txs
const auto &errors = proposal_and_errors->rejected_transactions;
std::lock_guard<std::mutex> lock(notifier_mutex_);
for (const auto &tx_error : errors) {
log_->info(composeErrorMessage(tx_error));
this->publishStatus(TxStatusType::kStatefulFailed,
Expand All @@ -75,7 +74,7 @@ namespace iroha {
// notify about success txs
for (const auto &successful_tx :
proposal_and_errors->verified_proposal->transactions()) {
log_->info("on stateful validation success: {}",
log_->info("VerifiedProposalCreatorEvent StatefulValid: {}",
successful_tx.hash().hex());
this->publishStatus(TxStatusType::kStatefulValid,
successful_tx.hash());
Expand All @@ -85,27 +84,29 @@ namespace iroha {
// commit transactions
pcs_->on_commit().subscribe(
[this](synchronizer::SynchronizationEvent sync_event) {
bool has_at_least_one_committed = false;
sync_event.synced_blocks.subscribe(
// on next
[this](auto model_block) {
current_txs_hashes_.reserve(
model_block->transactions().size());
std::transform(model_block->transactions().begin(),
model_block->transactions().end(),
std::back_inserter(current_txs_hashes_),
[](const auto &tx) { return tx.hash(); });
[this, &has_at_least_one_committed](auto model_block) {
for (const auto &tx : model_block->transactions()) {
const auto &hash = tx.hash();
log_->info("SynchronizationEvent Committed: {}",
hash.hex());
this->publishStatus(TxStatusType::kCommitted, hash);
has_at_least_one_committed = true;
}
for (const auto &rejected_tx_hash :
model_block->rejected_transactions_hashes()) {
log_->info("SynchronizationEvent Rejected: {}",
rejected_tx_hash.hex());
this->publishStatus(TxStatusType::kRejected,
rejected_tx_hash);
}
},
// on complete
[this] {
if (current_txs_hashes_.empty()) {
[this, &has_at_least_one_committed] {
if (not has_at_least_one_committed) {
log_->info("there are no transactions to be committed");
} else {
std::lock_guard<std::mutex> lock(notifier_mutex_);
for (const auto &tx_hash : current_txs_hashes_) {
log_->info("on commit committed: {}", tx_hash.hex());
this->publishStatus(TxStatusType::kCommitted, tx_hash);
}
current_txs_hashes_.clear();
}
});
});
Expand Down Expand Up @@ -175,6 +176,10 @@ namespace iroha {
status_factory_->makeStatefulValid(hash, tx_error));
return;
};
case TxStatusType::kRejected: {
status_bus_->publish(status_factory_->makeRejected(hash, tx_error));
return;
};
case TxStatusType::kCommitted: {
status_bus_->publish(status_factory_->makeCommitted(hash, tx_error));
return;
Expand Down
5 changes: 1 addition & 4 deletions irohad/torii/processor/transaction_processor_impl.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -64,10 +64,6 @@ namespace iroha {

logger::Logger log_;

/// prevents from emitting new tx statuses from different threads
/// in parallel
std::mutex notifier_mutex_;

// TODO: [IR-1665] Akvinikym 29.08.18: Refactor method publishStatus(..)
/**
* Complementary class for publishStatus method
Expand All @@ -77,6 +73,7 @@ namespace iroha {
kStatelessValid,
kStatefulFailed,
kStatefulValid,
kRejected,
kCommitted,
kMstExpired,
kNotReceived,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ namespace shared_model {

using CommittedTxResponse = TrivialProto<interface::CommittedTxResponse,
iroha::protocol::ToriiResponse>;
using RejectedTxResponse = TrivialProto<interface::RejectTxResponse,
using RejectedTxResponse = TrivialProto<interface::RejectedTxResponse,
iroha::protocol::ToriiResponse>;

// ---------------------------| Rest statuses |-----------------------------
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ namespace shared_model {
/**
* Status shows that transaction was rejected on consensus
*/
class RejectTxResponse : public AbstractTxResponse<RejectTxResponse> {
class RejectedTxResponse : public AbstractTxResponse<RejectedTxResponse> {
private:
std::string className() const override {
return "RejectedTxResponse";
Expand Down
4 changes: 2 additions & 2 deletions shared_model/interfaces/transaction_responses/tx_response.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ namespace shared_model {
class StatelessValidTxResponse;
class StatefulFailedTxResponse;
class StatefulValidTxResponse;
class RejectTxResponse;
class RejectedTxResponse;
class CommittedTxResponse;
class MstExpiredResponse;
class NotReceivedTxResponse;
Expand Down Expand Up @@ -47,7 +47,7 @@ namespace shared_model {
StatelessValidTxResponse,
StatefulFailedTxResponse,
StatefulValidTxResponse,
RejectTxResponse,
RejectedTxResponse,
CommittedTxResponse,
MstExpiredResponse,
NotReceivedTxResponse,
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
/**
* Copyright Soramitsu Co., Ltd. All Rights Reserved.
* SPDX-License-Identifier: Apache-2.0
*/
* Copyright Soramitsu Co., Ltd. All Rights Reserved.
* SPDX-License-Identifier: Apache-2.0
*/

#ifndef IROHA_SHARED_MODEL_TX_RESPONSE_VARIANT_HPP
#define IROHA_SHARED_MODEL_TX_RESPONSE_VARIANT_HPP
Expand All @@ -11,17 +11,17 @@
#include <boost/variant.hpp>

namespace boost {
extern template class variant<
const shared_model::interface::StatelessFailedTxResponse &,
const shared_model::interface::StatelessValidTxResponse &,
const shared_model::interface::StatefulFailedTxResponse &,
const shared_model::interface::StatefulValidTxResponse &,
const shared_model::interface::RejectTxResponse &,
const shared_model::interface::CommittedTxResponse &,
const shared_model::interface::MstExpiredResponse &,
const shared_model::interface::NotReceivedTxResponse &,
const shared_model::interface::MstPendingResponse &,
const shared_model::interface::EnoughSignaturesCollectedResponse &>;
extern template class variant<
const shared_model::interface::StatelessFailedTxResponse &,
const shared_model::interface::StatelessValidTxResponse &,
const shared_model::interface::StatefulFailedTxResponse &,
const shared_model::interface::StatefulValidTxResponse &,
const shared_model::interface::RejectedTxResponse &,
const shared_model::interface::CommittedTxResponse &,
const shared_model::interface::MstExpiredResponse &,
const shared_model::interface::NotReceivedTxResponse &,
const shared_model::interface::MstPendingResponse &,
const shared_model::interface::EnoughSignaturesCollectedResponse &>;
}

#endif // IROHA_SHARED_MODEL_TX_RESPONSE_VARIANT_HPP
15 changes: 4 additions & 11 deletions test/integration/acceptance/revoke_permission_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -94,8 +94,8 @@ TEST_F(GrantablePermissionsFixture, RevokeWithoutPermission) {
IntegrationTestFramework itf(1);
itf.setInitialState(kAdminKeypair);
createTwoAccounts(itf, {}, {Role::kReceive})
.sendTxAwait(makeUserWithPerms(
{interface::permissions::Role::kSetMyQuorum}),
.sendTxAwait(
makeUserWithPerms({interface::permissions::Role::kSetMyQuorum}),
[](auto &block) { ASSERT_EQ(block->transactions().size(), 1); })
.sendTxAwait(
grantPermission(kUser,
Expand Down Expand Up @@ -373,15 +373,8 @@ namespace grantables {
ASSERT_EQ(proposal->transactions().size(), 1);
})
.skipVerifiedProposal()
.skipBlock()
.getTxStatus(last_check_tx.hash(),
[&last_check_tx](auto &status) {
auto err_cmd_name = status.statelessErrorOrCommandName();
auto cmd_in_tx = last_check_tx.commands()[0].toString();
auto cmd_in_tx_name =
cmd_in_tx.substr(0, cmd_in_tx.find(":"));
ASSERT_EQ(err_cmd_name, cmd_in_tx_name);
})
.checkBlock(
[](auto &block) { ASSERT_EQ(block->transactions().size(), 0); })
.done();
}

Expand Down
3 changes: 1 addition & 2 deletions test/integration/acceptance/set_account_quorum_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -48,8 +48,7 @@ TEST_F(QuorumFixture, CannotRaiseQuorumMoreThanSignatures) {
const auto new_quorum = 3;
auto raise_quorum_tx = complete(
baseTx(kAdminId).setAccountQuorum(kAdminId, new_quorum), kAdminKeypair);
itf.sendTxAwait(raise_quorum_tx, CHECK_TXS_QUANTITY(0))
.getTxStatus(raise_quorum_tx.hash(), CHECK_STATEFUL_INVALID);
itf.sendTxAwait(raise_quorum_tx, CHECK_TXS_QUANTITY(0));
}

/**
Expand Down
53 changes: 25 additions & 28 deletions test/module/irohad/torii/processor/transaction_processor_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
#include "torii/processor/transaction_processor_impl.hpp"

#include <backend/protobuf/proto_tx_status_factory.hpp>
#include <boost/range/adaptor/transformed.hpp>
#include <boost/range/join.hpp>
#include <boost/variant.hpp>
#include "builders/default_builders.hpp"
Expand Down Expand Up @@ -233,10 +234,10 @@ TEST_F(TransactionProcessorTest, TransactionProcessorOnProposalBatchTest) {
/**
* @given transaction processor
* @when transactions compose proposal which is sent to peer
* communication service @and all transactions composed the block
* @then for every transaction in bathces STATEFUL_VALID status is returned
* communication service
* @then for every transaction in batches STATEFUL_VALID status is returned
*/
TEST_F(TransactionProcessorTest, TransactionProcessorBlockCreatedTest) {
TEST_F(TransactionProcessorTest, TransactionProcessorVerifiedProposalTest) {
std::vector<shared_model::proto::Transaction> txs;
for (size_t i = 0; i < proposal_size; i++) {
auto &&tx = addSignaturesFromKeyPairs(baseTestTx(), makeKey());
Expand Down Expand Up @@ -268,23 +269,6 @@ TEST_F(TransactionProcessorTest, TransactionProcessorBlockCreatedTest) {
verified_prop_notifier.get_subscriber().on_next(
simulator::VerifiedProposalCreatorEvent{validation_result, round});

auto block = TestBlockBuilder().transactions(txs).build();

// 2. Create block and notify transaction processor about it
rxcpp::subjects::subject<std::shared_ptr<shared_model::interface::Block>>
blocks_notifier;

commit_notifier.get_subscriber().on_next(
SynchronizationEvent{blocks_notifier.get_observable(),
SynchronizationOutcomeType::kCommit,
{},
{}});

blocks_notifier.get_subscriber().on_next(
std::shared_ptr<shared_model::interface::Block>(clone(block)));
// Note blocks_notifier hasn't invoked on_completed, so
// transactions are not commited

SCOPED_TRACE("Stateful Valid status verification");
validateStatuses<shared_model::interface::StatefulValidTxResponse>(txs);
}
Expand Down Expand Up @@ -349,8 +333,8 @@ TEST_F(TransactionProcessorTest, TransactionProcessorOnCommitTest) {
* communication service @and some transactions became part of block, while some
* were not committed, failing stateful validation
* @then for every transaction from block COMMIT status is returned @and
* for every transaction, which failed stateful validation,
* STATEFUL_INVALID_STATUS status is returned
* for every transaction, which failed stateful validation, REJECTED status is
* returned
*/
TEST_F(TransactionProcessorTest, TransactionProcessorInvalidTxsTest) {
std::vector<shared_model::proto::Transaction> block_txs;
Expand All @@ -377,8 +361,9 @@ TEST_F(TransactionProcessorTest, TransactionProcessorInvalidTxsTest) {
// passed or not stateful validation)
// Plus all transactions from block will
// be committed and corresponding status will be sent
// Rejected statuses will be published for invalid transactions
EXPECT_CALL(*status_bus, publish(_))
.Times(proposal_size + block_size)
.Times(proposal_size + block_size + invalid_txs.size())
.WillRepeatedly(testing::Invoke([this](auto response) {
status_map[response->transactionHash()] = response;
}));
Expand All @@ -403,7 +388,20 @@ TEST_F(TransactionProcessorTest, TransactionProcessorInvalidTxsTest) {
verified_prop_notifier.get_subscriber().on_next(
simulator::VerifiedProposalCreatorEvent{validation_result, round});

auto block = TestBlockBuilder().transactions(block_txs).build();
{
SCOPED_TRACE("Stateful invalid status verification");
// check that all invalid transactions will have stateful invalid status
validateStatuses<shared_model::interface::StatefulFailedTxResponse>(
invalid_txs);
}

auto block = TestBlockBuilder()
.transactions(block_txs)
.rejectedTransactions(
invalid_txs | boost::adaptors::transformed([](auto &tx) {
return tx.hash();
}))
.build();

SynchronizationEvent commit_event{
rxcpp::observable<>::just(
Expand All @@ -414,10 +412,9 @@ TEST_F(TransactionProcessorTest, TransactionProcessorInvalidTxsTest) {
commit_notifier.get_subscriber().on_next(commit_event);

{
SCOPED_TRACE("Stateful invalid status verification");
// check that all invalid transactions will have stateful invalid status
validateStatuses<shared_model::interface::StatefulFailedTxResponse>(
invalid_txs);
SCOPED_TRACE("Rejected status verification");
// check that all invalid transactions will have rejected status
validateStatuses<shared_model::interface::RejectedTxResponse>(invalid_txs);
}
{
SCOPED_TRACE("Committed status verification");
Expand Down

0 comments on commit 228c49c

Please sign in to comment.