Skip to content
This repository has been archived by the owner on Jun 9, 2024. It is now read-only.

test(mempool): Logging + potential error fixes #1470

Open
wants to merge 7 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 6 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 4 additions & 1 deletion cosmos/runtime/ante/ante.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,12 +66,15 @@ func (ah *Provider) AnteHandler() func(
return func(ctx sdk.Context, tx sdk.Tx, simulate bool) (sdk.Context, error) {
// If the transaction contains a single EVM transaction, use the EVM ante handler
if len(tx.GetMsgs()) == 1 {
if _, ok := tx.GetMsgs()[0].(*evmtypes.WrappedEthereumTransaction); ok {
if ethTx, ok := tx.GetMsgs()[0].(*evmtypes.WrappedEthereumTransaction); ok {
ctx.Logger().Info("running evm ante handler for eth tx", "hash", ethTx.Unwrap().Hash())
return ah.evmAnteHandler(ctx, tx, simulate)
} else if _, ok = tx.GetMsgs()[0].(*evmtypes.WrappedPayloadEnvelope); ok {
if ctx.ExecMode() != sdk.ExecModeCheck {
ctx.Logger().Info("running evm ante handler for payload tx")
return ctx, nil
}
ctx.Logger().Error("running evm ante handler for payload tx in check tx")
return ctx, errors.New("payload envelope is not supported in CheckTx")
}
}
Expand Down
7 changes: 5 additions & 2 deletions cosmos/runtime/runtime.go
Original file line number Diff line number Diff line change
Expand Up @@ -177,8 +177,11 @@ func (p *Polaris) SetupServices(clientCtx client.Context) error {
clientCtx.TxConfig, evmtypes.WrapPayload))

// Initialize the txpool with a new transaction serializer.
p.WrappedTxPool.Init(p.logger, clientCtx, libtx.NewSerializer[*ethtypes.Transaction](
clientCtx.TxConfig, evmtypes.WrapTx))
p.WrappedTxPool.Init(
p.logger, clientCtx,
libtx.NewSerializer[*ethtypes.Transaction](clientCtx.TxConfig, evmtypes.WrapTx),
clientCtx.Client,
)

// Register services with Polaris.
p.RegisterLifecycles([]node.Lifecycle{
Expand Down
27 changes: 20 additions & 7 deletions cosmos/runtime/txpool/ante.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,45 +44,56 @@ func (m *Mempool) AnteHandle(
telemetry.IncrCounter(float32(1), MetricKeyCometPoolTxs)
msgs := tx.GetMsgs()

ctx.Logger().Info("AnteHandle Polaris Mempool", "msgs", len(msgs), "simulate", simulate)

// TODO: Record the time it takes to build a payload.

// We only want to eject transactions from comet on recheck.
if ctx.ExecMode() == sdk.ExecModeCheck || ctx.ExecMode() == sdk.ExecModeReCheck {
ctx.Logger().Info("AnteHandle in Check/Recheck tx")
if wet, ok := utils.GetAs[*types.WrappedEthereumTransaction](msgs[0]); ok {
ethTx := wet.Unwrap()
ctx.Logger().Info("AnteHandle for eth tx", "tx", ethTx.Hash(), "mode", ctx.ExecMode())
if shouldEject := m.shouldEjectFromCometMempool(
ctx.BlockTime().Unix(), ethTx,
ctx, ethTx,
); shouldEject {
ctx.Logger().Info("AnteHandle dropping tx from comet mempool", "tx", ethTx.Hash())
m.crc.DropRemoteTx(ethTx.Hash())
telemetry.IncrCounter(float32(1), MetricKeyAnteEjectedTxs)
return ctx, errors.New("eject from comet mempool")
}
ctx.Logger().Info("AnteHandle NOT dropping comet mempool", "tx", ethTx.Hash())
}
}
return next(ctx, tx, simulate)
}

// shouldEject returns true if the transaction should be ejected from the CometBFT mempool.
func (m *Mempool) shouldEjectFromCometMempool(
currentTime int64, tx *ethtypes.Transaction,
ctx sdk.Context, tx *ethtypes.Transaction,
) bool {
defer telemetry.MeasureSince(time.Now(), MetricKeyTimeShouldEject)
if tx == nil {
ctx.Logger().Info("shouldEjectFromCometMempool: tx is nil")
return false
}

// First check things that are stateless.
if m.validateStateless(tx, currentTime) {
if m.validateStateless(ctx, tx) {
ctx.Logger().Info("shouldEjectFromCometMempool: stateless failed", "tx", tx.Hash())
return true
}

// Then check for things that are stateful.
return m.validateStateful(tx)
return m.validateStateful(ctx, tx)
}

// validateStateless returns whether the tx of the given hash is stateless.
func (m *Mempool) validateStateless(tx *ethtypes.Transaction, currentTime int64) bool {
func (m *Mempool) validateStateless(ctx sdk.Context, tx *ethtypes.Transaction) bool {
txHash := tx.Hash()
currentTime := ctx.BlockTime().Unix()
ctx.Logger().Info("validateStateless", "txHash", txHash, "currentTime", currentTime)

// 1. If the transaction has been in the mempool for longer than the configured timeout.
// 2. If the transaction's gas params are less than or equal to the configured limit.
expired := currentTime-m.crc.TimeFirstSeen(txHash) > m.lifetime
Expand All @@ -95,20 +106,22 @@ func (m *Mempool) validateStateless(tx *ethtypes.Transaction, currentTime int64)
telemetry.IncrCounter(float32(1), MetricKeyAnteShouldEjectPriceLimit)
}

ctx.Logger().Info("validateStateless", "expired", expired, "priceLeLimit", priceLeLimit)

return expired || priceLeLimit
}

// includedCanonicalChain returns whether the tx of the given hash is included in the canonical
// Eth chain.
func (m *Mempool) validateStateful(tx *ethtypes.Transaction) bool {
func (m *Mempool) validateStateful(ctx sdk.Context, tx *ethtypes.Transaction) bool {
// // 1. If the transaction has been included in a block.
// signer := ethtypes.LatestSignerForChainID(m.chainConfig.ChainID)
// if _, err := ethtypes.Sender(signer, tx); err != nil {
// return true
// }

// tx.Nonce() <
included := m.chain.GetTransactionLookup(tx.Hash()) != nil
telemetry.IncrCounter(float32(1), MetricKeyAnteShouldEjectInclusion)
ctx.Logger().Info("validateStateful", "included", included)
return included
}
3 changes: 2 additions & 1 deletion cosmos/runtime/txpool/comet.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,9 +59,10 @@ func (crc *cometRemoteCache) IsRemoteTx(txHash common.Hash) bool {
return ok
}

// Record the time the tx was inserted from Comet successfully.
// Record the first time the tx was inserted from Comet successfully.
func (crc *cometRemoteCache) MarkRemoteSeen(txHash common.Hash) {
crc.timeInsertedMu.Lock()
// TODO: only insert a new timestamp if not already seen.
crc.timeInserted[txHash] = time.Now().Unix()
crc.timeInsertedMu.Unlock()
}
Expand Down
33 changes: 27 additions & 6 deletions cosmos/runtime/txpool/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,10 +21,13 @@
package txpool

import (
"context"
"errors"
"sync/atomic"
"time"

coretypes "github.com/cometbft/cometbft/rpc/core/types"

"cosmossdk.io/log"

"github.com/cosmos/cosmos-sdk/telemetry"
Expand All @@ -40,7 +43,7 @@
// size of tx pool.
const (
txChanSize = 4096
maxRetries = 5
maxRetries = 0
retryDelay = 50 * time.Millisecond
statPeriod = 60 * time.Second
)
Expand All @@ -67,6 +70,16 @@
BroadcastTxSync(txBytes []byte) (res *sdk.TxResponse, err error)
}

type TxSearcher interface {
TxSearch(
ctx context.Context,
query string,
prove bool,
page, perPage *int,
orderBy string,
) (*coretypes.ResultTxSearch, error)
}

// Subscription represents a subscription to the txpool.
type Subscription interface {
event.Subscription
Expand All @@ -85,6 +98,7 @@
logger log.Logger
clientCtx TxBroadcaster
serializer TxSerializer
searcher TxSearcher
crc CometRemoteCache

// Ethereum
Expand All @@ -100,13 +114,14 @@

// newHandler creates a new handler.
func newHandler(
clientCtx TxBroadcaster, txPool TxSubProvider, serializer TxSerializer,
clientCtx TxBroadcaster, txSearcher TxSearcher, txPool TxSubProvider, serializer TxSerializer,
crc CometRemoteCache, logger log.Logger,
) *handler {
h := &handler{
logger: logger,
clientCtx: clientCtx,
serializer: serializer,
searcher: txSearcher,
crc: crc,
txPool: txPool,
txsCh: make(chan core.NewTxsEvent, txChanSize),
Expand All @@ -122,7 +137,7 @@
return errors.New("handler already started")
}
go h.mainLoop()
go h.failedLoop() // Start the retry policy
// go h.failedLoop() // Start the retry policy
go h.statLoop()
return nil
}
Expand Down Expand Up @@ -162,7 +177,7 @@
}

// failedLoop will periodically attempt to re-broadcast failed transactions.
func (h *handler) failedLoop() {

Check failure on line 180 in cosmos/runtime/txpool/handler.go

View workflow job for this annotation

GitHub Actions / ci (lint, polaris-linux-latest, 1.21.6)

func `(*handler).failedLoop` is unused (unused)
for {
select {
case <-h.stopCh:
Expand All @@ -172,6 +187,7 @@
h.logger.Error("failed to broadcast transaction after max retries", "tx", maxRetries)
continue
}
h.logger.Info("retrying failed tx", "tx", failed.tx.Hash(), "retries", failed.retries)
telemetry.IncrCounter(float32(1), MetricKeyBroadcastRetry)
h.broadcastTransaction(failed.tx, failed.retries-1)
}
Expand Down Expand Up @@ -225,11 +241,12 @@
numBroadcasted := 0
for _, signedEthTx := range txs {
if !h.crc.IsRemoteTx(signedEthTx.Hash()) {
h.logger.Info("broadcasting local eth tx", "hash", signedEthTx.Hash().Hex())
h.broadcastTransaction(signedEthTx, maxRetries)
numBroadcasted++
}
}
h.logger.Debug(
h.logger.Info(
"broadcasting transactions", "num_received", len(txs), "num_broadcasted", numBroadcasted,
)
}
Expand All @@ -242,6 +259,8 @@
return
}

h.logger.Info("broadcasting to comet", "ethTx", tx.Hash(), "sdkTx", txBytes)

// Send the transaction to the CometBFT mempool, which will gossip it to peers via
// CometBFT's p2p layer.
rsp, err := h.clientCtx.BroadcastTxSync(txBytes)
Expand All @@ -254,21 +273,23 @@
// If rsp == 1, likely the txn is already in a block, and thus the broadcast failing is actually
// the desired behaviour.
if rsp == nil || rsp.Code == 0 || rsp.Code == 1 {
h.logger.Info("broadcasting to comet", "hash", tx.Hash(), "rsp", rsp, "code", rsp.Code)
return
}

switch rsp.Code {
case sdkerrors.ErrMempoolIsFull.ABCICode():
h.logger.Error("failed to broadcast: comet-bft mempool is full", "tx_hash", tx.Hash())
telemetry.IncrCounter(float32(1), MetricKeyMempoolFull)
case
sdkerrors.ErrTxInMempoolCache.ABCICode():
case sdkerrors.ErrTxInMempoolCache.ABCICode():
return
default:
h.logger.Error("failed to broadcast transaction",
"codespace", rsp.Codespace, "code", rsp.Code, "info", rsp.Info, "tx_hash", tx.Hash())
telemetry.IncrCounter(float32(1), MetricKeyBroadcastFailure)
}

h.logger.Info("failed to broadcast transaction", "tx_hash", tx.Hash(), "retries", retries)

h.failedTxs <- &failedTx{tx: tx, retries: retries}
}
2 changes: 1 addition & 1 deletion cosmos/runtime/txpool/handler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ var _ = Describe("", func() {
subprovider = mocks.NewTxSubProvider(t)
subprovider.On("SubscribeTransactions", mock.Anything, mock.Anything).Return(subscription)
serializer = mocks.NewTxSerializer(t)
h = newHandler(broadcaster, subprovider, serializer, newCometRemoteCache(), log.NewTestLogger(t))
h = newHandler(broadcaster, nil, subprovider, serializer, newCometRemoteCache(), log.NewTestLogger(t))
err := h.Start()
Expect(err).NotTo(HaveOccurred())
for !h.Running() {
Expand Down
12 changes: 11 additions & 1 deletion cosmos/runtime/txpool/mempool.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (
"errors"
"math/big"
"sync"
"time"

"cosmossdk.io/log"

Expand Down Expand Up @@ -92,8 +93,9 @@ func (m *Mempool) Init(
logger log.Logger,
txBroadcaster TxBroadcaster,
txSerializer TxSerializer,
txSearcher TxSearcher,
) {
m.handler = newHandler(txBroadcaster, m.TxPool, txSerializer, m.crc, logger)
m.handler = newHandler(txBroadcaster, txSearcher, m.TxPool, txSerializer, m.crc, logger)
}

// Start starts the Mempool TxHandler.
Expand All @@ -111,12 +113,14 @@ func (m *Mempool) Insert(ctx context.Context, sdkTx sdk.Tx) error {
sCtx := sdk.UnwrapSDKContext(ctx)
msgs := sdkTx.GetMsgs()
if len(msgs) != 1 {
sCtx.Logger().Error("mempool insert: only one message is supported")
return errors.New("only one message is supported")
}

wet, ok := utils.GetAs[*types.WrappedEthereumTransaction](msgs[0])
if !ok {
// We have to return nil for non-ethereum transactions as to not fail check-tx.
sCtx.Logger().Info("mempool insert: not an ethereum transaction")
return nil
}

Expand All @@ -130,14 +134,20 @@ func (m *Mempool) Insert(ctx context.Context, sdkTx sdk.Tx) error {
if len(errs) > 0 {
// Handle case where a node broadcasts to itself, we don't want it to fail CheckTx.
if errors.Is(errs[0], ethtxpool.ErrAlreadyKnown) &&
// TODO: checking for CheckTx/ReCheck here is not necessary (only ever called in CheckTx)
(sCtx.ExecMode() == sdk.ExecModeCheck || sCtx.ExecMode() == sdk.ExecModeReCheck) {
telemetry.IncrCounter(float32(1), MetricKeyMempoolKnownTxs)
sCtx.Logger().Info("mempool insert: tx already in mempool", "mode", sCtx.ExecMode())
Comment on lines +137 to +140
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The comment regarding the unnecessary check for CheckTx/ReCheck mode during insertion into the mempool suggests a potential area for code cleanup or optimization. Consider removing this check if it is indeed redundant, to simplify the logic.

- // TODO: checking for CheckTx/ReCheck here is not necessary (only ever called in CheckTx)

Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation.

Suggested change
// TODO: checking for CheckTx/ReCheck here is not necessary (only ever called in CheckTx)
(sCtx.ExecMode() == sdk.ExecModeCheck || sCtx.ExecMode() == sdk.ExecModeReCheck) {
telemetry.IncrCounter(float32(1), MetricKeyMempoolKnownTxs)
sCtx.Logger().Info("mempool insert: tx already in mempool", "mode", sCtx.ExecMode())
(sCtx.ExecMode() == sdk.ExecModeCheck || sCtx.ExecMode() == sdk.ExecModeReCheck) {
telemetry.IncrCounter(float32(1), MetricKeyMempoolKnownTxs)
sCtx.Logger().Info("mempool insert: tx already in mempool", "mode", sCtx.ExecMode())

return nil
}
return errs[0]
}

// Add the eth tx to the remote cache.
sCtx.Logger().Info(
"mempool insert: marking remote seen", "tx", ethTx.Hash(), "time", time.Now().Unix(),
"is(already)RemoteTx", m.crc.IsRemoteTx(ethTx.Hash()),
)
m.crc.MarkRemoteSeen(ethTx.Hash())

return nil
Expand Down
Loading