Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Bundle new UUID #112

Merged
merged 8 commits into from
Sep 29, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 13 additions & 0 deletions core/txpool/txpool.go
Original file line number Diff line number Diff line change
Expand Up @@ -645,13 +645,26 @@ func resolveCancellableBundles(lubCh chan []types.LatestUuidBundle, errCh chan e
log.Trace("Processing uuid bundles", "uuidBundles", uuidBundles)

lubs := <-lubCh
LubLoop:
for _, lub := range lubs {
ubk := uuidBundleKey{lub.Uuid, lub.SigningAddress}
bundles, found := uuidBundles[ubk]
if !found {
log.Trace("missing uuid bundle", "ubk", ubk)
continue
}

// If lub has bundle_uuid set, and we can find corresponding bundle we prefer it, if not we fallback to bundle_hash equivalence
if lub.BundleUUID != types.EmptyUUID {
for _, bundle := range bundles {
if bundle.ComputeUUID() == lub.BundleUUID {
log.Trace("adding uuid bundle", "bundle hash", bundle.Hash.String(), "lub", lub)
currentCancellableBundles = append(currentCancellableBundles, bundle)
continue LubLoop
}
}
}

for _, bundle := range bundles {
if bundle.Hash == lub.BundleHash {
log.Trace("adding uuid bundle", "bundle hash", bundle.Hash.String(), "lub", lub)
Expand Down
78 changes: 78 additions & 0 deletions core/txpool/txpool2_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ package txpool

import (
"crypto/ecdsa"
"fmt"
"math/big"
"testing"

Expand All @@ -26,6 +27,8 @@ import (
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/crypto"
"github.com/ethereum/go-ethereum/event"
"github.com/google/uuid"
"github.com/stretchr/testify/require"
)

func pricedValuedTransaction(nonce uint64, value int64, gaslimit uint64, gasprice *big.Int, key *ecdsa.PrivateKey) *types.Transaction {
Expand Down Expand Up @@ -210,3 +213,78 @@ func TestTransactionZAttack(t *testing.T) {
newIvPending, ivPending, pool.config.GlobalSlots, newQueued)
}
}

// Tests that cancellable bundle prefers latest with the same bundle_uuid, but fallbacks to bundle_hash equality
func TestCancellableBundles(t *testing.T) {
mb1 := types.MevBundle{
Txs: nil,
BlockNumber: big.NewInt(1),
Uuid: uuid.MustParse("2fa47a9c-1eb2-4189-b1b0-d79bf2d0fc83"),
SigningAddress: common.HexToAddress("0x1"),
MinTimestamp: 0,
MaxTimestamp: 0,
RevertingTxHashes: []common.Hash{common.HexToHash("0x111")},
Hash: common.HexToHash("0x2"),
}
muid1 := mb1.ComputeUUID()

mb2 := types.MevBundle{
Txs: nil,
BlockNumber: big.NewInt(1),
Uuid: uuid.MustParse("2fa47a9c-1eb2-4189-b1b0-d79bf2d0fc83"),
SigningAddress: common.HexToAddress("0x1"),
MinTimestamp: 0,
MaxTimestamp: 0,
RevertingTxHashes: nil,
Hash: common.HexToHash("0x2"),
}
muid2 := mb2.ComputeUUID()
_ = muid2

mb3 := types.MevBundle{
Txs: nil,
BlockNumber: big.NewInt(1),
Uuid: uuid.MustParse("e2b1132f-7948-4227-aac4-041e9192110a"),
SigningAddress: common.HexToAddress("0x1"),
MinTimestamp: 0,
MaxTimestamp: 0,
RevertingTxHashes: nil,
Hash: common.HexToHash("0x3"),
}

lubCh := make(chan []types.LatestUuidBundle, 1)
errCh := make(chan error, 1)
go func() {
lub1 := types.LatestUuidBundle{
Uuid: uuid.MustParse("2fa47a9c-1eb2-4189-b1b0-d79bf2d0fc83"),
SigningAddress: common.HexToAddress("0x1"),
BundleHash: common.HexToHash("0x2"),
BundleUUID: muid1,
}
lub2 := types.LatestUuidBundle{
Uuid: uuid.MustParse("e2b1132f-7948-4227-aac4-041e9192110a"),
SigningAddress: common.HexToAddress("0x1"),
BundleHash: common.HexToHash("0x3"),
}
lubCh <- []types.LatestUuidBundle{lub1, lub2}
errCh <- nil
}()

uuidBundles := make(map[uuidBundleKey][]types.MevBundle)
firstUuidBK := uuidBundleKey{
Uuid: uuid.MustParse("2fa47a9c-1eb2-4189-b1b0-d79bf2d0fc83"),
SigningAddress: common.HexToAddress("0x1"),
}
secondUuidBK := uuidBundleKey{
Uuid: uuid.MustParse("e2b1132f-7948-4227-aac4-041e9192110a"),
SigningAddress: common.HexToAddress("0x1"),
}

uuidBundles[firstUuidBK] = []types.MevBundle{mb1, mb2}
uuidBundles[secondUuidBK] = []types.MevBundle{mb3}

mbs := resolveCancellableBundles(lubCh, errCh, uuidBundles)
require.Equal(t, mbs[0], mb1)
require.Equal(t, mbs[1], mb3)
fmt.Println(mbs)
}
21 changes: 21 additions & 0 deletions core/types/transaction.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,12 @@ package types
import (
"bytes"
"container/heap"
"crypto/sha256"
"encoding/binary"
"errors"
"io"
"math/big"
"sort"
"sync/atomic"
"time"

Expand Down Expand Up @@ -771,6 +774,7 @@ type LatestUuidBundle struct {
Uuid uuid.UUID
SigningAddress common.Address
BundleHash common.Hash
BundleUUID uuid.UUID
}

type MevBundle struct {
Expand All @@ -784,6 +788,23 @@ type MevBundle struct {
Hash common.Hash
}

func (b *MevBundle) UniquePayload() []byte {
var buf []byte
buf = binary.AppendVarint(buf, b.BlockNumber.Int64())
buf = append(buf, b.Hash[:]...)
sort.Slice(b.RevertingTxHashes, func(i, j int) bool {
return bytes.Compare(b.RevertingTxHashes[i][:], b.RevertingTxHashes[j][:]) <= 0
})
for _, txHash := range b.RevertingTxHashes {
buf = append(buf, txHash[:]...)
}
return buf
}

func (b *MevBundle) ComputeUUID() uuid.UUID {
Copy link
Contributor

@Wazzymandias Wazzymandias Sep 22, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

suggestion (non-blocking)

We should probably eventually use a sync.Once pattern to initialize the UUID once since we don't mutate bundles after initialization

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, I thought in a little bit different direction, I believe we'll eventually move to propagating this uuid as a field of bundle structure, but it'd be better be done once we have some module with shared models. But I anyway understand your concern, I hope we'll address it in future

return uuid.NewHash(sha256.New(), uuid.Nil, b.UniquePayload(), 5)
}

func (b *MevBundle) RevertingHash(hash common.Hash) bool {
for _, revHash := range b.RevertingTxHashes {
if revHash == hash {
Expand Down
90 changes: 64 additions & 26 deletions flashbotsextra/database.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/log"
"github.com/google/uuid"
"github.com/jmoiron/sqlx"
_ "github.com/lib/pq"
)
Expand Down Expand Up @@ -61,17 +62,17 @@ func NewDatabaseService(postgresDSN string) (*DatabaseService, error) {
return nil, err
}

insertMissingBundleStmt, err := db.PrepareNamed("insert into bundles (bundle_hash, param_signed_txs, param_block_number, param_timestamp, received_timestamp, param_reverting_tx_hashes, coinbase_diff, total_gas_used, state_block_number, gas_fees, eth_sent_to_coinbase) values (:bundle_hash, :param_signed_txs, :param_block_number, :param_timestamp, :received_timestamp, :param_reverting_tx_hashes, :coinbase_diff, :total_gas_used, :state_block_number, :gas_fees, :eth_sent_to_coinbase) on conflict (bundle_hash, param_block_number) do nothing returning id")
insertMissingBundleStmt, err := db.PrepareNamed("insert into bundles (bundle_hash, param_signed_txs, param_block_number, param_timestamp, received_timestamp, param_reverting_tx_hashes, coinbase_diff, total_gas_used, state_block_number, gas_fees, eth_sent_to_coinbase, bundle_uuid) values (:bundle_hash, :param_signed_txs, :param_block_number, :param_timestamp, :received_timestamp, :param_reverting_tx_hashes, :coinbase_diff, :total_gas_used, :state_block_number, :gas_fees, :eth_sent_to_coinbase, :bundle_uuid) on conflict do nothing returning id")
if err != nil {
return nil, err
}

fetchPrioBundlesStmt, err := db.PrepareNamed("select bundle_hash, param_signed_txs, param_block_number, param_timestamp, received_timestamp, param_reverting_tx_hashes, coinbase_diff, total_gas_used, state_block_number, gas_fees, eth_sent_to_coinbase from bundles where is_high_prio = :is_high_prio and coinbase_diff*1e18/total_gas_used > 1000000000 and param_block_number = :param_block_number order by coinbase_diff/total_gas_used DESC limit :limit")
fetchPrioBundlesStmt, err := db.PrepareNamed("select bundle_hash, param_signed_txs, param_block_number, param_timestamp, received_timestamp, param_reverting_tx_hashes, coinbase_diff, total_gas_used, state_block_number, gas_fees, eth_sent_to_coinbase, bundle_uuid from bundles where is_high_prio = :is_high_prio and coinbase_diff*1e18/total_gas_used > 1000000000 and param_block_number = :param_block_number order by coinbase_diff/total_gas_used DESC limit :limit")
if err != nil {
return nil, err
}

fetchGetLatestUuidBundlesStmt, err := db.PrepareNamed("select replacement_uuid, signing_address, bundle_hash from latest_uuid_bundle where target_block_number = :target_block_number")
fetchGetLatestUuidBundlesStmt, err := db.PrepareNamed("select replacement_uuid, signing_address, bundle_hash, bundle_uuid from latest_uuid_bundle where target_block_number = :target_block_number")
if err != nil {
return nil, err
}
Expand All @@ -92,12 +93,12 @@ func Min(l int, r int) int {
return r
}

func (ds *DatabaseService) getBundleIds(ctx context.Context, blockNumber uint64, bundles []types.SimulatedBundle) (map[string]uint64, error) {
func (ds *DatabaseService) getBundleIds(ctx context.Context, blockNumber uint64, bundles []uuidBundle) (map[uuid.UUID]uint64, error) {
if len(bundles) == 0 {
return nil, nil
}

bundleIdsMap := make(map[string]uint64, len(bundles))
bundleIdsMap := make(map[uuid.UUID]uint64, len(bundles))

// Batch by 500
requestsToMake := [][]string{make([]string, 0, Min(500, len(bundles)))}
Expand All @@ -107,52 +108,63 @@ func (ds *DatabaseService) getBundleIds(ctx context.Context, blockNumber uint64,
cRequestInd += 1
requestsToMake = append(requestsToMake, make([]string, 0, Min(500, len(bundles)-i)))
}
requestsToMake[cRequestInd] = append(requestsToMake[cRequestInd], bundle.OriginalBundle.Hash.String())
requestsToMake[cRequestInd] = append(requestsToMake[cRequestInd], bundle.SimulatedBundle.OriginalBundle.Hash.String())
}

for _, request := range requestsToMake {
query, args, err := sqlx.In("select id, bundle_hash from bundles where param_block_number = ? and bundle_hash in (?)", blockNumber, request)
query, args, err := sqlx.In("select id, bundle_hash, bundle_uuid from bundles where param_block_number = ? and bundle_hash in (?)", blockNumber, request)
if err != nil {
return nil, err
}
query = ds.db.Rebind(query)

queryRes := []struct {
Id uint64 `db:"id"`
BundleHash string `db:"bundle_hash"`
Id uint64 `db:"id"`
BundleHash string `db:"bundle_hash"`
BundleUUID uuid.UUID `db:"bundle_uuid"`
}{}

err = ds.db.SelectContext(ctx, &queryRes, query, args...)
if err != nil {
return nil, err
}

RowLoop:
for _, row := range queryRes {
bundleIdsMap[row.BundleHash] = row.Id
for _, b := range bundles {
// if UUID agree it's same exact bundle we stop searching
if b.UUID == row.BundleUUID {
bundleIdsMap[b.UUID] = row.Id
continue RowLoop
}
// we can have multiple bundles with same hash eventually, so we fall back on getting row with same hash
if b.SimulatedBundle.OriginalBundle.Hash.String() == row.BundleHash {
bundleIdsMap[b.UUID] = row.Id
}
}
}
}

return bundleIdsMap, nil
}

// TODO: cache locally for current block!
func (ds *DatabaseService) getBundleIdsAndInsertMissingBundles(ctx context.Context, blockNumber uint64, bundles []types.SimulatedBundle) (map[string]uint64, error) {
func (ds *DatabaseService) getBundleIdsAndInsertMissingBundles(ctx context.Context, blockNumber uint64, bundles []uuidBundle) (map[uuid.UUID]uint64, error) {
bundleIdsMap, err := ds.getBundleIds(ctx, blockNumber, bundles)
if err != nil {
return nil, err
}

toRetry := []types.SimulatedBundle{}
toRetry := make([]uuidBundle, 0)
for _, bundle := range bundles {
bundleHashString := bundle.OriginalBundle.Hash.String()
if _, found := bundleIdsMap[bundleHashString]; found {
if _, found := bundleIdsMap[bundle.UUID]; found {
continue
}

var bundleId uint64
missingBundleData := SimulatedBundleToDbBundle(&bundle) // nolint: gosec
missingBundleData := SimulatedBundleToDbBundle(&bundle.SimulatedBundle) // nolint: gosec
err = ds.insertMissingBundleStmt.GetContext(ctx, &bundleId, missingBundleData) // not using the tx as it relies on the unique constraint!
if err == nil {
bundleIdsMap[bundleHashString] = bundleId
bundleIdsMap[bundle.UUID] = bundleId
} else if err == sql.ErrNoRows /* conflict, someone else inserted the bundle before we could */ {
toRetry = append(toRetry, bundle)
} else {
Expand Down Expand Up @@ -213,13 +225,13 @@ func (ds *DatabaseService) insertBuildBlockBundleIds(tx *sqlx.Tx, ctx context.Co
return err
}

func (ds *DatabaseService) insertAllBlockBundleIds(tx *sqlx.Tx, ctx context.Context, blockId uint64, bundleIdsMap map[string]uint64) error {
if len(bundleIdsMap) == 0 {
func (ds *DatabaseService) insertAllBlockBundleIds(tx *sqlx.Tx, ctx context.Context, blockId uint64, bundleIds []uint64) error {
if len(bundleIds) == 0 {
return nil
}

toInsert := make([]blockAndBundleId, 0, len(bundleIdsMap))
for _, bundleId := range bundleIdsMap {
toInsert := make([]blockAndBundleId, 0, len(bundleIds))
for _, bundleId := range bundleIds {
toInsert = append(toInsert, blockAndBundleId{blockId, bundleId})
}

Expand All @@ -244,14 +256,29 @@ func (ds *DatabaseService) insertUsedSBundleIds(tx *sqlx.Tx, ctx context.Context
return err
}

type uuidBundle struct {
SimulatedBundle types.SimulatedBundle
UUID uuid.UUID
}

func (ds *DatabaseService) ConsumeBuiltBlock(block *types.Block, blockValue *big.Int, ordersClosedAt time.Time, sealedAt time.Time,
commitedBundles []types.SimulatedBundle, allBundles []types.SimulatedBundle,
usedSbundles []types.UsedSBundle,
bidTrace *apiv1.BidTrace) {
var allUUIDBundles = make([]uuidBundle, 0, len(allBundles))
for _, bundle := range allBundles {
allUUIDBundles = append(allUUIDBundles, uuidBundle{bundle, bundle.OriginalBundle.ComputeUUID()})
}

var commitedUUIDBundles = make([]uuidBundle, 0, len(commitedBundles))
for _, bundle := range commitedBundles {
commitedUUIDBundles = append(commitedUUIDBundles, uuidBundle{bundle, bundle.OriginalBundle.ComputeUUID()})
}

ctx, cancel := context.WithTimeout(context.Background(), 12*time.Second)
defer cancel()

bundleIdsMap, err := ds.getBundleIdsAndInsertMissingBundles(ctx, block.NumberU64(), allBundles)
bundleIdsMap, err := ds.getBundleIdsAndInsertMissingBundles(ctx, block.NumberU64(), allUUIDBundles)
if err != nil {
log.Error("could not insert bundles", "err", err)
}
Expand All @@ -270,8 +297,8 @@ func (ds *DatabaseService) ConsumeBuiltBlock(block *types.Block, blockValue *big
}

commitedBundlesIds := make([]uint64, 0, len(commitedBundles))
for _, bundle := range commitedBundles {
if id, found := bundleIdsMap[bundle.OriginalBundle.Hash.String()]; found {
for _, bundle := range commitedUUIDBundles {
if id, found := bundleIdsMap[bundle.UUID]; found {
commitedBundlesIds = append(commitedBundlesIds, id)
}
}
Expand All @@ -283,10 +310,20 @@ func (ds *DatabaseService) ConsumeBuiltBlock(block *types.Block, blockValue *big
return
}

err = ds.insertAllBlockBundleIds(tx, ctx, blockId, bundleIdsMap)
var uniqueBundleIDs = make(map[uint64]struct{})
var allBundleIds []uint64
// we need to filter out duplicates while we still have unique constraint on bundle_hash+block_number which leads to data discrepancies
for _, v := range bundleIdsMap {
if _, ok := uniqueBundleIDs[v]; ok {
continue
}
uniqueBundleIDs[v] = struct{}{}
allBundleIds = append(allBundleIds, v)
}
err = ds.insertAllBlockBundleIds(tx, ctx, blockId, allBundleIds)
if err != nil {
tx.Rollback()
log.Error("could not insert built block all bundles", "err", err)
log.Error("could not insert built block all bundles", "err", err, "block", block.NumberU64(), "commitedBundles", commitedBundlesIds)
return
}

Expand Down Expand Up @@ -326,6 +363,7 @@ func (ds *DatabaseService) GetLatestUuidBundles(ctx context.Context, blockNum in
Uuid: dbLub.Uuid,
SigningAddress: common.HexToAddress(dbLub.SigningAddress),
BundleHash: common.HexToHash(dbLub.BundleHash),
BundleUUID: dbLub.BundleUUID,
})
}
return latestBundles, nil
Expand Down
Loading
Loading