Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

shim: add tracing macros #269

Merged
merged 5 commits into from
May 20, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions crates/shim/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ async = [
"signal-hook-tokio",
"tokio",
]
tracing = ["dep:tracing"]
docs = []

[[example]]
Expand Down Expand Up @@ -54,6 +55,9 @@ serde_json.workspace = true
thiserror.workspace = true
time.workspace = true

# tracing
tracing = { version = "0.1", optional = true }

# Async dependencies
async-trait = { workspace = true, optional = true }
futures = { workspace = true, optional = true }
Expand Down
1 change: 1 addition & 0 deletions crates/shim/src/args.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ pub struct Flags {
}

/// Parses command line arguments passed to the shim.
#[cfg_attr(feature = "tracing", tracing::instrument(skip_all, level = "Info"))]
pub fn parse<S: AsRef<OsStr>>(args: &[S]) -> Result<Flags> {
let mut flags = Flags::default();

Expand Down
9 changes: 9 additions & 0 deletions crates/shim/src/asynchronous/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,7 @@ pub trait Shim {
}

/// Async Shim entry point that must be invoked from tokio `main`.
#[cfg_attr(feature = "tracing", tracing::instrument(level = "info"))]
pub async fn run<T>(runtime_id: &str, opts: Option<Config>)
where
T: Shim + Send + Sync + 'static,
Expand All @@ -109,6 +110,7 @@ where
}
}

#[cfg_attr(feature = "tracing", tracing::instrument(level = "info"))]
async fn bootstrap<T>(runtime_id: &str, opts: Option<Config>) -> Result<()>
where
T: Shim + Send + Sync + 'static,
Expand Down Expand Up @@ -239,6 +241,7 @@ impl ExitSignal {

/// Spawn is a helper func to launch shim process asynchronously.
/// Typically this expected to be called from `StartShim`.
#[cfg_attr(feature = "tracing", tracing::instrument(level = "info"))]
pub async fn spawn(opts: StartOpts, grouping: &str, vars: Vec<(&str, &str)>) -> Result<String> {
let cmd = env::current_exe().map_err(io_error!(e, ""))?;
let cwd = env::current_dir().map_err(io_error!(e, ""))?;
Expand Down Expand Up @@ -299,6 +302,7 @@ pub async fn spawn(opts: StartOpts, grouping: &str, vars: Vec<(&str, &str)>) ->
Ok(address)
}

#[cfg_attr(feature = "tracing", tracing::instrument(skip_all, level = "info"))]
fn setup_signals_tokio(config: &Config) -> Signals {
if config.no_reaper {
Signals::new([SIGTERM, SIGINT, SIGPIPE]).expect("new signal failed")
Expand All @@ -307,6 +311,7 @@ fn setup_signals_tokio(config: &Config) -> Signals {
}
}

#[cfg_attr(feature = "tracing", tracing::instrument(skip_all, level = "info"))]
async fn handle_signals(signals: Signals) {
let mut signals = signals.fuse();
while let Some(sig) = signals.next().await {
Expand Down Expand Up @@ -360,12 +365,14 @@ async fn handle_signals(signals: Signals) {
}
}

#[cfg_attr(feature = "tracing", tracing::instrument(level = "info"))]
async fn remove_socket_silently(address: &str) {
remove_socket(address)
.await
.unwrap_or_else(|e| warn!("failed to remove socket: {}", e))
}

#[cfg_attr(feature = "tracing", tracing::instrument(level = "info"))]
async fn remove_socket(address: &str) -> Result<()> {
let path = parse_sockaddr(address);
if let Ok(md) = Path::new(path).metadata() {
Expand All @@ -380,6 +387,7 @@ async fn remove_socket(address: &str) -> Result<()> {
Ok(())
}

#[cfg_attr(feature = "tracing", tracing::instrument(skip_all, level = "info"))]
async fn start_listener(address: &str) -> Result<UnixListener> {
let addr = address.to_string();
asyncify(move || -> Result<UnixListener> {
Expand All @@ -391,6 +399,7 @@ async fn start_listener(address: &str) -> Result<UnixListener> {
.await
}

#[cfg_attr(feature = "tracing", tracing::instrument(level = "info"))]
async fn wait_socket_working(address: &str, interval_in_ms: u64, count: u32) -> Result<()> {
for _i in 0..count {
match Client::connect(address) {
Expand Down
10 changes: 10 additions & 0 deletions crates/shim/src/cgroup.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ use crate::error::{Error, Result};
// OOM_SCORE_ADJ_MAX is from https://github.com/torvalds/linux/blob/master/include/uapi/linux/oom.h#L10
const OOM_SCORE_ADJ_MAX: i64 = 1000;

#[cfg_attr(feature = "tracing", tracing::instrument(level = "Info"))]
pub fn set_cgroup_and_oom_score(pid: u32) -> Result<()> {
if pid == 0 {
return Ok(());
Expand All @@ -62,6 +63,7 @@ pub fn set_cgroup_and_oom_score(pid: u32) -> Result<()> {
}

/// Add a process to the given relative cgroup path
#[cfg_attr(feature = "tracing", tracing::instrument(level = "Info"))]
pub fn add_task_to_cgroup(path: &str, pid: u32) -> Result<()> {
let h = hierarchies::auto();
// use relative path here, need to trim prefix '/'
Expand All @@ -74,6 +76,7 @@ pub fn add_task_to_cgroup(path: &str, pid: u32) -> Result<()> {

/// Sets the OOM score for the process to the parents OOM score + 1
/// to ensure that they parent has a lower score than the shim
#[cfg_attr(feature = "tracing", tracing::instrument(level = "Info"))]
pub fn adjust_oom_score(pid: u32) -> Result<()> {
let score = read_process_oom_score(std::os::unix::process::parent_id())?;
if score < OOM_SCORE_ADJ_MAX {
Expand All @@ -82,6 +85,7 @@ pub fn adjust_oom_score(pid: u32) -> Result<()> {
Ok(())
}

#[cfg_attr(feature = "tracing", tracing::instrument(level = "info"))]
fn read_process_oom_score(pid: u32) -> Result<i64> {
let content = fs::read_to_string(format!("/proc/{}/oom_score_adj", pid))
.map_err(io_error!(e, "read oom score"))?;
Expand All @@ -92,12 +96,14 @@ fn read_process_oom_score(pid: u32) -> Result<i64> {
Ok(score)
}

#[cfg_attr(feature = "tracing", tracing::instrument(level = "info"))]
fn write_process_oom_score(pid: u32, score: i64) -> Result<()> {
fs::write(format!("/proc/{}/oom_score_adj", pid), score.to_string())
.map_err(io_error!(e, "write oom score"))
}

/// Collect process cgroup stats, return only necessary parts of it
#[cfg_attr(feature = "tracing", tracing::instrument(level = "info"))]
pub fn collect_metrics(pid: u32) -> Result<Metrics> {
let mut metrics = Metrics::new();

Expand Down Expand Up @@ -179,6 +185,7 @@ pub fn collect_metrics(pid: u32) -> Result<Metrics> {
}

// get_cgroup will return either cgroup v1 or v2 depending on system configuration
#[cfg_attr(feature = "tracing", tracing::instrument(level = "info"))]
fn get_cgroup(pid: u32) -> Result<Cgroup> {
let hierarchies = hierarchies::auto();
let cgroup = if hierarchies.v2() {
Expand All @@ -194,6 +201,7 @@ fn get_cgroup(pid: u32) -> Result<Cgroup> {
}

/// Get the cgroups v2 path given a PID
#[cfg_attr(feature = "tracing", tracing::instrument(level = "info"))]
pub fn get_cgroups_v2_path_by_pid(pid: u32) -> Result<PathBuf> {
// todo: should upstream to cgroups-rs
let path = format!("/proc/{}/cgroup", pid);
Expand All @@ -207,6 +215,7 @@ pub fn get_cgroups_v2_path_by_pid(pid: u32) -> Result<PathBuf> {
}

// https://github.com/opencontainers/runc/blob/1950892f69597aa844cbf000fbdf77610dda3a44/libcontainer/cgroups/fs2/defaultpath.go#L83
#[cfg_attr(feature = "tracing", tracing::instrument(level = "info"))]
fn parse_cgroups_v2_path(content: &str) -> Result<PathBuf> {
// the entry for cgroup v2 is always in the format like `0::$PATH`
// where 0 is the hierarchy ID, the controller name is omitted in cgroup v2
Expand All @@ -222,6 +231,7 @@ fn parse_cgroups_v2_path(content: &str) -> Result<PathBuf> {
}

/// Update process cgroup limits
#[cfg_attr(feature = "tracing", tracing::instrument(level = "info"))]
pub fn update_resources(pid: u32, resources: &LinuxResources) -> Result<()> {
// get container main process cgroup
let cgroup = get_cgroup(pid)?;
Expand Down
2 changes: 2 additions & 0 deletions crates/shim/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ pub use protos::{
shim::shim::DeleteResponse,
ttrpc::{context::Context, Result as TtrpcResult},
};

#[cfg(unix)]
ioctl_write_ptr_bad!(ioctl_set_winsz, libc::TIOCSWINSZ, libc::winsize);

Expand Down Expand Up @@ -167,6 +168,7 @@ pub const SOCKET_ROOT: &str = "/var/run/containerd";
pub const SOCKET_ROOT: &str = r"\\.\pipe\containerd-containerd";

/// Make socket path from containerd socket path, namespace and id.
#[cfg_attr(feature = "tracing", tracing::instrument(level = "Info"))]
pub fn socket_address(socket_path: &str, namespace: &str, id: &str) -> String {
let path = PathBuf::from(socket_path)
.join(namespace)
Expand Down
9 changes: 9 additions & 0 deletions crates/shim/src/synchronous/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -182,6 +182,7 @@ pub trait Shim {
}

/// Shim entry point that must be invoked from `main`.
#[cfg_attr(feature = "tracing", tracing::instrument(level = "info"))]
pub fn run<T>(runtime_id: &str, opts: Option<Config>)
where
T: Shim + Send + Sync + 'static,
Expand All @@ -192,6 +193,7 @@ where
}
}

#[cfg_attr(feature = "tracing", tracing::instrument(level = "info"))]
fn bootstrap<T>(runtime_id: &str, opts: Option<Config>) -> Result<()>
where
T: Shim + Send + Sync + 'static,
Expand Down Expand Up @@ -289,6 +291,7 @@ where
}
}

#[cfg_attr(feature = "tracing", tracing::instrument(skip_all, level = "info"))]
fn create_server(_flags: args::Flags) -> Result<Server> {
let mut server = Server::new();

Expand All @@ -306,6 +309,7 @@ fn create_server(_flags: args::Flags) -> Result<Server> {
Ok(server)
}

#[cfg_attr(feature = "tracing", tracing::instrument(skip_all, level = "info"))]
fn setup_signals(_config: &Config) -> Option<AppSignals> {
#[cfg(unix)]
{
Expand Down Expand Up @@ -341,6 +345,7 @@ unsafe extern "system" fn signal_handler(_: u32) -> i32 {
1
}

#[cfg_attr(feature = "tracing", tracing::instrument(skip_all, level = "info"))]
fn handle_signals(mut _signals: Option<AppSignals>) {
#[cfg(unix)]
{
Expand Down Expand Up @@ -402,6 +407,7 @@ fn handle_signals(mut _signals: Option<AppSignals>) {
}
}

#[cfg_attr(feature = "tracing", tracing::instrument(level = "info"))]
fn wait_socket_working(address: &str, interval_in_ms: u64, count: u32) -> Result<()> {
for _i in 0..count {
match Client::connect(address) {
Expand All @@ -416,10 +422,12 @@ fn wait_socket_working(address: &str, interval_in_ms: u64, count: u32) -> Result
Err(other!("time out waiting for socket {}", address))
}

#[cfg_attr(feature = "tracing", tracing::instrument(level = "info"))]
fn remove_socket_silently(address: &str) {
remove_socket(address).unwrap_or_else(|e| warn!("failed to remove file {} {:?}", address, e))
}

#[cfg_attr(feature = "tracing", tracing::instrument(level = "info"))]
fn remove_socket(address: &str) -> Result<()> {
#[cfg(unix)]
{
Expand Down Expand Up @@ -448,6 +456,7 @@ fn remove_socket(address: &str) -> Result<()> {

/// Spawn is a helper func to launch shim process.
/// Typically this expected to be called from `StartShim`.
#[cfg_attr(feature = "tracing", tracing::instrument(level = "info"))]
pub fn spawn(opts: StartOpts, grouping: &str, vars: Vec<(&str, &str)>) -> Result<(u32, String)> {
let cmd = env::current_exe().map_err(io_error!(e, ""))?;
let cwd = env::current_dir().map_err(io_error!(e, ""))?;
Expand Down
Loading