Skip to content

Commit

Permalink
feat: Enabling tracking the execution phase of a server
Browse files Browse the repository at this point in the history
Adds a way for subcribers to track the execution phase of a Server.

The new method `Server::watch_execution_phase` returns a broadcast
channel that will receive ExecutionPhase messages on each phase transition.

Example use cases:

* A /health endpoint that reports the current execution phase
  (only interesting for shutdown)
* Producing metrics for the various phase transitions
  • Loading branch information
theduke committed Jan 11, 2025
1 parent 42e11c4 commit 521b794
Show file tree
Hide file tree
Showing 3 changed files with 238 additions and 7 deletions.
123 changes: 116 additions & 7 deletions pingora-core/src/server/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ use std::sync::Arc;
use std::thread;
#[cfg(unix)]
use tokio::signal::unix;
use tokio::sync::{watch, Mutex};
use tokio::sync::{broadcast, watch, Mutex};
use tokio::time::{sleep, Duration};

use crate::services::Service;
Expand All @@ -53,6 +53,49 @@ enum ShutdownType {
Quick,
}

/// The execution phase the server is currently in.
#[derive(Clone, Debug)]
#[non_exhaustive]
pub enum ExecutionPhase {
/// The server was created, but has not started yet.
Setup,

/// Services are being prepared.
///
/// During graceful upgrades this phase acquires the listening FDs from the old process.
Bootstrap,

/// Bootstrap has finished, listening FDs have been transferred.
BootstrapComplete,

/// The server is running and is listening for shutdown signals.
Running,

/// A QUIT signal was received, indicating that a new process wants to take over.
///
/// The server is trying to send the fds to the new process over a Unix socket.
GracefulUpgradeTransferringFds,

/// FDs have been sent to the new process.
/// Waiting a fixed amount of time to allow the new process to take the sockets.
GracefulUpgradeCloseTimeout,

/// A TERM signal was received, indicating that the server should shut down gracefully.
GracefulTerminate,

/// The server is shutting down.
ShutdownStarted,

/// Waiting for the configured grace period to end before shutting down.
ShutdownGracePeriod,

/// Wait for runtimes to finish.
ShutdownRuntimes,

/// The server has stopped.
Terminated,
}

/// The receiver for server's shutdown event. The value will turn to true once the server starts
/// to shutdown
pub type ShutdownWatch = watch::Receiver<bool>;
Expand All @@ -71,6 +114,12 @@ pub struct Server {
shutdown_watch: watch::Sender<bool>,
// TODO: we many want to drop this copy to let sender call closed()
shutdown_recv: ShutdownWatch,

/// Tracks the execution phase of the server during upgrades and graceful shutdowns.
///
/// Users can subscribe to the phase with [`Self::watch_execution_phase()`].
execution_phase_watch: broadcast::Sender<ExecutionPhase>,

/// The parsed server configuration
pub configuration: Arc<ServerConf>,
/// The parser command line options
Expand All @@ -86,13 +135,25 @@ pub struct Server {
// TODO: delete the pid when exit

impl Server {
/// Acquire a receiver for the server's execution phase.
///
/// The receiver will produce values for each transition.
pub fn watch_execution_phase(&self) -> broadcast::Receiver<ExecutionPhase> {
self.execution_phase_watch.subscribe()
}

#[cfg(unix)]
async fn main_loop(&self) -> ShutdownType {
// waiting for exit signal
// TODO: there should be a signal handling function
let mut graceful_upgrade_signal = unix::signal(unix::SignalKind::quit()).unwrap();
let mut graceful_terminate_signal = unix::signal(unix::SignalKind::terminate()).unwrap();
let mut fast_shutdown_signal = unix::signal(unix::SignalKind::interrupt()).unwrap();

self.execution_phase_watch
.send(ExecutionPhase::Running)
.ok();

tokio::select! {
_ = fast_shutdown_signal.recv() => {
info!("SIGINT received, exiting");
Expand All @@ -110,12 +171,18 @@ impl Server {
}
}
info!("Broadcast graceful shutdown complete");

self.execution_phase_watch.send(ExecutionPhase::GracefulTerminate).ok();

ShutdownType::Graceful
}
_ = graceful_upgrade_signal.recv() => {
// TODO: still need to select! on signals in case a fast shutdown is needed
// aka: move below to another task and only kick it off here
info!("SIGQUIT received, sending socks and gracefully exiting");

self.execution_phase_watch.send(ExecutionPhase::GracefulUpgradeTransferringFds).ok();

if let Some(fds) = &self.listen_fds {
let fds = fds.lock().await;
info!("Trying to send socks");
Expand All @@ -131,6 +198,7 @@ impl Server {
sentry::capture_error(&e);
}
}
self.execution_phase_watch.send(ExecutionPhase::GracefulUpgradeCloseTimeout).ok();
sleep(Duration::from_secs(CLOSE_TIMEOUT)).await;
info!("Broadcasting graceful shutdown");
// gracefully exiting
Expand Down Expand Up @@ -211,6 +279,7 @@ impl Server {
listen_fds: None,
shutdown_watch: tx,
shutdown_recv: rx,
execution_phase_watch: broadcast::channel(100).0,
configuration: Arc::new(conf),
options: opt,
#[cfg(feature = "sentry")]
Expand Down Expand Up @@ -253,6 +322,7 @@ impl Server {
listen_fds: None,
shutdown_watch: tx,
shutdown_recv: rx,
execution_phase_watch: broadcast::channel(100).0,
configuration: Arc::new(conf),
options: opt,
#[cfg(feature = "sentry")]
Expand Down Expand Up @@ -280,6 +350,10 @@ impl Server {
info!("Bootstrap starting");
debug!("{:#?}", self.options);

self.execution_phase_watch
.send(ExecutionPhase::Bootstrap)
.ok();

/* only init sentry in release builds */
#[cfg(all(not(debug_assertions), feature = "sentry"))]
let _guard = self.sentry.as_ref().map(|opts| sentry::init(opts.clone()));
Expand All @@ -304,16 +378,23 @@ impl Server {
std::process::exit(1);
}
}

self.execution_phase_watch
.send(ExecutionPhase::BootstrapComplete)
.ok();
}

/// Start the server
/// Run the server until execution finished.
///
/// This function will block forever until the server needs to quit. So this would be the last
/// function to call for this object.
/// This function will run until the server has been instructed to shut down
/// through a signal, and will then wait for all services to finish and
/// runtimes to exit.
///
/// Note: this function may fork the process for daemonization, so any additional threads created
/// before this function will be lost to any service logic once this function is called.
pub fn run_forever(mut self) -> ! {
/// Note: if daemonization is enabled in the config, this function will
/// never return.
/// Instead it will either start the daemon process and exit, or panic
/// if daemonization fails.
pub fn run(mut self) {
info!("Server starting");

let conf = self.configuration.as_ref();
Expand Down Expand Up @@ -358,7 +439,15 @@ impl Server {
#[cfg(windows)]
let shutdown_type = ShutdownType::Graceful;

self.execution_phase_watch
.send(ExecutionPhase::ShutdownStarted)
.ok();

if matches!(shutdown_type, ShutdownType::Graceful) {
self.execution_phase_watch
.send(ExecutionPhase::ShutdownGracePeriod)
.ok();

let exit_timeout = self
.configuration
.as_ref()
Expand All @@ -379,6 +468,11 @@ impl Server {
.unwrap_or(5),
),
};

self.execution_phase_watch
.send(ExecutionPhase::ShutdownRuntimes)
.ok();

let shutdowns: Vec<_> = runtimes
.into_iter()
.map(|rt| {
Expand All @@ -395,6 +489,21 @@ impl Server {
}
}
info!("All runtimes exited, exiting now");

self.execution_phase_watch
.send(ExecutionPhase::Terminated)
.ok();
}

/// Start the server
///
/// This function will block forever until the server needs to quit. So this would be the last
/// function to call for this object.
///
/// Note: this function may fork the process for daemonization, so any additional threads created
/// before this function will be lost to any service logic once this function is called.
pub fn run_forever(self) -> ! {
self.run();
std::process::exit(0)
}

Expand Down
53 changes: 53 additions & 0 deletions pingora-core/tests/execution_phase_monitor_fast_shutdown.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
// NOTE: This test sends a shutdown signal to itself,
// so it needs to be in an isolated test to prevent concurrency.

use pingora_core::server::{ExecutionPhase, Server};

// Ensure that execution phases are reported correctly.
#[test]
fn test_server_execution_phase_monitor_fast_shutdown() {
let mut server = Server::new(None).unwrap();

let mut phase = server.watch_execution_phase();

let join = std::thread::spawn(move || {
server.bootstrap();
server.run();
});

assert!(matches!(
phase.blocking_recv().unwrap(),
ExecutionPhase::Bootstrap
));
assert!(matches!(
phase.blocking_recv().unwrap(),
ExecutionPhase::BootstrapComplete,
));
assert!(matches!(
phase.blocking_recv().unwrap(),
ExecutionPhase::Running,
));

// Need to wait for startup, otherwise the signal handler is not
// installed yet.
unsafe {
libc::raise(libc::SIGINT);
}

assert!(matches!(
phase.blocking_recv().unwrap(),
ExecutionPhase::ShutdownStarted,
));

assert!(matches!(
phase.blocking_recv().unwrap(),
ExecutionPhase::ShutdownRuntimes,
));

join.join().unwrap();

assert!(matches!(
phase.blocking_recv().unwrap(),
ExecutionPhase::Terminated,
));
}
69 changes: 69 additions & 0 deletions pingora-core/tests/execution_phase_monitor_graceful_shutdown.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
// NOTE: This test sends a shutdown signal to itself,
// so it needs to be in an isolated test to prevent concurrency.

use pingora_core::server::{configuration::ServerConf, ExecutionPhase, Server};

// Ensure that execution phases are reported correctly.
#[test]
fn test_server_execution_phase_monitor_graceful_shutdown() {
let conf = ServerConf {
// Use small timeouts to speed up the test.
grace_period_seconds: Some(1),
graceful_shutdown_timeout_seconds: Some(1),
..Default::default()
};
let mut server = Server::new_with_opt_and_conf(None, conf);

let mut phase = server.watch_execution_phase();

let join = std::thread::spawn(move || {
server.bootstrap();
server.run();
});

assert!(matches!(
phase.blocking_recv().unwrap(),
ExecutionPhase::Bootstrap
));
assert!(matches!(
phase.blocking_recv().unwrap(),
ExecutionPhase::BootstrapComplete,
));
assert!(matches!(
phase.blocking_recv().unwrap(),
ExecutionPhase::Running,
));

// Need to wait for startup, otherwise the signal handler is not
// installed yet.
unsafe {
libc::raise(libc::SIGTERM);
}

assert!(matches!(
phase.blocking_recv().unwrap(),
ExecutionPhase::GracefulTerminate,
));

assert!(matches!(
phase.blocking_recv().unwrap(),
ExecutionPhase::ShutdownStarted,
));

assert!(matches!(
phase.blocking_recv().unwrap(),
ExecutionPhase::ShutdownGracePeriod,
));

assert!(matches!(
dbg!(phase.blocking_recv().unwrap()),
ExecutionPhase::ShutdownRuntimes,
));

join.join().unwrap();

assert!(matches!(
phase.blocking_recv().unwrap(),
ExecutionPhase::Terminated,
));
}

0 comments on commit 521b794

Please sign in to comment.