Skip to content

Commit

Permalink
fix: Initialize queue in in-memory broker if the queue isn't found in…
Browse files Browse the repository at this point in the history
… Consume (#47)

* fix: Initialize queue in in-memory broker if the queue isn't found in consume.

* fix: Remove unneccesary locking in in-memory broker for sending to work channel.
  • Loading branch information
iamd3vil authored Mar 14, 2024
1 parent e2d7d2b commit fffd246
Showing 1 changed file with 10 additions and 3 deletions.
13 changes: 10 additions & 3 deletions brokers/in-memory/broker.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,9 +24,18 @@ func New() *Broker {

func (r *Broker) Consume(ctx context.Context, work chan []byte, queue string) {
r.mu.RLock()
ch := r.queues[queue]
ch, ok := r.queues[queue]
r.mu.RUnlock()

// If the queue isn't found, make a queue.
if !ok {
ch = make(chan []byte, 100)
r.mu.Lock()
r.queues[queue] = ch
r.mu.Unlock()

}

for {
select {
case <-ctx.Done():
Expand All @@ -37,9 +46,7 @@ func (r *Broker) Consume(ctx context.Context, work chan []byte, queue string) {
r.pending[queue] = r.pending[queue][1:]
r.pmu.Unlock()

r.mu.RLock()
work <- d
r.mu.RUnlock()
}
}
}
Expand Down

0 comments on commit fffd246

Please sign in to comment.