Skip to content

Commit

Permalink
accelerated-dht: cleanup peers from message sender on disconnection
Browse files Browse the repository at this point in the history
  • Loading branch information
sukunrt committed Dec 27, 2024
1 parent 88ef336 commit de2d52f
Showing 1 changed file with 33 additions and 4 deletions.
37 changes: 33 additions & 4 deletions fullrt/dht.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import (
"github.com/multiformats/go-multihash"

"github.com/libp2p/go-libp2p-routing-helpers/tracing"
"github.com/libp2p/go-libp2p/core/event"
"github.com/libp2p/go-libp2p/core/host"
"github.com/libp2p/go-libp2p/core/network"
"github.com/libp2p/go-libp2p/core/peer"
Expand Down Expand Up @@ -196,13 +197,12 @@ func NewFullRT(h host.Host, protocolPrefix protocol.ID, options ...Option) (*Ful
crawlerInterval: fullrtcfg.crawlInterval,

bulkSendParallelism: fullrtcfg.bulkSendParallelism,

self: self,
self: self,

Check warning on line 200 in fullrt/dht.go

View check run for this annotation

Codecov / codecov/patch

fullrt/dht.go#L200

Added line #L200 was not covered by tests
}

rt.wg.Add(1)
rt.wg.Add(2)

Check warning on line 203 in fullrt/dht.go

View check run for this annotation

Codecov / codecov/patch

fullrt/dht.go#L203

Added line #L203 was not covered by tests
go rt.runCrawler(ctx)

go rt.runSubscriber()

Check warning on line 205 in fullrt/dht.go

View check run for this annotation

Codecov / codecov/patch

fullrt/dht.go#L205

Added line #L205 was not covered by tests
return rt, nil
}

Expand All @@ -211,6 +211,35 @@ type crawlVal struct {
key kadkey.Key
}

func (dht *FullRT) runSubscriber() {
defer dht.wg.Done()
ms, ok := dht.messageSender.(dht_pb.MessageSenderWithDisconnect)
if !ok {
return
}
sub, err := dht.Host().EventBus().Subscribe(new(event.EvtPeerConnectednessChanged))
if err != nil {
logger.Errorf("peer connectedness subscription failed: %s", err)
return
}
defer sub.Close()
for {
select {
case e := <-sub.Out():
pc, ok := e.(event.EvtPeerConnectednessChanged)
if !ok {
logger.Errorf("invalid event message type: %T", e)
}
if pc.Connectedness != network.Connected {
ms.OnDisconnect(dht.ctx, pc.Peer)
}
case <-dht.ctx.Done():
return

Check warning on line 237 in fullrt/dht.go

View check run for this annotation

Codecov / codecov/patch

fullrt/dht.go#L214-L237

Added lines #L214 - L237 were not covered by tests
}
}

}

func (dht *FullRT) TriggerRefresh(ctx context.Context) error {
select {
case <-ctx.Done():
Expand Down

0 comments on commit de2d52f

Please sign in to comment.