Skip to content

Commit

Permalink
Merge pull request #15 from kumparan/fixFallBackUnmarshal
Browse files Browse the repository at this point in the history
hotfix: update ParseFromBytes NatsEventMessage using fallbackWith
  • Loading branch information
atjhoendz authored Oct 4, 2022
2 parents f3cb6bf + 3b93caf commit 1d973f0
Show file tree
Hide file tree
Showing 3 changed files with 43 additions and 69 deletions.
3 changes: 2 additions & 1 deletion .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -15,4 +15,5 @@
vendor/
/config.yml
.vscode
.idea/
.idea
.jetstream_log
6 changes: 3 additions & 3 deletions event_message.go
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,7 @@ func (n *NatsEvent) GetTime() string {
}

// IsTimeValid :nodoc:
func (n *NatsEvent) IsTimeValid() bool {
func (n *NatsEvent) isTimeValid() bool {
_, err := time.Parse(NatsEventTimeFormat, n.GetTime())
return err == nil
}
Expand Down Expand Up @@ -151,7 +151,7 @@ func (n *NatsEventMessage) WithEvent(e *NatsEvent) *NatsEventMessage {
case "":
e.Time = time.Now().Format(NatsEventTimeFormat)
default:
if !e.IsTimeValid() {
if !e.isTimeValid() {
n.wrapError(errors.New("invalid time format"))
return n
}
Expand Down Expand Up @@ -195,7 +195,7 @@ func (n *NatsEventMessage) wrapError(err error) {

// ParseFromBytes :nodoc:
func (n *NatsEventMessage) ParseFromBytes(data []byte) (err error) {
err = tapao.Unmarshal(data, &n, tapao.With(tapao.JSON))
err = tapao.Unmarshal(data, &n, tapao.With(tapao.JSON), tapao.FallbackWith(tapao.MessagePack))
if err != nil {
n.Error = errors.Wrap(n.Error, err.Error())
return err
Expand Down
103 changes: 38 additions & 65 deletions jetstream_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ const defaultURL = "nats://127.0.0.1:4222"
func RunBasicJetStreamServer() *server.Server {
opts := natsserver.DefaultTestOptions
opts.JetStream = true
opts.StoreDir = ".jetstream_log"
return natsserver.RunServer(&opts)
}

Expand All @@ -36,78 +37,65 @@ func TestPublish(t *testing.T) {
}

n, err := NewNATSConnection(defaultURL, natsOpts...)
if err != nil {
t.Fatal(err)
}
require.NoError(t, err)
defer func() {
n.Close()
}()

streamConf := &nats.StreamConfig{
Name: "STREAM_NAME_PUBLISH_2",
Subjects: []string{"STREAM_NAME_PUBLISH_2.*"},
MaxAge: 1 * time.Hour,
Storage: nats.FileStorage,
Name: "STREAM_NAME_PUBLISH",
Subjects: []string{"STREAM_NAME_PUBLISH.*"},
Storage: nats.FileStorage,
Retention: nats.WorkQueuePolicy,
}

_, err = n.AddStream(streamConf)
if err != nil {
t.Fatal(err)
}
require.NoError(t, err)

_, err = n.Publish("STREAM_NAME_PUBLISH_2.TEST", []byte("test"))
if err != nil {
t.Fatal(err)
}
_, err = n.Publish("STREAM_NAME_PUBLISH.TEST", []byte("test"))
require.NoError(t, err)
}

func TestQueueSubscribe(t *testing.T) {
t.Run("queue subscribe NatsEventMessage", func(t *testing.T) {
n, err := NewNATSConnection(defaultURL)
if err != nil {
t.Fatal(err)
}
require.NoError(t, err)

defer func() {
n.Close()
}()

streamConf := &nats.StreamConfig{
Name: "STREAM_NAME_QUEUE_SUBSCRIBE_2",
Subjects: []string{"STREAM_NAME_QUEUE_SUBSCRIBE_2.*"},
MaxAge: 1 * time.Hour,
Storage: nats.FileStorage,
Name: "STREAM_NAME_QUEUE_SUBSCRIBE",
Subjects: []string{"STREAM_NAME_QUEUE_SUBSCRIBE.*"},
Storage: nats.FileStorage,
Retention: nats.WorkQueuePolicy,
}

_, err = n.AddStream(streamConf)
if err != nil {
t.Fatal(err)
}
require.NoError(t, err)

countMsg := 10
subject := "STREAM_NAME_QUEUE_SUBSCRIBE_2.TEST"
subject := "STREAM_NAME_QUEUE_SUBSCRIBE.TEST"
queue := "test_queue_group"

msgBytes, err := NewNatsEventMessage().WithEvent(&NatsEvent{
ID: int64(1232),
UserID: int64(21),
}).Build()

assert.NoError(t, err)
require.NoError(t, err)

for i := 0; i < countMsg; i++ {
_, err = n.Publish(subject, msgBytes)
if err != nil {
t.Fatal(err)
}
require.NoError(t, err)
}

receiverCh := make(chan *nats.Msg)
sub, err := n.QueueSubscribe(subject, queue, func(msg *nats.Msg) {
receiverCh <- msg
})
if err != nil {
t.Fatal(err)
}
require.NoError(t, err)

for i := 0; i < countMsg; i++ {
b := <-receiverCh
Expand All @@ -121,27 +109,24 @@ func TestQueueSubscribe(t *testing.T) {

t.Run("queue subscribe NatsEventAuditLogMessage", func(t *testing.T) {
n, err := NewNATSConnection(defaultURL)
if err != nil {
t.Fatal(err)
}
require.NoError(t, err)

defer func() {
n.Close()
}()

streamConf := &nats.StreamConfig{
Name: "STREAM_NAME_AUDIT_2",
Subjects: []string{"STREAM_NAME_AUDIT_2.*"},
MaxAge: 1 * time.Hour,
Storage: nats.FileStorage,
Name: "STREAM_NAME_AUDIT",
Subjects: []string{"STREAM_NAME_AUDIT.*"},
Storage: nats.FileStorage,
Retention: nats.WorkQueuePolicy,
}

_, err = n.AddStream(streamConf)
if err != nil {
t.Fatal(err)
}
require.NoError(t, err)

countMsg := 10
subject := "STREAM_NAME_AUDIT_2.TEST_NATS_EVENT_AUDIT_LOG_MESSAGE"
subject := "STREAM_NAME_AUDIT.TEST_NATS_EVENT_AUDIT_LOG_MESSAGE"
queue := "test_queue_group"

type User struct {
Expand Down Expand Up @@ -180,22 +165,19 @@ func TestQueueSubscribe(t *testing.T) {
Error: nil,
}
msgBytes, err := msg.Build()
assert.NoError(t, err)
require.NoError(t, err)

for i := 0; i < countMsg; i++ {
_, err = n.Publish(subject, msgBytes)
if err != nil {
t.Fatal(err)
}
require.NoError(t, err)

}

receiverCh := make(chan *nats.Msg)
sub, err := n.QueueSubscribe(subject, queue, func(msg *nats.Msg) {
receiverCh <- msg
})
if err != nil {
t.Fatal(err)
}
require.NoError(t, err)

for i := 0; i < countMsg; i++ {
b := <-receiverCh
Expand All @@ -210,9 +192,8 @@ func TestQueueSubscribe(t *testing.T) {

func TestAddStream(t *testing.T) {
n, err := NewNATSConnection(defaultURL)
if err != nil {
t.Fatal(err)
}
require.NoError(t, err)

defer func() {
n.Close()
}()
Expand All @@ -225,31 +206,23 @@ func TestAddStream(t *testing.T) {
}

_, err = n.AddStream(streamConf)

if err != nil {
t.Fatal(err)
}
require.NoError(t, err)

// test update config
updateConf := &nats.StreamConfig{
Name: "STREAM_NAMEXX",
Subjects: []string{"STREAM_EVENTXX.*"},
MaxAge: 2 * time.Hour,
MaxAge: 1 * time.Hour,
Storage: nats.FileStorage,
}

_, err = n.AddStream(updateConf)

if err != nil {
t.Fatal(err)
}
require.NoError(t, err)
}

func TestConsumerInfo(t *testing.T) {
n, err := NewNATSConnection(defaultURL)
if err != nil {
t.Fatal(err)
}
require.NoError(t, err)
defer n.Close()

updateConf := &nats.StreamConfig{
Expand Down

0 comments on commit 1d973f0

Please sign in to comment.