From f7c3eaba49d0074db146bd72b2c8f6710c62ea07 Mon Sep 17 00:00:00 2001 From: Graydon Hoare Date: Fri, 8 Mar 2019 14:41:20 -0800 Subject: [PATCH] overlay: add config param PEER_STRAGGLER_TIMEOUT to control straggler drops. --- docs/stellar-core_example.cfg | 5 ++ src/main/Config.cpp | 6 +++ src/main/Config.h | 1 + src/overlay/LoopbackPeer.cpp | 27 +++++++++++ src/overlay/LoopbackPeer.h | 4 ++ src/overlay/Peer.cpp | 10 ++-- src/overlay/Peer.h | 1 + src/overlay/test/OverlayTests.cpp | 78 +++++++++++++++++++++++++++++++ 8 files changed, 129 insertions(+), 3 deletions(-) diff --git a/docs/stellar-core_example.cfg b/docs/stellar-core_example.cfg index 27637f705a..547e17a55f 100644 --- a/docs/stellar-core_example.cfg +++ b/docs/stellar-core_example.cfg @@ -117,6 +117,11 @@ PEER_AUTHENTICATION_TIMEOUT=2 # time when authenticated. PEER_TIMEOUT=30 +# PEER_STRAGGLER_TIMEOUT (Integer) default 120 +# This server will drop peer that does not drain its outgoing queue during that +# time when authenticated. +PEER_STRAGGLER_TIMEOUT=120 + # PREFERRED_PEERS (list of strings) default is empty # These are IP:port strings that this server will add to its DB of peers. # This server will try to always stay connected to the other peers on this list. diff --git a/src/main/Config.cpp b/src/main/Config.cpp index 38eaad3cc2..d3abac3c63 100644 --- a/src/main/Config.cpp +++ b/src/main/Config.cpp @@ -84,6 +84,7 @@ Config::Config() : NODE_SEED(SecretKey::random()) MAX_INBOUND_PENDING_CONNECTIONS = 0; PEER_AUTHENTICATION_TIMEOUT = 2; PEER_TIMEOUT = 30; + PEER_STRAGGLER_TIMEOUT = 120; PREFERRED_PEERS_ONLY = false; MINIMUM_IDLE_PERCENT = 0; @@ -417,6 +418,11 @@ Config::load(std::string const& filename) PEER_TIMEOUT = readInt( item, 1, std::numeric_limits::max()); } + else if (item.first == "PEER_STRAGGLER_TIMEOUT") + { + PEER_STRAGGLER_TIMEOUT = readInt( + item, 1, std::numeric_limits::max()); + } else if (item.first == "PREFERRED_PEERS") { PREFERRED_PEERS = readStringArray(item); diff --git a/src/main/Config.h b/src/main/Config.h index d5c30655c1..525cbef87a 100644 --- a/src/main/Config.h +++ b/src/main/Config.h @@ -167,6 +167,7 @@ class Config : public std::enable_shared_from_this unsigned short MAX_OUTBOUND_PENDING_CONNECTIONS; unsigned short PEER_AUTHENTICATION_TIMEOUT; unsigned short PEER_TIMEOUT; + unsigned short PEER_STRAGGLER_TIMEOUT; static constexpr auto const POSSIBLY_PREFERRED_EXTRA = 2; // Peers we will always try to stay connected to diff --git a/src/overlay/LoopbackPeer.cpp b/src/overlay/LoopbackPeer.cpp index a89896af1f..81b4bdd4c6 100644 --- a/src/overlay/LoopbackPeer.cpp +++ b/src/overlay/LoopbackPeer.cpp @@ -10,6 +10,7 @@ #include "overlay/OverlayManager.h" #include "overlay/StellarXDR.h" #include "util/Logging.h" +#include "util/Math.h" #include "xdrpp/marshal.h" namespace stellar @@ -63,6 +64,16 @@ LoopbackPeer::sendMessage(xdr::msg_ptr&& msg) // Possibly flush some queued messages if queue's full. while (mOutQueue.size() > mMaxQueueDepth && !mCorked) { + // If our recipient is straggling, we will break off sending 50% of the + // time even when we have more things to send, causing the outbound + // queue to back up gradually. + auto remote = mRemote.lock(); + if (remote && remote->getStraggling() && rand_flip()) + { + CLOG(INFO, "Overlay") << "LoopbackPeer recipient straggling, " + << "outbound queue at " << mOutQueue.size(); + break; + } deliverOne(); } } @@ -212,6 +223,10 @@ LoopbackPeer::deliverOne() } LoadManager::PeerContext loadCtx(mApp, mPeerID); mLastWrite = mApp.getClock().now(); + if (mOutQueue.empty()) + { + mLastEmpty = mApp.getClock().now(); + } mMessageWrite.Mark(); mByteWrite.Mark(nBytes); @@ -269,6 +284,18 @@ LoopbackPeer::setCorked(bool c) mCorked = c; } +bool +LoopbackPeer::getStraggling() const +{ + return mStraggling; +} + +void +LoopbackPeer::setStraggling(bool s) +{ + mStraggling = s; +} + size_t LoopbackPeer::getMaxQueueDepth() const { diff --git a/src/overlay/LoopbackPeer.h b/src/overlay/LoopbackPeer.h index 36a4d6e4ab..5b7799f686 100644 --- a/src/overlay/LoopbackPeer.h +++ b/src/overlay/LoopbackPeer.h @@ -29,6 +29,7 @@ class LoopbackPeer : public Peer std::queue mInQueue; // receiving queue bool mCorked{false}; + bool mStraggling{false}; size_t mMaxQueueDepth{0}; bool mDamageCert{false}; @@ -79,6 +80,9 @@ class LoopbackPeer : public Peer bool getCorked() const; void setCorked(bool c); + bool getStraggling() const; + void setStraggling(bool s); + size_t getMaxQueueDepth() const; void setMaxQueueDepth(size_t sz); diff --git a/src/overlay/Peer.cpp b/src/overlay/Peer.cpp index c5e52f7038..e3ab254c70 100644 --- a/src/overlay/Peer.cpp +++ b/src/overlay/Peer.cpp @@ -74,6 +74,8 @@ Peer::Peer(Application& app, PeerRole role) app.getMetrics().NewMeter({"overlay", "error", "write"}, "error")) , mTimeoutIdle( app.getMetrics().NewMeter({"overlay", "timeout", "idle"}, "timeout")) + , mTimeoutStraggler(app.getMetrics().NewMeter( + {"overlay", "timeout", "straggler"}, "timeout")) , mRecvErrorTimer(app.getMetrics().NewTimer({"overlay", "recv", "error"})) , mRecvHelloTimer(app.getMetrics().NewTimer({"overlay", "recv", "hello"})) @@ -217,17 +219,19 @@ Peer::idleTimerExpired(asio::error_code const& error) { auto now = mApp.getClock().now(); auto timeout = getIOTimeout(); + auto stragglerTimeout = + std::chrono::seconds(mApp.getConfig().PEER_STRAGGLER_TIMEOUT); if (((now - mLastRead) >= timeout) && ((now - mLastWrite) >= timeout)) { CLOG(WARNING, "Overlay") << "idle timeout for peer " << toString(); mTimeoutIdle.Mark(); drop(Peer::DropMode::IGNORE_WRITE_QUEUE); } - else if (((now - mLastEmpty) >= timeout)) + else if (((now - mLastEmpty) >= stragglerTimeout)) { CLOG(WARNING, "Overlay") - << "peer " << toString() << " cannot keep up"; - mTimeoutIdle.Mark(); + << "peer " << toString() << " straggling (cannot keep up)"; + mTimeoutStraggler.Mark(); drop(Peer::DropMode::IGNORE_WRITE_QUEUE); } else diff --git a/src/overlay/Peer.h b/src/overlay/Peer.h index e76b0235e9..8d8891eabe 100644 --- a/src/overlay/Peer.h +++ b/src/overlay/Peer.h @@ -91,6 +91,7 @@ class Peer : public std::enable_shared_from_this, medida::Meter& mErrorRead; medida::Meter& mErrorWrite; medida::Meter& mTimeoutIdle; + medida::Meter& mTimeoutStraggler; medida::Timer& mRecvErrorTimer; medida::Timer& mRecvHelloTimer; diff --git a/src/overlay/test/OverlayTests.cpp b/src/overlay/test/OverlayTests.cpp index 6434595361..b80a672553 100644 --- a/src/overlay/test/OverlayTests.cpp +++ b/src/overlay/test/OverlayTests.cpp @@ -600,6 +600,84 @@ TEST_CASE("reject peers who dont handshake quickly", "[overlay][connections]") } } +TEST_CASE("drop peers who straggle", "[overlay][connections][straggler]") +{ + auto test = [](unsigned short stragglerTimeout) { + VirtualClock clock; + Config cfg1 = getTestConfig(0); + Config cfg2 = getTestConfig(1); + + // Straggler detection piggy-backs on the idle timer so we drive + // the test from idle-timer-firing granularity. + assert(cfg1.PEER_TIMEOUT == cfg2.PEER_TIMEOUT); + assert(cfg1.PEER_TIMEOUT > stragglerTimeout); + + // Initiator (cfg1) will straggle, and acceptor (cfg2) will notice and + // disconnect. + cfg2.PEER_STRAGGLER_TIMEOUT = stragglerTimeout; + + auto app1 = createTestApplication(clock, cfg1); + auto app2 = createTestApplication(clock, cfg2); + auto waitTime = std::chrono::seconds(cfg2.PEER_TIMEOUT * 5); + auto padTime = std::chrono::seconds(5); + + LoopbackPeerConnection conn(*app1, *app2); + auto start = clock.now(); + + testutil::crankSome(clock); + REQUIRE(conn.getInitiator()->isAuthenticated()); + REQUIRE(conn.getAcceptor()->isAuthenticated()); + + conn.getInitiator()->setStraggling(true); + auto straggler = conn.getInitiator(); + VirtualTimer sendTimer(*app1); + + while (clock.now() < (start + waitTime) && + (conn.getInitiator()->isConnected() || + conn.getAcceptor()->isConnected())) + { + LOG(INFO) << "clock.now() = " + << clock.now().time_since_epoch().count(); + + // Straggler keeps asking for peers 10 times per second -- this is + // easy traffic to fake-generate -- but not accepting response + // messages in a timely fashion. + sendTimer.expires_from_now(std::chrono::milliseconds(100)); + sendTimer.async_wait([straggler](asio::error_code const& error) { + if (!error) + { + straggler->sendGetPeers(); + } + }); + clock.crank(false); + } + LOG(INFO) << "loop complete, clock.now() = " + << clock.now().time_since_epoch().count(); + REQUIRE(clock.now() < (start + waitTime + padTime)); + REQUIRE(!conn.getInitiator()->isConnected()); + REQUIRE(!conn.getAcceptor()->isConnected()); + REQUIRE(app1->getMetrics() + .NewMeter({"overlay", "timeout", "idle"}, "timeout") + .count() == 0); + REQUIRE(app2->getMetrics() + .NewMeter({"overlay", "timeout", "idle"}, "timeout") + .count() == 0); + REQUIRE(app2->getMetrics() + .NewMeter({"overlay", "timeout", "straggler"}, "timeout") + .count() != 0); + }; + + SECTION("5 seconds straggle timeout") + { + test(5); + } + + SECTION("28 seconds straggle timeout") + { + test(28); + } +} + TEST_CASE("reject peers with the same nodeid", "[overlay][connections]") { VirtualClock clock;