Skip to content

Commit

Permalink
do not get stuck on NeedMorePeers
Browse files Browse the repository at this point in the history
  • Loading branch information
cenkalti committed Mar 1, 2019
1 parent e3f4fd8 commit 4e74046
Showing 1 changed file with 37 additions and 24 deletions.
61 changes: 37 additions & 24 deletions internal/announcer/periodic.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package announcer
import (
"math"
"net"
"sync"
"time"

"github.com/cenkalti/backoff"
Expand All @@ -20,25 +21,28 @@ const (
)

type PeriodicalAnnouncer struct {
Tracker tracker.Tracker
status Status
statsCommandC chan statsRequest
numWant int
interval time.Duration
minInterval time.Duration
seeders int
leechers int
lastError error
log logger.Logger
completedC chan struct{}
newPeers chan []*net.TCPAddr
backoff backoff.BackOff
requests chan *Request
lastAnnounce time.Time
HasAnnounced bool
needMorePeersC chan bool
closeC chan struct{}
doneC chan struct{}
Tracker tracker.Tracker
status Status
statsCommandC chan statsRequest
numWant int
interval time.Duration
minInterval time.Duration
seeders int
leechers int
lastError error
log logger.Logger
completedC chan struct{}
newPeers chan []*net.TCPAddr
backoff backoff.BackOff
requests chan *Request
lastAnnounce time.Time
HasAnnounced bool
closeC chan struct{}
doneC chan struct{}

needMorePeers bool
mNeedMorePeers sync.RWMutex
needMorePeersC chan struct{}
}

type Request struct {
Expand All @@ -61,7 +65,7 @@ func NewPeriodicalAnnouncer(trk tracker.Tracker, numWant int, minInterval time.D
completedC: completedC,
newPeers: newPeers,
requests: requests,
needMorePeersC: make(chan bool),
needMorePeersC: make(chan struct{}, 1),
closeC: make(chan struct{}),
doneC: make(chan struct{}),
backoff: &backoff.ExponentialBackOff{
Expand Down Expand Up @@ -99,9 +103,13 @@ func (a *PeriodicalAnnouncer) Stats() Stats {
}

func (a *PeriodicalAnnouncer) NeedMorePeers(val bool) {
a.mNeedMorePeers.Lock()
a.needMorePeers = val
a.mNeedMorePeers.Unlock()
select {
case a.needMorePeersC <- val:
case a.needMorePeersC <- struct{}{}:
case <-a.doneC:
default:
}
}

Expand All @@ -112,8 +120,6 @@ func (a *PeriodicalAnnouncer) Run() {
timer := time.NewTimer(math.MaxInt64)
defer timer.Stop()

var needMorePeers bool

ca := newCancelableAnnouncer(a.Tracker, a.requests, a.newPeers)
defer ca.Cancel()

Expand All @@ -136,20 +142,27 @@ func (a *PeriodicalAnnouncer) Run() {
a.lastError = nil
a.status = Working
a.backoff.Reset()
a.mNeedMorePeers.RLock()
needMorePeers := a.needMorePeers
a.mNeedMorePeers.RUnlock()
if needMorePeers {
timer.Reset(a.minInterval)
} else {
timer.Reset(a.interval)
}
case a.lastError = <-ca.ErrorC:
ca.announcing = false
a.lastAnnounce = time.Now()
a.status = NotWorking
a.log.Debugln("announce error:", a.lastError)
timer.Reset(a.backoff.NextBackOff())
case needMorePeers = <-a.needMorePeersC:
case <-a.needMorePeersC:
if ca.announcing {
break
}
a.mNeedMorePeers.RLock()
needMorePeers := a.needMorePeers
a.mNeedMorePeers.RUnlock()
if needMorePeers {
timer.Reset(time.Until(a.lastAnnounce.Add(a.minInterval)))
} else {
Expand Down

0 comments on commit 4e74046

Please sign in to comment.