Skip to content

Commit

Permalink
added overlapped ring buffer impl, see the #5
Browse files Browse the repository at this point in the history
An overlapped ring buffer will overwrite the existing head element in putting new elem but the ring buffer is full.

The original ring buffer of ourselves is to return a wrong state and stop putting action, see the `ErrQueueFull`.

The new overlapped ring buffer will be created by calling `NewOverlappedRingBuffer()`.

Any issues are welcome.
  • Loading branch information
hedzr committed Aug 10, 2024
1 parent f5661e7 commit c415859
Show file tree
Hide file tree
Showing 5 changed files with 286 additions and 46 deletions.
79 changes: 57 additions & 22 deletions mpmc/rb.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,33 +9,68 @@ import (
"github.com/hedzr/go-ringbuf/v2/mpmc/state"
)

// New returns the RingBuffer object
// New returns the RingBuffer object.
//
// It returns ErrQueueFull when the queue is full and
// putting a new element.
func New[T any](capacity uint32, opts ...Opt[T]) (ringBuffer RingBuffer[T]) {
if isInitialized() {
return newRingBuffer(func(capacity uint32, opts ...Opt[T]) (ringBuffer RingBuffer[T]) {
size := roundUpToPower2(capacity)

rb := &ringBuf[T]{
data: make([]rbItem[T], size),
cap: size,
capModMask: size - 1, // = 2^n - 1
}

for _, opt := range opts {
opt(rb)
}
ringBuffer = rb
return
}, capacity, opts...)
}

// NewOverlappedRingBuffer initials a ring buffer, which overwrites
// its head element if it's full.
func NewOverlappedRingBuffer[T any](capacity uint32, opts ...Opt[T]) (ringBuffer RingBuffer[T]) {
return newRingBuffer(func(capacity uint32, opts ...Opt[T]) (ringBuffer RingBuffer[T]) {
size := roundUpToPower2(capacity)
rb := &orbuf[T]{
ringBuf[T]{
data: make([]rbItem[T], size),
cap: size,
capModMask: size - 1, // = 2^n - 1
},
}
for _, opt := range opts {
opt(rb)
opt(&rb.ringBuf)
}
ringBuffer = rb
return
}, capacity, opts...)
}

type Creator[T any] func(capacity uint32, opts ...Opt[T]) (ringBuffer RingBuffer[T])

func newRingBuffer[T any](creator Creator[T], capacity uint32, opts ...Opt[T]) (ringBuffer RingBuffer[T]) {
if isInitialized() {
ringBuffer = creator(capacity, opts...)

// ringBuffer = rb

// if rb.debugMode && rb.logger != nil {
// // rb.logger.Debug("[ringbuf][INI] ", zap.Uint32("cap", rb.cap), zap.Uint32("capModMask", rb.capModMask))
// for _, opt := range opts {
// opt(rb)
// }

for i := 0; i < int(size); i++ {
rb.data[i].readWrite &= 0 // bit 0: readable, bit 1: writable
if rb.initializer != nil {
rb.data[i].value = rb.initializer.PreAlloc(i)
}
}
// // if rb.debugMode && rb.logger != nil {
// // // rb.logger.Debug("[ringbuf][INI] ", zap.Uint32("cap", rb.cap), zap.Uint32("capModMask", rb.capModMask))
// // }

// for i := 0; i < int(size); i++ {
// rb.data[i].readWrite &= 0 // bit 0: readable, bit 1: writable
// if rb.initializer != nil {
// rb.data[i].value = rb.initializer.PreAlloc(i)
// }
// }
}
return
}
Expand All @@ -52,7 +87,7 @@ func WithItemInitializer[T any](initializeable Initializeable[T]) Opt[T] {

// WithDebugMode enables the internal debug mode for more logging output, and collect the metrics for debugging
func WithDebugMode[T any](_ bool) Opt[T] {
return func(buf *ringBuf[T]) {
return func(_ *ringBuf[T]) {
// buf.debugMode = debug
}
}
Expand All @@ -71,13 +106,13 @@ type ringBuf[T any] struct {
capModMask uint32
_ [CacheLinePadSize - 8]byte
head uint32
_ [CacheLinePadSize - 4]byte
_ [CacheLinePadSize - 4]byte //nolint:revive
tail uint32
_ [CacheLinePadSize - 4]byte
_ [CacheLinePadSize - 4]byte //nolint:revive
putWaits uint64
_ [CacheLinePadSize - 8]byte
_ [CacheLinePadSize - 8]byte //nolint:revive
getWaits uint64
_ [CacheLinePadSize - 8]byte
_ [CacheLinePadSize - 8]byte //nolint:revive
data []rbItem[T]
// debugMode bool
// logger log.Logger
Expand All @@ -93,9 +128,9 @@ type rbItem[T any] struct {
// _ cpu.CacheLinePad
}

func (rb *ringBuf[T]) Put(item T) (err error) { return rb.Enqueue(item) }
func (rb *ringBuf[T]) Put(item T) (err error) { return rb.Enqueue(item) } //nolint:revive

func (rb *ringBuf[T]) Enqueue(item T) (err error) {
func (rb *ringBuf[T]) Enqueue(item T) (err error) { //nolint:revive
var tail, head, nt uint32
var holder *rbItem[T]
for {
Expand Down Expand Up @@ -145,9 +180,9 @@ func (rb *ringBuf[T]) Enqueue(item T) (err error) {
}
}

func (rb *ringBuf[T]) Get() (item T, err error) { return rb.Dequeue() }
func (rb *ringBuf[T]) Get() (item T, err error) { return rb.Dequeue() } //nolint:revive

func (rb *ringBuf[T]) Dequeue() (item T, err error) {
func (rb *ringBuf[T]) Dequeue() (item T, err error) { //nolint:revive
var tail, head, nh uint32
var holder *rbItem[T]
for {
Expand Down
121 changes: 121 additions & 0 deletions mpmc/rb2.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,121 @@
package mpmc

import (
"runtime"
"sync/atomic"

"github.com/hedzr/go-ringbuf/v2/mpmc/state"
)

type orbuf[T any] struct {
ringBuf[T]
}

func (rb *orbuf[T]) Put(item T) (err error) { return rb.Enqueue(item) } //nolint:revive

func (rb *orbuf[T]) Enqueue(item T) (err error) { //nolint:revive
var tail, head, nt, nh uint32
var holder *rbItem[T]
for {
head = atomic.LoadUint32(&rb.head)
tail = atomic.LoadUint32(&rb.tail)
nt = (tail + 1) & rb.capModMask

isEmpty := head == tail
if isEmpty && head == MaxUint32 {
err = ErrQueueNotReady
return
}

isFull := nt == head
if isFull {
nh = (head + 1) & rb.capModMask
atomic.CompareAndSwapUint32(&rb.head, head, nh)
}

holder = &rb.data[tail]
atomic.CompareAndSwapUint32(&rb.tail, tail, nt)
retry:
if !atomic.CompareAndSwapUint64(&holder.readWrite, 0, 2) { //nolint:gomnd
if !atomic.CompareAndSwapUint64(&holder.readWrite, 1, 2) { //nolint:gomnd
if atomic.LoadUint64(&holder.readWrite) == 0 {
goto retry // sometimes, short circuit
}
runtime.Gosched() // time to time
continue
}
}

if rb.initializer != nil {
rb.initializer.CloneIn(item, &holder.value)
} else {
holder.value = item
}
if !atomic.CompareAndSwapUint64(&holder.readWrite, 2, 1) { //nolint:gomnd
err = ErrRaced // runtime.Gosched() // never happens
}

if state.VerboseEnabled {
state.Verbose("[W] enqueued",
"tail", tail, "new-tail", nt, "head", head, "value", toString(holder.value),
"value(rb.data[0])", toString(rb.data[0].value),
"value(rb.data[1])", toString(rb.data[1].value))
}
return
}
}

func (rb *orbuf[T]) Get() (item T, err error) { return rb.Dequeue() } //nolint:revive

func (rb *orbuf[T]) Dequeue() (item T, err error) { //nolint:revive
var tail, head, nh uint32
var holder *rbItem[T]
for {
// var quad uint64
// quad = atomic.LoadUint64((*uint64)(unsafe.Pointer(&rb.head)))
// head = (uint32)(quad & MaxUint32_64)
// tail = (uint32)(quad >> 32)
head = atomic.LoadUint32(&rb.head)
tail = atomic.LoadUint32(&rb.tail)

isEmpty := head == tail
if isEmpty {
if head == MaxUint32 {
err = ErrQueueNotReady
return
}
err = ErrQueueEmpty
return
}

holder = &rb.data[head]

nh = (head + 1) & rb.capModMask
atomic.CompareAndSwapUint32(&rb.head, head, nh)
retry:
if !atomic.CompareAndSwapUint64(&holder.readWrite, 1, 3) { //nolint:gomnd
if atomic.LoadUint64(&holder.readWrite) == 1 {
goto retry // sometimes, short circuit
}
runtime.Gosched() // time to time
continue
}

if rb.initializer != nil {
item = rb.initializer.CloneOut(&holder.value)
} else {
item = holder.value
// holder.value = zero
}
if !atomic.CompareAndSwapUint64(&holder.readWrite, 3, 0) { //nolint:gomnd
err = ErrRaced // runtime.Gosched() // never happens
}

if state.VerboseEnabled {
state.Verbose("[ringbuf][GET] states are:",
"cap", rb.Cap(), "qty", rb.qty(head, tail), "tail", tail, "head", head, "new-head", nh, "item", toString(item))
}

return
}
}
79 changes: 79 additions & 0 deletions mpmc/rb2_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
package mpmc

import (
"errors"
"fmt"
"testing"
)

func TestOverlappedRingBuf_Put_OneByOne(t *testing.T) { //nolint:revive
var err error
rb := NewOverlappedRingBuffer(NLtd, WithDebugMode[uint32](true))
size := rb.Cap() - 1
// t.Logf("Ring Buffer created, capacity = %v, real size: %v", size+1, size)
defer rb.Close()

for i := uint32(0); i < size; i++ {
err = rb.Enqueue(i)
if err != nil {
t.Fatalf("faild on i=%v. err: %+v", i, err)
// } else {
// t.Logf("%5d. '%v' put, quantity = %v.", i, i, fast.Quantity())
}
t.Logf(" %3d. ringbuf elements -> %v", i, rb)
}

for i := size; i < size+size; i++ {
err = rb.Put(i)
if errors.Is(err, ErrQueueFull) {
t.Fatalf("> %3d. expect ErrQueueFull but no error raised. err: %+v", i, err)
}
t.Logf(" %3d. ringbuf elements -> %v", i, rb)
}

var it any
it, err = rb.Dequeue()
_, _ = it, err
t.Logf(" %3d. ringbuf elements -> %v", -1, rb)
it, err = rb.Dequeue()
_, _ = it, err
t.Logf(" %3d. ringbuf elements -> %v", -1, rb)
if fmt.Sprintf("%v", rb) != "[17,18,19,20,21,22,23,24,25,26,27,28,29,]/13" {
t.Fatalf("faild: expecting elements are: [17,18,19,20,21,22,23,24,25,26,27,28,29,]/13")
}

for i := size * 2; i < size*3+2; i++ {
err = rb.Put(i)
if errors.Is(err, ErrQueueFull) {
t.Fatalf("> %3d. expect ErrQueueFull but no error raised. err: %+v", i, err)
}
t.Logf(" %3d. ringbuf elements -> %v", i, rb)
}
if fmt.Sprintf("%v", rb) != "[32,33,34,35,36,37,38,39,40,41,42,43,44,45,46,]/15" {
t.Fatalf("faild: expecting elements are: [32,33,34,35,36,37,38,39,40,41,42,43,44,45,46,]/15")
}

for i := 0; ; i++ {
it, err = rb.Dequeue()
if err != nil {
if errors.Is(err, ErrQueueEmpty) {
break
}
t.Fatalf("faild on i=%v. err: %+v. item: %v", i, err, it)
// } else {
// t.Logf("< %3d. '%v' GOT, quantity = %v.", i, it, fast.Quantity())
}
if rb.Size() == 0 {
// t.Log("empty ring buffer elements")
if fmt.Sprintf("%v", rb) != "[]/0" {
t.Fatalf("faild: expecting elements are: []/0")
}
}
t.Logf(" %3d. ringbuf elements -> %v", i, rb)
}

it, err = rb.Dequeue()
if err == nil {
t.Fatalf("faild: Dequeue on an empty ringbuf should return an ErrQueueEmpty object.")
}
}
Loading

0 comments on commit c415859

Please sign in to comment.