Skip to content

Commit

Permalink
fix: Initialize queue in in-memory broker if the queue isn't found in…
Browse files Browse the repository at this point in the history
… consume.
  • Loading branch information
iamd3vil committed Mar 14, 2024
1 parent e2d7d2b commit b2e1026
Showing 1 changed file with 10 additions and 1 deletion.
11 changes: 10 additions & 1 deletion brokers/in-memory/broker.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,9 +24,18 @@ func New() *Broker {

func (r *Broker) Consume(ctx context.Context, work chan []byte, queue string) {
r.mu.RLock()
ch := r.queues[queue]
ch, ok := r.queues[queue]
r.mu.RUnlock()

// If the queue isn't found, make a queue.
if !ok {
ch = make(chan []byte, 100)
r.mu.Lock()
r.queues[queue] = ch
r.mu.Unlock()

}

for {
select {
case <-ctx.Done():
Expand Down

0 comments on commit b2e1026

Please sign in to comment.