From 0de31e00cff0a3242f48653dad2ecc6b21dbd5fe Mon Sep 17 00:00:00 2001 From: "ibnu.ahsani" Date: Wed, 28 Sep 2022 17:50:00 +0700 Subject: [PATCH 1/3] feat: add time field in NatsEvent struct --- event_message.go | 9 +++++++++ event_message_test.go | 33 +++++++++++++++++++++++++++++++-- jetstream_test.go | 22 ++++++++++------------ 3 files changed, 50 insertions(+), 14 deletions(-) diff --git a/event_message.go b/event_message.go index f5e1fe6..f9f2d68 100644 --- a/event_message.go +++ b/event_message.go @@ -17,6 +17,7 @@ type ( IDString string `json:"id_string"` UserID int64 `json:"user_id"` TenantID int64 `json:"tenant_id"` + Time string `json:"time"` Subject string `json:"subject"` // empty on publish } @@ -93,6 +94,14 @@ func (n *NatsEvent) GetIDString() string { return n.IDString } +// GetTime :nodoc: +func (n *NatsEvent) GetTime() string { + if n == nil { + return "" + } + return n.Time +} + // NewNatsEventMessage :nodoc: func NewNatsEventMessage() *NatsEventMessage { return &NatsEventMessage{} diff --git a/event_message_test.go b/event_message_test.go index dae27d5..1e5c381 100644 --- a/event_message_test.go +++ b/event_message_test.go @@ -2,6 +2,7 @@ package ferstream import ( "encoding/json" + "fmt" "testing" "time" @@ -35,6 +36,15 @@ func TestNatsEventMessage_WithEvent(t *testing.T) { }, ExpectedError: false, }, + { + Name: "success with time", + Given: &NatsEvent{ + ID: 111, + UserID: 432, + Time: time.Now().Format(time.RFC3339Nano), + }, + ExpectedError: false, + }, { Name: "empty id", Given: &NatsEvent{ @@ -193,7 +203,7 @@ func TestNatsEventMessage_ToJSONString(t *testing.T) { parsed, err := msg.ToJSONString() require.NoError(t, err) - expectedRes := "{\"NatsEvent\":{\"id\":123,\"id_string\":\"\",\"user_id\":333,\"tenant_id\":0,\"subject\":\"\"},\"body\":\"[\\\"test\\\"]\",\"old_body\":\"\",\"request\":null,\"error\":null}" + expectedRes := "{\"NatsEvent\":{\"id\":123,\"id_string\":\"\",\"user_id\":333,\"tenant_id\":0,\"time\":\"\",\"subject\":\"\"},\"body\":\"[\\\"test\\\"]\",\"old_body\":\"\",\"request\":null,\"error\":null}" assert.Equal(t, expectedRes, parsed) }) @@ -210,7 +220,26 @@ func TestNatsEventMessage_ToJSONString(t *testing.T) { parsed, err := msg.ToJSONString() require.NoError(t, err) - expectedRes := "{\"NatsEvent\":{\"id\":0,\"id_string\":\"630484ae00f0d71df588a0ab\",\"user_id\":333,\"tenant_id\":0,\"subject\":\"\"},\"body\":\"[\\\"test\\\"]\",\"old_body\":\"\",\"request\":null,\"error\":null}" + expectedRes := "{\"NatsEvent\":{\"id\":0,\"id_string\":\"630484ae00f0d71df588a0ab\",\"user_id\":333,\"tenant_id\":0,\"time\":\"\",\"subject\":\"\"},\"body\":\"[\\\"test\\\"]\",\"old_body\":\"\",\"request\":null,\"error\":null}" + assert.Equal(t, expectedRes, parsed) + }) + + t.Run("success with time", func(t *testing.T) { + now := time.Now().Format(time.RFC3339Nano) + natsEvent := &NatsEvent{ + ID: 123, + UserID: 333, + Time: now, + } + body := []string{"test"} + + msg := NewNatsEventMessage(). + WithEvent(natsEvent). + WithBody(body) + + parsed, err := msg.ToJSONString() + require.NoError(t, err) + expectedRes := fmt.Sprintf("{\"NatsEvent\":{\"id\":123,\"id_string\":\"\",\"user_id\":333,\"tenant_id\":0,\"time\":\"%s\",\"subject\":\"\"},\"body\":\"[\\\"test\\\"]\",\"old_body\":\"\",\"request\":null,\"error\":null}", now) assert.Equal(t, expectedRes, parsed) }) } diff --git a/jetstream_test.go b/jetstream_test.go index ff20445..07383c5 100644 --- a/jetstream_test.go +++ b/jetstream_test.go @@ -2,7 +2,6 @@ package ferstream import ( "encoding/json" - "log" "os" "testing" "time" @@ -45,8 +44,8 @@ func TestPublish(t *testing.T) { }() streamConf := &nats.StreamConfig{ - Name: "STREAM_NAME", - Subjects: []string{"STREAM_EVENT.*"}, + Name: "STREAM_NAME_PUBLISH", + Subjects: []string{"STREAM_NAME_PUBLISH.*"}, MaxAge: 1 * time.Hour, Storage: nats.FileStorage, } @@ -56,7 +55,7 @@ func TestPublish(t *testing.T) { t.Fatal(err) } - _, err = n.Publish("STREAM_EVENT.TEST", []byte("test")) + _, err = n.Publish("STREAM_NAME_PUBLISH.TEST", []byte("test")) if err != nil { t.Fatal(err) } @@ -73,8 +72,8 @@ func TestQueueSubscribe(t *testing.T) { }() streamConf := &nats.StreamConfig{ - Name: "STREAM_NAME_ANOTHER", - Subjects: []string{"STREAM_EVENT_ANOTHER.*"}, + Name: "STREAM_NAME_QUEUE_SUBSCRIBE", + Subjects: []string{"STREAM_NAME_QUEUE_SUBSCRIBE.*"}, MaxAge: 1 * time.Hour, Storage: nats.FileStorage, } @@ -85,7 +84,7 @@ func TestQueueSubscribe(t *testing.T) { } countMsg := 10 - subject := "STREAM_EVENT_ANOTHER.TEST" + subject := "STREAM_NAME_QUEUE_SUBSCRIBE.TEST" queue := "test_queue_group" msgBytes, err := NewNatsEventMessage().WithEvent(&NatsEvent{ @@ -130,8 +129,8 @@ func TestQueueSubscribe(t *testing.T) { }() streamConf := &nats.StreamConfig{ - Name: "STREAM_NAME_ANOTHER", - Subjects: []string{"STREAM_EVENT_ANOTHER.*"}, + Name: "STREAM_NAME_AUDIT", + Subjects: []string{"STREAM_NAME_AUDIT.*"}, MaxAge: 1 * time.Hour, Storage: nats.FileStorage, } @@ -142,7 +141,7 @@ func TestQueueSubscribe(t *testing.T) { } countMsg := 10 - subject := "STREAM_EVENT_ANOTHER.TEST_NATS_EVENT_AUDIT_LOG_MESSAGE" + subject := "STREAM_NAME_AUDIT.TEST_NATS_EVENT_AUDIT_LOG_MESSAGE" queue := "test_queue_group" type User struct { @@ -228,7 +227,7 @@ func TestAddStream(t *testing.T) { _, err = n.AddStream(streamConf) if err != nil { - log.Fatal(err) + t.Fatal(err) } // test update config @@ -274,5 +273,4 @@ func TestConsumerInfo(t *testing.T) { require.Error(t, err) assert.Nil(t, consumerInfo2) assert.Equal(t, nats.ErrConsumerNotFound, err) - } From b1fdf2b111a0a29b5c2a284a4f9b0d89b8657597 Mon Sep 17 00:00:00 2001 From: "ibnu.ahsani" Date: Thu, 29 Sep 2022 10:12:05 +0700 Subject: [PATCH 2/3] feat: change time format to rfc3999 & add time init and validator --- event_message.go | 22 ++++++++++++++++++++-- event_message_test.go | 42 ++++++++++++++++++------------------------ jetstream_test.go | 18 +++++++++--------- 3 files changed, 47 insertions(+), 35 deletions(-) diff --git a/event_message.go b/event_message.go index f9f2d68..022e606 100644 --- a/event_message.go +++ b/event_message.go @@ -4,12 +4,14 @@ import ( "time" "github.com/kumparan/go-utils" - "google.golang.org/protobuf/proto" - "github.com/kumparan/tapao" "github.com/pkg/errors" + "google.golang.org/protobuf/proto" ) +// NatsEventTimeFormat time format for NatsEvent 'time' field +const NatsEventTimeFormat = time.RFC3339 + type ( // NatsEvent :nodoc: NatsEvent struct { @@ -102,6 +104,12 @@ func (n *NatsEvent) GetTime() string { return n.Time } +// IsTimeValid :nodoc: +func (n *NatsEvent) IsTimeValid() bool { + _, err := time.Parse(NatsEventTimeFormat, n.GetTime()) + return err == nil +} + // NewNatsEventMessage :nodoc: func NewNatsEventMessage() *NatsEventMessage { return &NatsEventMessage{} @@ -139,6 +147,16 @@ func (n *NatsEventMessage) WithEvent(e *NatsEvent) *NatsEventMessage { return n } + switch e.GetTime() { + case "": + e.Time = time.Now().Format(NatsEventTimeFormat) + default: + if !e.IsTimeValid() { + n.wrapError(errors.New("invalid time format")) + return n + } + } + n.NatsEvent = e return n } diff --git a/event_message_test.go b/event_message_test.go index 1e5c381..b0d3ddb 100644 --- a/event_message_test.go +++ b/event_message_test.go @@ -41,7 +41,7 @@ func TestNatsEventMessage_WithEvent(t *testing.T) { Given: &NatsEvent{ ID: 111, UserID: 432, - Time: time.Now().Format(time.RFC3339Nano), + Time: time.Now().Format(NatsEventTimeFormat), }, ExpectedError: false, }, @@ -59,6 +59,15 @@ func TestNatsEventMessage_WithEvent(t *testing.T) { }, ExpectedError: true, }, + { + Name: "invalid time format", + Given: &NatsEvent{ + ID: 111, + UserID: 432, + Time: time.Now().Format(time.RFC822), + }, + ExpectedError: true, + }, } for _, test := range tests { @@ -190,10 +199,12 @@ func TestNatsEventMessage_Build(t *testing.T) { } func TestNatsEventMessage_ToJSONString(t *testing.T) { + now := time.Now().Format(NatsEventTimeFormat) t.Run("success", func(t *testing.T) { natsEvent := &NatsEvent{ ID: 123, UserID: 333, + Time: now, } body := []string{"test"} @@ -203,7 +214,7 @@ func TestNatsEventMessage_ToJSONString(t *testing.T) { parsed, err := msg.ToJSONString() require.NoError(t, err) - expectedRes := "{\"NatsEvent\":{\"id\":123,\"id_string\":\"\",\"user_id\":333,\"tenant_id\":0,\"time\":\"\",\"subject\":\"\"},\"body\":\"[\\\"test\\\"]\",\"old_body\":\"\",\"request\":null,\"error\":null}" + expectedRes := fmt.Sprintf("{\"NatsEvent\":{\"id\":123,\"id_string\":\"\",\"user_id\":333,\"tenant_id\":0,\"time\":\"%s\",\"subject\":\"\"},\"body\":\"[\\\"test\\\"]\",\"old_body\":\"\",\"request\":null,\"error\":null}", now) assert.Equal(t, expectedRes, parsed) }) @@ -211,6 +222,7 @@ func TestNatsEventMessage_ToJSONString(t *testing.T) { natsEvent := &NatsEvent{ UserID: 333, IDString: "630484ae00f0d71df588a0ab", + Time: now, } body := []string{"test"} @@ -220,26 +232,7 @@ func TestNatsEventMessage_ToJSONString(t *testing.T) { parsed, err := msg.ToJSONString() require.NoError(t, err) - expectedRes := "{\"NatsEvent\":{\"id\":0,\"id_string\":\"630484ae00f0d71df588a0ab\",\"user_id\":333,\"tenant_id\":0,\"time\":\"\",\"subject\":\"\"},\"body\":\"[\\\"test\\\"]\",\"old_body\":\"\",\"request\":null,\"error\":null}" - assert.Equal(t, expectedRes, parsed) - }) - - t.Run("success with time", func(t *testing.T) { - now := time.Now().Format(time.RFC3339Nano) - natsEvent := &NatsEvent{ - ID: 123, - UserID: 333, - Time: now, - } - body := []string{"test"} - - msg := NewNatsEventMessage(). - WithEvent(natsEvent). - WithBody(body) - - parsed, err := msg.ToJSONString() - require.NoError(t, err) - expectedRes := fmt.Sprintf("{\"NatsEvent\":{\"id\":123,\"id_string\":\"\",\"user_id\":333,\"tenant_id\":0,\"time\":\"%s\",\"subject\":\"\"},\"body\":\"[\\\"test\\\"]\",\"old_body\":\"\",\"request\":null,\"error\":null}", now) + expectedRes := fmt.Sprintf("{\"NatsEvent\":{\"id\":0,\"id_string\":\"630484ae00f0d71df588a0ab\",\"user_id\":333,\"tenant_id\":0,\"time\":\"%s\",\"subject\":\"\"},\"body\":\"[\\\"test\\\"]\",\"old_body\":\"\",\"request\":null,\"error\":null}", now) assert.Equal(t, expectedRes, parsed) }) } @@ -265,11 +258,12 @@ func TestNatsEventMessage_ToJSONByte(t *testing.T) { } func TestNatsEventMessage_ParseJSON(t *testing.T) { - json := "{\"NatsEvent\":{\"id\":123,\"user_id\":333,\"tenant_id\":0,\"subject\":\"\"},\"body\":\"[\\\"test\\\"]\",\"old_body\":\"\",\"request\":null,\"error\":null}" - + now := time.Now().Format(NatsEventTimeFormat) + json := fmt.Sprintf("{\"NatsEvent\":{\"id\":123,\"user_id\":333,\"tenant_id\":0,\"time\":\"%s\",\"subject\":\"\"},\"body\":\"[\\\"test\\\"]\",\"old_body\":\"\",\"request\":null,\"error\":null}", now) natsEvent := &NatsEvent{ ID: 123, UserID: 333, + Time: now, } body := []string{"test"} msg := NewNatsEventMessage().WithEvent(natsEvent).WithBody(body) diff --git a/jetstream_test.go b/jetstream_test.go index 07383c5..4bc6ef8 100644 --- a/jetstream_test.go +++ b/jetstream_test.go @@ -44,8 +44,8 @@ func TestPublish(t *testing.T) { }() streamConf := &nats.StreamConfig{ - Name: "STREAM_NAME_PUBLISH", - Subjects: []string{"STREAM_NAME_PUBLISH.*"}, + Name: "STREAM_NAME_PUBLISH_2", + Subjects: []string{"STREAM_NAME_PUBLISH_2.*"}, MaxAge: 1 * time.Hour, Storage: nats.FileStorage, } @@ -55,7 +55,7 @@ func TestPublish(t *testing.T) { t.Fatal(err) } - _, err = n.Publish("STREAM_NAME_PUBLISH.TEST", []byte("test")) + _, err = n.Publish("STREAM_NAME_PUBLISH_2.TEST", []byte("test")) if err != nil { t.Fatal(err) } @@ -72,8 +72,8 @@ func TestQueueSubscribe(t *testing.T) { }() streamConf := &nats.StreamConfig{ - Name: "STREAM_NAME_QUEUE_SUBSCRIBE", - Subjects: []string{"STREAM_NAME_QUEUE_SUBSCRIBE.*"}, + Name: "STREAM_NAME_QUEUE_SUBSCRIBE_2", + Subjects: []string{"STREAM_NAME_QUEUE_SUBSCRIBE_2.*"}, MaxAge: 1 * time.Hour, Storage: nats.FileStorage, } @@ -84,7 +84,7 @@ func TestQueueSubscribe(t *testing.T) { } countMsg := 10 - subject := "STREAM_NAME_QUEUE_SUBSCRIBE.TEST" + subject := "STREAM_NAME_QUEUE_SUBSCRIBE_2.TEST" queue := "test_queue_group" msgBytes, err := NewNatsEventMessage().WithEvent(&NatsEvent{ @@ -129,8 +129,8 @@ func TestQueueSubscribe(t *testing.T) { }() streamConf := &nats.StreamConfig{ - Name: "STREAM_NAME_AUDIT", - Subjects: []string{"STREAM_NAME_AUDIT.*"}, + Name: "STREAM_NAME_AUDIT_2", + Subjects: []string{"STREAM_NAME_AUDIT_2.*"}, MaxAge: 1 * time.Hour, Storage: nats.FileStorage, } @@ -141,7 +141,7 @@ func TestQueueSubscribe(t *testing.T) { } countMsg := 10 - subject := "STREAM_NAME_AUDIT.TEST_NATS_EVENT_AUDIT_LOG_MESSAGE" + subject := "STREAM_NAME_AUDIT_2.TEST_NATS_EVENT_AUDIT_LOG_MESSAGE" queue := "test_queue_group" type User struct { From d616dd2641d488fa03c7a690516bb0a5ee3b34b3 Mon Sep 17 00:00:00 2001 From: "ibnu.ahsani" Date: Thu, 29 Sep 2022 11:06:14 +0700 Subject: [PATCH 3/3] feat: change time format to RFC3339Nano --- event_message.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/event_message.go b/event_message.go index 022e606..310b7b0 100644 --- a/event_message.go +++ b/event_message.go @@ -10,7 +10,7 @@ import ( ) // NatsEventTimeFormat time format for NatsEvent 'time' field -const NatsEventTimeFormat = time.RFC3339 +const NatsEventTimeFormat = time.RFC3339Nano type ( // NatsEvent :nodoc: