Skip to content

Commit

Permalink
Merge pull request #698 from libp2p/fix/697
Browse files Browse the repository at this point in the history
Fix constructor ordering
  • Loading branch information
aschmahmann authored Dec 9, 2020
2 parents 7db4172 + 33ec247 commit 09d923f
Show file tree
Hide file tree
Showing 4 changed files with 84 additions and 14 deletions.
11 changes: 9 additions & 2 deletions dht.go
Original file line number Diff line number Diff line change
Expand Up @@ -218,14 +218,21 @@ func New(ctx context.Context, h host.Host, options ...Option) (*IpfsDHT, error)
// handle providers
dht.proc.AddChild(dht.ProviderManager.Process())

dht.proc.Go(dht.populatePeers)

// go-routine to make sure we ALWAYS have RT peer addresses in the peerstore
// since RT membership is decoupled from connectivity
go dht.persistRTPeersInPeerStore()

dht.proc.Go(dht.rtPeerLoop)

// Fill routing table with currently connected peers that are DHT servers
dht.plk.Lock()
for _, p := range dht.host.Network().Peers() {
dht.peerFound(dht.ctx, p, false)
}
dht.plk.Unlock()

dht.proc.Go(dht.populatePeers)

return dht, nil
}

Expand Down
2 changes: 1 addition & 1 deletion dht_options.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ type config struct {
diversityFilter peerdiversity.PeerIPGroupFilter
}

bootstrapPeers []peer.AddrInfo
bootstrapPeers []peer.AddrInfo

// test specific config options
disableFixLowPeers bool
Expand Down
78 changes: 74 additions & 4 deletions dht_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1871,12 +1871,12 @@ func TestV1ProtocolOverride(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

d1 := setupDHT(ctx, t, false, V1ProtocolOverride("/myproto") )
d2 := setupDHT(ctx, t, false, V1ProtocolOverride("/myproto") )
d1 := setupDHT(ctx, t, false, V1ProtocolOverride("/myproto"))
d2 := setupDHT(ctx, t, false, V1ProtocolOverride("/myproto"))
d3 := setupDHT(ctx, t, false, V1ProtocolOverride("/myproto2"))
d4 := setupDHT(ctx, t, false)

dhts := []*IpfsDHT{d1,d2,d3,d4}
dhts := []*IpfsDHT{d1, d2, d3, d4}

for i, dout := range dhts {
for _, din := range dhts[i+1:] {
Expand All @@ -1893,7 +1893,7 @@ func TestV1ProtocolOverride(t *testing.T) {
t.Fatal("should have one peer in the routing table")
}

if d3.RoutingTable().Size() > 0 || d4.RoutingTable().Size() > 0{
if d3.RoutingTable().Size() > 0 || d4.RoutingTable().Size() > 0 {
t.Fatal("should have an empty routing table")
}
}
Expand Down Expand Up @@ -2023,3 +2023,73 @@ func TestBootStrapWhenRTIsEmpty(t *testing.T) {
rt.Find(bootstrappers[2].self) != "" && rt.Find(bootstrapcons[1].self) != "" && rt.Find(bootstrapcons[2].self) != ""
}, 5*time.Second, 500*time.Millisecond)
}

func TestPreconnectedNodes(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
// If this test fails it may hang so set a timeout
ctx, cancel = context.WithTimeout(ctx, time.Second*10)
defer cancel()

opts := []Option{
testPrefix,
DisableAutoRefresh(),
Mode(ModeServer),
}

// Create hosts
h1 := bhost.New(swarmt.GenSwarm(t, ctx, swarmt.OptDisableReuseport))
h2 := bhost.New(swarmt.GenSwarm(t, ctx, swarmt.OptDisableReuseport))

// Setup first DHT
d1, err := New(ctx, h1, opts...)
if err != nil {
t.Fatal(err)
}

// Connect the first host to the second
if err := h1.Connect(ctx, peer.AddrInfo{ID: h2.ID(), Addrs: h2.Addrs()}); err != nil {
t.Fatal(err)
}

// Wait until we know identify has completed by checking for supported protocols
// TODO: Is this needed? Could we do h2.Connect(h1) and that would wait for identify to complete.
for {
h1Protos, err := h2.Peerstore().SupportsProtocols(h1.ID(), d1.protocolsStrs...)
if err != nil {
t.Fatal(err)
}

if len(h1Protos) > 0 {
break
}

select {
case <-time.After(time.Millisecond * 100):
case <-ctx.Done():
t.Fatal("test hung")
}
}

// Setup the second DHT
d2, err := New(ctx, h2, opts...)
if err != nil {
t.Fatal(err)
}

// See if it works
peerCh, err := d2.GetClosestPeers(ctx, "testkey")
if err != nil {
t.Fatal(err)
}

select {
case p := <-peerCh:
if p == h1.ID() {
break
}
t.Fatal("could not find peer")
case <-ctx.Done():
t.Fatal("test hung")
}
}
7 changes: 0 additions & 7 deletions subscriber_notifee.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,13 +56,6 @@ func newSubscriberNotifiee(dht *IpfsDHT) (*subscriberNotifee, error) {
// register for network notifications
dht.host.Network().Notify(nn)

// Fill routing table with currently connected peers that are DHT servers
dht.plk.Lock()
defer dht.plk.Unlock()
for _, p := range dht.host.Network().Peers() {
dht.peerFound(dht.ctx, p, false)
}

return nn, nil
}

Expand Down

0 comments on commit 09d923f

Please sign in to comment.