Skip to content

Commit

Permalink
Merge pull request #33 from shengyanli1982/dev
Browse files Browse the repository at this point in the history
Optimize code structure to add Node object pool to LockFree CAS Queue.
  • Loading branch information
shengyanli1982 authored Jun 19, 2024
2 parents d94731f + f45a939 commit 2684bd7
Show file tree
Hide file tree
Showing 7 changed files with 108 additions and 64 deletions.
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -678,4 +678,4 @@ done

**Results:**

![sync](examples/http/server/pics/asyncwriter.png)
![async](examples/http/server/pics/asyncwriter.png)
2 changes: 1 addition & 1 deletion README_CN.md
Original file line number Diff line number Diff line change
Expand Up @@ -677,4 +677,4 @@ done

**执行结果**

![sync](examples/http/server/pics/asyncwriter.png)
![async](examples/http/server/pics/asyncwriter.png)
8 changes: 0 additions & 8 deletions benchmark/asyncwriter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@ var encoderCfg = zapcore.EncoderConfig{
func BenchmarkBlackHoleWriter(b *testing.B) {
w := xu.BlackHoleWriter{}

b.ReportAllocs()
b.ResetTimer()

for i := 0; i < b.N; i++ {
Expand All @@ -32,7 +31,6 @@ func BenchmarkBlackHoleWriter(b *testing.B) {
func BenchmarkBlackHoleWriterParallel(b *testing.B) {
w := xu.BlackHoleWriter{}

b.ReportAllocs()
b.ResetTimer()

b.RunParallel(func(pb *testing.PB) {
Expand All @@ -47,7 +45,6 @@ func BenchmarkLogAsyncWriter(b *testing.B) {
aw := x.NewWriteAsyncer(&w, nil)
defer aw.Stop()

b.ReportAllocs()
b.ResetTimer()

for i := 0; i < b.N; i++ {
Expand All @@ -61,7 +58,6 @@ func BenchmarkLogAsyncWriterParallel(b *testing.B) {
aw := x.NewWriteAsyncer(&w, nil)
defer aw.Stop()

b.ReportAllocs()
b.ResetTimer()

b.RunParallel(func(pb *testing.PB) {
Expand All @@ -78,7 +74,6 @@ func BenchmarkZapSyncWriter(b *testing.B) {
zapCore := zapcore.NewCore(zapcore.NewJSONEncoder(encoderCfg), zapSyncWriter, zapcore.DebugLevel)
zapLogger := zap.New(zapCore)

b.ReportAllocs()
b.ResetTimer()

for i := 0; i < b.N; i++ {
Expand All @@ -92,7 +87,6 @@ func BenchmarkZapSyncWriterParallel(b *testing.B) {
zapCore := zapcore.NewCore(zapcore.NewJSONEncoder(encoderCfg), zapSyncWriter, zapcore.DebugLevel)
zapLogger := zap.New(zapCore)

b.ReportAllocs()
b.ResetTimer()

b.RunParallel(func(pb *testing.PB) {
Expand All @@ -112,7 +106,6 @@ func BenchmarkZapAsyncWriter(b *testing.B) {
zapCore := zapcore.NewCore(zapcore.NewJSONEncoder(encoderCfg), zapAsyncWriter, zapcore.DebugLevel)
zapLogger := zap.New(zapCore)

b.ReportAllocs()
b.ResetTimer()

for i := 0; i < b.N; i++ {
Expand All @@ -130,7 +123,6 @@ func BenchmarkZapAsyncWriterParallel(b *testing.B) {
zapCore := zapcore.NewCore(zapcore.NewJSONEncoder(encoderCfg), zapAsyncWriter, zapcore.DebugLevel)
zapLogger := zap.New(zapCore)

b.ReportAllocs()
b.ResetTimer()

b.RunParallel(func(pb *testing.PB) {
Expand Down
49 changes: 36 additions & 13 deletions internal/lockfree/node.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
package lockfree

import (
"sync/atomic"
"sync"
"unsafe"
)

Expand Down Expand Up @@ -37,18 +37,41 @@ func (n *Node) Reset() {
n.next = nil
}

// loadNode 函数用于加载指定指针 p 指向的 Node 结构体
// The loadNode function is used to load the Node struct pointed to by the specified pointer p
func loadNode(p *unsafe.Pointer) *Node {
// 使用 atomic.LoadPointer 加载并返回指定指针 p 指向的 Node 结构体
// Uses atomic.LoadPointer to load and return the Node struct pointed to by the specified pointer p
return (*Node)(atomic.LoadPointer(p))
// NodePool 是一个结构体,它包含一个同步池(sync.Pool)的指针。
// NodePool is a struct that contains a pointer to a sync.Pool.
type NodePool struct {
pool *sync.Pool
}

// compareAndSwapNode 函数用于比较并交换指定指针 p 指向的 Node 结构体
// The compareAndSwapNode function is used to compare and swap the Node struct pointed to by the specified pointer p
func compareAndSwapNode(p *unsafe.Pointer, old, new *Node) bool {
// 使用 atomic.CompareAndSwapPointer 比较并交换指定指针 p 指向的 Node 结构体
// Uses atomic.CompareAndSwapPointer to compare and swap the Node struct pointed to by the specified pointer p
return atomic.CompareAndSwapPointer(p, unsafe.Pointer(old), unsafe.Pointer(new))
// NewNodePool 是一个构造函数,它返回一个新的 NodePool 实例。
// NewNodePool is a constructor that returns a new instance of NodePool.
func NewNodePool() *NodePool {
return &NodePool{
// 在这里,我们初始化 sync.Pool,并提供一个函数来生成新的 Node 实例。
// Here, we initialize the sync.Pool and provide a function to generate new Node instances.
pool: &sync.Pool{
New: func() interface{} {
return NewNode(nil)
},
},
}
}

// Get 方法从 NodePool 中获取一个 Node 实例。
// The Get method retrieves a Node instance from the NodePool.
func (p *NodePool) Get() *Node {
// 我们从 sync.Pool 中获取一个对象,并将其转换为 Node 指针。
// We get an object from the sync.Pool and cast it to a Node pointer.
return p.pool.Get().(*Node)
}

// Put 方法将一个 Node 实例放回到 NodePool 中。
// The Put method puts a Node instance back into the NodePool.
func (p *NodePool) Put(n *Node) {
// 如果 Node 不为 nil,我们将其重置并放回到 sync.Pool 中。
// If the Node is not nil, we reset it and put it back into the sync.Pool.
if n != nil {
n.Reset()
p.pool.Put(n)
}
}
65 changes: 47 additions & 18 deletions internal/lockfree/queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,29 @@ import (
"unsafe"
)

// loadNode 函数用于加载指定指针 p 指向的 Node 结构体
// The loadNode function is used to load the Node struct pointed to by the specified pointer p
func loadNode(p *unsafe.Pointer) *Node {
// 使用 atomic.LoadPointer 加载并返回指定指针 p 指向的 Node 结构体
// Uses atomic.LoadPointer to load and return the Node struct pointed to by the specified pointer p
return (*Node)(atomic.LoadPointer(p))
}

// compareAndSwapNode 函数用于比较并交换指定指针 p 指向的 Node 结构体
// The compareAndSwapNode function is used to compare and swap the Node struct pointed to by the specified pointer p
func compareAndSwapNode(p *unsafe.Pointer, old, new *Node) bool {
// 使用 atomic.CompareAndSwapPointer 比较并交换指定指针 p 指向的 Node 结构体
// Uses atomic.CompareAndSwapPointer to compare and swap the Node struct pointed to by the specified pointer p
return atomic.CompareAndSwapPointer(p, unsafe.Pointer(old), unsafe.Pointer(new))
}

// LockFreeQueue 是一个无锁队列结构体
// LockFreeQueue is a lock-free queue struct
type LockFreeQueue struct {
// pool 是一个 NodePool 结构体实例的指针
// pool is a pointer to an instance of the NodePool struct
pool *NodePool

// length 是队列的长度
// length is the length of the queue
length int64
Expand All @@ -21,27 +41,36 @@ type LockFreeQueue struct {
tail unsafe.Pointer
}

// NewLockFreeQueue 函数用于创建一个新的 LockFreeQueue 结构体实例
// The NewLockFreeQueue function is used to create a new instance of the LockFreeQueue struct
// NewLockFreeQueue 函数用于创建一个新的 LockFreeQueue 结构体实例
// The NewLockFreeQueue function is used to create a new instance of the LockFreeQueue struct.
func NewLockFreeQueue() *LockFreeQueue {
// 创建一个新的 Node 结构体实例
// Create a new Node struct instance
emptyNode := NewNode(nil)
// 创建一个新的 Node 结构体实例
// Create a new Node struct instance.
firstNode := NewNode(nil)

// 返回一个新的 LockFreeQueue 结构体实例,其中 head 和 tail 都指向 dummy 节点
// Returns a new instance of the LockFreeQueue struct, where both head and tail point to the dummy node
// 返回一个新的 LockFreeQueue 结构体实例
// Returns a new instance of the LockFreeQueue struct.
return &LockFreeQueue{
head: unsafe.Pointer(emptyNode),
tail: unsafe.Pointer(emptyNode),
// 创建一个新的 NodePool 实例,用于管理 Node 实例。
// Create a new NodePool instance for managing Node instances.
pool: NewNodePool(),

// 初始化 head 指针,指向我们刚刚创建的 Node 实例。
// Initialize the head pointer to point to the Node instance we just created.
head: unsafe.Pointer(firstNode),

// 初始化 tail 指针,也指向我们刚刚创建的 Node 实例。
// Initialize the tail pointer to also point to the Node instance we just created.
tail: unsafe.Pointer(firstNode),
}
}

// Push 方法用于将一个值添加到 LockFreeQueue 队列的末尾
// The Push method is used to add a value to the end of the LockFreeQueue queue
func (q *LockFreeQueue) Push(value interface{}) {
// 创建一个新的 Node 结构体实例
// Create a new Node struct instance
node := NewNode(nil)
// 从 NodePool 中获取一个新的 Node 实例
// Get a new Node instance from the NodePool
node := q.pool.Get()

// 将新节点的 value 字段设置为传入的值
// Set the value field of the new node to the passed in value
Expand Down Expand Up @@ -133,9 +162,9 @@ func (q *LockFreeQueue) Pop() interface{} {
// If successful, then decrease the length of the queue
atomic.AddInt64(&q.length, -1)

// 然后重置头节点
// Then reset the head node
head.Reset()
// 重置头节点,将其归还 NodePool
// Reset the head node and put it back into the NodePool
q.pool.Put(head)

// 如果结果不是空值,返回结果
// If the result is not an empty value, return the result
Expand Down Expand Up @@ -163,12 +192,12 @@ func (q *LockFreeQueue) Length() int64 {
func (q *LockFreeQueue) Reset() {
// 创建一个新的 Node 结构体实例
// Create a new Node struct instance
emptyNode := NewNode(nil)
fristNode := NewNode(nil)

// 将队列的头节点和尾节点都设置为新创建的节点
// Set both the head node and the tail node of the queue to the newly created node
q.head = unsafe.Pointer(emptyNode)
q.tail = unsafe.Pointer(emptyNode)
q.head = unsafe.Pointer(fristNode)
q.tail = unsafe.Pointer(fristNode)

// 使用 atomic.StoreInt64 函数将队列的长度设置为 0
// Use the atomic.StoreInt64 function to set the length of the queue to 0
Expand Down
18 changes: 9 additions & 9 deletions internal/writer/element.go → internal/writer/bufferpool.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,15 +5,15 @@ import (
"sync"
)

// ElementPool 是一个结构体,它包含一个同步池
// ElementPool is a struct that contains a sync pool
type ElementPool struct {
// BufferPool 是一个结构体,它包含一个同步池
// BufferPool is a struct that contains a sync pool
type BufferPool struct {
pool *sync.Pool
}

// NewElementPool 是一个函数,它创建并返回一个新的 elementPool
// NewElementPool is a function that creates and returns a new elementPool
func NewElementPool() *ElementPool {
// NewBufferPool 是一个函数,它创建并返回一个新的 elementPool
// NewBufferPool is a function that creates and returns a new elementPool
func NewBufferPool() *BufferPool {
// 创建一个新的同步池
// Create a new sync pool
pool := &sync.Pool{
Expand All @@ -26,18 +26,18 @@ func NewElementPool() *ElementPool {

// 返回一个新的 elementPool,它包含刚刚创建的同步池
// Return a new elementPool that contains the sync pool we just created
return &ElementPool{pool: pool}
return &BufferPool{pool: pool}
}

// Get 是一个方法,它从 elementPool 的同步池中获取一个 Element
// Get is a method that gets an Element from the sync pool of the elementPool
func (p *ElementPool) Get() *bytes.Buffer {
func (p *BufferPool) Get() *bytes.Buffer {
return p.pool.Get().(*bytes.Buffer)
}

// Put 是一个方法,它将一个 Element 放回 elementPool 的同步池中
// Put is a method that puts an Element back into the sync pool of the elementPool
func (p *ElementPool) Put(e *bytes.Buffer) {
func (p *BufferPool) Put(e *bytes.Buffer) {
// 如果 Element 不为空,则重置它并将其放回同步池中
// If the Element is not nil, reset it and put it back into the sync pool
if e != nil {
Expand Down
28 changes: 14 additions & 14 deletions writer.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,9 +65,9 @@ type WriteAsyncer struct {
// state is used to store the status of the write asyncer
state *wr.Status

// elementpool 用于存储元素池
// elementpool is used to store the element pool
elementpool *wr.ElementPool
// bufferpool 用于存储元素池
// bufferpool is used to store the element pool
bufferpool *wr.BufferPool
}

// NewWriteAsyncer 函数用于创建一个新的 WriteAsyncer 实例
Expand Down Expand Up @@ -116,7 +116,7 @@ func NewWriteAsyncer(writer io.Writer, conf *Config) *WriteAsyncer {

// 初始化元素池
// Initialize the element pool
elementpool: wr.NewElementPool(),
bufferpool: wr.NewBufferPool(),
}

// 创建一个新的 context.Context 实例,并设置一个取消函数
Expand Down Expand Up @@ -190,18 +190,18 @@ func (wa *WriteAsyncer) Write(p []byte) (n int, err error) {
}

// 从元素池中获取一个元素
// Get an elem from the elem pool
elem := wa.elementpool.Get()
// Get an buff from the buff pool
buff := wa.bufferpool.Get()

// 将数据设置到元素的 buffer 字段
// Set the data to the buffer field of the element
if n, err = elem.Write(p); err != nil {
if n, err = buff.Write(p); err != nil {
return
}

// 将元素添加到队列
// Add the element to the queue
wa.config.queue.Push(elem)
wa.config.queue.Push(buff)

// 返回数据的长度和 nil 错误
// Return the length of the data and a nil error
Expand Down Expand Up @@ -247,12 +247,12 @@ func (wa *WriteAsyncer) poller() {
for {
// 尝试从队列中弹出一个元素
// Try to pop an element from the queue
elem := wa.config.queue.Pop()
element := wa.config.queue.Pop()

// 如果元素不为空,执行 executeFunc 函数
// If the element is not null, execute the executeFunc function
if elem != nil {
wa.executeFunc(elem.(*bytes.Buffer))
if element != nil {
wa.executeFunc(element.(*bytes.Buffer))
} else {
select {
// 如果接收到 ctx.Done 的信号,那么结束循环
Expand Down Expand Up @@ -324,7 +324,7 @@ func (wa *WriteAsyncer) updateTimer() {

// executeFunc 方法用于执行 WriteAsyncer 的写入操作
// The executeFunc method is used to perform the write operation of the WriteAsyncer
func (wa *WriteAsyncer) executeFunc(elem *bytes.Buffer) {
func (wa *WriteAsyncer) executeFunc(buff *bytes.Buffer) {
// 获取当前的 Unix 毫秒时间
// Get the current Unix millisecond time
now := wa.timer.Load()
Expand All @@ -335,7 +335,7 @@ func (wa *WriteAsyncer) executeFunc(elem *bytes.Buffer) {

// content 是一个变量,它获取 elem 的缓冲区的字节
// content is a variable that gets the bytes of the buffer of elem
content := elem.Bytes()
content := buff.Bytes()

// 将元素的数据写入到 bufferedWriter
// Write the data of the element to the bufferedWriter
Expand All @@ -352,7 +352,7 @@ func (wa *WriteAsyncer) executeFunc(elem *bytes.Buffer) {

// 将 elem 放回到 elementpool 中
// Put elem back into the elementpool
wa.elementpool.Put(elem)
wa.bufferpool.Put(buff)
}

// cleanQueueToWriter 方法用于将队列中的所有数据写入到 writer
Expand Down

0 comments on commit 2684bd7

Please sign in to comment.