From c230071f7ae282320c540cc7ca2d236df5f35940 Mon Sep 17 00:00:00 2001 From: Orson Peters <orsonpeters@gmail.com> Date: Tue, 22 Oct 2024 10:23:24 +0200 Subject: [PATCH] refactor(rust): Add tracking of async task wait time statistics (#19373) --- .../polars-stream/src/async_executor/mod.rs | 58 ++++++++++++++++++- crates/polars-stream/src/execute.rs | 13 ++++- crates/polars-stream/src/skeleton.rs | 12 ++++ 3 files changed, 78 insertions(+), 5 deletions(-) diff --git a/crates/polars-stream/src/async_executor/mod.rs b/crates/polars-stream/src/async_executor/mod.rs index dec560845b09..243109e5facb 100644 --- a/crates/polars-stream/src/async_executor/mod.rs +++ b/crates/polars-stream/src/async_executor/mod.rs @@ -1,12 +1,16 @@ +#![allow(clippy::disallowed_types)] + mod park_group; mod task; use std::cell::{Cell, UnsafeCell}; +use std::collections::HashMap; use std::future::Future; use std::marker::PhantomData; -use std::panic::AssertUnwindSafe; -use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering}; -use std::sync::{Arc, OnceLock, Weak}; +use std::panic::{AssertUnwindSafe, Location}; +use std::sync::atomic::{AtomicBool, AtomicU64, AtomicUsize, Ordering}; +use std::sync::{Arc, LazyLock, OnceLock, Weak}; +use std::time::Duration; use crossbeam_deque::{Injector, Steal, Stealer, Worker as WorkQueue}; use crossbeam_utils::CachePadded; @@ -30,6 +34,27 @@ thread_local!( static TLS_THREAD_ID: Cell<usize> = const { Cell::new(usize::MAX) }; ); +static NS_SPENT_BLOCKED: LazyLock<Mutex<HashMap<&'static Location<'static>, u64>>> = + LazyLock::new(Mutex::default); + +static TRACK_WAIT_STATISTICS: AtomicBool = AtomicBool::new(false); + +pub fn track_task_wait_statistics(should_track: bool) { + TRACK_WAIT_STATISTICS.store(should_track, Ordering::Relaxed); +} + +pub fn get_task_wait_statistics() -> Vec<(&'static Location<'static>, Duration)> { + NS_SPENT_BLOCKED + .lock() + .iter() + .map(|(l, ns)| (*l, Duration::from_nanos(*ns))) + .collect() +} + +pub fn clear_task_wait_statistics() { + NS_SPENT_BLOCKED.lock().clear() +} + slotmap::new_key_type!
{ struct TaskKey; } @@ -48,6 +73,8 @@ struct ScopedTaskMetadata { } struct TaskMetadata { + spawn_location: &'static Location<'static>, + ns_spent_blocked: AtomicU64, priority: TaskPriority, freshly_spawned: AtomicBool, scoped: Option<ScopedTaskMetadata>, } @@ -55,6 +82,10 @@ impl Drop for TaskMetadata { fn drop(&mut self) { + *NS_SPENT_BLOCKED + .lock() + .entry(self.spawn_location) + .or_default() += self.ns_spent_blocked.load(Ordering::Relaxed); if let Some(scoped) = &self.scoped { if let Some(completed_tasks) = scoped.completed_tasks.upgrade() { completed_tasks.lock().push(scoped.task_key); @@ -182,6 +213,7 @@ impl Executor { let mut rng = SmallRng::from_rng(&mut rand::thread_rng()).unwrap(); let mut worker = self.park_group.new_worker(); + let mut last_block_start = None; loop { let ttl = &self.thread_task_lists[thread]; @@ -206,11 +238,23 @@ impl Executor { if let Some(task) = self.try_steal_task(thread, &mut rng) { return Some(task); } + + if last_block_start.is_none() && TRACK_WAIT_STATISTICS.load(Ordering::Relaxed) { + last_block_start = Some(std::time::Instant::now()); + } park.park(); None })(); if let Some(task) = task { + if let Some(t) = last_block_start.take() { + if TRACK_WAIT_STATISTICS.load(Ordering::Relaxed) { + let ns: u64 = t.elapsed().as_nanos().try_into().unwrap(); + task.metadata() + .ns_spent_blocked + .fetch_add(ns, Ordering::Relaxed); + } + } worker.recruit_next(); task.run(); } @@ -280,6 +324,7 @@ impl<'scope, 'env> TaskScope<'scope, 'env> { } } + #[track_caller] pub fn spawn_task<F: Future + Send + 'scope>( &self, priority: TaskPriority, @@ -288,6 +333,7 @@ where <F as Future>::Output: Send + 'static, { + let spawn_location = Location::caller(); self.clear_completed_tasks(); let mut runnable = None; @@ -301,6 +347,8 @@ impl<'scope, 'env> TaskScope<'scope, 'env> { fut, on_wake, TaskMetadata { + spawn_location, + ns_spent_blocked: AtomicU64::new(0), priority, freshly_spawned: AtomicBool::new(true), scoped: Some(ScopedTaskMetadata {
@@ -345,16 +393,20 @@ where } } +#[track_caller] pub fn spawn<F: Future + Send + 'static>(priority: TaskPriority, fut: F) -> JoinHandle<F::Output> where <F as Future>::Output: Send + 'static, { + let spawn_location = Location::caller(); let executor = Executor::global(); let on_wake = move |task| executor.schedule_task(task); let (runnable, join_handle) = task::spawn( fut, on_wake, TaskMetadata { + spawn_location, + ns_spent_blocked: AtomicU64::new(0), priority, freshly_spawned: AtomicBool::new(true), scoped: None, diff --git a/crates/polars-stream/src/execute.rs b/crates/polars-stream/src/execute.rs index b199c0044e92..2d68cae2c90e 100644 --- a/crates/polars-stream/src/execute.rs +++ b/crates/polars-stream/src/execute.rs @@ -220,12 +220,21 @@ fn run_subgraph( } // Wait until all tasks are done. - polars_io::pl_async::get_runtime().block_on(async move { + // Only now do we turn on/off wait statistics tracking to reduce noise + // from task startup. + if std::env::var("POLARS_TRACK_WAIT_STATS").as_deref() == Ok("1") { + async_executor::track_task_wait_statistics(true); + } + let ret = polars_io::pl_async::get_runtime().block_on(async move { for handle in join_handles { handle.await?; } PolarsResult::Ok(()) - }) + }); + if std::env::var("POLARS_TRACK_WAIT_STATS").as_deref() == Ok("1") { + async_executor::track_task_wait_statistics(false); + } + ret })?; Ok(()) diff --git a/crates/polars-stream/src/skeleton.rs b/crates/polars-stream/src/skeleton.rs index 20ca189de9e0..cae0c97886f4 100644 --- a/crates/polars-stream/src/skeleton.rs +++ b/crates/polars-stream/src/skeleton.rs @@ -1,4 +1,6 @@ #![allow(unused)] // TODO: remove me +use std::cmp::Reverse; + use polars_core::prelude::*; use polars_core::POOL; use polars_expr::planner::{create_physical_expr, get_expr_depth_limit, ExpressionConversionState}; @@ -35,6 +37,16 @@ pub fn run_query( } let (mut graph, phys_to_graph) = crate::physical_plan::physical_plan_to_graph(root, &phys_sm, expr_arena)?; + crate::async_executor::clear_task_wait_statistics(); let mut results =
crate::execute::execute_graph(&mut graph)?; + if std::env::var("POLARS_TRACK_WAIT_STATS").as_deref() == Ok("1") { + let mut stats = crate::async_executor::get_task_wait_statistics(); + stats.sort_by_key(|(_l, w)| Reverse(*w)); + eprintln!("Time spent waiting for async tasks:"); + for (loc, wait_time) in stats { + eprintln!("{}:{} - {:?}", loc.file(), loc.line(), wait_time); + } + } + Ok(results.remove(phys_to_graph[root]).unwrap()) }