diff --git a/scheds/rust/meson.build b/scheds/rust/meson.build
index 5d86f1c21..c515dd6cd 100644
--- a/scheds/rust/meson.build
+++ b/scheds/rust/meson.build
@@ -1,2 +1,3 @@
 subdir('scx_layered')
 subdir('scx_rusty')
+subdir('scx_rustlite')
diff --git a/scheds/rust/scx_rustlite/.gitignore b/scheds/rust/scx_rustlite/.gitignore
new file mode 100644
index 000000000..186dba259
--- /dev/null
+++ b/scheds/rust/scx_rustlite/.gitignore
@@ -0,0 +1,3 @@
+src/bpf/.output
+Cargo.lock
+target
diff --git a/scheds/rust/scx_rustlite/Cargo.toml b/scheds/rust/scx_rustlite/Cargo.toml
new file mode 100644
index 000000000..d72a2084e
--- /dev/null
+++ b/scheds/rust/scx_rustlite/Cargo.toml
@@ -0,0 +1,27 @@
+[package]
+name = "scx_rustlite"
+version = "0.0.1"
+authors = ["Andrea Righi <andrea.righi@canonical.com>", "Canonical"]
+edition = "2021"
+description = "Userspace scheduling with BPF"
+license = "GPL-2.0-only"
+
+[dependencies]
+anyhow = "1.0.65"
+bitvec = { version = "1.0", features = ["serde"] }
+clap = { version = "4.1", features = ["derive", "env", "unicode", "wrap_help"] }
+ctrlc = { version = "3.1", features = ["termination"] }
+fb_procfs = "0.7.0"
+hex = "0.4.3"
+libbpf-rs = "0.22.0"
+libc = "0.2.137"
+log = "0.4.17"
+ordered-float = "3.4.0"
+scx_utils = { path = "../../../rust/scx_utils", version = "0.4" }
+simplelog = "0.12.0"
+
+[build-dependencies]
+scx_utils = { path = "../../../rust/scx_utils", version = "0.4" }
+
+[features]
+enable_backtrace = []
diff --git a/scheds/rust/scx_rustlite/LICENSE b/scheds/rust/scx_rustlite/LICENSE
new file mode 120000
index 000000000..5853aaea5
--- /dev/null
+++ b/scheds/rust/scx_rustlite/LICENSE
@@ -0,0 +1 @@
+../../../LICENSE
\ No newline at end of file
diff --git a/scheds/rust/scx_rustlite/build.rs b/scheds/rust/scx_rustlite/build.rs
new file mode 100644
index 000000000..42e96b711
--- /dev/null
+++ b/scheds/rust/scx_rustlite/build.rs
@@ -0,0 +1,11 @@
+// This software may be used and distributed according to the terms of the
+// GNU General Public License version 2.
+
+fn main() {
+    scx_utils::BpfBuilder::new()
+        .unwrap()
+        .enable_intf("src/bpf/intf.h", "bpf_intf.rs")
+        .enable_skel("src/bpf/main.bpf.c", "bpf")
+        .build()
+        .unwrap();
+}
diff --git a/scheds/rust/scx_rustlite/meson.build b/scheds/rust/scx_rustlite/meson.build
new file mode 100644
index 000000000..33d5d6a41
--- /dev/null
+++ b/scheds/rust/scx_rustlite/meson.build
@@ -0,0 +1,7 @@
+custom_target('scx_rustlite',
+              output: '@PLAINNAME@.__PHONY__',
+              input: 'Cargo.toml',
+              command: [cargo, 'build', '--manifest-path=@INPUT@', '--target-dir=@OUTDIR@',
+                        cargo_build_args],
+              env: cargo_env,
+              build_by_default: true)
diff --git a/scheds/rust/scx_rustlite/rustfmt.toml b/scheds/rust/scx_rustlite/rustfmt.toml
new file mode 100644
index 000000000..b7258ed0a
--- /dev/null
+++ b/scheds/rust/scx_rustlite/rustfmt.toml
@@ -0,0 +1,8 @@
+# Get help on options with `rustfmt --help=config`
+# Please keep these in alphabetical order.
+edition = "2021"
+group_imports = "StdExternalCrate"
+imports_granularity = "Item"
+merge_derives = false
+use_field_init_shorthand = true
+version = "Two"
diff --git a/scheds/rust/scx_rustlite/src/bpf/intf.h b/scheds/rust/scx_rustlite/src/bpf/intf.h
new file mode 100644
index 000000000..36dae1e32
--- /dev/null
+++ b/scheds/rust/scx_rustlite/src/bpf/intf.h
@@ -0,0 +1,55 @@
+// This software may be used and distributed according to the terms of the
+// GNU General Public License version 2.
+
+#ifndef __INTF_H
+#define __INTF_H
+
+#define MAX(x, y) ((x) > (y) ? (x) : (y))
+#define MIN(x, y) ((x) < (y) ? (x) : (y))
+
+#define NSEC_PER_SEC 1000000000L
+#define CLOCK_BOOTTIME 7
+
+#include <stdbool.h>
+#ifndef __kptr
+#ifdef __KERNEL__
+#error "__kptr_ref not defined in the kernel"
+#endif
+#define __kptr
+#endif
+
+#ifndef __KERNEL__
+typedef unsigned char u8;
+typedef unsigned int u32;
+typedef int s32;
+typedef unsigned long long u64;
+typedef long long s64;
+#endif
+
+/*
+ * Task sent to the user-space scheduler by the BPF dispatcher.
+ *
+ * All attributes are collected from the kernel by the BPF component.
+ */
+struct queued_task_ctx {
+        s32 pid;
+        u64 sum_exec_runtime;   /* Total cpu time */
+        u64 weight;             /* Task static priority */
+};
+
+/*
+ * Task sent to the BPF dispatcher by the user-space scheduler.
+ *
+ * This structure has a payload that can be used by the user-space scheduler to
+ * send debugging information to the BPF dispatcher (i.e., vruntime, etc.),
+ * depending on the particular scheduler implementation.
+ *
+ * This struct can be easily extended to send more information to the
+ * dispatcher (i.e., a target CPU, a variable time slice, etc.).
+ */
+struct dispatched_task_ctx {
+        s32 pid;
+        u64 payload;            /* Task payload */
+};
+
+#endif /* __INTF_H */
diff --git a/scheds/rust/scx_rustlite/src/bpf/main.bpf.c b/scheds/rust/scx_rustlite/src/bpf/main.bpf.c
new file mode 100644
index 000000000..661969191
--- /dev/null
+++ b/scheds/rust/scx_rustlite/src/bpf/main.bpf.c
@@ -0,0 +1,491 @@
+/* Copyright (c) Andrea Righi <andrea.righi@canonical.com> */
+/*
+ * scx_rustlite: simple user-space scheduler written in Rust
+ *
+ * The main goal of this scheduler is to be an "easy to read" template that can
+ * be used to quickly test more complex scheduling policies. For this reason
+ * this scheduler is mostly focused on simplicity and code readability.
+ *
+ * The scheduler is made of a BPF component (dispatcher) that implements the
+ * low level sched-ext functionalities and a user-space counterpart
+ * (scheduler), written in Rust, that implements the actual scheduling policy.
+ *
+ * The BPF dispatcher collects total cputime and weight from the tasks that
+ * need to run, then it sends all details to the user-space scheduler that
+ * decides the best order of execution of the tasks (based on the collected
+ * metrics).
+ *
+ * The user-space scheduler then returns to the BPF component the list of tasks
+ * to be dispatched in the proper order.
+ *
+ * Messages between the BPF component and the user-space scheduler are passed
+ * using two BPF_MAP_TYPE_QUEUE maps: @queued for the messages sent by the BPF
+ * dispatcher to the user-space scheduler and @dispatched for the messages sent
+ * by the user-space scheduler to the BPF dispatcher.
+ *
+ * The BPF dispatcher is completely agnostic of the particular scheduling
+ * policy implemented in user-space. For this reason developers that are
+ * willing to use this scheduler to experiment with scheduling policies should
+ * be able to simply modify the Rust component, without having to deal with any
+ * internal kernel / BPF details.
+ *
+ * This software may be used and distributed according to the terms of the
+ * GNU General Public License version 2.
+ */
+#include <scx/common.bpf.h>
+#include "intf.h"
+
+char _license[] SEC("license") = "GPL";
+
+/*
+ * Exit info (passed to the user-space counterpart).
+ */
+int exit_kind = SCX_EXIT_NONE;
+char exit_msg[SCX_EXIT_MSG_LEN];
+
+/*
+ * Scheduler attributes and statistics.
+ */
+u32 usersched_pid; /* User-space scheduler PID */
+static volatile bool usersched_needed; /* Used to wake up the user-space scheduler */
+const volatile bool switch_partial; /* Switch all tasks or only SCHED_EXT tasks */
+const volatile u64 slice_ns = SCX_SLICE_DFL; /* Base time slice duration */
+
+/* Statistics */
+u64 nr_enqueues, nr_user_dispatches, nr_kernel_dispatches;
+u64 nr_sched_congested, nr_tasks_running;
+
+/* Report additional debugging information */
+const volatile bool debug;
+
+/* Allow using bpf_printk() only when @debug is set */
+#define dbg_msg(_fmt, ...) do {                                 \
+        if (debug)                                              \
+                bpf_printk(_fmt, ##__VA_ARGS__);                \
+} while(0)
+
+/*
+ * Maximum amount of tasks queued between kernel and user-space at a certain
+ * time.
+ *
+ * The @queued and @dispatched lists are used in a producer/consumer fashion
+ * between the BPF part and the user-space part.
+ */
+#define MAX_ENQUEUED_TASKS 1024
+
+/*
+ * The map containing tasks that are queued to user space from the kernel.
+ *
+ * This map is drained by the user space scheduler.
+ */
+struct {
+        __uint(type, BPF_MAP_TYPE_QUEUE);
+        __type(value, struct queued_task_ctx);
+        __uint(max_entries, MAX_ENQUEUED_TASKS);
+} queued SEC(".maps");
+
+/*
+ * The map containing pids that are dispatched from user space to the kernel.
+ *
+ * Drained by the kernel in .dispatch().
+ */
+struct {
+        __uint(type, BPF_MAP_TYPE_QUEUE);
+        __type(value, struct dispatched_task_ctx);
+        __uint(max_entries, MAX_ENQUEUED_TASKS);
+} dispatched SEC(".maps");
+
+/*
+ * Per-task local storage.
+ *
+ * This contains all the per-task information used internally by the BPF code.
+ */
+struct task_ctx {
+        /*
+         * Set this flag to dispatch directly from .enqueue() to the local DSQ
+         * of the cpu where the task is going to run (bypassing the scheduler).
+         *
+         * This can be used for example when the selected cpu is idle; in this
+         * case we can simply dispatch the task on the same target cpu and
+         * avoid unnecessary calls to the user-space scheduler.
+         */
+        bool force_local;
+};
+
+/* Map that contains task-local storage. */
+struct {
+        __uint(type, BPF_MAP_TYPE_TASK_STORAGE);
+        __uint(map_flags, BPF_F_NO_PREALLOC);
+        __type(key, int);
+        __type(value, struct task_ctx);
+} task_ctx_stor SEC(".maps");
+
+/*
+ * Heartbeat timer used to periodically trigger the check to run the user-space
+ * scheduler.
+ *
+ * Without this timer we may starve the scheduler if the system is completely
+ * idle and hit the watchdog that would auto-kill this scheduler.
+ */
+struct usersched_timer {
+        struct bpf_timer timer;
+};
+
+struct {
+        __uint(type, BPF_MAP_TYPE_ARRAY);
+        __uint(max_entries, 1);
+        __type(key, u32);
+        __type(value, struct usersched_timer);
+} usersched_timer SEC(".maps");
+
+/*
+ * Return true if the target task @p is the user-space scheduler.
+ */
+static inline bool is_usersched_task(const struct task_struct *p)
+{
+        return p->pid == usersched_pid;
+}
+
+/*
+ * Return true if the target task @p is a kernel thread.
+ */
+static inline bool is_kthread(const struct task_struct *p)
+{
+        return !!(p->flags & PF_KTHREAD);
+}
+
+/*
+ * Dispatch a task on the local per-CPU FIFO.
+ */
+static void
+dispatch_task_local(struct task_struct *p, u64 enq_flags)
+{
+        dbg_msg("dispatch (local): pid=%d", p->pid);
+        scx_bpf_dispatch(p, SCX_DSQ_LOCAL, slice_ns, enq_flags | SCX_ENQ_LOCAL);
+        __sync_fetch_and_add(&nr_kernel_dispatches, 1);
+}
+
+/*
+ * Dispatch a task on the global FIFO.
+ */
+static void
+dispatch_task_global(struct task_struct *p, u64 payload, u64 enq_flags)
+{
+        /*
+         * Tasks without a payload are dispatched directly by the kernel, tasks
+         * with payload are dispatched by the user-space scheduler.
+         */
+        if (!payload)
+                dbg_msg("dispatch (global): pid=%d", p->pid);
+        else
+                dbg_msg("dispatch (user): pid=%d payload=%llu", p->pid, payload);
+
+        scx_bpf_dispatch(p, SCX_DSQ_GLOBAL, slice_ns, enq_flags);
+        __sync_fetch_and_add(&nr_user_dispatches, 1);
+}
+
+/*
+ * Select the target CPU where a task can be directly dispatched to from
+ * .enqueue().
+ *
+ * The idea here is to try to find an idle CPU in the system, and preferably
+ * maintain the task on the same CPU.
+ *
+ * If the CPU where the task was running is still idle, then the task can be
+ * dispatched immediately on the same CPU from .enqueue(), without having to
+ * call the scheduler.
+ *
+ * In the future we may want to improve this part and figure out a way to move
+ * this logic into the user-space scheduler as well.
+ */
+s32 BPF_STRUCT_OPS(rustlite_select_cpu, struct task_struct *p, s32 prev_cpu,
+                   u64 wake_flags)
+{
+        s32 cpu;
+        struct task_ctx *tctx;
+
+        tctx = bpf_task_storage_get(&task_ctx_stor, p, 0, 0);
+        if (!tctx) {
+                scx_bpf_error("Failed to look up task-local storage for %s", p->comm);
+                return -ESRCH;
+        }
+
+        /*
+         * Check if the previously used CPU is still idle, in this case we can
+         * dispatch directly in .enqueue() bypassing the scheduler.
+         */
+        if (scx_bpf_test_and_clear_cpu_idle(prev_cpu)) {
+                tctx->force_local = true;
+                return prev_cpu;
+        }
+
+        /*
+         * Otherwise try to find another idle CPU in the system, but in this
+         * case do not bypass the user-space scheduler (to prevent having too
+         * much migration overhead).
+         */
+        cpu = scx_bpf_pick_idle_cpu(p->cpus_ptr, 0);
+        if (cpu >= 0)
+                return cpu;
+
+        /*
+         * If we cannot find an idle CPU keep using the same one and pass
+         * through the user-space scheduler.
+         */
+        return prev_cpu;
+}
+
+/*
+ * Return true if the selected CPU for the task is immediately available
+ * (user-space scheduler not required), false otherwise (user-space scheduler
+ * required).
+ *
+ * To determine if the CPU is available we rely on tctx->force_local (set in
+ * .select_cpu()), since this function may be called on a different CPU (so we
+ * cannot check the current CPU directly).
+ */
+static bool is_task_cpu_available(struct task_struct *p)
+{
+        struct task_ctx *tctx;
+
+        /*
+         * Always dispatch per-CPU kthreads on the same CPU, bypassing the
+         * user-space scheduler (in this way we can prioritize critical kernel
+         * threads that may potentially slow down the entire system if they
+         * are blocked for too long).
+         */
+        if (is_kthread(p) && p->nr_cpus_allowed == 1)
+                return true;
+
+        /*
+         * For regular tasks always rely on force_local to determine if we can
+         * bypass the scheduler.
+         */
+        tctx = bpf_task_storage_get(&task_ctx_stor, p, 0, 0);
+        if (!tctx) {
+                scx_bpf_error("Failed to lookup task ctx for %s", p->comm);
+                return false;
+        }
+        if (tctx->force_local) {
+                tctx->force_local = false;
+                return true;
+        }
+
+        return false;
+}
+
+/*
+ * Fill @task with all the information that needs to be sent to the user-space
+ * scheduler.
+ */
+static void
+get_task_info(struct queued_task_ctx *task, const struct task_struct *p)
+{
+        task->pid = p->pid;
+        task->sum_exec_runtime = p->se.sum_exec_runtime;
+        task->weight = p->scx.weight;
+}
+
+/*
+ * Task @p becomes ready to run.
+ */
+void BPF_STRUCT_OPS(rustlite_enqueue, struct task_struct *p, u64 enq_flags)
+{
+        struct queued_task_ctx task;
+
+        /*
+         * Scheduler is dispatched directly in .dispatch() when needed, so
+         * we can skip it here.
+         */
+        if (is_usersched_task(p))
+                return;
+
+        /*
+         * Dispatch the task on the local FIFO directly if the selected task's
+         * CPU is available (no scheduling decision required).
+         */
+        if (is_task_cpu_available(p)) {
+                dispatch_task_local(p, enq_flags);
+                return;
+        }
+
+        /*
+         * Other tasks can be added to the @queued list and they will be
+         * processed by the user-space scheduler.
+         *
+         * If the @queued list is full (user-space scheduler is congested)
+         * tasks will be dispatched directly from the kernel to the global
+         * FIFO.
+         */
+        get_task_info(&task, p);
+        dbg_msg("enqueue: pid=%d", task.pid);
+        if (bpf_map_push_elem(&queued, &task, 0)) {
+                dbg_msg("scheduler congested: pid=%d", task.pid);
+                __sync_fetch_and_add(&nr_sched_congested, 1);
+                dispatch_task_global(p, 0, enq_flags);
+                return;
+        }
+        __sync_fetch_and_add(&nr_enqueues, 1);
+
+        /*
+         * Task was sent to user-space correctly, now we can wake up the
+         * user-space scheduler.
+         */
+        usersched_needed = true;
+}
+
+/*
+ * Dispatch the user-space scheduler.
+ */
+static void dispatch_user_scheduler(void)
+{
+        struct task_struct *p;
+
+        if (!usersched_needed)
+                return;
+        usersched_needed = false;
+
+        p = bpf_task_from_pid(usersched_pid);
+        if (!p) {
+                scx_bpf_error("Failed to find usersched task %d", usersched_pid);
+                return;
+        }
+        dispatch_task_global(p, 0, 0);
+        bpf_task_release(p);
+}
+
+/*
+ * Dispatch tasks that are ready to run.
+ */
+void BPF_STRUCT_OPS(rustlite_dispatch, s32 cpu, struct task_struct *prev)
+{
+        /* Check if the user-space scheduler needs to run */
+        dispatch_user_scheduler();
+
+        /*
+         * Consume all tasks from the @dispatched list and immediately dispatch
+         * them to the global FIFO (the proper ordering has been already
+         * determined by the user-space scheduler).
+         */
+        bpf_repeat(MAX_ENQUEUED_TASKS) {
+                struct task_struct *p;
+                struct dispatched_task_ctx task;
+
+                if (!scx_bpf_dispatch_nr_slots())
+                        break;
+                if (bpf_map_pop_elem(&dispatched, &task))
+                        break;
+                p = bpf_task_from_pid(task.pid);
+                if (!p)
+                        continue;
+                dispatch_task_global(p, task.payload, 0);
+                bpf_task_release(p);
+        }
+}
+
+/* Task @p starts on a CPU */
+void BPF_STRUCT_OPS(rustlite_running, struct task_struct *p)
+{
+        dbg_msg("start: pid=%d (%s)", p->pid, p->comm);
+        __sync_fetch_and_add(&nr_tasks_running, 1);
+}
+
+/* Task @p releases a CPU */
+void BPF_STRUCT_OPS(rustlite_stopping, struct task_struct *p, bool runnable)
+{
+        dbg_msg("stop: pid=%d (%s)", p->pid, p->comm);
+        __sync_fetch_and_sub(&nr_tasks_running, 1);
+}
+
+/* Task @p is created */
+s32 BPF_STRUCT_OPS(rustlite_prep_enable, struct task_struct *p,
+                   struct scx_enable_args *args)
+{
+        /* Allocate task's local storage */
+        if (bpf_task_storage_get(&task_ctx_stor, p, 0,
+                                 BPF_LOCAL_STORAGE_GET_F_CREATE))
+                return 0;
+        else
+                return -ENOMEM;
+}
+
+/*
+ * Heartbeat scheduler timer callback.
+ */
+static int usersched_timer_fn(void *map, int *key, struct bpf_timer *timer)
+{
+        int err = 0;
+
+        /* Kick the scheduler */
+        usersched_needed = true;
+
+        /* Re-arm the timer */
+        err = bpf_timer_start(timer, NSEC_PER_SEC, 0);
+        if (err)
+                scx_bpf_error("Failed to arm stats timer");
+
+        return 0;
+}
+
+/*
+ * Initialize the heartbeat scheduler timer.
+ */
+static int usersched_timer_init(void)
+{
+        struct bpf_timer *timer;
+        u32 key = 0;
+        int err;
+
+        timer = bpf_map_lookup_elem(&usersched_timer, &key);
+        if (!timer) {
+                scx_bpf_error("Failed to lookup scheduler timer");
+                return -ESRCH;
+        }
+        bpf_timer_init(timer, &usersched_timer, CLOCK_BOOTTIME);
+        bpf_timer_set_callback(timer, usersched_timer_fn);
+        err = bpf_timer_start(timer, NSEC_PER_SEC, 0);
+        if (err)
+                scx_bpf_error("Failed to arm scheduler timer");
+
+        return err;
+}
+
+/*
+ * Initialize the scheduling class.
+ */
+s32 BPF_STRUCT_OPS_SLEEPABLE(rustlite_init)
+{
+        int err;
+
+        err = usersched_timer_init();
+        if (err)
+                return err;
+        if (!switch_partial)
+                scx_bpf_switch_all();
+        return 0;
+}
+
+/*
+ * Unregister the scheduling class.
+ */
+void BPF_STRUCT_OPS(rustlite_exit, struct scx_exit_info *ei)
+{
+        bpf_probe_read_kernel_str(exit_msg, sizeof(exit_msg), ei->msg);
+        exit_kind = ei->kind;
+}
+
+/*
+ * Scheduling class declaration.
+ */
+SEC(".struct_ops.link")
+struct sched_ext_ops rustlite = {
+        .select_cpu  = (void *)rustlite_select_cpu,
+        .enqueue     = (void *)rustlite_enqueue,
+        .dispatch    = (void *)rustlite_dispatch,
+        .running     = (void *)rustlite_running,
+        .stopping    = (void *)rustlite_stopping,
+        .prep_enable = (void *)rustlite_prep_enable,
+        .init        = (void *)rustlite_init,
+        .exit        = (void *)rustlite_exit,
+        .timeout_ms  = 5000,
+        .name        = "rustlite",
+};
diff --git a/scheds/rust/scx_rustlite/src/bpf_intf.rs b/scheds/rust/scx_rustlite/src/bpf_intf.rs
new file mode 100644
index 000000000..9db020efd
--- /dev/null
+++ b/scheds/rust/scx_rustlite/src/bpf_intf.rs
@@ -0,0 +1,9 @@
+// This software may be used and distributed according to the terms of the
+// GNU General Public License version 2.
+
+#![allow(non_upper_case_globals)]
+#![allow(non_camel_case_types)]
+#![allow(non_snake_case)]
+#![allow(dead_code)]
+
+include!(concat!(env!("OUT_DIR"), "/bpf_intf.rs"));
diff --git a/scheds/rust/scx_rustlite/src/bpf_skel.rs b/scheds/rust/scx_rustlite/src/bpf_skel.rs
new file mode 100644
index 000000000..c42af33d6
--- /dev/null
+++ b/scheds/rust/scx_rustlite/src/bpf_skel.rs
@@ -0,0 +1,4 @@
+// This software may be used and distributed according to the terms of the
+// GNU General Public License version 2.
+
+include!(concat!(env!("OUT_DIR"), "/bpf_skel.rs"));
diff --git a/scheds/rust/scx_rustlite/src/main.rs b/scheds/rust/scx_rustlite/src/main.rs
new file mode 100644
index 000000000..a309a0ebb
--- /dev/null
+++ b/scheds/rust/scx_rustlite/src/main.rs
@@ -0,0 +1,490 @@
+// Copyright (c) Andrea Righi <andrea.righi@canonical.com>
+
+// This software may be used and distributed according to the terms of the
+// GNU General Public License version 2.
+mod bpf_skel;
+pub use bpf_skel::*;
+pub mod bpf_intf;
+
+use std::thread;
+
+use std::collections::BTreeSet;
+use std::collections::HashMap;
+
+use std::ffi::CStr;
+use std::sync::atomic::AtomicBool;
+use std::sync::atomic::Ordering;
+use std::sync::Arc;
+use std::time::{Duration, SystemTime};
+
+use anyhow::bail;
+use anyhow::Context;
+use anyhow::Result;
+use clap::Parser;
+use libbpf_rs::skel::OpenSkel as _;
+use libbpf_rs::skel::Skel as _;
+use libbpf_rs::skel::SkelBuilder as _;
+use log::info;
+use log::warn;
+
+use libc::{sched_param, sched_setscheduler};
+
+const SCHEDULER_NAME: &'static str = "RustLite";
+
+// Defined in UAPI
+const SCHED_EXT: i32 = 7;
+
+/// scx_rustlite: simple user-space scheduler written in Rust
+///
+/// The main goal of this scheduler is to be an "easy to read" template that can be used to
+/// quickly test more complex scheduling policies. For this reason this scheduler is mostly
+/// focused on simplicity and code readability.
+///
+/// The scheduler is made of a BPF component (dispatcher) that implements the low level sched-ext
+/// functionalities and a user-space counterpart (scheduler), written in Rust, that implements the
+/// actual scheduling policy.
+///
+/// The default scheduling policy implemented in the user-space scheduler is based on virtual
+/// runtime (vruntime):
+///
+/// - each task receives the same time slice of execution (slice_ns)
+///
+/// - the actual execution time, adjusted based on the task's static priority (weight), determines
+///   the vruntime
+///
+/// - tasks are then dispatched from the lowest to the highest vruntime
+///
+/// All the tasks are stored in a BTreeSet (TaskTree), using vruntime as the ordering key.
+/// Once the order of execution is determined all tasks are sent back to the BPF counterpart to be
+/// dispatched. To keep track of the accumulated cputime and vruntime the scheduler maintains a
+/// HashMap (TaskInfoMap) indexed by pid.
+///
+/// The BPF dispatcher is completely agnostic of the particular scheduling policy implemented in
+/// user-space. For this reason developers that are willing to use this scheduler to experiment
+/// with scheduling policies should be able to simply modify the Rust component, without having to
+/// deal with any internal kernel / BPF details.
+///
+#[derive(Debug, Parser)]
+struct Opts {
+    /// Scheduling slice duration in microseconds.
+    #[clap(short = 's', long, default_value = "20000")]
+    slice_us: u64,
+
+    /// If specified, only tasks which have their scheduling policy set to
+    /// SCHED_EXT using sched_setscheduler(2) are switched. Otherwise, all
+    /// tasks are switched.
+    #[clap(short = 'p', long, action = clap::ArgAction::SetTrue)]
+    partial: bool,
+
+    /// If specified, all the BPF scheduling events will be reported in
+    /// debugfs (e.g., /sys/kernel/debug/tracing/trace_pipe).
+    #[clap(short = 'd', long, action = clap::ArgAction::SetTrue)]
+    debug: bool,
+}
+
+// Message received from the dispatcher (see bpf_intf::queued_task_ctx for details).
+//
+// NOTE: eventually libbpf-rs will provide a better abstraction for this.
+struct EnqueuedMessage {
+    inner: bpf_intf::queued_task_ctx,
+}
+
+impl EnqueuedMessage {
+    fn from_bytes(bytes: &[u8]) -> Self {
+        let queued_task_struct = unsafe { *(bytes.as_ptr() as *const bpf_intf::queued_task_ctx) };
+        EnqueuedMessage {
+            inner: queued_task_struct,
+        }
+    }
+
+    fn as_queued_task_ctx(&self) -> bpf_intf::queued_task_ctx {
+        self.inner
+    }
+}
+
+// Message sent to the dispatcher (see bpf_intf::dispatched_task_ctx for details).
+//
+// NOTE: eventually libbpf-rs will provide a better abstraction for this.
+struct DispatchedMessage {
+    inner: bpf_intf::dispatched_task_ctx,
+}
+
+impl DispatchedMessage {
+    fn from_task(task: &Task) -> Self {
+        let dispatched_task_struct = bpf_intf::dispatched_task_ctx {
+            pid: task.pid,
+            payload: task.vruntime,
+        };
+        DispatchedMessage {
+            inner: dispatched_task_struct,
+        }
+    }
+
+    fn as_bytes(&self) -> &[u8] {
+        let size = std::mem::size_of::<bpf_intf::dispatched_task_ctx>();
+        let ptr = &self.inner as *const _ as *const u8;
+
+        unsafe { std::slice::from_raw_parts(ptr, size) }
+    }
+}
+
+// Basic item stored in the task information map.
+#[derive(Debug)]
+struct TaskInfo {
+    sum_exec_runtime: u64, // total cpu time used by the task
+    vruntime: u64,         // total vruntime of the task
+}
+
+// Task information map: store total execution time and vruntime of each task in the system.
+//
+// TaskInfo objects are stored in the HashMap and they are indexed by pid.
+//
+// TODO: provide some hooks for .disable() in the BPF part to clean up entries once the task exits
+// (or provide a garbage collector to free up the items that are not needed anymore).
+struct TaskInfoMap {
+    tasks: HashMap<i32, TaskInfo>,
+}
+
+// TaskInfoMap implementation: provide methods to get items and update items by pid.
+impl TaskInfoMap {
+    fn new() -> Self {
+        TaskInfoMap {
+            tasks: HashMap::new(),
+        }
+    }
+
+    // Get an item (as mutable) from the HashMap (by pid)
+    fn get_mut(&mut self, pid: i32) -> Option<&mut TaskInfo> {
+        self.tasks.get_mut(&pid)
+    }
+
+    // Add or update an item in the HashMap (by pid), if the pid is already present the item will
+    // be replaced (updated)
+    fn insert(&mut self, pid: i32, task: TaskInfo) {
+        self.tasks.insert(pid, task);
+    }
+}
+
+// Basic task item stored in the task pool.
+#[derive(Debug, PartialEq, Eq, PartialOrd)]
+struct Task {
+    pid: i32,      // pid that uniquely identifies a task
+    vruntime: u64, // total vruntime (that determines the order how tasks are dispatched)
+}
+
+// Make sure tasks are ordered by vruntime, if multiple tasks have the same vruntime order by pid.
+impl Ord for Task {
+    fn cmp(&self, other: &Self) -> std::cmp::Ordering {
+        self.vruntime
+            .cmp(&other.vruntime)
+            .then_with(|| self.pid.cmp(&other.pid))
+    }
+}
+
+// Task pool where all the tasks that need to run are stored before dispatching
+// (ordered by vruntime using a BTreeSet).
+struct TaskTree {
+    tasks: BTreeSet<Task>,
+}
+
+// Task pool methods (push / pop).
+impl TaskTree {
+    fn new() -> Self {
+        TaskTree {
+            tasks: BTreeSet::new(),
+        }
+    }
+
+    // Add an item to the pool (item will be placed in the tree depending on its vruntime, items
+    // with the same vruntime will be sorted by pid).
+    fn push(&mut self, pid: i32, vruntime: u64) {
+        let task = Task { pid, vruntime };
+        self.tasks.insert(task);
+    }
+
+    // Pop the first item from the BTreeSet (item with the smallest vruntime).
+    fn pop(&mut self) -> Option<Task> {
+        self.tasks.pop_first()
+    }
+}
+
+// Main scheduler object
+struct Scheduler<'a> {
+    skel: BpfSkel<'a>,     // BPF connector
+    task_pool: TaskTree,   // tasks ordered by vruntime
+    task_map: TaskInfoMap, // map pids to the corresponding task information
+    min_vruntime: u64,     // Keep track of the minimum vruntime across all tasks
+    nr_cpus_online: u64,   // Number of available CPUs in the system
+    struct_ops: Option<libbpf_rs::Link>,
+}
+
+impl<'a> Scheduler<'a> {
+    fn init(opts: &Opts) -> Result<Self> {
+        // Open the BPF prog first for verification.
+        let skel_builder = BpfSkelBuilder::default();
+        let mut skel = skel_builder.open().context("Failed to open BPF program")?;
+        let pid = std::process::id();
+
+        // Scheduler task pool to sort tasks by vruntime.
+        let task_pool = TaskTree::new();
+
+        // Scheduler task map to store tasks information.
+        let task_map = TaskInfoMap::new();
+
+        // Initialize global minimum vruntime.
+        let min_vruntime: u64 = 0;
+
+        // Initialize online CPUs counter.
+        //
+        // We should probably refresh this counter during the normal execution to support cpu
+        // hotplugging, but for now let's keep it simple and set this only at initialization.
+        let nr_cpus_online = libbpf_rs::num_possible_cpus().unwrap() as u64;
+
+        // Set scheduler options (defined in the BPF part).
+        skel.bss_mut().usersched_pid = pid;
+        skel.rodata_mut().slice_ns = opts.slice_us * 1000;
+        skel.rodata_mut().switch_partial = opts.partial;
+        skel.rodata_mut().debug = opts.debug;
+
+        // Attach BPF scheduler.
+        let mut skel = skel.load().context("Failed to load BPF program")?;
+        skel.attach().context("Failed to attach BPF program")?;
+        let struct_ops = Some(
+            skel.maps_mut()
+                .rustlite()
+                .attach_struct_ops()
+                .context("Failed to attach struct ops")?,
+        );
+        info!("{} scheduler attached", SCHEDULER_NAME);
+
+        // Return scheduler object.
+        Ok(Self {
+            skel,
+            task_pool,
+            task_map,
+            min_vruntime,
+            nr_cpus_online,
+            struct_ops,
+        })
+    }
+
+    // Read exit code from the BPF part.
+    fn read_bpf_exit_kind(&mut self) -> i32 {
+        unsafe { std::ptr::read_volatile(&self.skel.bss().exit_kind as *const _) }
+    }
+
+    // Called on exit to get exit code and exit message from the BPF part.
+    fn report_bpf_exit_kind(&mut self) -> Result<()> {
+        match self.read_bpf_exit_kind() {
+            0 => Ok(()),
+            etype => {
+                let cstr = unsafe { CStr::from_ptr(self.skel.bss().exit_msg.as_ptr() as *const _) };
+                let msg = cstr
+                    .to_str()
+                    .context("Failed to convert exit msg to string")
+                    .unwrap();
+                info!("BPF exit_kind={} msg={}", etype, msg);
+                Ok(())
+            }
+        }
+    }
+
+    // Update task's vruntime based on the information collected from the kernel part.
+    fn update_enqueued(
+        task_info: &mut TaskInfo,
+        sum_exec_runtime: u64,
+        weight: u64,
+        min_vruntime: u64,
+    ) {
+        // Add cputime delta normalized by weight to the vruntime (if delta > 0).
+        if sum_exec_runtime > task_info.sum_exec_runtime {
+            let delta = (sum_exec_runtime - task_info.sum_exec_runtime) / weight;
+            task_info.vruntime += delta;
+        }
+        // Make sure vruntime is moving forward (> current minimum).
+        if min_vruntime > task_info.vruntime {
+            task_info.vruntime = min_vruntime;
+        }
+        // Update total task cputime.
+        task_info.sum_exec_runtime = sum_exec_runtime;
+    }
+
+    // Drain all the tasks from the queued list, update their vruntime (Self::update_enqueued()),
+    // then push them all to the task pool (doing so will sort them by their vruntime).
+    fn drain_queued_tasks(&mut self) {
+        let maps = self.skel.maps();
+        let queued = maps.queued();
+
+        loop {
+            match queued.lookup_and_delete(&[]) {
+                Ok(Some(msg)) => {
+                    // Schedule the task and update task information.
+                    let task = EnqueuedMessage::from_bytes(msg.as_slice()).as_queued_task_ctx();
+                    if let Some(task_info) = self.task_map.get_mut(task.pid) {
+                        Self::update_enqueued(
+                            task_info,
+                            task.sum_exec_runtime,
+                            task.weight,
+                            self.min_vruntime,
+                        );
+                        self.task_pool.push(task.pid, task_info.vruntime);
+                    } else {
+                        let task_info = TaskInfo {
+                            sum_exec_runtime: task.sum_exec_runtime,
+                            vruntime: self.min_vruntime,
+                        };
+                        self.task_map.insert(task.pid, task_info);
+                        self.task_pool.push(task.pid, self.min_vruntime);
+                    }
+                }
+                Ok(None) => break,
+                Err(err) => {
+                    warn!("Error: {}", err);
+                    break;
+                }
+            }
+        }
+    }
+
+    // Dispatch tasks from the task pool in order (sending them to the BPF dispatcher).
+    fn dispatch_tasks(&mut self) {
+        let maps = self.skel.maps();
+        let dispatched = maps.dispatched();
+
+        loop {
+            match self.task_pool.pop() {
+                Some(task) => {
+                    // Update global minimum vruntime.
+                    self.min_vruntime = task.vruntime;
+
+                    // Send task to the dispatcher.
+                    let msg = DispatchedMessage::from_task(&task);
+                    match dispatched.update(&[], msg.as_bytes(), libbpf_rs::MapFlags::ANY) {
+                        Ok(_) => {}
+                        Err(_) => {
+                            /*
+                             * Re-add the task to the task pool in case of failure and stop
+                             * dispatching.
+                             */
+                            self.task_pool.push(task.pid, task.vruntime);
+                            break;
+                        }
+                    }
+                }
+                None => break,
+            }
+        }
+    }
+
+    // Check if there's at least a CPU that can accept tasks.
+    fn is_dispatcher_needed(&self) -> bool {
+        let nr_tasks_running = self.skel.bss().nr_tasks_running as u64;
+        return nr_tasks_running < self.nr_cpus_online;
+    }
+
+    // Main scheduling function (called in a loop to periodically drain tasks from the queued list
+    // and dispatch them to the BPF part via the dispatched list).
+    fn schedule(&mut self) {
+        self.drain_queued_tasks();
+        // Instead of immediately dispatching all the tasks check if there is at least an idle CPU.
+        // This logic can be improved, because in this way we are going to add more scheduling
+        // overhead when the system is already overloaded (no idle CPUs).
+        //
+        // Probably a better solution could be to have a reasonable batch size (i.e., as a function
+        // of the CPUs and slice duration) and dispatch up to a maximum of BATCH_SIZE tasks each
+        // time.
+        if self.is_dispatcher_needed() {
+            self.dispatch_tasks();
+        }
+
+        // Yield to avoid using too much CPU from the scheduler itself.
+        thread::yield_now();
+    }
+
+    // Print internal scheduler statistics (fetched from the BPF part)
+    fn print_stats(&mut self) {
+        let nr_enqueues = self.skel.bss().nr_enqueues as u64;
+        let nr_user_dispatches = self.skel.bss().nr_user_dispatches as u64;
+        let nr_kernel_dispatches = self.skel.bss().nr_kernel_dispatches as u64;
+        let nr_sched_congested = self.skel.bss().nr_sched_congested as u64;
+        let nr_tasks_running = self.skel.bss().nr_tasks_running as u64;
+
+        info!(
+            "min_vtime={} nr_tasks_running={} nr_enqueues={} nr_user_dispatches={} nr_kernel_dispatches={} nr_sched_congested={}",
+            self.min_vruntime, nr_tasks_running, nr_enqueues, nr_user_dispatches, nr_kernel_dispatches, nr_sched_congested
+        );
+        log::logger().flush();
+    }
+
+    fn run(&mut self, shutdown: Arc<AtomicBool>) -> Result<()> {
+        let mut prev_ts = SystemTime::now();
+
+        while !shutdown.load(Ordering::Relaxed) && self.read_bpf_exit_kind() == 0 {
+            let curr_ts = SystemTime::now();
+            let elapsed = curr_ts
+                .duration_since(prev_ts)
+                .unwrap_or_else(|_| Duration::from_secs(0));
+
+            self.schedule();
+
+            // Print scheduler statistics every second.
+            if elapsed > Duration::from_secs(1) {
+                self.print_stats();
+                prev_ts = curr_ts;
+            }
+        }
+        self.report_bpf_exit_kind()
+    }
+}
+
+impl<'a> Drop for Scheduler<'a> {
+    fn drop(&mut self) {
+        if let Some(struct_ops) = self.struct_ops.take() {
+            drop(struct_ops);
+        }
+        info!("Unregister {} scheduler", SCHEDULER_NAME);
+    }
+}
+
+// Set scheduling class for the scheduler itself to SCHED_EXT
+fn use_sched_ext() -> i32 {
+    let pid = std::process::id();
+    let param: sched_param = sched_param { sched_priority: 0 };
+    let res = unsafe { sched_setscheduler(pid as i32, SCHED_EXT, &param as *const sched_param) };
+    res
+}
+
+fn main() -> Result<()> {
+    let opts = Opts::parse();
+
+    let loglevel = simplelog::LevelFilter::Info;
+
+    let mut lcfg = simplelog::ConfigBuilder::new();
+    lcfg.set_time_level(simplelog::LevelFilter::Error)
+        .set_location_level(simplelog::LevelFilter::Off)
+        .set_target_level(simplelog::LevelFilter::Off)
+        .set_thread_level(simplelog::LevelFilter::Off);
+    simplelog::TermLogger::init(
+        loglevel,
+        lcfg.build(),
+        simplelog::TerminalMode::Stderr,
+        simplelog::ColorChoice::Auto,
+    )?;
+
+    // Make sure to use the SCHED_EXT class at least for the scheduler itself.
+    let res = use_sched_ext();
+    if res != 0 {
+        bail!("Failed to call sched_setscheduler: {}", res);
+    }
+
+    let mut sched = Scheduler::init(&opts)?;
+    let shutdown = Arc::new(AtomicBool::new(false));
+    let shutdown_clone = shutdown.clone();
+    ctrlc::set_handler(move || {
+        shutdown_clone.store(true, Ordering::Relaxed);
+    })
+    .context("Error setting Ctrl-C handler")?;
+
+    // Start the scheduler.
+    sched.run(shutdown)
+}
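
Note (not part of the patch): the vruntime policy described in the comments above (charge each task its executed time divided by its weight, clamp against a global minimum so vruntime only moves forward, then dispatch tasks in ascending vruntime order) can be illustrated with a small standalone sketch. This is only an assumption-free toy mirror of Scheduler::update_enqueued() and TaskTree; the names ToyTask and toy_update_vruntime are illustrative and do not exist in the patch.

    use std::collections::BTreeSet;

    // Illustrative stand-in for the patch's Task type: ordered by vruntime, then pid.
    #[derive(Debug, PartialEq, Eq, PartialOrd, Ord)]
    struct ToyTask {
        vruntime: u64, // primary ordering key
        pid: i32,      // tie-breaker
    }

    // Same rule as Scheduler::update_enqueued(): add the executed delta normalized
    // by weight, and never let vruntime fall behind the global minimum.
    fn toy_update_vruntime(vruntime: &mut u64, prev_runtime: &mut u64,
                           sum_exec_runtime: u64, weight: u64, min_vruntime: u64) {
        if sum_exec_runtime > *prev_runtime {
            *vruntime += (sum_exec_runtime - *prev_runtime) / weight;
        }
        if min_vruntime > *vruntime {
            *vruntime = min_vruntime;
        }
        *prev_runtime = sum_exec_runtime;
    }

    fn main() {
        let min_vruntime = 0;
        let mut pool = BTreeSet::new();

        // pid 1 ran 20ms at default weight (100); pid 2 ran 20ms at weight 200.
        let (mut vr1, mut rt1) = (0u64, 0u64);
        toy_update_vruntime(&mut vr1, &mut rt1, 20_000_000, 100, min_vruntime);
        let (mut vr2, mut rt2) = (0u64, 0u64);
        toy_update_vruntime(&mut vr2, &mut rt2, 20_000_000, 200, min_vruntime);

        pool.insert(ToyTask { vruntime: vr1, pid: 1 });
        pool.insert(ToyTask { vruntime: vr2, pid: 2 });

        // The higher-weight task accumulated less vruntime, so it is dispatched first.
        while let Some(task) = pool.pop_first() {
            println!("dispatch pid={} vruntime={}", task.pid, task.vruntime);
        }
    }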