scx_rustland: prevent starvation and improve responsiveness #60

Merged
merged 8 commits on Jan 2, 2024
38 changes: 15 additions & 23 deletions scheds/rust/scx_rustland/src/bpf/main.bpf.c
@@ -252,20 +252,17 @@ static void dispatch_local(struct task_struct *p, u64 enq_flags)
*/
static void dispatch_on_cpu(struct task_struct *p, s32 cpu, u64 enq_flags)
{
/*
* If it's not possible to dispatch on the selected CPU, re-use the
* previously used one.
*/
if (!bpf_cpumask_test_cpu(cpu, p->cpus_ptr))
cpu = scx_bpf_task_cpu(p);
Contributor:
I think this should work fine, but can you please verify that changing the CPU affinity of a constantly running task bound to one CPU works? I.e., have one thread that busy-loops, bind it to one CPU using taskset, and then change it to another CPU.

Contributor (author):
Yep, it seems to work:

09:10 arighi@gpd$ yes >/dev/null &
[1] 1797739
6.7.0-3-generic ~
09:10 arighi@gpd$ taskset -pc 2 $!
pid 1797739's current affinity list: 0-7
pid 1797739's new affinity list: 2
6.7.0-3-generic ~
09:10 arighi@gpd$ cut -d' ' -f39 /proc/$!/stat
2
6.7.0-3-generic ~
09:11 arighi@gpd$ taskset -pc 5 $!
pid 1797739's current affinity list: 2
pid 1797739's new affinity list: 5
6.7.0-3-generic ~
09:11 arighi@gpd$ cut -d' ' -f39 /proc/$!/stat
5

Of course, the task changes CPU only if it is actually doing something (i.e. a sleep infinity won't change CPU), but the same happens with the default scheduler, so we aren't changing any behavior here.
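For reference, the same check can be scripted. Here is a minimal, self-contained sketch (illustrative only, not part of this patch) of a program that busy-loops and prints, once per second, the CPU it last ran on (field 39 of /proc/self/stat, the same field used in the transcript above):

// Illustrative helper, not part of this PR: stay runnable forever and report
// which CPU the task last ran on, so affinity changes can be observed live.
use std::fs;
use std::time::{Duration, Instant};

fn current_cpu() -> Option<String> {
    // Field 39 of /proc/self/stat is "processor" (last CPU the task ran on).
    // A naive whitespace split is fine here because our comm has no spaces.
    let stat = fs::read_to_string("/proc/self/stat").ok()?;
    stat.split_whitespace().nth(38).map(|s| s.to_string())
}

fn main() {
    println!("pid={}", std::process::id());
    let mut last_report = Instant::now();
    loop {
        // Polling the clock in a tight loop keeps the task permanently runnable.
        if last_report.elapsed() >= Duration::from_secs(1) {
            println!("cpu={}", current_cpu().unwrap_or_else(|| "?".into()));
            last_report = Instant::now();
        }
    }
}

Running this and then issuing taskset -pc <cpu> <pid> from another terminal should make the printed cpu= value follow the new affinity, matching the taskset/cut session above.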

dbg_msg("%s: pid=%d cpu=%ld", __func__, p->pid, cpu);
scx_bpf_dispatch(p, SCX_DSQ_LOCAL_ON | cpu, slice_ns,
enq_flags | SCX_ENQ_LOCAL);
}

/*
* Dispatch a task on the global FIFO.
*/
static void dispatch_global(struct task_struct *p, u64 enq_flags)
{
dbg_msg("%s: pid=%d", __func__, p->pid);
scx_bpf_dispatch(p, SCX_DSQ_GLOBAL, slice_ns, enq_flags);
}

/*
* Select the target CPU where a task can be directly dispatched to from
* .enqueue().
@@ -335,12 +332,6 @@ static bool is_task_cpu_available(struct task_struct *p, u64 enq_flags)
if (is_kthread(p) && p->scx.slice > slice_ns / 2)
return true;

/*
* No scheduling required if it's the last task running.
*/
if (enq_flags & SCX_ENQ_LAST)
return true;

/*
* For regular tasks always rely on force_local to determine if we can
* bypass the scheduler.
@@ -395,14 +386,15 @@ void BPF_STRUCT_OPS(rustland_enqueue, struct task_struct *p, u64 enq_flags)
* processed by the user-space scheduler.
*
* If @queued list is full (user-space scheduler is congested) tasks
* will be dispatched directly from the kernel to the global FIFO.
* will be dispatched directly from the kernel (re-using their
* previously used CPU in this case).
*/
get_task_info(&task, p);
dbg_msg("enqueue: pid=%d", task.pid);
if (bpf_map_push_elem(&queued, &task, 0)) {
dbg_msg("scheduler congested: pid=%d", task.pid);
__sync_fetch_and_add(&nr_sched_congested, 1);
dispatch_global(p, enq_flags);
dispatch_on_cpu(p, task.cpu, enq_flags);
__sync_fetch_and_add(&nr_kernel_dispatches, 1);
return;
}
@@ -424,7 +416,11 @@ static void dispatch_user_scheduler(void)
scx_bpf_error("Failed to find usersched task %d", usersched_pid);
return;
}
dispatch_global(p, 0);
/*
* Always try to dispatch the user-space scheduler on the current CPU,
* if possible.
*/
dispatch_on_cpu(p, bpf_get_smp_processor_id(), 0);
__sync_fetch_and_add(&nr_kernel_dispatches, 1);
bpf_task_release(p);
}
@@ -445,7 +441,6 @@ void BPF_STRUCT_OPS(rustland_dispatch, s32 cpu, struct task_struct *prev)
bpf_repeat(MAX_ENQUEUED_TASKS) {
struct task_struct *p;
struct dispatched_task_ctx task;
s32 prev_cpu;

if (!scx_bpf_dispatch_nr_slots())
break;
@@ -465,10 +460,7 @@ void BPF_STRUCT_OPS(rustland_dispatch, s32 cpu, struct task_struct *prev)
*/
dbg_msg("usersched: pid=%d cpu=%d payload=%llu",
task.pid, task.cpu, task.payload);
if (bpf_cpumask_test_cpu(task.cpu, p->cpus_ptr))
dispatch_on_cpu(p, task.cpu, 0);
else
dispatch_global(p, 0);
dispatch_on_cpu(p, task.cpu, 0);
__sync_fetch_and_add(&nr_user_dispatches, 1);
bpf_task_release(p);
}
96 changes: 61 additions & 35 deletions scheds/rust/scx_rustland/src/main.rs
@@ -17,6 +17,7 @@ use std::sync::atomic::Ordering;
use std::sync::Arc;
use std::time::{Duration, SystemTime};

use std::fs::metadata;
use std::fs::File;
use std::io::{self, Read};
use std::path::Path;
@@ -158,15 +159,18 @@ impl TaskInfoMap {
}
}

// Get an item (as mutable) from the HashMap (by pid)
fn get_mut(&mut self, pid: i32) -> Option<&mut TaskInfo> {
self.tasks.get_mut(&pid)
}

// Add or update an item in the HashMap (by pid), if the pid is already present the item will
// be replaced (updated)
fn insert(&mut self, pid: i32, task: TaskInfo) {
self.tasks.insert(pid, task);
// Clean up old entries (pids that don't exist anymore).
fn gc(&mut self) {
fn is_pid_running(pid: i32) -> bool {
let path = format!("/proc/{}", pid);
metadata(path).is_ok()
}
let pids: Vec<i32> = self.tasks.keys().cloned().collect();
for pid in pids {
if !is_pid_running(pid) {
self.tasks.remove(&pid);
}
}
}
}

@@ -331,16 +335,18 @@ impl<'a> Scheduler<'a> {
sum_exec_runtime: u64,
weight: u64,
min_vruntime: u64,
max_slice_ns: u64,
) {
// Add cputime delta normalized by weight to the vruntime (if delta > 0).
if sum_exec_runtime > task_info.sum_exec_runtime {
let delta = (sum_exec_runtime - task_info.sum_exec_runtime) / weight;
task_info.vruntime += delta;
let delta = (sum_exec_runtime - task_info.sum_exec_runtime) * 100 / weight;
// Never account more than max_slice_ns. This helps to prevent starving a task for too
// long in the scheduler task pool.
task_info.vruntime += delta.min(max_slice_ns);
}
// Make sure vruntime is moving forward (> current minimum).
if min_vruntime > task_info.vruntime {
task_info.vruntime = min_vruntime;
}
task_info.vruntime = task_info.vruntime.max(min_vruntime);

// Update total task cputime.
task_info.sum_exec_runtime = sum_exec_runtime;
}
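As a side illustration of the change above, the new accounting can be reduced to a small standalone function: scale the cputime delta by the task weight, cap the charge at one time slice (the starvation bound), and never let vruntime fall behind the global minimum. The helper name and the numbers below are made up for the example; only the arithmetic mirrors the patch.

// Standalone sketch of the vruntime charging logic introduced above.
fn charge_vruntime(
    mut vruntime: u64,
    prev_runtime: u64,
    curr_runtime: u64,
    weight: u64,
    min_vruntime: u64,
    max_slice_ns: u64,
) -> u64 {
    if curr_runtime > prev_runtime {
        // Scale the raw cputime delta by weight (100 is assumed to be the
        // default weight): higher-weight tasks accumulate vruntime more slowly.
        let delta = (curr_runtime - prev_runtime) * 100 / weight;
        // Cap the charge at one full slice, so a task that ran for a long time
        // is not pushed arbitrarily far to the right of the task pool.
        vruntime += delta.min(max_slice_ns);
    }
    // vruntime only moves forward: never behind the current global minimum.
    vruntime.max(min_vruntime)
}

fn main() {
    // A task that ran 50 ms with default weight, a 20 ms slice and
    // min_vruntime = 0 is charged only 20 ms worth of vruntime, not 50 ms.
    assert_eq!(charge_vruntime(0, 0, 50_000_000, 100, 0, 20_000_000), 20_000_000);
    println!("ok");
}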
@@ -354,24 +360,36 @@ impl<'a> Scheduler<'a> {
loop {
match queued.lookup_and_delete(&[]) {
Ok(Some(msg)) => {
// Schedule the task and update task information.
// Extract the task object from the message.
let task = EnqueuedMessage::from_bytes(msg.as_slice()).as_queued_task_ctx();
if let Some(task_info) = self.task_map.get_mut(task.pid) {
Self::update_enqueued(
task_info,
task.sum_exec_runtime,
task.weight,
self.min_vruntime,
);
self.task_pool.push(task.pid, task.cpu, task_info.vruntime);
} else {
let task_info = TaskInfo {
sum_exec_runtime: task.sum_exec_runtime,
vruntime: self.min_vruntime,
};
self.task_map.insert(task.pid, task_info);
self.task_pool.push(task.pid, task.cpu, self.min_vruntime);
}

// Get task information if the task is already stored in the task map,
// otherwise create a new entry for it.
let task_info =
self.task_map
.tasks
.entry(task.pid)
.or_insert_with_key(|&_pid| TaskInfo {
sum_exec_runtime: task.sum_exec_runtime,
vruntime: self.min_vruntime,
});

// Update task information.
Self::update_enqueued(
task_info,
task.sum_exec_runtime,
task.weight,
// Make sure the global vruntime is always progressing (at least by +1)
// during each scheduler run, providing a priority boost to newer tasks
// (that is still beneficial for potential short-lived tasks), while also
// preventing excessive starvation of the other tasks sitting in the
// self.task_pool tree, waiting to be dispatched.
self.min_vruntime + 1,
self.skel.rodata().slice_ns,
);

// Insert task in the task pool (ordered by vruntime).
self.task_pool.push(task.pid, task.cpu, task_info.vruntime);
}
Ok(None) => {
// Reset nr_queued and update nr_scheduled, to notify the dispatcher that
@@ -410,10 +428,10 @@ impl<'a> Scheduler<'a> {
//
// Use the previously used CPU if idle, that is always the best choice (to
// mitigate migration overhead), otherwise pick the next idle CPU available.
if let Some(id) = idle_cpus.iter().position(|&x| x == task.cpu) {
if let Some(pos) = idle_cpus.iter().position(|&x| x == task.cpu) {
// The CPU assigned to the task is in idle_cpus, keep the assignment and
// remove the CPU from idle_cpus.
idle_cpus.remove(id);
idle_cpus.remove(pos);
} else {
// The CPU assigned to the task is not in idle_cpus, pop the first CPU from
// idle_cpus and assign it to the task.
@@ -484,10 +502,14 @@ impl<'a> Scheduler<'a> {
}
}

// Print internal scheduler statistics (fetched from the BPF part)
// Print internal scheduler statistics (fetched from the BPF part).
fn print_stats(&mut self) {
// Show minimum vruntime (this should be constantly incrementing).
info!("vruntime={}", self.min_vruntime);
info!(
"vruntime={} tasks={}",
self.min_vruntime,
self.task_map.tasks.len()
);

// Show general statistics.
let nr_user_dispatches = self.skel.bss().nr_user_dispatches as u64;
@@ -538,7 +560,11 @@ impl<'a> Scheduler<'a> {

// Print scheduler statistics every second.
if elapsed > Duration::from_secs(1) {
// Free up unused scheduler resources.
self.task_map.gc();
// Print scheduler statistics.
self.print_stats();

prev_ts = curr_ts;
}
}