Skip to content

Commit

Permalink
fix: Cancel monitoring task on block stream stop
Browse files Browse the repository at this point in the history
  • Loading branch information
morgsmccauley committed Jul 17, 2024
1 parent c814ff8 commit 1c9dc53
Showing 1 changed file with 10 additions and 7 deletions.
17 changes: 10 additions & 7 deletions block-streamer/src/block_stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,8 @@ impl<F: Future> Future for PollCounter<F> {
}

pub struct Task {
handle: JoinHandle<anyhow::Result<()>>,
stream_handle: JoinHandle<anyhow::Result<()>>,
monitor_handle: JoinHandle<()>,
cancellation_token: tokio_util::sync::CancellationToken,
}

Expand Down Expand Up @@ -119,7 +120,7 @@ impl BlockStream {
}
}

fn start_health_monitoring_task(&self, redis: Arc<RedisClient>) {
fn start_health_monitoring_task(&self, redis: Arc<RedisClient>) -> JoinHandle<()> {
tokio::spawn({
let config = self.indexer_config.clone();
let health = self.health.clone();
Expand Down Expand Up @@ -194,7 +195,7 @@ impl BlockStream {
last_processed_block = new_last_processed_block;
}
}
});
})
}

fn start_block_stream_task(
Expand Down Expand Up @@ -265,9 +266,9 @@ impl BlockStream {

let cancellation_token = tokio_util::sync::CancellationToken::new();

self.start_health_monitoring_task(redis.clone());
let monitor_handle = self.start_health_monitoring_task(redis.clone());

let handle = self.start_block_stream_task(
let stream_handle = self.start_block_stream_task(
start_block_height,
redis,
reciever_blocks_processor,
Expand All @@ -276,7 +277,8 @@ impl BlockStream {
);

self.task = Some(Task {
handle,
stream_handle,
monitor_handle,
cancellation_token,
});

Expand All @@ -285,8 +287,9 @@ impl BlockStream {

pub async fn cancel(&mut self) -> anyhow::Result<()> {
if let Some(task) = self.task.take() {
task.monitor_handle.abort();
task.cancellation_token.cancel();
let _ = task.handle.await?;
let _ = task.stream_handle.await?;

// Fails if metric doesn't exist, i.e. task was never polled
let _ = metrics::BLOCK_STREAM_UP
Expand Down

0 comments on commit 1c9dc53

Please sign in to comment.