diff --git a/fullrt/dht.go b/fullrt/dht.go index a630d995..b334f780 100644 --- a/fullrt/dht.go +++ b/fullrt/dht.go @@ -15,6 +15,7 @@ import ( "github.com/multiformats/go-multihash" "github.com/libp2p/go-libp2p-routing-helpers/tracing" + "github.com/libp2p/go-libp2p/core/event" "github.com/libp2p/go-libp2p/core/host" "github.com/libp2p/go-libp2p/core/network" "github.com/libp2p/go-libp2p/core/peer" @@ -196,13 +197,12 @@ func NewFullRT(h host.Host, protocolPrefix protocol.ID, options ...Option) (*Ful crawlerInterval: fullrtcfg.crawlInterval, bulkSendParallelism: fullrtcfg.bulkSendParallelism, - - self: self, + self: self, } - rt.wg.Add(1) + rt.wg.Add(2) go rt.runCrawler(ctx) - + go rt.runSubscriber() return rt, nil } @@ -211,6 +211,35 @@ type crawlVal struct { key kadkey.Key } +func (dht *FullRT) runSubscriber() { + defer dht.wg.Done() + ms, ok := dht.messageSender.(dht_pb.MessageSenderWithDisconnect) + if !ok { + return + } + sub, err := dht.Host().EventBus().Subscribe(new(event.EvtPeerConnectednessChanged)) + if err != nil { + logger.Errorf("peer connectedness subscription failed: %s", err) + return + } + defer sub.Close() + for { + select { + case e := <-sub.Out(): + pc, ok := e.(event.EvtPeerConnectednessChanged) + if !ok { + logger.Errorf("invalid event message type: %T", e) + } + if pc.Connectedness != network.Connected { + ms.OnDisconnect(dht.ctx, pc.Peer) + } + case <-dht.ctx.Done(): + return + } + } + +} + func (dht *FullRT) TriggerRefresh(ctx context.Context) error { select { case <-ctx.Done():