Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor(profiling): extract SystemProfiler #2794

Draft
wants to merge 9 commits into
base: master
Choose a base branch
from
5 changes: 3 additions & 2 deletions profiling/src/allocation.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,8 @@ use crate::bindings::{
self as zend, datadog_php_install_handler, datadog_php_zif_handler,
ddog_php_prof_copy_long_into_zval,
};
use crate::{PROFILER, PROFILER_NAME, REQUEST_LOCALS};
use crate::profiling::SystemProfiler;
use crate::{PROFILER_NAME, REQUEST_LOCALS};
use lazy_static::lazy_static;
use libc::{c_char, c_int, c_void, size_t};
use log::{debug, error, trace, warn};
Expand Down Expand Up @@ -94,7 +95,7 @@ impl AllocationProfilingStats {

self.next_sampling_interval();

if let Some(profiler) = PROFILER.lock().unwrap().as_ref() {
if let Some(profiler) = SystemProfiler::get() {
// Safety: execute_data was provided by the engine, and the profiler doesn't mutate it.
unsafe {
profiler.collect_allocations(
Expand Down
4 changes: 2 additions & 2 deletions profiling/src/exception.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
use crate::profiling::SystemProfiler;
use crate::zend::{self, zend_execute_data, zend_generator, zval, InternalFunctionHandler};
use crate::PROFILER;
use crate::REQUEST_LOCALS;
use log::{error, info};
use rand::rngs::ThreadRng;
Expand Down Expand Up @@ -83,7 +83,7 @@ impl ExceptionProfilingStats {

self.next_sampling_interval();

if let Some(profiler) = PROFILER.lock().unwrap().as_ref() {
if let Some(profiler) = SystemProfiler::get() {
// Safety: execute_data was provided by the engine, and the profiler doesn't mutate it.
unsafe {
profiler.collect_exception(
Expand Down
37 changes: 8 additions & 29 deletions profiling/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,24 +33,20 @@ use lazy_static::lazy_static;
use libc::c_char;
use log::{debug, error, info, trace, warn};
use once_cell::sync::{Lazy, OnceCell};
use profiling::{LocalRootSpanResourceMessage, Profiler, VmInterrupt};
use profiling::{LocalRootSpanResourceMessage, SystemProfiler, VmInterrupt};
use sapi::Sapi;
use std::borrow::Cow;
use std::cell::RefCell;
use std::ffi::CStr;
use std::ops::Deref;
use std::os::raw::c_int;
use std::sync::atomic::{AtomicBool, AtomicU32, Ordering};
use std::sync::{Arc, Mutex, Once};
use std::sync::{Arc, Once};
use std::time::{Duration, Instant};
use uuid::Uuid;

use crate::zend::datadog_sapi_globals_request_info;

/// The global profiler. Profiler gets made during the first rinit after an
/// minit, and is destroyed on mshutdown.
static PROFILER: Mutex<Option<Profiler>> = Mutex::new(None);

/// Name of the profiling module and zend_extension. Must not contain any
/// interior null bytes and must be null terminated.
static PROFILER_NAME: &[u8] = b"datadog-profiling\0";
Expand Down Expand Up @@ -508,18 +504,7 @@ extern "C" fn rinit(_type: c_int, _module_number: c_int) -> ZendResult {
return ZendResult::Success;
}

// reminder: this cannot be done in minit because of Apache forking model
{
/* It would be nice if this could be cheaper. OnceCell would be cheaper
* but it doesn't quite fit the model, as going back to uninitialized
* requires either a &mut or .take(), and neither works for us (unless
* we go for unsafe, which is what we are trying to avoid).
*/
let mut profiler = PROFILER.lock().unwrap();
if profiler.is_none() {
*profiler = Some(Profiler::new(system_settings))
}
};
SystemProfiler::init(system_settings);

if system_settings.profiling_enabled {
REQUEST_LOCALS.with(|cell| {
Expand Down Expand Up @@ -556,7 +541,7 @@ extern "C" fn rinit(_type: c_int, _module_number: c_int) -> ZendResult {
return;
}

if let Some(profiler) = PROFILER.lock().unwrap().as_ref() {
if let Some(profiler) = SystemProfiler::get() {
let interrupt = VmInterrupt {
interrupt_count_ptr: &locals.interrupt_count as *const AtomicU32,
engine_ptr: locals.vm_interrupt_addr,
Expand Down Expand Up @@ -612,7 +597,7 @@ extern "C" fn rshutdown(_type: c_int, _module_number: c_int) -> ZendResult {
// wall-time is not expected to ever be disabled, except in testing,
// and we don't need to optimize for that.
if system_settings.profiling_enabled {
if let Some(profiler) = PROFILER.lock().unwrap().as_ref() {
if let Some(profiler) = SystemProfiler::get() {
let interrupt = VmInterrupt {
interrupt_count_ptr: &locals.interrupt_count,
engine_ptr: locals.vm_interrupt_addr,
Expand Down Expand Up @@ -822,10 +807,7 @@ extern "C" fn mshutdown(_type: c_int, _module_number: c_int) -> ZendResult {
#[cfg(feature = "exception_profiling")]
exception::exception_profiling_mshutdown();

let mut profiler = PROFILER.lock().unwrap();
if let Some(profiler) = profiler.as_mut() {
profiler.stop(Duration::from_secs(1));
}
SystemProfiler::stop(Duration::from_secs(1));

ZendResult::Success
}
Expand Down Expand Up @@ -859,10 +841,7 @@ extern "C" fn shutdown(_extension: *mut ZendExtension) {
#[cfg(debug_assertions)]
trace!("shutdown({:p})", _extension);

let mut profiler = PROFILER.lock().unwrap();
if let Some(profiler) = profiler.take() {
profiler.shutdown(Duration::from_secs(2));
}
SystemProfiler::shutdown(Duration::from_secs(2));

// SAFETY: calling in shutdown before zai config is shutdown, and after
// all configuration is done being accessed. Well... in the happy-path,
Expand Down Expand Up @@ -891,7 +870,7 @@ fn notify_trace_finished(local_root_span_id: u64, span_type: Cow<str>, resource:
return;
}

if let Some(profiler) = PROFILER.lock().unwrap().as_ref() {
if let Some(profiler) = SystemProfiler::get() {
let message = LocalRootSpanResourceMessage {
local_root_span_id,
resource: resource.into_owned(),
Expand Down
18 changes: 7 additions & 11 deletions profiling/src/pcntl.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,10 @@ use crate::bindings::{
datadog_php_install_handler, datadog_php_zif_handler, zend_execute_data, zend_long, zval,
InternalFunctionHandler,
};
use crate::{config, PROFILER};
use crate::{config, SystemProfiler};
use ddcommon::cstr;
use log::{error, warn};
use std::ffi::CStr;
use std::mem::forget;
use std::ptr;

static mut PCNTL_FORK_HANDLER: InternalFunctionHandler = None;
Expand Down Expand Up @@ -62,22 +61,21 @@ unsafe fn handle_fork(
// Hold mutexes across the handler. If there are any spurious wakeups by
// the threads while the fork is occurring, they cannot acquire locks
// since this thread holds them, preventing a deadlock situation.
let mut profiler_lock = PROFILER.lock().unwrap();
if let Some(profiler) = profiler_lock.as_ref() {
profiler.fork_prepare();
if let Some(profiler) = SystemProfiler::get() {
let _ = profiler.fork_prepare();
}

match dispatch(handler, execute_data, return_value) {
Ok(ForkId::Parent) => {
if let Some(profiler) = profiler_lock.as_ref() {
if let Some(profiler) = SystemProfiler::get() {
profiler.post_fork_parent();
}
return;
}
Ok(ForkId::Child) => { /* fallthrough */ }
Err(ForkError::NullRetval) => {
// Skip error message if no profiler.
if profiler_lock.is_some() {
if SystemProfiler::get().is_some() {
error!(
"Failed to read return value of forking function. A crash or deadlock may occur."
);
Expand All @@ -87,7 +85,7 @@ unsafe fn handle_fork(

Err(ForkError::ZvalType(r#type)) => {
// Skip error message if no profiler.
if profiler_lock.is_some() {
if SystemProfiler::get().is_some() {
warn!(
"Return type of forking function was unexpected: {type}. A crash or deadlock may occur."
);
Expand All @@ -101,9 +99,7 @@ unsafe fn handle_fork(
// 2. Something went wrong, and disable it to be safe.
// And then leak the old profiler. Its drop method is not safe to run in
// these situations.
if let Some(profiler) = profiler_lock.take() {
forget(profiler);
}
SystemProfiler::kill();

alloc_prof_rshutdown();

Expand Down
46 changes: 23 additions & 23 deletions profiling/src/profiling/mod.rs
Original file line number Diff line number Diff line change
@@ -1,12 +1,14 @@
mod interrupts;
mod sample_type_filter;
pub mod stack_walking;
mod system_profiler;
mod thread_utils;
mod uploader;

pub use interrupts::*;
pub use sample_type_filter::*;
pub use stack_walking::*;
pub use system_profiler::*;
use uploader::*;

#[cfg(all(php_has_fibers, not(test)))]
Expand All @@ -25,7 +27,7 @@ use datadog_profiling::api::{
};
use datadog_profiling::exporter::Tag;
use datadog_profiling::internal::Profile as InternalProfile;
use log::{debug, error, info, trace, warn};
use log::{debug, info, trace, warn};
use std::borrow::Cow;
use std::collections::HashMap;
use std::hash::Hash;
Expand Down Expand Up @@ -510,6 +512,9 @@ pub enum UploadMessage {
#[cfg(feature = "timeline")]
const COW_EVAL: Cow<str> = Cow::Borrowed("[eval]");

const DDPROF_TIME: &str = "ddprof_time";
const DDPROF_UPLOAD: &str = "ddprof_upload";

impl Profiler {
pub fn new(system_settings: &mut SystemSettings) -> Self {
let fork_barrier = Arc::new(Barrier::new(3));
Expand All @@ -532,21 +537,19 @@ impl Profiler {
Utc::now(),
);

let ddprof_time = "ddprof_time";
let ddprof_upload = "ddprof_upload";
let sample_types_filter = SampleTypeFilter::new(system_settings);
Profiler {
fork_barrier,
interrupt_manager,
message_sender,
upload_sender,
time_collector_handle: thread_utils::spawn(ddprof_time, move || {
time_collector_handle: thread_utils::spawn(DDPROF_TIME, move || {
time_collector.run();
trace!("thread {ddprof_time} complete, shutting down");
trace!("thread {DDPROF_TIME} complete, shutting down");
}),
uploader_handle: thread_utils::spawn(ddprof_upload, move || {
uploader_handle: thread_utils::spawn(DDPROF_UPLOAD, move || {
uploader.run();
trace!("thread {ddprof_upload} complete, shutting down");
trace!("thread {DDPROF_UPLOAD} complete, shutting down");
}),
should_join: AtomicBool::new(true),
sample_types_filter,
Expand Down Expand Up @@ -576,22 +579,26 @@ impl Profiler {
}

/// Call before a fork, on the thread of the parent process that will fork.
pub fn fork_prepare(&self) {
// Send the message to the uploader first, as it has a longer worst-
pub fn fork_prepare(&self) -> anyhow::Result<()> {
// Send the message to the uploader first, as it has a longer worst
// case time to wait.
let uploader_result = self.upload_sender.send(UploadMessage::Pause);
let profiler_result = self.message_sender.send(ProfilerMessage::Pause);

// todo: handle fails more gracefully, but it's tricky to sync 3
// threads, any of which could have crashed or be delayed. This
// could also deadlock.
match (uploader_result, profiler_result) {
(Ok(_), Ok(_)) => {
self.fork_barrier.wait();
Ok(())
}
(Err(err), Ok(_)) => {
anyhow::bail!("failed to prepare {DDPROF_UPLOAD} thread for forking: {err}")
}
(_, _) => {
error!("failed to prepare the profiler for forking, a deadlock could occur")
(Ok(_), Err(err)) => {
anyhow::bail!("failed to prepare {DDPROF_TIME} thread for forking: {err}")
}
(Err(_), Err(_)) => anyhow::bail!(
"failed to prepare both {DDPROF_UPLOAD} and {DDPROF_TIME} threads for forking"
),
}
}

Expand All @@ -618,11 +625,7 @@ impl Profiler {
.map_err(Box::new)
}

/// Begins the shutdown process. To complete it, call [Profiler::shutdown].
/// Note that you must call [Profiler::shutdown] afterwards; it's two
/// parts of the same operation. It's split so you (or other extensions)
/// can do something while the other threads finish up.
pub fn stop(&mut self, timeout: Duration) {
pub fn join_and_drop_sender(&mut self, timeout: Duration) {
debug!("Stopping profiler.");

let sent = match self
Expand Down Expand Up @@ -650,10 +653,7 @@ impl Profiler {
std::mem::swap(&mut self.upload_sender, &mut empty_sender);
}

/// Completes the shutdown process; to start it, call [Profiler::stop]
/// before calling [Profiler::shutdown].
/// Note the timeout is per thread, and there may be multiple threads.
pub fn shutdown(self, timeout: Duration) {
pub fn join_collector_and_uploader(self, timeout: Duration) {
if self.should_join.load(Ordering::SeqCst) {
thread_utils::join_timeout(
self.time_collector_handle,
Expand Down
87 changes: 87 additions & 0 deletions profiling/src/profiling/system_profiler.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,87 @@
use super::{InterruptManager, Profiler, SystemSettings};
use once_cell::sync::OnceCell;
use std::mem::forget;
use std::sync::Arc;
use std::time::Duration;

/// The global profiler. Profiler gets made during the first rinit after an
/// minit (see [`SystemProfiler::init`]). It is emptied again by
/// [`SystemProfiler::shutdown`] or [`SystemProfiler::kill`], after which a
/// later `init` can create a fresh one.
///
/// This is a `static mut` because `stop`, `shutdown` and `kill` need `&mut`
/// access to the inner cell. Those exclusive accesses are not synchronized,
/// so they must not overlap with any other use of this static.
static mut PROFILER: SystemProfiler = SystemProfiler::new();

/// Holder for the process-wide [`Profiler`].
///
/// Wraps a `OnceCell` so the profiler can be created lazily and read through
/// a shared reference. All operations are exposed as associated functions
/// that act on the single global `PROFILER` instance.
pub struct SystemProfiler(OnceCell<Profiler>);

impl SystemProfiler {
    /// Creates an empty holder with no profiler in it. This is `const` so it
    /// can be used as the initializer of the global `PROFILER` static.
    pub const fn new() -> Self {
        SystemProfiler(OnceCell::new())
    }

    /// Creates the global profiler from `system_settings` if none exists
    /// yet. If a profiler is already present, this does nothing.
    pub fn init(system_settings: &mut SystemSettings) {
        // SAFETY: only a shared reference to PROFILER is formed here, and
        // `OnceCell::get_or_init` synchronizes concurrent initializers. The
        // exclusive accesses in `stop`, `shutdown` and `kill` must not
        // overlap with this call; they are limited to single-threaded phases.
        unsafe {
            (*std::ptr::addr_of!(PROFILER))
                .0
                .get_or_init(|| Profiler::new(system_settings))
        };
    }

    /// Returns the global profiler, or `None` if it has not been initialized
    /// (or has since been shut down or killed).
    pub fn get() -> Option<&'static Profiler> {
        // SAFETY: only a shared reference to PROFILER is formed here, and
        // `OnceCell::get` is safe to call concurrently with `get_or_init`.
        // The exclusive accesses in `stop`, `shutdown` and `kill` must not
        // overlap with this call or with any use of the returned reference.
        unsafe { (*std::ptr::addr_of!(PROFILER)).0.get() }
    }

    /// Begins the shutdown process. To complete it, call
    /// [`SystemProfiler::shutdown`] afterward; the two are parts of the same
    /// operation. It's split so you (or other extensions) can do something
    /// while the other threads finish up.
    ///
    /// Must only be called in a single-threaded phase such as mshutdown,
    /// because it takes exclusive access to the global profiler.
    pub fn stop(timeout: Duration) {
        // SAFETY: `get_mut` requires exclusive access and is NOT
        // synchronized. Soundness relies on the caller invoking this only in
        // a single-threaded phase, with no other access to PROFILER ongoing.
        if let Some(profiler) = unsafe { (*std::ptr::addr_of_mut!(PROFILER)).0.get_mut() } {
            profiler.join_and_drop_sender(timeout);
        }
    }

    /// Completes the shutdown process; to start it, call
    /// [`SystemProfiler::stop`] before calling this function.
    /// Note the timeout is per thread, and there may be multiple threads.
    ///
    /// Must only be called in the `SHUTDOWN`/`MSHUTDOWN` phase, because it
    /// takes exclusive access to the global profiler. Afterwards the holder
    /// is empty again.
    pub fn shutdown(timeout: Duration) {
        // SAFETY: `take` requires exclusive access and is NOT synchronized.
        // Soundness relies on the caller invoking this only in a
        // single-threaded phase, with no other access to PROFILER ongoing.
        if let Some(profiler) = unsafe { (*std::ptr::addr_of_mut!(PROFILER)).0.take() } {
            profiler.join_collector_and_uploader(timeout);
        }
    }

    /// Throws away the profiler and moves it to uninitialized.
    ///
    /// In a forking situation, the currently active profiler may not be valid
    /// because it has join handles and other state shared by other threads,
    /// and threads are not copied when the process is forked.
    /// Additionally, if we've hit certain other issues like not being able to
    /// determine the return type of the pcntl_fork function, we don't know if
    /// we're the parent or child.
    /// So, we throw away the current profiler and forget it, which avoids
    /// running the destructor. Yes, this will leak some memory.
    ///
    /// # Safety
    /// Must be called when no other thread is using the PROFILER object. That
    /// includes this thread in some kind of recursive manner.
    pub unsafe fn kill() {
        // SAFETY: `take` requires exclusive access; see this function's
        // safety conditions.
        if let Some(mut profiler) = (*std::ptr::addr_of_mut!(PROFILER)).0.take() {
            // Replace these fields with fresh, empty values so the old ones
            // are dropped, which reduces the amount of memory leaked below.
            profiler.interrupt_manager = Arc::new(InterruptManager::new());
            profiler.message_sender = crossbeam_channel::bounded(0).0;
            profiler.upload_sender = crossbeam_channel::bounded(0).0;

            // But we're not 100% sure everything is safe to drop, notably the
            // join handles, so we leak the rest.
            forget(profiler)
        }
    }
}
Loading
Loading