Skip to content

Commit

Permalink
feat(SPV-742): adjust comments- move network to wire package; reimple…
Browse files Browse the repository at this point in the history
…ment address book with buckets to improve geting random address; connect to one randomly chosen seed at start instead of all of them
  • Loading branch information
arkadiuszos4chain committed May 10, 2024
1 parent bb1004c commit 25f2fbb
Show file tree
Hide file tree
Showing 5 changed files with 165 additions and 105 deletions.
132 changes: 79 additions & 53 deletions internal/transports/p2p/network/address_book.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,49 +6,59 @@ import (
"sync"
"time"

"github.com/bitcoin-sv/block-headers-service/internal/transports/p2p/peer"
"github.com/bitcoin-sv/block-headers-service/internal/wire"
)

type addressBucketType string

const (
freeBucket addressBucketType = "free"
usedBucket addressBucketType = "used"
bannedBucket addressBucketType = "banned"
)

// AddressBook represents a collection of known network addresses.
type AddressBook struct {
banDuration time.Duration
addrs []*knownAddress // addrs is a slice containing known addresses
addrsLookup map[string]int // addrLookup is a map for fast lookup of addresses, maps address key to index in addrs slice
addrs map[addressBucketType]*addrBucket
mu sync.Mutex
addrFitlerFn func(*wire.NetAddress) bool
}

// NewAddressBook creates and initializes a new AddressBook instance.
func NewAddressBook(banDuration time.Duration, acceptLocalAddresses bool) *AddressBook {
// Set the address filter function based on whether local addresses are accepted
addrFilterFn := IsRoutable
addrFilterFn := wire.IsRoutable
if acceptLocalAddresses {
addrFilterFn = IsRoutableWithLocal
addrFilterFn = wire.IsRoutableWithLocal
}

const addressesInitCapacity = 500
const usedAddressesInitCapacity = 8

knownAddress := make(map[addressBucketType]*addrBucket, 3)
knownAddress[freeBucket] = newAddrBucket(addressesInitCapacity)
knownAddress[bannedBucket] = newAddrBucket(addressesInitCapacity)
knownAddress[usedBucket] = newAddrBucket(usedAddressesInitCapacity)

return &AddressBook{
addrs: make([]*knownAddress, 0, addressesInitCapacity),
addrsLookup: make(map[string]int, addressesInitCapacity),
banDuration: banDuration,
addrFitlerFn: addrFilterFn,
addrs: knownAddress,
}
}

// UpsertPeerAddr updates or adds a peer's address.
func (a *AddressBook) UpsertPeerAddr(p *peer.Peer) {
// MarkUsed updates or adds a peer's address.
func (a *AddressBook) MarkUsed(pa *wire.NetAddress) {
a.mu.Lock()
defer a.mu.Unlock()

pa := p.GetPeerAddr()
key, ka := a.findAddr(pa)
key := addrKey(pa)
// remove from free if exists
a.addrs[freeBucket].rm(key)
// add to used
a.addrs[usedBucket].add(key, &knownAddress{addr: pa})

if ka != nil {
ka.peer = p
} else {
a.addAddr(key, &knownAddress{addr: pa, peer: p})
}
}

// UpsertAddrs updates or adds multiple addresses.
Expand All @@ -61,10 +71,10 @@ func (a *AddressBook) UpsertAddrs(address []*wire.NetAddress) {
continue
}

key, ka := a.findAddr(addr)
key, ka, _ := a.findAddr(addr)
// If the address is not found, add it to the AddressBook.
if ka == nil {
a.addAddr(key, &knownAddress{addr: addr})
a.addrs[freeBucket].add(key, &knownAddress{addr: addr})
} else if addr.Timestamp.After(ka.addr.Timestamp) {
// Otherwise, update the timestamp if the new one is newer.
ka.addr.Timestamp = addr.Timestamp
Expand All @@ -77,49 +87,72 @@ func (a *AddressBook) BanAddr(addr *wire.NetAddress) {
a.mu.Lock()
defer a.mu.Unlock()

_, ka := a.findAddr(addr)
if ka != nil {
now := time.Now()
ka.banTimestamp = &now
if key, ka, bucket := a.findAddr(addr); ka != nil {
switch bucket {

Check failure on line 91 in internal/transports/p2p/network/address_book.go

View workflow job for this annotation

GitHub Actions / on-push / Lint

missing cases in switch of type network.addressBucketType: network.bannedBucket (exhaustive)
case freeBucket:
a.addBan(bucket, key, ka)
case usedBucket:
a.addBan(bucket, key, ka)
default:
// Do nothing
}
}
}

// GetRandUnusedAddr returns a randomly chosen unused network address.
func (a *AddressBook) GetRandUnusedAddr(tries uint) *wire.NetAddress {
// GetRandFreeAddr returns a randomly chosen unused network address.
func (a *AddressBook) GetRandFreeAddr() *wire.NetAddress {
a.mu.Lock()
defer a.mu.Unlock()

alen := len(a.addrs)
for i := uint(0); i < tries; i++ {
// #nosec G404
ka := a.addrs[rand.Intn(alen)]
if ka.peer == nil {
if ka.isBanned(a.banDuration) {
continue
}
return ka.addr
}
freeAddres := a.addrs[freeBucket].items
fLen := len(freeAddres)
if fLen == 0 {
return nil
}

// return nil if no suitable address is found
return nil
// #nosec G404
randIndx := rand.Intn(fLen)
return freeAddres[randIndx].addr
}

func (a *AddressBook) findAddr(addr *wire.NetAddress) (string, *knownAddress) {
key := addrKey(addr)
addrIndex, ok := a.addrsLookup[key]
func (a *AddressBook) findAddr(addr *wire.NetAddress) (key string, ka *knownAddress, bucket addressBucketType) {
key = addrKey(addr)

if ok {
return key, a.addrs[addrIndex]
// search in free addresses
if ka = a.addrs[freeBucket].find(key); ka != nil {
bucket = freeBucket
return
}
return key, nil

// search in used
if ka = a.addrs[usedBucket].find(key); ka != nil {
bucket = usedBucket
return
}

// search in banned
if ka = a.addrs[bannedBucket].find(key); ka != nil {
bucket = bannedBucket
return
}

return key, nil, ""
}

func (a *AddressBook) addAddr(key string, addr *knownAddress) {
newItemIndex := len(a.addrs)
func (a *AddressBook) addBan(bucket addressBucketType, key string, ka *knownAddress) {
a.addrs[bucket].rm(key)
a.addrs[bannedBucket].add(key, ka)
go a.removeBan(key, ka)
}

func (a *AddressBook) removeBan(key string, ka *knownAddress) {
time.Sleep(a.banDuration)

a.mu.Lock()
defer a.mu.Unlock()

a.addrs = append(a.addrs, addr)
a.addrsLookup[key] = newItemIndex
a.addrs[bannedBucket].rm(key)
a.addrs[freeBucket].add(key, ka)
}

func addrKey(addr *wire.NetAddress) string {
Expand All @@ -128,11 +161,4 @@ func addrKey(addr *wire.NetAddress) string {

type knownAddress struct {
addr *wire.NetAddress
peer *peer.Peer

banTimestamp *time.Time
}

func (a *knownAddress) isBanned(duration time.Duration) bool {
return a.banTimestamp != nil && time.Since(*a.banTimestamp) < duration
}
15 changes: 9 additions & 6 deletions internal/transports/p2p/network/address_book_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ func TestAddressBook_UpsertAddrs(t *testing.T) {
sut.UpsertAddrs([]*wire.NetAddress{local, external})

// then
require.Len(t, sut.addrs, 2)
require.Len(t, sut.addrs[freeBucket].items, 2)
})

t.Run("add new - do not accept local", func(t *testing.T) {
Expand All @@ -51,7 +51,7 @@ func TestAddressBook_UpsertAddrs(t *testing.T) {
sut.UpsertAddrs([]*wire.NetAddress{local, external})

// then
require.Len(t, sut.addrs, 1)
require.Len(t, sut.addrs[freeBucket].items, 1)
})

t.Run("add existing", func(t *testing.T) {
Expand All @@ -74,8 +74,9 @@ func TestAddressBook_UpsertAddrs(t *testing.T) {
sut.UpsertAddrs([]*wire.NetAddress{updated})

// then
require.Len(t, sut.addrs, 1)
require.Equal(t, updated.Timestamp, sut.addrs[0].addr.Timestamp)
freeItems := sut.addrs[freeBucket].items
require.Len(t, freeItems, 1)
require.Equal(t, updated.Timestamp, freeItems[0].addr.Timestamp)
})
}

Expand All @@ -96,7 +97,9 @@ func TestAddressBook_BanAddr(t *testing.T) {
sut.BanAddr(addr)

// then
require.True(t, sut.addrs[0].isBanned(time.Hour))
require.Len(t, sut.addrs[bannedBucket].items, 1)
require.Len(t, sut.addrs[freeBucket].items, 0)

Check failure on line 101 in internal/transports/p2p/network/address_book_test.go

View workflow job for this annotation

GitHub Actions / on-push / Lint

empty: use require.Empty (testifylint)
require.Len(t, sut.addrs[usedBucket].items, 0)

Check failure on line 102 in internal/transports/p2p/network/address_book_test.go

View workflow job for this annotation

GitHub Actions / on-push / Lint

empty: use require.Empty (testifylint)
})
}

Expand All @@ -121,7 +124,7 @@ func TestAddressBook_GetRandUnusedAddr(t *testing.T) {
sut.BanAddr(addr2)

// when
r := sut.GetRandUnusedAddr(100)
r := sut.GetRandFreeAddr()

// then
require.Equal(t, addr, r)
Expand Down
42 changes: 42 additions & 0 deletions internal/transports/p2p/network/addresses_bucket.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
package network

type addrBucket struct {
items []*knownAddress
lookup map[string]int
}

func newAddrBucket(initCapacity uint) *addrBucket {
return &addrBucket{
items: make([]*knownAddress, 0, initCapacity),
lookup: make(map[string]int, initCapacity),
}
}

func (a *addrBucket) find(key string) *knownAddress {
addrIndex, ok := a.lookup[key]

if ok {
return a.items[addrIndex]
}
return nil
}

func (a *addrBucket) add(key string, addr *knownAddress) {
newItemIndex := len(a.items)

a.items = append(a.items, addr)
a.lookup[key] = newItemIndex
}

func (a *addrBucket) rm(key string) {
addrIndex, ok := a.lookup[key]

if ok {
// substitute with last element
a.items[addrIndex] = a.items[len(a.items)-1]
// remove last element
a.items = a.items[:len(a.items)-1]

delete(a.lookup, key)
}
}
36 changes: 15 additions & 21 deletions internal/transports/p2p/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"
"errors"
"fmt"
"math/rand"
"net"
"sync"
"time"
Expand Down Expand Up @@ -103,28 +104,22 @@ func (s *server) connectOutboundPeers() error {
return errors.New("no seeds found")
}

if len(seeds) > int(s.config.MaxOutboundConnections) {
seeds = seeds[:s.config.MaxOutboundConnections]
}

// add seed addreses to address book
addrs := make([]*wire.NetAddress, 0, len(seeds))
for _, seed := range seeds {
s.log.Debug().Msgf("got peer addr: %s", seed.String())
addrs = append(addrs, &wire.NetAddress{IP: seed, Port: s.chainParams.DefaultPort})
}
s.addresses.UpsertAddrs(addrs)

peersCounter := 0
for _, addr := range seeds {
if err := s.connectToAddr(addr, s.chainParams.DefaultPort); err != nil {
continue
}

peersCounter++
}

if peersCounter == 0 {
return errors.New("cannot connect to any peers from seed")
// connect to random seed
// #nosec G404
randomSeed := seeds[rand.Intn(len(seeds))]
if err := s.connectToAddr(randomSeed, s.chainParams.DefaultPort); err != nil {
return err
}

s.log.Info().Msgf("connected to %d peers", peersCounter)
s.log.Info().Msgf("connected to %s peer", randomSeed.String())
go s.observeOutboundPeers()
return nil
}
Expand Down Expand Up @@ -173,7 +168,7 @@ func (s *server) connectToAddr(addr net.IP, port uint16) error {
}

_ = s.outboundPeers.AddPeer(peer) // don't need to check error here
s.addresses.UpsertPeerAddr(peer)
s.addresses.MarkUsed(peer.GetPeerAddr())
return nil
}

Expand Down Expand Up @@ -230,10 +225,9 @@ func (s *server) observeOutboundPeers() {
}

func (s *server) connectToRandomAddr() {
const tries = 20
addr := s.addresses.GetRandUnusedAddr(tries)
addr := s.addresses.GetRandFreeAddr()
if addr == nil {
s.log.Warn().Msgf("[observeOutboundPeers] coudnt find random unused/unbanned peer address with %d tries", tries)
s.log.Warn().Msgf("[observeOutboundPeers] no free addresses to connect")
return
}

Expand Down Expand Up @@ -286,7 +280,7 @@ func (s *server) waitForIncomingConnection() {
}

_ = s.inboundPeers.AddPeer(peer)
s.addresses.UpsertPeerAddr(peer)
s.addresses.MarkUsed(peer.GetPeerAddr())
}

// AddAddrs adds addresses to the address book of the P2P server. It's peer.Manager functionality.
Expand Down
Loading

0 comments on commit 25f2fbb

Please sign in to comment.