Skip to content

Commit

Permalink
ledger: make catchpoint generation backward compatible (#5598)
Browse files Browse the repository at this point in the history
  • Loading branch information
algorandskiy authored Nov 6, 2023
1 parent 5a2ef5e commit c1207a4
Show file tree
Hide file tree
Showing 3 changed files with 128 additions and 33 deletions.
101 changes: 69 additions & 32 deletions ledger/catchpointtracker.go
Original file line number Diff line number Diff line change
Expand Up @@ -105,9 +105,6 @@ type catchpointTracker struct {
// enableGeneratingCatchpointFiles determines whether catchpoints files should be generated by the trackers.
enableGeneratingCatchpointFiles bool

// Prepared SQL statements for fast accounts DB lookups.
accountsq trackerdb.AccountsReader

// log copied from ledger
log logging.Logger

Expand Down Expand Up @@ -138,6 +135,9 @@ type catchpointTracker struct {
// roundDigest stores the digest of the block for every round starting with dbRound+1 and every round after it.
roundDigest []crypto.Digest

// consensusVersion stores the consensus versions for every round starting with dbRound+1 and every round after it.
consensusVersion []protocol.ConsensusVersion

// reenableCatchpointsRound is a round where the EnableCatchpointsWithSPContexts feature was enabled via the consensus.
// we avoid generating catchpoints before that round in order to ensure the network remain consistent in the catchpoint
// label being produced. This variable could be "wrong" in two cases -
Expand All @@ -151,9 +151,13 @@ type catchpointTracker struct {
// catchpoint files even before the protocol upgrade took place.
forceCatchpointFileWriting bool

// catchpointsMu protects `roundDigest`, `reenableCatchpointsRound` and
// catchpointsMu protects roundDigest, reenableCatchpointsRound, cachedDBRound and
// `lastCatchpointLabel`.
catchpointsMu deadlock.RWMutex

// cachedDBRound is always exactly tracker DB round (and therefore, accountsRound()),
// cached to use in lookup functions
cachedDBRound basics.Round
}

// initialize initializes the catchpointTracker structure
Expand Down Expand Up @@ -205,7 +209,7 @@ func (ct *catchpointTracker) getSPVerificationData() (encodedData []byte, spVeri
return encodedData, spVerificationHash, nil
}

func (ct *catchpointTracker) finishFirstStage(ctx context.Context, dbRound basics.Round, updatingBalancesDuration time.Duration) error {
func (ct *catchpointTracker) finishFirstStage(ctx context.Context, dbRound basics.Round, blockProto protocol.ConsensusVersion, updatingBalancesDuration time.Duration) error {
ct.log.Infof("finishing catchpoint's first stage dbRound: %d", dbRound)

var totalKVs uint64
Expand All @@ -216,11 +220,15 @@ func (ct *catchpointTracker) finishFirstStage(ctx context.Context, dbRound basic
var spVerificationEncodedData []byte
var catchpointGenerationStats telemetryspec.CatchpointGenerationEventDetails

// Generate the SP Verification hash and encoded data. The hash is used in the label when tracking catchpoints,
// and the encoded data for that hash will be added to the catchpoint file if catchpoint generation is enabled.
spVerificationEncodedData, spVerificationHash, err := ct.getSPVerificationData()
if err != nil {
return err
params := config.Consensus[blockProto]
if params.EnableCatchpointsWithSPContexts {
// Generate the SP Verification hash and encoded data. The hash is used in the label when tracking catchpoints,
// and the encoded data for that hash will be added to the catchpoint file if catchpoint generation is enabled.
var err error
spVerificationEncodedData, spVerificationHash, err = ct.getSPVerificationData()
if err != nil {
return err
}
}

if ct.enableGeneratingCatchpointFiles {
Expand Down Expand Up @@ -257,7 +265,7 @@ func (ct *catchpointTracker) finishFirstStage(ctx context.Context, dbRound basic

// Possibly finish generating first stage catchpoint db record and data file after
// a crash.
func (ct *catchpointTracker) finishFirstStageAfterCrash(dbRound basics.Round) error {
func (ct *catchpointTracker) finishFirstStageAfterCrash(dbRound basics.Round, blockProto protocol.ConsensusVersion) error {
v, err := ct.catchpointStore.ReadCatchpointStateUint64(
context.Background(), trackerdb.CatchpointStateWritingFirstStageInfo)
if err != nil {
Expand All @@ -274,10 +282,10 @@ func (ct *catchpointTracker) finishFirstStageAfterCrash(dbRound basics.Round) er
return err
}

return ct.finishFirstStage(context.Background(), dbRound, 0)
return ct.finishFirstStage(context.Background(), dbRound, blockProto, 0)
}

func (ct *catchpointTracker) finishCatchpointsAfterCrash(catchpointLookback uint64) error {
func (ct *catchpointTracker) finishCatchpointsAfterCrash(blockProto protocol.ConsensusVersion, catchpointLookback uint64) error {
records, err := ct.catchpointStore.SelectUnfinishedCatchpoints(context.Background())
if err != nil {
return err
Expand All @@ -292,7 +300,7 @@ func (ct *catchpointTracker) finishCatchpointsAfterCrash(catchpointLookback uint
}

err = ct.finishCatchpoint(
context.Background(), record.Round, record.BlockHash, catchpointLookback)
context.Background(), record.Round, record.BlockHash, blockProto, catchpointLookback)
if err != nil {
return err
}
Expand All @@ -301,8 +309,8 @@ func (ct *catchpointTracker) finishCatchpointsAfterCrash(catchpointLookback uint
return nil
}

func (ct *catchpointTracker) recoverFromCrash(dbRound basics.Round) error {
err := ct.finishFirstStageAfterCrash(dbRound)
func (ct *catchpointTracker) recoverFromCrash(dbRound basics.Round, blockProto protocol.ConsensusVersion) error {
err := ct.finishFirstStageAfterCrash(dbRound, blockProto)
if err != nil {
return err
}
Expand All @@ -316,7 +324,7 @@ func (ct *catchpointTracker) recoverFromCrash(dbRound basics.Round) error {
}

if catchpointLookback != 0 {
err = ct.finishCatchpointsAfterCrash(catchpointLookback)
err = ct.finishCatchpointsAfterCrash(blockProto, catchpointLookback)
if err != nil {
return err
}
Expand Down Expand Up @@ -346,11 +354,15 @@ func (ct *catchpointTracker) loadFromDisk(l ledgerForTracker, dbRound basics.Rou
return err
}

ct.catchpointsMu.Lock()
ct.cachedDBRound = dbRound
ct.roundDigest = nil
ct.consensusVersion = nil
ct.catchpointDataWriting.Store(0)
// keep these channel closed if we're not generating catchpoint
ct.catchpointDataSlowWriting = make(chan struct{}, 1)
close(ct.catchpointDataSlowWriting)
ct.catchpointsMu.Unlock()

err = ct.dbs.Transaction(func(ctx context.Context, tx trackerdb.TransactionScope) error {
return ct.initializeHashes(ctx, tx, dbRound)
Expand All @@ -359,18 +371,18 @@ func (ct *catchpointTracker) loadFromDisk(l ledgerForTracker, dbRound basics.Rou
return err
}

ct.accountsq, err = ct.dbs.MakeAccountsOptimizedReader()
ct.lastCatchpointLabel, err = ct.catchpointStore.ReadCatchpointStateString(
context.Background(), trackerdb.CatchpointStateLastCatchpoint)
if err != nil {
return
}

ct.lastCatchpointLabel, err = ct.catchpointStore.ReadCatchpointStateString(
context.Background(), trackerdb.CatchpointStateLastCatchpoint)
hdr, err := l.BlockHdr(dbRound)
if err != nil {
return
}

return ct.recoverFromCrash(dbRound)
return ct.recoverFromCrash(dbRound, hdr.CurrentProtocol)
}

// newBlock informs the tracker of a new block from round
Expand All @@ -380,6 +392,7 @@ func (ct *catchpointTracker) newBlock(blk bookkeeping.Block, delta ledgercore.St
defer ct.catchpointsMu.Unlock()

ct.roundDigest = append(ct.roundDigest, blk.Digest())
ct.consensusVersion = append(ct.consensusVersion, blk.CurrentProtocol)

if (config.Consensus[blk.CurrentProtocol].EnableCatchpointsWithSPContexts || ct.forceCatchpointFileWriting) && ct.reenableCatchpointsRound == 0 {
catchpointLookback := config.Consensus[blk.CurrentProtocol].CatchpointLookback
Expand All @@ -396,7 +409,10 @@ func (ct *catchpointTracker) newBlock(blk bookkeeping.Block, delta ledgercore.St
// number that can be removed from the blocks database as well as the lookback that this
// tracker maintains.
func (ct *catchpointTracker) committedUpTo(rnd basics.Round) (retRound, lookback basics.Round) {
return rnd, basics.Round(0)
ct.catchpointsMu.RLock()
defer ct.catchpointsMu.RUnlock()
retRound = ct.cachedDBRound
return retRound, basics.Round(0)
}

// Calculate whether we have intermediate first stage catchpoint rounds and the
Expand Down Expand Up @@ -505,6 +521,8 @@ func (ct *catchpointTracker) prepareCommit(dcc *deferredCommitContext) error {

dcc.committedRoundDigests = make([]crypto.Digest, dcc.offset)
copy(dcc.committedRoundDigests, ct.roundDigest[:dcc.offset])
dcc.committedProtocolVersion = make([]protocol.ConsensusVersion, dcc.offset)
copy(dcc.committedProtocolVersion, ct.consensusVersion[:dcc.offset])

return nil
}
Expand Down Expand Up @@ -601,6 +619,8 @@ func (ct *catchpointTracker) postCommit(ctx context.Context, dcc *deferredCommit

ct.catchpointsMu.Lock()
ct.roundDigest = ct.roundDigest[dcc.offset:]
ct.consensusVersion = ct.consensusVersion[dcc.offset:]
ct.cachedDBRound = dcc.newBase()
ct.catchpointsMu.Unlock()

dcc.updatingBalancesDuration = time.Since(dcc.flushTime)
Expand Down Expand Up @@ -736,9 +756,18 @@ func repackCatchpoint(ctx context.Context, header CatchpointFileHeader, biggestC

// Create a catchpoint (a label and possibly a file with db record) and remove
// the unfinished catchpoint record.
func (ct *catchpointTracker) createCatchpoint(ctx context.Context, accountsRound basics.Round, round basics.Round, dataInfo trackerdb.CatchpointFirstStageInfo, blockHash crypto.Digest) error {
func (ct *catchpointTracker) createCatchpoint(ctx context.Context, accountsRound basics.Round, round basics.Round, dataInfo trackerdb.CatchpointFirstStageInfo, blockHash crypto.Digest, blockProto protocol.ConsensusVersion) error {
startTime := time.Now()
labelMaker := ledgercore.MakeCatchpointLabelMakerCurrent(round, &blockHash, &dataInfo.TrieBalancesHash, dataInfo.Totals, &dataInfo.StateProofVerificationHash)
var labelMaker ledgercore.CatchpointLabelMaker
var version uint64
params := config.Consensus[blockProto]
if params.EnableCatchpointsWithSPContexts {
labelMaker = ledgercore.MakeCatchpointLabelMakerCurrent(round, &blockHash, &dataInfo.TrieBalancesHash, dataInfo.Totals, &dataInfo.StateProofVerificationHash)
version = CatchpointFileVersionV7
} else {
labelMaker = ledgercore.MakeCatchpointLabelMakerV6(round, &blockHash, &dataInfo.TrieBalancesHash, dataInfo.Totals)
version = CatchpointFileVersionV6
}
label := ledgercore.MakeLabel(labelMaker)

ct.log.Infof(
Expand Down Expand Up @@ -774,7 +803,7 @@ func (ct *catchpointTracker) createCatchpoint(ctx context.Context, accountsRound

// Make a catchpoint file.
header := CatchpointFileHeader{
Version: CatchpointFileVersionV7,
Version: version,
BalancesRound: accountsRound,
BlocksRound: round,
Totals: dataInfo.Totals,
Expand Down Expand Up @@ -834,7 +863,7 @@ func (ct *catchpointTracker) createCatchpoint(ctx context.Context, accountsRound

// Try create a catchpoint (a label and possibly a file with db record) and remove
// the unfinished catchpoint record.
func (ct *catchpointTracker) finishCatchpoint(ctx context.Context, round basics.Round, blockHash crypto.Digest, catchpointLookback uint64) error {
func (ct *catchpointTracker) finishCatchpoint(ctx context.Context, round basics.Round, blockHash crypto.Digest, blockProto protocol.ConsensusVersion, catchpointLookback uint64) error {
accountsRound := round - basics.Round(catchpointLookback)

ct.log.Infof("finishing catchpoint round: %d accountsRound: %d", round, accountsRound)
Expand All @@ -847,7 +876,7 @@ func (ct *catchpointTracker) finishCatchpoint(ctx context.Context, round basics.
if !exists {
return ct.catchpointStore.DeleteUnfinishedCatchpoint(ctx, round)
}
return ct.createCatchpoint(ctx, accountsRound, round, dataInfo, blockHash)
return ct.createCatchpoint(ctx, accountsRound, round, dataInfo, blockHash, blockProto)
}

// Calculate catchpoint round numbers in [min, max]. `catchpointInterval` must be
Expand Down Expand Up @@ -908,7 +937,9 @@ func (ct *catchpointTracker) pruneFirstStageRecordsData(ctx context.Context, max

func (ct *catchpointTracker) postCommitUnlocked(ctx context.Context, dcc *deferredCommitContext) {
if dcc.catchpointFirstStage {
err := ct.finishFirstStage(ctx, dcc.newBase(), dcc.updatingBalancesDuration)
round := dcc.newBase()
blockProto := dcc.committedProtocolVersion[round-dcc.oldBase-1]
err := ct.finishFirstStage(ctx, round, blockProto, dcc.updatingBalancesDuration)
if err != nil {
ct.log.Warnf(
"error finishing catchpoint's first stage dcc.newBase: %d err: %v",
Expand All @@ -918,8 +949,10 @@ func (ct *catchpointTracker) postCommitUnlocked(ctx context.Context, dcc *deferr

// Generate catchpoints for rounds in (dcc.oldBase, dcc.newBase].
for _, round := range ct.calculateCatchpointRounds(&dcc.deferredCommitRange) {
blockHash := dcc.committedRoundDigests[round-dcc.oldBase-1]
blockProto := dcc.committedProtocolVersion[round-dcc.oldBase-1]
err := ct.finishCatchpoint(
ctx, round, dcc.committedRoundDigests[round-dcc.oldBase-1], dcc.catchpointLookback)
ctx, round, blockHash, blockProto, dcc.catchpointLookback)
if err != nil {
ct.log.Warnf("error creating catchpoint round: %d err: %v", round, err)
}
Expand Down Expand Up @@ -1157,9 +1190,13 @@ func (ct *catchpointTracker) generateCatchpointData(ctx context.Context, account
return
}

err = catchpointWriter.FileWriteSPVerificationContext(encodedSPData)
if err != nil {
return
// do not write encodedSPData if not provided,
// this is an indication the older catchpoint file is being generated.
if encodedSPData != nil {
err = catchpointWriter.FileWriteSPVerificationContext(encodedSPData)
if err != nil {
return
}
}

for more {
Expand Down
57 changes: 56 additions & 1 deletion ledger/catchpointtracker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -360,7 +360,10 @@ func createCatchpoint(t *testing.T, ct *catchpointTracker, accountsRound basics.

require.Equal(t, calculateStateProofVerificationHash(t, ml), stateProofVerificationHash)

err = ct.createCatchpoint(context.Background(), accountsRound, round, trackerdb.CatchpointFirstStageInfo{BiggestChunkLen: biggestChunkLen}, crypto.Digest{})
err = ct.createCatchpoint(
context.Background(), accountsRound, round,
trackerdb.CatchpointFirstStageInfo{BiggestChunkLen: biggestChunkLen},
crypto.Digest{}, protocol.ConsensusCurrentVersion)
require.NoError(t, err)
}

Expand Down Expand Up @@ -760,8 +763,10 @@ func TestCatchpointReproducibleLabels(t *testing.T) {

// test to see that after loadFromDisk, all the tracker content is lost ( as expected )
require.NotZero(t, len(ct.roundDigest))
require.NotZero(t, len(ct.consensusVersion))
require.NoError(t, ct.loadFromDisk(ml, ml.Latest()))
require.Zero(t, len(ct.roundDigest))
require.Zero(t, len(ct.consensusVersion))
require.Zero(t, ct.catchpointDataWriting.Load())
select {
case _, closed := <-ct.catchpointDataSlowWriting:
Expand All @@ -771,6 +776,56 @@ func TestCatchpointReproducibleLabels(t *testing.T) {
}
}

// TestCatchpointBackwardCompatibleLabels checks labels before and after EnableCatchpointsWithSPContexts was introduced.
func TestCatchpointBackwardCompatibleLabels(t *testing.T) {
partitiontest.PartitionTest(t)

temporaryDirectory := t.TempDir()

accts := []map[basics.Address]basics.AccountData{ledgertesting.RandomAccounts(20, true)}
ml := makeMockLedgerForTracker(t, true, 10, protocol.ConsensusCurrentVersion, accts)
defer ml.Close()

ct := &catchpointTracker{enableGeneratingCatchpointFiles: false}
conf := config.GetDefaultLocal()

conf.Archival = true
paths := DirsAndPrefix{
ResolvedGenesisDirs: config.ResolvedGenesisDirs{
CatchpointGenesisDir: ".",
HotGenesisDir: ".",
},
}
ct.initialize(conf, paths)

defer ct.close()
ct.dbDirectory = temporaryDirectory
ct.tmpDir = temporaryDirectory

_, err := trackerDBInitialize(ml, true, ct.dbDirectory)
require.NoError(t, err)

err = ct.loadFromDisk(ml, ml.Latest())
require.NoError(t, err)

// create catpoint with the latest version of the code
round := basics.Round(2000)

protos := []protocol.ConsensusVersion{protocol.ConsensusCurrentVersion, protocol.ConsensusV37, protocol.ConsensusV36}
labels := make([]string, len(protos))
for i, proto := range protos {
err = ct.createCatchpoint(
context.Background(), round-1, round,
trackerdb.CatchpointFirstStageInfo{},
crypto.Digest{}, proto)
require.NoError(t, err)
require.NotEmpty(t, ct.lastCatchpointLabel)
labels[i] = ct.lastCatchpointLabel
}
require.NotEqual(t, labels[0], labels[1])
require.Equal(t, labels[1], labels[2])
}

// blockingTracker is a testing tracker used to test "what if" a tracker would get blocked.
type blockingTracker struct {
emptyTracker
Expand Down
3 changes: 3 additions & 0 deletions ledger/tracker.go
Original file line number Diff line number Diff line change
Expand Up @@ -269,6 +269,9 @@ type deferredCommitContext struct {
// Block hashes for the committed rounds range.
committedRoundDigests []crypto.Digest

// Consensus versions for the committed rounds range.
committedProtocolVersion []protocol.ConsensusVersion

// on catchpoint rounds, the transaction tail would fill up this field with the hash of the recent 1001 rounds
// of the txtail data. The catchpointTracker would be able to use that for calculating the catchpoint label.
txTailHash crypto.Digest
Expand Down

0 comments on commit c1207a4

Please sign in to comment.