diff --git a/Cargo.toml b/Cargo.toml index 0880d35..f3233e0 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -15,7 +15,7 @@ default = [] multitask = [ "dep:percpu", "dep:spinlock", "dep:lazy_init", "dep:memory_addr", - "dep:scheduler", "dep:timer_list", "kernel_guard","taskctx/multitask" + "dep:scheduler", "dep:timer_list", "kernel_guard","taskctx/multitask","dep:cpumask" ] irq = [] tls = ["axhal/tls", "taskctx/tls"] @@ -47,6 +47,7 @@ taskctx = { git = "https://github.com/Starry-OS/taskctx.git" } axlog = { git = "https://github.com/Starry-OS/axlog.git" } linked_list = { git = "https://github.com/Starry-OS/linked_list.git" } axbacktrace = { git = "https://github.com/kern-crates/axbacktrace.git" } +cpumask = { git = "https://github.com/arceos-org/cpumask.git", optional = true } [dev-dependencies] rand = "0.8" diff --git a/src/api.rs b/src/api.rs index 08c85aa..22107dd 100644 --- a/src/api.rs +++ b/src/api.rs @@ -3,21 +3,16 @@ use alloc::{string::String, sync::Arc}; #[cfg(feature = "monolithic")] use axhal::KERNEL_PROCESS_ID; - +use kernel_guard::NoPreemptIrqSave; use crate::task::{ScheduleTask, TaskState}; -use crate::schedule::get_wait_for_exit_queue; +use crate::processor::select_processor; +use crate::processor::get_wait_for_exit_queue; #[doc(cfg(feature = "multitask"))] pub use crate::task::{new_task, CurrentTask, TaskId}; #[doc(cfg(feature = "multitask"))] pub use crate::wait_queue::WaitQueue; - -pub use crate::processor::{current_processor, Processor}; - -pub use crate::schedule::schedule; - -#[cfg(feature = "irq")] -pub use crate::schedule::schedule_timeout; +pub use crate::processor::current_processor; /// The reference type of a task. pub type AxTaskRef = Arc; @@ -75,7 +70,7 @@ pub fn init_scheduler_secondary() { #[doc(cfg(feature = "irq"))] pub fn on_timer_tick() { crate::timers::check_events(); - crate::schedule::scheduler_timer_tick(); + current_processor::().scheduler_timer_tick(); } #[cfg(feature = "preempt")] @@ -84,16 +79,15 @@ pub fn on_timer_tick() { /// disable_preempt ctx pub fn current_check_preempt_pending() { let curr = crate::current(); - // if task is already exited or blocking, - // no need preempt, they are rescheduling - if curr.get_preempt_pending() && curr.can_preempt() && !curr.is_exited() && !curr.is_blocking() + + if curr.get_preempt_pending() && curr.can_preempt() && !curr.is_blocked() { debug!( "current {} is to be preempted , allow {}", curr.id_name(), curr.can_preempt() ); - crate::schedule::schedule() + current_processor::().reschedule(); } } @@ -113,7 +107,7 @@ where #[cfg(feature = "monolithic")] 0, ); - Processor::first_add_task(task.clone()); + select_processor::(&task).add_task(task.clone()); task } @@ -140,13 +134,13 @@ where /// /// [CFS]: https://en.wikipedia.org/wiki/Completely_Fair_Scheduler pub fn set_priority(prio: isize) -> bool { - crate::schedule::set_current_priority(prio) + current_processor::().set_current_priority(prio) } /// Current task gives up the CPU time voluntarily, and switches to another /// ready task. pub fn yield_now() { - crate::schedule::yield_current(); + current_processor::().yield_current(); } /// Current task is going to sleep for the given duration. @@ -161,13 +155,13 @@ pub fn sleep(dur: core::time::Duration) { /// If the feature `irq` is not enabled, it uses busy-wait instead. 
pub fn sleep_until(deadline: axhal::time::TimeValue) { #[cfg(feature = "irq")] - crate::schedule::schedule_timeout(deadline); + current_processor::().schedule_timeout(deadline); #[cfg(not(feature = "irq"))] axhal::time::busy_wait_until(deadline); } /// wake up task pub fn wakeup_task(task: AxTaskRef) { - crate::schedule::wakeup_task(task) + current_processor::().wakeup_task(task); } /// Current task is going to sleep, it will be woken up when the given task exits. @@ -198,9 +192,12 @@ pub fn wake_vfork_process(task: &AxTaskRef) { /// Exits the current task. pub fn exit(exit_code: i32) -> ! { - crate::schedule::exit_current(exit_code) + current_processor::().exit_current(exit_code); } +/// The wrapper type for cpumask::CpuMask with SMP configuration. +pub type CpuMask = cpumask::CpuMask<{ axconfig::SMP }>; + /// The idle task routine. /// /// It runs an infinite loop that keeps calling [`yield_now()`]. diff --git a/src/lib.rs b/src/lib.rs index 0f9cac0..9a360d0 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -38,7 +38,6 @@ cfg_if::cfg_if! { mod processor; mod task; - mod schedule; mod api; mod wait_list; mod wait_queue; diff --git a/src/processor.rs b/src/processor.rs index 55a77b6..60d9dfc 100644 --- a/src/processor.rs +++ b/src/processor.rs @@ -1,35 +1,60 @@ use alloc::collections::VecDeque; + use alloc::collections::BTreeMap; use alloc::sync::Arc; -use core::mem::ManuallyDrop; +use core::mem::MaybeUninit; use core::sync::atomic::{AtomicUsize, Ordering}; use lazy_init::LazyInit; use scheduler::BaseScheduler; -use spinlock::{SpinNoIrq, SpinNoIrqOnly, SpinNoIrqOnlyGuard}; +use spinlock::{SpinNoIrq, SpinRaw}; +use kernel_guard::BaseGuard; +use kernel_guard::NoOp; +use axhal::cpu::this_cpu_id; #[cfg(feature = "monolithic")] use axhal::KERNEL_PROCESS_ID; use crate::task::{new_init_task, new_task, CurrentTask, TaskState}; +use crate::{AxTaskRef, Scheduler, WaitQueue, CpuMask}; -use crate::{AxTaskRef, Scheduler, WaitQueue}; +const ARRAY_REPEAT_VALUE: MaybeUninit<&'static mut Processor> = MaybeUninit::uninit(); -static PROCESSORS: SpinNoIrqOnly> = - SpinNoIrqOnly::new(VecDeque::new()); +static mut PROCESSORS: [MaybeUninit<&'static mut Processor>; axconfig::SMP] = [ARRAY_REPEAT_VALUE; axconfig::SMP]; #[percpu::def_percpu] static PROCESSOR: LazyInit = LazyInit::new(); +/// A map to store tasks' wait queues, which stores tasks that are waiting for this task to exit. +pub(crate) static WAIT_FOR_TASK_EXITS: SpinNoIrq>> = + SpinNoIrq::new(BTreeMap::new()); + +pub(crate) fn add_wait_for_exit_queue(task: &AxTaskRef) { + WAIT_FOR_TASK_EXITS + .lock() + .insert(task.id().as_u64(), Arc::new(WaitQueue::new())); +} + +pub(crate) fn get_wait_for_exit_queue(task: &AxTaskRef) -> Option> { + WAIT_FOR_TASK_EXITS.lock().get(&task.id().as_u64()).cloned() +} + + +/// When the task exits, notify all tasks that are waiting for this task to exit, and +/// then remove the wait queue of the exited task. 
+pub(crate) fn notify_wait_for_exit(task: &AxTaskRef) { + if let Some(wait_queue) = WAIT_FOR_TASK_EXITS.lock().remove(&task.id().as_u64()) { + wait_queue.notify_all(); + } +} pub struct Processor { /// Processor SCHEDULER - scheduler: SpinNoIrq, - /// Owned this Processor task num - task_nr: AtomicUsize, + /// spinraw: migrate may be happend on different cpu + /// cpu1 add task to cpu2, so percpu and no preempt are not enough + /// to protect scheduler + scheduler: SpinRaw, /// The exited task-queue of the current processor - exited_tasks: SpinNoIrq>, + exited_tasks: SpinRaw>, /// GC wait or notify use gc_wait: WaitQueue, - /// Pre save ctx when processor switch ctx - prev_ctx_save: SpinNoIrq, /// The idle task of the processor idle_task: AxTaskRef, /// The gc task of the processor @@ -52,12 +77,10 @@ impl Processor { ); Processor { - scheduler: SpinNoIrq::new(Scheduler::new()), + scheduler: SpinRaw::new(Scheduler::new()), idle_task, - prev_ctx_save: SpinNoIrq::new(PrevCtxSave::new_empty()), - exited_tasks: SpinNoIrq::new(VecDeque::new()), + exited_tasks: SpinRaw::new(VecDeque::new()), gc_wait: WaitQueue::new(), - task_nr: AtomicUsize::new(0), gc_task: gc_task, } } @@ -66,19 +89,88 @@ impl Processor { &self.idle_task } - pub(crate) fn kick_exited_task(&self, task: &AxTaskRef) { - self.exited_tasks.lock().push_back(task.clone()); - self.task_nr.fetch_sub(1, Ordering::Acquire); - self.gc_wait.notify_one(); + #[inline] + /// Pick one task from processor + pub(crate) fn pick_next_task(&mut self) -> AxTaskRef { + self.scheduler.lock().pick_next_task() + .unwrap_or_else(|| self.idle_task.clone()) + } + + pub(crate) fn put_prev_task(&mut self, task: AxTaskRef, front: bool) { + self.scheduler.lock().put_prev_task(task, front); + } + + + #[inline] + /// Processor Clean + pub(crate) fn task_tick(&mut self, task: &AxTaskRef) -> bool { + self.scheduler.lock().task_tick(task) } - pub(crate) fn clean_task_wait(&self) { + #[inline] + /// Processor Clean + pub(crate) fn set_priority(&mut self, task: &AxTaskRef, prio: isize) -> bool { + self.scheduler.lock().set_priority(task, prio) + } + + #[inline] + /// Processor Clean + fn clean(&mut self) { + self.exited_tasks.lock().clear() + } +} + +fn get_processor(index: usize) -> &'static mut Processor{ + unsafe { PROCESSORS[index].assume_init_mut() } +} + +fn select_processor_index(cpumask: CpuMask) -> usize { + static PROCESSOR_INDEX: AtomicUsize = AtomicUsize::new(0); + assert!(!cpumask.is_empty(), "No available CPU for task execution"); + + // Round-robin selection of the processor index. + loop { + let index = PROCESSOR_INDEX.fetch_add(1, Ordering::SeqCst) % axconfig::SMP; + if cpumask.get(index) { + return index; + } + } +} + +pub(crate) fn select_processor(task: &AxTaskRef) -> AxProcessorRef<'static, G> { + let irq_state = G::acquire(); + let index = select_processor_index(task.cpumask()); + AxProcessorRef { + inner: get_processor(index), + state: irq_state, + _phantom: core::marker::PhantomData, + } +} + +/// `AxProcessorRef` +pub struct AxProcessorRef<'a, G: BaseGuard> { + inner: &'a mut Processor, + state: G::State, + _phantom: core::marker::PhantomData, +} + +impl<'a, G: BaseGuard> Drop for AxProcessorRef<'a, G> { + fn drop(&mut self) { + G::release(self.state); + } +} + +/// The interfaces here are all called through current_processor +/// to ensure that CPU preemption does not occur during the call +impl<'a, G: BaseGuard> AxProcessorRef<'a, G> { + pub(crate) fn clean_task_wait(&mut self) { loop { // Drop all exited tasks and recycle resources. 
- let n = self.exited_tasks.lock().len(); + let mut exit_tasks = self.inner.exited_tasks.lock(); + let n = exit_tasks.len(); for _ in 0..n { // Do not do the slow drops in the critical section. - let task = self.exited_tasks.lock().pop_front(); + let task = exit_tasks.pop_front(); if let Some(task) = task { if Arc::strong_count(&task) == 1 { // If I'm the last holder of the task, drop it immediately. @@ -87,152 +179,228 @@ impl Processor { } else { // Otherwise (e.g, `switch_to` is not compeleted, held by the // joiner, etc), push it back and wait for them to drop first. - self.exited_tasks.lock().push_back(task); + exit_tasks.push_back(task); } } } + drop(exit_tasks); // gc wait other task exit - self.gc_wait.wait(); + self.inner.gc_wait.wait(); } } - #[inline] - /// Pick one task from processor - pub(crate) fn pick_next_task(&self) -> AxTaskRef { - self.scheduler - .lock() - .pick_next_task() - .unwrap_or_else(|| self.idle_task.clone()) + fn kick_exited_task(&mut self, task: &AxTaskRef) { + self.inner.exited_tasks.lock().push_back(task.clone()); + self.inner.gc_wait.notify_one(); } #[inline] - /// Add curr task to Processor, it ususally add to back - pub(crate) fn put_prev_task(&self, task: AxTaskRef, front: bool) { - self.scheduler.lock().put_prev_task(task, front); + /// gc init + fn gc_init(&mut self) { + self.inner.gc_task.set_cpumask(CpuMask::one_shot(this_cpu_id())); + self.add_task(self.inner.gc_task.clone()); } - #[inline] - /// Add task to processor, now just put it to own processor - /// TODO: support task migrate on differ processor - pub(crate) fn add_task(task: AxTaskRef) { - task.get_processor().scheduler.lock().add_task(task); + /// post switch + pub(crate) fn switch_post(&mut self) { + unsafe { + crate::current().clear_prev_task_on_cpu(); + } } - #[inline] - /// Processor Clean - pub(crate) fn task_tick(&self, task: &AxTaskRef) -> bool { - self.scheduler.lock().task_tick(task) + /// Add task to processor + pub(crate) fn add_task(&mut self, task: AxTaskRef) { + self.inner.scheduler.lock().add_task(task); } - #[inline] - /// Processor Clean - pub(crate) fn set_priority(&self, task: &AxTaskRef, prio: isize) -> bool { - self.scheduler.lock().set_priority(task, prio) + #[cfg(feature = "irq")] + pub fn scheduler_timer_tick(&mut self) { + let curr = crate::current(); + if !curr.is_idle() && self.inner.task_tick(curr.as_task_ref()) { + #[cfg(feature = "preempt")] + curr.set_preempt_pending(true); + } } - #[inline] - /// update prev_ctx_save when ctx_switch - pub(crate) fn set_prev_ctx_save(&self, prev_save: PrevCtxSave) { - *self.prev_ctx_save.lock() = prev_save; + pub fn set_current_priority(&mut self, prio: isize) -> bool { + self.inner.set_priority(crate::current().as_task_ref(), prio) } - #[inline] - /// post process prev_ctx_save - pub(crate) fn switch_post(&self) { - let mut prev_ctx = self.prev_ctx_save.lock(); - if let Some(prev_lock_state) = prev_ctx.get_mut().take() { - // Note the lock sequence: prev_lock_state.lock -> prev_ctx_save.lock -> - // prev_ctx_save.unlock -> prev_lock_state.unlock - drop(prev_ctx); - ManuallyDrop::into_inner(prev_lock_state); + pub fn wakeup_task(&mut self, task: AxTaskRef) { + if task.transition_state(TaskState::Blocked, TaskState::Runable) { + debug!("task unblock: {}", task.id_name()); + while task.on_cpu() { + // Wait for the task to finish its scheduling process. 
+ core::hint::spin_loop(); + } + self.add_task(task.clone()); } else { - panic!("no prev ctx"); + debug!("try to wakeup {:?} unexpect state {:?}", + task.id(), task.state()); } + } - #[cfg(feature = "irq")] - { - let curr = crate::current(); - match curr.get_irq_state() { - true => axhal::arch::enable_irqs(), - false => axhal::arch::disable_irqs(), - } - } + + #[cfg(feature = "irq")] + pub fn schedule_timeout(&mut self, deadline: axhal::time::TimeValue) -> bool { + let curr = crate::current(); + debug!("task sleep: {}, deadline={:?}", curr.id_name(), deadline); + assert!(!curr.is_idle()); + crate::timers::set_alarm_wakeup(deadline, curr.clone()); + self.reschedule(); + let timeout = axhal::time::current_time() >= deadline; + // may wake up by others + crate::timers::cancel_alarm(curr.as_task_ref()); + timeout } - #[inline] - /// Processor Clean - fn clean(&self) { - self.exited_tasks.lock().clear() + pub(crate) fn yield_current(&mut self) { + let curr = crate::current(); + assert!(curr.is_runable()); + trace!("task yield: {}", curr.id_name()); + self.reschedule(); } - #[inline] - /// Processor Clean all - pub fn clean_all() { - for p in PROCESSORS.lock().iter() { - p.clean() + pub(crate) fn exit_current(&mut self, exit_code: i32) -> ! { + let curr = crate::current(); + debug!("task exit: {}, exit_code={}", curr.id_name(), exit_code); + curr.set_state(TaskState::Exited); + // maybe others join on this thread + // must set state before notify wait_exit + notify_wait_for_exit(curr.as_task_ref()); + + self.kick_exited_task(curr.as_task_ref()); + if curr.is_init() { + self.inner.clean(); + axhal::misc::terminate(); + } else { + curr.set_exit_code(exit_code); + self.reschedule(); } + unreachable!("task exited!"); } - #[inline] - /// First add task to processor - pub fn first_add_task(task: AxTaskRef) { - let p = Processor::select_one_processor(); - task.init_processor(p); - p.scheduler.lock().add_task(task); - p.task_nr.fetch_add(1, Ordering::Relaxed); - } + pub fn reschedule(&mut self) { + let curr = crate::current(); + if curr.is_runable() { + if !curr.is_idle() { + #[cfg(feature = "preempt")] + self.inner.put_prev_task(curr.clone(), curr.get_preempt_pending()); + #[cfg(not(feature = "preempt"))] + self.inner.put_prev_task(curr.clone(), false); + } + } - #[inline] - /// gc init - pub(crate) fn gc_init(&'static self) { - self.gc_task.init_processor(&self); - self.scheduler.lock().add_task(self.gc_task.clone()); + let next_task = self.inner.pick_next_task(); + self.switch_to(curr, next_task); } - #[inline] - /// Add task to processor - fn select_one_processor() -> &'static Processor { - PROCESSORS - .lock() - .iter() - .min_by_key(|p| p.task_nr.load(Ordering::Acquire)) - .unwrap() + + fn switch_to(&mut self, prev_task: CurrentTask, next_task: AxTaskRef) { + // task in a disable_preempt context? 
it not allowed ctx switch + // switch to happend in current_processor:: + #[cfg(feature = "preempt")] + assert!( + prev_task.preempt_num() == 1 , + "task can_preempt failed {}", + prev_task.id_name() + ); + + + #[cfg(feature = "preempt")] + //reset preempt pending + next_task.set_preempt_pending(false); + + if prev_task.ptr_eq(&next_task) { + return; + } + + if prev_task.is_blocked() { + debug!("task block: {}", prev_task.id_name()); + } + // 当任务进行切换时,更新两个任务的时间统计信息 + #[cfg(feature = "monolithic")] + { + let current_timestamp = axhal::time::current_time_nanos() as usize; + next_task.time_stat_when_switch_to(current_timestamp); + prev_task.time_stat_when_switch_from(current_timestamp); + } + + // Claim the task as running, we do this before switching to it + // such that any running task will have this set. + next_task.set_on_cpu(true); + + + trace!( + "context switch: {} -> {}", + prev_task.id_name(), + next_task.id_name(), + ); + unsafe { + let prev_ctx_ptr = prev_task.ctx_mut_ptr(); + let next_ctx_ptr = next_task.ctx_mut_ptr(); + + // The strong reference count of `prev_task` will be decremented by 1, + // but won't be dropped until `gc_entry()` is called. + assert!( + Arc::strong_count(prev_task.as_task_ref()) > 1, + "task id {} strong count {}", + prev_task.id().as_u64(), + Arc::strong_count(prev_task.as_task_ref()) + ); + + assert!(Arc::strong_count(&next_task) >= 1); + #[cfg(feature = "monolithic")] + { + let page_table_token = *next_task.page_table_token.get(); + if page_table_token != 0 { + axhal::arch::write_page_table_root0(page_table_token.into()); + } + } + + // Store the weak pointer of **prev_task** in **next_task**'s struct. + next_task.set_prev_task(prev_task.clone()); + CurrentTask::set_current(prev_task, next_task); + + axhal::arch::task_context_switch(&mut (*prev_ctx_ptr), &(*next_ctx_ptr)); + self.switch_post(); + + } } } -pub fn current_processor() -> &'static Processor { - unsafe { PROCESSOR.current_ref_raw() } +/// current processor protect by a irq Guard, +/// protect when it is used,no cpu switch happend +pub fn current_processor() -> AxProcessorRef<'static, G> { + let irq_state = G::acquire(); + AxProcessorRef { + inner: unsafe { PROCESSOR.current_ref_mut_raw() }, + state: irq_state, + _phantom: core::marker::PhantomData, + } } -pub(crate) struct PrevCtxSave(Option>>); +pub(crate) struct PrevCtxSave(Option); impl PrevCtxSave { - pub(crate) fn new( - prev_lock_state: ManuallyDrop>, - ) -> PrevCtxSave { - Self(Some(prev_lock_state)) + pub(crate) fn new(prev_task: AxTaskRef) -> PrevCtxSave { + Self(Some(prev_task)) } const fn new_empty() -> PrevCtxSave { Self(None) } - #[allow(unused)] - pub(crate) fn get(&self) -> &Option>> { - &self.0 - } - - pub(crate) fn get_mut( - &mut self, - ) -> &mut Option>> { - &mut self.0 + pub(crate) fn take_prev_task(&mut self) -> Option { + self.0.take() } } fn gc_entry() { - current_processor().clean_task_wait(); + current_processor::().clean_task_wait(); } + pub(crate) fn init() { const IDLE_TASK_STACK_SIZE: usize = 4096; @@ -252,10 +420,11 @@ pub(crate) fn init() { let processor = Processor::new(idle_task.clone()); PROCESSOR.with_current(|i| i.init_by(processor)); - current_processor().gc_init(); - PROCESSORS.lock().push_back(current_processor()); + current_processor::().gc_init(); - main_task.init_processor(current_processor()); + unsafe { + PROCESSORS[this_cpu_id()].write(PROCESSOR.current_ref_mut_raw()); + } unsafe { CurrentTask::init_current(main_task) } } @@ -268,8 +437,9 @@ pub(crate) fn init_secondary() { let processor = 
Processor::new(idle_task.clone()); PROCESSOR.with_current(|i| i.init_by(processor)); - current_processor().gc_init(); - PROCESSORS.lock().push_back(current_processor()); - + current_processor::().gc_init(); + unsafe { + PROCESSORS[this_cpu_id()].write(PROCESSOR.current_ref_mut_raw()); + } unsafe { CurrentTask::init_current(idle_task) }; } diff --git a/src/schedule.rs b/src/schedule.rs deleted file mode 100644 index 20aa484..0000000 --- a/src/schedule.rs +++ /dev/null @@ -1,221 +0,0 @@ -use alloc::{collections::BTreeMap, sync::Arc}; - -use core::mem::ManuallyDrop; - -use crate::processor::{current_processor, PrevCtxSave, Processor}; -use crate::task::{CurrentTask, TaskState}; -use crate::{AxTaskRef, WaitQueue}; -use spinlock::{SpinNoIrq, SpinNoIrqOnlyGuard}; - -/// A map to store tasks' wait queues, which stores tasks that are waiting for this task to exit. -pub(crate) static WAIT_FOR_TASK_EXITS: SpinNoIrq>> = - SpinNoIrq::new(BTreeMap::new()); - -pub(crate) fn add_wait_for_exit_queue(task: &AxTaskRef) { - WAIT_FOR_TASK_EXITS - .lock() - .insert(task.id().as_u64(), Arc::new(WaitQueue::new())); -} - -pub(crate) fn get_wait_for_exit_queue(task: &AxTaskRef) -> Option> { - WAIT_FOR_TASK_EXITS.lock().get(&task.id().as_u64()).cloned() -} - -/// When the task exits, notify all tasks that are waiting for this task to exit, and -/// then remove the wait queue of the exited task. -pub(crate) fn notify_wait_for_exit(task: &AxTaskRef) { - if let Some(wait_queue) = WAIT_FOR_TASK_EXITS.lock().remove(&task.id().as_u64()) { - wait_queue.notify_all(); - } -} - -pub(crate) fn exit_current(exit_code: i32) -> ! { - let curr = crate::current(); - debug!("task exit: {}, exit_code={}", curr.id_name(), exit_code); - curr.set_state(TaskState::Exited); - - // maybe others join on this thread - // must set state before notify wait_exit - notify_wait_for_exit(curr.as_task_ref()); - - current_processor().kick_exited_task(curr.as_task_ref()); - if curr.is_init() { - Processor::clean_all(); - axhal::misc::terminate(); - } else { - curr.set_exit_code(exit_code); - schedule(); - } - unreachable!("task exited!"); -} - -pub(crate) fn yield_current() { - let curr = crate::current(); - assert!(curr.is_runable()); - trace!("task yield: {}", curr.id_name()); - schedule(); -} - -#[cfg(feature = "irq")] -pub fn schedule_timeout(deadline: axhal::time::TimeValue) -> bool { - let curr = crate::current(); - debug!("task sleep: {}, deadline={:?}", curr.id_name(), deadline); - assert!(!curr.is_idle()); - crate::timers::set_alarm_wakeup(deadline, curr.clone()); - schedule(); - let timeout = axhal::time::current_time() >= deadline; - // may wake up by others - crate::timers::cancel_alarm(curr.as_task_ref()); - timeout -} - -#[cfg(feature = "irq")] -pub fn scheduler_timer_tick() { - let curr = crate::current(); - if !curr.is_idle() && current_processor().task_tick(curr.as_task_ref()) { - #[cfg(feature = "preempt")] - curr.set_preempt_pending(true); - } -} - -pub fn set_current_priority(prio: isize) -> bool { - current_processor().set_priority(crate::current().as_task_ref(), prio) -} - -pub fn wakeup_task(task: AxTaskRef) { - let mut state = task.state_lock_manual(); - match **state { - TaskState::Blocking => **state = TaskState::Runable, - TaskState::Runable => (), - TaskState::Blocked => { - debug!("task unblock: {}", task.id_name()); - **state = TaskState::Runable; - ManuallyDrop::into_inner(state); - // may be other processor wake up - Processor::add_task(task.clone()); - return; - } - _ => panic!("try to wakeup {:?} unexpect state 
{:?}", - task.id(), **state), - } - ManuallyDrop::into_inner(state); -} - -pub fn schedule() { - let next_task = current_processor().pick_next_task(); - switch_to(next_task); -} - -fn switch_to(mut next_task: AxTaskRef) { - let prev_task = crate::current(); - - // task in a disable_preempt context? it not allowed ctx switch - #[cfg(feature = "preempt")] - assert!( - prev_task.can_preempt(), - "task can_preempt failed {}", - prev_task.id_name() - ); - - - // When the prev_task state_lock is locked, it records the irq configuration of - // the prev_task at that time, after swich(in switch_post) it would be unlocked, - // and restore the irq configuration to the lock_state store(NOTE: it own the prev_task). - // - // so have to save the prev_task irq config here,and restore it after swich_post - #[cfg(feature = "irq")] - prev_task.set_irq_state(axhal::arch::irqs_enabled()); - - // Here must lock curr state, and no one can change curr state - // when excuting ctx_switch - let mut prev_state_lock = prev_task.state_lock_manual(); - - loop { - match **prev_state_lock { - TaskState::Runable => { - if next_task.is_idle() { - next_task = prev_task.clone(); - break; - } - if !prev_task.is_idle() { - #[cfg(feature = "preempt")] - current_processor() - .put_prev_task(prev_task.clone(), prev_task.get_preempt_pending()); - #[cfg(not(feature = "preempt"))] - current_processor().put_prev_task(prev_task.clone(), false); - } - break; - } - TaskState::Blocking => { - debug!("task block: {}", prev_task.id_name()); - **prev_state_lock = TaskState::Blocked; - break; - } - TaskState::Exited => { - break; - } - _ => { - panic!("unexpect state when switch_to happend "); - } - } - } - - #[cfg(feature = "preempt")] - //reset preempt pending - next_task.set_preempt_pending(false); - - if prev_task.ptr_eq(&next_task) { - ManuallyDrop::into_inner(prev_state_lock); - return; - } - - // 当任务进行切换时,更新两个任务的时间统计信息 - #[cfg(feature = "monolithic")] - { - let current_timestamp = axhal::time::current_time_nanos() as usize; - next_task.time_stat_when_switch_to(current_timestamp); - prev_task.time_stat_when_switch_from(current_timestamp); - } - - trace!( - "context switch: {} -> {}", - prev_task.id_name(), - next_task.id_name(), - ); - unsafe { - let prev_ctx_ptr = prev_task.ctx_mut_ptr(); - let next_ctx_ptr = next_task.ctx_mut_ptr(); - - // The strong reference count of `prev_task` will be decremented by 1, - // but won't be dropped until `gc_entry()` is called. 
- assert!( - Arc::strong_count(prev_task.as_task_ref()) > 1, - "task id {} strong count {}", - prev_task.id().as_u64(), - Arc::strong_count(prev_task.as_task_ref()) - ); - - assert!(Arc::strong_count(&next_task) >= 1); - #[cfg(feature = "monolithic")] - { - let page_table_token = *next_task.page_table_token.get(); - if page_table_token != 0 { - axhal::arch::write_page_table_root0(page_table_token.into()); - } - } - - let prev_ctx = PrevCtxSave::new(core::mem::transmute::< - ManuallyDrop>, - ManuallyDrop>, - >(prev_state_lock)); - - current_processor().set_prev_ctx_save(prev_ctx); - - CurrentTask::set_current(prev_task, next_task); - - axhal::arch::task_context_switch(&mut (*prev_ctx_ptr), &(*next_ctx_ptr)); - - current_processor().switch_post(); - - } -} diff --git a/src/task.rs b/src/task.rs index b38e0ce..93e33fa 100644 --- a/src/task.rs +++ b/src/task.rs @@ -3,20 +3,27 @@ use alloc::{string::String, sync::Arc}; use core::{mem::ManuallyDrop, ops::Deref}; use alloc::boxed::Box; +use core::cell::UnsafeCell; +use alloc::sync::Weak; + +use kernel_guard::NoPreemptIrqSave; use memory_addr::VirtAddr; #[cfg(feature = "monolithic")] use axhal::arch::TrapFrame; use crate::{ - current_processor, processor::Processor, schedule::add_wait_for_exit_queue, AxTask, AxTaskRef, + processor::{add_wait_for_exit_queue,current_processor}, + AxTask, + AxTaskRef, + CpuMask, }; pub use taskctx::{TaskId, TaskInner}; -use spinlock::{SpinNoIrq, SpinNoIrqOnly, SpinNoIrqOnlyGuard}; -use core::sync::atomic::{AtomicBool, Ordering}; +use spinlock::SpinNoIrq; +use core::sync::atomic::{AtomicU8, AtomicBool, Ordering}; extern "C" { fn _stdata(); @@ -35,100 +42,129 @@ pub(crate) fn tls_area() -> (usize, usize) { #[allow(missing_docs)] pub enum TaskState { Runable = 1, - Blocking = 2, - Blocked = 3, - Exited = 4, + Blocked = 2, + Exited = 3, +} + + +impl From for TaskState { + #[inline] + fn from(state: u8) -> Self { + match state { + 1 => Self::Runable, + 2 => Self::Blocked, + 3 => Self::Exited, + _ => unreachable!(), + } + } } pub struct ScheduleTask { inner: TaskInner, - /// Store task irq state - irq_state: AtomicBool, /// Task state - state: SpinNoIrqOnly, - /// Task own which Processor - processor: SpinNoIrq>, + state: AtomicU8, + /// On-Cpu flag + on_cpu: AtomicBool, + /// CPU affinity mask + cpumask: SpinNoIrq, + + /// A weak reference to the previous task running on this CPU. + prev_task: UnsafeCell>, } +unsafe impl Send for ScheduleTask {} +unsafe impl Sync for ScheduleTask {} + impl ScheduleTask { - fn new(inner: TaskInner, irq_init_state: bool) -> Self { + fn new(inner: TaskInner, on_cpu: bool, cpu_mask: CpuMask) -> Self { Self { - state: SpinNoIrqOnly::new(TaskState::Runable), - processor: SpinNoIrq::new(None), - irq_state: AtomicBool::new(irq_init_state), + state: AtomicU8::new(TaskState::Runable as u8), inner: inner, + on_cpu: AtomicBool::new(on_cpu), + // By default, the task is allowed to run on all CPUs. + cpumask: SpinNoIrq::new(cpu_mask), + prev_task: UnsafeCell::new(Weak::default()), } } #[inline] - /// lock the task state and ctx_ptr access - pub fn state_lock_manual(&self) -> ManuallyDrop> { - ManuallyDrop::new(self.state.lock()) + pub(crate) fn state(&self) -> TaskState { + self.state.load(Ordering::Acquire).into() } + /// Transition the task state from `current_state` to `new_state`, + /// Returns `true` if the current state is `current_state` and the state is successfully set to `new_state`, + /// otherwise returns `false`. 
#[inline] - /// set the state of the task - pub fn state(&self) -> TaskState { - *self.state.lock() + pub(crate) fn transition_state(&self, current_state: TaskState, new_state: TaskState) -> bool { + self.state + .compare_exchange( + current_state as u8, + new_state as u8, + Ordering::AcqRel, + Ordering::Acquire, + ) + .is_ok() } #[inline] - /// set the state of the task - pub fn set_state(&self, state: TaskState) { - *self.state.lock() = state + pub(crate) fn set_state(&self, state: TaskState) { + self.state.store(state as u8, Ordering::Release) } - /// Whether the task is Exited #[inline] - pub fn is_exited(&self) -> bool { - matches!(*self.state.lock(), TaskState::Exited) + pub(crate) fn on_cpu(&self) -> bool { + self.on_cpu.load(Ordering::Acquire) } - /// Whether the task is runnalbe #[inline] - pub fn is_runable(&self) -> bool { - matches!(*self.state.lock(), TaskState::Runable) + pub(crate) fn set_on_cpu(&self, on_cpu: bool) { + self.on_cpu.store(on_cpu, Ordering::Release); } - /// Whether the task is blocking - #[inline] - pub fn is_blocking(&self) -> bool { - matches!(*self.state.lock(), TaskState::Blocking) + /// Stores a weak reference to the previous task running on this CPU. + /// + /// ## Safety + /// This function is only called by current task in `switch_to`. + pub unsafe fn set_prev_task(&self, prev_task: Arc) { + *self.prev_task.get() = Arc::downgrade(&prev_task); } - /// Whether the task is blocked + pub unsafe fn clear_prev_task_on_cpu(&self) { + self.prev_task + .get() + .as_ref() + .expect("Invalid prev_task pointer") + .upgrade() + .expect("prev_task is dropped") + .set_on_cpu(false); + } + + /// Whether the task is Exited #[inline] - pub fn is_blocked(&self) -> bool { - matches!(*self.state.lock(), TaskState::Blocked) + pub fn is_exited(&self) -> bool { + matches!(self.state(), TaskState::Exited) } - /// Whether the task is blocked #[inline] - pub(crate) fn init_processor(&self, p: &'static Processor) { - *self.processor.lock() = Some(p); + pub fn is_runable(&self) -> bool { + matches!(self.state(), TaskState::Runable) } /// Whether the task is blocked #[inline] - pub(crate) fn get_processor(&self) -> &'static Processor { - self.processor - .lock() - .as_ref() - .expect("task {} processor not init") + pub fn is_blocked(&self) -> bool { + matches!(self.state(), TaskState::Blocked) } - /// set irq state - #[cfg(feature = "irq")] #[inline] - pub(crate) fn set_irq_state(&self, irq_state: bool) { - self.irq_state.store(irq_state,Ordering::Relaxed); + pub(crate) fn cpumask(&self) -> CpuMask { + *self.cpumask.lock() } - /// get irq state - #[cfg(feature = "irq")] #[inline] - pub(crate) fn get_irq_state(&self) -> bool { - self.irq_state.load(Ordering::Relaxed) + pub(crate) fn set_cpumask(&self, cpumask: CpuMask) { + *self.cpumask.lock() = cpumask } } @@ -185,13 +221,11 @@ where tls, ); - // 设置 CPU 亲和集 - task.set_cpu_set((1 << axconfig::SMP) - 1, 1, axconfig::SMP); - task.reset_time_stat(current_time_nanos() as usize); // a new task start, irq should be enabled by default - let axtask = Arc::new(AxTask::new(ScheduleTask::new(task,true))); + let axtask = Arc::new(AxTask::new(ScheduleTask::new(task, false,CpuMask::full()))); + add_wait_for_exit_queue(&axtask); axtask } @@ -225,7 +259,8 @@ where tls, ); // a new task start, irq should be enabled by default - let axtask = Arc::new(AxTask::new(ScheduleTask::new(task, true))); + let axtask = Arc::new(AxTask::new(ScheduleTask::new(task, false, + CpuMask::full()))); add_wait_for_exit_queue(&axtask); axtask } @@ -239,13 +274,10 @@ 
pub(crate) fn new_init_task(name: String) -> AxTaskRef { #[cfg(feature = "tls")] tls_area(), ), - false, + true, + CpuMask::full() ))); - #[cfg(feature = "monolithic")] - // 设置 CPU 亲和集 - axtask.set_cpu_set((1 << axconfig::SMP) - 1, 1, axconfig::SMP); - add_wait_for_exit_queue(&axtask); axtask } @@ -306,7 +338,10 @@ extern "C" fn task_entry() -> ! { // SAFETY: INIT when switch_to // First into task entry, manually perform the subsequent work of switch_to - current_processor().switch_post(); + current_processor::().switch_post(); + + #[cfg(feature = "irq")] + axhal::arch::enable_irqs(); let task = crate::current(); if let Some(entry) = task.get_entry() { diff --git a/src/timers.rs b/src/timers.rs index d4c2ea8..b01859c 100644 --- a/src/timers.rs +++ b/src/timers.rs @@ -4,7 +4,7 @@ use lazy_init::LazyInit; use spinlock::SpinNoIrq; use timer_list::{TimeValue, TimerEvent, TimerList}; -use crate::{AxTaskRef,TaskState}; +use crate::{AxTaskRef,TaskState,wakeup_task}; // TODO: per-CPU static TIMER_LIST: LazyInit>> = LazyInit::new(); @@ -13,13 +13,13 @@ struct TaskWakeupEvent(AxTaskRef); impl TimerEvent for TaskWakeupEvent { fn callback(self, _now: TimeValue) { - crate::schedule::wakeup_task(self.0); + wakeup_task(self.0); } } pub fn set_alarm_wakeup(deadline: TimeValue, task: AxTaskRef) { let mut timer_list = TIMER_LIST.lock(); - task.set_state(TaskState::Blocking); + task.set_state(TaskState::Blocked); timer_list.set(deadline, TaskWakeupEvent(task)); drop(timer_list) } diff --git a/src/wait_list.rs b/src/wait_list.rs index 0e1fb1b..4e7713a 100644 --- a/src/wait_list.rs +++ b/src/wait_list.rs @@ -2,8 +2,7 @@ use alloc::sync::Arc; use core::ops::Deref; use crate::AxTaskRef; use crate::task::TaskState; -use crate::schedule::wakeup_task; - +use crate::wakeup_task; use linked_list::{GetLinks, Links, List}; /// A task wrapper. @@ -64,7 +63,7 @@ impl WaitTaskList { /// add wait to list back pub fn prepare_to_wait(&mut self, task: Arc) { - task.inner().set_state(TaskState::Blocking); + task.inner().set_state(TaskState::Blocked); self.list.push_back(task); } diff --git a/src/wait_queue.rs b/src/wait_queue.rs index 51b0e5c..ff203c0 100644 --- a/src/wait_queue.rs +++ b/src/wait_queue.rs @@ -1,11 +1,10 @@ -#[cfg(feature = "irq")] -use crate::schedule::schedule_timeout; -use crate::schedule::schedule; use spinlock::SpinNoIrq; use crate::wait_list::WaitTaskList; use crate::wait_list::WaitTaskNode; use crate::AxTaskRef; use alloc::sync::Arc; +use crate::processor::current_processor; +use kernel_guard::NoPreemptIrqSave; /// A queue to store sleeping tasks. /// /// # Examples @@ -73,7 +72,7 @@ impl WaitQueue { pub fn wait(&self) { declare_wait!(waiter); self.queue.lock().prepare_to_wait(waiter.clone()); - schedule(); + current_processor::().reschedule(); // maybe wakeup by signal or others, try to delete again // 1. 
starry support UNINTERRUPT mask, no need to check @@ -98,7 +97,7 @@ impl WaitQueue { // maybe wakeup by signal or others, should check before push // wait_list will do check self.queue.lock().prepare_to_wait(waiter.clone()); - schedule(); + current_processor::().reschedule(); } //maybe wakeup by signal or others, try to delete again @@ -113,7 +112,7 @@ impl WaitQueue { let deadline = axhal::time::current_time() + dur; self.queue.lock().prepare_to_wait(waiter.clone()); - let timeout = schedule_timeout(deadline); + let timeout = current_processor::().schedule_timeout(deadline); //maybe wakeup by timer or signal, try to delete again self.queue.lock().remove(&waiter); @@ -139,7 +138,7 @@ impl WaitQueue { } //maybe wakeup by signal or others, should check before push self.queue.lock().prepare_to_wait(waiter.clone()); - timeout = schedule_timeout(deadline); + timeout = current_processor::().schedule_timeout(deadline); if timeout { break; }
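
The sketches below illustrate the main mechanisms this patch introduces; any type, helper, or constant that does not appear in the diff itself (stand-in traits, `std` synchronization primitives, the `SMP` constant, example task ids) is a simplification for illustration, not part of the actual crates.

`current_processor::<G>()` and `select_processor::<G>()` now return an `AxProcessorRef<'static, G>` that acquires a `kernel_guard::BaseGuard` state up front and releases it on drop, so preemption/IRQs stay off exactly as long as the caller holds the processor reference. A minimal, self-contained sketch of that pattern (the `BaseGuard` trait is re-declared here; the real one, with `NoOp`, `NoPreempt`, `NoPreemptIrqSave`, lives in `kernel_guard`, and the real accessor reads the per-CPU `PROCESSOR` static instead of borrowing an explicit `Processor`):

```rust
/// Re-declared here so the sketch is self-contained; see the `kernel_guard` crate.
pub trait BaseGuard {
    type State: Copy;
    fn acquire() -> Self::State;
    fn release(state: Self::State);
}

/// A no-op guard, for callers that already hold a stronger one.
pub struct NoOp;

impl BaseGuard for NoOp {
    type State = ();
    fn acquire() -> Self::State {}
    fn release(_state: Self::State) {}
}

/// Stand-in for the per-CPU scheduler state.
pub struct Processor;

impl Processor {
    fn reschedule(&mut self) { /* pick the next task and context-switch */ }
}

/// Mutable access to a `Processor`; the guard state is released on drop.
pub struct AxProcessorRef<'a, G: BaseGuard> {
    inner: &'a mut Processor,
    state: G::State,
}

impl<'a, G: BaseGuard> AxProcessorRef<'a, G> {
    pub fn reschedule(&mut self) {
        self.inner.reschedule();
    }
}

impl<'a, G: BaseGuard> Drop for AxProcessorRef<'a, G> {
    fn drop(&mut self) {
        G::release(self.state); // re-enable preemption/IRQs when the reference dies
    }
}

/// The patch reads the per-CPU `PROCESSOR` static; this version borrows an
/// explicit `Processor` to stay safe outside the kernel.
pub fn current_processor<G: BaseGuard>(p: &mut Processor) -> AxProcessorRef<'_, G> {
    let state = G::acquire(); // e.g. disable preemption + IRQs before touching the run queue
    AxProcessorRef { inner: p, state }
}

fn main() {
    let mut cpu0 = Processor;
    // Mirrors `current_processor::<NoPreemptIrqSave>().reschedule()` in the patch.
    current_processor::<NoOp>(&mut cpu0).reschedule();
}
```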
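
The `Blocking`/`Blocked` split and `state_lock_manual()` are replaced by a plain `AtomicU8` state with a `compare_exchange`-based `transition_state`, plus an `on_cpu` flag that `wakeup_task` spins on until the sleeping task has fully switched off its previous CPU (the next task clears the flag in `switch_post` via `clear_prev_task_on_cpu`). A reduced sketch of that wake-up protocol, with a placeholder task type and the run-queue insertion abstracted into a closure:

```rust
use std::sync::atomic::{AtomicBool, AtomicU8, Ordering};

#[repr(u8)]
#[derive(Clone, Copy, PartialEq, Eq, Debug)]
enum TaskState {
    Runable = 1,
    Blocked = 2,
    Exited = 3,
}

struct Task {
    state: AtomicU8,
    /// Set while the task's context is still live on some CPU; cleared by the
    /// *next* task right after the context switch completes.
    on_cpu: AtomicBool,
}

impl Task {
    /// Atomically move from `from` to `to`; returns whether this caller won the race.
    fn transition_state(&self, from: TaskState, to: TaskState) -> bool {
        self.state
            .compare_exchange(from as u8, to as u8, Ordering::AcqRel, Ordering::Acquire)
            .is_ok()
    }
}

fn wakeup_task(task: &Task, add_to_run_queue: impl FnOnce()) {
    // Only one waker can win the Blocked -> Runable transition.
    if task.transition_state(TaskState::Blocked, TaskState::Runable) {
        // Don't enqueue the task while its old CPU may still be running on its
        // stack; wait for the switch-out to finish first.
        while task.on_cpu.load(Ordering::Acquire) {
            core::hint::spin_loop();
        }
        add_to_run_queue();
    }
    // Otherwise the task was already runnable (or exited); nothing to do.
}

fn main() {
    let task = Task {
        state: AtomicU8::new(TaskState::Blocked as u8),
        on_cpu: AtomicBool::new(false),
    };
    wakeup_task(&task, || println!("task enqueued"));
    assert_eq!(task.state.load(Ordering::Acquire), TaskState::Runable as u8);
}
```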
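
Task placement now goes through `select_processor`, which round-robins a global counter over the `axconfig::SMP` CPUs and skips any CPU not set in the task's `CpuMask` (the gc task, for instance, is pinned with `CpuMask::one_shot(this_cpu_id())`). A self-contained sketch of the selection loop, with a plain bitmask standing in for `cpumask::CpuMask<{ axconfig::SMP }>`:

```rust
use std::sync::atomic::{AtomicUsize, Ordering};

const SMP: usize = 4; // stand-in for `axconfig::SMP`

/// Simplified affinity mask: bit i set => the task may run on CPU i.
#[derive(Clone, Copy)]
struct CpuMask(usize);

impl CpuMask {
    fn full() -> Self { Self((1 << SMP) - 1) }
    fn one_shot(cpu: usize) -> Self { Self(1 << cpu) }
    fn is_empty(self) -> bool { self.0 == 0 }
    fn get(self, cpu: usize) -> bool { self.0 & (1 << cpu) != 0 }
}

/// Round-robin over all CPUs, returning the first index allowed by the mask.
fn select_processor_index(cpumask: CpuMask) -> usize {
    static NEXT: AtomicUsize = AtomicUsize::new(0);
    assert!(!cpumask.is_empty(), "No available CPU for task execution");
    loop {
        let index = NEXT.fetch_add(1, Ordering::SeqCst) % SMP;
        if cpumask.get(index) {
            return index;
        }
    }
}

fn main() {
    // A task pinned to CPU 2 always lands there; tasks with a full mask rotate.
    assert_eq!(select_processor_index(CpuMask::one_shot(2)), 2);
    let a = select_processor_index(CpuMask::full());
    let b = select_processor_index(CpuMask::full());
    assert!(a < SMP && b < SMP);
}
```

Compared with the old `select_one_processor`, which picked the processor with the smallest `task_nr`, this drops per-processor task counting in favor of a stateless rotation filtered by the task's affinity mask.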
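
Join support moves from `schedule.rs` into `processor.rs` largely unchanged in shape: every new task registers a `WaitQueue` in a global map keyed by task id, `exit_current` sets the task to `Exited` before calling `notify_wait_for_exit`, and the notifier removes the queue so later joiners see the task as already gone. A minimal sketch of that registry, with `std` primitives standing in for `SpinNoIrq` and the kernel `WaitQueue`:

```rust
use std::collections::BTreeMap;
use std::sync::{Arc, Condvar, Mutex};

/// Stand-in wait queue: joiners block on a condvar until `notify_all`.
#[derive(Default)]
struct WaitQueue {
    done: Mutex<bool>,
    cv: Condvar,
}

impl WaitQueue {
    fn wait(&self) {
        let mut done = self.done.lock().unwrap();
        while !*done {
            done = self.cv.wait(done).unwrap();
        }
    }
    fn notify_all(&self) {
        *self.done.lock().unwrap() = true;
        self.cv.notify_all();
    }
}

/// task id -> queue of tasks waiting for that task to exit.
static WAIT_FOR_TASK_EXITS: Mutex<BTreeMap<u64, Arc<WaitQueue>>> =
    Mutex::new(BTreeMap::new());

fn add_wait_for_exit_queue(task_id: u64) {
    WAIT_FOR_TASK_EXITS
        .lock()
        .unwrap()
        .insert(task_id, Arc::new(WaitQueue::default()));
}

/// Joiner side: block until the target exits (or return at once if it already has).
fn join(task_id: u64) {
    let queue = WAIT_FOR_TASK_EXITS.lock().unwrap().get(&task_id).cloned();
    if let Some(queue) = queue {
        queue.wait();
    }
}

/// Exiting side: the task state must be set to `Exited` before this call,
/// so woken joiners observe the final state.
fn notify_wait_for_exit(task_id: u64) {
    if let Some(queue) = WAIT_FOR_TASK_EXITS.lock().unwrap().remove(&task_id) {
        queue.notify_all();
    }
}

fn main() {
    add_wait_for_exit_queue(7);
    let joiner = std::thread::spawn(|| join(7));
    notify_wait_for_exit(7);
    joiner.join().unwrap();
}
```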