diff --git a/receiver/nats.go b/receiver/nats.go index 8cf65513..24aef722 100644 --- a/receiver/nats.go +++ b/receiver/nats.go @@ -51,6 +51,7 @@ type Nats struct { NKeyFile string `doc:"Nats nkey file path"` Insecure bool `doc:"TLS InsecureSkipVerify"` conOpts *[]nats.Option + natsSub *nats.Subscription natsCon *nats.Conn wg sync.WaitGroup } @@ -149,11 +150,19 @@ func (n *Nats) Start() error { } n.wg.Add(1) - if n.Queue == "" { - n.natsCon.QueueSubscribe(n.Subject, n.Queue, cb) + if len(n.Queue) > 0 { + natsLog.Debugf("Starting queued subscription on %v with queue %v", n.Subject, n.Queue) + n.natsSub, err = n.natsCon.QueueSubscribe(n.Subject, n.Queue, cb) } else { - n.natsCon.Subscribe(n.Subject, cb) + natsLog.Debugf("Starting subscription on %v", n.Subject) + n.natsSub, err = n.natsCon.Subscribe(n.Subject, cb) } + + if err != nil { + n.wg.Done() + return err + } + n.wg.Wait() return n.natsCon.LastError() }