Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Integrate proof check on sync #6501

Open
wants to merge 12 commits into
base: feat/equivalent-messages
Choose a base branch
from
3 changes: 3 additions & 0 deletions epochStart/bootstrap/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -123,6 +123,9 @@ func checkArguments(args ArgsEpochStartBootstrap) error {
if check.IfNil(args.NodesCoordinatorRegistryFactory) {
return fmt.Errorf("%s: %w", baseErrorMessage, nodesCoordinator.ErrNilNodesCoordinatorRegistryFactory)
}
if check.IfNil(args.EnableEpochsHandler) {
return fmt.Errorf("%s: %w", baseErrorMessage, epochStart.ErrNilEnableEpochsHandler)
}

return nil
}
227 changes: 200 additions & 27 deletions epochStart/bootstrap/epochStartMetaBlockProcessor.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (
"github.com/multiversx/mx-chain-core-go/data/block"
"github.com/multiversx/mx-chain-core-go/hashing"
"github.com/multiversx/mx-chain-core-go/marshal"
"github.com/multiversx/mx-chain-go/common"
"github.com/multiversx/mx-chain-go/epochStart"
"github.com/multiversx/mx-chain-go/process"
"github.com/multiversx/mx-chain-go/process/factory"
Expand All @@ -26,14 +27,25 @@ const minNumConnectedPeers = 1
var _ process.InterceptorProcessor = (*epochStartMetaBlockProcessor)(nil)

type epochStartMetaBlockProcessor struct {
messenger Messenger
requestHandler RequestHandler
marshalizer marshal.Marshalizer
hasher hashing.Hasher
mutReceivedMetaBlocks sync.RWMutex
mapReceivedMetaBlocks map[string]data.MetaHeaderHandler
mapMetaBlocksFromPeers map[string][]core.PeerID
messenger Messenger
requestHandler RequestHandler
marshalizer marshal.Marshalizer
hasher hashing.Hasher
enableEpochsHandler common.EnableEpochsHandler

mutReceivedMetaBlocks sync.RWMutex
mapReceivedMetaBlocks map[string]data.MetaHeaderHandler
mapMetaBlocksFromPeers map[string][]core.PeerID

// TODO: refactor to use a separate component for meta block sync handling
// for epoch start metablock and epoch start confirmation block
mutReceivedConfMetaBlocks sync.RWMutex
mapReceivedConfMetaBlocks map[string]data.MetaHeaderHandler
mapConfMetaBlocksFromPeers map[string][]core.PeerID

chanConsensusReached chan bool
chanMetaBlockReached chan bool
chanConfMetaBlockReached chan bool
metaBlock data.MetaHeaderHandler
peerCountTarget int
minNumConnectedPeers int
Expand All @@ -49,6 +61,7 @@ func NewEpochStartMetaBlockProcessor(
consensusPercentage uint8,
minNumConnectedPeersConfig int,
minNumOfPeersToConsiderBlockValidConfig int,
enableEpochsHandler common.EnableEpochsHandler,
) (*epochStartMetaBlockProcessor, error) {
if check.IfNil(messenger) {
return nil, epochStart.ErrNilMessenger
Expand All @@ -71,6 +84,9 @@ func NewEpochStartMetaBlockProcessor(
if minNumOfPeersToConsiderBlockValidConfig < minNumPeersToConsiderMetaBlockValid {
return nil, epochStart.ErrNotEnoughNumOfPeersToConsiderBlockValid
}
if check.IfNil(enableEpochsHandler) {
return nil, epochStart.ErrNilEnableEpochsHandler
}

processor := &epochStartMetaBlockProcessor{
messenger: messenger,
Expand All @@ -79,10 +95,15 @@ func NewEpochStartMetaBlockProcessor(
hasher: hasher,
minNumConnectedPeers: minNumConnectedPeersConfig,
minNumOfPeersToConsiderBlockValid: minNumOfPeersToConsiderBlockValidConfig,
enableEpochsHandler: enableEpochsHandler,
mutReceivedMetaBlocks: sync.RWMutex{},
mapReceivedMetaBlocks: make(map[string]data.MetaHeaderHandler),
mapMetaBlocksFromPeers: make(map[string][]core.PeerID),
mapReceivedConfMetaBlocks: make(map[string]data.MetaHeaderHandler),
mapConfMetaBlocksFromPeers: make(map[string][]core.PeerID),
chanConsensusReached: make(chan bool, 1),
chanMetaBlockReached: make(chan bool, 1),
chanConfMetaBlockReached: make(chan bool, 1),
}

processor.waitForEnoughNumConnectedPeers(messenger)
Expand Down Expand Up @@ -136,22 +157,50 @@ func (e *epochStartMetaBlockProcessor) Save(data process.InterceptedData, fromCo
return nil
}

if !metaBlock.IsStartOfEpochBlock() {
log.Debug("received metablock is not of type epoch start", "error", epochStart.ErrNotEpochStartBlock)
mbHash := interceptedHdr.Hash()

if metaBlock.IsStartOfEpochBlock() {
log.Debug("received epoch start meta", "epoch", metaBlock.GetEpoch(), "from peer", fromConnectedPeer.Pretty())
e.mutReceivedMetaBlocks.Lock()
e.mapReceivedMetaBlocks[string(mbHash)] = metaBlock
e.addToPeerList(string(mbHash), fromConnectedPeer)
e.mutReceivedMetaBlocks.Unlock()

return nil
}

mbHash := interceptedHdr.Hash()
if e.isEpochStartConfirmationBlock(metaBlock) {
log.Debug("received epoch start confirmation meta", "epoch", metaBlock.GetEpoch(), "from peer", fromConnectedPeer.Pretty())
e.mutReceivedConfMetaBlocks.Lock()
e.mapReceivedConfMetaBlocks[string(mbHash)] = metaBlock
e.addToConfPeerList(string(mbHash), fromConnectedPeer)
e.mutReceivedConfMetaBlocks.Unlock()

return nil
}

log.Debug("received epoch start meta", "epoch", metaBlock.GetEpoch(), "from peer", fromConnectedPeer.Pretty())
e.mutReceivedMetaBlocks.Lock()
e.mapReceivedMetaBlocks[string(mbHash)] = metaBlock
e.addToPeerList(string(mbHash), fromConnectedPeer)
e.mutReceivedMetaBlocks.Unlock()
log.Debug("received metablock is not of type epoch start", "error", epochStart.ErrNotEpochStartBlock)

return nil
}

func (e *epochStartMetaBlockProcessor) isEpochStartConfirmationBlock(metaBlock data.HeaderHandler) bool {
if !e.enableEpochsHandler.IsFlagEnabledInEpoch(common.EquivalentMessagesFlag, metaBlock.GetEpoch()) {
return false
}

startOfEpochMetaBlock, err := e.getMostReceivedMetaBlock()
if err != nil {
return false
}

if startOfEpochMetaBlock.GetNonce() != metaBlock.GetNonce()-1 {
return false
}

return true
}

// this func should be called under mutex protection
func (e *epochStartMetaBlockProcessor) addToPeerList(hash string, peer core.PeerID) {
peersListForHash := e.mapMetaBlocksFromPeers[hash]
Expand All @@ -163,6 +212,16 @@ func (e *epochStartMetaBlockProcessor) addToPeerList(hash string, peer core.Peer
e.mapMetaBlocksFromPeers[hash] = append(e.mapMetaBlocksFromPeers[hash], peer)
}

func (e *epochStartMetaBlockProcessor) addToConfPeerList(hash string, peer core.PeerID) {
peersListForHash := e.mapConfMetaBlocksFromPeers[hash]
for _, pid := range peersListForHash {
if pid == peer {
return
}
}
e.mapConfMetaBlocksFromPeers[hash] = append(e.mapConfMetaBlocksFromPeers[hash], peer)
}

// GetEpochStartMetaBlock will return the metablock after it is confirmed or an error if the number of tries was exceeded
// This is a blocking method which will end after the consensus for the meta block is obtained or the context is done
func (e *epochStartMetaBlockProcessor) GetEpochStartMetaBlock(ctx context.Context) (data.MetaHeaderHandler, error) {
Expand All @@ -180,27 +239,90 @@ func (e *epochStartMetaBlockProcessor) GetEpochStartMetaBlock(ctx context.Contex
}
}()

err = e.requestMetaBlock()
err = e.waitForMetaBlock(ctx)
if err != nil {
return nil, err
}

err = e.waitForConfMetaBlock(ctx)
if err != nil {
return nil, err
}

chanRequests := time.After(durationBetweenReRequests)
chanCheckMaps := time.After(durationBetweenChecks)

for {
select {
case <-e.chanConsensusReached:
return e.metaBlock, nil
case <-ctx.Done():
return e.getMostReceivedMetaBlock()
case <-chanCheckMaps:
e.checkMaps()
chanCheckMaps = time.After(durationBetweenChecks)
}
}
}

func (e *epochStartMetaBlockProcessor) waitForMetaBlock(ctx context.Context) error {
err := e.requestMetaBlock()
if err != nil {
return err
}

chanRequests := time.After(durationBetweenReRequests)
chanCheckMaps := time.After(durationBetweenChecks)

for {
select {
case <-e.chanMetaBlockReached:
return nil
case <-ctx.Done():
return epochStart.ErrTimeoutWaitingForMetaBlock
case <-chanRequests:
err = e.requestMetaBlock()
if err != nil {
return nil, err
return err
}
chanRequests = time.After(durationBetweenReRequests)
case <-chanCheckMaps:
e.checkMaps()
e.checkMetaBlockMaps()
chanCheckMaps = time.After(durationBetweenChecks)
}
}
}

func (e *epochStartMetaBlockProcessor) waitForConfMetaBlock(ctx context.Context) error {
if check.IfNil(e.metaBlock) {
return epochStart.ErrNilMetaBlock
}

if !e.enableEpochsHandler.IsFlagEnabledInEpoch(common.EquivalentMessagesFlag, e.metaBlock.GetEpoch()) {
return nil
}

err := e.requestConfirmationMetaBlock(e.metaBlock.GetNonce())
if err != nil {
return err
}

chanRequests := time.After(durationBetweenReRequests)
chanCheckMaps := time.After(durationBetweenChecks)

for {
select {
case <-e.chanConfMetaBlockReached:
return nil
case <-ctx.Done():
return epochStart.ErrTimeoutWaitingForMetaBlock
case <-chanRequests:
err = e.requestConfirmationMetaBlock(e.metaBlock.GetNonce())
if err != nil {
return err
}
chanRequests = time.After(durationBetweenReRequests)
case <-chanCheckMaps:
e.checkConfMetaBlockMaps()
chanCheckMaps = time.After(durationBetweenChecks)
}
}
Expand Down Expand Up @@ -238,27 +360,78 @@ func (e *epochStartMetaBlockProcessor) requestMetaBlock() error {
return nil
}

func (e *epochStartMetaBlockProcessor) checkMaps() {
func (e *epochStartMetaBlockProcessor) requestConfirmationMetaBlock(nonce uint64) error {
numConnectedPeers := len(e.messenger.ConnectedPeers())
err := e.requestHandler.SetNumPeersToQuery(factory.MetachainBlocksTopic, numConnectedPeers, numConnectedPeers)
if err != nil {
return err
}

e.requestHandler.RequestMetaHeaderByNonce(nonce)

return nil
}

func (e *epochStartMetaBlockProcessor) checkMetaBlockMaps() {
e.mutReceivedMetaBlocks.RLock()
defer e.mutReceivedMetaBlocks.RUnlock()

for hash, peersList := range e.mapMetaBlocksFromPeers {
hash, metaBlockFound := e.checkReceivedMetaBlock(e.mapMetaBlocksFromPeers)
if metaBlockFound {
e.metaBlock = e.mapReceivedMetaBlocks[hash]
e.chanMetaBlockReached <- true
}
}

func (e *epochStartMetaBlockProcessor) checkConfMetaBlockMaps() {
e.mutReceivedConfMetaBlocks.RLock()
defer e.mutReceivedConfMetaBlocks.RUnlock()

_, confMetaBlockFound := e.checkReceivedMetaBlock(e.mapConfMetaBlocksFromPeers)
if confMetaBlockFound {
e.chanConfMetaBlockReached <- true
}
}

func (e *epochStartMetaBlockProcessor) checkMaps() {
e.mutReceivedMetaBlocks.RLock()
_, metaBlockFound := e.checkReceivedMetaBlock(e.mapMetaBlocksFromPeers)
e.mutReceivedMetaBlocks.RUnlock()

consensusReached := metaBlockFound
if e.enableEpochsHandler.IsFlagEnabledInEpoch(common.EquivalentMessagesFlag, e.metaBlock.GetEpoch()) {
e.mutReceivedConfMetaBlocks.RLock()
_, confMetaBlockFound := e.checkReceivedMetaBlock(e.mapConfMetaBlocksFromPeers)
e.mutReceivedConfMetaBlocks.RUnlock()

consensusReached = metaBlockFound && confMetaBlockFound
}

// no need to check proof here since it is checked in interceptor
if consensusReached {
e.chanConsensusReached <- true
}
}

func (e *epochStartMetaBlockProcessor) checkReceivedMetaBlock(blocksFromPeers map[string][]core.PeerID) (string, bool) {
for hash, peersList := range blocksFromPeers {
log.Debug("metablock from peers", "num peers", len(peersList), "target", e.peerCountTarget, "hash", []byte(hash))
found := e.processEntry(peersList, hash)
if found {
break

metaBlockFound := e.processMetaBlockEntry(peersList, hash)
if metaBlockFound {
return hash, true
}
}

return "", false
}

func (e *epochStartMetaBlockProcessor) processEntry(
func (e *epochStartMetaBlockProcessor) processMetaBlockEntry(
peersList []core.PeerID,
hash string,
) bool {
if len(peersList) >= e.peerCountTarget {
log.Info("got consensus for epoch start metablock", "len", len(peersList))
e.metaBlock = e.mapReceivedMetaBlocks[hash]
e.chanConsensusReached <- true
return true
}

Expand Down
Loading
Loading