From 73b072b0b2813e499847865ac7380c32e890b885 Mon Sep 17 00:00:00 2001 From: Kuba <127198012+kuba-4chain@users.noreply.github.com> Date: Tue, 12 Mar 2024 10:52:32 +0100 Subject: [PATCH] feat(BUX-631): prevent unneeded header sync (#220) --- service/chain_service.go | 22 +++++++++---- service/chain_service_test.go | 54 +++++++++++++------------------ transports/p2p/p2psync/manager.go | 14 ++++++-- 3 files changed, 49 insertions(+), 41 deletions(-) diff --git a/service/chain_service.go b/service/chain_service.go index 6a56633e..81c3520c 100644 --- a/service/chain_service.go +++ b/service/chain_service.go @@ -1,6 +1,8 @@ package service import ( + "strings" + "github.com/bitcoin-sv/block-headers-service/domains" "github.com/bitcoin-sv/block-headers-service/internal/chaincfg" "github.com/bitcoin-sv/block-headers-service/internal/chaincfg/chainhash" @@ -18,7 +20,7 @@ type BlockHasher interface { // Notification is "port" through which chain service can notify clients about important events. type Notification interface { - //Notify notifies about new header stored. + // Notify notifies about new header stored. Notify(any) } @@ -60,6 +62,11 @@ func NewChainsService( func (cs *chainService) Add(bs domains.BlockHeaderSource) (*domains.BlockHeader, error) { hash := cs.BlockHasher.BlockHash(&bs) + existingHeader, err := cs.Headers.GetHeaderByHash(hash.String()) + if existingHeader != nil { + return nil, HeaderAlreadyExists.error() + } + if cs.ignoreBlockHash(&hash) { cs.log.Warn().Msgf("Message rejected - containing forbidden header") return domains.NewRejectedBlockHeader(hash), BlockRejected.error() @@ -236,17 +243,20 @@ type AddBlockError struct { type AddBlockErrorCode string const ( - //BlockRejected error code representing situation when block is on the blacklist. + // BlockRejected error code representing situation when block is on the blacklist. BlockRejected AddBlockErrorCode = "BlockRejected" - //HeaderCreationFail error code representing situation when block cannot be created from source. + // HeaderCreationFail error code representing situation when block cannot be created from source. HeaderCreationFail AddBlockErrorCode = "HeaderCreationFail" - //ChainUpdateFail error code representing situation when STALE chain should become Longest chain but the update of chains failed. + // ChainUpdateFail error code representing situation when STALE chain should become Longest chain but the update of chains failed. ChainUpdateFail AddBlockErrorCode = "ChainUpdateFail" - //HeaderSaveFail error code representing situation when saving header in the repository failed. + // HeaderSaveFail error code representing situation when saving header in the repository failed. HeaderSaveFail AddBlockErrorCode = "HeaderSaveFail" + + // HeaderAlreadyExists error code representing situation when header received from peers already exists in db. + HeaderAlreadyExists AddBlockErrorCode = "HeaderAlreadyExists" ) func (e *AddBlockError) Error() string { @@ -272,5 +282,5 @@ func (c AddBlockErrorCode) causedBy(cause *error) error { // Is checks if given error contains AddBlockErrorCode. func (c AddBlockErrorCode) Is(err error) bool { - return err != nil && err.Error() == c.String() + return err != nil && strings.Contains(err.Error(), c.String()) } diff --git a/service/chain_service_test.go b/service/chain_service_test.go index 95d475fe..968cf5a7 100644 --- a/service/chain_service_test.go +++ b/service/chain_service_test.go @@ -16,16 +16,16 @@ import ( ) func TestRejectBlockHeader(t *testing.T) { - //given + // given r, longestChainTip := givenLongestChainInRepository() h, hash := givenIgnoredHeaderToAddNextTo(longestChainTip) cs := createChainsService(serviceSetup{Repositories: &r, IgnoredHash: hash}) - //when + // when header, err := cs.Add(h) - //then + // then assert.IsError(t, err, BlockRejected.String()) assertHeaderExist(t, header) @@ -34,16 +34,16 @@ func TestRejectBlockHeader(t *testing.T) { } func TestAddTheHeaderToLongestChain(t *testing.T) { - //given + // given r, longestChainTip := givenChainWithOnlyGenesisBlockInRepository() h := givenHeaderToAddNextTo(longestChainTip) cs := createChainsService(serviceSetup{Repositories: &r}) - //when + // when header, addErr := cs.Add(h) - //then + // then assert.NoError(t, addErr) assertHeaderExist(t, header) assertHeaderInDb(t, r, header) @@ -62,16 +62,16 @@ func TestAddTheHeaderToLongestChain(t *testing.T) { } func TestAddOrphanHeaderToChain(t *testing.T) { - //given + // given r, _ := givenLongestChainInRepository() h := givenOrphanedHeaderToAdd() cs := createChainsService(serviceSetup{Repositories: &r}) - //when + // when header, addErr := cs.Add(h) - //then + // then assert.NoError(t, addErr) assertHeaderExist(t, header) assertHeaderInDb(t, r, header) @@ -90,17 +90,17 @@ func TestAddOrphanHeaderToChain(t *testing.T) { } func TestAddHeaderToOrphanChain(t *testing.T) { - //given + // given r, _ := givenLongestChainInRepository() tip := givenOrphanChainInRepository(&r) h := givenHeaderToAddNextTo(tip) cs := createChainsService(serviceSetup{Repositories: &r}) - //when + // when header, addErr := cs.Add(h) - //then + // then assert.NoError(t, addErr) assertHeaderExist(t, header) assertHeaderInDb(t, r, header) @@ -119,34 +119,24 @@ func TestAddHeaderToOrphanChain(t *testing.T) { } func TestAddHeaderThatAlreadyExist(t *testing.T) { - //given + // given r, tip := givenLongestChainInRepository() h := fixtures.BlockHeaderSourceOf(tip) cs := createChainsService(serviceSetup{Repositories: &r}) - //when + // when header, addErr := cs.Add(*h) - //then - assert.NoError(t, addErr) - assertHeaderExist(t, header) - assertHeaderInDb(t, r, header) - - if header.State != "LONGEST_CHAIN" { - t.Errorf("Header should belong to the longest chain but is %s", header.State) - } - - if !header.IsLongestChain() { - t.Error("Header should be marked as longest chain but is not") - } - - assertOnlyOneHeaderOnHeight(t, r, header) + // then + assert.Equal(t, HeaderAlreadyExists.Is(addErr), true) + assert.Equal(t, header, nil) + assertOnlyOneHeaderOnHeight(t, r, tip) } func TestAddConcurrentChainBlock(t *testing.T) { - var blockFromLongestChain = fixtures.HashHeight1 - var blockFromStaleChain = fixtures.StaleHashHeight2 + blockFromLongestChain := fixtures.HashHeight1 + blockFromStaleChain := fixtures.StaleHashHeight2 const bitsExceedingCumulatedChainWork uint32 = 0x180f0dc7 testCases := map[string]struct { @@ -216,10 +206,10 @@ func TestAddConcurrentChainBlock(t *testing.T) { cs := createChainsService(serviceSetup{Repositories: &r}) - //when + // when header, addErr := cs.Add(h) - //then + // then assert.NoError(t, addErr) assertHeaderExist(t, header) diff --git a/transports/p2p/p2psync/manager.go b/transports/p2p/p2psync/manager.go index dfa2a6f8..889b6a6b 100644 --- a/transports/p2p/p2psync/manager.go +++ b/transports/p2p/p2psync/manager.go @@ -424,7 +424,6 @@ func (sm *SyncManager) handleCheckSyncPeer() { // topBlock returns the best chains top block height. func (sm *SyncManager) topBlock() int32 { - if sm.syncPeer.LastBlock() > sm.syncPeer.StartingHeight() { return sm.syncPeer.LastBlock() } @@ -531,6 +530,10 @@ func (sm *SyncManager) handleHeadersMsg(hmsg *headersMsg) { for _, blockHeader := range msg.Headers { h, addErr := sm.Services.Chains.Add(domains.BlockHeaderSource(*blockHeader)) + if service.HeaderAlreadyExists.Is(addErr) { + continue + } + if service.BlockRejected.Is(addErr) { sm.peerNotifier.BanPeer(peer) peer.Disconnect() @@ -570,6 +573,13 @@ func (sm *SyncManager) handleHeadersMsg(hmsg *headersMsg) { } } + // If all the headers received where rejected or already in the database, + // don't request more headers from that peer. Do nothing. + if finalHash == nil { + sm.log.Warn().Msgf("Received only existing or rejected headers from peer: %s", peer.String()) + return + } + // When this header is a checkpoint, switch to fetching the blocks for // all the headers since the last checkpoint. if receivedCheckpoint { @@ -606,7 +616,6 @@ func (sm *SyncManager) handleHeadersMsg(hmsg *headersMsg) { // headers starting from the latest known header and ending with the // next checkpoint. sm.sendGetHeadersWithPassedParams([]*chainhash.Hash{finalHash}, sm.nextCheckpoint.Hash, peer) - } func (sm *SyncManager) requestForNextHeaderBatch(prevHash *chainhash.Hash, peer *peerpkg.Peer, prevHeight int32) { @@ -709,7 +718,6 @@ func (sm *SyncManager) handleInvMsg(imsg *invMsg) { } if lastBlock != -1 { - lastHeader := sm.Services.Headers.GetTip() sm.log.Info().Msgf("[Manager] handleInvMsg lastConfirmedHeaderNode.hash : %s", lastHeader.Hash) sm.log.Info().Msgf("[Manager] handleInvMsg lastConfirmedHeaderNode.height : %d", lastHeader.Height) sm.log.Info().Msgf("[Manager] handleInvMsg &invVects[lastBlock].Hash : %v", &invVects[lastBlock].Hash)