-
-
Notifications
You must be signed in to change notification settings - Fork 628
/
peer-conn-msg-writer.go
141 lines (129 loc) · 3.34 KB
/
peer-conn-msg-writer.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
package torrent
import (
"bytes"
"io"
"time"
"github.com/anacrolix/chansync"
"github.com/anacrolix/log"
"github.com/anacrolix/sync"
pp "github.com/anacrolix/torrent/peer_protocol"
)
func (pc *PeerConn) initMessageWriter() {
w := &pc.messageWriter
*w = peerConnMsgWriter{
fillWriteBuffer: func() {
pc.locker().Lock()
defer pc.locker().Unlock()
if pc.closed.IsSet() {
return
}
pc.fillWriteBuffer()
},
closed: &pc.closed,
logger: pc.logger,
w: pc.w,
keepAlive: func() bool {
pc.locker().RLock()
defer pc.locker().RUnlock()
return pc.useful()
},
writeBuffer: new(bytes.Buffer),
}
}
func (pc *PeerConn) startMessageWriter() {
pc.initMessageWriter()
go pc.messageWriterRunner()
}
func (pc *PeerConn) messageWriterRunner() {
defer pc.locker().Unlock()
defer pc.close()
defer pc.locker().Lock()
pc.messageWriter.run(pc.t.cl.config.KeepAliveTimeout)
}
type peerConnMsgWriter struct {
// Must not be called with the local mutex held, as it will call back into the write method.
fillWriteBuffer func()
closed *chansync.SetOnce
logger log.Logger
w io.Writer
keepAlive func() bool
mu sync.Mutex
writeCond chansync.BroadcastCond
// Pointer so we can swap with the "front buffer".
writeBuffer *bytes.Buffer
}
// Routine that writes to the peer. Some of what to write is buffered by
// activity elsewhere in the Client, and some is determined locally when the
// connection is writable.
func (cn *peerConnMsgWriter) run(keepAliveTimeout time.Duration) {
lastWrite := time.Now()
keepAliveTimer := time.NewTimer(keepAliveTimeout)
frontBuf := new(bytes.Buffer)
for {
if cn.closed.IsSet() {
return
}
cn.fillWriteBuffer()
keepAlive := cn.keepAlive()
cn.mu.Lock()
if cn.writeBuffer.Len() == 0 && time.Since(lastWrite) >= keepAliveTimeout && keepAlive {
cn.writeBuffer.Write(pp.Message{Keepalive: true}.MustMarshalBinary())
torrent.Add("written keepalives", 1)
}
if cn.writeBuffer.Len() == 0 {
writeCond := cn.writeCond.Signaled()
cn.mu.Unlock()
select {
case <-cn.closed.Done():
case <-writeCond:
case <-keepAliveTimer.C:
}
continue
}
// Flip the buffers.
frontBuf, cn.writeBuffer = cn.writeBuffer, frontBuf
cn.mu.Unlock()
if frontBuf.Len() == 0 {
panic("expected non-empty front buffer")
}
var err error
for frontBuf.Len() != 0 {
next := frontBuf.Bytes()
var n int
n, err = cn.w.Write(next)
frontBuf.Next(n)
if err == nil && n != len(next) {
panic("expected full write")
}
if err != nil {
break
}
}
if err != nil {
cn.logger.WithDefaultLevel(log.Debug).Printf("error writing: %v", err)
return
}
lastWrite = time.Now()
keepAliveTimer.Reset(keepAliveTimeout)
}
}
func (cn *peerConnMsgWriter) writeToBuffer(msg pp.Message) (err error) {
originalLen := cn.writeBuffer.Len()
defer func() {
if err != nil {
// Since an error occurred during buffer write, revert buffer to its original state before the write.
cn.writeBuffer.Truncate(originalLen)
}
}()
return msg.WriteTo(cn.writeBuffer)
}
func (cn *peerConnMsgWriter) write(msg pp.Message) bool {
cn.mu.Lock()
defer cn.mu.Unlock()
cn.writeToBuffer(msg)
cn.writeCond.Broadcast()
return !cn.writeBufferFull()
}
func (cn *peerConnMsgWriter) writeBufferFull() bool {
return cn.writeBuffer.Len() >= writeBufferHighWaterLen
}