From c32dac71f76a952140b26a46ed4170f4ac899ed5 Mon Sep 17 00:00:00 2001 From: Michael Neuder Date: Wed, 14 Jun 2023 20:26:45 -0400 Subject: [PATCH 1/2] v2 --- common/test_utils.go | 32 ++- common/types.go | 256 ++++++++++++++++++- common/types_spec.go | 29 +++ datastore/redis.go | 12 +- services/api/optimistic_test.go | 163 +++++++++++- services/api/service.go | 423 ++++++++++++++++++++++++++++++++ 6 files changed, 904 insertions(+), 11 deletions(-) diff --git a/common/test_utils.go b/common/test_utils.go index 176edb1f..70231914 100644 --- a/common/test_utils.go +++ b/common/test_utils.go @@ -2,14 +2,16 @@ package common import ( "compress/gzip" + "encoding/hex" "encoding/json" "io" "os" "testing" "github.com/attestantio/go-builder-client/api/capella" - "github.com/attestantio/go-eth2-client/spec/bellatrix" + bellatrixspec "github.com/attestantio/go-eth2-client/spec/bellatrix" consensuscapella "github.com/attestantio/go-eth2-client/spec/capella" + "github.com/attestantio/go-eth2-client/spec/phase0" "github.com/flashbots/go-boost-utils/bls" boostTypes "github.com/flashbots/go-boost-utils/types" "github.com/sirupsen/logrus" @@ -70,7 +72,7 @@ func TestBuilderSubmitBlockRequest(sk *bls.SecretKey, bid *BidTraceV2) BuilderSu Message: &bid.BidTrace, Signature: [96]byte(signature), ExecutionPayload: &consensuscapella.ExecutionPayload{ //nolint:exhaustruct - Transactions: []bellatrix.Transaction{[]byte{0x03}}, + Transactions: []bellatrixspec.Transaction{[]byte{0x03}}, Timestamp: bid.Slot * 12, // 12 seconds per slot. PrevRandao: _HexToHash("01234567890123456789012345678901"), Withdrawals: []*consensuscapella.Withdrawal{}, @@ -98,3 +100,29 @@ func LoadGzippedJSON(t *testing.T, filename string, dst any) { err := json.Unmarshal(b, dst) require.NoError(t, err) } + +func TestBuilderSubmitBlockRequestV2(sk *bls.SecretKey, bid *BidTraceV2) *SubmitBlockRequest { + signature, err := boostTypes.SignMessage(bid, boostTypes.DomainBuilder, sk) + check(err, " SignMessage: ", bid, sk) + + wRoot, err := hex.DecodeString("792930bbd5baac43bcc798ee49aa8185ef76bb3b44ba62b91d86ae569e4bb535") + check(err) + return &SubmitBlockRequest{ + Message: &bid.BidTrace, + ExecutionPayloadHeader: &consensuscapella.ExecutionPayloadHeader{ //nolint:exhaustruct + TransactionsRoot: [32]byte{}, + Timestamp: bid.Slot * 12, // 12 seconds per slot. + PrevRandao: _HexToHash("01234567890123456789012345678901"), + WithdrawalsRoot: phase0.Root(wRoot), + ExtraData: []byte{ + 0x00, 0x01, 0x02, 0x03, 0x04, 0x05, 0x06, 0x07, + 0x08, 0x09, 0x0a, 0x0b, 0x0c, 0x0d, 0x0e, 0x0f, + 0x10, 0x11, 0x12, 0x13, 0x14, 0x15, 0x16, 0x17, + 0x18, 0x19, 0x1a, 0x1b, 0x1c, 0x1d, 0x1e, 0x1f, + }, + }, + Signature: [96]byte(signature), + Transactions: []bellatrixspec.Transaction{[]byte{0x03}}, + Withdrawals: []*consensuscapella.Withdrawal{}, + } +} diff --git a/common/types.go b/common/types.go index 2e54782f..c5510bfd 100644 --- a/common/types.go +++ b/common/types.go @@ -13,9 +13,10 @@ import ( "github.com/attestantio/go-builder-client/spec" apiv1capella "github.com/attestantio/go-eth2-client/api/v1/capella" consensusspec "github.com/attestantio/go-eth2-client/spec" - "github.com/attestantio/go-eth2-client/spec/bellatrix" + consensusbellatrix "github.com/attestantio/go-eth2-client/spec/bellatrix" consensuscapella "github.com/attestantio/go-eth2-client/spec/capella" "github.com/attestantio/go-eth2-client/spec/phase0" + ssz "github.com/ferranbt/fastssz" boostTypes "github.com/flashbots/go-boost-utils/types" ) @@ -671,7 +672,7 @@ func BoostBidToBidTrace(bidTrace *boostTypes.BidTrace) *apiv1.BidTrace { BuilderPubkey: phase0.BLSPubKey(bidTrace.BuilderPubkey), Slot: bidTrace.Slot, ProposerPubkey: phase0.BLSPubKey(bidTrace.ProposerPubkey), - ProposerFeeRecipient: bellatrix.ExecutionAddress(bidTrace.ProposerFeeRecipient), + ProposerFeeRecipient: consensusbellatrix.ExecutionAddress(bidTrace.ProposerFeeRecipient), BlockHash: phase0.Hash32(bidTrace.BlockHash), Value: U256StrToUint256(bidTrace.Value), ParentHash: phase0.Hash32(bidTrace.ParentHash), @@ -781,3 +782,254 @@ func (b *BuilderSubmitBlockRequest) Withdrawals() []*consensuscapella.Withdrawal } return nil } + +// SubmitBlockRequest is the v2 request from the builder to submit a block. +type SubmitBlockRequest struct { + Message *apiv1.BidTrace + ExecutionPayloadHeader *consensuscapella.ExecutionPayloadHeader + Signature phase0.BLSSignature `ssz-size:"96"` + Transactions []consensusbellatrix.Transaction `ssz-max:"1048576,1073741824" ssz-size:"?,?"` + Withdrawals []*consensuscapella.Withdrawal `ssz-max:"16"` +} + +// MarshalSSZ ssz marshals the SubmitBlockRequest object +func (s *SubmitBlockRequest) MarshalSSZ() ([]byte, error) { + return ssz.MarshalSSZ(s) +} + +// UnmarshalSSZ ssz unmarshals the SubmitBlockRequest object +func (s *SubmitBlockRequest) UnmarshalSSZ(buf []byte) error { + var err error + size := uint64(len(buf)) + if size < 344 { + return ssz.ErrSize + } + + tail := buf + var o1, o3, o4 uint64 + + // Field (0) 'Message' + if s.Message == nil { + s.Message = new(apiv1.BidTrace) + } + if err = s.Message.UnmarshalSSZ(buf[0:236]); err != nil { + return err + } + + // Offset (1) 'ExecutionPayloadHeader' + if o1 = ssz.ReadOffset(buf[236:240]); o1 > size { + return ssz.ErrOffset + } + + if o1 < 344 { + return ssz.ErrInvalidVariableOffset + } + + // Field (2) 'Signature' + copy(s.Signature[:], buf[240:336]) + + // Offset (3) 'Transactions' + if o3 = ssz.ReadOffset(buf[336:340]); o3 > size || o1 > o3 { + return ssz.ErrOffset + } + + // Offset (4) 'Withdrawals' + if o4 = ssz.ReadOffset(buf[340:344]); o4 > size || o3 > o4 { + return ssz.ErrOffset + } + + // Field (1) 'ExecutionPayloadHeader' + { + buf = tail[o1:o3] + if s.ExecutionPayloadHeader == nil { + s.ExecutionPayloadHeader = new(consensuscapella.ExecutionPayloadHeader) + } + if err = s.ExecutionPayloadHeader.UnmarshalSSZ(buf); err != nil { + return err + } + } + + // Field (3) 'Transactions' + { + buf = tail[o3:o4] + num, err := ssz.DecodeDynamicLength(buf, 1073741824) + if err != nil { + return err + } + s.Transactions = make([]consensusbellatrix.Transaction, num) + err = ssz.UnmarshalDynamic(buf, num, func(indx int, buf []byte) (err error) { + if len(buf) > 1073741824 { + return ssz.ErrBytesLength + } + if cap(s.Transactions[indx]) == 0 { + s.Transactions[indx] = consensusbellatrix.Transaction(make([]byte, 0, len(buf))) + } + s.Transactions[indx] = append(s.Transactions[indx], buf...) + return nil + }) + if err != nil { + return err + } + } + + // Field (4) 'Withdrawals' + { + buf = tail[o4:] + num, err := ssz.DivideInt2(len(buf), 44, 16) + if err != nil { + return err + } + s.Withdrawals = make([]*consensuscapella.Withdrawal, num) + for ii := 0; ii < num; ii++ { + if s.Withdrawals[ii] == nil { + s.Withdrawals[ii] = new(consensuscapella.Withdrawal) + } + if err = s.Withdrawals[ii].UnmarshalSSZ(buf[ii*44 : (ii+1)*44]); err != nil { + return err + } + } + } + return err +} + +// UnmarshalSSZHeaderOnly ssz unmarshals the first 3 fields of the SubmitBlockRequest object +func (s *SubmitBlockRequest) UnmarshalSSZHeaderOnly(buf []byte) error { + var err error + size := uint64(len(buf)) + if size < 344 { + return ssz.ErrSize + } + + tail := buf + var o1, o3 uint64 + + // Field (0) 'Message' + if s.Message == nil { + s.Message = new(apiv1.BidTrace) + } + if err = s.Message.UnmarshalSSZ(buf[0:236]); err != nil { + return err + } + + // Offset (1) 'ExecutionPayloadHeader' + if o1 = ssz.ReadOffset(buf[236:240]); o1 > size { + return ssz.ErrOffset + } + + if o1 < 344 { + return ssz.ErrInvalidVariableOffset + } + + // Field (2) 'Signature' + copy(s.Signature[:], buf[240:336]) + + // Offset (3) 'Transactions' + if o3 = ssz.ReadOffset(buf[336:340]); o3 > size || o1 > o3 { + return ssz.ErrOffset + } + + // Field (1) 'ExecutionPayloadHeader' + { + buf = tail[o1:o3] + if s.ExecutionPayloadHeader == nil { + s.ExecutionPayloadHeader = new(consensuscapella.ExecutionPayloadHeader) + } + if err = s.ExecutionPayloadHeader.UnmarshalSSZ(buf); err != nil { + return err + } + } + return err +} + +// MarshalSSZTo ssz marshals the SubmitBlockRequest object to a target array +func (s *SubmitBlockRequest) MarshalSSZTo(buf []byte) (dst []byte, err error) { + dst = buf + offset := int(344) + + // Field (0) 'Message' + if s.Message == nil { + s.Message = new(apiv1.BidTrace) + } + if dst, err = s.Message.MarshalSSZTo(dst); err != nil { + return + } + + // Offset (1) 'ExecutionPayloadHeader' + dst = ssz.WriteOffset(dst, offset) + if s.ExecutionPayloadHeader == nil { + s.ExecutionPayloadHeader = new(consensuscapella.ExecutionPayloadHeader) + } + offset += s.ExecutionPayloadHeader.SizeSSZ() + + // Field (2) 'Signature' + dst = append(dst, s.Signature[:]...) + + // Offset (3) 'Transactions' + dst = ssz.WriteOffset(dst, offset) + for ii := 0; ii < len(s.Transactions); ii++ { + offset += 4 + offset += len(s.Transactions[ii]) + } + + // Offset (4) 'Withdrawals' + dst = ssz.WriteOffset(dst, offset) + + // Field (1) 'ExecutionPayloadHeader' + if dst, err = s.ExecutionPayloadHeader.MarshalSSZTo(dst); err != nil { + return + } + + // Field (3) 'Transactions' + if size := len(s.Transactions); size > 1073741824 { + err = ssz.ErrListTooBigFn("SubmitBlockRequest.Transactions", size, 1073741824) + return + } + { + offset = 4 * len(s.Transactions) + for ii := 0; ii < len(s.Transactions); ii++ { + dst = ssz.WriteOffset(dst, offset) + offset += len(s.Transactions[ii]) + } + } + for ii := 0; ii < len(s.Transactions); ii++ { + if size := len(s.Transactions[ii]); size > 1073741824 { + err = ssz.ErrBytesLengthFn("SubmitBlockRequest.Transactions[ii]", size, 1073741824) + return + } + dst = append(dst, s.Transactions[ii]...) + } + + // Field (4) 'Withdrawals' + if size := len(s.Withdrawals); size > 16 { + err = ssz.ErrListTooBigFn("SubmitBlockRequest.Withdrawals", size, 16) + return + } + for ii := 0; ii < len(s.Withdrawals); ii++ { + if dst, err = s.Withdrawals[ii].MarshalSSZTo(dst); err != nil { + return + } + } + return dst, nil +} + +// SizeSSZ returns the ssz encoded size in bytes for the SubmitBlockRequest object +func (s *SubmitBlockRequest) SizeSSZ() (size int) { + size = 344 + + // Field (1) 'ExecutionPayloadHeader' + if s.ExecutionPayloadHeader == nil { + s.ExecutionPayloadHeader = new(consensuscapella.ExecutionPayloadHeader) + } + size += s.ExecutionPayloadHeader.SizeSSZ() + + // Field (3) 'Transactions' + for ii := 0; ii < len(s.Transactions); ii++ { + size += 4 + size += len(s.Transactions[ii]) + } + + // Field (4) 'Withdrawals' + size += len(s.Withdrawals) * 44 + + return +} diff --git a/common/types_spec.go b/common/types_spec.go index e4adc9b2..45db6585 100644 --- a/common/types_spec.go +++ b/common/types_spec.go @@ -14,6 +14,7 @@ import ( utilcapella "github.com/attestantio/go-eth2-client/util/capella" "github.com/flashbots/go-boost-utils/bls" boostTypes "github.com/flashbots/go-boost-utils/types" + "github.com/holiman/uint256" ) var ( @@ -71,6 +72,34 @@ func BuildGetHeaderResponse(payload *BuilderSubmitBlockRequest, sk *bls.SecretKe return nil, ErrEmptyPayload } +func BuildGetHeaderResponseHeaderOnly(value *uint256.Int, header *consensuscapella.ExecutionPayloadHeader, sk *bls.SecretKey, pubkey *boostTypes.PublicKey, domain boostTypes.Domain) (*GetHeaderResponse, error) { + builderBid := capella.BuilderBid{ + Value: value, + Header: header, + Pubkey: *(*phase0.BLSPubKey)(pubkey), + } + + sig, err := boostTypes.SignMessage(&builderBid, domain, sk) + if err != nil { + return nil, err + } + + signedBuilderBid := &capella.SignedBuilderBid{ + Message: &builderBid, + Signature: phase0.BLSSignature(sig), + } + + return &GetHeaderResponse{ + Capella: &spec.VersionedSignedBuilderBid{ + Version: consensusspec.DataVersionCapella, + Capella: signedBuilderBid, + Bellatrix: nil, + }, + Bellatrix: nil, + }, nil + +} + func BuildGetPayloadResponse(payload *BuilderSubmitBlockRequest) (*GetPayloadResponse, error) { if payload.Bellatrix != nil { return &GetPayloadResponse{ diff --git a/datastore/redis.go b/datastore/redis.go index 3b18393a..0cca67bf 100644 --- a/datastore/redis.go +++ b/datastore/redis.go @@ -230,7 +230,7 @@ func (r *RedisCache) HSetObj(key, field string, value any, expiration time.Durat } func (r *RedisCache) GetValidatorRegistrationTimestamp(proposerPubkey boostTypes.PubkeyHex) (uint64, error) { - timestamp, err := r.client.HGet(context.Background(), r.keyValidatorRegistrationTimestamp, strings.ToLower(proposerPubkey.String())).Uint64() + timestamp, err := r.readonlyClient.HGet(context.Background(), r.keyValidatorRegistrationTimestamp, strings.ToLower(proposerPubkey.String())).Uint64() if errors.Is(err, redis.Nil) { return 0, nil } @@ -499,10 +499,12 @@ func (r *RedisCache) SaveBidAndUpdateTopBid(ctx context.Context, tx redis.Pipeli // // Time to save things in Redis // - // 1. Save the execution payload - err = r.SaveExecutionPayloadCapella(ctx, tx, payload.Slot(), payload.ProposerPubkey(), payload.BlockHash(), getPayloadResponse.Capella.Capella) - if err != nil { - return state, err + // 1. Save the execution payload (only if it was passed in). + if getPayloadResponse != nil { + err = r.SaveExecutionPayloadCapella(ctx, tx, payload.Slot(), payload.ProposerPubkey(), payload.BlockHash(), getPayloadResponse.Capella.Capella) + if err != nil { + return state, err + } } // Record time needed to save payload diff --git a/services/api/optimistic_test.go b/services/api/optimistic_test.go index 5774d2d9..57ddcb21 100644 --- a/services/api/optimistic_test.go +++ b/services/api/optimistic_test.go @@ -1,6 +1,7 @@ package api import ( + "bytes" "context" "encoding/json" "fmt" @@ -12,6 +13,7 @@ import ( "time" "github.com/alicebob/miniredis/v2" + builderCapella "github.com/attestantio/go-builder-client/api/capella" v1 "github.com/attestantio/go-builder-client/api/v1" "github.com/attestantio/go-eth2-client/spec/bellatrix" consensuscapella "github.com/attestantio/go-eth2-client/spec/capella" @@ -32,6 +34,7 @@ const ( builderID = "builder0x69" randao = "01234567890123456789012345678901" emptyHash = "0x0000000000000000000000000000000000000000000000000000000000000000" + emptyPubkey = "0x000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000" proposerInd = uint64(987) genesis = 1606824023 ) @@ -52,6 +55,33 @@ func getTestBidTrace(pubkey phase0.BLSPubKey, value uint64) *common.BidTraceV2 { } } +func getTestPayload(req *common.SubmitBlockRequest) *common.BuilderSubmitBlockRequest { + eph := req.ExecutionPayloadHeader + return &common.BuilderSubmitBlockRequest{ + Bellatrix: nil, + Capella: &builderCapella.SubmitBlockRequest{ + Message: req.Message, + // Transactions and Withdrawals are intentionally omitted. + ExecutionPayload: &consensuscapella.ExecutionPayload{ //nolint:exhaustruct + ParentHash: eph.ParentHash, + FeeRecipient: eph.FeeRecipient, + StateRoot: eph.StateRoot, + ReceiptsRoot: eph.ReceiptsRoot, + LogsBloom: eph.LogsBloom, + PrevRandao: eph.PrevRandao, + BlockNumber: eph.BlockNumber, + GasLimit: eph.GasLimit, + GasUsed: eph.GasUsed, + Timestamp: eph.Timestamp, + ExtraData: eph.ExtraData, + BaseFeePerGas: eph.BaseFeePerGas, + BlockHash: eph.BlockHash, + }, + Signature: req.Signature, + }, + } +} + type blockRequestOpts struct { pubkey phase0.BLSPubKey secretkey *bls.SecretKey @@ -145,9 +175,20 @@ func runOptimisticBlockSubmission(t *testing.T, opts blockRequestOpts, simErr er req := common.TestBuilderSubmitBlockRequest(opts.secretkey, getTestBidTrace(opts.pubkey, opts.blockValue)) rr := backend.request(http.MethodPost, pathSubmitNewBlock, &req) + return rr +} - // Let updates happen async. - time.Sleep(100 * time.Millisecond) +func runOptimisticBlockSubmissionV2(t *testing.T, opts blockRequestOpts, simErr error, backend *testBackend) *httptest.ResponseRecorder { + t.Helper() + backend.relay.blockSimRateLimiter = &MockBlockSimulationRateLimiter{ + simulationError: simErr, + } + + req := common.TestBuilderSubmitBlockRequestV2(opts.secretkey, getTestBidTrace(opts.pubkey, opts.blockValue)) + outBytes, err := req.MarshalSSZ() + require.NoError(t, err) + + rr := backend.requestBytes(http.MethodPost, pathSubmitNewBlockV2, outBytes, map[string]string{}) return rr } @@ -434,3 +475,121 @@ func TestInternalBuilderCollateral(t *testing.T) { require.Equal(t, resp.BuilderID, "builder0x69") require.Equal(t, resp.Collateral, "10000") } + +func TestBuilderApiSubmitNewBlockOptimisticV2(t *testing.T) { + testCases := []struct { + description string + httpCode uint64 + blockValue uint64 + }{ + { + description: "value_less_than_collateral", + httpCode: 200, // success + blockValue: collateral - 1, + }, + { + description: "value_more_than_collateral", + httpCode: 400, // failure + blockValue: collateral + 1, + }, + } + + for _, tc := range testCases { + t.Run(tc.description, func(t *testing.T) { + pubkey, secretkey, backend := startTestBackend(t) + backend.relay.optimisticSlot.Store(slot) + backend.relay.capellaEpoch = 1 + var randaoHash boostTypes.Hash + err := randaoHash.FromSlice([]byte(randao)) + require.NoError(t, err) + withRoot, err := ComputeWithdrawalsRoot([]*consensuscapella.Withdrawal{}) + require.NoError(t, err) + backend.relay.payloadAttributes[emptyHash] = payloadAttributesHelper{ + slot: slot, + withdrawalsRoot: withRoot, + payloadAttributes: beaconclient.PayloadAttributes{ + PrevRandao: randaoHash.String(), + }, + } + rr := runOptimisticBlockSubmissionV2(t, blockRequestOpts{ + secretkey: secretkey, + pubkey: *pubkey, + blockValue: tc.blockValue, + domain: backend.relay.opts.EthNetDetails.DomainBuilder, + }, nil, backend) + + // Check http code. + require.Equal(t, uint64(rr.Code), tc.httpCode) + }) + } +} + +func TestOptimisticV2SlowPath(t *testing.T) { + testCases := []struct { + description string + simErr error + demotion bool + }{ + { + description: "success", + simErr: nil, + demotion: false, + }, + { + description: "sim_error", + simErr: errFake, + demotion: true, + }, + } + + for _, tc := range testCases { + t.Run(tc.description, func(t *testing.T) { + pubkey, secretkey, backend := startTestBackend(t) + backend.relay.optimisticSlot.Store(slot) + + req := common.TestBuilderSubmitBlockRequestV2(secretkey, getTestBidTrace(*pubkey, 999)) + payload := getTestPayload(req) + outBytes, err := req.MarshalSSZ() + require.NoError(t, err) + r := bytes.NewReader(outBytes) + v2Opts := v2SlowPathOpts{ + header: req, + payload: payload, + entry: &blockBuilderCacheEntry{ + status: common.BuilderStatus{ + IsOptimistic: true, + }, + }, + pipeliner: backend.redis.NewTxPipeline(), + } + backend.relay.blockSimRateLimiter = &MockBlockSimulationRateLimiter{ + simulationError: tc.simErr, + } + backend.relay.optimisticV2SlowPath(r, v2Opts) + time.Sleep(100 * time.Millisecond) + // Check demotion status is set to expected. + mockDB, ok := backend.relay.db.(*database.MockDB) + require.True(t, ok) + require.Equal(t, mockDB.Demotions[pubkey.String()], tc.demotion) + }) + } +} + +func TestOptimisticV2MarshallError(t *testing.T) { + pubkey, secretkey, backend := startTestBackend(t) + backend.relay.optimisticSlot.Store(slot) + + req := common.TestBuilderSubmitBlockRequestV2(secretkey, getTestBidTrace(*pubkey, 999)) + payload := getTestPayload(req) + errBytes := make([]byte, req.SizeSSZ()) + r := bytes.NewReader(errBytes) + v2Opts := v2SlowPathOpts{ + header: req, + payload: payload, + } + backend.relay.optimisticV2SlowPath(r, v2Opts) + // Check demotion status is set to true. + mockDB, ok := backend.relay.db.(*database.MockDB) + require.True(t, ok) + require.True(t, mockDB.Demotions[pubkey.String()]) +} diff --git a/services/api/service.go b/services/api/service.go index dcbf54ce..21217101 100644 --- a/services/api/service.go +++ b/services/api/service.go @@ -23,6 +23,7 @@ import ( "github.com/NYTimes/gziphandler" builderCapella "github.com/attestantio/go-builder-client/api/capella" "github.com/attestantio/go-eth2-client/api/v1/capella" + capellaspec "github.com/attestantio/go-eth2-client/spec/capella" "github.com/attestantio/go-eth2-client/spec/phase0" "github.com/buger/jsonparser" "github.com/flashbots/go-boost-utils/bls" @@ -67,6 +68,7 @@ var ( // Block builder API pathBuilderGetValidators = "/relay/v1/builder/validators" pathSubmitNewBlock = "/relay/v1/builder/blocks" + pathSubmitNewBlockV2 = "/relay/v2/builder/blocks" // Data API pathDataProposerPayloadDelivered = "/relay/v1/data/bidtraces/proposer_payload_delivered" @@ -334,6 +336,7 @@ func (api *RelayAPI) getRouter() http.Handler { api.log.Info("block builder API enabled") r.HandleFunc(pathBuilderGetValidators, api.handleBuilderGetValidators).Methods(http.MethodGet) r.HandleFunc(pathSubmitNewBlock, api.handleSubmitNewBlock).Methods(http.MethodPost) + r.HandleFunc(pathSubmitNewBlockV2, api.handleSubmitNewBlockV2).Methods(http.MethodPost) } // Data API @@ -1934,6 +1937,426 @@ func (api *RelayAPI) handleSubmitNewBlock(w http.ResponseWriter, req *http.Reque w.WriteHeader(http.StatusOK) } +func (api *RelayAPI) handleSubmitNewBlockV2(w http.ResponseWriter, req *http.Request) { + var pf common.Profile + var prevTime, nextTime time.Time + + headSlot := api.headSlot.Load() + receivedAt := time.Now().UTC() + prevTime = receivedAt + + args := req.URL.Query() + isCancellationEnabled := args.Get("cancellations") == "1" + + log := api.log.WithFields(logrus.Fields{ + "method": "submitNewBlockV2", + "contentLength": req.ContentLength, + "headSlot": headSlot, + "cancellationEnabled": isCancellationEnabled, + "timestampRequestStart": receivedAt.UnixMilli(), + }) + + // Log at start and end of request + log.Info("request initiated") + defer func() { + log.WithFields(logrus.Fields{ + "timestampRequestFin": time.Now().UTC().UnixMilli(), + "requestDurationMs": time.Since(receivedAt).Milliseconds(), + }).Info("request finished") + }() + + // If cancellations are disabled but builder requested it, return error + if isCancellationEnabled && !api.ffEnableCancellations { + log.Info("builder submitted with cancellations enabled, but feature flag is disabled") + api.RespondError(w, http.StatusBadRequest, "cancellations are disabled") + return + } + + var err error + var r io.Reader = req.Body + if req.Header.Get("Content-Encoding") == "gzip" { + r, err = gzip.NewReader(req.Body) + if err != nil { + log.WithError(err).Warn("could not create gzip reader") + api.RespondError(w, http.StatusBadRequest, err.Error()) + return + } + log = log.WithField("gzip-req", true) + } + + var buf bytes.Buffer + rHeader := io.TeeReader(r, &buf) + + // Header at most 944 bytes. + headBuf := make([]byte, 944) + + // Read header bytes. + _, err = io.ReadFull(rHeader, headBuf) + if err != nil { + log.WithError(err).Warn("could not read full header") + api.RespondError(w, http.StatusBadRequest, err.Error()) + return + } + + // Unmarshall just header. + var header common.SubmitBlockRequest + err = header.UnmarshalSSZHeaderOnly(headBuf) + if err != nil { + log.WithError(err).Warn("could not unmarshall request") + api.RespondError(w, http.StatusBadRequest, err.Error()) + return + } + + nextTime = time.Now().UTC() + pf.Decode = uint64(nextTime.Sub(prevTime).Microseconds()) + prevTime = nextTime + + bid := header.Message + sig := header.Signature + eph := header.ExecutionPayloadHeader + + log = log.WithFields(logrus.Fields{ + "timestampAfterDecoding": time.Now().UTC().UnixMilli(), + "slot": bid.Slot, + "builderPubkey": bid.BuilderPubkey.String(), + "blockHash": bid.BlockHash.String(), + "proposerPubkey": bid.ProposerPubkey.String(), + "parentHash": bid.ParentHash.String(), + "value": bid.Value.String(), + }) + + ok, err := boostTypes.VerifySignature(bid, api.opts.EthNetDetails.DomainBuilder, bid.BuilderPubkey[:], sig[:]) + if !ok || err != nil { + log.WithError(err).Warn("could not verify builder signature") + api.RespondError(w, http.StatusBadRequest, "invalid signature") + return + } + + log.WithFields(logrus.Fields{ + "bid": bid, + "signature": sig, + "decode_time": pf.Decode, + }).Info("optimistically parsed bid and verified signature") + + // Check optimistic eligibility. + builderPubkey := bid.BuilderPubkey + builderEntry, ok := api.blockBuildersCache[builderPubkey.String()] + if !ok { + log.Errorf("unable to read builder: %x from the builder cache, rejecting submission.", builderPubkey.String()) + api.RespondError(w, http.StatusBadRequest, "unknown builder pubkey") + return + } + if !builderEntry.status.IsOptimistic { + log.Errorf("builder: %x not eligible for optimistic relaying.", builderPubkey.String()) + api.RespondError(w, http.StatusBadRequest, "builder not eligible for optimistic relaying") + return + } + if builderEntry.collateral.Cmp(bid.Value.ToBig()) <= 0 || bid.Slot != api.optimisticSlot.Load() { + log.Warningf("insufficient collateral or non-optimistic slot. reverting to standard relaying.") + api.RespondError(w, http.StatusBadRequest, "insufficient collateral, unable to execute v2 optimistic relaying") + // api.handleSubmitNewBlock(w, req) // TODO(mikeneuder): req is already partially consumed here. + return + } + log = log.WithFields(logrus.Fields{ + "builderEntry": builderEntry, + }) + + // Optimistic prechecks. + if bid.Slot <= headSlot { + api.log.Info("submitNewBlock failed: submission for past slot") + api.RespondError(w, http.StatusBadRequest, "submission for past slot") + return + } + + // Timestamp check + expectedTimestamp := api.genesisInfo.Data.GenesisTime + (bid.Slot * common.SecondsPerSlot) + if eph.Timestamp != expectedTimestamp { + log.Warnf("incorrect timestamp. got %d, expected %d", eph.Timestamp, expectedTimestamp) + api.RespondError(w, http.StatusBadRequest, fmt.Sprintf("incorrect timestamp. got %d, expected %d", eph.Timestamp, expectedTimestamp)) + return + } + + if builderEntry.status.IsBlacklisted { + log.Info("builder is blacklisted") + time.Sleep(200 * time.Millisecond) + w.WriteHeader(http.StatusOK) + return + } + + // In case only high-prio requests are accepted, fail others + if api.ffDisableLowPrioBuilders && !builderEntry.status.IsHighPrio { + log.Info("rejecting low-prio builder (ff-disable-low-prio-builders)") + time.Sleep(200 * time.Millisecond) + w.WriteHeader(http.StatusOK) + return + } + + log = log.WithField("timestampAfterChecks1", time.Now().UTC().UnixMilli()) + + // ensure correct feeRecipient is used + api.proposerDutiesLock.RLock() + slotDuty := api.proposerDutiesMap[bid.Slot] + api.proposerDutiesLock.RUnlock() + if slotDuty == nil { + log.Warn("could not find slot duty") + api.RespondError(w, http.StatusBadRequest, "could not find slot duty") + return + } else if !strings.EqualFold(slotDuty.Entry.Message.FeeRecipient.String(), bid.ProposerFeeRecipient.String()) { + log.Info("fee recipient does not match") + api.RespondError(w, http.StatusBadRequest, "fee recipient does not match") + return + } + + log = log.WithField("timestampBeforeAttributesCheck", time.Now().UTC().UnixMilli()) + + api.payloadAttributesLock.RLock() + attrs, ok := api.payloadAttributes[bid.ParentHash.String()] + api.payloadAttributesLock.RUnlock() + if !ok || bid.Slot != attrs.slot { + log.Warn("payload attributes not (yet) known") + api.RespondError(w, http.StatusBadRequest, "payload attributes not (yet) known") + return + } + + randao := fmt.Sprintf("0x%x", eph.PrevRandao) + if randao != attrs.payloadAttributes.PrevRandao { + msg := fmt.Sprintf("incorrect prev_randao - got: %s, expected: %s", randao, attrs.payloadAttributes.PrevRandao) + log.Info(msg) + api.RespondError(w, http.StatusBadRequest, msg) + return + } + + if eph.WithdrawalsRoot != attrs.withdrawalsRoot { + msg := fmt.Sprintf("incorrect withdrawals root - got: %s, expected: %s", eph.WithdrawalsRoot.String(), attrs.withdrawalsRoot.String()) + log.Info(msg) + api.RespondError(w, http.StatusBadRequest, msg) + return + } + + // Create the redis pipeline tx + tx := api.redis.NewTxPipeline() + + // Reject new submissions once the payload for this slot was delivered - TODO: store in memory as well + slotLastPayloadDelivered, err := api.redis.GetLastSlotDelivered(req.Context(), tx) + if err != nil && !errors.Is(err, redis.Nil) { + log.WithError(err).Error("failed to get delivered payload slot from redis") + } else if bid.Slot <= slotLastPayloadDelivered { + log.Info("rejecting submission because payload for this slot was already delivered") + api.RespondError(w, http.StatusBadRequest, "payload for this slot was already delivered") + return + } + + nextTime = time.Now().UTC() + pf.Prechecks = uint64(nextTime.Sub(prevTime).Microseconds()) + + payload := &common.BuilderSubmitBlockRequest{ + Bellatrix: nil, + Capella: &builderCapella.SubmitBlockRequest{ + Message: header.Message, + // Transactions and Withdrawals are intentionally omitted. + ExecutionPayload: &capellaspec.ExecutionPayload{ //nolint:exhaustruct + ParentHash: eph.ParentHash, + FeeRecipient: eph.FeeRecipient, + StateRoot: eph.StateRoot, + ReceiptsRoot: eph.ReceiptsRoot, + LogsBloom: eph.LogsBloom, + PrevRandao: eph.PrevRandao, + BlockNumber: eph.BlockNumber, + GasLimit: eph.GasLimit, + GasUsed: eph.GasUsed, + Timestamp: eph.Timestamp, + ExtraData: eph.ExtraData, + BaseFeePerGas: eph.BaseFeePerGas, + BlockHash: eph.BlockHash, + }, + Signature: header.Signature, + }, + } + + // Prepare the response data + getHeaderResponse, err := common.BuildGetHeaderResponseHeaderOnly(header.Message.Value, eph, api.blsSk, api.publicKey, api.opts.EthNetDetails.DomainBuilder) + if err != nil { + log.WithError(err).Error("could not sign builder bid") + api.RespondError(w, http.StatusBadRequest, err.Error()) + return + } + + bidTrace := &common.BidTraceV2{ + BidTrace: *header.Message, + BlockNumber: header.ExecutionPayloadHeader.BlockNumber, + } + + + Save to Redis + + updateBidResult, err := api.redis.SaveBidAndUpdateTopBid(context.Background(), tx, bidTrace, payload, nil, getHeaderResponse, receivedAt, isCancellationEnabled, nil) + if err != nil { + log.WithError(err).Error("could not save bid and update top bids") + api.RespondError(w, http.StatusInternalServerError, "failed saving and updating bid") + return + } + + Add fields to logs + log = log.WithFields(logrus.Fields{ + "timestampAfterBidUpdate": time.Now().UTC().UnixMilli(), + "wasBidSavedInRedis": updateBidResult.WasBidSaved, + "wasTopBidUpdated": updateBidResult.WasTopBidUpdated, + "topBidValue": updateBidResult.TopBidValue, + "prevTopBidValue": updateBidResult.PrevTopBidValue, + "profileRedisSavePayloadUs": updateBidResult.TimeSavePayload.Microseconds(), + "profileRedisUpdateTopBidUs": updateBidResult.TimeUpdateTopBid.Microseconds(), + "profileRedisUpdateFloorUs": updateBidResult.TimeUpdateFloor.Microseconds(), + }) + + eligibleAt := time.Now().UTC() + + // Read all remaining bytes into the tee reader + remainder, err := io.ReadAll(r) + if err != nil { + demotionErr := fmt.Errorf("%w: could not read full message", err) + api.demoteBuilder(payload.BuilderPubkey().String(), payload, demotionErr) + log.WithError(err).Warn("could not read full message") + return + } + remainderReader := bytes.NewReader(remainder) + + // Join the header bytes with the remaining bytes. + go api.optimisticV2SlowPath(io.MultiReader(&buf, remainderReader), v2SlowPathOpts{ + header: &header, + payload: payload, + receivedAt: receivedAt, + eligibleAt: eligibleAt, + pf: pf, + isCancellationEnabled: isCancellationEnabled, + entry: builderEntry, + gasLimit: slotDuty.Entry.Message.GasLimit, + pipeliner: tx, + }) + + log.WithFields(logrus.Fields{ + "value": bid.Value.String(), + "profile": pf.String(), + }).Info("saving v2 optimistic bid from builder") + w.WriteHeader(http.StatusOK) +} + +type v2SlowPathOpts struct { + header *common.SubmitBlockRequest + payload *common.BuilderSubmitBlockRequest + receivedAt time.Time + eligibleAt time.Time + pf common.Profile + isCancellationEnabled bool + entry *blockBuilderCacheEntry + gasLimit uint64 + pipeliner redis.Pipeliner +} + +func (api *RelayAPI) optimisticV2SlowPath(r io.Reader, v2Opts v2SlowPathOpts) { + log := api.log.WithFields(logrus.Fields{"method": "optimisticV2SlowPath"}) + + payload := v2Opts.payload + msg, err := io.ReadAll(r) + if err != nil { + demotionErr := fmt.Errorf("%w: could not read full message", err) + api.demoteBuilder(payload.BuilderPubkey().String(), payload, demotionErr) + log.WithError(err).Warn("could not read full message") + return + } + + // Unmarshall full request. + var req common.SubmitBlockRequest + err = req.UnmarshalSSZ(msg) + if err != nil { + demotionErr := fmt.Errorf("%w: could not unmarshall full request", err) + api.demoteBuilder(payload.BuilderPubkey().String(), payload, demotionErr) + log.WithError(err).Warn("could not unmarshall full request") + return + } + + // Fill in txns and withdrawals. + payload.Capella.ExecutionPayload.Transactions = req.Transactions + payload.Capella.ExecutionPayload.Withdrawals = req.Withdrawals + + getPayloadResponse, err := common.BuildGetPayloadResponse(payload) + if err != nil { + demotionErr := fmt.Errorf("%w: could not construct getPayloadResponse", err) + api.demoteBuilder(payload.BuilderPubkey().String(), payload, demotionErr) + log.WithError(err).Warn("could not construct getPayloadResponse") + return + } + + // Create the redis pipeline tx + tx := api.redis.NewTxPipeline() + + // Save payload. + err = api.redis.SaveExecutionPayloadCapella(context.Background(), tx, payload.Slot(), payload.ProposerPubkey(), payload.BlockHash(), getPayloadResponse.Capella.Capella) + if err != nil { + demotionErr := fmt.Errorf("%w: could not save execution payload", err) + api.demoteBuilder(payload.BuilderPubkey().String(), payload, demotionErr) + log.WithError(err).Warn("could not save execution payload") + return + } + + currentTime := time.Now().UTC() + log.WithFields(logrus.Fields{ + "timeStampExecutionPayloadSaved": currentTime.UnixMilli(), + "timeSinceReceivedAt": v2Opts.receivedAt.Sub(currentTime).Milliseconds(), + "timeSinceEligibleAt": v2Opts.eligibleAt.Sub(currentTime).Milliseconds(), + }).Info("v2 execution payload saved") + + // Used to communicate simulation result to the deferred function. + simResultC := make(chan *blockSimResult, 1) + + // Save the builder submission to the database whenever this function ends + defer func() { + savePayloadToDatabase := !api.ffDisablePayloadDBStorage + var simResult *blockSimResult + select { + case simResult = <-simResultC: + case <-time.After(10 * time.Second): + log.Warn("timed out waiting for simulation result") + simResult = &blockSimResult{false, false, nil, nil} + } + + submissionEntry, err := api.db.SaveBuilderBlockSubmission(payload, simResult.requestErr, simResult.validationErr, v2Opts.receivedAt, v2Opts.eligibleAt, simResult.wasSimulated, savePayloadToDatabase, v2Opts.pf, simResult.optimisticSubmission) + if err != nil { + log.WithError(err).WithField("payload", payload).Error("saving builder block submission to database failed") + return + } + + err = api.db.UpsertBlockBuilderEntryAfterSubmission(submissionEntry, simResult.validationErr != nil) + if err != nil { + log.WithError(err).Error("failed to upsert block-builder-entry") + } + }() + + // Simulate the block submission and save to db + timeBeforeValidation := time.Now().UTC() + + log = log.WithFields(logrus.Fields{ + "timestampBeforeValidation": timeBeforeValidation.UTC().UnixMilli(), + }) + + // Construct simulation request. + opts := blockSimOptions{ + isHighPrio: v2Opts.entry.status.IsHighPrio, + log: log, + builder: v2Opts.entry, + req: &common.BuilderBlockValidationRequest{ + BuilderSubmitBlockRequest: *payload, + RegisteredGasLimit: v2Opts.gasLimit, + }, + } + go api.processOptimisticBlock(opts, simResultC) + + nextTime := time.Now().UTC() + v2Opts.pf.Simulation = uint64(nextTime.Sub(v2Opts.eligibleAt).Microseconds()) + + // All done + log.Info("received v2 block from builder") +} + // --------------- // // INTERNAL APIS From c7e75c1d2ad30ad16912dc3b7f48d205130c763f Mon Sep 17 00:00:00 2001 From: Michael Neuder Date: Wed, 14 Jun 2023 20:50:01 -0400 Subject: [PATCH 2/2] fix comment --- services/api/service.go | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/services/api/service.go b/services/api/service.go index 21217101..66778333 100644 --- a/services/api/service.go +++ b/services/api/service.go @@ -2186,9 +2186,9 @@ func (api *RelayAPI) handleSubmitNewBlockV2(w http.ResponseWriter, req *http.Req BlockNumber: header.ExecutionPayloadHeader.BlockNumber, } - - Save to Redis - + // + // Save to Redis + // updateBidResult, err := api.redis.SaveBidAndUpdateTopBid(context.Background(), tx, bidTrace, payload, nil, getHeaderResponse, receivedAt, isCancellationEnabled, nil) if err != nil { log.WithError(err).Error("could not save bid and update top bids") @@ -2196,7 +2196,7 @@ func (api *RelayAPI) handleSubmitNewBlockV2(w http.ResponseWriter, req *http.Req return } - Add fields to logs + // Add fields to logs log = log.WithFields(logrus.Fields{ "timestampAfterBidUpdate": time.Now().UTC().UnixMilli(), "wasBidSavedInRedis": updateBidResult.WasBidSaved,