Skip to content

Commit

Permalink
feat: add pause consumer
Browse files Browse the repository at this point in the history
  • Loading branch information
yordis committed Mar 14, 2024
1 parent 177c4c7 commit 7738d8f
Show file tree
Hide file tree
Showing 2 changed files with 89 additions and 0 deletions.
70 changes: 70 additions & 0 deletions nats/src/jetstream/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1764,6 +1764,76 @@ impl JetStream {
.map(|dr| dr.success)
}

/// Pause a `JetStream` consumer until the given time.
pub fn pause_consumer<S, C>(&self, stream: S, consumer: C, pause_until: DateTime) -> io::Result<PauseResponse>
where
S: AsRef<str>,
C: AsRef<str>,
{
let stream = stream.as_ref();
if stream.is_empty() {
return Err(io::Error::new(
ErrorKind::InvalidInput,
"the stream name must not be empty",
));
}
let consumer = consumer.as_ref();
if consumer.is_empty() {
return Err(io::Error::new(
ErrorKind::InvalidInput,
"the consumer name must not be empty",
));
}

let subject = format!(
"{}CONSUMER.PAUSE.{}.{}",
self.api_prefix(),
stream,
consumer
);

let req = serde_json::ser::to_vec(&PauseConsumerRequest {
pause_until: Some(pause_until),
})?;

self.js_request::<PauseResponse>(&subject, &req)
}

/// Resume a `JetStream` consumer.
pub fn resume_consumer<S, C>(&self, stream: S, consumer: C) -> io::Result<PauseResponse>
where
S: AsRef<str>,
C: AsRef<str>,
{
let stream = stream.as_ref();
if stream.is_empty() {
return Err(io::Error::new(
ErrorKind::InvalidInput,
"the stream name must not be empty",
));
}
let consumer = consumer.as_ref();
if consumer.is_empty() {
return Err(io::Error::new(
ErrorKind::InvalidInput,
"the consumer name must not be empty",
));
}

let subject = format!(
"{}CONSUMER.PAUSE.{}.{}",
self.api_prefix(),
stream,
consumer
);

let req = serde_json::ser::to_vec(&PauseConsumerRequest {
pause_until: None,
})?;

self.js_request::<PauseResponse>(&subject, &req)
}

/// Query `JetStream` consumer information.
pub fn consumer_info<S, C>(&self, stream: S, consumer: C) -> io::Result<ConsumerInfo>
where
Expand Down
19 changes: 19 additions & 0 deletions nats/src/jetstream/types.rs
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,18 @@ pub(crate) struct DeleteResponse {
pub success: bool,
}

#[derive(Deserialize)]
pub(crate) struct PauseResponse {
pub paused: bool,
pub pause_until: Option<DateTime>,
pub pause_remaining: Option<Duration>,
}

#[derive(Debug, Default, Serialize, Deserialize)]
pub(crate) struct PauseConsumerRequest {
pub pause_until: Option<DateTime>,
}

#[derive(Debug, Default, Serialize, Deserialize)]
pub(crate) struct CreateConsumerRequest {
pub stream_name: String,
Expand Down Expand Up @@ -252,6 +264,8 @@ pub struct ConsumerConfig {
/// Threshold for ephemeral consumer inactivity
#[serde(default, with = "serde_nanos", skip_serializing_if = "is_default")]
pub inactive_threshold: Duration,
/// PauseUntil is for suspending the consumer until the deadline.
pub pause_until: Option<DateTime>,
}

pub(crate) enum ConsumerKind {
Expand Down Expand Up @@ -680,6 +694,11 @@ pub struct ConsumerInfo {
/// Indicates if any client is connected and receiving messages from a push consumer
#[serde(default)]
pub push_bound: bool,
/// Paused indicates whether the consumer is paused.
pub paused: bool,
/// PauseRemaining contains the amount of time left until the consumer unpauses. It will only
/// be non-zero if the consumer is currently paused.
pub pause_remaining: Option<Duration>,
}

/// Information about the stream's, consumer's associated `JetStream` cluster
Expand Down

0 comments on commit 7738d8f

Please sign in to comment.