diff --git a/internal/natsclient/jetstream.go b/internal/natsclient/jetstream.go index 27d5220..03b61af 100644 --- a/internal/natsclient/jetstream.go +++ b/internal/natsclient/jetstream.go @@ -82,7 +82,7 @@ func (j *Jetstream) StartJetstreamMessaging() { // Subscribe subscribes to a list of subjects and returns a channel with incoming messages func (j *Jetstream) createSubscribe(subject string) chan *Message { - messageHandler, h := messageHandlerFactoryJetstream() + messageHandler, h := j.messageHandlerFactoryJetstream() _, err := j.jetstream.Subscribe( subject, messageHandler, @@ -104,7 +104,12 @@ func (j *Jetstream) createSubscribe(subject string) chan *Message { func (j *Jetstream) jetstreamSubscribe(h chan *Message) { for msg := range h { var publishTime time.Time - publishTime.UnmarshalBinary(msg.Data) + err := publishTime.UnmarshalBinary(msg.Data) + if err != nil { + j.logger.Error("unable to unmarshal binary data for publishTime.") + j.logger.Info("received message but could not calculate latency due to unmarshalling error.", zap.String("subject", msg.Subject)) + return + } latency := time.Since(publishTime).Seconds() j.metrics.Latency.Observe(latency) j.metrics.SuccessCounter.WithLabelValues("successful subscribe").Add(1) @@ -137,23 +142,16 @@ func (j *Jetstream) jetstreamPublish(subject string) { } } -// Close closes NATS connection -func (j *Jetstream) close() { - if err := j.connection.FlushTimeout(j.config.FlushTimeout); err != nil { - j.logger.Error("could not flush", zap.Error(err)) - } - - j.connection.Close() - j.logger.Info("NATS is closed.") -} - -func messageHandlerFactoryJetstream() (nats.MsgHandler, chan *Message) { +func (j *Jetstream) messageHandlerFactoryJetstream() (nats.MsgHandler, chan *Message) { ch := make(chan *Message) return func(msg *nats.Msg) { ch <- &Message{ Subject: msg.Subject, Data: msg.Data, } - msg.Ack() + err := msg.Ack() + if err != nil { + j.logger.Error("Failed to acknowledge the message", zap.Error(err)) + } }, ch }