Skip to content

Commit

Permalink
fix: Dont fail when attempting to remove non-existant up metric
Browse files Browse the repository at this point in the history
  • Loading branch information
morgsmccauley committed Jun 27, 2024
1 parent 0728655 commit a185c02
Show file tree
Hide file tree
Showing 2 changed files with 14 additions and 7 deletions.
5 changes: 3 additions & 2 deletions block-streamer/src/block_stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -148,8 +148,9 @@ impl BlockStream {
task.cancellation_token.cancel();
let _ = task.handle.await?;

metrics::BLOCK_STREAM_UP
.remove_label_values(&[&self.indexer_config.get_full_name()])?;
// Fails if metric doesn't exist, i.e. task was never polled
let _ = metrics::BLOCK_STREAM_UP
.remove_label_values(&[&self.indexer_config.get_full_name()]);

return Ok(());
}
Expand Down
16 changes: 11 additions & 5 deletions block-streamer/src/server/block_streamer_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@ impl BlockStreamerService {

#[tonic::async_trait]
impl blockstreamer::block_streamer_server::BlockStreamer for BlockStreamerService {
#[tracing::instrument(skip(self))]
async fn start_stream(
&self,
request: Request<blockstreamer::StartStreamRequest>,
Expand Down Expand Up @@ -117,7 +118,11 @@ impl blockstreamer::block_streamer_server::BlockStreamer for BlockStreamerServic
self.delta_lake_client.clone(),
self.lake_s3_client.clone(),
)
.map_err(|_| Status::internal("Failed to start block stream"))?;
.map_err(|err| {
tracing::error!(?err, "Failed to start block stream");

Status::internal("Failed to start block stream")
})?;

let mut lock = self.get_block_streams_lock()?;
lock.insert(indexer_config.get_hash_id(), block_stream);
Expand All @@ -127,6 +132,7 @@ impl blockstreamer::block_streamer_server::BlockStreamer for BlockStreamerServic
}))
}

#[tracing::instrument(skip(self))]
async fn stop_stream(
&self,
request: Request<blockstreamer::StopStreamRequest>,
Expand All @@ -148,10 +154,10 @@ impl blockstreamer::block_streamer_server::BlockStreamer for BlockStreamerServic
)))
}
Some(mut block_stream) => {
block_stream
.cancel()
.await
.map_err(|_| Status::internal("Failed to cancel block stream"))?;
block_stream.cancel().await.map_err(|err| {
tracing::error!(?err, "Failed to cancel block stream");
Status::internal("Failed to cancel block stream")
})?;
}
}

Expand Down

0 comments on commit a185c02

Please sign in to comment.