Skip to content

Commit

Permalink
adjusts comments
Browse files Browse the repository at this point in the history
  • Loading branch information
xtaci committed Dec 24, 2019
1 parent 6e39fe9 commit cc8b062
Showing 1 changed file with 28 additions and 19 deletions.
47 changes: 28 additions & 19 deletions watcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,15 +21,15 @@ type aiocb struct {
done chan OpResult
}

// OpResult of operation
// OpResult is the result of an aysnc-io
type OpResult struct {
Fd int
Fd int // related file descriptor to this result
Buffer []byte // the original committed buffer
Size int
Err error
Size int // number of bytes sent or received
Err error // IO error
}

// Watcher will monitor events and process Request(s)
// Watcher will monitor events and process async-io request(s),
type Watcher struct {
pfd *poller // poll fd

Expand All @@ -41,7 +41,7 @@ type Watcher struct {
chWriters chan aiocb

// internal buffer for reading
swapbuffer chan []byte
swapBuffer chan []byte

die chan struct{}
dieOnce sync.Once
Expand All @@ -51,7 +51,7 @@ type Watcher struct {
connsLock sync.Mutex
}

// CreateWatcher creates a management object for monitoring events of net.Conn
// CreateWatcher creates a management object for monitoring file descriptors
func CreateWatcher(bufsize int) (*Watcher, error) {
w := new(Watcher)
pfd, err := openPoll()
Expand All @@ -60,17 +60,20 @@ func CreateWatcher(bufsize int) (*Watcher, error) {
}
w.pfd = pfd

w.swapbuffer = make(chan []byte, 2)
for i := 0; i < cap(w.swapbuffer); i++ {
w.swapbuffer <- make([]byte, bufsize)
// swapBuffer for concurrent read
w.swapBuffer = make(chan []byte, 2)
for i := 0; i < cap(w.swapBuffer); i++ {
w.swapBuffer <- make([]byte, bufsize)
}

// loop related chan
w.chReadableNotify = make(chan int)
w.chWritableNotify = make(chan int)
w.chStopWatchNotify = make(chan int)
w.chReaders = make(chan aiocb)
w.chWriters = make(chan aiocb)

// hold net.Conn only
w.conns = make(map[int]net.Conn)
w.die = make(chan struct{})

Expand All @@ -80,15 +83,18 @@ func CreateWatcher(bufsize int) (*Watcher, error) {
}

// Close stops monitoring on events for all connections
func (w *Watcher) Close() error {
func (w *Watcher) Close() (err error) {
w.dieOnce.Do(func() {
close(w.die)
err = w.pfd.Close()
})
return w.pfd.Close()
return err
}

// Watch starts watching events on connection `conn`
// Watch starts watching events on `conn`, and returns a file descriptor
// for following IO operations.
func (w *Watcher) Watch(conn net.Conn) (fd int, err error) {
// get file descriptor
c, ok := conn.(interface {
SyscallConn() (syscall.RawConn, error)
})
Expand All @@ -115,7 +121,7 @@ func (w *Watcher) Watch(conn net.Conn) (fd int, err error) {
// poll this fd
w.pfd.Watch(fd)

// prevent GC net.Conn
// prevent conn from GC
w.connsLock.Lock()
w.conns[fd] = conn
w.connsLock.Unlock()
Expand All @@ -135,7 +141,7 @@ func (w *Watcher) StopWatch(fd int) {
}
}

// Read submits a read requests and notify with done, cap(done) must 0
// Read submits a read requests and notify with done, the cap(done) must be 0, i.e unbuffered chan.
func (w *Watcher) Read(fd int, done chan OpResult) error {
if cap(done) != 0 {
return ErrBufferedChan
Expand All @@ -148,7 +154,7 @@ func (w *Watcher) Read(fd int, done chan OpResult) error {
}
}

// Write submits a write requests and notify with done
// Write submits a write requests and notify with done, the cap(done) must be 0, i.e unbuffered chan.
func (w *Watcher) Write(fd int, buf []byte, done chan OpResult) error {
// do nothing
if len(buf) == 0 {
Expand All @@ -168,10 +174,10 @@ func (w *Watcher) Write(fd int, buf []byte, done chan OpResult) error {
}

// tryRead will try to read data on aiocb and notify
// returns true if io has completed, false means EAGAIN
// returns true if io has completed, false means not.
func (w *Watcher) tryRead(pcb *aiocb) (complete bool) {
buf := <-w.swapbuffer
defer func() { w.swapbuffer <- buf }()
buf := <-w.swapBuffer
defer func() { w.swapBuffer <- buf }()

nr, er := syscall.Read(pcb.fd, buf)
if er == syscall.EAGAIN {
Expand All @@ -189,10 +195,12 @@ func (w *Watcher) tryWrite(pcb *aiocb) (complete bool) {
return false
}

// if ew is nil, accumulate bytes written
if ew == nil {
pcb.size += nw
}

// all bytes written or has error
if pcb.size == len(pcb.buffer) || ew != nil {
if pcb.done != nil {
pcb.done <- OpResult{Fd: pcb.fd, Buffer: pcb.buffer, Size: nw, Err: ew}
Expand All @@ -202,6 +210,7 @@ func (w *Watcher) tryWrite(pcb *aiocb) (complete bool) {
return false
}

// the core event loop of this watcher
func (w *Watcher) loop() {
pendingReaders := make(map[int][]aiocb)
pendingWriters := make(map[int][]aiocb)
Expand Down

0 comments on commit cc8b062

Please sign in to comment.