Skip to content

Commit

Permalink
Browse files Browse the repository at this point in the history
  • Loading branch information
kianaza committed Jul 12, 2024
2 parents 16c1048 + 97facdb commit 3f7b729
Show file tree
Hide file tree
Showing 3 changed files with 23 additions and 7 deletions.
3 changes: 2 additions & 1 deletion internal/config/default.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,8 @@ func Default() Config {
Level: "debug",
},
NATS: natsclient.Config{
NewStreamAllow: true,
AllExistingStreams: false,
NewStreamAllow: true,
Streams: []natsclient.Stream{{
Name: "test",
Subject: "test",
Expand Down
1 change: 1 addition & 0 deletions internal/natsclient/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package natsclient
import "time"

type Config struct {
AllExistingStreams bool `json:"all_existing_streams" koanf:"all_existing_streams"`
NewStreamAllow bool `json:"new_stream_allow" koanf:"new_stream_allow"`
Streams []Stream `json:"streams,omitempty" koanf:"streams"`
URL string `json:"url,omitempty" koanf:"url"`
Expand Down
26 changes: 20 additions & 6 deletions internal/natsclient/jetstream.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ var (
successfulSubscribe = "successful subscribe"
failedPublish = "failed publish"
successfulPublish = "successful publish"
subjectSuffix = "_blackbox_exporter"
)

type Message struct {
Expand Down Expand Up @@ -73,14 +74,24 @@ func (j *Jetstream) createJetstreamContext() {
}

func (j *Jetstream) UpdateOrCreateStream() {
for _, stream := range j.config.Streams {
if j.config.AllExistingStreams {
streamNames := j.jetstream.StreamNames()
for stream := range streamNames {
j.config.Streams = append(j.config.Streams, Stream{Name: stream})
}
}
for i, stream := range j.config.Streams {
if stream.Subject == "" {
j.config.Streams[i].Subject = stream.Name + subjectSuffix
}

info, err := j.jetstream.StreamInfo(stream.Name)
if err == nil {
j.updateStream(stream, info)
j.updateStream(j.config.Streams[i], info)
} else if err == nats.ErrStreamNotFound && j.config.NewStreamAllow {
j.createStream(stream)
j.createStream(j.config.Streams[i])
} else {
j.logger.Panic("could not add subject", zap.Error(err))
j.logger.Error("could not add subject", zap.String("stream", stream.Name), zap.Error(err))
}
}
}
Expand All @@ -93,7 +104,7 @@ func (j *Jetstream) updateStream(stream Stream, info *nats.StreamInfo) {
Subjects: subjects,
})
if err != nil {
j.logger.Panic("could not add subject to existing stream", zap.Error(err))
j.logger.Error("could not add subject to existing stream", zap.String("stream", stream.Name), zap.Error(err))
}
j.logger.Info("stream updated")
}
Expand All @@ -104,12 +115,15 @@ func (j *Jetstream) createStream(stream Stream) {
Subjects: []string{stream.Subject},
})
if err != nil {
j.logger.Panic("could not add stream", zap.Error(err))
j.logger.Error("could not add stream", zap.String("stream", stream.Name), zap.Error(err))
}
j.logger.Info("add new stream")
}

func (j *Jetstream) StartBlackboxTest() {
if j.config.Streams == nil {
j.logger.Panic("at least one stream is required.")
}
for _, stream := range j.config.Streams {
messageChannel := j.createSubscribe(stream.Subject)
go j.jetstreamPublish(stream.Subject, stream.Name)
Expand Down

0 comments on commit 3f7b729

Please sign in to comment.