Skip to content

Commit

Permalink
Abort join set once callback duration limit is exceeded
Browse files Browse the repository at this point in the history
  • Loading branch information
john-z-yang committed Oct 24, 2024
1 parent 9ea2310 commit 451071e
Showing 1 changed file with 21 additions and 26 deletions.
47 changes: 21 additions & 26 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -266,57 +266,52 @@ pub async fn handle_events(
(ConsumerState::Consuming { .. }, Event::Assign(_)) => {
unreachable!("Got partition assignment after consumer has started")
}
(ConsumerState::Consuming((_, shutdown_actors, mut rendezvous)), Event::Revoke(_)) => {
(
ConsumerState::Consuming((mut join_set, shutdown_actors, mut rendezvous)),
Event::Revoke(_),
) => {
debug!("Signaling shutdown to actors...");
shutdown_actors.cancel();
info!("Actor shutdown signaled, waiting for rendezvous...");

select! {
_ = &mut rendezvous => {
info!(
"Rendezvous complete within callback deadline,\
transitioning consumer state to Ready"
);
ConsumerState::Ready
info!("Rendezvous complete within callback deadline.");
}
_ = sleep(CALLBACK_DURATION) => {
debug!(
error!(
"Unable to rendezvous within callback deadline, \
transitioning consumer state to Draining"
);
todo!(
"schedule a drain deadline here, \
poll it in the select arm, evaluate to ConsumerState::Draining"
aborting all tasks within JoinSet"
);
join_set.abort_all();
}
}
debug!("Transitioning consumer state to Ready");
ConsumerState::Ready
}
(ConsumerState::Consuming((_, shutdown_actors, mut rendezvous)), Event::Shutdown) => {
(
ConsumerState::Consuming((mut join_set, shutdown_actors, mut rendezvous)),
Event::Shutdown,
) => {
debug!("Signaling shutdown to actors...");
shutdown_actors.cancel();
info!("Actor shutdown signaled, waiting for rendezvous...");

select! {
_ = &mut rendezvous => {
info!(
"Rendezvous complete within callback deadline, \
transitioning consumer state to Stopped"
);
debug!("Signaling shutdown to client...");
shutdown_client.take();
ConsumerState::Stopped
info!("Rendezvous complete within callback deadline.");
}
_ = sleep(CALLBACK_DURATION) => {
debug!(
error!(
"Unable to rendezvous within callback deadline, \
transitioning consumer state to Closing"
);
todo!(
"schedule a drain deadline here, \
poll it in the select arm, evaluate to ConsumerState::Closing"
aborting all tasks within JoinSet"
);
join_set.abort_all();
}
}
debug!("Signaling shutdown to client...");
shutdown_client.take();
ConsumerState::Stopped
}
(ConsumerState::Stopped, _) => {
unreachable!("Got event after consumer has stopped")
Expand Down

0 comments on commit 451071e

Please sign in to comment.