diff --git a/Builds/levelization/results/loops.txt b/Builds/levelization/results/loops.txt index 669fb6bbe33..54f403aabcd 100644 --- a/Builds/levelization/results/loops.txt +++ b/Builds/levelization/results/loops.txt @@ -17,7 +17,7 @@ Loop: xrpld.app xrpld.net xrpld.app > xrpld.net Loop: xrpld.app xrpld.overlay - xrpld.overlay == xrpld.app + xrpld.overlay ~= xrpld.app Loop: xrpld.app xrpld.peerfinder xrpld.app > xrpld.peerfinder diff --git a/include/xrpl/basics/CanProcess.h b/include/xrpl/basics/CanProcess.h new file mode 100644 index 00000000000..44a2b951427 --- /dev/null +++ b/include/xrpl/basics/CanProcess.h @@ -0,0 +1,106 @@ +//------------------------------------------------------------------------------ +/* + This file is part of rippled: https://github.com/ripple/rippled + Copyright (c) 2024 Ripple Labs Inc. + + Permission to use, copy, modify, and/or distribute this software for any + purpose with or without fee is hereby granted, provided that the above + copyright notice and this permission notice appear in all copies. + + THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES + WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF + MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR + ANY SPECIAL , DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES + WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN + ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF + OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE. +*/ +//============================================================================== + +#ifndef RIPPLE_BASICS_CANPROCESS_H_INCLUDED +#define RIPPLE_BASICS_CANPROCESS_H_INCLUDED + +/** RAII class to check if an Item is already being processed on another thread, + * as indicated by it's presence in a Collection. + * + * If the Item is not in the Collection, it will be added under lock in the + * ctor, and removed under lock in the dtor. The object will be considered + * "usable" and evaluate to `true`. + * + * If the Item is in the Collection, no changes will be made to the collection, + * and the CanProcess object will be considered "unusable". + * + * It's up to the caller to decide what "usable" and "unusable" mean. (e.g. + * Process or skip a block of code, or set a flag.) + * + * The current use is to avoid lock contention that would be involved in + * processing something associated with the Item. + * + * Examples: + * + * void IncomingLedgers::acquireAsync(LedgerHash const& hash, ...) + * { + * if (CanProcess check{acquiresMutex_, pendingAcquires_, hash}) + * { + * acquire(hash, ...); + * } + * } + * + * bool + * NetworkOPsImp::recvValidation( + * std::shared_ptr const& val, + * std::string const& source) + * { + * CanProcess check( + * validationsMutex_, pendingValidations_, val->getLedgerHash()); + * BypassAccept bypassAccept = + * check.canProcess() ? 
BypassAccept::no : BypassAccept::yes; + * handleNewValidation(app_, val, source, bypassAccept, m_journal); + * } + * + */ +template +class CanProcess +{ +public: + CanProcess(Mutex& mtx, Collection& collection, Item const& item) + : mtx_(mtx), collection_(collection), item_(item), canProcess_(insert()) + { + } + + ~CanProcess() + { + if (canProcess_) + { + std::unique_lock lock_(mtx_); + collection_.erase(item_); + } + } + + bool + canProcess() const + { + return canProcess_; + } + + operator bool() const + { + return canProcess_; + } + +private: + bool + insert() + { + std::unique_lock lock_(mtx_); + auto const [_, inserted] = collection_.insert(item_); + return inserted; + } + + Mutex& mtx_; + Collection& collection_; + Item const item_; + bool const canProcess_; +}; + +#endif diff --git a/include/xrpl/basics/base_uint.h b/include/xrpl/basics/base_uint.h index ae5aa17a63e..667c618ec54 100644 --- a/include/xrpl/basics/base_uint.h +++ b/include/xrpl/basics/base_uint.h @@ -625,6 +625,13 @@ to_string(base_uint const& a) return strHex(a.cbegin(), a.cend()); } +template +inline std::string +to_short_string(base_uint const& a) +{ + return strHex(a.cbegin(), a.cend()).substr(0, 8) + "..."; +} + template inline std::ostream& operator<<(std::ostream& out, base_uint const& u) diff --git a/include/xrpl/proto/ripple.proto b/include/xrpl/proto/ripple.proto index a06bbd9a311..e121a39706c 100644 --- a/include/xrpl/proto/ripple.proto +++ b/include/xrpl/proto/ripple.proto @@ -321,8 +321,18 @@ message TMLedgerData required uint32 ledgerSeq = 2; required TMLedgerInfoType type = 3; repeated TMLedgerNode nodes = 4; + // If the peer supports "responseCookies", this field will + // never be populated. optional uint32 requestCookie = 5; optional TMReplyError error = 6; + // The old field is called "requestCookie", but this is + // a response, so this name makes more sense + repeated uint32 responseCookies = 7; + // If a TMGetLedger request was received without a "requestCookie", + // and the peer supports it, this flag will be set to true to + // indicate that the receiver should process the result in addition + // to forwarding it to its "responseCookies" peers. + optional bool directResponse = 8; } message TMPing diff --git a/include/xrpl/protocol/LedgerHeader.h b/include/xrpl/protocol/LedgerHeader.h index 663eb709be2..2af464e4e1d 100644 --- a/include/xrpl/protocol/LedgerHeader.h +++ b/include/xrpl/protocol/LedgerHeader.h @@ -55,6 +55,8 @@ struct LedgerHeader // If validated is false, it means "not yet validated." // Once validated is true, it will never be set false at a later time. + // NOTE: If you are accessing this directly, you are probably doing it + // wrong. Use LedgerMaster::isValidated(). 
// VFALCO TODO Make this not mutable bool mutable validated = false; bool accepted = false; diff --git a/src/test/app/HashRouter_test.cpp b/src/test/app/HashRouter_test.cpp index 1234bc5b9cb..68e0d830657 100644 --- a/src/test/app/HashRouter_test.cpp +++ b/src/test/app/HashRouter_test.cpp @@ -242,6 +242,33 @@ class HashRouter_test : public beast::unit_test::suite BEAST_EXPECT(router.shouldProcess(key, peer, flags, 1s)); } + void + testProcessPeer() + { + using namespace std::chrono_literals; + TestStopwatch stopwatch; + HashRouter router(stopwatch, 5s); + uint256 const key(1); + HashRouter::PeerShortID peer1 = 1; + HashRouter::PeerShortID peer2 = 2; + auto const timeout = 2s; + + BEAST_EXPECT(router.shouldProcessForPeer(key, peer1, timeout)); + BEAST_EXPECT(!router.shouldProcessForPeer(key, peer1, timeout)); + ++stopwatch; + BEAST_EXPECT(!router.shouldProcessForPeer(key, peer1, timeout)); + BEAST_EXPECT(router.shouldProcessForPeer(key, peer2, timeout)); + BEAST_EXPECT(!router.shouldProcessForPeer(key, peer2, timeout)); + ++stopwatch; + BEAST_EXPECT(router.shouldProcessForPeer(key, peer1, timeout)); + BEAST_EXPECT(!router.shouldProcessForPeer(key, peer2, timeout)); + ++stopwatch; + BEAST_EXPECT(router.shouldProcessForPeer(key, peer2, timeout)); + ++stopwatch; + BEAST_EXPECT(router.shouldProcessForPeer(key, peer1, timeout)); + BEAST_EXPECT(!router.shouldProcessForPeer(key, peer2, timeout)); + } + public: void run() override @@ -252,6 +279,7 @@ class HashRouter_test : public beast::unit_test::suite testSetFlags(); testRelay(); testProcess(); + testProcessPeer(); } }; diff --git a/src/test/app/LedgerReplay_test.cpp b/src/test/app/LedgerReplay_test.cpp index 72db7443110..4f277d50eed 100644 --- a/src/test/app/LedgerReplay_test.cpp +++ b/src/test/app/LedgerReplay_test.cpp @@ -321,6 +321,11 @@ class TestPeer : public Peer { return false; } + std::set> + releaseRequestCookies(uint256 const& requestHash) override + { + return {}; + } bool ledgerReplayEnabled_; PublicKey nodePublicKey_; diff --git a/src/test/basics/base_uint_test.cpp b/src/test/basics/base_uint_test.cpp index 9f3194f4fbc..50411461e0d 100644 --- a/src/test/basics/base_uint_test.cpp +++ b/src/test/basics/base_uint_test.cpp @@ -151,6 +151,7 @@ struct base_uint_test : beast::unit_test::suite uset.insert(u); BEAST_EXPECT(raw.size() == u.size()); BEAST_EXPECT(to_string(u) == "0102030405060708090A0B0C"); + BEAST_EXPECT(to_short_string(u) == "01020304..."); BEAST_EXPECT(*u.data() == 1); BEAST_EXPECT(u.signum() == 1); BEAST_EXPECT(!!u); @@ -173,6 +174,7 @@ struct base_uint_test : beast::unit_test::suite test96 v{~u}; uset.insert(v); BEAST_EXPECT(to_string(v) == "FEFDFCFBFAF9F8F7F6F5F4F3"); + BEAST_EXPECT(to_short_string(v) == "FEFDFCFB..."); BEAST_EXPECT(*v.data() == 0xfe); BEAST_EXPECT(v.signum() == 1); BEAST_EXPECT(!!v); @@ -193,6 +195,7 @@ struct base_uint_test : beast::unit_test::suite test96 z{beast::zero}; uset.insert(z); BEAST_EXPECT(to_string(z) == "000000000000000000000000"); + BEAST_EXPECT(to_short_string(z) == "00000000..."); BEAST_EXPECT(*z.data() == 0); BEAST_EXPECT(*z.begin() == 0); BEAST_EXPECT(*std::prev(z.end(), 1) == 0); @@ -213,6 +216,7 @@ struct base_uint_test : beast::unit_test::suite BEAST_EXPECT(n == z); n--; BEAST_EXPECT(to_string(n) == "FFFFFFFFFFFFFFFFFFFFFFFF"); + BEAST_EXPECT(to_short_string(n) == "FFFFFFFF..."); n = beast::zero; BEAST_EXPECT(n == z); @@ -223,6 +227,7 @@ struct base_uint_test : beast::unit_test::suite test96 x{zm1 ^ zp1}; uset.insert(x); BEAST_EXPECTS(to_string(x) == "FFFFFFFFFFFFFFFFFFFFFFFE", 
to_string(x)); + BEAST_EXPECTS(to_short_string(x) == "FFFFFFFF...", to_short_string(x)); BEAST_EXPECT(uset.size() == 4); diff --git a/src/test/overlay/ProtocolVersion_test.cpp b/src/test/overlay/ProtocolVersion_test.cpp index dfc0ee70b8e..97469c59805 100644 --- a/src/test/overlay/ProtocolVersion_test.cpp +++ b/src/test/overlay/ProtocolVersion_test.cpp @@ -87,8 +87,8 @@ class ProtocolVersion_test : public beast::unit_test::suite negotiateProtocolVersion("XRPL/2.2") == make_protocol(2, 2)); BEAST_EXPECT( negotiateProtocolVersion( - "RTXP/1.2, XRPL/2.2, XRPL/2.3, XRPL/999.999") == - make_protocol(2, 2)); + "RTXP/1.2, XRPL/2.2, XRPL/2.3, XRPL/2.4, XRPL/999.999") == + make_protocol(2, 3)); BEAST_EXPECT( negotiateProtocolVersion("XRPL/999.999, WebSocket/1.0") == std::nullopt); diff --git a/src/test/overlay/reduce_relay_test.cpp b/src/test/overlay/reduce_relay_test.cpp index e0b0d006a2f..0e421bb2ba8 100644 --- a/src/test/overlay/reduce_relay_test.cpp +++ b/src/test/overlay/reduce_relay_test.cpp @@ -181,6 +181,11 @@ class PeerPartial : public Peer removeTxQueue(const uint256&) override { } + std::set> + releaseRequestCookies(uint256 const& requestHash) override + { + return {}; + } }; /** Manually advanced clock. */ diff --git a/src/xrpld/app/consensus/RCLConsensus.cpp b/src/xrpld/app/consensus/RCLConsensus.cpp index 263d660d003..7f4f1ef08ca 100644 --- a/src/xrpld/app/consensus/RCLConsensus.cpp +++ b/src/xrpld/app/consensus/RCLConsensus.cpp @@ -1061,7 +1061,8 @@ void RCLConsensus::Adaptor::updateOperatingMode(std::size_t const positions) const { if (!positions && app_.getOPs().isFull()) - app_.getOPs().setMode(OperatingMode::CONNECTED); + app_.getOPs().setMode( + OperatingMode::CONNECTED, "updateOperatingMode: no positions"); } void diff --git a/src/xrpld/app/ledger/InboundLedger.h b/src/xrpld/app/ledger/InboundLedger.h index 13f603e79d0..8a98ba7c498 100644 --- a/src/xrpld/app/ledger/InboundLedger.h +++ b/src/xrpld/app/ledger/InboundLedger.h @@ -196,6 +196,24 @@ class InboundLedger final : public TimeoutCounter, std::unique_ptr mPeerSet; }; +inline std::string +to_string(InboundLedger::Reason reason) +{ + using enum InboundLedger::Reason; + switch (reason) + { + case HISTORY: + return "HISTORY"; + case GENERIC: + return "GENERIC"; + case CONSENSUS: + return "CONSENSUS"; + default: + assert(false); + return "unknown"; + } +} + } // namespace ripple #endif diff --git a/src/xrpld/app/ledger/detail/InboundLedger.cpp b/src/xrpld/app/ledger/detail/InboundLedger.cpp index 16b15c2fce7..e8d6d5a9f53 100644 --- a/src/xrpld/app/ledger/detail/InboundLedger.cpp +++ b/src/xrpld/app/ledger/detail/InboundLedger.cpp @@ -390,7 +390,14 @@ InboundLedger::onTimer(bool wasProgress, ScopedLockType&) if (!wasProgress) { - checkLocal(); + if (checkLocal()) + { + // Done. 
Something else (probably consensus) built the ledger + // locally while waiting for data (or possibly before requesting) + assert(isDone()); + JLOG(journal_.info()) << "Finished while waiting " << hash_; + return; + } mByHash = true; @@ -497,15 +504,17 @@ InboundLedger::trigger(std::shared_ptr const& peer, TriggerReason reason) if (auto stream = journal_.debug()) { - stream << "Trigger acquiring ledger " << hash_; + std::stringstream ss; + ss << "Trigger acquiring ledger " << hash_; if (peer) - stream << " from " << peer; + ss << " from " << peer; if (complete_ || failed_) - stream << "complete=" << complete_ << " failed=" << failed_; + ss << " complete=" << complete_ << " failed=" << failed_; else - stream << "header=" << mHaveHeader << " tx=" << mHaveTransactions - << " as=" << mHaveState; + ss << " header=" << mHaveHeader << " tx=" << mHaveTransactions + << " as=" << mHaveState; + stream << ss.str(); } if (!mHaveHeader) diff --git a/src/xrpld/app/ledger/detail/InboundLedgers.cpp b/src/xrpld/app/ledger/detail/InboundLedgers.cpp index f6d86a4d737..5f4f9eff841 100644 --- a/src/xrpld/app/ledger/detail/InboundLedgers.cpp +++ b/src/xrpld/app/ledger/detail/InboundLedgers.cpp @@ -23,9 +23,9 @@ #include #include #include +#include #include #include -#include #include #include #include @@ -75,11 +75,90 @@ class InboundLedgersImp : public InboundLedgers auto doAcquire = [&, seq, reason]() -> std::shared_ptr { assert(hash.isNonZero()); - // probably not the right rule - if (app_.getOPs().isNeedNetworkLedger() && - (reason != InboundLedger::Reason::GENERIC) && - (reason != InboundLedger::Reason::CONSENSUS)) + bool const shouldAcquire = [&]() { + if (!app_.getOPs().isNeedNetworkLedger()) + return true; + if (reason == InboundLedger::Reason::GENERIC) + return true; + if (reason == InboundLedger::Reason::CONSENSUS) + return true; + return false; + }(); + assert( + shouldAcquire == + !(app_.getOPs().isNeedNetworkLedger() && + (reason != InboundLedger::Reason::GENERIC) && + (reason != InboundLedger::Reason::CONSENSUS))); + + std::stringstream ss; + ss << "InboundLedger::acquire: " + << "Request: " << to_string(hash) << ", " << seq + << " NeedNetworkLedger: " + << (app_.getOPs().isNeedNetworkLedger() ? "yes" : "no") + << " Reason: " << to_string(reason) + << " Should acquire: " << (shouldAcquire ? "true." : "false."); + + /* Acquiring ledgers is somewhat expensive. It requires lots of + * computation and network communication. Avoid it when it's not + * appropriate. Every validation from a peer for a ledger that + * we do not have locally results in a call to this function: even + * if we are moments away from validating the same ledger. + */ + bool const shouldBroadcast = [&]() { + // If the node is not in "full" state, it needs to sync to + // the network, and doesn't have the necessary tx's and + // ledger entries to build the ledger. + bool const isFull = app_.getOPs().isFull(); + // If everything else is ok, don't try to acquire the ledger + // if the requested seq is in the near future relative to + // the validated ledger. If the requested ledger is between + // 1 and 19 inclusive ledgers ahead of the valid ledger this + // node has not built it yet, but it's possible/likely it + // has the tx's necessary to build it and get caught up. + // Plus it might not become validated. On the other hand, if + // it's more than 20 in the future, this node should request + // it so that it can jump ahead and get caught up. 
+ LedgerIndex const validSeq = + app_.getLedgerMaster().getValidLedgerIndex(); + constexpr std::size_t lagLeeway = 20; + bool const nearFuture = + (seq > validSeq) && (seq < validSeq + lagLeeway); + // If everything else is ok, don't try to acquire the ledger + // if the request is related to consensus. (Note that + // consensus calls usually pass a seq of 0, so nearFuture + // will be false other than on a brand new network.) + bool const consensus = + reason == InboundLedger::Reason::CONSENSUS; + ss << " Evaluating whether to broadcast requests to peers" + << ". full: " << (isFull ? "true" : "false") + << ". ledger sequence " << seq + << ". Valid sequence: " << validSeq + << ". Lag leeway: " << lagLeeway + << ". request for near future ledger: " + << (nearFuture ? "true" : "false") + << ". Consensus: " << (consensus ? "true" : "false"); + + // If the node is not synced, send requests. + if (!isFull) + return true; + // If the ledger is in the near future, do NOT send requests. + // This node is probably about to build it. + if (nearFuture) + return false; + // If the request is because of consensus, do NOT send requests. + // This node is probably about to build it. + if (consensus) + return false; + return true; + }(); + ss << ". Would broadcast to peers? " + << (shouldBroadcast ? "true." : "false."); + + if (!shouldAcquire) + { + JLOG(j_.debug()) << "Abort(rule): " << ss.str(); return {}; + } bool isNew = true; std::shared_ptr inbound; @@ -87,6 +166,7 @@ class InboundLedgersImp : public InboundLedgers ScopedLockType sl(mLock); if (stopping_) { + JLOG(j_.debug()) << "Abort(stopping): " << ss.str(); return {}; } @@ -110,23 +190,29 @@ class InboundLedgersImp : public InboundLedgers ++mCounter; } } + ss << " IsNew: " << (isNew ? "true" : "false"); if (inbound->isFailed()) + { + JLOG(j_.debug()) << "Abort(failed): " << ss.str(); return {}; + } if (!isNew) inbound->update(seq); if (!inbound->isComplete()) + { + JLOG(j_.debug()) << "InProgress: " << ss.str(); return {}; + } + JLOG(j_.debug()) << "Complete: " << ss.str(); return inbound->getLedger(); }; using namespace std::chrono_literals; - std::shared_ptr ledger = perf::measureDurationAndLog( + return perf::measureDurationAndLog( doAcquire, "InboundLedgersImp::acquire", 500ms, j_); - - return ledger; } void @@ -135,28 +221,25 @@ class InboundLedgersImp : public InboundLedgers std::uint32_t seq, InboundLedger::Reason reason) override { - std::unique_lock lock(acquiresMutex_); - try - { - if (pendingAcquires_.contains(hash)) - return; - pendingAcquires_.insert(hash); - scope_unlock unlock(lock); - acquire(hash, seq, reason); - } - catch (std::exception const& e) - { - JLOG(j_.warn()) - << "Exception thrown for acquiring new inbound ledger " << hash - << ": " << e.what(); - } - catch (...) + if (CanProcess check{acquiresMutex_, pendingAcquires_, hash}) { - JLOG(j_.warn()) - << "Unknown exception thrown for acquiring new inbound ledger " - << hash; + try + { + acquire(hash, seq, reason); + } + catch (std::exception const& e) + { + JLOG(j_.warn()) + << "Exception thrown for acquiring new inbound ledger " + << hash << ": " << e.what(); + } + catch (...) 
+ { + JLOG(j_.warn()) << "Unknown exception thrown for acquiring new " + "inbound ledger " + << hash; + } } - pendingAcquires_.erase(hash); } std::shared_ptr diff --git a/src/xrpld/app/ledger/detail/LedgerMaster.cpp b/src/xrpld/app/ledger/detail/LedgerMaster.cpp index 53edef17d33..550c10aa35f 100644 --- a/src/xrpld/app/ledger/detail/LedgerMaster.cpp +++ b/src/xrpld/app/ledger/detail/LedgerMaster.cpp @@ -968,8 +968,9 @@ LedgerMaster::checkAccept(std::shared_ptr const& ledger) } JLOG(m_journal.info()) << "Advancing accepted ledger to " - << ledger->info().seq << " with >= " << minVal - << " validations"; + << ledger->info().seq << " (" + << to_short_string(ledger->info().hash) + << ") with >= " << minVal << " validations"; ledger->setValidated(); ledger->setFull(); diff --git a/src/xrpld/app/ledger/detail/TimeoutCounter.cpp b/src/xrpld/app/ledger/detail/TimeoutCounter.cpp index f70e54f8cd4..c38cee579b6 100644 --- a/src/xrpld/app/ledger/detail/TimeoutCounter.cpp +++ b/src/xrpld/app/ledger/detail/TimeoutCounter.cpp @@ -33,7 +33,8 @@ TimeoutCounter::TimeoutCounter( QueueJobParameter&& jobParameter, beast::Journal journal) : app_(app) - , journal_(journal) + , sink_(journal, to_short_string(hash) + " ") + , journal_(sink_) , hash_(hash) , timeouts_(0) , complete_(false) @@ -51,6 +52,8 @@ TimeoutCounter::setTimer(ScopedLockType& sl) { if (isDone()) return; + JLOG(journal_.debug()) << "Setting timer for " << timerInterval_.count() + << "ms"; timer_.expires_after(timerInterval_); timer_.async_wait( [wptr = pmDowncast()](boost::system::error_code const& ec) { @@ -59,6 +62,12 @@ TimeoutCounter::setTimer(ScopedLockType& sl) if (auto ptr = wptr.lock()) { + JLOG(ptr->journal_.debug()) + << "timer: ec: " << ec << " (operation_aborted: " + << boost::asio::error::operation_aborted << " - " + << (ec == boost::asio::error::operation_aborted ? "aborted" + : "other") + << ")"; ScopedLockType sl(ptr->mtx_); ptr->queueJob(sl); } diff --git a/src/xrpld/app/ledger/detail/TimeoutCounter.h b/src/xrpld/app/ledger/detail/TimeoutCounter.h index 228e879d4de..a65208a938b 100644 --- a/src/xrpld/app/ledger/detail/TimeoutCounter.h +++ b/src/xrpld/app/ledger/detail/TimeoutCounter.h @@ -24,6 +24,8 @@ #include #include #include +#include + #include #include @@ -121,6 +123,7 @@ class TimeoutCounter // Used in this class for access to boost::asio::io_service and // ripple::Overlay. Used in subtypes for the kitchen sink. 
Application& app_; + beast::WrappedSink sink_; beast::Journal journal_; mutable std::recursive_mutex mtx_; diff --git a/src/xrpld/app/misc/HashRouter.cpp b/src/xrpld/app/misc/HashRouter.cpp index c117d20fe2b..95a8ea1ffe8 100644 --- a/src/xrpld/app/misc/HashRouter.cpp +++ b/src/xrpld/app/misc/HashRouter.cpp @@ -90,6 +90,20 @@ HashRouter::shouldProcess( return s.shouldProcess(suppressionMap_.clock().now(), tx_interval); } +bool +HashRouter::shouldProcessForPeer( + uint256 const& key, + PeerShortID peer, + std::chrono::seconds interval) +{ + std::lock_guard lock(mutex_); + + auto& entry = emplace(key).first; + + return entry.shouldProcessForPeer( + peer, suppressionMap_.clock().now(), interval); +} + int HashRouter::getFlags(uint256 const& key) { @@ -128,4 +142,20 @@ HashRouter::shouldRelay(uint256 const& key) return s.releasePeerSet(); } +void +HashRouter::forEachPeer( + uint256 const& key, + std::function callback) +{ + std::lock_guard lock(mutex_); + + auto& s = emplace(key).first; + + for (auto const peerID : s.peekPeerSet()) + { + if (!callback(peerID)) + return; + } +} + } // namespace ripple diff --git a/src/xrpld/app/misc/HashRouter.h b/src/xrpld/app/misc/HashRouter.h index e9d040fc8bf..a73720c05b0 100644 --- a/src/xrpld/app/misc/HashRouter.h +++ b/src/xrpld/app/misc/HashRouter.h @@ -92,6 +92,13 @@ class HashRouter return std::move(peers_); } + /** Return set of peers waiting for reply. Leaves list unchanged. */ + std::set const& + peekPeerSet() + { + return peers_; + } + /** Return seated relay time point if the message has been relayed */ std::optional relayed() const @@ -125,6 +132,21 @@ class HashRouter return true; } + bool + shouldProcessForPeer( + PeerShortID peer, + Stopwatch::time_point now, + std::chrono::seconds interval) + { + if (peerProcessed_.contains(peer) && + ((peerProcessed_[peer] + interval) > now)) + return false; + // Peer may already be in the list, but adding it again doesn't hurt + addPeer(peer); + peerProcessed_[peer] = now; + return true; + } + private: int flags_ = 0; std::set peers_; @@ -132,6 +154,7 @@ class HashRouter // than one flag needs to expire independently. std::optional relayed_; std::optional processed_; + std::map peerProcessed_; }; public: @@ -163,7 +186,7 @@ class HashRouter /** Add a suppression peer and get message's relay status. * Return pair: - * element 1: true if the peer is added. + * element 1: true if the key is added. * element 2: optional is seated to the relay time point or * is unseated if has not relayed yet. */ std::pair> @@ -180,6 +203,18 @@ class HashRouter int& flags, std::chrono::seconds tx_interval); + /** Determines whether the hashed item should be processed for the given + peer. Could be an incoming or outgoing message. + + Items filtered with this function should only be processed for the given + peer once. Unlike shouldProcess, it can be processed for other peers. + */ + bool + shouldProcessForPeer( + uint256 const& key, + PeerShortID peer, + std::chrono::seconds interval); + /** Set the flags on a hash. @return `true` if the flags were changed. `false` if unchanged. @@ -205,6 +240,15 @@ class HashRouter std::optional> shouldRelay(uint256 const& key); + /** Calls a callback for each peer in the Entry for the key + + callback is executed under lock. If callback returns false, the loop + will exit. 
+ + */ + void + forEachPeer(uint256 const& key, std::function callback); + private: // pair.second indicates whether the entry was created std::pair diff --git a/src/xrpld/app/misc/NetworkOPs.cpp b/src/xrpld/app/misc/NetworkOPs.cpp index d647df91f1e..19e28ecef62 100644 --- a/src/xrpld/app/misc/NetworkOPs.cpp +++ b/src/xrpld/app/misc/NetworkOPs.cpp @@ -50,10 +50,10 @@ #include #include #include +#include #include #include #include -#include #include #include #include @@ -400,7 +400,7 @@ class NetworkOPsImp final : public NetworkOPs isFull() override; void - setMode(OperatingMode om) override; + setMode(OperatingMode om, const char* reason) override; bool isBlocked() override; @@ -871,7 +871,7 @@ NetworkOPsImp::strOperatingMode(bool const admin /* = false */) const inline void NetworkOPsImp::setStandAlone() { - setMode(OperatingMode::FULL); + setMode(OperatingMode::FULL, "setStandAlone"); } inline void @@ -1019,7 +1019,9 @@ NetworkOPsImp::processHeartbeatTimer() { if (mMode != OperatingMode::DISCONNECTED) { - setMode(OperatingMode::DISCONNECTED); + setMode( + OperatingMode::DISCONNECTED, + "Heartbeat: insufficient peers"); JLOG(m_journal.warn()) << "Node count (" << numPeers << ") has fallen " << "below required minimum (" << minPeerCount_ << ")."; @@ -1035,7 +1037,7 @@ NetworkOPsImp::processHeartbeatTimer() if (mMode == OperatingMode::DISCONNECTED) { - setMode(OperatingMode::CONNECTED); + setMode(OperatingMode::CONNECTED, "Heartbeat: sufficient peers"); JLOG(m_journal.info()) << "Node count (" << numPeers << ") is sufficient."; } @@ -1043,9 +1045,9 @@ NetworkOPsImp::processHeartbeatTimer() // Check if the last validated ledger forces a change between these // states. if (mMode == OperatingMode::SYNCING) - setMode(OperatingMode::SYNCING); + setMode(OperatingMode::SYNCING, "Heartbeat: check syncing"); else if (mMode == OperatingMode::CONNECTED) - setMode(OperatingMode::CONNECTED); + setMode(OperatingMode::CONNECTED, "Heartbeat: check connected"); } mConsensus.timerEntry(app_.timeKeeper().closeTime()); @@ -1599,7 +1601,7 @@ void NetworkOPsImp::setAmendmentBlocked() { amendmentBlocked_ = true; - setMode(OperatingMode::CONNECTED); + setMode(OperatingMode::CONNECTED, "setAmendmentBlocked"); } inline bool @@ -1630,7 +1632,7 @@ void NetworkOPsImp::setUNLBlocked() { unlBlocked_ = true; - setMode(OperatingMode::CONNECTED); + setMode(OperatingMode::CONNECTED, "setUNLBlocked"); } inline void @@ -1731,7 +1733,7 @@ NetworkOPsImp::checkLastClosedLedger( if ((mMode == OperatingMode::TRACKING) || (mMode == OperatingMode::FULL)) { - setMode(OperatingMode::CONNECTED); + setMode(OperatingMode::CONNECTED, "check LCL: not on consensus ledger"); } if (consensus) @@ -1816,8 +1818,9 @@ NetworkOPsImp::beginConsensus(uint256 const& networkClosed) // this shouldn't happen unless we jump ledgers if (mMode == OperatingMode::FULL) { - JLOG(m_journal.warn()) << "Don't have LCL, going to tracking"; - setMode(OperatingMode::TRACKING); + JLOG(m_journal.warn()) + << "beginConsensus Don't have LCL, going to tracking"; + setMode(OperatingMode::TRACKING, "beginConsensus: No LCL"); } return false; @@ -1923,7 +1926,7 @@ NetworkOPsImp::endConsensus() // validations we have for LCL. 
If the ledger is good enough, go to // TRACKING - TODO if (!needNetworkLedger_) - setMode(OperatingMode::TRACKING); + setMode(OperatingMode::TRACKING, "endConsensus: check tracking"); } if (((mMode == OperatingMode::CONNECTED) || @@ -1937,7 +1940,7 @@ NetworkOPsImp::endConsensus() if (app_.timeKeeper().now() < (current->info().parentCloseTime + 2 * current->info().closeTimeResolution)) { - setMode(OperatingMode::FULL); + setMode(OperatingMode::FULL, "endConsensus: check full"); } } @@ -1949,7 +1952,7 @@ NetworkOPsImp::consensusViewChange() { if ((mMode == OperatingMode::FULL) || (mMode == OperatingMode::TRACKING)) { - setMode(OperatingMode::CONNECTED); + setMode(OperatingMode::CONNECTED, "consensusViewChange"); } } @@ -2267,7 +2270,7 @@ NetworkOPsImp::pubPeerStatus(std::function const& func) } void -NetworkOPsImp::setMode(OperatingMode om) +NetworkOPsImp::setMode(OperatingMode om, const char* reason) { using namespace std::chrono_literals; if (om == OperatingMode::CONNECTED) @@ -2287,11 +2290,12 @@ NetworkOPsImp::setMode(OperatingMode om) if (mMode == om) return; + auto const sink = om < mMode ? m_journal.warn() : m_journal.info(); mMode = om; accounting_.mode(om); - JLOG(m_journal.info()) << "STATE->" << strOperatingMode(); + JLOG(sink) << "STATE->" << strOperatingMode() << " - " << reason; pubServer(); } @@ -2303,34 +2307,28 @@ NetworkOPsImp::recvValidation( JLOG(m_journal.trace()) << "recvValidation " << val->getLedgerHash() << " from " << source; - std::unique_lock lock(validationsMutex_); - BypassAccept bypassAccept = BypassAccept::no; - try - { - if (pendingValidations_.contains(val->getLedgerHash())) - bypassAccept = BypassAccept::yes; - else - pendingValidations_.insert(val->getLedgerHash()); - scope_unlock unlock(lock); - handleNewValidation(app_, val, source, bypassAccept, m_journal); - } - catch (std::exception const& e) { - JLOG(m_journal.warn()) - << "Exception thrown for handling new validation " - << val->getLedgerHash() << ": " << e.what(); - } - catch (...) - { - JLOG(m_journal.warn()) - << "Unknown exception thrown for handling new validation " - << val->getLedgerHash(); - } - if (bypassAccept == BypassAccept::no) - { - pendingValidations_.erase(val->getLedgerHash()); + CanProcess check( + validationsMutex_, pendingValidations_, val->getLedgerHash()); + try + { + BypassAccept bypassAccept = + check.canProcess() ? BypassAccept::no : BypassAccept::yes; + handleNewValidation(app_, val, source, bypassAccept, m_journal); + } + catch (std::exception const& e) + { + JLOG(m_journal.warn()) + << "Exception thrown for handling new validation " + << val->getLedgerHash() << ": " << e.what(); + } + catch (...) 
+ { + JLOG(m_journal.warn()) + << "Unknown exception thrown for handling new validation " + << val->getLedgerHash(); + } } - lock.unlock(); pubValidation(val); diff --git a/src/xrpld/app/misc/NetworkOPs.h b/src/xrpld/app/misc/NetworkOPs.h index 166b9e9e11f..96969f4bcba 100644 --- a/src/xrpld/app/misc/NetworkOPs.h +++ b/src/xrpld/app/misc/NetworkOPs.h @@ -197,7 +197,7 @@ class NetworkOPs : public InfoSub::Source virtual bool isFull() = 0; virtual void - setMode(OperatingMode om) = 0; + setMode(OperatingMode om, const char* reason) = 0; virtual bool isBlocked() = 0; virtual bool diff --git a/src/xrpld/overlay/Peer.h b/src/xrpld/overlay/Peer.h index 82ed2c2481a..efa5c63ebb9 100644 --- a/src/xrpld/overlay/Peer.h +++ b/src/xrpld/overlay/Peer.h @@ -36,6 +36,7 @@ enum class ProtocolFeature { ValidatorListPropagation, ValidatorList2Propagation, LedgerReplay, + LedgerDataCookies }; /** Represents a peer connection in the overlay. */ @@ -133,6 +134,13 @@ class Peer virtual bool txReduceRelayEnabled() const = 0; + + // + // Messages + // + + virtual std::set> + releaseRequestCookies(uint256 const& requestHash) = 0; }; } // namespace ripple diff --git a/src/xrpld/overlay/detail/PeerImp.cpp b/src/xrpld/overlay/detail/PeerImp.cpp index 4f5f1470f8e..6136382ca05 100644 --- a/src/xrpld/overlay/detail/PeerImp.cpp +++ b/src/xrpld/overlay/detail/PeerImp.cpp @@ -30,6 +30,7 @@ #include #include #include +#include #include #include #include @@ -38,7 +39,6 @@ #include #include #include -// #include #include #include @@ -60,6 +60,9 @@ std::chrono::milliseconds constexpr peerHighLatency{300}; /** How often we PING the peer to check for latency and sendq probe */ std::chrono::seconds constexpr peerTimerInterval{60}; + +/** How often we process duplicate incoming TMGetLedger messages */ +std::chrono::seconds constexpr getledgerInterval{15}; } // namespace PeerImp::PeerImp( @@ -504,6 +507,8 @@ PeerImp::supportsFeature(ProtocolFeature f) const return protocol_ >= make_protocol(2, 2); case ProtocolFeature::LedgerReplay: return ledgerReplayEnabled_; + case ProtocolFeature::LedgerDataCookies: + return protocol_ >= make_protocol(2, 3); } return false; } @@ -1317,8 +1322,9 @@ PeerImp::handleTransaction( void PeerImp::onMessage(std::shared_ptr const& m) { - auto badData = [&](std::string const& msg) { - charge(Resource::feeBadData); + auto badData = [&](std::string const& msg, bool chargefee = true) { + if (chargefee) + charge(Resource::feeBadData); JLOG(p_journal_.warn()) << "TMGetLedger: " << msg; }; auto const itype{m->itype()}; @@ -1395,12 +1401,74 @@ PeerImp::onMessage(std::shared_ptr const& m) } } + // Drop duplicate requests from the same peer for at least + // `getLedgerInterval` seconds. + // Append a little junk to prevent the hash of an incoming messsage + // from matching the hash of the same outgoing message. + // `shouldProcessForPeer` does not distingish between incoming and + // outgoing, and some of the message relay logic checks the hash to see + // if the message has been relayed already. If the hashes are the same, + // a duplicate will be detected when sending the message is attempted, + // so it will fail. + auto const messageHash = sha512Half(*m, nullptr); + // Request cookies are not included in the hash. Track them here. 
+ auto const requestCookie = [&m]() -> std::optional { + if (m->has_requestcookie()) + return m->requestcookie(); + return std::nullopt; + }(); + auto const [inserted, pending] = [&] { + std::lock_guard lock{cookieLock_}; + auto& cookies = messageRequestCookies_[messageHash]; + bool const pending = !cookies.empty(); + return std::pair{cookies.emplace(requestCookie).second, pending}; + }(); + // Check if the request has been seen from this peer. + if (!app_.getHashRouter().shouldProcessForPeer( + messageHash, id_, getledgerInterval)) + { + // This request has already been seen from this peer. + // Has it been seen with this request cookie (or lack thereof)? + + if (inserted) + { + // This is a duplicate request, but with a new cookie. When a + // response is ready, one will be sent for each request cookie. + JLOG(p_journal_.debug()) + << "TMGetLedger: duplicate request with new request cookie: " + << requestCookie.value_or(0) + << ". Job pending: " << (pending ? "yes" : "no") << ": " + << messageHash; + if (pending) + { + // Don't bother queueing up a new job if other requests are + // already pending. This should limit entries in the job queue + // to one per peer per unique request. + JLOG(p_journal_.debug()) + << "TMGetLedger: Suppressing recvGetLedger job, since one " + "is pending: " + << messageHash; + return; + } + } + else + { + // Don't punish nodes that don't know any better + return badData( + "duplicate request: " + to_string(messageHash), + supportsFeature(ProtocolFeature::LedgerDataCookies)); + } + } + // Queue a job to process the request + JLOG(p_journal_.debug()) + << "TMGetLedger: Adding recvGetLedger job: " << messageHash; std::weak_ptr weak = shared_from_this(); - app_.getJobQueue().addJob(jtLEDGER_REQ, "recvGetLedger", [weak, m]() { - if (auto peer = weak.lock()) - peer->processLedgerRequest(m); - }); + app_.getJobQueue().addJob( + jtLEDGER_REQ, "recvGetLedger", [weak, m, messageHash]() { + if (auto peer = weak.lock()) + peer->processLedgerRequest(m, messageHash); + }); } void @@ -1504,8 +1572,9 @@ PeerImp::onMessage(std::shared_ptr const& m) void PeerImp::onMessage(std::shared_ptr const& m) { - auto badData = [&](std::string const& msg) { - fee_ = Resource::feeBadData; + auto badData = [&](std::string const& msg, bool charge = true) { + if (charge) + fee_ = Resource::feeBadData; JLOG(p_journal_.warn()) << "TMLedgerData: " << msg; }; @@ -1556,23 +1625,89 @@ PeerImp::onMessage(std::shared_ptr const& m) "Invalid Ledger/TXset nodes " + std::to_string(m->nodes_size())); } - // If there is a request cookie, attempt to relay the message - if (m->has_requestcookie()) + auto const messageHash = sha512Half(*m); + if (!app_.getHashRouter().addSuppressionPeer(messageHash, id_)) { - if (auto peer = overlay_.findPeerByShortID(m->requestcookie())) + // Don't punish nodes that don't know any better + return badData( + "Duplicate message: " + to_string(messageHash), + supportsFeature(ProtocolFeature::LedgerDataCookies)); + } + + bool const routed = m->has_directresponse() || m->responsecookies_size() || + m->has_requestcookie(); + + { + // Check if this message needs to be forwarded to one or more peers. + // Maximum of one of the relevant fields should be populated. 
+ assert(!m->has_requestcookie() || !m->responsecookies_size()); + + // Make a copy of the response cookies, then wipe the list so it can be + // forwarded cleanly + auto const responseCookies = m->responsecookies(); + m->clear_responsecookies(); + // Flag indicating if this response should be processed locally, + // possibly in addition to being forwarded. + bool const directResponse = + m->has_directresponse() && m->directresponse(); + m->clear_directresponse(); + + auto const relay = [this, m, &messageHash](auto const cookie) { + if (auto peer = overlay_.findPeerByShortID(cookie)) + { + assert(!m->has_requestcookie() && !m->responsecookies_size()); + if (peer->supportsFeature(ProtocolFeature::LedgerDataCookies)) + // Setting this flag is not _strictly_ necessary for peers + // that support it if there are no cookies included in the + // message, but it is more accurate. + m->set_directresponse(true); + else + m->clear_directresponse(); + peer->send( + std::make_shared(*m, protocol::mtLEDGER_DATA)); + } + else + JLOG(p_journal_.info()) + << "Unable to route TX/ledger data reply to peer [" + << cookie << "]: " << messageHash; + }; + // If there is a request cookie, attempt to relay the message + if (m->has_requestcookie()) { + assert(responseCookies.empty()); m->clear_requestcookie(); - peer->send(std::make_shared(*m, protocol::mtLEDGER_DATA)); + relay(m->requestcookie()); + if (!directResponse && responseCookies.empty()) + return; } - else + // If there's a list of request cookies, attempt to relay the message to + // all of them. + if (responseCookies.size()) { - JLOG(p_journal_.info()) << "Unable to route TX/ledger data reply"; + for (auto const cookie : responseCookies) + relay(cookie); + if (!directResponse) + return; } - return; } - uint256 const ledgerHash{m->ledgerhash()}; + // Now that any forwarding is done check the base message (data only, no + // routing info for duplicates) + if (routed) + { + m->clear_directresponse(); + assert(!m->has_requestcookie() && !m->responsecookies_size()); + auto const baseMessageHash = sha512Half(*m); + if (!app_.getHashRouter().addSuppressionPeer(baseMessageHash, id_)) + { + // Don't punish nodes that don't know any better + return badData( + "Duplicate message: " + to_string(baseMessageHash), + supportsFeature(ProtocolFeature::LedgerDataCookies)); + } + } + uint256 const ledgerHash{m->ledgerhash()}; // Otherwise check if received data for a candidate transaction set if (m->type() == protocol::liTS_CANDIDATE) { @@ -2885,16 +3020,21 @@ PeerImp::checkValidation( // the TX tree with the specified root hash. 
// static std::shared_ptr -getPeerWithTree(OverlayImpl& ov, uint256 const& rootHash, PeerImp const* skip) +getPeerWithTree( + OverlayImpl& ov, + uint256 const& rootHash, + PeerImp const* skip, + std::function shouldProcessCallback) { std::shared_ptr ret; int retScore = 0; + assert(shouldProcessCallback); ov.for_each([&](std::shared_ptr&& p) { if (p->hasTxSet(rootHash) && p.get() != skip) { auto score = p->getScore(true); - if (!ret || (score > retScore)) + if (!ret || (score > retScore && shouldProcessCallback(p->id()))) { ret = std::move(p); retScore = score; @@ -2913,16 +3053,18 @@ getPeerWithLedger( OverlayImpl& ov, uint256 const& ledgerHash, LedgerIndex ledger, - PeerImp const* skip) + PeerImp const* skip, + std::function shouldProcessCallback) { std::shared_ptr ret; int retScore = 0; + assert(shouldProcessCallback); ov.for_each([&](std::shared_ptr&& p) { if (p->hasLedger(ledgerHash, ledger) && p.get() != skip) { auto score = p->getScore(true); - if (!ret || (score > retScore)) + if (!ret || (score > retScore && shouldProcessCallback(p->id()))) { ret = std::move(p); retScore = score; @@ -2936,7 +3078,8 @@ getPeerWithLedger( void PeerImp::sendLedgerBase( std::shared_ptr const& ledger, - protocol::TMLedgerData& ledgerData) + protocol::TMLedgerData& ledgerData, + PeerCookieMap const& destinations) { JLOG(p_journal_.trace()) << "sendLedgerBase: Base data"; @@ -2968,15 +3111,96 @@ PeerImp::sendLedgerBase( } } - auto message{ - std::make_shared(ledgerData, protocol::mtLEDGER_DATA)}; - send(message); + sendToMultiple(ledgerData, destinations); +} + +void +PeerImp::sendToMultiple( + protocol::TMLedgerData& ledgerData, + PeerCookieMap const& destinations) +{ + bool foundSelf = false; + for (auto const& [peer, cookies] : destinations) + { + if (peer.get() == this) + foundSelf = true; + bool const multipleCookies = + peer->supportsFeature(ProtocolFeature::LedgerDataCookies); + std::vector sendCookies; + + bool directResponse = false; + if (!multipleCookies) + { + JLOG(p_journal_.debug()) + << "sendToMultiple: Sending " << cookies.size() + << " TMLedgerData messages to peer [" << peer->id() + << "]: " << sha512Half(ledgerData); + } + for (auto const& cookie : cookies) + { + // Unfortunately, need a separate Message object for every + // combination + if (cookie) + { + if (multipleCookies) + { + // Save this one for later to send a single message + sendCookies.emplace_back(*cookie); + continue; + } + + // Feature not supported, so send a single message with a + // single cookie + ledgerData.set_requestcookie(*cookie); + } + else + { + if (multipleCookies) + { + // Set this flag later on the single message + directResponse = true; + continue; + } + + ledgerData.clear_requestcookie(); + } + assert(!multipleCookies); + auto message{ + std::make_shared(ledgerData, protocol::mtLEDGER_DATA)}; + peer->send(message); + } + if (multipleCookies) + { + // Send a single message with all the cookies and/or the direct + // response flag, so the receiver can farm out the single message to + // multiple peers and/or itself + assert(sendCookies.size() || directResponse); + ledgerData.clear_requestcookie(); + ledgerData.clear_responsecookies(); + ledgerData.set_directresponse(directResponse); + for (auto const& cookie : sendCookies) + ledgerData.add_responsecookies(cookie); + auto message{ + std::make_shared(ledgerData, protocol::mtLEDGER_DATA)}; + peer->send(message); + + JLOG(p_journal_.debug()) + << "sendToMultiple: Sent 1 TMLedgerData message to peer [" + << peer->id() << "]: including " + << (directResponse ? 
"the direct response flag and " : "") + << sendCookies.size() << " response cookies. " + << ": " << sha512Half(ledgerData); + } + } + assert(foundSelf); } std::shared_ptr -PeerImp::getLedger(std::shared_ptr const& m) +PeerImp::getLedger( + std::shared_ptr const& m, + uint256 const& mHash) { - JLOG(p_journal_.trace()) << "getLedger: Ledger"; + JLOG(p_journal_.trace()) << "getLedger: Ledger " << mHash; std::shared_ptr ledger; @@ -2993,22 +3217,33 @@ PeerImp::getLedger(std::shared_ptr const& m) if (m->has_querytype() && !m->has_requestcookie()) { // Attempt to relay the request to a peer + // Note repeated messages will not relay to the same peer + // before `getLedgerInterval` seconds. This prevents one + // peer from getting flooded, and distributes the request + // load. If a request has been relayed to all eligible + // peers, then this message will not be relayed. if (auto const peer = getPeerWithLedger( overlay_, ledgerHash, m->has_ledgerseq() ? m->ledgerseq() : 0, - this)) + this, + [&](Peer::id_t id) { + return app_.getHashRouter().shouldProcessForPeer( + mHash, id, getledgerInterval); + })) { m->set_requestcookie(id()); peer->send( std::make_shared(*m, protocol::mtGET_LEDGER)); JLOG(p_journal_.debug()) - << "getLedger: Request relayed to peer"; + << "getLedger: Request relayed to peer [" << peer->id() + << "]: " << mHash; return ledger; } JLOG(p_journal_.trace()) - << "getLedger: Failed to find peer to relay request"; + << "getLedger: Don't have ledger with hash " << ledgerHash + << ": " << mHash; } } } @@ -3018,7 +3253,7 @@ PeerImp::getLedger(std::shared_ptr const& m) if (m->ledgerseq() < app_.getLedgerMaster().getEarliestFetch()) { JLOG(p_journal_.debug()) - << "getLedger: Early ledger sequence request"; + << "getLedger: Early ledger sequence request " << mHash; } else { @@ -3027,7 +3262,7 @@ PeerImp::getLedger(std::shared_ptr const& m) { JLOG(p_journal_.debug()) << "getLedger: Don't have ledger with sequence " - << m->ledgerseq(); + << m->ledgerseq() << ": " << mHash; } } } @@ -3049,29 +3284,33 @@ PeerImp::getLedger(std::shared_ptr const& m) charge(Resource::feeInvalidRequest); ledger.reset(); - JLOG(p_journal_.warn()) - << "getLedger: Invalid ledger sequence " << ledgerSeq; + JLOG(p_journal_.warn()) << "getLedger: Invalid ledger sequence " + << ledgerSeq << ": " << mHash; } } else if (ledgerSeq < app_.getLedgerMaster().getEarliestFetch()) { ledger.reset(); JLOG(p_journal_.debug()) - << "getLedger: Early ledger sequence request " << ledgerSeq; + << "getLedger: Early ledger sequence request " << ledgerSeq + << ": " << mHash; } } else { - JLOG(p_journal_.debug()) << "getLedger: Unable to find ledger"; + JLOG(p_journal_.debug()) + << "getLedger: Unable to find ledger " << mHash; } return ledger; } std::shared_ptr -PeerImp::getTxSet(std::shared_ptr const& m) const +PeerImp::getTxSet( + std::shared_ptr const& m, + uint256 const& mHash) const { - JLOG(p_journal_.trace()) << "getTxSet: TX set"; + JLOG(p_journal_.trace()) << "getTxSet: TX set " << mHash; uint256 const txSetHash{m->ledgerhash()}; std::shared_ptr shaMap{ @@ -3081,22 +3320,34 @@ PeerImp::getTxSet(std::shared_ptr const& m) const if (m->has_querytype() && !m->has_requestcookie()) { // Attempt to relay the request to a peer - if (auto const peer = getPeerWithTree(overlay_, txSetHash, this)) + // Note repeated messages will not relay to the same peer + // before `getLedgerInterval` seconds. This prevents one + // peer from getting flooded, and distributes the request + // load. 
If a request has been relayed to all eligible + // peers, then this message will not be relayed. + if (auto const peer = getPeerWithTree( + overlay_, txSetHash, this, [&](Peer::id_t id) { + return app_.getHashRouter().shouldProcessForPeer( + mHash, id, getledgerInterval); + })) { m->set_requestcookie(id()); peer->send( std::make_shared(*m, protocol::mtGET_LEDGER)); - JLOG(p_journal_.debug()) << "getTxSet: Request relayed"; + JLOG(p_journal_.debug()) + << "getTxSet: Request relayed to peer [" << peer->id() + << "]: " << mHash; } else { JLOG(p_journal_.debug()) - << "getTxSet: Failed to find relay peer"; + << "getTxSet: Failed to find relay peer: " << mHash; } } else { - JLOG(p_journal_.debug()) << "getTxSet: Failed to find TX set"; + JLOG(p_journal_.debug()) + << "getTxSet: Failed to find TX set " << mHash; } } @@ -3104,7 +3355,9 @@ PeerImp::getTxSet(std::shared_ptr const& m) const } void -PeerImp::processLedgerRequest(std::shared_ptr const& m) +PeerImp::processLedgerRequest( + std::shared_ptr const& m, + uint256 const& mHash) { // Do not resource charge a peer responding to a relay if (!m->has_requestcookie()) @@ -3117,9 +3370,77 @@ PeerImp::processLedgerRequest(std::shared_ptr const& m) bool fatLeaves{true}; auto const itype{m->itype()}; + auto getDestinations = [&] { + // If a ledger data message is generated, it's going to be sent to every + // peer that is waiting for it. + + PeerCookieMap result; + + std::size_t numCookies = 0; + { + // Don't do the work under this peer if this peer is not waiting for + // any replies + auto myCookies = releaseRequestCookies(mHash); + if (myCookies.empty()) + { + JLOG(p_journal_.debug()) << "TMGetLedger: peer is no longer " + "waiting for response to request: " + << mHash; + return result; + } + numCookies += myCookies.size(); + result[shared_from_this()] = myCookies; + } + + std::set peers; + app_.getHashRouter().forEachPeer( + mHash, [&](HashRouter::PeerShortID const& peerID) { + // callback is called under lock, so finish as fast as + // possible. + peers.insert(peerID); + return true; + }); + for (auto const peerID : peers) + { + // This loop does not need to be done under the HashRouter + // lock because findPeerByShortID and releaseRequestCookies + // are thread safe, and everything else is local + if (auto p = overlay_.findPeerByShortID(peerID)) + { + auto cookies = p->releaseRequestCookies(mHash); + numCookies += cookies.size(); + if (result.contains(p)) + { + // Unlikely, but if a request came in to this peer while + // iterating, add the items instead of copying / + // overwriting. + assert(p.get() == this); + for (auto const& cookie : cookies) + result[p].emplace(cookie); + } + else if (cookies.size()) + result[p] = cookies; + } + } + + JLOG(p_journal_.debug()) + << "TMGetLedger: Processing request for " << result.size() + << " peers. Will send " << numCookies + << " messages if successful: " << mHash; + + return result; + }; + // Will only populate this if we're going to do work. + PeerCookieMap destinations; + if (itype == protocol::liTS_CANDIDATE) { - if (sharedMap = getTxSet(m); !sharedMap) + destinations = getDestinations(); + if (destinations.empty()) + // Nowhere to send the response! 
+ return; + + if (sharedMap = getTxSet(m, mHash); !sharedMap) return; map = sharedMap.get(); @@ -3127,8 +3448,6 @@ PeerImp::processLedgerRequest(std::shared_ptr const& m) ledgerData.set_ledgerseq(0); ledgerData.set_ledgerhash(m->ledgerhash()); ledgerData.set_type(protocol::liTS_CANDIDATE); - if (m->has_requestcookie()) - ledgerData.set_requestcookie(m->requestcookie()); // We'll already have most transactions fatLeaves = false; @@ -3147,7 +3466,12 @@ PeerImp::processLedgerRequest(std::shared_ptr const& m) return; } - if (ledger = getLedger(m); !ledger) + destinations = getDestinations(); + if (destinations.empty()) + // Nowhere to send the response! + return; + + if (ledger = getLedger(m, mHash); !ledger) return; // Fill out the reply @@ -3155,13 +3479,11 @@ PeerImp::processLedgerRequest(std::shared_ptr const& m) ledgerData.set_ledgerhash(ledgerHash.begin(), ledgerHash.size()); ledgerData.set_ledgerseq(ledger->info().seq); ledgerData.set_type(itype); - if (m->has_requestcookie()) - ledgerData.set_requestcookie(m->requestcookie()); switch (itype) { case protocol::liBASE: - sendLedgerBase(ledger, ledgerData); + sendLedgerBase(ledger, ledgerData, destinations); return; case protocol::liTX_NODE: @@ -3272,7 +3594,7 @@ PeerImp::processLedgerRequest(std::shared_ptr const& m) << ledgerData.nodes_size() << " nodes"; } - send(std::make_shared(ledgerData, protocol::mtLEDGER_DATA)); + sendToMultiple(ledgerData, destinations); } int @@ -3330,6 +3652,19 @@ PeerImp::reduceRelayReady() return vpReduceRelayEnabled_ && reduceRelayReady_; } +std::set> +PeerImp::releaseRequestCookies(uint256 const& requestHash) +{ + std::set> result; + std::lock_guard lock(cookieLock_); + if (messageRequestCookies_.contains(requestHash)) + { + std::swap(result, messageRequestCookies_[requestHash]); + messageRequestCookies_.erase(requestHash); + } + return result; +}; + void PeerImp::Metrics::add_message(std::uint64_t bytes) { diff --git a/src/xrpld/overlay/detail/PeerImp.h b/src/xrpld/overlay/detail/PeerImp.h index 9c76ddb4db8..5cfd52f8eaf 100644 --- a/src/xrpld/overlay/detail/PeerImp.h +++ b/src/xrpld/overlay/detail/PeerImp.h @@ -176,6 +176,15 @@ class PeerImp : public Peer, bool ledgerReplayEnabled_ = false; LedgerReplayMsgHandler ledgerReplayMsgHandler_; + // Track message requests and responses + // TODO: Use an expiring cache or something + using MessageCookieMap = + std::map>>; + using PeerCookieMap = + std::map, std::set>>; + std::mutex mutable cookieLock_; + MessageCookieMap messageRequestCookies_; + friend class OverlayImpl; class Metrics @@ -422,6 +431,13 @@ class PeerImp : public Peer, return txReduceRelayEnabled_; } + // + // Messages + // + + std::set> + releaseRequestCookies(uint256 const& requestHash) override; + private: void close(); @@ -615,16 +631,28 @@ class PeerImp : public Peer, void sendLedgerBase( std::shared_ptr const& ledger, - protocol::TMLedgerData& ledgerData); + protocol::TMLedgerData& ledgerData, + PeerCookieMap const& destinations); + + void + sendToMultiple( + protocol::TMLedgerData& ledgerData, + PeerCookieMap const& destinations); std::shared_ptr - getLedger(std::shared_ptr const& m); + getLedger( + std::shared_ptr const& m, + uint256 const& mHash); std::shared_ptr - getTxSet(std::shared_ptr const& m) const; + getTxSet( + std::shared_ptr const& m, + uint256 const& mHash) const; void - processLedgerRequest(std::shared_ptr const& m); + processLedgerRequest( + std::shared_ptr const& m, + uint256 const& mHash); }; //------------------------------------------------------------------------------ 
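Note on the PeerImp bookkeeping above: messageRequestCookies_ records, per request-message hash, every request cookie (or the absence of one) still waiting on a reply, and releaseRequestCookies() hands that set back exactly once so each waiter gets exactly one TMLedgerData response. A minimal standalone sketch of that idea, with hypothetical names and plain integers standing in for uint256 hashes and peer cookies:

    #include <cstdint>
    #include <map>
    #include <mutex>
    #include <optional>
    #include <set>
    #include <utility>

    using RequestHash = std::uint64_t;            // stand-in for uint256
    using Cookie = std::optional<std::uint32_t>;  // nullopt: reply is for us

    class RequestTracker
    {
    public:
        // Record that `cookie` is waiting on `hash`.
        // Returns {newCookie, alreadyPending}, mirroring the decision in
        // onMessage(TMGetLedger): only queue a job when nothing was pending.
        std::pair<bool, bool>
        add(RequestHash hash, Cookie cookie)
        {
            std::lock_guard lock(mutex_);
            auto& cookies = pending_[hash];
            bool const alreadyPending = !cookies.empty();
            bool const newCookie = cookies.insert(cookie).second;
            return {newCookie, alreadyPending};
        }

        // Mirror of releaseRequestCookies(): return and forget every waiter,
        // so each cookie receives exactly one response.
        std::set<Cookie>
        release(RequestHash hash)
        {
            std::set<Cookie> result;
            std::lock_guard lock(mutex_);
            if (auto it = pending_.find(hash); it != pending_.end())
            {
                result.swap(it->second);
                pending_.erase(it);
            }
            return result;
        }

    private:
        std::mutex mutex_;
        std::map<RequestHash, std::set<Cookie>> pending_;
    };

In the diff, onMessage(TMGetLedger) plays the role of add(), and getDestinations() inside processLedgerRequest() plays the role of release(), called for this peer and for every peer that HashRouter has recorded against the same request hash. A seated cookie means "forward the reply tagged with this cookie"; nullopt means the reply is for the peer's own use (the directResponse case).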
diff --git a/src/xrpld/overlay/detail/PeerSet.cpp b/src/xrpld/overlay/detail/PeerSet.cpp index 909b20c3079..cb7b77db7fc 100644 --- a/src/xrpld/overlay/detail/PeerSet.cpp +++ b/src/xrpld/overlay/detail/PeerSet.cpp @@ -18,9 +18,11 @@ //============================================================================== #include +#include #include #include #include +#include namespace ripple { @@ -104,16 +106,52 @@ PeerSetImpl::sendRequest( std::shared_ptr const& peer) { auto packet = std::make_shared(message, type); + + auto const messageHash = [&]() { + auto const packetBuffer = + packet->getBuffer(compression::Compressed::Off); + return sha512Half(Slice(packetBuffer.data(), packetBuffer.size())); + }(); + + // Allow messages to be re-sent to the same peer after a delay + using namespace std::chrono_literals; + constexpr std::chrono::seconds interval = 30s; + if (peer) { - peer->send(packet); + if (app_.getHashRouter().shouldProcessForPeer( + messageHash, peer->id(), interval)) + { + JLOG(journal_.trace()) + << "Sending " << protocolMessageName(type) << " message to [" + << peer->id() << "]: " << messageHash; + peer->send(packet); + } + else + JLOG(journal_.debug()) + << "Suppressing sending duplicate " << protocolMessageName(type) + << " message to [" << peer->id() << "]: " << messageHash; return; } for (auto id : peers_) { if (auto p = app_.overlay().findPeerByShortID(id)) - p->send(packet); + { + if (app_.getHashRouter().shouldProcessForPeer( + messageHash, p->id(), interval)) + { + JLOG(journal_.trace()) + << "Sending " << protocolMessageName(type) + << " message to [" << p->id() << "]: " << messageHash; + p->send(packet); + } + else + JLOG(journal_.debug()) + << "Suppressing sending duplicate " + << protocolMessageName(type) << " message to [" << p->id() + << "]: " << messageHash; + } } } diff --git a/src/xrpld/overlay/detail/ProtocolMessage.h b/src/xrpld/overlay/detail/ProtocolMessage.h index 8a7512afb31..2e9837ebacf 100644 --- a/src/xrpld/overlay/detail/ProtocolMessage.h +++ b/src/xrpld/overlay/detail/ProtocolMessage.h @@ -43,6 +43,12 @@ protocolMessageType(protocol::TMGetLedger const&) return protocol::mtGET_LEDGER; } +inline protocol::MessageType +protocolMessageType(protocol::TMLedgerData const&) +{ + return protocol::mtLEDGER_DATA; +} + inline protocol::MessageType protocolMessageType(protocol::TMReplayDeltaRequest const&) { @@ -484,4 +490,64 @@ invokeProtocolMessage( } // namespace ripple +namespace protocol { + +template +void +hash_append(Hasher& h, TMGetLedger const& msg) +{ + using beast::hash_append; + using namespace ripple; + hash_append(h, safe_cast(protocolMessageType(msg))); + hash_append(h, safe_cast(msg.itype())); + if (msg.has_ltype()) + hash_append(h, safe_cast(msg.ltype())); + + if (msg.has_ledgerhash()) + hash_append(h, msg.ledgerhash()); + + if (msg.has_ledgerseq()) + hash_append(h, msg.ledgerseq()); + + for (auto const& nodeId : msg.nodeids()) + hash_append(h, nodeId); + hash_append(h, msg.nodeids_size()); + + // Do NOT include the request cookie. It does not affect the content of the + // request, but only where to route the results. 
+ // if (msg.has_requestcookie()) + // hash_append(h, msg.requestcookie()); + + if (msg.has_querytype()) + hash_append(h, safe_cast(msg.querytype())); + + if (msg.has_querydepth()) + hash_append(h, msg.querydepth()); +} + +template +void +hash_append(Hasher& h, TMLedgerData const& msg) +{ + using beast::hash_append; + using namespace ripple; + hash_append(h, safe_cast(protocolMessageType(msg))); + hash_append(h, msg.ledgerhash()); + hash_append(h, msg.ledgerseq()); + hash_append(h, safe_cast(msg.type())); + for (auto const& node : msg.nodes()) + { + hash_append(h, node.nodedata()); + if (node.has_nodeid()) + hash_append(h, node.nodeid()); + } + hash_append(h, msg.nodes_size()); + if (msg.has_requestcookie()) + hash_append(h, msg.requestcookie()); + if (msg.has_error()) + hash_append(h, safe_cast(msg.error())); +} + +} // namespace protocol + #endif diff --git a/src/xrpld/overlay/detail/ProtocolVersion.cpp b/src/xrpld/overlay/detail/ProtocolVersion.cpp index 0fecb301f7f..ce6c1e6fa3d 100644 --- a/src/xrpld/overlay/detail/ProtocolVersion.cpp +++ b/src/xrpld/overlay/detail/ProtocolVersion.cpp @@ -37,7 +37,9 @@ namespace ripple { constexpr ProtocolVersion const supportedProtocolList[] { {2, 1}, - {2, 2} + {2, 2}, + // Adds TMLedgerData::responseCookies and directResponse + {2, 3} }; // clang-format on
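The duplicate suppression added in several places above (PeerSetImpl::sendRequest, the TMGetLedger handler, and the getPeerWithLedger/getPeerWithTree relay selection) rests on the new HashRouter::shouldProcessForPeer(). A minimal standalone sketch of that gate, with hypothetical simplified types and the peers_ bookkeeping omitted:

    #include <chrono>
    #include <cstdint>
    #include <map>

    using Clock = std::chrono::steady_clock;
    using PeerShortID = std::uint32_t;

    struct Entry
    {
        std::map<PeerShortID, Clock::time_point> peerProcessed;

        bool
        shouldProcessForPeer(
            PeerShortID peer,
            Clock::time_point now,
            std::chrono::seconds interval)
        {
            if (auto it = peerProcessed.find(peer);
                it != peerProcessed.end() && it->second + interval > now)
                return false;  // seen from/for this peer too recently
            peerProcessed[peer] = now;  // restart this peer's window
            return true;
        }
    };

With this gate a key is handled at most once per peer per interval, independently for each peer: in the diff that is 30s for re-sending the same request from PeerSetImpl::sendRequest, and getledgerInterval (15s) both for dropping duplicate incoming TMGetLedger messages and for rotating relayed requests across eligible peers.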