Skip to content

Commit

Permalink
limit number of writes to do in parallel
Browse files Browse the repository at this point in the history
  • Loading branch information
cenkalti committed Aug 30, 2019
1 parent 794f053 commit 2dcd4f2
Show file tree
Hide file tree
Showing 8 changed files with 20 additions and 3 deletions.
5 changes: 4 additions & 1 deletion internal/piecewriter/piecewriter.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (

"github.com/cenkalti/rain/internal/bufferpool"
"github.com/cenkalti/rain/internal/piece"
"github.com/cenkalti/rain/internal/semaphore"
"github.com/rcrowley/go-metrics"
)

Expand All @@ -25,12 +26,14 @@ func New(p *piece.Piece, source interface{}, buf bufferpool.Buffer) *PieceWriter
}
}

func (w *PieceWriter) Run(resultC chan *PieceWriter, closeC chan struct{}, writesPerSecond, writeBytesPerSecond metrics.EWMA) {
func (w *PieceWriter) Run(resultC chan *PieceWriter, closeC chan struct{}, writesPerSecond, writeBytesPerSecond metrics.EWMA, sem *semaphore.Semaphore) {
w.HashOK = w.Piece.VerifyHash(w.Buffer.Data, sha1.New()) // nolint: gosec
if w.HashOK {
writesPerSecond.Update(1)
writeBytesPerSecond.Update(int64(len(w.Buffer.Data)))
sem.Wait()
_, w.Error = w.Piece.Data.Write(w.Buffer.Data)
sem.Signal()
}
select {
case resultC <- w:
Expand Down
2 changes: 2 additions & 0 deletions internal/rpctypes/rpctypes.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,8 @@ type SessionStats struct {
Uptime int
WritesPerSecond int
WriteBytesPerSecond int
WritesActive int
WritesPending int
}

type Stats struct {
Expand Down
3 changes: 3 additions & 0 deletions torrent/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -128,6 +128,8 @@ type Config struct {
PieceCacheTTL time.Duration
// Number of read operations to do in parallel.
ParallelReads uint
// Number of write operations to do in parallel.
ParallelWrites uint

// When the client want to connect a peer, first it tries to do encrypted handshake.
// If it does not work, it connects to same peer again and does unencrypted handshake.
Expand Down Expand Up @@ -223,6 +225,7 @@ var DefaultConfig = Config{
PieceCacheSize: 256 * 1024 * 1024,
PieceCacheTTL: 5 * time.Minute,
ParallelReads: 1,
ParallelWrites: 1,

// Webseed settings
WebseedDialTimeout: 10 * time.Second,
Expand Down
3 changes: 3 additions & 0 deletions torrent/session.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"github.com/cenkalti/rain/internal/resolver"
"github.com/cenkalti/rain/internal/resourcemanager"
"github.com/cenkalti/rain/internal/resumer/boltdbresumer"
"github.com/cenkalti/rain/internal/semaphore"
"github.com/cenkalti/rain/internal/storage/filestorage"
"github.com/cenkalti/rain/internal/tracker"
"github.com/cenkalti/rain/internal/trackermanager"
Expand Down Expand Up @@ -68,6 +69,7 @@ type Session struct {

writesPerSecond metrics.EWMA
writeBytesPerSecond metrics.EWMA
semWrite *semaphore.Semaphore
}

// NewSession creates a new Session for downloading and seeding torrents.
Expand Down Expand Up @@ -183,6 +185,7 @@ func NewSession(cfg Config) (*Session, error) {
},
writesPerSecond: metrics.NewEWMA1(),
writeBytesPerSecond: metrics.NewEWMA1(),
semWrite: semaphore.New(int(cfg.ParallelWrites)),
}
ext, err := bitfield.NewBytes(c.extensions[:], 64)
if err != nil {
Expand Down
2 changes: 2 additions & 0 deletions torrent/session_rpc_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,8 @@ func (h *rpcHandler) GetSessionStats(args *rpctypes.GetSessionStatsRequest, repl
Uptime: int(s.Uptime / time.Second),
WritesPerSecond: s.WritesPerSecond,
WriteBytesPerSecond: s.WriteBytesPerSecond,
WritesActive: s.WritesActive,
WritesPending: s.WritesPending,
}
return nil
}
Expand Down
4 changes: 4 additions & 0 deletions torrent/session_stats.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,8 @@ type SessionStats struct {
Uptime time.Duration
WritesPerSecond int
WriteBytesPerSecond int
WritesActive int
WritesPending int
}

func (s *Session) Stats() SessionStats {
Expand Down Expand Up @@ -60,6 +62,8 @@ func (s *Session) Stats() SessionStats {
Uptime: time.Since(s.createdAt),
WritesPerSecond: int(s.writesPerSecond.Rate()),
WriteBytesPerSecond: int(s.writeBytesPerSecond.Rate()),
WritesActive: s.semWrite.Len(),
WritesPending: s.semWrite.Waiting(),
}
}

Expand Down
2 changes: 1 addition & 1 deletion torrent/torrent_messagehandler.go
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,7 @@ func (t *torrent) handlePieceMessage(pm peer.PieceMessage) {
t.webseedPieceResultC.Suspend()

pw := piecewriter.New(piece, pe, pd.Buffer)
go pw.Run(t.pieceWriterResultC, t.doneC, t.session.writesPerSecond, t.session.writeBytesPerSecond)
go pw.Run(t.pieceWriterResultC, t.doneC, t.session.writesPerSecond, t.session.writeBytesPerSecond, t.session.semWrite)
}

func (t *torrent) handlePeerMessage(pm peer.Message) {
Expand Down
2 changes: 1 addition & 1 deletion torrent/torrent_webseed.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ func (t *torrent) handleWebseedPieceResult(msg *urldownloader.PieceResult) {
t.webseedPieceResultC.Suspend()

pw := piecewriter.New(piece, msg.Downloader, msg.Buffer)
go pw.Run(t.pieceWriterResultC, t.doneC, t.session.writesPerSecond, t.session.writeBytesPerSecond)
go pw.Run(t.pieceWriterResultC, t.doneC, t.session.writesPerSecond, t.session.writeBytesPerSecond, t.session.semWrite)

if msg.Done {
for _, src := range t.webseedSources {
Expand Down

0 comments on commit 2dcd4f2

Please sign in to comment.