Skip to content

Commit

Permalink
Merge pull request #972 from graydon/inferquorum
Browse files Browse the repository at this point in the history
Inferquorum

Reviewed-by: jedmccaleb
  • Loading branch information
latobarita committed Jan 14, 2016
2 parents 3e6c7f1 + 89763c2 commit 2f015f6
Show file tree
Hide file tree
Showing 15 changed files with 1,690 additions and 15 deletions.
4 changes: 4 additions & 0 deletions src/history/HistoryManager.h
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
// of this distribution or at http://www.apache.org/licenses/LICENSE-2.0

#include "overlay/StellarXDR.h"
#include "history/InferredQuorum.h"
#include "history/HistoryArchive.h"
#include <functional>
#include <memory>
Expand Down Expand Up @@ -350,6 +351,9 @@ class HistoryManager
// Return the HistoryArchiveState of the LedgerManager's LCL
virtual HistoryArchiveState getLastClosedHistoryArchiveState() const = 0;

// Infer a quorum set by reading SCP messages in history archives.
virtual InferredQuorum inferQuorum() = 0;

// Return the name of the HistoryManager's tmpdir (used for storing files in
// transit).
virtual std::string const& getTmpDir() = 0;
Expand Down
16 changes: 16 additions & 0 deletions src/history/HistoryManagerImpl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -327,6 +327,22 @@ HistoryManagerImpl::getLastClosedHistoryArchiveState() const
return HistoryArchiveState(seq, bl);
}

InferredQuorum
HistoryManagerImpl::inferQuorum()
{
InferredQuorum iq;
bool done = false;
auto handler = [&done](asio::error_code const& ec) { done = true; };
CLOG(INFO, "History") << "Starting FetchRecentQsetsWork";
mApp.getWorkManager().addWork<FetchRecentQsetsWork>(iq, handler);
mApp.getWorkManager().advanceChildren();
while (!done)
{
mApp.getClock().crank(false);
}
return iq;
}

bool
HistoryManagerImpl::hasAnyWritableHistoryArchive()
{
Expand Down
2 changes: 2 additions & 0 deletions src/history/HistoryManagerImpl.h
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,8 @@ class HistoryManagerImpl : public HistoryManager

HistoryArchiveState getLastClosedHistoryArchiveState() const override;

InferredQuorum inferQuorum() override;

std::string const& getTmpDir() override;

std::string localFilename(std::string const& basename) override;
Expand Down
80 changes: 80 additions & 0 deletions src/history/HistoryWork.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1872,4 +1872,84 @@ CatchupRecentWork::onFailureRaise()
asio::error_code ec = std::make_error_code(std::errc::timed_out);
mEndHandler(ec, HistoryManager::CATCHUP_RECENT, mFirstVerified);
}

///////////////////////////////////////////////////////////////////////////
// FetchRecentQsetsWork
///////////////////////////////////////////////////////////////////////////
FetchRecentQsetsWork::FetchRecentQsetsWork(Application& app, WorkParent& parent,
InferredQuorum& inferredQuorum,
handler endHandler)
: Work(app, parent, "fetch-recent-qsets")
, mEndHandler(endHandler)
, mInferredQuorum(inferredQuorum)
{
}

void
FetchRecentQsetsWork::onReset()
{
clearChildren();
mDownloadSCPMessagesWork.reset();
mDownloadDir =
make_unique<TmpDir>(mApp.getTmpDirManager().tmpDir(getUniqueName()));
}

void
FetchRecentQsetsWork::onFailureRaise()
{
asio::error_code ec = std::make_error_code(std::errc::timed_out);
mEndHandler(ec);
}

Work::State
FetchRecentQsetsWork::onSuccess()
{
// Phase 1: fetch remote history archive state
if (!mGetHistoryArchiveStateWork)
{
mGetHistoryArchiveStateWork = addWork<GetHistoryArchiveStateWork>(
mRemoteState, 0, std::chrono::seconds(0));
return WORK_PENDING;
}

// Phase 2: download some SCP messages; for now we just pull the past
// 100 checkpoints = 9 hours of history. A more sophisticated view
// would survey longer time periods at lower resolution.
uint32_t numCheckpoints = 100;
uint32_t step = mApp.getHistoryManager().getCheckpointFrequency();
uint32_t window = numCheckpoints * step;
uint32_t lastSeq = mRemoteState.currentLedger;
uint32_t firstSeq = lastSeq < window ? (step-1) : (lastSeq-window);

if (!mDownloadSCPMessagesWork)
{
CLOG(INFO, "History") << "Downloading recent SCP messages: ["
<< firstSeq << ", " << lastSeq << "]";
mDownloadSCPMessagesWork = addWork<BatchDownloadWork>(
firstSeq, lastSeq, HISTORY_FILE_TYPE_SCP, *mDownloadDir);
return WORK_PENDING;
}

// Phase 3: extract the qsets.
for (auto i = firstSeq; i <= lastSeq; i += step)
{
CLOG(INFO, "History") << "Scanning for QSets in checkpoint: " << i;
XDRInputFileStream in;
FileTransferInfo fi(*mDownloadDir, HISTORY_FILE_TYPE_SCP, i);
in.open(fi.localPath_nogz());
SCPHistoryEntry tmp;
while (in && in.readOne(tmp))
{
mInferredQuorum.noteSCPHistory(tmp);
}
}

asio::error_code ec;
mEndHandler(ec);
return WORK_SUCCESS;
}




}
21 changes: 21 additions & 0 deletions src/history/HistoryWork.h
Original file line number Diff line number Diff line change
Expand Up @@ -450,4 +450,25 @@ class RepairMissingBucketsWork : public BucketDownloadWork
void onFailureRaise() override;
Work::State onSuccess() override;
};

class FetchRecentQsetsWork : public Work
{

typedef std::function<void(asio::error_code const& ec)> handler;
handler mEndHandler;
std::unique_ptr<TmpDir> mDownloadDir;
InferredQuorum& mInferredQuorum;
HistoryArchiveState mRemoteState;
std::shared_ptr<Work> mGetHistoryArchiveStateWork;
std::shared_ptr<Work> mDownloadSCPMessagesWork;

public:
FetchRecentQsetsWork(Application& app, WorkParent& parent,
InferredQuorum& iq,
handler endHandler);
void onReset() override;
void onFailureRaise() override;
Work::State onSuccess() override;
};

}
Loading

0 comments on commit 2f015f6

Please sign in to comment.