Merge pull request #59 from arighi/lowlatency-improvements
scx_rustland: lowlatency improvements
htejun authored Dec 31, 2023
2 parents 804180a + 1cdcb8a commit 70803d5
Showing 2 changed files with 69 additions and 8 deletions.
32 changes: 25 additions & 7 deletions scheds/rust/scx_rustland/src/bpf/main.bpf.c
@@ -309,19 +309,38 @@ s32 BPF_STRUCT_OPS(rustland_select_cpu, struct task_struct *p, s32 prev_cpu,
  * .select_cpu()), since this function may be called on a different CPU (so we
  * cannot check the current CPU directly).
  */
-static bool is_task_cpu_available(struct task_struct *p)
+static bool is_task_cpu_available(struct task_struct *p, u64 enq_flags)
 {
 	struct task_ctx *tctx;
 
 	/*
-	 * Always dispatch per-CPU kthread on the same CPU, bypassing the
+	 * Always dispatch per-CPU kthreads on the same CPU, bypassing the
 	 * user-space scheduler (in this way we can prioritize critical
 	 * kernel threads that may potentially slow down the entire system if
 	 * they are blocked for too long).
 	 */
 	if (is_kthread(p) && p->nr_cpus_allowed == 1)
 		return true;
 
+	/*
+	 * Moreover, immediately dispatch kthreads that still have more than
+	 * half of their runtime budget. As they are likely to release the CPU
+	 * soon, granting them a substantial priority boost can enhance the
+	 * overall system performance.
+	 *
+	 * In the event that one of these kthreads turns into a CPU hog, it
+	 * will deplete its runtime budget and therefore it will be scheduled
+	 * like any other normal task.
+	 */
+	if (is_kthread(p) && p->scx.slice > slice_ns / 2)
+		return true;
+
+	/*
+	 * No scheduling required if it's the last task running.
+	 */
+	if (enq_flags & SCX_ENQ_LAST)
+		return true;
+
 	/*
 	 * For regular tasks always rely on force_local to determine if we can
 	 * bypass the scheduler.
@@ -365,7 +384,7 @@ void BPF_STRUCT_OPS(rustland_enqueue, struct task_struct *p, u64 enq_flags)
 	 * Dispatch the task on the local FIFO directly if the selected task's
 	 * CPU is available (no scheduling decision required).
 	 */
-	if (is_task_cpu_available(p)) {
+	if (is_task_cpu_available(p, enq_flags)) {
 		dispatch_local(p, enq_flags);
 		__sync_fetch_and_add(&nr_kernel_dispatches, 1);
 		return;
@@ -444,11 +463,9 @@ void BPF_STRUCT_OPS(rustland_dispatch, s32 cpu, struct task_struct *prev)
 		 * task and migrate (if possible); otherwise, dispatch on the
 		 * global DSQ.
 		 */
-		prev_cpu = scx_bpf_task_cpu(p);
-		dbg_msg("usersched: pid=%d prev_cpu=%d cpu=%d payload=%llu",
+		dbg_msg("usersched: pid=%d cpu=%d payload=%llu",
 			task.pid, task.cpu, task.payload);
-		if ((task.cpu != prev_cpu) &&
-		    bpf_cpumask_test_cpu(task.cpu, p->cpus_ptr))
+		if (bpf_cpumask_test_cpu(task.cpu, p->cpus_ptr))
 			dispatch_on_cpu(p, task.cpu, 0);
 		else
 			dispatch_global(p, 0);
@@ -626,6 +643,7 @@ struct sched_ext_ops rustland = {
 	.prep_enable = (void *)rustland_prep_enable,
 	.init = (void *)rustland_init,
 	.exit = (void *)rustland_exit,
+	.flags = SCX_OPS_ENQ_LAST,
 	.timeout_ms = 5000,
 	.name = "rustland",
 };
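
To make the new enqueue fast path easier to follow outside the diff, here is a condensed user-space restatement of the is_task_cpu_available() checks as plain Rust. This is an illustrative sketch only: the Task struct, the SLICE_NS value, and the SCX_ENQ_LAST bit are hypothetical stand-ins for the BPF-side state, and the force_local fallback for regular tasks is omitted.

const SLICE_NS: u64 = 5_000_000; // assumed time slice, not the scheduler's actual value
const SCX_ENQ_LAST: u64 = 1 << 1; // placeholder bit, not the real kernel constant

struct Task {
    is_kthread: bool,
    nr_cpus_allowed: u32,
    slice_left_ns: u64,
}

fn is_task_cpu_available(task: &Task, enq_flags: u64) -> bool {
    // Per-CPU kthreads always bypass the user-space scheduler.
    if task.is_kthread && task.nr_cpus_allowed == 1 {
        return true;
    }
    // Kthreads with more than half of their slice left are dispatched
    // directly: they are likely to release the CPU soon, and a CPU hog
    // eventually depletes its budget and loses the boost.
    if task.is_kthread && task.slice_left_ns > SLICE_NS / 2 {
        return true;
    }
    // The last runnable task needs no scheduling decision.
    enq_flags & SCX_ENQ_LAST != 0
}

fn main() {
    let kworker = Task { is_kthread: true, nr_cpus_allowed: 1, slice_left_ns: 0 };
    assert!(is_task_cpu_available(&kworker, 0));

    let hog = Task { is_kthread: true, nr_cpus_allowed: 4, slice_left_ns: 0 };
    assert!(!is_task_cpu_available(&hog, 0)); // depleted budget: no boost

    println!("fast-path checks behave as expected");
}
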
45 changes: 44 additions & 1 deletion scheds/rust/scx_rustland/src/main.rs
@@ -17,6 +17,10 @@ use std::sync::atomic::Ordering;
 use std::sync::Arc;
 use std::time::{Duration, SystemTime};
 
+use std::fs::File;
+use std::io::{self, Read};
+use std::path::Path;
+
 use anyhow::bail;
 use anyhow::Context;
 use anyhow::Result;
@@ -449,6 +453,37 @@ impl<'a> Scheduler<'a> {
         thread::yield_now();
     }
 
+    // Get the current CPU where the scheduler is running.
+    fn get_current_cpu() -> io::Result<i32> {
+        // Open /proc/self/stat file
+        let path = Path::new("/proc/self/stat");
+        let mut file = File::open(path)?;
+
+        // Read the content of the file into a String
+        let mut content = String::new();
+        file.read_to_string(&mut content)?;
+
+        // Split the content into fields using whitespace as the delimiter
+        let fields: Vec<&str> = content.split_whitespace().collect();
+
+        // Parse the 39th field as an i32 and return it.
+        if let Some(field) = fields.get(38) {
+            if let Ok(value) = field.parse::<i32>() {
+                Ok(value)
+            } else {
+                Err(io::Error::new(
+                    io::ErrorKind::InvalidData,
+                    "Unable to parse current CPU information as i32",
+                ))
+            }
+        } else {
+            Err(io::Error::new(
+                io::ErrorKind::InvalidData,
+                "Unable to get current CPU information",
+            ))
+        }
+    }
+
     // Print internal scheduler statistics (fetched from the BPF part)
     fn print_stats(&mut self) {
         // Show minimum vruntime (this should be constantly incrementing).
@@ -473,9 +508,17 @@ impl<'a> Scheduler<'a> {
         );
 
         // Show tasks that are currently running.
+        let sched_cpu = match Self::get_current_cpu() {
+            Ok(cpu_info) => cpu_info,
+            Err(_) => -1,
+        };
         info!("Running tasks:");
         for cpu in 0..self.nr_cpus_online {
-            let pid = self.get_cpu_pid(cpu);
+            let pid = if cpu == sched_cpu {
+                "[self]".to_string()
+            } else {
+                self.get_cpu_pid(cpu).to_string()
+            };
             info!("  cpu={} pid={}", cpu, pid);
         }
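
For reference, the /proc/self/stat parse above can be condensed into a standalone sketch. Like the original, it assumes the process name in field 2 contains no embedded whitespace (which would otherwise shift the field indices); field 39 (index 38) is the CPU the task last ran on.

use std::fs;
use std::io;

// Condensed equivalent of Scheduler::get_current_cpu(), for illustration only.
fn current_cpu() -> io::Result<i32> {
    let stat = fs::read_to_string("/proc/self/stat")?;
    stat.split_whitespace()
        .nth(38)
        .and_then(|field| field.parse::<i32>().ok())
        .ok_or_else(|| io::Error::new(io::ErrorKind::InvalidData, "cannot parse CPU field"))
}

fn main() -> io::Result<()> {
    println!("running on CPU {}", current_cpu()?);
    Ok(())
}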
