Skip to content

Commit

Permalink
Merge pull request #418 from libp2p/feat/refresh-and-wait
Browse files Browse the repository at this point in the history
feat: refresh and wait
  • Loading branch information
Stebalien authored Dec 10, 2019
2 parents 6a005f4 + 50a9858 commit 6028925
Show file tree
Hide file tree
Showing 6 changed files with 81 additions and 25 deletions.
4 changes: 2 additions & 2 deletions dht.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ type IpfsDHT struct {
autoRefresh bool
rtRefreshQueryTimeout time.Duration
rtRefreshPeriod time.Duration
triggerRtRefresh chan struct{}
triggerRtRefresh chan chan<- error

maxRecordAge time.Duration
}
Expand Down Expand Up @@ -167,7 +167,7 @@ func makeDHT(ctx context.Context, h host.Host, dstore ds.Batching, protocols []p
routingTable: rt,
protocols: protocols,
bucketSize: bucketSize,
triggerRtRefresh: make(chan struct{}),
triggerRtRefresh: make(chan chan<- error),
}

dht.ctx = dht.newContextWithLocalTags(ctx)
Expand Down
69 changes: 56 additions & 13 deletions dht_bootstrap.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,10 @@ package dht

import (
"context"
"fmt"
"time"

multierror "github.com/hashicorp/go-multierror"
process "github.com/jbenet/goprocess"
processctx "github.com/jbenet/goprocess/context"
"github.com/libp2p/go-libp2p-core/routing"
Expand Down Expand Up @@ -59,27 +61,57 @@ func (dht *IpfsDHT) startRefreshing() error {
}

for {
var waiting []chan<- error
select {
case <-refreshTicker.C:
case <-dht.triggerRtRefresh:
logger.Infof("triggering a refresh: RT has %d peers", dht.routingTable.Size())
case res := <-dht.triggerRtRefresh:
if res != nil {
waiting = append(waiting, res)
}
case <-ctx.Done():
return
}
dht.doRefresh(ctx)

// Batch multiple refresh requests if they're all waiting at the same time.
collectWaiting:
for {
select {
case res := <-dht.triggerRtRefresh:
if res != nil {
waiting = append(waiting, res)
}
default:
break collectWaiting
}
}

err := dht.doRefresh(ctx)
for _, w := range waiting {
w <- err
close(w)
}
if err != nil {
logger.Warning(err)
}
}
})

return nil
}

func (dht *IpfsDHT) doRefresh(ctx context.Context) {
dht.selfWalk(ctx)
dht.refreshBuckets(ctx)
func (dht *IpfsDHT) doRefresh(ctx context.Context) error {
var merr error
if err := dht.selfWalk(ctx); err != nil {
merr = multierror.Append(merr, err)
}
if err := dht.refreshBuckets(ctx); err != nil {
merr = multierror.Append(merr, err)
}
return merr
}

// refreshBuckets scans the routing table, and does a random walk on k-buckets that haven't been queried since the given bucket period
func (dht *IpfsDHT) refreshBuckets(ctx context.Context) {
func (dht *IpfsDHT) refreshBuckets(ctx context.Context) error {
doQuery := func(bucketId int, target string, f func(context.Context) error) error {
logger.Infof("starting refreshing bucket %d to %s (routing table size was %d)",
bucketId, target, dht.routingTable.Size())
Expand All @@ -103,6 +135,8 @@ func (dht *IpfsDHT) refreshBuckets(ctx context.Context) {
// 16 bits specified anyways.
buckets = buckets[:16]
}

var merr error
for bucketID, bucket := range buckets {
if time.Since(bucket.RefreshedAt()) <= dht.rtRefreshPeriod {
continue
Expand All @@ -120,20 +154,24 @@ func (dht *IpfsDHT) refreshBuckets(ctx context.Context) {
}

if err := doQuery(bucketID, randPeerInBucket.String(), walkFnc); err != nil {
logger.Warningf("failed to do a random walk on bucket %d: %s", bucketID, err)
merr = multierror.Append(
merr,
fmt.Errorf("failed to do a random walk on bucket %d: %s", bucketID, err),
)
}
}
return merr
}

// Traverse the DHT toward the self ID
func (dht *IpfsDHT) selfWalk(ctx context.Context) {
func (dht *IpfsDHT) selfWalk(ctx context.Context) error {
queryCtx, cancel := context.WithTimeout(ctx, dht.rtRefreshQueryTimeout)
defer cancel()
_, err := dht.FindPeer(queryCtx, dht.self)
if err == routing.ErrNotFound {
return
return nil
}
logger.Warningf("failed to query self during routing table refresh: %s", err)
return fmt.Errorf("failed to query self during routing table refresh: %s", err)
}

// Bootstrap tells the DHT to get into a bootstrapped state satisfying the
Expand All @@ -146,9 +184,14 @@ func (dht *IpfsDHT) Bootstrap(_ context.Context) error {
}

// RefreshRoutingTable tells the DHT to refresh it's routing tables.
func (dht *IpfsDHT) RefreshRoutingTable() {
//
// The returned channel will block until the refresh finishes, then yield the
// error and close. The channel is buffered and safe to ignore.
func (dht *IpfsDHT) RefreshRoutingTable() <-chan error {
res := make(chan error, 1)
select {
case dht.triggerRtRefresh <- struct{}{}:
case dht.triggerRtRefresh <- res:
default:
}
return res
}
24 changes: 16 additions & 8 deletions dht_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -201,7 +201,14 @@ func bootstrap(t *testing.T, ctx context.Context, dhts []*IpfsDHT) {
start := rand.Intn(len(dhts)) // randomize to decrease bias.
for i := range dhts {
dht := dhts[(start+i)%len(dhts)]
dht.RefreshRoutingTable()
select {
case err := <-dht.RefreshRoutingTable():
if err != nil {
t.Error(err)
}
case <-ctx.Done():
return
}
}
}

Expand Down Expand Up @@ -663,25 +670,26 @@ func TestRefresh(t *testing.T) {

<-time.After(100 * time.Millisecond)
// bootstrap a few times until we get good tables.
stop := make(chan struct{})
t.Logf("bootstrapping them so they find each other %d", nDHTs)
ctxT, cancelT := context.WithTimeout(ctx, 5*time.Second)
defer cancelT()

go func() {
for {
t.Logf("bootstrapping them so they find each other %d", nDHTs)
ctxT, cancel := context.WithTimeout(ctx, 5*time.Second)
defer cancel()
for ctxT.Err() == nil {
bootstrap(t, ctxT, dhts)

// wait a bit.
select {
case <-time.After(50 * time.Millisecond):
continue // being explicit
case <-stop:
case <-ctxT.Done():
return
}
}
}()

waitForWellFormedTables(t, dhts, 7, 10, 20*time.Second)
close(stop)
cancelT()

if u.Debug {
// the routing tables should be full now. let's inspect them.
Expand Down
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ go 1.12

require (
github.com/gogo/protobuf v1.3.1
github.com/hashicorp/go-multierror v1.0.0
github.com/hashicorp/golang-lru v0.5.3
github.com/ipfs/go-cid v0.0.3
github.com/ipfs/go-datastore v0.3.1
Expand Down
4 changes: 4 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,10 @@ github.com/gxed/hashland/keccakpg v0.0.1 h1:wrk3uMNaMxbXiHibbPO4S0ymqJMm41WiudyF
github.com/gxed/hashland/keccakpg v0.0.1/go.mod h1:kRzw3HkwxFU1mpmPP8v1WyQzwdGfmKFJ6tItnhQ67kU=
github.com/gxed/hashland/murmur3 v0.0.1 h1:SheiaIt0sda5K+8FLz952/1iWS9zrnKsEJaOJu4ZbSc=
github.com/gxed/hashland/murmur3 v0.0.1/go.mod h1:KjXop02n4/ckmZSnY2+HKcLud/tcmvhST0bie/0lS48=
github.com/hashicorp/errwrap v1.0.0 h1:hLrqtEDnRye3+sgx6z4qVLNuviH3MR5aQ0ykNJa/UYA=
github.com/hashicorp/errwrap v1.0.0/go.mod h1:YH+1FKiLXxHSkmPseP+kNlulaMuP3n2brvKWEqk/Jc4=
github.com/hashicorp/go-multierror v1.0.0 h1:iVjPR7a6H0tWELX5NxNe7bYopibicUzc7uPribsnS6o=
github.com/hashicorp/go-multierror v1.0.0/go.mod h1:dHtQlpGsu+cZNNAkkCN/P3hoUDHhCYQXV3UM06sGGrk=
github.com/hashicorp/golang-lru v0.5.0/go.mod h1:/m3WP610KZHVQ1SGc6re/UDhFvYD7pJ4Ao+sR/qLZy8=
github.com/hashicorp/golang-lru v0.5.1 h1:0hERBMJE1eitiLkihrMvRVBYAkpHzc/J3QdDN+dAcgU=
github.com/hashicorp/golang-lru v0.5.1/go.mod h1:/m3WP610KZHVQ1SGc6re/UDhFvYD7pJ4Ao+sR/qLZy8=
Expand Down
4 changes: 2 additions & 2 deletions notif.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ func (nn *netNotifiee) Connected(n network.Network, v network.Conn) {
dht.Update(dht.Context(), p)
if refresh && dht.autoRefresh {
select {
case dht.triggerRtRefresh <- struct{}{}:
case dht.triggerRtRefresh <- nil:
default:
}
}
Expand Down Expand Up @@ -82,7 +82,7 @@ func (nn *netNotifiee) testConnection(v network.Conn) {
dht.Update(dht.Context(), p)
if refresh && dht.autoRefresh {
select {
case dht.triggerRtRefresh <- struct{}{}:
case dht.triggerRtRefresh <- nil:
default:
}
}
Expand Down

0 comments on commit 6028925

Please sign in to comment.