-
Notifications
You must be signed in to change notification settings - Fork 154
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Implement Consumers Pause #1093
Conversation
a59421e
to
d8a3b85
Compare
Waiting for a fix to nats-io/nats-server#5163. Currently fails the consumer info check that's been added in the last commit. EDIT: Fix has been merged, the tests now pass 🎉 |
10ceba8
to
039279a
Compare
Signed-off-by: Maurice van Veen <[email protected]>
Signed-off-by: Maurice van Veen <[email protected]>
Signed-off-by: Maurice van Veen <[email protected]>
Signed-off-by: Maurice van Veen <[email protected]>
Signed-off-by: Maurice van Veen <[email protected]>
Signed-off-by: Maurice van Veen <[email protected]>
039279a
to
68b6121
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Looks great, just some minor changes. Also can you add this to the example:
// 8. Try to pause a consumer that does not exist and you will get an exception
System.out.println("\n----------\n8. Pause non-existent consumer .");
try
{
jsm.pauseConsumer(exArgs.stream, durable1, ZonedDateTime.now());
}
catch (JetStreamApiException e)
{
System.out.println("Exception was: '" + e.getMessage() + "'");
}
* server such as timeout or interruption | ||
* @throws JetStreamApiException the request had an error related to the data, for instance the consumer does not exist. | ||
*/ | ||
ConsumerPauseResponse pauseConsumer(String streamName, String consumerName, ZonedDateTime pauseUntil) throws IOException, JetStreamApiException; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think there should be two api here, one that does not take pauseUntil, since it is not required.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Do you mean having three methods in the end:
pauseConsumer(streamName, consumerName, pauseUntil)
pauseConsumer(streamName, consumerName)
resumeConsumer(streamName, consumerName)
When not sending a pauseUntil
, the consumer does not get paused and should be equivalent to calling resumeConsumer
instead. So pauseConsumer
is used to pause a consumer until the set date, and resumeConsumer
resumes it. Could you explain why you'd expect two api here?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Sorry, I thought no pauseUntil meant pause until resume. So just ignore this comment.
"pauseUntil='" + pauseUntil + "'" + | ||
'}'; | ||
} | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Please add a test for this, otherwise coverage goes down. Usually I just assign it and do something like contains("true"). Or just remove it since someone could call toJson
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Have removed it
Signed-off-by: Maurice van Veen <[email protected]>
Signed-off-by: Maurice van Veen <[email protected]>
Signed-off-by: Maurice van Veen <[email protected]>
Signed-off-by: Maurice van Veen <[email protected]>
Signed-off-by: Maurice van Veen <[email protected]>
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM
Resolves #1084
Adds support for pausing and resuming consumers.
JetStreamManagement
now has methods tojsm.pauseConsumer(stream, consumer, pauseUntil)
, as well asjsm.resumeConsumer(stream, consumer)
.src/examples
.ConsumerConfiguration
/builder now has support for settingpauseUntil
as well. Allowing to set it during consumer creation.An optional addition (not implemented now) could be: