Skip to content

Commit

Permalink
Address PR feedback
Browse files Browse the repository at this point in the history
  • Loading branch information
wbjin committed Nov 5, 2024
1 parent d3d819f commit b9b93cd
Show file tree
Hide file tree
Showing 9 changed files with 145 additions and 118 deletions.
1 change: 0 additions & 1 deletion zeusd/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,5 @@ paste = "1"
once_cell = "1.7.2"

[dev-dependencies]
once_cell = "1.7.2"
reqwest = { version = "0.11", default-features = false, features = ["json"] }
serde_json = "1"
138 changes: 77 additions & 61 deletions zeusd/src/devices/cpu/linux.rs
Original file line number Diff line number Diff line change
@@ -1,12 +1,11 @@
//! CPU power measurement with RAPL. Only supported on Linux.

use once_cell::sync::OnceCell;
use std::fs;
use std::io::Read;
use std::path::PathBuf;
use std::path::{Path, PathBuf};
use std::string::String;
use std::sync::{Arc, RwLock};

use once_cell::sync::OnceCell;
use tokio::io::AsyncReadExt;
use tokio::task::JoinHandle;
use tokio::time::{sleep, Duration};
Expand All @@ -16,32 +15,29 @@ use crate::error::ZeusdError;

// NOTE: To support Zeusd deployment in a docker container, this should support
// sysfs mounts under places like `/zeus_sys`.
// static RAPL_DIR: &'static str = "/sys/class/powercap/intel-rapl";
static RAPL_DIR: &'static str = "./dummy-rapl";
static RAPL_DIR: &str = "/sys/class/powercap/intel-rapl";

pub struct RaplCpu {
cpu: Arc<PackageInfo>,
dram: Arc<Option<PackageInfo>>,
dram: Option<Arc<PackageInfo>>,
cpu_monitoring_task: OnceCell<JoinHandle<Result<(), ZeusdError>>>,
dram_monitoring_task: OnceCell<JoinHandle<Result<(), ZeusdError>>>,
}

impl RaplCpu {
pub fn init(_index: usize) -> Result<Self, ZeusdError> {
let fields = RaplCpu::get_available_fields(_index)?;
let cpu = Arc::new(fields.0);
let dram = Arc::new(fields.1);
Ok(Self {
cpu,
dram,
cpu: fields.0,
dram: fields.1,
cpu_monitoring_task: OnceCell::new(),
dram_monitoring_task: OnceCell::new(),
})
}
}

impl PackageInfo {
pub fn new(base_path: &PathBuf, index: usize) -> anyhow::Result<Self, ZeusdError> {
pub fn new(base_path: &Path, index: usize) -> anyhow::Result<Self, ZeusdError> {
let cpu_name_path = base_path.join("name");
let cpu_energy_path = base_path.join("energy_uj");
let cpu_max_energy_path = base_path.join("max_energy_range_uj");
Expand All @@ -50,7 +46,7 @@ impl PackageInfo {
return Err(ZeusdError::CpuInitializationError(index));
}

let cpu_name = fs::read_to_string(&cpu_name_path)?;
let cpu_name = fs::read_to_string(&cpu_name_path)?.trim_end().to_string();
// Try reading from the energy_uj file
read_u64(&cpu_energy_path)?;
let cpu_max_energy = read_u64(&cpu_max_energy_path)?;
Expand All @@ -72,15 +68,13 @@ impl CpuManager for RaplCpu {

match fs::read_dir(&base_path) {
Ok(entries) => {
for entry in entries {
if let Ok(entry) = entry {
let path = entry.path();
if path.is_dir() {
if let Some(dir_name_str) = path.file_name() {
let dir_name = dir_name_str.to_string_lossy();
if dir_name.contains("intel-rapl") {
index_count += 1;
}
for entry in entries.flatten() {
let path = entry.path();
if path.is_dir() {
if let Some(dir_name_str) = path.file_name() {
let dir_name = dir_name_str.to_string_lossy();
if dir_name.contains("intel-rapl") {
index_count += 1;
}
}
}
Expand All @@ -93,27 +87,27 @@ impl CpuManager for RaplCpu {
Ok(index_count)
}

fn get_available_fields(index: usize) -> Result<(PackageInfo, Option<PackageInfo>), ZeusdError> {
fn get_available_fields(
index: usize,
) -> Result<(Arc<PackageInfo>, Option<Arc<PackageInfo>>), ZeusdError> {
let base_path = PathBuf::from(format!("{}/intel-rapl:{}", RAPL_DIR, index));
let cpu_info = PackageInfo::new(&base_path, index)?;

let mut dram_info = None;
match fs::read_dir(&base_path) {
Ok(entries) => {
for entry in entries {
if let Ok(entry) = entry {
let path = entry.path();
if path.is_dir() {
if let Some(dir_name_str) = path.file_name() {
let dir_name = dir_name_str.to_string_lossy();
if dir_name.contains("intel-rapl") {
let subpackage_path = base_path.join(&*dir_name);
let subpackage_info =
PackageInfo::new(&subpackage_path, index)?;
if subpackage_info.name == "dram" {
dram_info = Some(subpackage_info);
break;
}
for entry in entries.flatten() {
let path = entry.path();
if path.is_dir() {
if let Some(dir_name_str) = path.file_name() {
let dir_name = dir_name_str.to_string_lossy();
if dir_name.contains("intel-rapl") {
let subpackage_path = base_path.join(&*dir_name);
let subpackage_info = PackageInfo::new(&subpackage_path, index)?;
if subpackage_info.name == "dram" {
return Ok((
Arc::new(cpu_info),
Some(Arc::new(subpackage_info)),
));
}
}
}
Expand All @@ -125,40 +119,62 @@ impl CpuManager for RaplCpu {
}
};

Ok((cpu_info, dram_info))
Ok((Arc::new(cpu_info), None))
}

fn get_cpu_energy(&self) -> anyhow::Result<u64, ZeusdError> {
fn get_cpu_energy(&self) -> Result<u64, ZeusdError> {
let handle = self
.cpu_monitoring_task
.get_or_init(|| tokio::spawn(monitor_rapl(Arc::clone(&self.cpu))));
if handle.is_finished() {
return Err(ZeusdError::CpuManagementTaskTerminatedError(
self.cpu.index.try_into().unwrap(),
));
return Err(ZeusdError::CpuManagementTaskTerminatedError(self.cpu.index));
}

let num_wraparounds_before = *self
.cpu
.num_wraparounds
.read()
.map_err(|_| ZeusdError::CpuManagementTaskTerminatedError(self.cpu.index))?;
let mut measurement = read_u64(&self.cpu.energy_uj_path)?;
let num_wraparounds = *self
.cpu
.num_wraparounds
.read()
.map_err(|_| ZeusdError::CpuManagementTaskTerminatedError(self.cpu.index))?;
if num_wraparounds != num_wraparounds_before {
// Wraparound has happened after measurement, take measurement again
measurement = read_u64(&self.cpu.energy_uj_path)?;
}

let measurement = read_u64(&self.cpu.energy_uj_path)?;
let num_wraparounds = self.cpu.num_wraparounds.read().unwrap();
Ok(measurement + *num_wraparounds * self.cpu.max_energy_uj)
Ok(measurement + num_wraparounds * self.cpu.max_energy_uj)
}

fn get_dram_energy(&self) -> Result<Option<u64>, ZeusdError> {
match &*self.dram {
None => Ok(None),
fn get_dram_energy(&self) -> Result<u64, ZeusdError> {
match &self.dram {
None => Err(ZeusdError::CpuManagementTaskTerminatedError(self.cpu.index)),
Some(dram) => {
let handle = self
.dram_monitoring_task
.get_or_init(|| tokio::spawn(monitor_rapl(Arc::clone(&self.cpu))));
.get_or_init(|| tokio::spawn(monitor_rapl(Arc::clone(dram))));
if handle.is_finished() {
return Err(ZeusdError::CpuManagementTaskTerminatedError(
self.cpu.index.try_into().unwrap(),
));
return Err(ZeusdError::CpuManagementTaskTerminatedError(dram.index));
}

let num_wraparounds_before = *dram
.num_wraparounds
.read()
.map_err(|_| ZeusdError::CpuManagementTaskTerminatedError(dram.index))?;
let mut measurement = read_u64(&dram.energy_uj_path)?;
let num_wraparounds = *dram
.num_wraparounds
.read()
.map_err(|_| ZeusdError::CpuManagementTaskTerminatedError(dram.index))?;
if num_wraparounds != num_wraparounds_before {
// Wraparound has happened after measurement, take measurement again
measurement = read_u64(&dram.energy_uj_path)?;
}

let measurement = read_u64(&dram.energy_uj_path)?;
let num_wraparounds = dram.num_wraparounds.read().unwrap();
Ok(Some(measurement + *num_wraparounds * dram.max_energy_uj))
Ok(measurement + num_wraparounds * dram.max_energy_uj)
}
}
}
Expand All @@ -171,6 +187,10 @@ impl CpuManager for RaplCpu {
handle.abort();
}
}

/// Returns `true` when this CPU holds DRAM package info, i.e. `self.dram` is `Some`.
fn is_dram_available(&self) -> bool {
self.dram.is_some()
}
}

fn read_u64(path: &PathBuf) -> anyhow::Result<u64, std::io::Error> {
Expand All @@ -194,19 +214,15 @@ async fn read_u64_async(path: &PathBuf) -> Result<u64, std::io::Error> {
async fn monitor_rapl(rapl_file: Arc<PackageInfo>) -> anyhow::Result<(), ZeusdError> {
let mut last_energy_uj = read_u64_async(&rapl_file.energy_uj_path)
.await
.map_err(|_| {
ZeusdError::CpuManagementTaskTerminatedError(rapl_file.index.try_into().unwrap())
})?;
.map_err(|_| ZeusdError::CpuManagementTaskTerminatedError(rapl_file.index))?;
tracing::info!(
"Monitoring started for {}",
rapl_file.energy_uj_path.display()
);
loop {
let current_energy_uj = read_u64_async(&rapl_file.energy_uj_path)
.await
.map_err(|_| {
ZeusdError::CpuManagementTaskTerminatedError(rapl_file.index.try_into().unwrap())
})?;
.map_err(|_| ZeusdError::CpuManagementTaskTerminatedError(rapl_file.index))?;

if current_energy_uj < last_energy_uj {
let mut wraparound_guard = rapl_file.num_wraparounds.write().unwrap();
Expand Down
24 changes: 14 additions & 10 deletions zeusd/src/devices/cpu/macos.rs
Original file line number Diff line number Diff line change
@@ -1,13 +1,13 @@
//! Fake `RaplCpu` implementation to allow development and testing on MacOS.
use std::path::PathBuf;
use std::sync::RwLock;
use std::sync::{Arc, RwLock};

use crate::devices::cpu::{CpuManager, PackageInfo};
use crate::error::ZeusdError;

pub struct RaplCpu {
cpu: PackageInfo,
dram: Option<PackageInfo>
cpu: Arc<PackageInfo>,
dram: Option<Arc<PackageInfo>>
}

impl RaplCpu {
Expand All @@ -25,24 +25,24 @@ impl CpuManager for RaplCpu {
Ok(1)
}

fn get_available_fields(_index: usize) -> Result<(PackageInfo, Option<PackageInfo>), ZeusdError> {
fn get_available_fields(_index: usize) -> Result<(Arc<PackageInfo>, Option<Arc<PackageInfo>>), ZeusdError> {
Ok(
(
PackageInfo{
Arc::new(PackageInfo{
index: _index,
name: "package-0".to_string(),
energy_uj_path: PathBuf::from("/sys/class/powercap/intel-rapl/intel-rapl:0/energy_uj"),
max_energy_uj: 1000000,
num_wraparounds: RwLock::new(0)
},
}),
Some(
PackageInfo{
Arc::new(PackageInfo{
index: _index,
name: "dram".to_string(),
energy_uj_path: PathBuf::from("/sys/class/powercap/intel-rapl/intel-rapl:0/intel-rapl:0:0/energy_uj"),
max_energy_uj: 1000000,
num_wraparounds: RwLock::new(0)
}
})
)
)
)
Expand All @@ -52,11 +52,15 @@ impl CpuManager for RaplCpu {
Ok(10001)
}

fn get_dram_energy(&self) -> Result<Option<u64>, ZeusdError> {
Ok(Some(1001))
fn get_dram_energy(&self) -> Result<u64, ZeusdError> {
Ok(1001)
}

/// Does nothing in this fake implementation.
fn stop_monitoring(&mut self) {

}

/// Always returns `true` in this fake implementation.
fn is_dram_available(&self) -> bool {
true
}
}
Loading

0 comments on commit b9b93cd

Please sign in to comment.