Skip to content

Commit

Permalink
Merge pull request #1585 from MonsieurNicolas/prefPeersAlways
Browse files Browse the repository at this point in the history
Always connect to preferred peers

Reviewed-by: MonsieurNicolas
  • Loading branch information
latobarita authored Mar 7, 2018
2 parents 1909c07 + 89b10a1 commit 74cae1e
Show file tree
Hide file tree
Showing 9 changed files with 276 additions and 101 deletions.
11 changes: 9 additions & 2 deletions src/database/Database.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ using namespace std;

bool Database::gDriversRegistered = false;

static unsigned long const SCHEMA_VERSION = 5;
static unsigned long const SCHEMA_VERSION = 6;

static void
setSerializable(soci::session& sess)
Expand Down Expand Up @@ -140,7 +140,9 @@ Database::applySchemaUpgrade(unsigned long vers)
}
}
break;

case 6:
mSession << "ALTER TABLE peers ADD flags INT NOT NULL DEFAULT 0";
break;
default:
throw std::runtime_error("Unknown DB schema version");
break;
Expand Down Expand Up @@ -453,6 +455,11 @@ Database::recentIdleDbPercent()
std::chrono::nanoseconds total = mApp.getClock().now() - mLastIdleTotalTime;
total -= mExcludedTotalTime;

if (total == std::chrono::nanoseconds::zero())
{
return 100;
}

uint32_t queryPercent =
static_cast<uint32_t>((100 * query.count()) / total.count());
uint32_t idlePercent = 100 - queryPercent;
Expand Down
63 changes: 47 additions & 16 deletions src/overlay/OverlayManagerImpl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -146,9 +146,10 @@ OverlayManagerImpl::storePeerList(std::vector<std::string> const& list,
try
{
auto pr = PeerRecord::parseIPPort(peerStr, mApp);
pr.setPreferred(preferred);
if (resetBackOff)
{
pr.resetBackOff(mApp.getClock(), preferred);
pr.resetBackOff(mApp.getClock());
pr.storePeerRecord(mApp.getDatabase());
}
else
Expand Down Expand Up @@ -190,31 +191,50 @@ OverlayManagerImpl::storeConfigPeers()
storePeerList(ppeers, true, true);
}

void
OverlayManagerImpl::connectToMorePeers(int max)
std::vector<PeerRecord>
OverlayManagerImpl::getPreferredPeersFromConfig()
{
const int batchSize = std::max(10, max);
std::vector<PeerRecord> peers;
for (auto& pp : mPreferredPeers)
{
auto prParsed = PeerRecord::parseIPPort(pp, mApp);
if (!getConnectedPeer(prParsed.ip(), prParsed.port()))
{
auto pr = PeerRecord::loadPeerRecord(
mApp.getDatabase(), prParsed.ip(), prParsed.port());
if (pr && pr->mNextAttempt <= mApp.getClock().now())
{
peers.emplace_back(*pr);
}
}
}
return peers;
}

// load best candidates from the database,
// when PREFERRED_PEER_ONLY is set and we connect to a non
// preferred_peer we just end up dropping & backing off
// it during handshake (this allows for preferred_peers
// to work for both ip based and key based preferred mode)
std::vector<PeerRecord>
OverlayManagerImpl::getPeersToConnectTo(int maxNum)
{
const int batchSize = std::max(20, maxNum);

vector<PeerRecord> peers;
std::vector<PeerRecord> peers;

PeerRecord::loadPeerRecords(mApp.getDatabase(), batchSize,
mApp.getClock().now(),
[&](PeerRecord const& pr) {
// skip peers that we're already connected
// to
// skip peers that we're already
// connected/connecting to
if (!getConnectedPeer(pr.ip(), pr.port()))
{
peers.emplace_back(pr);
}
return peers.size() < max;
return peers.size() < maxNum;
});
return peers;
}

void
OverlayManagerImpl::connectToMorePeers(vector<PeerRecord>& peers)
{
orderByPreferredPeers(peers);

for (auto& pr : peers)
Expand All @@ -223,8 +243,9 @@ OverlayManagerImpl::connectToMorePeers(int max)
{
continue;
}
if (getAuthenticatedPeersCount() >=
mApp.getConfig().TARGET_PEER_CONNECTIONS)
// we always try to connect to preferred peers
if (!pr.isPreferred() && getAuthenticatedPeersCount() >=
mApp.getConfig().TARGET_PEER_CONNECTIONS)
{
break;
}
Expand All @@ -249,11 +270,21 @@ OverlayManagerImpl::tick()

mLoad.maybeShedExcessLoad(mApp);

// first, see if we should trigger connections to preferred peers
auto peers = getPreferredPeersFromConfig();
connectToMorePeers(peers);

if (getAuthenticatedPeersCount() < mApp.getConfig().TARGET_PEER_CONNECTIONS)
{
connectToMorePeers(
// load best candidates from the database,
// when PREFERRED_PEER_ONLY is set and we connect to a non
// preferred_peer we just end up dropping & backing off
// it during handshake (this allows for preferred_peers
// to work for both ip based and key based preferred mode)
peers = getPeersToConnectTo(
static_cast<int>(mApp.getConfig().TARGET_PEER_CONNECTIONS -
getAuthenticatedPeersCount()));
connectToMorePeers(peers);
}

mTimer.expires_from_now(
Expand Down
5 changes: 4 additions & 1 deletion src/overlay/OverlayManagerImpl.h
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,7 @@ class OverlayManagerImpl : public OverlayManager
Peer::pointer getConnectedPeer(std::string const& ip,
unsigned short port) override;

void connectToMorePeers(int max);
void connectToMorePeers(vector<PeerRecord>& peers);
std::vector<Peer::pointer> getRandomAuthenticatedPeers() override;

std::set<Peer::pointer> getPeersKnows(Hash const& h) override;
Expand All @@ -105,6 +105,9 @@ class OverlayManagerImpl : public OverlayManager
bool isShuttingDown() const override;

private:
std::vector<PeerRecord> getPreferredPeersFromConfig();
std::vector<PeerRecord> getPeersToConnectTo(int maxNum);

void orderByPreferredPeers(vector<PeerRecord>& peers);
bool moveToAuthenticated(Peer::pointer peer);
void updateSizeCounters();
Expand Down
15 changes: 9 additions & 6 deletions src/overlay/OverlayManagerTests.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -109,12 +109,14 @@ class OverlayManagerTests
vector<string> threePeers;

OverlayManagerTests()
: app(createTestApplication<ApplicationStub>(clock, getTestConfig()))
, fourPeers(vector<string>{"127.0.0.1:2011", "127.0.0.1:2012",
: fourPeers(vector<string>{"127.0.0.1:2011", "127.0.0.1:2012",
"127.0.0.1:2013", "127.0.0.1:2014"})
, threePeers(vector<string>{"127.0.0.1:201", "127.0.0.1:202",
"127.0.0.1:203", "127.0.0.1:204"})
, threePeers(vector<string>{"127.0.0.1:64000", "127.0.0.1:64001",
"127.0.0.1:64002"})
{
auto cfg = getTestConfig();
cfg.TARGET_PEER_CONNECTIONS = 5;
app = createTestApplication<ApplicationStub>(clock, cfg);
}

void
Expand All @@ -125,7 +127,7 @@ class OverlayManagerTests
pm.storePeerList(fourPeers, false, false);

rowset<row> rs = app->getDatabase().getSession().prepare
<< "SELECT ip,port FROM peers";
<< "SELECT ip,port FROM peers ORDER BY nextattempt";
vector<string> actual;
for (auto it = rs.begin(); it != rs.end(); ++it)
actual.push_back(it->get<string>(0) + ":" +
Expand All @@ -150,7 +152,8 @@ class OverlayManagerTests

pm.storePeerList(fourPeers, false, false);
pm.storePeerList(threePeers, false, false);
pm.connectToMorePeers(5);
// connect to peers, respecting TARGET_PEER_CONNECTIONS
pm.tick();
REQUIRE(pm.mAuthenticatedPeers.size() == 5);
auto a = TestAccount{*app, getAccount("a")};
auto b = TestAccount{*app, getAccount("b")};
Expand Down
67 changes: 67 additions & 0 deletions src/overlay/OverlayTests.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
#include "medida/meter.h"
#include "medida/metrics_registry.h"
#include "medida/timer.h"
#include "util/format.h"
#include <numeric>

using namespace stellar;
Expand Down Expand Up @@ -441,3 +442,69 @@ TEST_CASE("connecting to saturated nodes", "[overlay]")
REQUIRE(numberOfSimulationConnections() == 6);
simulation->crankForAtLeast(std::chrono::seconds{1}, true);
}

TEST_CASE("preferred peers always connect", "[overlay]")
{
auto networkID = sha256(getTestConfig().NETWORK_PASSPHRASE);
auto simulation =
std::make_shared<Simulation>(Simulation::OVER_TCP, networkID);

auto getConfiguration = [](int id, unsigned short targetConnections,
unsigned short peerConnections) {
auto cfg = getTestConfig(id);
cfg.MAX_PEER_CONNECTIONS = peerConnections;
cfg.TARGET_PEER_CONNECTIONS = targetConnections;
cfg.MAX_ADDITIONAL_PEER_CONNECTIONS = 0;
return cfg;
};

auto numberOfAppConnections = [](Application& app) {
return app.getOverlayManager().getAuthenticatedPeersCount();
};

Config configs[3];
for (int i = 0; i < 3; i++)
{
configs[i] = getConfiguration(i + 1, i == 0 ? 1 : 0, 2);
}

SIMULATION_CREATE_NODE(Node1);
SIMULATION_CREATE_NODE(Node2);
SIMULATION_CREATE_NODE(Node3);

SCPQuorumSet qSet;
qSet.threshold = 2;
qSet.validators.push_back(vNode1NodeID);
qSet.validators.push_back(vNode2NodeID);
qSet.validators.push_back(vNode3NodeID);

// node1 has node2 as preferred peer
configs[0].PREFERRED_PEERS.emplace_back(
fmt::format("localhost:{}", configs[1].PEER_PORT));

simulation->addNode(vNode1SecretKey, qSet, &configs[0]);
simulation->addNode(vNode2SecretKey, qSet, &configs[1]);
simulation->addNode(vNode3SecretKey, qSet, &configs[2]);

simulation->startAllNodes();
simulation->crankForAtLeast(std::chrono::seconds{3}, false);
// node1 connected to node2 (counted from both apps)
REQUIRE(numberOfAppConnections(*simulation->getNode(vNode1NodeID)) == 1);
REQUIRE(numberOfAppConnections(*simulation->getNode(vNode2NodeID)) == 1);
REQUIRE(numberOfAppConnections(*simulation->getNode(vNode3NodeID)) == 0);

// disconnect node1 and node2
simulation->dropConnection(vNode1NodeID, vNode2NodeID);
// and connect node 3 to node1 (to take the slot)
simulation->addConnection(vNode3NodeID, vNode1NodeID);
simulation->crankForAtLeast(std::chrono::seconds{1}, false);
REQUIRE(numberOfAppConnections(*simulation->getNode(vNode1NodeID)) == 1);
REQUIRE(numberOfAppConnections(*simulation->getNode(vNode2NodeID)) == 0);
REQUIRE(numberOfAppConnections(*simulation->getNode(vNode3NodeID)) == 1);

// wait a bit more, node1 connects to its preferred peer
simulation->crankForAtLeast(std::chrono::seconds{3}, true);
REQUIRE(numberOfAppConnections(*simulation->getNode(vNode1NodeID)) == 2);
REQUIRE(numberOfAppConnections(*simulation->getNode(vNode2NodeID)) == 1);
REQUIRE(numberOfAppConnections(*simulation->getNode(vNode3NodeID)) == 1);
}
4 changes: 2 additions & 2 deletions src/overlay/Peer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -887,8 +887,8 @@ Peer::noteHandshakeSuccessInPeerRecord()
getRemoteListeningPort());
if (pr)
{
pr->resetBackOff(mApp.getClock(),
mApp.getOverlayManager().isPreferred(this));
pr->setPreferred(mApp.getOverlayManager().isPreferred(this));
pr->resetBackOff(mApp.getClock());
}
else
{
Expand Down
Loading

0 comments on commit 74cae1e

Please sign in to comment.