Skip to content

Commit

Permalink
node: close ledger on node shutdown
Browse files Browse the repository at this point in the history
* node_test runs multiple nodes in a single process that leads
  to file descriptors leak (see #5057 for more details).
* just closing ledger is not enough because of concurrent operations
  evaluation operations done by transaction pool.
* made transaction pool shutdown-able and stop it before ledger termination.
  • Loading branch information
algorandskiy committed Jun 25, 2024
1 parent 052ceb2 commit c319378
Show file tree
Hide file tree
Showing 3 changed files with 29 additions and 0 deletions.
25 changes: 25 additions & 0 deletions data/pools/transactionPool.go
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,11 @@ type TransactionPool struct {
// stateproofOverflowed indicates that a stateproof transaction was allowed to
// exceed the txPoolMaxSize. This flag is reset to false OnNewBlock
stateproofOverflowed bool

// shutdown is set to true when the pool is being shut down. It is checked in exported methods
// to prevent pool operations like remember and recomputing the block evaluator
// from using down stream resources like ledger that may be shutting down.
shutdown bool
}

// BlockEvaluator defines the block evaluator interface exposed by the ledger package.
Expand All @@ -113,6 +118,8 @@ type VotingAccountSupplier interface {
VotingAccountsForRound(basics.Round) []basics.Address
}

var errPoolShutdown = errors.New("transaction pool is shutting down")

// MakeTransactionPool makes a transaction pool.
func MakeTransactionPool(ledger *ledger.Ledger, cfg config.Local, log logging.Logger, vac VotingAccountSupplier) *TransactionPool {
if cfg.TxPoolExponentialIncreaseFactor < 1 {
Expand Down Expand Up @@ -430,6 +437,10 @@ func (pool *TransactionPool) ingest(txgroup []transactions.SignedTxn, params poo
return ErrNoPendingBlockEvaluator
}

if pool.shutdown {
return errPoolShutdown

Check warning on line 441 in data/pools/transactionPool.go

View check run for this annotation

Codecov / codecov/patch

data/pools/transactionPool.go#L441

Added line #L441 was not covered by tests
}

if !params.recomputing {
// Make sure that the latest block has been processed by OnNewBlock().
// If not, we might be in a race, so wait a little bit for OnNewBlock()
Expand Down Expand Up @@ -529,6 +540,10 @@ func (pool *TransactionPool) OnNewBlock(block bookkeeping.Block, delta ledgercor

pool.mu.Lock()
defer pool.mu.Unlock()
if pool.shutdown {
return

Check warning on line 544 in data/pools/transactionPool.go

View check run for this annotation

Codecov / codecov/patch

data/pools/transactionPool.go#L544

Added line #L544 was not covered by tests
}

defer pool.cond.Broadcast()
if pool.pendingBlockEvaluator == nil || block.Round() >= pool.pendingBlockEvaluator.Round() {
// Adjust the pool fee threshold. The rules are:
Expand Down Expand Up @@ -1010,3 +1025,13 @@ func (pool *TransactionPool) AssembleDevModeBlock() (assembled *ledgercore.Unfin
assembled, err = pool.AssembleBlock(pool.pendingBlockEvaluator.Round(), time.Now().Add(pool.proposalAssemblyTime))
return
}

// Shutdown stops the transaction pool from accepting new transactions and blocks.
// It takes the pool.mu lock in order to ensure there is no pending remember or block operations in flight
// and sets the shutdown flag to true.
func (pool *TransactionPool) Shutdown() {
pool.mu.Lock()
defer pool.mu.Unlock()

Check warning on line 1034 in data/pools/transactionPool.go

View check run for this annotation

Codecov / codecov/patch

data/pools/transactionPool.go#L1032-L1034

Added lines #L1032 - L1034 were not covered by tests

pool.shutdown = true

Check warning on line 1036 in data/pools/transactionPool.go

View check run for this annotation

Codecov / codecov/patch

data/pools/transactionPool.go#L1036

Added line #L1036 was not covered by tests
}
2 changes: 2 additions & 0 deletions ledger/notifier.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,8 @@ func (bn *blockNotifier) worker() {

func (bn *blockNotifier) close() {
bn.mu.Lock()
bn.pendingBlocks = nil
bn.listeners = nil
if bn.running {
bn.running = false
bn.cond.Broadcast()
Expand Down
2 changes: 2 additions & 0 deletions node/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -441,7 +441,9 @@ func (node *AlgorandFullNode) Stop() {
node.lowPriorityCryptoVerificationPool.Shutdown()
node.cryptoPool.Shutdown()
node.log.Debug("crypto worker pools have stopped")
node.transactionPool.Shutdown()
node.cancelCtx()
node.ledger.Close()
}

// note: unlike the other two functions, this accepts a whole filename
Expand Down

0 comments on commit c319378

Please sign in to comment.