Skip to content

Commit

Permalink
Merge branch 'main' into dependabot/go_modules/master/github.com/test…
Browse files Browse the repository at this point in the history
…containers/testcontainers-go-0.30.0
  • Loading branch information
Nazarii-4chain authored Apr 18, 2024
2 parents d1ac68f + 575a8a3 commit 906b025
Show file tree
Hide file tree
Showing 8 changed files with 114 additions and 45 deletions.
2 changes: 1 addition & 1 deletion .github/dependabot.yml
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
version: 2
updates:
- package-ecosystem: "gomod"
target-branch: "master"
target-branch: "main"
directory: "/"
schedule:
interval: "daily"
Expand Down
2 changes: 1 addition & 1 deletion .github/mergify.yml
Original file line number Diff line number Diff line change
Expand Up @@ -176,7 +176,7 @@ pull_request_rules:

- name: Close stale pull request
conditions:
- base=master
- base=main
- -closed
- updated-at<21 days ago
actions:
Expand Down
8 changes: 6 additions & 2 deletions .github/workflows/codeql-analysis.yml
Original file line number Diff line number Diff line change
Expand Up @@ -7,10 +7,14 @@ name: "CodeQL"

on:
push:
branches: [master]
branches:
- master
- main
pull_request:
# The branches below must be a subset of the branches above
branches: [master]
branches:
- master
- main
schedule:
- cron: '0 23 * * 0'

Expand Down
2 changes: 2 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,8 @@ Block header service is a go service which connects into BSV P2P network to gath
#### Main functionality
The main functionality of the application is synchornization with peers and collecting all headers. After starting the server, it creates default objects and connects to BSV P2P network. Application has defined checkpoints (specific headers) which are used in synchronization. During this process, server is asking peers for headers (from checkpoint to checkpoint) in batches of 2000. Every header received from peers is saved in memory. After full synchronization, server is changing the operating mode and start to listening for new header. After when new block has been mined, this information should be sended from peers to our server.

For in-depth information and guidance, please refer to the [SPV Wallet Documentation](https://bsvblockchain.gitbook.io/docs).

## How to use it

### Docker image
Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -115,7 +115,7 @@ require (
github.com/hashicorp/hcl v1.0.0 // indirect
github.com/josharian/intern v1.0.0 // indirect
github.com/json-iterator/go v1.1.12 // indirect
github.com/klauspost/compress v1.17.7
github.com/klauspost/compress v1.17.8
github.com/klauspost/cpuid/v2 v2.2.6 // indirect
github.com/kr/text v0.2.0 // indirect
github.com/leodido/go-urn v1.2.4 // indirect
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -146,8 +146,8 @@ github.com/kinbiko/jsonassert v1.1.1 h1:DB12divY+YB+cVpHULLuKePSi6+ui4M/shHSzJIS
github.com/kinbiko/jsonassert v1.1.1/go.mod h1:NO4lzrogohtIdNUNzx8sdzB55M4R4Q1bsrWVdqQ7C+A=
github.com/kisielk/errcheck v1.5.0/go.mod h1:pFxgyoBC7bSaBwPgfKdkLd5X25qrDl4LWUI2bnpBCr8=
github.com/kisielk/gotool v1.0.0/go.mod h1:XhKaO+MFFWcvkIS/tQcRk01m1F5IRFswLeQ+oQHNcck=
github.com/klauspost/compress v1.17.7 h1:ehO88t2UGzQK66LMdE8tibEd1ErmzZjNEqWkjLAKQQg=
github.com/klauspost/compress v1.17.7/go.mod h1:Di0epgTjJY877eYKx5yC51cX2A2Vl2ibi7bDH9ttBbw=
github.com/klauspost/compress v1.17.8 h1:YcnTYrq7MikUT7k0Yb5eceMmALQPYBW/Xltxn0NAMnU=
github.com/klauspost/compress v1.17.8/go.mod h1:Di0epgTjJY877eYKx5yC51cX2A2Vl2ibi7bDH9ttBbw=
github.com/klauspost/cpuid/v2 v2.0.9/go.mod h1:FInQzS24/EEf25PyTYn52gqo7WaD8xa0213Md/qVLRg=
github.com/klauspost/cpuid/v2 v2.2.6 h1:ndNyv040zDGIDh8thGkXYjnFtiN02M1PVVF+JE/48xc=
github.com/klauspost/cpuid/v2 v2.2.6/go.mod h1:Lcz8mBdAVJIBVzewtcLocK12l3Y+JytZYpaMropDUws=
Expand Down
137 changes: 100 additions & 37 deletions internal/transports/p2p/peer/peer.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,22 +21,25 @@ import (
)

type Peer struct {
conn net.Conn
addr *net.TCPAddr
cfg *config.P2PConfig
chainParams *chaincfg.Params
checkpoints []chaincfg.Checkpoint
nextCheckpoint *chaincfg.Checkpoint
headersService service.Headers
chainService service.Chains
log *zerolog.Logger
services wire.ServiceFlag
protocolVersion uint32
nonce uint64
lastBlock int32
timeOffset int64
userAgent string
synced bool
conn net.Conn
addr *net.TCPAddr
cfg *config.P2PConfig
chainParams *chaincfg.Params
checkpoints []chaincfg.Checkpoint
nextCheckpoint *chaincfg.Checkpoint
headersService service.Headers
chainService service.Chains
log *zerolog.Logger
services wire.ServiceFlag
protocolVersion uint32
nonce uint64
latestHeight int32
latestHash *chainhash.Hash
latestStatsMutex sync.RWMutex
timeOffset int64
userAgent string
syncedCheckpoints bool
sendHeadersMode bool

wg sync.WaitGroup
msgChan chan wire.Message
Expand Down Expand Up @@ -71,21 +74,21 @@ func NewPeer(
nextCheckpoint := findNextHeaderCheckpoint(chainParams.Checkpoints, currentTipHeight)

peer := &Peer{
addr: netAddr,
cfg: cfg,
chainParams: chainParams,
headersService: headersService,
chainService: chainService,
checkpoints: chainParams.Checkpoints,
nextCheckpoint: nextCheckpoint,
log: log,
services: wire.SFspv,
protocolVersion: initialProtocolVersion,
synced: nextCheckpoint == nil,
wg: sync.WaitGroup{},
msgChan: make(chan wire.Message, writeMsgChannelBufferSize),
quitting: false,
quit: make(chan struct{}),
addr: netAddr,
cfg: cfg,
chainParams: chainParams,
headersService: headersService,
chainService: chainService,
checkpoints: chainParams.Checkpoints,
nextCheckpoint: nextCheckpoint,
log: log,
services: wire.SFspv,
protocolVersion: initialProtocolVersion,
syncedCheckpoints: nextCheckpoint == nil,
wg: sync.WaitGroup{},
msgChan: make(chan wire.Message, writeMsgChannelBufferSize),
quitting: false,
quit: make(chan struct{}),
}
return peer, nil
}
Expand Down Expand Up @@ -132,6 +135,7 @@ func (p *Peer) StartHeadersSync() error {

currentTipHeight := p.headersService.GetTipHeight()
p.setNextHeaderCheckpoint(currentTipHeight)
p.sendHeadersMode = false

err := p.requestHeaders()
if err != nil {
Expand All @@ -145,13 +149,11 @@ func (p *Peer) StartHeadersSync() error {
func (p *Peer) setNextHeaderCheckpoint(height int32) {
p.nextCheckpoint = findNextHeaderCheckpoint(p.checkpoints, height)
if p.nextCheckpoint == nil {
p.synced = true
p.syncedCheckpoints = true
}
}

func (p *Peer) requestHeaders() error {
p.log.Info().Msgf("requesting headers from peer %s", p)

var err error
if p.nextCheckpoint != nil {
p.log.Info().Msgf("requesting next headers batch from peer %s, up to height %d", p, p.nextCheckpoint.Height)
Expand Down Expand Up @@ -393,7 +395,7 @@ func (p *Peer) handleVersionMessage(msg *wire.MsgVersion) error {

p.protocolVersion = min(p.protocolVersion, uint32(msg.ProtocolVersion))
p.services = msg.Services
p.lastBlock = msg.LastBlock
p.latestHeight = msg.LastBlock
p.timeOffset = msg.Timestamp.Unix() - time.Now().Unix()
p.userAgent = msg.UserAgent

Expand Down Expand Up @@ -421,13 +423,14 @@ func (p *Peer) requireVerAckReceived(remoteMsg wire.Message) (*wire.MsgVerAck, e
func (p *Peer) handlePingMsg(msg *wire.MsgPing) {
p.log.Info().Msgf("received ping from peer %s with nonce: %d", p, msg.Nonce)
if p.protocolVersion > wire.BIP0031Version {
p.log.Info().Msgf("sending pong to peer %s with nonce: %d", p, msg.Nonce)
p.queueMessage(wire.NewMsgPong(msg.Nonce))
}
}

func (p *Peer) handleInvMsg(msg *wire.MsgInv) {
p.log.Info().Msgf("received inv msg from peer %s", p)
if !p.synced {
if !p.syncedCheckpoints {
p.log.Info().Msgf("we are still syncing, ignoring inv msg from peer %s", p)
return
}
Expand All @@ -438,13 +441,13 @@ func (p *Peer) handleInvMsg(msg *wire.MsgInv) {
return
}

// if last header from inv msg is already in database, ignore
lastBlockHash := &msg.InvList[lastBlock].Hash
_, err := p.headersService.GetHeightByHash(lastBlockHash)
if err == nil {
p.log.Info().Msgf("blocks from inv msg from peer %s already existsing in db", p)
return
}
p.updateLatestStats(0, lastBlockHash)

p.log.Info().Msgf("requesting new headers from peer %s", p)
err = p.writeGetHeadersMsg(lastBlockHash)
Expand All @@ -454,9 +457,12 @@ func (p *Peer) handleInvMsg(msg *wire.MsgInv) {
}

func (p *Peer) handleHeadersMsg(msg *wire.MsgHeaders) {
p.log.Info().Msgf("received headers msg from peer %s", p)

receivedCheckpoint := false
lastHeight := int32(0)
headersReceived := 0
var lastHash *chainhash.Hash

for _, header := range msg.Headers {
h, err := p.chainService.Add(domains.BlockHeaderSource(*header))
Expand Down Expand Up @@ -488,6 +494,15 @@ func (p *Peer) handleHeadersMsg(msg *wire.MsgHeaders) {
}
}

if !h.IsLongestChain() {
// TODO: ban peer or lower sync score
p.log.Warn().Msgf(
"received header with hash: %s that's not a part of the longest chain, from peer %s",
h.Hash.String(), p,
)
continue
}

receivedCheckpoint, err = p.verifyCheckpointReached(h, receivedCheckpoint)
if err != nil {
// TODO: ban peer or lower peer sync score
Expand All @@ -496,6 +511,7 @@ func (p *Peer) handleHeadersMsg(msg *wire.MsgHeaders) {
}

lastHeight = h.Height
lastHash = &h.Hash
headersReceived += 1
}

Expand All @@ -509,6 +525,18 @@ func (p *Peer) handleHeadersMsg(msg *wire.MsgHeaders) {
headersReceived, p, lastHeight,
)

p.updateLatestStats(lastHeight, lastHash)

if p.sendHeadersMode {
return
}

if p.isSynced() {
p.log.Info().Msgf("synced with the tip of chain from peer %s", p)
p.switchToSendHeadersMode()
return
}

if receivedCheckpoint {
p.setNextHeaderCheckpoint(p.nextCheckpoint.Height)
}
Expand Down Expand Up @@ -539,6 +567,41 @@ func (p *Peer) verifyCheckpointReached(h *domains.BlockHeader, receivedCheckpoin
return receivedCheckpoint, nil
}

func (p *Peer) switchToSendHeadersMode() {
if !p.sendHeadersMode && p.protocolVersion >= wire.SendHeadersVersion {
p.log.Info().Msgf("switching to send headers mode - requesting peer %s to send us headers directly instead of inv msg", p)
p.queueMessage(wire.NewMsgSendHeaders())
p.sendHeadersMode = true
}
}

func (p *Peer) updateLatestStats(lastHeight int32, lastHash *chainhash.Hash) {
p.latestStatsMutex.Lock()
defer p.latestStatsMutex.Unlock()

if lastHeight > p.latestHeight {
p.latestHeight = lastHeight
}
if lastHash.IsEqual(p.latestHash) {
p.latestHash = nil
}
}

func (p *Peer) getLatestStats() (lastHeight int32, lastHash *chainhash.Hash) {
p.latestStatsMutex.RLock()
defer p.latestStatsMutex.RUnlock()

return p.latestHeight, p.latestHash
}

func (p *Peer) isSynced() bool {
tipHeight := p.headersService.GetTipHeight()
latestHeight, latestHash := p.getLatestStats()
noNewHash := latestHash.IsEqual(nil)

return noNewHash && latestHeight == tipHeight
}

func (p *Peer) String() string {
return p.addr.String()
}
2 changes: 1 addition & 1 deletion release/Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ ARG version
ARG gh_repository=$repository_owner/$project_name
ARG tag=${version:+v$version}
ENV VERSION=${version:-develop}
ENV TAG=${tag:-master}
ENV TAG=${tag:-main}
ENV PRELOADED_DB_URL="https://raw.githubusercontent.com/${gh_repository}/${TAG}/data/blockheaders.csv.gz"

VOLUME ["/app/data"]
Expand Down

0 comments on commit 906b025

Please sign in to comment.