From c319378bc0c1037573313291ab4009e4a0bd66f3 Mon Sep 17 00:00:00 2001 From: Pavel Zbitskiy Date: Tue, 25 Jun 2024 10:33:33 -0400 Subject: [PATCH] node: close ledger on node shutdown * node_test runs multiple nodes in a single process, which leads to a file descriptor leak (see #5057 for more details). * just closing the ledger is not enough because of concurrent evaluation operations done by the transaction pool. * made the transaction pool shutdown-able and stop it before ledger termination. --- data/pools/transactionPool.go | 25 +++++++++++++++++++++++++ ledger/notifier.go | 2 ++ node/node.go | 2 ++ 3 files changed, 29 insertions(+) diff --git a/data/pools/transactionPool.go b/data/pools/transactionPool.go index 687a3db80c..126cc0d0c4 100644 --- a/data/pools/transactionPool.go +++ b/data/pools/transactionPool.go @@ -95,6 +95,11 @@ type TransactionPool struct { // stateproofOverflowed indicates that a stateproof transaction was allowed to // exceed the txPoolMaxSize. This flag is reset to false OnNewBlock stateproofOverflowed bool + + // shutdown is set to true when the pool is being shut down. It is checked in exported methods + // to prevent pool operations like remember and recomputing the block evaluator + // from using down stream resources like ledger that may be shutting down. + shutdown bool } // BlockEvaluator defines the block evaluator interface exposed by the ledger package. @@ -113,6 +118,8 @@ type VotingAccountSupplier interface { VotingAccountsForRound(basics.Round) []basics.Address } +var errPoolShutdown = errors.New("transaction pool is shutting down") + // MakeTransactionPool makes a transaction pool. 
func MakeTransactionPool(ledger *ledger.Ledger, cfg config.Local, log logging.Logger, vac VotingAccountSupplier) *TransactionPool { if cfg.TxPoolExponentialIncreaseFactor < 1 { @@ -430,6 +437,10 @@ func (pool *TransactionPool) ingest(txgroup []transactions.SignedTxn, params poo return ErrNoPendingBlockEvaluator } + if pool.shutdown { + return errPoolShutdown + } + if !params.recomputing { // Make sure that the latest block has been processed by OnNewBlock(). // If not, we might be in a race, so wait a little bit for OnNewBlock() @@ -529,6 +540,10 @@ func (pool *TransactionPool) OnNewBlock(block bookkeeping.Block, delta ledgercor pool.mu.Lock() defer pool.mu.Unlock() + if pool.shutdown { + return + } + defer pool.cond.Broadcast() if pool.pendingBlockEvaluator == nil || block.Round() >= pool.pendingBlockEvaluator.Round() { // Adjust the pool fee threshold. The rules are: @@ -1010,3 +1025,13 @@ func (pool *TransactionPool) AssembleDevModeBlock() (assembled *ledgercore.Unfin assembled, err = pool.AssembleBlock(pool.pendingBlockEvaluator.Round(), time.Now().Add(pool.proposalAssemblyTime)) return } + +// Shutdown stops the transaction pool from accepting new transactions and blocks. +// It takes the pool.mu lock in order to ensure there is no pending remember or block operations in flight +// and sets the shutdown flag to true. 
+func (pool *TransactionPool) Shutdown() { + pool.mu.Lock() + defer pool.mu.Unlock() + + pool.shutdown = true +} diff --git a/ledger/notifier.go b/ledger/notifier.go index aabf62d080..f97e1c77e6 100644 --- a/ledger/notifier.go +++ b/ledger/notifier.go @@ -74,6 +74,8 @@ func (bn *blockNotifier) worker() { func (bn *blockNotifier) close() { bn.mu.Lock() + bn.pendingBlocks = nil + bn.listeners = nil if bn.running { bn.running = false bn.cond.Broadcast() diff --git a/node/node.go b/node/node.go index d1c6cc4b82..93c2c566c7 100644 --- a/node/node.go +++ b/node/node.go @@ -441,7 +441,9 @@ func (node *AlgorandFullNode) Stop() { node.lowPriorityCryptoVerificationPool.Shutdown() node.cryptoPool.Shutdown() node.log.Debug("crypto worker pools have stopped") + node.transactionPool.Shutdown() node.cancelCtx() + node.ledger.Close() } // note: unlike the other two functions, this accepts a whole filename