Skip to content

Commit

Permalink
ref(rust): Add join-time metric for threadpools (#362)
Browse files Browse the repository at this point in the history
We suspect that joining a threadpool takes most of the time when
rebalancing.
  • Loading branch information
untitaker authored May 15, 2024
1 parent 8a90458 commit 78de503
Showing 1 changed file with 9 additions and 2 deletions.
11 changes: 9 additions & 2 deletions rust-arroyo/src/processing/strategies/run_task_in_threads.rs
Original file line number Diff line number Diff line change
@@ -1,18 +1,18 @@
use std::collections::VecDeque;
use std::future::Future;
use std::pin::Pin;
use std::time::Duration;
use std::time::{Duration, Instant};

use tokio::runtime::{Handle, Runtime};
use tokio::task::JoinHandle;

use crate::gauge;
use crate::processing::strategies::{
merge_commit_request, CommitRequest, InvalidMessage, MessageRejected, ProcessingStrategy,
SubmitError,
};
use crate::types::Message;
use crate::utils::timing::Deadline;
use crate::{gauge, timer};

use super::StrategyError;

Expand Down Expand Up @@ -218,6 +218,7 @@ where

fn join(&mut self, timeout: Option<Duration>) -> Result<Option<CommitRequest>, StrategyError> {
let deadline = timeout.map(Deadline::new);
let start = Instant::now();

// Poll until there are no more messages or timeout is hit
while self.message_carried_over.is_some() || !self.handles.is_empty() {
Expand All @@ -240,6 +241,12 @@ where
}
self.handles.clear();

timer!(
"arroyo.strategies.run_task_in_threads.join_time",
start.elapsed(),
"strategy_name" => self.metric_strategy_name
);

let next_commit = self.next_step.join(deadline.map(|d| d.remaining()))?;

Ok(merge_commit_request(
Expand Down

0 comments on commit 78de503

Please sign in to comment.