Skip to content

Commit

Permalink
Merge pull request #56 from zhquzzuli/feat/support_arm
Browse files Browse the repository at this point in the history
support arm64
  • Loading branch information
zhquzzuli authored Aug 26, 2024
2 parents a55c797 + 8ab38be commit 4e116be
Show file tree
Hide file tree
Showing 6 changed files with 49 additions and 16 deletions.
18 changes: 7 additions & 11 deletions buffer_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,8 +87,7 @@ type bufferList struct {
// it points to the last free buffer, whose offset in bufferRegion
tail *uint32
capPerBuffer *uint32
pushCount *uint64
popCount *uint64
counter *int32
//underlying memory
bufferRegion []byte
bufferRegionOffsetInShm uint32
Expand Down Expand Up @@ -360,8 +359,7 @@ func createFreeBufferList(bufferNum, capPerBuffer uint32, mem []byte, offsetInMe
head: (*uint32)(unsafe.Pointer(&mem[offsetInMem+8])),
tail: (*uint32)(unsafe.Pointer(&mem[offsetInMem+12])),
capPerBuffer: (*uint32)(unsafe.Pointer(&mem[offsetInMem+16])),
pushCount: (*uint64)(unsafe.Pointer(&mem[offsetInMem+20])),
popCount: (*uint64)(unsafe.Pointer(&mem[offsetInMem+28])),
counter: (*int32)(unsafe.Pointer(&mem[offsetInMem+20])),
bufferRegion: mem[offsetInMem+bufferListHeaderSize : offsetInMem+atLeastSize],
bufferRegionOffsetInShm: offsetInMem + bufferListHeaderSize,
offsetInShm: offsetInMem,
Expand All @@ -371,8 +369,7 @@ func createFreeBufferList(bufferNum, capPerBuffer uint32, mem []byte, offsetInMe
*b.head = 0
*b.tail = (bufferNum - 1) * (capPerBuffer + bufferHeaderSize)
*b.capPerBuffer = capPerBuffer
*b.pushCount = 0
*b.popCount = 0
*b.counter = 0
internalLogger.infof("createFreeBufferList bufferNum:%d capPerBuffer:%d offsetInMem:%d needSize:%d bufferRegionLen:%d",
bufferNum, capPerBuffer, offsetInMem, atLeastSize, len(b.bufferRegion))

Expand Down Expand Up @@ -404,8 +401,7 @@ func mappingFreeBufferList(mem []byte, offset uint32) (*bufferList, error) {
head: (*uint32)(unsafe.Pointer(&mem[offset+8])),
tail: (*uint32)(unsafe.Pointer(&mem[offset+12])),
capPerBuffer: (*uint32)(unsafe.Pointer(&mem[offset+16])),
pushCount: (*uint64)(unsafe.Pointer(&mem[offset+20])),
popCount: (*uint64)(unsafe.Pointer(&mem[offset+28])),
counter: (*int32)(unsafe.Pointer(&mem[offset+24])),
offsetInShm: offset,
}
needSize := countBufferListMemSize(*b.cap, *b.capPerBuffer)
Expand Down Expand Up @@ -433,7 +429,7 @@ func (b *bufferList) pop() (*bufferSlice, error) {
h := bufferHeader(b.bufferRegion[oldHead : oldHead+bufferHeaderSize])
h.clearFlag()
h.setInUsed()
atomic.AddUint64(b.popCount, 1)
atomic.AddInt32(b.counter, 1)
return newBufferSlice(h,
b.bufferRegion[oldHead+bufferHeaderSize:oldHead+bufferHeaderSize+*b.capPerBuffer],
oldHead+b.bufferRegionOffsetInShm, true), nil
Expand All @@ -459,7 +455,7 @@ func (b *bufferList) push(buffer *bufferSlice) {
if atomic.CompareAndSwapUint32(b.tail, oldTail, newTail) {
bufferHeader(b.bufferRegion[oldTail : oldTail+bufferHeaderSize]).linkNext(newTail)
atomic.AddInt32(b.size, 1)
atomic.AddUint64(b.pushCount, 1)
atomic.AddInt32(b.counter, -1)
return
}
}
Expand Down Expand Up @@ -610,7 +606,7 @@ func (b *bufferManager) checkBufferReturned() bool {
if uint32(atomic.LoadInt32(l.size)) != atomic.LoadUint32(l.cap) {
return false
}
if atomic.LoadUint64(l.pushCount) != atomic.LoadUint64(l.popCount) {
if atomic.LoadInt32(l.counter) != 0 {
return false
}
}
Expand Down
12 changes: 12 additions & 0 deletions config.go
Original file line number Diff line number Diff line change
Expand Up @@ -111,11 +111,19 @@ func VerifyConfig(config *Config) error {
return fmt.Errorf("BufferSliceSizes's Size:%d couldn't greater than ShareMemoryBufferCap:%d",
pair.Size, config.ShareMemoryBufferCap)
}

if isArmArch() && pair.Size%4 != 0 {
return fmt.Errorf("the SizePercentPair.Size must be a multiple of 4")
}
}
if sum != 100 {
return errors.New("the sum of BufferSliceSizes's Percent should be 100")
}

if isArmArch() && config.QueueCap%8 != 0 {
return fmt.Errorf("the QueueCap must be a multiple of 8")
}

if config.ShareMemoryPathPrefix == "" || config.QueuePath == "" {
return errors.New("buffer path or queue path could not be nil")
}
Expand All @@ -124,5 +132,9 @@ func VerifyConfig(config *Config) error {
return ErrOSNonSupported
}

if runtime.GOARCH != "amd64" && runtime.GOARCH != "arm64" {
return ErrArchNonSupported
}

return nil
}
3 changes: 3 additions & 0 deletions errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,9 @@ var (
//ErrOSNonSupported means that shmipc couldn't work in current OS. (only support Linux now)
ErrOSNonSupported = errors.New("shmipc just support linux OS now")

//ErrArchNonSupported means that shmipc only support amd64 and arm64
ErrArchNonSupported = errors.New("shmipc just support amd64 or arm64 arch")

//ErrHotRestartInProgress was returned by Listener.HotRestart when the Session had under the hot restart state
ErrHotRestartInProgress = errors.New("hot restart in progress, try again later")

Expand Down
18 changes: 18 additions & 0 deletions queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -126,6 +126,10 @@ func mappingQueueManagerMemfd(queuePathName string, memFd int) (*queueManager, e
}

mappingSize := int(fileInfo.Size)
//a queueManager have two queue, a queue's head and tail should align to 8 byte boundary
if isArmArch() && mappingSize%16 != 0 {
return nil, fmt.Errorf("the memory size of queue should be a multiple of 16")
}
mem, err := syscall.Mmap(memFd, 0, mappingSize, syscall.PROT_READ|syscall.PROT_WRITE, syscall.MAP_SHARED)
if err != nil {
return nil, err
Expand All @@ -152,6 +156,10 @@ func mappingQueueManager(shmPath string) (*queueManager, error) {
}
mappingSize := int(fileInfo.Size())

//a queueManager have two queue, a queue's head and tail should align to 8 byte boundary
if isArmArch() && mappingSize%16 != 0 {
return nil, fmt.Errorf("the memory size of queue should be a multiple of 16")
}
mem, err := syscall.Mmap(int(f.Fd()), 0, mappingSize, syscall.PROT_READ|syscall.PROT_WRITE, syscall.MAP_SHARED)
if err != nil {
return nil, err
Expand Down Expand Up @@ -181,6 +189,16 @@ func mappingQueueFromBytes(data []byte) *queue {
cap := *(*uint32)(unsafe.Pointer(&data[0]))
queueStartOffset := queueHeaderLength
queueEndOffset := queueHeaderLength + cap*queueElementLen
if isArmArch() {
// align 8 byte boundary for head and tail
return &queue{
cap: int64(cap),
workingFlag: (*uint32)(unsafe.Pointer(&data[4])),
head: (*int64)(unsafe.Pointer(&data[8])),
tail: (*int64)(unsafe.Pointer(&data[16])),
queueBytesOnMemory: data[queueStartOffset:queueEndOffset],
}
}
return &queue{
cap: int64(cap),
head: (*int64)(unsafe.Pointer(&data[4])),
Expand Down
10 changes: 5 additions & 5 deletions stream_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ func newClientServerWithNoCheck(conf *Config) (client *Session, server *Session)
// Close 94.4%
func TestStream_Close(t *testing.T) {
c := testConf()
c.QueueCap = 1
c.QueueCap = 8
client, server := newClientServerWithNoCheck(c)
defer server.Close()
defer client.Close()
Expand Down Expand Up @@ -312,14 +312,14 @@ func TestStream_HalfClose(t *testing.T) {

func TestStream_SendQueueFull(t *testing.T) {
conf := testConf()
conf.QueueCap = 1 // can only contain 1 queue elem(12B)
conf.QueueCap = 8 // can only contain 1 queue elem(12B)
client, server := testClientServerConfig(conf)
defer client.Close()
defer server.Close()

done := make(chan struct{})
dataSize := 10
mockDataLength := 50
mockDataLength := 500
mockData := make([][]byte, mockDataLength)
for i := range mockData {
mockData[i] = make([]byte, dataSize)
Expand Down Expand Up @@ -367,13 +367,13 @@ func TestStream_SendQueueFull(t *testing.T) {

func TestStream_SendQueueFullTimeout(t *testing.T) {
conf := testConf()
conf.QueueCap = 1 // can only contain 1 queue elem(12B)
conf.QueueCap = 8 // can only contain 1 queue elem(12B)
client, server := testClientServerConfig(conf)
defer client.Close()
defer server.Close()

dataSize := 10
mockDataLength := 5
mockDataLength := 500
mockData := make([][]byte, mockDataLength)
for i := range mockData {
mockData[i] = make([]byte, dataSize)
Expand Down
4 changes: 4 additions & 0 deletions util.go
Original file line number Diff line number Diff line change
Expand Up @@ -133,3 +133,7 @@ func safeRemoveUdsFile(filename string) bool {

return true
}

func isArmArch() bool {
return runtime.GOARCH == "arm" || runtime.GOARCH == "arm64"
}

0 comments on commit 4e116be

Please sign in to comment.