From f45a9397c6ddde7112bf2170019360ea95d39e0a Mon Sep 17 00:00:00 2001 From: shengyanli1982 Date: Wed, 19 Jun 2024 17:05:21 +0800 Subject: [PATCH] Optimize code structure to add Node object pool to LockFree CAS Queue. --- README.md | 2 +- README_CN.md | 2 +- benchmark/asyncwriter_test.go | 8 --- internal/lockfree/node.go | 49 ++++++++++---- internal/lockfree/queue.go | 65 ++++++++++++++----- internal/writer/{element.go => bufferpool.go} | 18 ++--- writer.go | 28 ++++---- 7 files changed, 108 insertions(+), 64 deletions(-) rename internal/writer/{element.go => bufferpool.go} (70%) diff --git a/README.md b/README.md index b21ee4c..53b963c 100644 --- a/README.md +++ b/README.md @@ -678,4 +678,4 @@ done **Results:** -![sync](examples/http/server/pics/asyncwriter.png) +![async](examples/http/server/pics/asyncwriter.png) diff --git a/README_CN.md b/README_CN.md index fefa624..c4b4be7 100644 --- a/README_CN.md +++ b/README_CN.md @@ -677,4 +677,4 @@ done **执行结果** -![sync](examples/http/server/pics/asyncwriter.png) +![async](examples/http/server/pics/asyncwriter.png) diff --git a/benchmark/asyncwriter_test.go b/benchmark/asyncwriter_test.go index 120daa9..4e759f1 100644 --- a/benchmark/asyncwriter_test.go +++ b/benchmark/asyncwriter_test.go @@ -21,7 +21,6 @@ var encoderCfg = zapcore.EncoderConfig{ func BenchmarkBlackHoleWriter(b *testing.B) { w := xu.BlackHoleWriter{} - b.ReportAllocs() b.ResetTimer() for i := 0; i < b.N; i++ { @@ -32,7 +31,6 @@ func BenchmarkBlackHoleWriter(b *testing.B) { func BenchmarkBlackHoleWriterParallel(b *testing.B) { w := xu.BlackHoleWriter{} - b.ReportAllocs() b.ResetTimer() b.RunParallel(func(pb *testing.PB) { @@ -47,7 +45,6 @@ func BenchmarkLogAsyncWriter(b *testing.B) { aw := x.NewWriteAsyncer(&w, nil) defer aw.Stop() - b.ReportAllocs() b.ResetTimer() for i := 0; i < b.N; i++ { @@ -61,7 +58,6 @@ func BenchmarkLogAsyncWriterParallel(b *testing.B) { aw := x.NewWriteAsyncer(&w, nil) defer aw.Stop() - b.ReportAllocs() b.ResetTimer() b.RunParallel(func(pb *testing.PB) { @@ -78,7 +74,6 @@ func BenchmarkZapSyncWriter(b *testing.B) { zapCore := zapcore.NewCore(zapcore.NewJSONEncoder(encoderCfg), zapSyncWriter, zapcore.DebugLevel) zapLogger := zap.New(zapCore) - b.ReportAllocs() b.ResetTimer() for i := 0; i < b.N; i++ { @@ -92,7 +87,6 @@ func BenchmarkZapSyncWriterParallel(b *testing.B) { zapCore := zapcore.NewCore(zapcore.NewJSONEncoder(encoderCfg), zapSyncWriter, zapcore.DebugLevel) zapLogger := zap.New(zapCore) - b.ReportAllocs() b.ResetTimer() b.RunParallel(func(pb *testing.PB) { @@ -112,7 +106,6 @@ func BenchmarkZapAsyncWriter(b *testing.B) { zapCore := zapcore.NewCore(zapcore.NewJSONEncoder(encoderCfg), zapAsyncWriter, zapcore.DebugLevel) zapLogger := zap.New(zapCore) - b.ReportAllocs() b.ResetTimer() for i := 0; i < b.N; i++ { @@ -130,7 +123,6 @@ func BenchmarkZapAsyncWriterParallel(b *testing.B) { zapCore := zapcore.NewCore(zapcore.NewJSONEncoder(encoderCfg), zapAsyncWriter, zapcore.DebugLevel) zapLogger := zap.New(zapCore) - b.ReportAllocs() b.ResetTimer() b.RunParallel(func(pb *testing.PB) { diff --git a/internal/lockfree/node.go b/internal/lockfree/node.go index 5cf0d3b..b17e520 100644 --- a/internal/lockfree/node.go +++ b/internal/lockfree/node.go @@ -1,7 +1,7 @@ package lockfree import ( - "sync/atomic" + "sync" "unsafe" ) @@ -37,18 +37,41 @@ func (n *Node) Reset() { n.next = nil } -// loadNode 函数用于加载指定指针 p 指向的 Node 结构体 -// The loadNode function is used to load the Node struct pointed to by the specified pointer p -func loadNode(p *unsafe.Pointer) *Node { - // 使用 atomic.LoadPointer 加载并返回指定指针 p 指向的 Node 结构体 - // Uses atomic.LoadPointer to load and return the Node struct pointed to by the specified pointer p - return (*Node)(atomic.LoadPointer(p)) +// NodePool 是一个结构体,它包含一个同步池(sync.Pool)的指针。 +// NodePool is a struct that contains a pointer to a sync.Pool. +type NodePool struct { + pool *sync.Pool } -// compareAndSwapNode 函数用于比较并交换指定指针 p 指向的 Node 结构体 -// The compareAndSwapNode function is used to compare and swap the Node struct pointed to by the specified pointer p -func compareAndSwapNode(p *unsafe.Pointer, old, new *Node) bool { - // 使用 atomic.CompareAndSwapPointer 比较并交换指定指针 p 指向的 Node 结构体 - // Uses atomic.CompareAndSwapPointer to compare and swap the Node struct pointed to by the specified pointer p - return atomic.CompareAndSwapPointer(p, unsafe.Pointer(old), unsafe.Pointer(new)) +// NewNodePool 是一个构造函数,它返回一个新的 NodePool 实例。 +// NewNodePool is a constructor that returns a new instance of NodePool. +func NewNodePool() *NodePool { + return &NodePool{ + // 在这里,我们初始化 sync.Pool,并提供一个函数来生成新的 Node 实例。 + // Here, we initialize the sync.Pool and provide a function to generate new Node instances. + pool: &sync.Pool{ + New: func() interface{} { + return NewNode(nil) + }, + }, + } +} + +// Get 方法从 NodePool 中获取一个 Node 实例。 +// The Get method retrieves a Node instance from the NodePool. +func (p *NodePool) Get() *Node { + // 我们从 sync.Pool 中获取一个对象,并将其转换为 Node 指针。 + // We get an object from the sync.Pool and cast it to a Node pointer. + return p.pool.Get().(*Node) +} + +// Put 方法将一个 Node 实例放回到 NodePool 中。 +// The Put method puts a Node instance back into the NodePool. +func (p *NodePool) Put(n *Node) { + // 如果 Node 不为 nil,我们将其重置并放回到 sync.Pool 中。 + // If the Node is not nil, we reset it and put it back into the sync.Pool. + if n != nil { + n.Reset() + p.pool.Put(n) + } } diff --git a/internal/lockfree/queue.go b/internal/lockfree/queue.go index a4f3701..108bb7f 100644 --- a/internal/lockfree/queue.go +++ b/internal/lockfree/queue.go @@ -5,9 +5,29 @@ import ( "unsafe" ) +// loadNode 函数用于加载指定指针 p 指向的 Node 结构体 +// The loadNode function is used to load the Node struct pointed to by the specified pointer p +func loadNode(p *unsafe.Pointer) *Node { + // 使用 atomic.LoadPointer 加载并返回指定指针 p 指向的 Node 结构体 + // Uses atomic.LoadPointer to load and return the Node struct pointed to by the specified pointer p + return (*Node)(atomic.LoadPointer(p)) +} + +// compareAndSwapNode 函数用于比较并交换指定指针 p 指向的 Node 结构体 +// The compareAndSwapNode function is used to compare and swap the Node struct pointed to by the specified pointer p +func compareAndSwapNode(p *unsafe.Pointer, old, new *Node) bool { + // 使用 atomic.CompareAndSwapPointer 比较并交换指定指针 p 指向的 Node 结构体 + // Uses atomic.CompareAndSwapPointer to compare and swap the Node struct pointed to by the specified pointer p + return atomic.CompareAndSwapPointer(p, unsafe.Pointer(old), unsafe.Pointer(new)) +} + // LockFreeQueue 是一个无锁队列结构体 // LockFreeQueue is a lock-free queue struct type LockFreeQueue struct { + // pool 是一个 NodePool 结构体实例的指针 + // pool is a pointer to an instance of the NodePool struct + pool *NodePool + // length 是队列的长度 // length is the length of the queue length int64 @@ -21,27 +41,36 @@ type LockFreeQueue struct { tail unsafe.Pointer } -// NewLockFreeQueue 函数用于创建一个新的 LockFreeQueue 结构体实例 -// The NewLockFreeQueue function is used to create a new instance of the LockFreeQueue struct +// NewLockFreeQueue 函数用于创建一个新的 LockFreeQueue 结构体实例。 +// The NewLockFreeQueue function is used to create a new instance of the LockFreeQueue struct. func NewLockFreeQueue() *LockFreeQueue { - // 创建一个新的 Node 结构体实例 - // Create a new Node struct instance - emptyNode := NewNode(nil) + // 创建一个新的 Node 结构体实例。 + // Create a new Node struct instance. + firstNode := NewNode(nil) - // 返回一个新的 LockFreeQueue 结构体实例,其中 head 和 tail 都指向 dummy 节点 - // Returns a new instance of the LockFreeQueue struct, where both head and tail point to the dummy node + // 返回一个新的 LockFreeQueue 结构体实例。 + // Returns a new instance of the LockFreeQueue struct. return &LockFreeQueue{ - head: unsafe.Pointer(emptyNode), - tail: unsafe.Pointer(emptyNode), + // 创建一个新的 NodePool 实例,用于管理 Node 实例。 + // Create a new NodePool instance for managing Node instances. + pool: NewNodePool(), + + // 初始化 head 指针,指向我们刚刚创建的 Node 实例。 + // Initialize the head pointer to point to the Node instance we just created. + head: unsafe.Pointer(firstNode), + + // 初始化 tail 指针,也指向我们刚刚创建的 Node 实例。 + // Initialize the tail pointer to also point to the Node instance we just created. + tail: unsafe.Pointer(firstNode), } } // Push 方法用于将一个值添加到 LockFreeQueue 队列的末尾 // The Push method is used to add a value to the end of the LockFreeQueue queue func (q *LockFreeQueue) Push(value interface{}) { - // 创建一个新的 Node 结构体实例 - // Create a new Node struct instance - node := NewNode(nil) + // 从 NodePool 中获取一个新的 Node 实例 + // Get a new Node instance from the NodePool + node := q.pool.Get() // 将新节点的 value 字段设置为传入的值 // Set the value field of the new node to the passed in value @@ -133,9 +162,9 @@ func (q *LockFreeQueue) Pop() interface{} { // If successful, then decrease the length of the queue atomic.AddInt64(&q.length, -1) - // 然后重置头节点 - // Then reset the head node - head.Reset() + // 重置头节点,将其归还 NodePool + // Reset the head node and put it back into the NodePool + q.pool.Put(head) // 如果结果不是空值,返回结果 // If the result is not an empty value, return the result @@ -163,12 +192,12 @@ func (q *LockFreeQueue) Length() int64 { func (q *LockFreeQueue) Reset() { // 创建一个新的 Node 结构体实例 // Create a new Node struct instance - emptyNode := NewNode(nil) + fristNode := NewNode(nil) // 将队列的头节点和尾节点都设置为新创建的节点 // Set both the head node and the tail node of the queue to the newly created node - q.head = unsafe.Pointer(emptyNode) - q.tail = unsafe.Pointer(emptyNode) + q.head = unsafe.Pointer(fristNode) + q.tail = unsafe.Pointer(fristNode) // 使用 atomic.StoreInt64 函数将队列的长度设置为 0 // Use the atomic.StoreInt64 function to set the length of the queue to 0 diff --git a/internal/writer/element.go b/internal/writer/bufferpool.go similarity index 70% rename from internal/writer/element.go rename to internal/writer/bufferpool.go index ade64d3..cdb1569 100644 --- a/internal/writer/element.go +++ b/internal/writer/bufferpool.go @@ -5,15 +5,15 @@ import ( "sync" ) -// ElementPool 是一个结构体,它包含一个同步池 -// ElementPool is a struct that contains a sync pool -type ElementPool struct { +// BufferPool 是一个结构体,它包含一个同步池 +// BufferPool is a struct that contains a sync pool +type BufferPool struct { pool *sync.Pool } -// NewElementPool 是一个函数,它创建并返回一个新的 elementPool -// NewElementPool is a function that creates and returns a new elementPool -func NewElementPool() *ElementPool { +// NewBufferPool 是一个函数,它创建并返回一个新的 elementPool +// NewBufferPool is a function that creates and returns a new elementPool +func NewBufferPool() *BufferPool { // 创建一个新的同步池 // Create a new sync pool pool := &sync.Pool{ @@ -26,18 +26,18 @@ func NewElementPool() *ElementPool { // 返回一个新的 elementPool,它包含刚刚创建的同步池 // Return a new elementPool that contains the sync pool we just created - return &ElementPool{pool: pool} + return &BufferPool{pool: pool} } // Get 是一个方法,它从 elementPool 的同步池中获取一个 Element // Get is a method that gets an Element from the sync pool of the elementPool -func (p *ElementPool) Get() *bytes.Buffer { +func (p *BufferPool) Get() *bytes.Buffer { return p.pool.Get().(*bytes.Buffer) } // Put 是一个方法,它将一个 Element 放回 elementPool 的同步池中 // Put is a method that puts an Element back into the sync pool of the elementPool -func (p *ElementPool) Put(e *bytes.Buffer) { +func (p *BufferPool) Put(e *bytes.Buffer) { // 如果 Element 不为空,则重置它并将其放回同步池中 // If the Element is not nil, reset it and put it back into the sync pool if e != nil { diff --git a/writer.go b/writer.go index dd9cc70..b601815 100644 --- a/writer.go +++ b/writer.go @@ -65,9 +65,9 @@ type WriteAsyncer struct { // state is used to store the status of the write asyncer state *wr.Status - // elementpool 用于存储元素池 - // elementpool is used to store the element pool - elementpool *wr.ElementPool + // bufferpool 用于存储元素池 + // bufferpool is used to store the element pool + bufferpool *wr.BufferPool } // NewWriteAsyncer 函数用于创建一个新的 WriteAsyncer 实例 @@ -116,7 +116,7 @@ func NewWriteAsyncer(writer io.Writer, conf *Config) *WriteAsyncer { // 初始化元素池 // Initialize the element pool - elementpool: wr.NewElementPool(), + bufferpool: wr.NewBufferPool(), } // 创建一个新的 context.Context 实例,并设置一个取消函数 @@ -190,18 +190,18 @@ func (wa *WriteAsyncer) Write(p []byte) (n int, err error) { } // 从元素池中获取一个元素 - // Get an elem from the elem pool - elem := wa.elementpool.Get() + // Get an buff from the buff pool + buff := wa.bufferpool.Get() // 将数据设置到元素的 buffer 字段 // Set the data to the buffer field of the element - if n, err = elem.Write(p); err != nil { + if n, err = buff.Write(p); err != nil { return } // 将元素添加到队列 // Add the element to the queue - wa.config.queue.Push(elem) + wa.config.queue.Push(buff) // 返回数据的长度和 nil 错误 // Return the length of the data and a nil error @@ -247,12 +247,12 @@ func (wa *WriteAsyncer) poller() { for { // 尝试从队列中弹出一个元素 // Try to pop an element from the queue - elem := wa.config.queue.Pop() + element := wa.config.queue.Pop() // 如果元素不为空,执行 executeFunc 函数 // If the element is not null, execute the executeFunc function - if elem != nil { - wa.executeFunc(elem.(*bytes.Buffer)) + if element != nil { + wa.executeFunc(element.(*bytes.Buffer)) } else { select { // 如果接收到 ctx.Done 的信号,那么结束循环 @@ -324,7 +324,7 @@ func (wa *WriteAsyncer) updateTimer() { // executeFunc 方法用于执行 WriteAsyncer 的写入操作 // The executeFunc method is used to perform the write operation of the WriteAsyncer -func (wa *WriteAsyncer) executeFunc(elem *bytes.Buffer) { +func (wa *WriteAsyncer) executeFunc(buff *bytes.Buffer) { // 获取当前的 Unix 毫秒时间 // Get the current Unix millisecond time now := wa.timer.Load() @@ -335,7 +335,7 @@ func (wa *WriteAsyncer) executeFunc(elem *bytes.Buffer) { // content 是一个变量,它获取 elem 的缓冲区的字节 // content is a variable that gets the bytes of the buffer of elem - content := elem.Bytes() + content := buff.Bytes() // 将元素的数据写入到 bufferedWriter // Write the data of the element to the bufferedWriter @@ -352,7 +352,7 @@ func (wa *WriteAsyncer) executeFunc(elem *bytes.Buffer) { // 将 elem 放回到 elementpool 中 // Put elem back into the elementpool - wa.elementpool.Put(elem) + wa.bufferpool.Put(buff) } // cleanQueueToWriter 方法用于将队列中的所有数据写入到 writer