Skip to content
This repository has been archived by the owner on Jun 9, 2024. It is now read-only.

test(mempool): Logging + potential error fixes #1470

Open
wants to merge 7 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 4 additions & 1 deletion cosmos/runtime/ante/ante.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,12 +66,15 @@ func (ah *Provider) AnteHandler() func(
return func(ctx sdk.Context, tx sdk.Tx, simulate bool) (sdk.Context, error) {
// If the transaction contains a single EVM transaction, use the EVM ante handler
if len(tx.GetMsgs()) == 1 {
if _, ok := tx.GetMsgs()[0].(*evmtypes.WrappedEthereumTransaction); ok {
if ethTx, ok := tx.GetMsgs()[0].(*evmtypes.WrappedEthereumTransaction); ok {
ctx.Logger().Info("running evm ante handler for eth tx", "hash", ethTx.Unwrap().Hash())
return ah.evmAnteHandler(ctx, tx, simulate)
} else if _, ok = tx.GetMsgs()[0].(*evmtypes.WrappedPayloadEnvelope); ok {
if ctx.ExecMode() != sdk.ExecModeCheck {
ctx.Logger().Info("running evm ante handler for payload tx")
return ctx, nil
}
ctx.Logger().Error("running evm ante handler for payload tx in check tx")
return ctx, errors.New("payload envelope is not supported in CheckTx")
}
}
Expand Down
27 changes: 20 additions & 7 deletions cosmos/runtime/txpool/ante.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,45 +44,56 @@ func (m *Mempool) AnteHandle(
telemetry.IncrCounter(float32(1), MetricKeyCometPoolTxs)
msgs := tx.GetMsgs()

ctx.Logger().Info("AnteHandle Polaris Mempool", "msgs", len(msgs), "simulate", simulate)

// TODO: Record the time it takes to build a payload.

// We only want to eject transactions from comet on recheck.
if ctx.ExecMode() == sdk.ExecModeCheck || ctx.ExecMode() == sdk.ExecModeReCheck {
ctx.Logger().Info("AnteHandle in Check/Recheck tx")
if wet, ok := utils.GetAs[*types.WrappedEthereumTransaction](msgs[0]); ok {
ethTx := wet.Unwrap()
ctx.Logger().Info("AnteHandle for eth tx", "tx", ethTx.Hash())
if shouldEject := m.shouldEjectFromCometMempool(
ctx.BlockTime().Unix(), ethTx,
ctx, ethTx,
); shouldEject {
ctx.Logger().Info("AnteHandle dropping tx from comet mempool", "tx", ethTx.Hash())
m.crc.DropRemoteTx(ethTx.Hash())
telemetry.IncrCounter(float32(1), MetricKeyAnteEjectedTxs)
return ctx, errors.New("eject from comet mempool")
}
ctx.Logger().Info("AnteHandle NOT dropping comet mempool", "tx", ethTx.Hash())
}
}
return next(ctx, tx, simulate)
}

// shouldEject returns true if the transaction should be ejected from the CometBFT mempool.
func (m *Mempool) shouldEjectFromCometMempool(
currentTime int64, tx *ethtypes.Transaction,
ctx sdk.Context, tx *ethtypes.Transaction,
) bool {
defer telemetry.MeasureSince(time.Now(), MetricKeyTimeShouldEject)
if tx == nil {
ctx.Logger().Info("shouldEjectFromCometMempool: tx is nil")
return false
}

// First check things that are stateless.
if m.validateStateless(tx, currentTime) {
if m.validateStateless(ctx, tx) {
ctx.Logger().Info("shouldEjectFromCometMempool: stateless failed", "tx", tx.Hash())
return true
}

// Then check for things that are stateful.
return m.validateStateful(tx)
return m.validateStateful(ctx, tx)
}

// validateStateless returns whether the tx of the given hash is stateless.
func (m *Mempool) validateStateless(tx *ethtypes.Transaction, currentTime int64) bool {
func (m *Mempool) validateStateless(ctx sdk.Context, tx *ethtypes.Transaction) bool {
txHash := tx.Hash()
currentTime := ctx.BlockTime().Unix()
ctx.Logger().Info("validateStateless", "txHash", txHash, "currentTime", currentTime)

// 1. If the transaction has been in the mempool for longer than the configured timeout.
// 2. If the transaction's gas params are less than or equal to the configured limit.
expired := currentTime-m.crc.TimeFirstSeen(txHash) > m.lifetime
Expand All @@ -95,20 +106,22 @@ func (m *Mempool) validateStateless(tx *ethtypes.Transaction, currentTime int64)
telemetry.IncrCounter(float32(1), MetricKeyAnteShouldEjectPriceLimit)
}

ctx.Logger().Info("validateStateless", "expired", expired, "priceLeLimit", priceLeLimit)

return expired || priceLeLimit
}

// includedCanonicalChain returns whether the tx of the given hash is included in the canonical
// Eth chain.
func (m *Mempool) validateStateful(tx *ethtypes.Transaction) bool {
func (m *Mempool) validateStateful(ctx sdk.Context, tx *ethtypes.Transaction) bool {
// // 1. If the transaction has been included in a block.
// signer := ethtypes.LatestSignerForChainID(m.chainConfig.ChainID)
// if _, err := ethtypes.Sender(signer, tx); err != nil {
// return true
// }

// tx.Nonce() <
included := m.chain.GetTransactionLookup(tx.Hash()) != nil
telemetry.IncrCounter(float32(1), MetricKeyAnteShouldEjectInclusion)
ctx.Logger().Info("validateStateful", "included", included)
return included
}
3 changes: 2 additions & 1 deletion cosmos/runtime/txpool/comet.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,9 +59,10 @@ func (crc *cometRemoteCache) IsRemoteTx(txHash common.Hash) bool {
return ok
}

// Record the time the tx was inserted from Comet successfully.
// Record the first time the tx was inserted from Comet successfully.
func (crc *cometRemoteCache) MarkRemoteSeen(txHash common.Hash) {
crc.timeInsertedMu.Lock()
// TODO: only insert a new timestamp if not already seen.
crc.timeInserted[txHash] = time.Now().Unix()
crc.timeInsertedMu.Unlock()
}
Expand Down
12 changes: 9 additions & 3 deletions cosmos/runtime/txpool/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -172,6 +172,7 @@ func (h *handler) failedLoop() {
h.logger.Error("failed to broadcast transaction after max retries", "tx", maxRetries)
continue
}
h.logger.Info("retrying failed tx", "tx", failed.tx.Hash(), "retries", failed.retries)
telemetry.IncrCounter(float32(1), MetricKeyBroadcastRetry)
h.broadcastTransaction(failed.tx, failed.retries-1)
}
Expand Down Expand Up @@ -225,11 +226,12 @@ func (h *handler) broadcastTransactions(txs ethtypes.Transactions) {
numBroadcasted := 0
for _, signedEthTx := range txs {
if !h.crc.IsRemoteTx(signedEthTx.Hash()) {
h.logger.Info("broadcasting local eth tx", "hash", signedEthTx.Hash().Hex())
h.broadcastTransaction(signedEthTx, maxRetries)
numBroadcasted++
}
}
h.logger.Debug(
h.logger.Info(
"broadcasting transactions", "num_received", len(txs), "num_broadcasted", numBroadcasted,
)
}
Expand All @@ -242,6 +244,8 @@ func (h *handler) broadcastTransaction(tx *ethtypes.Transaction, retries int) {
return
}

h.logger.Info("broadcasting to comet", "ethTx", tx.Hash(), "sdkTx", txBytes)

// Send the transaction to the CometBFT mempool, which will gossip it to peers via
// CometBFT's p2p layer.
rsp, err := h.clientCtx.BroadcastTxSync(txBytes)
Expand All @@ -254,21 +258,23 @@ func (h *handler) broadcastTransaction(tx *ethtypes.Transaction, retries int) {
// If rsp == 1, likely the txn is already in a block, and thus the broadcast failing is actually
// the desired behaviour.
if rsp == nil || rsp.Code == 0 || rsp.Code == 1 {
h.logger.Info("broadcasting to comet", "hash", tx.Hash(), "rsp", rsp, "code", rsp.Code)
return
}

switch rsp.Code {
case sdkerrors.ErrMempoolIsFull.ABCICode():
h.logger.Error("failed to broadcast: comet-bft mempool is full", "tx_hash", tx.Hash())
telemetry.IncrCounter(float32(1), MetricKeyMempoolFull)
case
sdkerrors.ErrTxInMempoolCache.ABCICode():
case sdkerrors.ErrTxInMempoolCache.ABCICode():
return
default:
h.logger.Error("failed to broadcast transaction",
"codespace", rsp.Codespace, "code", rsp.Code, "info", rsp.Info, "tx_hash", tx.Hash())
telemetry.IncrCounter(float32(1), MetricKeyBroadcastFailure)
}

h.logger.Info("failed to broadcast transaction", "tx_hash", tx.Hash(), "retries", retries)

h.failedTxs <- &failedTx{tx: tx, retries: retries}
}
9 changes: 9 additions & 0 deletions cosmos/runtime/txpool/mempool.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (
"errors"
"math/big"
"sync"
"time"

"cosmossdk.io/log"

Expand Down Expand Up @@ -111,12 +112,14 @@ func (m *Mempool) Insert(ctx context.Context, sdkTx sdk.Tx) error {
sCtx := sdk.UnwrapSDKContext(ctx)
msgs := sdkTx.GetMsgs()
if len(msgs) != 1 {
sCtx.Logger().Error("mempool insert: only one message is supported")
return errors.New("only one message is supported")
}

wet, ok := utils.GetAs[*types.WrappedEthereumTransaction](msgs[0])
if !ok {
// We have to return nil for non-ethereum transactions as to not fail check-tx.
sCtx.Logger().Info("mempool insert: not an ethereum transaction")
return nil
}

Expand All @@ -130,14 +133,20 @@ func (m *Mempool) Insert(ctx context.Context, sdkTx sdk.Tx) error {
if len(errs) > 0 {
// Handle case where a node broadcasts to itself, we don't want it to fail CheckTx.
if errors.Is(errs[0], ethtxpool.ErrAlreadyKnown) &&
// TODO: checking for CheckTx/ReCheck here is not necessary (only ever called in CheckTx)
(sCtx.ExecMode() == sdk.ExecModeCheck || sCtx.ExecMode() == sdk.ExecModeReCheck) {
telemetry.IncrCounter(float32(1), MetricKeyMempoolKnownTxs)
sCtx.Logger().Info("mempool insert: tx already in mempool", "mode", sCtx.ExecMode())
Comment on lines +137 to +140
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The comment regarding the unnecessary check for CheckTx/ReCheck mode during insertion into the mempool suggests a potential area for code cleanup or optimization. Consider removing this check if it is indeed redundant, to simplify the logic.

- // TODO: checking for CheckTx/ReCheck here is not necessary (only ever called in CheckTx)

Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation.

Suggested change
// TODO: checking for CheckTx/ReCheck here is not necessary (only ever called in CheckTx)
(sCtx.ExecMode() == sdk.ExecModeCheck || sCtx.ExecMode() == sdk.ExecModeReCheck) {
telemetry.IncrCounter(float32(1), MetricKeyMempoolKnownTxs)
sCtx.Logger().Info("mempool insert: tx already in mempool", "mode", sCtx.ExecMode())
(sCtx.ExecMode() == sdk.ExecModeCheck || sCtx.ExecMode() == sdk.ExecModeReCheck) {
telemetry.IncrCounter(float32(1), MetricKeyMempoolKnownTxs)
sCtx.Logger().Info("mempool insert: tx already in mempool", "mode", sCtx.ExecMode())

return nil
}
return errs[0]
}

// Add the eth tx to the remote cache.
sCtx.Logger().Info(
"mempool insert: marking remote seen", "tx", ethTx.Hash(), "time", time.Now().Unix(),
"is(already)RemoteTx", m.crc.IsRemoteTx(ethTx.Hash()),
)
m.crc.MarkRemoteSeen(ethTx.Hash())

return nil
Expand Down
Loading