Skip to content

Commit

Permalink
Merge pull request #2001 from graydon/straggler-config-param
Browse files Browse the repository at this point in the history
overlay: add config param PEER_STRAGGLER_TIMEOUT to control straggler drops

Reviewed-by: MonsieurNicolas
  • Loading branch information
latobarita committed Mar 9, 2019
2 parents 737f1a2 + f7c3eab commit de204d7
Show file tree
Hide file tree
Showing 8 changed files with 129 additions and 3 deletions.
5 changes: 5 additions & 0 deletions docs/stellar-core_example.cfg
Original file line number Diff line number Diff line change
Expand Up @@ -117,6 +117,11 @@ PEER_AUTHENTICATION_TIMEOUT=2
# time when authenticated.
PEER_TIMEOUT=30

# PEER_STRAGGLER_TIMEOUT (Integer) default 120
# This server will drop peer that does not drain its outgoing queue during that
# time when authenticated.
PEER_STRAGGLER_TIMEOUT=120

# PREFERRED_PEERS (list of strings) default is empty
# These are IP:port strings that this server will add to its DB of peers.
# This server will try to always stay connected to the other peers on this list.
Expand Down
6 changes: 6 additions & 0 deletions src/main/Config.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,7 @@ Config::Config() : NODE_SEED(SecretKey::random())
MAX_INBOUND_PENDING_CONNECTIONS = 0;
PEER_AUTHENTICATION_TIMEOUT = 2;
PEER_TIMEOUT = 30;
PEER_STRAGGLER_TIMEOUT = 120;
PREFERRED_PEERS_ONLY = false;

MINIMUM_IDLE_PERCENT = 0;
Expand Down Expand Up @@ -417,6 +418,11 @@ Config::load(std::string const& filename)
PEER_TIMEOUT = readInt<unsigned short>(
item, 1, std::numeric_limits<unsigned short>::max());
}
else if (item.first == "PEER_STRAGGLER_TIMEOUT")
{
PEER_STRAGGLER_TIMEOUT = readInt<unsigned short>(
item, 1, std::numeric_limits<unsigned short>::max());
}
else if (item.first == "PREFERRED_PEERS")
{
PREFERRED_PEERS = readStringArray(item);
Expand Down
1 change: 1 addition & 0 deletions src/main/Config.h
Original file line number Diff line number Diff line change
Expand Up @@ -167,6 +167,7 @@ class Config : public std::enable_shared_from_this<Config>
unsigned short MAX_OUTBOUND_PENDING_CONNECTIONS;
unsigned short PEER_AUTHENTICATION_TIMEOUT;
unsigned short PEER_TIMEOUT;
unsigned short PEER_STRAGGLER_TIMEOUT;
static constexpr auto const POSSIBLY_PREFERRED_EXTRA = 2;

// Peers we will always try to stay connected to
Expand Down
27 changes: 27 additions & 0 deletions src/overlay/LoopbackPeer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
#include "overlay/OverlayManager.h"
#include "overlay/StellarXDR.h"
#include "util/Logging.h"
#include "util/Math.h"
#include "xdrpp/marshal.h"

namespace stellar
Expand Down Expand Up @@ -63,6 +64,16 @@ LoopbackPeer::sendMessage(xdr::msg_ptr&& msg)
// Possibly flush some queued messages if queue's full.
while (mOutQueue.size() > mMaxQueueDepth && !mCorked)
{
// If our recipient is straggling, we will break off sending 50% of the
// time even when we have more things to send, causing the outbound
// queue to back up gradually.
auto remote = mRemote.lock();
if (remote && remote->getStraggling() && rand_flip())
{
CLOG(INFO, "Overlay") << "LoopbackPeer recipient straggling, "
<< "outbound queue at " << mOutQueue.size();
break;
}
deliverOne();
}
}
Expand Down Expand Up @@ -212,6 +223,10 @@ LoopbackPeer::deliverOne()
}
LoadManager::PeerContext loadCtx(mApp, mPeerID);
mLastWrite = mApp.getClock().now();
if (mOutQueue.empty())
{
mLastEmpty = mApp.getClock().now();
}
mMessageWrite.Mark();
mByteWrite.Mark(nBytes);

Expand Down Expand Up @@ -269,6 +284,18 @@ LoopbackPeer::setCorked(bool c)
mCorked = c;
}

bool
LoopbackPeer::getStraggling() const
{
return mStraggling;
}

void
LoopbackPeer::setStraggling(bool s)
{
mStraggling = s;
}

size_t
LoopbackPeer::getMaxQueueDepth() const
{
Expand Down
4 changes: 4 additions & 0 deletions src/overlay/LoopbackPeer.h
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ class LoopbackPeer : public Peer
std::queue<xdr::msg_ptr> mInQueue; // receiving queue

bool mCorked{false};
bool mStraggling{false};
size_t mMaxQueueDepth{0};

bool mDamageCert{false};
Expand Down Expand Up @@ -79,6 +80,9 @@ class LoopbackPeer : public Peer
bool getCorked() const;
void setCorked(bool c);

bool getStraggling() const;
void setStraggling(bool s);

size_t getMaxQueueDepth() const;
void setMaxQueueDepth(size_t sz);

Expand Down
10 changes: 7 additions & 3 deletions src/overlay/Peer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,8 @@ Peer::Peer(Application& app, PeerRole role)
app.getMetrics().NewMeter({"overlay", "error", "write"}, "error"))
, mTimeoutIdle(
app.getMetrics().NewMeter({"overlay", "timeout", "idle"}, "timeout"))
, mTimeoutStraggler(app.getMetrics().NewMeter(
{"overlay", "timeout", "straggler"}, "timeout"))

, mRecvErrorTimer(app.getMetrics().NewTimer({"overlay", "recv", "error"}))
, mRecvHelloTimer(app.getMetrics().NewTimer({"overlay", "recv", "hello"}))
Expand Down Expand Up @@ -217,17 +219,19 @@ Peer::idleTimerExpired(asio::error_code const& error)
{
auto now = mApp.getClock().now();
auto timeout = getIOTimeout();
auto stragglerTimeout =
std::chrono::seconds(mApp.getConfig().PEER_STRAGGLER_TIMEOUT);
if (((now - mLastRead) >= timeout) && ((now - mLastWrite) >= timeout))
{
CLOG(WARNING, "Overlay") << "idle timeout for peer " << toString();
mTimeoutIdle.Mark();
drop(Peer::DropMode::IGNORE_WRITE_QUEUE);
}
else if (((now - mLastEmpty) >= timeout))
else if (((now - mLastEmpty) >= stragglerTimeout))
{
CLOG(WARNING, "Overlay")
<< "peer " << toString() << " cannot keep up";
mTimeoutIdle.Mark();
<< "peer " << toString() << " straggling (cannot keep up)";
mTimeoutStraggler.Mark();
drop(Peer::DropMode::IGNORE_WRITE_QUEUE);
}
else
Expand Down
1 change: 1 addition & 0 deletions src/overlay/Peer.h
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,7 @@ class Peer : public std::enable_shared_from_this<Peer>,
medida::Meter& mErrorRead;
medida::Meter& mErrorWrite;
medida::Meter& mTimeoutIdle;
medida::Meter& mTimeoutStraggler;

medida::Timer& mRecvErrorTimer;
medida::Timer& mRecvHelloTimer;
Expand Down
78 changes: 78 additions & 0 deletions src/overlay/test/OverlayTests.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -600,6 +600,84 @@ TEST_CASE("reject peers who dont handshake quickly", "[overlay][connections]")
}
}

TEST_CASE("drop peers who straggle", "[overlay][connections][straggler]")
{
auto test = [](unsigned short stragglerTimeout) {
VirtualClock clock;
Config cfg1 = getTestConfig(0);
Config cfg2 = getTestConfig(1);

// Straggler detection piggy-backs on the idle timer so we drive
// the test from idle-timer-firing granularity.
assert(cfg1.PEER_TIMEOUT == cfg2.PEER_TIMEOUT);
assert(cfg1.PEER_TIMEOUT > stragglerTimeout);

// Initiator (cfg1) will straggle, and acceptor (cfg2) will notice and
// disconnect.
cfg2.PEER_STRAGGLER_TIMEOUT = stragglerTimeout;

auto app1 = createTestApplication(clock, cfg1);
auto app2 = createTestApplication(clock, cfg2);
auto waitTime = std::chrono::seconds(cfg2.PEER_TIMEOUT * 5);
auto padTime = std::chrono::seconds(5);

LoopbackPeerConnection conn(*app1, *app2);
auto start = clock.now();

testutil::crankSome(clock);
REQUIRE(conn.getInitiator()->isAuthenticated());
REQUIRE(conn.getAcceptor()->isAuthenticated());

conn.getInitiator()->setStraggling(true);
auto straggler = conn.getInitiator();
VirtualTimer sendTimer(*app1);

while (clock.now() < (start + waitTime) &&
(conn.getInitiator()->isConnected() ||
conn.getAcceptor()->isConnected()))
{
LOG(INFO) << "clock.now() = "
<< clock.now().time_since_epoch().count();

// Straggler keeps asking for peers 10 times per second -- this is
// easy traffic to fake-generate -- but not accepting response
// messages in a timely fashion.
sendTimer.expires_from_now(std::chrono::milliseconds(100));
sendTimer.async_wait([straggler](asio::error_code const& error) {
if (!error)
{
straggler->sendGetPeers();
}
});
clock.crank(false);
}
LOG(INFO) << "loop complete, clock.now() = "
<< clock.now().time_since_epoch().count();
REQUIRE(clock.now() < (start + waitTime + padTime));
REQUIRE(!conn.getInitiator()->isConnected());
REQUIRE(!conn.getAcceptor()->isConnected());
REQUIRE(app1->getMetrics()
.NewMeter({"overlay", "timeout", "idle"}, "timeout")
.count() == 0);
REQUIRE(app2->getMetrics()
.NewMeter({"overlay", "timeout", "idle"}, "timeout")
.count() == 0);
REQUIRE(app2->getMetrics()
.NewMeter({"overlay", "timeout", "straggler"}, "timeout")
.count() != 0);
};

SECTION("5 seconds straggle timeout")
{
test(5);
}

SECTION("28 seconds straggle timeout")
{
test(28);
}
}

TEST_CASE("reject peers with the same nodeid", "[overlay][connections]")
{
VirtualClock clock;
Expand Down

0 comments on commit de204d7

Please sign in to comment.