Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add a builder for arbiter + alive method #557

Open
wants to merge 2 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions actix-rt/CHANGES.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,9 @@

## Unreleased

- Add `actix_rt::ArbiterBuilder` to allow user to configure the thread spawned for the arbiter.
- Add `Arbiter::alive` and `ArbiterHandle::alive` to check is the arbiter is still alive.

## 2.10.0

- Relax `F`'s bound (`Fn => FnOnce`) on `{Arbiter, System}::with_tokio_rt()` functions.
Expand Down
152 changes: 127 additions & 25 deletions actix-rt/src/arbiter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,13 @@ impl ArbiterHandle {
self.spawn(async { f() })
}

/// Check if the [Arbiter] is still alive.
///
/// Returns false if the [Arbiter] has been dropped, returns true otherwise.
pub fn alive(&self) -> bool {
!self.tx.is_closed()
}

/// Instruct [Arbiter] to stop processing it's event loop.
///
/// Returns true if stop message was sent successfully and false if the [Arbiter] has
Expand All @@ -80,42 +87,79 @@ impl ArbiterHandle {
}
}

/// An Arbiter represents a thread that provides an asynchronous execution environment for futures
/// and functions.
///
/// When an arbiter is created, it spawns a new [OS thread](thread), and hosts an event loop.
#[derive(Debug)]
pub struct Arbiter {
tx: mpsc::UnboundedSender<ArbiterCommand>,
thread_handle: thread::JoinHandle<()>,
/// A builder for configuring and spawning a new [Arbiter] thread.
pub struct ArbiterBuilder {
name_factory: Option<Box<dyn Fn(usize, usize) -> String + 'static>>,
#[cfg(not(all(target_os = "linux", feature = "io-uring")))]
runtime_factory: Option<Box<dyn FnOnce() -> tokio::runtime::Runtime + Send + 'static>>,
}

impl Arbiter {
/// Spawn a new Arbiter thread and start its event loop.
///
/// # Panics
/// Panics if a [System] is not registered on the current thread.
#[cfg(not(all(target_os = "linux", feature = "io-uring")))]
impl ArbiterBuilder {
/// Create a new [ArbiterBuilder].
#[allow(clippy::new_without_default)]
pub fn new() -> Arbiter {
Self::with_tokio_rt(|| {
crate::runtime::default_tokio_runtime().expect("Cannot create new Arbiter's Runtime.")
})
pub fn new() -> Self {
Self {
name_factory: None,
#[cfg(not(all(target_os = "linux", feature = "io-uring")))]
runtime_factory: None,
}
}

/// Spawn a new Arbiter using the [Tokio Runtime](tokio-runtime) returned from a closure.
/// Specify a factory function for generating the name of the Arbiter thread.
///
/// Defaults to `actix-rt|system:<system_id>|arbiter:<arb_id>`
///
/// # Example
///
/// ```no_run
/// let _ = actix_rt::System::new();
/// actix_rt::ArbiterBuilder::new()
/// .name(|system_id, arb_id| {
/// format!("some-prefix|system:{}|arbiter:{}", system_id, arb_id)
/// })
/// .build();
/// ```
pub fn name<N>(mut self, name_factory: N) -> Self
where
N: Fn(usize, usize) -> String + 'static,
{
self.name_factory = Some(Box::new(name_factory));
self
}

/// Specify a factory function for generating the [Tokio Runtime](tokio-runtime) used by the Arbiter.
///
/// [tokio-runtime]: tokio::runtime::Runtime
#[cfg(not(all(target_os = "linux", feature = "io-uring")))]
pub fn with_tokio_rt<F>(runtime_factory: F) -> Arbiter
pub fn runtime<R>(mut self, runtime_factory: R) -> Self
where
F: FnOnce() -> tokio::runtime::Runtime + Send + 'static,
R: FnOnce() -> tokio::runtime::Runtime + Send + 'static,
{
self.runtime_factory = Some(Box::new(runtime_factory));
self
}

/// Spawn a new Arbiter thread and start its event loop.
///
/// # Panics
/// Panics if a [System] is not registered on the current thread.
#[cfg(not(all(target_os = "linux", feature = "io-uring")))]
pub fn build(self) -> Arbiter {
let sys = System::current();
let system_id = sys.id();
let arb_id = COUNT.fetch_add(1, Ordering::Relaxed);

let name = format!("actix-rt|system:{}|arbiter:{}", system_id, arb_id);
let name = self.name_factory.unwrap_or_else(|| {
Box::new(|system_id, arb_id| {
format!("actix-rt|system:{}|arbiter:{}", system_id, arb_id)
})
})(system_id, arb_id);
let runtime_factory = self.runtime_factory.unwrap_or_else(|| {
Box::new(|| {
crate::runtime::default_tokio_runtime()
.expect("Cannot create new Arbiter's Runtime.")
})
});
let (tx, rx) = mpsc::unbounded_channel();

let (ready_tx, ready_rx) = std::sync::mpsc::channel::<()>();
Expand Down Expand Up @@ -160,13 +204,16 @@ impl Arbiter {
/// # Panics
/// Panics if a [System] is not registered on the current thread.
#[cfg(all(target_os = "linux", feature = "io-uring"))]
#[allow(clippy::new_without_default)]
pub fn new() -> Arbiter {
pub fn build(self) -> Arbiter {
let sys = System::current();
let system_id = sys.id();
let arb_id = COUNT.fetch_add(1, Ordering::Relaxed);

let name = format!("actix-rt|system:{}|arbiter:{}", system_id, arb_id);
let name = self.name_factory.unwrap_or_else(|| {
Box::new(|system_id, arb_id| {
format!("actix-rt|system:{}|arbiter:{}", system_id, arb_id)
})
})(system_id, arb_id);
let (tx, rx) = mpsc::unbounded_channel();

let (ready_tx, ready_rx) = std::sync::mpsc::channel::<()>();
Expand Down Expand Up @@ -204,6 +251,54 @@ impl Arbiter {

Arbiter { tx, thread_handle }
}
}

/// An Arbiter represents a thread that provides an asynchronous execution environment for futures
/// and functions.
///
/// When an arbiter is created, it spawns a new [OS thread](thread), and hosts an event loop.
#[derive(Debug)]
pub struct Arbiter {
tx: mpsc::UnboundedSender<ArbiterCommand>,
thread_handle: thread::JoinHandle<()>,
}

impl Arbiter {
/// Create an [ArbiterBuilder] to configure and spawn a new Arbiter thread.
pub fn builder() -> ArbiterBuilder {
ArbiterBuilder::new()
}

/// Spawn a new Arbiter thread and start its event loop.
///
/// # Panics
/// Panics if a [System] is not registered on the current thread.
#[cfg(not(all(target_os = "linux", feature = "io-uring")))]
#[allow(clippy::new_without_default)]
pub fn new() -> Arbiter {
ArbiterBuilder::new().build()
}

/// Spawn a new Arbiter using the [Tokio Runtime](tokio-runtime) returned from a closure.
///
/// [tokio-runtime]: tokio::runtime::Runtime
#[cfg(not(all(target_os = "linux", feature = "io-uring")))]
pub fn with_tokio_rt<F>(runtime_factory: F) -> Arbiter
where
F: FnOnce() -> tokio::runtime::Runtime + Send + 'static,
{
ArbiterBuilder::new().runtime(runtime_factory).build()
}

/// Spawn a new Arbiter thread and start its event loop with `tokio-uring` runtime.
///
/// # Panics
/// Panics if a [System] is not registered on the current thread.
#[cfg(all(target_os = "linux", feature = "io-uring"))]
#[allow(clippy::new_without_default)]
pub fn new() -> Arbiter {
ArbiterBuilder::new().build()
}

/// Sets up an Arbiter runner in a new System using the environment's local set.
pub(crate) fn in_new_system() -> ArbiterHandle {
Expand Down Expand Up @@ -279,6 +374,13 @@ impl Arbiter {
self.spawn(async { f() })
}

/// Check if the [Arbiter] is still alive.
///
/// Returns false if the [Arbiter] has been dropped, returns true otherwise.
pub fn alive(&self) -> bool {
!self.tx.is_closed()
}

/// Wait for Arbiter's event loop to complete.
///
/// Joins the underlying OS thread handle. See [`JoinHandle::join`](thread::JoinHandle::join).
Expand Down
2 changes: 1 addition & 1 deletion actix-rt/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ pub use tokio::pin;
use tokio::task::JoinHandle;

pub use self::{
arbiter::{Arbiter, ArbiterHandle},
arbiter::{Arbiter, ArbiterBuilder, ArbiterHandle},
runtime::Runtime,
system::{System, SystemRunner},
};
Expand Down
22 changes: 22 additions & 0 deletions actix-rt/tests/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -301,6 +301,28 @@ fn new_arbiter_with_tokio() {
assert!(!counter.load(Ordering::SeqCst));
}

#[test]
fn arbiter_builder_name() {
let _ = System::new();

let arbiter = Arbiter::builder()
.name(|_, _| "test_thread".to_string())
.build();

let (tx, rx) = std::sync::mpsc::channel();
arbiter.spawn(async move {
let current_thread = std::thread::current();
let thread_name = current_thread.name().unwrap().to_string();
tx.send(thread_name).unwrap();
});

let name = rx.recv().unwrap();
assert_eq!(name, "test_thread");

arbiter.stop();
arbiter.join().unwrap();
}

#[test]
#[should_panic]
fn no_system_current_panic() {
Expand Down
Loading