Skip to content

Commit

Permalink
feature: subscribe (#18)
Browse files Browse the repository at this point in the history
* feature: implement subscribe

* bugfix: safe close js connection
Co-authored-by: Francisco <[email protected]>
  • Loading branch information
zipzap11 authored Oct 28, 2022
1 parent fe309a9 commit dd79d60
Show file tree
Hide file tree
Showing 3 changed files with 216 additions and 32 deletions.
37 changes: 31 additions & 6 deletions jetstream.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,9 +13,10 @@ type (
JetStream interface {
Publish(subject string, value []byte, opts ...nats.PubOpt) (*nats.PubAck, error)
QueueSubscribe(subj, queue string, cb nats.MsgHandler, opts ...nats.SubOpt) (*nats.Subscription, error)
Close()
Subscribe(subj string, cb nats.MsgHandler, opts ...nats.SubOpt) (*nats.Subscription, error)
AddStream(cfg *nats.StreamConfig, opts ...nats.JSOpt) (*nats.StreamInfo, error)
ConsumerInfo(streamName, consumerName string, opts ...nats.JSOpt) (*nats.ConsumerInfo, error)
GetNATSConnection() *nats.Conn
}

// jsImpl JetStream implementation
Expand Down Expand Up @@ -150,6 +151,14 @@ func connect(url string, options ...nats.Option) (*nats.Conn, error) {
return nc, nil
}

// GetNATSConnection :nodoc:
func (j *jsImpl) GetNATSConnection() *nats.Conn {
if j == nil {
return nil
}
return j.natsConn
}

func (j *jsImpl) checkConnIsValid() (b bool) {
return j.natsConn != nil && j.natsConn.IsConnected()
}
Expand All @@ -170,11 +179,12 @@ func (j *jsImpl) QueueSubscribe(subj, queue string, cb nats.MsgHandler, opts ...
return j.jsCtx.QueueSubscribe(subj, queue, cb, opts...)
}

// Close close NATS connection
func (j *jsImpl) Close() {
if j.checkConnIsValid() {
j.natsConn.Close()
// Subscribe :nodoc:
func (j *jsImpl) Subscribe(subj string, cb nats.MsgHandler, opts ...nats.SubOpt) (*nats.Subscription, error) {
if !j.checkConnIsValid() {
return nil, ErrConnectionLost
}
return j.jsCtx.Subscribe(subj, cb, opts...)
}

// AddStream add stream
Expand All @@ -193,11 +203,26 @@ func (j *jsImpl) AddStream(cfg *nats.StreamConfig, opts ...nats.JSOpt) (*nats.St

}

// ConsumerInfo
// ConsumerInfo :nodoc:
func (j *jsImpl) ConsumerInfo(streamName, consumerName string, opts ...nats.JSOpt) (*nats.ConsumerInfo, error) {
if !j.checkConnIsValid() {
return nil, ErrConnectionLost
}

return j.jsCtx.ConsumerInfo(streamName, consumerName, opts...)
}

// SafeClose :nodoc:
func SafeClose(js JetStream) {
if js == nil {
return
}
natsConn := js.GetNATSConnection()
if natsConn == nil {
return
}
if !natsConn.IsConnected() {
return
}
natsConn.Close()
}
165 changes: 151 additions & 14 deletions jetstream_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,9 +38,7 @@ func TestPublish(t *testing.T) {

n, err := NewNATSConnection(defaultURL, natsOpts...)
require.NoError(t, err)
defer func() {
n.Close()
}()
defer SafeClose(n)

streamConf := &nats.StreamConfig{
Name: "STREAM_NAME_PUBLISH",
Expand All @@ -61,9 +59,7 @@ func TestQueueSubscribe(t *testing.T) {
n, err := NewNATSConnection(defaultURL)
require.NoError(t, err)

defer func() {
n.Close()
}()
defer SafeClose(n)

streamConf := &nats.StreamConfig{
Name: "STREAM_NAME_QUEUE_SUBSCRIBE",
Expand Down Expand Up @@ -110,10 +106,7 @@ func TestQueueSubscribe(t *testing.T) {
t.Run("queue subscribe NatsEventAuditLogMessage", func(t *testing.T) {
n, err := NewNATSConnection(defaultURL)
require.NoError(t, err)

defer func() {
n.Close()
}()
defer SafeClose(n)

streamConf := &nats.StreamConfig{
Name: "STREAM_NAME_AUDIT",
Expand Down Expand Up @@ -190,13 +183,157 @@ func TestQueueSubscribe(t *testing.T) {
})
}

func TestSubscribe(t *testing.T) {
t.Run("subscribe NatsEventMessage", func(t *testing.T) {
n, err := NewNATSConnection(defaultURL)
require.NoError(t, err)

n2, err := NewNATSConnection(defaultURL)
require.NoError(t, err)
defer SafeClose(n)
defer SafeClose(n2)

streamConf := &nats.StreamConfig{
Name: "STREAM_NAME_SUBSCRIBE",
Subjects: []string{"STREAM_NAME_SUBSCRIBE.*"},
Storage: nats.FileStorage,
}

_, err = n.AddStream(streamConf)
require.NoError(t, err)

countMsg := 10
subject := "STREAM_NAME_SUBSCRIBE.TEST"

msgBytes, err := NewNatsEventMessage().WithEvent(&NatsEvent{
ID: int64(1232),
UserID: int64(21),
}).Build()

require.NoError(t, err)

for i := 0; i < countMsg; i++ {
_, err = n.Publish(subject, msgBytes)
require.NoError(t, err)
}

receiverCh := make(chan *nats.Msg)
sub, err := n.Subscribe(subject, func(msg *nats.Msg) {
receiverCh <- msg
})
require.NoError(t, err)

sub2, err := n2.Subscribe(subject, func(msg *nats.Msg) {
receiverCh <- msg
})
require.NoError(t, err)

// receive double of the message count beacuse of 2 subscriber
for i := 0; i < countMsg*2; i++ {
b := <-receiverCh

assert.Equal(t, msgBytes, b.Data)
assert.Equal(t, subject, b.Subject, "test subject")
}

_ = sub.Unsubscribe()
_ = sub2.Unsubscribe()
})

t.Run("subscribe NatsEventAuditLogMessage", func(t *testing.T) {
n, err := NewNATSConnection(defaultURL)
require.NoError(t, err)

n2, err := NewNATSConnection(defaultURL)
require.NoError(t, err)

defer SafeClose(n)
defer SafeClose(n2)

streamConf := &nats.StreamConfig{
Name: "STREAM_NAME_AUDIT",
Subjects: []string{"STREAM_NAME_AUDIT.*"},
Storage: nats.FileStorage,
}

_, err = n.AddStream(streamConf)
require.NoError(t, err)

countMsg := 10
subject := "STREAM_NAME_AUDIT.TEST_NATS_EVENT_AUDIT_LOG_MESSAGE"

type User struct {
ID int64 `json:"id"`
Name string `json:"name"`
}

oldData := User{
ID: int64(123),
Name: "test name",
}

newData := User{
ID: int64(123),
Name: "new test name",
}

byteOldData, err := json.Marshal(oldData)
require.NoError(t, err)
byteNewData, err := json.Marshal(newData)
require.NoError(t, err)

createdAt, err := time.Parse("2006-01-02", "2020-01-29")
require.NoError(t, err)

msg := &NatsEventAuditLogMessage{
ServiceName: "test-audit",
UserID: 123,
AuditableType: "user",
AuditableID: "123",
Action: "update",
AuditedChanges: string(byteNewData),
OldData: string(byteOldData),
NewData: string(byteNewData),
CreatedAt: createdAt,
Error: nil,
}
msgBytes, err := msg.Build()
require.NoError(t, err)

for i := 0; i < countMsg; i++ {
_, err = n.Publish(subject, msgBytes)
require.NoError(t, err)

}

receiverCh := make(chan *nats.Msg)
sub, err := n.Subscribe(subject, func(msg *nats.Msg) {
receiverCh <- msg
})
require.NoError(t, err)

sub2, err := n2.Subscribe(subject, func(msg *nats.Msg) {
receiverCh <- msg
})
require.NoError(t, err)

for i := 0; i < countMsg*2; i++ {
b := <-receiverCh

assert.Equal(t, msgBytes, b.Data)
assert.Equal(t, subject, b.Subject, "test subject")
}

_ = sub.Unsubscribe()
_ = sub2.Unsubscribe()
})
}

func TestAddStream(t *testing.T) {
n, err := NewNATSConnection(defaultURL)
require.NoError(t, err)

defer func() {
n.Close()
}()
defer SafeClose(n)

streamConf := &nats.StreamConfig{
Name: "STREAM_NAMEXX",
Expand All @@ -223,7 +360,7 @@ func TestAddStream(t *testing.T) {
func TestConsumerInfo(t *testing.T) {
n, err := NewNATSConnection(defaultURL)
require.NoError(t, err)
defer n.Close()
defer SafeClose(n)

updateConf := &nats.StreamConfig{
Name: "STREAM_NAME",
Expand Down
46 changes: 34 additions & 12 deletions mock/mock_jetstream.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

0 comments on commit dd79d60

Please sign in to comment.