Skip to content

Commit

Permalink
feat: more precise metrics (#1159)
Browse files Browse the repository at this point in the history
## Description

same as #1158 but targeted at main
  • Loading branch information
evan-forbes authored Jan 5, 2024
1 parent e8f6401 commit 570aa7e
Show file tree
Hide file tree
Showing 5 changed files with 64 additions and 16 deletions.
2 changes: 1 addition & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -247,7 +247,7 @@ format:

lint:
@echo "--> Running linter"
@go run github.com/golangci/golangci-lint/cmd/golangci-lint run
@go run github.com/golangci/golangci-lint/cmd/golangci-lint@v1.55.2 run
.PHONY: lint

vulncheck:
Expand Down
16 changes: 14 additions & 2 deletions mempool/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,17 @@ const (
// MetricsSubsystem is a subsystem shared by all metrics exposed by this
// package.
MetricsSubsystem = "mempool"

TypeLabel = "type"

FailedPrecheck = "precheck"
FailedAdding = "adding"
FailedRecheck = "recheck"

EvictedNewTxFullMempool = "full-removed-incoming"
EvictedExistingTxFullMempool = "full-removed-existing"
EvictedTxExpiredBlocks = "expired-ttl-blocks"
EvictedTxExpiredTime = "expired-ttl-time"
)

// Metrics contains metrics exposed by this package.
Expand Down Expand Up @@ -61,6 +72,7 @@ func PrometheusMetrics(namespace string, labelsAndValues ...string) *Metrics {
for i := 0; i < len(labelsAndValues); i += 2 {
labels = append(labels, labelsAndValues[i])
}
typedCounterLabels := append(append(make([]string, 0, len(labels)+1), labels...), TypeLabel)

This comment has been minimized.

Copy link
@cmwaters

cmwaters Jul 24, 2024

Contributor

What is this for? I just saw this cause a panic in prometheus

This comment has been minimized.

Copy link
@evan-forbes

evan-forbes Jul 24, 2024

Author Member

to move a sync discussion here:

That was for the inscription event, IIRC: we were trying to pin down more specifically why txs were getting evicted.

return &Metrics{
Size: prometheus.NewGaugeFrom(stdprometheus.GaugeOpts{
Namespace: namespace,
Expand All @@ -82,14 +94,14 @@ func PrometheusMetrics(namespace string, labelsAndValues ...string) *Metrics {
Subsystem: MetricsSubsystem,
Name: "failed_txs",
Help: "Number of failed transactions.",
}, labels).With(labelsAndValues...),
}, typedCounterLabels).With(labelsAndValues...),

EvictedTxs: prometheus.NewCounterFrom(stdprometheus.CounterOpts{
Namespace: namespace,
Subsystem: MetricsSubsystem,
Name: "evicted_txs",
Help: "Number of evicted transactions.",
}, labels).With(labelsAndValues...),
}, typedCounterLabels).With(labelsAndValues...),

SuccessfulTxs: prometheus.NewCounterFrom(stdprometheus.CounterOpts{
Namespace: namespace,
Expand Down
38 changes: 31 additions & 7 deletions mempool/v1/mempool.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@ import (
"github.com/cometbft/cometbft/libs/clist"
"github.com/cometbft/cometbft/libs/log"
"github.com/cometbft/cometbft/mempool"
"github.com/cometbft/cometbft/pkg/trace"
"github.com/cometbft/cometbft/pkg/trace/schema"
"github.com/cometbft/cometbft/proxy"
"github.com/cometbft/cometbft/types"
)
Expand Down Expand Up @@ -56,6 +58,8 @@ type TxMempool struct {
txs *clist.CList // valid transactions (passed CheckTx)
txByKey map[types.TxKey]*clist.CElement
txBySender map[string]*clist.CElement // for sender != ""

traceClient *trace.Client
}

// NewTxMempool constructs a new, empty priority mempool at the specified
Expand All @@ -79,6 +83,7 @@ func NewTxMempool(
height: height,
txByKey: make(map[types.TxKey]*clist.CElement),
txBySender: make(map[string]*clist.CElement),
traceClient: &trace.Client{},
}
if cfg.CacheSize > 0 {
txmp.cache = mempool.NewLRUTxCache(cfg.CacheSize)
Expand Down Expand Up @@ -110,6 +115,12 @@ func WithMetrics(metrics *mempool.Metrics) TxMempoolOption {
return func(txmp *TxMempool) { txmp.metrics = metrics }
}

// WithTraceClient returns an option that sets the mempool's trace client,
// i.e. the client the mempool hands to the schema trace writers (for example
// when it records why a transaction was rejected).
func WithTraceClient(tc *trace.Client) TxMempoolOption {
	return func(txmp *TxMempool) { txmp.traceClient = tc }
}

// Lock obtains a write-lock on the mempool. A caller must be sure to explicitly
// release the lock when finished.
func (txmp *TxMempool) Lock() { txmp.mtx.Lock() }
Expand Down Expand Up @@ -192,7 +203,8 @@ func (txmp *TxMempool) CheckTx(tx types.Tx, cb func(*abci.Response), txInfo memp
// If a precheck hook is defined, call it before invoking the application.
if txmp.preCheck != nil {
if err := txmp.preCheck(tx); err != nil {
txmp.metrics.FailedTxs.Add(1)
txmp.metrics.FailedTxs.With(mempool.TypeLabel, mempool.FailedPrecheck).Add(1)
schema.WriteMempoolRejected(txmp.traceClient, err.Error())
return 0, mempool.ErrPreCheck{Reason: err}
}
}
Expand Down Expand Up @@ -469,7 +481,16 @@ func (txmp *TxMempool) addNewTransaction(wtx *WrappedTx, checkTxRes *abci.Respon
"post_check_err", err,
)

txmp.metrics.FailedTxs.Add(1)
txmp.metrics.FailedTxs.With(mempool.TypeLabel, mempool.FailedAdding).Add(1)
reason := fmt.Sprintf(
"code: %d codespace: %s logs: %s local: %v postCheck error: %v",
checkTxRes.Code,
checkTxRes.Codespace,
checkTxRes.Log,
wtx.HasPeer(0), // this checks if the peer id is local
err,
)
schema.WriteMempoolRejected(txmp.traceClient, reason)

// Remove the invalid transaction from the cache, unless the operator has
// instructed us to keep invalid transactions.
Expand Down Expand Up @@ -537,7 +558,7 @@ func (txmp *TxMempool) addNewTransaction(wtx *WrappedTx, checkTxRes *abci.Respon
checkTxRes.MempoolError =
fmt.Sprintf("rejected valid incoming transaction; mempool is full (%X)",
wtx.tx.Hash())
txmp.metrics.EvictedTxs.Add(1)
txmp.metrics.EvictedTxs.With(mempool.TypeLabel, mempool.EvictedNewTxFullMempool).Add(1)
return
}

Expand Down Expand Up @@ -569,7 +590,7 @@ func (txmp *TxMempool) addNewTransaction(wtx *WrappedTx, checkTxRes *abci.Respon
)
txmp.removeTxByElement(vic)
txmp.cache.Remove(w.tx)
txmp.metrics.EvictedTxs.Add(1)
txmp.metrics.EvictedTxs.With(mempool.TypeLabel, mempool.EvictedExistingTxFullMempool).Add(1)

// We may not need to evict all the eligible transactions. Bail out
// early if we have made enough room.
Expand Down Expand Up @@ -646,9 +667,12 @@ func (txmp *TxMempool) handleRecheckResult(tx types.Tx, checkTxRes *abci.Respons
"code", checkTxRes.Code,
)
txmp.removeTxByElement(elt)
txmp.metrics.FailedTxs.Add(1)
txmp.metrics.FailedTxs.With(mempool.TypeLabel, mempool.FailedRecheck).Add(1)
if !txmp.config.KeepInvalidTxsInCache {
txmp.cache.Remove(wtx.tx)
if err != nil {
schema.WriteMempoolRejected(txmp.traceClient, err.Error())
}
}
txmp.metrics.Size.Set(float64(txmp.Size()))
}
Expand Down Expand Up @@ -758,11 +782,11 @@ func (txmp *TxMempool) purgeExpiredTxs(blockHeight int64) {
if txmp.config.TTLNumBlocks > 0 && (blockHeight-w.height) > txmp.config.TTLNumBlocks {
txmp.removeTxByElement(cur)
txmp.cache.Remove(w.tx)
txmp.metrics.EvictedTxs.Add(1)
txmp.metrics.EvictedTxs.With(mempool.TypeLabel, mempool.EvictedTxExpiredBlocks).Add(1)
} else if txmp.config.TTLDuration > 0 && now.Sub(w.timestamp) > txmp.config.TTLDuration {
txmp.removeTxByElement(cur)
txmp.cache.Remove(w.tx)
txmp.metrics.EvictedTxs.Add(1)
txmp.metrics.EvictedTxs.With(mempool.TypeLabel, mempool.EvictedTxExpiredTime).Add(1)
}
cur = next
}
Expand Down
1 change: 1 addition & 0 deletions node/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -418,6 +418,7 @@ func createMempoolAndMempoolReactor(
mempoolv1.WithMetrics(memplMetrics),
mempoolv1.WithPreCheck(sm.TxPreCheck(state)),
mempoolv1.WithPostCheck(sm.TxPostCheck(state)),
mempoolv1.WithTraceClient(traceClient),
)

reactor := mempoolv1.NewReactor(
Expand Down
23 changes: 17 additions & 6 deletions pkg/trace/schema/mempool.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ func MempoolTables() []string {
return []string{
MempoolTxTable,
MempoolPeerStateTable,
MempoolRejectedTable,
}
}

Expand Down Expand Up @@ -103,10 +104,10 @@ const (
func WriteMempoolPeerState(client *trace.Client, peer p2p.ID, stateUpdate, transferType, version string) {
// this check is redundant to what is checked during WritePoint, although it
// is an optimization to avoid allocations from creating the map of fields.
if !client.IsCollecting(RoundStateTable) {
if !client.IsCollecting(MempoolPeerStateTable) {
return
}
client.WritePoint(RoundStateTable, map[string]interface{}{
client.WritePoint(MempoolPeerStateTable, map[string]interface{}{
PeerFieldKey: peer,
TransferTypeFieldKey: transferType,
StateUpdateFieldKey: stateUpdate,
Expand All @@ -115,8 +116,18 @@ func WriteMempoolPeerState(client *trace.Client, peer p2p.ID, stateUpdate, trans
}

const (
// MempoolRejectedTable is the tracing "measurement" (aka table) that
// rejected-transaction points are written to.
MempoolRejectedTable = "mempool_rejected"
// ReasonFieldKey is the field key under which the rejection reason is
// stored in a MempoolRejectedTable point.
ReasonFieldKey = "reason"
)

// WriteMempoolRejected writes a single point to MempoolRejectedTable whose
// only field, keyed by ReasonFieldKey, is the given reason. Nothing is written
// when the client is not collecting that table.
func WriteMempoolRejected(client *trace.Client, reason string) {
	// WritePoint repeats this check; doing it up front only serves to skip
	// allocating the field map when the point would not be recorded anyway.
	if client.IsCollecting(MempoolRejectedTable) {
		fields := map[string]interface{}{ReasonFieldKey: reason}
		client.WritePoint(MempoolRejectedTable, fields)
	}
}

0 comments on commit 570aa7e

Please sign in to comment.