Skip to content

Commit

Permalink
accelerated-dht: cleanup peer from message sender on disconnection (#…
Browse files Browse the repository at this point in the history
…1009)

* accelerated-dht: cleanup peers from message sender on disconnection

---------

Co-authored-by: Marco Munizaga <[email protected]>
  • Loading branch information
sukunrt and MarcoPolo authored Jan 9, 2025
1 parent ab9423d commit c35df8a
Showing 1 changed file with 39 additions and 5 deletions.
44 changes: 39 additions & 5 deletions fullrt/dht.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,12 +15,14 @@ import (
"github.com/multiformats/go-multihash"

"github.com/libp2p/go-libp2p-routing-helpers/tracing"
"github.com/libp2p/go-libp2p/core/event"
"github.com/libp2p/go-libp2p/core/host"
"github.com/libp2p/go-libp2p/core/network"
"github.com/libp2p/go-libp2p/core/peer"
"github.com/libp2p/go-libp2p/core/peerstore"
"github.com/libp2p/go-libp2p/core/protocol"
"github.com/libp2p/go-libp2p/core/routing"
"github.com/libp2p/go-libp2p/p2p/host/eventbus"
swarm "github.com/libp2p/go-libp2p/p2p/net/swarm"

"github.com/gogo/protobuf/proto"
Expand Down Expand Up @@ -98,6 +100,8 @@ type FullRT struct {
bulkSendParallelism int

self peer.ID

peerConnectednessSubscriber event.Subscription
}

// NewFullRT creates a DHT client that tracks the full network. It takes a protocol prefix for the given network,
Expand Down Expand Up @@ -151,6 +155,11 @@ func NewFullRT(h host.Host, protocolPrefix protocol.ID, options ...Option) (*Ful
}
}

sub, err := h.EventBus().Subscribe(new(event.EvtPeerConnectednessChanged), eventbus.Name("fullrt-dht"))
if err != nil {
return nil, fmt.Errorf("peer connectedness subscription failed: %w", err)
}

ctx, cancel := context.WithCancel(context.Background())

self := h.ID()
Expand Down Expand Up @@ -195,14 +204,14 @@ func NewFullRT(h host.Host, protocolPrefix protocol.ID, options ...Option) (*Ful

crawlerInterval: fullrtcfg.crawlInterval,

bulkSendParallelism: fullrtcfg.bulkSendParallelism,

self: self,
bulkSendParallelism: fullrtcfg.bulkSendParallelism,
self: self,
peerConnectednessSubscriber: sub,
}

rt.wg.Add(1)
rt.wg.Add(2)
go rt.runCrawler(ctx)

go rt.runSubscriber()
return rt, nil
}

Expand All @@ -211,6 +220,31 @@ type crawlVal struct {
key kadkey.Key
}

func (dht *FullRT) runSubscriber() {
defer dht.wg.Done()
ms, ok := dht.messageSender.(dht_pb.MessageSenderWithDisconnect)
defer dht.peerConnectednessSubscriber.Close()
if !ok {
return
}
for {
select {
case e := <-dht.peerConnectednessSubscriber.Out():
pc, ok := e.(event.EvtPeerConnectednessChanged)
if !ok {
logger.Errorf("invalid event message type: %T", e)
continue
}

if pc.Connectedness != network.Connected {
ms.OnDisconnect(dht.ctx, pc.Peer)
}
case <-dht.ctx.Done():
return
}
}
}

func (dht *FullRT) TriggerRefresh(ctx context.Context) error {
select {
case <-ctx.Done():
Expand Down

0 comments on commit c35df8a

Please sign in to comment.