Skip to content

Commit

Permalink
ledger: restore block listeners on reloadLedger (#6041)
Browse files Browse the repository at this point in the history
  • Loading branch information
algorandskiy authored Jun 26, 2024
1 parent f523300 commit d3831cd
Show file tree
Hide file tree
Showing 14 changed files with 69 additions and 35 deletions.
3 changes: 1 addition & 2 deletions catchup/fetcher_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,8 +66,7 @@ func buildTestLedger(t *testing.T, blk bookkeeping.Block) (ledger *data.Ledger,
cfg := config.GetDefaultLocal()
cfg.Archival = true
ledger, err = data.LoadLedger(
log, t.Name(), inMem, protocol.ConsensusCurrentVersion, genBal, "", genHash,
nil, cfg,
log, t.Name(), inMem, protocol.ConsensusCurrentVersion, genBal, "", genHash, cfg,
)
if err != nil {
t.Fatal("couldn't build ledger", err)
Expand Down
4 changes: 2 additions & 2 deletions catchup/pref_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ func BenchmarkServiceFetchBlocks(b *testing.B) {
for i := 0; i < b.N; i++ {
inMem := true
prefix := b.Name() + "empty" + strconv.Itoa(i)
local, err := data.LoadLedger(logging.TestingLog(b), prefix, inMem, protocol.ConsensusCurrentVersion, genesisBalances, "", crypto.Digest{}, nil, cfg)
local, err := data.LoadLedger(logging.TestingLog(b), prefix, inMem, protocol.ConsensusCurrentVersion, genesisBalances, "", crypto.Digest{}, cfg)
require.NoError(b, err)

// Make Service
Expand Down Expand Up @@ -150,7 +150,7 @@ func benchenv(t testing.TB, numAccounts, numBlocks int) (ledger, emptyLedger *da
cfg := config.GetDefaultLocal()
cfg.Archival = true
prefix := t.Name() + "empty"
emptyLedger, err = data.LoadLedger(logging.TestingLog(t), prefix, inMem, protocol.ConsensusCurrentVersion, genesisBalances, "", crypto.Digest{}, nil, cfg)
emptyLedger, err = data.LoadLedger(logging.TestingLog(t), prefix, inMem, protocol.ConsensusCurrentVersion, genesisBalances, "", crypto.Digest{}, cfg)
require.NoError(t, err)

ledger, err = datatest.FabricateLedger(logging.TestingLog(t), t.Name(), parts, genesisBalances, emptyLedger.LastRound()+basics.Round(numBlocks))
Expand Down
2 changes: 1 addition & 1 deletion daemon/algod/api/server/v2/test/helpers.go
Original file line number Diff line number Diff line change
Expand Up @@ -313,7 +313,7 @@ func testingenvWithBalances(t testing.TB, minMoneyAtStart, maxMoneyAtStart, numA
const inMem = true
cfg := config.GetDefaultLocal()
cfg.Archival = true
ledger, err := data.LoadLedger(logging.Base(), t.Name(), inMem, protocol.ConsensusFuture, bootstrap, genesisID, genesisHash, nil, cfg)
ledger, err := data.LoadLedger(logging.Base(), t.Name(), inMem, protocol.ConsensusFuture, bootstrap, genesisID, genesisHash, cfg)
if err != nil {
panic(err)
}
Expand Down
2 changes: 1 addition & 1 deletion data/common_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -121,7 +121,7 @@ func testingenv(t testing.TB, numAccounts, numTxs int, offlineAccounts bool) (*L
const inMem = true
cfg := config.GetDefaultLocal()
cfg.Archival = true
ledger, err := LoadLedger(logging.Base(), t.Name(), inMem, protocol.ConsensusCurrentVersion, bootstrap, genesisID, genesisHash, nil, cfg)
ledger, err := LoadLedger(logging.Base(), t.Name(), inMem, protocol.ConsensusCurrentVersion, bootstrap, genesisID, genesisHash, cfg)
if err != nil {
panic(err)
}
Expand Down
2 changes: 1 addition & 1 deletion data/datatest/fabricateLedger.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ func FabricateLedger(log logging.Logger, ledgerName string, accounts []account.P
const inMem = true
cfg := config.GetDefaultLocal()
cfg.Archival = true
ledger, err := data.LoadLedger(log, ledgerName, inMem, protocol.ConsensusCurrentVersion, genesis, "", crypto.Digest{}, nil, cfg)
ledger, err := data.LoadLedger(log, ledgerName, inMem, protocol.ConsensusCurrentVersion, genesis, "", crypto.Digest{}, cfg)
if err != nil {
return nil, err
}
Expand Down
3 changes: 1 addition & 2 deletions data/ledger.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ type roundSeed struct {
func LoadLedger[T string | ledger.DirsAndPrefix](
log logging.Logger, dir T, memory bool,
genesisProto protocol.ConsensusVersion, genesisBal bookkeeping.GenesisBalances, genesisID string, genesisHash crypto.Digest,
blockListeners []ledgercore.BlockListener, cfg config.Local,
cfg config.Local,
) (*Ledger, error) {
if genesisBal.Balances == nil {
genesisBal.Balances = make(map[basics.Address]basics.AccountData)
Expand Down Expand Up @@ -115,7 +115,6 @@ func LoadLedger[T string | ledger.DirsAndPrefix](
}

l.Ledger = ll
l.RegisterBlockListeners(blockListeners)
return l, nil
}

Expand Down
18 changes: 9 additions & 9 deletions data/txHandler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,7 @@ func BenchmarkTxHandlerProcessing(b *testing.B) {
cfg.Archival = true
cfg.TxBacklogReservedCapacityPerPeer = 1
cfg.IncomingConnectionsLimit = 10
ledger, err := LoadLedger(log, ledgerName, inMem, protocol.ConsensusCurrentVersion, genBal, genesisID, genesisHash, nil, cfg)
ledger, err := LoadLedger(log, ledgerName, inMem, protocol.ConsensusCurrentVersion, genBal, genesisID, genesisHash, cfg)
require.NoError(b, err)
defer ledger.Close()

Expand Down Expand Up @@ -1027,7 +1027,7 @@ func TestTxHandlerProcessIncomingCacheTxPoolDrop(t *testing.T) {
cfg.Archival = true
cfg.EnableTxBacklogRateLimiting = false
cfg.TxIncomingFilteringFlags = 3 // txFilterRawMsg + txFilterCanonical
ledger, err := LoadLedger(log, ledgerName, inMem, protocol.ConsensusCurrentVersion, genBal, genesisID, genesisHash, nil, cfg)
ledger, err := LoadLedger(log, ledgerName, inMem, protocol.ConsensusCurrentVersion, genBal, genesisID, genesisHash, cfg)
require.NoError(t, err)
defer ledger.Close()

Expand Down Expand Up @@ -1196,7 +1196,7 @@ func incomingTxHandlerProcessing(maxGroupSize, numberOfTransactionGroups int, t
cfg := config.GetDefaultLocal()
cfg.Archival = true
cfg.EnableTxBacklogRateLimiting = false
ledger, err := LoadLedger(log, ledgerName, inMem, protocol.ConsensusCurrentVersion, genBal, genesisID, genesisHash, nil, cfg)
ledger, err := LoadLedger(log, ledgerName, inMem, protocol.ConsensusCurrentVersion, genBal, genesisID, genesisHash, cfg)
require.NoError(t, err)
defer ledger.Close()

Expand Down Expand Up @@ -1641,7 +1641,7 @@ func (g *txGenerator) makeLedger(tb testing.TB, cfg config.Local, log logging.Lo
ledgerName := fmt.Sprintf("%s-in_mem-w_inv=%d", namePrefix, ivrString)
ledgerName = strings.Replace(ledgerName, "#", "-", 1)
const inMem = true
ledger, err := LoadLedger(log, ledgerName, inMem, protocol.ConsensusCurrentVersion, genBal, genesisID, genesisHash, nil, cfg)
ledger, err := LoadLedger(log, ledgerName, inMem, protocol.ConsensusCurrentVersion, genBal, genesisID, genesisHash, cfg)
require.NoError(tb, err)
return ledger
}
Expand Down Expand Up @@ -2183,7 +2183,7 @@ func TestTxHandlerRememberReportErrorsWithTxPool(t *testing.T) { //nolint:parall
cfg := config.GetDefaultLocal()
cfg.Archival = true
cfg.TxPoolSize = config.MaxTxGroupSize + 1
ledger, err := LoadLedger(log, ledgerName, inMem, protocol.ConsensusCurrentVersion, genBal, genesisID, genesisHash, nil, cfg)
ledger, err := LoadLedger(log, ledgerName, inMem, protocol.ConsensusCurrentVersion, genBal, genesisID, genesisHash, cfg)
require.NoError(t, err)
defer ledger.Close()

Expand Down Expand Up @@ -2419,7 +2419,7 @@ func TestTxHandlerRestartWithBacklogAndTxPool(t *testing.T) { //nolint:parallelt
const inMem = true
cfg := config.GetDefaultLocal()
cfg.Archival = true
ledger, err := LoadLedger(log, ledgerName, inMem, protocol.ConsensusCurrentVersion, genBal, genesisID, genesisHash, nil, cfg)
ledger, err := LoadLedger(log, ledgerName, inMem, protocol.ConsensusCurrentVersion, genBal, genesisID, genesisHash, cfg)
require.NoError(t, err)
defer ledger.Ledger.Close()

Expand Down Expand Up @@ -2524,7 +2524,7 @@ func TestTxHandlerAppRateLimiterERLEnabled(t *testing.T) {
cfg.TxBacklogServiceRateWindowSeconds = 1
cfg.TxBacklogAppTxPerSecondRate = 3
cfg.TxBacklogSize = 3
ledger, err := LoadLedger(log, ledgerName, inMem, protocol.ConsensusCurrentVersion, bookkeeping.GenesisBalances{}, genesisID, genesisHash, nil, cfg)
ledger, err := LoadLedger(log, ledgerName, inMem, protocol.ConsensusCurrentVersion, bookkeeping.GenesisBalances{}, genesisID, genesisHash, cfg)
require.NoError(t, err)
defer ledger.Close()

Expand Down Expand Up @@ -2636,7 +2636,7 @@ func TestTxHandlerAppRateLimiter(t *testing.T) {
cfg.TxBacklogAppTxRateLimiterMaxSize = 100
cfg.TxBacklogServiceRateWindowSeconds = 1
cfg.TxBacklogAppTxPerSecondRate = 3
ledger, err := LoadLedger(log, ledgerName, inMem, protocol.ConsensusCurrentVersion, genBal, genesisID, genesisHash, nil, cfg)
ledger, err := LoadLedger(log, ledgerName, inMem, protocol.ConsensusCurrentVersion, genBal, genesisID, genesisHash, cfg)
require.NoError(t, err)
defer ledger.Close()

Expand Down Expand Up @@ -2705,7 +2705,7 @@ func TestTxHandlerCapGuard(t *testing.T) {
cfg.IncomingConnectionsLimit = 1
cfg.TxBacklogSize = 3

ledger, err := LoadLedger(log, ledgerName, inMem, protocol.ConsensusCurrentVersion, genBal, genesisID, genesisHash, nil, cfg)
ledger, err := LoadLedger(log, ledgerName, inMem, protocol.ConsensusCurrentVersion, genBal, genesisID, genesisHash, cfg)
require.NoError(t, err)
defer ledger.Close()

Expand Down
9 changes: 9 additions & 0 deletions ledger/ledger.go
Original file line number Diff line number Diff line change
Expand Up @@ -210,6 +210,10 @@ func (l *Ledger) reloadLedger() error {
l.trackerMu.Lock()
defer l.trackerMu.Unlock()

// save block listeners to recover them later
blockListeners := make([]ledgercore.BlockListener, 0, len(l.notifier.listeners))
blockListeners = append(blockListeners, l.notifier.listeners...)

// close the trackers.
l.trackers.close()

Expand Down Expand Up @@ -256,6 +260,9 @@ func (l *Ledger) reloadLedger() error {
return err
}

// restore block listeners since l.notifier might not survive a reload
l.notifier.register(blockListeners)

// post-init actions
if trackerDBInitParams.VacuumOnStartup || l.cfg.OptimizeAccountsDatabaseOnStartup {
err = l.accts.vacuumDatabase(context.Background())
Expand Down Expand Up @@ -423,6 +430,8 @@ func (l *Ledger) Close() {
// RegisterBlockListeners registers listeners that will be called when a
// new block is added to the ledger.
func (l *Ledger) RegisterBlockListeners(listeners []ledgercore.BlockListener) {
l.trackerMu.RLock()
defer l.trackerMu.RUnlock()
l.notifier.register(listeners)
}

Expand Down
35 changes: 35 additions & 0 deletions ledger/ledger_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3422,5 +3422,40 @@ func TestLedgerRetainMinOffCatchpointInterval(t *testing.T) {
}
}()
}
}

type testBlockListener struct {
id int
}

func (t *testBlockListener) OnNewBlock(bookkeeping.Block, ledgercore.StateDelta) {}

// TestLedgerRegisterBlockListeners ensures that the block listeners survive reloadLedger
func TestLedgerRegisterBlockListeners(t *testing.T) {
partitiontest.PartitionTest(t)

genBalances, _, _ := ledgertesting.NewTestGenesis()
var genHash crypto.Digest
crypto.RandBytes(genHash[:])
cfg := config.GetDefaultLocal()
l := newSimpleLedgerFull(t, genBalances, protocol.ConsensusCurrentVersion, genHash, cfg)
defer l.Close()

l.RegisterBlockListeners([]ledgercore.BlockListener{&testBlockListener{1}, &testBlockListener{2}})
l.RegisterBlockListeners([]ledgercore.BlockListener{&testBlockListener{3}})

require.Equal(t, 3, len(l.notifier.listeners))
var ids []int
for _, bl := range l.notifier.listeners {
ids = append(ids, bl.(*testBlockListener).id)
}
require.Equal(t, []int{1, 2, 3}, ids)

l.reloadLedger()

ids = nil
for _, bl := range l.notifier.listeners {
ids = append(ids, bl.(*testBlockListener).id)
}
require.Equal(t, []int{1, 2, 3}, ids)
}
4 changes: 2 additions & 2 deletions node/assemble_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ func BenchmarkAssembleBlock(b *testing.B) {
const inMem = true
cfg := config.GetDefaultLocal()
cfg.Archival = true
ledger, err := data.LoadLedger(log, ledgerName, inMem, protocol.ConsensusCurrentVersion, genBal, genesisID, genesisHash, nil, cfg)
ledger, err := data.LoadLedger(log, ledgerName, inMem, protocol.ConsensusCurrentVersion, genBal, genesisID, genesisHash, cfg)
require.NoError(b, err)

l := ledger
Expand Down Expand Up @@ -212,7 +212,7 @@ func TestAssembleBlockTransactionPoolBehind(t *testing.T) {
const inMem = true
cfg := config.GetDefaultLocal()
cfg.Archival = true
ledger, err := data.LoadLedger(log, "ledgerName", inMem, protocol.ConsensusCurrentVersion, genBal, genesisID, genesisHash, nil, cfg)
ledger, err := data.LoadLedger(log, "ledgerName", inMem, protocol.ConsensusCurrentVersion, genBal, genesisID, genesisHash, cfg)
require.NoError(t, err)

l := ledger
Expand Down
8 changes: 2 additions & 6 deletions node/follower_node.go
Original file line number Diff line number Diff line change
Expand Up @@ -116,17 +116,13 @@ func MakeFollower(log logging.Logger, rootDir string, cfg config.Local, phoneboo
DBFilePrefix: config.LedgerFilenamePrefix,
ResolvedGenesisDirs: node.genesisDirs,
}
node.ledger, err = data.LoadLedger(node.log, ledgerPaths, false, genesis.Proto, genalloc, node.genesisID, node.genesisHash, []ledgercore.BlockListener{}, cfg)
node.ledger, err = data.LoadLedger(node.log, ledgerPaths, false, genesis.Proto, genalloc, node.genesisID, node.genesisHash, cfg)
if err != nil {
log.Errorf("Cannot initialize ledger (%v): %v", ledgerPaths, err)
return nil, err
}

blockListeners := []ledgercore.BlockListener{
node,
}

node.ledger.RegisterBlockListeners(blockListeners)
node.ledger.RegisterBlockListeners([]ledgercore.BlockListener{node})

if cfg.IsGossipServer() {
rpcs.MakeHealthService(node.net)
Expand Down
10 changes: 3 additions & 7 deletions node/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -223,7 +223,7 @@ func MakeFull(log logging.Logger, rootDir string, cfg config.Local, phonebookAdd
DBFilePrefix: config.LedgerFilenamePrefix,
ResolvedGenesisDirs: node.genesisDirs,
}
node.ledger, err = data.LoadLedger(node.log, ledgerPaths, false, genesis.Proto, genalloc, node.genesisID, node.genesisHash, []ledgercore.BlockListener{}, cfg)
node.ledger, err = data.LoadLedger(node.log, ledgerPaths, false, genesis.Proto, genalloc, node.genesisID, node.genesisHash, cfg)
if err != nil {
log.Errorf("Cannot initialize ledger (%v): %v", ledgerPaths, err)
return nil, err
Expand All @@ -246,12 +246,7 @@ func MakeFull(log logging.Logger, rootDir string, cfg config.Local, phonebookAdd

node.transactionPool = pools.MakeTransactionPool(node.ledger.Ledger, cfg, node.log, node)

blockListeners := []ledgercore.BlockListener{
node.transactionPool,
node,
}

node.ledger.RegisterBlockListeners(blockListeners)
node.ledger.RegisterBlockListeners([]ledgercore.BlockListener{node.transactionPool, node})
txHandlerOpts := data.TxHandlerOpts{
TxPool: node.transactionPool,
ExecutionPool: node.lowPriorityCryptoVerificationPool,
Expand Down Expand Up @@ -1211,6 +1206,7 @@ func (node *AlgorandFullNode) SetCatchpointCatchupMode(catchpointCatchupMode boo
return
}
defer node.mu.Unlock()

// start
node.transactionPool.Reset()
node.catchupService.Start()
Expand Down
2 changes: 1 addition & 1 deletion node/node_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -168,7 +168,7 @@ func setupFullNodes(t *testing.T, proto protocol.ConsensusVersion, verificationP
cfg, err := config.LoadConfigFromDisk(rootDirectory)
require.NoError(t, err)
cfg.Archival = true
_, err = data.LoadLedger(logging.Base().With("name", nodeID), ledgerFilenamePrefix, inMem, g.Proto, bootstrap, g.ID(), g.Hash(), nil, cfg)
_, err = data.LoadLedger(logging.Base().With("name", nodeID), ledgerFilenamePrefix, inMem, g.Proto, bootstrap, g.ID(), g.Hash(), cfg)
require.NoError(t, err)
}

Expand Down
2 changes: 1 addition & 1 deletion rpcs/blockService_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -520,7 +520,7 @@ func makeLedger(t *testing.T, namePostfix string) *data.Ledger {
prefix := t.Name() + namePostfix
ledger, err := data.LoadLedger(
log, prefix, inMem, protocol.ConsensusCurrentVersion, genBal, "", genHash,
nil, cfg,
cfg,
)
require.NoError(t, err)
return ledger
Expand Down

0 comments on commit d3831cd

Please sign in to comment.