diff --git a/Cargo.lock b/Cargo.lock index 6a23a916..05f66224 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -4029,6 +4029,7 @@ dependencies = [ "bytes", "libc", "mio", + "num_cpus", "pin-project-lite", "socket2", "tokio-macros", diff --git a/platforms/melpomene/Cargo.toml b/platforms/melpomene/Cargo.toml index d8edffb2..6475fa30 100644 --- a/platforms/melpomene/Cargo.toml +++ b/platforms/melpomene/Cargo.toml @@ -72,7 +72,7 @@ optional = true [dependencies.tokio] version = "1.19" -features = ["rt", "time", "macros", "sync"] +features = ["rt", "rt-multi-thread", "time", "macros", "sync"] [dependencies.clap] version = "3.0" diff --git a/platforms/melpomene/src/main.rs b/platforms/melpomene/src/main.rs index a84efdec..8257ab3c 100644 --- a/platforms/melpomene/src/main.rs +++ b/platforms/melpomene/src/main.rs @@ -30,7 +30,7 @@ fn main() { #[global_allocator] static AHEAP: MnemosAlloc = MnemosAlloc::new(); -#[tokio::main(flavor = "current_thread")] +#[tokio::main(flavor = "multi_thread", worker_threads = 2)] async fn run_melpomene() { let local = tokio::task::LocalSet::new(); println!("========================================"); @@ -128,45 +128,25 @@ async fn kernel_entry() { let sleep_cap = config .platform .sleep_cap - .unwrap_or_else(PlatformConfig::default_sleep_cap) - .as_micros() as u64; + .unwrap_or_else(PlatformConfig::default_sleep_cap); + + let t0 = tokio::time::Instant::now(); loop { - // Tick the scheduler - let t0 = tokio::time::Instant::now(); - let tick = k.tick(); - - // advance the timer (don't take more than 500k years) - let ticks = t0.elapsed().as_micros() as u64; - let turn = k.timer().force_advance_ticks(ticks); - tracing::trace!("advanced timer by {ticks:?}"); - - // If there is nothing else scheduled, and we didn't just wake something up, - // sleep for some amount of time - if turn.expired == 0 && !tick.has_remaining { - let wfi_start = tokio::time::Instant::now(); - // if no timers have expired on this tick, we should sleep until the - // next timer expires *or* something is woken by I/O, to simulate a - // hardware platform waiting for an interrupt. - tracing::trace!("waiting for an interrupt..."); - - let amount = turn.ticks_to_next_deadline().unwrap_or(sleep_cap); - tracing::trace!("next timer expires in {amount:?}us"); - // wait for an "interrupt" - futures::select! { - _ = irq.notified().fuse() => { - tracing::trace!("...woken by I/O interrupt"); - }, - _ = tokio::time::sleep(Duration::from_micros(amount)).fuse() => { - tracing::trace!("woken by timer"); - } - } + let sleep = k.run_until_sleepy(|| t0.elapsed()); + + tracing::trace!("waiting for an interrupt..."); - // Account for time slept - let elapsed = wfi_start.elapsed().as_micros() as u64; - let _turn = k.timer().force_advance_ticks(elapsed); - } else { - // let other tokio tasks (simulated hardware devices) run. - tokio::task::yield_now().await; + let amount = sleep.next_deadline.unwrap_or(sleep_cap); + tracing::trace!("next timer expires in {amount:?}us"); + + // wait for an "interrupt" + futures::select! { + _ = irq.notified().fuse() => { + tracing::trace!("...woken by I/O interrupt"); + }, + _ = tokio::time::sleep(amount).fuse() => { + tracing::trace!("woken by timer"); + } } } } diff --git a/source/kernel/src/lib.rs b/source/kernel/src/lib.rs index f6df50bb..d60cf082 100644 --- a/source/kernel/src/lib.rs +++ b/source/kernel/src/lib.rs @@ -157,6 +157,13 @@ pub struct KernelServiceSettings { pub sermux_trace: serial_trace::SerialTraceSettings, } +pub struct Sleepy { + pub next_deadline: Option, + kernel: &'static Kernel, + wfi_start: Duration, + now: F, +} + impl Kernel { /// Create a new kernel with the given settings. /// @@ -256,6 +263,51 @@ impl Kernel { self.inner.timer.timeout(duration, f) } + /// Run the kernel executor continuously until no scheduled work is + /// remaining, returning the duration for which the platform implementation + /// should sleep. + /// + /// # Returns + /// + /// - [`Some`]`(`[`Duration`]`)` if the platform implementation should set a + /// timer before waiting for an interrupt. If this method returns + /// [`Some`], the platform should set a timer to expire after the returned + /// [`Duration`] prior to sleeping, and must advance the kernel's timer + /// by the period for which the platform slept. + /// - [`None`] if there is no pending timeouts in the kernel's timer wheel. + /// In this case, the platform implementation should sleep until an interrupt + /// occurs. The platform *may* choose to set a timer anyway, to limit the + /// maximum amount of time spent waiting for an interrupt. + pub fn run_until_sleepy(&'static self, mut now: F) -> Sleepy + where + F: FnMut() -> Duration, + { + loop { + let start = now(); + + // Tick the scheduler. + let tick = self.tick(); + + // Advance the timer by the tick duration. + let elapsed = now() - start; + let turn = self.timer().force_advance(elapsed); + + // If there is nothing else scheduled, and we didn't just wake + // something up, it's time for the platform implementation to + // sleep. + if turn.expired == 0 && !tick.has_remaining { + return Sleepy { + kernel: self, + wfi_start: now(), + next_deadline: turn.time_to_next_deadline(), + now, + }; + } + + // Otherwise, continue looping. + } + } + /// Initialize the default set of cross-platform kernel [`services`] that /// are spawned on all hardware platforms. /// @@ -367,3 +419,15 @@ impl Kernel { } } } + +// === impl Sleepy === + +impl Sleepy +where + F: FnMut() -> Duration, +{ + pub fn wake_up(mut self) { + let elapsed = (self.now)().saturating_sub(self.wfi_start); + self.kernel.timer().force_advance(elapsed); + } +}