diff --git a/brokers/in-memory/broker.go b/brokers/in-memory/broker.go index 524eacb..2467d07 100644 --- a/brokers/in-memory/broker.go +++ b/brokers/in-memory/broker.go @@ -24,9 +24,18 @@ func New() *Broker { func (r *Broker) Consume(ctx context.Context, work chan []byte, queue string) { r.mu.RLock() - ch := r.queues[queue] + ch, ok := r.queues[queue] r.mu.RUnlock() + // If the queue isn't found, make a queue. + if !ok { + ch = make(chan []byte, 100) + r.mu.Lock() + r.queues[queue] = ch + r.mu.Unlock() + + } + for { select { case <-ctx.Done(): @@ -37,9 +46,7 @@ func (r *Broker) Consume(ctx context.Context, work chan []byte, queue string) { r.pending[queue] = r.pending[queue][1:] r.pmu.Unlock() - r.mu.RLock() work <- d - r.mu.RUnlock() } } }