Skip to content

Commit

Permalink
Merge pull request #2 from kumparan/feature/add-subject-in-msg
Browse files Browse the repository at this point in the history
feature: add subject in msg event
  • Loading branch information
atjhoendz authored Feb 4, 2022
2 parents e0d0b45 + bd849c5 commit 0e2c548
Show file tree
Hide file tree
Showing 4 changed files with 27 additions and 59 deletions.
24 changes: 11 additions & 13 deletions event_message.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,14 +9,11 @@ import (
)

type (
// EventType :nodoc:
EventType string

// NatsEvent :nodoc:
NatsEvent struct {
ID int64
UserID int64
Type EventType
ID int64
UserID int64
Subject string // empty on publish
}

// NatsEventMessage :nodoc:
Expand All @@ -30,6 +27,7 @@ type (

MessageParser interface {
ParseFromBytes(data []byte) error
AddSubject(subj string)
}
)

Expand All @@ -49,12 +47,12 @@ func (n *NatsEvent) GetUserID() int64 {
return n.UserID
}

// GetType :nodoc:
func (n *NatsEvent) GetType() EventType {
// GetSubject :nodoc:
func (n *NatsEvent) GetSubject() string {
if n == nil {
return ""
}
return n.Type
return n.Subject
}

// NewNatsEventMessage :nodoc:
Expand Down Expand Up @@ -93,10 +91,6 @@ func (n *NatsEventMessage) WithEvent(e *NatsEvent) *NatsEventMessage {
n.wrapError(errors.New("empty user id"))
return n
}
if e.GetType() == "" {
n.wrapError(errors.New("empty NatsEvent type"))
return n
}

n.NatsEvent = e
return n
Expand Down Expand Up @@ -142,3 +136,7 @@ func (n *NatsEventMessage) ParseFromBytes(data []byte) (err error) {
}
return
}

func (n *NatsEventMessage) AddSubject(subj string) {
n.NatsEvent.Subject = subj
}
24 changes: 2 additions & 22 deletions event_message_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,31 +21,20 @@ func TestNatsEventMessage_WithEvent(t *testing.T) {
Given: &NatsEvent{
ID: 111,
UserID: 432,
Type: "type",
},
ExpectedError: false,
},
{
Name: "empty id",
Given: &NatsEvent{
UserID: 432,
Type: "type",
},
ExpectedError: true,
},
{
Name: "empty user",
Given: &NatsEvent{
ID: 111,
Type: "type",
},
ExpectedError: true,
},
{
Name: "empty type",
Given: &NatsEvent{
ID: 111,
UserID: 432,
ID: 111,
},
ExpectedError: true,
},
Expand Down Expand Up @@ -93,7 +82,6 @@ func TestNatsEventMessage_Build(t *testing.T) {
event := &NatsEvent{
ID: 1,
UserID: 123,
Type: "type",
}

body := []string{"test"}
Expand All @@ -119,7 +107,6 @@ func TestNatsEventMessage_Build(t *testing.T) {
assert.NoError(t, err)
assert.Equal(t, event.ID, result.NatsEvent.ID)
assert.Equal(t, event.UserID, result.NatsEvent.UserID)
assert.Equal(t, event.Type, result.NatsEvent.Type)
assert.Equal(t, utils.Dump(body), result.Body)

var requestResult pb.Greeting
Expand All @@ -146,7 +133,6 @@ func TestNatsEventMessage_Build(t *testing.T) {
assert.NoError(t, err)
assert.Equal(t, event.ID, result.NatsEvent.ID)
assert.Equal(t, event.UserID, result.NatsEvent.UserID)
assert.Equal(t, event.Type, result.NatsEvent.Type)
assert.Equal(t, utils.Dump(body), result.Body)
assert.Equal(t, utils.Dump(oldBody), result.OldBody)

Expand All @@ -168,15 +154,9 @@ func TestNatsEventMessage_Build(t *testing.T) {
nil,
{
UserID: 131,
Type: "type",
},
{
ID: 21,
Type: "type",
},
{
ID: 33,
UserID: 423,
ID: 21,
},
}

Expand Down
2 changes: 2 additions & 0 deletions jetstream.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,8 @@ func NewNATSMessageHandler(payload MessageParser, retryAttempts int, retryInterv
return
}

payload.AddSubject(msg.Subject)

retryErr := utils.Retry(retryAttempts, retryInterval, func() error {
return msgHandler(payload)
})
Expand Down
36 changes: 12 additions & 24 deletions jetstream_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,10 @@ import (
"testing"
"time"

"github.com/kumparan/tapao"
"github.com/nats-io/nats-server/v2/server"
natsserver "github.com/nats-io/nats-server/v2/test"
"github.com/nats-io/nats.go"
"github.com/stretchr/testify/assert"
)

const defaultURL = "nats://127.0.0.1:4222"
Expand Down Expand Up @@ -69,10 +69,6 @@ func TestQueueSubscribe(t *testing.T) {
n.Close()
}()

type msg struct {
Data int64 `json:"data"`
}

streamConf := &nats.StreamConfig{
Name: "STREAM_NAME_ANOTHER",
Subjects: []string{"STREAM_EVENT_ANOTHER.*"},
Expand All @@ -89,41 +85,33 @@ func TestQueueSubscribe(t *testing.T) {
subject := "STREAM_EVENT_ANOTHER.TEST"
queue := "test_queue_group"

for i := 0; i < countMsg; i++ {
ms := &msg{
Data: int64(1554775372665126857),
}
msgBytes, err := tapao.Marshal(ms)
if err != nil {
t.Fatal(err)
}
msgBytes, err := NewNatsEventMessage().WithEvent(&NatsEvent{
ID: int64(1232),
UserID: int64(21),
}).Build()

assert.NoError(t, err)

for i := 0; i < countMsg; i++ {
_, err = n.Publish(subject, msgBytes)
if err != nil {
t.Fatal(err)
}
}

receiverCh := make(chan []byte, countMsg)
receiverCh := make(chan *nats.Msg)
sub, err := n.QueueSubscribe(subject, queue, func(msg *nats.Msg) {
receiverCh <- msg.Data
receiverCh <- msg
})
if err != nil {
t.Fatal(err)
}

for i := 0; i < countMsg; i++ {
b := <-receiverCh
msg := new(msg)

err = tapao.Unmarshal(b, msg)
if err != nil {
t.Fatal(err)
}

if msg.Data != int64(1554775372665126857) {
t.Fatal("error")
}
assert.Equal(t, msgBytes, b.Data)
assert.Equal(t, subject, b.Subject, "test subject")
}

_ = sub.Unsubscribe()
Expand Down

0 comments on commit 0e2c548

Please sign in to comment.