Skip to content

Commit

Permalink
fix: fix golint errors
Browse files Browse the repository at this point in the history
  • Loading branch information
kianaza committed May 24, 2024
1 parent 3a99deb commit 9b8cd4b
Showing 1 changed file with 12 additions and 14 deletions.
26 changes: 12 additions & 14 deletions internal/natsclient/jetstream.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ func (j *Jetstream) StartJetstreamMessaging() {
// Subscribe subscribes to a list of subjects and returns a channel with incoming messages
func (j *Jetstream) createSubscribe(subject string) chan *Message {

messageHandler, h := messageHandlerFactoryJetstream()
messageHandler, h := j.messageHandlerFactoryJetstream()
_, err := j.jetstream.Subscribe(
subject,
messageHandler,
Expand All @@ -104,7 +104,12 @@ func (j *Jetstream) createSubscribe(subject string) chan *Message {
func (j *Jetstream) jetstreamSubscribe(h chan *Message) {
for msg := range h {
var publishTime time.Time
publishTime.UnmarshalBinary(msg.Data)
err := publishTime.UnmarshalBinary(msg.Data)
if err != nil {
j.logger.Error("unable to unmarshal binary data for publishTime.")
j.logger.Info("received message but could not calculate latency due to unmarshalling error.", zap.String("subject", msg.Subject))
return
}
latency := time.Since(publishTime).Seconds()
j.metrics.Latency.Observe(latency)
j.metrics.SuccessCounter.WithLabelValues("successful subscribe").Add(1)
Expand Down Expand Up @@ -137,23 +142,16 @@ func (j *Jetstream) jetstreamPublish(subject string) {
}
}

// Close closes NATS connection
func (j *Jetstream) close() {
if err := j.connection.FlushTimeout(j.config.FlushTimeout); err != nil {
j.logger.Error("could not flush", zap.Error(err))
}

j.connection.Close()
j.logger.Info("NATS is closed.")
}

func messageHandlerFactoryJetstream() (nats.MsgHandler, chan *Message) {
func (j *Jetstream) messageHandlerFactoryJetstream() (nats.MsgHandler, chan *Message) {
ch := make(chan *Message)
return func(msg *nats.Msg) {
ch <- &Message{
Subject: msg.Subject,
Data: msg.Data,
}
msg.Ack()
err := msg.Ack()
if err != nil {
j.logger.Error("Failed to acknowledge the message", zap.Error(err))
}
}, ch
}

0 comments on commit 9b8cd4b

Please sign in to comment.