diff --git a/src/percpu.rs b/src/percpu.rs
--- a/src/percpu.rs
+++ b/src/percpu.rs
@@ -100,6 +100,14 @@ static ALL_PERCPU_BLOCKS: [AtomicPtr<PercpuBlock>; MAX_CPU_COUNT as usize] =
 pub unsafe fn init_tlb_shootdown(id: LogicalCpuId, block: *mut PercpuBlock) {
     ALL_PERCPU_BLOCKS[id.get() as usize].store(block, Ordering::Release)
 }
+
+pub fn get_percpu_block(id: LogicalCpuId) -> Option<&'static PercpuBlock> {
+    unsafe {
+        ALL_PERCPU_BLOCKS[id.get() as usize]
+            .load(Ordering::Acquire)
+            .as_ref()
+    }
+}
 
 pub fn get_all_stats() -> Vec<(LogicalCpuId, CpuStatsData)> {
diff --git a/src/context/switch.rs b/src/context/switch.rs
--- a/src/context/switch.rs
+++ b/src/context/switch.rs
@@ -7,15 +7,15 @@
 use crate::{
     context::{
         self, arch, idle_contexts, idle_contexts_try, run_contexts, ArcContextLockWriteGuard,
         Context, ContextLock, SchedPolicy, WeakContextRef, RUN_QUEUE_COUNT,
     },
-    cpu_set::LogicalCpuId,
+    cpu_set::{LogicalCpuId, LogicalCpuSet},
     cpu_stats::{self, CpuState},
-    percpu::{PerCpuSched, PercpuBlock},
+    percpu::{get_percpu_block, PerCpuSched, PercpuBlock},
     sync::{ArcRwLockWriteGuard, CleanLockToken, LockToken, L1, L4},
 };
 use alloc::{sync::Arc, vec::Vec};
 use core::{
     cell::{Cell, RefCell},
     hint, mem,
-    sync::atomic::Ordering,
+    sync::atomic::{AtomicUsize, Ordering},
 };
 use syscall::PtraceFlags;
@@
+/// Total number of contexts stolen from other CPUs' run queues.
+static SCHED_STEAL_COUNT: AtomicUsize = AtomicUsize::new(0);
+
+/// Pins `context` to `cpu_id` by resetting its affinity to that single CPU.
+fn assign_context_to_cpu(context: &mut Context, cpu_id: LogicalCpuId) {
+    context.sched_affinity = LogicalCpuSet::empty();
+    context.sched_affinity.atomic_set(cpu_id);
+}
@@
+/// Pops the first context in `queues` that can run on `target_cpu`, pinning it
+/// there and returning its priority and a weak reference to it. Contexts that
+/// turn out to be blocked are moved to the idle list; all others are pushed
+/// back onto their queue.
+fn pop_movable_context(
+    token: &mut CleanLockToken,
+    queues: &mut [alloc::collections::VecDeque<WeakContextRef>; RUN_QUEUE_COUNT],
+    target_cpu: LogicalCpuId,
+    switch_time: u128,
+    idle_context: &Arc<ContextLock>,
+) -> Option<(usize, WeakContextRef)> {
+    for prio in 0..RUN_QUEUE_COUNT {
+        let len = queues[prio].len();
+        for _ in 0..len {
+            let Some(context_ref) = queues[prio].pop_front() else {
+                break;
+            };
+            let Some(context_lock) = context_ref.upgrade() else {
+                continue;
+            };
+            if Arc::ptr_eq(&context_lock, idle_context) {
+                queues[prio].push_back(context_ref);
+                continue;
+            }
+
+            let mut context_guard = unsafe { context_lock.write_arc() };
+            let sw = unsafe { update_stealable(&mut context_guard, switch_time) };
+            if let UpdateResult::CanSwitch = sw {
+                assign_context_to_cpu(&mut context_guard, target_cpu);
+                let moved_ref = WeakContextRef(Arc::downgrade(ArcContextLockWriteGuard::rwlock(
+                    &context_guard,
+                )));
+                drop(context_guard);
+                return Some((prio, moved_ref));
+            }
+
+            if matches!(sw, UpdateResult::Blocked) {
+                idle_contexts(token.downgrade()).push_back(context_ref);
+            } else {
+                queues[prio].push_back(context_ref);
+            }
+        }
+    }
+
+    None
+}
+
+/// Attempts to steal one runnable context from another CPU. Victims are
+/// scanned starting at the CPU after `cpu_id` and wrapping around, so that
+/// concurrent stealers begin at different victims.
+fn steal_work(
+    token: &mut CleanLockToken,
+    cpu_id: LogicalCpuId,
+    switch_time: u128,
+) -> Option<ArcContextLockWriteGuard> {
+    let cpu_count = crate::cpu_count();
+    if cpu_count <= 1 {
+        return None;
+    }
+
+    for offset in 1..cpu_count {
+        let victim_id = LogicalCpuId::new((cpu_id.get() + offset) % cpu_count);
+        let Some(victim) = get_percpu_block(victim_id) else {
+            continue;
+        };
+
+        let victim_idle = victim.switch_internals.idle_context();
+        let mut victim_lock = SchedQueuesLock::new(&victim.sched);
+        let victim_queues = unsafe { victim_lock.queues_mut() };
+
+        for prio in 0..RUN_QUEUE_COUNT {
+            let len = victim_queues[prio].len();
+            for _ in 0..len {
+                let Some(context_ref) = victim_queues[prio].pop_front() else {
+                    break;
+                };
+                let Some(context_lock) = context_ref.upgrade() else {
+                    continue;
+                };
+                if Arc::ptr_eq(&context_lock, &victim_idle) {
+                    victim_queues[prio].push_back(context_ref);
+                    continue;
+                }
+
+                let mut context_guard = unsafe { context_lock.write_arc() };
+                let sw = unsafe { update_stealable(&mut context_guard, switch_time) };
+                if let UpdateResult::CanSwitch = sw {
+                    assign_context_to_cpu(&mut context_guard, cpu_id);
+                    SCHED_STEAL_COUNT.fetch_add(1, Ordering::Relaxed);
+                    return Some(context_guard);
+                }
+
+                if matches!(sw, UpdateResult::Blocked) {
+                    idle_contexts(token.downgrade()).push_back(context_ref);
+                } else {
+                    victim_queues[prio].push_back(context_ref);
+                }
+            }
+        }
+    }
+
+    None
+}
+
+/// Checks whether a context owned by another CPU can be stolen: skips contexts
+/// that are still running, wakes soft-blocked contexts whose timeout has
+/// expired, and classifies the result as runnable or blocked.
+unsafe fn update_stealable(context: &mut Context, switch_time: u128) -> UpdateResult {
+    if context.running {
+        return UpdateResult::Skip;
+    }
+    if context.status.is_soft_blocked()
+        && let Some(wake) = context.wake
+        && switch_time >= wake
+    {
+        context.wake = None;
+        context.unblock_no_ipi();
+    }
+    if context.status.is_runnable() {
+        UpdateResult::CanSwitch
+    } else {
+        UpdateResult::Blocked
+    }
+}
@@ -360,6 +469,10 @@ fn wakeup_contexts(token: &mut CleanLockToken, percpu: &PercpuBlock, switch_time
     let mut sched_lock = SchedQueuesLock::new(&percpu.sched);
     let run_queues = unsafe { sched_lock.queues_mut() };
     for (prio, context_ref) in wakeups {
+        if let Some(context_lock) = context_ref.upgrade() {
+            let mut context_guard = unsafe { context_lock.write_arc() };
+            assign_context_to_cpu(&mut context_guard, percpu.cpu_id);
+        }
         run_queues[prio].push_back(context_ref);
     }
 }
@@ -559,6 +672,16 @@ fn select_next_context(
         );
         return Ok(Some(next_context_guard));
     }
+
+    if let Some(next_context_guard) = steal_work(token, cpu_id, switch_time) {
+        queue_previous_context(
+            token,
+            percpu,
+            &prev_context_lock,
+            prev_context_guard,
+            &idle_context,
+        );
+        return Ok(Some(next_context_guard));
+    }
 
     let global_next = {
         let contexts_data = run_contexts(token.token());
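
Note (illustration, not part of the patch): the victim scan order used by
steal_work, pulled out as a standalone function to show the wrap-around
behavior. The helper name and the test are hypothetical:

    // Visits every CPU except `cpu_id`, starting at the next one and wrapping
    // modulo `cpu_count`, so concurrent stealers begin at different victims.
    fn victim_scan_order(cpu_id: u32, cpu_count: u32) -> impl Iterator<Item = u32> {
        (1..cpu_count).map(move |offset| (cpu_id + offset) % cpu_count)
    }

    #[test]
    fn scan_order_skips_self_and_covers_all_other_cpus() {
        // CPU 2 of 4 probes 3, then wraps to 0 and 1, never itself.
        assert_eq!(victim_scan_order(2, 4).collect::<Vec<_>>(), vec![3, 0, 1]);
    }

Design note: steal_work takes at most one context per call, so a victim's
queue lock is held only briefly and balancing is amortized over successive
reschedules rather than performed in bulk.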