From f763d6d7d37a153c0233910f3faad9ff931a5329 Mon Sep 17 00:00:00 2001 From: xtaci Date: Thu, 1 Aug 2024 17:04:31 +0800 Subject: [PATCH] optimize gc and add tests for GC --- aio_test.go | 73 +++++++++++++++++++++++++++++++++++++++++++++++++++++ watcher.go | 46 ++++++++++++++++++++++----------- 2 files changed, 104 insertions(+), 15 deletions(-) diff --git a/aio_test.go b/aio_test.go index 31e84f5..87d8ffa 100644 --- a/aio_test.go +++ b/aio_test.go @@ -971,3 +971,76 @@ func BenchmarkContextSwitch(b *testing.B) { } close(die) } + +func TestGC(t *testing.T) { + par := 1024 + msgsize := 65536 + t.Log("testing GC:", par, "connections") + ln := echoServer(t, msgsize) + defer ln.Close() + + w, err := NewWatcher() + if err != nil { + t.Fatal(err) + } + defer w.Close() + + for i := 0; i < par; i++ { + go func() { + data := make([]byte, msgsize) + conn, err := net.Dial("tcp", ln.Addr().String()) + if err != nil { + log.Fatal(err) + } + + // send + err = w.Write(nil, conn, data) + if err != nil { + log.Fatal(err) + } + + conn = nil + }() + } + + count := 0 +LOOP: + for { + results, err := w.WaitIO() + if err != nil { + t.Fatal("waitio:", err) + return + } + + for _, res := range results { + switch res.Operation { + case OpWrite: + case OpRead: + } + res.Conn = nil + + count++ + if count >= par { + break LOOP + } + } + } + + found, closed := w.GetGC() + t.Logf("GC found:%d closed:%d", found, closed) + <-time.After(2 * time.Second) + runtime.GC() + + found, closed = w.GetGC() + t.Logf("GC found:%d closed:%d", found, closed) + <-time.After(2 * time.Second) + runtime.GC() + + found, closed = w.GetGC() + t.Logf("GC found:%d closed:%d", found, closed) + <-time.After(2 * time.Second) + runtime.GC() + + found, closed = w.GetGC() + t.Logf("GC found:%d closed:%d", found, closed) +} diff --git a/watcher.go b/watcher.go index 511b7c5..41f51b2 100644 --- a/watcher.go +++ b/watcher.go @@ -96,9 +96,11 @@ type watcher struct { timer *time.Timer // Timer for handling timeouts // Garbage collection - gc []net.Conn // List of connections to be garbage collected + gc []uintptr // List of connections to be garbage collected gcMutex sync.Mutex // Mutex to synchronize access to the gc list gcNotify chan struct{} // Channel to notify the GC processor + gcFound uint32 // number of net.Conn objects found unreachable by runtime + gcClosed uint32 // record number of objects closed successfully // Shutdown and cleanup die chan struct{} // Channel for signaling shutdown @@ -571,19 +573,8 @@ func (w *watcher) loop() { break } } - - case <-w.gcNotify: // GC recycled net.Conn - w.gcMutex.Lock() - for i, c := range w.gc { - ptr := reflect.ValueOf(c).Pointer() - if ident, ok := w.connIdents[ptr]; ok { - w.releaseConn(ident) - } - w.gc[i] = nil - } - w.gc = w.gc[:0] - w.gcMutex.Unlock() - + case <-w.gcNotify: + w.handleGC() case cpuid := <-w.chCPUID: setAffinity(cpuid) @@ -593,6 +584,22 @@ func (w *watcher) loop() { } } +// handleGC processes the garbage collection of net.Conn objects. +func (w *watcher) handleGC() { + runtime.GC() + w.gcMutex.Lock() + if len(w.gc) > 0 { + for _, ptr := range w.gc { + if ident, ok := w.connIdents[ptr]; ok { + w.releaseConn(ident) + } + } + w.gcClosed += uint32(len(w.gc)) + w.gc = w.gc[:0] + } + w.gcMutex.Unlock() +} + // handlePending processes new requests, acting as a reception desk. func (w *watcher) handlePending(pending []*aiocb) { PENDING: @@ -641,7 +648,9 @@ PENDING: // if not it will never be GC-ed. runtime.SetFinalizer(pcb.conn, func(c net.Conn) { w.gcMutex.Lock() - w.gc = append(w.gc, c) + ptr := reflect.ValueOf(c).Pointer() + w.gc = append(w.gc, ptr) + w.gcFound++ w.gcMutex.Unlock() // notify gc processor @@ -741,3 +750,10 @@ func (w *watcher) handleEvents(events pollerEvents) { } } } + +// read gcFound & gcClosed +func (w *watcher) GetGC() (found uint32, closed uint32) { + w.gcMutex.Lock() + defer w.gcMutex.Unlock() + return w.gcFound, w.gcClosed +}