Merge pull request #3655 from SirTyson/sync-fix
Online catchup no longer waits for trigger ledger

Reviewed-by: marta-lokhova
latobarita authored Apr 21, 2023
2 parents 16cea83 + 9f029fe commit bff2c2b
Showing 21 changed files with 333 additions and 54 deletions.
3 changes: 2 additions & 1 deletion docs/stellar-core_example.cfg
@@ -513,7 +513,8 @@ DISABLE_XDR_FSYNC=false
# Most people should leave this to 12
# Number of most recent ledgers to keep in memory. Storing more ledgers allows other
# nodes to join the network without catching up. This is useful for simulation
# testing purposes.
# testing purposes. Note that the SCP checkpoint message is always kept and does
# not count towards this limit.
MAX_SLOTS_TO_REMEMBER=12

# METADATA_OUTPUT_STREAM defaults to "", disabling it.
8 changes: 8 additions & 0 deletions src/catchup/CatchupManagerImpl.h
@@ -90,5 +90,13 @@ class CatchupManagerImpl : public CatchupManager
void bucketsApplied(uint32_t num) override;
void txSetsApplied(uint32_t num) override;
void fileDownloaded(std::string type, uint32_t num) override;

#ifdef BUILD_TESTS
std::map<uint32_t, LedgerCloseData> const&
getBufferedLedgers() const
{
return mSyncingLedgers;
}
#endif
};
}
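As a side note on the BUILD_TESTS-only accessor added above, here is a small standalone sketch (plain C++ with a stand-in type for LedgerCloseData; the ledger numbers are made up, and nothing here is the stellar-core API) of the kind of assertion getBufferedLedgers() enables in tests: checking that a syncing node buffers ledgers starting at the checkpoint ledger rather than waiting for a later trigger ledger.

#include <cassert>
#include <cstdint>
#include <map>
#include <string>

// Stand-in for stellar::LedgerCloseData; only the keying by ledger sequence
// matters for this sketch.
using LedgerCloseDataStub = std::string;

int
main()
{
    // Stand-in for CatchupManagerImpl::mSyncingLedgers as a test would see it
    // through getBufferedLedgers(): a map keyed by ledger sequence.
    std::map<uint32_t, LedgerCloseDataStub> const buffered = {
        {64, "checkpoint ledger"}, {120, "buffered"}, {121, "buffered"}};

    // std::map iterates in key order, so begin() is the lowest buffered
    // ledger; a test can assert that buffering started at the checkpoint.
    assert(!buffered.empty());
    assert(buffered.begin()->first == 64);
    return 0;
}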
2 changes: 2 additions & 0 deletions src/herder/Herder.cpp
@@ -9,6 +9,8 @@ std::chrono::seconds const Herder::MAX_SCP_TIMEOUT_SECONDS(240);
std::chrono::seconds const Herder::CONSENSUS_STUCK_TIMEOUT_SECONDS(35);
std::chrono::seconds const Herder::OUT_OF_SYNC_RECOVERY_TIMER =
std::chrono::seconds(10);
std::chrono::seconds const Herder::SEND_LATEST_CHECKPOINT_DELAY =
std::chrono::seconds(2);
std::chrono::seconds constexpr Herder::MAX_TIME_SLIP_SECONDS;
std::chrono::seconds const Herder::NODE_EXPIRATION_SECONDS(240);
// the value of LEDGER_VALIDITY_BRACKET should be in the order of
9 changes: 9 additions & 0 deletions src/herder/Herder.h
@@ -46,6 +46,10 @@ class Herder
// timeout before triggering out of sync recovery
static std::chrono::seconds const OUT_OF_SYNC_RECOVERY_TIMER;

// Timeout before sending the latest checkpoint ledger after sending the
// current SCP state
static std::chrono::seconds const SEND_LATEST_CHECKPOINT_DELAY;

// Maximum time slip between nodes.
static std::chrono::seconds constexpr MAX_TIME_SLIP_SECONDS =
std::chrono::seconds{60};
@@ -162,6 +166,11 @@ class Herder
// sender in the pending or recent tx sets.
virtual SequenceNumber getMaxSeqInPendingTxs(AccountID const&) = 0;

// Returns the sequence number of the most recent completed checkpoint that
// the node knows about, as derived from trackingConsensusLedgerIndex
virtual uint32_t getMostRecentCheckpointSeq() = 0;

virtual void triggerNextLedger(uint32_t ledgerSeqToTrigger,
bool forceTrackingSCP) = 0;
virtual void setInSyncAndTriggerNextLedger() = 0;
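The comment above says the checkpoint sequence is derived from trackingConsensusLedgerIndex; the implementation (see the HerderImpl.cpp hunk below) does this via HistoryManager::firstLedgerInCheckpointContaining. As a rough illustration only, the standalone sketch below approximates that derivation under two assumptions: the usual 64-ledger checkpoint frequency, and a first checkpoint that starts at the genesis ledger. The real HistoryManager logic is authoritative.

#include <cassert>
#include <cstdint>

// Illustrative stand-in for HistoryManager::firstLedgerInCheckpointContaining,
// assuming a 64-ledger checkpoint frequency and that the first checkpoint
// starts at the genesis ledger (sequence 1).
uint32_t
firstLedgerInCheckpointContainingApprox(uint32_t ledger)
{
    uint32_t const freq = 64;   // assumed checkpoint frequency
    uint32_t const genesis = 1; // LedgerManager::GENESIS_LEDGER_SEQ
    // Last ledger of the checkpoint containing `ledger`, then step back to
    // that checkpoint's first ledger.
    uint32_t last = (ledger / freq) * freq + freq - 1;
    uint32_t first = last + 1 - freq;
    return first < genesis ? genesis : first;
}

int
main()
{
    // Under these assumptions, the first checkpoint covers ledgers 1..63 and
    // later checkpoints cover 64 ledgers each.
    assert(firstLedgerInCheckpointContainingApprox(50) == 1);
    assert(firstLedgerInCheckpointContainingApprox(63) == 1);
    assert(firstLedgerInCheckpointContainingApprox(64) == 64);
    assert(firstLedgerInCheckpointContainingApprox(130) == 128);
    return 0;
}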
104 changes: 91 additions & 13 deletions src/herder/HerderImpl.cpp
@@ -86,6 +86,7 @@ HerderImpl::HerderImpl(Application& app)
, mTriggerTimer(app)
, mOutOfSyncTimer(app)
, mTxSetGarbageCollectTimer(app)
, mEarlyCatchupTimer(app)
, mApp(app)
, mLedgerManager(app.getLedgerManager())
, mSCPMetrics(app)
@@ -240,6 +241,7 @@ HerderImpl::shutdown()
mTrackingTimer.cancel();
mOutOfSyncTimer.cancel();
mTriggerTimer.cancel();
mEarlyCatchupTimer.cancel();
if (mLastQuorumMapIntersectionState.mRecalculating)
{
// We want to interrupt any calculation-in-progress at shutdown to
@@ -607,6 +609,9 @@ HerderImpl::recvSCPEnvelope(SCPEnvelope const& envelope)
return Herder::ENVELOPE_STATUS_DISCARDED;
}

auto checkpoint = getMostRecentCheckpointSeq();
auto index = envelope.statement.slotIndex;

if (isTracking())
{
// when tracking, we can filter messages based on the information we got
@@ -618,8 +623,11 @@ HerderImpl::recvSCPEnvelope(SCPEnvelope const& envelope)
// ledger closing
maxLedgerSeq = nextConsensusLedgerIndex() + LEDGER_VALIDITY_BRACKET;
}
// Allow a message with a drift larger than MAXIMUM_LEDGER_CLOSETIME_DRIFT
// if it is a checkpoint message
else if (!checkCloseTime(envelope, trackingConsensusLedgerIndex() <=
LedgerManager::GENESIS_LEDGER_SEQ))
LedgerManager::GENESIS_LEDGER_SEQ) &&
index != checkpoint)
{
// if we've never been in sync, we can be more aggressive in how we
// filter messages: we can ignore messages that are unlikely to be
@@ -631,9 +639,9 @@ HerderImpl::recvSCPEnvelope(SCPEnvelope const& envelope)
return Herder::ENVELOPE_STATUS_DISCARDED;
}

// If envelopes are out of our validity brackets, we just ignore them.
if (envelope.statement.slotIndex > maxLedgerSeq ||
envelope.statement.slotIndex < minLedgerSeq)
// If an envelope is outside our validity brackets and is not the checkpoint
// ledger needed for early catchup, we just ignore it.
if ((index > maxLedgerSeq || index < minLedgerSeq) && index != checkpoint)
{
CLOG_TRACE(Herder, "Ignoring SCPEnvelope outside of range: {}( {},{})",
envelope.statement.slotIndex, minLedgerSeq, maxLedgerSeq);
@@ -726,19 +734,63 @@ HerderImpl::sendSCPStateToPeer(uint32 ledgerSeq, Peer::pointer peer)
ZoneScoped;
bool log = true;
auto maxSlots = Herder::LEDGER_VALIDITY_BRACKET;

auto sendSlot = [weakPeer = std::weak_ptr<Peer>(peer)](SCPEnvelope const& e,
bool log) {
// If in the process of shutting down, exit early
auto peerPtr = weakPeer.lock();
if (!peerPtr)
{
return false;
}

StellarMessage m;
m.type(SCP_MESSAGE);
m.envelope() = e;
auto mPtr = std::make_shared<StellarMessage const>(m);
peerPtr->sendMessage(mPtr, log);
return true;
};

bool delayCheckpoint = false;
auto checkpoint = getMostRecentCheckpointSeq();
auto consensusIndex = trackingConsensusLedgerIndex();
auto firstSequentialLedgerSeq =
consensusIndex > mApp.getConfig().MAX_SLOTS_TO_REMEMBER
? consensusIndex - mApp.getConfig().MAX_SLOTS_TO_REMEMBER
: LedgerManager::GENESIS_LEDGER_SEQ;

// If there is a gap between the latest completed checkpoint and the next
// saved message, we should delay sending the checkpoint ledger: send all
// other messages first, then send the checkpoint messages once the node
// that is catching up knows the network state. We need to do this because
// checkpoint messages are almost always outside
// MAXIMUM_LEDGER_CLOSETIME_DRIFT. Checkpoint ledgers are special-cased to
// be allowed outside this range, but to determine whether a message is a
// checkpoint message, the node needs the correct
// trackingConsensusLedgerIndex. We therefore send the checkpoint message
// after a delay so that the receiving node has time to process the
// initially sent messages and establish trackingConsensusLedgerIndex
if (checkpoint < firstSequentialLedgerSeq)
{
delayCheckpoint = true;
}

// Send up to maxSlots slots
getSCP().processSlotsAscendingFrom(ledgerSeq, [&](uint64 seq) {
// Skip checkpoint ledger if we should delay
if (seq == checkpoint && delayCheckpoint)
{
return true;
}

bool slotHadData = false;
getSCP().processCurrentState(
seq,
[&](SCPEnvelope const& e) {
StellarMessage m;
m.type(SCP_MESSAGE);
m.envelope() = e;
auto mPtr = std::make_shared<StellarMessage const>(m);
peer->sendMessage(mPtr, log);
log = false;
slotHadData = true;
return true;
auto ret = sendSlot(e, log);
log = false;
return ret;
},
false);
if (slotHadData)
@@ -747,6 +799,23 @@ HerderImpl::sendSCPStateToPeer(uint32 ledgerSeq, Peer::pointer peer)
}
return maxSlots != 0;
});

// An out-of-sync node needs to receive the latest messages to determine the
// network state before receiving the checkpoint message. Delay sending the
// checkpoint ledger to achieve this
if (delayCheckpoint)
{
mEarlyCatchupTimer.expires_from_now(
Herder::SEND_LATEST_CHECKPOINT_DELAY);
mEarlyCatchupTimer.async_wait(
[checkpoint, this, sendSlot]() {
getSCP().processCurrentState(
checkpoint,
[&](SCPEnvelope const& e) { return sendSlot(e, true); },
false);
},
&VirtualTimer::onFailureNoop);
}
}

void
@@ -948,8 +1017,9 @@ HerderImpl::setupTriggerNextLedger()
void
HerderImpl::eraseBelow(uint32 ledgerSeq)
{
getHerderSCPDriver().purgeSlots(ledgerSeq);
mPendingEnvelopes.eraseBelow(ledgerSeq);
auto lastCheckpointSeq = getMostRecentCheckpointSeq();
getHerderSCPDriver().purgeSlots(ledgerSeq, lastCheckpointSeq);
mPendingEnvelopes.eraseBelow(ledgerSeq, lastCheckpointSeq);
auto lastIndex = trackingConsensusLedgerIndex();
mApp.getOverlayManager().clearLedgersBelow(ledgerSeq, lastIndex);
}
@@ -1022,6 +1092,14 @@ HerderImpl::getMaxSeqInPendingTxs(AccountID const& acc)
return mTransactionQueue.getAccountTransactionQueueInfo(acc).mMaxSeq;
}

uint32_t
HerderImpl::getMostRecentCheckpointSeq()
{
auto lastIndex = trackingConsensusLedgerIndex();
return mApp.getHistoryManager().firstLedgerInCheckpointContaining(
lastIndex);
}

void
HerderImpl::setInSyncAndTriggerNextLedger()
{
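To make the two behavioral changes in this file concrete, here is a self-contained sketch in plain C++17 with stand-in types (FakePeer and made-up slot numbers; nothing here is stellar-core API). It shows the relaxed envelope filter in recvSCPEnvelope, which keeps a slot outside the validity bracket when it is exactly the checkpoint slot, and the sendSCPStateToPeer pattern of capturing the peer as a std::weak_ptr and deferring the checkpoint slot until after the recent slots, standing in for the SEND_LATEST_CHECKPOINT_DELAY timer.

#include <cassert>
#include <cstdint>
#include <functional>
#include <iostream>
#include <memory>
#include <vector>

// The filter recvSCPEnvelope now applies: a slot outside the validity
// bracket is still kept if it is exactly the checkpoint slot.
bool
shouldKeepEnvelope(uint64_t slot, uint64_t minSeq, uint64_t maxSeq,
                   uint64_t checkpointSlot)
{
    bool outsideBracket = slot > maxSeq || slot < minSeq;
    return !outsideBracket || slot == checkpointSlot;
}

// Stand-in for Peer; only sending matters here.
struct FakePeer
{
    void
    send(uint64_t slot) const
    {
        std::cout << "sent slot " << slot << "\n";
    }
};

int
main()
{
    uint64_t const checkpointSlot = 64;

    // Slot 64 is far below the bracket [120, 130] but survives because it is
    // the checkpoint slot; slot 70 does not.
    assert(shouldKeepEnvelope(64, 120, 130, checkpointSlot));
    assert(!shouldKeepEnvelope(70, 120, 130, checkpointSlot));

    // The sendSCPStateToPeer pattern: capture the peer weakly so a deferred
    // send silently no-ops if the peer has gone away.
    auto peer = std::make_shared<FakePeer>();
    auto sendSlot = [weak = std::weak_ptr<FakePeer>(peer)](uint64_t slot) {
        auto p = weak.lock();
        if (!p)
        {
            return false; // peer disconnected or node shutting down
        }
        p->send(slot);
        return true;
    };

    // Send the recent slots immediately; defer the checkpoint slot, standing
    // in for the SEND_LATEST_CHECKPOINT_DELAY timer.
    std::vector<uint64_t> const slots = {checkpointSlot, 120, 121, 122};
    std::vector<std::function<void()>> deferred;
    for (uint64_t slot : slots)
    {
        if (slot == checkpointSlot)
        {
            deferred.push_back([slot, sendSlot] { sendSlot(slot); });
            continue;
        }
        sendSlot(slot);
    }
    for (auto& send : deferred)
    {
        send();
    }
    return 0;
}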
8 changes: 7 additions & 1 deletion src/herder/HerderImpl.h
@@ -136,6 +136,8 @@ class HerderImpl : public Herder

SequenceNumber getMaxSeqInPendingTxs(AccountID const&) override;

uint32_t getMostRecentCheckpointSeq() override;

void triggerNextLedger(uint32_t ledgerSeqToTrigger,
bool checkTrackingSCP) override;

@@ -250,6 +252,8 @@ class HerderImpl : public Herder

VirtualTimer mTxSetGarbageCollectTimer;

VirtualTimer mEarlyCatchupTimer;

Application& mApp;
LedgerManager& mLedgerManager;

@@ -277,7 +281,9 @@ class HerderImpl : public Herder
// run a background job that re-analyzes the current quorum map.
void checkAndMaybeReanalyzeQuorumMap();

// erase all data for ledgers strictly less than ledgerSeq
// erase all data for ledgers strictly less than ledgerSeq except for the
// first ledger of the current checkpoint. Hold onto this ledger so that
// peers can catch up without waiting for the next checkpoint.
void eraseBelow(uint32 ledgerSeq);

struct QuorumMapIntersectionState
13 changes: 10 additions & 3 deletions src/herder/HerderSCPDriver.cpp
@@ -1115,16 +1115,23 @@ HerderSCPDriver::recordSCPExecutionMetrics(uint64_t slotIndex)
}

void
HerderSCPDriver::purgeSlots(uint64_t maxSlotIndex)
HerderSCPDriver::purgeSlots(uint64_t maxSlotIndex, uint64 slotToKeep)
{
// Clean up timings map
auto it = mSCPExecutionTimes.begin();
while (it != mSCPExecutionTimes.end() && it->first < maxSlotIndex)
{
it = mSCPExecutionTimes.erase(it);
if (it->first == slotToKeep)
{
++it;
}
else
{
it = mSCPExecutionTimes.erase(it);
}
}

getSCP().purgeSlots(maxSlotIndex);
getSCP().purgeSlots(maxSlotIndex, slotToKeep);
}

void
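Both purgeSlots here and eraseBelow/stopAllBelow in PendingEnvelopes.cpp below use the same small pattern: walk a container ordered by slot index, erase everything strictly below a bound, but keep one pinned slot (the checkpoint). A standalone sketch with a plain std::map and made-up slot numbers:

#include <cassert>
#include <cstdint>
#include <map>
#include <string>

// Erase all entries with keys strictly below `bound`, except the pinned
// `slotToKeep` entry, mirroring the loops added in this pull request.
void
eraseBelowExcept(std::map<uint64_t, std::string>& slots, uint64_t bound,
                 uint64_t slotToKeep)
{
    auto it = slots.begin();
    while (it != slots.end() && it->first < bound)
    {
        if (it->first == slotToKeep)
        {
            ++it; // the pinned checkpoint slot survives the purge
        }
        else
        {
            it = slots.erase(it);
        }
    }
}

int
main()
{
    std::map<uint64_t, std::string> slots = {
        {64, "checkpoint"}, {120, "old"}, {121, "old"}, {125, "recent"}};
    eraseBelowExcept(slots, 125, 64);
    assert(slots.count(64) == 1);  // kept: pinned slot
    assert(slots.count(120) == 0); // purged: below the bound
    assert(slots.count(125) == 1); // untouched: at or above the bound
    return 0;
}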
2 changes: 1 addition & 1 deletion src/herder/HerderSCPDriver.h
@@ -123,7 +123,7 @@ class HerderSCPDriver : public SCPDriver
ValueWrapperPtr wrapValue(Value const& sv) override;

// clean up older slots
void purgeSlots(uint64_t maxSlotIndex);
void purgeSlots(uint64_t maxSlotIndex, uint64 slotToKeep);

double getExternalizeLag(NodeID const& id) const;

26 changes: 19 additions & 7 deletions src/herder/PendingEnvelopes.cpp
@@ -655,9 +655,9 @@ PendingEnvelopes::readySlots()
}

void
PendingEnvelopes::eraseBelow(uint64 slotIndex)
PendingEnvelopes::eraseBelow(uint64 slotIndex, uint64 slotToKeep)
{
stopAllBelow(slotIndex);
stopAllBelow(slotIndex, slotToKeep);

// report only for the highest slot that we're purging
reportCostOutliersForSlot(slotIndex - 1, true);
@@ -666,7 +666,14 @@
{
if (iter->first < slotIndex)
{
iter = mEnvelopes.erase(iter);
if (iter->first == slotToKeep)
{
++iter;
}
else
{
iter = mEnvelopes.erase(iter);
}
}
else
break;
@@ -675,29 +682,34 @@
// 0 is a special marker for data whose slot index we do not know;
// it is used for state loaded from the database
mTxSetCache.erase_if([&](TxSetFramCacheItem const& i) {
return i.first != 0 && i.first < slotIndex;
return i.first != 0 && i.first < slotIndex && i.first != slotToKeep;
});

cleanKnownData();
updateMetrics();
}

void
PendingEnvelopes::stopAllBelow(uint64 slotIndex)
PendingEnvelopes::stopAllBelow(uint64 slotIndex, uint64 slotToKeep)
{
// Before we purge a slot, check if any envelopes are still in
// "fetching" mode and attempt to record cost
for (auto it = mEnvelopes.begin();
it != mEnvelopes.end() && it->first < slotIndex; it++)
{
if (it->first == slotToKeep)
{
continue;
}

auto& envs = it->second;
for (auto const& env : envs.mFetchingEnvelopes)
{
recordReceivedCost(env.first);
}
}
mTxSetFetcher.stopFetchingBelow(slotIndex);
mQuorumSetFetcher.stopFetchingBelow(slotIndex);
mTxSetFetcher.stopFetchingBelow(slotIndex, slotToKeep);
mQuorumSetFetcher.stopFetchingBelow(slotIndex, slotToKeep);
}

void
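The tx-set cache eviction predicate above reduces to this small rule (standalone sketch with made-up slot numbers; slot 0 marks data whose slot index is unknown and is always kept):

#include <cassert>
#include <cstdint>

// Mirrors the erase_if predicate: evict an entry only if its slot is known
// (non-zero), strictly below the bound, and not the pinned checkpoint slot.
bool
shouldEvict(uint64_t entrySlot, uint64_t bound, uint64_t slotToKeep)
{
    return entrySlot != 0 && entrySlot < bound && entrySlot != slotToKeep;
}

int
main()
{
    assert(!shouldEvict(0, 200, 64));   // unknown slot: always kept
    assert(!shouldEvict(64, 200, 64));  // pinned checkpoint slot: kept
    assert(shouldEvict(120, 200, 64));  // ordinary old slot: evicted
    assert(!shouldEvict(250, 200, 64)); // at or above the bound: kept
    return 0;
}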
7 changes: 4 additions & 3 deletions src/herder/PendingEnvelopes.h
@@ -113,7 +113,7 @@ class PendingEnvelopes

// stops all pending downloads for slots strictly below `slotIndex`, except
// `slotToKeep`; counts partially downloaded data towards the cost for that
// slot
void stopAllBelow(uint64 slotIndex);
void stopAllBelow(uint64 slotIndex, uint64 slotToKeep);

public:
PendingEnvelopes(Application& app, HerderImpl& herder);
@@ -178,8 +178,9 @@ class PendingEnvelopes

SCPEnvelopeWrapperPtr pop(uint64 slotIndex);

// erases data for all slots strictly below `slotIndex`
void eraseBelow(uint64 slotIndex);
// erases data for all slots strictly below `slotIndex` except
// `slotToKeep`.
void eraseBelow(uint64 slotIndex, uint64 slotToKeep);

void forceRebuildQuorum();

(Diffs for the remaining changed files were not loaded.)
