From 943ddcd37e581242abd1b0eaeb9be4208ea65f8b Mon Sep 17 00:00:00 2001 From: Emile Fugulin Date: Thu, 9 May 2024 14:22:17 -0400 Subject: [PATCH 1/2] Add a builder for arbiter --- actix-rt/CHANGES.md | 2 + actix-rt/src/arbiter.rs | 138 ++++++++++++++++++++++++++++++++-------- actix-rt/src/lib.rs | 2 +- actix-rt/tests/tests.rs | 22 +++++++ 4 files changed, 138 insertions(+), 26 deletions(-) diff --git a/actix-rt/CHANGES.md b/actix-rt/CHANGES.md index 18466fa6f..4aadc419a 100644 --- a/actix-rt/CHANGES.md +++ b/actix-rt/CHANGES.md @@ -2,6 +2,8 @@ ## Unreleased +- Add `actix_rt::ArbiterBuilder` to allow user to configure the thread spawned for the arbiter. + ## 2.10.0 - Relax `F`'s bound (`Fn => FnOnce`) on `{Arbiter, System}::with_tokio_rt()` functions. diff --git a/actix-rt/src/arbiter.rs b/actix-rt/src/arbiter.rs index 1da76c525..574fd6f10 100644 --- a/actix-rt/src/arbiter.rs +++ b/actix-rt/src/arbiter.rs @@ -80,42 +80,79 @@ impl ArbiterHandle { } } -/// An Arbiter represents a thread that provides an asynchronous execution environment for futures -/// and functions. -/// -/// When an arbiter is created, it spawns a new [OS thread](thread), and hosts an event loop. -#[derive(Debug)] -pub struct Arbiter { - tx: mpsc::UnboundedSender, - thread_handle: thread::JoinHandle<()>, +/// A builder for configuring and spawning a new [Arbiter] thread. +pub struct ArbiterBuilder { + name_factory: Option String + 'static>>, + #[cfg(not(all(target_os = "linux", feature = "io-uring")))] + runtime_factory: Option tokio::runtime::Runtime + Send + 'static>>, } -impl Arbiter { - /// Spawn a new Arbiter thread and start its event loop. - /// - /// # Panics - /// Panics if a [System] is not registered on the current thread. - #[cfg(not(all(target_os = "linux", feature = "io-uring")))] +impl ArbiterBuilder { + /// Create a new [ArbiterBuilder]. #[allow(clippy::new_without_default)] - pub fn new() -> Arbiter { - Self::with_tokio_rt(|| { - crate::runtime::default_tokio_runtime().expect("Cannot create new Arbiter's Runtime.") - }) + pub fn new() -> Self { + Self { + name_factory: None, + #[cfg(not(all(target_os = "linux", feature = "io-uring")))] + runtime_factory: None, + } } - /// Spawn a new Arbiter using the [Tokio Runtime](tokio-runtime) returned from a closure. + /// Specify a factory function for generating the name of the Arbiter thread. + /// + /// Defaults to `actix-rt|system:|arbiter:` + /// + /// # Example + /// + /// ```no_run + /// let _ = actix_rt::System::new(); + /// actix_rt::ArbiterBuilder::new() + /// .name(|system_id, arb_id| { + /// format!("some-prefix|system:{}|arbiter:{}", system_id, arb_id) + /// }) + /// .build(); + /// ``` + pub fn name(mut self, name_factory: N) -> Self + where + N: Fn(usize, usize) -> String + 'static, + { + self.name_factory = Some(Box::new(name_factory)); + self + } + + /// Specify a factory function for generating the [Tokio Runtime](tokio-runtime) used by the Arbiter. /// /// [tokio-runtime]: tokio::runtime::Runtime #[cfg(not(all(target_os = "linux", feature = "io-uring")))] - pub fn with_tokio_rt(runtime_factory: F) -> Arbiter + pub fn runtime(mut self, runtime_factory: R) -> Self where - F: FnOnce() -> tokio::runtime::Runtime + Send + 'static, + R: FnOnce() -> tokio::runtime::Runtime + Send + 'static, { + self.runtime_factory = Some(Box::new(runtime_factory)); + self + } + + /// Spawn a new Arbiter thread and start its event loop. + /// + /// # Panics + /// Panics if a [System] is not registered on the current thread. + #[cfg(not(all(target_os = "linux", feature = "io-uring")))] + pub fn build(self) -> Arbiter { let sys = System::current(); let system_id = sys.id(); let arb_id = COUNT.fetch_add(1, Ordering::Relaxed); - let name = format!("actix-rt|system:{}|arbiter:{}", system_id, arb_id); + let name = self.name_factory.unwrap_or_else(|| { + Box::new(|system_id, arb_id| { + format!("actix-rt|system:{}|arbiter:{}", system_id, arb_id) + }) + })(system_id, arb_id); + let runtime_factory = self.runtime_factory.unwrap_or_else(|| { + Box::new(|| { + crate::runtime::default_tokio_runtime() + .expect("Cannot create new Arbiter's Runtime.") + }) + }); let (tx, rx) = mpsc::unbounded_channel(); let (ready_tx, ready_rx) = std::sync::mpsc::channel::<()>(); @@ -160,13 +197,16 @@ impl Arbiter { /// # Panics /// Panics if a [System] is not registered on the current thread. #[cfg(all(target_os = "linux", feature = "io-uring"))] - #[allow(clippy::new_without_default)] - pub fn new() -> Arbiter { + pub fn build(self) -> Arbiter { let sys = System::current(); let system_id = sys.id(); let arb_id = COUNT.fetch_add(1, Ordering::Relaxed); - let name = format!("actix-rt|system:{}|arbiter:{}", system_id, arb_id); + let name = self.name_factory.unwrap_or_else(|| { + Box::new(|system_id, arb_id| { + format!("actix-rt|system:{}|arbiter:{}", system_id, arb_id) + }) + })(system_id, arb_id); let (tx, rx) = mpsc::unbounded_channel(); let (ready_tx, ready_rx) = std::sync::mpsc::channel::<()>(); @@ -204,6 +244,54 @@ impl Arbiter { Arbiter { tx, thread_handle } } +} + +/// An Arbiter represents a thread that provides an asynchronous execution environment for futures +/// and functions. +/// +/// When an arbiter is created, it spawns a new [OS thread](thread), and hosts an event loop. +#[derive(Debug)] +pub struct Arbiter { + tx: mpsc::UnboundedSender, + thread_handle: thread::JoinHandle<()>, +} + +impl Arbiter { + /// Create an [ArbiterBuilder] to configure and spawn a new Arbiter thread. + pub fn builder() -> ArbiterBuilder { + ArbiterBuilder::new() + } + + /// Spawn a new Arbiter thread and start its event loop. + /// + /// # Panics + /// Panics if a [System] is not registered on the current thread. + #[cfg(not(all(target_os = "linux", feature = "io-uring")))] + #[allow(clippy::new_without_default)] + pub fn new() -> Arbiter { + ArbiterBuilder::new().build() + } + + /// Spawn a new Arbiter using the [Tokio Runtime](tokio-runtime) returned from a closure. + /// + /// [tokio-runtime]: tokio::runtime::Runtime + #[cfg(not(all(target_os = "linux", feature = "io-uring")))] + pub fn with_tokio_rt(runtime_factory: F) -> Arbiter + where + F: FnOnce() -> tokio::runtime::Runtime + Send + 'static, + { + ArbiterBuilder::new().runtime(runtime_factory).build() + } + + /// Spawn a new Arbiter thread and start its event loop with `tokio-uring` runtime. + /// + /// # Panics + /// Panics if a [System] is not registered on the current thread. + #[cfg(all(target_os = "linux", feature = "io-uring"))] + #[allow(clippy::new_without_default)] + pub fn new() -> Arbiter { + ArbiterBuilder::new().build() + } /// Sets up an Arbiter runner in a new System using the environment's local set. pub(crate) fn in_new_system() -> ArbiterHandle { diff --git a/actix-rt/src/lib.rs b/actix-rt/src/lib.rs index fc2a56ba2..987d4ca7e 100644 --- a/actix-rt/src/lib.rs +++ b/actix-rt/src/lib.rs @@ -67,7 +67,7 @@ pub use tokio::pin; use tokio::task::JoinHandle; pub use self::{ - arbiter::{Arbiter, ArbiterHandle}, + arbiter::{Arbiter, ArbiterBuilder, ArbiterHandle}, runtime::Runtime, system::{System, SystemRunner}, }; diff --git a/actix-rt/tests/tests.rs b/actix-rt/tests/tests.rs index 330e27ff9..bd6e1a1e9 100644 --- a/actix-rt/tests/tests.rs +++ b/actix-rt/tests/tests.rs @@ -301,6 +301,28 @@ fn new_arbiter_with_tokio() { assert!(!counter.load(Ordering::SeqCst)); } +#[test] +fn arbiter_builder_name() { + let _ = System::new(); + + let arbiter = Arbiter::builder() + .name(|_, _| "test_thread".to_string()) + .build(); + + let (tx, rx) = std::sync::mpsc::channel(); + arbiter.spawn(async move { + let current_thread = std::thread::current(); + let thread_name = current_thread.name().unwrap().to_string(); + tx.send(thread_name).unwrap(); + }); + + let name = rx.recv().unwrap(); + assert_eq!(name, "test_thread"); + + arbiter.stop(); + arbiter.join().unwrap(); +} + #[test] #[should_panic] fn no_system_current_panic() { From 4a6322ba343ff33501608ac2235f003b048f05d0 Mon Sep 17 00:00:00 2001 From: Emile Fugulin Date: Fri, 10 May 2024 12:43:54 -0400 Subject: [PATCH 2/2] Add alive method on arbiter --- actix-rt/CHANGES.md | 1 + actix-rt/src/arbiter.rs | 14 ++++++++++++++ 2 files changed, 15 insertions(+) diff --git a/actix-rt/CHANGES.md b/actix-rt/CHANGES.md index 4aadc419a..207845366 100644 --- a/actix-rt/CHANGES.md +++ b/actix-rt/CHANGES.md @@ -3,6 +3,7 @@ ## Unreleased - Add `actix_rt::ArbiterBuilder` to allow user to configure the thread spawned for the arbiter. +- Add `Arbiter::alive` and `ArbiterHandle::alive` to check is the arbiter is still alive. ## 2.10.0 diff --git a/actix-rt/src/arbiter.rs b/actix-rt/src/arbiter.rs index 574fd6f10..192435d2f 100644 --- a/actix-rt/src/arbiter.rs +++ b/actix-rt/src/arbiter.rs @@ -71,6 +71,13 @@ impl ArbiterHandle { self.spawn(async { f() }) } + /// Check if the [Arbiter] is still alive. + /// + /// Returns false if the [Arbiter] has been dropped, returns true otherwise. + pub fn alive(&self) -> bool { + !self.tx.is_closed() + } + /// Instruct [Arbiter] to stop processing it's event loop. /// /// Returns true if stop message was sent successfully and false if the [Arbiter] has @@ -367,6 +374,13 @@ impl Arbiter { self.spawn(async { f() }) } + /// Check if the [Arbiter] is still alive. + /// + /// Returns false if the [Arbiter] has been dropped, returns true otherwise. + pub fn alive(&self) -> bool { + !self.tx.is_closed() + } + /// Wait for Arbiter's event loop to complete. /// /// Joins the underlying OS thread handle. See [`JoinHandle::join`](thread::JoinHandle::join).