From ae96c3d8fbe030b0bc05d2188917bd4ec324a1f7 Mon Sep 17 00:00:00 2001 From: kianaza Date: Thu, 11 Jul 2024 02:59:02 +0330 Subject: [PATCH 1/4] feat: enable publish and consume from all existing streams --- internal/config/default.go | 3 ++- internal/natsclient/config.go | 1 + internal/natsclient/jetstream.go | 17 ++++++++++++++--- 3 files changed, 17 insertions(+), 4 deletions(-) diff --git a/internal/config/default.go b/internal/config/default.go index cd1ae32..625470e 100644 --- a/internal/config/default.go +++ b/internal/config/default.go @@ -16,7 +16,8 @@ func Default() Config { Level: "debug", }, NATS: natsclient.Config{ - NewStreamAllow: true, + AllExistingStreams: true, + NewStreamAllow: true, Streams: []natsclient.Stream{{ Name: "test", Subject: "test", diff --git a/internal/natsclient/config.go b/internal/natsclient/config.go index 9e69f84..58dedd8 100644 --- a/internal/natsclient/config.go +++ b/internal/natsclient/config.go @@ -3,6 +3,7 @@ package natsclient import "time" type Config struct { + AllExistingStreams bool `json:"all_existing_streams" koanf:"all_existing_streams"` NewStreamAllow bool `json:"new_stream_allow" koanf:"new_stream_allow"` Streams []Stream `json:"stream,omitempty" koanf:"stream"` URL string `json:"url,omitempty" koanf:"url"` diff --git a/internal/natsclient/jetstream.go b/internal/natsclient/jetstream.go index f1b0b96..d348a26 100644 --- a/internal/natsclient/jetstream.go +++ b/internal/natsclient/jetstream.go @@ -13,6 +13,7 @@ var ( successfulSubscribe = "successful subscribe" failedPublish = "failed publish" successfulPublish = "successful publish" + subject_suffix = "_blackbox_exporter" ) type Message struct { @@ -73,12 +74,22 @@ func (j *Jetstream) createJetstreamContext() { } func (j *Jetstream) UpdateOrCreateStream() { - for _, stream := range j.config.Streams { + if j.config.AllExistingStreams { + streamNames := j.jetstream.StreamNames() + for stream := range streamNames { + j.config.Streams = append(j.config.Streams, Stream{Name: stream}) + } + } + for i, stream := range j.config.Streams { + if stream.Subject == "" { + j.config.Streams[i].Subject = stream.Name + subject_suffix + } + info, err := j.jetstream.StreamInfo(stream.Name) if err == nil { - j.updateStream(stream, info) + j.updateStream(j.config.Streams[i], info) } else if err == nats.ErrStreamNotFound && j.config.NewStreamAllow { - j.createStream(stream) + j.createStream(j.config.Streams[i]) } else { j.logger.Panic("could not add subject", zap.Error(err)) } From a02bb715bdb2c66ae073cd1dbb052168d2766a81 Mon Sep 17 00:00:00 2001 From: kianaza Date: Thu, 11 Jul 2024 03:04:33 +0330 Subject: [PATCH 2/4] Fix: fix linter problem --- internal/natsclient/jetstream.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/internal/natsclient/jetstream.go b/internal/natsclient/jetstream.go index d348a26..b21b626 100644 --- a/internal/natsclient/jetstream.go +++ b/internal/natsclient/jetstream.go @@ -13,7 +13,7 @@ var ( successfulSubscribe = "successful subscribe" failedPublish = "failed publish" successfulPublish = "successful publish" - subject_suffix = "_blackbox_exporter" + subjectSuffix = "_blackbox_exporter" ) type Message struct { @@ -82,7 +82,7 @@ func (j *Jetstream) UpdateOrCreateStream() { } for i, stream := range j.config.Streams { if stream.Subject == "" { - j.config.Streams[i].Subject = stream.Name + subject_suffix + j.config.Streams[i].Subject = stream.Name + subjectSuffix } info, err := j.jetstream.StreamInfo(stream.Name) From ef0ec1104307b0d8ce2711a783a4940f74b5466b Mon Sep 17 00:00:00 2001 From: kianaza Date: Thu, 11 Jul 2024 03:17:38 +0330 Subject: [PATCH 3/4] fix: error management --- internal/natsclient/jetstream.go | 9 ++++++--- 1 file changed, 6 insertions(+), 3 deletions(-) diff --git a/internal/natsclient/jetstream.go b/internal/natsclient/jetstream.go index b21b626..994cf4a 100644 --- a/internal/natsclient/jetstream.go +++ b/internal/natsclient/jetstream.go @@ -91,7 +91,7 @@ func (j *Jetstream) UpdateOrCreateStream() { } else if err == nats.ErrStreamNotFound && j.config.NewStreamAllow { j.createStream(j.config.Streams[i]) } else { - j.logger.Panic("could not add subject", zap.Error(err)) + j.logger.Error("could not add subject", zap.String("stream", stream.Name), zap.Error(err)) } } } @@ -104,7 +104,7 @@ func (j *Jetstream) updateStream(stream Stream, info *nats.StreamInfo) { Subjects: subjects, }) if err != nil { - j.logger.Panic("could not add subject to existing stream", zap.Error(err)) + j.logger.Error("could not add subject to existing stream", zap.String("stream", stream.Name), zap.Error(err)) } j.logger.Info("stream updated") } @@ -115,12 +115,15 @@ func (j *Jetstream) createStream(stream Stream) { Subjects: []string{stream.Subject}, }) if err != nil { - j.logger.Panic("could not add stream", zap.Error(err)) + j.logger.Error("could not add stream", zap.String("stream", stream.Name), zap.Error(err)) } j.logger.Info("add new stream") } func (j *Jetstream) StartBlackboxTest() { + if j.config.Streams == nil { + j.logger.Panic("at least one stream is required.") + } for _, stream := range j.config.Streams { messageChannel := j.createSubscribe(stream.Subject) go j.jetstreamPublish(stream.Subject, stream.Name) From f18c8e9ed3f30d1b9b7135d7ab2f87175c5f7aa8 Mon Sep 17 00:00:00 2001 From: kianaza Date: Fri, 12 Jul 2024 14:28:50 +0330 Subject: [PATCH 4/4] fix: change AllExistingStreams default value --- internal/config/default.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/internal/config/default.go b/internal/config/default.go index 625470e..267bebc 100644 --- a/internal/config/default.go +++ b/internal/config/default.go @@ -16,7 +16,7 @@ func Default() Config { Level: "debug", }, NATS: natsclient.Config{ - AllExistingStreams: true, + AllExistingStreams: false, NewStreamAllow: true, Streams: []natsclient.Stream{{ Name: "test",