diff --git a/watcher.go b/watcher.go index fb165f7..bb190cb 100644 --- a/watcher.go +++ b/watcher.go @@ -21,15 +21,15 @@ type aiocb struct { done chan OpResult } -// OpResult of operation +// OpResult is the result of an aysnc-io type OpResult struct { - Fd int + Fd int // related file descriptor to this result Buffer []byte // the original committed buffer - Size int - Err error + Size int // number of bytes sent or received + Err error // IO error } -// Watcher will monitor events and process Request(s) +// Watcher will monitor events and process async-io request(s), type Watcher struct { pfd *poller // poll fd @@ -41,7 +41,7 @@ type Watcher struct { chWriters chan aiocb // internal buffer for reading - swapbuffer chan []byte + swapBuffer chan []byte die chan struct{} dieOnce sync.Once @@ -51,7 +51,7 @@ type Watcher struct { connsLock sync.Mutex } -// CreateWatcher creates a management object for monitoring events of net.Conn +// CreateWatcher creates a management object for monitoring file descriptors func CreateWatcher(bufsize int) (*Watcher, error) { w := new(Watcher) pfd, err := openPoll() @@ -60,17 +60,20 @@ func CreateWatcher(bufsize int) (*Watcher, error) { } w.pfd = pfd - w.swapbuffer = make(chan []byte, 2) - for i := 0; i < cap(w.swapbuffer); i++ { - w.swapbuffer <- make([]byte, bufsize) + // swapBuffer for concurrent read + w.swapBuffer = make(chan []byte, 2) + for i := 0; i < cap(w.swapBuffer); i++ { + w.swapBuffer <- make([]byte, bufsize) } + // loop related chan w.chReadableNotify = make(chan int) w.chWritableNotify = make(chan int) w.chStopWatchNotify = make(chan int) w.chReaders = make(chan aiocb) w.chWriters = make(chan aiocb) + // hold net.Conn only w.conns = make(map[int]net.Conn) w.die = make(chan struct{}) @@ -80,15 +83,18 @@ func CreateWatcher(bufsize int) (*Watcher, error) { } // Close stops monitoring on events for all connections -func (w *Watcher) Close() error { +func (w *Watcher) Close() (err error) { w.dieOnce.Do(func() { close(w.die) + err = w.pfd.Close() }) - return w.pfd.Close() + return err } -// Watch starts watching events on connection `conn` +// Watch starts watching events on `conn`, and returns a file descriptor +// for following IO operations. func (w *Watcher) Watch(conn net.Conn) (fd int, err error) { + // get file descriptor c, ok := conn.(interface { SyscallConn() (syscall.RawConn, error) }) @@ -115,7 +121,7 @@ func (w *Watcher) Watch(conn net.Conn) (fd int, err error) { // poll this fd w.pfd.Watch(fd) - // prevent GC net.Conn + // prevent conn from GC w.connsLock.Lock() w.conns[fd] = conn w.connsLock.Unlock() @@ -135,7 +141,7 @@ func (w *Watcher) StopWatch(fd int) { } } -// Read submits a read requests and notify with done, cap(done) must 0 +// Read submits a read requests and notify with done, the cap(done) must be 0, i.e unbuffered chan. func (w *Watcher) Read(fd int, done chan OpResult) error { if cap(done) != 0 { return ErrBufferedChan @@ -148,7 +154,7 @@ func (w *Watcher) Read(fd int, done chan OpResult) error { } } -// Write submits a write requests and notify with done +// Write submits a write requests and notify with done, the cap(done) must be 0, i.e unbuffered chan. func (w *Watcher) Write(fd int, buf []byte, done chan OpResult) error { // do nothing if len(buf) == 0 { @@ -168,10 +174,10 @@ func (w *Watcher) Write(fd int, buf []byte, done chan OpResult) error { } // tryRead will try to read data on aiocb and notify -// returns true if io has completed, false means EAGAIN +// returns true if io has completed, false means not. func (w *Watcher) tryRead(pcb *aiocb) (complete bool) { - buf := <-w.swapbuffer - defer func() { w.swapbuffer <- buf }() + buf := <-w.swapBuffer + defer func() { w.swapBuffer <- buf }() nr, er := syscall.Read(pcb.fd, buf) if er == syscall.EAGAIN { @@ -189,10 +195,12 @@ func (w *Watcher) tryWrite(pcb *aiocb) (complete bool) { return false } + // if ew is nil, accumulate bytes written if ew == nil { pcb.size += nw } + // all bytes written or has error if pcb.size == len(pcb.buffer) || ew != nil { if pcb.done != nil { pcb.done <- OpResult{Fd: pcb.fd, Buffer: pcb.buffer, Size: nw, Err: ew} @@ -202,6 +210,7 @@ func (w *Watcher) tryWrite(pcb *aiocb) (complete bool) { return false } +// the core event loop of this watcher func (w *Watcher) loop() { pendingReaders := make(map[int][]aiocb) pendingWriters := make(map[int][]aiocb)