Skip to content

Commit

Permalink
misc(p2p/peerTracker): extend conditions for peers handling
Browse files Browse the repository at this point in the history
  • Loading branch information
vgonkivs committed Apr 1, 2024
1 parent 0c5d8c2 commit d1f04df
Show file tree
Hide file tree
Showing 10 changed files with 205 additions and 217 deletions.
12 changes: 12 additions & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -24,18 +24,24 @@ require (
github.com/benbjohnson/clock v1.3.5 // indirect
github.com/beorn7/perks v1.0.1 // indirect
github.com/cespare/xxhash/v2 v2.2.0 // indirect
github.com/containerd/cgroups v1.1.0 // indirect
github.com/coreos/go-systemd/v22 v22.5.0 // indirect
github.com/davecgh/go-spew v1.1.1 // indirect
github.com/davidlazar/go-crypto v0.0.0-20200604182044-b73af7476f6c // indirect
github.com/decred/dcrd/dcrec/secp256k1/v4 v4.2.0 // indirect
github.com/docker/go-units v0.5.0 // indirect
github.com/elastic/gosigar v0.14.2 // indirect
github.com/flynn/noise v1.0.0 // indirect
github.com/francoispqt/gojay v1.2.13 // indirect
github.com/go-logr/logr v1.4.1 // indirect
github.com/go-logr/stdr v1.2.2 // indirect
github.com/go-task/slim-sprig v0.0.0-20230315185526-52ccab3ef572 // indirect
github.com/godbus/dbus/v5 v5.1.0 // indirect
github.com/golang/protobuf v1.5.3 // indirect
github.com/google/gopacket v1.1.19 // indirect
github.com/google/pprof v0.0.0-20231023181126-ff6d637d2a7b // indirect
github.com/google/uuid v1.5.0 // indirect
github.com/gorilla/websocket v1.5.0 // indirect
github.com/hashicorp/golang-lru/v2 v2.0.7 // indirect
github.com/huin/goupnp v1.3.0 // indirect
github.com/ipfs/go-cid v0.4.1 // indirect
Expand Down Expand Up @@ -73,6 +79,9 @@ require (
github.com/multiformats/go-multistream v0.5.0 // indirect
github.com/multiformats/go-varint v0.0.7 // indirect
github.com/onsi/ginkgo/v2 v2.13.0 // indirect
github.com/opencontainers/runtime-spec v1.1.0 // indirect
github.com/pbnjay/memory v0.0.0-20210728143218-7b4eea64cf58 // indirect
github.com/pkg/errors v0.9.1 // indirect
github.com/pmezard/go-difflib v1.0.0 // indirect
github.com/prometheus/client_golang v1.14.0 // indirect
github.com/prometheus/client_model v0.4.0 // indirect
Expand All @@ -82,7 +91,10 @@ require (
github.com/quic-go/qtls-go1-20 v0.3.4 // indirect
github.com/quic-go/quic-go v0.39.4 // indirect
github.com/quic-go/webtransport-go v0.6.0 // indirect
github.com/raulk/go-watchdog v1.3.0 // indirect
github.com/spaolacci/murmur3 v1.1.0 // indirect
go.uber.org/dig v1.17.1 // indirect
go.uber.org/fx v1.20.1 // indirect
go.uber.org/mock v0.3.0 // indirect
go.uber.org/multierr v1.11.0 // indirect
go.uber.org/zap v1.26.0 // indirect
Expand Down
9 changes: 8 additions & 1 deletion go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -120,16 +120,17 @@ github.com/codahale/hdrhistogram v0.0.0-20161010025455-3a0bb77429bd/go.mod h1:sE
github.com/containerd/cgroups v0.0.0-20201119153540-4cbc285b3327/go.mod h1:ZJeTFisyysqgcCdecO57Dj79RfL0LNeGiFUqLYQRYLE=
github.com/containerd/cgroups v1.0.3/go.mod h1:/ofk34relqNjSGyqPrmEULrO4Sc8LJhvJmWbUCUKqj8=
github.com/containerd/cgroups v1.1.0 h1:v8rEWFl6EoqHB+swVNjVoCJE8o3jX7e8nqBGPLaDFBM=
github.com/containerd/cgroups v1.1.0/go.mod h1:6ppBcbh/NOOUU+dMKrykgaBnK9lCIBxHqJDGwsa1mIw=
github.com/coreos/etcd v3.3.10+incompatible/go.mod h1:uF7uidLiAD3TWHmW31ZFd/JWoc32PjwdhPthX9715RE=
github.com/coreos/go-etcd v2.0.0+incompatible/go.mod h1:Jez6KQU2B/sWsbdaef3ED8NzMklzPG4d5KIOhIy30Tk=
github.com/coreos/go-semver v0.2.0/go.mod h1:nnelYz7RCh+5ahJtPPxZlU+153eP4D4r3EedlOD2RNk=
github.com/coreos/go-semver v0.3.0/go.mod h1:nnelYz7RCh+5ahJtPPxZlU+153eP4D4r3EedlOD2RNk=
github.com/coreos/go-systemd v0.0.0-20180511133405-39ca1b05acc7/go.mod h1:F5haX7vjVVG0kc13fIWeqUViNPyEJxv/OmvnBo0Yme4=
github.com/coreos/go-systemd v0.0.0-20181012123002-c6f51f82210d h1:t5Wuyh53qYyg9eqn4BbnlIT+vmhyww0TatL+zT3uWgI=
github.com/coreos/go-systemd v0.0.0-20181012123002-c6f51f82210d/go.mod h1:F5haX7vjVVG0kc13fIWeqUViNPyEJxv/OmvnBo0Yme4=
github.com/coreos/go-systemd/v22 v22.1.0/go.mod h1:xO0FLkIi5MaZafQlIrOotqXZ90ih+1atmu1JpKERPPk=
github.com/coreos/go-systemd/v22 v22.3.2/go.mod h1:Y58oyj3AT4RCenI/lSvhwexgC+NSVTIJ3seZv2GcEnc=
github.com/coreos/go-systemd/v22 v22.5.0 h1:RrqgGjYQKalulkV8NGVIfkXQf6YYmOyiJKk8iXXhfZs=
github.com/coreos/go-systemd/v22 v22.5.0/go.mod h1:Y58oyj3AT4RCenI/lSvhwexgC+NSVTIJ3seZv2GcEnc=
github.com/coreos/pkg v0.0.0-20160727233714-3ac0863d7acf/go.mod h1:E3G3o1h8I7cfcXa63jLwjI0eiQQMgzzUDFVpN/nH/eA=
github.com/cpuguy83/go-md2man v1.0.10/go.mod h1:SmD6nW6nTyfqj6ABTjUi3V3JVMnlJmwcJI5acqYI6dE=
github.com/cpuguy83/go-md2man/v2 v2.0.0-20190314233015-f79a8a8ca69d/go.mod h1:maD7wRr/U5Z6m/iR4s+kqSMx2CaBsrgA7czyZG/E6dU=
Expand All @@ -152,6 +153,7 @@ github.com/dgrijalva/jwt-go v3.2.0+incompatible/go.mod h1:E3ru+11k8xSBh+hMPgOLZm
github.com/dgryski/go-farm v0.0.0-20190423205320-6a90982ecee2/go.mod h1:SqUrOPUnsFjfmXRMNPybcSiG0BgUW2AuFH8PAnS2iTw=
github.com/docker/go-units v0.4.0/go.mod h1:fgPhTUdO+D/Jk86RDLlptpiXQzgHJF7gydDDbaIK4Dk=
github.com/docker/go-units v0.5.0 h1:69rxXcBk27SvSaaxTtLh/8llcHD8vYHT7WSdRZ/jvr4=
github.com/docker/go-units v0.5.0/go.mod h1:fgPhTUdO+D/Jk86RDLlptpiXQzgHJF7gydDDbaIK4Dk=
github.com/dustin/go-humanize v0.0.0-20171111073723-bb3d318650d4/go.mod h1:HtrtbFcZ19U5GC7JDqmcUSB87Iq5E25KnS6fMYU6eOk=
github.com/dustin/go-humanize v1.0.0/go.mod h1:HtrtbFcZ19U5GC7JDqmcUSB87Iq5E25KnS6fMYU6eOk=
github.com/eapache/go-resiliency v1.1.0/go.mod h1:kFI+JgMyC7bLPUVY133qvEBtVayf5mFgVsvEsIPBvNs=
Expand Down Expand Up @@ -698,6 +700,7 @@ github.com/onsi/gomega v1.27.10 h1:naR28SdDFlqrG6kScpT8VWpu1xWY5nJRCF3XaYyBjhI=
github.com/op/go-logging v0.0.0-20160315200505-970db520ece7/go.mod h1:HzydrMdWErDVzsI23lYNej1Htcns9BCg93Dk0bBINWk=
github.com/opencontainers/runtime-spec v1.0.2/go.mod h1:jwyrGlmzljRJv/Fgzds9SsS/C5hL+LL3ko9hs6T5lQ0=
github.com/opencontainers/runtime-spec v1.1.0 h1:HHUyrt9mwHUjtasSbXSMvs4cyFxh+Bll4AjJ9odEGpg=
github.com/opencontainers/runtime-spec v1.1.0/go.mod h1:jwyrGlmzljRJv/Fgzds9SsS/C5hL+LL3ko9hs6T5lQ0=
github.com/opentracing-contrib/go-observer v0.0.0-20170622124052-a52f23424492/go.mod h1:Ngi6UdF0k5OKD5t5wlmGhe/EDKPoUM3BXZSSfIuJbis=
github.com/opentracing/basictracer-go v1.0.0/go.mod h1:QfBfYuafItcjQuMwinw9GhYKwFXS9KnPs5lxoYwgW74=
github.com/opentracing/opentracing-go v1.0.2/go.mod h1:UkNAQd3GIcIGf0SeVgPpRdFStlNbqXla1AfSYxPUl2o=
Expand Down Expand Up @@ -779,6 +782,7 @@ github.com/quic-go/webtransport-go v0.6.0/go.mod h1:9KjU4AEBqEQidGHNDkZrb8CAa1ab
github.com/raulk/clock v1.1.0/go.mod h1:3MpVxdZ/ODBQDxbN+kzshf5OSZwPjtMDx6BBXBmOeY0=
github.com/raulk/go-watchdog v1.2.0/go.mod h1:lzSbAl5sh4rtI8tYHU01BWIDzgzqaQLj6RcA1i4mlqI=
github.com/raulk/go-watchdog v1.3.0 h1:oUmdlHxdkXRJlwfG0O9omj8ukerm8MEQavSiDTEtBsk=
github.com/raulk/go-watchdog v1.3.0/go.mod h1:fIvOnLbF0b0ZwkB9YU4mOW9Did//4vPZtDqv66NfsMU=
github.com/rcrowley/go-metrics v0.0.0-20181016184325-3113b8401b8a/go.mod h1:bCqnVzQkZxMG4s8nGwiZ5l3QUCyqpo9Y+/ZMZ9VjZe4=
github.com/rogpeppe/fastuuid v0.0.0-20150106093220-6724a57986af/go.mod h1:XWv6SoW27p1b0cqNHllgS5HIMJraePCO15w5zCzIWYg=
github.com/rogpeppe/fastuuid v1.2.0/go.mod h1:jVj6XXZzXRy/MSR5jhDC/2q6DgLz+nrA6LYCDYWNEvQ=
Expand Down Expand Up @@ -896,8 +900,11 @@ go.uber.org/atomic v1.5.0/go.mod h1:sABNBOSYdrvTF6hTgEIbc7YasKWGhgEQZyfxyTvoXHQ=
go.uber.org/atomic v1.6.0/go.mod h1:sABNBOSYdrvTF6hTgEIbc7YasKWGhgEQZyfxyTvoXHQ=
go.uber.org/atomic v1.7.0/go.mod h1:fEN4uk6kAWBTFdckzkM89CLk9XfWZrxpCo0nPH17wJc=
go.uber.org/atomic v1.9.0/go.mod h1:fEN4uk6kAWBTFdckzkM89CLk9XfWZrxpCo0nPH17wJc=
go.uber.org/atomic v1.11.0 h1:ZvwS0R+56ePWxUNi+Atn9dWONBPp/AUETXlHW0DxSjE=
go.uber.org/dig v1.17.1 h1:Tga8Lz8PcYNsWsyHMZ1Vm0OQOUaJNDyvPImgbAu9YSc=
go.uber.org/dig v1.17.1/go.mod h1:Us0rSJiThwCv2GteUN0Q7OKvU7n5J4dxZ9JKUXozFdE=
go.uber.org/fx v1.20.1 h1:zVwVQGS8zYvhh9Xxcu4w1M6ESyeMzebzj2NbSayZ4Mk=
go.uber.org/fx v1.20.1/go.mod h1:iSYNbHf2y55acNCwCXKx7LbWb5WG1Bnue5RDXz1OREg=
go.uber.org/goleak v1.1.10/go.mod h1:8a7PlsEVH3e/a/GLqe5IIrQx6GzcnRmZEufDUTk4A7A=
go.uber.org/goleak v1.1.11-0.20210813005559-691160354723/go.mod h1:cwTWslyiVhfpKIDGSZEM2HlOvcqm+tG4zioyIeLoqMQ=
go.uber.org/goleak v1.1.11/go.mod h1:cwTWslyiVhfpKIDGSZEM2HlOvcqm+tG4zioyIeLoqMQ=
Expand Down
8 changes: 4 additions & 4 deletions p2p/exchange.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,10 +80,11 @@ func NewExchange[H header.Header[H]](
}
}

id := protocolID(params.networkID)
ex := &Exchange[H]{
host: host,
protocolID: protocolID(params.networkID),
peerTracker: newPeerTracker(host, gater, params.pidstore, metrics),
protocolID: id,
peerTracker: newPeerTracker(host, gater, params.networkID, params.pidstore, metrics),
Params: params,
metrics: metrics,
}
Expand All @@ -98,7 +99,6 @@ func (ex *Exchange[H]) Start(ctx context.Context) error {
ex.ctx, ex.cancel = context.WithCancel(context.Background())
log.Infow("client: starting client", "protocol ID", ex.protocolID)

go ex.peerTracker.gc()
go ex.peerTracker.track()

// bootstrap the peerTracker with trusted peers as well as previously seen
Expand Down Expand Up @@ -172,7 +172,7 @@ func (ex *Exchange[H]) Head(ctx context.Context, opts ...header.HeadOption[H]) (
trace.WithAttributes(attribute.String("peerID", from.String())),
)
defer newSpan.End()

headers, err := ex.request(reqCtx, from, headerReq)
if err != nil {
newSpan.SetStatus(codes.Error, err.Error())
Expand Down
70 changes: 49 additions & 21 deletions p2p/exchange_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,12 +8,14 @@ import (

"github.com/ipfs/go-datastore"
"github.com/ipfs/go-datastore/sync"
"github.com/libp2p/go-libp2p"
libhost "github.com/libp2p/go-libp2p/core/host"
"github.com/libp2p/go-libp2p/core/network"
"github.com/libp2p/go-libp2p/core/peer"
"github.com/libp2p/go-libp2p/core/peerstore"
blankhost "github.com/libp2p/go-libp2p/p2p/host/blank"
"github.com/libp2p/go-libp2p/p2p/net/conngater"
"github.com/libp2p/go-libp2p/p2p/net/connmgr"
mocknet "github.com/libp2p/go-libp2p/p2p/net/mock"
swarm "github.com/libp2p/go-libp2p/p2p/net/swarm/testing"
"github.com/stretchr/testify/assert"
Expand Down Expand Up @@ -242,7 +244,7 @@ func TestExchange_RequestFullRangeHeaders(t *testing.T) {
require.NoError(t, err)
servers[index].Start(context.Background()) //nolint:errcheck
exchange.peerTracker.peerLk.Lock()
exchange.peerTracker.trackedPeers[hosts[index].ID()] = &peerStat{peerID: hosts[index].ID()}
exchange.peerTracker.trackedPeers[hosts[index].ID()] = struct{}{}
exchange.peerTracker.peerLk.Unlock()
}

Expand All @@ -262,30 +264,38 @@ func TestExchange_RequestFullRangeHeaders(t *testing.T) {
// TestExchange_RequestHeadersFromAnotherPeer tests that the Exchange instance will request range
// from another peer with lower score after receiving header.ErrNotFound
func TestExchange_RequestHeadersFromAnotherPeer(t *testing.T) {
hosts := createMocknet(t, 3)
hosts := quicHosts(t, 3)

// create client + server(it does not have needed headers)
exchg, store := createP2PExAndServer(t, hosts[0], hosts[1])

serverSideStore := headertest.NewStore[*headertest.DummyHeader](t, headertest.NewTestSuite(t), 10)
tmServerSideStore := &timedOutStore{timeout: time.Millisecond * 200, Store: *serverSideStore}

hosts[0].ConnManager().TagPeer(hosts[1].ID(), string(protocolID(networkID)), 100)
hosts[0].ConnManager().TagPeer(hosts[2].ID(), string(protocolID(networkID)), 90)

// create one more server(with more headers in the store)
serverSideEx, err := NewExchangeServer[*headertest.DummyHeader](
hosts[2], headertest.NewStore[*headertest.DummyHeader](t, headertest.NewTestSuite(t), 10),
hosts[2], tmServerSideStore,
WithNetworkID[ServerParameters](networkID),
)
require.NoError(t, err)
require.NoError(t, serverSideEx.Start(context.Background()))
t.Cleanup(func() {
serverSideEx.Stop(context.Background()) //nolint:errcheck
})

exchg.peerTracker.peerLk.Lock()
exchg.peerTracker.trackedPeers[hosts[2].ID()] = &peerStat{peerID: hosts[2].ID(), peerScore: 20}
exchg.peerTracker.trackedPeers[hosts[2].ID()] = struct{}{}
exchg.peerTracker.peerLk.Unlock()

h, err := store.GetByHeight(context.Background(), 5)
require.NoError(t, err)

_, err = exchg.GetRangeByHeight(context.Background(), h, 8)
require.NoError(t, err)
// ensure that peerScore for the second peer is changed
newPeerScore := exchg.peerTracker.trackedPeers[hosts[2].ID()].score()
newPeerScore := score(t, exchg.peerTracker.host, hosts[2].ID())
require.NotEqual(t, 20, newPeerScore)
}

Expand Down Expand Up @@ -464,7 +474,9 @@ func TestExchange_HandleHeaderWithDifferentChainID(t *testing.T) {
func TestExchange_RequestHeadersFromAnotherPeerWhenTimeout(t *testing.T) {
// create blankhost because mocknet does not support deadlines
swarm0 := swarm.GenSwarm(t)
host0 := blankhost.NewBlankHost(swarm0)
mngr, err := connmgr.NewConnManager(0, 50)
require.NoError(t, err)
host0 := blankhost.NewBlankHost(swarm0, blankhost.WithConnectionManager(mngr))
swarm1 := swarm.GenSwarm(t)
host1 := blankhost.NewBlankHost(swarm1)
swarm2 := swarm.GenSwarm(t)
Expand Down Expand Up @@ -495,24 +507,25 @@ func TestExchange_RequestHeadersFromAnotherPeerWhenTimeout(t *testing.T) {
t.Cleanup(func() {
serverSideEx.Stop(context.Background()) //nolint:errcheck
})
prevScore := exchg.peerTracker.trackedPeers[host1.ID()].score()
prevScore := score(t, exchg.host, host1.ID())
exchg.peerTracker.peerLk.Lock()
exchg.peerTracker.trackedPeers[host2.ID()] = &peerStat{peerID: host2.ID(), peerScore: 200}
host0.ConnManager().TagPeer(host2.ID(), string(protocolID(networkID)), 100)
exchg.peerTracker.trackedPeers[host2.ID()] = struct{}{}
exchg.peerTracker.peerLk.Unlock()

gen, err := store.GetByHeight(context.Background(), 1)
require.NoError(t, err)

_, err = exchg.GetRangeByHeight(context.Background(), gen, 3)
require.NoError(t, err)
newPeerScore := exchg.peerTracker.trackedPeers[host1.ID()].score()
newPeerScore := score(t, exchg.host, host1.ID())
assert.NotEqual(t, newPeerScore, prevScore)
}

// TestExchange_RequestPartialRange enusres in case of receiving a partial response
// from server, Exchange will re-request remaining headers from another peer
func TestExchange_RequestPartialRange(t *testing.T) {
hosts := createMocknet(t, 3)
hosts := quicHosts(t, 3)
exchg, store := createP2PExAndServer(t, hosts[0], hosts[1])

// create one more server(with more headers in the store)
Expand All @@ -523,13 +536,14 @@ func TestExchange_RequestPartialRange(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
t.Cleanup(cancel)

hosts[0].ConnManager().TagPeer(hosts[1].ID(), string(protocolID(networkID)), 100)

require.NoError(t, err)
require.NoError(t, serverSideEx.Start(ctx))
exchg.peerTracker.peerLk.Lock()
prevScoreBefore1 := exchg.peerTracker.trackedPeers[hosts[1].ID()].peerScore
prevScoreBefore2 := 50
// reducing peerScore of the second server, so our exchange will request host[1] first.
exchg.peerTracker.trackedPeers[hosts[2].ID()] = &peerStat{peerID: hosts[2].ID(), peerScore: 50}
prevScoreBefore1 := score(t, exchg.host, hosts[1].ID())
prevScoreBefore2 := score(t, exchg.host, hosts[2].ID())
exchg.peerTracker.trackedPeers[hosts[2].ID()] = struct{}{}
exchg.peerTracker.peerLk.Unlock()

gen, err := store.GetByHeight(context.Background(), 1)
Expand All @@ -540,8 +554,8 @@ func TestExchange_RequestPartialRange(t *testing.T) {
require.NoError(t, err)

exchg.peerTracker.peerLk.Lock()
prevScoreAfter1 := exchg.peerTracker.trackedPeers[hosts[1].ID()].peerScore
prevScoreAfter2 := exchg.peerTracker.trackedPeers[hosts[2].ID()].peerScore
prevScoreAfter1 := score(t, exchg.host, hosts[1].ID())
prevScoreAfter2 := score(t, exchg.host, hosts[2].ID())
exchg.peerTracker.peerLk.Unlock()

assert.NotEqual(t, prevScoreBefore1, prevScoreAfter1)
Expand All @@ -561,7 +575,6 @@ func createP2PExAndServer(
host, tpeer libhost.Host,
) (*Exchange[*headertest.DummyHeader], *headertest.Store[*headertest.DummyHeader]) {
store := headertest.NewStore[*headertest.DummyHeader](t, headertest.NewTestSuite(t), 5)

serverSideEx, err := NewExchangeServer[*headertest.DummyHeader](tpeer, store,
WithNetworkID[ServerParameters](networkID),
)
Expand All @@ -582,7 +595,7 @@ func createP2PExAndServer(
time.Sleep(time.Millisecond * 100) // give peerTracker time to add a trusted peer

ex.peerTracker.peerLk.Lock()
ex.peerTracker.trackedPeers[tpeer.ID()] = &peerStat{peerID: tpeer.ID(), peerScore: 100.0}
ex.peerTracker.trackedPeers[tpeer.ID()] = struct{}{}
ex.peerTracker.peerLk.Unlock()

t.Cleanup(func() {
Expand All @@ -595,12 +608,15 @@ func createP2PExAndServer(

func quicHosts(t *testing.T, n int) []libhost.Host {
hosts := make([]libhost.Host, n)
var err error
for i := range hosts {
swrm := swarm.GenSwarm(t, swarm.OptDisableTCP)
hosts[i] = blankhost.NewBlankHost(swrm)
require.NoError(t, err)
hosts[i], err = libp2p.New()
for _, host := range hosts[:i] {
hosts[i].Peerstore().AddAddrs(host.ID(), host.Network().ListenAddresses(), peerstore.PermanentAddrTTL)
host.Peerstore().AddAddrs(hosts[i].ID(), hosts[i].Network().ListenAddresses(), peerstore.PermanentAddrTTL)
err = hosts[i].Connect(context.Background(), peer.AddrInfo{ID: host.ID(), Addrs: host.Addrs()})
require.NoError(t, err)
}
}

Expand Down Expand Up @@ -647,3 +663,15 @@ func (t *timedOutStore) Head(context.Context, ...header.HeadOption[*headertest.D
time.Sleep(t.timeout)
return nil, header.ErrNoHead
}

func (t *timedOutStore) GetRange(ctx context.Context, from, to uint64) ([]*headertest.DummyHeader, error) {
time.Sleep(t.timeout)
return t.Store.GetRange(ctx, from, to)
}

func score(t *testing.T, h libhost.Host, id peer.ID) int {
t.Helper()
tags := h.ConnManager().GetTagInfo(id)
tag, _ := tags.Tags[string(protocolID(networkID))]
return tag
}
40 changes: 14 additions & 26 deletions p2p/peer_stats.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,10 +14,7 @@ type peerStat struct {
sync.RWMutex
peerID peer.ID
// score is the average speed per single request
peerScore float32
// pruneDeadline specifies when disconnected peer will be removed if
// it does not return online.
pruneDeadline time.Time
peerScore int
}

// updateStats recalculates peer.score by averaging the last score
Expand All @@ -26,33 +23,28 @@ type peerStat struct {
// by dividing the amount by time, so the result score will represent how many bytes
// were retrieved in 1 millisecond. This value will then be averaged relative to the
// previous peerScore.
func (p *peerStat) updateStats(amount uint64, duration time.Duration) {
p.Lock()
defer p.Unlock()
func (p *peerStat) updateStats(amount uint64, duration time.Duration) int {
if amount == 0 && duration == 0 {
// decrease peerScore by 20% of the peer that failed the request by any reason.
// NOTE: peerScore will not be decreased if the score is less than 100.
p.peerScore -= p.peerScore / 100 * 20
return p.peerScore
}

averageSpeed := float32(amount)
if duration != 0 {
averageSpeed /= float32(duration.Milliseconds())
}
if p.peerScore == 0.0 {
p.peerScore = averageSpeed
return
p.peerScore = int(averageSpeed * 100)
return p.peerScore
}
p.peerScore = (p.peerScore + averageSpeed) / 2
}

// decreaseScore decreases peerScore by 20% of the peer that failed the request by any reason.
// NOTE: decreasing peerScore in one session will not affect its position in queue in another
// session(as we can have multiple sessions running concurrently).
// TODO(vgonkivs): to figure out the better scoring increments/decrements
func (p *peerStat) decreaseScore() {
p.Lock()
defer p.Unlock()

p.peerScore -= p.peerScore / 100 * 20
p.peerScore = (p.peerScore + int(averageSpeed*100)) / 2
return p.peerScore
}

// score reads a peer's latest score from the queue
func (p *peerStat) score() float32 {
func (p *peerStat) score() int {
p.RLock()
defer p.RUnlock()
return p.peerScore
Expand Down Expand Up @@ -123,10 +115,6 @@ func newPeerQueue(ctx context.Context, stats []*peerStat) *peerQueue {
// in case if there are no peer available in current session, it blocks until
// the peer will be pushed in.
func (p *peerQueue) waitPop(ctx context.Context) *peerStat {
// TODO(vgonkivs): implement fallback solution for cases when peer queue is empty.
// As we discussed with @Wondertan there could be 2 possible solutions:
// * use libp2p.Discovery to find new peers outside peerTracker to request headers;
// * implement IWANT/IHAVE messaging system and start requesting ranges from the Peerstore;
select {
case <-ctx.Done():
return &peerStat{}
Expand Down
Loading

0 comments on commit d1f04df

Please sign in to comment.