Skip to content

Commit

Permalink
Merge pull request #14 from kumparan/feature/BAN-13809-add-time-field…
Browse files Browse the repository at this point in the history
…-in-nats-event-struct

feature: add time field in NatsEvent struct
  • Loading branch information
ssentinull authored Sep 29, 2022
2 parents cda13ea + d616dd2 commit 1f275a7
Show file tree
Hide file tree
Showing 3 changed files with 66 additions and 18 deletions.
31 changes: 29 additions & 2 deletions event_message.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,19 +4,22 @@ import (
"time"

"github.com/kumparan/go-utils"
"google.golang.org/protobuf/proto"

"github.com/kumparan/tapao"
"github.com/pkg/errors"
"google.golang.org/protobuf/proto"
)

// NatsEventTimeFormat time format for NatsEvent 'time' field
const NatsEventTimeFormat = time.RFC3339Nano

type (
// NatsEvent :nodoc:
NatsEvent struct {
ID int64 `json:"id"`
IDString string `json:"id_string"`
UserID int64 `json:"user_id"`
TenantID int64 `json:"tenant_id"`
Time string `json:"time"`
Subject string `json:"subject"` // empty on publish
}

Expand Down Expand Up @@ -93,6 +96,20 @@ func (n *NatsEvent) GetIDString() string {
return n.IDString
}

// GetTime :nodoc:
func (n *NatsEvent) GetTime() string {
if n == nil {
return ""
}
return n.Time
}

// IsTimeValid :nodoc:
func (n *NatsEvent) IsTimeValid() bool {
_, err := time.Parse(NatsEventTimeFormat, n.GetTime())
return err == nil
}

// NewNatsEventMessage :nodoc:
func NewNatsEventMessage() *NatsEventMessage {
return &NatsEventMessage{}
Expand Down Expand Up @@ -130,6 +147,16 @@ func (n *NatsEventMessage) WithEvent(e *NatsEvent) *NatsEventMessage {
return n
}

switch e.GetTime() {
case "":
e.Time = time.Now().Format(NatsEventTimeFormat)
default:
if !e.IsTimeValid() {
n.wrapError(errors.New("invalid time format"))
return n
}
}

n.NatsEvent = e
return n
}
Expand Down
31 changes: 27 additions & 4 deletions event_message_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package ferstream

import (
"encoding/json"
"fmt"
"testing"
"time"

Expand Down Expand Up @@ -35,6 +36,15 @@ func TestNatsEventMessage_WithEvent(t *testing.T) {
},
ExpectedError: false,
},
{
Name: "success with time",
Given: &NatsEvent{
ID: 111,
UserID: 432,
Time: time.Now().Format(NatsEventTimeFormat),
},
ExpectedError: false,
},
{
Name: "empty id",
Given: &NatsEvent{
Expand All @@ -49,6 +59,15 @@ func TestNatsEventMessage_WithEvent(t *testing.T) {
},
ExpectedError: true,
},
{
Name: "invalid time format",
Given: &NatsEvent{
ID: 111,
UserID: 432,
Time: time.Now().Format(time.RFC822),
},
ExpectedError: true,
},
}

for _, test := range tests {
Expand Down Expand Up @@ -180,10 +199,12 @@ func TestNatsEventMessage_Build(t *testing.T) {
}

func TestNatsEventMessage_ToJSONString(t *testing.T) {
now := time.Now().Format(NatsEventTimeFormat)
t.Run("success", func(t *testing.T) {
natsEvent := &NatsEvent{
ID: 123,
UserID: 333,
Time: now,
}
body := []string{"test"}

Expand All @@ -193,14 +214,15 @@ func TestNatsEventMessage_ToJSONString(t *testing.T) {

parsed, err := msg.ToJSONString()
require.NoError(t, err)
expectedRes := "{\"NatsEvent\":{\"id\":123,\"id_string\":\"\",\"user_id\":333,\"tenant_id\":0,\"subject\":\"\"},\"body\":\"[\\\"test\\\"]\",\"old_body\":\"\",\"request\":null,\"error\":null}"
expectedRes := fmt.Sprintf("{\"NatsEvent\":{\"id\":123,\"id_string\":\"\",\"user_id\":333,\"tenant_id\":0,\"time\":\"%s\",\"subject\":\"\"},\"body\":\"[\\\"test\\\"]\",\"old_body\":\"\",\"request\":null,\"error\":null}", now)
assert.Equal(t, expectedRes, parsed)
})

t.Run("success with IDString", func(t *testing.T) {
natsEvent := &NatsEvent{
UserID: 333,
IDString: "630484ae00f0d71df588a0ab",
Time: now,
}
body := []string{"test"}

Expand All @@ -210,7 +232,7 @@ func TestNatsEventMessage_ToJSONString(t *testing.T) {

parsed, err := msg.ToJSONString()
require.NoError(t, err)
expectedRes := "{\"NatsEvent\":{\"id\":0,\"id_string\":\"630484ae00f0d71df588a0ab\",\"user_id\":333,\"tenant_id\":0,\"subject\":\"\"},\"body\":\"[\\\"test\\\"]\",\"old_body\":\"\",\"request\":null,\"error\":null}"
expectedRes := fmt.Sprintf("{\"NatsEvent\":{\"id\":0,\"id_string\":\"630484ae00f0d71df588a0ab\",\"user_id\":333,\"tenant_id\":0,\"time\":\"%s\",\"subject\":\"\"},\"body\":\"[\\\"test\\\"]\",\"old_body\":\"\",\"request\":null,\"error\":null}", now)
assert.Equal(t, expectedRes, parsed)
})
}
Expand All @@ -236,11 +258,12 @@ func TestNatsEventMessage_ToJSONByte(t *testing.T) {
}

func TestNatsEventMessage_ParseJSON(t *testing.T) {
json := "{\"NatsEvent\":{\"id\":123,\"user_id\":333,\"tenant_id\":0,\"subject\":\"\"},\"body\":\"[\\\"test\\\"]\",\"old_body\":\"\",\"request\":null,\"error\":null}"

now := time.Now().Format(NatsEventTimeFormat)
json := fmt.Sprintf("{\"NatsEvent\":{\"id\":123,\"user_id\":333,\"tenant_id\":0,\"time\":\"%s\",\"subject\":\"\"},\"body\":\"[\\\"test\\\"]\",\"old_body\":\"\",\"request\":null,\"error\":null}", now)
natsEvent := &NatsEvent{
ID: 123,
UserID: 333,
Time: now,
}
body := []string{"test"}
msg := NewNatsEventMessage().WithEvent(natsEvent).WithBody(body)
Expand Down
22 changes: 10 additions & 12 deletions jetstream_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ package ferstream

import (
"encoding/json"
"log"
"os"
"testing"
"time"
Expand Down Expand Up @@ -45,8 +44,8 @@ func TestPublish(t *testing.T) {
}()

streamConf := &nats.StreamConfig{
Name: "STREAM_NAME",
Subjects: []string{"STREAM_EVENT.*"},
Name: "STREAM_NAME_PUBLISH_2",
Subjects: []string{"STREAM_NAME_PUBLISH_2.*"},
MaxAge: 1 * time.Hour,
Storage: nats.FileStorage,
}
Expand All @@ -56,7 +55,7 @@ func TestPublish(t *testing.T) {
t.Fatal(err)
}

_, err = n.Publish("STREAM_EVENT.TEST", []byte("test"))
_, err = n.Publish("STREAM_NAME_PUBLISH_2.TEST", []byte("test"))
if err != nil {
t.Fatal(err)
}
Expand All @@ -73,8 +72,8 @@ func TestQueueSubscribe(t *testing.T) {
}()

streamConf := &nats.StreamConfig{
Name: "STREAM_NAME_ANOTHER",
Subjects: []string{"STREAM_EVENT_ANOTHER.*"},
Name: "STREAM_NAME_QUEUE_SUBSCRIBE_2",
Subjects: []string{"STREAM_NAME_QUEUE_SUBSCRIBE_2.*"},
MaxAge: 1 * time.Hour,
Storage: nats.FileStorage,
}
Expand All @@ -85,7 +84,7 @@ func TestQueueSubscribe(t *testing.T) {
}

countMsg := 10
subject := "STREAM_EVENT_ANOTHER.TEST"
subject := "STREAM_NAME_QUEUE_SUBSCRIBE_2.TEST"
queue := "test_queue_group"

msgBytes, err := NewNatsEventMessage().WithEvent(&NatsEvent{
Expand Down Expand Up @@ -130,8 +129,8 @@ func TestQueueSubscribe(t *testing.T) {
}()

streamConf := &nats.StreamConfig{
Name: "STREAM_NAME_ANOTHER",
Subjects: []string{"STREAM_EVENT_ANOTHER.*"},
Name: "STREAM_NAME_AUDIT_2",
Subjects: []string{"STREAM_NAME_AUDIT_2.*"},
MaxAge: 1 * time.Hour,
Storage: nats.FileStorage,
}
Expand All @@ -142,7 +141,7 @@ func TestQueueSubscribe(t *testing.T) {
}

countMsg := 10
subject := "STREAM_EVENT_ANOTHER.TEST_NATS_EVENT_AUDIT_LOG_MESSAGE"
subject := "STREAM_NAME_AUDIT_2.TEST_NATS_EVENT_AUDIT_LOG_MESSAGE"
queue := "test_queue_group"

type User struct {
Expand Down Expand Up @@ -228,7 +227,7 @@ func TestAddStream(t *testing.T) {
_, err = n.AddStream(streamConf)

if err != nil {
log.Fatal(err)
t.Fatal(err)
}

// test update config
Expand Down Expand Up @@ -274,5 +273,4 @@ func TestConsumerInfo(t *testing.T) {
require.Error(t, err)
assert.Nil(t, consumerInfo2)
assert.Equal(t, nats.ErrConsumerNotFound, err)

}

0 comments on commit 1f275a7

Please sign in to comment.