Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support NAK delay on nats_jetstream input #2556

Open
wants to merge 3 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
60 changes: 60 additions & 0 deletions docs/modules/components/pages/inputs/nats_jetstream.adoc
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,11 @@ input:
deliver: all
ack_wait: 30s
max_ack_pending: 1024
create_if_not_exists: false
num_replicas: 1
storage_type: memory
nak_delay: 1m # No default (optional)
nak_delay_until_header: nak_delay_until
tls:
enabled: false
skip_cert_verify: false
Expand Down Expand Up @@ -267,6 +272,61 @@ The maximum number of outstanding acks to be allowed before consuming is halted.

*Default*: `1024`

=== `create_if_not_exists`

Create the `stream` and `subject` if do not exist.


*Type*: `bool`

*Default*: `false`

=== `num_replicas`

The number of stream replicas, only supported in clustered mode and if the stream is created when `create_if_not_exists` is set to true. Defaults to 1, maximum is 5.


*Type*: `int`

*Default*: `1`

=== `storage_type`

Storage type to use when the stream does not exist and is created when `create_if_not_exists` is set to true. Can be `memory` or `file` storage.


*Type*: `string`

*Default*: `"memory"`

Options:
`memory`
, `file`
.

=== `nak_delay`

An optional delay duration on redelivering the messages when negatively acknowledged.


*Type*: `string`


```yml
# Examples

nak_delay: 1m
```

=== `nak_delay_until_header`

An optional header name on which will come a unix epoch timestamp in seconds until when the message delivery should be delayed. By default is `nak_delay_until`


*Type*: `string`

*Default*: `"nak_delay_until"`

=== `tls`

Custom TLS settings can be used to override system defaults.
Expand Down
142 changes: 124 additions & 18 deletions internal/impl/nats/input_jetstream.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (
"errors"
"fmt"
"strconv"
"strings"
"sync"
"time"

Expand Down Expand Up @@ -96,6 +97,27 @@ xref:configuration:interpolation.adoc#bloblang-queries[function interpolation].
Description("The maximum number of outstanding acks to be allowed before consuming is halted.").
Advanced().
Default(1024)).
Field(service.NewBoolField("create_if_not_exists").
Description("Create the `stream` and `subject` if do not exist.").
Advanced().
Default(false)).
Field(service.NewIntField("num_replicas").
Description("The number of stream replicas, only supported in clustered mode and if the stream is created when `create_if_not_exists` is set to true. Defaults to 1, maximum is 5.").
Advanced().
Default(1)).
Field(service.NewStringEnumField("storage_type", "memory", "file").
Description("Storage type to use when the stream does not exist and is created when `create_if_not_exists` is set to true. Can be `memory` or `file` storage.").
Advanced().
Default("memory")).
Field(service.NewDurationField("nak_delay").
Description("An optional delay duration on redelivering the messages when negatively acknowledged.").
Example("1m").
Advanced().
Optional()).
Field(service.NewStringField("nak_delay_until_header").
Description("An optional header name on which will come a unix epoch timestamp in seconds until when the message delivery should be delayed. By default is `nak_delay_until`").
Advanced().
Default("nak_delay_until")).
Fields(connectionTailFields()...).
Field(inputTracingDocs())
}
Expand All @@ -118,16 +140,21 @@ func init() {
//------------------------------------------------------------------------------

type jetStreamReader struct {
connDetails connectionDetails
deliverOpt nats.SubOpt
subject string
queue string
stream string
bind bool
pull bool
durable string
ackWait time.Duration
maxAckPending int
connDetails connectionDetails
deliverOpt nats.SubOpt
subject string
queue string
stream string
bind bool
pull bool
durable string
ackWait time.Duration
nakDelay time.Duration
nakDelayUntilHeader string
maxAckPending int
createIfNotExists bool
numReplicas int
storageType string

log *service.Logger

Expand Down Expand Up @@ -201,7 +228,7 @@ func newJetStreamReaderFromConfig(conf *service.ParsedConfig, mgr *service.Resou
}
} else {
if j.subject == "" && j.stream == "" {
return nil, errors.New("subject and stream is empty")
return nil, errors.New("subject and stream are empty")
}
}

Expand All @@ -216,6 +243,34 @@ func newJetStreamReaderFromConfig(conf *service.ParsedConfig, mgr *service.Resou
}
}

if conf.Contains("create_if_not_exists") {
if j.createIfNotExists, err = conf.FieldBool("create_if_not_exists"); err != nil {
return nil, err
}
}
if conf.Contains("num_replicas") {
if j.numReplicas, err = conf.FieldInt("num_replicas"); err != nil {
return nil, err
}
if j.numReplicas < 1 || j.numReplicas > 5 {
return nil, fmt.Errorf("num_replicas %d is invalid, it must be between 1 and 5", j.numReplicas)
}
}
if conf.Contains("storage_type") {
if j.storageType, err = conf.FieldString("storage_type"); err != nil {
return nil, err
}
}

if conf.Contains("nak_delay") {
if j.nakDelay, err = conf.FieldDuration("nak_delay"); err != nil {
return nil, err
}
}
if j.nakDelayUntilHeader, err = conf.FieldString("nak_delay_until_header"); err != nil {
return nil, err
}

if j.maxAckPending, err = conf.FieldInt("max_ack_pending"); err != nil {
return nil, err
}
Expand Down Expand Up @@ -304,7 +359,49 @@ func (j *jetStreamReader) Connect(ctx context.Context) (err error) {
natsSub, err = jCtx.QueueSubscribeSync(j.subject, j.queue, options...)
}
}

if err != nil {
if j.createIfNotExists {
var natsErr *nats.APIError
if errors.As(err, &natsErr) {
if natsErr.ErrorCode == nats.JSErrCodeStreamNotFound {
// create stream and subject
_, err = jCtx.AddStream(&nats.StreamConfig{
Name: j.stream,
Subjects: func() []string {
if j.subject == "" {
return nil
}
return []string{j.subject}
}(),
Storage: func() nats.StorageType {
if j.storageType == "file" {
return nats.FileStorage
}
return nats.MemoryStorage
}(),
Replicas: j.numReplicas,
})
}
} else if strings.Contains(err.Error(), "does not match consumer") {
// create subject on existent stream
_, err = jCtx.UpdateStream(&nats.StreamConfig{
Name: j.stream,
Subjects: func() []string {
if j.subject == "" {
return nil
}
return []string{j.subject}
}(),
Storage: func() nats.StorageType {
if j.storageType == "file" {
return nats.FileStorage
}
return nats.MemoryStorage
}(),
})
}
}
return err
}

Expand Down Expand Up @@ -334,14 +431,13 @@ func (j *jetStreamReader) Read(ctx context.Context) (*service.Message, service.A
if natsSub == nil {
return nil, nil, service.ErrNotConnected
}

if !j.pull {
nmsg, err := natsSub.NextMsgWithContext(ctx)
if err != nil {
// TODO: Any errors need capturing here to signal a lost connection?
return nil, nil, err
}
return convertMessage(nmsg)
return j.convertMessage(nmsg)
}

for {
Expand All @@ -362,7 +458,7 @@ func (j *jetStreamReader) Read(ctx context.Context) (*service.Message, service.A
if len(msgs) == 0 {
continue
}
return convertMessage(msgs[0])
return j.convertMessage(msgs[0])
}
}

Expand All @@ -379,7 +475,7 @@ func (j *jetStreamReader) Close(ctx context.Context) error {
return nil
}

func convertMessage(m *nats.Msg) (*service.Message, service.AckFunc, error) {
func (j *jetStreamReader) convertMessage(m *nats.Msg) (*service.Message, service.AckFunc, error) {
msg := service.NewMessage(m.Data)
msg.MetaSet("nats_subject", m.Subject)

Expand All @@ -401,9 +497,19 @@ func convertMessage(m *nats.Msg) (*service.Message, service.AckFunc, error) {
}

return msg, func(ctx context.Context, res error) error {
if res == nil {
return m.Ack()
if res != nil {
if val, ok := m.Header[j.nakDelayUntilHeader]; ok {
if unixTime, err := strconv.ParseInt(val[0], 10, 64); err != nil {
j.log.Warnf("error parsing unix epoch time from header %s: %s error: %v", j.nakDelayUntilHeader, val[0], err)
return m.Nak()
} else {
return m.NakWithDelay(time.Unix(unixTime, 0).Sub(time.Now().UTC()))
}
} else if j.nakDelay > 0 {
return m.NakWithDelay(j.nakDelay)
}
return m.Nak()
}
return m.Nak()
return m.Ack()
}, nil
}
Loading