diff --git a/src/herder/TransactionQueue.cpp b/src/herder/TransactionQueue.cpp index 6ec98093f4..5ac15116fa 100644 --- a/src/herder/TransactionQueue.cpp +++ b/src/herder/TransactionQueue.cpp @@ -844,12 +844,12 @@ TransactionQueue::getMaxOpsToFloodThisPeriod() const return static_cast(opsToFlood); } -bool +TransactionQueue::BroadcastStatus TransactionQueue::broadcastTx(AccountState& state, TimestampedTx& tx) { if (tx.mBroadcasted) { - return false; + return BroadcastStatus::BROADCAST_STATUS_ALREADY; } bool allowTx{true}; @@ -933,10 +933,11 @@ TransactionQueue::broadcastTx(AccountState& state, TimestampedTx& tx) // false to our caller so that they will not count this tx against the // per-timeslice counters -- we want to allow the caller to try useful // work from other sources. - return false; + return BroadcastStatus::BROADCAST_STATUS_SKIPPED; } - return mApp.getOverlayManager().broadcastMessage( - tx.mTx->toStellarMessage()); + return mApp.getOverlayManager().broadcastMessage(tx.mTx->toStellarMessage()) + ? BroadcastStatus::BROADCAST_STATUS_SUCCESS + : BroadcastStatus::BROADCAST_STATUS_ALREADY; } struct TxQueueTracker @@ -1012,6 +1013,7 @@ TransactionQueue::broadcastSome() } } + std::vector banningTxs; while (opsToFlood != 0 && !iters.empty()) { auto curTracker = iters.top(); @@ -1027,22 +1029,30 @@ TransactionQueue::broadcastSome() // accumulate more flooding credit break; } + auto bStatus = broadcastTx(*curTracker.mAccountState, *cur); + if (bStatus == BroadcastStatus::BROADCAST_STATUS_SUCCESS) + { + auto ops = tx->getNumOperations(); + opsToFlood -= ops; + totalOpsToFlood -= ops; + } + // when skipping, we ban the transaction and skip the remainder of the + // queue + if (bStatus == BroadcastStatus::BROADCAST_STATUS_SKIPPED) + { + banningTxs.emplace_back(tx); + } else { - if (broadcastTx(*curTracker.mAccountState, *cur)) + cur++; + // if we're not done with this account, add the tracker back + if (curTracker.skipToFirstNotBroadcasted()) { - auto ops = tx->getNumOperations(); - opsToFlood -= ops; - totalOpsToFlood -= ops; + iters.push(curTracker); } - cur++; - } - // if we're not done with this account, add the tracker back - if (curTracker.skipToFirstNotBroadcasted()) - { - iters.push(curTracker); } } + ban(banningTxs); // carry over remainder, up to MAX_OPS_PER_TX ops // reason is that if we add 1 next round, we can flood a "worst case fee // bump" tx diff --git a/src/herder/TransactionQueue.h b/src/herder/TransactionQueue.h index 9e70544fd1..c836ce683b 100644 --- a/src/herder/TransactionQueue.h +++ b/src/herder/TransactionQueue.h @@ -198,7 +198,13 @@ class TransactionQueue bool broadcastSome(); void broadcast(bool fromCallback); // broadcasts a single transaction - bool broadcastTx(AccountState& state, TimestampedTx& tx); + enum class BroadcastStatus + { + BROADCAST_STATUS_ALREADY, + BROADCAST_STATUS_SUCCESS, + BROADCAST_STATUS_SKIPPED + }; + BroadcastStatus broadcastTx(AccountState& state, TimestampedTx& tx); AddResult canAdd(TransactionFrameBasePtr tx, AccountStates::iterator& stateIter,