Skip to content

Commit

Permalink
Merge pull request #926 from MonsieurNicolas/scpFixupNovember17
Browse files Browse the repository at this point in the history
Scp fixup november 17

Reviewed-by: jedmccaleb
  • Loading branch information
latobarita committed Nov 18, 2015
2 parents c9bfa9f + 443b4a4 commit f5b9f4e
Show file tree
Hide file tree
Showing 3 changed files with 300 additions and 68 deletions.
151 changes: 98 additions & 53 deletions src/scp/BallotProtocol.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -378,7 +378,7 @@ BallotProtocol::updateCurrentValue(SCPBallot const& ballot)
bool updated = false;
if (!mCurrentBallot)
{
bumpToBallot(ballot);
bumpToBallot(ballot, true);
updated = true;
}
else
Expand All @@ -393,7 +393,7 @@ BallotProtocol::updateCurrentValue(SCPBallot const& ballot)
int comp = compareBallots(*mCurrentBallot, ballot);
if (comp < 0)
{
bumpToBallot(ballot);
bumpToBallot(ballot, true);
updated = true;
}
else if (comp > 0)
Expand All @@ -410,8 +410,6 @@ BallotProtocol::updateCurrentValue(SCPBallot const& ballot)
"a smaller value";
// can't just bump to the value as we may already have
// statements at counter+1
// bumpToBallot(SCPBallot(mCurrentBallot->counter + 1,
// ballot.value));
return false;
}
}
Expand All @@ -427,7 +425,7 @@ BallotProtocol::updateCurrentValue(SCPBallot const& ballot)
}

void
BallotProtocol::bumpToBallot(SCPBallot const& ballot)
BallotProtocol::bumpToBallot(SCPBallot const& ballot, bool check)
{
CLOG(DEBUG, "SCP") << "BallotProtocol::bumpToBallot"
<< " i: " << mSlot.getSlotIndex()
Expand All @@ -436,11 +434,14 @@ BallotProtocol::bumpToBallot(SCPBallot const& ballot)
// `bumpToBallot` should be never called once we committed.
dbgAssert(mPhase != SCP_PHASE_EXTERNALIZE);

// We should move mCurrentBallot monotonically only
dbgAssert(!mCurrentBallot || compareBallots(ballot, *mCurrentBallot) >= 0);
if (check)
{
// We should move mCurrentBallot monotonically only
dbgAssert(!mCurrentBallot ||
compareBallots(ballot, *mCurrentBallot) >= 0);
}

bool gotBumped =
!mCurrentBallot || (mCurrentBallot->counter != ballot.counter);
bool gotBumped = !mCurrentBallot || !(*mCurrentBallot == ballot);

mCurrentBallot = make_unique<SCPBallot>(ballot);

Expand Down Expand Up @@ -567,23 +568,31 @@ BallotProtocol::emitCurrentStateStatement()

bool canEmit = (mCurrentBallot != nullptr);

if (mSlot.processEnvelope(envelope, true) == SCP::EnvelopeState::VALID)
// if we generate the same envelope, don't process it again
// this can occur when updating h in PREPARE phase
// as statements only keep track of h.n (but h.x could be different)
auto lastEnv = mLatestEnvelopes.find(mSlot.getSCP().getLocalNodeID());

if (lastEnv == mLatestEnvelopes.end() || !(lastEnv->second == envelope))
{
if (canEmit &&
(!mLastEnvelope ||
isNewerStatement(mLastEnvelope->statement, envelope.statement)))
if (mSlot.processEnvelope(envelope, true) == SCP::EnvelopeState::VALID)
{
mLastEnvelope = make_unique<SCPEnvelope>(envelope);
// this will no-op if invoked from advanceSlot
// as advanceSlot consolidates all messages
sendLatestEnvelope();
if (canEmit &&
(!mLastEnvelope || isNewerStatement(mLastEnvelope->statement,
envelope.statement)))
{
mLastEnvelope = std::make_shared<SCPEnvelope>(envelope);
// this will no-op if invoked from advanceSlot
// as advanceSlot consolidates all messages sent
sendLatestEnvelope();
}
}
else
{
// there is a bug in the application if it queued up
// a statement for itself that it considers invalid
throw std::runtime_error("moved to a bad state (ballot protocol)");
}
}
else
{
// there is a bug in the application if it queued up
// a statement for itself that it considers invalid
throw std::runtime_error("moved to a bad state (ballot protocol)");
}
}

Expand Down Expand Up @@ -636,6 +645,10 @@ BallotProtocol::getPrepareCandidates(SCPStatement const& hint)
{
hintBallots.insert(*prep.prepared);
}
if (prep.preparedPrime)
{
hintBallots.insert(*prep.preparedPrime);
}
}
break;
case SCP_ST_CONFIRM:
Expand Down Expand Up @@ -681,6 +694,11 @@ BallotProtocol::getPrepareCandidates(SCPStatement const& hint)
{
candidates.insert(*prep.prepared);
}
if (prep.preparedPrime &&
areBallotsLessAndCompatible(*prep.preparedPrime, topVote))
{
candidates.insert(*prep.preparedPrime);
}
}
break;
case SCP_ST_CONFIRM:
Expand Down Expand Up @@ -715,7 +733,7 @@ BallotProtocol::updateCurrentIfNeeded()
{
if (!mCurrentBallot || compareBallots(*mCurrentBallot, *mHighBallot) < 0)
{
bumpToBallot(*mHighBallot);
bumpToBallot(*mHighBallot, true);
}
}

Expand Down Expand Up @@ -746,11 +764,23 @@ BallotProtocol::attemptPreparedAccept(SCPStatement const& hint)
}

// if we already prepared this ballot, don't bother checking again
if (mPrepared && compareBallots(ballot, *mPrepared) <= 0)

// if ballot <= p' ballot is neither a candidate for p nor p'
if (mPreparedPrime && compareBallots(ballot, *mPreparedPrime) <= 0)
{
continue;
}

if (mPrepared)
{
// if ballot is already covered by p, skip
if (areBallotsLessAndCompatible(ballot, *mPrepared))
{
continue;
}
// otherwise, there is a chance it increases p'
}

bool accepted = federatedAccept(
// checks if any node is voting for this ballot
[&ballot, this](SCPStatement const& st)
Expand Down Expand Up @@ -1200,35 +1230,32 @@ BallotProtocol::setAcceptCommit(SCPBallot const& c, SCPBallot const& h)

bool didWork = false;

if (mCurrentBallot && compareBallots(h, *mCurrentBallot) < 0 &&
!areBallotsCompatible(h, *mCurrentBallot))
if (!mHighBallot || !mCommit || compareBallots(*mHighBallot, h) != 0 ||
compareBallots(*mCommit, c) != 0)
{
// we would end up with h < b and !(h ~ b) which is an invalid state
CLOG(DEBUG, "SCP") << "Ignoring new values: would violate h ~ b";
mCommit = make_unique<SCPBallot>(c);
mHighBallot = make_unique<SCPBallot>(h);

didWork = true;
}
else

if (mPhase == SCP_PHASE_PREPARE)
{
if (!mHighBallot || !mCommit || compareBallots(*mHighBallot, h) != 0 ||
compareBallots(*mCommit, c) != 0)
mPhase = SCP_PHASE_CONFIRM;
if (mCurrentBallot && !areBallotsLessAndCompatible(h, *mCurrentBallot))
{
mCommit = make_unique<SCPBallot>(c);
mHighBallot = make_unique<SCPBallot>(h);
didWork = true;
bumpToBallot(h, false);
}

if (mPhase == SCP_PHASE_PREPARE)
{
mPhase = SCP_PHASE_CONFIRM;
didWork = true;
}
didWork = true;
}

if (didWork)
{
updateCurrentIfNeeded();
if (didWork)
{
updateCurrentIfNeeded();

mSlot.getSCPDriver().acceptedCommit(mSlot.getSlotIndex(), h);
emitCurrentStateStatement();
}
mSlot.getSCPDriver().acceptedCommit(mSlot.getSlotIndex(), h);
emitCurrentStateStatement();
}

return didWork;
Expand Down Expand Up @@ -1434,7 +1461,10 @@ BallotProtocol::hasPreparedBallot(SCPBallot const& ballot,
case SCP_ST_PREPARE:
{
auto const& p = st.pledges.prepare();
res = p.prepared && areBallotsLessAndCompatible(ballot, *p.prepared);
res =
(p.prepared && areBallotsLessAndCompatible(ballot, *p.prepared)) ||
(p.preparedPrime &&
areBallotsLessAndCompatible(ballot, *p.preparedPrime));
}
break;
case SCP_ST_CONFIRM:
Expand Down Expand Up @@ -1510,7 +1540,8 @@ BallotProtocol::setPrepared(SCPBallot const& ballot)

if (mPrepared)
{
if (compareBallots(*mPrepared, ballot) < 0)
int comp = compareBallots(*mPrepared, ballot);
if (comp < 0)
{
if (!areBallotsCompatible(*mPrepared, ballot))
{
Expand All @@ -1519,6 +1550,15 @@ BallotProtocol::setPrepared(SCPBallot const& ballot)
mPrepared = make_unique<SCPBallot>(ballot);
didWork = true;
}
else if (comp > 0)
{
// check if we should update only p'
if (!mPreparedPrime || compareBallots(*mPreparedPrime, ballot) < 0)
{
mPreparedPrime = make_unique<SCPBallot>(ballot);
didWork = true;
}
}
}
else
{
Expand Down Expand Up @@ -1609,7 +1649,8 @@ BallotProtocol::setStateFromEnvelope(SCPEnvelope const& e)

recordEnvelope(e);

mLastEnvelope = make_unique<SCPEnvelope>(e);
mLastEnvelope = std::make_shared<SCPEnvelope>(e);
mLastEnvelopeEmit = mLastEnvelope;

auto const& pl = e.statement.pledges;

Expand All @@ -1619,7 +1660,7 @@ BallotProtocol::setStateFromEnvelope(SCPEnvelope const& e)
{
auto const& prep = pl.prepare();
auto const& b = prep.ballot;
bumpToBallot(b);
bumpToBallot(b, true);
if (prep.prepared)
{
mPrepared = make_unique<SCPBallot>(*prep.prepared);
Expand All @@ -1643,7 +1684,7 @@ BallotProtocol::setStateFromEnvelope(SCPEnvelope const& e)
{
auto const& c = pl.confirm();
auto const& v = c.ballot.value;
bumpToBallot(c.ballot);
bumpToBallot(c.ballot, true);
mPrepared = make_unique<SCPBallot>(c.nPrepared, v);
mHighBallot = make_unique<SCPBallot>(c.nH, v);
mCommit = make_unique<SCPBallot>(c.nCommit, v);
Expand All @@ -1654,7 +1695,7 @@ BallotProtocol::setStateFromEnvelope(SCPEnvelope const& e)
{
auto const& ext = pl.externalize();
auto const& v = ext.commit.value;
bumpToBallot(SCPBallot(UINT32_MAX, v));
bumpToBallot(SCPBallot(UINT32_MAX, v), true);
mPrepared = make_unique<SCPBallot>(UINT32_MAX, v);
mHighBallot = make_unique<SCPBallot>(ext.nH, v);
mCommit = make_unique<SCPBallot>(ext.commit);
Expand Down Expand Up @@ -1818,7 +1859,11 @@ BallotProtocol::sendLatestEnvelope()
// emit current envelope if needed
if (mCurrentMessageLevel == 0 && mLastEnvelope && mSlot.isFullyValidated())
{
mSlot.getSCPDriver().emitEnvelope(*mLastEnvelope);
if (!mLastEnvelopeEmit || mLastEnvelope != mLastEnvelopeEmit)
{
mLastEnvelopeEmit = mLastEnvelope;
mSlot.getSCPDriver().emitEnvelope(*mLastEnvelopeEmit);
}
}
}

Expand Down
12 changes: 8 additions & 4 deletions src/scp/BallotProtocol.h
Original file line number Diff line number Diff line change
Expand Up @@ -51,8 +51,11 @@ class BallotProtocol

int mCurrentMessageLevel; // number of messages triggered in one run

std::unique_ptr<SCPEnvelope>
mLastEnvelope; // last envelope emitted by this node
std::shared_ptr<SCPEnvelope>
mLastEnvelope; // last envelope generated by this node

std::shared_ptr<SCPEnvelope>
mLastEnvelopeEmit; // last envelope emitted by this node

public:
BallotProtocol(Slot& slot);
Expand Down Expand Up @@ -99,7 +102,7 @@ class BallotProtocol
SCPEnvelope*
getLastMessageSend() const
{
return mLastEnvelope.get();
return mLastEnvelopeEmit.get();
}

void setStateFromEnvelope(SCPEnvelope const& e);
Expand Down Expand Up @@ -227,7 +230,8 @@ class BallotProtocol
// helper function that updates the current ballot
// this is the lowest level method to update the current ballot and as
// such doesn't do any validation
void bumpToBallot(SCPBallot const& ballot);
// check: verifies that ballot is greater than old one
void bumpToBallot(SCPBallot const& ballot, bool check);

// switch the local node to the given ballot's value
// with the assumption that the ballot is more recent than the one
Expand Down
Loading

0 comments on commit f5b9f4e

Please sign in to comment.