Skip to content

Commit

Permalink
(update): Use sync.atomic to replace some sync.RWMutex and `sync.…
Browse files Browse the repository at this point in the history
…Mutex`.
  • Loading branch information
zishang520 committed May 29, 2024
1 parent 9489600 commit 53935e8
Show file tree
Hide file tree
Showing 6 changed files with 120 additions and 141 deletions.
20 changes: 10 additions & 10 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,8 @@ go 1.22.2

require (
github.com/mitchellh/mapstructure v1.5.0
github.com/zishang520/engine.io-go-parser v1.2.4
github.com/zishang520/engine.io/v2 v2.0.8
github.com/zishang520/engine.io-go-parser v1.2.5
github.com/zishang520/engine.io/v2 v2.1.0
)

require (
Expand All @@ -15,17 +15,17 @@ require (
github.com/gorilla/websocket v1.5.1 // indirect
github.com/onsi/ginkgo/v2 v2.12.0 // indirect
github.com/quic-go/qpack v0.4.0 // indirect
github.com/quic-go/quic-go v0.43.0 // indirect
github.com/quic-go/quic-go v0.44.0 // indirect
github.com/quic-go/webtransport-go v0.8.0 // indirect
github.com/vmihailenco/msgpack/v5 v5.4.1 // indirect
github.com/vmihailenco/tagparser/v2 v2.0.0 // indirect
github.com/xo/terminfo v0.0.0-20210125001918-ca9a967f8778 // indirect
go.uber.org/mock v0.4.0 // indirect
golang.org/x/crypto v0.21.0 // indirect
golang.org/x/exp v0.0.0-20230817173708-d852ddb80c63 // indirect
golang.org/x/mod v0.12.0 // indirect
golang.org/x/net v0.23.0 // indirect
golang.org/x/sys v0.18.0 // indirect
golang.org/x/text v0.14.0 // indirect
golang.org/x/tools v0.12.1-0.20230815132531-74c255bcf846 // indirect
golang.org/x/crypto v0.23.0 // indirect
golang.org/x/exp v0.0.0-20240506185415-9bf2ced13842 // indirect
golang.org/x/mod v0.17.0 // indirect
golang.org/x/net v0.25.0 // indirect
golang.org/x/sys v0.20.0 // indirect
golang.org/x/text v0.15.0 // indirect
golang.org/x/tools v0.21.0 // indirect
)
46 changes: 24 additions & 22 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,8 @@ github.com/go-task/slim-sprig v0.0.0-20230315185526-52ccab3ef572 h1:tfuBGBXKqDEe
github.com/go-task/slim-sprig v0.0.0-20230315185526-52ccab3ef572/go.mod h1:9Pwr4B2jHnOSGXyyzV8ROjYa2ojvAY6HCGYYfMoC3Ls=
github.com/golang/protobuf v1.5.3 h1:KhyjKVUg7Usr/dYsdSqoFveMYd5ko72D+zANwlG1mmg=
github.com/golang/protobuf v1.5.3/go.mod h1:XVQd3VNwM+JqD3oG2Ue2ip4fOMUkwXdXDdiuN0vRsmY=
github.com/google/go-cmp v0.5.9 h1:O2Tfq5qg4qc4AmwVlvv0oLiVAGB7enBSJ2x2DqQFi38=
github.com/google/go-cmp v0.5.9/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY=
github.com/google/go-cmp v0.6.0 h1:ofyhxvXcZhMsU5ulbFiLKl/XBFqE1GSq7atu8tAmTRI=
github.com/google/go-cmp v0.6.0/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY=
github.com/google/pprof v0.0.0-20230821062121-407c9e7a662f h1:pDhu5sgp8yJlEF/g6osliIIpF9K4F5jvkULXa4daRDQ=
github.com/google/pprof v0.0.0-20230821062121-407c9e7a662f/go.mod h1:czg5+yv1E0ZGTi6S6vVK1mke0fV+FaUhNGcd6VRS9Ik=
github.com/gookit/color v1.5.4 h1:FZmqs7XOyGgCAxmWyPslpiok1k05wmY3SJTytgvYFs0=
Expand All @@ -27,8 +27,8 @@ github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZb
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/quic-go/qpack v0.4.0 h1:Cr9BXA1sQS2SmDUWjSofMPNKmvF6IiIfDRmgU0w1ZCo=
github.com/quic-go/qpack v0.4.0/go.mod h1:UZVnYIfi5GRk+zI9UMaCPsmZ2xKJP7XBUvVyT1Knj9A=
github.com/quic-go/quic-go v0.43.0 h1:sjtsTKWX0dsHpuMJvLxGqoQdtgJnbAPWY+W+5vjYW/g=
github.com/quic-go/quic-go v0.43.0/go.mod h1:132kz4kL3F9vxhW3CtQJLDVwcFe5wdWeJXXijhsO57M=
github.com/quic-go/quic-go v0.44.0 h1:So5wOr7jyO4vzL2sd8/pD9Kesciv91zSk8BoFngItQ0=
github.com/quic-go/quic-go v0.44.0/go.mod h1:z4cx/9Ny9UtGITIPzmPTXh1ULfOyWh4qGQlpnPcWmek=
github.com/quic-go/webtransport-go v0.8.0 h1:HxSrwun11U+LlmwpgM1kEqIqH90IT4N8auv/cD7QFJg=
github.com/quic-go/webtransport-go v0.8.0/go.mod h1:N99tjprW432Ut5ONql/aUhSLT0YVSlwHohQsuac9WaM=
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
Expand All @@ -41,28 +41,30 @@ github.com/vmihailenco/tagparser/v2 v2.0.0 h1:y09buUbR+b5aycVFQs/g70pqKVZNBmxwAh
github.com/vmihailenco/tagparser/v2 v2.0.0/go.mod h1:Wri+At7QHww0WTrCBeu4J6bNtoV6mEfg5OIWRZA9qds=
github.com/xo/terminfo v0.0.0-20210125001918-ca9a967f8778 h1:QldyIu/L63oPpyvQmHgvgickp1Yw510KJOqX7H24mg8=
github.com/xo/terminfo v0.0.0-20210125001918-ca9a967f8778/go.mod h1:2MuV+tbUrU1zIOPMxZ5EncGwgmMJsa+9ucAQZXxsObs=
github.com/zishang520/engine.io-go-parser v1.2.4 h1:37h7Mt3Msc3aqub6hl+mYlG6wB81O4zcynrZQIjG41s=
github.com/zishang520/engine.io-go-parser v1.2.4/go.mod h1:G1DciRIGH4/S7x01DIdZQaXrk09ZeRgEw5e/Z9ms4Is=
github.com/zishang520/engine.io/v2 v2.0.8 h1:84rkbpWPzblAMj62uYsaD+XuZQTJTempSTCaxzemNSA=
github.com/zishang520/engine.io/v2 v2.0.8/go.mod h1:z9wFZLzqW1ykzWA84jt//1x0dQjMSim1G3SzIPovdHw=
github.com/zishang520/engine.io-go-parser v1.2.5 h1:Disf4rvNQzDsgoC+3yuwuFx5A7JNWlPp+QLUW32WDtc=
github.com/zishang520/engine.io-go-parser v1.2.5/go.mod h1:G1DciRIGH4/S7x01DIdZQaXrk09ZeRgEw5e/Z9ms4Is=
github.com/zishang520/engine.io/v2 v2.1.0 h1:dh3O7OcAfqfhg7AhqlqPRM/6pfdAcoRlEmNbe2wv8qE=
github.com/zishang520/engine.io/v2 v2.1.0/go.mod h1:FnXtT+k/6g2uOb9MpqY71DhV7COwlCH5DCbczn6Q3K8=
go.uber.org/mock v0.4.0 h1:VcM4ZOtdbR4f6VXfiOpwpVJDL6lCReaZ6mw31wqh7KU=
go.uber.org/mock v0.4.0/go.mod h1:a6FSlNadKUHUa9IP5Vyt1zh4fC7uAwxMutEAscFbkZc=
golang.org/x/crypto v0.21.0 h1:X31++rzVUdKhX5sWmSOFZxx8UW/ldWx55cbf08iNAMA=
golang.org/x/crypto v0.21.0/go.mod h1:0BP7YvVV9gBbVKyeTG0Gyn+gZm94bibOW5BjDEYAOMs=
golang.org/x/exp v0.0.0-20230817173708-d852ddb80c63 h1:m64FZMko/V45gv0bNmrNYoDEq8U5YUhetc9cBWKS1TQ=
golang.org/x/exp v0.0.0-20230817173708-d852ddb80c63/go.mod h1:0v4NqG35kSWCMzLaMeX+IQrlSnVE/bqGSyC2cz/9Le8=
golang.org/x/mod v0.12.0 h1:rmsUpXtvNzj340zd98LZ4KntptpfRHwpFOHG188oHXc=
golang.org/x/mod v0.12.0/go.mod h1:iBbtSCu2XBx23ZKBPSOrRkjjQPZFPuis4dIYUhu/chs=
golang.org/x/net v0.23.0 h1:7EYJ93RZ9vYSZAIb2x3lnuvqO5zneoD6IvWjuhfxjTs=
golang.org/x/net v0.23.0/go.mod h1:JKghWKKOSdJwpW2GEx0Ja7fmaKnMsbu+MWVZTokSYmg=
golang.org/x/sys v0.18.0 h1:DBdB3niSjOA/O0blCZBqDefyWNYveAYMNF1Wum0DYQ4=
golang.org/x/sys v0.18.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA=
golang.org/x/text v0.14.0 h1:ScX5w1eTa3QqT8oi6+ziP7dTV1S2+ALU0bI+0zXKWiQ=
golang.org/x/text v0.14.0/go.mod h1:18ZOQIKpY8NJVqYksKHtTdi31H5itFRjB5/qKTNYzSU=
golang.org/x/crypto v0.23.0 h1:dIJU/v2J8Mdglj/8rJ6UUOM3Zc9zLZxVZwwxMooUSAI=
golang.org/x/crypto v0.23.0/go.mod h1:CKFgDieR+mRhux2Lsu27y0fO304Db0wZe70UKqHu0v8=
golang.org/x/exp v0.0.0-20240506185415-9bf2ced13842 h1:vr/HnozRka3pE4EsMEg1lgkXJkTFJCVUX+S/ZT6wYzM=
golang.org/x/exp v0.0.0-20240506185415-9bf2ced13842/go.mod h1:XtvwrStGgqGPLc4cjQfWqZHG1YFdYs6swckp8vpsjnc=
golang.org/x/mod v0.17.0 h1:zY54UmvipHiNd+pm+m0x9KhZ9hl1/7QNMyxXbc6ICqA=
golang.org/x/mod v0.17.0/go.mod h1:hTbmBsO62+eylJbnUtE2MGJUyE7QWk4xUqPFrRgJ+7c=
golang.org/x/net v0.25.0 h1:d/OCCoBEUq33pjydKrGQhw7IlUPI2Oylr+8qLx49kac=
golang.org/x/net v0.25.0/go.mod h1:JkAGAh7GEvH74S6FOH42FLoXpXbE/aqXSrIQjXgsiwM=
golang.org/x/sync v0.7.0 h1:YsImfSBoP9QPYL0xyKJPq0gcaJdG3rInoqxTWbfQu9M=
golang.org/x/sync v0.7.0/go.mod h1:Czt+wKu1gCyEFDUtn0jG5QVvpJ6rzVqr5aXyt9drQfk=
golang.org/x/sys v0.20.0 h1:Od9JTbYCk261bKm4M/mw7AklTlFYIa0bIp9BgSm1S8Y=
golang.org/x/sys v0.20.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA=
golang.org/x/text v0.15.0 h1:h1V/4gjBv8v9cjcR6+AR5+/cIYK5N/WAgiv4xlsEtAk=
golang.org/x/text v0.15.0/go.mod h1:18ZOQIKpY8NJVqYksKHtTdi31H5itFRjB5/qKTNYzSU=
golang.org/x/time v0.5.0 h1:o7cqy6amK/52YcAKIPlM3a+Fpj35zvRj2TP+e1xFSfk=
golang.org/x/time v0.5.0/go.mod h1:3BpzKBy/shNhVucY/MWOyx10tF3SFh9QdLuxbVysPQM=
golang.org/x/tools v0.12.1-0.20230815132531-74c255bcf846 h1:Vve/L0v7CXXuxUmaMGIEK/dEeq7uiqb5qBgQrZzIE7E=
golang.org/x/tools v0.12.1-0.20230815132531-74c255bcf846/go.mod h1:Sc0INKfu04TlqNoRA1hgpFZbhYXHPr4V5DzpSBTPqQM=
golang.org/x/tools v0.21.0 h1:qc0xYgIbsSDt9EyWz05J5wfa7LOVW0YTLOXrqdLAWIw=
golang.org/x/tools v0.21.0/go.mod h1:aiJjzUbINMkxbQROHiO6hDPo2LHcIPhhQsa9DLh0yGk=
google.golang.org/protobuf v1.31.0 h1:g0LDEJHgrBl9N9r17Ru3sqWhkIx2NB67okBHPwC7hs8=
google.golang.org/protobuf v1.31.0/go.mod h1:HV8QOd/L58Z+nl8r43ehVNZIU/HEI6OcFqwMG9pJV4I=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
Expand Down
67 changes: 35 additions & 32 deletions parser/binary.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ import (

type Placeholder struct {
Placeholder bool `json:"_placeholder" mapstructure:"_placeholder" msgpack:"_placeholder"`
Num int `json:"num" mapstructure:"num" msgpack:"num"`
Num int `json:"num" mapstructure:"num" msgpack:"num"`
}

// Replaces every io.Reader | []byte in packet with a numbered placeholder.
Expand Down Expand Up @@ -41,37 +41,41 @@ func _deconstructPacket(data any, buffers *[]types.BufferInterface) any {
}
*buffers = append(*buffers, rdata)
return _placeholder
} else {
switch tdata := data.(type) {
case []any:
newData := make([]any, 0, len(tdata))
for _, v := range tdata {
newData = append(newData, _deconstructPacket(v, buffers))
}
return newData
case map[string]any:
newData := map[string]any{}
for k, v := range tdata {
newData[k] = _deconstructPacket(v, buffers)
}
return newData
}

switch tdata := data.(type) {
case []any:
newData := make([]any, 0, len(tdata))
for _, v := range tdata {
newData = append(newData, _deconstructPacket(v, buffers))
}
return newData
case map[string]any:
newData := map[string]any{}
for k, v := range tdata {
newData[k] = _deconstructPacket(v, buffers)
}
return newData
default:
return data
}
return data
}

// Reconstructs a binary packet from its placeholder packet and buffers
func ReconstructPacket(data *Packet, buffers []types.BufferInterface) (_ *Packet, err error) {
data.Data, err = _reconstructPacket(data.Data, &buffers)
data.Attachments = nil // no longer useful
return data, nil
func ReconstructPacket(packet *Packet, buffers []types.BufferInterface) (*Packet, error) {
data, err := _reconstructPacket(packet.Data, &buffers)
if err != nil {
return nil, err
}
packet.Data = data
packet.Attachments = nil // Attachments are no longer needed
return packet, nil
}

func _reconstructPacket(data any, buffers *[]types.BufferInterface) (any, error) {
if data == nil {
return nil, nil
}
switch d := data.(type) {
case nil:
return nil, nil
case []any:
newData := make([]any, 0, len(d))
for _, v := range d {
Expand All @@ -83,16 +87,14 @@ func _reconstructPacket(data any, buffers *[]types.BufferInterface) (any, error)
}
return newData, nil
case map[string]any:
var _placeholder *Placeholder
if mapstructure.Decode(d, &_placeholder) == nil {
if _placeholder.Placeholder {
if _placeholder.Num >= 0 && _placeholder.Num < len(*buffers) {
return (*buffers)[_placeholder.Num], nil // appropriate buffer (should be natural order anyway)
}
return nil, errors.New("illegal attachments")
var _placeholder Placeholder
if mapstructure.Decode(d, &_placeholder) == nil && _placeholder.Placeholder {
if _placeholder.Num >= 0 && _placeholder.Num < len(*buffers) {
return (*buffers)[_placeholder.Num], nil // appropriate buffer (should be natural order anyway)
}
return nil, errors.New("illegal attachments")
}
newData := map[string]any{}
newData := make(map[string]any, len(d))
for k, v := range d {
_data, err := _reconstructPacket(v, buffers)
if err != nil {
Expand All @@ -101,6 +103,7 @@ func _reconstructPacket(data any, buffers *[]types.BufferInterface) (any, error)
newData[k] = _data
}
return newData, nil
default:
return data, nil
}
return data, nil
}
45 changes: 12 additions & 33 deletions parser/decoder.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ import (
"io"
"strconv"
"strings"
"sync"
"sync/atomic"

"github.com/zishang520/engine.io-go-parser/types"
"github.com/zishang520/engine.io/v2/events"
Expand All @@ -31,8 +31,7 @@ var (
type decoder struct {
events.EventEmitter

reconstructor *binaryreconstructor
mu sync.RWMutex
reconstructor atomic.Pointer[binaryReconstructor]
}

func NewDecoder() Decoder {
Expand All @@ -43,22 +42,16 @@ func NewDecoder() Decoder {
func (d *decoder) Add(data any) error {
switch tdata := data.(type) {
case string:
d.mu.RLock()
if d.reconstructor != nil {
defer d.mu.RUnlock()
if d.reconstructor.Load() != nil {
return errors.New("got plaintext data when reconstructing a packet")
}
d.mu.RUnlock()
if err := d.decodeAsString(types.NewStringBufferString(tdata)); err != nil {
return err
}
case *strings.Reader:
d.mu.RLock()
if d.reconstructor != nil {
defer d.mu.RUnlock()
if d.reconstructor.Load() != nil {
return errors.New("got plaintext data when reconstructing a packet")
}
d.mu.RUnlock()
rdata, err := types.NewStringBufferReader(tdata)
if err != nil {
return err
Expand All @@ -67,24 +60,19 @@ func (d *decoder) Add(data any) error {
return err
}
case *types.StringBuffer:
d.mu.RLock()
if d.reconstructor != nil {
defer d.mu.RUnlock()
if d.reconstructor.Load() != nil {
return errors.New("got plaintext data when reconstructing a packet")
}
d.mu.RUnlock()
if err := d.decodeAsString(tdata); err != nil {
return err
}
default:
if IsBinary(data) {
// raw binary data
d.mu.RLock()
if d.reconstructor == nil {
defer d.mu.RUnlock()
reconstructor := d.reconstructor.Load()
if reconstructor == nil {
return errors.New("got binary data when not reconstructing a packet")
}
d.mu.RUnlock()

rdata := types.NewBytesBuffer(nil)
switch tdata := data.(type) {
Expand All @@ -100,17 +88,13 @@ func (d *decoder) Add(data any) error {
return err
}
}
d.mu.RLock()
packet, err := d.reconstructor.takeBinaryData(rdata)
d.mu.RUnlock()
packet, err := reconstructor.takeBinaryData(rdata)
if err != nil {
return errors.New(fmt.Sprintf("Decode error: %v", err.Error()))
}
if packet != nil {
// received final buffer
d.mu.Lock()
d.reconstructor = nil
d.mu.Unlock()
d.reconstructor.Store(nil)
d.Emit("decoded", packet)
}
} else {
Expand All @@ -129,9 +113,7 @@ func (d *decoder) decodeAsString(str types.BufferInterface) error {
}
if packet.Type == BINARY_EVENT || packet.Type == BINARY_ACK {
// binary packet's json
d.mu.Lock()
d.reconstructor = NewBinaryReconstructor(packet)
d.mu.Unlock()
d.reconstructor.Store(newBinaryReconstructor(packet))
// no attachments, labeled binary but no binary data to follow
if attachments := packet.Attachments; attachments != nil && *attachments == 0 {
d.Emit("decoded", packet)
Expand Down Expand Up @@ -285,10 +267,7 @@ func isPayloadValid(t PacketType, payload any) bool {

// Deallocates a parser's resources
func (d *decoder) Destroy() {
d.mu.RLock()
defer d.mu.RUnlock()

if d.reconstructor != nil {
d.reconstructor.finishedReconstruction()
if reconstructor := d.reconstructor.Load(); reconstructor != nil {
reconstructor.finishedReconstruction()
}
}
12 changes: 5 additions & 7 deletions parser/encoder.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,11 +34,9 @@ func (e *encoder) Encode(packet *Packet) []types.BufferInterface {
}

func _encodeData(data any) any {
if data == nil {
return nil
}

switch tdata := data.(type) {
case nil:
return nil
// *strings.Reader special handling
case *strings.Reader:
rdata, _ := types.NewStringBufferReader(tdata)
Expand All @@ -50,14 +48,14 @@ func _encodeData(data any) any {
}
return newData
case map[string]any:
newData := map[string]any{}
newData := make(map[string]any, len(tdata))
for k, v := range tdata {
newData[k] = _encodeData(v)
}
return newData
default:
return data
}

return data
}

// Encode packet as string.
Expand Down
Loading

0 comments on commit 53935e8

Please sign in to comment.