Skip to content

Commit

Permalink
Merge pull request #6 from kumparan/add-consumer-info
Browse files Browse the repository at this point in the history
feature: add consumer info and register method
  • Loading branch information
atjhoendz authored Jul 20, 2022
2 parents 5d3dcc6 + 9c88356 commit 18e3074
Show file tree
Hide file tree
Showing 4 changed files with 108 additions and 5 deletions.
12 changes: 9 additions & 3 deletions event_message.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,8 @@ type (
MessageParser interface {
ParseFromBytes(data []byte) error
AddSubject(subj string)
ToJSON() (string, error)
ToJSONString() (string, error)
ToJSONByte() ([]byte, error)
}
)

Expand Down Expand Up @@ -151,12 +152,17 @@ func (n *NatsEventMessage) AddSubject(subj string) {
n.NatsEvent.Subject = subj
}

// ToJSON marshal message to JSON format
func (n *NatsEventMessage) ToJSON() (string, error) {
// ToJSONString marshal message to JSON string
func (n *NatsEventMessage) ToJSONString() (string, error) {
bt, err := tapao.Marshal(n, tapao.With(tapao.JSON))
return string(bt), err
}

// ToJSONByte marshal message to JSON byte
func (n *NatsEventMessage) ToJSONByte() ([]byte, error) {
return tapao.Marshal(n, tapao.With(tapao.JSON))
}

// ParseJSON parse JSON into message
func ParseJSON(in string) (*NatsEventMessage, error) {
msg := &NatsEventMessage{}
Expand Down
24 changes: 22 additions & 2 deletions event_message_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -178,7 +178,7 @@ func TestNatsEventMessage_Build(t *testing.T) {
})
}

func TestNatsEventMessage_ToJSON(t *testing.T) {
func TestNatsEventMessage_ToJSONString(t *testing.T) {
natsEvent := &NatsEvent{
ID: 123,
UserID: 333,
Expand All @@ -189,13 +189,33 @@ func TestNatsEventMessage_ToJSON(t *testing.T) {
WithEvent(natsEvent).
WithBody(body)

parsed, err := msg.ToJSON()
parsed, err := msg.ToJSONString()
require.NoError(t, err)

expectedRes := "{\"NatsEvent\":{\"id\":123,\"user_id\":333,\"tenant_id\":0,\"subject\":\"\"},\"body\":\"[\\\"test\\\"]\",\"old_body\":\"\",\"request\":null,\"error\":null}"
assert.Equal(t, expectedRes, parsed)
}

func TestNatsEventMessage_ToJSONByte(t *testing.T) {
natsEvent := &NatsEvent{
ID: 123,
UserID: 333,
}
body := []string{"test"}

msg := NewNatsEventMessage().
WithEvent(natsEvent).
WithBody(body)

jsonByte, err := msg.ToJSONByte()
require.NoError(t, err)

parsed, err := ParseJSON(string(jsonByte))
require.NoError(t, err)

assert.Equal(t, parsed, msg)
}

func TestNatsEventMessage_ParseJSON(t *testing.T) {
json := "{\"NatsEvent\":{\"id\":123,\"user_id\":333,\"tenant_id\":0,\"subject\":\"\"},\"body\":\"[\\\"test\\\"]\",\"old_body\":\"\",\"request\":null,\"error\":null}"

Expand Down
45 changes: 45 additions & 0 deletions jetstream.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ type (
QueueSubscribe(subj, queue string, cb nats.MsgHandler, opts ...nats.SubOpt) (*nats.Subscription, error)
Close()
AddStream(cfg *nats.StreamConfig, opts ...nats.JSOpt) (*nats.StreamInfo, error)
ConsumerInfo(streamName, consumerName string, opts ...nats.JSOpt) (*nats.ConsumerInfo, error)
}

// jsImpl JetStream implementation
Expand All @@ -23,6 +24,21 @@ type (
jsCtx nats.JetStreamContext
}

// JetStreamRegistrar
JetStreamRegistrar interface {
RegisterNATSJetStream(js JetStream)
}

// StreamRegistration
StreamRegistrar interface {
InitStream() error
}

// Subscriber
Subscriber interface {
SubscribeJetStreamEvent() error
}

// Message Handler :nodoc:
MessageHandler func(payload MessageParser) (err error)
)
Expand Down Expand Up @@ -51,6 +67,26 @@ func NewNATSConnection(url string, natsOpts ...nats.Option) (JetStream, error) {
return impl, nil
}

// RegisterJetStreamClient provide jetstream instance, stream, and subscription registration
func RegisterJetStreamClient(js JetStream, clients []JetStreamRegistrar) error {
for _, client := range clients {
client.RegisterNATSJetStream(js)
if streamRegistrar, ok := client.(StreamRegistrar); ok {
err := streamRegistrar.InitStream()
if err != nil {
return err
}
}
if subscriber, ok := client.(Subscriber); ok {
err := subscriber.SubscribeJetStreamEvent()
if err != nil {
return err
}
}
}
return nil
}

// NewNATSMessageHandler a wrapper to standardize how we handle NATS messages.
// Payload (arg 0) should always be empty when the method is called. The payload data will later parse data from msg.Data.
func NewNATSMessageHandler(payload MessageParser, retryAttempts int, retryInterval time.Duration, msgHandler MessageHandler, errHandler MessageHandler) nats.MsgHandler {
Expand Down Expand Up @@ -151,3 +187,12 @@ func (j *jsImpl) AddStream(cfg *nats.StreamConfig, opts ...nats.JSOpt) (*nats.St
return j.jsCtx.UpdateStream(cfg)

}

// ConsumerInfo
func (j *jsImpl) ConsumerInfo(streamName, consumerName string, opts ...nats.JSOpt) (*nats.ConsumerInfo, error) {
if !j.checkConnIsValid() {
return nil, ErrConnectionLost
}

return j.jsCtx.ConsumerInfo(streamName, consumerName, opts...)
}
32 changes: 32 additions & 0 deletions jetstream_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
natsserver "github.com/nats-io/nats-server/v2/test"
"github.com/nats-io/nats.go"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)

const defaultURL = "nats://127.0.0.1:4222"
Expand Down Expand Up @@ -153,3 +154,34 @@ func TestAddStream(t *testing.T) {
t.Fatal(err)
}
}

func TestConsumerInfo(t *testing.T) {
n, err := NewNATSConnection(defaultURL)
if err != nil {
t.Fatal(err)
}
defer n.Close()

updateConf := &nats.StreamConfig{
Name: "STREAM_NAME",
Subjects: []string{"STREAM_EVENT.*"},
MaxAge: 2 * time.Hour,
Storage: nats.FileStorage,
}

_, err = n.AddStream(updateConf)
require.NoError(t, err)

_, err = n.QueueSubscribe("STREAM_EVENT.A", "QUEUEUEUE", func(msg *nats.Msg) {}, nats.Durable("DURABLE_NAME"))
require.NoError(t, err)

consumerInfo, err := n.ConsumerInfo("STREAM_NAME", "DURABLE_NAME")
assert.NotNil(t, consumerInfo)
require.NoError(t, err)

consumerInfo2, err := n.ConsumerInfo("STREAM_NAME", "DURABLE_NAME2")
require.Error(t, err)
assert.Nil(t, consumerInfo2)
assert.Equal(t, nats.ErrConsumerNotFound, err)

}

0 comments on commit 18e3074

Please sign in to comment.