Skip to content

Commit

Permalink
Various pprof improvements
Browse files Browse the repository at this point in the history
Send the right profile duration. The profile start time is still the end
time rather than the start time, but this is offset by the profiling
session duration (5s) so it's acceptable as of now.

There are planned changes on the way samples are collected from the
unwinders so the start time will be tackled then.

Test Plan
=========

CI
  • Loading branch information
javierhonduco committed Nov 4, 2024
1 parent 3537153 commit c3218a0
Show file tree
Hide file tree
Showing 5 changed files with 80 additions and 55 deletions.
29 changes: 15 additions & 14 deletions lightswitch-proto/src/profile.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ use std::time::{Duration, SystemTime};
use anyhow::{anyhow, Result};

pub struct PprofBuilder {
time_nanos: i64,
duration: Duration,
freq_in_hz: i64,

Expand All @@ -38,10 +39,14 @@ pub enum LabelStringOrNumber {
}

impl PprofBuilder {
pub fn new(duration: Duration, freq_in_hz: i64) -> Self {
pub fn new(profile_start: SystemTime, duration: Duration, freq_in_hz: u64) -> Self {
Self {
time_nanos: profile_start
.duration_since(SystemTime::UNIX_EPOCH)
.unwrap()
.as_nanos() as i64,
duration,
freq_in_hz,
freq_in_hz: freq_in_hz as i64,

known_mappings: HashMap::new(),
mappings: Vec::new(),
Expand Down Expand Up @@ -274,7 +279,7 @@ impl PprofBuilder {
label
}

pub fn profile(mut self) -> pprof::Profile {
pub fn build(mut self) -> pprof::Profile {
let sample_type = pprof::ValueType {
r#type: self.get_or_insert_string("samples"),
unit: self.get_or_insert_string("count"),
Expand All @@ -299,11 +304,7 @@ impl PprofBuilder {
string_table: self.string_table,
drop_frames: 0,
keep_frames: 0,
// TODO: change this to send the time when the profile was collected.
time_nanos: SystemTime::now()
.duration_since(SystemTime::UNIX_EPOCH)
.unwrap()
.as_nanos() as i64,
time_nanos: self.time_nanos,
duration_nanos: self.duration.as_nanos() as i64,
period_type: Some(period_type),
period: 1_000_000_000 / self.freq_in_hz,
Expand All @@ -324,7 +325,7 @@ mod tests {

#[test]
fn test_string_table() {
let mut pprof = PprofBuilder::new(Duration::from_secs(5), 27);
let mut pprof = PprofBuilder::new(SystemTime::now(), Duration::from_secs(5), 27);
assert_eq!(pprof.get_or_insert_string("hi"), 1);
assert_eq!(pprof.get_or_insert_string("salut"), 2);
assert_eq!(pprof.string_table, vec!["", "hi", "salut"]);
Expand All @@ -337,7 +338,7 @@ mod tests {

#[test]
fn test_mappings() {
let mut pprof = PprofBuilder::new(Duration::from_secs(5), 27);
let mut pprof = PprofBuilder::new(SystemTime::now(), Duration::from_secs(5), 27);
assert_eq!(
pprof.add_mapping(0, 0x100, 0x200, 0x0, "file.so", "sha256-abc"),
1
Expand All @@ -355,7 +356,7 @@ mod tests {

#[test]
fn test_locations() {
let mut pprof = PprofBuilder::new(Duration::from_secs(5), 27);
let mut pprof = PprofBuilder::new(SystemTime::now(), Duration::from_secs(5), 27);
let _ = pprof.add_line("hahahaha-first-line");
let (line, function_id) = pprof.add_line("test-line");

Expand All @@ -382,7 +383,7 @@ mod tests {

#[test]
fn test_sample() {
let mut pprof = PprofBuilder::new(Duration::from_secs(5), 27);
let mut pprof = PprofBuilder::new(SystemTime::now(), Duration::from_secs(5), 27);
let labels = vec![
pprof.new_label("key", LabelStringOrNumber::String("value".into())),
pprof.new_label("key", LabelStringOrNumber::Number(123, "pid".into())),
Expand Down Expand Up @@ -414,7 +415,7 @@ mod tests {
use rand::Rng;

let mut rng = rand::thread_rng();
let mut pprof = PprofBuilder::new(Duration::from_secs(5), 27);
let mut pprof = PprofBuilder::new(SystemTime::now(), Duration::from_secs(5), 27);
let raw_samples = vec![
(vec![123], 200),
(vec![0, 20, 30, 40, 50], 900),
Expand Down Expand Up @@ -445,6 +446,6 @@ mod tests {
}

assert!(pprof.validate().is_ok());
pprof.profile();
pprof.build();
}
}
24 changes: 18 additions & 6 deletions src/collector.rs
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,9 @@ impl Collector for NullCollector {
pub struct StreamingCollector {
local_symbolizer: bool,
pprof_ingest_url: String,
timeout: Duration,
http_client_timeout: Duration,
profile_duration: Duration,
profile_frequency_hz: u64,
procs: HashMap<i32, ProcessInfo>,
objs: HashMap<ExecutableId, ObjectFileInfo>,
metadata_provider: ThreadSafeGlobalMetadataProvider,
Expand All @@ -80,12 +82,16 @@ impl StreamingCollector {
pub fn new(
local_symbolizer: bool,
pprof_ingest_url: &str,
profile_duration: Duration,
profile_frequency_hz: u64,
metadata_provider: ThreadSafeGlobalMetadataProvider,
) -> Self {
Self {
local_symbolizer,
pprof_ingest_url: pprof_ingest_url.into(),
timeout: Duration::from_secs(30),
http_client_timeout: Duration::from_secs(30),
profile_duration,
profile_frequency_hz,
metadata_provider,
..Default::default()
}
Expand All @@ -107,14 +113,20 @@ impl Collector for StreamingCollector {
profile = symbolize_profile(&profile, procs, objs);
}

let pprof_builder = to_pprof(profile, procs, objs, &self.metadata_provider);
let pprof = pprof_builder.profile();
let pprof_profile = to_pprof(
profile,
procs,
objs,
&self.metadata_provider,
self.profile_duration,
self.profile_frequency_hz,
);

let client_builder = reqwest::blocking::Client::builder().timeout(self.timeout);
let client_builder = reqwest::blocking::Client::builder().timeout(self.http_client_timeout);
let client = client_builder.build().unwrap();
let response = client
.post(self.pprof_ingest_url.clone())
.body(pprof.encode_to_vec())
.body(pprof_profile.encode_to_vec())
.send();

tracing::debug!("http response: {:?}", response);
Expand Down
40 changes: 23 additions & 17 deletions src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,16 +31,16 @@ use lightswitch::unwind_info::compact_unwind_info;
use lightswitch::unwind_info::CompactUnwindInfoBuilder;
use lightswitch_object::ObjectFile;

const SAMPLE_FREQ_RANGE: RangeInclusive<usize> = 1..=1009;
const SAMPLE_FREQ_RANGE: RangeInclusive<u64> = 1..=1009;
const PPROF_INGEST_URL: &str = "http://localhost:4567/pprof/new";

fn parse_duration(arg: &str) -> Result<Duration, std::num::ParseIntError> {
let seconds = arg.parse()?;
Ok(Duration::from_secs(seconds))
}

fn sample_freq_in_range(s: &str) -> Result<u16, String> {
let sample_freq: usize = s
fn sample_freq_in_range(s: &str) -> Result<u64, String> {
let sample_freq: u64 = s
.parse()
.map_err(|_| format!("`{s}' isn't a valid frequency"))?;
if !SAMPLE_FREQ_RANGE.contains(&sample_freq) {
Expand All @@ -50,8 +50,8 @@ fn sample_freq_in_range(s: &str) -> Result<u16, String> {
SAMPLE_FREQ_RANGE.end()
));
}
if !is_prime(sample_freq.try_into().unwrap()) {
let ba_result = primes_before_after(sample_freq);
if !is_prime(sample_freq) {
let ba_result = primes_before_after(sample_freq as usize);
match ba_result {
Ok((prime_before, prime_after)) => {
return Err(format!(
Expand All @@ -62,7 +62,7 @@ fn sample_freq_in_range(s: &str) -> Result<u16, String> {
Err(_) => println!("primes_before_after should not have failed"),
}
}
Ok(sample_freq as u16)
Ok(sample_freq)
}

// Clap value_parser() in the form of: Fn(&str) -> Result<T,E>
Expand All @@ -85,7 +85,7 @@ fn value_is_power_of_two(s: &str) -> Result<usize, String> {
fn primes_before_after(non_prime: usize) -> Result<(usize, usize), String> {
// If a prime number passed in, return Err
if is_prime(non_prime.try_into().unwrap()) {
return Err(format!("{} IS prime", non_prime));
return Err(format!("{} is prime", non_prime));
}
// What is the count (not value) of the prime just before our non_prime?
let n_before: usize = primal::StreamingSieve::prime_pi(non_prime);
Expand Down Expand Up @@ -168,7 +168,7 @@ struct Cli {
#[arg(long, default_value_t = ProfilerConfig::default().sample_freq, value_name = "SAMPLE_FREQ_IN_HZ",
value_parser = sample_freq_in_range,
)]
sample_freq: u16,
sample_freq: u64,
/// Output file for Flame Graph in SVG format
#[arg(long, default_value_t, value_enum)]
profile_format: ProfileFormat,
Expand Down Expand Up @@ -300,6 +300,8 @@ fn main() -> Result<(), Box<dyn Error>> {
ProfileSender::Remote => Box::new(StreamingCollector::new(
args.symbolizer == Symbolizer::Local,
PPROF_INGEST_URL,
ProfilerConfig::default().session_duration,
args.sample_freq,
metadata_provider.clone(),
)) as Box<dyn Collector + Send>,
}));
Expand Down Expand Up @@ -328,7 +330,7 @@ fn main() -> Result<(), Box<dyn Error>> {

let mut p: Profiler<'_> = Profiler::new(profiler_config, stop_signal_receive);
p.profile_pids(args.pids);
p.run(collector.clone());
let profile_duration = p.run(collector.clone());

let collector = collector.lock().unwrap();
let (mut profile, procs, objs) = collector.finish();
Expand Down Expand Up @@ -370,9 +372,15 @@ fn main() -> Result<(), Box<dyn Error>> {
}
ProfileFormat::Pprof => {
let mut buffer = Vec::new();
let proto = to_pprof(profile, procs, objs, &metadata_provider);
proto.validate().unwrap();
proto.profile().encode(&mut buffer).unwrap();
let pprof_profile = to_pprof(
profile,
procs,
objs,
&metadata_provider,
profile_duration,
args.sample_freq,
);
pprof_profile.encode(&mut buffer).unwrap();
let profile_name = args.profile_name.unwrap_or_else(|| "profile.pb".into());
let profile_path = profile_path.join(profile_name);
let mut pprof_file = File::create(&profile_path).unwrap();
Expand Down Expand Up @@ -453,26 +461,24 @@ mod tests {
let result = Cli::try_parse_from(myargs.iter());
match result {
Ok(config) => {
// println!("{:?}", config);
assert_eq!(config.sample_freq, desired_freq.parse::<u16>().unwrap());
assert_eq!(config.sample_freq, desired_freq.parse::<u64>().unwrap());
}
Err(err) => {
let actual_message = err.to_string();
// println!("Errored with: {}", actual_message);
assert!(actual_message.contains(expected_msg.as_str()));
}
}
}

#[rstest]
#[case(49, (47,53), "")]
#[case(97, (0, 0), "97 IS prime")]
#[case(97, (0, 0), "97 is prime")]
#[case(100, (97,101), "")]
#[case(398, (397,401), "")]
#[case(500, (499,503), "")]
#[case(1000, (997, 1009), "")]
#[case(1001, (997, 1009), "")]
#[case(1009, (0, 0), "1009 IS prime")]
#[case(1009, (0, 0), "1009 is prime")]
fn test_primes_before_after(
#[case] non_prime: usize,
#[case] expected_tuple: (usize, usize),
Expand Down
15 changes: 10 additions & 5 deletions src/profile/convert.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,13 @@ use lightswitch_metadata_provider::metadata_provider::{TaskKey, ThreadSafeGlobal
use lightswitch_metadata_provider::taskname::TaskName;

use lightswitch_proto::profile::pprof::Label;
use lightswitch_proto::profile::{LabelStringOrNumber, PprofBuilder};
use lightswitch_proto::profile::{pprof, LabelStringOrNumber, PprofBuilder};

use std::collections::HashMap;
use std::fmt::Write;
use std::path::PathBuf;
use std::time::Duration;
use std::time::SystemTime;
use tracing::{debug, error, span, Level};

use crate::ksym::Ksym;
Expand Down Expand Up @@ -39,10 +40,13 @@ pub fn to_pprof(
procs: &HashMap<i32, ProcessInfo>,
objs: &HashMap<ExecutableId, ObjectFileInfo>,
metadata_provider: &ThreadSafeGlobalMetadataProvider,
) -> PprofBuilder {
// TODO: pass right duration and frequency.
let mut pprof = PprofBuilder::new(Duration::from_secs(5), 27);
profile_duration: Duration,
profile_frequency_hz: u64,
) -> pprof::Profile {
// Not exactly when the profile session really started but works for now.
let profile_start = SystemTime::now();

let mut pprof = PprofBuilder::new(profile_start, profile_duration, profile_frequency_hz);
let mut task_to_labels: HashMap<i32, Vec<Label>> = HashMap::new();

for sample in profile {
Expand Down Expand Up @@ -161,7 +165,8 @@ pub fn to_pprof(
});
pprof.add_sample(location_ids, sample.count as i64, labels);
}
pprof

pprof.build()
}

/// Converts a collection of symbolized aggregated profiles to their folded representation that most flamegraph renderers use.
Expand Down
Loading

0 comments on commit c3218a0

Please sign in to comment.