-
Notifications
You must be signed in to change notification settings - Fork 219
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
add ability to use PullConsumers with NATS jetstream with a specified callback #1075
add ability to use PullConsumers with NATS jetstream with a specified callback #1075
Conversation
if err != nil || msgs == nil { | ||
continue | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Not sure I'm a fan of just dropping the error message here, unless it is logged internall before it is called?
Also there's no need for the nil check on msgs
- range
over a nil slice is a valid operation and just skips.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
returning error to calling function.
continue | ||
} | ||
for i := range msgs { | ||
msg := msgs[i] |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I don't think an assignment is needed here given MsgHandler
is synchonous?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
removed assignment
9212db4
to
bd93dc5
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'm not a NATS expert, but IMHO this PR is using a legacy API:
// NOTE: JetStream is part of legacy API.
// Users are encouraged to switch to the new JetStream API for enhanced capabilities and
// simplified API. Please refer to thejetstream
package.
// See: https://github.com/nats-io/nats.go/blob/main/jetstream/README.md
I like the idea of pull-based subscriptions to give more control, but IMHO we should switch to the newer API and clean up the whole code base of this implementation to avoid setting up both, push and pull-based consumers. This can also simplify the current proposal, removing having two similar for select
implementations as proposed in this PR.
@embano1 Using the new api is a good idea. The old api was already in use and it seemed I would have to version the library because many of the exposed options are different. Or I would have to do some hacky translation of the old option to the new option. There are several slight issues I have with the current implementation that I believe using the new api might benefit. |
Do you think it would be possible to use the new NATS API/library without breaking CloudEvents users? We're currently doing a similar migration in AMQP and while we're still discussing how to avoid small breaking changes (there might be some, but minor), I think we could do the same with NATS incrementally and not break users? |
@stephen-totty-hpe ping |
… callback Signed-off-by: stephen-totty-hpe <[email protected]>
bd93dc5
to
46f3bf8
Compare
I will close this PR if #1095 gets merged. |
In many cases, the reciever of a message might want some ability to exercise control over the fetching. Also, with NATS, there are options inside the NATS server that might be beneficial to use a pull-based consumer.