diff --git a/rust-toolchain.toml b/rust-toolchain.toml index 1de01fa45c..2e2b8c8521 100644 --- a/rust-toolchain.toml +++ b/rust-toolchain.toml @@ -1,2 +1,2 @@ [toolchain] -channel = "1.81.0" +channel = "1.82.0" diff --git a/src/bucket/BucketManagerImpl.cpp b/src/bucket/BucketManagerImpl.cpp index a953bef124..b228f93e83 100644 --- a/src/bucket/BucketManagerImpl.cpp +++ b/src/bucket/BucketManagerImpl.cpp @@ -34,6 +34,7 @@ #include #include +#include "history/FileTransferInfo.h" #include "medida/counter.h" #include "medida/meter.h" #include "medida/metrics_registry.h" @@ -134,6 +135,17 @@ BucketManagerImpl::initialize() mApp.getConfig().QUERY_SNAPSHOT_LEDGERS); } } + + // Create persistent publish directories + // Note: HISTORY_FILE_TYPE_BUCKET is already tracked by BucketList in + // BUCKET_DIR_PATH, HISTORY_FILE_TYPE_SCP is persisted to the database + // so create the remaining ledger header, transactions and results + // directories + createPublishDir(FileType::HISTORY_FILE_TYPE_LEDGER, mApp.getConfig()); + createPublishDir(FileType::HISTORY_FILE_TYPE_TRANSACTIONS, + mApp.getConfig()); + createPublishDir(FileType::HISTORY_FILE_TYPE_RESULTS, mApp.getConfig()); + HistoryManager::createPublishQueueDir(mApp.getConfig()); } void diff --git a/src/bucket/test/BucketManagerTests.cpp b/src/bucket/test/BucketManagerTests.cpp index fd7653205b..fc256c4cc2 100644 --- a/src/bucket/test/BucketManagerTests.cpp +++ b/src/bucket/test/BucketManagerTests.cpp @@ -579,7 +579,6 @@ TEST_CASE_VERSIONS( auto& hm = app->getHistoryManager(); auto& bm = app->getBucketManager(); auto& bl = bm.getBucketList(); - auto& lm = app->getLedgerManager(); hm.setPublicationEnabled(false); app->getHistoryArchiveManager().initializeHistoryArchive( tcfg.getArchiveDirName()); diff --git a/src/catchup/ApplyCheckpointWork.cpp b/src/catchup/ApplyCheckpointWork.cpp index 86f6bf01b5..39180e9cdb 100644 --- a/src/catchup/ApplyCheckpointWork.cpp +++ b/src/catchup/ApplyCheckpointWork.cpp @@ -85,8 +85,9 @@ ApplyCheckpointWork::openInputFiles() ZoneScoped; mHdrIn.close(); mTxIn.close(); - FileTransferInfo hi(mDownloadDir, HISTORY_FILE_TYPE_LEDGER, mCheckpoint); - FileTransferInfo ti(mDownloadDir, HISTORY_FILE_TYPE_TRANSACTIONS, + FileTransferInfo hi(mDownloadDir, FileType::HISTORY_FILE_TYPE_LEDGER, + mCheckpoint); + FileTransferInfo ti(mDownloadDir, FileType::HISTORY_FILE_TYPE_TRANSACTIONS, mCheckpoint); CLOG_DEBUG(History, "Replaying ledger headers from {}", hi.localPath_nogz()); diff --git a/src/catchup/CatchupManager.h b/src/catchup/CatchupManager.h index 61c9b5821f..dc5655f05d 100644 --- a/src/catchup/CatchupManager.h +++ b/src/catchup/CatchupManager.h @@ -6,6 +6,7 @@ #include "catchup/CatchupWork.h" #include "herder/LedgerCloseData.h" +#include "history/FileTransferInfo.h" #include #include #include @@ -115,6 +116,6 @@ class CatchupManager virtual void ledgerChainsVerificationFailed(uint32_t num = 1) = 0; virtual void bucketsApplied(uint32_t num = 1) = 0; virtual void txSetsApplied(uint32_t num = 1) = 0; - virtual void fileDownloaded(std::string type, uint32_t num = 1) = 0; + virtual void fileDownloaded(FileType type, uint32_t num = 1) = 0; }; } diff --git a/src/catchup/CatchupManagerImpl.cpp b/src/catchup/CatchupManagerImpl.cpp index b1eca69dd7..2070989de8 100644 --- a/src/catchup/CatchupManagerImpl.cpp +++ b/src/catchup/CatchupManagerImpl.cpp @@ -495,26 +495,27 @@ CatchupManagerImpl::txSetsApplied(uint32_t num) } void -CatchupManagerImpl::fileDownloaded(std::string type, uint32_t num) +CatchupManagerImpl::fileDownloaded(FileType type, 
uint32_t num) { - if (type == HISTORY_FILE_TYPE_BUCKET) + if (type == FileType::HISTORY_FILE_TYPE_BUCKET) { mMetrics.mBucketsDownloaded += num; } - else if (type == HISTORY_FILE_TYPE_LEDGER) + else if (type == FileType::HISTORY_FILE_TYPE_LEDGER) { mMetrics.mCheckpointsDownloaded += num; } - else if (type == HISTORY_FILE_TYPE_TRANSACTIONS) + else if (type == FileType::HISTORY_FILE_TYPE_TRANSACTIONS) { mMetrics.mTxSetsDownloaded += num; } - else if (type != HISTORY_FILE_TYPE_RESULTS && type != HISTORY_FILE_TYPE_SCP) + else if (type != FileType::HISTORY_FILE_TYPE_RESULTS && + type != FileType::HISTORY_FILE_TYPE_SCP) { throw std::runtime_error(fmt::format( FMT_STRING( "CatchupManagerImpl::fileDownloaded unknown file type {}"), - type)); + typeString(type))); } } diff --git a/src/catchup/CatchupManagerImpl.h b/src/catchup/CatchupManagerImpl.h index 8c04a344aa..917f5a3a4a 100644 --- a/src/catchup/CatchupManagerImpl.h +++ b/src/catchup/CatchupManagerImpl.h @@ -94,7 +94,7 @@ class CatchupManagerImpl : public CatchupManager void ledgerChainsVerificationFailed(uint32_t num) override; void bucketsApplied(uint32_t num) override; void txSetsApplied(uint32_t num) override; - void fileDownloaded(std::string type, uint32_t num) override; + void fileDownloaded(FileType type, uint32_t num) override; #ifdef BUILD_TESTS std::map const& diff --git a/src/catchup/CatchupWork.cpp b/src/catchup/CatchupWork.cpp index 760c15436c..20dfe65359 100644 --- a/src/catchup/CatchupWork.cpp +++ b/src/catchup/CatchupWork.cpp @@ -166,8 +166,8 @@ CatchupWork::downloadVerifyLedgerChain(CatchupRange const& catchupRange, // Batch download has default retries ("a few") to ensure we rotate through // archives auto getLedgers = std::make_shared( - mApp, checkpointRange, HISTORY_FILE_TYPE_LEDGER, *mDownloadDir, - mArchive); + mApp, checkpointRange, FileType::HISTORY_FILE_TYPE_LEDGER, + *mDownloadDir, mArchive); mRangeEndPromise = std::promise(); mRangeEndFuture = mRangeEndPromise.get_future().share(); mRangeEndPromise.set_value(rangeEnd); @@ -598,8 +598,8 @@ CatchupWork::runCatchupStep() auto checkpoint = app.getHistoryManager().checkpointContainingLedger( ledgerSeq); - auto ft = - FileTransferInfo(dir, HISTORY_FILE_TYPE_LEDGER, checkpoint); + auto ft = FileTransferInfo( + dir, FileType::HISTORY_FILE_TYPE_LEDGER, checkpoint); return setHerderStateTo(ft, ledgerSeq, app); }; diff --git a/src/catchup/DownloadApplyTxsWork.cpp b/src/catchup/DownloadApplyTxsWork.cpp index 1746060d69..90538ce3fa 100644 --- a/src/catchup/DownloadApplyTxsWork.cpp +++ b/src/catchup/DownloadApplyTxsWork.cpp @@ -46,8 +46,9 @@ DownloadApplyTxsWork::yieldMoreWork() CLOG_INFO(History, "Downloading, unzipping and applying {} for checkpoint {}", - HISTORY_FILE_TYPE_TRANSACTIONS, mCheckpointToQueue); - FileTransferInfo ft(mDownloadDir, HISTORY_FILE_TYPE_TRANSACTIONS, + typeString(FileType::HISTORY_FILE_TYPE_TRANSACTIONS), + mCheckpointToQueue); + FileTransferInfo ft(mDownloadDir, FileType::HISTORY_FILE_TYPE_TRANSACTIONS, mCheckpointToQueue); auto getAndUnzip = std::make_shared(mApp, ft, mArchive); @@ -67,8 +68,8 @@ DownloadApplyTxsWork::yieldMoreWork() auto archive = getFile->getArchive(); if (archive) { - FileTransferInfo ti(dir, HISTORY_FILE_TYPE_TRANSACTIONS, - checkpoint); + FileTransferInfo ti( + dir, FileType::HISTORY_FILE_TYPE_TRANSACTIONS, checkpoint); CLOG_ERROR(History, "Archive {} maybe contains corrupt file {}", archive->getName(), ti.remoteName()); } diff --git a/src/catchup/VerifyLedgerChainWork.cpp b/src/catchup/VerifyLedgerChainWork.cpp index 
fc496883a3..a5603fd1e0 100644 --- a/src/catchup/VerifyLedgerChainWork.cpp +++ b/src/catchup/VerifyLedgerChainWork.cpp @@ -166,7 +166,7 @@ VerifyLedgerChainWork::verifyHistoryOfSingleCheckpoint() // trusted hash passed in. If LCL is reached, verify that it agrees with // the chain. - FileTransferInfo ft(mDownloadDir, HISTORY_FILE_TYPE_LEDGER, + FileTransferInfo ft(mDownloadDir, FileType::HISTORY_FILE_TYPE_LEDGER, mCurrCheckpoint); XDRInputFileStream hdrIn; hdrIn.open(ft.localPath_nogz()); @@ -185,6 +185,8 @@ VerifyLedgerChainWork::verifyHistoryOfSingleCheckpoint() CLOG_DEBUG(History, "Verifying ledger headers from {} for checkpoint {}", ft.localPath_nogz(), mCurrCheckpoint); + auto const& hm = mApp.getHistoryManager(); + while (hdrIn) { try @@ -243,6 +245,15 @@ VerifyLedgerChainWork::verifyHistoryOfSingleCheckpoint() if (beginCheckpoint) { + if (!hm.isFirstLedgerInCheckpoint(curr.header.ledgerSeq)) + { + CLOG_ERROR( + History, "Checkpoint did not start with {} - got {}", + hm.firstLedgerInCheckpointContaining(curr.header.ledgerSeq), + curr.header.ledgerSeq); + return HistoryManager::VERIFY_STATUS_ERR_MISSING_ENTRIES; + } + // At the beginning of checkpoint, we can't verify the link with // previous ledger, so at least verify that header content hashes to // correct value @@ -301,8 +312,8 @@ VerifyLedgerChainWork::verifyHistoryOfSingleCheckpoint() // or at mRange.last() if history chain file was valid and we // reached last ledger in the range. Any other ledger here means // that file is corrupted. - CLOG_ERROR(History, "History chain did not end with {} or {}", - mCurrCheckpoint, mRange.last()); + CLOG_ERROR(History, "History chain did not end with {} or {} - got {}", + mCurrCheckpoint, mRange.last(), curr.header.ledgerSeq); return HistoryManager::VERIFY_STATUS_ERR_MISSING_ENTRIES; } diff --git a/src/database/Database.cpp b/src/database/Database.cpp index e06f1ff016..4ab34363cc 100644 --- a/src/database/Database.cpp +++ b/src/database/Database.cpp @@ -63,7 +63,7 @@ bool Database::gDriversRegistered = false; // smallest schema version supported static unsigned long const MIN_SCHEMA_VERSION = 21; -static unsigned long const SCHEMA_VERSION = 22; +static unsigned long const SCHEMA_VERSION = 23; // These should always match our compiled version precisely, since we are // using a bundled version to get access to carray(). 
But in case someone @@ -213,7 +213,11 @@ Database::applySchemaUpgrade(unsigned long vers) switch (vers) { case 22: - deprecateTransactionFeeHistory(*this); + dropSupportTransactionFeeHistory(*this); + break; + case 23: + mApp.getHistoryManager().dropSQLBasedPublish(); + Upgrades::dropSupportUpgradeHistory(*this); break; default: throw std::runtime_error("Unknown DB schema version"); @@ -471,7 +475,9 @@ Database::initialize() PersistentState::dropAll(*this); ExternalQueue::dropAll(*this); LedgerHeaderUtils::dropAll(*this); - dropTransactionHistory(*this, mApp.getConfig()); + // No need to re-create txhistory, will be dropped during + // upgradeToCurrentSchema anyway + dropSupportTxHistory(*this); HistoryManager::dropAll(*this); HerderPersistence::dropAll(*this); BanManager::dropAll(*this); diff --git a/src/database/DatabaseUtils.cpp b/src/database/DatabaseUtils.cpp index 4cae92d704..9bf9ca1fec 100644 --- a/src/database/DatabaseUtils.cpp +++ b/src/database/DatabaseUtils.cpp @@ -30,15 +30,5 @@ deleteOldEntriesHelper(soci::session& sess, uint32_t ledgerSeq, uint32_t count, << " <= " << m; } } - -void -deleteNewerEntriesHelper(soci::session& sess, uint32_t ledgerSeq, - std::string const& tableName, - std::string const& ledgerSeqColumn) -{ - sess << "DELETE FROM " << tableName << " WHERE " << ledgerSeqColumn - << " >= " << ledgerSeq; -} - } } \ No newline at end of file diff --git a/src/database/DatabaseUtils.h b/src/database/DatabaseUtils.h index 79a9257408..e36b67e07c 100644 --- a/src/database/DatabaseUtils.h +++ b/src/database/DatabaseUtils.h @@ -13,9 +13,5 @@ namespace DatabaseUtils void deleteOldEntriesHelper(soci::session& sess, uint32_t ledgerSeq, uint32_t count, std::string const& tableName, std::string const& ledgerSeqColumn); - -void deleteNewerEntriesHelper(soci::session& sess, uint32_t ledgerSeq, - std::string const& tableName, - std::string const& ledgerSeqColumn); } } diff --git a/src/herder/HerderPersistence.h b/src/herder/HerderPersistence.h index 354e58c013..e5d44b0cc5 100644 --- a/src/herder/HerderPersistence.h +++ b/src/herder/HerderPersistence.h @@ -49,8 +49,6 @@ class HerderPersistence static void dropAll(Database& db); static void deleteOldEntries(Database& db, uint32_t ledgerSeq, uint32_t count); - static void deleteNewerEntries(Database& db, uint32_t ledgerSeq); - static void createQuorumTrackingTable(soci::session& sess); }; } diff --git a/src/herder/HerderPersistenceImpl.cpp b/src/herder/HerderPersistenceImpl.cpp index 72c62dbbde..bd743e8072 100644 --- a/src/herder/HerderPersistenceImpl.cpp +++ b/src/herder/HerderPersistenceImpl.cpp @@ -416,14 +416,4 @@ HerderPersistence::deleteOldEntries(Database& db, uint32_t ledgerSeq, DatabaseUtils::deleteOldEntriesHelper(db.getSession(), ledgerSeq, count, "scpquorums", "lastledgerseq"); } - -void -HerderPersistence::deleteNewerEntries(Database& db, uint32_t ledgerSeq) -{ - ZoneScoped; - DatabaseUtils::deleteNewerEntriesHelper(db.getSession(), ledgerSeq, - "scphistory", "ledgerseq"); - DatabaseUtils::deleteNewerEntriesHelper(db.getSession(), ledgerSeq, - "scpquorums", "lastledgerseq"); -} } diff --git a/src/herder/Upgrades.cpp b/src/herder/Upgrades.cpp index 12f7bb0737..689b7d0139 100644 --- a/src/herder/Upgrades.cpp +++ b/src/herder/Upgrades.cpp @@ -703,53 +703,9 @@ Upgrades::dropAll(Database& db) } void -Upgrades::storeUpgradeHistory(Database& db, uint32_t ledgerSeq, - LedgerUpgrade const& upgrade, - LedgerEntryChanges const& changes, int index) +Upgrades::dropSupportUpgradeHistory(Database& db) { - ZoneScoped; - xdr::opaque_vec<> 
upgradeContent(xdr::xdr_to_opaque(upgrade)); - std::string upgradeContent64 = decoder::encode_b64(upgradeContent); - - xdr::opaque_vec<> upgradeChanges(xdr::xdr_to_opaque(changes)); - std::string upgradeChanges64 = decoder::encode_b64(upgradeChanges); - - auto prep = db.getPreparedStatement( - "INSERT INTO upgradehistory " - "(ledgerseq, upgradeindex, upgrade, changes) VALUES " - "(:seq, :upgradeindex, :upgrade, :changes)"); - - auto& st = prep.statement(); - st.exchange(soci::use(ledgerSeq)); - st.exchange(soci::use(index)); - st.exchange(soci::use(upgradeContent64)); - st.exchange(soci::use(upgradeChanges64)); - st.define_and_bind(); - { - ZoneNamedN(insertUpgradeZone, "insert upgradehistory", true); - st.execute(true); - } - - if (st.get_affected_rows() != 1) - { - throw std::runtime_error("Could not update data in SQL"); - } -} - -void -Upgrades::deleteOldEntries(Database& db, uint32_t ledgerSeq, uint32_t count) -{ - ZoneScoped; - DatabaseUtils::deleteOldEntriesHelper(db.getSession(), ledgerSeq, count, - "upgradehistory", "ledgerseq"); -} - -void -Upgrades::deleteNewerEntries(Database& db, uint32_t ledgerSeq) -{ - ZoneScoped; - DatabaseUtils::deleteNewerEntriesHelper(db.getSession(), ledgerSeq, - "upgradehistory", "ledgerseq"); + db.getSession() << "DROP TABLE IF EXISTS upgradehistory"; } static void diff --git a/src/herder/Upgrades.h b/src/herder/Upgrades.h index 036b281991..7645eb7ed2 100644 --- a/src/herder/Upgrades.h +++ b/src/herder/Upgrades.h @@ -119,14 +119,7 @@ class Upgrades uint64_t time, bool& updated); static void dropAll(Database& db); - - static void storeUpgradeHistory(Database& db, uint32_t ledgerSeq, - LedgerUpgrade const& upgrade, - LedgerEntryChanges const& changes, - int index); - static void deleteOldEntries(Database& db, uint32_t ledgerSeq, - uint32_t count); - static void deleteNewerEntries(Database& db, uint32_t ledgerSeq); + static void dropSupportUpgradeHistory(Database& db); private: UpgradeParameters mParams; diff --git a/src/history/CheckpointBuilder.cpp b/src/history/CheckpointBuilder.cpp new file mode 100644 index 0000000000..a3e89e0312 --- /dev/null +++ b/src/history/CheckpointBuilder.cpp @@ -0,0 +1,290 @@ +#include "history/CheckpointBuilder.h" +#include "history/HistoryArchiveManager.h" +#include "history/HistoryManager.h" +#include "ledger/LedgerManager.h" +#include "main/Application.h" +#include "util/XDRStream.h" + +namespace stellar +{ +void +CheckpointBuilder::ensureOpen(uint32_t ledgerSeq) +{ + ZoneScoped; + releaseAssert(mApp.getHistoryArchiveManager().publishEnabled()); + if (!mOpen) + { + releaseAssert(!mTxResults); + releaseAssert(!mTxs); + releaseAssert(!mLedgerHeaders); + + auto checkpoint = + mApp.getHistoryManager().checkpointContainingLedger(ledgerSeq); + auto res = FileTransferInfo(FileType::HISTORY_FILE_TYPE_RESULTS, + checkpoint, mApp.getConfig()); + auto txs = FileTransferInfo(FileType::HISTORY_FILE_TYPE_TRANSACTIONS, + checkpoint, mApp.getConfig()); + auto ledger = FileTransferInfo(FileType::HISTORY_FILE_TYPE_LEDGER, + checkpoint, mApp.getConfig()); + + // Open files in append mode + mTxResults = std::make_unique( + mApp.getClock().getIOContext(), /* fsync*/ true); + mTxResults->open(res.localPath_nogz_dirty()); + + mTxs = std::make_unique( + mApp.getClock().getIOContext(), /* fsync*/ true); + mTxs->open(txs.localPath_nogz_dirty()); + + mLedgerHeaders = std::make_unique( + mApp.getClock().getIOContext(), /* fsync*/ true); + mLedgerHeaders->open(ledger.localPath_nogz_dirty()); + mOpen = true; + } +} + +void 
+CheckpointBuilder::checkpointComplete(uint32_t checkpoint) +{ + ZoneScoped; + releaseAssert(mApp.getHistoryArchiveManager().publishEnabled()); + releaseAssert( + mApp.getHistoryManager().isLastLedgerInCheckpoint(checkpoint)); + + // This will close and reset the streams + mLedgerHeaders.reset(); + mTxs.reset(); + mTxResults.reset(); + mOpen = false; + + auto maybeRename = [&](FileTransferInfo const& ft) { + if (fs::exists(ft.localPath_nogz())) + { + CLOG_INFO(History, "File {} already exists, skipping rename", + ft.localPath_nogz()); + } + else if (!fs::durableRename( + ft.localPath_nogz_dirty(), ft.localPath_nogz(), + getPublishHistoryDir(ft.getType(), mApp.getConfig()) + .string())) + { + throw std::runtime_error( + fmt::format("Failed to rename checkpoint file {}", + ft.localPath_nogz_dirty())); + } + }; + + auto res = FileTransferInfo(FileType::HISTORY_FILE_TYPE_RESULTS, checkpoint, + mApp.getConfig()); + auto txs = FileTransferInfo(FileType::HISTORY_FILE_TYPE_TRANSACTIONS, + checkpoint, mApp.getConfig()); + auto ledger = FileTransferInfo(FileType::HISTORY_FILE_TYPE_LEDGER, + checkpoint, mApp.getConfig()); + maybeRename(res); + maybeRename(txs); + maybeRename(ledger); +} + +CheckpointBuilder::CheckpointBuilder(Application& app) : mApp(app) +{ +} + +void +CheckpointBuilder::appendTransactionSet(uint32_t ledgerSeq, + TxSetXDRFrameConstPtr const& txSet, + TransactionResultSet const& resultSet, + bool skipStartupCheck) +{ + ZoneScoped; + TransactionHistoryEntry txs; + txs.ledgerSeq = ledgerSeq; + + if (txSet->isGeneralizedTxSet()) + { + txs.ext.v(1); + txSet->toXDR(txs.ext.generalizedTxSet()); + } + else + { + txSet->toXDR(txs.txSet); + } + appendTransactionSet(ledgerSeq, txs, resultSet); +} + +void +CheckpointBuilder::appendTransactionSet(uint32_t ledgerSeq, + TransactionHistoryEntry const& txSet, + TransactionResultSet const& resultSet, + bool skipStartupCheck) +{ + ZoneScoped; + if (!mStartupValidationComplete && + ledgerSeq != LedgerManager::GENESIS_LEDGER_SEQ && !skipStartupCheck) + { + throw std::runtime_error("Startup validation not performed"); + } + ensureOpen(ledgerSeq); + + if (!resultSet.results.empty()) + { + TransactionHistoryResultEntry results; + results.ledgerSeq = ledgerSeq; + results.txResultSet = resultSet; + mTxResults->durableWriteOne(results); + mTxs->durableWriteOne(txSet); + } +} + +void +CheckpointBuilder::appendLedgerHeader(LedgerHeader const& header, + bool skipStartupCheck) +{ + ZoneScoped; + if (!mStartupValidationComplete && + header.ledgerSeq != LedgerManager::GENESIS_LEDGER_SEQ && + !skipStartupCheck) + { + throw std::runtime_error("Startup validation not performed"); + } + ensureOpen(header.ledgerSeq); + + LedgerHeaderHistoryEntry lhe; + lhe.header = header; + lhe.hash = xdrSha256(header); + mLedgerHeaders->writeOne(lhe); + mLedgerHeaders->flush(); +} + +uint32_t +getLedgerSeq(TransactionHistoryEntry const& entry) +{ + return entry.ledgerSeq; +} + +uint32_t +getLedgerSeq(TransactionHistoryResultEntry const& entry) +{ + return entry.ledgerSeq; +} + +uint32_t +getLedgerSeq(LedgerHeaderHistoryEntry const& entry) +{ + return entry.header.ledgerSeq; +} + +void +CheckpointBuilder::cleanup(uint32_t lcl) +{ + if (mStartupValidationComplete) + { + return; + } + + mTxResults.reset(); + mTxs.reset(); + mLedgerHeaders.reset(); + mOpen = false; + auto const& cfg = mApp.getConfig(); + + auto checkpoint = mApp.getHistoryManager().checkpointContainingLedger(lcl); + auto res = + FileTransferInfo(FileType::HISTORY_FILE_TYPE_RESULTS, checkpoint, cfg); + auto txs = 
FileTransferInfo(FileType::HISTORY_FILE_TYPE_TRANSACTIONS, + checkpoint, cfg); + auto ledger = + FileTransferInfo(FileType::HISTORY_FILE_TYPE_LEDGER, checkpoint, cfg); + + auto tmpDir = + mApp.getBucketManager().getTmpDirManager().tmpDir("truncated"); + + auto recover = [&](FileTransferInfo const& ft, auto entry, + uint32_t enforceLCL) { + if (fs::exists(ft.localPath_nogz())) + { + // Make sure any new checkpoints are deleted + auto next = FileTransferInfo( + ft.getType(), + mApp.getHistoryManager().checkpointContainingLedger(checkpoint + + 1), + cfg); + CLOG_INFO(History, "Deleting next checkpoint files {}", + next.localPath_nogz_dirty()); + // This may fail if no files were created before core restarted + // (which is a valid behavior) + if (std::remove(next.localPath_nogz_dirty().c_str()) && + errno != ENOENT) + { + throw std::runtime_error( + fmt::format("Failed to delete next checkpoint file {}", + next.localPath_nogz_dirty())); + } + return; + } + + // Find a tmp file; any potentially invalid files _must_ be tmp files, + // because checkpoint files are finalized (renamed) only after ledger is + // committed. + std::filesystem::path dir = tmpDir.getName(); + std::filesystem::path tmpFile = dir / ft.baseName_nogz(); + { + auto out = XDROutputFileStream(mApp.getClock().getIOContext(), + /* fsync*/ true); + out.open(tmpFile); + XDRInputFileStream in; + in.open(ft.localPath_nogz_dirty()); + uint32_t lastReadLedgerSeq = 0; + while (in) + { + try + { + if (!in.readOne(entry)) + { + // If file doesn't end on LCL, it's corrupt + if (enforceLCL && lastReadLedgerSeq != lcl) + { + throw std::runtime_error( + fmt::format("Corrupt checkpoint file {}, ends " + "on ledger {}, LCL is {}", + ft.localPath_nogz_dirty(), + getLedgerSeq(entry), lcl)); + } + break; + } + lastReadLedgerSeq = getLedgerSeq(entry); + if (lastReadLedgerSeq > lcl) + { + CLOG_INFO(History, "Truncating {} at ledger {}", + ft.localPath_nogz_dirty(), lcl); + break; + } + out.writeOne(entry); + } + catch (xdr::xdr_runtime_error const& e) + { + // If we can't read the entry, we likely encountered a + // partial write + CLOG_INFO(History, + "Encountered partial write in {}, truncating", + ft.localPath_nogz_dirty()); + break; + } + } + } + + if (!fs::durableRename( + tmpFile.c_str(), ft.localPath_nogz_dirty().c_str(), + getPublishHistoryDir(ft.getType(), mApp.getConfig()).string())) + { + throw std::runtime_error("Failed to rename checkpoint file"); + } + }; + + // We can only require ledger header to be at LCL; transactions and results + // can have gaps (if there were empty ledgers) + recover(res, TransactionHistoryResultEntry{}, /* enforceLCL */ false); + recover(txs, TransactionHistoryEntry{}, /* enforceLCL */ false); + recover(ledger, LedgerHeaderHistoryEntry{}, /* enforceLCL */ true); + mStartupValidationComplete = true; +} +} \ No newline at end of file diff --git a/src/history/CheckpointBuilder.h b/src/history/CheckpointBuilder.h new file mode 100644 index 0000000000..9225e96ab0 --- /dev/null +++ b/src/history/CheckpointBuilder.h @@ -0,0 +1,85 @@ +// Copyright 2024 Stellar Development Foundation and contributors. Licensed +// under the Apache License, Version 2.0. 
See the COPYING file at the root +// of this distribution or at http://www.apache.org/licenses/LICENSE-2.0 + +#pragma once + +#include "herder/TxSetFrame.h" +#include "util/XDRStream.h" + +namespace stellar +{ +class Application; + +/* + * CheckpointBuilder manages the ACID transactional appending of confirmed +ledgers to sequential streams (transaction results, transactions, and ledger +headers) and the ACID queueing of completed checkpoints for historical purposes. +Completed checkpoints are published to history archives by the PublishWork class. + +* Atomicity is ensured through a write-tmp-file-then-rename cycle, where files +are first written as temporary "dirty" files and then renamed to their final +names to mark a successfully constructed checkpoint. Note that tmp files are +highly durable, and are fsynced on every write. This way, on a crash, all publish +data is preserved and can be recovered to a valid checkpoint on restart. + +* All publish files are renamed to their final names _after_ ledger commits. +This ensures that final checkpoint files are always valid (and do not contain +uncommitted data). If a crash occurs before these files are finalized, but after +commit, core will finalize files on restart based on LCL. + +* Because of the requirement above, we can derive the following guarantees: + - Dirty publish files must always end at a ledger that is greater than or +equal to LCL in the database. + - Final publish files must always end at a ledger that is less than or equal +to LCL in the database. + +* Both publish queue and checkpoint files rely on the last committed ledger +sequence stored in SQL (LCL). If a crash or shutdown occurs, publish file state +may be left inconsistent with the DB. On restart, core automatically recovers +valid publish state based on the LCL. For publish queue files, this means +deleting any checkpoints with ledger sequence greater than LCL. For the rest of +the files, this means extracting a history prefix of a checkpoint up until and +including the LCL, and truncating the rest (including malformed data). If a +crash before commit occurs on the first ledger in a checkpoint, the dirty file is +simply deleted on startup. This ensures core always starts with a valid publish +state. + */ +class CheckpointBuilder +{ + Application& mApp; +#ifdef BUILD_TESTS + public: +#endif + // Current checkpoint streams (unpublished data) + std::unique_ptr<XDROutputFileStream> mTxResults; + std::unique_ptr<XDROutputFileStream> mTxs; + std::unique_ptr<XDROutputFileStream> mLedgerHeaders; + bool mOpen{false}; + bool mStartupValidationComplete{false}; + + void ensureOpen(uint32_t ledgerSeq); + + public: + CheckpointBuilder(Application& app); + void appendTransactionSet(uint32_t ledgerSeq, + TxSetXDRFrameConstPtr const& txSet, + TransactionResultSet const& resultSet, + bool skipStartupCheck = false); + void appendTransactionSet(uint32_t ledgerSeq, + TransactionHistoryEntry const& txSet, + TransactionResultSet const& resultSet, + bool skipStartupCheck = false); + void appendLedgerHeader(LedgerHeader const& header, + bool skipStartupCheck = false); + + // Cleanup publish files according to the latest LCL. + // Publish files might contain dirty data if a crash occurred after append + // but before commit in LedgerManagerImpl::closeLedger + void cleanup(uint32_t lcl); + + // Finalize checkpoint by renaming all temporary files to their canonical + // names. No-op if files are already rotated. 
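The dirty-file discipline described in the class comment above reduces to a durable append followed by an atomic rename once the ledger commits. The snippet below is a minimal sketch of that cycle, assuming the XDROutputFileStream and fs::durableRename helpers that appear elsewhere in this diff; the function name and path parameters are illustrative only, not part of the PR:

    #include "overlay/StellarXDR.h" // LedgerHeaderHistoryEntry
    #include "util/Fs.h"            // fs::durableRename
    #include "util/XDRStream.h"     // XDROutputFileStream
    #include <stdexcept>
    #include <string>

    namespace stellar
    {
    // Hypothetical helper illustrating the two-phase discipline. Phase 1
    // appends durably to the ".dirty" file; phase 2 runs only after the
    // ledger commits in SQL.
    void
    writeThenFinalize(XDROutputFileStream& out,
                      LedgerHeaderHistoryEntry const& e,
                      std::string const& dirtyPath,
                      std::string const& finalPath, std::string const& dirName)
    {
        out.writeOne(e); // fsynced on write: a crash loses at most a partial entry
        out.flush();

        // At the checkpoint boundary, atomically swap in the final name;
        // durableRename also fsyncs the containing directory.
        if (!fs::durableRename(dirtyPath, finalPath, dirName))
        {
            throw std::runtime_error("failed to finalize checkpoint file");
        }
    }
    }
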
+ void checkpointComplete(uint32_t checkpoint); +}; +} \ No newline at end of file diff --git a/src/history/FileTransferInfo.cpp b/src/history/FileTransferInfo.cpp index 26b4dc4d79..44451daa05 100644 --- a/src/history/FileTransferInfo.cpp +++ b/src/history/FileTransferInfo.cpp @@ -8,29 +8,69 @@ namespace stellar { -char const* HISTORY_FILE_TYPE_BUCKET = "bucket"; -char const* HISTORY_FILE_TYPE_LEDGER = "ledger"; -char const* HISTORY_FILE_TYPE_TRANSACTIONS = "transactions"; -char const* HISTORY_FILE_TYPE_RESULTS = "results"; -char const* HISTORY_FILE_TYPE_SCP = "scp"; -std::string -FileTransferInfo::getLocalDir(TmpDir const& localRoot) const +void +createPath(std::filesystem::path path) { - ZoneScoped; - auto localDir = localRoot.getName(); - localDir += "/" + fs::remoteDir(mType, mHexDigits); + if (fs::exists(path)) + { + return; + } + int retries = 5; // Similarly to TmpDir, retry in case there were // OS-related errors (e.g. out of memory) or race conditions - while (!fs::mkpath(localDir)) + while (!fs::mkpath(path.string())) { if (--retries == 0) { - throw std::runtime_error("Unable to make a path " + localDir); + throw std::runtime_error("Unable to make a path " + path.string()); } std::this_thread::sleep_for(std::chrono::milliseconds(100)); } +} + +std::string +FileTransferInfo::getLocalDir(TmpDir const& localRoot) const +{ + ZoneScoped; + auto localDir = localRoot.getName(); + localDir += "/" + fs::remoteDir(typeString(mType), mHexDigits); + createPath(localDir); return localDir; } + +std::string +typeString(FileType type) +{ + switch (type) + { + case FileType::HISTORY_FILE_TYPE_BUCKET: + return "bucket"; + case FileType::HISTORY_FILE_TYPE_LEDGER: + return "ledger"; + case FileType::HISTORY_FILE_TYPE_TRANSACTIONS: + return "transactions"; + case FileType::HISTORY_FILE_TYPE_RESULTS: + return "results"; + case FileType::HISTORY_FILE_TYPE_SCP: + return "scp"; + } +} + +std::filesystem::path +createPublishDir(FileType type, Config const& cfg) +{ + std::filesystem::path root = cfg.BUCKET_DIR_PATH; + auto path = getPublishHistoryDir(type, cfg); + createPath(path); + return path; +} + +std::filesystem::path +getPublishHistoryDir(FileType type, Config const& cfg) +{ + std::filesystem::path root = cfg.BUCKET_DIR_PATH; + return root / HISTORY_LOCAL_DIR_NAME / typeString(type); +} } diff --git a/src/history/FileTransferInfo.h b/src/history/FileTransferInfo.h index 1e66b3f8ab..bfe830762e 100644 --- a/src/history/FileTransferInfo.h +++ b/src/history/FileTransferInfo.h @@ -6,6 +6,7 @@ #include "bucket/Bucket.h" #include "crypto/Hex.h" +#include "main/Config.h" #include "util/Fs.h" #include "util/Logging.h" #include "util/TmpDir.h" @@ -14,28 +15,37 @@ namespace stellar { -extern char const* HISTORY_FILE_TYPE_BUCKET; -extern char const* HISTORY_FILE_TYPE_LEDGER; -extern char const* HISTORY_FILE_TYPE_TRANSACTIONS; -extern char const* HISTORY_FILE_TYPE_RESULTS; -extern char const* HISTORY_FILE_TYPE_SCP; +std::string const HISTORY_LOCAL_DIR_NAME = "history"; +enum class FileType +{ + HISTORY_FILE_TYPE_BUCKET, + HISTORY_FILE_TYPE_LEDGER, + HISTORY_FILE_TYPE_TRANSACTIONS, + HISTORY_FILE_TYPE_RESULTS, + HISTORY_FILE_TYPE_SCP +}; + +std::string typeString(FileType type); +void createPath(std::filesystem::path path); +std::filesystem::path createPublishDir(FileType type, Config const& cfg); +std::filesystem::path getPublishHistoryDir(FileType type, Config const& cfg); class FileTransferInfo { - std::string mType; + FileType mType; std::string mHexDigits; std::string mLocalPath; std::string 
getLocalDir(TmpDir const& localRoot) const; public: FileTransferInfo(Bucket const& bucket) - : mType(HISTORY_FILE_TYPE_BUCKET) + : mType(FileType::HISTORY_FILE_TYPE_BUCKET) , mHexDigits(binToHex(bucket.getHash())) , mLocalPath(bucket.getFilename().string()) { } - FileTransferInfo(TmpDir const& snapDir, std::string const& snapType, + FileTransferInfo(TmpDir const& snapDir, FileType const& snapType, uint32_t checkpointLedger) : mType(snapType) , mHexDigits(fs::hexStr(checkpointLedger)) @@ -43,7 +53,16 @@ class FileTransferInfo { } - FileTransferInfo(TmpDir const& snapDir, std::string const& snapType, + FileTransferInfo(FileType const& snapType, uint32_t checkpointLedger, + Config const& cfg) + : mType(snapType) + , mHexDigits(fs::hexStr(checkpointLedger)) + , mLocalPath(getPublishHistoryDir(snapType, cfg).string() + "/" + + baseName_nogz()) + { + } + + FileTransferInfo(TmpDir const& snapDir, FileType const& snapType, std::string const& hexDigits) : mType(snapType) , mHexDigits(hexDigits) @@ -51,17 +70,30 @@ class FileTransferInfo { } - std::string + FileType getType() const { return mType; } + std::string + getTypeString() const + { + return typeString(mType); + } + std::string localPath_nogz() const { return mLocalPath; } + + std::string + localPath_nogz_dirty() const + { + return mLocalPath + ".dirty"; + } + std::string localPath_gz() const { @@ -76,7 +108,7 @@ class FileTransferInfo std::string baseName_nogz() const { - return fs::baseName(mType, mHexDigits, "xdr"); + return fs::baseName(getTypeString(), mHexDigits, "xdr"); } std::string baseName_gz() const @@ -92,12 +124,12 @@ class FileTransferInfo std::string remoteDir() const { - return fs::remoteDir(mType, mHexDigits); + return fs::remoteDir(getTypeString(), mHexDigits); } std::string remoteName() const { - return fs::remoteName(mType, mHexDigits, "xdr.gz"); + return fs::remoteName(getTypeString(), mHexDigits, "xdr.gz"); } }; } diff --git a/src/history/HistoryArchiveManager.cpp b/src/history/HistoryArchiveManager.cpp index 3e3cc24b2a..5f95c523a4 100644 --- a/src/history/HistoryArchiveManager.cpp +++ b/src/history/HistoryArchiveManager.cpp @@ -239,7 +239,7 @@ HistoryArchiveManager::initializeHistoryArchive(std::string const& arch) const } bool -HistoryArchiveManager::hasAnyWritableHistoryArchive() const +HistoryArchiveManager::publishEnabled() const { return std::any_of(std::begin(mArchives), std::end(mArchives), [](std::shared_ptr const& x) { diff --git a/src/history/HistoryArchiveManager.h b/src/history/HistoryArchiveManager.h index a5c77b7b89..de29e704ee 100644 --- a/src/history/HistoryArchiveManager.h +++ b/src/history/HistoryArchiveManager.h @@ -43,7 +43,7 @@ class HistoryArchiveManager // Returns whether or not the HistoryManager has any writable history // archives (those configured with both a `get` and `put` command). - bool hasAnyWritableHistoryArchive() const; + bool publishEnabled() const; // Returns history archive with given name or nullptr. std::shared_ptr diff --git a/src/history/HistoryManager.h b/src/history/HistoryManager.h index d69f1c1bd8..eff457ee64 100644 --- a/src/history/HistoryManager.h +++ b/src/history/HistoryManager.h @@ -4,6 +4,7 @@ // under the Apache License, Version 2.0. See the COPYING file at the root // of this distribution or at http://www.apache.org/licenses/LICENSE-2.0 +#include "herder/TxSetFrame.h" #include "history/HistoryArchive.h" #include "overlay/StellarXDR.h" #include "util/GlobalChecks.h" @@ -208,6 +209,8 @@ class HistoryManager // Initialize DB table for persistent publishing queue. 
static void dropAll(Database& db); + static std::filesystem::path publishQueuePath(Config const& cfg); + static void createPublishQueueDir(Config const& cfg); // Checkpoints are made every getCheckpointFrequency() ledgers. // This should normally be a constant (64) but in testing cases @@ -331,6 +334,13 @@ class HistoryManager // Returns the number of publishes initiated. virtual size_t publishQueuedHistory() = 0; + // Prepare checkpoint files for publishing + virtual void maybeCheckpointComplete() = 0; + + // Migrate SQL-based publish queue to the new file format + // (one-time call during database schema upgrade path) + virtual void dropSQLBasedPublish() = 0; + // Return the set of buckets referenced by the persistent (DB) publish // queue that are not present in the BucketManager. These need to be // fetched from somewhere before publishing can begin again. @@ -355,8 +365,18 @@ class HistoryManager std::vector const& originalBuckets, bool success) = 0; - // clear the publish queue for any ledgers more recent than ledgerSeq - virtual void deleteCheckpointsNewerThan(uint32_t ledgerSeq) = 0; + virtual void + appendTransactionSet(uint32_t ledgerSeq, TxSetXDRFrameConstPtr const& txSet, + TransactionResultSet const& resultSet) = 0; + virtual void appendLedgerHeader(LedgerHeader const& header) = 0; + + // On startup, restore checkpoint files based on the last committed LCL + virtual void restoreCheckpoint(uint32_t lcl) = 0; + + // Cleanup published files. If core is reset to genesis, any unpublished + // files will be cleaned by removal of the buckets directory. + virtual void deletePublishedFiles(uint32_t ledgerSeq, + Config const& cfg) = 0; // Return the name of the HistoryManager's tmpdir (used for storing files in // transit). diff --git a/src/history/HistoryManagerImpl.cpp b/src/history/HistoryManagerImpl.cpp index 6eb8a257a3..0564ff9ab8 100644 --- a/src/history/HistoryManagerImpl.cpp +++ b/src/history/HistoryManagerImpl.cpp @@ -21,6 +21,7 @@ #include "historywork/PutSnapshotFilesWork.h" #include "historywork/ResolveSnapshotWork.h" #include "historywork/WriteSnapshotWork.h" +#include "ledger/LedgerHeaderUtils.h" #include "ledger/LedgerManager.h" #include "main/Application.h" #include "main/Config.h" @@ -28,6 +29,7 @@ #include "medida/metrics_registry.h" #include "overlay/StellarXDR.h" #include "process/ProcessManager.h" +#include "transactions/TransactionSQL.h" #include "util/GlobalChecks.h" #include "util/Logging.h" #include "util/Math.h" @@ -40,6 +42,7 @@ #include #include +#include #include namespace stellar @@ -47,10 +50,11 @@ namespace stellar using namespace std; -static string kSQLCreateStatement = "CREATE TABLE IF NOT EXISTS publishqueue (" - "ledger INTEGER PRIMARY KEY," - "state TEXT" - "); "; +static std::string kSQLCreateStatement = + "CREATE TABLE IF NOT EXISTS publishqueue (" + "ledger INTEGER PRIMARY KEY," + "state TEXT" + "); "; void HistoryManager::dropAll(Database& db) @@ -60,6 +64,135 @@ HistoryManager::dropAll(Database& db) st.execute(true); } +std::filesystem::path +HistoryManager::publishQueuePath(Config const& cfg) +{ + std::filesystem::path b = cfg.BUCKET_DIR_PATH; + return b / "publishqueue"; +} + +void +HistoryManager::createPublishQueueDir(Config const& cfg) +{ + fs::mkpath(HistoryManager::publishQueuePath(cfg)); +} + +std::filesystem::path +publishQueueFileName(uint32_t seq) +{ + return fs::hexStr(seq) + ".json"; +} + +std::filesystem::path +publishQueueTmpFileName(uint32_t seq) +{ + return fs::hexStr(seq) + ".json.dirty"; +} + +void 
+writeCheckpointFile(Application& app, HistoryArchiveState const& has, + bool finalize) +{ + releaseAssert( + app.getHistoryManager().isLastLedgerInCheckpoint(has.currentLedger)); + auto filename = publishQueueFileName(has.currentLedger); + auto tmpOut = app.getHistoryManager().getTmpDir() / filename; + has.save(tmpOut); + + // Immediately produce a final checkpoint JSON (suitable for confirmed + // ledgers) + if (finalize) + { + auto out = HistoryManager::publishQueuePath(app.getConfig()) / filename; + // Durable rename also fsyncs the file which ensures durability + fs::durableRename(tmpOut, out, + HistoryManager::publishQueuePath(app.getConfig())); + } + else + { + auto out = HistoryManager::publishQueuePath(app.getConfig()) / + publishQueueTmpFileName(has.currentLedger); + // Otherwise, write a temporary durable file, to be finalized once + // has.currentLedger is actually committed + fs::durableRename(tmpOut, out, + HistoryManager::publishQueuePath(app.getConfig())); + } +} + +void +HistoryManagerImpl::dropSQLBasedPublish() +{ + // soci::transaction is created externally during schema upgrade, so this + // function is atomic + releaseAssert(threadIsMain()); + + auto const& cfg = mApp.getConfig(); + auto& db = mApp.getDatabase(); + auto& sess = db.getSession(); + + // In case previous schema migration rolled back, cleanup files + fs::deltree(publishQueuePath(cfg)); + fs::deltree(getPublishHistoryDir(FileType::HISTORY_FILE_TYPE_LEDGER, cfg) + .parent_path()); + createPublishDir(FileType::HISTORY_FILE_TYPE_LEDGER, cfg); + createPublishDir(FileType::HISTORY_FILE_TYPE_TRANSACTIONS, cfg); + createPublishDir(FileType::HISTORY_FILE_TYPE_RESULTS, cfg); + HistoryManager::createPublishQueueDir(cfg); + + std::set<uint32_t> checkpointLedgers; + // Migrate all the existing queued checkpoints to the new format + { + std::string state; + auto prep = db.getPreparedStatement("SELECT state FROM publishqueue;"); + auto& st = prep.statement(); + st.exchange(soci::into(state)); + st.define_and_bind(); + st.execute(true); + while (st.got_data()) + { + HistoryArchiveState has; + has.fromString(state); + releaseAssert(isLastLedgerInCheckpoint(has.currentLedger)); + checkpointLedgers.insert(has.currentLedger); + writeCheckpointFile(mApp, has, /* finalize */ true); + st.fetch(); + } + } + + auto freq = getCheckpointFrequency(); + uint32_t lastQueued = 0; + for (auto const& checkpoint : checkpointLedgers) + { + auto begin = firstLedgerInCheckpointContaining(checkpoint); + populateCheckpointFilesFromDB(mApp, sess, begin, freq, + mCheckpointBuilder); + LedgerHeaderUtils::copyToStream(db, sess, begin, freq, + mCheckpointBuilder); + // Checkpoints in publish queue are complete, so we can finalize them + mCheckpointBuilder.checkpointComplete(checkpoint); + lastQueued = std::max(lastQueued, checkpoint); + } + + auto lcl = LedgerHeaderUtils::loadMaxLedgerSeq(db); + if (lastQueued < lcl) + { + // Then, reconstruct any partial checkpoints that haven't yet been + // queued + populateCheckpointFilesFromDB(mApp, sess, + firstLedgerInCheckpointContaining(lcl), + freq, mCheckpointBuilder); + LedgerHeaderUtils::copyToStream(db, sess, + firstLedgerInCheckpointContaining(lcl), + freq, mCheckpointBuilder); + } + db.clearPreparedStatementCache(); + + // Now it's safe to drop obsolete SQL tables + sess << "DROP TABLE IF EXISTS publishqueue;"; + dropSupportTxHistory(db); + dropSupportTxSetHistory(db); +} + std::unique_ptr HistoryManager::create(Application& app) { @@ -76,6 +209,7 @@ HistoryManagerImpl::HistoryManagerImpl(Application& app) 
app.getMetrics().NewMeter({"history", "publish", "failure"}, "event")) , mEnqueueToPublishTimer( app.getMetrics().NewTimer({"history", "publish", "time"})) + , mCheckpointBuilder(app) { } @@ -125,18 +259,58 @@ HistoryManagerImpl::logAndUpdatePublishStatus() } } +bool +isPublishFile(std::string const& name) +{ + std::regex re("^[a-z0-9]{8}\\.json$"); + auto a = regex_match(name, re); + return a; +} + +bool +isPublishTmpFile(std::string const& name) +{ + std::regex re("^[a-z0-9]{8}\\.json.dirty$"); + auto a = regex_match(name, re); + return a; +} + +std::vector +findPublishFiles(std::string const& dir) +{ + return fs::findfiles(dir, isPublishFile); +} + +void +iterateOverCheckpoints(std::vector const& files, + std::function f) +{ + for (auto const& file : files) + { + uint32_t seq = std::stoul(file.substr(0, 8), nullptr, 16); + f(seq, file); + } +} + +void +forEveryQueuedCheckpoint(std::string const& dir, + std::function f) +{ + iterateOverCheckpoints(findPublishFiles(dir), f); +} + +void +forEveryTmpCheckpoint(std::string const& dir, + std::function f) +{ + iterateOverCheckpoints(fs::findfiles(dir, isPublishTmpFile), f); +} + size_t HistoryManagerImpl::publishQueueLength() const { ZoneScoped; - uint32_t count; - auto prep = mApp.getDatabase().getPreparedStatement( - "SELECT count(ledger) FROM publishqueue;"); - auto& st = prep.statement(); - st.exchange(soci::into(count)); - st.define_and_bind(); - st.execute(true); - return count; + return findPublishFiles(publishQueuePath(mApp.getConfig())).size(); } string const& @@ -161,38 +335,22 @@ uint32_t HistoryManagerImpl::getMinLedgerQueuedToPublish() { ZoneScoped; - uint32_t seq; - soci::indicator minIndicator; - auto prep = mApp.getDatabase().getPreparedStatement( - "SELECT min(ledger) FROM publishqueue;"); - auto& st = prep.statement(); - st.exchange(soci::into(seq, minIndicator)); - st.define_and_bind(); - st.execute(true); - if (minIndicator == soci::indicator::i_ok) - { - return seq; - } - return 0; + auto min = std::numeric_limits::max(); + forEveryQueuedCheckpoint( + publishQueuePath(mApp.getConfig()), + [&](uint32_t seq, std::string const& f) { min = std::min(min, seq); }); + return min; } uint32_t HistoryManagerImpl::getMaxLedgerQueuedToPublish() { ZoneScoped; - uint32_t seq; - soci::indicator maxIndicator; - auto prep = mApp.getDatabase().getPreparedStatement( - "SELECT max(ledger) FROM publishqueue;"); - auto& st = prep.statement(); - st.exchange(soci::into(seq, maxIndicator)); - st.define_and_bind(); - st.execute(true); - if (maxIndicator == soci::indicator::i_ok) - { - return seq; - } - return 0; + auto max = std::numeric_limits::min(); + forEveryQueuedCheckpoint( + publishQueuePath(mApp.getConfig()), + [&](uint32_t seq, std::string const& f) { max = std::max(max, seq); }); + return max; } bool @@ -204,7 +362,7 @@ HistoryManagerImpl::maybeQueueHistoryCheckpoint() return false; } - if (!mApp.getHistoryArchiveManager().hasAnyWritableHistoryArchive()) + if (!mApp.getHistoryArchiveManager().publishEnabled()) { CLOG_DEBUG(History, "Skipping checkpoint, no writable history archives"); @@ -232,17 +390,8 @@ HistoryManagerImpl::queueCurrentHistory() CLOG_DEBUG(History, "Queueing publish state for ledger {}", ledger); mEnqueueTimes.emplace(ledger, std::chrono::steady_clock::now()); - auto state = has.toString(); - auto prep = mApp.getDatabase().getPreparedStatement( - "INSERT INTO publishqueue (ledger, state) VALUES (:lg, :st);"); - auto& st = prep.statement(); - st.exchange(soci::use(ledger)); - st.exchange(soci::use(state)); - 
st.define_and_bind(); - { - ZoneNamedN(insertPublishQueueZone, "insert publishqueue", true); - st.execute(true); - } + // We queue history inside ledger commit, so do not finalize the file yet + writeCheckpointFile(mApp, has, /* finalize */ false); // We have now written the current HAS to the database, so // it's "safe" to crash (at least after the enclosing tx commits); @@ -327,24 +476,48 @@ HistoryManagerImpl::publishQueuedHistory() #endif ZoneScoped; - std::string state; - - auto prep = mApp.getDatabase().getPreparedStatement( - "SELECT state FROM publishqueue" - " ORDER BY ledger ASC LIMIT 1;"); - auto& st = prep.statement(); - soci::indicator stateIndicator; - st.exchange(soci::into(state, stateIndicator)); - st.define_and_bind(); - st.execute(true); - if (st.got_data() && stateIndicator == soci::indicator::i_ok) + HistoryArchiveState has; + auto seq = getMinLedgerQueuedToPublish(); + + if (seq == std::numeric_limits::max()) + { + return 0; + } + + auto file = publishQueuePath(mApp.getConfig()) / publishQueueFileName(seq); + has.load(file); + takeSnapshotAndPublish(has); + return 1; +} + +void +HistoryManagerImpl::maybeCheckpointComplete() +{ + uint32_t lcl = mApp.getLedgerManager().getLastClosedLedgerNum(); + if (!publishCheckpointOnLedgerClose(lcl) || + !mApp.getHistoryArchiveManager().publishEnabled()) + { + return; + } + + mCheckpointBuilder.checkpointComplete(lcl); + + auto finalizedHAS = + publishQueuePath(mApp.getConfig()) / publishQueueFileName(lcl); + if (fs::exists(finalizedHAS.c_str())) + { + CLOG_INFO(History, "{} exists, nothing to do", finalizedHAS); + return; + } + + auto temp = HistoryManager::publishQueuePath(mApp.getConfig()) / + publishQueueTmpFileName(lcl); + if (!fs::durableRename(temp, finalizedHAS, + HistoryManager::publishQueuePath(mApp.getConfig()))) { - HistoryArchiveState has; - has.fromString(state); - takeSnapshotAndPublish(has); - return 1; + throw std::runtime_error(fmt::format( + "Failed to rename {} to {}", temp.string(), finalizedHAS.string())); } - return 0; } std::vector @@ -352,20 +525,14 @@ HistoryManagerImpl::getPublishQueueStates() { ZoneScoped; std::vector states; - - std::string state; - auto prep = mApp.getDatabase().getPreparedStatement( - "SELECT state FROM publishqueue;"); - auto& st = prep.statement(); - st.exchange(soci::into(state)); - st.define_and_bind(); - st.execute(true); - while (st.got_data()) - { - states.emplace_back(); - states.back().fromString(state); - st.fetch(); - } + forEveryQueuedCheckpoint(publishQueuePath(mApp.getConfig()), + [&](uint32_t seq, std::string const& f) { + HistoryArchiveState has; + auto fullPath = + publishQueuePath(mApp.getConfig()) / f; + has.load(fullPath); + states.push_back(has); + }); return states; } @@ -419,6 +586,26 @@ HistoryManagerImpl::getMissingBucketsReferencedByPublishQueue() return std::vector(buckets.begin(), buckets.end()); } +void +HistoryManagerImpl::deletePublishedFiles(uint32_t ledgerSeq, Config const& cfg) +{ + releaseAssert(isLastLedgerInCheckpoint(ledgerSeq)); + FileTransferInfo res(FileType::HISTORY_FILE_TYPE_RESULTS, ledgerSeq, + mApp.getConfig()); + FileTransferInfo txs(FileType::HISTORY_FILE_TYPE_TRANSACTIONS, ledgerSeq, + mApp.getConfig()); + FileTransferInfo headers(FileType::HISTORY_FILE_TYPE_LEDGER, ledgerSeq, + mApp.getConfig()); + // Dirty files shouldn't exist, but cleanup just in case + std::remove(res.localPath_nogz_dirty().c_str()); + std::remove(txs.localPath_nogz_dirty().c_str()); + std::remove(headers.localPath_nogz_dirty().c_str()); + // Remove published files 
+ std::remove(res.localPath_nogz().c_str()); + std::remove(txs.localPath_nogz().c_str()); + std::remove(headers.localPath_nogz().c_str()); +} + void HistoryManagerImpl::historyPublished( uint32_t ledgerSeq, std::vector const& originalBuckets, @@ -440,17 +627,11 @@ HistoryManagerImpl::historyPublished( } this->mPublishSuccess.Mark(); - auto prep = mApp.getDatabase().getPreparedStatement( - "DELETE FROM publishqueue WHERE ledger = :lg;"); - auto& st = prep.statement(); - st.exchange(soci::use(ledgerSeq)); - st.define_and_bind(); - { - ZoneNamedN(deletePublishQueueZone, "delete publishqueue", true); - st.execute(true); - } - + auto file = publishQueuePath(mApp.getConfig()) / + publishQueueFileName(ledgerSeq); + std::remove(file.c_str()); mPublishQueueBuckets.removeBuckets(originalBuckets); + deletePublishedFiles(ledgerSeq, mApp.getConfig()); } else { @@ -462,15 +643,53 @@ HistoryManagerImpl::historyPublished( } void -HistoryManagerImpl::deleteCheckpointsNewerThan(uint32_t ledgerSeq) +HistoryManagerImpl::appendTransactionSet(uint32_t ledgerSeq, + TxSetXDRFrameConstPtr const& txSet, + TransactionResultSet const& resultSet) { - ZoneScoped; - auto prep = mApp.getDatabase().getPreparedStatement( - "DELETE FROM publishqueue WHERE ledger >= :lg;"); - auto& st = prep.statement(); - st.exchange(soci::use(ledgerSeq)); - st.define_and_bind(); - st.execute(true); + if (mApp.getHistoryArchiveManager().publishEnabled()) + { + mCheckpointBuilder.appendTransactionSet(ledgerSeq, txSet, resultSet); + } +} + +void +HistoryManagerImpl::appendLedgerHeader(LedgerHeader const& header) +{ + if (mApp.getHistoryArchiveManager().publishEnabled()) + { + mCheckpointBuilder.appendLedgerHeader(header); +#ifdef BUILD_TESTS + CLOG_INFO(History, "Appending ledger header for ledger {} {}", + header.ledgerSeq, mThrowOnAppend); + if (header.ledgerSeq == mThrowOnAppend) + { + throw std::runtime_error("Throwing for testing"); + } +#endif + } +} + +void +HistoryManagerImpl::restoreCheckpoint(uint32_t lcl) +{ + if (mApp.getHistoryArchiveManager().publishEnabled()) + { + mCheckpointBuilder.cleanup(lcl); + // Remove any tmp checkpoints that were potentially queued _before_ a + // crash, causing ledger rollback. These files will be finalized during + // successful ledger close. + forEveryTmpCheckpoint(publishQueuePath(mApp.getConfig()), + [&](uint32_t seq, std::string const& f) { + if (seq > lcl) + { + std::remove(f.c_str()); + } + }); + // Maybe finalize checkpoint if we're at a checkpoint boundary and + // haven't rotated yet. 
No-op if checkpoint has been rotated already + maybeCheckpointComplete(); + } } uint64_t diff --git a/src/history/HistoryManagerImpl.h b/src/history/HistoryManagerImpl.h index c0c6057adb..e03251f2f9 100644 --- a/src/history/HistoryManagerImpl.h +++ b/src/history/HistoryManagerImpl.h @@ -5,6 +5,7 @@ // of this distribution or at http://www.apache.org/licenses/LICENSE-2.0 #include "bucket/PublishQueueBuckets.h" +#include "history/CheckpointBuilder.h" #include "history/HistoryManager.h" #include "util/TmpDir.h" #include "work/Work.h" @@ -13,6 +14,7 @@ namespace medida { class Meter; +class Timer; } namespace stellar @@ -36,6 +38,7 @@ class HistoryManagerImpl : public HistoryManager medida::Timer& mEnqueueToPublishTimer; UnorderedMap mEnqueueTimes; + CheckpointBuilder mCheckpointBuilder; PublishQueueBuckets::BucketCount loadBucketsReferencedByPublishQueue(); #ifdef BUILD_TESTS @@ -64,6 +67,9 @@ class HistoryManagerImpl : public HistoryManager size_t publishQueuedHistory() override; + void maybeCheckpointComplete() override; + void dropSQLBasedPublish() override; + std::vector getMissingBucketsReferencedByPublishQueue() override; @@ -74,8 +80,12 @@ class HistoryManagerImpl : public HistoryManager void historyPublished(uint32_t ledgerSeq, std::vector const& originalBuckets, bool success) override; - - void deleteCheckpointsNewerThan(uint32_t ledgerSeq) override; + void appendTransactionSet(uint32_t ledgerSeq, + TxSetXDRFrameConstPtr const& txSet, + TransactionResultSet const& resultSet) override; + void appendLedgerHeader(LedgerHeader const& header) override; + void restoreCheckpoint(uint32_t lcl) override; + void deletePublishedFiles(uint32_t ledgerSeq, Config const& cfg) override; std::string const& getTmpDir() override; @@ -87,6 +97,13 @@ class HistoryManagerImpl : public HistoryManager #ifdef BUILD_TESTS void setPublicationEnabled(bool enabled) override; + // Throw after inserting ledger `n` into a checkpoint + uint32_t mThrowOnAppend{0}; + CheckpointBuilder& + getCheckpointBuilder() + { + return mCheckpointBuilder; + } #endif }; } diff --git a/src/history/StateSnapshot.cpp b/src/history/StateSnapshot.cpp index 6c03902a2c..3995aeea1b 100644 --- a/src/history/StateSnapshot.cpp +++ b/src/history/StateSnapshot.cpp @@ -29,16 +29,19 @@ StateSnapshot::StateSnapshot(Application& app, HistoryArchiveState const& state) , mLocalState(state) , mSnapDir(app.getTmpDirManager().tmpDir("snapshot")) , mLedgerSnapFile(std::make_shared( - mSnapDir, HISTORY_FILE_TYPE_LEDGER, mLocalState.currentLedger)) + FileType::HISTORY_FILE_TYPE_LEDGER, mLocalState.currentLedger, + mApp.getConfig())) , mTransactionSnapFile(std::make_shared( - mSnapDir, HISTORY_FILE_TYPE_TRANSACTIONS, mLocalState.currentLedger)) + FileType::HISTORY_FILE_TYPE_TRANSACTIONS, mLocalState.currentLedger, + mApp.getConfig())) , mTransactionResultSnapFile(std::make_shared( - mSnapDir, HISTORY_FILE_TYPE_RESULTS, mLocalState.currentLedger)) + FileType::HISTORY_FILE_TYPE_RESULTS, mLocalState.currentLedger, + mApp.getConfig())) , mSCPHistorySnapFile(std::make_shared( - mSnapDir, HISTORY_FILE_TYPE_SCP, mLocalState.currentLedger)) + mSnapDir, FileType::HISTORY_FILE_TYPE_SCP, mLocalState.currentLedger)) { if (mLocalState.currentBuckets.size() != BucketList::kNumLevels) @@ -48,7 +51,7 @@ StateSnapshot::StateSnapshot(Application& app, HistoryArchiveState const& state) } bool -StateSnapshot::writeHistoryBlocks() const +StateSnapshot::writeSCPMessages() const { ZoneScoped; std::unique_ptr snapSess( 
// headers, one TransactionHistoryEntry (which contain txSets), // one TransactionHistoryResultEntry containing transaction set results and // one (optional) SCPHistoryEntry containing the SCP messages used to close. - // All files are streamed out of the database, entry-by-entry. + // Only SCP messages are streamed out of the database, entry by entry. + // The rest are built incrementally during ledger close. size_t nbSCPMessages; uint32_t begin, count; - size_t nHeaders; { bool doFsync = !mApp.getConfig().DISABLE_XDR_FSYNC; asio::io_context& ctx = mApp.getClock().getIOContext(); - XDROutputFileStream ledgerOut(ctx, doFsync), txOut(ctx, doFsync), - txResultOut(ctx, doFsync), scpHistory(ctx, doFsync); - ledgerOut.open(mLedgerSnapFile->localPath_nogz()); - txOut.open(mTransactionSnapFile->localPath_nogz()); - txResultOut.open(mTransactionResultSnapFile->localPath_nogz()); + + // Extract SCP messages from the database + XDROutputFileStream scpHistory(ctx, doFsync); scpHistory.open(mSCPHistorySnapFile->localPath_nogz()); auto& hm = mApp.getHistoryManager(); @@ -82,17 +83,6 @@ StateSnapshot::writeHistoryBlocks() const CLOG_DEBUG(History, "Streaming {} ledgers worth of history, from {}", count, begin); - nHeaders = LedgerHeaderUtils::copyToStream(mApp.getDatabase(), sess, - begin, count, ledgerOut); - - size_t nTxs = copyTransactionsToStream(mApp, sess, begin, count, txOut, - txResultOut); - CLOG_DEBUG(History, "Wrote {} ledger headers to {}", nHeaders, - mLedgerSnapFile->localPath_nogz()); - CLOG_DEBUG(History, "Wrote {} transactions to {} and {}", nTxs, - mTransactionSnapFile->localPath_nogz(), - mTransactionResultSnapFile->localPath_nogz()); - nbSCPMessages = HerderPersistence::copySCPHistoryToStream( mApp.getDatabase(), sess, begin, count, scpHistory); @@ -109,22 +99,6 @@ StateSnapshot::writeHistoryBlocks() const // When writing checkpoint 0x3f (63) we will have written 63 headers because // header 0 doesn't exist, ledger 1 is the first. For all later checkpoints // we will write 64 headers; any less and something went wrong[1]. - // - // [1]: Probably our read transaction was serialized ahead of the write - // transaction composing the history itself, despite occurring in the - // opposite wall-clock order, this is legal behavior in SERIALIZABLE - // transaction-isolation level -- the highest offered! -- as txns only have - // to be applied in isolation and in _some_ order, not the wall-clock order - we issued them. Anyway this is transient and should go away upon retry. 
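The arithmetic behind the 63-vs-64 header count above generalizes as follows. This is a sketch assuming the default checkpoint frequency of 64 (a power of two); it re-derives, rather than quotes, the HistoryManager helpers used throughout this diff:

    #include <cstdint>

    // Illustrative re-derivation of the checkpoint math, not the PR's code.
    constexpr uint32_t kFreq = 64;

    constexpr uint32_t
    checkpointContainingLedger(uint32_t seq)
    {
        return seq | (kFreq - 1); // e.g. ledger 100 -> checkpoint 127 (0x7f)
    }

    constexpr bool
    isLastLedgerInCheckpoint(uint32_t seq)
    {
        return (seq & (kFreq - 1)) == kFreq - 1; // true for 63, 127, 191, ...
    }

    constexpr uint32_t
    firstLedgerInCheckpointContaining(uint32_t seq)
    {
        // The genesis checkpoint spans ledgers 1..63 (header 0 does not
        // exist), which is why checkpoint 0x3f holds 63 headers, not 64.
        uint32_t first = seq & ~(kFreq - 1);
        return first == 0 ? 1 : first;
    }
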
- if (nHeaders != count) - { - CLOG_WARNING( - History, - "Only wrote {} ledger headers for {}, expecting {}, will retry", - nHeaders, mLedgerSnapFile->localPath_nogz(), count); - return false; - } - return true; } diff --git a/src/history/StateSnapshot.h b/src/history/StateSnapshot.h index 08987db80e..eb98bafc22 100644 --- a/src/history/StateSnapshot.h +++ b/src/history/StateSnapshot.h @@ -28,7 +28,7 @@ struct StateSnapshot : public std::enable_shared_from_this std::shared_ptr mSCPHistorySnapFile; StateSnapshot(Application& app, HistoryArchiveState const& state); - bool writeHistoryBlocks() const; + bool writeSCPMessages() const; std::vector> differingHASFiles(HistoryArchiveState const& other); }; diff --git a/src/history/test/HistoryTests.cpp b/src/history/test/HistoryTests.cpp index 2b882a9099..a6d7b69763 100644 --- a/src/history/test/HistoryTests.cpp +++ b/src/history/test/HistoryTests.cpp @@ -6,9 +6,10 @@ #include "bucket/test/BucketTestUtils.h" #include "catchup/CatchupManagerImpl.h" #include "catchup/test/CatchupWorkTests.h" +#include "history/CheckpointBuilder.h" #include "history/FileTransferInfo.h" #include "history/HistoryArchiveManager.h" -#include "history/HistoryManager.h" +#include "history/HistoryManagerImpl.h" #include "history/test/HistoryTestsUtils.h" #include "historywork/GetHistoryArchiveStateWork.h" #include "historywork/GunzipFileWork.h" @@ -358,7 +359,7 @@ TEST_CASE("Ledger chain verification", "[ledgerheaderverification]") { std::tie(lcl, last) = ledgerChainGenerator.makeLedgerChainFiles( HistoryManager::VERIFY_STATUS_OK); - FileTransferInfo ft(tmpDir, HISTORY_FILE_TYPE_LEDGER, + FileTransferInfo ft(tmpDir, FileType::HISTORY_FILE_TYPE_LEDGER, last.header.ledgerSeq); std::remove(ft.localPath_nogz().c_str()); @@ -382,7 +383,7 @@ TEST_CASE("Tx results verification", "[batching][resultsverification]") catchupSimulation.getApp().getHistoryManager()}; auto verifyHeadersWork = wm.executeWork( - range, HISTORY_FILE_TYPE_LEDGER, tmpDir); + range, FileType::HISTORY_FILE_TYPE_LEDGER, tmpDir); REQUIRE(verifyHeadersWork->getState() == BasicWork::State::WORK_SUCCESS); SECTION("basic") { @@ -392,7 +393,8 @@ TEST_CASE("Tx results verification", "[batching][resultsverification]") } SECTION("header file missing") { - FileTransferInfo ft(tmpDir, HISTORY_FILE_TYPE_LEDGER, range.last()); + FileTransferInfo ft(tmpDir, FileType::HISTORY_FILE_TYPE_LEDGER, + range.last()); std::remove(ft.localPath_nogz().c_str()); auto verify = wm.executeWork(range, tmpDir); @@ -400,7 +402,8 @@ TEST_CASE("Tx results verification", "[batching][resultsverification]") } SECTION("hash mismatch") { - FileTransferInfo ft(tmpDir, HISTORY_FILE_TYPE_LEDGER, range.last()); + FileTransferInfo ft(tmpDir, FileType::HISTORY_FILE_TYPE_LEDGER, + range.last()); XDRInputFileStream res; res.open(ft.localPath_nogz()); std::vector entries; @@ -431,10 +434,11 @@ TEST_CASE("Tx results verification", "[batching][resultsverification]") SECTION("invalid result entries") { auto getResults = wm.executeWork( - range, HISTORY_FILE_TYPE_RESULTS, tmpDir); + range, FileType::HISTORY_FILE_TYPE_RESULTS, tmpDir); REQUIRE(getResults->getState() == BasicWork::State::WORK_SUCCESS); - FileTransferInfo ft(tmpDir, HISTORY_FILE_TYPE_RESULTS, range.last()); + FileTransferInfo ft(tmpDir, FileType::HISTORY_FILE_TYPE_RESULTS, + range.last()); XDRInputFileStream res; res.open(ft.localPath_nogz()); std::vector entries; @@ -469,6 +473,109 @@ TEST_CASE("History publish", "[history][publish]") 
catchupSimulation.ensureOfflineCatchupPossible(checkpointLedger); } +void +validateCheckpointFiles(Application& app, uint32_t ledger, bool isFinalized) +{ + auto const& cfg = app.getConfig(); + auto validateHdr = [](std::string path, uint32_t ledger) { + XDRInputFileStream hdrIn; + hdrIn.open(path); + LedgerHeaderHistoryEntry entry; + while (hdrIn && hdrIn.readOne(entry)) + { + REQUIRE(entry.header.ledgerSeq <= ledger); + } + REQUIRE(entry.header.ledgerSeq == ledger); + }; + + auto checkpoint = + app.getHistoryManager().checkpointContainingLedger(ledger); + FileTransferInfo res(FileType::HISTORY_FILE_TYPE_RESULTS, checkpoint, cfg); + FileTransferInfo txs(FileType::HISTORY_FILE_TYPE_TRANSACTIONS, checkpoint, + cfg); + FileTransferInfo headers(FileType::HISTORY_FILE_TYPE_LEDGER, checkpoint, + cfg); + if (isFinalized) + { + REQUIRE(fs::exists(res.localPath_nogz())); + REQUIRE(fs::exists(txs.localPath_nogz())); + REQUIRE(!fs::exists(res.localPath_nogz_dirty())); + REQUIRE(!fs::exists(txs.localPath_nogz_dirty())); + REQUIRE(!fs::exists(headers.localPath_nogz_dirty())); + validateHdr(headers.localPath_nogz(), ledger); + } + else + { + REQUIRE(!fs::exists(res.localPath_nogz())); + REQUIRE(!fs::exists(txs.localPath_nogz())); + REQUIRE(!fs::exists(headers.localPath_nogz())); + REQUIRE(fs::exists(res.localPath_nogz_dirty())); + REQUIRE(fs::exists(txs.localPath_nogz_dirty())); + validateHdr(headers.localPath_nogz_dirty(), ledger); + } +} + +TEST_CASE("History publish with restart", "[history][publish]") +{ + auto catchupSimulation = + CatchupSimulation(VirtualClock::VIRTUAL_TIME, + std::make_shared<TmpDirHistoryConfigurator>(), true, + Config::TESTDB_ON_DISK_SQLITE); + auto checkpointLedger = catchupSimulation.getLastCheckpointLedger(2); + + // Restart at various points in the checkpoint; core should continue + // properly writing checkpoint files + auto ledgerNums = std::vector<uint32_t>{ + LedgerManager::GENESIS_LEDGER_SEQ, + 10, + catchupSimulation.getLastCheckpointLedger(1), + catchupSimulation.getApp() + .getHistoryManager() + .firstLedgerInCheckpointContaining(checkpointLedger), + checkpointLedger - 1, + checkpointLedger}; + for (auto ledgerNum : ledgerNums) + { + SECTION("Restart at ledger " + std::to_string(ledgerNum)) + { + SECTION("graceful") + { + catchupSimulation.ensureOfflineCatchupPossible(checkpointLedger, + ledgerNum); + } + SECTION("crash leaves dirty data") + { + if (ledgerNum == LedgerManager::GENESIS_LEDGER_SEQ) + { + // Genesis ledger is established when the app is created + continue; + } + auto& hm = static_cast<HistoryManagerImpl&>( + catchupSimulation.getApp().getHistoryManager()); + hm.mThrowOnAppend = ledgerNum; + REQUIRE_THROWS_AS( + catchupSimulation.ensureOfflineCatchupPossible( + checkpointLedger), + std::runtime_error); + // Before proceeding, ensure files are actually corrupt + validateCheckpointFiles(catchupSimulation.getApp(), ledgerNum, + false); + // Restart app, truncate dirty data in checkpoints, proceed to + // publish + catchupSimulation.restartApp(); + catchupSimulation.ensureOfflineCatchupPossible( + checkpointLedger); + } + + // Now catchup to ensure published checkpoints are valid + auto app = catchupSimulation.createCatchupApplication( + std::numeric_limits<uint32_t>::max(), + Config::TESTDB_ON_DISK_SQLITE, "app"); + REQUIRE(catchupSimulation.catchupOffline(app, checkpointLedger)); + } + } +} + TEST_CASE("History publish to multiple archives", "[history]") { Config cfg(getTestConfig()); @@ -1615,3 +1722,77 @@ TEST_CASE("Externalize gap while catchup work is running", "[history][catchup]")
REQUIRE(catchupSimulation.catchupOnline(app, lcl + 2, 0, 0, 0, {128, 129, 127})); } + +TEST_CASE("CheckpointBuilder", "[history][publish]") +{ + VirtualClock clock; + auto cfg = getTestConfig(0, Config::TESTDB_ON_DISK_SQLITE); + TmpDirHistoryConfigurator().configure(cfg, true); + + auto app = createTestApplication(clock, cfg); + releaseAssert(app->getLedgerManager().getLastClosedLedgerNum() == + LedgerManager::GENESIS_LEDGER_SEQ); + auto& hm = static_cast<HistoryManagerImpl&>(app->getHistoryManager()); + auto& cb = hm.getCheckpointBuilder(); + auto lcl = app->getLedgerManager().getLastClosedLedgerNum(); + + auto generate = [&](uint32_t count, bool appendHeaders = true) { + for (uint32_t i = lcl; i < lcl + count; ++i) + { + LedgerHeaderHistoryEntry lh; + lh.header.ledgerSeq = i; + cb.appendTransactionSet(i, TxSetXDRFrame::makeEmpty(lh), + TransactionResultSet{}); + // Do not append last ledger in a checkpoint if `appendHeaders` is + // false + if (!appendHeaders && i == count) + { + continue; + } + cb.appendLedgerHeader(lh.header); + } + }; + + SECTION("recover") + { + SECTION("recover transactions, but not headers") + { + generate(10, false); + validateCheckpointFiles(*app, 9, false); + } + SECTION("recover both") + { + generate(10); + validateCheckpointFiles(*app, 10, false); + } + SECTION("recover due to partial write") + { + generate(10); + validateCheckpointFiles(*app, 10, false); + FileTransferInfo headers( + FileType::HISTORY_FILE_TYPE_LEDGER, + app->getHistoryManager().checkpointContainingLedger(10), + app->getConfig()); + auto sz = + std::filesystem::file_size(headers.localPath_nogz_dirty()); + std::filesystem::resize_file(headers.localPath_nogz_dirty(), + sz - 1); + } + CheckpointBuilder cb2{*app}; + cb2.cleanup(9); + validateCheckpointFiles(*app, 9, false); + } + SECTION("checkpoint complete") + { + auto ledgerSeq = hm.checkpointContainingLedger(1); + // Checkpoint not finalized + generate(ledgerSeq); + validateCheckpointFiles(*app, ledgerSeq, false); + cb.checkpointComplete(ledgerSeq); + validateCheckpointFiles(*app, ledgerSeq, true); + REQUIRE(!cb.mOpen); + // any subsequent call to checkpointComplete is a no-op + cb.checkpointComplete(ledgerSeq); + validateCheckpointFiles(*app, ledgerSeq, true); + } +} diff --git a/src/history/test/HistoryTestsUtils.cpp b/src/history/test/HistoryTestsUtils.cpp index 5119d372a4..908548b433 100644 --- a/src/history/test/HistoryTestsUtils.cpp +++ b/src/history/test/HistoryTestsUtils.cpp @@ -187,8 +187,8 @@ TestBucketGenerator::generateBucket(TestBucketState state) // Upload generated bucket to the archive { - FileTransferInfo ft{mTmpDir->getName(), HISTORY_FILE_TYPE_BUCKET, - binToHex(hash)}; + FileTransferInfo ft{mTmpDir->getName(), + FileType::HISTORY_FILE_TYPE_BUCKET, binToHex(hash)}; auto& wm = mApp.getWorkScheduler(); auto put = std::make_shared<PutRemoteFileWork>( mApp, filename + ".gz", ft.remoteName(), mArchive); @@ -235,7 +235,8 @@ TestLedgerChainGenerator::createHistoryFiles( LedgerHeaderHistoryEntry& first, LedgerHeaderHistoryEntry& last, uint32_t checkpoint) { - FileTransferInfo ft{mTmpDir, HISTORY_FILE_TYPE_LEDGER, checkpoint}; + FileTransferInfo ft{mTmpDir, FileType::HISTORY_FILE_TYPE_LEDGER, + checkpoint}; XDROutputFileStream ledgerOut(mApp.getClock().getIOContext(), /*doFsync=*/true); ledgerOut.open(ft.localPath_nogz()); @@ -376,24 +377,23 @@ operator!=(CatchupPerformedWork const& x, CatchupPerformedWork const& y) CatchupSimulation::CatchupSimulation(VirtualClock::Mode mode, std::shared_ptr<HistoryConfigurator> cg, - bool startApp) - : mClock(mode) + bool startApp, Config::TestDbMode dbMode) + 
: mClock(std::make_unique(mode)) , mHistoryConfigurator(cg) - , mCfg(getTestConfig()) - , mAppPtr(createTestApplication(mClock, + , mCfg(getTestConfig(0, dbMode)) + , mAppPtr(createTestApplication(*mClock, mHistoryConfigurator->configure(mCfg, true), /*newDB*/ true, /*startApp*/ false)) - , mApp(*mAppPtr) { auto dirName = cg->getArchiveDirName(); if (!dirName.empty()) { - CHECK( - mApp.getHistoryArchiveManager().initializeHistoryArchive(dirName)); + CHECK(getApp().getHistoryArchiveManager().initializeHistoryArchive( + dirName)); } if (startApp) { - mApp.start(); + mAppPtr->start(); } } @@ -404,26 +404,27 @@ CatchupSimulation::~CatchupSimulation() uint32_t CatchupSimulation::getLastCheckpointLedger(uint32_t checkpointIndex) const { - return mApp.getHistoryManager().getCheckpointFrequency() * checkpointIndex - + return getApp().getHistoryManager().getCheckpointFrequency() * + checkpointIndex - 1; } void CatchupSimulation::generateRandomLedger(uint32_t version) { - auto& lm = mApp.getLedgerManager(); + auto& lm = getApp().getLedgerManager(); uint32_t ledgerSeq = lm.getLastClosedLedgerNum() + 1; uint64_t minBalance = lm.getLastMinBalance(5); uint64_t big = minBalance + ledgerSeq; uint64_t small = 100 + ledgerSeq; uint64_t closeTime = 60 * 5 * ledgerSeq; - auto root = TestAccount{mApp, getRoot(mApp.getNetworkID())}; - auto alice = TestAccount{mApp, getAccount("alice")}; - auto bob = TestAccount{mApp, getAccount("bob")}; - auto carol = TestAccount{mApp, getAccount("carol")}; - auto eve = TestAccount{mApp, getAccount("eve")}; - auto stroopy = TestAccount{mApp, getAccount("stroopy")}; + auto root = TestAccount{getApp(), getRoot(getApp().getNetworkID())}; + auto alice = TestAccount{getApp(), getAccount("alice")}; + auto bob = TestAccount{getApp(), getAccount("bob")}; + auto carol = TestAccount{getApp(), getAccount("carol")}; + auto eve = TestAccount{getApp(), getAccount("eve")}; + auto stroopy = TestAccount{getApp(), getAccount("stroopy")}; std::vector txs; std::vector sorobanTxs; @@ -484,16 +485,17 @@ CatchupSimulation::generateRandomLedger(uint32_t version) SOROBAN_PROTOCOL_VERSION)) { SorobanResources res; - res.instructions = - mApp.getLedgerManager().maxSorobanTransactionResources().getVal( - Resource::Type::INSTRUCTIONS) / - 10; + res.instructions = getApp() + .getLedgerManager() + .maxSorobanTransactionResources() + .getVal(Resource::Type::INSTRUCTIONS) / + 10; res.writeBytes = 100'000; uint32_t inclusion = 100; sorobanTxs.push_back(createUploadWasmTx( - mApp, stroopy, inclusion, DEFAULT_TEST_RESOURCE_FEE, res)); + getApp(), stroopy, inclusion, DEFAULT_TEST_RESOURCE_FEE, res)); sorobanTxs.push_back(createUploadWasmTx( - mApp, eve, inclusion * 5, DEFAULT_TEST_RESOURCE_FEE, res)); + getApp(), eve, inclusion * 5, DEFAULT_TEST_RESOURCE_FEE, res)); check = true; } } @@ -504,7 +506,7 @@ CatchupSimulation::generateRandomLedger(uint32_t version) ? 
TxSetPhaseTransactions{txs, sorobanTxs} : TxSetPhaseTransactions{txs}; TxSetXDRFrameConstPtr txSet = - makeTxSetFromTransactions(phases, mApp, 0, 0).first; + makeTxSetFromTransactions(phases, getApp(), 0, 0).first; CLOG_INFO(History, "Closing synthetic ledger {} with {} txs (txhash:{})", ledgerSeq, txSet->sizeTxTotal(), @@ -519,14 +521,14 @@ CatchupSimulation::generateRandomLedger(uint32_t version) upgrades.push_back(UpgradeType{v.begin(), v.end()}); } - StellarValue sv = - mApp.getHerder().makeStellarValue(txSet->getContentsHash(), closeTime, - upgrades, mApp.getConfig().NODE_SEED); + StellarValue sv = getApp().getHerder().makeStellarValue( + txSet->getContentsHash(), closeTime, upgrades, + getApp().getConfig().NODE_SEED); mLedgerCloseDatas.emplace_back(ledgerSeq, txSet, sv); auto& txsSucceeded = - mApp.getMetrics().NewCounter({"ledger", "apply", "success"}); + getApp().getMetrics().NewCounter({"ledger", "apply", "success"}); auto lastSucceeded = txsSucceeded.count(); lm.closeLedger(mLedgerCloseDatas.back()); @@ -542,12 +544,14 @@ CatchupSimulation::generateRandomLedger(uint32_t version) mLedgerSeqs.push_back(lclh.header.ledgerSeq); mLedgerHashes.push_back(lclh.hash); mBucketListHashes.push_back(lclh.header.bucketListHash); - mBucket0Hashes.push_back(mApp.getBucketManager() + mBucket0Hashes.push_back(getApp() + .getBucketManager() .getBucketList() .getLevel(0) .getCurr() ->getHash()); - mBucket1Hashes.push_back(mApp.getBucketManager() + mBucket1Hashes.push_back(getApp() + .getBucketManager() .getBucketList() .getLevel(2) .getCurr() @@ -572,19 +576,25 @@ void CatchupSimulation::setUpgradeLedger(uint32_t ledger, ProtocolVersion upgradeProtocolVersion) { - REQUIRE(mApp.getLedgerManager().getLastClosedLedgerNum() < ledger); + REQUIRE(getApp().getLedgerManager().getLastClosedLedgerNum() < ledger); mUpgradeLedgerSeq = ledger; mUpgradeProtocolVersion = upgradeProtocolVersion; } void -CatchupSimulation::ensureLedgerAvailable(uint32_t targetLedger) +CatchupSimulation::ensureLedgerAvailable(uint32_t targetLedger, + std::optional restartLedger) { - auto& lm = mApp.getLedgerManager(); - auto& hm = mApp.getHistoryManager(); - while (lm.getLastClosedLedgerNum() < targetLedger) + while (getApp().getLedgerManager().getLastClosedLedgerNum() < targetLedger) { - auto lcl = lm.getLastClosedLedgerNum(); + if (restartLedger && + getApp().getLedgerManager().getLastClosedLedgerNum() == + *restartLedger) + { + REQUIRE(*restartLedger < targetLedger); + restartApp(); + } + auto lcl = getApp().getLedgerManager().getLastClosedLedgerNum(); if (lcl + 1 == mUpgradeLedgerSeq) { // Force protocol upgrade @@ -593,11 +603,13 @@ CatchupSimulation::ensureLedgerAvailable(uint32_t targetLedger) } else { - generateRandomLedger( - lm.getLastClosedLedgerHeader().header.ledgerVersion); + generateRandomLedger(getApp() + .getLedgerManager() + .getLastClosedLedgerHeader() + .header.ledgerVersion); } - if (hm.publishCheckpointOnLedgerClose(lcl)) + if (getApp().getHistoryManager().publishCheckpointOnLedgerClose(lcl)) { mBucketListAtLastPublish = getApp().getBucketManager().getBucketList(); @@ -608,26 +620,49 @@ CatchupSimulation::ensureLedgerAvailable(uint32_t targetLedger) void CatchupSimulation::ensurePublishesComplete() { - auto& hm = mApp.getHistoryManager(); - while (!mApp.getWorkScheduler().allChildrenDone() || - (hm.getPublishSuccessCount() < hm.getPublishQueueCount())) + auto& hm = getApp().getHistoryManager(); + while (hm.publishQueueLength() > 0 && hm.getPublishFailureCount() == 0) { - REQUIRE(hm.getPublishFailureCount() == 
0); - mApp.getClock().crank(true); + getApp().getClock().crank(true); } REQUIRE(hm.getPublishFailureCount() == 0); // Make sure all references to buckets were released REQUIRE(hm.getBucketsReferencedByPublishQueue().empty()); + + // Make sure all published checkpoint files have been cleaned up + auto lcl = getApp().getLedgerManager().getLastClosedLedgerNum(); + auto firstCheckpoint = + hm.checkpointContainingLedger(LedgerManager::GENESIS_LEDGER_SEQ); + auto lastCheckpoint = hm.lastLedgerBeforeCheckpointContaining(lcl); + + for (uint32_t i = firstCheckpoint; i <= lastCheckpoint; + i += hm.getCheckpointFrequency()) + { + FileTransferInfo res(FileType::HISTORY_FILE_TYPE_RESULTS, i, + getApp().getConfig()); + FileTransferInfo txs(FileType::HISTORY_FILE_TYPE_TRANSACTIONS, i, + getApp().getConfig()); + FileTransferInfo headers(FileType::HISTORY_FILE_TYPE_LEDGER, i, + getApp().getConfig()); + REQUIRE(!fs::exists(res.localPath_nogz_dirty())); + REQUIRE(!fs::exists(txs.localPath_nogz_dirty())); + REQUIRE(!fs::exists(headers.localPath_nogz_dirty())); + REQUIRE(!fs::exists(res.localPath_nogz())); + REQUIRE(!fs::exists(txs.localPath_nogz())); + REQUIRE(!fs::exists(headers.localPath_nogz())); + } } void -CatchupSimulation::ensureOfflineCatchupPossible(uint32_t targetLedger) +CatchupSimulation::ensureOfflineCatchupPossible( + uint32_t targetLedger, std::optional restartLedger) { - auto& hm = mApp.getHistoryManager(); - // One additional ledger is needed for publish. - ensureLedgerAvailable(hm.checkpointContainingLedger(targetLedger) + 1); + auto target = + getApp().getHistoryManager().checkpointContainingLedger(targetLedger) + + 1; + ensureLedgerAvailable(target, restartLedger); ensurePublishesComplete(); } @@ -635,7 +670,7 @@ void CatchupSimulation::ensureOnlineCatchupPossible(uint32_t targetLedger, uint32_t bufferLedgers) { - auto& hm = mApp.getHistoryManager(); + auto& hm = getApp().getHistoryManager(); // One additional ledger is needed for publish, one as a trigger ledger for // catchup, one as closing ledger. 
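The dirty-file checks in ensurePublishesComplete() above lean on a two-phase naming convention: checkpoint data lives under a dirty name while a checkpoint is open and is promoted to the canonical name only when complete. The sketch below illustrates that lifecycle with hypothetical names; the PR's actual machinery is CheckpointBuilder together with FileTransferInfo::localPath_nogz_dirty()/localPath_nogz().

```cpp
#include <filesystem>
#include <fstream>
#include <string>
#include <utility>

// Hypothetical sketch of the two-phase checkpoint-file convention exercised
// by the tests above: append to a ".dirty" sibling while the checkpoint is
// open, promote to the canonical name once it is complete.
class TwoPhaseCheckpointFile
{
    std::filesystem::path mFinal;
    std::filesystem::path mDirty;
    std::ofstream mOut;

  public:
    explicit TwoPhaseCheckpointFile(std::filesystem::path finalPath)
        : mFinal(std::move(finalPath)), mDirty(mFinal.string() + ".dirty")
    {
        // Append mode: recovery after a crash reopens the same dirty file.
        mOut.open(mDirty, std::ios::binary | std::ios::app);
    }

    void
    append(char const* data, std::streamsize n)
    {
        mOut.write(data, n);
        mOut.flush(); // the real code also fsyncs; see durableWriteOne below
    }

    // Called once the last ledger of the checkpoint has been appended.
    void
    finalize()
    {
        mOut.close();
        // Readers never observe a partially written file under the canonical
        // name; a crash before this rename leaves only the ".dirty" file,
        // which startup recovery can truncate or discard.
        std::filesystem::rename(mDirty, mFinal);
    }
};
```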
@@ -651,7 +686,7 @@ CatchupSimulation::getAllPublishedCheckpoints() const assert(mLedgerHashes.size() == mLedgerSeqs.size()); auto hi = mLedgerHashes.begin(); auto si = mLedgerSeqs.begin(); - auto const& hm = mApp.getHistoryManager(); + auto const& hm = getApp().getHistoryManager(); while (si != mLedgerSeqs.end()) { if (hm.isLastLedgerInCheckpoint(*si)) @@ -674,7 +709,7 @@ CatchupSimulation::getLastPublishedCheckpoint() const assert(mLedgerHashes.size() == mLedgerSeqs.size()); auto hi = mLedgerHashes.rbegin(); auto si = mLedgerSeqs.rbegin(); - auto const& hm = mApp.getHistoryManager(); + auto const& hm = getApp().getHistoryManager(); while (si != mLedgerSeqs.rend()) { if (hm.isLastLedgerInCheckpoint(*si)) @@ -770,7 +805,7 @@ CatchupSimulation::catchupOffline(Application::pointer app, uint32_t toLedger, CatchupPerformedWork{endCatchupMetrics - startCatchupMetrics}; REQUIRE(catchupPerformedWork == expectedCatchupWork); - if (app->getHistoryArchiveManager().hasAnyWritableHistoryArchive()) + if (app->getHistoryArchiveManager().publishEnabled()) { auto& hm = app->getHistoryManager(); REQUIRE(hm.getPublishQueueCount() - hm.getPublishSuccessCount() <= @@ -932,7 +967,7 @@ CatchupSimulation::validateCatchup(Application::pointer app) size_t i = nextLedger - 3; - auto root = TestAccount{*app, getRoot(mApp.getNetworkID())}; + auto root = TestAccount{*app, getRoot(getApp().getNetworkID())}; auto alice = TestAccount{*app, getAccount("alice")}; auto bob = TestAccount{*app, getAccount("bob")}; auto carol = TestAccount{*app, getAccount("carol")}; @@ -1080,5 +1115,13 @@ CatchupSimulation::computeCatchupPerformedWork( txSetsDownloaded, txSetsApplied}; } + +void +CatchupSimulation::restartApp() +{ + mAppPtr.reset(); + mClock = std::make_unique(mClock->getMode()); + mAppPtr = createTestApplication(*mClock, mCfg, /*newDB*/ false); +} } } diff --git a/src/history/test/HistoryTestsUtils.h b/src/history/test/HistoryTestsUtils.h index eace1f3a6e..ce62a96098 100644 --- a/src/history/test/HistoryTestsUtils.h +++ b/src/history/test/HistoryTestsUtils.h @@ -178,13 +178,12 @@ struct CatchupPerformedWork class CatchupSimulation { protected: - VirtualClock mClock; + std::unique_ptr mClock; std::list mSpawnedAppsClocks; std::shared_ptr mHistoryConfigurator; Config mCfg; std::vector mCfgs; Application::pointer mAppPtr; - Application& mApp; BucketList mBucketListAtLastPublish; std::vector mLedgerCloseDatas; @@ -217,19 +216,20 @@ class CatchupSimulation VirtualClock::Mode mode = VirtualClock::VIRTUAL_TIME, std::shared_ptr cg = std::make_shared(), - bool startApp = true); + bool startApp = true, + Config::TestDbMode dbMode = Config::TESTDB_IN_MEMORY_OFFERS); ~CatchupSimulation(); Application& getApp() const { - return mApp; + return *mAppPtr; } VirtualClock& getClock() { - return mClock; + return *mClock; } HistoryConfigurator& @@ -249,8 +249,12 @@ class CatchupSimulation void generateRandomLedger(uint32_t version = 0); void ensurePublishesComplete(); - void ensureLedgerAvailable(uint32_t targetLedger); - void ensureOfflineCatchupPossible(uint32_t targetLedger); + void + ensureLedgerAvailable(uint32_t targetLedger, + std::optional restartLedger = std::nullopt); + void ensureOfflineCatchupPossible( + uint32_t targetLedger, + std::optional restartLedger = std::nullopt); void ensureOnlineCatchupPossible(uint32_t targetLedger, uint32_t bufferLedgers = 0); @@ -276,6 +280,7 @@ class CatchupSimulation VirtualClock::duration duration); void setUpgradeLedger(uint32_t ledger, ProtocolVersion upgradeVersion); + void restartApp(); }; } } 
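A plausible way to drive the restart hooks introduced above from a test, mirroring the "History publish with restart" case; the function name and ledger arithmetic are illustrative assumptions, while the CatchupSimulation API matches the signatures in this diff.

```cpp
#include "history/test/HistoryTestsUtils.h"
#include "main/Config.h"
#include <memory>

using namespace stellar;
using namespace stellar::historytestutils;

// Close ledgers up to the second checkpoint, restarting the node on a ledger
// strictly inside that checkpoint, then require that publication still
// completes from the recovered on-disk state.
void
exerciseRestartMidCheckpoint()
{
    CatchupSimulation sim{VirtualClock::VIRTUAL_TIME,
                          std::make_shared<TmpDirHistoryConfigurator>(), true,
                          Config::TESTDB_ON_DISK_SQLITE};

    uint32_t target = sim.getLastCheckpointLedger(2);
    uint32_t restartAt = sim.getLastCheckpointLedger(1) + 3; // mid-checkpoint

    // ensureLedgerAvailable() tears the app down at restartAt (same on-disk
    // database, fresh VirtualClock) before generating the remaining ledgers,
    // and ensurePublishesComplete() then verifies no dirty files remain.
    sim.ensureOfflineCatchupPossible(target, restartAt);
}
```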
diff --git a/src/historywork/BatchDownloadWork.cpp b/src/historywork/BatchDownloadWork.cpp index f74a9a6aa4..e446ee2db3 100644 --- a/src/historywork/BatchDownloadWork.cpp +++ b/src/historywork/BatchDownloadWork.cpp @@ -15,12 +15,12 @@ namespace stellar { BatchDownloadWork::BatchDownloadWork(Application& app, CheckpointRange range, - std::string const& type, + FileType const& type, TmpDir const& downloadDir, std::shared_ptr archive) : BatchWork(app, fmt::format(FMT_STRING("batch-download-{:s}-{:08x}-{:08x}"), - type, range.mFirst, range.limit())) + typeString(type), range.mFirst, range.limit())) , mRange(range) , mNext(range.mFirst) , mFileType(type) @@ -34,8 +34,8 @@ BatchDownloadWork::getStatus() const { if (!isDone() && !isAborting()) { - auto task = - fmt::format(FMT_STRING("downloading {:s} files"), mFileType); + auto task = fmt::format(FMT_STRING("downloading {:s} files"), + typeString(mFileType)); return fmtProgress(mApp, task, mRange.getLedgerRange(), mNext); } return BatchWork::getStatus(); @@ -54,7 +54,7 @@ BatchDownloadWork::yieldMoreWork() FileTransferInfo ft(mDownloadDir, mFileType, mNext); CLOG_DEBUG(History, "Downloading and unzipping {} for checkpoint {}", - mFileType, mNext); + typeString(mFileType), mNext); auto getAndUnzip = std::make_shared(mApp, ft, mArchive); mNext += mApp.getHistoryManager().getCheckpointFrequency(); diff --git a/src/historywork/BatchDownloadWork.h b/src/historywork/BatchDownloadWork.h index 3e355599df..de8388932b 100644 --- a/src/historywork/BatchDownloadWork.h +++ b/src/historywork/BatchDownloadWork.h @@ -22,13 +22,13 @@ class BatchDownloadWork : public BatchWork { CheckpointRange const mRange; uint32_t mNext; - std::string const mFileType; + FileType const mFileType; TmpDir const& mDownloadDir; std::shared_ptr mArchive; public: BatchDownloadWork(Application& app, CheckpointRange range, - std::string const& type, TmpDir const& downloadDir, + FileType const& type, TmpDir const& downloadDir, std::shared_ptr archive = nullptr); ~BatchDownloadWork() = default; std::string getStatus() const override; diff --git a/src/historywork/CheckSingleLedgerHeaderWork.cpp b/src/historywork/CheckSingleLedgerHeaderWork.cpp index 8209cc35aa..8d2e40690c 100644 --- a/src/historywork/CheckSingleLedgerHeaderWork.cpp +++ b/src/historywork/CheckSingleLedgerHeaderWork.cpp @@ -59,7 +59,7 @@ CheckSingleLedgerHeaderWork::doReset() uint32_t checkpoint = mApp.getHistoryManager().checkpointContainingLedger( mExpected.header.ledgerSeq); mFt = std::make_unique( - *mDownloadDir, HISTORY_FILE_TYPE_LEDGER, checkpoint); + *mDownloadDir, FileType::HISTORY_FILE_TYPE_LEDGER, checkpoint); } BasicWork::State diff --git a/src/historywork/DownloadBucketsWork.cpp b/src/historywork/DownloadBucketsWork.cpp index 2dcea7ba61..fffc94a0ea 100644 --- a/src/historywork/DownloadBucketsWork.cpp +++ b/src/historywork/DownloadBucketsWork.cpp @@ -71,7 +71,7 @@ DownloadBucketsWork::yieldMoreWork() } auto hash = *mNextBucketIter; - FileTransferInfo ft(mDownloadDir, HISTORY_FILE_TYPE_BUCKET, hash); + FileTransferInfo ft(mDownloadDir, FileType::HISTORY_FILE_TYPE_BUCKET, hash); auto w1 = std::make_shared(mApp, ft, mArchive); auto getFileWeak = std::weak_ptr(w1); diff --git a/src/historywork/DownloadVerifyTxResultsWork.cpp b/src/historywork/DownloadVerifyTxResultsWork.cpp index 958cc824f2..04ada07514 100644 --- a/src/historywork/DownloadVerifyTxResultsWork.cpp +++ b/src/historywork/DownloadVerifyTxResultsWork.cpp @@ -34,7 +34,7 @@ DownloadVerifyTxResultsWork::getStatus() const { auto task = 
fmt::format(FMT_STRING("Downloading and verifying {:s} files"), - HISTORY_FILE_TYPE_RESULTS); + typeString(FileType::HISTORY_FILE_TYPE_RESULTS)); return fmtProgress(mApp, task, mRange.getLedgerRange(), mCurrCheckpoint); } @@ -62,7 +62,7 @@ DownloadVerifyTxResultsWork::yieldMoreWork() throw std::runtime_error("Nothing to iterate over!"); } - FileTransferInfo ft(mDownloadDir, HISTORY_FILE_TYPE_RESULTS, + FileTransferInfo ft(mDownloadDir, FileType::HISTORY_FILE_TYPE_RESULTS, mCurrCheckpoint); auto w1 = std::make_shared(mApp, ft, mArchive); auto w2 = std::make_shared(mApp, mDownloadDir, diff --git a/src/historywork/FetchRecentQsetsWork.cpp b/src/historywork/FetchRecentQsetsWork.cpp index 6b38200061..1d12a00cbe 100644 --- a/src/historywork/FetchRecentQsetsWork.cpp +++ b/src/historywork/FetchRecentQsetsWork.cpp @@ -62,7 +62,7 @@ FetchRecentQsetsWork::doWork() firstSeq, lastSeq); auto range = CheckpointRange::inclusive(firstSeq, lastSeq, step); mDownloadSCPMessagesWork = addWork( - range, HISTORY_FILE_TYPE_SCP, *mDownloadDir); + range, FileType::HISTORY_FILE_TYPE_SCP, *mDownloadDir); return State::WORK_RUNNING; } else if (mDownloadSCPMessagesWork->getState() != State::WORK_SUCCESS) @@ -75,7 +75,7 @@ FetchRecentQsetsWork::doWork() { CLOG_INFO(History, "Scanning for QSets in checkpoint: {}", i); XDRInputFileStream in; - FileTransferInfo fi(*mDownloadDir, HISTORY_FILE_TYPE_SCP, i); + FileTransferInfo fi(*mDownloadDir, FileType::HISTORY_FILE_TYPE_SCP, i); try { in.open(fi.localPath_nogz()); diff --git a/src/historywork/VerifyTxResultsWork.cpp b/src/historywork/VerifyTxResultsWork.cpp index f957563809..d1c70634e5 100644 --- a/src/historywork/VerifyTxResultsWork.cpp +++ b/src/historywork/VerifyTxResultsWork.cpp @@ -89,9 +89,9 @@ VerifyTxResultsWork::verifyTxResultsOfCheckpoint() ZoneScoped; try { - FileTransferInfo hi(mDownloadDir, HISTORY_FILE_TYPE_LEDGER, + FileTransferInfo hi(mDownloadDir, FileType::HISTORY_FILE_TYPE_LEDGER, mCheckpoint); - FileTransferInfo ri(mDownloadDir, HISTORY_FILE_TYPE_RESULTS, + FileTransferInfo ri(mDownloadDir, FileType::HISTORY_FILE_TYPE_RESULTS, mCheckpoint); mHdrIn.open(hi.localPath_nogz()); mResIn.open(ri.localPath_nogz()); diff --git a/src/historywork/WriteSnapshotWork.cpp b/src/historywork/WriteSnapshotWork.cpp index bd40f26362..36ccb3b7ea 100644 --- a/src/historywork/WriteSnapshotWork.cpp +++ b/src/historywork/WriteSnapshotWork.cpp @@ -44,7 +44,7 @@ WriteSnapshotWork::onRun() auto snap = self->mSnapshot; bool success = true; - if (!snap->writeHistoryBlocks()) + if (!snap->writeSCPMessages()) { success = false; } diff --git a/src/historywork/WriteVerifiedCheckpointHashesWork.cpp b/src/historywork/WriteVerifiedCheckpointHashesWork.cpp index 200c0f9e64..45642878b1 100644 --- a/src/historywork/WriteVerifiedCheckpointHashesWork.cpp +++ b/src/historywork/WriteVerifiedCheckpointHashesWork.cpp @@ -122,7 +122,8 @@ WriteVerifiedCheckpointHashesWork::yieldMoreWork() auto tmpDir = std::make_shared( mApp.getTmpDirManager().tmpDir("verify-" + checkpointStr)); auto getWork = std::make_shared( - mApp, checkpointRange, HISTORY_FILE_TYPE_LEDGER, *tmpDir, mArchive); + mApp, checkpointRange, FileType::HISTORY_FILE_TYPE_LEDGER, *tmpDir, + mArchive); // When we have a previous-work, we grab a future attached to the promise it // will fulfill when it runs. 
This promise might not have a value _yet_ but diff --git a/src/ledger/LedgerHeaderUtils.cpp b/src/ledger/LedgerHeaderUtils.cpp index 372915d1f0..0835439355 100644 --- a/src/ledger/LedgerHeaderUtils.cpp +++ b/src/ledger/LedgerHeaderUtils.cpp @@ -7,6 +7,7 @@ #include "crypto/SHA.h" #include "database/Database.h" #include "database/DatabaseUtils.h" +#include "history/CheckpointBuilder.h" #include "util/Decoder.h" #include "util/GlobalChecks.h" #include "util/Logging.h" @@ -137,6 +138,25 @@ loadByHash(Database& db, Hash const& hash) return lhPtr; } +uint32_t +loadMaxLedgerSeq(Database& db) +{ + ZoneScoped; + uint32_t seq = 0; + soci::indicator maxIndicator; + auto prep = + db.getPreparedStatement("SELECT MAX(ledgerseq) FROM ledgerheaders"); + auto& st = prep.statement(); + st.exchange(soci::into(seq, maxIndicator)); + st.define_and_bind(); + st.execute(true); + if (maxIndicator == soci::indicator::i_ok) + { + return seq; + } + return 0; +} + std::shared_ptr loadBySequence(Database& db, soci::session& sess, uint32_t seq) { @@ -175,17 +195,9 @@ deleteOldEntries(Database& db, uint32_t ledgerSeq, uint32_t count) "ledgerheaders", "ledgerseq"); } -void -deleteNewerEntries(Database& db, uint32_t ledgerSeq) -{ - ZoneScoped; - DatabaseUtils::deleteNewerEntriesHelper(db.getSession(), ledgerSeq, - "ledgerheaders", "ledgerseq"); -} - size_t copyToStream(Database& db, soci::session& sess, uint32_t ledgerSeq, - uint32_t ledgerCount, XDROutputFileStream& headersOut) + uint32_t ledgerCount, CheckpointBuilder& checkpointBuilder) { ZoneNamedN(selectLedgerHeadersZone, "select ledgerheaders history", true); uint32_t begin = ledgerSeq, end = ledgerSeq + ledgerCount; @@ -207,7 +219,8 @@ copyToStream(Database& db, soci::session& sess, uint32_t ledgerSeq, lhe.header = decodeFromData(headerEncoded); lhe.hash = xdrSha256(lhe.header); CLOG_DEBUG(Ledger, "Streaming ledger-header {}", lhe.header.ledgerSeq); - headersOut.writeOne(lhe); + checkpointBuilder.appendLedgerHeader(lhe.header, + /* skipStartupCheck */ true); ++n; st.fetch(); } diff --git a/src/ledger/LedgerHeaderUtils.h b/src/ledger/LedgerHeaderUtils.h index 49b02adee8..e165570e1a 100644 --- a/src/ledger/LedgerHeaderUtils.h +++ b/src/ledger/LedgerHeaderUtils.h @@ -9,7 +9,7 @@ namespace stellar { -class XDROutputFileStream; +class CheckpointBuilder; namespace LedgerHeaderUtils { @@ -27,11 +27,12 @@ std::shared_ptr loadByHash(Database& db, Hash const& hash); std::shared_ptr loadBySequence(Database& db, soci::session& sess, uint32_t seq); +uint32_t loadMaxLedgerSeq(Database& db); + void deleteOldEntries(Database& db, uint32_t ledgerSeq, uint32_t count); -void deleteNewerEntries(Database& db, uint32_t ledgerSeq); size_t copyToStream(Database& db, soci::session& sess, uint32_t ledgerSeq, - uint32_t ledgerCount, XDROutputFileStream& headersOut); + uint32_t ledgerCount, CheckpointBuilder& checkpointBuilder); void dropAll(Database& db); } diff --git a/src/ledger/LedgerManager.h b/src/ledger/LedgerManager.h index 88d0ca8bb8..710ca7271d 100644 --- a/src/ledger/LedgerManager.h +++ b/src/ledger/LedgerManager.h @@ -184,12 +184,6 @@ class LedgerManager virtual void deleteOldEntries(Database& db, uint32_t ledgerSeq, uint32_t count) = 0; - // cleans historical data newer than ledgerSeq - // as this is used when applying buckets, the data is deleted such that: - // ledgerheaders >= ledgerSeq - // everything else > ledgerSeq - virtual void deleteNewerEntries(Database& db, uint32_t ledgerSeq) = 0; - virtual void setLastClosedLedger(LedgerHeaderHistoryEntry const& lastClosed, bool 
storeInDB) = 0; diff --git a/src/ledger/LedgerManagerImpl.cpp b/src/ledger/LedgerManagerImpl.cpp index 264781e82c..8e7c5841b2 100644 --- a/src/ledger/LedgerManagerImpl.cpp +++ b/src/ledger/LedgerManagerImpl.cpp @@ -392,6 +392,11 @@ LedgerManagerImpl::loadLastKnownLedger(bool restoreBucketlist, // Step 4. Restore LedgerManager's internal state advanceLedgerPointers(*latestLedgerHeader); + // Maybe truncate checkpoint files if we're restarting after a crash + // in closeLedger (in which case any modifications to the ledger state have + // been rolled back) + mApp.getHistoryManager().restoreCheckpoint(latestLedgerHeader->ledgerSeq); + if (protocolVersionStartsFrom(latestLedgerHeader->ledgerVersion, SOROBAN_PROTOCOL_VERSION)) { @@ -918,8 +923,9 @@ LedgerManagerImpl::closeLedger(LedgerCloseData const& ledgerData) ledgerCloseMeta); if (mApp.getConfig().MODE_STORES_HISTORY_MISC) { - storeTxSet(mApp.getDatabase(), ltx.loadHeader().current().ledgerSeq, - *txSet); + auto ledgerSeq = ltx.loadHeader().current().ledgerSeq; + mApp.getHistoryManager().appendTransactionSet(ledgerSeq, txSet, + txResultSet); } ltx.loadHeader().current().txSetResultHash = xdrSha256(txResultSet); @@ -955,7 +961,6 @@ LedgerManagerImpl::closeLedger(LedgerCloseData const& ledgerData) LedgerTxn ltxUpgrade(ltx); Upgrades::applyTo(lupgrade, mApp, ltxUpgrade); - auto ledgerSeq = ltxUpgrade.loadHeader().current().ledgerSeq; LedgerEntryChanges changes = ltxUpgrade.getChanges(); if (ledgerCloseMeta) { @@ -965,14 +970,6 @@ LedgerManagerImpl::closeLedger(LedgerCloseData const& ledgerData) uem.upgrade = lupgrade; uem.changes = changes; } - // Note: Index from 1 rather than 0 to match the behavior of - // storeTransaction and storeTransactionFee. - if (mApp.getConfig().MODE_STORES_HISTORY_MISC) - { - Upgrades::storeUpgradeHistory(getDatabase(), ledgerSeq, - lupgrade, changes, - static_cast(i + 1)); - } ltxUpgrade.commit(); } catch (std::runtime_error& e) @@ -1028,16 +1025,21 @@ LedgerManagerImpl::closeLedger(LedgerCloseData const& ledgerData) // // 2. Commit the current transaction. // - // 3. Start background eviction scan for the next ledger, _after_ the commit - // so that it takes its snapshot of network setting from the - // committed state. + // 3. Finalize any new checkpoint files _after_ the commit. If a crash + // occurs + // between commit and this step, core will attempt finalizing files again + // on restart. // // 4. Start any queued checkpoint publishing, _after_ the commit so that // it takes its snapshot of history-rows from the committed state, but // _before_ we GC any buckets (because this is the step where the // bucket refcounts are incremented for the duration of the publish). // - // 5. GC unreferenced buckets. Only do this once publishes are in progress. + // 5. Start background eviction scan for the next ledger, _after_ the commit + // so that it takes its snapshot of network setting from the + // committed state. + // + // 6. GC unreferenced buckets. Only do this once publishes are in progress. 
// step 1 auto& hm = mApp.getHistoryManager(); @@ -1046,7 +1048,18 @@ LedgerManagerImpl::closeLedger(LedgerCloseData const& ledgerData) // step 2 ltx.commit(); +#ifdef BUILD_TESTS + mLatestTxResultSet = txResultSet; +#endif + // step 3 + hm.maybeCheckpointComplete(); + + // step 4 + hm.publishQueuedHistory(); + hm.logAndUpdatePublishStatus(); + + // step 5 if (protocolVersionStartsFrom(initialLedgerVers, SOROBAN_PROTOCOL_VERSION) && mApp.getConfig().isUsingBackgroundEviction()) @@ -1054,11 +1067,7 @@ LedgerManagerImpl::closeLedger(LedgerCloseData const& ledgerData) mApp.getBucketManager().startBackgroundEvictionScan(ledgerSeq + 1); } - // step 4 - hm.publishQueuedHistory(); - hm.logAndUpdatePublishStatus(); - - // step 5 + // step 6 mApp.getBucketManager().forgetUnreferencedBuckets(); if (!mApp.getConfig().OP_APPLY_SLEEP_TIME_WEIGHT_FOR_TESTING.empty()) @@ -1100,28 +1109,7 @@ LedgerManagerImpl::deleteOldEntries(Database& db, uint32_t ledgerSeq, soci::transaction txscope(db.getSession()); db.clearPreparedStatementCache(); LedgerHeaderUtils::deleteOldEntries(db, ledgerSeq, count); - deleteOldTransactionHistoryEntries(db, ledgerSeq, count); HerderPersistence::deleteOldEntries(db, ledgerSeq, count); - Upgrades::deleteOldEntries(db, ledgerSeq, count); - db.clearPreparedStatementCache(); - txscope.commit(); -} - -void -LedgerManagerImpl::deleteNewerEntries(Database& db, uint32_t ledgerSeq) -{ - ZoneScoped; - soci::transaction txscope(db.getSession()); - db.clearPreparedStatementCache(); - - // as we use this method only when we apply buckets, we have to preserve - // data for everything but ledger header - LedgerHeaderUtils::deleteNewerEntries(db, ledgerSeq); - // for other data we delete data *after* - ++ledgerSeq; - deleteNewerTransactionHistoryEntries(db, ledgerSeq); - HerderPersistence::deleteNewerEntries(db, ledgerSeq); - Upgrades::deleteNewerEntries(db, ledgerSeq); db.clearPreparedStatementCache(); txscope.commit(); } @@ -1134,7 +1122,8 @@ LedgerManagerImpl::setLastClosedLedger( LedgerTxn ltx(mApp.getLedgerTxnRoot()); auto header = ltx.loadHeader(); header.current() = lastClosed.header; - storeCurrentLedger(header.current(), storeInDB); + storeCurrentLedger(header.current(), storeInDB, + /* appendToCheckpoint */ false); ltx.commit(); mRebuildInMemoryState = false; @@ -1599,22 +1588,7 @@ LedgerManagerImpl::applyTransactions( tm.getXDR(), std::move(results), index); } - // Then finally store the results and meta into the txhistory table. - // if we're running in a mode that has one. - // - // Note to future: when we eliminate the txhistory for archiving, the - // next step can be removed. - // - // Also note: for historical reasons the history tables number - // txs counting from 1, not 0. We preserve this for the time being - // in case anyone depends on it. 
++index; - if (mApp.getConfig().MODE_STORES_HISTORY_MISC) - { - auto ledgerSeq = ltx.loadHeader().current().ledgerSeq; - storeTransaction(mApp.getDatabase(), ledgerSeq, tx, tm.getXDR(), - txResultSet, mApp.getConfig()); - } } mTransactionApplySucceeded.inc(txSucceeded); @@ -1641,7 +1615,7 @@ LedgerManagerImpl::logTxApplyMetrics(AbstractLedgerTxn& ltx, size_t numTxs, void LedgerManagerImpl::storeCurrentLedger(LedgerHeader const& header, - bool storeHeader) + bool storeHeader, bool appendToCheckpoint) { ZoneScoped; @@ -1666,6 +1640,10 @@ LedgerManagerImpl::storeCurrentLedger(LedgerHeader const& header, if (mApp.getConfig().MODE_STORES_HISTORY_LEDGERHEADERS && storeHeader) { LedgerHeaderUtils::storeInDatabase(mApp.getDatabase(), header); + if (appendToCheckpoint) + { + mApp.getHistoryManager().appendLedgerHeader(header); + } } } @@ -1768,7 +1746,8 @@ LedgerManagerImpl::ledgerClosed( ltx.unsealHeader([this](LedgerHeader& lh) { mApp.getBucketManager().snapshotLedger(lh); - storeCurrentLedger(lh, /* storeHeader */ true); + storeCurrentLedger(lh, /* storeHeader */ true, + /* appendToCheckpoint */ true); advanceLedgerPointers(lh); }); } diff --git a/src/ledger/LedgerManagerImpl.h b/src/ledger/LedgerManagerImpl.h index a5b1ae860a..1d16f93f4e 100644 --- a/src/ledger/LedgerManagerImpl.h +++ b/src/ledger/LedgerManagerImpl.h @@ -94,7 +94,8 @@ class LedgerManagerImpl : public LedgerManager std::unique_ptr const& ledgerCloseMeta, uint32_t initialLedgerVers); - void storeCurrentLedger(LedgerHeader const& header, bool storeHeader); + void storeCurrentLedger(LedgerHeader const& header, bool storeHeader, + bool appendToCheckpoint); void prefetchTransactionData(std::vector const& txs); void prefetchTxSourceIds(std::vector const& txs); @@ -163,6 +164,7 @@ class LedgerManagerImpl : public LedgerManager SorobanNetworkConfig& getMutableSorobanNetworkConfig() override; std::vector const& getLastClosedLedgerTxMeta() override; + TransactionResultSet mLatestTxResultSet{}; #endif uint64_t secondsSinceLastLedgerClose() const override; @@ -190,8 +192,6 @@ class LedgerManagerImpl : public LedgerManager void deleteOldEntries(Database& db, uint32_t ledgerSeq, uint32_t count) override; - void deleteNewerEntries(Database& db, uint32_t ledgerSeq) override; - void setLastClosedLedger(LedgerHeaderHistoryEntry const& lastClosed, bool storeInDB) override; diff --git a/src/ledger/readme.md b/src/ledger/readme.md index 84a3832968..88d6176931 100644 --- a/src/ledger/readme.md +++ b/src/ledger/readme.md @@ -189,11 +189,6 @@ Some tables are used as queues to other subsystems: LedgerHeader contains the ledger headers that were produced by the "closeLedger" method in LedgerManager. -TxHistory contains the record of all transactions applied to all ledgers that -were closed. -See [`src/transactions/TransactionFrame.cpp`](../transactions/TransactionFrame.cpp) -for more detail. - ## BucketManager The final LedgerDelta generated by closing the ledger is fed into the BucketManager to add it to the "L0" bucket. 
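A minimal sketch of the invariant behind the restoreCheckpoint() call added to loadLastKnownLedger() above, using only stream APIs that appear elsewhere in this diff; the function and path names are hypothetical. After a crash in closeLedger(), any entries in the in-progress dirty checkpoint file newer than the committed LCL must be dropped, because their ledger-state effects were rolled back.

```cpp
#include "main/Application.h"
#include "overlay/StellarXDR.h"
#include "util/Fs.h"
#include "util/XDRStream.h"
#include <exception>
#include <filesystem>
#include <string>

// Copy the valid prefix of a dirty header file, then atomically replace it.
void
truncateDirtyHeadersSketch(stellar::Application& app,
                           std::string const& dirtyPath, uint32_t lcl)
{
    using namespace stellar;

    XDRInputFileStream in;
    in.open(dirtyPath);

    std::string tmpPath = dirtyPath + ".tmp";
    XDROutputFileStream out(app.getClock().getIOContext(), /*doFsync=*/true);
    out.open(tmpPath);

    // Keep only entries at or below the committed LCL.
    LedgerHeaderHistoryEntry entry;
    try
    {
        while (in.readOne(entry) && entry.header.ledgerSeq <= lcl)
        {
            out.writeOne(entry);
        }
    }
    catch (std::exception const&)
    {
        // A torn trailing record (partial write) just ends the copy early.
    }
    in.close();
    out.close();

    // Promote atomically so that a crash during recovery is itself
    // recoverable; durableRename also syncs the containing directory.
    fs::durableRename(
        tmpPath, dirtyPath,
        std::filesystem::path(dirtyPath).parent_path().string());
}
```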
diff --git a/src/main/ApplicationImpl.cpp b/src/main/ApplicationImpl.cpp index 2a42a6485a..639cdb50d8 100644 --- a/src/main/ApplicationImpl.cpp +++ b/src/main/ApplicationImpl.cpp @@ -886,7 +886,7 @@ ApplicationImpl::validateAndLogConfig() "and RUN_STANDALONE is not set"); } - if (getHistoryArchiveManager().hasAnyWritableHistoryArchive()) + if (getHistoryArchiveManager().publishEnabled()) { if (!mConfig.modeStoresAllHistory()) { diff --git a/src/main/CommandLine.cpp b/src/main/CommandLine.cpp index db4bdd4896..a59df33fde 100644 --- a/src/main/CommandLine.cpp +++ b/src/main/CommandLine.cpp @@ -797,7 +797,6 @@ runCatchup(CommandLineArgs const& args) std::string trustedCheckpointHashesFile; bool completeValidation = false; bool inMemory = false; - bool forceBack = false; bool forceUntrusted = false; std::string hash; std::string stream; @@ -845,12 +844,6 @@ runCatchup(CommandLineArgs const& args) "verify all files from the archive for the catchup range"); }; - auto forceBackParser = [](bool& forceBackClean) { - return clara::Opt{forceBackClean}["--force-back"]( - "force ledger state to a previous state, preserving older " - "historical data"); - }; - return runWithHelp( args, {configurationParser(configOption), catchupStringParser, @@ -859,7 +852,7 @@ runCatchup(CommandLineArgs const& args) outputFileParser(outputFile), disableBucketGCParser(disableBucketGC), validationParser(completeValidation), inMemoryParser(inMemory), ledgerHashParser(hash), forceUntrustedCatchup(forceUntrusted), - metadataOutputStreamParser(stream), forceBackParser(forceBack)}, + metadataOutputStreamParser(stream)}, [&] { auto config = configOption.getConfig(); // Don't call config.setNoListen() here as we might want to @@ -931,49 +924,7 @@ runCatchup(CommandLineArgs const& args) cc = CatchupConfiguration(pair, cc.count(), cc.mode()); } - if (forceBack) - { - CatchupRange range(LedgerManager::GENESIS_LEDGER_SEQ, cc, - app->getHistoryManager()); - LOG_INFO(DEFAULT_LOG, "Force applying range {}-{}", - range.first(), range.last()); - if (!range.applyBuckets()) - { - throw std::runtime_error( - "force can only be used when buckets get applied"); - } - // by dropping persistent state, we ensure that we don't - // leave the database in some half-reset state until - // startNewLedger completes later on - { - auto& ps = app->getPersistentState(); - ps.setState(PersistentState::kLastClosedLedger, ""); - ps.setState(PersistentState::kHistoryArchiveState, ""); - ps.setState(PersistentState::kLastSCPData, ""); - ps.setState(PersistentState::kLastSCPDataXDR, ""); - ps.setState(PersistentState::kLedgerUpgrades, ""); - } - - LOG_INFO( - DEFAULT_LOG, - "Cleaning historical data (this may take a while)"); - auto& lm = app->getLedgerManager(); - lm.deleteNewerEntries(app->getDatabase(), range.first()); - // checkpoints - app->getHistoryManager().deleteCheckpointsNewerThan( - range.first()); - - // need to delete genesis ledger data (so that we can reset - // to it) - lm.deleteOldEntries(app->getDatabase(), - LedgerManager::GENESIS_LEDGER_SEQ, 1); - LOG_INFO( - DEFAULT_LOG, - "Resetting ledger state to genesis before catching up"); - app->resetLedgerState(); - lm.startNewLedger(); - } - else if (hash.empty() && !forceUntrusted) + if (hash.empty() && !forceUntrusted) { CLOG_WARNING( History, diff --git a/src/main/Maintainer.h b/src/main/Maintainer.h index 2105960008..e7b0b4ce37 100644 --- a/src/main/Maintainer.h +++ b/src/main/Maintainer.h @@ -21,7 +21,7 @@ class Maintainer // start automatic maintenance according to app.getConfig() void 
start(); - // removes maximum count entries from tables like txhistory or scphistory + // removes maximum count entries from tables like scphistory void performMaintenance(uint32_t count); private: diff --git a/src/main/test/ApplicationUtilsTests.cpp b/src/main/test/ApplicationUtilsTests.cpp index bc3e1e8451..bdbf6942ea 100644 --- a/src/main/test/ApplicationUtilsTests.cpp +++ b/src/main/test/ApplicationUtilsTests.cpp @@ -4,6 +4,7 @@ #include "crypto/Random.h" #include "history/HistoryArchiveManager.h" +#include "history/HistoryManagerImpl.h" #include "history/test/HistoryTestsUtils.h" #include "invariant/BucketListIsConsistentWithDatabase.h" #include "ledger/LedgerTxn.h" @@ -148,7 +149,6 @@ class SimulationHelper Config& mTestCfg; PublicKey mMainNodeID; PublicKey mTestNodeID; - SecretKey mTestNodeSecretKey; SCPQuorumSet mQuorum; TmpDirHistoryConfigurator mHistCfg; @@ -161,14 +161,10 @@ class SimulationHelper std::make_shared(Simulation::OVER_LOOPBACK, networkID); // Main node never shuts down, publishes checkpoints for test node - const Hash mainNodeSeed = sha256("NODE_SEED_MAIN"); - const SecretKey mainNodeSecretKey = SecretKey::fromSeed(mainNodeSeed); - mMainNodeID = mainNodeSecretKey.getPublicKey(); + mMainNodeID = mMainCfg.NODE_SEED.getPublicKey(); // Test node may shutdown, lose sync, etc. - const Hash testNodeSeed = sha256("NODE_SEED_SECONDARY"); - mTestNodeSecretKey = SecretKey::fromSeed(testNodeSeed); - mTestNodeID = mTestNodeSecretKey.getPublicKey(); + mTestNodeID = testCfg.NODE_SEED.getPublicKey(); mQuorum.threshold = 1; mQuorum.validators.push_back(mMainNodeID); @@ -193,11 +189,12 @@ class SimulationHelper // main validator mTestCfg = mHistCfg.configure(mTestCfg, /* writable */ false); - mMainNode = mSimulation->addNode(mainNodeSecretKey, mQuorum, &mMainCfg); + mMainNode = + mSimulation->addNode(mMainCfg.NODE_SEED, mQuorum, &mMainCfg); mMainNode->getHistoryArchiveManager().initializeHistoryArchive( mHistCfg.getArchiveDirName()); - mSimulation->addNode(mTestNodeSecretKey, mQuorum, &mTestCfg); + mSimulation->addNode(testCfg.NODE_SEED, mQuorum, &mTestCfg); mSimulation->addPendingConnection(mMainNodeID, mTestNodeID); mSimulation->startAllNodes(); @@ -333,7 +330,7 @@ class SimulationHelper std::chrono::seconds(delayBuckets); // Start test app - auto app = mSimulation->addNode(mTestNodeSecretKey, mQuorum, &mTestCfg, + auto app = mSimulation->addNode(mTestCfg.NODE_SEED, mQuorum, &mTestCfg, false, startFromLedger, startFromHash); mSimulation->addPendingConnection(mMainNodeID, mTestNodeID); REQUIRE(app); @@ -368,7 +365,7 @@ class SimulationHelper // State has been rebuilt and node is properly in sync REQUIRE(checkState(*app)); - REQUIRE(app->getLedgerManager().getLastClosedLedgerNum() == + REQUIRE(app->getLedgerManager().getLastClosedLedgerNum() + 1 >= getMainNodeLCL().header.ledgerSeq); REQUIRE(app->getLedgerManager().isSynced()); @@ -478,8 +475,8 @@ TEST_CASE("offline self-check works", "[applicationutils][selfcheck]") { // Damage the target ledger in the archive. 
auto path = archPath; - path /= - fs::remoteName(HISTORY_FILE_TYPE_LEDGER, fs::hexStr(l1), "xdr.gz"); + path /= fs::remoteName(typeString(FileType::HISTORY_FILE_TYPE_LEDGER), + fs::hexStr(l1), "xdr.gz"); TemporaryFileDamager damage(path); damage.damageVictim(); REQUIRE(selfCheck(chkConfig) == 1); diff --git a/src/rust/Cargo.toml b/src/rust/Cargo.toml index c96cf57e8e..5eeb748d14 100644 --- a/src/rust/Cargo.toml +++ b/src/rust/Cargo.toml @@ -2,7 +2,7 @@ name = "stellar-core" version = "0.1.0" edition = "2021" -rust-version = "1.81.0" +rust-version = "1.82.0" publish = false [lib] diff --git a/src/test/TxTests.cpp b/src/test/TxTests.cpp index 732a6fd376..12dfe7b79e 100644 --- a/src/test/TxTests.cpp +++ b/src/test/TxTests.cpp @@ -564,7 +564,8 @@ closeLedgerOn(Application& app, uint32 ledgerSeq, TimePoint closeTime, app.getHerder().externalizeValue(txSet.first, ledgerSeq, closeTime, emptyUpgradeSteps); REQUIRE(app.getLedgerManager().getLastClosedLedgerNum() == ledgerSeq); - return getTransactionHistoryResults(app.getDatabase(), ledgerSeq); + auto& lm = static_cast(app.getLedgerManager()); + return lm.mLatestTxResultSet; } TransactionResultSet @@ -584,7 +585,8 @@ closeLedgerOn(Application& app, uint32 ledgerSeq, time_t closeTime, app.getHerder().externalizeValue(txSet, ledgerSeq, closeTime, emptyUpgradeSteps); - auto z1 = getTransactionHistoryResults(app.getDatabase(), ledgerSeq); + auto& lm = static_cast(app.getLedgerManager()); + auto z1 = lm.mLatestTxResultSet; REQUIRE(app.getLedgerManager().getLastClosedLedgerNum() == ledgerSeq); diff --git a/src/transactions/TransactionSQL.cpp b/src/transactions/TransactionSQL.cpp index ff37172218..c35a725f36 100644 --- a/src/transactions/TransactionSQL.cpp +++ b/src/transactions/TransactionSQL.cpp @@ -7,6 +7,7 @@ #include "database/Database.h" #include "database/DatabaseUtils.h" #include "herder/TxSetFrame.h" +#include "history/CheckpointBuilder.h" #include "ledger/LedgerHeaderUtils.h" #include "main/Application.h" #include "util/Decoder.h" @@ -21,99 +22,6 @@ namespace stellar namespace { using namespace xdr; -// XDR marshaller that replaces all the transaction envelopes with their full -// hashes. This is needed because we store the transaction envelopes separately -// and use for a different purpose, so trimming them from -// GeneralizedTransactionSet allows to avoid duplication. -class GeneralizedTxSetPacker -{ - public: - GeneralizedTxSetPacker(GeneralizedTransactionSet const& xdrTxSet) - : mBuffer(xdr_argpack_size(xdrTxSet)) - { - } - - std::vector const& - getResult() - { - mBuffer.resize(mCurrIndex); - return mBuffer; - } - - template - typename std::enable_if< - std::is_same::uint_type>::value>::type - operator()(T t) - { - putBytes(&t, 4); - } - - template - typename std::enable_if< - std::is_same::uint_type>::value>::type - operator()(T t) - { - putBytes(&t, 8); - } - - template - typename std::enable_if::is_bytes>::type - operator()(T const& t) - { - if (xdr_traits::variable_nelem) - { - uint32_t size = size32(t.size()); - putBytes(&size, 4); - } - putBytes(t.data(), t.size()); - } - - template - typename std::enable_if::value && - (xdr_traits::is_class || - xdr_traits::is_container)>::type - operator()(T const& t) - { - xdr_traits::save(*this, t); - } - - template - typename std::enable_if::value>::type - operator()(T const& tx) - { - Hash hash = xdrSha256(tx); - putBytes(hash.data(), hash.size()); - } - - private: - // While unlikely, there is a possible edge case where `sum(hash sizes) > - // sum(TransactionEnvelope sizes)`. 
Hence instead of a strict size check we - // extend the buffer. - void - maybeExtend(size_t byBytes) - { - if (mCurrIndex + byBytes > mBuffer.size()) - { - mBuffer.resize(mCurrIndex + byBytes); - } - } - - void - putBytes(void const* buf, size_t len) - { - if (len == 0) - { - return; - } - maybeExtend(len); - std::memcpy(mBuffer.data() + mCurrIndex, buf, len); - mCurrIndex += len; - } - - std::vector mBuffer; - size_t mCurrIndex = 0; -}; - // XDR un-marshaller that reconstructs the GeneralizedTxSet using the trimmed // result of GeneralizedTxSetPacker and the actual TransactionEnvelopes. class GeneralizedTxSetUnpacker @@ -246,8 +154,8 @@ void writeNonGeneralizedTxSetToStream( Database& db, soci::session& sess, uint32 ledgerSeq, std::vector const& txs, - TransactionHistoryResultEntry& results, XDROutputFileStream& txOut, - XDROutputFileStream& txResultOut) + TransactionHistoryResultEntry& results, + CheckpointBuilder& checkpointBuilder) { ZoneScoped; // prepare the txset for saving @@ -261,8 +169,8 @@ writeNonGeneralizedTxSetToStream( TransactionHistoryEntry hist; hist.ledgerSeq = ledgerSeq; txSet->toXDR(hist.txSet); - txOut.writeOne(hist); - txResultOut.writeOne(results); + checkpointBuilder.appendTransactionSet(ledgerSeq, hist, results.txResultSet, + /* skipStartupCheck */ true); } void @@ -282,8 +190,7 @@ writeGeneralizedTxSetToStream(uint32 ledgerSeq, std::vector const& encodedTxSet, std::vector const& txs, TransactionHistoryResultEntry& results, - XDROutputFileStream& txOut, - XDROutputFileStream& txResultOut) + CheckpointBuilder& checkpointBuilder) { ZoneScoped; UnorderedMap txByHash; @@ -298,8 +205,8 @@ writeGeneralizedTxSetToStream(uint32 ledgerSeq, hist.ext.v(1); xdr_argpack_archive(unpacker, hist.ext.generalizedTxSet()); - txOut.writeOne(hist); - txResultOut.writeOne(results); + checkpointBuilder.appendTransactionSet(ledgerSeq, hist, results.txResultSet, + /* skipStartupCheck */ true); } void @@ -309,8 +216,8 @@ writeTxSetToStream( std::vector>>::const_iterator& encodedTxSetIt, std::vector const& txs, - TransactionHistoryResultEntry& results, XDROutputFileStream& txOut, - XDROutputFileStream& txResultOut) + TransactionHistoryResultEntry& results, + CheckpointBuilder& checkpointBuilder) { ZoneScoped; @@ -322,7 +229,7 @@ writeTxSetToStream( if (encodedTxSets.empty() || ledgerSeq < encodedTxSets.front().first) { writeNonGeneralizedTxSetToStream(db, sess, ledgerSeq, txs, results, - txOut, txResultOut); + checkpointBuilder); } else { @@ -340,137 +247,17 @@ writeTxSetToStream( "Could not find tx set corresponding to the ledger."); } writeGeneralizedTxSetToStream(ledgerSeq, encodedTxSetIt->second, txs, - results, txOut, txResultOut); + results, checkpointBuilder); ++encodedTxSetIt; } } } // namespace -void -storeTransaction(Database& db, uint32_t ledgerSeq, - TransactionFrameBasePtr const& tx, TransactionMeta const& tm, - TransactionResultSet const& resultSet, Config const& cfg) -{ - ZoneScoped; - std::string txBody = - decoder::encode_b64(xdr::xdr_to_opaque(tx->getEnvelope())); - std::string txResult = - decoder::encode_b64(xdr::xdr_to_opaque(resultSet.results.back())); - std::string meta = decoder::encode_b64(xdr::xdr_to_opaque(tm)); - - std::string txIDString = binToHex(tx->getContentsHash()); - uint32_t txIndex = static_cast(resultSet.results.size()); - - std::string sqlStr; - if (cfg.isUsingBucketListDB()) - { - sqlStr = "INSERT INTO txhistory " - "( txid, ledgerseq, txindex, txbody, txresult) VALUES " - "(:id, :seq, :txindex, :txb, :txres)"; - } - else - { - sqlStr = - "INSERT INTO 
txhistory " - "( txid, ledgerseq, txindex, txbody, txresult, txmeta) VALUES " - "(:id, :seq, :txindex, :txb, :txres, :meta)"; - } - - auto prep = db.getPreparedStatement(sqlStr); - auto& st = prep.statement(); - st.exchange(soci::use(txIDString)); - st.exchange(soci::use(ledgerSeq)); - st.exchange(soci::use(txIndex)); - st.exchange(soci::use(txBody)); - st.exchange(soci::use(txResult)); - - if (!cfg.isUsingBucketListDB()) - { - st.exchange(soci::use(meta)); - } - - st.define_and_bind(); - { - auto timer = db.getInsertTimer("txhistory"); - st.execute(true); - } - - if (st.get_affected_rows() != 1) - { - throw std::runtime_error("Could not update data in SQL"); - } -} - -void -storeTxSet(Database& db, uint32_t ledgerSeq, TxSetXDRFrame const& txSet) -{ - ZoneScoped; - if (!txSet.isGeneralizedTxSet()) - { - return; - } - GeneralizedTransactionSet xdrTxSet; - txSet.toXDR(xdrTxSet); - GeneralizedTxSetPacker txSetPacker(xdrTxSet); - xdr::xdr_argpack_archive(txSetPacker, xdrTxSet); - std::string trimmedTxSet = decoder::encode_b64(txSetPacker.getResult()); - - auto prep = db.getPreparedStatement("INSERT INTO txsethistory " - "( ledgerseq, txset) VALUES " - "(:seq, :txset)"); - - auto& st = prep.statement(); - st.exchange(soci::use(ledgerSeq)); - st.exchange(soci::use(trimmedTxSet)); - st.define_and_bind(); - { - auto timer = db.getInsertTimer("txsethistory"); - st.execute(true); - } - - if (st.get_affected_rows() != 1) - { - throw std::runtime_error("Could not update data in SQL"); - } -} - -TransactionResultSet -getTransactionHistoryResults(Database& db, uint32 ledgerSeq) -{ - ZoneScoped; - TransactionResultSet res; - std::string txresult64; - auto prep = - db.getPreparedStatement("SELECT txresult FROM txhistory " - "WHERE ledgerseq = :lseq ORDER BY txindex ASC"); - auto& st = prep.statement(); - - st.exchange(soci::use(ledgerSeq)); - st.exchange(soci::into(txresult64)); - st.define_and_bind(); - st.execute(true); - while (st.got_data()) - { - std::vector result; - decoder::decode_b64(txresult64, result); - - res.results.emplace_back(); - TransactionResultPair& p = res.results.back(); - - xdr::xdr_get g(&result.front(), &result.back() + 1); - xdr_argpack_archive(g, p); - - st.fetch(); - } - return res; -} - size_t -copyTransactionsToStream(Application& app, soci::session& sess, - uint32_t ledgerSeq, uint32_t ledgerCount, - XDROutputFileStream& txOut, - XDROutputFileStream& txResultOut) +populateCheckpointFilesFromDB(Application& app, soci::session& sess, + uint32_t ledgerSeq, uint32_t ledgerCount, + CheckpointBuilder& checkpointBuilder) { ZoneScoped; @@ -514,8 +301,7 @@ copyTransactionsToStream(Application& app, soci::session& sess, if (curLedgerSeq != lastLedgerSeq) { writeTxSetToStream(db, sess, lastLedgerSeq, encodedTxSets, - encodedTxSetIt, txs, results, txOut, - txResultOut); + encodedTxSetIt, txs, results, checkpointBuilder); // reset state txs.clear(); results.ledgerSeq = curLedgerSeq; @@ -553,73 +339,29 @@ copyTransactionsToStream(Application& app, soci::session& sess, if (n != 0) { writeTxSetToStream(db, sess, lastLedgerSeq, encodedTxSets, - encodedTxSetIt, txs, results, txOut, txResultOut); + encodedTxSetIt, txs, results, checkpointBuilder); } return n; } void -createTxSetHistoryTable(Database& db) -{ - db.getSession() << "DROP TABLE IF EXISTS txsethistory"; - db.getSession() << "CREATE TABLE txsethistory (" - "ledgerseq INT NOT NULL CHECK (ledgerseq >= 0)," - "txset TEXT NOT NULL," - "PRIMARY KEY (ledgerseq)" - ")"; -} - -void -deprecateTransactionFeeHistory(Database& db) 
+dropSupportTransactionFeeHistory(Database& db) { ZoneScoped; db.getSession() << "DROP TABLE IF EXISTS txfeehistory"; } void -dropTransactionHistory(Database& db, Config const& cfg) +dropSupportTxSetHistory(Database& db) { ZoneScoped; - db.getSession() << "DROP TABLE IF EXISTS txhistory"; - - // txmeta only supported when BucketListDB is not enabled - std::string txMetaColumn = - cfg.isUsingBucketListDB() ? "" : "txmeta TEXT NOT NULL,"; - - db.getSession() << "CREATE TABLE txhistory (" - "txid CHARACTER(64) NOT NULL," - "ledgerseq INT NOT NULL CHECK (ledgerseq >= 0)," - "txindex INT NOT NULL," - "txbody TEXT NOT NULL," - "txresult TEXT NOT NULL," + - txMetaColumn + - "PRIMARY KEY (ledgerseq, txindex)" - ")"; - - db.getSession() << "CREATE INDEX histbyseq ON txhistory (ledgerseq);"; - - createTxSetHistoryTable(db); -} - -void -deleteOldTransactionHistoryEntries(Database& db, uint32_t ledgerSeq, - uint32_t count) -{ - ZoneScoped; - DatabaseUtils::deleteOldEntriesHelper(db.getSession(), ledgerSeq, count, - "txhistory", "ledgerseq"); - DatabaseUtils::deleteOldEntriesHelper(db.getSession(), ledgerSeq, count, - "txsethistory", "ledgerseq"); + db.getSession() << "DROP TABLE IF EXISTS txsethistory"; } void -deleteNewerTransactionHistoryEntries(Database& db, uint32_t ledgerSeq) +dropSupportTxHistory(Database& db) { ZoneScoped; - DatabaseUtils::deleteNewerEntriesHelper(db.getSession(), ledgerSeq, - "txhistory", "ledgerseq"); - DatabaseUtils::deleteNewerEntriesHelper(db.getSession(), ledgerSeq, - "txsethistory", "ledgerseq"); + db.getSession() << "DROP TABLE IF EXISTS txhistory"; } - } diff --git a/src/transactions/TransactionSQL.h b/src/transactions/TransactionSQL.h index b63439d709..9b14ae38b5 100644 --- a/src/transactions/TransactionSQL.h +++ b/src/transactions/TransactionSQL.h @@ -5,38 +5,17 @@ #pragma once #include "database/Database.h" -#include "herder/TxSetFrame.h" #include "overlay/StellarXDR.h" -#include "transactions/TransactionFrameBase.h" namespace stellar { class Application; -class XDROutputFileStream; - -void storeTransaction(Database& db, uint32_t ledgerSeq, - TransactionFrameBasePtr const& tx, - TransactionMeta const& tm, - TransactionResultSet const& resultSet, Config const& cfg); - -void storeTxSet(Database& db, uint32_t ledgerSeq, TxSetXDRFrame const& txSet); - -TransactionResultSet getTransactionHistoryResults(Database& db, - uint32 ledgerSeq); - -size_t copyTransactionsToStream(Application& app, soci::session& sess, - uint32_t ledgerSeq, uint32_t ledgerCount, - XDROutputFileStream& txOut, - XDROutputFileStream& txResultOut); - -void createTxSetHistoryTable(Database& db); - -void deprecateTransactionFeeHistory(Database& db); - -void dropTransactionHistory(Database& db, Config const& cfg); - -void deleteOldTransactionHistoryEntries(Database& db, uint32_t ledgerSeq, - uint32_t count); - -void deleteNewerTransactionHistoryEntries(Database& db, uint32_t ledgerSeq); +class CheckpointBuilder; + +size_t populateCheckpointFilesFromDB(Application& app, soci::session& sess, + uint32_t ledgerSeq, uint32_t ledgerCount, + CheckpointBuilder& checkpointBuilder); +void dropSupportTransactionFeeHistory(Database& db); +void dropSupportTxSetHistory(Database& db); +void dropSupportTxHistory(Database& db); } diff --git a/src/transactions/readme.md b/src/transactions/readme.md index af9ffc7eec..464c61c8a1 100644 --- a/src/transactions/readme.md +++ b/src/transactions/readme.md @@ -76,8 +76,7 @@ such: the sequence number in the account is consumed, the fee is collected and the result set to "Failed". 
## Result -When transactions are applied (success or not), the result is saved in the -"txhistory" table in the database. +When transactions are applied (success or not), the result will be emitted via LedgerCloseMeta (if configured). # Operations @@ -134,9 +133,7 @@ transactions and LedgerDelta objects, this makes rolling back changes simpler. For each Operation, there is a matching Result type that gathers information on the key side effects of the operation or in the case of failure records why in structured form. -Results are queued in the txhistory table for other components to derive data: -historical module for uploading it for long term storage, but also for API -servers to consume externally. +Results are queued in LedgerCloseMeta to be consumed by downstream systems (if configured) ## List of operations See `src/xdr/Stellar-transaction.x` for a detailed list of all operations and results. diff --git a/src/util/Fs.cpp b/src/util/Fs.cpp index f21799269a..d74a7b35a6 100644 --- a/src/util/Fs.cpp +++ b/src/util/Fs.cpp @@ -153,7 +153,8 @@ durableRename(std::string const& src, std::string const& dst, std::string const& dir) { ZoneScoped; - if (MoveFileExA(src.c_str(), dst.c_str(), MOVEFILE_WRITE_THROUGH) == 0) + if (MoveFileExA(src.c_str(), dst.c_str(), + MOVEFILE_WRITE_THROUGH | MOVEFILE_REPLACE_EXISTING) == 0) { FileSystemException::failWithGetLastError( "fs::durableRename() failed on MoveFileExA(): "); @@ -257,7 +258,9 @@ durableRename(std::string const& src, std::string const& dst, std::string const& dir) { ZoneScoped; - if (rename(src.c_str(), dst.c_str()) != 0) + std::error_code ec; + std::filesystem::rename(src.c_str(), dst.c_str(), ec); + if (ec) { return false; } diff --git a/src/util/Timer.h b/src/util/Timer.h index 6af877207a..bd0130c540 100644 --- a/src/util/Timer.h +++ b/src/util/Timer.h @@ -129,6 +129,12 @@ class VirtualClock void shutdown(); bool isStopped(); + Mode + getMode() const + { + return mMode; + } + private: asio::io_context mIOContext; Mode const mMode; diff --git a/src/util/XDRStream.h b/src/util/XDRStream.h index 517ebc72a2..8187d6d191 100644 --- a/src/util/XDRStream.h +++ b/src/util/XDRStream.h @@ -377,6 +377,16 @@ class XDROutputFileStream return isOpen(); } + template + void + durableWriteOne(T const& t, SHA256* hasher = nullptr, + size_t* bytesPut = nullptr) + { + writeOne(t, hasher, bytesPut); + flush(); + fs::flushFileChanges(getHandle()); + } + template void writeOne(T const& t, SHA256* hasher = nullptr, size_t* bytesPut = nullptr) diff --git a/src/util/test/FsTests.cpp b/src/util/test/FsTests.cpp index 15f9803af6..5680db14b4 100644 --- a/src/util/test/FsTests.cpp +++ b/src/util/test/FsTests.cpp @@ -65,7 +65,7 @@ TEST_CASE("filesystem findfiles", "[fs]") TEST_CASE("filesystem remoteName", "[fs]") { - REQUIRE(fs::remoteName(HISTORY_FILE_TYPE_LEDGER, fs::hexStr(0x0abbccdd), - "xdr.gz") == + REQUIRE(fs::remoteName(typeString(FileType::HISTORY_FILE_TYPE_LEDGER), + fs::hexStr(0x0abbccdd), "xdr.gz") == "ledger/0a/bb/cc/ledger-0abbccdd.xdr.gz"); }
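The new durableWriteOne() above composes three operations already present in this diff: writeOne(), flush(), and fs::flushFileChanges(). A hypothetical caller (path and header values invented) might look like the sketch below; the per-entry fsync trades throughput for durability, which suits low-rate appends such as one ledger header per ledger close.

```cpp
#include "overlay/StellarXDR.h"
#include "util/XDRStream.h"
#include "util/asio.h"

// Each append is flushed and fsynced before returning, so a crash can only
// lose a suffix of the file, never leave a torn record in the middle.
void
appendHeaderDurably(asio::io_context& ctx)
{
    stellar::XDROutputFileStream out(ctx, /*doFsync=*/true);
    out.open("ledger-headers.xdr.dirty"); // invented path

    stellar::LedgerHeaderHistoryEntry e;
    e.header.ledgerSeq = 7;
    out.durableWriteOne(e); // writeOne + flush + fs::flushFileChanges

    out.close();
}
```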