From 31a7289084989e929de06e98ac7b5db478dda3ed Mon Sep 17 00:00:00 2001 From: n-holmstedt Date: Thu, 23 Nov 2023 10:33:04 +0100 Subject: [PATCH 1/3] Fix error in NATS worker queues --- receiver/nats.go | 14 +++++++++++--- 1 file changed, 11 insertions(+), 3 deletions(-) diff --git a/receiver/nats.go b/receiver/nats.go index 8cf65513..af36a3aa 100644 --- a/receiver/nats.go +++ b/receiver/nats.go @@ -149,11 +149,19 @@ func (n *Nats) Start() error { } n.wg.Add(1) - if n.Queue == "" { - n.natsCon.QueueSubscribe(n.Subject, n.Queue, cb) + if len(n.Queue) > 0 { + natsLog.Debugf("Starting queued subscription on %v with queue %v", n.Subject, n.Queue) + n.natsSub, err = n.natsCon.QueueSubscribe(n.Subject, n.Queue, cb) } else { - n.natsCon.Subscribe(n.Subject, cb) + natsLog.Debugf("Starting subscription on %v", n.Subject) + n.natsSub, err = n.natsCon.Subscribe(n.Subject, cb) } + + if err != nil { + n.wg.Done() + return err + } + n.wg.Wait() return n.natsCon.LastError() } From cfc167e5308439102e0d44559dd7f094b9a7f51f Mon Sep 17 00:00:00 2001 From: n-holmstedt Date: Thu, 23 Nov 2023 10:40:43 +0100 Subject: [PATCH 2/3] Add subscription property --- receiver/nats.go | 1 + 1 file changed, 1 insertion(+) diff --git a/receiver/nats.go b/receiver/nats.go index af36a3aa..a5cb9380 100644 --- a/receiver/nats.go +++ b/receiver/nats.go @@ -51,6 +51,7 @@ type Nats struct { NKeyFile string `doc:"Nats nkey file path"` Insecure bool `doc:"TLS InsecureSkipVerify"` conOpts *[]nats.Option + natsSub *nats.Subscription natsCon *nats.Conn wg sync.WaitGroup } From d9b3bda4f05e7fe4362e7bcf24435f6a7749e306 Mon Sep 17 00:00:00 2001 From: n-holmstedt Date: Thu, 23 Nov 2023 10:44:33 +0100 Subject: [PATCH 3/3] Fmtfix --- receiver/nats.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/receiver/nats.go b/receiver/nats.go index a5cb9380..24aef722 100644 --- a/receiver/nats.go +++ b/receiver/nats.go @@ -154,8 +154,8 @@ func (n *Nats) Start() error { natsLog.Debugf("Starting queued subscription on %v with queue %v", n.Subject, n.Queue) n.natsSub, err = n.natsCon.QueueSubscribe(n.Subject, n.Queue, cb) } else { - natsLog.Debugf("Starting subscription on %v", n.Subject) - n.natsSub, err = n.natsCon.Subscribe(n.Subject, cb) + natsLog.Debugf("Starting subscription on %v", n.Subject) + n.natsSub, err = n.natsCon.Subscribe(n.Subject, cb) } if err != nil {