diff --git a/internal/piecewriter/piecewriter.go b/internal/piecewriter/piecewriter.go index d6908c69..d2a37bdb 100644 --- a/internal/piecewriter/piecewriter.go +++ b/internal/piecewriter/piecewriter.go @@ -5,6 +5,7 @@ import ( "github.com/cenkalti/rain/internal/bufferpool" "github.com/cenkalti/rain/internal/piece" + "github.com/cenkalti/rain/internal/semaphore" "github.com/rcrowley/go-metrics" ) @@ -25,12 +26,14 @@ func New(p *piece.Piece, source interface{}, buf bufferpool.Buffer) *PieceWriter } } -func (w *PieceWriter) Run(resultC chan *PieceWriter, closeC chan struct{}, writesPerSecond, writeBytesPerSecond metrics.EWMA) { +func (w *PieceWriter) Run(resultC chan *PieceWriter, closeC chan struct{}, writesPerSecond, writeBytesPerSecond metrics.EWMA, sem *semaphore.Semaphore) { w.HashOK = w.Piece.VerifyHash(w.Buffer.Data, sha1.New()) // nolint: gosec if w.HashOK { writesPerSecond.Update(1) writeBytesPerSecond.Update(int64(len(w.Buffer.Data))) + sem.Wait() _, w.Error = w.Piece.Data.Write(w.Buffer.Data) + sem.Signal() } select { case resultC <- w: diff --git a/internal/rpctypes/rpctypes.go b/internal/rpctypes/rpctypes.go index e1a23b0c..2bbb1d08 100644 --- a/internal/rpctypes/rpctypes.go +++ b/internal/rpctypes/rpctypes.go @@ -63,6 +63,8 @@ type SessionStats struct { Uptime int WritesPerSecond int WriteBytesPerSecond int + WritesActive int + WritesPending int } type Stats struct { diff --git a/torrent/config.go b/torrent/config.go index 6940658b..3ec4b47b 100644 --- a/torrent/config.go +++ b/torrent/config.go @@ -128,6 +128,8 @@ type Config struct { PieceCacheTTL time.Duration // Number of read operations to do in parallel. ParallelReads uint + // Number of write operations to do in parallel. + ParallelWrites uint // When the client want to connect a peer, first it tries to do encrypted handshake. // If it does not work, it connects to same peer again and does unencrypted handshake. @@ -223,6 +225,7 @@ var DefaultConfig = Config{ PieceCacheSize: 256 * 1024 * 1024, PieceCacheTTL: 5 * time.Minute, ParallelReads: 1, + ParallelWrites: 1, // Webseed settings WebseedDialTimeout: 10 * time.Second, diff --git a/torrent/session.go b/torrent/session.go index 31771c62..003f173b 100644 --- a/torrent/session.go +++ b/torrent/session.go @@ -21,6 +21,7 @@ import ( "github.com/cenkalti/rain/internal/resolver" "github.com/cenkalti/rain/internal/resourcemanager" "github.com/cenkalti/rain/internal/resumer/boltdbresumer" + "github.com/cenkalti/rain/internal/semaphore" "github.com/cenkalti/rain/internal/storage/filestorage" "github.com/cenkalti/rain/internal/tracker" "github.com/cenkalti/rain/internal/trackermanager" @@ -68,6 +69,7 @@ type Session struct { writesPerSecond metrics.EWMA writeBytesPerSecond metrics.EWMA + semWrite *semaphore.Semaphore } // NewSession creates a new Session for downloading and seeding torrents. @@ -183,6 +185,7 @@ func NewSession(cfg Config) (*Session, error) { }, writesPerSecond: metrics.NewEWMA1(), writeBytesPerSecond: metrics.NewEWMA1(), + semWrite: semaphore.New(int(cfg.ParallelWrites)), } ext, err := bitfield.NewBytes(c.extensions[:], 64) if err != nil { diff --git a/torrent/session_rpc_handler.go b/torrent/session_rpc_handler.go index 4ffade58..40a511cf 100644 --- a/torrent/session_rpc_handler.go +++ b/torrent/session_rpc_handler.go @@ -94,6 +94,8 @@ func (h *rpcHandler) GetSessionStats(args *rpctypes.GetSessionStatsRequest, repl Uptime: int(s.Uptime / time.Second), WritesPerSecond: s.WritesPerSecond, WriteBytesPerSecond: s.WriteBytesPerSecond, + WritesActive: s.WritesActive, + WritesPending: s.WritesPending, } return nil } diff --git a/torrent/session_stats.go b/torrent/session_stats.go index 673489b7..0b259e7f 100644 --- a/torrent/session_stats.go +++ b/torrent/session_stats.go @@ -26,6 +26,8 @@ type SessionStats struct { Uptime time.Duration WritesPerSecond int WriteBytesPerSecond int + WritesActive int + WritesPending int } func (s *Session) Stats() SessionStats { @@ -60,6 +62,8 @@ func (s *Session) Stats() SessionStats { Uptime: time.Since(s.createdAt), WritesPerSecond: int(s.writesPerSecond.Rate()), WriteBytesPerSecond: int(s.writeBytesPerSecond.Rate()), + WritesActive: s.semWrite.Len(), + WritesPending: s.semWrite.Waiting(), } } diff --git a/torrent/torrent_messagehandler.go b/torrent/torrent_messagehandler.go index 4318b783..5c793e63 100644 --- a/torrent/torrent_messagehandler.go +++ b/torrent/torrent_messagehandler.go @@ -107,7 +107,7 @@ func (t *torrent) handlePieceMessage(pm peer.PieceMessage) { t.webseedPieceResultC.Suspend() pw := piecewriter.New(piece, pe, pd.Buffer) - go pw.Run(t.pieceWriterResultC, t.doneC, t.session.writesPerSecond, t.session.writeBytesPerSecond) + go pw.Run(t.pieceWriterResultC, t.doneC, t.session.writesPerSecond, t.session.writeBytesPerSecond, t.session.semWrite) } func (t *torrent) handlePeerMessage(pm peer.Message) { diff --git a/torrent/torrent_webseed.go b/torrent/torrent_webseed.go index c1386f75..7b59e603 100644 --- a/torrent/torrent_webseed.go +++ b/torrent/torrent_webseed.go @@ -43,7 +43,7 @@ func (t *torrent) handleWebseedPieceResult(msg *urldownloader.PieceResult) { t.webseedPieceResultC.Suspend() pw := piecewriter.New(piece, msg.Downloader, msg.Buffer) - go pw.Run(t.pieceWriterResultC, t.doneC, t.session.writesPerSecond, t.session.writeBytesPerSecond) + go pw.Run(t.pieceWriterResultC, t.doneC, t.session.writesPerSecond, t.session.writeBytesPerSecond, t.session.semWrite) if msg.Done { for _, src := range t.webseedSources {