Skip to content

Commit

Permalink
feat: subscribe by jetstream
Browse files Browse the repository at this point in the history
  • Loading branch information
kianaza committed May 19, 2024
1 parent 54cfd04 commit 5a85528
Show file tree
Hide file tree
Showing 4 changed files with 96 additions and 90 deletions.
147 changes: 68 additions & 79 deletions internal/client/client.go
Original file line number Diff line number Diff line change
@@ -1,110 +1,99 @@
package client

import (
"time"
"fmt"
"strings"

"github.com/nats-io/nats.go"
"go.uber.org/zap"
)

type Client struct {
Conn *nats.Conn
Logger *zap.Logger
Config Config
Metrics Metrics
type Message struct {
Subject string
Data []byte
}

func New(logger *zap.Logger, cfg Config) *Client {
nc := connect(logger, cfg)

metrics := NewMetrics()
// Jetstream represents the NATS core handler
type Jetstream struct {
connection *nats.Conn
jetstream nats.JetStreamContext
config *Config
logger *zap.Logger
}

client := &Client{
Conn: nc,
Logger: logger,
Config: cfg,
Metrics: NewMetrics(),
// NewJetstream initializes NATS JetStream connection
func NewJetstream(hostname string, config Config, logger *zap.Logger) *Jetstream {
j := Jetstream{
config: &config,
logger: logger,
}

nc.SetDisconnectErrHandler(func(_ *nats.Conn, err error) {
metrics.ConnectionErrors.Add(1)
logger.Error("nats disconnected", zap.Error(err))
})

nc.SetReconnectHandler(func(_ *nats.Conn) {
logger.Warn("nats reconnected")
})

return client
}
opts := []nats.Option{
// NATS connection options...
}

func connect(logger *zap.Logger, cfg Config) *nats.Conn {
nc, err := nats.Connect(cfg.URL)
var err error
j.connection, err = nats.Connect(j.config.URL, opts...)
if err != nil {
logger.Fatal("nats connection failed", zap.Error(err))
logger.Panic("could not connect to nats", zap.Error(err))
}

logger.Info("nats connection successful",
zap.String("connected-addr", nc.ConnectedAddr()),
zap.Strings("discovered-servers", nc.DiscoveredServers()))
// JetStream connection
j.jetstream, err = j.connection.JetStream()
if err != nil {
logger.Panic("could not connect to jetstream", zap.Error(err))
}

return nc
return &j
}

func (c *Client) StartMessaging() {
go c.Subscribe("")
go c.Publish("")
}
// Subscribe subscribes to a list of subjects and returns a channel with incoming messages
func (j *Jetstream) Subscribe(subjects []string) chan *Message {
messageHandler, ch := messageHandlerFactoryJetstream()

for _, subject := range subjects {
queueSubscriptionGroup := fmt.Sprintf("%s-%s",
j.config.QueueSubscriptionGroup,
strings.ReplaceAll(subject, ".", "-"),
)
_, err := j.jetstream.QueueSubscribe(
subject,
queueSubscriptionGroup,
messageHandler,
nats.DeliverNew(),
nats.ReplayInstant(),
nats.Durable(queueSubscriptionGroup),
nats.AckExplicit(), // make sure to set it to this value
nats.MaxAckPending(j.config.MaxPubAcksInflight), // make sure to config this value
)

func (c *Client) Publish(subject string) {
if subject == "" {
subject = c.Config.DefaultSubject
}
for {
t := time.Now()
tt, _ := t.MarshalBinary()
msg, err := c.Conn.Request(subject, tt, c.Config.RequestTimeout)
if err != nil {
c.Metrics.SuccessCounter.WithLabelValues("failed publish").Add(1)
if err == nats.ErrTimeout {
c.Logger.Error("Request timeout: No response received within the timeout period.")
} else if err == nats.ErrNoResponders {
c.Logger.Error("Request failed: No responders available for the subject.")
} else {
c.Logger.Error("Request failed: %v", zap.Error(err))
}
j.logger.Panic("could not QueueSubscribe", zap.Error(err))
} else {
c.Metrics.SuccessCounter.WithLabelValues("successful publish").Add(1)
c.Logger.Info("Received response successfully:", zap.ByteString("response", msg.Data))
j.logger.Info("Subscribed to %s successfully", zap.String("subject", subject))
}

time.Sleep(c.Config.PublishInterval)
}

return ch
}

func (c *Client) Subscribe(subject string) {
if subject == "" {
subject = c.Config.DefaultSubject
// Close closes NATS connection
func (j *Jetstream) Close() {
if err := j.connection.FlushTimeout(j.config.FlushTimeout); err != nil {
j.logger.Error("could not flush", zap.Error(err))
}
_, err := c.Conn.Subscribe(subject, func(msg *nats.Msg) {
var publishTime time.Time
err := publishTime.UnmarshalBinary(msg.Data)
if err != nil {
c.Logger.Error("Received message successfully but message is not valid time value")
} else {
latency := time.Since(publishTime).Seconds()
c.Metrics.Latency.Observe(latency)
c.Logger.Info("Received message successfully: ", zap.Float64("latency", latency))
}
c.Metrics.SuccessCounter.WithLabelValues("subscribe").Add(1)

err = c.Conn.Publish(msg.Reply, []byte("ack!"))
if err != nil {
c.Logger.Error("Failed to publish response: %v", zap.Error(err))
}
})
if err != nil {
c.Logger.Error("Failed to subscribe to subject '%v': %v", zap.String(subject, subject), zap.Error(err))
}
j.connection.Close()
j.logger.Info("NATS is closed.")
}

func messageHandlerFactoryJetstream() (nats.MsgHandler, chan *Message) {
ch := make(chan *Message)
return func(msg *nats.Msg) {
ch <- &Message{
Subject: msg.Subject,
Data: msg.Data,
}
msg.Ack()

Check failure on line 97 in internal/client/client.go

View workflow job for this annotation

GitHub Actions / lint

Error return value of `msg.Ack` is not checked (errcheck)
}, ch
}
11 changes: 7 additions & 4 deletions internal/client/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,11 @@ package client
import "time"

type Config struct {
URL string `json:"url,omitempty" koanf:"url"`
PublishInterval time.Duration `json:"publish_interval" koanf:"publish_interval"`
RequestTimeout time.Duration `json:"request_timeout" koanf:"request_timeout"`
DefaultSubject string `json:"default_subject" koanf:"default_subject"`
URL string `json:"url,omitempty" koanf:"url"`
PublishInterval time.Duration `json:"publish_interval" koanf:"publish_interval"`
RequestTimeout time.Duration `json:"request_timeout" koanf:"request_timeout"`
DefaultSubject string `json:"default_subject" koanf:"default_subject"`
MaxPubAcksInflight int `json:"max_pubAcks_inflight" koanf:"max_pubAcks_inflight"`
QueueSubscriptionGroup string `json:"queue_subscription_group" koanf:"queue_subscription_group"`
FlushTimeout time.Duration `json:"flush_timeout" koanf:"flush_timeout"`
}
17 changes: 14 additions & 3 deletions internal/cmd/main.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package cmd

import (
"fmt"
"os"
"os/signal"
"syscall"
Expand All @@ -11,11 +12,21 @@ import (
)

func main(cfg config.Config, logger *zap.Logger) {
natsConfig := cfg.NATS
js := client.NewJetstream("hostname", cfg.NATS, logger)

natsClient := client.New(logger, natsConfig)
// Define subjects to subscribe to
subjects := []string{"updates.1", "updates.2"}

natsClient.StartMessaging()
// Subscribe to the queue
messageChannel := js.Subscribe(subjects)

// Receive and process messages
for msg := range messageChannel {
fmt.Printf("Received message: Subject=%s, Data=%s\n", msg.Subject, string(msg.Data))
}

// Close NATS connection when done
js.Close()

sig := make(chan os.Signal, 1)
signal.Notify(sig, syscall.SIGINT, syscall.SIGTERM)
Expand Down
11 changes: 7 additions & 4 deletions internal/config/default.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,13 @@ func Default() Config {
Level: "debug",
},
NATS: client.Config{
URL: "localhost:4222",
PublishInterval: 2 * time.Second,
RequestTimeout: 50 * time.Millisecond,
DefaultSubject: "test",
URL: "localhost:4222",
PublishInterval: 2 * time.Second,
RequestTimeout: 50 * time.Millisecond,
DefaultSubject: "test",
MaxPubAcksInflight: 10,
QueueSubscriptionGroup: "group",
FlushTimeout: 2 * time.Second,
},
Metric: metric.Config{
Address: ":8080",
Expand Down

0 comments on commit 5a85528

Please sign in to comment.