diff --git a/.gitignore b/.gitignore index 830eecb..9e98632 100644 --- a/.gitignore +++ b/.gitignore @@ -15,4 +15,5 @@ vendor/ /config.yml .vscode -.idea/ \ No newline at end of file +.idea +.jetstream_log \ No newline at end of file diff --git a/event_message.go b/event_message.go index 310b7b0..c1704bc 100644 --- a/event_message.go +++ b/event_message.go @@ -105,7 +105,7 @@ func (n *NatsEvent) GetTime() string { } // IsTimeValid :nodoc: -func (n *NatsEvent) IsTimeValid() bool { +func (n *NatsEvent) isTimeValid() bool { _, err := time.Parse(NatsEventTimeFormat, n.GetTime()) return err == nil } @@ -151,7 +151,7 @@ func (n *NatsEventMessage) WithEvent(e *NatsEvent) *NatsEventMessage { case "": e.Time = time.Now().Format(NatsEventTimeFormat) default: - if !e.IsTimeValid() { + if !e.isTimeValid() { n.wrapError(errors.New("invalid time format")) return n } @@ -195,7 +195,7 @@ func (n *NatsEventMessage) wrapError(err error) { // ParseFromBytes :nodoc: func (n *NatsEventMessage) ParseFromBytes(data []byte) (err error) { - err = tapao.Unmarshal(data, &n, tapao.With(tapao.JSON)) + err = tapao.Unmarshal(data, &n, tapao.With(tapao.JSON), tapao.FallbackWith(tapao.MessagePack)) if err != nil { n.Error = errors.Wrap(n.Error, err.Error()) return err diff --git a/jetstream_test.go b/jetstream_test.go index 4bc6ef8..c3f6140 100644 --- a/jetstream_test.go +++ b/jetstream_test.go @@ -18,6 +18,7 @@ const defaultURL = "nats://127.0.0.1:4222" func RunBasicJetStreamServer() *server.Server { opts := natsserver.DefaultTestOptions opts.JetStream = true + opts.StoreDir = ".jetstream_log" return natsserver.RunServer(&opts) } @@ -36,55 +37,46 @@ func TestPublish(t *testing.T) { } n, err := NewNATSConnection(defaultURL, natsOpts...) - if err != nil { - t.Fatal(err) - } + require.NoError(t, err) defer func() { n.Close() }() streamConf := &nats.StreamConfig{ - Name: "STREAM_NAME_PUBLISH_2", - Subjects: []string{"STREAM_NAME_PUBLISH_2.*"}, - MaxAge: 1 * time.Hour, - Storage: nats.FileStorage, + Name: "STREAM_NAME_PUBLISH", + Subjects: []string{"STREAM_NAME_PUBLISH.*"}, + Storage: nats.FileStorage, + Retention: nats.WorkQueuePolicy, } _, err = n.AddStream(streamConf) - if err != nil { - t.Fatal(err) - } + require.NoError(t, err) - _, err = n.Publish("STREAM_NAME_PUBLISH_2.TEST", []byte("test")) - if err != nil { - t.Fatal(err) - } + _, err = n.Publish("STREAM_NAME_PUBLISH.TEST", []byte("test")) + require.NoError(t, err) } func TestQueueSubscribe(t *testing.T) { t.Run("queue subscribe NatsEventMessage", func(t *testing.T) { n, err := NewNATSConnection(defaultURL) - if err != nil { - t.Fatal(err) - } + require.NoError(t, err) + defer func() { n.Close() }() streamConf := &nats.StreamConfig{ - Name: "STREAM_NAME_QUEUE_SUBSCRIBE_2", - Subjects: []string{"STREAM_NAME_QUEUE_SUBSCRIBE_2.*"}, - MaxAge: 1 * time.Hour, - Storage: nats.FileStorage, + Name: "STREAM_NAME_QUEUE_SUBSCRIBE", + Subjects: []string{"STREAM_NAME_QUEUE_SUBSCRIBE.*"}, + Storage: nats.FileStorage, + Retention: nats.WorkQueuePolicy, } _, err = n.AddStream(streamConf) - if err != nil { - t.Fatal(err) - } + require.NoError(t, err) countMsg := 10 - subject := "STREAM_NAME_QUEUE_SUBSCRIBE_2.TEST" + subject := "STREAM_NAME_QUEUE_SUBSCRIBE.TEST" queue := "test_queue_group" msgBytes, err := NewNatsEventMessage().WithEvent(&NatsEvent{ @@ -92,22 +84,18 @@ func TestQueueSubscribe(t *testing.T) { UserID: int64(21), }).Build() - assert.NoError(t, err) + require.NoError(t, err) for i := 0; i < countMsg; i++ { _, err = n.Publish(subject, msgBytes) - if err != nil { - t.Fatal(err) - } + require.NoError(t, err) } receiverCh := make(chan *nats.Msg) sub, err := n.QueueSubscribe(subject, queue, func(msg *nats.Msg) { receiverCh <- msg }) - if err != nil { - t.Fatal(err) - } + require.NoError(t, err) for i := 0; i < countMsg; i++ { b := <-receiverCh @@ -121,27 +109,24 @@ func TestQueueSubscribe(t *testing.T) { t.Run("queue subscribe NatsEventAuditLogMessage", func(t *testing.T) { n, err := NewNATSConnection(defaultURL) - if err != nil { - t.Fatal(err) - } + require.NoError(t, err) + defer func() { n.Close() }() streamConf := &nats.StreamConfig{ - Name: "STREAM_NAME_AUDIT_2", - Subjects: []string{"STREAM_NAME_AUDIT_2.*"}, - MaxAge: 1 * time.Hour, - Storage: nats.FileStorage, + Name: "STREAM_NAME_AUDIT", + Subjects: []string{"STREAM_NAME_AUDIT.*"}, + Storage: nats.FileStorage, + Retention: nats.WorkQueuePolicy, } _, err = n.AddStream(streamConf) - if err != nil { - t.Fatal(err) - } + require.NoError(t, err) countMsg := 10 - subject := "STREAM_NAME_AUDIT_2.TEST_NATS_EVENT_AUDIT_LOG_MESSAGE" + subject := "STREAM_NAME_AUDIT.TEST_NATS_EVENT_AUDIT_LOG_MESSAGE" queue := "test_queue_group" type User struct { @@ -180,22 +165,19 @@ func TestQueueSubscribe(t *testing.T) { Error: nil, } msgBytes, err := msg.Build() - assert.NoError(t, err) + require.NoError(t, err) for i := 0; i < countMsg; i++ { _, err = n.Publish(subject, msgBytes) - if err != nil { - t.Fatal(err) - } + require.NoError(t, err) + } receiverCh := make(chan *nats.Msg) sub, err := n.QueueSubscribe(subject, queue, func(msg *nats.Msg) { receiverCh <- msg }) - if err != nil { - t.Fatal(err) - } + require.NoError(t, err) for i := 0; i < countMsg; i++ { b := <-receiverCh @@ -210,9 +192,8 @@ func TestQueueSubscribe(t *testing.T) { func TestAddStream(t *testing.T) { n, err := NewNATSConnection(defaultURL) - if err != nil { - t.Fatal(err) - } + require.NoError(t, err) + defer func() { n.Close() }() @@ -225,31 +206,23 @@ func TestAddStream(t *testing.T) { } _, err = n.AddStream(streamConf) - - if err != nil { - t.Fatal(err) - } + require.NoError(t, err) // test update config updateConf := &nats.StreamConfig{ Name: "STREAM_NAMEXX", Subjects: []string{"STREAM_EVENTXX.*"}, - MaxAge: 2 * time.Hour, + MaxAge: 1 * time.Hour, Storage: nats.FileStorage, } _, err = n.AddStream(updateConf) - - if err != nil { - t.Fatal(err) - } + require.NoError(t, err) } func TestConsumerInfo(t *testing.T) { n, err := NewNATSConnection(defaultURL) - if err != nil { - t.Fatal(err) - } + require.NoError(t, err) defer n.Close() updateConf := &nats.StreamConfig{