From 29774f0c141246feabc11c3a79b6fd837353e28a Mon Sep 17 00:00:00 2001 From: David Terpay Date: Tue, 18 Jun 2024 13:52:41 -0400 Subject: [PATCH 1/5] benchmark contains --- block/base/mempool_test.go | 49 ++++++++++++++++++++++++++++++++++---- 1 file changed, 44 insertions(+), 5 deletions(-) diff --git a/block/base/mempool_test.go b/block/base/mempool_test.go index 5a8eeb3f..194cf289 100644 --- a/block/base/mempool_test.go +++ b/block/base/mempool_test.go @@ -1,4 +1,4 @@ -package base +package base_test import ( "fmt" @@ -10,6 +10,7 @@ import ( "github.com/stretchr/testify/require" signerextraction "github.com/skip-mev/block-sdk/v2/adapters/signer_extraction_adapter" + "github.com/skip-mev/block-sdk/v2/block/base" "github.com/skip-mev/block-sdk/v2/testutils" ) @@ -19,12 +20,50 @@ type txGen struct { amount sdk.Coin } +var ( + numAccounts = 10 + numTxsPerAcct = 100 +) + +func BenchmarkContains(b *testing.B) { + acct := testutils.RandomAccounts(rand.New(rand.NewSource(1)), numAccounts) + txc := testutils.CreateTestEncodingConfig().TxConfig + + mp := base.NewMempool( + base.DefaultTxPriority(), + txc.TxEncoder(), + signerextraction.NewDefaultAdapter(), + 1000, + ) + + txs := make([]sdk.Tx, numAccounts*numTxsPerAcct) + for i := 0; i < numAccounts; i++ { + for j := 0; j < numTxsPerAcct; j++ { + tx, err := testutils.CreateTx(txc, acct[i], uint64(j), 0, nil, sdk.NewCoin("stake", sdkmath.NewInt(1))) + require.NoError(b, err) + err = mp.Insert(sdk.Context{}, tx) + require.NoError(b, err) + txs[i*numTxsPerAcct+j] = tx + } + } + + b.ResetTimer() + for i := 0; i < b.N; i++ { + for _, tx := range txs { + found := mp.Contains(tx) + if !found { + b.Fatalf("tx not found in mempool") + } + } + } +} + func TestMempoolComparison(t *testing.T) { acct := testutils.RandomAccounts(rand.New(rand.NewSource(1)), 2) txc := testutils.CreateTestEncodingConfig().TxConfig ctx := testutils.CreateBaseSDKContext(t) - mp := NewMempool( - DefaultTxPriority(), + mp := base.NewMempool( + base.DefaultTxPriority(), txc.TxEncoder(), signerextraction.NewDefaultAdapter(), 1000, @@ -99,8 +138,8 @@ func TestMempoolSelect(t *testing.T) { txc := testutils.CreateTestEncodingConfig().TxConfig ctx := testutils.CreateBaseSDKContext(t) se := signerextraction.NewDefaultAdapter() - mp := NewMempool( - DefaultTxPriority(), + mp := base.NewMempool( + base.DefaultTxPriority(), txc.TxEncoder(), se, 1000, From 75db517ff15ef3df1d6434ab331933c085ffc3f9 Mon Sep 17 00:00:00 2001 From: David Terpay Date: Tue, 18 Jun 2024 15:15:08 -0400 Subject: [PATCH 2/5] use sender/nonce when caching --- block/base/mempool.go | 50 +++++++++++++++++++++++++++++++------------ 1 file changed, 36 insertions(+), 14 deletions(-) diff --git a/block/base/mempool.go b/block/base/mempool.go index 57bca8d0..4db5a9fc 100644 --- a/block/base/mempool.go +++ b/block/base/mempool.go @@ -9,7 +9,6 @@ import ( sdkmempool "github.com/cosmos/cosmos-sdk/types/mempool" signer_extraction "github.com/skip-mev/block-sdk/v2/adapters/signer_extraction_adapter" - "github.com/skip-mev/block-sdk/v2/block/utils" ) type ( @@ -38,7 +37,16 @@ type ( // txCache is a map of all transactions in the mempool. It is used // to quickly check if a transaction is already in the mempool. - txCache map[string]struct{} + txCache map[CacheIndex]struct{} + } + + // CacheIndex defines the index utilized to cache transactions that have been inserted. + CacheIndex struct { + // Sender is the sender of the transaction. + Sender string + + // Sequence is the sequence number of the transaction. + Sequence uint64 } ) @@ -55,7 +63,7 @@ func NewMempool[C comparable](txPriority TxPriority[C], txEncoder sdk.TxEncoder, extractor: extractor, txPriority: txPriority, txEncoder: txEncoder, - txCache: make(map[string]struct{}), + txCache: make(map[CacheIndex]struct{}), } } @@ -70,13 +78,11 @@ func (cm *Mempool[C]) Insert(ctx context.Context, tx sdk.Tx) error { return fmt.Errorf("failed to insert tx into auction index: %w", err) } - hash, err := utils.GetTxHash(cm.txEncoder, tx) + index, err := cm.getCacheIndex(tx) if err != nil { - cm.Remove(tx) - return err + return fmt.Errorf("failed to get cache index while inserting: %w", err) } - - cm.txCache[hash] = struct{}{} + cm.txCache[index] = struct{}{} return nil } @@ -87,12 +93,11 @@ func (cm *Mempool[C]) Remove(tx sdk.Tx) error { return fmt.Errorf("failed to remove transaction from the mempool: %w", err) } - hash, err := utils.GetTxHash(cm.txEncoder, tx) + index, err := cm.getCacheIndex(tx) if err != nil { - return fmt.Errorf("failed to get tx hash string: %w", err) + return fmt.Errorf("failed to get cache index while removing: %w", err) } - - delete(cm.txCache, hash) + delete(cm.txCache, index) return nil } @@ -112,12 +117,12 @@ func (cm *Mempool[C]) CountTx() int { // Contains returns true if the transaction is contained in the mempool. func (cm *Mempool[C]) Contains(tx sdk.Tx) bool { - hash, err := utils.GetTxHash(cm.txEncoder, tx) + index, err := cm.getCacheIndex(tx) if err != nil { return false } - _, ok := cm.txCache[hash] + _, ok := cm.txCache[index] return ok } @@ -167,3 +172,20 @@ func (cm *Mempool[C]) Compare(ctx sdk.Context, this sdk.Tx, other sdk.Tx) (int, secondPriority := cm.txPriority.GetTxPriority(ctx, other) return cm.txPriority.Compare(firstPriority, secondPriority), nil } + +// getCacheIndex returns the cache index for the transaction. +func (cm *Mempool[C]) getCacheIndex(tx sdk.Tx) (CacheIndex, error) { + signers, err := cm.extractor.GetSigners(tx) + if err != nil { + return CacheIndex{}, err + } + if len(signers) == 0 { + return CacheIndex{}, fmt.Errorf("expected one signer for the transaction") + } + + signerInfo := signers[0] + return CacheIndex{ + Sender: signerInfo.Signer.String(), + Sequence: signerInfo.Sequence, + }, nil +} From 203b4f53f140a82d7723d5fc511b02fd479ff498 Mon Sep 17 00:00:00 2001 From: David Terpay Date: Tue, 18 Jun 2024 16:38:27 -0400 Subject: [PATCH 3/5] nit --- block/base/lane.go | 1 - block/base/mempool.go | 67 ++++++------------------------------ block/base/mempool_test.go | 3 -- block/base/options.go | 1 - block/base/priority_nonce.go | 18 ++++++++++ lanes/base/mempool_test.go | 8 ++--- 6 files changed, 32 insertions(+), 66 deletions(-) diff --git a/block/base/lane.go b/block/base/lane.go index 39d9d1a4..475b9adb 100644 --- a/block/base/lane.go +++ b/block/base/lane.go @@ -59,7 +59,6 @@ func NewBaseLane( lane.LaneMempool = NewMempool( DefaultTxPriority(), - lane.cfg.TxEncoder, lane.cfg.SignerExtractor, lane.cfg.MaxTxs, ) diff --git a/block/base/mempool.go b/block/base/mempool.go index 4db5a9fc..3ca4e2ae 100644 --- a/block/base/mempool.go +++ b/block/base/mempool.go @@ -12,6 +12,13 @@ import ( ) type ( + // MempoolInterface defines the interface a mempool should implement. + MempoolInterface interface { + sdkmempool.Mempool + + Contains(tx sdk.Tx) bool + } + // Mempool defines a mempool that orders transactions based on the // txPriority. The mempool is a wrapper on top of the SDK's Priority Nonce mempool. // It include's additional helper functions that allow users to determine if a @@ -19,7 +26,7 @@ type ( // transactions. Mempool[C comparable] struct { // index defines an index of transactions. - index sdkmempool.Mempool + index MempoolInterface // signerExtractor defines the signer extraction adapter that allows us to // extract the signer from a transaction. @@ -30,28 +37,11 @@ type ( // of two transactions. The index utilizes this struct to order transactions // in the mempool. txPriority TxPriority[C] - - // txEncoder defines the sdk.Tx encoder that allows us to encode transactions - // to bytes. - txEncoder sdk.TxEncoder - - // txCache is a map of all transactions in the mempool. It is used - // to quickly check if a transaction is already in the mempool. - txCache map[CacheIndex]struct{} - } - - // CacheIndex defines the index utilized to cache transactions that have been inserted. - CacheIndex struct { - // Sender is the sender of the transaction. - Sender string - - // Sequence is the sequence number of the transaction. - Sequence uint64 } ) // NewMempool returns a new Mempool. -func NewMempool[C comparable](txPriority TxPriority[C], txEncoder sdk.TxEncoder, extractor signer_extraction.Adapter, maxTx int) *Mempool[C] { +func NewMempool[C comparable](txPriority TxPriority[C], extractor signer_extraction.Adapter, maxTx int) *Mempool[C] { return &Mempool[C]{ index: NewPriorityMempool( PriorityNonceMempoolConfig[C]{ @@ -62,8 +52,6 @@ func NewMempool[C comparable](txPriority TxPriority[C], txEncoder sdk.TxEncoder, ), extractor: extractor, txPriority: txPriority, - txEncoder: txEncoder, - txCache: make(map[CacheIndex]struct{}), } } @@ -78,12 +66,6 @@ func (cm *Mempool[C]) Insert(ctx context.Context, tx sdk.Tx) error { return fmt.Errorf("failed to insert tx into auction index: %w", err) } - index, err := cm.getCacheIndex(tx) - if err != nil { - return fmt.Errorf("failed to get cache index while inserting: %w", err) - } - cm.txCache[index] = struct{}{} - return nil } @@ -93,12 +75,6 @@ func (cm *Mempool[C]) Remove(tx sdk.Tx) error { return fmt.Errorf("failed to remove transaction from the mempool: %w", err) } - index, err := cm.getCacheIndex(tx) - if err != nil { - return fmt.Errorf("failed to get cache index while removing: %w", err) - } - delete(cm.txCache, index) - return nil } @@ -117,13 +93,7 @@ func (cm *Mempool[C]) CountTx() int { // Contains returns true if the transaction is contained in the mempool. func (cm *Mempool[C]) Contains(tx sdk.Tx) bool { - index, err := cm.getCacheIndex(tx) - if err != nil { - return false - } - - _, ok := cm.txCache[index] - return ok + return cm.index.Contains(tx) } // Compare determines the relative priority of two transactions belonging in the same lane. @@ -172,20 +142,3 @@ func (cm *Mempool[C]) Compare(ctx sdk.Context, this sdk.Tx, other sdk.Tx) (int, secondPriority := cm.txPriority.GetTxPriority(ctx, other) return cm.txPriority.Compare(firstPriority, secondPriority), nil } - -// getCacheIndex returns the cache index for the transaction. -func (cm *Mempool[C]) getCacheIndex(tx sdk.Tx) (CacheIndex, error) { - signers, err := cm.extractor.GetSigners(tx) - if err != nil { - return CacheIndex{}, err - } - if len(signers) == 0 { - return CacheIndex{}, fmt.Errorf("expected one signer for the transaction") - } - - signerInfo := signers[0] - return CacheIndex{ - Sender: signerInfo.Signer.String(), - Sequence: signerInfo.Sequence, - }, nil -} diff --git a/block/base/mempool_test.go b/block/base/mempool_test.go index 194cf289..8f8c2e0f 100644 --- a/block/base/mempool_test.go +++ b/block/base/mempool_test.go @@ -31,7 +31,6 @@ func BenchmarkContains(b *testing.B) { mp := base.NewMempool( base.DefaultTxPriority(), - txc.TxEncoder(), signerextraction.NewDefaultAdapter(), 1000, ) @@ -64,7 +63,6 @@ func TestMempoolComparison(t *testing.T) { ctx := testutils.CreateBaseSDKContext(t) mp := base.NewMempool( base.DefaultTxPriority(), - txc.TxEncoder(), signerextraction.NewDefaultAdapter(), 1000, ) @@ -140,7 +138,6 @@ func TestMempoolSelect(t *testing.T) { se := signerextraction.NewDefaultAdapter() mp := base.NewMempool( base.DefaultTxPriority(), - txc.TxEncoder(), se, 1000, ) diff --git a/block/base/options.go b/block/base/options.go index cd2fe0b8..d1f581a0 100644 --- a/block/base/options.go +++ b/block/base/options.go @@ -73,7 +73,6 @@ func WithMempoolConfigs[C comparable](cfg LaneConfig, txPriority TxPriority[C]) return func(l *BaseLane) { l.LaneMempool = NewMempool( txPriority, - cfg.TxEncoder, cfg.SignerExtractor, cfg.MaxTxs, ) diff --git a/block/base/priority_nonce.go b/block/base/priority_nonce.go index 6881104f..a67a527c 100644 --- a/block/base/priority_nonce.go +++ b/block/base/priority_nonce.go @@ -462,6 +462,24 @@ func (mp *PriorityNonceMempool[C]) Remove(tx sdk.Tx) error { return nil } +// Contains returns true if the transaction is in the mempool. +func (mp *PriorityNonceMempool[C]) Contains(tx sdk.Tx) bool { + signers, err := mp.signerExtractor.GetSigners(tx) + if err != nil { + return false + } + if len(signers) == 0 { + return false + } + + sig := signers[0] + nonce := sig.Sequence + sender := sig.Signer.String() + + _, ok := mp.scores[txMeta[C]{nonce: nonce, sender: sender}] + return ok +} + func IsEmpty[C comparable](mempool sdkmempool.Mempool) error { mp := mempool.(*PriorityNonceMempool[C]) if mp.priorityIndex.Len() != 0 { diff --git a/lanes/base/mempool_test.go b/lanes/base/mempool_test.go index 8521c1a7..ba5d9037 100644 --- a/lanes/base/mempool_test.go +++ b/lanes/base/mempool_test.go @@ -125,7 +125,7 @@ func (s *BaseTestSuite) TestCompareTxPriority() { } func (s *BaseTestSuite) TestInsert() { - mempool := base.NewMempool(base.DefaultTxPriority(), s.encodingConfig.TxConfig.TxEncoder(), signer_extraction.NewDefaultAdapter(), 3) + mempool := base.NewMempool(base.DefaultTxPriority(), signer_extraction.NewDefaultAdapter(), 3) s.Run("should be able to insert a transaction", func() { tx, err := testutils.CreateRandomTx( @@ -180,7 +180,7 @@ func (s *BaseTestSuite) TestInsert() { } func (s *BaseTestSuite) TestRemove() { - mempool := base.NewMempool(base.DefaultTxPriority(), s.encodingConfig.TxConfig.TxEncoder(), signer_extraction.NewDefaultAdapter(), 3) + mempool := base.NewMempool(base.DefaultTxPriority(), signer_extraction.NewDefaultAdapter(), 3) s.Run("should be able to remove a transaction", func() { tx, err := testutils.CreateRandomTx( @@ -220,7 +220,7 @@ func (s *BaseTestSuite) TestRemove() { func (s *BaseTestSuite) TestSelect() { s.Run("should be able to select transactions in the correct order", func() { - mempool := base.NewMempool(base.DefaultTxPriority(), s.encodingConfig.TxConfig.TxEncoder(), signer_extraction.NewDefaultAdapter(), 3) + mempool := base.NewMempool(base.DefaultTxPriority(), signer_extraction.NewDefaultAdapter(), 3) tx1, err := testutils.CreateRandomTx( s.encodingConfig.TxConfig, @@ -261,7 +261,7 @@ func (s *BaseTestSuite) TestSelect() { }) s.Run("should be able to select a single transaction", func() { - mempool := base.NewMempool(base.DefaultTxPriority(), s.encodingConfig.TxConfig.TxEncoder(), signer_extraction.NewDefaultAdapter(), 3) + mempool := base.NewMempool(base.DefaultTxPriority(), signer_extraction.NewDefaultAdapter(), 3) tx1, err := testutils.CreateRandomTx( s.encodingConfig.TxConfig, From 57ca54ce69c879eaf7506c77cd5f93a700fc1a55 Mon Sep 17 00:00:00 2001 From: David Terpay Date: Tue, 18 Jun 2024 16:46:50 -0400 Subject: [PATCH 4/5] nits --- block/base/mempool.go | 9 +-------- block/base/mempool_test.go | 2 +- block/base/priority_nonce.go | 10 +++++++++- 3 files changed, 11 insertions(+), 10 deletions(-) diff --git a/block/base/mempool.go b/block/base/mempool.go index 3ca4e2ae..66417175 100644 --- a/block/base/mempool.go +++ b/block/base/mempool.go @@ -12,13 +12,6 @@ import ( ) type ( - // MempoolInterface defines the interface a mempool should implement. - MempoolInterface interface { - sdkmempool.Mempool - - Contains(tx sdk.Tx) bool - } - // Mempool defines a mempool that orders transactions based on the // txPriority. The mempool is a wrapper on top of the SDK's Priority Nonce mempool. // It include's additional helper functions that allow users to determine if a @@ -63,7 +56,7 @@ func (cm *Mempool[C]) Priority(ctx sdk.Context, tx sdk.Tx) any { // Insert inserts a transaction into the mempool. func (cm *Mempool[C]) Insert(ctx context.Context, tx sdk.Tx) error { if err := cm.index.Insert(ctx, tx); err != nil { - return fmt.Errorf("failed to insert tx into auction index: %w", err) + return fmt.Errorf("failed to insert tx into mempool: %w", err) } return nil diff --git a/block/base/mempool_test.go b/block/base/mempool_test.go index 8f8c2e0f..68c0ca2c 100644 --- a/block/base/mempool_test.go +++ b/block/base/mempool_test.go @@ -22,7 +22,7 @@ type txGen struct { var ( numAccounts = 10 - numTxsPerAcct = 100 + numTxsPerAcct = 10 ) func BenchmarkContains(b *testing.B) { diff --git a/block/base/priority_nonce.go b/block/base/priority_nonce.go index a67a527c..9483a2aa 100644 --- a/block/base/priority_nonce.go +++ b/block/base/priority_nonce.go @@ -26,11 +26,19 @@ import ( ) var ( - _ sdkmempool.Mempool = (*PriorityNonceMempool[int64])(nil) + _ MempoolInterface = (*PriorityNonceMempool[int64])(nil) _ sdkmempool.Iterator = (*PriorityNonceIterator[int64])(nil) ) type ( + // MempoolInterface defines the interface a mempool should implement. + MempoolInterface interface { + sdkmempool.Mempool + + // Contains returns true if the transaction is in the mempool. + Contains(tx sdk.Tx) bool + } + // PriorityNonceMempoolConfig defines the configuration used to configure the // PriorityNonceMempool. PriorityNonceMempoolConfig[C comparable] struct { From 8098fb49ce2059353ac5ae78e324a08ae43b7eae Mon Sep 17 00:00:00 2001 From: David Terpay Date: Tue, 18 Jun 2024 17:33:13 -0400 Subject: [PATCH 5/5] nit --- block/base/mempool_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/block/base/mempool_test.go b/block/base/mempool_test.go index 68c0ca2c..8f8c2e0f 100644 --- a/block/base/mempool_test.go +++ b/block/base/mempool_test.go @@ -22,7 +22,7 @@ type txGen struct { var ( numAccounts = 10 - numTxsPerAcct = 10 + numTxsPerAcct = 100 ) func BenchmarkContains(b *testing.B) {