Skip to content

Commit

Permalink
feat(BUX-631): prevent unneeded header sync (#220)
Browse files Browse the repository at this point in the history
  • Loading branch information
kuba-4chain authored Mar 12, 2024
1 parent b997f68 commit 73b072b
Show file tree
Hide file tree
Showing 3 changed files with 49 additions and 41 deletions.
22 changes: 16 additions & 6 deletions service/chain_service.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
package service

import (
"strings"

"github.com/bitcoin-sv/block-headers-service/domains"
"github.com/bitcoin-sv/block-headers-service/internal/chaincfg"
"github.com/bitcoin-sv/block-headers-service/internal/chaincfg/chainhash"
Expand All @@ -18,7 +20,7 @@ type BlockHasher interface {

// Notification is "port" through which chain service can notify clients about important events.
type Notification interface {
//Notify notifies about new header stored.
// Notify notifies about new header stored.
Notify(any)
}

Expand Down Expand Up @@ -60,6 +62,11 @@ func NewChainsService(
func (cs *chainService) Add(bs domains.BlockHeaderSource) (*domains.BlockHeader, error) {
hash := cs.BlockHasher.BlockHash(&bs)

existingHeader, err := cs.Headers.GetHeaderByHash(hash.String())
if existingHeader != nil {
return nil, HeaderAlreadyExists.error()
}

if cs.ignoreBlockHash(&hash) {
cs.log.Warn().Msgf("Message rejected - containing forbidden header")
return domains.NewRejectedBlockHeader(hash), BlockRejected.error()
Expand Down Expand Up @@ -236,17 +243,20 @@ type AddBlockError struct {
type AddBlockErrorCode string

const (
//BlockRejected error code representing situation when block is on the blacklist.
// BlockRejected error code representing situation when block is on the blacklist.
BlockRejected AddBlockErrorCode = "BlockRejected"

//HeaderCreationFail error code representing situation when block cannot be created from source.
// HeaderCreationFail error code representing situation when block cannot be created from source.
HeaderCreationFail AddBlockErrorCode = "HeaderCreationFail"

//ChainUpdateFail error code representing situation when STALE chain should become Longest chain but the update of chains failed.
// ChainUpdateFail error code representing situation when STALE chain should become Longest chain but the update of chains failed.
ChainUpdateFail AddBlockErrorCode = "ChainUpdateFail"

//HeaderSaveFail error code representing situation when saving header in the repository failed.
// HeaderSaveFail error code representing situation when saving header in the repository failed.
HeaderSaveFail AddBlockErrorCode = "HeaderSaveFail"

// HeaderAlreadyExists error code representing situation when header received from peers already exists in db.
HeaderAlreadyExists AddBlockErrorCode = "HeaderAlreadyExists"
)

func (e *AddBlockError) Error() string {
Expand All @@ -272,5 +282,5 @@ func (c AddBlockErrorCode) causedBy(cause *error) error {

// Is checks if given error contains AddBlockErrorCode.
func (c AddBlockErrorCode) Is(err error) bool {
return err != nil && err.Error() == c.String()
return err != nil && strings.Contains(err.Error(), c.String())
}
54 changes: 22 additions & 32 deletions service/chain_service_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,16 +16,16 @@ import (
)

func TestRejectBlockHeader(t *testing.T) {
//given
// given
r, longestChainTip := givenLongestChainInRepository()
h, hash := givenIgnoredHeaderToAddNextTo(longestChainTip)

cs := createChainsService(serviceSetup{Repositories: &r, IgnoredHash: hash})

//when
// when
header, err := cs.Add(h)

//then
// then
assert.IsError(t, err, BlockRejected.String())

assertHeaderExist(t, header)
Expand All @@ -34,16 +34,16 @@ func TestRejectBlockHeader(t *testing.T) {
}

func TestAddTheHeaderToLongestChain(t *testing.T) {
//given
// given
r, longestChainTip := givenChainWithOnlyGenesisBlockInRepository()
h := givenHeaderToAddNextTo(longestChainTip)

cs := createChainsService(serviceSetup{Repositories: &r})

//when
// when
header, addErr := cs.Add(h)

//then
// then
assert.NoError(t, addErr)
assertHeaderExist(t, header)
assertHeaderInDb(t, r, header)
Expand All @@ -62,16 +62,16 @@ func TestAddTheHeaderToLongestChain(t *testing.T) {
}

func TestAddOrphanHeaderToChain(t *testing.T) {
//given
// given
r, _ := givenLongestChainInRepository()
h := givenOrphanedHeaderToAdd()

cs := createChainsService(serviceSetup{Repositories: &r})

//when
// when
header, addErr := cs.Add(h)

//then
// then
assert.NoError(t, addErr)
assertHeaderExist(t, header)
assertHeaderInDb(t, r, header)
Expand All @@ -90,17 +90,17 @@ func TestAddOrphanHeaderToChain(t *testing.T) {
}

func TestAddHeaderToOrphanChain(t *testing.T) {
//given
// given
r, _ := givenLongestChainInRepository()
tip := givenOrphanChainInRepository(&r)
h := givenHeaderToAddNextTo(tip)

cs := createChainsService(serviceSetup{Repositories: &r})

//when
// when
header, addErr := cs.Add(h)

//then
// then
assert.NoError(t, addErr)
assertHeaderExist(t, header)
assertHeaderInDb(t, r, header)
Expand All @@ -119,34 +119,24 @@ func TestAddHeaderToOrphanChain(t *testing.T) {
}

func TestAddHeaderThatAlreadyExist(t *testing.T) {
//given
// given
r, tip := givenLongestChainInRepository()
h := fixtures.BlockHeaderSourceOf(tip)

cs := createChainsService(serviceSetup{Repositories: &r})

//when
// when
header, addErr := cs.Add(*h)

//then
assert.NoError(t, addErr)
assertHeaderExist(t, header)
assertHeaderInDb(t, r, header)

if header.State != "LONGEST_CHAIN" {
t.Errorf("Header should belong to the longest chain but is %s", header.State)
}

if !header.IsLongestChain() {
t.Error("Header should be marked as longest chain but is not")
}

assertOnlyOneHeaderOnHeight(t, r, header)
// then
assert.Equal(t, HeaderAlreadyExists.Is(addErr), true)
assert.Equal(t, header, nil)
assertOnlyOneHeaderOnHeight(t, r, tip)
}

func TestAddConcurrentChainBlock(t *testing.T) {
var blockFromLongestChain = fixtures.HashHeight1
var blockFromStaleChain = fixtures.StaleHashHeight2
blockFromLongestChain := fixtures.HashHeight1
blockFromStaleChain := fixtures.StaleHashHeight2
const bitsExceedingCumulatedChainWork uint32 = 0x180f0dc7

testCases := map[string]struct {
Expand Down Expand Up @@ -216,10 +206,10 @@ func TestAddConcurrentChainBlock(t *testing.T) {

cs := createChainsService(serviceSetup{Repositories: &r})

//when
// when
header, addErr := cs.Add(h)

//then
// then
assert.NoError(t, addErr)
assertHeaderExist(t, header)

Expand Down
14 changes: 11 additions & 3 deletions transports/p2p/p2psync/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -424,7 +424,6 @@ func (sm *SyncManager) handleCheckSyncPeer() {

// topBlock returns the best chains top block height.
func (sm *SyncManager) topBlock() int32 {

if sm.syncPeer.LastBlock() > sm.syncPeer.StartingHeight() {
return sm.syncPeer.LastBlock()
}
Expand Down Expand Up @@ -531,6 +530,10 @@ func (sm *SyncManager) handleHeadersMsg(hmsg *headersMsg) {
for _, blockHeader := range msg.Headers {
h, addErr := sm.Services.Chains.Add(domains.BlockHeaderSource(*blockHeader))

if service.HeaderAlreadyExists.Is(addErr) {
continue
}

if service.BlockRejected.Is(addErr) {
sm.peerNotifier.BanPeer(peer)
peer.Disconnect()
Expand Down Expand Up @@ -570,6 +573,13 @@ func (sm *SyncManager) handleHeadersMsg(hmsg *headersMsg) {
}
}

// If all the headers received where rejected or already in the database,
// don't request more headers from that peer. Do nothing.
if finalHash == nil {
sm.log.Warn().Msgf("Received only existing or rejected headers from peer: %s", peer.String())
return
}

// When this header is a checkpoint, switch to fetching the blocks for
// all the headers since the last checkpoint.
if receivedCheckpoint {
Expand Down Expand Up @@ -606,7 +616,6 @@ func (sm *SyncManager) handleHeadersMsg(hmsg *headersMsg) {
// headers starting from the latest known header and ending with the
// next checkpoint.
sm.sendGetHeadersWithPassedParams([]*chainhash.Hash{finalHash}, sm.nextCheckpoint.Hash, peer)

}

func (sm *SyncManager) requestForNextHeaderBatch(prevHash *chainhash.Hash, peer *peerpkg.Peer, prevHeight int32) {
Expand Down Expand Up @@ -709,7 +718,6 @@ func (sm *SyncManager) handleInvMsg(imsg *invMsg) {
}

if lastBlock != -1 {
lastHeader := sm.Services.Headers.GetTip()
sm.log.Info().Msgf("[Manager] handleInvMsg lastConfirmedHeaderNode.hash : %s", lastHeader.Hash)
sm.log.Info().Msgf("[Manager] handleInvMsg lastConfirmedHeaderNode.height : %d", lastHeader.Height)
sm.log.Info().Msgf("[Manager] handleInvMsg &invVects[lastBlock].Hash : %v", &invVects[lastBlock].Hash)
Expand Down

0 comments on commit 73b072b

Please sign in to comment.