Skip to content

Commit

Permalink
Remove coordinator and support forking (#1067)
Browse files Browse the repository at this point in the history
This PR removes the coordinator (a.k.a. controller) thread, and allows
temporarily terminating and restarting all GC worker threads in order to
support forking.

Major changes include:

- `GCController` is removed. All synchronization mechanisms involving
the controller are removed, too. Important synchronization operations,
such as opening buckets and declaring GC finished, are done by the last
parked worker. The work packet `EndOfGC` is removed, and its job is now
done by the last parked worker.
- The `WorkerMonitor`, which previously synchronizes between GC workers,
now also synchronizes between mutators and GC workers. This allows
mutators to trigger GC by directly communicating with GC workers.
- Introduced a new mechanism: "goals". Mutators can now request "goals",
and GC workers will work on one goal at a time. Currently, a "goal" can
be either GC or StopForFork. Triggering GC is now implemented as
requesting the GC goal.
- Added a pair of new APIs, namely `MMTK::prepare_to_fork()` and
`MMTK::after_fork()`. VM bindings call them before and after forking to
let the MMTK instance do proper preparation for forking.

Fixes: #1053
Fixes: #1054
  • Loading branch information
wks authored Apr 9, 2024
1 parent 86518d2 commit 5ab62f9
Show file tree
Hide file tree
Showing 21 changed files with 1,277 additions and 579 deletions.
3 changes: 0 additions & 3 deletions docs/header/mmtk.h
Original file line number Diff line number Diff line change
Expand Up @@ -78,9 +78,6 @@ extern void mmtk_scan_region();
// Request MMTk to trigger a GC. Note that this may not actually trigger a GC
extern void mmtk_handle_user_collection_request(void* tls);

// Run the main loop for the GC controller thread. Does not return
extern void mmtk_start_control_collector(void* tls, void* worker);

// Run the main loop for a GC worker. Does not return
extern void mmtk_start_worker(void* tls, void* worker);

Expand Down
6 changes: 6 additions & 0 deletions src/global_state.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,8 @@
use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
use std::sync::Mutex;
use std::time::Instant;

use atomic_refcell::AtomicRefCell;

/// This stores some global states for an MMTK instance.
/// Some MMTK components like plans and allocators may keep an reference to the struct, and can access it.
Expand All @@ -15,6 +18,8 @@ pub struct GlobalState {
pub(crate) initialized: AtomicBool,
/// The current GC status.
pub(crate) gc_status: Mutex<GcStatus>,
/// When did the last GC start? Only accessed by the last parked worker.
pub(crate) gc_start_time: AtomicRefCell<Option<Instant>>,
/// Is the current GC an emergency collection? Emergency means we may run out of memory soon, and we should
/// attempt to collect as much as we can.
pub(crate) emergency_collection: AtomicBool,
Expand Down Expand Up @@ -195,6 +200,7 @@ impl Default for GlobalState {
Self {
initialized: AtomicBool::new(false),
gc_status: Mutex::new(GcStatus::NotInGC),
gc_start_time: AtomicRefCell::new(None),
stacks_prepared: AtomicBool::new(false),
emergency_collection: AtomicBool::new(false),
user_triggered_collection: AtomicBool::new(false),
Expand Down
49 changes: 8 additions & 41 deletions src/memory_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ use crate::mmtk::MMTK;
use crate::plan::AllocationSemantics;
use crate::plan::{Mutator, MutatorContext};
use crate::scheduler::WorkBucketStage;
use crate::scheduler::{GCController, GCWork, GCWorker};
use crate::scheduler::{GCWork, GCWorker};
use crate::util::alloc::allocators::AllocatorSelector;
use crate::util::constants::{LOG_BYTES_IN_PAGE, MIN_OBJECT_SIZE};
use crate::util::heap::layout::vm_layout::vm_layout;
Expand All @@ -25,7 +25,7 @@ use crate::util::{Address, ObjectReference};
use crate::vm::edge_shape::MemorySlice;
use crate::vm::ReferenceGlue;
use crate::vm::VMBinding;
use std::sync::atomic::Ordering;

/// Initialize an MMTk instance. A VM should call this method after creating an [`crate::MMTK`]
/// instance but before using any of the methods provided in MMTk (except `process()` and `process_bulk()`).
///
Expand Down Expand Up @@ -438,6 +438,7 @@ pub fn free_with_size<VM: VMBinding>(mmtk: &MMTK<VM>, addr: Address, old_size: u
/// Get the current active malloc'd bytes. Here MMTk only accounts for bytes that are done through those 'counted malloc' functions.
#[cfg(feature = "malloc_counted_size")]
pub fn get_malloc_bytes<VM: VMBinding>(mmtk: &MMTK<VM>) -> usize {
use std::sync::atomic::Ordering;
mmtk.state.malloc_bytes.load(Ordering::SeqCst)
}

Expand All @@ -460,53 +461,18 @@ pub fn gc_poll<VM: VMBinding>(mmtk: &MMTK<VM>, tls: VMMutatorThread) {
}
}

/// Run the main loop for the GC controller thread. This method does not return.
///
/// Arguments:
/// * `tls`: The thread that will be used as the GC controller.
/// * `gc_controller`: The execution context of the GC controller threa.
/// It is the `GCController` passed to `Collection::spawn_gc_thread`.
/// * `mmtk`: A reference to an MMTk instance.
pub fn start_control_collector<VM: VMBinding>(
_mmtk: &'static MMTK<VM>,
tls: VMWorkerThread,
gc_controller: &mut GCController<VM>,
) {
gc_controller.run(tls);
}

/// Run the main loop of a GC worker. This method does not return.
///
/// Arguments:
/// * `tls`: The thread that will be used as the GC worker.
/// * `worker`: The execution context of the GC worker thread.
/// It is the `GCWorker` passed to `Collection::spawn_gc_thread`.
/// * `mmtk`: A reference to an MMTk instance.
/// Wrapper for [`crate::scheduler::GCWorker::run`].
pub fn start_worker<VM: VMBinding>(
mmtk: &'static MMTK<VM>,
tls: VMWorkerThread,
worker: &mut GCWorker<VM>,
worker: Box<GCWorker<VM>>,
) {
worker.run(tls, mmtk);
}

/// Initialize the scheduler and GC workers that are required for doing garbage collections.
/// This is a mandatory call for a VM during its boot process once its thread system
/// is ready. This should only be called once. This call will invoke Collection::spawn_gc_thread()
/// to create GC threads.
///
/// Arguments:
/// * `mmtk`: A reference to an MMTk instance.
/// * `tls`: The thread that wants to enable the collection. This value will be passed back to the VM in
/// Collection::spawn_gc_thread() so that the VM knows the context.
/// Wrapper for [`crate::mmtk::MMTK::initialize_collection`].
pub fn initialize_collection<VM: VMBinding>(mmtk: &'static MMTK<VM>, tls: VMThread) {
assert!(
!mmtk.state.is_initialized(),
"MMTk collection has been initialized (was initialize_collection() already called before?)"
);
mmtk.scheduler.spawn_gc_threads(mmtk, tls);
mmtk.state.initialized.store(true, Ordering::SeqCst);
probe!(mmtk, collection_initialized);
mmtk.initialize_collection(tls);
}

/// Process MMTk run-time options. Returns true if the option is processed successfully.
Expand Down Expand Up @@ -554,6 +520,7 @@ pub fn free_bytes<VM: VMBinding>(mmtk: &MMTK<VM>) -> usize {
/// to call this method is at the end of a GC (e.g. when the runtime is about to resume threads).
#[cfg(feature = "count_live_bytes_in_gc")]
pub fn live_bytes_in_last_gc<VM: VMBinding>(mmtk: &MMTK<VM>) -> usize {
use std::sync::atomic::Ordering;
mmtk.state.live_bytes_in_last_gc.load(Ordering::SeqCst)
}

Expand Down
91 changes: 90 additions & 1 deletion src/mmtk.rs
Original file line number Diff line number Diff line change
Expand Up @@ -148,7 +148,7 @@ impl<VM: VMBinding> MMTK<VM> {

let state = Arc::new(GlobalState::default());

let gc_requester = Arc::new(GCRequester::new());
let gc_requester = Arc::new(GCRequester::new(scheduler.clone()));

let gc_trigger = Arc::new(GCTrigger::new(
options.clone(),
Expand Down Expand Up @@ -220,6 +220,93 @@ impl<VM: VMBinding> MMTK<VM> {
}
}

/// Initialize the GC worker threads that are required for doing garbage collections.
/// This is a mandatory call for a VM during its boot process once its thread system
/// is ready.
///
/// Internally, this function will invoke [`Collection::spawn_gc_thread()`] to spawn GC worker
/// threads.
///
/// # Arguments
///
/// * `tls`: The thread that wants to enable the collection. This value will be passed back
/// to the VM in [`Collection::spawn_gc_thread()`] so that the VM knows the context.
///
/// [`Collection::spawn_gc_thread()`]: crate::vm::Collection::spawn_gc_thread()
pub fn initialize_collection(&'static self, tls: VMThread) {
assert!(
!self.state.is_initialized(),
"MMTk collection has been initialized (was initialize_collection() already called before?)"
);
self.scheduler.spawn_gc_threads(self, tls);
self.state.initialized.store(true, Ordering::SeqCst);
probe!(mmtk, collection_initialized);
}

/// Prepare an MMTk instance for calling the `fork()` system call.
///
/// The `fork()` system call is available on Linux and some UNIX variants, and may be emulated
/// on other platforms by libraries such as Cygwin. The properties of the `fork()` system call
/// requires the users to do some preparation before calling it.
///
/// - **Multi-threading**: If `fork()` is called when the process has multiple threads, it
/// will only duplicate the current thread into the child process, and the child process can
/// only call async-signal-safe functions, notably `exec()`. For VMs that that use
/// multi-process concurrency, it is imperative that when calling `fork()`, only one thread may
/// exist in the process.
///
/// - **File descriptors**: The child process inherits copies of the parent's set of open
/// file descriptors. This may or may not be desired depending on use cases.
///
/// This function helps VMs that use `fork()` for multi-process concurrency. It instructs all
/// GC threads to save their contexts and return from their entry-point functions. Currently,
/// such threads only include GC workers, and the entry point is
/// [`crate::memory_manager::start_worker`]. A subsequent call to `MMTK::after_fork()` will
/// re-spawn the threads using their saved contexts. The VM must not allocate objects in the
/// MMTk heap before calling `MMTK::after_fork()`.
///
/// TODO: Currently, the MMTk core does not keep any files open for a long time. In the
/// future, this function and the `after_fork` function may be used for handling open file
/// descriptors across invocations of `fork()`. One possible use case is logging GC activities
/// and statistics to files, such as performing heap dumps across multiple GCs.
///
/// If a VM intends to execute another program by calling `fork()` and immediately calling
/// `exec`, it may skip this function because the state of the MMTk instance will be irrelevant
/// in that case.
///
/// # Caution!
///
/// This function sends an asynchronous message to GC threads and returns immediately, but it
/// is only safe for the VM to call `fork()` after the underlying **native threads** of the GC
/// threads have exited. After calling this function, the VM should wait for their underlying
/// native threads to exit in VM-specific manner before calling `fork()`.
pub fn prepare_to_fork(&'static self) {
assert!(
self.state.is_initialized(),
"MMTk collection has not been initialized, yet (was initialize_collection() called before?)"
);
probe!(mmtk, prepare_to_fork);
self.scheduler.stop_gc_threads_for_forking();
}

/// Call this function after the VM called the `fork()` system call.
///
/// This function will re-spawn MMTk threads from saved contexts.
///
/// # Arguments
///
/// * `tls`: The thread that wants to respawn MMTk threads after forking. This value will be
/// passed back to the VM in `Collection::spawn_gc_thread()` so that the VM knows the
/// context.
pub fn after_fork(&'static self, tls: VMThread) {
assert!(
self.state.is_initialized(),
"MMTk collection has not been initialized, yet (was initialize_collection() called before?)"
);
probe!(mmtk, after_fork);
self.scheduler.respawn_gc_threads_after_forking(tls);
}

/// Generic hook to allow benchmarks to be harnessed. MMTk will trigger a GC
/// to clear any residual garbage and start collecting statistics for the benchmark.
/// This is usually called by the benchmark harness as its last step before the actual benchmark.
Expand Down Expand Up @@ -349,6 +436,8 @@ impl<VM: VMBinding> MMTK<VM> {
self.state
.internal_triggered_collection
.store(true, Ordering::Relaxed);
// TODO: The current `GCRequester::request()` is probably incorrect for internally triggered GC.
// Consider removing functions related to "internal triggered collection".
self.gc_requester.request();
}

Expand Down
58 changes: 17 additions & 41 deletions src/plan/gc_requester.rs
Original file line number Diff line number Diff line change
@@ -1,66 +1,42 @@
use crate::scheduler::GCWorkScheduler;
use crate::vm::VMBinding;
use std::marker::PhantomData;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::{Condvar, Mutex};
use std::sync::Arc;

struct RequestSync {
request_count: isize,
last_request_count: isize,
}

/// GC requester. This object allows other threads to request (trigger) GC,
/// and the GC coordinator thread waits for GC requests using this object.
/// This data structure lets mutators trigger GC.
pub struct GCRequester<VM: VMBinding> {
request_sync: Mutex<RequestSync>,
request_condvar: Condvar,
/// Set by mutators to trigger GC. It is atomic so that mutators can check if GC has already
/// been requested efficiently in `poll` without acquiring any mutex.
request_flag: AtomicBool,
phantom: PhantomData<VM>,
}

// Clippy says we need this...
impl<VM: VMBinding> Default for GCRequester<VM> {
fn default() -> Self {
Self::new()
}
scheduler: Arc<GCWorkScheduler<VM>>,
}

impl<VM: VMBinding> GCRequester<VM> {
pub fn new() -> Self {
pub fn new(scheduler: Arc<GCWorkScheduler<VM>>) -> Self {
GCRequester {
request_sync: Mutex::new(RequestSync {
request_count: 0,
last_request_count: -1,
}),
request_condvar: Condvar::new(),
request_flag: AtomicBool::new(false),
phantom: PhantomData,
scheduler,
}
}

/// Request a GC. Called by mutators when polling (during allocation) and when handling user
/// GC requests (e.g. `System.gc();` in Java).
pub fn request(&self) {
if self.request_flag.load(Ordering::Relaxed) {
return;
}

let mut guard = self.request_sync.lock().unwrap();
if !self.request_flag.load(Ordering::Relaxed) {
self.request_flag.store(true, Ordering::Relaxed);
guard.request_count += 1;
self.request_condvar.notify_all();
if !self.request_flag.swap(true, Ordering::Relaxed) {
// `GCWorkScheduler::request_schedule_collection` needs to hold a mutex to communicate
// with GC workers, which is expensive for functions like `poll`. We use the atomic
// flag `request_flag` to elide the need to acquire the mutex in subsequent calls.
self.scheduler.request_schedule_collection();
}
}

/// Clear the "GC requested" flag so that mutators can trigger the next GC.
/// Called by a GC worker when all mutators have come to a stop.
pub fn clear_request(&self) {
let guard = self.request_sync.lock().unwrap();
self.request_flag.store(false, Ordering::Relaxed);
drop(guard);
}

pub fn wait_for_request(&self) {
let mut guard = self.request_sync.lock().unwrap();
guard.last_request_count += 1;
while guard.last_request_count == guard.request_count {
guard = self.request_condvar.wait(guard).unwrap();
}
}
}
Loading

0 comments on commit 5ab62f9

Please sign in to comment.