diff --git a/event_message.go b/event_message.go index f5e1fe6..310b7b0 100644 --- a/event_message.go +++ b/event_message.go @@ -4,12 +4,14 @@ import ( "time" "github.com/kumparan/go-utils" - "google.golang.org/protobuf/proto" - "github.com/kumparan/tapao" "github.com/pkg/errors" + "google.golang.org/protobuf/proto" ) +// NatsEventTimeFormat time format for NatsEvent 'time' field +const NatsEventTimeFormat = time.RFC3339Nano + type ( // NatsEvent :nodoc: NatsEvent struct { @@ -17,6 +19,7 @@ type ( IDString string `json:"id_string"` UserID int64 `json:"user_id"` TenantID int64 `json:"tenant_id"` + Time string `json:"time"` Subject string `json:"subject"` // empty on publish } @@ -93,6 +96,20 @@ func (n *NatsEvent) GetIDString() string { return n.IDString } +// GetTime :nodoc: +func (n *NatsEvent) GetTime() string { + if n == nil { + return "" + } + return n.Time +} + +// IsTimeValid :nodoc: +func (n *NatsEvent) IsTimeValid() bool { + _, err := time.Parse(NatsEventTimeFormat, n.GetTime()) + return err == nil +} + // NewNatsEventMessage :nodoc: func NewNatsEventMessage() *NatsEventMessage { return &NatsEventMessage{} @@ -130,6 +147,16 @@ func (n *NatsEventMessage) WithEvent(e *NatsEvent) *NatsEventMessage { return n } + switch e.GetTime() { + case "": + e.Time = time.Now().Format(NatsEventTimeFormat) + default: + if !e.IsTimeValid() { + n.wrapError(errors.New("invalid time format")) + return n + } + } + n.NatsEvent = e return n } diff --git a/event_message_test.go b/event_message_test.go index dae27d5..b0d3ddb 100644 --- a/event_message_test.go +++ b/event_message_test.go @@ -2,6 +2,7 @@ package ferstream import ( "encoding/json" + "fmt" "testing" "time" @@ -35,6 +36,15 @@ func TestNatsEventMessage_WithEvent(t *testing.T) { }, ExpectedError: false, }, + { + Name: "success with time", + Given: &NatsEvent{ + ID: 111, + UserID: 432, + Time: time.Now().Format(NatsEventTimeFormat), + }, + ExpectedError: false, + }, { Name: "empty id", Given: &NatsEvent{ @@ -49,6 +59,15 @@ func TestNatsEventMessage_WithEvent(t *testing.T) { }, ExpectedError: true, }, + { + Name: "invalid time format", + Given: &NatsEvent{ + ID: 111, + UserID: 432, + Time: time.Now().Format(time.RFC822), + }, + ExpectedError: true, + }, } for _, test := range tests { @@ -180,10 +199,12 @@ func TestNatsEventMessage_Build(t *testing.T) { } func TestNatsEventMessage_ToJSONString(t *testing.T) { + now := time.Now().Format(NatsEventTimeFormat) t.Run("success", func(t *testing.T) { natsEvent := &NatsEvent{ ID: 123, UserID: 333, + Time: now, } body := []string{"test"} @@ -193,7 +214,7 @@ func TestNatsEventMessage_ToJSONString(t *testing.T) { parsed, err := msg.ToJSONString() require.NoError(t, err) - expectedRes := "{\"NatsEvent\":{\"id\":123,\"id_string\":\"\",\"user_id\":333,\"tenant_id\":0,\"subject\":\"\"},\"body\":\"[\\\"test\\\"]\",\"old_body\":\"\",\"request\":null,\"error\":null}" + expectedRes := fmt.Sprintf("{\"NatsEvent\":{\"id\":123,\"id_string\":\"\",\"user_id\":333,\"tenant_id\":0,\"time\":\"%s\",\"subject\":\"\"},\"body\":\"[\\\"test\\\"]\",\"old_body\":\"\",\"request\":null,\"error\":null}", now) assert.Equal(t, expectedRes, parsed) }) @@ -201,6 +222,7 @@ func TestNatsEventMessage_ToJSONString(t *testing.T) { natsEvent := &NatsEvent{ UserID: 333, IDString: "630484ae00f0d71df588a0ab", + Time: now, } body := []string{"test"} @@ -210,7 +232,7 @@ func TestNatsEventMessage_ToJSONString(t *testing.T) { parsed, err := msg.ToJSONString() require.NoError(t, err) - expectedRes := "{\"NatsEvent\":{\"id\":0,\"id_string\":\"630484ae00f0d71df588a0ab\",\"user_id\":333,\"tenant_id\":0,\"subject\":\"\"},\"body\":\"[\\\"test\\\"]\",\"old_body\":\"\",\"request\":null,\"error\":null}" + expectedRes := fmt.Sprintf("{\"NatsEvent\":{\"id\":0,\"id_string\":\"630484ae00f0d71df588a0ab\",\"user_id\":333,\"tenant_id\":0,\"time\":\"%s\",\"subject\":\"\"},\"body\":\"[\\\"test\\\"]\",\"old_body\":\"\",\"request\":null,\"error\":null}", now) assert.Equal(t, expectedRes, parsed) }) } @@ -236,11 +258,12 @@ func TestNatsEventMessage_ToJSONByte(t *testing.T) { } func TestNatsEventMessage_ParseJSON(t *testing.T) { - json := "{\"NatsEvent\":{\"id\":123,\"user_id\":333,\"tenant_id\":0,\"subject\":\"\"},\"body\":\"[\\\"test\\\"]\",\"old_body\":\"\",\"request\":null,\"error\":null}" - + now := time.Now().Format(NatsEventTimeFormat) + json := fmt.Sprintf("{\"NatsEvent\":{\"id\":123,\"user_id\":333,\"tenant_id\":0,\"time\":\"%s\",\"subject\":\"\"},\"body\":\"[\\\"test\\\"]\",\"old_body\":\"\",\"request\":null,\"error\":null}", now) natsEvent := &NatsEvent{ ID: 123, UserID: 333, + Time: now, } body := []string{"test"} msg := NewNatsEventMessage().WithEvent(natsEvent).WithBody(body) diff --git a/jetstream_test.go b/jetstream_test.go index ff20445..4bc6ef8 100644 --- a/jetstream_test.go +++ b/jetstream_test.go @@ -2,7 +2,6 @@ package ferstream import ( "encoding/json" - "log" "os" "testing" "time" @@ -45,8 +44,8 @@ func TestPublish(t *testing.T) { }() streamConf := &nats.StreamConfig{ - Name: "STREAM_NAME", - Subjects: []string{"STREAM_EVENT.*"}, + Name: "STREAM_NAME_PUBLISH_2", + Subjects: []string{"STREAM_NAME_PUBLISH_2.*"}, MaxAge: 1 * time.Hour, Storage: nats.FileStorage, } @@ -56,7 +55,7 @@ func TestPublish(t *testing.T) { t.Fatal(err) } - _, err = n.Publish("STREAM_EVENT.TEST", []byte("test")) + _, err = n.Publish("STREAM_NAME_PUBLISH_2.TEST", []byte("test")) if err != nil { t.Fatal(err) } @@ -73,8 +72,8 @@ func TestQueueSubscribe(t *testing.T) { }() streamConf := &nats.StreamConfig{ - Name: "STREAM_NAME_ANOTHER", - Subjects: []string{"STREAM_EVENT_ANOTHER.*"}, + Name: "STREAM_NAME_QUEUE_SUBSCRIBE_2", + Subjects: []string{"STREAM_NAME_QUEUE_SUBSCRIBE_2.*"}, MaxAge: 1 * time.Hour, Storage: nats.FileStorage, } @@ -85,7 +84,7 @@ func TestQueueSubscribe(t *testing.T) { } countMsg := 10 - subject := "STREAM_EVENT_ANOTHER.TEST" + subject := "STREAM_NAME_QUEUE_SUBSCRIBE_2.TEST" queue := "test_queue_group" msgBytes, err := NewNatsEventMessage().WithEvent(&NatsEvent{ @@ -130,8 +129,8 @@ func TestQueueSubscribe(t *testing.T) { }() streamConf := &nats.StreamConfig{ - Name: "STREAM_NAME_ANOTHER", - Subjects: []string{"STREAM_EVENT_ANOTHER.*"}, + Name: "STREAM_NAME_AUDIT_2", + Subjects: []string{"STREAM_NAME_AUDIT_2.*"}, MaxAge: 1 * time.Hour, Storage: nats.FileStorage, } @@ -142,7 +141,7 @@ func TestQueueSubscribe(t *testing.T) { } countMsg := 10 - subject := "STREAM_EVENT_ANOTHER.TEST_NATS_EVENT_AUDIT_LOG_MESSAGE" + subject := "STREAM_NAME_AUDIT_2.TEST_NATS_EVENT_AUDIT_LOG_MESSAGE" queue := "test_queue_group" type User struct { @@ -228,7 +227,7 @@ func TestAddStream(t *testing.T) { _, err = n.AddStream(streamConf) if err != nil { - log.Fatal(err) + t.Fatal(err) } // test update config @@ -274,5 +273,4 @@ func TestConsumerInfo(t *testing.T) { require.Error(t, err) assert.Nil(t, consumerInfo2) assert.Equal(t, nats.ErrConsumerNotFound, err) - }