From 59e2a53c438b386fb4bde7a1abc56f0113f31d0f Mon Sep 17 00:00:00 2001 From: Marco Amador Date: Tue, 4 Jun 2024 13:06:35 +0100 Subject: [PATCH 1/3] Add a nak_delay --- internal/impl/nats/input_jetstream.go | 63 +++++++++++++++++++-------- 1 file changed, 46 insertions(+), 17 deletions(-) diff --git a/internal/impl/nats/input_jetstream.go b/internal/impl/nats/input_jetstream.go index 4426a747a..2af929091 100644 --- a/internal/impl/nats/input_jetstream.go +++ b/internal/impl/nats/input_jetstream.go @@ -96,6 +96,15 @@ xref:configuration:interpolation.adoc#bloblang-queries[function interpolation]. Description("The maximum number of outstanding acks to be allowed before consuming is halted."). Advanced(). Default(1024)). + Field(service.NewDurationField("nak_delay"). + Description("An optional delay duration on redelivering the messages when negatively acknowledged."). + Example("1m"). + Advanced(). + Optional()). + Field(service.NewStringField("nak_delay_until_header"). + Description("An optional header name on which will come a unix epoch timestamp in seconds until when the message delivery should be delayed. By default is `nak_delay_until`"). + Advanced(). + Default("nak_delay_until")). Fields(connectionTailFields()...). Field(inputTracingDocs()) } @@ -118,16 +127,18 @@ func init() { //------------------------------------------------------------------------------ type jetStreamReader struct { - connDetails connectionDetails - deliverOpt nats.SubOpt - subject string - queue string - stream string - bind bool - pull bool - durable string - ackWait time.Duration - maxAckPending int + connDetails connectionDetails + deliverOpt nats.SubOpt + subject string + queue string + stream string + bind bool + pull bool + durable string + ackWait time.Duration + nakDelay time.Duration + nakDelayUntilHeader string + maxAckPending int log *service.Logger @@ -216,6 +227,15 @@ func newJetStreamReaderFromConfig(conf *service.ParsedConfig, mgr *service.Resou } } + if conf.Contains("nak_delay") { + if j.nakDelay, err = conf.FieldDuration("nak_delay"); err != nil { + return nil, err + } + } + if j.nakDelayUntilHeader, err = conf.FieldString("nak_delay_until_header"); err != nil { + return nil, err + } + if j.maxAckPending, err = conf.FieldInt("max_ack_pending"); err != nil { return nil, err } @@ -334,14 +354,13 @@ func (j *jetStreamReader) Read(ctx context.Context) (*service.Message, service.A if natsSub == nil { return nil, nil, service.ErrNotConnected } - if !j.pull { nmsg, err := natsSub.NextMsgWithContext(ctx) if err != nil { // TODO: Any errors need capturing here to signal a lost connection? return nil, nil, err } - return convertMessage(nmsg) + return j.convertMessage(nmsg) } for { @@ -362,7 +381,7 @@ func (j *jetStreamReader) Read(ctx context.Context) (*service.Message, service.A if len(msgs) == 0 { continue } - return convertMessage(msgs[0]) + return j.convertMessage(msgs[0]) } } @@ -379,7 +398,7 @@ func (j *jetStreamReader) Close(ctx context.Context) error { return nil } -func convertMessage(m *nats.Msg) (*service.Message, service.AckFunc, error) { +func (j *jetStreamReader) convertMessage(m *nats.Msg) (*service.Message, service.AckFunc, error) { msg := service.NewMessage(m.Data) msg.MetaSet("nats_subject", m.Subject) @@ -401,9 +420,19 @@ func convertMessage(m *nats.Msg) (*service.Message, service.AckFunc, error) { } return msg, func(ctx context.Context, res error) error { - if res == nil { - return m.Ack() + if res != nil { + if val, ok := m.Header[j.nakDelayUntilHeader]; ok { + if unixTime, err := strconv.ParseInt(val[0], 10, 64); err != nil { + j.log.Warnf("error parsing unix epoch time from header %s: %s error: %v", j.nakDelayUntilHeader, val[0], err) + return m.Nak() + } else { + return m.NakWithDelay(time.Unix(unixTime, 0).Sub(time.Now().UTC())) + } + } else if j.nakDelay > 0 { + return m.NakWithDelay(j.nakDelay) + } + return m.Nak() } - return m.Nak() + return m.Ack() }, nil } From 2d4241d81f5fc6cf1e2c1e9f40c1b04b7c55d7c6 Mon Sep 17 00:00:00 2001 From: Marco Amador Date: Thu, 23 May 2024 19:37:38 +0100 Subject: [PATCH 2/3] Add an opt-in parameter to create the stream and subject if not exist Signed-off-by: Marco Amador --- .../pages/inputs/nats_jetstream.adoc | 50 +++++++++++++++ internal/impl/nats/input_jetstream.go | 64 ++++++++++++++++++- 2 files changed, 113 insertions(+), 1 deletion(-) diff --git a/docs/modules/components/pages/inputs/nats_jetstream.adoc b/docs/modules/components/pages/inputs/nats_jetstream.adoc index 9e5c407b1..b61c807d0 100644 --- a/docs/modules/components/pages/inputs/nats_jetstream.adoc +++ b/docs/modules/components/pages/inputs/nats_jetstream.adoc @@ -67,6 +67,10 @@ input: deliver: all ack_wait: 30s max_ack_pending: 1024 + create_if_not_exists: false + storage_type: memory + nak_delay: 1m # No default (optional) + nak_delay_until_header: nak_delay_until tls: enabled: false skip_cert_verify: false @@ -267,6 +271,52 @@ The maximum number of outstanding acks to be allowed before consuming is halted. *Default*: `1024` +=== `create_if_not_exists` + +Create the `stream` and `subject` if do not exist. + + +*Type*: `bool` + +*Default*: `false` + +=== `storage_type` + +Storage type to use when the stream does not exist and is created when `create_if_not_exists` is set to true. Can be `memory` or `file` storage + + +*Type*: `string` + +*Default*: `"memory"` + +Options: +`memory` +, `file` +. + +=== `nak_delay` + +An optional delay duration on redelivering the messages when negatively acknowledged. + + +*Type*: `string` + + +```yml +# Examples + +nak_delay: 1m +``` + +=== `nak_delay_until_header` + +An optional header name on which will come a unix epoch timestamp in seconds until when the message delivery should be delayed. By default is `nak_delay_until` + + +*Type*: `string` + +*Default*: `"nak_delay_until"` + === `tls` Custom TLS settings can be used to override system defaults. diff --git a/internal/impl/nats/input_jetstream.go b/internal/impl/nats/input_jetstream.go index 2af929091..fc89e360b 100644 --- a/internal/impl/nats/input_jetstream.go +++ b/internal/impl/nats/input_jetstream.go @@ -19,6 +19,7 @@ import ( "errors" "fmt" "strconv" + "strings" "sync" "time" @@ -96,6 +97,14 @@ xref:configuration:interpolation.adoc#bloblang-queries[function interpolation]. Description("The maximum number of outstanding acks to be allowed before consuming is halted."). Advanced(). Default(1024)). + Field(service.NewBoolField("create_if_not_exists"). + Description("Create the `stream` and `subject` if do not exist."). + Advanced(). + Default(false)). + Field(service.NewStringEnumField("storage_type", "memory", "file"). + Description("Storage type to use when the stream does not exist and is created when `create_if_not_exists` is set to true. Can be `memory` or `file` storage"). + Advanced(). + Default("memory")). Field(service.NewDurationField("nak_delay"). Description("An optional delay duration on redelivering the messages when negatively acknowledged."). Example("1m"). @@ -139,6 +148,8 @@ type jetStreamReader struct { nakDelay time.Duration nakDelayUntilHeader string maxAckPending int + createIfNotExists bool + storageType string log *service.Logger @@ -212,7 +223,7 @@ func newJetStreamReaderFromConfig(conf *service.ParsedConfig, mgr *service.Resou } } else { if j.subject == "" && j.stream == "" { - return nil, errors.New("subject and stream is empty") + return nil, errors.New("subject and stream are empty") } } @@ -227,6 +238,17 @@ func newJetStreamReaderFromConfig(conf *service.ParsedConfig, mgr *service.Resou } } + if conf.Contains("create_if_not_exists") { + if j.createIfNotExists, err = conf.FieldBool("create_if_not_exists"); err != nil { + return nil, err + } + } + if conf.Contains("storage_type") { + if j.storageType, err = conf.FieldString("storage_type"); err != nil { + return nil, err + } + } + if conf.Contains("nak_delay") { if j.nakDelay, err = conf.FieldDuration("nak_delay"); err != nil { return nil, err @@ -324,7 +346,47 @@ func (j *jetStreamReader) Connect(ctx context.Context) (err error) { natsSub, err = jCtx.QueueSubscribeSync(j.subject, j.queue, options...) } } + if err != nil { + if j.createIfNotExists { + var natsErr *nats.APIError + if errors.As(err, &natsErr) { + if natsErr.ErrorCode == nats.JSErrCodeStreamNotFound { + _, err = jCtx.AddStream(&nats.StreamConfig{ + Name: j.stream, + Subjects: func() []string { + if j.subject == "" { + return nil + } + return []string{j.subject} + }(), + Storage: func() nats.StorageType { + if j.storageType == "file" { + return nats.FileStorage + } + return nats.MemoryStorage + }(), + }) + } + } else if strings.Contains(err.Error(), "does not match consumer") { + // create subject to existent stream .stream + _, err = jCtx.UpdateStream(&nats.StreamConfig{ + Name: j.stream, + Subjects: func() []string { + if j.subject == "" { + return nil + } + return []string{j.subject} + }(), + Storage: func() nats.StorageType { + if j.storageType == "file" { + return nats.FileStorage + } + return nats.MemoryStorage + }(), + }) + } + } return err } From 195c6aaf9e1b5a6ac30f9b267db49d9d80d5881e Mon Sep 17 00:00:00 2001 From: Marco Amador Date: Fri, 4 Oct 2024 18:11:22 +0100 Subject: [PATCH 3/3] Add num_replicas parameter to specify the number of stream replicas when in clustered mode Signed-off-by: Marco Amador --- .../pages/inputs/nats_jetstream.adoc | 12 +++++++++++- internal/impl/nats/input_jetstream.go | 19 +++++++++++++++++-- 2 files changed, 28 insertions(+), 3 deletions(-) diff --git a/docs/modules/components/pages/inputs/nats_jetstream.adoc b/docs/modules/components/pages/inputs/nats_jetstream.adoc index b61c807d0..a244e1916 100644 --- a/docs/modules/components/pages/inputs/nats_jetstream.adoc +++ b/docs/modules/components/pages/inputs/nats_jetstream.adoc @@ -68,6 +68,7 @@ input: ack_wait: 30s max_ack_pending: 1024 create_if_not_exists: false + num_replicas: 1 storage_type: memory nak_delay: 1m # No default (optional) nak_delay_until_header: nak_delay_until @@ -280,9 +281,18 @@ Create the `stream` and `subject` if do not exist. *Default*: `false` +=== `num_replicas` + +The number of stream replicas, only supported in clustered mode and if the stream is created when `create_if_not_exists` is set to true. Defaults to 1, maximum is 5. + + +*Type*: `int` + +*Default*: `1` + === `storage_type` -Storage type to use when the stream does not exist and is created when `create_if_not_exists` is set to true. Can be `memory` or `file` storage +Storage type to use when the stream does not exist and is created when `create_if_not_exists` is set to true. Can be `memory` or `file` storage. *Type*: `string` diff --git a/internal/impl/nats/input_jetstream.go b/internal/impl/nats/input_jetstream.go index fc89e360b..75cdfcb74 100644 --- a/internal/impl/nats/input_jetstream.go +++ b/internal/impl/nats/input_jetstream.go @@ -101,8 +101,12 @@ xref:configuration:interpolation.adoc#bloblang-queries[function interpolation]. Description("Create the `stream` and `subject` if do not exist."). Advanced(). Default(false)). + Field(service.NewIntField("num_replicas"). + Description("The number of stream replicas, only supported in clustered mode and if the stream is created when `create_if_not_exists` is set to true. Defaults to 1, maximum is 5."). + Advanced(). + Default(1)). Field(service.NewStringEnumField("storage_type", "memory", "file"). - Description("Storage type to use when the stream does not exist and is created when `create_if_not_exists` is set to true. Can be `memory` or `file` storage"). + Description("Storage type to use when the stream does not exist and is created when `create_if_not_exists` is set to true. Can be `memory` or `file` storage."). Advanced(). Default("memory")). Field(service.NewDurationField("nak_delay"). @@ -149,6 +153,7 @@ type jetStreamReader struct { nakDelayUntilHeader string maxAckPending int createIfNotExists bool + numReplicas int storageType string log *service.Logger @@ -243,6 +248,14 @@ func newJetStreamReaderFromConfig(conf *service.ParsedConfig, mgr *service.Resou return nil, err } } + if conf.Contains("num_replicas") { + if j.numReplicas, err = conf.FieldInt("num_replicas"); err != nil { + return nil, err + } + if j.numReplicas < 1 || j.numReplicas > 5 { + return nil, fmt.Errorf("num_replicas %d is invalid, it must be between 1 and 5", j.numReplicas) + } + } if conf.Contains("storage_type") { if j.storageType, err = conf.FieldString("storage_type"); err != nil { return nil, err @@ -352,6 +365,7 @@ func (j *jetStreamReader) Connect(ctx context.Context) (err error) { var natsErr *nats.APIError if errors.As(err, &natsErr) { if natsErr.ErrorCode == nats.JSErrCodeStreamNotFound { + // create stream and subject _, err = jCtx.AddStream(&nats.StreamConfig{ Name: j.stream, Subjects: func() []string { @@ -366,10 +380,11 @@ func (j *jetStreamReader) Connect(ctx context.Context) (err error) { } return nats.MemoryStorage }(), + Replicas: j.numReplicas, }) } } else if strings.Contains(err.Error(), "does not match consumer") { - // create subject to existent stream .stream + // create subject on existent stream _, err = jCtx.UpdateStream(&nats.StreamConfig{ Name: j.stream, Subjects: func() []string {