diff --git a/.github/workflows/actions/setup-musl/action.yml b/.github/workflows/actions/setup-musl/action.yml index b8aee3661a..67062e057a 100644 --- a/.github/workflows/actions/setup-musl/action.yml +++ b/.github/workflows/actions/setup-musl/action.yml @@ -11,7 +11,7 @@ runs: steps: - name: Cache musl id: cache-musl - uses: actions/cache/restore@v3 + uses: actions/cache/restore@v4 with: path: ${{ inputs.arch }}-linux-musl-cross key: ${{ inputs.arch }}-linux-musl-cross @@ -22,7 +22,7 @@ runs: MUSL_PATH=${{ inputs.arch }}-linux-musl-cross wget https://musl.cc/${MUSL_PATH}.tgz tar -xf ${MUSL_PATH}.tgz - - uses: actions/cache/save@v3 + - uses: actions/cache/save@v4 if: steps.cache-musl.outputs.cache-hit != 'true' with: path: ${{ inputs.arch }}-linux-musl-cross diff --git a/.github/workflows/actions/setup-qemu/action.yml b/.github/workflows/actions/setup-qemu/action.yml index 0f8bc4d1ca..819fb08797 100644 --- a/.github/workflows/actions/setup-qemu/action.yml +++ b/.github/workflows/actions/setup-qemu/action.yml @@ -11,7 +11,7 @@ runs: steps: - name: Cache QEMU id: cache-qemu - uses: actions/cache/restore@v3 + uses: actions/cache/restore@v4 with: path: qemu_build key: qemu-${{ inputs.qemu-version }}-slirp-1 @@ -22,13 +22,13 @@ runs: PREFIX: ${{ github.workspace }}/qemu_build shell: bash run: | - sudo apt-get update && sudo apt-get install -y ninja-build libslirp-dev + sudo apt-get update && sudo apt-get install -y ninja-build libslirp-dev libglib2.0-dev wget https://download.qemu.org/$QEMU_PATH.tar.xz && tar -xJf $QEMU_PATH.tar.xz cd $QEMU_PATH \ && ./configure --prefix=$PREFIX --target-list=x86_64-softmmu,riscv64-softmmu,aarch64-softmmu --enable-slirp \ && make -j > /dev/null 2>&1 \ && make install - - uses: actions/cache/save@v3 + - uses: actions/cache/save@v4 if: steps.cache-qemu.outputs.cache-hit != 'true' with: path: qemu_build diff --git a/.github/workflows/test.yml b/.github/workflows/test.yml index d3c1de36f1..59b7bbab21 100644 --- a/.github/workflows/test.yml +++ b/.github/workflows/test.yml @@ -5,7 +5,7 @@ on: [push, pull_request] env: qemu-version: 8.2.0 rust-toolchain: nightly-2024-05-02 - arceos-apps: '68054e8' + arceos-apps: '57d495a' jobs: unit-test: diff --git a/Cargo.lock b/Cargo.lock index eb9a02423d..6f71e698e1 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -511,6 +511,7 @@ dependencies = [ "axhal", "axtask", "cfg-if", + "cpumask", "crate_interface", "kernel_guard", "kspin", @@ -696,6 +697,14 @@ version = "1.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "7f8f80099a98041a3d1622845c271458a2d73e688351bf3cb999266764b81d48" +[[package]] +name = "cpumask" +version = "0.1.0" +source = "git+https://github.com/arceos-org/cpumask.git#5b61f0136d140a529ba2ab609e47beb41dc4f1c4" +dependencies = [ + "bitmaps", +] + [[package]] name = "crate_interface" version = "0.1.3" diff --git a/README.md b/README.md index e0a1e52189..c936cb3ede 100644 --- a/README.md +++ b/README.md @@ -19,7 +19,7 @@ ArceOS was inspired a lot by [Unikraft](https://github.com/unikraft/unikraft). 
* [x] VirtIO net/blk/gpu drivers * [x] TCP/UDP net stack using [smoltcp](https://github.com/smoltcp-rs/smoltcp) * [x] Synchronization/Mutex -* [x] SMP scheduling with single run queue +* [x] SMP scheduling with [per-cpu run queue](https://github.com/arceos-org/arceos/discussions/181) * [x] File system * [ ] Compatible with Linux apps * [ ] Interrupt driven device I/O diff --git a/api/axfeat/Cargo.toml b/api/axfeat/Cargo.toml index 6e246b36df..412fdb8027 100644 --- a/api/axfeat/Cargo.toml +++ b/api/axfeat/Cargo.toml @@ -13,7 +13,7 @@ documentation = "https://arceos-org.github.io/arceos/axfeat/index.html" default = [] # Multicore -smp = ["axhal/smp", "axruntime/smp", "kspin/smp"] +smp = ["axhal/smp", "axruntime/smp", "axtask?/smp", "kspin/smp"] # Floating point/SIMD fp_simd = ["axhal/fp_simd"] diff --git a/doc/README.md b/doc/README.md index b3f7d2d5b1..4029880b5a 100644 --- a/doc/README.md +++ b/doc/README.md @@ -38,3 +38,10 @@ See [arceos-apps](https://github.com/arceos-org/arceos-apps) for example applica Documentation of ArceOS [modules](../modules), [api](../api), and [ulib](../ulib) are generated by [`rustdoc`](https://doc.rust-lang.org/rustdoc/what-is-rustdoc.html) and hosted on GitHub Pages: * https://arceos-org.github.io/arceos/ + +## Discussions + +* [Rust std support](https://github.com/arceos-org/arceos/discussions/92) +* [ArceOS for ARM64](https://github.com/arceos-org/arceos/discussions/101) +* [ArceOS for RISCV Hardware](https://github.com/arceos-org/arceos/discussions/120) +* [Per-CPU scheduling](https://github.com/arceos-org/arceos/discussions/181) diff --git a/modules/axhal/src/arch/x86_64/mod.rs b/modules/axhal/src/arch/x86_64/mod.rs index f19469b726..c9002a58d3 100644 --- a/modules/axhal/src/arch/x86_64/mod.rs +++ b/modules/axhal/src/arch/x86_64/mod.rs @@ -19,6 +19,10 @@ pub use x86_64::structures::tss::TaskStateSegment; /// Allows the current CPU to respond to interrupts. #[inline] pub fn enable_irqs() { + #[cfg(not(target_os = "none"))] + { + warn!("enable_irqs: not implemented"); + } #[cfg(target_os = "none")] interrupts::enable() } @@ -26,6 +30,10 @@ pub fn enable_irqs() { /// Makes the current CPU to ignore interrupts. 
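The same `target_os = "none"` gating is applied to `disable_irqs` just below: on bare metal the real instruction runs, while hosted builds (e.g. unit tests on the dev machine) only emit a warning. A minimal, host-runnable sketch of that pattern follows; the `warn!` macro and the inline `sti` are illustrative stand-ins, not the axhal implementation (which uses the `x86_64` crate's `interrupts::enable()`).

```rust
// Hypothetical stand-in for the `log` crate's `warn!` macro, so the snippet
// runs on a hosted target without pulling in a logger.
macro_rules! warn {
    ($($arg:tt)*) => { eprintln!($($arg)*) };
}

/// Allows the current CPU to respond to interrupts.
#[inline]
pub fn enable_irqs() {
    #[cfg(not(target_os = "none"))]
    {
        warn!("enable_irqs: not implemented");
    }
    #[cfg(target_os = "none")]
    unsafe {
        // On x86_64 bare metal this would set the interrupt flag.
        core::arch::asm!("sti");
    }
}

fn main() {
    enable_irqs(); // on a hosted target this only prints the warning
}
```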
#[inline] pub fn disable_irqs() { + #[cfg(not(target_os = "none"))] + { + warn!("disable_irqs: not implemented"); + } #[cfg(target_os = "none")] interrupts::disable() } diff --git a/modules/axruntime/Cargo.toml b/modules/axruntime/Cargo.toml index b904fe6b89..c26237b718 100644 --- a/modules/axruntime/Cargo.toml +++ b/modules/axruntime/Cargo.toml @@ -12,7 +12,7 @@ documentation = "https://arceos-org.github.io/arceos/axruntime/index.html" [features] default = [] -smp = ["axhal/smp"] +smp = ["axhal/smp", "axtask?/smp"] irq = ["axhal/irq", "axtask?/irq", "percpu", "kernel_guard"] tls = ["axhal/tls", "axtask?/tls"] alloc = ["axalloc"] diff --git a/modules/axtask/Cargo.toml b/modules/axtask/Cargo.toml index 25fe7fb371..daa83d9d3f 100644 --- a/modules/axtask/Cargo.toml +++ b/modules/axtask/Cargo.toml @@ -13,12 +13,21 @@ documentation = "https://arceos-org.github.io/arceos/axtask/index.html" default = [] multitask = [ - "dep:axconfig", "dep:percpu", "dep:kspin", "dep:lazyinit", "dep:memory_addr", - "dep:scheduler", "dep:timer_list", "kernel_guard", "dep:crate_interface", + "dep:axconfig", + "dep:percpu", + "dep:kspin", + "dep:lazyinit", + "dep:memory_addr", + "dep:scheduler", + "dep:timer_list", + "kernel_guard", + "dep:crate_interface", + "dep:cpumask", ] irq = [] tls = ["axhal/tls"] preempt = ["irq", "percpu?/preempt", "kernel_guard/preempt"] +smp = ["kspin/smp"] sched_fifo = ["multitask"] sched_rr = ["multitask", "preempt"] @@ -39,6 +48,7 @@ timer_list = { version = "0.1", optional = true } kernel_guard = { version = "0.1", optional = true } crate_interface = { version = "0.1", optional = true } scheduler = { git = "https://github.com/arceos-org/scheduler.git", tag = "v0.1.0", optional = true } +cpumask = { git = "https://github.com/arceos-org/cpumask.git", optional = true } [dev-dependencies] rand = "0.8" diff --git a/modules/axtask/src/api.rs b/modules/axtask/src/api.rs index 156ba9d3b6..b3278b9595 100644 --- a/modules/axtask/src/api.rs +++ b/modules/axtask/src/api.rs @@ -2,7 +2,9 @@ use alloc::{string::String, sync::Arc}; -pub(crate) use crate::run_queue::{AxRunQueue, RUN_QUEUE}; +use kernel_guard::NoPreemptIrqSave; + +pub(crate) use crate::run_queue::{current_run_queue, select_run_queue}; #[doc(cfg(feature = "multitask"))] pub use crate::task::{CurrentTask, TaskId, TaskInner}; @@ -14,6 +16,9 @@ pub use crate::wait_queue::WaitQueue; /// The reference type of a task. pub type AxTaskRef = Arc; +/// The wrapper type for cpumask::CpuMask with SMP configuration. +pub type CpuMask = cpumask::CpuMask<{ axconfig::SMP }>; + cfg_if::cfg_if! { if #[cfg(feature = "sched_rr")] { const MAX_TIME_SLICE: usize = 5; @@ -77,6 +82,8 @@ pub fn init_scheduler() { /// Initializes the task scheduler for secondary CPUs. pub fn init_scheduler_secondary() { crate::run_queue::init_secondary(); + #[cfg(feature = "irq")] + crate::timers::init(); } /// Handles periodic timer ticks for the task manager. @@ -85,14 +92,17 @@ pub fn init_scheduler_secondary() { #[cfg(feature = "irq")] #[doc(cfg(feature = "irq"))] pub fn on_timer_tick() { + use kernel_guard::NoOp; crate::timers::check_events(); - RUN_QUEUE.lock().scheduler_timer_tick(); + // Since irq and preemption are both disabled here, + // we can get current run queue with the default `kernel_guard::NoOp`. + current_run_queue::().scheduler_timer_tick(); } /// Adds the given task to the run queue, returns the task reference. 
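`spawn_task` below, like the other APIs in this hunk, now goes through `current_run_queue` / `select_run_queue` parameterized by a `kernel_guard` type (`NoOp` in `on_timer_tick`, since IRQs and preemption are already disabled there). The sketch below models that guard-parameterized accessor with toy types; `BaseGuard`, `NoPreemptIrqSave`, and the run-queue struct are simplified stand-ins, not the real `kernel_guard`/`axtask` definitions.

```rust
// Toy model of the guard-parameterized run-queue accessor: the guard state is
// acquired when the reference is created and released when it is dropped.

trait BaseGuard {
    type State;
    fn acquire() -> Self::State;
    fn release(state: Self::State);
}

/// Stand-in for `kernel_guard::NoPreemptIrqSave`.
struct NoPreemptIrqSave;
impl BaseGuard for NoPreemptIrqSave {
    type State = ();
    fn acquire() -> Self::State {
        println!("disable preemption and IRQs");
    }
    fn release(_state: Self::State) {
        println!("restore preemption and IRQs");
    }
}

struct RunQueue {
    ready: Vec<&'static str>,
}

struct RunQueueRef<'a, G: BaseGuard> {
    inner: &'a mut RunQueue,
    state: Option<G::State>,
}

impl<'a, G: BaseGuard> Drop for RunQueueRef<'a, G> {
    fn drop(&mut self) {
        // Mirrors `AxRunQueueRef`: releasing the guard re-enables IRQs/preemption.
        G::release(self.state.take().unwrap());
    }
}

fn current_run_queue<G: BaseGuard>() -> RunQueueRef<'static, G> {
    let state = G::acquire();
    // Stand-in for the per-CPU `RUN_QUEUE` static; leaking is fine for a demo.
    let inner = Box::leak(Box::new(RunQueue { ready: Vec::new() }));
    RunQueueRef { inner, state: Some(state) }
}

fn main() {
    let mut rq = current_run_queue::<NoPreemptIrqSave>();
    rq.inner.ready.push("task-1"); // operate on the queue while the guard is held
} // `rq` is dropped here: the guard state is released
```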
pub fn spawn_task(task: TaskInner) -> AxTaskRef { let task_ref = task.into_arc(); - RUN_QUEUE.lock().add_task(task_ref.clone()); + select_run_queue::(&task_ref).add_task(task_ref.clone()); task_ref } @@ -129,13 +139,13 @@ where /// /// [CFS]: https://en.wikipedia.org/wiki/Completely_Fair_Scheduler pub fn set_priority(prio: isize) -> bool { - RUN_QUEUE.lock().set_current_priority(prio) + current_run_queue::().set_current_priority(prio) } /// Current task gives up the CPU time voluntarily, and switches to another /// ready task. pub fn yield_now() { - RUN_QUEUE.lock().yield_current(); + current_run_queue::().yield_current() } /// Current task is going to sleep for the given duration. @@ -150,14 +160,14 @@ pub fn sleep(dur: core::time::Duration) { /// If the feature `irq` is not enabled, it uses busy-wait instead. pub fn sleep_until(deadline: axhal::time::TimeValue) { #[cfg(feature = "irq")] - RUN_QUEUE.lock().sleep_until(deadline); + current_run_queue::().sleep_until(deadline); #[cfg(not(feature = "irq"))] axhal::time::busy_wait_until(deadline); } /// Exits the current task. pub fn exit(exit_code: i32) -> ! { - RUN_QUEUE.lock().exit_current(exit_code) + current_run_queue::().exit_current(exit_code) } /// The idle task routine. diff --git a/modules/axtask/src/lib.rs b/modules/axtask/src/lib.rs index 64702634f5..489ac0a95a 100644 --- a/modules/axtask/src/lib.rs +++ b/modules/axtask/src/lib.rs @@ -42,6 +42,7 @@ cfg_if::cfg_if! { extern crate log; extern crate alloc; + #[macro_use] mod run_queue; mod task; mod task_ext; diff --git a/modules/axtask/src/run_queue.rs b/modules/axtask/src/run_queue.rs index 1592b7637e..9eb53ed72e 100644 --- a/modules/axtask/src/run_queue.rs +++ b/modules/axtask/src/run_queue.rs @@ -1,45 +1,205 @@ use alloc::collections::VecDeque; use alloc::sync::Arc; -use kspin::SpinNoIrq; +use core::mem::MaybeUninit; + +use kernel_guard::BaseGuard; +use kspin::SpinRaw; use lazyinit::LazyInit; use scheduler::BaseScheduler; +use axhal::cpu::this_cpu_id; + use crate::task::{CurrentTask, TaskState}; -use crate::{AxTaskRef, Scheduler, TaskInner, WaitQueue}; +use crate::wait_queue::WaitQueueGuard; +use crate::{AxTaskRef, CpuMask, Scheduler, TaskInner, WaitQueue}; + +macro_rules! percpu_static { + ($($name:ident: $ty:ty = $init:expr),* $(,)?) => { + $( + #[percpu::def_percpu] + static $name: $ty = $init; + )* + }; +} + +percpu_static! { + RUN_QUEUE: LazyInit = LazyInit::new(), + EXITED_TASKS: VecDeque = VecDeque::new(), + WAIT_FOR_EXIT: WaitQueue = WaitQueue::new(), + IDLE_TASK: LazyInit = LazyInit::new(), +} + +/// An array of references to run queues, one for each CPU, indexed by cpu_id. +/// +/// This static variable holds references to the run queues for each CPU in the system. +/// +/// # Safety +/// +/// Access to this variable is marked as `unsafe` because it contains `MaybeUninit` references, +/// which require careful handling to avoid undefined behavior. The array should be fully +/// initialized before being accessed to ensure safe usage. +static mut RUN_QUEUES: [MaybeUninit<&'static mut AxRunQueue>; axconfig::SMP] = + [ARRAY_REPEAT_VALUE; axconfig::SMP]; +const ARRAY_REPEAT_VALUE: MaybeUninit<&'static mut AxRunQueue> = MaybeUninit::uninit(); + +/// Returns a reference to the current run queue. +/// +/// ## Safety +/// +/// This function returns a static reference to the current run queue, which +/// is inherently unsafe. 
It assumes that the `RUN_QUEUE` has been properly +/// initialized and is not accessed concurrently in a way that could cause +/// data races or undefined behavior. +/// +/// ## Returns +/// +/// A static reference to the current run queue. +#[inline(always)] +pub(crate) fn current_run_queue() -> AxRunQueueRef<'static, G> { + let irq_state = G::acquire(); + AxRunQueueRef { + inner: unsafe { RUN_QUEUE.current_ref_mut_raw() }, + state: irq_state, + _phantom: core::marker::PhantomData, + } +} -// TODO: per-CPU -pub(crate) static RUN_QUEUE: LazyInit> = LazyInit::new(); +/// Selects the run queue index based on a CPU set bitmap and load balancing. +/// +/// This function filters the available run queues based on the provided `cpumask` and +/// selects the run queue index for the next task. The selection is based on a round-robin algorithm. +/// +/// ## Arguments +/// +/// * `cpumask` - A bitmap representing the CPUs that are eligible for task execution. +/// +/// ## Returns +/// +/// The index (cpu_id) of the selected run queue. +/// +/// ## Panics +/// +/// This function will panic if `cpu_mask` is empty, indicating that there are no available CPUs for task execution. +/// +#[cfg(feature = "smp")] +// The modulo operation is safe here because `axconfig::SMP` is always greater than 1 with "smp" enabled. +#[allow(clippy::modulo_one)] +#[inline] +fn select_run_queue_index(cpumask: CpuMask) -> usize { + use core::sync::atomic::{AtomicUsize, Ordering}; + static RUN_QUEUE_INDEX: AtomicUsize = AtomicUsize::new(0); -// TODO: per-CPU -static EXITED_TASKS: SpinNoIrq> = SpinNoIrq::new(VecDeque::new()); + assert!(!cpumask.is_empty(), "No available CPU for task execution"); -static WAIT_FOR_EXIT: WaitQueue = WaitQueue::new(); + // Round-robin selection of the run queue index. + loop { + let index = RUN_QUEUE_INDEX.fetch_add(1, Ordering::SeqCst) % axconfig::SMP; + if cpumask.get(index) { + return index; + } + } +} -#[percpu::def_percpu] -static IDLE_TASK: LazyInit = LazyInit::new(); +/// Retrieves a `'static` reference to the run queue corresponding to the given index. +/// +/// This function asserts that the provided index is within the range of available CPUs +/// and returns a reference to the corresponding run queue. +/// +/// ## Arguments +/// +/// * `index` - The index of the run queue to retrieve. +/// +/// ## Returns +/// +/// A reference to the `AxRunQueue` corresponding to the provided index. +/// +/// ## Panics +/// +/// This function will panic if the index is out of bounds. +/// +#[cfg(feature = "smp")] +#[inline] +fn get_run_queue(index: usize) -> &'static mut AxRunQueue { + unsafe { RUN_QUEUES[index].assume_init_mut() } +} +/// Selects the appropriate run queue for the provided task. +/// +/// * In a single-core system, this function always returns a reference to the global run queue. +/// * In a multi-core system, this function selects the run queue based on the task's CPU affinity and load balance. +/// +/// ## Arguments +/// +/// * `task` - A reference to the task for which a run queue is being selected. +/// +/// ## Returns +/// +/// A reference to the selected `AxRunQueue`. +/// +/// ## TODO +/// +/// 1. Implement better load balancing across CPUs for more efficient task distribution. +/// 2. Use a more generic load balancing algorithm that can be customized or replaced. 
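Before the `select_run_queue` body below, note that all of the per-CPU state above is declared through `percpu_static!`, which simply stamps out one `#[percpu::def_percpu]` static per entry, so every CPU owns its own `RUN_QUEUE`, `EXITED_TASKS`, `WAIT_FOR_EXIT`, and `IDLE_TASK`. The host-runnable sketch below shows the same "one macro invocation, several independent statics" idea, using `thread_local!` as a stand-in for the `percpu` crate (per thread instead of per CPU).

```rust
use std::cell::RefCell;
use std::collections::VecDeque;

// Stand-in for `percpu_static!`: each entry becomes its own thread-local
// static, analogous to one per-CPU variable per declaration.
macro_rules! percpu_static {
    ($($name:ident: $ty:ty = $init:expr),* $(,)?) => {
        thread_local! {
            $( static $name: RefCell<$ty> = RefCell::new($init); )*
        }
    };
}

percpu_static! {
    RUN_QUEUE: VecDeque<u32> = VecDeque::new(),
    EXITED_TASKS: VecDeque<u32> = VecDeque::new(),
}

fn main() {
    // Each thread (standing in for each CPU) sees its own queues.
    RUN_QUEUE.with(|rq| rq.borrow_mut().push_back(42));
    std::thread::spawn(|| {
        RUN_QUEUE.with(|rq| assert!(rq.borrow().is_empty()));
        EXITED_TASKS.with(|et| assert!(et.borrow().is_empty()));
    })
    .join()
    .unwrap();
}
```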
+/// +#[inline] +pub(crate) fn select_run_queue(task: &AxTaskRef) -> AxRunQueueRef<'static, G> { + #[cfg(not(feature = "smp"))] + { + let _ = task; + // When SMP is disabled, all tasks are scheduled on the same global run queue. + current_run_queue() + } + #[cfg(feature = "smp")] + { + let irq_state = G::acquire(); + // When SMP is enabled, select the run queue based on the task's CPU affinity and load balance. + let index = select_run_queue_index(task.cpumask()); + AxRunQueueRef { + inner: get_run_queue(index), + state: irq_state, + _phantom: core::marker::PhantomData, + } + } +} + +/// `AxRunQueue` represents a run queue for global system or a specific CPU. pub(crate) struct AxRunQueue { - scheduler: Scheduler, + /// The ID of the CPU this run queue is associated with. + cpu_id: usize, + /// The core scheduler of this run queue. + /// Since irq and preempt are preserved by the kernel guard hold by `AxRunQueueRef`, + /// we just use a simple raw spin lock here. + scheduler: SpinRaw, } -impl AxRunQueue { - pub fn new() -> SpinNoIrq { - let gc_task = TaskInner::new(gc_entry, "gc".into(), axconfig::TASK_STACK_SIZE).into_arc(); - let mut scheduler = Scheduler::new(); - scheduler.add_task(gc_task); - SpinNoIrq::new(Self { scheduler }) +pub(crate) struct AxRunQueueRef<'a, G: BaseGuard> { + inner: &'a mut AxRunQueue, + state: G::State, + _phantom: core::marker::PhantomData, +} + +impl<'a, G: BaseGuard> Drop for AxRunQueueRef<'a, G> { + fn drop(&mut self) { + G::release(self.state); } +} +/// Core functions of run queue. +impl<'a, G: BaseGuard> AxRunQueueRef<'a, G> { pub fn add_task(&mut self, task: AxTaskRef) { - debug!("task spawn: {}", task.id_name()); + debug!( + "task add: {} on run_queue {}", + task.id_name(), + self.inner.cpu_id + ); assert!(task.is_ready()); - self.scheduler.add_task(task); + self.inner.scheduler.lock().add_task(task); } #[cfg(feature = "irq")] pub fn scheduler_timer_tick(&mut self) { let curr = crate::current(); - if !curr.is_idle() && self.scheduler.task_tick(curr.as_task_ref()) { + if !curr.is_idle() && self.inner.scheduler.lock().task_tick(curr.as_task_ref()) { #[cfg(feature = "preempt")] curr.set_preempt_pending(true); } @@ -49,24 +209,27 @@ impl AxRunQueue { let curr = crate::current(); trace!("task yield: {}", curr.id_name()); assert!(curr.is_running()); - self.resched(false); + self.inner.resched(false); } pub fn set_current_priority(&mut self, prio: isize) -> bool { - self.scheduler + self.inner + .scheduler + .lock() .set_priority(crate::current().as_task_ref(), prio) } #[cfg(feature = "preempt")] pub fn preempt_resched(&mut self) { + // There is no need to disable IRQ and preemption here, because + // they both have been disabled in `current_check_preempt_pending`. let curr = crate::current(); assert!(curr.is_running()); - // When we get the mutable reference of the run queue, we must - // have held the `SpinNoIrq` lock with both IRQs and preemption - // disabled. So we need to set `current_disable_count` to 1 in - // `can_preempt()` to obtain the preemption permission before - // locking the run queue. + // When we call `preempt_resched()`, both IRQs and preemption must + // have been disabled by `kernel_guard::NoPreemptIrqSave`. So we need + // to set `current_disable_count` to 1 in `can_preempt()` to obtain + // the preemption permission. 
+ let can_preempt = curr.can_preempt(1); debug!( @@ -75,54 +238,121 @@ impl AxRunQueue { can_preempt ); if can_preempt { - self.resched(true); + self.inner.resched(true); } else { curr.set_preempt_pending(true); } } + /// Exit the current task with the specified exit code. + /// This function will never return. pub fn exit_current(&mut self, exit_code: i32) -> ! { let curr = crate::current(); debug!("task exit: {}, exit_code={}", curr.id_name(), exit_code); - assert!(curr.is_running()); + assert!(curr.is_running(), "task is not running: {:?}", curr.state()); assert!(!curr.is_idle()); if curr.is_init() { - EXITED_TASKS.lock().clear(); + // Safety: it is called from `current_run_queue::().exit_current(exit_code)`, + // which has disabled IRQs and preemption. + unsafe { + EXITED_TASKS.current_ref_mut_raw().clear(); + } axhal::misc::terminate(); } else { curr.set_state(TaskState::Exited); - curr.notify_exit(exit_code, self); - EXITED_TASKS.lock().push_back(curr.clone()); - WAIT_FOR_EXIT.notify_one_locked(false, self); - self.resched(false); + + // Notify the joiner task. + curr.notify_exit(exit_code); + + // Safety: it is called from `current_run_queue::().exit_current(exit_code)`, + // which has disabled IRQs and preemption. + unsafe { + // Push the current task to the `EXITED_TASKS` list; it will be consumed by the GC task. + EXITED_TASKS.current_ref_mut_raw().push_back(curr.clone()); + // Wake up the GC task to drop the exited tasks. + WAIT_FOR_EXIT.current_ref_mut_raw().notify_one(false); + } + + // Reschedule to the next task. + self.inner.resched(false); } unreachable!("task exited!"); } - pub fn block_current<F>(&mut self, wait_queue_push: F) - where - F: FnOnce(AxTaskRef), - { + /// Block the current task, put it into the wait queue, and reschedule. + /// Mark the current task's state as `Blocked` and set its `in_wait_queue` flag to true. + /// Note: + /// 1. The caller must hold the lock of the wait queue. + /// 2. The caller must ensure that the current task is in the running state. + /// 3. The caller must ensure that the current task is not the idle task. + /// 4. The lock of the wait queue will be released explicitly after the current task is pushed into it. + pub fn blocked_resched(&mut self, mut wq_guard: WaitQueueGuard) { let curr = crate::current(); - debug!("task block: {}", curr.id_name()); assert!(curr.is_running()); assert!(!curr.is_idle()); - // we must not block current task with preemption disabled. + // The expected preempt count here is 2: + // 1 for `NoPreemptIrqSave`, 1 for the wait queue's `SpinNoIrq`. #[cfg(feature = "preempt")] - assert!(curr.can_preempt(1)); + assert!(curr.can_preempt(2)); + // Mark the task as blocked; this has to be done before adding it to the wait queue, + // while holding the wait queue's lock. curr.set_state(TaskState::Blocked); - wait_queue_push(curr.clone()); - self.resched(false); + curr.set_in_wait_queue(true); + + wq_guard.push_back(curr.clone()); + // Drop the lock of the wait queue explicitly. + drop(wq_guard); + + // The current task's state has been changed to `Blocked` and it has been added to the wait queue. + // Note that the state may have already been set back to `Ready` in `unblock_task()`, + // see `unblock_task()` for details. + + debug!("task block: {}", curr.id_name()); + self.inner.resched(false); } + /// Unblock one task by inserting it into the run queue. + /// If the task's `on_cpu` flag is true, + /// this function spins until the task finishes its scheduling process.
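The `unblock_task` implementation that follows relies on `transition_state` (added in `task.rs` later in this diff), a compare-and-swap from `Blocked` to `Ready`, so that when a timer expiry and a `notify()` race, only one of them actually enqueues the task. A self-contained sketch of that idea, with plain `u8` constants standing in for `TaskState`:

```rust
use std::sync::atomic::{AtomicU8, Ordering};

// Stand-ins for `TaskState::Ready` / `TaskState::Blocked`.
const READY: u8 = 2;
const BLOCKED: u8 = 3;

struct Task {
    state: AtomicU8,
}

impl Task {
    /// Atomically move from `current` to `new`; only one caller can win.
    fn transition_state(&self, current: u8, new: u8) -> bool {
        self.state
            .compare_exchange(current, new, Ordering::AcqRel, Ordering::Acquire)
            .is_ok()
    }
}

fn main() {
    let task = Task { state: AtomicU8::new(BLOCKED) };
    // The first waker (e.g. a timer event) wins the transition and enqueues the task...
    assert!(task.transition_state(BLOCKED, READY));
    // ...a racing waker (e.g. `notify()`) sees the CAS fail and does nothing.
    assert!(!task.transition_state(BLOCKED, READY));
}
```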
pub fn unblock_task(&mut self, task: AxTaskRef, resched: bool) { - debug!("task unblock: {}", task.id_name()); - if task.is_blocked() { - task.set_state(TaskState::Ready); - self.scheduler.add_task(task); // TODO: priority - if resched { + // Try to change the state of the task from `Blocked` to `Ready`; + // if successful, put it into the run queue; + // otherwise, the task has already been unblocked by another core. + if task.transition_state(TaskState::Blocked, TaskState::Ready) { + // From this point on, the task to be unblocked is in the `Ready` state. + + // If the owning (remote) CPU is still in the middle of schedule() with + // this task as prev, wait until it's done referencing the task. + // + // Pairs with the `clear_prev_task_on_cpu()`. + // + // This ensures that tasks getting woken will be fully ordered against + // their previous state and preserve Program Order. + // + // Note: + // 1. This should be placed after the `Blocked` to `Ready` transition above, + // because the task may have been woken up by another core. + // 2. Alternatively, this could be placed just before `switch_to()`. + #[cfg(feature = "smp")] + while task.on_cpu() { + // Wait for the task to finish its scheduling process. + core::hint::spin_loop(); + } + #[cfg(feature = "smp")] + assert!(!task.on_cpu()); + + let cpu_id = self.inner.cpu_id; + debug!("task unblock: {} on run_queue {}", task.id_name(), cpu_id); + self.inner + .scheduler + .lock() + .put_prev_task(task.clone(), resched); // TODO: priority + + // Note: when the task is unblocked on another CPU's run queue, + // we just ignore the `resched` flag. + if resched && cpu_id == this_cpu_id() { #[cfg(feature = "preempt")] crate::current().set_preempt_pending(true); } @@ -140,12 +370,27 @@ impl AxRunQueue { if now < deadline { crate::timers::set_alarm_wakeup(deadline, curr.clone()); curr.set_state(TaskState::Blocked); - self.resched(false); + self.inner.resched(false); } } } impl AxRunQueue { + /// Create a new run queue for the specified CPU. + /// The run queue is initialized with a per-CPU gc task in its scheduler. + fn new(cpu_id: usize) -> Self { + let gc_task = TaskInner::new(gc_entry, "gc".into(), axconfig::TASK_STACK_SIZE).into_arc(); + // The gc task should be pinned to the current CPU. + gc_task.set_cpumask(CpuMask::one_shot(cpu_id)); + + let mut scheduler = Scheduler::new(); + scheduler.add_task(gc_task); + Self { + cpu_id, + scheduler: SpinRaw::new(scheduler), + } + } + /// Common reschedule subroutine. If `preempt`, keep current task's time /// slice, otherwise reset it. fn resched(&mut self, preempt: bool) { @@ -153,17 +398,34 @@ impl AxRunQueue { if prev.is_running() { prev.set_state(TaskState::Ready); if !prev.is_idle() { - self.scheduler.put_prev_task(prev.clone(), preempt); + self.scheduler.lock().put_prev_task(prev.clone(), preempt); } } - let next = self.scheduler.pick_next_task().unwrap_or_else(|| unsafe { - // Safety: IRQs must be disabled at this time. - IDLE_TASK.current_ref_raw().get_unchecked().clone() - }); + + let next = self + .scheduler + .lock() + .pick_next_task() + .unwrap_or_else(|| unsafe { + // Safety: IRQs must be disabled at this time. + IDLE_TASK.current_ref_raw().get_unchecked().clone() + }); + assert!( + next.is_ready(), + "next {} is not ready: {:?}", + next.id_name(), + next.state() + ); self.switch_to(prev, next); } fn switch_to(&mut self, prev_task: CurrentTask, next_task: AxTaskRef) { + // Make sure that IRQs are disabled by the kernel guard or by other means. + #[cfg(all(not(test), feature = "irq"))] // Note: IRQs are faked under unit tests.
+ assert!( + !axhal::arch::irqs_enabled(), + "IRQs must be disabled during scheduling" + ); trace!( "context switch: {} -> {}", prev_task.id_name(), next_task.id_name() ); @@ -176,17 +438,32 @@ impl AxRunQueue { return; } + // Claim the task as running; we do this before switching to it + // so that any running task will have this flag set. + #[cfg(feature = "smp")] + next_task.set_on_cpu(true); + unsafe { let prev_ctx_ptr = prev_task.ctx_mut_ptr(); let next_ctx_ptr = next_task.ctx_mut_ptr(); + // Store a weak pointer to **prev_task** in **next_task**'s struct. + #[cfg(feature = "smp")] + next_task.set_prev_task(prev_task.as_task_ref()); + // The strong reference count of `prev_task` will be decremented by 1, // but won't be dropped until `gc_entry()` is called. assert!(Arc::strong_count(prev_task.as_task_ref()) > 1); assert!(Arc::strong_count(&next_task) >= 1); CurrentTask::set_current(prev_task, next_task); + (*prev_ctx_ptr).switch_to(&*next_ctx_ptr); + + // Now **next_task** is running on this CPU. Clear the `prev_task`'s `on_cpu` field + // to indicate that it has finished its scheduling process and is no longer running on this CPU. + #[cfg(feature = "smp")] + crate::current().clear_prev_task_on_cpu(); } } } @@ -194,10 +471,10 @@ impl AxRunQueue { fn gc_entry() { loop { // Drop all exited tasks and recycle resources. - let n = EXITED_TASKS.lock().len(); + let n = EXITED_TASKS.with_current(|exited_tasks| exited_tasks.len()); for _ in 0..n { // Do not do the slow drops in the critical section. - let task = EXITED_TASKS.lock().pop_front(); + let task = EXITED_TASKS.with_current(|exited_tasks| exited_tasks.pop_front()); if let Some(task) = task { if Arc::strong_count(&task) == 1 { // If I'm the last holder of the task, drop it immediately. @@ -205,18 +482,25 @@ fn gc_entry() { } else { // Otherwise (e.g., `switch_to` is not completed, held by the // joiner, etc.), push it back and wait for them to drop first. - EXITED_TASKS.lock().push_back(task); + EXITED_TASKS.with_current(|exited_tasks| exited_tasks.push_back(task)); } } } - WAIT_FOR_EXIT.wait(); + // Note: we cannot block the current task with preemption disabled, + // so use `current_ref_raw` to get the reference to `WAIT_FOR_EXIT` here and avoid using `NoPreemptGuard`. + // Since the gc task is pinned to the current CPU, it does no harm if the gc task is preempted during this process. + unsafe { WAIT_FOR_EXIT.current_ref_raw() }.wait(); } } pub(crate) fn init() { + let cpu_id = this_cpu_id(); + // Create the `idle` task (not the current task). const IDLE_TASK_STACK_SIZE: usize = 4096; let idle_task = TaskInner::new(|| crate::run_idle(), "idle".into(), IDLE_TASK_STACK_SIZE); + // The idle task should be pinned to the current CPU. + idle_task.set_cpumask(CpuMask::one_shot(cpu_id)); IDLE_TASK.with_current(|i| { i.init_once(idle_task.into_arc()); }); @@ -224,12 +508,19 @@ pub(crate) fn init() { // Put the subsequent execution into the `main` task. let main_task = TaskInner::new_init("main".into()).into_arc(); main_task.set_state(TaskState::Running); - unsafe { CurrentTask::init_current(main_task) }; + unsafe { CurrentTask::init_current(main_task) } - RUN_QUEUE.init_once(AxRunQueue::new()); + RUN_QUEUE.with_current(|rq| { + rq.init_once(AxRunQueue::new(cpu_id)); + }); + unsafe { + RUN_QUEUES[cpu_id].write(RUN_QUEUE.current_ref_mut_raw()); + } } pub(crate) fn init_secondary() { + let cpu_id = this_cpu_id(); + + // Put the subsequent execution into the `idle` task.
let idle_task = TaskInner::new_init("idle".into()).into_arc(); idle_task.set_state(TaskState::Running); @@ -237,4 +528,11 @@ pub(crate) fn init_secondary() { i.init_once(idle_task.clone()); }); unsafe { CurrentTask::init_current(idle_task) } + + RUN_QUEUE.with_current(|rq| { + rq.init_once(AxRunQueue::new(cpu_id)); + }); + unsafe { + RUN_QUEUES[cpu_id].write(RUN_QUEUE.current_ref_mut_raw()); + } } diff --git a/modules/axtask/src/task.rs b/modules/axtask/src/task.rs index a916551eb5..017b764171 100644 --- a/modules/axtask/src/task.rs +++ b/modules/axtask/src/task.rs @@ -3,17 +3,20 @@ use core::ops::Deref; use core::sync::atomic::{AtomicBool, AtomicI32, AtomicU64, AtomicU8, Ordering}; use core::{alloc::Layout, cell::UnsafeCell, fmt, ptr::NonNull}; +#[cfg(feature = "smp")] +use alloc::sync::Weak; #[cfg(feature = "preempt")] use core::sync::atomic::AtomicUsize; -#[cfg(feature = "tls")] -use axhal::tls::TlsArea; +use kspin::SpinNoIrq; +use memory_addr::{align_up_4k, VirtAddr}; use axhal::arch::TaskContext; -use memory_addr::{align_up_4k, VirtAddr}; +#[cfg(feature = "tls")] +use axhal::tls::TlsArea; use crate::task_ext::AxTaskExt; -use crate::{AxRunQueue, AxTask, AxTaskRef, WaitQueue}; +use crate::{AxTask, AxTaskRef, CpuMask, WaitQueue}; /// A unique identifier for a thread. #[derive(Debug, Clone, Copy, Eq, PartialEq)] @@ -23,9 +26,14 @@ pub struct TaskId(u64); #[repr(u8)] #[derive(Debug, Clone, Copy, Eq, PartialEq)] pub(crate) enum TaskState { + /// Task is running on some CPU. Running = 1, + /// Task is ready to run on some scheduler's ready queue. Ready = 2, + /// Task is blocked (in the wait queue or timer list), + /// and it has finished its scheduling process, it can be wake up by `notify()` on any run queue safely. Blocked = 3, + /// Task is exited and waiting for being dropped. Exited = 4, } @@ -39,9 +47,24 @@ pub struct TaskInner { entry: Option<*mut dyn FnOnce()>, state: AtomicU8, + /// CPU affinity mask. + cpumask: SpinNoIrq, + + /// Mark whether the task is in the wait queue. in_wait_queue: AtomicBool, + + /// Used to indicate whether the task is running on a CPU. + #[cfg(feature = "smp")] + on_cpu: AtomicBool, + /// A weak reference to the previous task running on this CPU. + #[cfg(feature = "smp")] + prev_task: UnsafeCell>, + + /// A ticket ID used to identify the timer event. + /// Set by `set_timer_ticket()` when creating a timer event in `set_alarm_wakeup()`, + /// expired by setting it as zero in `timer_ticket_expired()`, which is called by `cancel_events()`. #[cfg(feature = "irq")] - in_timer_list: AtomicBool, + timer_ticket_id: AtomicU64, #[cfg(feature = "preempt")] need_resched: AtomicBool, @@ -171,9 +194,15 @@ impl TaskInner { is_init: false, entry: None, state: AtomicU8::new(TaskState::Ready as u8), + // By default, the task is allowed to run on all CPUs. 
+ cpumask: SpinNoIrq::new(CpuMask::full()), in_wait_queue: AtomicBool::new(false), #[cfg(feature = "irq")] - in_timer_list: AtomicBool::new(false), + timer_ticket_id: AtomicU64::new(0), + #[cfg(feature = "smp")] + on_cpu: AtomicBool::new(false), + #[cfg(feature = "smp")] + prev_task: UnsafeCell::new(Weak::default()), #[cfg(feature = "preempt")] need_resched: AtomicBool::new(false), #[cfg(feature = "preempt")] @@ -199,6 +228,8 @@ impl TaskInner { pub(crate) fn new_init(name: String) -> Self { let mut t = Self::new_common(TaskId::new(), name); t.is_init = true; + #[cfg(feature = "smp")] + t.set_on_cpu(true); if t.name == "idle" { t.is_idle = true; } @@ -219,6 +250,21 @@ impl TaskInner { self.state.store(state as u8, Ordering::Release) } + /// Transition the task state from `current_state` to `new_state`, + /// Returns `true` if the current state is `current_state` and the state is successfully set to `new_state`, + /// otherwise returns `false`. + #[inline] + pub(crate) fn transition_state(&self, current_state: TaskState, new_state: TaskState) -> bool { + self.state + .compare_exchange( + current_state as u8, + new_state as u8, + Ordering::AcqRel, + Ordering::Acquire, + ) + .is_ok() + } + #[inline] pub(crate) fn is_running(&self) -> bool { matches!(self.state(), TaskState::Running) @@ -229,11 +275,6 @@ impl TaskInner { matches!(self.state(), TaskState::Ready) } - #[inline] - pub(crate) fn is_blocked(&self) -> bool { - matches!(self.state(), TaskState::Blocked) - } - #[inline] pub(crate) const fn is_init(&self) -> bool { self.is_init @@ -244,6 +285,17 @@ impl TaskInner { self.is_idle } + #[allow(unused)] + #[inline] + pub(crate) fn cpumask(&self) -> CpuMask { + *self.cpumask.lock() + } + + #[inline] + pub(crate) fn set_cpumask(&self, cpumask: CpuMask) { + *self.cpumask.lock() = cpumask + } + #[inline] pub(crate) fn in_wait_queue(&self) -> bool { self.in_wait_queue.load(Ordering::Acquire) @@ -254,16 +306,30 @@ impl TaskInner { self.in_wait_queue.store(in_wait_queue, Ordering::Release); } + /// Returns task's current timer ticket ID. + #[inline] + #[cfg(feature = "irq")] + pub(crate) fn timer_ticket(&self) -> u64 { + self.timer_ticket_id.load(Ordering::Acquire) + } + + /// Set the timer ticket ID. #[inline] #[cfg(feature = "irq")] - pub(crate) fn in_timer_list(&self) -> bool { - self.in_timer_list.load(Ordering::Acquire) + pub(crate) fn set_timer_ticket(&self, timer_ticket_id: u64) { + // CAN NOT set timer_ticket_id to 0, + // because 0 is used to indicate the timer event is expired. + assert!(timer_ticket_id != 0); + self.timer_ticket_id + .store(timer_ticket_id, Ordering::Release); } + /// Expire timer ticket ID by setting it to 0, + /// it can be used to identify one timer event is triggered or expired. #[inline] #[cfg(feature = "irq")] - pub(crate) fn set_in_timer_list(&self, in_timer_list: bool) { - self.in_timer_list.store(in_timer_list, Ordering::Release); + pub(crate) fn timer_ticket_expired(&self) { + self.timer_ticket_id.store(0, Ordering::Release); } #[inline] @@ -297,16 +363,17 @@ impl TaskInner { fn current_check_preempt_pending() { let curr = crate::current(); if curr.need_resched.load(Ordering::Acquire) && curr.can_preempt(0) { - let mut rq = crate::RUN_QUEUE.lock(); + let mut rq = crate::current_run_queue::(); if curr.need_resched.load(Ordering::Acquire) { - rq.preempt_resched(); + rq.preempt_resched() } } } - pub(crate) fn notify_exit(&self, exit_code: i32, rq: &mut AxRunQueue) { + /// Notify all tasks that join on this task. 
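The affinity pieces of this hunk (the `cpumask` field defaulting to `CpuMask::full()`, plus `cpumask()`/`set_cpumask()`) combine with `select_run_queue_index` shown earlier: a task may only be placed on a run queue whose CPU bit is set in its mask, and the gc/idle tasks are pinned with `CpuMask::one_shot(cpu_id)`. The toy bitmap below illustrates the semantics; `SimpleCpuMask` and `select_index` are hypothetical stand-ins for `cpumask::CpuMask<{ axconfig::SMP }>` and the real selection routine.

```rust
/// Toy fixed-size CPU bitmap; not the real `cpumask` crate.
#[derive(Clone, Copy)]
struct SimpleCpuMask(u64);

impl SimpleCpuMask {
    /// All CPUs allowed (the default affinity of a new task).
    fn full(num_cpus: usize) -> Self {
        Self((1u64 << num_cpus) - 1)
    }
    /// Exactly one CPU allowed (used to pin the gc and idle tasks).
    fn one_shot(cpu: usize) -> Self {
        Self(1 << cpu)
    }
    fn get(self, cpu: usize) -> bool {
        (self.0 & (1 << cpu)) != 0
    }
    fn is_empty(self) -> bool {
        self.0 == 0
    }
}

/// Round-robin pick restricted to the mask, mirroring `select_run_queue_index`.
fn select_index(mask: SimpleCpuMask, num_cpus: usize, rr_counter: &mut usize) -> usize {
    assert!(!mask.is_empty(), "No available CPU for task execution");
    loop {
        let index = *rr_counter % num_cpus;
        *rr_counter += 1;
        if mask.get(index) {
            return index;
        }
    }
}

fn main() {
    let mut rr = 0;
    let any = SimpleCpuMask::full(4);
    let pinned = SimpleCpuMask::one_shot(2);

    assert_eq!(select_index(any, 4, &mut rr), 0); // unrestricted: plain round-robin
    assert_eq!(select_index(any, 4, &mut rr), 1);
    assert_eq!(select_index(pinned, 4, &mut rr), 2); // pinned: always CPU 2
    assert_eq!(select_index(pinned, 4, &mut rr), 2);
}
```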
+ pub(crate) fn notify_exit(&self, exit_code: i32) { self.exit_code.store(exit_code, Ordering::Release); - self.wait_for_exit.notify_all_locked(false, rq); + self.wait_for_exit.notify_all(false); } #[inline] @@ -320,6 +387,56 @@ impl TaskInner { self.ctx.get_mut() } + /// Returns whether the task is running on a CPU. + /// + /// It is used to protect the task from being moved to a different run queue + /// while it has not finished its scheduling process. + /// The `on_cpu field is set to `true` when the task is preparing to run on a CPU, + /// and it is set to `false` when the task has finished its scheduling process in `clear_prev_task_on_cpu()`. + #[cfg(feature = "smp")] + pub(crate) fn on_cpu(&self) -> bool { + self.on_cpu.load(Ordering::Acquire) + } + + /// Sets whether the task is running on a CPU. + #[cfg(feature = "smp")] + pub(crate) fn set_on_cpu(&self, on_cpu: bool) { + self.on_cpu.store(on_cpu, Ordering::Release) + } + + /// Stores a weak reference to the previous task running on this CPU. + /// + /// ## Safety + /// This function is only called by current task in `switch_to`. + #[cfg(feature = "smp")] + pub(crate) unsafe fn set_prev_task(&self, prev_task: &AxTaskRef) { + *self.prev_task.get() = Arc::downgrade(prev_task); + } + + /// Clears the `on_cpu` field of previous task running on this CPU. + /// It is called by the current task before running. + /// The weak reference of previous task running on this CPU is set through `set_prev_task()`. + /// + /// Panic if the pointer is invalid or the previous task is dropped. + /// + /// ## Note + /// This must be the very last reference to @_prev_task from this CPU. + /// After `on_cpu` is cleared, the task can be moved to a different CPU. + /// We must ensure this doesn't happen until the switch is completely finished. + /// + /// ## Safety + /// The caller must ensure that the weak reference to the prev task is valid, which is + /// done by the previous task running on this CPU through `set_prev_task()`. + #[cfg(feature = "smp")] + pub(crate) unsafe fn clear_prev_task_on_cpu(&self) { + self.prev_task + .get() + .as_ref() + .and_then(|weak| weak.upgrade()) + .expect("Invalid prev_task pointer or prev_task has been dropped") + .set_on_cpu(false); + } + /// Returns the top address of the kernel stack. #[inline] pub const fn kernel_stack_top(&self) -> Option { @@ -429,8 +546,12 @@ impl Deref for CurrentTask { } extern "C" fn task_entry() -> ! { - // release the lock that was implicitly held across the reschedule - unsafe { crate::RUN_QUEUE.force_unlock() }; + #[cfg(feature = "smp")] + unsafe { + // Clear the prev task on CPU before running the task entry function. + crate::current().clear_prev_task_on_cpu(); + } + // Enable irq (if feature "irq" is enabled) before running the task entry function. 
#[cfg(feature = "irq")] axhal::arch::enable_irqs(); let task = crate::current(); diff --git a/modules/axtask/src/timers.rs b/modules/axtask/src/timers.rs index 1c4a8eed05..4dcc03c41b 100644 --- a/modules/axtask/src/timers.rs +++ b/modules/axtask/src/timers.rs @@ -1,40 +1,56 @@ -use alloc::sync::Arc; -use axhal::time::wall_time; -use kspin::SpinNoIrq; +use core::sync::atomic::{AtomicU64, Ordering}; + +use kernel_guard::NoOp; use lazyinit::LazyInit; use timer_list::{TimeValue, TimerEvent, TimerList}; -use crate::{AxTaskRef, RUN_QUEUE}; +use axhal::time::wall_time; + +use crate::{select_run_queue, AxTaskRef}; + +static TIMER_TICKET_ID: AtomicU64 = AtomicU64::new(1); -// TODO: per-CPU -static TIMER_LIST: LazyInit>> = LazyInit::new(); +percpu_static! { + TIMER_LIST: LazyInit> = LazyInit::new(), +} -struct TaskWakeupEvent(AxTaskRef); +struct TaskWakeupEvent { + ticket_id: u64, + task: AxTaskRef, +} impl TimerEvent for TaskWakeupEvent { fn callback(self, _now: TimeValue) { - let mut rq = RUN_QUEUE.lock(); - self.0.set_in_timer_list(false); - rq.unblock_task(self.0, true); + // Ignore the timer event if timeout was set but not triggered + // (wake up by `WaitQueue::notify()`). + // Judge if this timer event is still valid by checking the ticket ID. + if self.task.timer_ticket() != self.ticket_id { + // Timer ticket ID is not matched. + // Just ignore this timer event and return. + return; + } + + // Timer ticket match. + select_run_queue::(&self.task).unblock_task(self.task, true) } } pub fn set_alarm_wakeup(deadline: TimeValue, task: AxTaskRef) { - let mut timers = TIMER_LIST.lock(); - task.set_in_timer_list(true); - timers.set(deadline, TaskWakeupEvent(task)); -} - -pub fn cancel_alarm(task: &AxTaskRef) { - let mut timers = TIMER_LIST.lock(); - task.set_in_timer_list(false); - timers.cancel(|t| Arc::ptr_eq(&t.0, task)); + TIMER_LIST.with_current(|timer_list| { + let ticket_id = TIMER_TICKET_ID.fetch_add(1, Ordering::AcqRel); + task.set_timer_ticket(ticket_id); + timer_list.set(deadline, TaskWakeupEvent { ticket_id, task }); + }) } pub fn check_events() { loop { let now = wall_time(); - let event = TIMER_LIST.lock().expire_one(now); + let event = unsafe { + // Safety: IRQs are disabled at this time. + TIMER_LIST.current_ref_mut_raw() + } + .expire_one(now); if let Some((_deadline, event)) = event { event.callback(now); } else { @@ -44,5 +60,7 @@ pub fn check_events() { } pub fn init() { - TIMER_LIST.init_once(SpinNoIrq::new(TimerList::new())); + TIMER_LIST.with_current(|timer_list| { + timer_list.init_once(TimerList::new()); + }); } diff --git a/modules/axtask/src/wait_queue.rs b/modules/axtask/src/wait_queue.rs index d13628ad6f..d6e546f347 100644 --- a/modules/axtask/src/wait_queue.rs +++ b/modules/axtask/src/wait_queue.rs @@ -1,8 +1,10 @@ use alloc::collections::VecDeque; use alloc::sync::Arc; -use kspin::SpinRaw; -use crate::{AxRunQueue, AxTaskRef, CurrentTask, RUN_QUEUE}; +use kernel_guard::{NoOp, NoPreemptIrqSave}; +use kspin::{SpinNoIrq, SpinNoIrqGuard}; + +use crate::{current_run_queue, select_run_queue, AxTaskRef, CurrentTask}; /// A queue to store sleeping tasks. /// @@ -27,49 +29,57 @@ use crate::{AxRunQueue, AxTaskRef, CurrentTask, RUN_QUEUE}; /// assert_eq!(VALUE.load(Ordering::Relaxed), 1); /// ``` pub struct WaitQueue { - queue: SpinRaw>, // we already disabled IRQs when lock the `RUN_QUEUE` + queue: SpinNoIrq>, } +pub(crate) type WaitQueueGuard<'a> = SpinNoIrqGuard<'a, VecDeque>; + impl WaitQueue { /// Creates an empty wait queue. 
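Before the `WaitQueue` constructors below, it is worth spelling out the lazy cancellation scheme that replaces `cancel_alarm` in the timer rework above: every `set_alarm_wakeup` allocates a fresh ticket ID from a global counter and stores it in the task, cancellation merely zeroes the task's ticket, and `TaskWakeupEvent::callback` compares IDs and drops stale events. A host-runnable sketch of that flow, with simplified stand-ins for the types in `timers.rs`:

```rust
use std::sync::atomic::{AtomicU64, Ordering};

static TIMER_TICKET_ID: AtomicU64 = AtomicU64::new(1);

struct Task {
    timer_ticket_id: AtomicU64,
}

impl Task {
    fn set_timer_ticket(&self, id: u64) {
        assert!(id != 0); // 0 means "expired"
        self.timer_ticket_id.store(id, Ordering::Release);
    }
    fn timer_ticket(&self) -> u64 {
        self.timer_ticket_id.load(Ordering::Acquire)
    }
    fn timer_ticket_expired(&self) {
        self.timer_ticket_id.store(0, Ordering::Release);
    }
}

struct TaskWakeupEvent {
    ticket_id: u64,
}

impl TaskWakeupEvent {
    /// Mirrors the timer callback: only a matching ticket unblocks the task.
    fn callback(&self, task: &Task) {
        if task.timer_ticket() != self.ticket_id {
            println!("stale timer event ignored");
            return;
        }
        println!("would unblock the task here");
    }
}

fn main() {
    let task = Task { timer_ticket_id: AtomicU64::new(0) };

    // set_alarm_wakeup(): allocate a ticket and remember it in the event.
    let ticket_id = TIMER_TICKET_ID.fetch_add(1, Ordering::AcqRel);
    task.set_timer_ticket(ticket_id);
    let event = TaskWakeupEvent { ticket_id };

    // The task is woken by `notify()` first; `cancel_events()` expires the ticket...
    task.timer_ticket_expired();

    // ...so when the timer list later pops the event, it is ignored.
    event.callback(&task);
}
```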
pub const fn new() -> Self { Self { - queue: SpinRaw::new(VecDeque::new()), + queue: SpinNoIrq::new(VecDeque::new()), } } /// Creates an empty wait queue with space for at least `capacity` elements. pub fn with_capacity(capacity: usize) -> Self { Self { - queue: SpinRaw::new(VecDeque::with_capacity(capacity)), + queue: SpinNoIrq::new(VecDeque::with_capacity(capacity)), } } - fn cancel_events(&self, curr: CurrentTask) { + /// Cancel events by removing the task from the wait queue. + /// If `from_timer_list` is true, try to remove the task from the timer list. + fn cancel_events(&self, curr: CurrentTask, _from_timer_list: bool) { // A task can be wake up only one events (timer or `notify()`), remove // the event from another queue. if curr.in_wait_queue() { // wake up by timer (timeout). - // `RUN_QUEUE` is not locked here, so disable IRQs. - let _guard = kernel_guard::IrqSave::new(); - self.queue.lock().retain(|t| !curr.ptr_eq(t)); + let mut wq_locked = self.queue.lock(); + wq_locked.retain(|t| !curr.ptr_eq(t)); curr.set_in_wait_queue(false); } + + // Try to cancel a timer event from timer lists. + // Just mark task's current timer ticket ID as expired. #[cfg(feature = "irq")] - if curr.in_timer_list() { - // timeout was set but not triggered (wake up by `WaitQueue::notify()`) - crate::timers::cancel_alarm(curr.as_task_ref()); + if _from_timer_list { + curr.timer_ticket_expired(); + // Note: + // this task is still not removed from timer list of target CPU, + // which may cause some redundant timer events because it still needs to + // go through the process of expiring an event from the timer list and invoking the callback. + // (it can be considered a lazy-removal strategy, it will be ignored when it is about to take effect.) } } /// Blocks the current task and put it into the wait queue, until other task /// notifies it. pub fn wait(&self) { - RUN_QUEUE.lock().block_current(|task| { - task.set_in_wait_queue(true); - self.queue.lock().push_back(task) - }); - self.cancel_events(crate::current()); + let mut rq = current_run_queue::(); + rq.blocked_resched(self.queue.lock()); + self.cancel_events(crate::current(), false); } /// Blocks the current task and put it into the wait queue, until the given @@ -81,23 +91,23 @@ impl WaitQueue { where F: Fn() -> bool, { + let curr = crate::current(); loop { - let mut rq = RUN_QUEUE.lock(); + let mut rq = current_run_queue::(); + let wq = self.queue.lock(); if condition() { break; } - rq.block_current(|task| { - task.set_in_wait_queue(true); - self.queue.lock().push_back(task); - }); + rq.blocked_resched(wq); } - self.cancel_events(crate::current()); + self.cancel_events(curr, false); } /// Blocks the current task and put it into the wait queue, until other tasks /// notify it, or the given duration has elapsed. #[cfg(feature = "irq")] pub fn wait_timeout(&self, dur: core::time::Duration) -> bool { + let mut rq = current_run_queue::(); let curr = crate::current(); let deadline = axhal::time::wall_time() + dur; debug!( @@ -107,12 +117,12 @@ impl WaitQueue { ); crate::timers::set_alarm_wakeup(deadline, curr.clone()); - RUN_QUEUE.lock().block_current(|task| { - task.set_in_wait_queue(true); - self.queue.lock().push_back(task) - }); + rq.blocked_resched(self.queue.lock()); + let timeout = curr.in_wait_queue(); // still in the wait queue, must have timed out - self.cancel_events(curr); + + // Always try to remove the task from the timer list. 
+ self.cancel_events(curr, true); timeout } @@ -136,18 +146,21 @@ impl WaitQueue { crate::timers::set_alarm_wakeup(deadline, curr.clone()); let mut timeout = true; - while axhal::time::wall_time() < deadline { - let mut rq = RUN_QUEUE.lock(); + loop { + let mut rq = current_run_queue::(); + if axhal::time::wall_time() >= deadline { + break; + } + let wq = self.queue.lock(); if condition() { timeout = false; break; } - rq.block_current(|task| { - task.set_in_wait_queue(true); - self.queue.lock().push_back(task); - }); + + rq.blocked_resched(wq); } - self.cancel_events(curr); + // Always try to remove the task from the timer list. + self.cancel_events(curr, true); timeout } @@ -156,9 +169,10 @@ impl WaitQueue { /// If `resched` is true, the current task will be preempted when the /// preemption is enabled. pub fn notify_one(&self, resched: bool) -> bool { - let mut rq = RUN_QUEUE.lock(); - if !self.queue.lock().is_empty() { - self.notify_one_locked(resched, &mut rq) + let mut wq = self.queue.lock(); + if let Some(task) = wq.pop_front() { + unblock_one_task(task, resched); + true } else { false } @@ -169,15 +183,8 @@ impl WaitQueue { /// If `resched` is true, the current task will be preempted when the /// preemption is enabled. pub fn notify_all(&self, resched: bool) { - loop { - let mut rq = RUN_QUEUE.lock(); - if let Some(task) = self.queue.lock().pop_front() { - task.set_in_wait_queue(false); - rq.unblock_task(task, resched); - } else { - break; - } - drop(rq); // we must unlock `RUN_QUEUE` after unlocking `self.queue`. + while self.notify_one(resched) { + // loop until the wait queue is empty } } @@ -186,31 +193,21 @@ impl WaitQueue { /// If `resched` is true, the current task will be preempted when the /// preemption is enabled. pub fn notify_task(&mut self, resched: bool, task: &AxTaskRef) -> bool { - let mut rq = RUN_QUEUE.lock(); let mut wq = self.queue.lock(); if let Some(index) = wq.iter().position(|t| Arc::ptr_eq(t, task)) { - task.set_in_wait_queue(false); - rq.unblock_task(wq.remove(index).unwrap(), resched); - true - } else { - false - } - } - - pub(crate) fn notify_one_locked(&self, resched: bool, rq: &mut AxRunQueue) -> bool { - if let Some(task) = self.queue.lock().pop_front() { - task.set_in_wait_queue(false); - rq.unblock_task(task, resched); + unblock_one_task(wq.remove(index).unwrap(), resched); true } else { false } } +} - pub(crate) fn notify_all_locked(&self, resched: bool, rq: &mut AxRunQueue) { - while let Some(task) = self.queue.lock().pop_front() { - task.set_in_wait_queue(false); - rq.unblock_task(task, resched); - } - } +fn unblock_one_task(task: AxTaskRef, resched: bool) { + // Mark task as not in wait queue. + task.set_in_wait_queue(false); + // Select run queue by the CPU set of the task. + // Use `NoOp` kernel guard here because the function is called with holding the + // lock of wait queue, where the irq and preemption are disabled. + select_run_queue::(&task).unblock_task(task, resched) }
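One subtlety that spans `unblock_task`, `switch_to`, and `clear_prev_task_on_cpu` is the `on_cpu` handoff: a task keeps `on_cpu == true` until the CPU that switched away from it has fully left its stack, and a waker that wants to enqueue the task on another run queue spins until that flag clears. The sketch below models that ordering with two threads standing in for two CPUs; the names and structure are simplified illustrations, not the axtask implementation.

```rust
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
use std::thread;

struct Task {
    on_cpu: AtomicBool,
}

fn main() {
    let prev = Arc::new(Task { on_cpu: AtomicBool::new(true) });

    // "CPU 0": finishes switching away from `prev`, then clears its `on_cpu`
    // flag, mirroring `clear_prev_task_on_cpu()` after the context switch.
    let cpu0 = {
        let prev = prev.clone();
        thread::spawn(move || {
            // ... context switch happens here ...
            prev.on_cpu.store(false, Ordering::Release);
        })
    };

    // "CPU 1": wants to unblock `prev` on its own run queue, but must wait
    // until CPU 0 is done referencing it, mirroring the spin in `unblock_task`.
    let cpu1 = {
        let prev = prev.clone();
        thread::spawn(move || {
            while prev.on_cpu.load(Ordering::Acquire) {
                std::hint::spin_loop();
            }
            // Safe to enqueue the task on this CPU's run queue now.
            println!("task fully switched out; enqueueing on CPU 1");
        })
    };

    cpu0.join().unwrap();
    cpu1.join().unwrap();
}
```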