Skip to content

Commit

Permalink
fix: txpool metrics and unit test (#59)
Browse files Browse the repository at this point in the history
Co-authored-by: andyzhang2023 <[email protected]>
Co-authored-by: bnoieh <[email protected]>
  • Loading branch information
3 people authored Feb 2, 2024
1 parent 2a7bb9c commit 3910df6
Show file tree
Hide file tree
Showing 4 changed files with 24 additions and 28 deletions.
14 changes: 2 additions & 12 deletions core/txpool/invalid.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ import (
)

const (
AlreadyKnown = "AlreadyKnown"
TypeNotSupportDeposit = "TypeNotSupportDeposit"
TypeNotSupport1559 = "TypeNotSupport1559"
TypeNotSupport2718 = "TypeNotSupport2718"
Expand All @@ -23,22 +22,17 @@ const (
InsufficientFunds = "InsufficientFunds"
Overdraft = "Overdraft"
IntrinsicGas = "IntrinsicGas"
Throttle = "Throttle"
Overflow = "Overflow"
FutureReplacePending = "FutureReplacePending"
ReplaceUnderpriced = "ReplaceUnderpriced"
QueuedDiscard = "QueueDiscard"
GasUnitOverflow = "GasUnitOverflow"
)

func meter(err string) metrics.Meter {
func Meter(err string) metrics.Meter {
return metrics.GetOrRegisterMeter("txpool/invalid/"+err, nil)
}

func init() {
// init the metrics
for _, err := range []string{
AlreadyKnown,
TypeNotSupportDeposit,
TypeNotSupport1559,
TypeNotSupport2718,
Expand All @@ -56,13 +50,9 @@ func init() {
InsufficientFunds,
Overdraft,
IntrinsicGas,
Throttle,
Overflow,
FutureReplacePending,
ReplaceUnderpriced,
QueuedDiscard,
GasUnitOverflow,
} {
meter(err).Mark(0)
Meter(err).Mark(0)
}
}
2 changes: 2 additions & 0 deletions core/txpool/legacypool/legacypool.go
Original file line number Diff line number Diff line change
Expand Up @@ -829,6 +829,7 @@ func (pool *LegacyPool) add(tx *types.Transaction, local bool) (replaced bool, e
pool.priced.Put(dropTx, false)
}
log.Trace("Discarding future transaction replacing pending tx", "hash", hash)
txpool.Meter(txpool.FutureReplacePending).Mark(1)
return false, txpool.ErrFutureReplacePending
}
}
Expand Down Expand Up @@ -940,6 +941,7 @@ func (pool *LegacyPool) enqueueTx(hash common.Hash, tx *types.Transaction, local
// If the transaction isn't in lookup set but it's expected to be there,
// show the error log.
if pool.all.Get(hash) == nil && !addAll {
txpool.Meter(txpool.MissingTransaction).Mark(1)
log.Error("Missing transaction in lookup set, please report the issue", "hash", hash)
}
if addAll {
Expand Down
28 changes: 16 additions & 12 deletions core/txpool/validation.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ func ValidateTransaction(tx *types.Transaction, head *types.Header, signer types
// This is for spam protection, not consensus,
// as the external engine-API user authenticates deposits.
if tx.Type() == types.DepositTxType {
meter(TypeNotSupportDeposit).Mark(1)
Meter(TypeNotSupportDeposit).Mark(1)
return core.ErrTxTypeNotSupported
}
// Ensure transactions not implemented by the calling pool are rejected
Expand All @@ -81,25 +81,25 @@ func ValidateTransaction(tx *types.Transaction, head *types.Header, signer types
}
// Ensure only transactions that have been enabled are accepted
if !opts.Config.IsBerlin(head.Number) && tx.Type() != types.LegacyTxType {
meter(TypeNotSupport2718).Mark(1)
Meter(TypeNotSupport2718).Mark(1)
return fmt.Errorf("%w: type %d rejected, pool not yet in Berlin", core.ErrTxTypeNotSupported, tx.Type())
}
if !opts.Config.IsLondon(head.Number) && tx.Type() == types.DynamicFeeTxType {
meter(TypeNotSupport1559).Mark(1)
Meter(TypeNotSupport1559).Mark(1)
return fmt.Errorf("%w: type %d rejected, pool not yet in London", core.ErrTxTypeNotSupported, tx.Type())
}
if !opts.Config.IsCancun(head.Number, head.Time) && tx.Type() == types.BlobTxType {
return fmt.Errorf("%w: type %d rejected, pool not yet in Cancun", core.ErrTxTypeNotSupported, tx.Type())
}
// Check whether the init code size has been exceeded
if opts.Config.IsShanghai(head.Number, head.Time) && tx.To() == nil && len(tx.Data()) > params.MaxInitCodeSize {
meter(MaxInitCodeSizeExceeded).Mark(1)
Meter(MaxInitCodeSizeExceeded).Mark(1)
return fmt.Errorf("%w: code size %v, limit %v", core.ErrMaxInitCodeSizeExceeded, len(tx.Data()), params.MaxInitCodeSize)
}
// Transactions can't be negative. This may never happen using RLP decoded
// transactions but may occur for transactions created using the RPC.
if tx.Value().Sign() < 0 {
meter(NegativeValue).Mark(1)
Meter(NegativeValue).Mark(1)
return ErrNegativeValue
}
// Ensure the transaction doesn't exceed the current block limit gas
Expand All @@ -108,36 +108,38 @@ func ValidateTransaction(tx *types.Transaction, head *types.Header, signer types
}
// Sanity check for extremely large numbers (supported by RLP or RPC)
if tx.GasFeeCap().BitLen() > 256 {
meter(FeeCapVeryHigh).Mark(1)
Meter(FeeCapVeryHigh).Mark(1)
return core.ErrFeeCapVeryHigh
}
if tx.GasTipCap().BitLen() > 256 {
meter(TipVeryHigh).Mark(1)
Meter(TipVeryHigh).Mark(1)
return core.ErrTipVeryHigh
}
// Ensure gasFeeCap is greater than or equal to gasTipCap
if tx.GasFeeCapIntCmp(tx.GasTipCap()) < 0 {
meter(TipAboveFeeCap).Mark(1)
Meter(TipAboveFeeCap).Mark(1)
return core.ErrTipAboveFeeCap
}
// Make sure the transaction is signed properly
if _, err := types.Sender(signer, tx); err != nil {
meter(InvalidSender).Mark(1)
Meter(InvalidSender).Mark(1)
return ErrInvalidSender
}
// Ensure the transaction has more gas than the bare minimum needed to cover
// the transaction metadata
intrGas, err := core.IntrinsicGas(tx.Data(), tx.AccessList(), tx.To() == nil, true, opts.Config.IsIstanbul(head.Number), opts.Config.IsShanghai(head.Number, head.Time))
if err != nil {
Meter(GasUnitOverflow).Mark(1)
return err
}
if tx.Gas() < intrGas {
Meter(IntrinsicGas).Mark(1)
return fmt.Errorf("%w: needed %v, allowed %v", core.ErrIntrinsicGas, intrGas, tx.Gas())
}
// Ensure the gasprice is high enough to cover the requirement of the calling
// pool and/or block producer
if tx.GasTipCapIntCmp(opts.MinTip) < 0 {
meter(Underpriced).Mark(1)
Meter(Underpriced).Mark(1)
return fmt.Errorf("%w: tip needed %v, tip permitted %v", ErrUnderpriced, opts.MinTip, tx.GasTipCap())
}
// Ensure blob transactions have valid commitments
Expand Down Expand Up @@ -240,7 +242,7 @@ func ValidateTransactionWithState(tx *types.Transaction, signer types.Signer, op
}
next := opts.State.GetNonce(from)
if next > tx.Nonce() {
meter(NonceTooLow).Mark(1)
Meter(NonceTooLow).Mark(1)
return fmt.Errorf("%w: next nonce %v, tx nonce %v", core.ErrNonceTooLow, next, tx.Nonce())
}
// Ensure the transaction doesn't produce a nonce gap in pools that do not
Expand All @@ -261,7 +263,7 @@ func ValidateTransactionWithState(tx *types.Transaction, signer types.Signer, op
}
}
if balance.Cmp(cost) < 0 {
meter(InsufficientFunds).Mark(1)
Meter(InsufficientFunds).Mark(1)
return fmt.Errorf("%w: balance %v, tx cost %v, overshot %v", core.ErrInsufficientFunds, balance, cost, new(big.Int).Sub(cost, balance))
}
// Ensure the transactor has enough funds to cover for replacements or nonce
Expand All @@ -271,11 +273,13 @@ func ValidateTransactionWithState(tx *types.Transaction, signer types.Signer, op
bump := new(big.Int).Sub(cost, prev)
need := new(big.Int).Add(spent, bump)
if balance.Cmp(need) < 0 {
Meter(Overdraft).Mark(1)
return fmt.Errorf("%w: balance %v, queued cost %v, tx bumped %v, overshot %v", core.ErrInsufficientFunds, balance, spent, bump, new(big.Int).Sub(need, balance))
}
} else {
need := new(big.Int).Add(spent, cost)
if balance.Cmp(need) < 0 {
Meter(Overdraft).Mark(1)
return fmt.Errorf("%w: balance %v, queued cost %v, tx cost %v, overshot %v", core.ErrInsufficientFunds, balance, spent, cost, new(big.Int).Sub(need, balance))
}
// Transaction takes a new nonce value out of the pool. Ensure it doesn't
Expand Down
8 changes: 4 additions & 4 deletions eth/handler_eth_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -461,14 +461,14 @@ func TestTransactionPendingReannounce(t *testing.T) {

sink := newTestHandler()
defer sink.close()
sink.handler.acceptTxs = 1 // mark synced to accept transactions
sink.handler.synced.Store(true) // mark synced to accept transactions

sourcePipe, sinkPipe := p2p.MsgPipe()
defer sourcePipe.Close()
defer sinkPipe.Close()

sourcePeer := eth.NewPeer(eth.ETH66, p2p.NewPeer(enode.ID{0}, "", nil), sourcePipe, source.txpool)
sinkPeer := eth.NewPeer(eth.ETH66, p2p.NewPeer(enode.ID{0}, "", nil), sinkPipe, sink.txpool)
sourcePeer := eth.NewPeer(eth.ETH68, p2p.NewPeer(enode.ID{0}, "", nil), sourcePipe, source.txpool)
sinkPeer := eth.NewPeer(eth.ETH68, p2p.NewPeer(enode.ID{0}, "", nil), sinkPipe, sink.txpool)
defer sourcePeer.Close()
defer sinkPeer.Close()

Expand All @@ -481,7 +481,7 @@ func TestTransactionPendingReannounce(t *testing.T) {

// Subscribe transaction pools
txCh := make(chan core.NewTxsEvent, 1024)
sub := sink.txpool.SubscribeNewTxsEvent(txCh)
sub := sink.txpool.SubscribeTransactions(txCh, false)
defer sub.Unsubscribe()

txs := make([]*types.Transaction, 64)
Expand Down

0 comments on commit 3910df6

Please sign in to comment.