Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Refactor types & consensus core #66

Open
wants to merge 10 commits into
base: dev
Choose a base branch
from
251 changes: 165 additions & 86 deletions consensus/consensus.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@ package consensus
// uses common for datatypes
import (
"bytes"
"context"
"encoding/hex"
"fmt"
"log"
Expand Down Expand Up @@ -181,48 +180,72 @@ func (con ConsensusClient) Expected_current_slot() uint64 {
}

func sync_fallback(inner *Inner, fallback *string) error {
cf, err := (&checkpoints.CheckpointFallback{}).FetchLatestCheckpointFromApi(*fallback)
if err != nil {
return errors.Wrap(err, "failed to fetch checkpoint from API")
}
return inner.sync(cf)
// Create a buffered channel to receive any errors from the goroutine
errorChan := make(chan error, 1)

go func() {
// Attempt to fetch the latest checkpoint from the API
cf, err := (&checkpoints.CheckpointFallback{}).FetchLatestCheckpointFromApi(*fallback)
if err != nil {

errorChan <- err
return
}

if err := inner.sync(cf); err != nil {

errorChan <- err
return
}

errorChan <- nil
}()

return <-errorChan
}

func sync_all_fallback(inner *Inner, chainID uint64) error {
var n config.Network
network, err := n.ChainID(chainID)
if err != nil {
return err
}
errorChan := make(chan error, 1)

ch := checkpoints.CheckpointFallback{}
go func() {

checkpointFallback, errWhileCheckpoint := ch.Build()
if errWhileCheckpoint != nil {
return err
}
ch := checkpoints.CheckpointFallback{}

chainId := network.Chain.ChainID
var networkName config.Network
if chainId == 1 {
networkName = config.MAINNET
} else if chainId == 5 {
networkName = config.GOERLI
} else if chainId == 11155111 {
networkName = config.SEPOLIA
} else {
return errors.New("chain id not recognized")
}
checkpointFallback, errWhileCheckpoint := ch.Build()
if errWhileCheckpoint != nil {
errorChan <- errWhileCheckpoint
return
}

// Fetch the latest checkpoint from the network
checkpoint := checkpointFallback.FetchLatestCheckpoint(networkName)
chainId := network.Chain.ChainID
var networkName config.Network
if chainId == 1 {
networkName = config.MAINNET
} else if chainId == 5 {
networkName = config.GOERLI
} else if chainId == 11155111 {
networkName = config.SEPOLIA
} else {
errorChan <- errors.New("chain id not recognized")
return
}

// Sync using the inner struct's sync method
if err := inner.sync(checkpoint); err != nil {
return err
}
// Fetch the latest checkpoint from the network
checkpoint := checkpointFallback.FetchLatestCheckpoint(networkName)

return nil
// Sync using the inner struct's sync method
if err := inner.sync(checkpoint); err != nil {
errorChan <- err
}

errorChan <- nil
}()
return <-errorChan
}

func (in *Inner) New(rpcURL string, blockSend chan common.Block, finalizedBlockSend chan *common.Block, checkpointSend chan *[]byte, config *config.Config) *Inner {
Expand All @@ -240,21 +263,40 @@ func (in *Inner) New(rpcURL string, blockSend chan common.Block, finalizedBlockS

}
func (in *Inner) Check_rpc() error {
chainID, err := in.RPC.ChainId()
if err != nil {
return err
}
if chainID != in.Config.Chain.ChainID {
return ErrIncorrectRpcNetwork
}
return nil
errorChan := make(chan error, 1)

go func() {
chainID, err := in.RPC.ChainId()
if err != nil {
errorChan <- err
return
}
if chainID != in.Config.Chain.ChainID {
errorChan <- ErrIncorrectRpcNetwork
return
}
errorChan <- nil
}()
return <-errorChan
}
func (in *Inner) get_execution_payload(ctx context.Context, slot *uint64) (*consensus_core.ExecutionPayload, error) {
block, err := in.RPC.GetBlock(*slot)
if err != nil {
func (in *Inner) get_execution_payload(slot *uint64) (*consensus_core.ExecutionPayload, error) {
errorChan := make(chan error, 1)
blockChan := make(chan consensus_core.BeaconBlock, 1)
go func() {
var err error
block, err := in.RPC.GetBlock(*slot)
if err != nil {
errorChan <- err
}
errorChan <- nil
blockChan <- block
}()

if err := <-errorChan; err != nil {
return nil, err
}

block := <-blockChan
blockHash, err := utils.TreeHashRoot(block.Body.ToBytes())
if err != nil {
return nil, err
Expand Down Expand Up @@ -288,7 +330,7 @@ func (in *Inner) get_execution_payload(ctx context.Context, slot *uint64) (*cons
return &payload, nil
}

func (in *Inner) Get_payloads(ctx context.Context, startSlot, endSlot uint64) ([]interface{}, error) {
func (in *Inner) Get_payloads(startSlot, endSlot uint64) ([]interface{}, error) {
var payloads []interface{}

// Fetch the block at endSlot to get the initial parent hash
Expand Down Expand Up @@ -345,11 +387,22 @@ func (in *Inner) Get_payloads(ctx context.Context, startSlot, endSlot uint64) ([
}
}
func (in *Inner) advance() error {
// Fetch and apply finality update
finalityUpdate, err := in.RPC.GetFinalityUpdate()
if err != nil {
return err
ErrorChan := make(chan error, 1)
finalityChan := make(chan consensus_core.FinalityUpdate, 1)

go func() {
finalityUpdate, err := in.RPC.GetFinalityUpdate()
if err != nil {
ErrorChan <- err
return
}
finalityChan <- finalityUpdate
ErrorChan <- nil
}()
if ErrorChan != nil {
return <-ErrorChan
}
finalityUpdate := <-finalityChan
if err := in.verify_finality_update(&finalityUpdate); err != nil {
return err
}
Expand Down Expand Up @@ -394,60 +447,72 @@ func (in *Inner) sync(checkpoint [32]byte) error {
// Perform bootstrap with the given checkpoint
in.bootstrap(checkpoint)

// Calculate the current sync period

currentPeriod := utils.CalcSyncPeriod(in.Store.FinalizedHeader.Slot)

// Fetch updates
updates, err := in.RPC.GetUpdates(currentPeriod, MAX_REQUEST_LIGHT_CLIENT_UPDATES)
if err != nil {
return err
}
errorChan := make(chan error, 1)
var updates []consensus_core.Update
var err error
go func() {
updates, err = in.RPC.GetUpdates(currentPeriod, MAX_REQUEST_LIGHT_CLIENT_UPDATES)
if err != nil {
errorChan <- err
}

// Apply updates
for _, update := range updates {
if err := in.verify_update(&update); err != nil {
return err
// Apply updates
for _, update := range updates {
if err := in.verify_update(&update); err != nil {
errorChan <- err
return
}
in.apply_update(&update)
}
in.apply_update(&update)
}

// Fetch and apply finality update
finalityUpdate, err := in.RPC.GetFinalityUpdate()
if err != nil {
return err
}
if err := in.verify_finality_update(&finalityUpdate); err != nil {
return err
}
in.apply_finality_update(&finalityUpdate)
finalityUpdate, err := in.RPC.GetFinalityUpdate()
if err != nil {
errorChan <- err
return
}
if err := in.verify_finality_update(&finalityUpdate); err != nil {
errorChan <- err
return
}
in.apply_finality_update(&finalityUpdate)

// Fetch and apply optimistic update
// Fetch and apply optimistic update

optimisticUpdate, err := in.RPC.GetOptimisticUpdate()
if err != nil {
return err
}
if err := in.verify_optimistic_update(&optimisticUpdate); err != nil {
optimisticUpdate, err := in.RPC.GetOptimisticUpdate()
if err != nil {
errorChan <- err
return
}
if err := in.verify_optimistic_update(&optimisticUpdate); err != nil {
errorChan <- err
return
}
in.apply_optimistic_update(&optimisticUpdate)
errorChan <- nil
log.Printf("consensus client in sync with checkpoint: 0x%s", hex.EncodeToString(checkpoint[:]))
}()

if err := <-errorChan; err != nil {
return err
}
in.apply_optimistic_update(&optimisticUpdate)

// Log the success message
log.Printf("consensus client in sync with checkpoint: 0x%s", hex.EncodeToString(checkpoint[:]))

return nil
}
func (in *Inner) send_blocks() error {
// Get slot from the optimistic header
slot := in.Store.OptimisticHeader.Slot
payload, err := in.get_execution_payload(context.Background(), &slot)
payload, err := in.get_execution_payload(&slot)
if err != nil {
return err
}

// Get finalized slot from the finalized header
finalizedSlot := in.Store.FinalizedHeader.Slot
finalizedPayload, err := in.get_execution_payload(context.Background(), &finalizedSlot)
finalizedPayload, err := in.get_execution_payload(&finalizedSlot)
if err != nil {
return err
}
Expand Down Expand Up @@ -493,12 +558,25 @@ func (in *Inner) duration_until_next_update() time.Duration {
return time.Duration(nextUpdate) * time.Second
}
func (in *Inner) bootstrap(checkpoint [32]byte) {

bootstrap, errInBootstrap := in.RPC.GetBootstrap(checkpoint)
if errInBootstrap != nil {
log.Printf("failed to fetch bootstrap: %v", errInBootstrap)
errorChan := make(chan error, 1)
bootstrapChan := make(chan consensus_core.Bootstrap, 1)
go func() {
bootstrap, errInBootstrap := in.RPC.GetBootstrap(checkpoint)

if errInBootstrap != nil {
log.Printf("failed to fetch bootstrap: %v", errInBootstrap)
errorChan <- errInBootstrap
return
}
bootstrapChan <- bootstrap
errorChan <- nil
}()
if err := <-errorChan; err != nil {
return
}
bootstrap := <-bootstrapChan



isValid := in.is_valid_checkpoint(bootstrap.Header.Slot)
if !isValid {
Expand Down Expand Up @@ -547,7 +625,7 @@ func apply_bootstrap(store *LightClientStore, bootstrap consensus_core.Bootstrap

func (in *Inner) verify_generic_update(update *GenericUpdate, expectedCurrentSlot uint64, store *LightClientStore, genesisRoots []byte, forks consensus_core.Forks) error {
{
bits := getBits(update.SyncAggregate.SyncCommitteeBits)
bits := getBits(update.SyncAggregate.SyncCommitteeBits[:])
if bits == 0 {
return ErrInsufficientParticipation
}
Expand Down Expand Up @@ -614,6 +692,7 @@ func (in *Inner) verify_generic_update(update *GenericUpdate, expectedCurrentSlo

forkVersion := utils.CalculateForkVersion(&forks, update.SignatureSlot)
forkDataRoot := utils.ComputeForkDataRoot(forkVersion, consensus_core.Bytes32(in.Config.Chain.GenesisRoot))


if !verifySyncCommitteeSignature(pks, &update.AttestedHeader, &update.SyncAggregate.SyncCommitteeSignature, forkDataRoot) {
return ErrInvalidSignature
Expand Down Expand Up @@ -653,7 +732,7 @@ func (in *Inner) verify_optimistic_update(update *consensus_core.OptimisticUpdat
return in.verify_generic_update(&genUpdate, in.expected_current_slot(), &in.Store, in.Config.Chain.GenesisRoot, in.Config.Forks)
}
func (in *Inner) apply_generic_update(store *LightClientStore, update *GenericUpdate) *[]byte {
committeeBits := getBits(update.SyncAggregate.SyncCommitteeBits)
committeeBits := getBits(update.SyncAggregate.SyncCommitteeBits[:])

// Update max active participants
if committeeBits > store.CurrentMaxActiveParticipants {
Expand Down Expand Up @@ -766,7 +845,7 @@ func (in *Inner) apply_optimistic_update(update *consensus_core.OptimisticUpdate
}
}
func (in *Inner) Log_finality_update(update *consensus_core.FinalityUpdate) {
participation := float32(getBits(update.SyncAggregate.SyncCommitteeBits)) / 512.0 * 100.0
participation := float32(getBits(update.SyncAggregate.SyncCommitteeBits[:])) / 512.0 * 100.0
decimals := 2
if participation == 100.0 {
decimals = 1
Expand All @@ -784,7 +863,7 @@ func (in *Inner) Log_finality_update(update *consensus_core.FinalityUpdate) {
)
}
func (in *Inner) Log_optimistic_update(update *consensus_core.OptimisticUpdate) {
participation := float32(getBits(update.SyncAggregate.SyncCommitteeBits)) / 512.0 * 100.0
participation := float32(getBits(update.SyncAggregate.SyncCommitteeBits[:])) / 512.0 * 100.0
decimals := 2
if participation == 100.0 {
decimals = 1
Expand Down Expand Up @@ -1005,7 +1084,7 @@ func processTransaction(txBytes *[1073741824]byte, blockHash consensus_core.Byte
}

// getBits counts the number of bits set to 1 in a [64]byte array
func getBits(bitfield [64]byte) uint64 {
func getBits(bitfield []byte) uint64 {
var count uint64
for _, b := range bitfield {
count += uint64(popCount(b))
Expand Down
Loading
Loading