Skip to content

Commit

Permalink
fix concurrent lock race problem
Browse files Browse the repository at this point in the history
  • Loading branch information
numb3r3 committed May 16, 2018
1 parent 0b03d59 commit 0dba344
Show file tree
Hide file tree
Showing 3 changed files with 31 additions and 25 deletions.
18 changes: 9 additions & 9 deletions pubsub/broker.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,9 +39,9 @@ func (b *Broker) Attach() (*Subscriber, error) {
messages: make(chan *Message, 1),
createAt: time.Now().UnixNano(),
destroyed: false,
lock: &sync.RWMutex{},
topics: map[string]bool{},
closing: make(chan bool, 1),
//lock: &sync.RWMutex{},
topics: map[string]bool{},
closing: make(chan bool, 1),
}
// b.subscribers[s.id] = s
return s, nil
Expand Down Expand Up @@ -85,16 +85,16 @@ func (b *Broker) Unsubscribe(s *Subscriber, topics ...string) {
func (b *Broker) Broadcast(data []byte, topics ...string) {
b.tlock.RLock()
defer b.tlock.RUnlock()
now := time.Now().UnixNano()
now := time.Now().UnixNano()
for _, topic := range topics {
if nil == b.topics[topic] {
continue
}
m := &Message{
topic: topic,
data: data,
createAt: now,
}
m := &Message{
topic: topic,
data: data,
createAt: now,
}
for _, s := range b.topics[topic] {
go (func(s *Subscriber) {
s.Signal(m)
Expand Down
26 changes: 16 additions & 10 deletions pubsub/subscriber.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
package pubsub

import (
"sync"
//"sync"
)

type Subscribers map[string]*Subscriber
Expand All @@ -11,9 +11,9 @@ type Subscriber struct {
messages chan *Message
createAt int64
destroyed bool
lock *sync.RWMutex
topics map[string]bool
closing chan bool
//lock *sync.RWMutex
topics map[string]bool
closing chan bool
}

// to get the subscriber id
Expand All @@ -37,22 +37,28 @@ func (s *Subscriber) GetMessages() <-chan *Message {

// to send a message to subscriber
func (s *Subscriber) Signal(m *Message) *Subscriber {
s.lock.RLock()
defer s.lock.RUnlock()
// s.lock.RLock()
// defer s.lock.RUnlock()
if !s.destroyed {
s.messages <- m
select {
case <-s.closing:
return s
default:
s.messages <- m
}
// s.messages <- m
}
return s
}

// to close the underlying channels/resources
func (s *Subscriber) Destroy() {
s.lock.Lock()
defer s.lock.Unlock()
// s.lock.Lock()
// defer s.lock.Unlock()
if !s.destroyed {
s.destroyed = true
close(s.messages)
s.closing <- true
close(s.messages)
close(s.closing)
}
}
Expand Down
12 changes: 6 additions & 6 deletions websocket/websocket.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,12 +81,12 @@ func newWebsocketConn(ws websocketConn) *websocketTransport {
ws.SetReadDeadline(time.Now().Add(pongWait))
ws.SetPongHandler(func(string) error { ws.SetReadDeadline(time.Now().Add(pongWait)); return nil })
utils.Repeat(func() {
// logging.Debug("to write ping")
if err := ws.WriteControl(websocket.PingMessage, []byte{}, time.Now().Add(writeWait)); err != nil {
logging.Debug("ping err: ", err)
conn.closing <- true
}
}, pingPeriod, conn.closing)
// logging.Debug("to write ping")
if err := ws.WriteControl(websocket.PingMessage, []byte{}, time.Now().Add(writeWait)); err != nil {
logging.Debug("ping err: ", err)
conn.closing <- true
}
}, pingPeriod, conn.closing)

/*ws.SetReadLimit(maxMessageSize)
ws.SetReadDeadline(time.Now().Add(pongWait))
Expand Down

0 comments on commit 0dba344

Please sign in to comment.