diff --git a/core/txpool/txpool.go b/core/txpool/txpool.go index eb591fcc72..ec2a010b46 100644 --- a/core/txpool/txpool.go +++ b/core/txpool/txpool.go @@ -645,6 +645,7 @@ func resolveCancellableBundles(lubCh chan []types.LatestUuidBundle, errCh chan e log.Trace("Processing uuid bundles", "uuidBundles", uuidBundles) lubs := <-lubCh +LubLoop: for _, lub := range lubs { ubk := uuidBundleKey{lub.Uuid, lub.SigningAddress} bundles, found := uuidBundles[ubk] @@ -652,6 +653,18 @@ func resolveCancellableBundles(lubCh chan []types.LatestUuidBundle, errCh chan e log.Trace("missing uuid bundle", "ubk", ubk) continue } + + // If lub has bundle_uuid set, and we can find corresponding bundle we prefer it, if not we fallback to bundle_hash equivalence + if lub.BundleUUID != types.EmptyUUID { + for _, bundle := range bundles { + if bundle.ComputeUUID() == lub.BundleUUID { + log.Trace("adding uuid bundle", "bundle hash", bundle.Hash.String(), "lub", lub) + currentCancellableBundles = append(currentCancellableBundles, bundle) + continue LubLoop + } + } + } + for _, bundle := range bundles { if bundle.Hash == lub.BundleHash { log.Trace("adding uuid bundle", "bundle hash", bundle.Hash.String(), "lub", lub) diff --git a/core/txpool/txpool2_test.go b/core/txpool/txpool2_test.go index 6d84975d83..3211cfd8dd 100644 --- a/core/txpool/txpool2_test.go +++ b/core/txpool/txpool2_test.go @@ -17,6 +17,7 @@ package txpool import ( "crypto/ecdsa" + "fmt" "math/big" "testing" @@ -26,6 +27,8 @@ import ( "github.com/ethereum/go-ethereum/core/types" "github.com/ethereum/go-ethereum/crypto" "github.com/ethereum/go-ethereum/event" + "github.com/google/uuid" + "github.com/stretchr/testify/require" ) func pricedValuedTransaction(nonce uint64, value int64, gaslimit uint64, gasprice *big.Int, key *ecdsa.PrivateKey) *types.Transaction { @@ -210,3 +213,78 @@ func TestTransactionZAttack(t *testing.T) { newIvPending, ivPending, pool.config.GlobalSlots, newQueued) } } + +// Tests that cancellable bundle prefers latest with the same bundle_uuid, but fallbacks to bundle_hash equality +func TestCancellableBundles(t *testing.T) { + mb1 := types.MevBundle{ + Txs: nil, + BlockNumber: big.NewInt(1), + Uuid: uuid.MustParse("2fa47a9c-1eb2-4189-b1b0-d79bf2d0fc83"), + SigningAddress: common.HexToAddress("0x1"), + MinTimestamp: 0, + MaxTimestamp: 0, + RevertingTxHashes: []common.Hash{common.HexToHash("0x111")}, + Hash: common.HexToHash("0x2"), + } + muid1 := mb1.ComputeUUID() + + mb2 := types.MevBundle{ + Txs: nil, + BlockNumber: big.NewInt(1), + Uuid: uuid.MustParse("2fa47a9c-1eb2-4189-b1b0-d79bf2d0fc83"), + SigningAddress: common.HexToAddress("0x1"), + MinTimestamp: 0, + MaxTimestamp: 0, + RevertingTxHashes: nil, + Hash: common.HexToHash("0x2"), + } + muid2 := mb2.ComputeUUID() + _ = muid2 + + mb3 := types.MevBundle{ + Txs: nil, + BlockNumber: big.NewInt(1), + Uuid: uuid.MustParse("e2b1132f-7948-4227-aac4-041e9192110a"), + SigningAddress: common.HexToAddress("0x1"), + MinTimestamp: 0, + MaxTimestamp: 0, + RevertingTxHashes: nil, + Hash: common.HexToHash("0x3"), + } + + lubCh := make(chan []types.LatestUuidBundle, 1) + errCh := make(chan error, 1) + go func() { + lub1 := types.LatestUuidBundle{ + Uuid: uuid.MustParse("2fa47a9c-1eb2-4189-b1b0-d79bf2d0fc83"), + SigningAddress: common.HexToAddress("0x1"), + BundleHash: common.HexToHash("0x2"), + BundleUUID: muid1, + } + lub2 := types.LatestUuidBundle{ + Uuid: uuid.MustParse("e2b1132f-7948-4227-aac4-041e9192110a"), + SigningAddress: common.HexToAddress("0x1"), + BundleHash: common.HexToHash("0x3"), + } + lubCh <- []types.LatestUuidBundle{lub1, lub2} + errCh <- nil + }() + + uuidBundles := make(map[uuidBundleKey][]types.MevBundle) + firstUuidBK := uuidBundleKey{ + Uuid: uuid.MustParse("2fa47a9c-1eb2-4189-b1b0-d79bf2d0fc83"), + SigningAddress: common.HexToAddress("0x1"), + } + secondUuidBK := uuidBundleKey{ + Uuid: uuid.MustParse("e2b1132f-7948-4227-aac4-041e9192110a"), + SigningAddress: common.HexToAddress("0x1"), + } + + uuidBundles[firstUuidBK] = []types.MevBundle{mb1, mb2} + uuidBundles[secondUuidBK] = []types.MevBundle{mb3} + + mbs := resolveCancellableBundles(lubCh, errCh, uuidBundles) + require.Equal(t, mbs[0], mb1) + require.Equal(t, mbs[1], mb3) + fmt.Println(mbs) +} diff --git a/core/types/transaction.go b/core/types/transaction.go index 03eb018326..a58814b2bd 100644 --- a/core/types/transaction.go +++ b/core/types/transaction.go @@ -19,9 +19,12 @@ package types import ( "bytes" "container/heap" + "crypto/sha256" + "encoding/binary" "errors" "io" "math/big" + "sort" "sync/atomic" "time" @@ -771,6 +774,7 @@ type LatestUuidBundle struct { Uuid uuid.UUID SigningAddress common.Address BundleHash common.Hash + BundleUUID uuid.UUID } type MevBundle struct { @@ -784,6 +788,23 @@ type MevBundle struct { Hash common.Hash } +func (b *MevBundle) UniquePayload() []byte { + var buf []byte + buf = binary.AppendVarint(buf, b.BlockNumber.Int64()) + buf = append(buf, b.Hash[:]...) + sort.Slice(b.RevertingTxHashes, func(i, j int) bool { + return bytes.Compare(b.RevertingTxHashes[i][:], b.RevertingTxHashes[j][:]) <= 0 + }) + for _, txHash := range b.RevertingTxHashes { + buf = append(buf, txHash[:]...) + } + return buf +} + +func (b *MevBundle) ComputeUUID() uuid.UUID { + return uuid.NewHash(sha256.New(), uuid.Nil, b.UniquePayload(), 5) +} + func (b *MevBundle) RevertingHash(hash common.Hash) bool { for _, revHash := range b.RevertingTxHashes { if revHash == hash { diff --git a/flashbotsextra/database.go b/flashbotsextra/database.go index d39274e974..80e8170607 100644 --- a/flashbotsextra/database.go +++ b/flashbotsextra/database.go @@ -10,6 +10,7 @@ import ( "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/core/types" "github.com/ethereum/go-ethereum/log" + "github.com/google/uuid" "github.com/jmoiron/sqlx" _ "github.com/lib/pq" ) @@ -61,17 +62,17 @@ func NewDatabaseService(postgresDSN string) (*DatabaseService, error) { return nil, err } - insertMissingBundleStmt, err := db.PrepareNamed("insert into bundles (bundle_hash, param_signed_txs, param_block_number, param_timestamp, received_timestamp, param_reverting_tx_hashes, coinbase_diff, total_gas_used, state_block_number, gas_fees, eth_sent_to_coinbase) values (:bundle_hash, :param_signed_txs, :param_block_number, :param_timestamp, :received_timestamp, :param_reverting_tx_hashes, :coinbase_diff, :total_gas_used, :state_block_number, :gas_fees, :eth_sent_to_coinbase) on conflict (bundle_hash, param_block_number) do nothing returning id") + insertMissingBundleStmt, err := db.PrepareNamed("insert into bundles (bundle_hash, param_signed_txs, param_block_number, param_timestamp, received_timestamp, param_reverting_tx_hashes, coinbase_diff, total_gas_used, state_block_number, gas_fees, eth_sent_to_coinbase, bundle_uuid) values (:bundle_hash, :param_signed_txs, :param_block_number, :param_timestamp, :received_timestamp, :param_reverting_tx_hashes, :coinbase_diff, :total_gas_used, :state_block_number, :gas_fees, :eth_sent_to_coinbase, :bundle_uuid) on conflict do nothing returning id") if err != nil { return nil, err } - fetchPrioBundlesStmt, err := db.PrepareNamed("select bundle_hash, param_signed_txs, param_block_number, param_timestamp, received_timestamp, param_reverting_tx_hashes, coinbase_diff, total_gas_used, state_block_number, gas_fees, eth_sent_to_coinbase from bundles where is_high_prio = :is_high_prio and coinbase_diff*1e18/total_gas_used > 1000000000 and param_block_number = :param_block_number order by coinbase_diff/total_gas_used DESC limit :limit") + fetchPrioBundlesStmt, err := db.PrepareNamed("select bundle_hash, param_signed_txs, param_block_number, param_timestamp, received_timestamp, param_reverting_tx_hashes, coinbase_diff, total_gas_used, state_block_number, gas_fees, eth_sent_to_coinbase, bundle_uuid from bundles where is_high_prio = :is_high_prio and coinbase_diff*1e18/total_gas_used > 1000000000 and param_block_number = :param_block_number order by coinbase_diff/total_gas_used DESC limit :limit") if err != nil { return nil, err } - fetchGetLatestUuidBundlesStmt, err := db.PrepareNamed("select replacement_uuid, signing_address, bundle_hash from latest_uuid_bundle where target_block_number = :target_block_number") + fetchGetLatestUuidBundlesStmt, err := db.PrepareNamed("select replacement_uuid, signing_address, bundle_hash, bundle_uuid from latest_uuid_bundle where target_block_number = :target_block_number") if err != nil { return nil, err } @@ -92,12 +93,12 @@ func Min(l int, r int) int { return r } -func (ds *DatabaseService) getBundleIds(ctx context.Context, blockNumber uint64, bundles []types.SimulatedBundle) (map[string]uint64, error) { +func (ds *DatabaseService) getBundleIds(ctx context.Context, blockNumber uint64, bundles []uuidBundle) (map[uuid.UUID]uint64, error) { if len(bundles) == 0 { return nil, nil } - bundleIdsMap := make(map[string]uint64, len(bundles)) + bundleIdsMap := make(map[uuid.UUID]uint64, len(bundles)) // Batch by 500 requestsToMake := [][]string{make([]string, 0, Min(500, len(bundles)))} @@ -107,27 +108,39 @@ func (ds *DatabaseService) getBundleIds(ctx context.Context, blockNumber uint64, cRequestInd += 1 requestsToMake = append(requestsToMake, make([]string, 0, Min(500, len(bundles)-i))) } - requestsToMake[cRequestInd] = append(requestsToMake[cRequestInd], bundle.OriginalBundle.Hash.String()) + requestsToMake[cRequestInd] = append(requestsToMake[cRequestInd], bundle.SimulatedBundle.OriginalBundle.Hash.String()) } for _, request := range requestsToMake { - query, args, err := sqlx.In("select id, bundle_hash from bundles where param_block_number = ? and bundle_hash in (?)", blockNumber, request) + query, args, err := sqlx.In("select id, bundle_hash, bundle_uuid from bundles where param_block_number = ? and bundle_hash in (?)", blockNumber, request) if err != nil { return nil, err } query = ds.db.Rebind(query) queryRes := []struct { - Id uint64 `db:"id"` - BundleHash string `db:"bundle_hash"` + Id uint64 `db:"id"` + BundleHash string `db:"bundle_hash"` + BundleUUID uuid.UUID `db:"bundle_uuid"` }{} + err = ds.db.SelectContext(ctx, &queryRes, query, args...) if err != nil { return nil, err } - + RowLoop: for _, row := range queryRes { - bundleIdsMap[row.BundleHash] = row.Id + for _, b := range bundles { + // if UUID agree it's same exact bundle we stop searching + if b.UUID == row.BundleUUID { + bundleIdsMap[b.UUID] = row.Id + continue RowLoop + } + // we can have multiple bundles with same hash eventually, so we fall back on getting row with same hash + if b.SimulatedBundle.OriginalBundle.Hash.String() == row.BundleHash { + bundleIdsMap[b.UUID] = row.Id + } + } } } @@ -135,24 +148,23 @@ func (ds *DatabaseService) getBundleIds(ctx context.Context, blockNumber uint64, } // TODO: cache locally for current block! -func (ds *DatabaseService) getBundleIdsAndInsertMissingBundles(ctx context.Context, blockNumber uint64, bundles []types.SimulatedBundle) (map[string]uint64, error) { +func (ds *DatabaseService) getBundleIdsAndInsertMissingBundles(ctx context.Context, blockNumber uint64, bundles []uuidBundle) (map[uuid.UUID]uint64, error) { bundleIdsMap, err := ds.getBundleIds(ctx, blockNumber, bundles) if err != nil { return nil, err } - toRetry := []types.SimulatedBundle{} + toRetry := make([]uuidBundle, 0) for _, bundle := range bundles { - bundleHashString := bundle.OriginalBundle.Hash.String() - if _, found := bundleIdsMap[bundleHashString]; found { + if _, found := bundleIdsMap[bundle.UUID]; found { continue } var bundleId uint64 - missingBundleData := SimulatedBundleToDbBundle(&bundle) // nolint: gosec + missingBundleData := SimulatedBundleToDbBundle(&bundle.SimulatedBundle) // nolint: gosec err = ds.insertMissingBundleStmt.GetContext(ctx, &bundleId, missingBundleData) // not using the tx as it relies on the unique constraint! if err == nil { - bundleIdsMap[bundleHashString] = bundleId + bundleIdsMap[bundle.UUID] = bundleId } else if err == sql.ErrNoRows /* conflict, someone else inserted the bundle before we could */ { toRetry = append(toRetry, bundle) } else { @@ -213,13 +225,13 @@ func (ds *DatabaseService) insertBuildBlockBundleIds(tx *sqlx.Tx, ctx context.Co return err } -func (ds *DatabaseService) insertAllBlockBundleIds(tx *sqlx.Tx, ctx context.Context, blockId uint64, bundleIdsMap map[string]uint64) error { - if len(bundleIdsMap) == 0 { +func (ds *DatabaseService) insertAllBlockBundleIds(tx *sqlx.Tx, ctx context.Context, blockId uint64, bundleIds []uint64) error { + if len(bundleIds) == 0 { return nil } - toInsert := make([]blockAndBundleId, 0, len(bundleIdsMap)) - for _, bundleId := range bundleIdsMap { + toInsert := make([]blockAndBundleId, 0, len(bundleIds)) + for _, bundleId := range bundleIds { toInsert = append(toInsert, blockAndBundleId{blockId, bundleId}) } @@ -244,14 +256,29 @@ func (ds *DatabaseService) insertUsedSBundleIds(tx *sqlx.Tx, ctx context.Context return err } +type uuidBundle struct { + SimulatedBundle types.SimulatedBundle + UUID uuid.UUID +} + func (ds *DatabaseService) ConsumeBuiltBlock(block *types.Block, blockValue *big.Int, ordersClosedAt time.Time, sealedAt time.Time, commitedBundles []types.SimulatedBundle, allBundles []types.SimulatedBundle, usedSbundles []types.UsedSBundle, bidTrace *apiv1.BidTrace) { + var allUUIDBundles = make([]uuidBundle, 0, len(allBundles)) + for _, bundle := range allBundles { + allUUIDBundles = append(allUUIDBundles, uuidBundle{bundle, bundle.OriginalBundle.ComputeUUID()}) + } + + var commitedUUIDBundles = make([]uuidBundle, 0, len(commitedBundles)) + for _, bundle := range commitedBundles { + commitedUUIDBundles = append(commitedUUIDBundles, uuidBundle{bundle, bundle.OriginalBundle.ComputeUUID()}) + } + ctx, cancel := context.WithTimeout(context.Background(), 12*time.Second) defer cancel() - bundleIdsMap, err := ds.getBundleIdsAndInsertMissingBundles(ctx, block.NumberU64(), allBundles) + bundleIdsMap, err := ds.getBundleIdsAndInsertMissingBundles(ctx, block.NumberU64(), allUUIDBundles) if err != nil { log.Error("could not insert bundles", "err", err) } @@ -270,8 +297,8 @@ func (ds *DatabaseService) ConsumeBuiltBlock(block *types.Block, blockValue *big } commitedBundlesIds := make([]uint64, 0, len(commitedBundles)) - for _, bundle := range commitedBundles { - if id, found := bundleIdsMap[bundle.OriginalBundle.Hash.String()]; found { + for _, bundle := range commitedUUIDBundles { + if id, found := bundleIdsMap[bundle.UUID]; found { commitedBundlesIds = append(commitedBundlesIds, id) } } @@ -283,10 +310,20 @@ func (ds *DatabaseService) ConsumeBuiltBlock(block *types.Block, blockValue *big return } - err = ds.insertAllBlockBundleIds(tx, ctx, blockId, bundleIdsMap) + var uniqueBundleIDs = make(map[uint64]struct{}) + var allBundleIds []uint64 + // we need to filter out duplicates while we still have unique constraint on bundle_hash+block_number which leads to data discrepancies + for _, v := range bundleIdsMap { + if _, ok := uniqueBundleIDs[v]; ok { + continue + } + uniqueBundleIDs[v] = struct{}{} + allBundleIds = append(allBundleIds, v) + } + err = ds.insertAllBlockBundleIds(tx, ctx, blockId, allBundleIds) if err != nil { tx.Rollback() - log.Error("could not insert built block all bundles", "err", err) + log.Error("could not insert built block all bundles", "err", err, "block", block.NumberU64(), "commitedBundles", commitedBundlesIds) return } @@ -326,6 +363,7 @@ func (ds *DatabaseService) GetLatestUuidBundles(ctx context.Context, blockNum in Uuid: dbLub.Uuid, SigningAddress: common.HexToAddress(dbLub.SigningAddress), BundleHash: common.HexToHash(dbLub.BundleHash), + BundleUUID: dbLub.BundleUUID, }) } return latestBundles, nil diff --git a/flashbotsextra/database_test.go b/flashbotsextra/database_test.go index 232ab4c3b1..a9c2c79337 100644 --- a/flashbotsextra/database_test.go +++ b/flashbotsextra/database_test.go @@ -17,7 +17,6 @@ func TestDatabaseBlockInsertion(t *testing.T) { if dsn == "" { t.Skip() } - ds, err := NewDatabaseService(dsn) require.NoError(t, err) @@ -151,7 +150,7 @@ func TestDatabaseBlockInsertion(t *testing.T) { require.Equal(t, sealedAt.Truncate(time.Millisecond), dbBlock.SealedAt.UTC().Truncate(time.Millisecond)) var bundles []DbBundle - ds.db.Select(&bundles, "select bundle_hash, param_signed_txs, param_block_number, param_timestamp, param_reverting_tx_hashes, coinbase_diff, total_gas_used, state_block_number, gas_fees, eth_sent_to_coinbase from bundles order by param_timestamp") + ds.db.Select(&bundles, "select bundle_hash, param_signed_txs, param_block_number, param_timestamp, param_reverting_tx_hashes, coinbase_diff, total_gas_used, state_block_number, gas_fees, eth_sent_to_coinbase, bundle_uuid from bundles order by param_timestamp") require.Len(t, bundles, 4) require.Equal(t, []DbBundle{SimulatedBundleToDbBundle(&simBundle1), SimulatedBundleToDbBundle(&simBundle2), SimulatedBundleToDbBundle(&simBundle3), SimulatedBundleToDbBundle(&simBundle4)}, bundles) @@ -173,3 +172,49 @@ func TestDatabaseBlockInsertion(t *testing.T) { Inserted: usedSbundle.Success, }, usedSbundles[0]) } + +func simpleTx(nonce uint64) *types.Transaction { + value := big.NewInt(1000000000000000) // in wei (0.001 eth) + gasLimit := uint64(21000) // in units + gasPrice := big.NewInt(1000000000) + + toAddress := common.HexToAddress("0x7777492a736CD894Cb12DFE5e944047499AEF7a0") + var data []byte + return types.NewTx(&types.LegacyTx{ + Nonce: nonce, + To: &toAddress, + Value: value, + Gas: gasLimit, + GasPrice: gasPrice, + Data: data, + }) +} + +func TestBundleUUIDHash(t *testing.T) { + tx1 := simpleTx(1) + tx2 := simpleTx(2) + bts1, err := tx1.MarshalBinary() + require.Nil(t, err) + bts2, err := tx2.MarshalBinary() + require.Nil(t, err) + _, _ = bts1, bts2 + t.Run("no reverts", func(t *testing.T) { + b := types.MevBundle{ + BlockNumber: big.NewInt(1), + Hash: common.HexToHash("0x135a7f22459b2102d51de2d6704512a03e1e2d2059c34bcbb659f4ba65e9f92c"), + } + + require.Equal(t, "5171315f-6ba4-52b2-866e-e2390d422d81", b.ComputeUUID().String()) + }) + t.Run("one revert", func(t *testing.T) { + b := types.MevBundle{ + BlockNumber: big.NewInt(1), + Hash: common.HexToHash("0x135a7f22459b2102d51de2d6704512a03e1e2d2059c34bcbb659f4ba65e9f92c"), + RevertingTxHashes: []common.Hash{ + tx1.Hash(), + }, + } + + require.Equal(t, "49dada39-6db2-500e-ae59-6cc18b2c19e0", b.ComputeUUID().String()) + }) +} diff --git a/flashbotsextra/database_types.go b/flashbotsextra/database_types.go index 8bff4d8cd2..c034d6cfe8 100644 --- a/flashbotsextra/database_types.go +++ b/flashbotsextra/database_types.go @@ -38,8 +38,9 @@ type BuiltBlockBundle struct { } type DbBundle struct { - DbId uint64 `db:"id"` - BundleHash string `db:"bundle_hash"` + DbId uint64 `db:"id"` + BundleHash string `db:"bundle_hash"` + BundleUUID uuid.UUID `db:"bundle_uuid"` ParamSignedTxs string `db:"param_signed_txs"` ParamBlockNumber uint64 `db:"param_block_number"` @@ -58,6 +59,7 @@ type DbLatestUuidBundle struct { Uuid uuid.UUID `db:"replacement_uuid"` SigningAddress string `db:"signing_address"` BundleHash string `db:"bundle_hash"` + BundleUUID uuid.UUID `db:"bundle_uuid"` } type blockAndBundleId struct { @@ -93,8 +95,8 @@ func SimulatedBundleToDbBundle(bundle *types.SimulatedBundle) DbBundle { } return DbBundle{ - BundleHash: bundle.OriginalBundle.Hash.String(), - + BundleHash: bundle.OriginalBundle.Hash.String(), + BundleUUID: bundle.OriginalBundle.ComputeUUID(), ParamSignedTxs: strings.Join(signedTxsStrings, ","), ParamBlockNumber: bundle.OriginalBundle.BlockNumber.Uint64(), ParamTimestamp: &bundle.OriginalBundle.MinTimestamp,