Skip to content

Commit

Permalink
Bundle new UUID (#112)
Browse files Browse the repository at this point in the history
* Fill bundle_uuid field on bundle insertions

* fix db ID determination

* add bundle uuid hash test

* change to sha256

* align replacement uuid logic with new bundle_uuid field

* add latest_uuid/bundle_uuid test

* fetch bundle_uuid for consistency

* Filter out unique bundle_ids prior to inserting
  • Loading branch information
TymKh authored Sep 29, 2023
1 parent cd74c75 commit 7f43d0b
Show file tree
Hide file tree
Showing 6 changed files with 229 additions and 32 deletions.
13 changes: 13 additions & 0 deletions core/txpool/txpool.go
Original file line number Diff line number Diff line change
Expand Up @@ -645,13 +645,26 @@ func resolveCancellableBundles(lubCh chan []types.LatestUuidBundle, errCh chan e
log.Trace("Processing uuid bundles", "uuidBundles", uuidBundles)

lubs := <-lubCh
LubLoop:
for _, lub := range lubs {
ubk := uuidBundleKey{lub.Uuid, lub.SigningAddress}
bundles, found := uuidBundles[ubk]
if !found {
log.Trace("missing uuid bundle", "ubk", ubk)
continue
}

// If lub has bundle_uuid set, and we can find corresponding bundle we prefer it, if not we fallback to bundle_hash equivalence
if lub.BundleUUID != types.EmptyUUID {
for _, bundle := range bundles {
if bundle.ComputeUUID() == lub.BundleUUID {
log.Trace("adding uuid bundle", "bundle hash", bundle.Hash.String(), "lub", lub)
currentCancellableBundles = append(currentCancellableBundles, bundle)
continue LubLoop
}
}
}

for _, bundle := range bundles {
if bundle.Hash == lub.BundleHash {
log.Trace("adding uuid bundle", "bundle hash", bundle.Hash.String(), "lub", lub)
Expand Down
78 changes: 78 additions & 0 deletions core/txpool/txpool2_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ package txpool

import (
"crypto/ecdsa"
"fmt"
"math/big"
"testing"

Expand All @@ -26,6 +27,8 @@ import (
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/crypto"
"github.com/ethereum/go-ethereum/event"
"github.com/google/uuid"
"github.com/stretchr/testify/require"
)

func pricedValuedTransaction(nonce uint64, value int64, gaslimit uint64, gasprice *big.Int, key *ecdsa.PrivateKey) *types.Transaction {
Expand Down Expand Up @@ -210,3 +213,78 @@ func TestTransactionZAttack(t *testing.T) {
newIvPending, ivPending, pool.config.GlobalSlots, newQueued)
}
}

// Tests that cancellable bundle prefers latest with the same bundle_uuid, but fallbacks to bundle_hash equality
func TestCancellableBundles(t *testing.T) {
mb1 := types.MevBundle{
Txs: nil,
BlockNumber: big.NewInt(1),
Uuid: uuid.MustParse("2fa47a9c-1eb2-4189-b1b0-d79bf2d0fc83"),
SigningAddress: common.HexToAddress("0x1"),
MinTimestamp: 0,
MaxTimestamp: 0,
RevertingTxHashes: []common.Hash{common.HexToHash("0x111")},
Hash: common.HexToHash("0x2"),
}
muid1 := mb1.ComputeUUID()

mb2 := types.MevBundle{
Txs: nil,
BlockNumber: big.NewInt(1),
Uuid: uuid.MustParse("2fa47a9c-1eb2-4189-b1b0-d79bf2d0fc83"),
SigningAddress: common.HexToAddress("0x1"),
MinTimestamp: 0,
MaxTimestamp: 0,
RevertingTxHashes: nil,
Hash: common.HexToHash("0x2"),
}
muid2 := mb2.ComputeUUID()
_ = muid2

mb3 := types.MevBundle{
Txs: nil,
BlockNumber: big.NewInt(1),
Uuid: uuid.MustParse("e2b1132f-7948-4227-aac4-041e9192110a"),
SigningAddress: common.HexToAddress("0x1"),
MinTimestamp: 0,
MaxTimestamp: 0,
RevertingTxHashes: nil,
Hash: common.HexToHash("0x3"),
}

lubCh := make(chan []types.LatestUuidBundle, 1)
errCh := make(chan error, 1)
go func() {
lub1 := types.LatestUuidBundle{
Uuid: uuid.MustParse("2fa47a9c-1eb2-4189-b1b0-d79bf2d0fc83"),
SigningAddress: common.HexToAddress("0x1"),
BundleHash: common.HexToHash("0x2"),
BundleUUID: muid1,
}
lub2 := types.LatestUuidBundle{
Uuid: uuid.MustParse("e2b1132f-7948-4227-aac4-041e9192110a"),
SigningAddress: common.HexToAddress("0x1"),
BundleHash: common.HexToHash("0x3"),
}
lubCh <- []types.LatestUuidBundle{lub1, lub2}
errCh <- nil
}()

uuidBundles := make(map[uuidBundleKey][]types.MevBundle)
firstUuidBK := uuidBundleKey{
Uuid: uuid.MustParse("2fa47a9c-1eb2-4189-b1b0-d79bf2d0fc83"),
SigningAddress: common.HexToAddress("0x1"),
}
secondUuidBK := uuidBundleKey{
Uuid: uuid.MustParse("e2b1132f-7948-4227-aac4-041e9192110a"),
SigningAddress: common.HexToAddress("0x1"),
}

uuidBundles[firstUuidBK] = []types.MevBundle{mb1, mb2}
uuidBundles[secondUuidBK] = []types.MevBundle{mb3}

mbs := resolveCancellableBundles(lubCh, errCh, uuidBundles)
require.Equal(t, mbs[0], mb1)
require.Equal(t, mbs[1], mb3)
fmt.Println(mbs)
}
21 changes: 21 additions & 0 deletions core/types/transaction.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,12 @@ package types
import (
"bytes"
"container/heap"
"crypto/sha256"
"encoding/binary"
"errors"
"io"
"math/big"
"sort"
"sync/atomic"
"time"

Expand Down Expand Up @@ -771,6 +774,7 @@ type LatestUuidBundle struct {
Uuid uuid.UUID
SigningAddress common.Address
BundleHash common.Hash
BundleUUID uuid.UUID
}

type MevBundle struct {
Expand All @@ -784,6 +788,23 @@ type MevBundle struct {
Hash common.Hash
}

func (b *MevBundle) UniquePayload() []byte {
var buf []byte
buf = binary.AppendVarint(buf, b.BlockNumber.Int64())
buf = append(buf, b.Hash[:]...)
sort.Slice(b.RevertingTxHashes, func(i, j int) bool {
return bytes.Compare(b.RevertingTxHashes[i][:], b.RevertingTxHashes[j][:]) <= 0
})
for _, txHash := range b.RevertingTxHashes {
buf = append(buf, txHash[:]...)
}
return buf
}

func (b *MevBundle) ComputeUUID() uuid.UUID {
return uuid.NewHash(sha256.New(), uuid.Nil, b.UniquePayload(), 5)
}

func (b *MevBundle) RevertingHash(hash common.Hash) bool {
for _, revHash := range b.RevertingTxHashes {
if revHash == hash {
Expand Down
90 changes: 64 additions & 26 deletions flashbotsextra/database.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/log"
"github.com/google/uuid"
"github.com/jmoiron/sqlx"
_ "github.com/lib/pq"
)
Expand Down Expand Up @@ -61,17 +62,17 @@ func NewDatabaseService(postgresDSN string) (*DatabaseService, error) {
return nil, err
}

insertMissingBundleStmt, err := db.PrepareNamed("insert into bundles (bundle_hash, param_signed_txs, param_block_number, param_timestamp, received_timestamp, param_reverting_tx_hashes, coinbase_diff, total_gas_used, state_block_number, gas_fees, eth_sent_to_coinbase) values (:bundle_hash, :param_signed_txs, :param_block_number, :param_timestamp, :received_timestamp, :param_reverting_tx_hashes, :coinbase_diff, :total_gas_used, :state_block_number, :gas_fees, :eth_sent_to_coinbase) on conflict (bundle_hash, param_block_number) do nothing returning id")
insertMissingBundleStmt, err := db.PrepareNamed("insert into bundles (bundle_hash, param_signed_txs, param_block_number, param_timestamp, received_timestamp, param_reverting_tx_hashes, coinbase_diff, total_gas_used, state_block_number, gas_fees, eth_sent_to_coinbase, bundle_uuid) values (:bundle_hash, :param_signed_txs, :param_block_number, :param_timestamp, :received_timestamp, :param_reverting_tx_hashes, :coinbase_diff, :total_gas_used, :state_block_number, :gas_fees, :eth_sent_to_coinbase, :bundle_uuid) on conflict do nothing returning id")
if err != nil {
return nil, err
}

fetchPrioBundlesStmt, err := db.PrepareNamed("select bundle_hash, param_signed_txs, param_block_number, param_timestamp, received_timestamp, param_reverting_tx_hashes, coinbase_diff, total_gas_used, state_block_number, gas_fees, eth_sent_to_coinbase from bundles where is_high_prio = :is_high_prio and coinbase_diff*1e18/total_gas_used > 1000000000 and param_block_number = :param_block_number order by coinbase_diff/total_gas_used DESC limit :limit")
fetchPrioBundlesStmt, err := db.PrepareNamed("select bundle_hash, param_signed_txs, param_block_number, param_timestamp, received_timestamp, param_reverting_tx_hashes, coinbase_diff, total_gas_used, state_block_number, gas_fees, eth_sent_to_coinbase, bundle_uuid from bundles where is_high_prio = :is_high_prio and coinbase_diff*1e18/total_gas_used > 1000000000 and param_block_number = :param_block_number order by coinbase_diff/total_gas_used DESC limit :limit")
if err != nil {
return nil, err
}

fetchGetLatestUuidBundlesStmt, err := db.PrepareNamed("select replacement_uuid, signing_address, bundle_hash from latest_uuid_bundle where target_block_number = :target_block_number")
fetchGetLatestUuidBundlesStmt, err := db.PrepareNamed("select replacement_uuid, signing_address, bundle_hash, bundle_uuid from latest_uuid_bundle where target_block_number = :target_block_number")
if err != nil {
return nil, err
}
Expand All @@ -92,12 +93,12 @@ func Min(l int, r int) int {
return r
}

func (ds *DatabaseService) getBundleIds(ctx context.Context, blockNumber uint64, bundles []types.SimulatedBundle) (map[string]uint64, error) {
func (ds *DatabaseService) getBundleIds(ctx context.Context, blockNumber uint64, bundles []uuidBundle) (map[uuid.UUID]uint64, error) {
if len(bundles) == 0 {
return nil, nil
}

bundleIdsMap := make(map[string]uint64, len(bundles))
bundleIdsMap := make(map[uuid.UUID]uint64, len(bundles))

// Batch by 500
requestsToMake := [][]string{make([]string, 0, Min(500, len(bundles)))}
Expand All @@ -107,52 +108,63 @@ func (ds *DatabaseService) getBundleIds(ctx context.Context, blockNumber uint64,
cRequestInd += 1
requestsToMake = append(requestsToMake, make([]string, 0, Min(500, len(bundles)-i)))
}
requestsToMake[cRequestInd] = append(requestsToMake[cRequestInd], bundle.OriginalBundle.Hash.String())
requestsToMake[cRequestInd] = append(requestsToMake[cRequestInd], bundle.SimulatedBundle.OriginalBundle.Hash.String())
}

for _, request := range requestsToMake {
query, args, err := sqlx.In("select id, bundle_hash from bundles where param_block_number = ? and bundle_hash in (?)", blockNumber, request)
query, args, err := sqlx.In("select id, bundle_hash, bundle_uuid from bundles where param_block_number = ? and bundle_hash in (?)", blockNumber, request)
if err != nil {
return nil, err
}
query = ds.db.Rebind(query)

queryRes := []struct {
Id uint64 `db:"id"`
BundleHash string `db:"bundle_hash"`
Id uint64 `db:"id"`
BundleHash string `db:"bundle_hash"`
BundleUUID uuid.UUID `db:"bundle_uuid"`
}{}

err = ds.db.SelectContext(ctx, &queryRes, query, args...)
if err != nil {
return nil, err
}

RowLoop:
for _, row := range queryRes {
bundleIdsMap[row.BundleHash] = row.Id
for _, b := range bundles {
// if UUID agree it's same exact bundle we stop searching
if b.UUID == row.BundleUUID {
bundleIdsMap[b.UUID] = row.Id
continue RowLoop
}
// we can have multiple bundles with same hash eventually, so we fall back on getting row with same hash
if b.SimulatedBundle.OriginalBundle.Hash.String() == row.BundleHash {
bundleIdsMap[b.UUID] = row.Id
}
}
}
}

return bundleIdsMap, nil
}

// TODO: cache locally for current block!
func (ds *DatabaseService) getBundleIdsAndInsertMissingBundles(ctx context.Context, blockNumber uint64, bundles []types.SimulatedBundle) (map[string]uint64, error) {
func (ds *DatabaseService) getBundleIdsAndInsertMissingBundles(ctx context.Context, blockNumber uint64, bundles []uuidBundle) (map[uuid.UUID]uint64, error) {
bundleIdsMap, err := ds.getBundleIds(ctx, blockNumber, bundles)
if err != nil {
return nil, err
}

toRetry := []types.SimulatedBundle{}
toRetry := make([]uuidBundle, 0)
for _, bundle := range bundles {
bundleHashString := bundle.OriginalBundle.Hash.String()
if _, found := bundleIdsMap[bundleHashString]; found {
if _, found := bundleIdsMap[bundle.UUID]; found {
continue
}

var bundleId uint64
missingBundleData := SimulatedBundleToDbBundle(&bundle) // nolint: gosec
missingBundleData := SimulatedBundleToDbBundle(&bundle.SimulatedBundle) // nolint: gosec
err = ds.insertMissingBundleStmt.GetContext(ctx, &bundleId, missingBundleData) // not using the tx as it relies on the unique constraint!
if err == nil {
bundleIdsMap[bundleHashString] = bundleId
bundleIdsMap[bundle.UUID] = bundleId
} else if err == sql.ErrNoRows /* conflict, someone else inserted the bundle before we could */ {
toRetry = append(toRetry, bundle)
} else {
Expand Down Expand Up @@ -213,13 +225,13 @@ func (ds *DatabaseService) insertBuildBlockBundleIds(tx *sqlx.Tx, ctx context.Co
return err
}

func (ds *DatabaseService) insertAllBlockBundleIds(tx *sqlx.Tx, ctx context.Context, blockId uint64, bundleIdsMap map[string]uint64) error {
if len(bundleIdsMap) == 0 {
func (ds *DatabaseService) insertAllBlockBundleIds(tx *sqlx.Tx, ctx context.Context, blockId uint64, bundleIds []uint64) error {
if len(bundleIds) == 0 {
return nil
}

toInsert := make([]blockAndBundleId, 0, len(bundleIdsMap))
for _, bundleId := range bundleIdsMap {
toInsert := make([]blockAndBundleId, 0, len(bundleIds))
for _, bundleId := range bundleIds {
toInsert = append(toInsert, blockAndBundleId{blockId, bundleId})
}

Expand All @@ -244,14 +256,29 @@ func (ds *DatabaseService) insertUsedSBundleIds(tx *sqlx.Tx, ctx context.Context
return err
}

type uuidBundle struct {
SimulatedBundle types.SimulatedBundle
UUID uuid.UUID
}

func (ds *DatabaseService) ConsumeBuiltBlock(block *types.Block, blockValue *big.Int, ordersClosedAt time.Time, sealedAt time.Time,
commitedBundles []types.SimulatedBundle, allBundles []types.SimulatedBundle,
usedSbundles []types.UsedSBundle,
bidTrace *apiv1.BidTrace) {
var allUUIDBundles = make([]uuidBundle, 0, len(allBundles))
for _, bundle := range allBundles {
allUUIDBundles = append(allUUIDBundles, uuidBundle{bundle, bundle.OriginalBundle.ComputeUUID()})
}

var commitedUUIDBundles = make([]uuidBundle, 0, len(commitedBundles))
for _, bundle := range commitedBundles {
commitedUUIDBundles = append(commitedUUIDBundles, uuidBundle{bundle, bundle.OriginalBundle.ComputeUUID()})
}

ctx, cancel := context.WithTimeout(context.Background(), 12*time.Second)
defer cancel()

bundleIdsMap, err := ds.getBundleIdsAndInsertMissingBundles(ctx, block.NumberU64(), allBundles)
bundleIdsMap, err := ds.getBundleIdsAndInsertMissingBundles(ctx, block.NumberU64(), allUUIDBundles)
if err != nil {
log.Error("could not insert bundles", "err", err)
}
Expand All @@ -270,8 +297,8 @@ func (ds *DatabaseService) ConsumeBuiltBlock(block *types.Block, blockValue *big
}

commitedBundlesIds := make([]uint64, 0, len(commitedBundles))
for _, bundle := range commitedBundles {
if id, found := bundleIdsMap[bundle.OriginalBundle.Hash.String()]; found {
for _, bundle := range commitedUUIDBundles {
if id, found := bundleIdsMap[bundle.UUID]; found {
commitedBundlesIds = append(commitedBundlesIds, id)
}
}
Expand All @@ -283,10 +310,20 @@ func (ds *DatabaseService) ConsumeBuiltBlock(block *types.Block, blockValue *big
return
}

err = ds.insertAllBlockBundleIds(tx, ctx, blockId, bundleIdsMap)
var uniqueBundleIDs = make(map[uint64]struct{})
var allBundleIds []uint64
// we need to filter out duplicates while we still have unique constraint on bundle_hash+block_number which leads to data discrepancies
for _, v := range bundleIdsMap {
if _, ok := uniqueBundleIDs[v]; ok {
continue
}
uniqueBundleIDs[v] = struct{}{}
allBundleIds = append(allBundleIds, v)
}
err = ds.insertAllBlockBundleIds(tx, ctx, blockId, allBundleIds)
if err != nil {
tx.Rollback()
log.Error("could not insert built block all bundles", "err", err)
log.Error("could not insert built block all bundles", "err", err, "block", block.NumberU64(), "commitedBundles", commitedBundlesIds)
return
}

Expand Down Expand Up @@ -326,6 +363,7 @@ func (ds *DatabaseService) GetLatestUuidBundles(ctx context.Context, blockNum in
Uuid: dbLub.Uuid,
SigningAddress: common.HexToAddress(dbLub.SigningAddress),
BundleHash: common.HexToHash(dbLub.BundleHash),
BundleUUID: dbLub.BundleUUID,
})
}
return latestBundles, nil
Expand Down
Loading

0 comments on commit 7f43d0b

Please sign in to comment.