diff --git a/syncer/peer.go b/syncer/peer.go index 5ebacec..79193d5 100644 --- a/syncer/peer.go +++ b/syncer/peer.go @@ -22,6 +22,7 @@ type Peer struct { ConnAddr string Inbound bool mu sync.Mutex + synced bool err error } @@ -56,6 +57,19 @@ func (p *Peer) Close() error { return nil } +// Synced returns the peer's sync status. +func (p *Peer) Synced() bool { + p.mu.Lock() + defer p.mu.Unlock() + return p.synced +} + +func (p *Peer) setSynced(synced bool) { + p.mu.Lock() + defer p.mu.Unlock() + p.synced = synced +} + func (p *Peer) callRPC(r gateway.Object, timeout time.Duration) error { s, err := p.t.DialStream() if err != nil { diff --git a/syncer/syncer.go b/syncer/syncer.go index eefae87..b3f0371 100644 --- a/syncer/syncer.go +++ b/syncer/syncer.go @@ -184,15 +184,12 @@ type Syncer struct { mu sync.Mutex peers map[string]*Peer - synced map[string]bool strikes map[string]int } func (s *Syncer) resync(p *Peer, reason string) { - s.mu.Lock() - alreadyResyncing := !s.synced[p.t.Addr] - s.synced[p.t.Addr] = false - s.mu.Unlock() + alreadyResyncing := !p.Synced() + p.setSynced(false) if !alreadyResyncing { s.log.Debug("resync triggered", zap.String("peer", p.t.Addr), zap.String("reason", reason)) } @@ -474,7 +471,7 @@ func (s *Syncer) syncLoop(closeChan <-chan struct{}) error { s.mu.Lock() defer s.mu.Unlock() for _, p := range s.peers { - if s.synced[p.t.Addr] { + if p.Synced() { continue } if peers = append(peers, p); len(peers) >= 3 { @@ -500,9 +497,7 @@ func (s *Syncer) syncLoop(closeChan <-chan struct{}) error { if err != nil { return err // generally fatal } - s.mu.Lock() - s.synced[p.t.Addr] = true - s.mu.Unlock() + p.setSynced(true) s.log.Debug("syncing with peer", zap.Stringer("peer", p)) oldTip := s.cm.Tip() oldTime := time.Now() @@ -720,7 +715,6 @@ func New(l net.Listener, cm ChainManager, pm PeerStore, header gateway.Header, o config: config, log: config.Logger, peers: make(map[string]*Peer), - synced: make(map[string]bool), strikes: make(map[string]int), } }