From 8276193c77ebcb8d021ec7e37e94d74aed8c1b83 Mon Sep 17 00:00:00 2001 From: Gregory Chen Date: Mon, 22 Mar 2021 08:56:50 -0400 Subject: [PATCH 1/2] renaming --- internal/espresso/temperature/temperature.go | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/internal/espresso/temperature/temperature.go b/internal/espresso/temperature/temperature.go index 31c6a77..1f2599a 100644 --- a/internal/espresso/temperature/temperature.go +++ b/internal/espresso/temperature/temperature.go @@ -20,7 +20,7 @@ type Sampler interface { } type Monitor struct { - subscriptionChans map[uuid.UUID]chan *Sample + subscriptions map[uuid.UUID]chan *Sample sampler Sampler temperatureHistoryMu sync.RWMutex @@ -29,8 +29,8 @@ type Monitor struct { func NewMonitor(sampler Sampler, sampleRate time.Duration) *Monitor { return &Monitor{ - subscriptionChans: map[uuid.UUID]chan *Sample{}, - sampler: sampler, + subscriptions: map[uuid.UUID]chan *Sample{}, + sampler: sampler, } } @@ -49,7 +49,7 @@ func (m *Monitor) Run() { m.temperatureHistory = append(m.temperatureHistory, sample) m.temperatureHistoryMu.Unlock() - for _, ch := range m.subscriptionChans { + for _, ch := range m.subscriptions { ch <- sample } @@ -77,12 +77,12 @@ func (m *Monitor) Run() { func (m *Monitor) Subscribe() (uuid.UUID, chan *Sample) { subId := uuid.New() subscriptionCh := make(chan *Sample) - m.subscriptionChans[subId] = subscriptionCh + m.subscriptions[subId] = subscriptionCh return subId, subscriptionCh } func (m *Monitor) Unsubscribe(subId uuid.UUID) { - delete(m.subscriptionChans, subId) + delete(m.subscriptions, subId) } func (m *Monitor) GetHistory() []*Sample { From cd8dfffaa241672f20c39223bef9aba106d96695 Mon Sep 17 00:00:00 2001 From: Gregory Chen Date: Mon, 22 Mar 2021 08:59:49 -0400 Subject: [PATCH 2/2] mutex around subs map --- internal/espresso/temperature/temperature.go | 12 +++++++++++- 1 file changed, 11 insertions(+), 1 deletion(-) diff --git a/internal/espresso/temperature/temperature.go b/internal/espresso/temperature/temperature.go index 1f2599a..05f1eaa 100644 --- a/internal/espresso/temperature/temperature.go +++ b/internal/espresso/temperature/temperature.go @@ -20,7 +20,8 @@ type Sampler interface { } type Monitor struct { - subscriptions map[uuid.UUID]chan *Sample + subscriptionsMu sync.RWMutex + subscriptions map[uuid.UUID]chan *Sample sampler Sampler temperatureHistoryMu sync.RWMutex @@ -49,9 +50,11 @@ func (m *Monitor) Run() { m.temperatureHistory = append(m.temperatureHistory, sample) m.temperatureHistoryMu.Unlock() + m.subscriptionsMu.RLock() for _, ch := range m.subscriptions { ch <- sample } + m.subscriptionsMu.RUnlock() time.Sleep(time.Second) } @@ -75,13 +78,20 @@ func (m *Monitor) Run() { } func (m *Monitor) Subscribe() (uuid.UUID, chan *Sample) { + m.subscriptionsMu.Lock() + defer m.subscriptionsMu.Unlock() + subId := uuid.New() subscriptionCh := make(chan *Sample) m.subscriptions[subId] = subscriptionCh + return subId, subscriptionCh } func (m *Monitor) Unsubscribe(subId uuid.UUID) { + m.subscriptionsMu.Lock() + defer m.subscriptionsMu.Unlock() + delete(m.subscriptions, subId) }