Skip to content

Commit

Permalink
scx_rustland: introduce nr_waiting concept
Browse files Browse the repository at this point in the history
We want to activate the user-space scheduler only when there are pending
tasks that require scheduling actions.

To do so we keep track of the queued tasks via nr_queued, that is
incremented in .enqueue() when a task is sent to the user-space
scheduler and decremented in .dispatch() when a task is dispatched.

However, we may trigger an unbalance if the same pid is sent to the
scheduler multiple times (because the scheduler store all the tasks by
their unique pid).

When this happens nr_queued is never decremented back to 0, leading the
user-space scheduler to constantly spin, even if there's no activity to
do.

To prevent this from happening split nr_queued into nr_queued and
nr_scheduled. The former will be updated by the BPF component every time
that a task is sent to the scheduler and it's up to the user-space
scheduler to reset the counter when the queue is fully dreained. The
latter is maintained by the user-space scheduler and represents the
amount of tasks that are still processed by the scheduler and are
waiting to be dispatched.

The sum of nr_queued + nr_scheduled will be called nr_waiting and we can
rely on this metric to determine if the user-space scheduler has some
pending work to do or not.

This change makes rust_rustland more reliable and it strongly reduces
the CPU usage of the user-space scheduler by eliminating a lot of
unnecessary activations.

Signed-off-by: Andrea Righi <[email protected]>
  • Loading branch information
Andrea Righi committed Dec 29, 2023
1 parent d67dfe5 commit e90bc92
Show file tree
Hide file tree
Showing 2 changed files with 76 additions and 13 deletions.
54 changes: 46 additions & 8 deletions scheds/rust/scx_rustland/src/bpf/main.bpf.c
Original file line number Diff line number Diff line change
Expand Up @@ -52,8 +52,25 @@ u32 usersched_pid; /* User-space scheduler PID */
const volatile bool switch_partial; /* Switch all tasks or SCHED_EXT tasks */
const volatile u64 slice_ns = SCX_SLICE_DFL; /* Base time slice duration */

/* Statistics */
u64 nr_queued, nr_user_dispatches, nr_kernel_dispatches, nr_sched_congested;
/*
* Number of tasks that are queued for scheduling.
*
* This number is incremented by the BPF component when a task is queued to the
* user-space scheduler and it must be decremented by the user-space scheduler
* when a task is consumed.
*/
volatile u64 nr_queued;

/*
* Number of tasks that are waiting for scheduling.
*
* This number must be updated by the user-space scheduler to keep track if
* there is still some scheduling work to do.
*/
volatile u64 nr_scheduled;

/* Misc statistics */
volatile u64 nr_user_dispatches, nr_kernel_dispatches, nr_sched_congested;

/* Report additional debugging information */
const volatile bool debug;
Expand Down Expand Up @@ -417,7 +434,6 @@ void BPF_STRUCT_OPS(rustland_dispatch, s32 cpu, struct task_struct *prev)
/* Pop first task from the dispatched queue */
if (bpf_map_pop_elem(&dispatched, &task))
break;
__sync_fetch_and_sub(&nr_queued, 1);

/* Ignore entry if the task doesn't exist anymore */
p = bpf_task_from_pid(task.pid);
Expand Down Expand Up @@ -483,13 +499,35 @@ void BPF_STRUCT_OPS(rustland_update_idle, s32 cpu, bool idle)
return;
/*
* A CPU is now available, notify the user-space scheduler that tasks
* can be dispatched, if there is at least one task queued (ready to be
* scheduled).
* can be dispatched, if there is at least one task waiting to be
* scheduled, either queued (accounted in nr_queued) or scheduled
* (accounted in nr_scheduled).
*
* NOTE: nr_queued is incremented by the BPF component, more exactly in
* enqueue(), when a task is sent to the user-space scheduler, then
* the scheduler drains the queued tasks (updating nr_queued) and adds
* them to its internal data structures / state; at this point tasks
* become "scheduled" and the user-space scheduler will take care of
* updating nr_scheduled accordingly; lastly tasks will be dispatched
* and the user-space scheduler will update nr_scheduled again.
*
* Moreover, kick the CPU to make it immediately ready to accept
* dispatched tasks.
* Checking both counters allows to determine if there is still some
* pending work to do for the scheduler: new tasks have been queued
* since last check, or there are still tasks "queued" or "scheduled"
* since the previous user-space scheduler run. If the counters are
* both zero it is pointless to wake-up the scheduler (even if a CPU
* becomes idle), because there is nothing to do.
*
* Keep in mind that update_idle() doesn't run concurrently with the
* user-space scheduler (that is single-threaded): this function is
* naturally serialized with the user-space scheduler code, therefore
* this check here is also safe from a concurrency perspective.
*/
if (__sync_fetch_and_add(&nr_queued, 0)) {
if (nr_queued || nr_scheduled) {
/*
* Kick the CPU to make it immediately ready to accept
* dispatched tasks.
*/
scx_bpf_kick_cpu(cpu, 0);
set_usersched_needed();
}
Expand Down
35 changes: 30 additions & 5 deletions scheds/rust/scx_rustland/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -387,7 +387,14 @@ impl<'a> Scheduler<'a> {
self.task_pool.push(task.pid, cpu, self.min_vruntime);
}
}
Ok(None) => break,
Ok(None) => {
// Reset nr_queued and update nr_scheduled, to notify the dispatcher that
// queued tasks are drained, but there is still some work left to do in the
// scheduler.
self.skel.bss_mut().nr_queued = 0;
self.skel.bss_mut().nr_scheduled = self.task_pool.tasks.len() as u64;
break;
}
Err(err) => {
warn!("Error: {}", err);
break;
Expand Down Expand Up @@ -429,6 +436,10 @@ impl<'a> Scheduler<'a> {
None => break,
}
}
// Reset nr_scheduled to notify the dispatcher that all the tasks received by the scheduler
// has been dispatched, so there is no reason to re-activate the scheduler, unless more
// tasks are queued.
self.skel.bss_mut().nr_scheduled = self.task_pool.tasks.len() as u64;
}

// Main scheduling function (called in a loop to periodically drain tasks from the queued list
Expand All @@ -443,18 +454,32 @@ impl<'a> Scheduler<'a> {

// Print internal scheduler statistics (fetched from the BPF part)
fn print_stats(&mut self) {
let nr_queued = self.skel.bss().nr_queued as u64;
// Show minimum vruntime (this should be constantly incrementing).
info!("vruntime={}", self.min_vruntime);

// Show general statistics.
let nr_user_dispatches = self.skel.bss().nr_user_dispatches as u64;
let nr_kernel_dispatches = self.skel.bss().nr_kernel_dispatches as u64;
let nr_sched_congested = self.skel.bss().nr_sched_congested as u64;
info!(
" nr_user_dispatched={} nr_kernel_dispatches={} nr_sched_congested={}",
nr_user_dispatches, nr_kernel_dispatches, nr_sched_congested
);

// Show tasks that are waiting to be dispatched.
let nr_queued = self.skel.bss().nr_queued as u64;
let nr_scheduled = self.skel.bss().nr_scheduled as u64;
let nr_waiting = nr_queued + nr_scheduled;
info!(
"min_vtime={} nr_queued={} nr_user_dispatched={} nr_kernel_dispatches={} nr_sched_congested={}",
self.min_vruntime, nr_queued, nr_user_dispatches, nr_kernel_dispatches, nr_sched_congested
" nr_waiting={} [nr_queued={} + nr_scheduled={}]",
nr_waiting, nr_queued, nr_scheduled
);

// Show tasks that are currently running.
info!("Running tasks:");
for cpu in 0..self.nr_cpus_online {
let pid = self.get_cpu_pid(cpu as u32);
info!("cpu={} pid={}", cpu, pid);
info!(" cpu={} pid={}", cpu, pid);
}

log::logger().flush();
Expand Down

0 comments on commit e90bc92

Please sign in to comment.