From fffd246b74f2427f592e43274e2a38ff6e9c47da Mon Sep 17 00:00:00 2001 From: Sarat Chandra Date: Thu, 14 Mar 2024 11:42:09 +0530 Subject: [PATCH] fix: Initialize queue in in-memory broker if the queue isn't found in Consume (#47) * fix: Initialize queue in in-memory broker if the queue isn't found in consume. * fix: Remove unneccesary locking in in-memory broker for sending to work channel. --- brokers/in-memory/broker.go | 13 ++++++++++--- 1 file changed, 10 insertions(+), 3 deletions(-) diff --git a/brokers/in-memory/broker.go b/brokers/in-memory/broker.go index 524eacb..2467d07 100644 --- a/brokers/in-memory/broker.go +++ b/brokers/in-memory/broker.go @@ -24,9 +24,18 @@ func New() *Broker { func (r *Broker) Consume(ctx context.Context, work chan []byte, queue string) { r.mu.RLock() - ch := r.queues[queue] + ch, ok := r.queues[queue] r.mu.RUnlock() + // If the queue isn't found, make a queue. + if !ok { + ch = make(chan []byte, 100) + r.mu.Lock() + r.queues[queue] = ch + r.mu.Unlock() + + } + for { select { case <-ctx.Done(): @@ -37,9 +46,7 @@ func (r *Broker) Consume(ctx context.Context, work chan []byte, queue string) { r.pending[queue] = r.pending[queue][1:] r.pmu.Unlock() - r.mu.RLock() work <- d - r.mu.RUnlock() } } }