Skip to content

Commit

Permalink
refactor(rust): Add tracking of async task wait time statistics (#19373)
Browse files Browse the repository at this point in the history
  • Loading branch information
orlp authored Oct 22, 2024
1 parent c731d11 commit c230071
Show file tree
Hide file tree
Showing 3 changed files with 78 additions and 5 deletions.
58 changes: 55 additions & 3 deletions crates/polars-stream/src/async_executor/mod.rs
Original file line number Diff line number Diff line change
@@ -1,12 +1,16 @@
#![allow(clippy::disallowed_types)]

mod park_group;
mod task;

use std::cell::{Cell, UnsafeCell};
use std::collections::HashMap;
use std::future::Future;
use std::marker::PhantomData;
use std::panic::AssertUnwindSafe;
use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
use std::sync::{Arc, OnceLock, Weak};
use std::panic::{AssertUnwindSafe, Location};
use std::sync::atomic::{AtomicBool, AtomicU64, AtomicUsize, Ordering};
use std::sync::{Arc, LazyLock, OnceLock, Weak};
use std::time::Duration;

use crossbeam_deque::{Injector, Steal, Stealer, Worker as WorkQueue};
use crossbeam_utils::CachePadded;
Expand All @@ -30,6 +34,27 @@ thread_local!(
static TLS_THREAD_ID: Cell<usize> = const { Cell::new(usize::MAX) };
);

// Global table of accumulated wait time, in nanoseconds, keyed by the source
// location at which a task was spawned. Entries are added to when a
// `TaskMetadata` is dropped, read out by `get_task_wait_statistics`, and
// wiped by `clear_task_wait_statistics`. Lazily initialised to an empty map.
static NS_SPENT_BLOCKED: LazyLock<Mutex<HashMap<&'static Location<'static>, u64>>> =
LazyLock::new(Mutex::default);

/// Global on/off switch for wait-time measurement. Starts out disabled.
///
/// Executor worker threads consult this flag before starting a wait-time
/// measurement and again before charging the measured time to a task.
static TRACK_WAIT_STATISTICS: AtomicBool = AtomicBool::new(false);

/// Enables (`true`) or disables (`false`) tracking of async task wait time.
///
/// The flag is stored with relaxed ordering, so it is only a hint: threads
/// observe the new value at some later point and no other memory is
/// synchronised by the store.
pub fn track_task_wait_statistics(should_track: bool) {
    let tracking_flag = &TRACK_WAIT_STATISTICS;
    tracking_flag.store(should_track, Ordering::Relaxed);
}

/// Returns a snapshot of the accumulated wait time per task spawn location.
///
/// Each element pairs the source location where a task was spawned with the
/// total time recorded for that location, converted from nanoseconds into a
/// [`Duration`]. The order of the returned entries follows the iteration order
/// of the underlying hash map and is therefore unspecified; callers that need
/// a particular order must sort the result themselves.
pub fn get_task_wait_statistics() -> Vec<(&'static Location<'static>, Duration)> {
    // Hold the lock for the duration of the copy so the snapshot is consistent.
    let blocked = NS_SPENT_BLOCKED.lock();
    let mut snapshot = Vec::with_capacity(blocked.len());
    for (&location, &nanos) in blocked.iter() {
        snapshot.push((location, Duration::from_nanos(nanos)));
    }
    snapshot
}

/// Discards every wait-time entry recorded so far.
///
/// Only the global per-location table is emptied; time already charged to a
/// task that has not been dropped yet is unaffected and will be added to the
/// table when that task's metadata is dropped.
pub fn clear_task_wait_statistics() {
    let mut blocked = NS_SPENT_BLOCKED.lock();
    blocked.clear();
}

// Declares `TaskKey`, a dedicated slotmap key type. A scoped task carries one
// of these in its metadata and pushes it onto its scope's completed-task list
// when that metadata is dropped.
slotmap::new_key_type! {
struct TaskKey;
}
Expand All @@ -48,13 +73,19 @@ struct ScopedTaskMetadata {
}

/// Per-task bookkeeping attached to every task handed to the executor.
struct TaskMetadata {
/// Source location of the `spawn` / `spawn_task` call that created the task,
/// captured with `Location::caller()`. Used as the key under which this
/// task's wait time is reported.
spawn_location: &'static Location<'static>,
/// Nanoseconds of wait time charged to this task so far. Starts at zero and
/// is added to the global per-location total when the metadata is dropped.
ns_spent_blocked: AtomicU64,
/// Priority requested by the spawner.
priority: TaskPriority,
/// Initialised to `true` when the task is spawned.
/// NOTE(review): presumably cleared once the task has been scheduled for
/// the first time — confirm against the scheduling code.
freshly_spawned: AtomicBool,
/// `Some` for tasks spawned through `TaskScope::spawn_task`, `None` for tasks
/// created by the free-standing `spawn`.
scoped: Option<ScopedTaskMetadata>,
}

impl Drop for TaskMetadata {
fn drop(&mut self) {
*NS_SPENT_BLOCKED
.lock()
.entry(self.spawn_location)
.or_default() += self.ns_spent_blocked.load(Ordering::Relaxed);
if let Some(scoped) = &self.scoped {
if let Some(completed_tasks) = scoped.completed_tasks.upgrade() {
completed_tasks.lock().push(scoped.task_key);
Expand Down Expand Up @@ -182,6 +213,7 @@ impl Executor {

let mut rng = SmallRng::from_rng(&mut rand::thread_rng()).unwrap();
let mut worker = self.park_group.new_worker();
let mut last_block_start = None;

loop {
let ttl = &self.thread_task_lists[thread];
Expand All @@ -206,11 +238,23 @@ impl Executor {
if let Some(task) = self.try_steal_task(thread, &mut rng) {
return Some(task);
}

if last_block_start.is_none() && TRACK_WAIT_STATISTICS.load(Ordering::Relaxed) {
last_block_start = Some(std::time::Instant::now());
}
park.park();
None
})();

if let Some(task) = task {
if let Some(t) = last_block_start.take() {
if TRACK_WAIT_STATISTICS.load(Ordering::Relaxed) {
let ns: u64 = t.elapsed().as_nanos().try_into().unwrap();
task.metadata()
.ns_spent_blocked
.fetch_add(ns, Ordering::Relaxed);
}
}
worker.recruit_next();
task.run();
}
Expand Down Expand Up @@ -280,6 +324,7 @@ impl<'scope, 'env> TaskScope<'scope, 'env> {
}
}

#[track_caller]
pub fn spawn_task<F: Future + Send + 'scope>(
&self,
priority: TaskPriority,
Expand All @@ -288,6 +333,7 @@ impl<'scope, 'env> TaskScope<'scope, 'env> {
where
<F as Future>::Output: Send + 'static,
{
let spawn_location = Location::caller();
self.clear_completed_tasks();

let mut runnable = None;
Expand All @@ -301,6 +347,8 @@ impl<'scope, 'env> TaskScope<'scope, 'env> {
fut,
on_wake,
TaskMetadata {
spawn_location,
ns_spent_blocked: AtomicU64::new(0),
priority,
freshly_spawned: AtomicBool::new(true),
scoped: Some(ScopedTaskMetadata {
Expand Down Expand Up @@ -345,16 +393,20 @@ where
}
}

#[track_caller]
pub fn spawn<F: Future + Send + 'static>(priority: TaskPriority, fut: F) -> JoinHandle<F::Output>
where
<F as Future>::Output: Send + 'static,
{
let spawn_location = Location::caller();
let executor = Executor::global();
let on_wake = move |task| executor.schedule_task(task);
let (runnable, join_handle) = task::spawn(
fut,
on_wake,
TaskMetadata {
spawn_location,
ns_spent_blocked: AtomicU64::new(0),
priority,
freshly_spawned: AtomicBool::new(true),
scoped: None,
Expand Down
13 changes: 11 additions & 2 deletions crates/polars-stream/src/execute.rs
Original file line number Diff line number Diff line change
Expand Up @@ -220,12 +220,21 @@ fn run_subgraph(
}

// Wait until all tasks are done.
polars_io::pl_async::get_runtime().block_on(async move {
// Only now do we turn on/off wait statistics tracking to reduce noise
// from task startup.
if std::env::var("POLARS_TRACK_WAIT_STATS").as_deref() == Ok("1") {
async_executor::track_task_wait_statistics(true);
}
let ret = polars_io::pl_async::get_runtime().block_on(async move {
for handle in join_handles {
handle.await?;
}
PolarsResult::Ok(())
})
});
if std::env::var("POLARS_TRACK_WAIT_STATS").as_deref() == Ok("1") {
async_executor::track_task_wait_statistics(false);
}
ret
})?;

Ok(())
Expand Down
12 changes: 12 additions & 0 deletions crates/polars-stream/src/skeleton.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
#![allow(unused)] // TODO: remove me
use std::cmp::Reverse;

use polars_core::prelude::*;
use polars_core::POOL;
use polars_expr::planner::{create_physical_expr, get_expr_depth_limit, ExpressionConversionState};
Expand Down Expand Up @@ -35,6 +37,16 @@ pub fn run_query(
}
let (mut graph, phys_to_graph) =
crate::physical_plan::physical_plan_to_graph(root, &phys_sm, expr_arena)?;
crate::async_executor::clear_task_wait_statistics();
let mut results = crate::execute::execute_graph(&mut graph)?;
if std::env::var("POLARS_TRACK_WAIT_STATS").as_deref() == Ok("1") {
let mut stats = crate::async_executor::get_task_wait_statistics();
stats.sort_by_key(|(_l, w)| Reverse(*w));
eprintln!("Time spent waiting for async tasks:");
for (loc, wait_time) in stats {
eprintln!("{}:{} - {:?}", loc.file(), loc.line(), wait_time);
}
}

Ok(results.remove(phys_to_graph[root]).unwrap())
}

0 comments on commit c230071

Please sign in to comment.