From 3678abe16a1c6c28381bf03bb3b21fd88190c4d0 Mon Sep 17 00:00:00 2001 From: Marco Amador Date: Thu, 23 May 2024 19:37:38 +0100 Subject: [PATCH] Add an opt-in parameter to create the stream and subject if not exist Signed-off-by: Marco Amador --- internal/impl/nats/input_jetstream.go | 40 +++++++++++++++++++ .../docs/components/inputs/nats_jetstream.md | 19 +++++++++ 2 files changed, 59 insertions(+) diff --git a/internal/impl/nats/input_jetstream.go b/internal/impl/nats/input_jetstream.go index f5bc0e8259..23a81f2cdb 100644 --- a/internal/impl/nats/input_jetstream.go +++ b/internal/impl/nats/input_jetstream.go @@ -79,6 +79,14 @@ You can access these metadata fields using Description("The maximum number of outstanding acks to be allowed before consuming is halted."). Advanced(). Default(1024)). + Field(service.NewBoolField("create_if_not_exists"). + Description("Create the `stream` and `subject` if do not exist."). + Advanced(). + Default(false)). + Field(service.NewStringEnumField("storage_type", "memory", "file"). + Description("Storage type to use when the stream does not exist and is created when `create_if_not_exists` is set to true. Can be `memory` or `file` storage"). + Advanced(). + Default("memory")). Field(service.NewDurationField("nak_delay"). Description("An optional delay duration on redelivering the messages when negatively acknowledged."). Example("1m"). @@ -122,6 +130,8 @@ type jetStreamReader struct { nakDelay time.Duration nakDelayUntilHeader string maxAckPending int + createIfNotExists bool + storageType string log *service.Logger @@ -207,6 +217,17 @@ func newJetStreamReaderFromConfig(conf *service.ParsedConfig, mgr *service.Resou } } + if conf.Contains("create_if_not_exists") { + if j.createIfNotExists, err = conf.FieldBool("create_if_not_exists"); err != nil { + return nil, err + } + } + if conf.Contains("storage_type") { + if j.storageType, err = conf.FieldString("storage_type"); err != nil { + return nil, err + } + } + if conf.Contains("nak_delay") { if j.nakDelay, err = conf.FieldDuration("nak_delay"); err != nil { return nil, err @@ -308,6 +329,25 @@ func (j *jetStreamReader) Connect(ctx context.Context) (err error) { return err } + if err != nil { + var natsErr *nats.APIError + if j.createIfNotExists && errors.As(err, &natsErr) { + if natsErr.ErrorCode == nats.JSErrCodeStreamNotFound { + _, err = jCtx.AddStream(&nats.StreamConfig{ + Name: j.stream, + Subjects: []string{j.subject}, + Storage: func() nats.StorageType { + if j.storageType == "file" { + return nats.FileStorage + } + return nats.MemoryStorage + }(), + }) + } + } + return err + } + j.natsConn = natsConn j.natsSub = natsSub return nil diff --git a/website/docs/components/inputs/nats_jetstream.md b/website/docs/components/inputs/nats_jetstream.md index ef09817555..6e86447725 100644 --- a/website/docs/components/inputs/nats_jetstream.md +++ b/website/docs/components/inputs/nats_jetstream.md @@ -58,6 +58,8 @@ input: deliver: all ack_wait: 30s max_ack_pending: 1024 + create_if_not_exists: false + storage_type: memory nak_delay: 1m # No default (optional) nak_delay_until_header: nak_delay_until tls: @@ -243,6 +245,23 @@ The maximum number of outstanding acks to be allowed before consuming is halted. Type: `int` Default: `1024` +### `create_if_not_exists` + +Create the `stream` and `subject` if do not exist. + + +Type: `bool` +Default: `false` + +### `storage_type` + +Storage type to use when the stream does not exist and is created when `create_if_not_exists` is set to true. Can be `memory` or `file` storage + + +Type: `string` +Default: `"memory"` +Options: `memory`, `file`. + ### `nak_delay` An optional delay duration on redelivering the messages when negatively acknowledged.