Skip to content

Commit

Permalink
create counter type
Browse files Browse the repository at this point in the history
  • Loading branch information
cenkalti committed Mar 29, 2019
1 parent da18d26 commit b9c2b95
Show file tree
Hide file tree
Showing 8 changed files with 55 additions and 26 deletions.
5 changes: 2 additions & 3 deletions torrent/announce.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ package torrent

import (
"math"
"sync/atomic"

"github.com/cenkalti/rain/internal/tracker"
)
Expand All @@ -22,8 +21,8 @@ func (t *torrent) announcerFields() tracker.Torrent {
InfoHash: t.infoHash,
PeerID: t.peerID,
Port: t.port,
BytesDownloaded: atomic.LoadInt64(&t.resumerStats.BytesDownloaded),
BytesUploaded: atomic.LoadInt64(&t.resumerStats.BytesUploaded),
BytesDownloaded: t.counters.Read(counterBytesDownloaded),
BytesUploaded: t.counters.Read(counterBytesUploaded),
}
t.mBitfield.RLock()
if t.bitfield == nil {
Expand Down
33 changes: 33 additions & 0 deletions torrent/counters.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
package torrent

import "sync/atomic"

type counterName int

// stats
const (
counterBytesDownloaded counterName = iota
counterBytesUploaded
counterBytesWasted
counterSeededFor // time.Duration
)

// counters provides concurrent-safe access over set of integers.
type counters [4]int64

func newCounters(dl, ul, waste, seed int64) counters {
var c counters
c.Incr(counterBytesDownloaded, dl)
c.Incr(counterBytesUploaded, ul)
c.Incr(counterBytesWasted, waste)
c.Incr(counterSeededFor, seed)
return c
}

func (c *counters) Incr(name counterName, value int64) {
atomic.AddInt64(&c[name], value)
}

func (c *counters) Read(name counterName) int64 {
return atomic.LoadInt64(&c[name])
}
14 changes: 7 additions & 7 deletions torrent/messagehandler.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,36 +18,36 @@ func (t *torrent) handlePieceMessage(pm peer.PieceMessage) {
pe := pm.Peer
if t.pieces == nil || t.bitfield == nil {
pe.Logger().Error("piece received but we don't have info")
t.resumerStats.BytesWasted += int64(len(msg.Buffer.Data))
t.counters.Incr(counterBytesWasted, int64(len(msg.Buffer.Data)))
t.closePeer(pe)
msg.Buffer.Release()
return
}
if msg.Index >= uint32(len(t.pieces)) {
pe.Logger().Errorln("invalid piece index:", msg.Index)
t.resumerStats.BytesWasted += int64(len(msg.Buffer.Data))
t.counters.Incr(counterBytesWasted, int64(len(msg.Buffer.Data)))
t.closePeer(pe)
msg.Buffer.Release()
return
}
t.downloadSpeed.Update(int64(len(msg.Buffer.Data)))
t.resumerStats.BytesDownloaded += int64(len(msg.Buffer.Data))
t.counters.Incr(counterBytesDownloaded, int64(len(msg.Buffer.Data)))
pd, ok := t.pieceDownloaders[pe]
if !ok {
t.resumerStats.BytesWasted += int64(len(msg.Buffer.Data))
t.counters.Incr(counterBytesWasted, int64(len(msg.Buffer.Data)))
msg.Buffer.Release()
return
}
if pd.Piece.Index != msg.Index {
t.resumerStats.BytesWasted += int64(len(msg.Buffer.Data))
t.counters.Incr(counterBytesWasted, int64(len(msg.Buffer.Data)))
msg.Buffer.Release()
return
}
piece := pd.Piece
block, ok := piece.FindBlock(msg.Begin, uint32(len(msg.Buffer.Data)))
if !ok {
pe.Logger().Errorln("invalid piece index:", msg.Index, "begin:", msg.Begin, "length:", len(msg.Buffer.Data))
t.resumerStats.BytesWasted += int64(len(msg.Buffer.Data))
t.counters.Incr(counterBytesWasted, int64(len(msg.Buffer.Data)))
t.closePeer(pe)
msg.Buffer.Release()
return
Expand Down Expand Up @@ -288,7 +288,7 @@ func (t *torrent) handlePeerMessage(pm peer.Message) {
}
case peerwriter.BlockUploaded:
t.uploadSpeed.Update(int64(msg.Length))
t.resumerStats.BytesUploaded += int64(msg.Length)
t.counters.Incr(counterBytesUploaded, int64(msg.Length))
case peerprotocol.ExtensionHandshakeMessage:
pe.Logger().Debugln("extension handshake received:", msg)
if pe.ExtensionHandshake != nil {
Expand Down
10 changes: 4 additions & 6 deletions torrent/session.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@ import (
"runtime"
"strconv"
"sync"
"sync/atomic"
"time"

"github.com/boltdb/bolt"
Expand Down Expand Up @@ -216,11 +215,10 @@ func (s *Session) updateStats() {
s.mTorrents.RLock()
for _, t := range s.torrents {
b := mb.Bucket([]byte(t.torrent.id))
_ = b.Put(boltdbresumer.Keys.BytesDownloaded, []byte(strconv.FormatInt(atomic.LoadInt64(&t.torrent.resumerStats.BytesDownloaded), 10)))
_ = b.Put(boltdbresumer.Keys.BytesDownloaded, []byte(strconv.FormatInt(atomic.LoadInt64(&t.torrent.resumerStats.BytesDownloaded), 10)))
_ = b.Put(boltdbresumer.Keys.BytesUploaded, []byte(strconv.FormatInt(atomic.LoadInt64(&t.torrent.resumerStats.BytesUploaded), 10)))
_ = b.Put(boltdbresumer.Keys.BytesWasted, []byte(strconv.FormatInt(atomic.LoadInt64(&t.torrent.resumerStats.BytesWasted), 10)))
_ = b.Put(boltdbresumer.Keys.SeededFor, []byte(time.Duration(atomic.LoadInt64(&t.torrent.resumerStats.SeededFor)).String()))
_ = b.Put(boltdbresumer.Keys.BytesDownloaded, []byte(strconv.FormatInt(t.torrent.counters.Read(counterBytesDownloaded), 10)))
_ = b.Put(boltdbresumer.Keys.BytesUploaded, []byte(strconv.FormatInt(t.torrent.counters.Read(counterBytesUploaded), 10)))
_ = b.Put(boltdbresumer.Keys.BytesWasted, []byte(strconv.FormatInt(t.torrent.counters.Read(counterBytesWasted), 10)))
_ = b.Put(boltdbresumer.Keys.SeededFor, []byte(time.Duration(t.torrent.counters.Read(counterSeededFor)).String()))
}
s.mTorrents.RUnlock()
return nil
Expand Down
11 changes: 5 additions & 6 deletions torrent/stats.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ package torrent

import (
"strings"
"sync/atomic"
"time"
"unicode"

Expand Down Expand Up @@ -134,10 +133,10 @@ func (t *torrent) stats() Stats {
s.Downloads.Choked = len(t.pieceDownloadersChoked)
s.Downloads.Running = len(t.pieceDownloaders) - len(t.pieceDownloadersChoked) - len(t.pieceDownloadersSnubbed)
s.Pieces.Available = t.avaliablePieceCount()
s.Bytes.Downloaded = t.resumerStats.BytesDownloaded
s.Bytes.Uploaded = t.resumerStats.BytesUploaded
s.Bytes.Wasted = t.resumerStats.BytesWasted
s.SeededFor = time.Duration(t.resumerStats.SeededFor)
s.Bytes.Downloaded = t.counters.Read(counterBytesDownloaded)
s.Bytes.Uploaded = t.counters.Read(counterBytesUploaded)
s.Bytes.Wasted = t.counters.Read(counterBytesWasted)
s.SeededFor = time.Duration(t.counters.Read(counterSeededFor))
s.Bytes.Allocated = t.bytesAllocated
s.Pieces.Checked = t.checkedPieces
s.Speed.Download = uint(t.downloadSpeed.Rate())
Expand Down Expand Up @@ -292,6 +291,6 @@ func (t *torrent) updateSeedDuration(now time.Time) {
t.seedDurationUpdatedAt = now
return
}
atomic.AddInt64(&t.resumerStats.SeededFor, int64(now.Sub(t.seedDurationUpdatedAt)))
t.counters.Incr(counterSeededFor, int64(now.Sub(t.seedDurationUpdatedAt)))
t.seedDurationUpdatedAt = now
}
4 changes: 2 additions & 2 deletions torrent/torrent.go
Original file line number Diff line number Diff line change
Expand Up @@ -212,7 +212,7 @@ type torrent struct {
verifierResultC chan *verifier.Verifier
checkedPieces uint32

resumerStats resumer.Stats
counters counters
seedDurationUpdatedAt time.Time
seedDurationTicker *time.Ticker

Expand Down Expand Up @@ -325,7 +325,7 @@ func newTorrent2(
bannedPeerIPs: make(map[string]struct{}),
announcersStoppedC: make(chan struct{}),
dhtPeersC: make(chan []*net.TCPAddr, 1),
resumerStats: stats,
counters: newCounters(stats.BytesDownloaded, stats.BytesUploaded, stats.BytesWasted, stats.SeededFor),
externalIP: externalip.FirstExternalIP(),
downloadSpeed: metrics.NewEWMA1(),
uploadSpeed: metrics.NewEWMA1(),
Expand Down
2 changes: 1 addition & 1 deletion torrent/webseed.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ func (t *torrent) handleWebseedPieceResult(msg *urldownloader.PieceResult) {

piece := &t.pieces[msg.Index]

t.resumerStats.BytesDownloaded += int64(len(msg.Buffer.Data))
t.counters.Incr(counterBytesDownloaded, int64(len(msg.Buffer.Data)))
t.downloadSpeed.Update(int64(len(msg.Buffer.Data)))
for _, src := range t.webseedSources {
if src.URL != msg.Downloader.URL {
Expand Down
2 changes: 1 addition & 1 deletion torrent/write.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ func (t *torrent) handlePieceWriteDone(pw *piecewriter.PieceWriter) {
pw.Buffer.Release()

if !pw.HashOK {
t.resumerStats.BytesWasted += int64(len(pw.Buffer.Data))
t.counters.Incr(counterBytesWasted, int64(len(pw.Buffer.Data)))
switch src := pw.Source.(type) {
case *peer.Peer:
t.log.Errorln("received corrupt piece from peer", src.String())
Expand Down

0 comments on commit b9c2b95

Please sign in to comment.