Skip to content

Commit

Permalink
Allow to temporarily set the current registry even if it is not assoc…
Browse files Browse the repository at this point in the history
…iated with a worker thread
  • Loading branch information
adamreichold committed May 12, 2024
1 parent b3bd4bc commit 36e5d84
Show file tree
Hide file tree
Showing 2 changed files with 74 additions and 7 deletions.
65 changes: 64 additions & 1 deletion rayon-core/src/registry.rs
Original file line number Diff line number Diff line change
Expand Up @@ -217,6 +217,36 @@ fn default_global_registry() -> Result<Arc<Registry>, ThreadPoolBuildError> {
result
}

// This is used to temporarily overwrite the current registry.
//
// This either null, a pointer to the global registry if it was
// ever used to access the global registry or a pointer to a
// registry which is temporarily made current because the current
// thread is not a worker thread but is running a scope associated
// to a specific thread pool.
thread_local! {
static CURRENT_REGISTRY: Cell<*const Arc<Registry>> = const { Cell::new(ptr::null()) };
}

#[cold]
fn set_current_registry_to_global_registry() -> *const Arc<Registry> {
let global = global_registry();

CURRENT_REGISTRY.with(|current_registry| current_registry.set(global));

global
}

fn current_registry() -> *const Arc<Registry> {
let mut current = CURRENT_REGISTRY.with(Cell::get);

if current.is_null() {
current = set_current_registry_to_global_registry();
}

current
}

struct Terminator<'a>(&'a Arc<Registry>);

impl<'a> Drop for Terminator<'a> {
Expand Down Expand Up @@ -315,14 +345,47 @@ impl Registry {
unsafe {
let worker_thread = WorkerThread::current();
let registry = if worker_thread.is_null() {
global_registry()
&*current_registry()
} else {
&(*worker_thread).registry
};
Arc::clone(registry)
}
}

/// Optionally install a specific registry as the current one.
///
/// This is used when a thread which is not a worker executes
/// a scope which should use the specific thread pool instead of
/// the global one.
pub(super) fn with_current<F, R>(registry: Option<&Arc<Registry>>, f: F) -> R
where
F: FnOnce() -> R,
{
struct Guard {
current: *const Arc<Registry>,
}

impl Guard {
fn new(registry: &Arc<Registry>) -> Self {
let current =
CURRENT_REGISTRY.with(|current_registry| current_registry.replace(registry));

Self { current }
}
}

impl Drop for Guard {
fn drop(&mut self) {
CURRENT_REGISTRY.with(|current_registry| current_registry.set(self.current));
}
}

let _guard = registry.map(Guard::new);

f()
}

/// Returns the number of threads in the current registry. This
/// is better than `Registry::current().num_threads()` because it
/// avoids incrementing the `Arc`.
Expand Down
16 changes: 10 additions & 6 deletions rayon-core/src/scope/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -416,9 +416,11 @@ pub(crate) fn do_in_place_scope<'scope, OP, R>(registry: Option<&Arc<Registry>>,
where
OP: FnOnce(&Scope<'scope>) -> R,
{
let thread = unsafe { WorkerThread::current().as_ref() };
let scope = Scope::<'scope>::new(thread, registry);
scope.base.complete(thread, || op(&scope))
Registry::with_current(registry, || {
let thread = unsafe { WorkerThread::current().as_ref() };
let scope = Scope::<'scope>::new(thread, registry);
scope.base.complete(thread, || op(&scope))
})
}

/// Creates a "fork-join" scope `s` with FIFO order, and invokes the
Expand Down Expand Up @@ -453,9 +455,11 @@ pub(crate) fn do_in_place_scope_fifo<'scope, OP, R>(registry: Option<&Arc<Regist
where
OP: FnOnce(&ScopeFifo<'scope>) -> R,
{
let thread = unsafe { WorkerThread::current().as_ref() };
let scope = ScopeFifo::<'scope>::new(thread, registry);
scope.base.complete(thread, || op(&scope))
Registry::with_current(registry, || {
let thread = unsafe { WorkerThread::current().as_ref() };
let scope = ScopeFifo::<'scope>::new(thread, registry);
scope.base.complete(thread, || op(&scope))
})
}

impl<'scope> Scope<'scope> {
Expand Down

0 comments on commit 36e5d84

Please sign in to comment.