Skip to content

Commit

Permalink
Merge pull request #3289 from MonsieurNicolas/banDropped
Browse files Browse the repository at this point in the history
Ban transactions that get dropped late in the pipeline

Reviewed-by: graydon
  • Loading branch information
latobarita authored Dec 9, 2021
2 parents eaf39a6 + 577bef4 commit b63c162
Show file tree
Hide file tree
Showing 4 changed files with 41 additions and 23 deletions.
40 changes: 25 additions & 15 deletions src/herder/TransactionQueue.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -844,12 +844,12 @@ TransactionQueue::getMaxOpsToFloodThisPeriod() const
return static_cast<size_t>(opsToFlood);
}

bool
TransactionQueue::BroadcastStatus
TransactionQueue::broadcastTx(AccountState& state, TimestampedTx& tx)
{
if (tx.mBroadcasted)
{
return false;
return BroadcastStatus::BROADCAST_STATUS_ALREADY;
}

bool allowTx{true};
Expand Down Expand Up @@ -933,10 +933,11 @@ TransactionQueue::broadcastTx(AccountState& state, TimestampedTx& tx)
// false to our caller so that they will not count this tx against the
// per-timeslice counters -- we want to allow the caller to try useful
// work from other sources.
return false;
return BroadcastStatus::BROADCAST_STATUS_SKIPPED;
}
return mApp.getOverlayManager().broadcastMessage(
tx.mTx->toStellarMessage());
return mApp.getOverlayManager().broadcastMessage(tx.mTx->toStellarMessage())
? BroadcastStatus::BROADCAST_STATUS_SUCCESS
: BroadcastStatus::BROADCAST_STATUS_ALREADY;
}

struct TxQueueTracker
Expand Down Expand Up @@ -1012,6 +1013,7 @@ TransactionQueue::broadcastSome()
}
}

std::vector<TransactionFrameBasePtr> banningTxs;
while (opsToFlood != 0 && !iters.empty())
{
auto curTracker = iters.top();
Expand All @@ -1027,22 +1029,30 @@ TransactionQueue::broadcastSome()
// accumulate more flooding credit
break;
}
auto bStatus = broadcastTx(*curTracker.mAccountState, *cur);
if (bStatus == BroadcastStatus::BROADCAST_STATUS_SUCCESS)
{
auto ops = tx->getNumOperations();
opsToFlood -= ops;
totalOpsToFlood -= ops;
}
// when skipping, we ban the transaction and skip the remainder of the
// queue
if (bStatus == BroadcastStatus::BROADCAST_STATUS_SKIPPED)
{
banningTxs.emplace_back(tx);
}
else
{
if (broadcastTx(*curTracker.mAccountState, *cur))
cur++;
// if we're not done with this account, add the tracker back
if (curTracker.skipToFirstNotBroadcasted())
{
auto ops = tx->getNumOperations();
opsToFlood -= ops;
totalOpsToFlood -= ops;
iters.push(curTracker);
}
cur++;
}
// if we're not done with this account, add the tracker back
if (curTracker.skipToFirstNotBroadcasted())
{
iters.push(curTracker);
}
}
ban(banningTxs);
// carry over remainder, up to MAX_OPS_PER_TX ops
// reason is that if we add 1 next round, we can flood a "worst case fee
// bump" tx
Expand Down
8 changes: 7 additions & 1 deletion src/herder/TransactionQueue.h
Original file line number Diff line number Diff line change
Expand Up @@ -198,7 +198,13 @@ class TransactionQueue
bool broadcastSome();
void broadcast(bool fromCallback);
// broadcasts a single transaction
bool broadcastTx(AccountState& state, TimestampedTx& tx);
enum class BroadcastStatus
{
BROADCAST_STATUS_ALREADY,
BROADCAST_STATUS_SUCCESS,
BROADCAST_STATUS_SKIPPED
};
BroadcastStatus broadcastTx(AccountState& state, TimestampedTx& tx);

AddResult canAdd(TransactionFrameBasePtr tx,
AccountStates::iterator& stateIter,
Expand Down
13 changes: 7 additions & 6 deletions src/ledger/LedgerManagerImpl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -658,12 +658,12 @@ LedgerManagerImpl::closeLedger(LedgerCloseData const& ledgerData)

// first, prefetch source accounts for txset, then charge fees
prefetchTxSourceIds(txs);
processFeesSeqNums(txs, ltx, txSet->getBaseFee(header.current()),
ledgerCloseMeta);
auto curBaseFee = txSet->getBaseFee(header.current());
processFeesSeqNums(txs, ltx, curBaseFee, ledgerCloseMeta);

TransactionResultSet txResultSet;
txResultSet.results.reserve(txs.size());
applyTransactions(txs, ltx, txResultSet, ledgerCloseMeta);
applyTransactions(txs, ltx, txResultSet, ledgerCloseMeta, curBaseFee);

ltx.loadHeader().current().txSetResultHash = xdrSha256(txResultSet);

Expand Down Expand Up @@ -1084,7 +1084,7 @@ void
LedgerManagerImpl::applyTransactions(
std::vector<TransactionFrameBasePtr>& txs, AbstractLedgerTxn& ltx,
TransactionResultSet& txResultSet,
std::unique_ptr<LedgerCloseMeta> const& ledgerCloseMeta)
std::unique_ptr<LedgerCloseMeta> const& ledgerCloseMeta, int64 curBaseFee)
{
ZoneNamedN(txsZone, "applyTransactions", true);
int index = 0;
Expand All @@ -1103,8 +1103,9 @@ LedgerManagerImpl::applyTransactions(
});
mOperationCount.Update(static_cast<int64_t>(numOps));
TracyPlot("ledger.operation.count", static_cast<int64_t>(numOps));
CLOG_INFO(Tx, "applying ledger {} (txs:{}, ops:{})",
ltx.loadHeader().current().ledgerSeq, numTxs, numOps);
CLOG_INFO(Tx, "applying ledger {} (txs:{}, ops:{}, base_fee:{})",
ltx.loadHeader().current().ledgerSeq, numTxs, numOps,
curBaseFee);
}

prefetchTransactionData(txs);
Expand Down
3 changes: 2 additions & 1 deletion src/ledger/LedgerManagerImpl.h
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,8 @@ class LedgerManagerImpl : public LedgerManager
void
applyTransactions(std::vector<TransactionFrameBasePtr>& txs,
AbstractLedgerTxn& ltx, TransactionResultSet& txResultSet,
std::unique_ptr<LedgerCloseMeta> const& ledgerCloseMeta);
std::unique_ptr<LedgerCloseMeta> const& ledgerCloseMeta,
int64 curBaseFee);

void ledgerClosed(AbstractLedgerTxn& ltx);

Expand Down

0 comments on commit b63c162

Please sign in to comment.