Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(BUX-631): prevent unneeded header sync #220

Merged
merged 11 commits into from
Mar 12, 2024
22 changes: 16 additions & 6 deletions service/chain_service.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
package service

import (
"strings"

"github.com/bitcoin-sv/block-headers-service/domains"
"github.com/bitcoin-sv/block-headers-service/internal/chaincfg"
"github.com/bitcoin-sv/block-headers-service/internal/chaincfg/chainhash"
Expand All @@ -18,7 +20,7 @@ type BlockHasher interface {

// Notification is "port" through which chain service can notify clients about important events.
type Notification interface {
//Notify notifies about new header stored.
// Notify notifies about new header stored.
Notify(any)
}

Expand Down Expand Up @@ -60,6 +62,11 @@ func NewChainsService(
func (cs *chainService) Add(bs domains.BlockHeaderSource) (*domains.BlockHeader, error) {
hash := cs.BlockHasher.BlockHash(&bs)

existingHeader, err := cs.Headers.GetHeaderByHash(hash.String())
if existingHeader != nil {
return nil, HeaderAlreadyExists.error()
}

if cs.ignoreBlockHash(&hash) {
cs.log.Warn().Msgf("Message rejected - containing forbidden header")
return domains.NewRejectedBlockHeader(hash), BlockRejected.error()
Expand Down Expand Up @@ -236,17 +243,20 @@ type AddBlockError struct {
type AddBlockErrorCode string

const (
//BlockRejected error code representing situation when block is on the blacklist.
// BlockRejected error code representing situation when block is on the blacklist.
BlockRejected AddBlockErrorCode = "BlockRejected"

//HeaderCreationFail error code representing situation when block cannot be created from source.
// HeaderCreationFail error code representing situation when block cannot be created from source.
HeaderCreationFail AddBlockErrorCode = "HeaderCreationFail"

//ChainUpdateFail error code representing situation when STALE chain should become Longest chain but the update of chains failed.
// ChainUpdateFail error code representing situation when STALE chain should become Longest chain but the update of chains failed.
ChainUpdateFail AddBlockErrorCode = "ChainUpdateFail"

//HeaderSaveFail error code representing situation when saving header in the repository failed.
// HeaderSaveFail error code representing situation when saving header in the repository failed.
HeaderSaveFail AddBlockErrorCode = "HeaderSaveFail"

// HeaderAlreadyExists error code representing situation when header received from peers already exists in db.
HeaderAlreadyExists AddBlockErrorCode = "HeaderAlreadyExists"
)

func (e *AddBlockError) Error() string {
Expand All @@ -272,5 +282,5 @@ func (c AddBlockErrorCode) causedBy(cause *error) error {

// Is checks if given error contains AddBlockErrorCode.
func (c AddBlockErrorCode) Is(err error) bool {
return err != nil && err.Error() == c.String()
return err != nil && strings.Contains(err.Error(), c.String())
}
54 changes: 22 additions & 32 deletions service/chain_service_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,16 +16,16 @@ import (
)

func TestRejectBlockHeader(t *testing.T) {
//given
// given
r, longestChainTip := givenLongestChainInRepository()
h, hash := givenIgnoredHeaderToAddNextTo(longestChainTip)

cs := createChainsService(serviceSetup{Repositories: &r, IgnoredHash: hash})

//when
// when
header, err := cs.Add(h)

//then
// then
assert.IsError(t, err, BlockRejected.String())

assertHeaderExist(t, header)
Expand All @@ -34,16 +34,16 @@ func TestRejectBlockHeader(t *testing.T) {
}

func TestAddTheHeaderToLongestChain(t *testing.T) {
//given
// given
r, longestChainTip := givenChainWithOnlyGenesisBlockInRepository()
h := givenHeaderToAddNextTo(longestChainTip)

cs := createChainsService(serviceSetup{Repositories: &r})

//when
// when
header, addErr := cs.Add(h)

//then
// then
assert.NoError(t, addErr)
assertHeaderExist(t, header)
assertHeaderInDb(t, r, header)
Expand All @@ -62,16 +62,16 @@ func TestAddTheHeaderToLongestChain(t *testing.T) {
}

func TestAddOrphanHeaderToChain(t *testing.T) {
//given
// given
r, _ := givenLongestChainInRepository()
h := givenOrphanedHeaderToAdd()

cs := createChainsService(serviceSetup{Repositories: &r})

//when
// when
header, addErr := cs.Add(h)

//then
// then
assert.NoError(t, addErr)
assertHeaderExist(t, header)
assertHeaderInDb(t, r, header)
Expand All @@ -90,17 +90,17 @@ func TestAddOrphanHeaderToChain(t *testing.T) {
}

func TestAddHeaderToOrphanChain(t *testing.T) {
//given
// given
r, _ := givenLongestChainInRepository()
tip := givenOrphanChainInRepository(&r)
h := givenHeaderToAddNextTo(tip)

cs := createChainsService(serviceSetup{Repositories: &r})

//when
// when
header, addErr := cs.Add(h)

//then
// then
assert.NoError(t, addErr)
assertHeaderExist(t, header)
assertHeaderInDb(t, r, header)
Expand All @@ -119,34 +119,24 @@ func TestAddHeaderToOrphanChain(t *testing.T) {
}

func TestAddHeaderThatAlreadyExist(t *testing.T) {
//given
// given
r, tip := givenLongestChainInRepository()
h := fixtures.BlockHeaderSourceOf(tip)

cs := createChainsService(serviceSetup{Repositories: &r})

//when
// when
header, addErr := cs.Add(*h)

//then
assert.NoError(t, addErr)
assertHeaderExist(t, header)
assertHeaderInDb(t, r, header)

if header.State != "LONGEST_CHAIN" {
t.Errorf("Header should belong to the longest chain but is %s", header.State)
}

if !header.IsLongestChain() {
t.Error("Header should be marked as longest chain but is not")
}

assertOnlyOneHeaderOnHeight(t, r, header)
// then
assert.Equal(t, HeaderAlreadyExists.Is(addErr), true)
assert.Equal(t, header, nil)
assertOnlyOneHeaderOnHeight(t, r, tip)
}

func TestAddConcurrentChainBlock(t *testing.T) {
var blockFromLongestChain = fixtures.HashHeight1
var blockFromStaleChain = fixtures.StaleHashHeight2
blockFromLongestChain := fixtures.HashHeight1
blockFromStaleChain := fixtures.StaleHashHeight2
const bitsExceedingCumulatedChainWork uint32 = 0x180f0dc7

testCases := map[string]struct {
Expand Down Expand Up @@ -216,10 +206,10 @@ func TestAddConcurrentChainBlock(t *testing.T) {

cs := createChainsService(serviceSetup{Repositories: &r})

//when
// when
header, addErr := cs.Add(h)

//then
// then
assert.NoError(t, addErr)
assertHeaderExist(t, header)

Expand Down
14 changes: 11 additions & 3 deletions transports/p2p/p2psync/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -424,7 +424,6 @@ func (sm *SyncManager) handleCheckSyncPeer() {

// topBlock returns the best chains top block height.
func (sm *SyncManager) topBlock() int32 {

if sm.syncPeer.LastBlock() > sm.syncPeer.StartingHeight() {
return sm.syncPeer.LastBlock()
}
Expand Down Expand Up @@ -531,6 +530,10 @@ func (sm *SyncManager) handleHeadersMsg(hmsg *headersMsg) {
for _, blockHeader := range msg.Headers {
h, addErr := sm.Services.Chains.Add(domains.BlockHeaderSource(*blockHeader))

if service.HeaderAlreadyExists.Is(addErr) {
continue
}

if service.BlockRejected.Is(addErr) {
sm.peerNotifier.BanPeer(peer)
peer.Disconnect()
Expand Down Expand Up @@ -570,6 +573,13 @@ func (sm *SyncManager) handleHeadersMsg(hmsg *headersMsg) {
}
}

// If all the headers received where rejected or already in the database,
// don't request more headers from that peer. Do nothing.
if finalHash == nil {
sm.log.Warn().Msgf("Received only existing or rejected headers from peer: %s", peer.String())
return
}

// When this header is a checkpoint, switch to fetching the blocks for
// all the headers since the last checkpoint.
if receivedCheckpoint {
Expand Down Expand Up @@ -606,7 +616,6 @@ func (sm *SyncManager) handleHeadersMsg(hmsg *headersMsg) {
// headers starting from the latest known header and ending with the
// next checkpoint.
sm.sendGetHeadersWithPassedParams([]*chainhash.Hash{finalHash}, sm.nextCheckpoint.Hash, peer)
dorzepowski marked this conversation as resolved.
Show resolved Hide resolved

}

func (sm *SyncManager) requestForNextHeaderBatch(prevHash *chainhash.Hash, peer *peerpkg.Peer, prevHeight int32) {
Expand Down Expand Up @@ -709,7 +718,6 @@ func (sm *SyncManager) handleInvMsg(imsg *invMsg) {
}

if lastBlock != -1 {
lastHeader := sm.Services.Headers.GetTip()
sm.log.Info().Msgf("[Manager] handleInvMsg lastConfirmedHeaderNode.hash : %s", lastHeader.Hash)
sm.log.Info().Msgf("[Manager] handleInvMsg lastConfirmedHeaderNode.height : %d", lastHeader.Height)
sm.log.Info().Msgf("[Manager] handleInvMsg &invVects[lastBlock].Hash : %v", &invVects[lastBlock].Hash)
Expand Down
Loading