diff --git a/aio_linux.go b/aio_linux.go index dd74be5..1b6561f 100644 --- a/aio_linux.go +++ b/aio_linux.go @@ -88,7 +88,6 @@ func (p *poller) Wait(chEventNotify chan pollerEvents, die chan struct{}) { return } - // note chan swap must not continue unexpected pe := swapEvents[swapIdx] pe = pe[:0] swapIdx = (swapIdx + 1) % len(swapEvents) diff --git a/global.go b/global.go index ec82961..a369af3 100644 --- a/global.go +++ b/global.go @@ -31,7 +31,7 @@ func WaitIO() (r []OpResult, err error) { return defaultWatcher.WaitIO() } -// Read submits an async read request on 'fd' with context 'ctx', using buffer 'buf' +// Read submits an async read request on 'fd' with context 'ctx', using buffer 'buf'. // 'buf' can be set to nil to use internal buffer. // 'ctx' is the user-defined value passed through the gaio watcher unchanged. func Read(ctx interface{}, conn net.Conn, buf []byte) error { diff --git a/watcher.go b/watcher.go index ff82a9f..347136c 100644 --- a/watcher.go +++ b/watcher.go @@ -47,7 +47,7 @@ const ( opDelete ) -// aiocb contains all info for a request +// aiocb contains all info for a single request type aiocb struct { l *list.List // list where this request belongs to elem *list.Element @@ -69,7 +69,7 @@ const ( fdWrite = 2 ) -// fdDesc contains all info related to fd +// fdDesc contains all data structures associated to fd type fdDesc struct { status byte // fd read/write status readers list.List // all read/write requests @@ -122,7 +122,7 @@ type Watcher struct { dieOnce sync.Once } -// NewWatcher creates a management object for monitoring file descriptors +// NewWatcher creates a management object for monitoring file descriptors. // 'bufsize' sets the internal swap buffer size for Read() with nil. func NewWatcherSize(bufsize int) (*Watcher, error) { w := new(Watcher) @@ -150,7 +150,7 @@ func NewWatcherSize(bufsize int) (*Watcher, error) { w.swapResults[i] = make([]OpResult, 0, maxEvents) } - // finalizer for system resources + // watcher finalizer for system resources runtime.SetFinalizer(w, func(w *Watcher) { close(w.die) w.pfd.Close() @@ -179,7 +179,7 @@ func (w *Watcher) notifyPending() { } } -// WaitIO blocks until any read/write completion, or error +// WaitIO blocks until any read/write completion, or error. func (w *Watcher) WaitIO() (r []OpResult, err error) { select { case r := <-w.chNotifyCompletion: @@ -318,11 +318,13 @@ func (w *Watcher) loop() { gc := make(chan uintptr) // for timeout operations - // aiocb has non-zero deadline exists in timeouts & queue - // at same time or in neither of them + // aiocb has non-zero deadline exists + // in bothtimeouts & queue at any time + // or in neither of them. timer := time.NewTimer(0) var timeouts timedHeap + // release connection related resources releaseConn := func(ident int) { if desc, ok := descs[ident]; ok { // delete from heap @@ -347,7 +349,7 @@ func (w *Watcher) loop() { } } - // release all resources + // defer function to release all resources defer func() { for ident := range descs { releaseConn(ident) @@ -376,7 +378,7 @@ func (w *Watcher) loop() { continue } - // new conn + // handling new connection var desc *fdDesc if ok { desc = descs[ident] @@ -392,7 +394,7 @@ func (w *Watcher) loop() { // assign idents ident = dupfd - // unexpected situation, should notify caller + // unexpected situation, should notify caller if we cannot dup(2) werr := w.pfd.Watch(ident) if werr != nil { select { @@ -403,7 +405,7 @@ func (w *Watcher) loop() { continue } - // bindings + // file description bindings desc = &fdDesc{ptr: pcb.ptr} descs[ident] = desc connIdents[pcb.ptr] = ident @@ -411,9 +413,9 @@ func (w *Watcher) loop() { // close the original connection pcb.conn.Close() - // the conn is still useful for GC finalizer - // note finalizer function cannot hold reference to net.Conn - // if not it will never be GC-ed + // the conn is still useful for GC finalizer. + // note finalizer function cannot hold reference to net.Conn, + // if not it will never be GC-ed. runtime.SetFinalizer(pcb.conn, func(c net.Conn) { select { case gc <- reflect.ValueOf(c).Pointer(): @@ -426,6 +428,7 @@ func (w *Watcher) loop() { // operations splitted into different buckets switch pcb.op { case OpRead: + // try immediately if readable/writable if desc.readers.Len() == 0 && desc.status&fdRead > 0 { if w.tryRead(ident, pcb) { select { @@ -441,6 +444,7 @@ func (w *Watcher) loop() { desc.status &^= fdRead } } + // enqueue for poller events pcb.l = &desc.readers pcb.elem = pcb.l.PushBack(pcb) case OpWrite: @@ -463,7 +467,7 @@ func (w *Watcher) loop() { pcb.elem = pcb.l.PushBack(pcb) } - // timer + // push to heap for timeout operation if !pcb.deadline.IsZero() { heap.Push(&timeouts, pcb) if timeouts.Len() == 1 { @@ -472,7 +476,7 @@ func (w *Watcher) loop() { } } pending = pending[:0] - case pe := <-w.chEventNotify: + case pe := <-w.chEventNotify: // poller events // suppose fd(s) being polled is closed by conn.Close() from outside after chanrecv, // and a new conn has re-opened with the same handler number(fd). The read and write // on this fd is fatal. @@ -494,7 +498,7 @@ func (w *Watcher) loop() { pcb := elem.Value.(*aiocb) if w.tryRead(e.ident, pcb) { results = append(results, OpResult{Operation: OpRead, Conn: pcb.conn, Buffer: pcb.buffer, Size: pcb.size, Error: pcb.err, Context: pcb.ctx}) - // for shared memory, we need to notify WaitIO immediately + // for requests using internal swap buffer, we need to notify WaitIO immediately if pcb.useSwap { select { case w.chNotifyCompletion <- results: @@ -558,7 +562,7 @@ func (w *Watcher) loop() { } } - case <-timer.C: + case <-timer.C: // timeout heap for timeouts.Len() > 0 { now := time.Now() pcb := timeouts[0]