Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add optimistic header v2 endpoint #618

Open
wants to merge 1 commit into
base: deneb-optimistic
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 21 additions & 3 deletions common/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@ var (

SlotsPerEpoch = uint64(cli.GetEnvInt("SLOTS_PER_EPOCH", 32))
DurationPerEpoch = DurationPerSlot * time.Duration(SlotsPerEpoch)

EmptyTxRoot = "0x7ffe241ea60187fdb0187bfa22de35d1f9bed7ab061d9401fd47e34a54fbede1"
)

func SlotToEpoch(slot uint64) uint64 {
Expand All @@ -38,10 +40,10 @@ type BuilderStatus struct {
IsOptimistic bool
}

// Profile captures performance metrics for the block submission handler. Each
// BlockSubmissionProfile captures performance metrics for the block submission handler. Each
// field corresponds to the number of microseconds in each stage. The `Total`
// field is the number of microseconds taken for entire flow.
type Profile struct {
type BlockSubmissionProfile struct {
PayloadLoad uint64
Decode uint64
Prechecks uint64
Expand All @@ -50,6 +52,22 @@ type Profile struct {
Total uint64
}

func (p *Profile) String() string {
func (p *BlockSubmissionProfile) String() string {
return fmt.Sprintf("%v,%v,%v,%v,%v", p.Decode, p.Prechecks, p.Simulation, p.RedisUpdate, p.Total)
}

// HeaderSubmissionProfile captures performance metrics for the header submission handler. Each
// field corresponds to the number of microseconds at the start of each stage.
type HeaderSubmissionProfile struct {
PayloadLoad uint64
Decode uint64
Prechecks uint64
Signature uint64
RedisChecks uint64
RedisUpdate uint64
Total uint64
}

func (p *HeaderSubmissionProfile) String() string {
return fmt.Sprintf("%v,%v,%v,%v,%v,%v,%v", p.PayloadLoad, p.Decode, p.Prechecks, p.Signature, p.RedisChecks, p.RedisUpdate, p.Total)
}
5 changes: 5 additions & 0 deletions common/test_utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -186,9 +186,14 @@ func CreateTestBlockSubmission(t *testing.T, builderPubkey string, value *uint25
Message: bidTrace,
ExecutionPayload: &deneb.ExecutionPayload{ //nolint:exhaustruct
BaseFeePerGas: uint256.NewInt(0),
ExtraData: make([]byte, 32),
Transactions: make([]bellatrix.Transaction, 0),
Withdrawals: make([]*capella.Withdrawal, 0),
},
BlobsBundle: &builderApiDeneb.BlobsBundle{ //nolint:exhaustruct
Commitments: make([]deneb.KZGCommitment, 0),
Proofs: make([]deneb.KZGProof, 0),
Blobs: make([]deneb.Blob, 0),
},
Signature: phase0.BLSSignature{},
},
Expand Down
4 changes: 2 additions & 2 deletions database/database.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ type IDatabaseService interface {
GetValidatorRegistration(pubkey string) (*ValidatorRegistrationEntry, error)
GetValidatorRegistrationsForPubkeys(pubkeys []string) ([]*ValidatorRegistrationEntry, error)

SaveBuilderBlockSubmission(payload *common.VersionedSubmitBlockRequest, requestError, validationError error, receivedAt, eligibleAt time.Time, wasSimulated, saveExecPayload bool, profile common.Profile, optimisticSubmission bool) (entry *BuilderBlockSubmissionEntry, err error)
SaveBuilderBlockSubmission(payload *common.VersionedSubmitBlockRequest, requestError, validationError error, receivedAt, eligibleAt time.Time, wasSimulated, saveExecPayload bool, profile common.BlockSubmissionProfile, optimisticSubmission bool) (entry *BuilderBlockSubmissionEntry, err error)
GetBlockSubmissionEntry(slot uint64, proposerPubkey, blockHash string) (entry *BuilderBlockSubmissionEntry, err error)
GetBuilderSubmissions(filters GetBuilderSubmissionsFilters) ([]*BuilderBlockSubmissionEntry, error)
GetBuilderSubmissionsBySlots(slotFrom, slotTo uint64) (entries []*BuilderBlockSubmissionEntry, err error)
Expand Down Expand Up @@ -175,7 +175,7 @@ func (s *DatabaseService) GetLatestValidatorRegistrations(timestampOnly bool) ([
return registrations, err
}

func (s *DatabaseService) SaveBuilderBlockSubmission(payload *common.VersionedSubmitBlockRequest, requestError, validationError error, receivedAt, eligibleAt time.Time, wasSimulated, saveExecPayload bool, profile common.Profile, optimisticSubmission bool) (entry *BuilderBlockSubmissionEntry, err error) {
func (s *DatabaseService) SaveBuilderBlockSubmission(payload *common.VersionedSubmitBlockRequest, requestError, validationError error, receivedAt, eligibleAt time.Time, wasSimulated, saveExecPayload bool, profile common.BlockSubmissionProfile, optimisticSubmission bool) (entry *BuilderBlockSubmissionEntry, err error) {
// Save execution_payload: insert, or if already exists update to be able to return the id ('on conflict do nothing' doesn't return an id)
execPayloadEntry, err := PayloadToExecPayloadEntry(payload)
if err != nil {
Expand Down
2 changes: 1 addition & 1 deletion database/database_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ var (
feeRecipient = bellatrix.ExecutionAddress{0x02}
blockHashStr = "0xa645370cc112c2e8e3cce121416c7dc849e773506d4b6fb9b752ada711355369"
testDBDSN = common.GetEnv("TEST_DB_DSN", "postgres://postgres:postgres@localhost:5432/postgres?sslmode=disable")
profile = common.Profile{
profile = common.BlockSubmissionProfile{
Decode: 42,
Prechecks: 43,
Simulation: 44,
Expand Down
2 changes: 1 addition & 1 deletion database/mockdb.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ func (db MockDB) GetLatestValidatorRegistrations(timestampOnly bool) ([]*Validat
return nil, nil
}

func (db MockDB) SaveBuilderBlockSubmission(payload *common.VersionedSubmitBlockRequest, requestError, validationError error, receivedAt, eligibleAt time.Time, wasSimulated, saveExecPayload bool, profile common.Profile, optimisticSubmission bool) (entry *BuilderBlockSubmissionEntry, err error) {
func (db MockDB) SaveBuilderBlockSubmission(payload *common.VersionedSubmitBlockRequest, requestError, validationError error, receivedAt, eligibleAt time.Time, wasSimulated, saveExecPayload bool, profile common.BlockSubmissionProfile, optimisticSubmission bool) (entry *BuilderBlockSubmissionEntry, err error) {
return nil, nil
}

Expand Down
104 changes: 56 additions & 48 deletions datastore/redis.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (

builderApi "github.com/attestantio/go-builder-client/api"
builderApiDeneb "github.com/attestantio/go-builder-client/api/deneb"
builderApiV1 "github.com/attestantio/go-builder-client/api/v1"
builderSpec "github.com/attestantio/go-builder-client/spec"
"github.com/attestantio/go-eth2-client/spec"
"github.com/attestantio/go-eth2-client/spec/capella"
Expand Down Expand Up @@ -496,31 +497,26 @@ type SaveBidAndUpdateTopBidResponse struct {
PrevTopBidValue *big.Int

TimePrep time.Duration
TimeSavePayload time.Duration
TimeSaveBid time.Duration
TimeSaveTrace time.Duration
TimeSavePayload time.Duration
TimeUpdateTopBid time.Duration
TimeUpdateFloor time.Duration
}

func (r *RedisCache) SaveBidAndUpdateTopBid(ctx context.Context, pipeliner redis.Pipeliner, trace *common.BidTraceV2WithBlobFields, payload *common.VersionedSubmitBlockRequest, getPayloadResponse *builderApi.VersionedSubmitBlindedBlockResponse, getHeaderResponse *builderSpec.VersionedSignedBuilderBid, reqReceivedAt time.Time, isCancellationEnabled bool, floorValue *big.Int) (state SaveBidAndUpdateTopBidResponse, err error) {
func (r *RedisCache) SaveBidAndUpdateTopBid(ctx context.Context, pipeliner redis.Pipeliner, trace *builderApiV1.BidTrace, blockSubmission *common.BlockSubmissionInfo, getPayloadResponse *builderApi.VersionedSubmitBlindedBlockResponse, getHeaderResponse *builderSpec.VersionedSignedBuilderBid, reqReceivedAt time.Time, isCancellationEnabled bool, floorValue *big.Int) (state SaveBidAndUpdateTopBidResponse, err error) {
var prevTime, nextTime time.Time
prevTime = time.Now()

submission, err := common.GetBlockSubmissionInfo(payload)
if err != nil {
return state, err
}

// Load latest bids for a given slot+parent+proposer
builderBids, err := NewBuilderBidsFromRedis(ctx, r, pipeliner, submission.BidTrace.Slot, submission.BidTrace.ParentHash.String(), submission.BidTrace.ProposerPubkey.String())
builderBids, err := NewBuilderBidsFromRedis(ctx, r, pipeliner, trace.Slot, trace.ParentHash.String(), trace.ProposerPubkey.String())
if err != nil {
return state, err
}

// Load floor value (if not passed in already)
if floorValue == nil {
floorValue, err = r.GetFloorBidValue(ctx, pipeliner, submission.BidTrace.Slot, submission.BidTrace.ParentHash.String(), submission.BidTrace.ProposerPubkey.String())
floorValue, err = r.GetFloorBidValue(ctx, pipeliner, trace.Slot, trace.ParentHash.String(), trace.ProposerPubkey.String())
if err != nil {
return state, err
}
Expand All @@ -534,7 +530,7 @@ func (r *RedisCache) SaveBidAndUpdateTopBid(ctx context.Context, pipeliner redis
state.PrevTopBidValue = state.TopBidValue

// Abort now if non-cancellation bid is lower than floor value
isBidAboveFloor := submission.BidTrace.Value.ToBig().Cmp(floorValue) == 1
isBidAboveFloor := trace.Value.ToBig().Cmp(floorValue) == 1
if !isCancellationEnabled && !isBidAboveFloor {
return state, nil
}
Expand All @@ -547,61 +543,73 @@ func (r *RedisCache) SaveBidAndUpdateTopBid(ctx context.Context, pipeliner redis
//
// Time to save things in Redis
//
// 1. Save the execution payload
switch payload.Version {
case spec.DataVersionCapella:
err = r.SaveExecutionPayloadCapella(ctx, pipeliner, submission.BidTrace.Slot, submission.BidTrace.ProposerPubkey.String(), submission.BidTrace.BlockHash.String(), getPayloadResponse.Capella)
if err != nil {
return state, err
}
case spec.DataVersionDeneb:
err = r.SavePayloadContentsDeneb(ctx, pipeliner, submission.BidTrace.Slot, submission.BidTrace.ProposerPubkey.String(), submission.BidTrace.BlockHash.String(), getPayloadResponse.Deneb)
if err != nil {
return state, err
}
case spec.DataVersionUnknown, spec.DataVersionPhase0, spec.DataVersionAltair, spec.DataVersionBellatrix:
return state, fmt.Errorf("unsupported payload version: %s", payload.Version) //nolint:goerr113
}

// Record time needed to save payload
nextTime = time.Now().UTC()
state.TimeSavePayload = nextTime.Sub(prevTime)
prevTime = nextTime

// 2. Save latest bid for this builder
err = r.SaveBuilderBid(ctx, pipeliner, submission.BidTrace.Slot, submission.BidTrace.ParentHash.String(), submission.BidTrace.ProposerPubkey.String(), submission.BidTrace.BuilderPubkey.String(), reqReceivedAt, getHeaderResponse)
// 1. Save latest bid for this builder
err = r.SaveBuilderBid(ctx, pipeliner, trace.Slot, trace.ParentHash.String(), trace.ProposerPubkey.String(), trace.BuilderPubkey.String(), reqReceivedAt, getHeaderResponse)
if err != nil {
return state, err
}
builderBids.bidValues[submission.BidTrace.BuilderPubkey.String()] = submission.BidTrace.Value.ToBig()
builderBids.bidValues[trace.BuilderPubkey.String()] = trace.Value.ToBig()

// Record time needed to save bid
nextTime = time.Now().UTC()
state.TimeSaveBid = nextTime.Sub(prevTime)
prevTime = nextTime

// 3. Save the bid trace
err = r.SaveBidTrace(ctx, pipeliner, trace)
if err != nil {
return state, err
// 2. Save the bid trace
if blockSubmission != nil {
bidTrace := common.BidTraceV2WithBlobFields{
BidTrace: *trace,
BlockNumber: blockSubmission.BlockNumber,
NumTx: uint64(len(blockSubmission.Transactions)),
NumBlobs: uint64(len(blockSubmission.Blobs)),
BlobGasUsed: blockSubmission.BlobGasUsed,
ExcessBlobGas: blockSubmission.ExcessBlobGas,
}
err = r.SaveBidTrace(ctx, pipeliner, &bidTrace)
if err != nil {
return state, err
}

// Record time needed to save trace
nextTime = time.Now().UTC()
state.TimeSaveTrace = nextTime.Sub(prevTime)
prevTime = nextTime
}

// Record time needed to save trace
nextTime = time.Now().UTC()
state.TimeSaveTrace = nextTime.Sub(prevTime)
prevTime = nextTime
// 3. Save the execution payload
if getPayloadResponse != nil {
switch getPayloadResponse.Version {
case spec.DataVersionCapella:
err = r.SaveExecutionPayloadCapella(ctx, pipeliner, trace.Slot, trace.ProposerPubkey.String(), trace.BlockHash.String(), getPayloadResponse.Capella)
if err != nil {
return state, err
}
case spec.DataVersionDeneb:
err = r.SavePayloadContentsDeneb(ctx, pipeliner, trace.Slot, trace.ProposerPubkey.String(), trace.BlockHash.String(), getPayloadResponse.Deneb)
if err != nil {
return state, err
}
case spec.DataVersionUnknown, spec.DataVersionPhase0, spec.DataVersionAltair, spec.DataVersionBellatrix:
return state, fmt.Errorf("unsupported payload version: %s", getPayloadResponse.Version) //nolint:goerr113
}

// Record time needed to save payload
nextTime = time.Now().UTC()
state.TimeSavePayload = nextTime.Sub(prevTime)
prevTime = nextTime
}

// If top bid value hasn't change, abort now
_, state.TopBidValue = builderBids.getTopBid()
if state.TopBidValue.Cmp(state.PrevTopBidValue) == 0 {
return state, nil
}

state, err = r._updateTopBid(ctx, pipeliner, state, builderBids, submission.BidTrace.Slot, submission.BidTrace.ParentHash.String(), submission.BidTrace.ProposerPubkey.String(), floorValue)
state, err = r._updateTopBid(ctx, pipeliner, state, builderBids, trace.Slot, trace.ParentHash.String(), trace.ProposerPubkey.String(), floorValue)
if err != nil {
return state, err
}
state.IsNewTopBid = submission.BidTrace.Value.ToBig().Cmp(state.TopBidValue) == 0
state.IsNewTopBid = trace.Value.ToBig().Cmp(state.TopBidValue) == 0
// An Exec happens in _updateTopBid.
state.WasBidSaved = true

Expand All @@ -615,8 +623,8 @@ func (r *RedisCache) SaveBidAndUpdateTopBid(ctx context.Context, pipeliner redis
}

// Non-cancellable bid above floor should set new floor
keyBidSource := r.keyLatestBidByBuilder(submission.BidTrace.Slot, submission.BidTrace.ParentHash.String(), submission.BidTrace.ProposerPubkey.String(), submission.BidTrace.BuilderPubkey.String())
keyFloorBid := r.keyFloorBid(submission.BidTrace.Slot, submission.BidTrace.ParentHash.String(), submission.BidTrace.ProposerPubkey.String())
keyBidSource := r.keyLatestBidByBuilder(trace.Slot, trace.ParentHash.String(), trace.ProposerPubkey.String(), trace.BuilderPubkey.String())
keyFloorBid := r.keyFloorBid(trace.Slot, trace.ParentHash.String(), trace.ProposerPubkey.String())
c := pipeliner.Copy(ctx, keyBidSource, keyFloorBid, 0, true)
_, err = pipeliner.Exec(ctx)
if err != nil {
Expand All @@ -634,8 +642,8 @@ func (r *RedisCache) SaveBidAndUpdateTopBid(ctx context.Context, pipeliner redis
return state, err
}

keyFloorBidValue := r.keyFloorBidValue(submission.BidTrace.Slot, submission.BidTrace.ParentHash.String(), submission.BidTrace.ProposerPubkey.String())
err = pipeliner.Set(ctx, keyFloorBidValue, submission.BidTrace.Value.Dec(), expiryBidCache).Err()
keyFloorBidValue := r.keyFloorBidValue(trace.Slot, trace.ParentHash.String(), trace.ProposerPubkey.String())
err = pipeliner.Set(ctx, keyFloorBidValue, trace.Value.Dec(), expiryBidCache).Err()
if err != nil {
return state, err
}
Expand Down
Loading
Loading