feat: P0-P6 kernel scheduler + relibc threading comprehensive implementation

P0-P2: Barrier SMP, sigmask/pthread_kill races, robust mutexes, RT scheduling, POSIX sched API
P3: PerCpuSched struct, per-CPU wiring, work stealing, load balancing, initial placement
P4: 64-shard futex table, REQUEUE, PI futexes (LOCK_PI/UNLOCK_PI/TRYLOCK_PI), robust futexes, vruntime tracking, min-vruntime SCHED_OTHER selection
P5: setpriority/getpriority, pthread_setaffinity_np, pthread_setname_np, pthread_setschedparam (Redox)
P6: Cache-affine scheduling (last_cpu + vruntime bonus), NUMA topology kernel hints + numad userspace daemon

Stability fixes: make_consistent stores 0 (dead TID fix), cond.rs error propagation, SPIN_COUNT adaptive spinning, Sys::open &str fix, PI futex CAS race, proc.rs lock ordering, barrier destroy

Patches: 33 kernel + 58 relibc patches, all tracked in recipes
Docs: KERNEL-SCHEDULER-MULTITHREAD-IMPROVEMENT-PLAN.md updated, SCHEDULER-REVIEW-FINAL.md created
Architecture: NUMA topology parsing stays userspace (numad daemon), kernel stores lightweight NumaTopology hints
This commit is contained in:
2026-04-30 18:21:48 +01:00
parent 7d5d364c01
commit 2e99d4073b
70 changed files with 15268 additions and 10 deletions
File diff suppressed because it is too large Load Diff
@@ -0,0 +1,13 @@
diff --git a/src/context/mod.rs b/src/context/mod.rs
index 37c73f5..4f5d60f 100644
--- a/src/context/mod.rs
+++ b/src/context/mod.rs
@@ -22,7 +22,7 @@ use crate::{
use self::context::Kstack;
pub use self::{
- context::{BorrowedHtBuf, Context, Status},
+ context::{BorrowedHtBuf, Context, SchedPolicy, Status},
switch::switch,
};
@@ -0,0 +1,152 @@
diff --git a/src/scheme/proc.rs b/src/scheme/proc.rs
index 47588e1..6578761 100644
--- a/src/scheme/proc.rs
+++ b/src/scheme/proc.rs
@@ -1,7 +1,7 @@
use crate::{
context::{
self,
- context::{HardBlockedReason, LockedFdTbl, SignalState},
+ context::{HardBlockedReason, LockedFdTbl, SchedPolicy, SignalState},
file::InternalFlags,
memory::{handle_notify_files, AddrSpace, AddrSpaceWrapper, Grant, PageSpan},
Context, ContextLock, Status,
@@ -105,6 +105,7 @@ enum ContextHandle {
// Attr handles, to set ens/euid/egid/pid.
Authority,
Attr,
+ Groups,
Status {
privileged: bool,
@@ -145,6 +146,7 @@ enum ContextHandle {
// directory.
OpenViaDup,
SchedAffinity,
+ SchedPolicy,
MmapMinAddr(Arc<AddrSpaceWrapper>),
}
@@ -249,6 +251,9 @@ impl ProcScheme {
false,
),
"sched-affinity" => (ContextHandle::SchedAffinity, true),
+ // TODO: Switch this kernel-local proc handle over to a stable upstream
+ // redox_syscall ProcCall::SetSchedPolicy opcode once that lands.
+ "sched-policy" => (ContextHandle::SchedPolicy, false),
"status" => (ContextHandle::Status { privileged: false }, false),
_ if path.starts_with("auth-") => {
let nonprefix = &path["auth-".len()..];
@@ -261,6 +266,7 @@ impl ProcScheme {
let handle = match actual_name {
"attrs" => ContextHandle::Attr,
"status" => ContextHandle::Status { privileged: true },
+ "groups" => ContextHandle::Groups,
_ => return Err(Error::new(ENOENT)),
};
@@ -306,6 +312,11 @@ impl ProcScheme {
let id = NonZeroUsize::new(NEXT_ID.fetch_add(1, Ordering::Relaxed))
.ok_or(Error::new(EMFILE))?;
let context = context::spawn(true, Some(id), ret, token)?;
+ {
+ let parent_groups =
+ context::current().read(token.token()).groups.clone();
+ context.write(token.token()).groups = parent_groups;
+ }
HANDLES.write(token.token()).insert(
id.get(),
Handle {
@@ -1165,6 +1176,20 @@ impl ContextHandle {
Ok(size_of_val(&mask))
}
+ Self::SchedPolicy => {
+ if buf.len() != 2 {
+ return Err(Error::new(EINVAL));
+ }
+
+ let [policy, rt_priority] = unsafe { buf.read_exact::<[u8; 2]>()? };
+ let sched_policy = SchedPolicy::try_from_raw(policy).ok_or(Error::new(EINVAL))?;
+
+ context
+ .write(token.token())
+ .set_sched_policy(sched_policy, rt_priority);
+
+ Ok(2)
+ }
ContextHandle::Status { privileged } => {
let mut args = buf.usizes();
@@ -1268,9 +1293,42 @@ impl ContextHandle {
guard.pid = info.pid as usize;
guard.euid = info.euid;
guard.egid = info.egid;
- guard.prio = (info.prio as usize).min(39);
+ guard.set_sched_other_prio(info.prio as usize);
Ok(size_of::<ProcSchemeAttrs>())
}
+ Self::Groups => {
+ const NGROUPS_MAX: usize = 65536;
+ if buf.len() % size_of::<u32>() != 0 {
+ return Err(Error::new(EINVAL));
+ }
+ let count = buf.len() / size_of::<u32>();
+ if count > NGROUPS_MAX {
+ return Err(Error::new(EINVAL));
+ }
+ let mut groups = Vec::with_capacity(count);
+ for chunk in buf.in_exact_chunks(size_of::<u32>()).take(count) {
+ groups.push(chunk.read_u32()?);
+ }
+ let proc_id = {
+ let guard = context.read(token.token());
+ guard.owner_proc_id
+ };
+ {
+ let mut guard = context.write(token.token());
+ guard.groups = groups.clone();
+ }
+ if let Some(pid) = proc_id {
+ let mut contexts = context::contexts(token.downgrade());
+ let (contexts, mut t) = contexts.token_split();
+ for context_ref in contexts.iter() {
+ let mut ctx = context_ref.write(t.token());
+ if ctx.owner_proc_id == Some(pid) {
+ ctx.groups = groups.clone();
+ }
+ }
+ }
+ Ok(count * size_of::<u32>())
+ }
ContextHandle::OpenViaDup => {
let mut args = buf.usizes();
@@ -1427,6 +1485,11 @@ impl ContextHandle {
buf.copy_exactly(crate::cpu_set::mask_as_bytes(&mask))?;
Ok(size_of_val(&mask))
+ }
+ ContextHandle::SchedPolicy => {
+ let context = context.read(token.token());
+ let data = [context.sched_policy as u8, context.sched_rt_priority];
+ buf.copy_common_bytes_from_slice(&data)
} // TODO: Replace write() with SYS_SENDFD?
ContextHandle::Status { .. } => {
let status = {
@@ -1475,6 +1538,15 @@ impl ContextHandle {
debug_name,
})
}
+ Self::Groups => {
+ let c = &context.read(token.token());
+ let max = buf.len() / size_of::<u32>();
+ let count = c.groups.len().min(max);
+ for (chunk, gid) in buf.in_exact_chunks(size_of::<u32>()).zip(&c.groups).take(count) {
+ chunk.copy_from_slice(&gid.to_ne_bytes())?;
+ }
+ Ok(count * size_of::<u32>())
+ }
ContextHandle::Sighandler => {
let data = match context.read(token.token()).sig {
Some(ref sig) => SetSighandlerData {
@@ -0,0 +1,176 @@
diff --git a/src/context/context.rs b/src/context/context.rs
index c97c516..8a8b078 100644
--- a/src/context/context.rs
+++ b/src/context/context.rs
@@ -18,7 +18,8 @@ use crate::{
cpu_stats,
ipi::{ipi, IpiKind, IpiTarget},
memory::{
- allocate_p2frame, deallocate_p2frame, Enomem, Frame, RaiiFrame, RmmA, RmmArch, PAGE_SIZE,
+ allocate_p2frame, deallocate_p2frame, Enomem, Frame, PhysicalAddress, RaiiFrame, RmmA,
+ RmmArch, PAGE_SIZE,
},
percpu::PercpuBlock,
scheme::{CallerCtx, FileHandle, SchemeId},
@@ -62,6 +63,38 @@ impl Status {
}
}
+pub const SCHED_PRIORITY_LEVELS: usize = 40;
+pub const DEFAULT_SCHED_OTHER_PRIORITY: usize = 20;
+pub const DEFAULT_SCHED_RR_QUANTUM: u128 = 100_000_000;
+
+#[repr(u8)]
+#[derive(Clone, Copy, Debug, PartialEq, Eq)]
+pub enum SchedPolicy {
+ Fifo = 0,
+ RoundRobin = 1,
+ Other = 2,
+}
+
+impl SchedPolicy {
+ pub fn try_from_raw(raw: u8) -> Option<Self> {
+ match raw {
+ 0 => Some(Self::Fifo),
+ 1 => Some(Self::RoundRobin),
+ 2 => Some(Self::Other),
+ _ => None,
+ }
+ }
+}
+
+pub fn rt_priority_to_kernel_prio(rt_priority: u8) -> usize {
+ (SCHED_PRIORITY_LEVELS - 1)
+ .saturating_sub((usize::from(rt_priority.min(99)) * (SCHED_PRIORITY_LEVELS - 1)) / 99)
+}
+
+fn clamp_sched_other_prio(prio: usize) -> usize {
+ prio.min(SCHED_PRIORITY_LEVELS - 1)
+}
+
#[derive(Clone, Debug)]
pub enum HardBlockedReason {
/// "SIGSTOP", only procmgr is allowed to switch contexts this state
@@ -140,6 +173,17 @@ pub struct Context {
pub fmap_ret: Option<Frame>,
/// Priority
pub prio: usize,
+ pub sched_policy: SchedPolicy,
+ pub sched_rt_priority: u8,
+ pub sched_rr_ticks_consumed: u32,
+ pub sched_static_prio: usize,
+ pub sched_rr_quantum: u128,
+ #[allow(dead_code)]
+ pub futex_pi_boost: bool,
+ #[allow(dead_code)]
+ pub futex_pi_original_prio: usize,
+ #[allow(dead_code)]
+ pub futex_pi_waiters: Vec<PhysicalAddress>,
// TODO: id can reappear after wraparound?
pub owner_proc_id: Option<NonZeroUsize>,
@@ -148,6 +192,8 @@ pub struct Context {
pub euid: u32,
pub egid: u32,
pub pid: usize,
+ /// Supplementary group IDs for access control decisions.
+ pub groups: Vec<u32>,
// See [`PreemptGuard`]
//
@@ -197,13 +243,22 @@ impl Context {
files: Arc::new(RwLock::new(FdTbl::new())),
userspace: false,
fmap_ret: None,
- prio: 20,
+ prio: DEFAULT_SCHED_OTHER_PRIORITY,
+ sched_policy: SchedPolicy::Other,
+ sched_rt_priority: 0,
+ sched_rr_ticks_consumed: 0,
+ sched_static_prio: DEFAULT_SCHED_OTHER_PRIORITY,
+ sched_rr_quantum: DEFAULT_SCHED_RR_QUANTUM,
+ futex_pi_boost: false,
+ futex_pi_original_prio: DEFAULT_SCHED_OTHER_PRIORITY,
+ futex_pi_waiters: Vec::new(),
being_sigkilled: false,
owner_proc_id,
euid: 0,
egid: 0,
pid: 0,
+ groups: Vec::new(),
#[cfg(feature = "syscall_debug")]
syscall_debug_info: crate::syscall::debug::SyscallDebugInfo::default(),
@@ -218,11 +273,47 @@ impl Context {
self.preempt_locks == 0
}
+ fn base_sched_prio(&self) -> usize {
+ match self.sched_policy {
+ SchedPolicy::Other => clamp_sched_other_prio(self.sched_static_prio),
+ SchedPolicy::Fifo | SchedPolicy::RoundRobin => {
+ rt_priority_to_kernel_prio(self.sched_rt_priority)
+ }
+ }
+ }
+
+ fn apply_sched_prio(&mut self) {
+ let base_prio = self.base_sched_prio();
+ if self.futex_pi_boost {
+ self.futex_pi_original_prio = base_prio;
+ self.prio = self.prio.min(base_prio);
+ } else {
+ self.futex_pi_original_prio = base_prio;
+ self.prio = base_prio;
+ }
+ }
+
+ pub fn set_sched_other_prio(&mut self, prio: usize) {
+ self.sched_static_prio = clamp_sched_other_prio(prio);
+ self.apply_sched_prio();
+ }
+
+ pub fn set_sched_policy(&mut self, sched_policy: SchedPolicy, rt_priority: u8) {
+ self.sched_policy = sched_policy;
+ self.sched_rt_priority = match sched_policy {
+ SchedPolicy::Other => 0,
+ SchedPolicy::Fifo | SchedPolicy::RoundRobin => rt_priority.min(99),
+ };
+ self.sched_rr_ticks_consumed = 0;
+ self.apply_sched_prio();
+ }
+
/// Block the context, and return true if it was runnable before being blocked
pub fn block(&mut self, reason: &'static str) -> bool {
if self.status.is_runnable() {
self.status = Status::Blocked;
self.status_reason = reason;
+ self.sched_rr_ticks_consumed = 0;
true
} else {
false
@@ -232,6 +323,7 @@ impl Context {
pub fn hard_block(&mut self, reason: HardBlockedReason) -> bool {
if self.status.is_runnable() {
self.status = Status::HardBlocked { reason };
+ self.sched_rr_ticks_consumed = 0;
true
} else {
@@ -261,6 +353,7 @@ impl Context {
if self.status.is_soft_blocked() {
self.status = Status::Runnable;
self.status_reason = "";
+ self.sched_rr_ticks_consumed = 0;
true
} else {
@@ -479,6 +572,7 @@ impl Context {
uid: self.euid,
gid: self.egid,
pid: self.pid,
+ groups: self.groups.clone(),
}
}
}
@@ -0,0 +1,150 @@
diff --git a/src/context/switch.rs b/src/context/switch.rs
index 86684c8..aeb29c9 100644
--- a/src/context/switch.rs
+++ b/src/context/switch.rs
@@ -5,7 +5,7 @@
use crate::{
context::{
self, arch, idle_contexts, idle_contexts_try, run_contexts, ArcContextLockWriteGuard,
- Context, ContextLock, WeakContextRef,
+ Context, ContextLock, SchedPolicy, WeakContextRef,
},
cpu_set::LogicalCpuId,
cpu_stats::{self, CpuState},
@@ -33,35 +33,17 @@ const SCHED_PRIO_TO_WEIGHT: [usize; 40] = [
70, 56, 45, 36, 29, 23, 18, 15,
];
-/// Determines if a given context is eligible to be scheduled on a given CPU (in
-/// principle, the current CPU).
-///
-/// # Safety
-/// This function is unsafe because it modifies the `context`'s state directly without synchronization.
-///
-/// # Parameters
-/// - `context`: The context (process/thread) to be checked.
-/// - `cpu_id`: The logical ID of the CPU on which the context is being scheduled.
-///
-/// # Returns
-/// - `UpdateResult::CanSwitch`: If the context can be switched to.
-/// - `UpdateResult::Skip`: If the context should be skipped (e.g., it's running on another CPU).
unsafe fn update_runnable(
context: &mut Context,
cpu_id: LogicalCpuId,
switch_time: u128,
) -> UpdateResult {
- // Ignore contexts that are already running.
if context.running {
return UpdateResult::Skip;
}
-
- // Ignore contexts assigned to other CPUs.
if !context.sched_affinity.contains(cpu_id) {
return UpdateResult::Skip;
}
-
- // If context is soft-blocked and has a wake-up time, check if it should wake up.
if context.status.is_soft_blocked()
&& let Some(wake) = context.wake
&& switch_time >= wake
@@ -69,8 +51,6 @@ unsafe fn update_runnable(
context.wake = None;
context.unblock_no_ipi();
}
-
- // If the context is runnable, indicate it can be switched to.
if context.status.is_runnable() {
UpdateResult::CanSwitch
} else {
@@ -95,7 +75,7 @@ pub fn tick(token: &mut CleanLockToken) {
let new_ticks = ticks_cell.get() + 1;
ticks_cell.set(new_ticks);
- // Trigger a context switch after every 3 ticks (approx. 6.75 ms).
+ // Trigger a context switch after every 3 ticks.
if new_ticks >= 3 {
switch(token);
crate::context::signal::signal_handler(token);
@@ -167,10 +147,7 @@ pub fn switch(token: &mut CleanLockToken) -> SwitchResult {
let mut prev_context_guard = unsafe { prev_context_lock.write_arc() };
if !prev_context_guard.is_preemptable() {
- // Unset global lock
arch::CONTEXT_SWITCH_LOCK.store(false, Ordering::SeqCst);
-
- // Pretend to have finished switching, so CPU is not idled
return SwitchResult::Switched;
}
@@ -377,6 +354,71 @@ fn select_next_context(
let total_contexts: usize = contexts_list.iter().map(|q| q.len()).sum();
let mut skipped_contexts = 0;
+ // PASS 0: SCHED_FIFO and SCHED_RR — scan for RT contexts to schedule.
+ // When a runnable RT context is found, it takes priority over all SCHED_OTHER.
+ for prio in 0..40 {
+ let rt_contexts = contexts_list
+ .get_mut(prio)
+ .expect("prio should be between [0, 39]");
+ let len = rt_contexts.len();
+ for _ in 0..len {
+ let (rt_ref, rt_lock) = match rt_contexts.pop_front() {
+ Some(lock) => match lock.upgrade() {
+ Some(l) => (lock, l),
+ None => {
+ skipped_contexts += 1;
+ continue;
+ }
+ },
+ None => break,
+ };
+ if Arc::ptr_eq(&rt_lock, &idle_context) {
+ rt_contexts.push_back(rt_ref);
+ continue;
+ }
+ // Current RT thread: if runnable with no higher-prio RT found yet,
+ // keep it running (no demotion to SCHED_OTHER)
+ if Arc::ptr_eq(&rt_lock, &prev_context_lock) {
+ let mut rt_guard = unsafe { rt_lock.write_arc() };
+ if rt_guard.status.is_runnable()
+ && (rt_guard.sched_policy == SchedPolicy::Fifo
+ || rt_guard.sched_policy == SchedPolicy::RoundRobin)
+ {
+ percpu.balance.set(balance);
+ percpu.last_queue.set(i);
+ return Ok(Some(rt_guard));
+ }
+ rt_contexts.push_back(rt_ref);
+ continue;
+ }
+ let mut rt_guard = unsafe { rt_lock.write_arc() };
+ if !rt_guard.status.is_runnable() || rt_guard.running
+ || !rt_guard.sched_affinity.contains(cpu_id)
+ {
+ rt_contexts.push_back(rt_ref);
+ continue;
+ }
+ if rt_guard.sched_policy == SchedPolicy::Fifo
+ || rt_guard.sched_policy == SchedPolicy::RoundRobin
+ {
+ percpu.balance.set(balance);
+ percpu.last_queue.set(i);
+ if !Arc::ptr_eq(&prev_context_lock, &idle_context) {
+ let prev_ctx = WeakContextRef(Arc::downgrade(&prev_context_lock));
+ if prev_context_guard.status.is_runnable() {
+ contexts_list[prev_context_guard.prio].push_back(prev_ctx);
+ } else {
+ idle_contexts(token.token()).push_back(prev_ctx);
+ }
+ }
+ return Ok(Some(rt_guard));
+ }
+ rt_contexts.push_back(rt_ref);
+ }
+ }
+
+ // PASS 1: SCHED_OTHER — existing DWRR deficit tracking
+
'priority: loop {
i = (i + 1) % 40;
total_iters += 1;
@@ -0,0 +1,20 @@
diff --git a/src/scheme/mod.rs b/src/scheme/mod.rs
index d30272c..9da2b28 100644
--- a/src/scheme/mod.rs
+++ b/src/scheme/mod.rs
@@ -777,6 +777,7 @@ pub struct CallerCtx {
pub pid: usize,
pub uid: u32,
pub gid: u32,
+ pub groups: alloc::vec::Vec<u32>,
}
impl CallerCtx {
pub fn filter_uid_gid(self, euid: u32, egid: u32) -> Self {
@@ -785,6 +786,7 @@ impl CallerCtx {
pid: self.pid,
uid: euid,
gid: egid,
+ groups: self.groups,
}
} else {
self
@@ -0,0 +1,42 @@
diff --git a/src/syscall/futex.rs b/src/syscall/futex.rs
index 4c187b8..9884d2b 100644
--- a/src/syscall/futex.rs
+++ b/src/syscall/futex.rs
@@ -49,8 +49,13 @@ pub struct FutexEntry {
// implement that fully in userspace. Although futex is probably the best API for process-shared
// POSIX synchronization primitives, a local hash table and wait-for-thread kernel APIs (e.g.
// lwp_park/lwp_unpark from NetBSD) could be a simpler replacement.
-static FUTEXES: Mutex<L1, FutexList> =
- Mutex::new(FutexList::with_hasher(DefaultHashBuilder::new()));
+const FUTEX_SHARDS: usize = 64;
+
+fn futex_shard(phys: PhysicalAddress) -> usize {
+ (phys.data() as usize >> 12) % FUTEX_SHARDS
+}
+
+static FUTEXES: [Mutex<L1, FutexList>; FUTEX_SHARDS] = [const { Mutex::new(FutexList::with_hasher(DefaultHashBuilder::new())) }; FUTEX_SHARDS];
fn validate_and_translate_virt(space: &AddrSpace, addr: VirtualAddress) -> Option<PhysicalAddress> {
// TODO: Move this elsewhere!
@@ -97,7 +102,7 @@ pub fn futex(
{
// TODO: Lock ordering violation
let mut token = unsafe { CleanLockToken::new() };
- let mut futexes = FUTEXES.lock(token.token());
+ let mut futexes = FUTEXES[futex_shard(target_physaddr)].lock(token.token());
let (futexes, mut token) = futexes.token_split();
let (fetched, expected) = if op == FUTEX_WAIT {
@@ -181,10 +186,11 @@ pub fn futex(
}
FUTEX_WAKE => {
let mut woken = 0;
+ let shard = futex_shard(target_physaddr);
{
drop(addr_space_guard);
- let mut futexes_map = FUTEXES.lock(token.token());
+ let mut futexes_map = FUTEXES[shard].lock(token.token());
let (futexes_map, mut token) = futexes_map.token_split();
let is_empty = if let Some(futexes) = futexes_map.get_mut(&target_physaddr) {
@@ -0,0 +1,89 @@
diff --git a/src/percpu.rs b/src/percpu.rs
index f4ad5e6..1844d62 100644
--- a/src/percpu.rs
+++ b/src/percpu.rs
@@ -1,4 +1,5 @@
use alloc::{
+ collections::VecDeque,
sync::{Arc, Weak},
vec::Vec,
};
@@ -12,7 +13,10 @@ use syscall::PtraceFlags;
use crate::{
arch::device::ArchPercpuMisc,
- context::{empty_cr3, memory::AddrSpaceWrapper, switch::ContextSwitchPercpu},
+ context::{
+ empty_cr3, memory::AddrSpaceWrapper, switch::ContextSwitchPercpu, WeakContextRef,
+ RUN_QUEUE_COUNT,
+ },
cpu_set::{LogicalCpuId, MAX_CPU_COUNT},
cpu_stats::{CpuStats, CpuStatsData},
ptrace::Session,
@@ -20,6 +24,42 @@ use crate::{
syscall::debug::SyscallDebugInfo,
};
+#[allow(dead_code)]
+pub struct PerCpuSched {
+ pub run_queues: [VecDeque<WeakContextRef>; RUN_QUEUE_COUNT],
+ pub run_queues_lock: AtomicBool,
+ pub balance: Cell<[usize; RUN_QUEUE_COUNT]>,
+ pub last_queue: Cell<usize>,
+}
+
+impl PerCpuSched {
+ pub const fn new() -> Self {
+ const EMPTY: VecDeque<WeakContextRef> = VecDeque::new();
+ Self {
+ run_queues: [EMPTY; RUN_QUEUE_COUNT],
+ run_queues_lock: AtomicBool::new(false),
+ balance: Cell::new([0; RUN_QUEUE_COUNT]),
+ last_queue: Cell::new(0),
+ }
+ }
+
+ pub fn take_lock(&self) {
+ while self
+ .run_queues_lock
+ .compare_exchange(false, true, Ordering::Acquire, Ordering::Relaxed)
+ .is_err()
+ {
+ while self.run_queues_lock.load(Ordering::Relaxed) {
+ core::hint::spin_loop();
+ }
+ }
+ }
+
+ pub fn release_lock(&self) {
+ self.run_queues_lock.store(false, Ordering::Release);
+ }
+}
+
/// The percpu block, that stored all percpu variables.
pub struct PercpuBlock {
/// A unique immutable number that identifies the current CPU - used for scheduling
@@ -31,7 +71,12 @@ pub struct PercpuBlock {
pub current_addrsp: RefCell<Option<Arc<AddrSpaceWrapper>>>,
pub new_addrsp_tmp: Cell<Option<Arc<AddrSpaceWrapper>>>,
pub wants_tlb_shootdown: AtomicBool,
- pub balance: Cell<[usize; 40]>,
+
+ pub sched: PerCpuSched,
+
+ // Legacy DWRR state used by context/switch.rs until the per-CPU scheduler migration is
+ // finished.
+ pub balance: Cell<[usize; RUN_QUEUE_COUNT]>,
pub last_queue: Cell<usize>,
// TODO: Put mailbox queues here, e.g. for TLB shootdown? Just be sure to 128-byte align it
@@ -187,7 +232,8 @@ impl PercpuBlock {
current_addrsp: RefCell::new(None),
new_addrsp_tmp: Cell::new(None),
wants_tlb_shootdown: AtomicBool::new(false),
- balance: Cell::new([0; 40]),
+ sched: PerCpuSched::new(),
+ balance: Cell::new([0; RUN_QUEUE_COUNT]),
last_queue: Cell::new(39),
ptrace_flags: Cell::new(PtraceFlags::empty()),
ptrace_session: RefCell::new(None),
@@ -0,0 +1,180 @@
diff --git a/src/context/context.rs b/src/context/context.rs
index c97c516..a0814fa 100644
--- a/src/context/context.rs
+++ b/src/context/context.rs
@@ -18,7 +18,8 @@ use crate::{
cpu_stats,
ipi::{ipi, IpiKind, IpiTarget},
memory::{
- allocate_p2frame, deallocate_p2frame, Enomem, Frame, RaiiFrame, RmmA, RmmArch, PAGE_SIZE,
+ allocate_p2frame, deallocate_p2frame, Enomem, Frame, PhysicalAddress, RaiiFrame, RmmA,
+ RmmArch, PAGE_SIZE,
},
percpu::PercpuBlock,
scheme::{CallerCtx, FileHandle, SchemeId},
@@ -62,6 +63,38 @@ impl Status {
}
}
+pub const SCHED_PRIORITY_LEVELS: usize = 40;
+pub const DEFAULT_SCHED_OTHER_PRIORITY: usize = 20;
+pub const DEFAULT_SCHED_RR_QUANTUM: u128 = 100_000_000;
+
+#[repr(u8)]
+#[derive(Clone, Copy, Debug, PartialEq, Eq)]
+pub enum SchedPolicy {
+ Fifo = 0,
+ RoundRobin = 1,
+ Other = 2,
+}
+
+impl SchedPolicy {
+ pub fn try_from_raw(raw: u8) -> Option<Self> {
+ match raw {
+ 0 => Some(Self::Fifo),
+ 1 => Some(Self::RoundRobin),
+ 2 => Some(Self::Other),
+ _ => None,
+ }
+ }
+}
+
+pub fn rt_priority_to_kernel_prio(rt_priority: u8) -> usize {
+ (SCHED_PRIORITY_LEVELS - 1)
+ .saturating_sub((usize::from(rt_priority.min(99)) * (SCHED_PRIORITY_LEVELS - 1)) / 99)
+}
+
+fn clamp_sched_other_prio(prio: usize) -> usize {
+ prio.min(SCHED_PRIORITY_LEVELS - 1)
+}
+
#[derive(Clone, Debug)]
pub enum HardBlockedReason {
/// "SIGSTOP", only procmgr is allowed to switch contexts this state
@@ -140,6 +173,20 @@ pub struct Context {
pub fmap_ret: Option<Frame>,
/// Priority
pub prio: usize,
+ pub sched_policy: SchedPolicy,
+ pub sched_rt_priority: u8,
+ pub sched_rr_ticks_consumed: u32,
+ pub sched_static_prio: usize,
+pub sched_rr_quantum: u128,
+ /// Virtual runtime for SCHED_OTHER fair scheduling.
+ /// CPU-bound threads accumulate vruntime faster; I/O-bound stay lower.
+ pub vruntime: u128,
+ #[allow(dead_code)]
+ pub futex_pi_boost: bool,
+ #[allow(dead_code)]
+ pub futex_pi_original_prio: usize,
+ #[allow(dead_code)]
+ pub futex_pi_waiters: Vec<PhysicalAddress>,
// TODO: id can reappear after wraparound?
pub owner_proc_id: Option<NonZeroUsize>,
@@ -148,6 +195,8 @@ pub struct Context {
pub euid: u32,
pub egid: u32,
pub pid: usize,
+ /// Supplementary group IDs for access control decisions.
+ pub groups: Vec<u32>,
// See [`PreemptGuard`]
//
@@ -197,13 +246,23 @@ impl Context {
files: Arc::new(RwLock::new(FdTbl::new())),
userspace: false,
fmap_ret: None,
- prio: 20,
+ prio: DEFAULT_SCHED_OTHER_PRIORITY,
+ sched_policy: SchedPolicy::Other,
+ sched_rt_priority: 0,
+ sched_rr_ticks_consumed: 0,
+ sched_static_prio: DEFAULT_SCHED_OTHER_PRIORITY,
+ sched_rr_quantum: DEFAULT_SCHED_RR_QUANTUM,
+ vruntime: 0u128,
+ futex_pi_boost: false,
+ futex_pi_original_prio: DEFAULT_SCHED_OTHER_PRIORITY,
+ futex_pi_waiters: Vec::new(),
being_sigkilled: false,
owner_proc_id,
euid: 0,
egid: 0,
pid: 0,
+ groups: Vec::new(),
#[cfg(feature = "syscall_debug")]
syscall_debug_info: crate::syscall::debug::SyscallDebugInfo::default(),
@@ -218,11 +277,47 @@ impl Context {
self.preempt_locks == 0
}
+ fn base_sched_prio(&self) -> usize {
+ match self.sched_policy {
+ SchedPolicy::Other => clamp_sched_other_prio(self.sched_static_prio),
+ SchedPolicy::Fifo | SchedPolicy::RoundRobin => {
+ rt_priority_to_kernel_prio(self.sched_rt_priority)
+ }
+ }
+ }
+
+ fn apply_sched_prio(&mut self) {
+ let base_prio = self.base_sched_prio();
+ if self.futex_pi_boost {
+ self.futex_pi_original_prio = base_prio;
+ self.prio = self.prio.min(base_prio);
+ } else {
+ self.futex_pi_original_prio = base_prio;
+ self.prio = base_prio;
+ }
+ }
+
+ pub fn set_sched_other_prio(&mut self, prio: usize) {
+ self.sched_static_prio = clamp_sched_other_prio(prio);
+ self.apply_sched_prio();
+ }
+
+ pub fn set_sched_policy(&mut self, sched_policy: SchedPolicy, rt_priority: u8) {
+ self.sched_policy = sched_policy;
+ self.sched_rt_priority = match sched_policy {
+ SchedPolicy::Other => 0,
+ SchedPolicy::Fifo | SchedPolicy::RoundRobin => rt_priority.min(99),
+ };
+ self.sched_rr_ticks_consumed = 0;
+ self.apply_sched_prio();
+ }
+
/// Block the context, and return true if it was runnable before being blocked
pub fn block(&mut self, reason: &'static str) -> bool {
if self.status.is_runnable() {
self.status = Status::Blocked;
self.status_reason = reason;
+ self.sched_rr_ticks_consumed = 0;
true
} else {
false
@@ -232,6 +327,7 @@ impl Context {
pub fn hard_block(&mut self, reason: HardBlockedReason) -> bool {
if self.status.is_runnable() {
self.status = Status::HardBlocked { reason };
+ self.sched_rr_ticks_consumed = 0;
true
} else {
@@ -261,6 +357,7 @@ impl Context {
if self.status.is_soft_blocked() {
self.status = Status::Runnable;
self.status_reason = "";
+ self.sched_rr_ticks_consumed = 0;
true
} else {
@@ -479,6 +576,7 @@ impl Context {
uid: self.euid,
gid: self.egid,
pid: self.pid,
+ groups: self.groups.clone(),
}
}
}
@@ -0,0 +1,214 @@
diff --git a/src/context/switch.rs b/src/context/switch.rs
index 86684c8..74dd5f1 100644
--- a/src/context/switch.rs
+++ b/src/context/switch.rs
@@ -5,7 +5,7 @@
use crate::{
context::{
self, arch, idle_contexts, idle_contexts_try, run_contexts, ArcContextLockWriteGuard,
- Context, ContextLock, WeakContextRef,
+ Context, ContextLock, SchedPolicy, WeakContextRef,
},
cpu_set::LogicalCpuId,
cpu_stats::{self, CpuState},
@@ -33,35 +33,17 @@ const SCHED_PRIO_TO_WEIGHT: [usize; 40] = [
70, 56, 45, 36, 29, 23, 18, 15,
];
-/// Determines if a given context is eligible to be scheduled on a given CPU (in
-/// principle, the current CPU).
-///
-/// # Safety
-/// This function is unsafe because it modifies the `context`'s state directly without synchronization.
-///
-/// # Parameters
-/// - `context`: The context (process/thread) to be checked.
-/// - `cpu_id`: The logical ID of the CPU on which the context is being scheduled.
-///
-/// # Returns
-/// - `UpdateResult::CanSwitch`: If the context can be switched to.
-/// - `UpdateResult::Skip`: If the context should be skipped (e.g., it's running on another CPU).
unsafe fn update_runnable(
context: &mut Context,
cpu_id: LogicalCpuId,
switch_time: u128,
) -> UpdateResult {
- // Ignore contexts that are already running.
if context.running {
return UpdateResult::Skip;
}
-
- // Ignore contexts assigned to other CPUs.
if !context.sched_affinity.contains(cpu_id) {
return UpdateResult::Skip;
}
-
- // If context is soft-blocked and has a wake-up time, check if it should wake up.
if context.status.is_soft_blocked()
&& let Some(wake) = context.wake
&& switch_time >= wake
@@ -69,8 +51,6 @@ unsafe fn update_runnable(
context.wake = None;
context.unblock_no_ipi();
}
-
- // If the context is runnable, indicate it can be switched to.
if context.status.is_runnable() {
UpdateResult::CanSwitch
} else {
@@ -95,7 +75,7 @@ pub fn tick(token: &mut CleanLockToken) {
let new_ticks = ticks_cell.get() + 1;
ticks_cell.set(new_ticks);
- // Trigger a context switch after every 3 ticks (approx. 6.75 ms).
+ // Trigger a context switch after every 3 ticks.
if new_ticks >= 3 {
switch(token);
crate::context::signal::signal_handler(token);
@@ -167,10 +147,7 @@ pub fn switch(token: &mut CleanLockToken) -> SwitchResult {
let mut prev_context_guard = unsafe { prev_context_lock.write_arc() };
if !prev_context_guard.is_preemptable() {
- // Unset global lock
arch::CONTEXT_SWITCH_LOCK.store(false, Ordering::SeqCst);
-
- // Pretend to have finished switching, so CPU is not idled
return SwitchResult::Switched;
}
@@ -222,6 +199,13 @@ pub fn switch(token: &mut CleanLockToken) -> SwitchResult {
// Update times
if !was_idle {
prev_context.cpu_time += switch_time.saturating_sub(prev_context.switch_time);
+ if prev_context.sched_policy == SchedPolicy::Other {
+ let actual_ns = switch_time.saturating_sub(prev_context.switch_time);
+ let weight = SCHED_PRIO_TO_WEIGHT[prev_context.sched_static_prio.min(39)] as u128;
+ let default_weight = SCHED_PRIO_TO_WEIGHT[20] as u128;
+ let delta = actual_ns.saturating_mul(default_weight) / weight.max(1);
+ prev_context.vruntime = prev_context.vruntime.saturating_add(delta);
+ }
}
next_context.switch_time = switch_time;
if next_context.userspace {
@@ -377,6 +361,121 @@ fn select_next_context(
let total_contexts: usize = contexts_list.iter().map(|q| q.len()).sum();
let mut skipped_contexts = 0;
+ // PASS 0: SCHED_FIFO and SCHED_RR — scan for RT contexts to schedule.
+ // When a runnable RT context is found, it takes priority over all SCHED_OTHER.
+ for prio in 0..40 {
+ let rt_contexts = contexts_list
+ .get_mut(prio)
+ .expect("prio should be between [0, 39]");
+ let len = rt_contexts.len();
+ for _ in 0..len {
+ let (rt_ref, rt_lock) = match rt_contexts.pop_front() {
+ Some(lock) => match lock.upgrade() {
+ Some(l) => (lock, l),
+ None => {
+ skipped_contexts += 1;
+ continue;
+ }
+ },
+ None => break,
+ };
+ if Arc::ptr_eq(&rt_lock, &idle_context) {
+ rt_contexts.push_back(rt_ref);
+ continue;
+ }
+ // Current RT thread: if runnable with no higher-prio RT found yet,
+ // keep it running (no demotion to SCHED_OTHER)
+ if Arc::ptr_eq(&rt_lock, &prev_context_lock) {
+ let rt_guard = unsafe { rt_lock.write_arc() };
+ if rt_guard.status.is_runnable()
+ && (rt_guard.sched_policy == SchedPolicy::Fifo
+ || rt_guard.sched_policy == SchedPolicy::RoundRobin)
+ {
+ percpu.balance.set(balance);
+ percpu.last_queue.set(i);
+ return Ok(Some(rt_guard));
+ }
+ rt_contexts.push_back(rt_ref);
+ continue;
+ }
+ let rt_guard = unsafe { rt_lock.write_arc() };
+ if !rt_guard.status.is_runnable() || rt_guard.running
+ || !rt_guard.sched_affinity.contains(cpu_id)
+ {
+ rt_contexts.push_back(rt_ref);
+ continue;
+ }
+ if rt_guard.sched_policy == SchedPolicy::Fifo
+ || rt_guard.sched_policy == SchedPolicy::RoundRobin
+ {
+ percpu.balance.set(balance);
+ percpu.last_queue.set(i);
+ if !Arc::ptr_eq(&prev_context_lock, &idle_context) {
+ let prev_ctx = WeakContextRef(Arc::downgrade(&prev_context_lock));
+ if prev_context_guard.status.is_runnable() {
+ contexts_list[prev_context_guard.prio].push_back(prev_ctx);
+ } else {
+ idle_contexts(token.token()).push_back(prev_ctx);
+ }
+ }
+ return Ok(Some(rt_guard));
+ }
+ rt_contexts.push_back(rt_ref);
+ }
+ }
+
+ // PASS 1: SCHED_OTHER — minimum-vruntime selection
+ {
+ let mut min_vruntime = u128::MAX;
+ let mut best: Option<(usize, WeakContextRef)> = None;
+ for (prio, queue) in contexts_list.iter().enumerate() {
+ for ctx_ref in queue.iter() {
+ if let Some(ctx_lock) = ctx_ref.upgrade() {
+ if Arc::ptr_eq(&ctx_lock, &prev_context_lock) || Arc::ptr_eq(&ctx_lock, &idle_context) {
+ continue;
+ }
+ if let Some(guard) = ctx_lock.try_read(token.token()) {
+ if guard.status.is_runnable() && !guard.running
+ && guard.sched_affinity.contains(cpu_id)
+ && guard.sched_policy == SchedPolicy::Other
+ {
+ let v = guard.vruntime;
+ drop(guard);
+ if v < min_vruntime {
+ min_vruntime = v;
+ best = Some((prio, ctx_ref.clone()));
+ }
+ }
+ }
+ }
+ }
+ }
+ if let Some((best_prio, ctx_ref)) = best {
+ {
+ let queue = contexts_list.get_mut(best_prio).expect("valid prio");
+ queue.retain(|r| !WeakContextRef::eq(r, &ctx_ref));
+ }
+ if let Some(ctx_lock) = ctx_ref.upgrade() {
+ let guard = unsafe { ctx_lock.write_arc() };
+ if guard.status.is_runnable() {
+ percpu.balance.set(balance);
+ percpu.last_queue.set(i);
+ if !Arc::ptr_eq(&prev_context_lock, &idle_context) {
+ let prev_ctx = WeakContextRef(Arc::downgrade(&prev_context_lock));
+ if prev_context_guard.status.is_runnable() {
+ contexts_list[prev_context_guard.prio].push_back(prev_ctx);
+ } else {
+ idle_contexts(token.token()).push_back(prev_ctx);
+ }
+ }
+ return Ok(Some(guard));
+ }
+ }
+ }
+ }
+
+ // PASS 2: fallback DWRR deficit tracking
+
'priority: loop {
i = (i + 1) % 40;
total_iters += 1;
@@ -0,0 +1,196 @@
diff --git a/src/context/context.rs b/src/context/context.rs
index c97c516..18fbd7f 100644
--- a/src/context/context.rs
+++ b/src/context/context.rs
@@ -18,7 +18,8 @@ use crate::{
cpu_stats,
ipi::{ipi, IpiKind, IpiTarget},
memory::{
- allocate_p2frame, deallocate_p2frame, Enomem, Frame, RaiiFrame, RmmA, RmmArch, PAGE_SIZE,
+ allocate_p2frame, deallocate_p2frame, Enomem, Frame, PhysicalAddress, RaiiFrame, RmmA,
+ RmmArch, PAGE_SIZE,
},
percpu::PercpuBlock,
scheme::{CallerCtx, FileHandle, SchemeId},
@@ -62,6 +63,38 @@ impl Status {
}
}
+pub const SCHED_PRIORITY_LEVELS: usize = 40;
+pub const DEFAULT_SCHED_OTHER_PRIORITY: usize = 20;
+pub const DEFAULT_SCHED_RR_QUANTUM: u128 = 100_000_000;
+
+#[repr(u8)]
+#[derive(Clone, Copy, Debug, PartialEq, Eq)]
+pub enum SchedPolicy {
+ Fifo = 0,
+ RoundRobin = 1,
+ Other = 2,
+}
+
+impl SchedPolicy {
+ pub fn try_from_raw(raw: u8) -> Option<Self> {
+ match raw {
+ 0 => Some(Self::Fifo),
+ 1 => Some(Self::RoundRobin),
+ 2 => Some(Self::Other),
+ _ => None,
+ }
+ }
+}
+
+pub fn rt_priority_to_kernel_prio(rt_priority: u8) -> usize {
+ (SCHED_PRIORITY_LEVELS - 1)
+ .saturating_sub((usize::from(rt_priority.min(99)) * (SCHED_PRIORITY_LEVELS - 1)) / 99)
+}
+
+fn clamp_sched_other_prio(prio: usize) -> usize {
+ prio.min(SCHED_PRIORITY_LEVELS - 1)
+}
+
#[derive(Clone, Debug)]
pub enum HardBlockedReason {
/// "SIGSTOP", only procmgr is allowed to switch contexts this state
@@ -96,6 +129,7 @@ pub struct Context {
pub running: bool,
/// Current CPU ID
pub cpu_id: Option<LogicalCpuId>,
+ pub last_cpu: Option<LogicalCpuId>,
/// Time this context was switched to
pub switch_time: u128,
/// Amount of CPU time used
@@ -140,6 +174,20 @@ pub struct Context {
pub fmap_ret: Option<Frame>,
/// Priority
pub prio: usize,
+ pub sched_policy: SchedPolicy,
+ pub sched_rt_priority: u8,
+ pub sched_rr_ticks_consumed: u32,
+ pub sched_static_prio: usize,
+pub sched_rr_quantum: u128,
+ /// Virtual runtime for SCHED_OTHER fair scheduling.
+ /// CPU-bound threads accumulate vruntime faster; I/O-bound stay lower.
+ pub vruntime: u128,
+ #[allow(dead_code)]
+ pub futex_pi_boost: bool,
+ #[allow(dead_code)]
+ pub futex_pi_original_prio: usize,
+ #[allow(dead_code)]
+ pub futex_pi_waiters: Vec<PhysicalAddress>,
// TODO: id can reappear after wraparound?
pub owner_proc_id: Option<NonZeroUsize>,
@@ -148,6 +196,8 @@ pub struct Context {
pub euid: u32,
pub egid: u32,
pub pid: usize,
+ /// Supplementary group IDs for access control decisions.
+ pub groups: Vec<u32>,
// See [`PreemptGuard`]
//
@@ -182,6 +232,7 @@ impl Context {
status_reason: "",
running: false,
cpu_id: None,
+ last_cpu: None,
switch_time: 0,
cpu_time: 0,
sched_affinity: LogicalCpuSet::all(),
@@ -197,13 +248,23 @@ impl Context {
files: Arc::new(RwLock::new(FdTbl::new())),
userspace: false,
fmap_ret: None,
- prio: 20,
+ prio: DEFAULT_SCHED_OTHER_PRIORITY,
+ sched_policy: SchedPolicy::Other,
+ sched_rt_priority: 0,
+ sched_rr_ticks_consumed: 0,
+ sched_static_prio: DEFAULT_SCHED_OTHER_PRIORITY,
+ sched_rr_quantum: DEFAULT_SCHED_RR_QUANTUM,
+ vruntime: 0u128,
+ futex_pi_boost: false,
+ futex_pi_original_prio: DEFAULT_SCHED_OTHER_PRIORITY,
+ futex_pi_waiters: Vec::new(),
being_sigkilled: false,
owner_proc_id,
euid: 0,
egid: 0,
pid: 0,
+ groups: Vec::new(),
#[cfg(feature = "syscall_debug")]
syscall_debug_info: crate::syscall::debug::SyscallDebugInfo::default(),
@@ -218,11 +279,47 @@ impl Context {
self.preempt_locks == 0
}
+ fn base_sched_prio(&self) -> usize {
+ match self.sched_policy {
+ SchedPolicy::Other => clamp_sched_other_prio(self.sched_static_prio),
+ SchedPolicy::Fifo | SchedPolicy::RoundRobin => {
+ rt_priority_to_kernel_prio(self.sched_rt_priority)
+ }
+ }
+ }
+
+ fn apply_sched_prio(&mut self) {
+ let base_prio = self.base_sched_prio();
+ if self.futex_pi_boost {
+ self.futex_pi_original_prio = base_prio;
+ self.prio = self.prio.min(base_prio);
+ } else {
+ self.futex_pi_original_prio = base_prio;
+ self.prio = base_prio;
+ }
+ }
+
+ pub fn set_sched_other_prio(&mut self, prio: usize) {
+ self.sched_static_prio = clamp_sched_other_prio(prio);
+ self.apply_sched_prio();
+ }
+
+ pub fn set_sched_policy(&mut self, sched_policy: SchedPolicy, rt_priority: u8) {
+ self.sched_policy = sched_policy;
+ self.sched_rt_priority = match sched_policy {
+ SchedPolicy::Other => 0,
+ SchedPolicy::Fifo | SchedPolicy::RoundRobin => rt_priority.min(99),
+ };
+ self.sched_rr_ticks_consumed = 0;
+ self.apply_sched_prio();
+ }
+
/// Block the context, and return true if it was runnable before being blocked
pub fn block(&mut self, reason: &'static str) -> bool {
if self.status.is_runnable() {
self.status = Status::Blocked;
self.status_reason = reason;
+ self.sched_rr_ticks_consumed = 0;
true
} else {
false
@@ -232,6 +329,7 @@ impl Context {
pub fn hard_block(&mut self, reason: HardBlockedReason) -> bool {
if self.status.is_runnable() {
self.status = Status::HardBlocked { reason };
+ self.sched_rr_ticks_consumed = 0;
true
} else {
@@ -261,6 +359,7 @@ impl Context {
if self.status.is_soft_blocked() {
self.status = Status::Runnable;
self.status_reason = "";
+ self.sched_rr_ticks_consumed = 0;
true
} else {
@@ -479,6 +578,7 @@ impl Context {
uid: self.euid,
gid: self.egid,
pid: self.pid,
+ groups: self.groups.clone(),
}
}
}
@@ -0,0 +1,225 @@
diff --git a/src/context/switch.rs b/src/context/switch.rs
index 86684c8..cd5f7ed 100644
--- a/src/context/switch.rs
+++ b/src/context/switch.rs
@@ -5,7 +5,7 @@
use crate::{
context::{
self, arch, idle_contexts, idle_contexts_try, run_contexts, ArcContextLockWriteGuard,
- Context, ContextLock, WeakContextRef,
+ Context, ContextLock, SchedPolicy, WeakContextRef,
},
cpu_set::LogicalCpuId,
cpu_stats::{self, CpuState},
@@ -33,35 +33,17 @@ const SCHED_PRIO_TO_WEIGHT: [usize; 40] = [
70, 56, 45, 36, 29, 23, 18, 15,
];
-/// Determines if a given context is eligible to be scheduled on a given CPU (in
-/// principle, the current CPU).
-///
-/// # Safety
-/// This function is unsafe because it modifies the `context`'s state directly without synchronization.
-///
-/// # Parameters
-/// - `context`: The context (process/thread) to be checked.
-/// - `cpu_id`: The logical ID of the CPU on which the context is being scheduled.
-///
-/// # Returns
-/// - `UpdateResult::CanSwitch`: If the context can be switched to.
-/// - `UpdateResult::Skip`: If the context should be skipped (e.g., it's running on another CPU).
unsafe fn update_runnable(
context: &mut Context,
cpu_id: LogicalCpuId,
switch_time: u128,
) -> UpdateResult {
- // Ignore contexts that are already running.
if context.running {
return UpdateResult::Skip;
}
-
- // Ignore contexts assigned to other CPUs.
if !context.sched_affinity.contains(cpu_id) {
return UpdateResult::Skip;
}
-
- // If context is soft-blocked and has a wake-up time, check if it should wake up.
if context.status.is_soft_blocked()
&& let Some(wake) = context.wake
&& switch_time >= wake
@@ -69,8 +51,6 @@ unsafe fn update_runnable(
context.wake = None;
context.unblock_no_ipi();
}
-
- // If the context is runnable, indicate it can be switched to.
if context.status.is_runnable() {
UpdateResult::CanSwitch
} else {
@@ -95,7 +75,7 @@ pub fn tick(token: &mut CleanLockToken) {
let new_ticks = ticks_cell.get() + 1;
ticks_cell.set(new_ticks);
- // Trigger a context switch after every 3 ticks (approx. 6.75 ms).
+ // Trigger a context switch after every 3 ticks.
if new_ticks >= 3 {
switch(token);
crate::context::signal::signal_handler(token);
@@ -167,10 +147,7 @@ pub fn switch(token: &mut CleanLockToken) -> SwitchResult {
let mut prev_context_guard = unsafe { prev_context_lock.write_arc() };
if !prev_context_guard.is_preemptable() {
- // Unset global lock
arch::CONTEXT_SWITCH_LOCK.store(false, Ordering::SeqCst);
-
- // Pretend to have finished switching, so CPU is not idled
return SwitchResult::Switched;
}
@@ -213,6 +190,7 @@ pub fn switch(token: &mut CleanLockToken) -> SwitchResult {
// Set the previous context as "not running"
prev_context.running = false;
+ prev_context.last_cpu = prev_context.cpu_id;
// Set the next context as "running"
next_context.running = true;
@@ -222,6 +200,13 @@ pub fn switch(token: &mut CleanLockToken) -> SwitchResult {
// Update times
if !was_idle {
prev_context.cpu_time += switch_time.saturating_sub(prev_context.switch_time);
+ if prev_context.sched_policy == SchedPolicy::Other {
+ let actual_ns = switch_time.saturating_sub(prev_context.switch_time);
+ let weight = SCHED_PRIO_TO_WEIGHT[prev_context.sched_static_prio.min(39)] as u128;
+ let default_weight = SCHED_PRIO_TO_WEIGHT[20] as u128;
+ let delta = actual_ns.saturating_mul(default_weight) / weight.max(1);
+ prev_context.vruntime = prev_context.vruntime.saturating_add(delta);
+ }
}
next_context.switch_time = switch_time;
if next_context.userspace {
@@ -377,6 +362,124 @@ fn select_next_context(
let total_contexts: usize = contexts_list.iter().map(|q| q.len()).sum();
let mut skipped_contexts = 0;
+ // PASS 0: SCHED_FIFO and SCHED_RR — scan for RT contexts to schedule.
+ // When a runnable RT context is found, it takes priority over all SCHED_OTHER.
+ for prio in 0..40 {
+ let rt_contexts = contexts_list
+ .get_mut(prio)
+ .expect("prio should be between [0, 39]");
+ let len = rt_contexts.len();
+ for _ in 0..len {
+ let (rt_ref, rt_lock) = match rt_contexts.pop_front() {
+ Some(lock) => match lock.upgrade() {
+ Some(l) => (lock, l),
+ None => {
+ skipped_contexts += 1;
+ continue;
+ }
+ },
+ None => break,
+ };
+ if Arc::ptr_eq(&rt_lock, &idle_context) {
+ rt_contexts.push_back(rt_ref);
+ continue;
+ }
+ // Current RT thread: if runnable with no higher-prio RT found yet,
+ // keep it running (no demotion to SCHED_OTHER)
+ if Arc::ptr_eq(&rt_lock, &prev_context_lock) {
+ let rt_guard = unsafe { rt_lock.write_arc() };
+ if rt_guard.status.is_runnable()
+ && (rt_guard.sched_policy == SchedPolicy::Fifo
+ || rt_guard.sched_policy == SchedPolicy::RoundRobin)
+ {
+ percpu.balance.set(balance);
+ percpu.last_queue.set(i);
+ return Ok(Some(rt_guard));
+ }
+ rt_contexts.push_back(rt_ref);
+ continue;
+ }
+ let rt_guard = unsafe { rt_lock.write_arc() };
+ if !rt_guard.status.is_runnable() || rt_guard.running
+ || !rt_guard.sched_affinity.contains(cpu_id)
+ {
+ rt_contexts.push_back(rt_ref);
+ continue;
+ }
+ if rt_guard.sched_policy == SchedPolicy::Fifo
+ || rt_guard.sched_policy == SchedPolicy::RoundRobin
+ {
+ percpu.balance.set(balance);
+ percpu.last_queue.set(i);
+ if !Arc::ptr_eq(&prev_context_lock, &idle_context) {
+ let prev_ctx = WeakContextRef(Arc::downgrade(&prev_context_lock));
+ if prev_context_guard.status.is_runnable() {
+ contexts_list[prev_context_guard.prio].push_back(prev_ctx);
+ } else {
+ idle_contexts(token.token()).push_back(prev_ctx);
+ }
+ }
+ return Ok(Some(rt_guard));
+ }
+ rt_contexts.push_back(rt_ref);
+ }
+ }
+
+ // PASS 1: SCHED_OTHER — minimum-vruntime selection
+ {
+ let mut min_vruntime = u128::MAX;
+ let mut best: Option<(usize, WeakContextRef)> = None;
+ for (prio, queue) in contexts_list.iter().enumerate() {
+ for ctx_ref in queue.iter() {
+ if let Some(ctx_lock) = ctx_ref.upgrade() {
+ if Arc::ptr_eq(&ctx_lock, &prev_context_lock) || Arc::ptr_eq(&ctx_lock, &idle_context) {
+ continue;
+ }
+ if let Some(guard) = ctx_lock.try_read(token.token()) {
+ if guard.status.is_runnable() && !guard.running
+ && guard.sched_affinity.contains(cpu_id)
+ && guard.sched_policy == SchedPolicy::Other
+ {
+ let mut v = guard.vruntime;
+ if guard.last_cpu == Some(cpu_id) {
+ v = v.saturating_sub(v / 8);
+ }
+ drop(guard);
+ if v < min_vruntime {
+ min_vruntime = v;
+ best = Some((prio, ctx_ref.clone()));
+ }
+ }
+ }
+ }
+ }
+ }
+ if let Some((best_prio, ctx_ref)) = best {
+ {
+ let queue = contexts_list.get_mut(best_prio).expect("valid prio");
+ queue.retain(|r| !WeakContextRef::eq(r, &ctx_ref));
+ }
+ if let Some(ctx_lock) = ctx_ref.upgrade() {
+ let guard = unsafe { ctx_lock.write_arc() };
+ if guard.status.is_runnable() {
+ percpu.balance.set(balance);
+ percpu.last_queue.set(i);
+ if !Arc::ptr_eq(&prev_context_lock, &idle_context) {
+ let prev_ctx = WeakContextRef(Arc::downgrade(&prev_context_lock));
+ if prev_context_guard.status.is_runnable() {
+ contexts_list[prev_context_guard.prio].push_back(prev_ctx);
+ } else {
+ idle_contexts(token.token()).push_back(prev_ctx);
+ }
+ }
+ return Ok(Some(guard));
+ }
+ }
+ }
+ }
+
+ // PASS 2: fallback DWRR deficit tracking
+
'priority: loop {
i = (i + 1) % 40;
total_iters += 1;
@@ -0,0 +1,47 @@
diff --git a/src/scheme/proc.rs b/src/scheme/proc.rs
--- a/src/scheme/proc.rs
+++ b/src/scheme/proc.rs
@@ -147,6 +147,7 @@ enum ContextHandle {
Priority,
SchedAffinity,
SchedPolicy,
+ Name,
MmapMinAddr(Arc<AddrSpaceWrapper>),
}
@@ -267,6 +268,7 @@ impl ProcScheme {
"sched-affinity" => (ContextHandle::SchedAffinity, true),
// TODO: Switch this kernel-local proc handle over to a stable upstream
// redox_syscall ProcCall::SetSchedPolicy opcode once that lands.
"sched-policy" => (ContextHandle::SchedPolicy, false),
+ "name" => (ContextHandle::Name, false),
"status" => (ContextHandle::Status { privileged: false }, false),
_ if path.starts_with("auth-") => {
let nonprefix = &path["auth-".len()..];
@@ -1218,6 +1220,16 @@ impl ContextHandle {
Ok(2)
}
+ ContextHandle::Name => {
+ let mut name_buf = [0u8; 32];
+ let len = buf.copy_common_bytes_to_slice(&mut name_buf[..31]).unwrap_or(0);
+ let mut context = context.write(token.token());
+ context.name.clear();
+ if let Ok(s) = core::str::from_utf8(&name_buf[..len]) {
+ context.name.push_str(s);
+ }
+ Ok(len)
+ }
ContextHandle::Status { privileged } => {
let mut args = buf.usizes();
@@ -1532,6 +1544,10 @@ impl ContextHandle {
let data = [context.sched_policy as u8, context.sched_rt_priority];
buf.copy_common_bytes_from_slice(&data)
}
+ ContextHandle::Name => {
+ let context = context.read(token.token());
+ buf.copy_common_bytes_from_slice(context.name.as_bytes())
+ }
ContextHandle::Status { .. } => {
let status = {
let context = context.read(token.token());
@@ -0,0 +1,70 @@
diff --git a/src/scheme/proc.rs b/src/scheme/proc.rs
--- a/src/scheme/proc.rs
+++ b/src/scheme/proc.rs
@@ -145,8 +145,9 @@ enum ContextHandle {
// TODO: Remove this once openat is implemented, or allow openat-via-dup via e.g. the top-level
// directory.
OpenViaDup,
+ Priority,
SchedAffinity,
SchedPolicy,
Name,
MmapMinAddr(Arc<AddrSpaceWrapper>),
@@ -160,6 +161,17 @@ pub struct ProcScheme;
static NEXT_ID: AtomicUsize = AtomicUsize::new(1);
static HANDLES: RwLock<L1, HashMap<usize, Handle>> =
RwLock::new(HashMap::with_hasher(DefaultHashBuilder::new()));
+
+const NICE_MIN: i32 = -20;
+const NICE_MAX: i32 = 19;
+
+fn nice_to_kernel_prio(nice: i32) -> usize {
+ (nice.saturating_add(20)).clamp(0, 39) as usize
+}
+
+fn kernel_prio_to_nice(prio: usize) -> i32 {
+ (prio.min(39) as i32) - 20
+}
#[cfg(feature = "debugger")]
#[allow(dead_code)]
pub fn foreach_addrsp(
@@ -253,6 +265,7 @@ impl ProcScheme {
"sighandler" => (ContextHandle::Sighandler, false),
"start" => (ContextHandle::Start, false),
"open_via_dup" => (ContextHandle::OpenViaDup, false),
+ "priority" => (ContextHandle::Priority, false),
"mmap-min-addr" => (
ContextHandle::MmapMinAddr(Arc::clone(
context
@@ -1191,6 +1204,17 @@ impl ContextHandle {
Ok(size_of_val(&mask))
}
+ Self::Priority => {
+ let nice = unsafe { buf.read_exact::<i32>()? };
+ if !(NICE_MIN..=NICE_MAX).contains(&nice) {
+ return Err(Error::new(EINVAL));
+ }
+
+ context
+ .write(token.token())
+ .set_sched_other_prio(nice_to_kernel_prio(nice));
+
+ Ok(size_of::<i32>())
+ }
Self::SchedPolicy => {
if buf.len() != 2 {
return Err(Error::new(EINVAL));
@@ -1522,6 +1546,10 @@ impl ContextHandle {
buf.copy_exactly(crate::cpu_set::mask_as_bytes(&mask))?;
Ok(size_of_val(&mask))
+ }
+ ContextHandle::Priority => {
+ let nice = kernel_prio_to_nice(context.read(token.token()).prio);
+ buf.copy_common_bytes_from_slice(&nice.to_ne_bytes())
}
ContextHandle::SchedPolicy => {
let context = context.read(token.token());
+364
View File
@@ -0,0 +1,364 @@
diff --git a/src/syscall/futex.rs b/src/syscall/futex.rs
--- a/src/syscall/futex.rs
+++ b/src/syscall/futex.rs
@@
-use crate::syscall::{
- data::TimeSpec,
- error::{Error, Result, EAGAIN, EFAULT, EINVAL, ETIMEDOUT},
- flag::{FUTEX_REQUEUE, FUTEX_WAIT, FUTEX_WAIT64, FUTEX_WAKE},
-};
+use crate::syscall::{
+ data::TimeSpec,
+ error::{Error, Result, EAGAIN, EDEADLK, EFAULT, EINVAL, EPERM, ETIMEDOUT},
+ flag::{FUTEX_REQUEUE, FUTEX_WAIT, FUTEX_WAIT64, FUTEX_WAKE},
+};
+
+const FUTEX_LOCK_PI: usize = 6;
+const FUTEX_UNLOCK_PI: usize = 7;
+const FUTEX_TRYLOCK_PI: usize = 8;
+
+const FUTEX_WAITERS: u32 = 0x8000_0000;
+const FUTEX_OWNER_DIED: u32 = 0x4000_0000;
+const FUTEX_TID_MASK: u32 = 0x3FFF_FFFF;
@@
-type FutexList = HashMap<PhysicalAddress, Vec<FutexEntry>>;
+type FutexList = HashMap<PhysicalAddress, FutexQueue>;
+
+#[derive(Clone, Copy, Debug, Eq, PartialEq)]
+enum FutexWaitKind {
+ Regular,
+ PriorityInheritance,
+}
+
+#[derive(Default)]
+struct FutexQueue {
+ waiters: Vec<FutexEntry>,
+ pi_owner: Option<Weak<ContextLock>>,
+}
+
+impl FutexQueue {
+ fn is_empty(&self) -> bool {
+ self.waiters.is_empty() && self.pi_owner.is_none()
+ }
+}
@@
pub struct FutexEntry {
@@
// address space to check against if virt matches but not phys
addr_space: Weak<AddrSpaceWrapper>,
+ kind: FutexWaitKind,
}
@@
+fn context_futex_tid(context: &crate::context::Context) -> u32 {
+ let tid = u32::try_from(context.pid).unwrap_or(context.debug_id) & FUTEX_TID_MASK;
+ if tid == 0 {
+ context.debug_id & FUTEX_TID_MASK
+ } else {
+ tid
+ }
+}
+
+fn current_context_futex_tid(context_lock: &Arc<ContextLock>, token: &mut CleanLockToken) -> u32 {
+ let context = context_lock.read(token.token());
+ context_futex_tid(&context)
+}
+
+fn push_owner_waiter(owner: &mut crate::context::Context, phys: PhysicalAddress) {
+ if !owner.futex_pi_waiters.iter().any(|waiter| *waiter == phys) {
+ owner.futex_pi_waiters.push(phys);
+ }
+}
+
+fn pop_owner_waiter(owner: &mut crate::context::Context, phys: PhysicalAddress) {
+ owner.futex_pi_waiters.retain(|waiter| *waiter != phys);
+}
+
+fn boost_pi_owner(
+ owner_lock: &Arc<ContextLock>,
+ waiter_prio: usize,
+ phys: PhysicalAddress,
+ token: &mut crate::sync::LockToken<'_, L1>,
+) {
+ let mut owner = owner_lock.write(token.token());
+ push_owner_waiter(&mut owner, phys);
+ if owner.prio > waiter_prio {
+ if !owner.futex_pi_boost {
+ owner.futex_pi_original_prio = owner.prio;
+ }
+ owner.futex_pi_boost = true;
+ owner.prio = owner.prio.min(waiter_prio);
+ }
+}
+
+fn restore_pi_owner(owner: &mut crate::context::Context, phys: PhysicalAddress) {
+ pop_owner_waiter(owner, phys);
+ if owner.futex_pi_boost && owner.futex_pi_waiters.is_empty() {
+ owner.futex_pi_boost = false;
+ owner.prio = owner.futex_pi_original_prio;
+ }
+}
+
+fn queue_waiter(
+ queue: &mut FutexQueue,
+ target_virtaddr: VirtualAddress,
+ context_lock: &Arc<ContextLock>,
+ addr_space: &Arc<AddrSpaceWrapper>,
+ kind: FutexWaitKind,
+) {
+ queue.waiters.push(FutexEntry {
+ target_virtaddr,
+ context_lock: Arc::clone(context_lock),
+ addr_space: Arc::downgrade(addr_space),
+ kind,
+ });
+}
@@
- futexes
- .entry(locked_physaddr)
- .or_insert_with(Vec::new)
- .push(FutexEntry {
- target_virtaddr,
- context_lock: context_lock.clone(),
- addr_space: Arc::downgrade(&current_addrsp),
- });
+ let queue = futexes.entry(locked_physaddr).or_insert_with(FutexQueue::default);
+ queue_waiter(
+ queue,
+ target_virtaddr,
+ &context_lock,
+ &current_addrsp,
+ FutexWaitKind::Regular,
+ );
@@
- let remove_queue = if let Some(futexes) = futexes_map.get_mut(&target_physaddr) {
- let mut i = 0;
- let current_addrsp_weak = Arc::downgrade(&current_addrsp);
- while i < futexes.len() && woken < val {
- let futex = unsafe { futexes.get_unchecked_mut(i) };
- if futex.target_virtaddr != target_virtaddr
- || !current_addrsp_weak.ptr_eq(&futex.addr_space)
- {
- i += 1;
- continue;
- }
- futex.context_lock.write(futex_token.token()).unblock();
- futexes.swap_remove(i);
- woken += 1;
- }
- futexes.is_empty()
+ let remove_queue = if let Some(queue) = futexes_map.get_mut(&target_physaddr) {
+ let mut i = 0;
+ let current_addrsp_weak = Arc::downgrade(&current_addrsp);
+ while i < queue.waiters.len() && woken < val {
+ let waiter = match queue.waiters.get(i) {
+ Some(waiter) => waiter,
+ None => break,
+ };
+ if waiter.kind != FutexWaitKind::Regular
+ || waiter.target_virtaddr != target_virtaddr
+ || !current_addrsp_weak.ptr_eq(&waiter.addr_space)
+ {
+ i += 1;
+ continue;
+ }
+ let waiter = queue.waiters.swap_remove(i);
+ waiter.context_lock.write(futex_token.token()).unblock();
+ woken += 1;
+ }
+ queue.is_empty()
} else {
false
};
@@
- let mut source_waiters = source_map.remove(&locked_source_physaddr).unwrap_or_default();
+ let mut source_queue = source_map.remove(&locked_source_physaddr).unwrap_or_default();
@@
- total_woken = wake_from(&mut source_waiters, val, &mut futex_token);
+ total_woken = wake_from(&mut source_queue.waiters, val, &mut futex_token);
@@
- let mut target_waiters = target_map.remove(&locked_target_physaddr).unwrap_or_default();
- let mut i = 0;
- while i < source_waiters.len() && total_requeued < val2 {
- let should_move = source_waiters
+ let mut target_queue = target_map.remove(&locked_target_physaddr).unwrap_or_default();
+ let mut i = 0;
+ while i < source_queue.waiters.len() && total_requeued < val2 {
+ let should_move = source_queue
+ .waiters
.get(i)
.map(|waiter| {
- waiter.target_virtaddr == target_virtaddr
+ waiter.kind == FutexWaitKind::Regular
+ && waiter.target_virtaddr == target_virtaddr
&& current_addrsp_weak.ptr_eq(&waiter.addr_space)
})
.unwrap_or(false);
@@
- let mut waiter = source_waiters.swap_remove(i);
- waiter.target_virtaddr = target2_virtaddr;
- target_waiters.push(waiter);
+ let mut waiter = source_queue.waiters.swap_remove(i);
+ waiter.target_virtaddr = target2_virtaddr;
+ target_queue.waiters.push(waiter);
total_requeued += 1;
}
- if !target_waiters.is_empty() {
- target_map.insert(locked_target_physaddr, target_waiters);
+ if !target_queue.is_empty() {
+ target_map.insert(locked_target_physaddr, target_queue);
}
@@
- if !source_waiters.is_empty() {
- source_map.insert(locked_source_physaddr, source_waiters);
+ if !source_queue.is_empty() {
+ source_map.insert(locked_source_physaddr, source_queue);
}
@@
+ FUTEX_LOCK_PI | FUTEX_TRYLOCK_PI => {
+ let _ = validate_futex_u32_addr(addr)?;
+ let context_lock = context::current();
+ let current_tid = current_context_futex_tid(&context_lock, token);
+ let current_prio = context_lock.read(token.token()).prio;
+
+ loop {
+ let outcome = {
+ let shard = futex_shard(target_physaddr);
+ let mut futexes = FUTEXES[shard].lock(token.token());
+ let (futexes, mut futex_token) = futexes.token_split();
+ let addr_space_guard = current_addrsp.acquire_read(futex_token.downgrade());
+ let locked_physaddr = validate_and_translate_virt(&addr_space_guard, target_virtaddr)
+ .ok_or(Error::new(EFAULT))?;
+ if locked_physaddr != target_physaddr {
+ None
+ } else {
+ drop(addr_space_guard);
+ let futex_atomic = futex_atomic_u32(locked_physaddr);
+ let mut current = futex_atomic.load(Ordering::SeqCst);
+ loop {
+ let owner_tid = current & FUTEX_TID_MASK;
+ let queue = futexes.entry(locked_physaddr).or_insert_with(FutexQueue::default);
+ let desired_waiters = if queue.waiters.is_empty() { 0 } else { FUTEX_WAITERS };
+
+ if owner_tid == 0 {
+ let desired = current_tid | desired_waiters;
+ match futex_atomic.compare_exchange(current, desired, Ordering::SeqCst, Ordering::SeqCst) {
+ Ok(_) => {
+ queue.pi_owner = Some(Arc::downgrade(&context_lock));
+ break Some(Ok(Ok(0)));
+ }
+ Err(actual) => current = actual,
+ }
+ continue;
+ }
+
+ if owner_tid == current_tid {
+ break Some(Ok(Err(Error::new(EDEADLK))));
+ }
+
+ if op == FUTEX_TRYLOCK_PI {
+ break Some(Ok(Err(Error::new(EAGAIN))));
+ }
+
+ if let Some(owner_lock) = queue.pi_owner.as_ref().and_then(Weak::upgrade) {
+ boost_pi_owner(&owner_lock, current_prio, locked_physaddr, &mut futex_token);
+ }
+
+ {
+ let mut context = context_lock.write(futex_token.token());
+ if let Some((tctl, pctl, _)) = context.sigcontrol()
+ && tctl.currently_pending_unblocked(pctl) != 0
+ {
+ break Some(Ok(Err(Error::new(EINTR))));
+ }
+ context.wake = None;
+ context.block("futex_pi");
+ }
+
+ queue_waiter(
+ queue,
+ target_virtaddr,
+ &context_lock,
+ &current_addrsp,
+ FutexWaitKind::PriorityInheritance,
+ );
+ futex_atomic.fetch_or(FUTEX_WAITERS, Ordering::SeqCst);
+ break Some(Ok(Ok(1)));
+ }
+ }
+ };
+
+ match outcome {
+ None => continue,
+ Some(Ok(Ok(0))) => return Ok(0),
+ Some(Ok(Ok(_))) => context::switch(token),
+ Some(Ok(Err(err))) => return Err(err),
+ Some(Err(err)) => return Err(err),
+ }
+ }
+ }
+ FUTEX_UNLOCK_PI => {
+ let _ = validate_futex_u32_addr(addr)?;
+ let context_lock = context::current();
+ let current_tid = current_context_futex_tid(&context_lock, token);
+ let shard = futex_shard(target_physaddr);
+ let current_addrsp_weak = Arc::downgrade(&current_addrsp);
+
+ let unlocked = {
+ let mut futexes = FUTEXES[shard].lock(token.token());
+ let (futexes, mut futex_token) = futexes.token_split();
+ let addr_space_guard = current_addrsp.acquire_read(futex_token.downgrade());
+ let locked_physaddr = validate_and_translate_virt(&addr_space_guard, target_virtaddr)
+ .ok_or(Error::new(EFAULT))?;
+ if locked_physaddr != target_physaddr {
+ return Err(Error::new(EAGAIN));
+ }
+ drop(addr_space_guard);
+
+ let futex_atomic = futex_atomic_u32(locked_physaddr);
+ let current = futex_atomic.load(Ordering::SeqCst);
+ if (current & FUTEX_TID_MASK) != current_tid {
+ return Err(Error::new(EPERM));
+ }
+
+ let mut wake_one = None;
+ let mut new = current & !(FUTEX_TID_MASK | FUTEX_OWNER_DIED);
+ if let Some(queue) = futexes.get_mut(&locked_physaddr) {
+ queue.pi_owner = None;
+ let mut best = None;
+ for (idx, waiter) in queue.waiters.iter().enumerate() {
+ if waiter.kind != FutexWaitKind::PriorityInheritance
+ || waiter.target_virtaddr != target_virtaddr
+ || !current_addrsp_weak.ptr_eq(&waiter.addr_space)
+ {
+ continue;
+ }
+ let prio = waiter.context_lock.read(futex_token.token()).prio;
+ match best {
+ Some((_, best_prio)) if prio >= best_prio => {}
+ _ => best = Some((idx, prio)),
+ }
+ }
+ if let Some((waiter_idx, _)) = best {
+ wake_one = Some(queue.waiters.swap_remove(waiter_idx));
+ }
+ if !queue.waiters.is_empty() {
+ new |= FUTEX_WAITERS;
+ }
+ }
+
+ futex_atomic.store(new, Ordering::SeqCst);
+ {
+ let mut context = context_lock.write(futex_token.token());
+ restore_pi_owner(&mut context, locked_physaddr);
+ }
+ if let Some(waiter) = wake_one {
+ waiter.context_lock.write(futex_token.token()).unblock();
+ }
+ true
+ };
+
+ Ok(usize::from(unlocked))
+ }
_ => Err(Error::new(EINVAL)),
}
}
+282
View File
@@ -0,0 +1,282 @@
diff --git a/src/syscall/debug.rs b/src/syscall/debug.rs
--- a/src/syscall/debug.rs
+++ b/src/syscall/debug.rs
@@
- SYS_FUTEX => format!(
- "futex({:#X} [{:?}], {}, {}, {}, {})",
+ SYS_FUTEX => format!(
+ "futex({:#X} [{:?}], {}, {}, {}, {}, {})",
b,
UserSlice::ro(b, 4).and_then(|buf| buf.read_u32()),
c,
d,
e,
- f
+ f,
+ g,
),
diff --git a/src/syscall/futex.rs b/src/syscall/futex.rs
--- a/src/syscall/futex.rs
+++ b/src/syscall/futex.rs
@@
-use crate::syscall::{
- data::TimeSpec,
- error::{Error, Result, EAGAIN, EFAULT, EINVAL, ETIMEDOUT},
- flag::{FUTEX_WAIT, FUTEX_WAIT64, FUTEX_WAKE},
-};
+use crate::syscall::{
+ data::TimeSpec,
+ error::{Error, Result, EAGAIN, EFAULT, EINVAL, ETIMEDOUT},
+ flag::{FUTEX_REQUEUE, FUTEX_WAIT, FUTEX_WAIT64, FUTEX_WAKE},
+};
+
+const FUTEX_CMP_REQUEUE: usize = 4;
@@
pub struct FutexEntry {
@@
}
+
+fn validate_futex_u32_addr(addr: usize) -> Result<VirtualAddress> {
+ if !addr.is_multiple_of(4) {
+ return Err(Error::new(EINVAL));
+ }
+ Ok(VirtualAddress::new(addr))
+}
+
+fn lock_futex_pair<R>(
+ first_shard: usize,
+ second_shard: usize,
+ token: &mut CleanLockToken,
+ f: impl FnOnce(&mut FutexList, Option<&mut FutexList>, crate::sync::LockToken<'_, L1>) -> R,
+) -> R {
+ if first_shard == second_shard {
+ let mut guard = FUTEXES[first_shard].lock(token.token());
+ let (map, map_token) = guard.token_split();
+ return f(map, None, map_token);
+ }
+
+ let low = core::cmp::min(first_shard, second_shard);
+ let high = core::cmp::max(first_shard, second_shard);
+
+ let mut low_guard = FUTEXES[low].lock(token.token());
+ let (low_map, low_token) = low_guard.token_split();
+ let mut high_guard = unsafe { FUTEXES[high].relock(low_token) };
+ let (high_map, high_token) = high_guard.token_split();
+
+ if first_shard == low {
+ f(low_map, Some(high_map), high_token)
+ } else {
+ f(high_map, Some(low_map), high_token)
+ }
+}
@@
-pub fn futex(
- addr: usize,
- op: usize,
- val: usize,
- val2: usize,
- _addr2: usize,
- token: &mut CleanLockToken,
-) -> Result<usize> {
+pub fn futex(
+ addr: usize,
+ op: usize,
+ val: usize,
+ val2: usize,
+ addr2: usize,
+ val3: usize,
+ token: &mut CleanLockToken,
+) -> Result<usize> {
@@
- {
- // TODO: Lock ordering violation
- let mut token = unsafe { CleanLockToken::new() };
- let mut futexes = FUTEXES[futex_shard(target_physaddr)].lock(token.token());
- let (futexes, mut token) = futexes.token_split();
+ loop {
+ let shard = futex_shard(target_physaddr);
+ let queued = {
+ let mut futexes = FUTEXES[shard].lock(token.token());
+ let (futexes, mut futex_token) = futexes.token_split();
+ let addr_space_guard = current_addrsp.acquire_read(futex_token.downgrade());
+ let locked_physaddr = validate_and_translate_virt(&addr_space_guard, target_virtaddr)
+ .ok_or(Error::new(EFAULT))?;
+ if locked_physaddr != target_physaddr {
+ false
+ } else {
+ drop(addr_space_guard);
@@
- futexes
- .entry(target_physaddr)
- .or_insert_with(Vec::new)
- .push(FutexEntry {
- target_virtaddr,
- context_lock: context_lock.clone(),
- addr_space: Arc::downgrade(&current_addrsp),
- });
- }
+ futexes
+ .entry(locked_physaddr)
+ .or_insert_with(Vec::new)
+ .push(FutexEntry {
+ target_virtaddr,
+ context_lock: context_lock.clone(),
+ addr_space: Arc::downgrade(&current_addrsp),
+ });
+ true
+ }
+ };
+
+ if queued {
+ break;
+ }
+ }
@@
- drop(addr_space_guard);
-
context::switch(token);
@@
FUTEX_WAKE => {
@@
Ok(woken)
}
+ FUTEX_REQUEUE | FUTEX_CMP_REQUEUE => {
+ let _ = validate_futex_u32_addr(addr)?;
+ let target2_virtaddr = validate_futex_u32_addr(addr2)?;
+ let target2_physaddr = {
+ let addr_space_guard = current_addrsp.acquire_read(token.downgrade());
+ validate_and_translate_virt(&addr_space_guard, target2_virtaddr)
+ .ok_or(Error::new(EFAULT))?
+ };
+ let source_shard = futex_shard(target_physaddr);
+ let target_shard = futex_shard(target2_physaddr);
+ let current_addrsp_weak = Arc::downgrade(&current_addrsp);
+
+ let affected = lock_futex_pair(
+ source_shard,
+ target_shard,
+ token,
+ |source_map, target_map_opt, mut futex_token| {
+ let addr_space_guard = current_addrsp.acquire_read(futex_token.downgrade());
+ let locked_source_physaddr = validate_and_translate_virt(&addr_space_guard, target_virtaddr)
+ .ok_or(Error::new(EFAULT))?;
+ let locked_target_physaddr = validate_and_translate_virt(&addr_space_guard, target2_virtaddr)
+ .ok_or(Error::new(EFAULT))?;
+ drop(addr_space_guard);
+
+ if locked_source_physaddr != target_physaddr || locked_target_physaddr != target2_physaddr {
+ return Err(Error::new(EAGAIN));
+ }
+
+ if op == FUTEX_CMP_REQUEUE {
+ let accessible_addr = crate::memory::RmmA::phys_to_virt(locked_source_physaddr).data();
+ let current = u64::from(unsafe {
+ (*(accessible_addr as *const AtomicU32)).load(Ordering::SeqCst)
+ });
+ if current != u64::from(val3 as u32) {
+ return Err(Error::new(EAGAIN));
+ }
+ }
+
+ let mut source_waiters = source_map.remove(&locked_source_physaddr).unwrap_or_default();
+ let mut total_woken = 0;
+ let mut total_requeued = 0;
+
+ let wake_from = |waiters: &mut Vec<FutexEntry>, limit: usize, token: &mut crate::sync::LockToken<'_, L1>| {
+ let mut woken = 0;
+ let mut i = 0;
+ while i < waiters.len() && woken < limit {
+ let waiter = match waiters.get(i) {
+ Some(waiter) => waiter,
+ None => break,
+ };
+ if waiter.target_virtaddr != target_virtaddr || !current_addrsp_weak.ptr_eq(&waiter.addr_space) {
+ i += 1;
+ continue;
+ }
+ let waiter = waiters.swap_remove(i);
+ waiter.context_lock.write(token.token()).unblock();
+ woken += 1;
+ }
+ woken
+ };
+
+ total_woken = wake_from(&mut source_waiters, val, &mut futex_token);
+
+ if let Some(target_map) = target_map_opt {
+ let mut target_waiters = target_map.remove(&locked_target_physaddr).unwrap_or_default();
+ let mut i = 0;
+ while i < source_waiters.len() && total_requeued < val2 {
+ let should_move = source_waiters
+ .get(i)
+ .map(|waiter| {
+ waiter.target_virtaddr == target_virtaddr
+ && current_addrsp_weak.ptr_eq(&waiter.addr_space)
+ })
+ .unwrap_or(false);
+ if !should_move {
+ i += 1;
+ continue;
+ }
+ let mut waiter = source_waiters.swap_remove(i);
+ waiter.target_virtaddr = target2_virtaddr;
+ target_waiters.push(waiter);
+ total_requeued += 1;
+ }
+ if !target_waiters.is_empty() {
+ target_map.insert(locked_target_physaddr, target_waiters);
+ }
+ } else if locked_source_physaddr == locked_target_physaddr {
+ for waiter in source_waiters.iter_mut() {
+ if total_requeued >= val2 {
+ break;
+ }
+ if waiter.target_virtaddr == target_virtaddr && current_addrsp_weak.ptr_eq(&waiter.addr_space) {
+ waiter.target_virtaddr = target2_virtaddr;
+ total_requeued += 1;
+ }
+ }
+ } else {
+ let mut target_waiters = source_map.remove(&locked_target_physaddr).unwrap_or_default();
+ let mut i = 0;
+ while i < source_waiters.len() && total_requeued < val2 {
+ let should_move = source_waiters
+ .get(i)
+ .map(|waiter| {
+ waiter.target_virtaddr == target_virtaddr
+ && current_addrsp_weak.ptr_eq(&waiter.addr_space)
+ })
+ .unwrap_or(false);
+ if !should_move {
+ i += 1;
+ continue;
+ }
+ let mut waiter = source_waiters.swap_remove(i);
+ waiter.target_virtaddr = target2_virtaddr;
+ target_waiters.push(waiter);
+ total_requeued += 1;
+ }
+ if !target_waiters.is_empty() {
+ source_map.insert(locked_target_physaddr, target_waiters);
+ }
+ }
+
+ if !source_waiters.is_empty() {
+ source_map.insert(locked_source_physaddr, source_waiters);
+ }
+
+ Ok(total_woken + total_requeued)
+ },
+ )?;
+
+ Ok(affected)
+ }
_ => Err(Error::new(EINVAL)),
}
}
diff --git a/src/syscall/mod.rs b/src/syscall/mod.rs
--- a/src/syscall/mod.rs
+++ b/src/syscall/mod.rs
@@
- SYS_FUTEX => futex(b, c, d, e, f, token),
+ SYS_FUTEX => futex(b, c, d, e, f, g, token),
+264
View File
@@ -0,0 +1,264 @@
diff --git a/src/context/context.rs b/src/context/context.rs
--- a/src/context/context.rs
+++ b/src/context/context.rs
@@
#[allow(dead_code)]
pub futex_pi_waiters: Vec<PhysicalAddress>,
+ pub robust_list_head: Option<usize>,
@@
futex_pi_boost: false,
futex_pi_original_prio: DEFAULT_SCHED_OTHER_PRIORITY,
futex_pi_waiters: Vec::new(),
+ robust_list_head: None,
being_sigkilled: false,
diff --git a/src/syscall/debug.rs b/src/syscall/debug.rs
--- a/src/syscall/debug.rs
+++ b/src/syscall/debug.rs
@@
use crate::{sync::CleanLockToken, syscall::error::Result};
+
+const SYS_SET_ROBUST_LIST: usize = 311;
+const SYS_GET_ROBUST_LIST: usize = 312;
@@
SYS_FUTEX => format!(
"futex({:#X} [{:?}], {}, {}, {}, {}, {})",
@@
),
+ SYS_SET_ROBUST_LIST => format!("set_robust_list({:#X}, {})", b, c),
+ SYS_GET_ROBUST_LIST => format!("get_robust_list({}, {:#X}, {:#X})", b, c, d),
SYS_MKNS => format!(
diff --git a/src/syscall/futex.rs b/src/syscall/futex.rs
--- a/src/syscall/futex.rs
+++ b/src/syscall/futex.rs
@@
-use crate::syscall::{
- data::TimeSpec,
- error::{Error, Result, EAGAIN, EDEADLK, EFAULT, EINVAL, EPERM, ETIMEDOUT},
- flag::{FUTEX_REQUEUE, FUTEX_WAIT, FUTEX_WAIT64, FUTEX_WAKE},
-};
+use crate::syscall::{
+ data::TimeSpec,
+ error::{Error, Result, EAGAIN, EDEADLK, EFAULT, EINVAL, EPERM, ESRCH, ETIMEDOUT},
+ flag::{FUTEX_REQUEUE, FUTEX_WAIT, FUTEX_WAIT64, FUTEX_WAKE},
+};
+
+use super::usercopy::UserSliceWo;
@@
const FUTEX_WAITERS: u32 = 0x8000_0000;
const FUTEX_OWNER_DIED: u32 = 0x4000_0000;
const FUTEX_TID_MASK: u32 = 0x3FFF_FFFF;
+
+const ROBUST_LIST_LIMIT: usize = 2048;
+const ROBUST_LIST_HEAD_SIZE: usize = size_of::<RobustListHead>();
@@
pub struct FutexEntry {
@@
}
+
+#[derive(Clone, Copy, Debug)]
+#[repr(C)]
+struct RobustList {
+ next: usize,
+}
+
+#[derive(Clone, Copy, Debug)]
+#[repr(C)]
+struct RobustListHead {
+ list: RobustList,
+ futex_offset: isize,
+ list_op_pending: usize,
+}
@@
+fn lookup_robust_list_head(pid: usize, token: &mut CleanLockToken) -> Result<(usize, usize)> {
+ let current = context::current();
+ {
+ let current_guard = current.read(token.token());
+ if pid == 0 || current_guard.pid == pid {
+ return Ok((current_guard.robust_list_head.unwrap_or(0), ROBUST_LIST_HEAD_SIZE));
+ }
+ }
+
+ let mut token_ref = token.token();
+ let mut contexts = context::contexts(token_ref.downgrade());
+ let (contexts, mut contexts_token) = contexts.token_split();
+ for context_ref in contexts.iter() {
+ let context = context_ref.read(contexts_token.token());
+ if context.pid == pid {
+ return Ok((context.robust_list_head.unwrap_or(0), ROBUST_LIST_HEAD_SIZE));
+ }
+ }
+
+ Err(Error::new(ESRCH))
+}
+
+fn walk_robust_list_node(
+ node_ptr: usize,
+ futex_offset: isize,
+ owner_tid: u32,
+ token: &mut CleanLockToken,
+) {
+ if node_ptr == 0 {
+ return;
+ }
+
+ let Ok(futex_addr) = node_ptr.checked_add_signed(futex_offset).ok_or(Error::new(EFAULT)) else {
+ return;
+ };
+ let Ok(target_virtaddr) = validate_futex_u32_addr(futex_addr) else {
+ return;
+ };
+
+ let current_addrsp = match AddrSpace::current() {
+ Ok(addrsp) => addrsp,
+ Err(_) => return,
+ };
+
+ let shard = futex_shard(validate_and_translate_virt(
+ &current_addrsp.acquire_read(token.downgrade()),
+ target_virtaddr,
+ ).ok_or(Error::new(EFAULT)).unwrap_or_else(|_| return));
+
+ let mut futexes = FUTEXES[shard].lock(token.token());
+ let (futexes, mut futex_token) = futexes.token_split();
+ let addr_space_guard = current_addrsp.acquire_read(futex_token.downgrade());
+ let Some(locked_physaddr) = validate_and_translate_virt(&addr_space_guard, target_virtaddr) else {
+ return;
+ };
+ drop(addr_space_guard);
+
+ let futex_atomic = futex_atomic_u32(locked_physaddr);
+ let current = futex_atomic.load(Ordering::SeqCst);
+ if (current & FUTEX_TID_MASK) != owner_tid {
+ return;
+ }
+
+ let mut new = (current & FUTEX_WAITERS) | FUTEX_OWNER_DIED;
+ if let Some(queue) = futexes.get_mut(&locked_physaddr) {
+ queue.pi_owner = None;
+ let mut woke = false;
+ let mut i = 0;
+ while i < queue.waiters.len() && !woke {
+ let waiter = match queue.waiters.get(i) {
+ Some(waiter) => waiter,
+ None => break,
+ };
+ if waiter.target_virtaddr != target_virtaddr || !Arc::downgrade(&current_addrsp).ptr_eq(&waiter.addr_space) {
+ i += 1;
+ continue;
+ }
+ let waiter = queue.waiters.swap_remove(i);
+ waiter.context_lock.write(futex_token.token()).unblock();
+ woke = true;
+ }
+ if !queue.waiters.is_empty() {
+ new |= FUTEX_WAITERS;
+ }
+ }
+
+ futex_atomic.store(new, Ordering::SeqCst);
+}
+
+pub fn cleanup_current_robust_futexes(token: &mut CleanLockToken) {
+ let context_lock = context::current();
+ let (head_ptr, owner_tid) = {
+ let context = context_lock.read(token.token());
+ let Some(head_ptr) = context.robust_list_head else {
+ return;
+ };
+ (head_ptr, context_futex_tid(&context))
+ };
+
+ let Ok(head) = UserSlice::ro(head_ptr, ROBUST_LIST_HEAD_SIZE)
+ .and_then(|slice| unsafe { slice.read_exact::<RobustListHead>() })
+ else {
+ return;
+ };
+
+ let mut next = head.list.next;
+ let mut walked = 0;
+ while next != 0 && next != head_ptr && walked < ROBUST_LIST_LIMIT {
+ let node_ptr = next;
+ let Ok(node) = UserSlice::ro(node_ptr, size_of::<RobustList>())
+ .and_then(|slice| unsafe { slice.read_exact::<RobustList>() })
+ else {
+ break;
+ };
+ walk_robust_list_node(node_ptr, head.futex_offset, owner_tid, token);
+ next = node.next;
+ walked += 1;
+ }
+
+ if head.list_op_pending != 0 {
+ walk_robust_list_node(head.list_op_pending, head.futex_offset, owner_tid, token);
+ }
+}
+
+pub fn set_robust_list(head: usize, len: usize, token: &mut CleanLockToken) -> Result<()> {
+ if len != ROBUST_LIST_HEAD_SIZE {
+ return Err(Error::new(EINVAL));
+ }
+ if head != 0 {
+ UserSlice::ro(head, ROBUST_LIST_HEAD_SIZE)?;
+ }
+
+ let current = context::current();
+ current.write(token.token()).robust_list_head = (head != 0).then_some(head);
+ Ok(())
+}
+
+pub fn get_robust_list(pid: usize, head_ptr: usize, len_ptr: usize, token: &mut CleanLockToken) -> Result<()> {
+ let (head, len) = lookup_robust_list_head(pid, token)?;
+ UserSliceWo::wo(head_ptr, size_of::<usize>())?.write_usize(head)?;
+ UserSliceWo::wo(len_ptr, size_of::<usize>())?.write_usize(len)?;
+ Ok(())
+}
diff --git a/src/syscall/mod.rs b/src/syscall/mod.rs
--- a/src/syscall/mod.rs
+++ b/src/syscall/mod.rs
@@
-pub use self::{
- fs::*,
- futex::futex,
- process::*,
- time::*,
- usercopy::validate_region,
-};
+pub use self::{
+ fs::*,
+ futex::{futex, get_robust_list, set_robust_list},
+ process::*,
+ time::*,
+ usercopy::validate_region,
+};
@@
+const SYS_SET_ROBUST_LIST: usize = 311;
+const SYS_GET_ROBUST_LIST: usize = 312;
@@
SYS_CLOCK_GETTIME => {
clock_gettime(b, UserSlice::wo(c, size_of::<TimeSpec>())?, token).map(|()| 0)
}
SYS_FUTEX => futex(b, c, d, e, f, g, token),
+ SYS_SET_ROBUST_LIST => set_robust_list(b, c, token).map(|()| 0),
+ SYS_GET_ROBUST_LIST => get_robust_list(b, c, d, token).map(|()| 0),
SYS_MPROTECT => mprotect(b, c, MapFlags::from_bits_truncate(d), token).map(|()| 0),
diff --git a/src/syscall/process.rs b/src/syscall/process.rs
--- a/src/syscall/process.rs
+++ b/src/syscall/process.rs
@@
pub fn exit_this_context(excp: Option<syscall::Exception>, token: &mut CleanLockToken) -> ! {
let mut close_files;
let addrspace_opt;
+ super::futex::cleanup_current_robust_futexes(token);
+
let context_lock = context::current();
{
let mut context = context_lock.write(token.token());
@@
addrspace_opt = context
.set_addr_space(None, token.downgrade())
.and_then(|a| Arc::try_unwrap(a).ok());
+ context.robust_list_head = None;
drop(mem::replace(&mut context.syscall_head, SyscallFrame::Dummy));
drop(mem::replace(&mut context.syscall_tail, SyscallFrame::Dummy));
@@ -0,0 +1,56 @@
diff --git a/src/context/mod.rs b/src/context/mod.rs
--- a/src/context/mod.rs
+++ b/src/context/mod.rs
@@ -10,9 +10,9 @@ use core::{num::NonZeroUsize, ops::Deref};
use crate::{
context::memory::AddrSpaceWrapper,
- cpu_set::LogicalCpuSet,
+ cpu_set::{LogicalCpuId, LogicalCpuSet},
memory::{RmmA, RmmArch, TableKind},
- percpu::PercpuBlock,
+ percpu::{get_percpu_block, PercpuBlock},
sync::{
ArcRwLockWriteGuard, CleanLockToken, LockToken, Mutex, MutexGuard, RwLock, RwLockReadGuard,
RwLockWriteGuard, L0, L1, L2, L4,
@@ -118,6 +118,30 @@ pub fn run_contexts(token: LockToken<'_, L0>) -> MutexGuard<'_, L1, RunContextDa
RUN_CONTEXTS.lock(token)
}
+fn least_loaded_cpu() -> LogicalCpuId {
+ let current_cpu = crate::cpu_id();
+ let mut best_cpu = current_cpu;
+ let mut best_depth = usize::MAX;
+
+ for raw_id in 0..crate::cpu_count() {
+ let cpu_id = LogicalCpuId::new(raw_id);
+ let Some(percpu) = get_percpu_block(cpu_id) else {
+ continue;
+ };
+
+ percpu.sched.take_lock();
+ let depth = unsafe { percpu.sched.queues().iter().map(|queue| queue.len()).sum() };
+ percpu.sched.release_lock();
+
+ if depth < best_depth {
+ best_depth = depth;
+ best_cpu = cpu_id;
+ }
+ }
+
+ best_cpu
+}
+
pub fn init(token: &mut CleanLockToken) {
let owner = None; // kmain not owned by any fd
let mut context = Context::new(owner).expect("failed to create kmain context");
@@ -238,6 +262,9 @@ pub fn spawn(
context.kstack = Some(stack);
context.userspace = userspace_allowed;
+ let target_cpu = least_loaded_cpu();
+ context.sched_affinity = LogicalCpuSet::empty();
+ context.sched_affinity.atomic_set(target_cpu);
let context_lock = Arc::new(ContextLock::new(context));
let context_ref = ContextRef(Arc::clone(&context_lock));
+146
View File
@@ -0,0 +1,146 @@
diff --git a/src/percpu.rs b/src/percpu.rs
--- a/src/percpu.rs
+++ b/src/percpu.rs
@@ -29,12 +29,14 @@ pub struct PerCpuSched {
pub run_queues_lock: AtomicBool,
pub balance: Cell<[usize; RUN_QUEUE_COUNT]>,
pub last_queue: Cell<usize>,
+ pub last_balance_time: Cell<u128>,
}
impl PerCpuSched {
pub const fn new() -> Self {
const EMPTY: VecDeque<WeakContextRef> = VecDeque::new();
Self {
run_queues: SyncUnsafeCell::new([EMPTY; RUN_QUEUE_COUNT]),
run_queues_lock: AtomicBool::new(false),
balance: Cell::new([0; RUN_QUEUE_COUNT]),
last_queue: Cell::new(0),
+ last_balance_time: Cell::new(0),
}
}
diff --git a/src/context/switch.rs b/src/context/switch.rs
--- a/src/context/switch.rs
+++ b/src/context/switch.rs
@@ -33,6 +33,8 @@ const SCHED_PRIO_TO_WEIGHT: [usize; 40] = [
70, 56, 45, 36, 29, 23, 18, 15,
];
+const LOAD_BALANCE_INTERVAL_NS: u128 = 100_000_000;
+
static SCHED_STEAL_COUNT: AtomicUsize = AtomicUsize::new(0);
@@ -101,6 +103,9 @@ pub fn tick(token: &mut CleanLockToken) {
let new_ticks = ticks_cell.get() + 1;
ticks_cell.set(new_ticks);
+ let balance_time = crate::time::monotonic(token);
+ maybe_balance_queues(token, percpu, balance_time);
+
// Trigger a context switch after every 3 ticks.
if new_ticks >= 3 {
switch(token);
@@ -427,6 +432,92 @@ fn steal_work(
None
}
+
+fn queue_depth(percpu: &PercpuBlock) -> usize {
+ let mut sched_lock = SchedQueuesLock::new(&percpu.sched);
+ unsafe {
+ sched_lock
+ .queues_mut()
+ .iter()
+ .map(|queue| queue.len())
+ .sum()
+ }
+}
+
+fn migrate_one_context(
+ token: &mut CleanLockToken,
+ source_id: LogicalCpuId,
+ target_id: LogicalCpuId,
+ switch_time: u128,
+) -> bool {
+ let Some(source) = get_percpu_block(source_id) else {
+ return false;
+ };
+ let Some(target) = get_percpu_block(target_id) else {
+ return false;
+ };
+
+ let source_idle = source.switch_internals.idle_context();
+ let moved = {
+ let mut source_lock = SchedQueuesLock::new(&source.sched);
+ let source_queues = unsafe { source_lock.queues_mut() };
+ pop_movable_context(token, source_queues, target_id, switch_time, &source_idle)
+ };
+
+ let Some((prio, context_ref)) = moved else {
+ return false;
+ };
+
+ let mut target_lock = SchedQueuesLock::new(&target.sched);
+ unsafe {
+ target_lock.queues_mut()[prio].push_back(context_ref);
+ }
+ true
+}
+
+fn maybe_balance_queues(token: &mut CleanLockToken, percpu: &PercpuBlock, balance_time: u128) {
+ if crate::cpu_count() <= 1 || percpu.cpu_id != LogicalCpuId::BSP {
+ return;
+ }
+ if balance_time.saturating_sub(percpu.sched.last_balance_time.get()) < LOAD_BALANCE_INTERVAL_NS
+ {
+ return;
+ }
+
+ percpu.sched.last_balance_time.set(balance_time);
+
+ let mut depths = Vec::new();
+ let mut total_depth = 0usize;
+ for raw_id in 0..crate::cpu_count() {
+ let cpu_id = LogicalCpuId::new(raw_id);
+ let Some(cpu_percpu) = get_percpu_block(cpu_id) else {
+ continue;
+ };
+ let depth = queue_depth(cpu_percpu);
+ total_depth += depth;
+ depths.push((cpu_id, depth));
+ }
+
+ if depths.len() <= 1 || total_depth == 0 {
+ return;
+ }
+
+ let avg_depth = (total_depth + depths.len().saturating_sub(1)) / depths.len();
+
+ for target_index in 0..depths.len() {
+ if depths[target_index].1 != 0 {
+ continue;
+ }
+
+ let mut source_index = None;
+ let mut source_depth = 0usize;
+ for (idx, &(_, depth)) in depths.iter().enumerate() {
+ if idx == target_index {
+ continue;
+ }
+ if depth > avg_depth + 1 && depth > source_depth {
+ source_index = Some(idx);
+ source_depth = depth;
+ }
+ }
+
+ let Some(source_index) = source_index else {
+ continue;
+ };
+
+ let source_id = depths[source_index].0;
+ let target_id = depths[target_index].0;
+ if migrate_one_context(token, source_id, target_id, balance_time) {
+ depths[source_index].1 = depths[source_index].1.saturating_sub(1);
+ depths[target_index].1 += 1;
+ }
+ }
+}
+123
View File
@@ -0,0 +1,123 @@
diff --git a/src/percpu.rs b/src/percpu.rs
index f4ad5e6..da10036 100644
--- a/src/percpu.rs
+++ b/src/percpu.rs
@@ -1,9 +1,10 @@
use alloc::{
+ collections::VecDeque,
sync::{Arc, Weak},
vec::Vec,
};
use core::{
- cell::{Cell, RefCell},
+ cell::{Cell, RefCell, SyncUnsafeCell},
sync::atomic::{AtomicBool, AtomicPtr, Ordering},
};
@@ -12,7 +13,10 @@ use syscall::PtraceFlags;
use crate::{
arch::device::ArchPercpuMisc,
- context::{empty_cr3, memory::AddrSpaceWrapper, switch::ContextSwitchPercpu},
+ context::{
+ empty_cr3, memory::AddrSpaceWrapper, switch::ContextSwitchPercpu, WeakContextRef,
+ RUN_QUEUE_COUNT,
+ },
cpu_set::{LogicalCpuId, MAX_CPU_COUNT},
cpu_stats::{CpuStats, CpuStatsData},
ptrace::Session,
@@ -20,6 +24,58 @@ use crate::{
syscall::debug::SyscallDebugInfo,
};
+#[allow(dead_code)]
+pub struct PerCpuSched {
+ pub run_queues: SyncUnsafeCell<[VecDeque<WeakContextRef>; RUN_QUEUE_COUNT]>,
+ pub run_queues_lock: AtomicBool,
+ pub balance: Cell<[usize; RUN_QUEUE_COUNT]>,
+ pub last_queue: Cell<usize>,
+ pub last_balance_time: Cell<u128>,
+}
+
+impl PerCpuSched {
+ pub const fn new() -> Self {
+ const EMPTY: VecDeque<WeakContextRef> = VecDeque::new();
+ Self {
+ run_queues: SyncUnsafeCell::new([EMPTY; RUN_QUEUE_COUNT]),
+ run_queues_lock: AtomicBool::new(false),
+ balance: Cell::new([0; RUN_QUEUE_COUNT]),
+ last_queue: Cell::new(0),
+ last_balance_time: Cell::new(0),
+ }
+ }
+
+ pub fn take_lock(&self) {
+ while self
+ .run_queues_lock
+ .compare_exchange(false, true, Ordering::Acquire, Ordering::Relaxed)
+ .is_err()
+ {
+ while self.run_queues_lock.load(Ordering::Relaxed) {
+ core::hint::spin_loop();
+ }
+ }
+ }
+
+ pub fn release_lock(&self) {
+ self.run_queues_lock.store(false, Ordering::Release);
+ }
+
+ /// # Safety
+ ///
+ /// The caller must hold `run_queues_lock` while accessing the returned reference.
+ pub unsafe fn queues(&self) -> &[VecDeque<WeakContextRef>; RUN_QUEUE_COUNT] {
+ unsafe { &*self.run_queues.get() }
+ }
+
+ /// # Safety
+ ///
+ /// The caller must hold `run_queues_lock` while accessing the returned reference.
+ pub unsafe fn queues_mut(&self) -> &mut [VecDeque<WeakContextRef>; RUN_QUEUE_COUNT] {
+ unsafe { &mut *self.run_queues.get() }
+ }
+}
+
/// The percpu block, that stored all percpu variables.
pub struct PercpuBlock {
/// A unique immutable number that identifies the current CPU - used for scheduling
@@ -31,8 +87,8 @@ pub struct PercpuBlock {
pub current_addrsp: RefCell<Option<Arc<AddrSpaceWrapper>>>,
pub new_addrsp_tmp: Cell<Option<Arc<AddrSpaceWrapper>>>,
pub wants_tlb_shootdown: AtomicBool,
- pub balance: Cell<[usize; 40]>,
- pub last_queue: Cell<usize>,
+
+ pub sched: PerCpuSched,
// TODO: Put mailbox queues here, e.g. for TLB shootdown? Just be sure to 128-byte align it
// first to avoid cache invalidation.
@@ -57,6 +113,14 @@ pub unsafe fn init_tlb_shootdown(id: LogicalCpuId, block: *mut PercpuBlock) {
ALL_PERCPU_BLOCKS[id.get() as usize].store(block, Ordering::Release)
}
+pub fn get_percpu_block(id: LogicalCpuId) -> Option<&'static PercpuBlock> {
+ unsafe {
+ ALL_PERCPU_BLOCKS[id.get() as usize]
+ .load(Ordering::Acquire)
+ .as_ref()
+ }
+}
+
pub fn get_all_stats() -> Vec<(LogicalCpuId, CpuStatsData)> {
let mut res = ALL_PERCPU_BLOCKS
.iter()
@@ -187,8 +251,7 @@ impl PercpuBlock {
current_addrsp: RefCell::new(None),
new_addrsp_tmp: Cell::new(None),
wants_tlb_shootdown: AtomicBool::new(false),
- balance: Cell::new([0; 40]),
- last_queue: Cell::new(39),
+ sched: PerCpuSched::new(),
ptrace_flags: Cell::new(PtraceFlags::empty()),
ptrace_session: RefCell::new(None),
inside_syscall: Cell::new(false),
+985
View File
@@ -0,0 +1,985 @@
diff --git a/src/context/switch.rs b/src/context/switch.rs
index 86684c8..d054734 100644
--- a/src/context/switch.rs
+++ b/src/context/switch.rs
@@ -5,18 +5,18 @@
use crate::{
context::{
self, arch, idle_contexts, idle_contexts_try, run_contexts, ArcContextLockWriteGuard,
- Context, ContextLock, WeakContextRef,
+ Context, ContextLock, SchedPolicy, WeakContextRef, RUN_QUEUE_COUNT,
},
- cpu_set::LogicalCpuId,
+ cpu_set::{LogicalCpuId, LogicalCpuSet},
cpu_stats::{self, CpuState},
- percpu::PercpuBlock,
- sync::{ArcRwLockWriteGuard, CleanLockToken, L4},
+ percpu::{get_percpu_block, PerCpuSched, PercpuBlock},
+ sync::{ArcRwLockWriteGuard, CleanLockToken, LockToken, L1, L4},
};
use alloc::{sync::Arc, vec::Vec};
use core::{
cell::{Cell, RefCell},
hint, mem,
- sync::atomic::Ordering,
+ sync::atomic::{AtomicUsize, Ordering},
};
use syscall::PtraceFlags;
@@ -33,35 +33,49 @@ const SCHED_PRIO_TO_WEIGHT: [usize; 40] = [
70, 56, 45, 36, 29, 23, 18, 15,
];
-/// Determines if a given context is eligible to be scheduled on a given CPU (in
-/// principle, the current CPU).
-///
-/// # Safety
-/// This function is unsafe because it modifies the `context`'s state directly without synchronization.
-///
-/// # Parameters
-/// - `context`: The context (process/thread) to be checked.
-/// - `cpu_id`: The logical ID of the CPU on which the context is being scheduled.
-///
-/// # Returns
-/// - `UpdateResult::CanSwitch`: If the context can be switched to.
-/// - `UpdateResult::Skip`: If the context should be skipped (e.g., it's running on another CPU).
+const LOAD_BALANCE_INTERVAL_NS: u128 = 100_000_000;
+
+static SCHED_STEAL_COUNT: AtomicUsize = AtomicUsize::new(0);
+
+struct SchedQueuesLock<'a> {
+ sched: &'a PerCpuSched,
+}
+
+impl<'a> SchedQueuesLock<'a> {
+ fn new(sched: &'a PerCpuSched) -> Self {
+ sched.take_lock();
+ Self { sched }
+ }
+
+ unsafe fn queues_mut(
+ &mut self,
+ ) -> &mut [alloc::collections::VecDeque<WeakContextRef>; RUN_QUEUE_COUNT] {
+ unsafe { self.sched.queues_mut() }
+ }
+}
+
+impl Drop for SchedQueuesLock<'_> {
+ fn drop(&mut self) {
+ self.sched.release_lock();
+ }
+}
+
+fn assign_context_to_cpu(context: &mut Context, cpu_id: LogicalCpuId) {
+ context.sched_affinity = LogicalCpuSet::empty();
+ context.sched_affinity.atomic_set(cpu_id);
+}
+
unsafe fn update_runnable(
context: &mut Context,
cpu_id: LogicalCpuId,
switch_time: u128,
) -> UpdateResult {
- // Ignore contexts that are already running.
if context.running {
return UpdateResult::Skip;
}
-
- // Ignore contexts assigned to other CPUs.
if !context.sched_affinity.contains(cpu_id) {
return UpdateResult::Skip;
}
-
- // If context is soft-blocked and has a wake-up time, check if it should wake up.
if context.status.is_soft_blocked()
&& let Some(wake) = context.wake
&& switch_time >= wake
@@ -69,8 +83,6 @@ unsafe fn update_runnable(
context.wake = None;
context.unblock_no_ipi();
}
-
- // If the context is runnable, indicate it can be switched to.
if context.status.is_runnable() {
UpdateResult::CanSwitch
} else {
@@ -90,12 +102,16 @@ struct SwitchResultInner {
///
/// The function also calls the signal handler after switching contexts.
pub fn tick(token: &mut CleanLockToken) {
- let ticks_cell = &PercpuBlock::current().switch_internals.pit_ticks;
+ let percpu = PercpuBlock::current();
+ let ticks_cell = &percpu.switch_internals.pit_ticks;
let new_ticks = ticks_cell.get() + 1;
ticks_cell.set(new_ticks);
- // Trigger a context switch after every 3 ticks (approx. 6.75 ms).
+ let balance_time = crate::time::monotonic(token);
+ maybe_balance_queues(token, percpu, balance_time);
+
+ // Trigger a context switch after every 3 ticks.
if new_ticks >= 3 {
switch(token);
crate::context::signal::signal_handler(token);
@@ -167,22 +183,12 @@ pub fn switch(token: &mut CleanLockToken) -> SwitchResult {
let mut prev_context_guard = unsafe { prev_context_lock.write_arc() };
if !prev_context_guard.is_preemptable() {
- // Unset global lock
arch::CONTEXT_SWITCH_LOCK.store(false, Ordering::SeqCst);
-
- // Pretend to have finished switching, so CPU is not idled
return SwitchResult::Switched;
}
// Alarm (previously in update_runnable)
- let wakeups = wakeup_contexts(token, switch_time);
-
- if wakeups.len() > 0 {
- let mut run_contexts = run_contexts(token.token());
- for (prio, context_lock) in wakeups {
- run_contexts.set[prio].push_back(context_lock);
- }
- }
+ wakeup_contexts(token, percpu, switch_time);
let cpu_id = crate::cpu_id();
@@ -213,6 +219,7 @@ pub fn switch(token: &mut CleanLockToken) -> SwitchResult {
// Set the previous context as "not running"
prev_context.running = false;
+ prev_context.last_cpu = prev_context.cpu_id;
// Set the next context as "running"
next_context.running = true;
@@ -222,6 +229,14 @@ pub fn switch(token: &mut CleanLockToken) -> SwitchResult {
// Update times
if !was_idle {
prev_context.cpu_time += switch_time.saturating_sub(prev_context.switch_time);
+ if prev_context.sched_policy == SchedPolicy::Other {
+ let actual_ns = switch_time.saturating_sub(prev_context.switch_time);
+ let weight =
+ SCHED_PRIO_TO_WEIGHT[prev_context.sched_static_prio.min(39)] as u128;
+ let default_weight = SCHED_PRIO_TO_WEIGHT[20] as u128;
+ let delta = actual_ns.saturating_mul(default_weight) / weight.max(1);
+ prev_context.vruntime = prev_context.vruntime.saturating_add(delta);
+ }
}
next_context.switch_time = switch_time;
if next_context.userspace {
@@ -302,13 +317,234 @@ pub fn switch(token: &mut CleanLockToken) -> SwitchResult {
}
}
-fn wakeup_contexts(token: &mut CleanLockToken, switch_time: u128) -> Vec<(usize, WeakContextRef)> {
+fn queue_previous_context(
+ token: &mut CleanLockToken,
+ percpu: &PercpuBlock,
+ prev_context_lock: &Arc<ContextLock>,
+ prev_context_guard: &ArcRwLockWriteGuard<L4, Context>,
+ idle_context: &Arc<ContextLock>,
+) {
+ if Arc::ptr_eq(prev_context_lock, idle_context) {
+ return;
+ }
+
+ let prev_ctx = WeakContextRef(Arc::downgrade(prev_context_lock));
+ if prev_context_guard.status.is_runnable() {
+ let prio = prev_context_guard.prio;
+ let mut sched_lock = SchedQueuesLock::new(&percpu.sched);
+ unsafe {
+ sched_lock.queues_mut()[prio].push_back(prev_ctx);
+ }
+ } else {
+ idle_contexts(token.downgrade()).push_back(prev_ctx);
+ }
+}
+
+fn pop_movable_context(
+ token: &mut CleanLockToken,
+ queues: &mut [alloc::collections::VecDeque<WeakContextRef>; RUN_QUEUE_COUNT],
+ target_cpu: LogicalCpuId,
+ switch_time: u128,
+ idle_context: &Arc<ContextLock>,
+) -> Option<(usize, WeakContextRef)> {
+ for prio in 0..RUN_QUEUE_COUNT {
+ let len = queues[prio].len();
+ for _ in 0..len {
+ let Some(context_ref) = queues[prio].pop_front() else {
+ break;
+ };
+ let Some(context_lock) = context_ref.upgrade() else {
+ continue;
+ };
+ if Arc::ptr_eq(&context_lock, idle_context) {
+ queues[prio].push_back(context_ref);
+ continue;
+ }
+
+ let mut context_guard = unsafe { context_lock.write_arc() };
+ let sw = unsafe { update_stealable(&mut context_guard, switch_time) };
+ if let UpdateResult::CanSwitch = sw {
+ assign_context_to_cpu(&mut context_guard, target_cpu);
+ let moved_ref = WeakContextRef(Arc::downgrade(ArcContextLockWriteGuard::rwlock(
+ &context_guard,
+ )));
+ drop(context_guard);
+ return Some((prio, moved_ref));
+ }
+
+ if matches!(sw, UpdateResult::Blocked) {
+ idle_contexts(token.downgrade()).push_back(context_ref);
+ } else {
+ queues[prio].push_back(context_ref);
+ }
+ }
+ }
+
+ None
+}
+
+fn steal_work(
+ token: &mut CleanLockToken,
+ cpu_id: LogicalCpuId,
+ switch_time: u128,
+) -> Option<ArcContextLockWriteGuard> {
+ let cpu_count = crate::cpu_count();
+ if cpu_count <= 1 {
+ return None;
+ }
+
+ for offset in 1..cpu_count {
+ let victim_id = LogicalCpuId::new((cpu_id.get() + offset) % cpu_count);
+ let Some(victim) = get_percpu_block(victim_id) else {
+ continue;
+ };
+
+ let victim_idle = victim.switch_internals.idle_context();
+ let mut victim_lock = SchedQueuesLock::new(&victim.sched);
+ let victim_queues = unsafe { victim_lock.queues_mut() };
+
+ for prio in 0..RUN_QUEUE_COUNT {
+ let len = victim_queues[prio].len();
+ for _ in 0..len {
+ let Some(context_ref) = victim_queues[prio].pop_front() else {
+ break;
+ };
+ let Some(context_lock) = context_ref.upgrade() else {
+ continue;
+ };
+ if Arc::ptr_eq(&context_lock, &victim_idle) {
+ victim_queues[prio].push_back(context_ref);
+ continue;
+ }
+
+ let mut context_guard = unsafe { context_lock.write_arc() };
+ let sw = unsafe { update_stealable(&mut context_guard, switch_time) };
+ if let UpdateResult::CanSwitch = sw {
+ assign_context_to_cpu(&mut context_guard, cpu_id);
+ SCHED_STEAL_COUNT.fetch_add(1, Ordering::Relaxed);
+ return Some(context_guard);
+ }
+
+ if matches!(sw, UpdateResult::Blocked) {
+ idle_contexts(token.downgrade()).push_back(context_ref);
+ } else {
+ victim_queues[prio].push_back(context_ref);
+ }
+ }
+ }
+ }
+
+ None
+}
+
+fn queue_depth(percpu: &PercpuBlock) -> usize {
+ let mut sched_lock = SchedQueuesLock::new(&percpu.sched);
+ unsafe {
+ sched_lock
+ .queues_mut()
+ .iter()
+ .map(|queue| queue.len())
+ .sum()
+ }
+}
+
+fn migrate_one_context(
+ token: &mut CleanLockToken,
+ source_id: LogicalCpuId,
+ target_id: LogicalCpuId,
+ switch_time: u128,
+) -> bool {
+ let Some(source) = get_percpu_block(source_id) else {
+ return false;
+ };
+ let Some(target) = get_percpu_block(target_id) else {
+ return false;
+ };
+
+ let source_idle = source.switch_internals.idle_context();
+ let moved = {
+ let mut source_lock = SchedQueuesLock::new(&source.sched);
+ let source_queues = unsafe { source_lock.queues_mut() };
+ pop_movable_context(token, source_queues, target_id, switch_time, &source_idle)
+ };
+
+ let Some((prio, context_ref)) = moved else {
+ return false;
+ };
+
+ let mut target_lock = SchedQueuesLock::new(&target.sched);
+ unsafe {
+ target_lock.queues_mut()[prio].push_back(context_ref);
+ }
+ true
+}
+
+fn maybe_balance_queues(token: &mut CleanLockToken, percpu: &PercpuBlock, balance_time: u128) {
+ if crate::cpu_count() <= 1 || percpu.cpu_id != LogicalCpuId::BSP {
+ return;
+ }
+ if balance_time.saturating_sub(percpu.sched.last_balance_time.get()) < LOAD_BALANCE_INTERVAL_NS
+ {
+ return;
+ }
+
+ percpu.sched.last_balance_time.set(balance_time);
+
+ let mut depths = Vec::new();
+ let mut total_depth = 0usize;
+ for raw_id in 0..crate::cpu_count() {
+ let cpu_id = LogicalCpuId::new(raw_id);
+ let Some(cpu_percpu) = get_percpu_block(cpu_id) else {
+ continue;
+ };
+ let depth = queue_depth(cpu_percpu);
+ total_depth += depth;
+ depths.push((cpu_id, depth));
+ }
+
+ if depths.len() <= 1 || total_depth == 0 {
+ return;
+ }
+
+ let avg_depth = (total_depth + depths.len().saturating_sub(1)) / depths.len();
+
+ for target_index in 0..depths.len() {
+ if depths[target_index].1 != 0 {
+ continue;
+ }
+
+ let mut source_index = None;
+ let mut source_depth = 0usize;
+ for (idx, &(_, depth)) in depths.iter().enumerate() {
+ if idx == target_index {
+ continue;
+ }
+ if depth > avg_depth + 1 && depth > source_depth {
+ source_index = Some(idx);
+ source_depth = depth;
+ }
+ }
+
+ let Some(source_index) = source_index else {
+ continue;
+ };
+
+ let source_id = depths[source_index].0;
+ let target_id = depths[target_index].0;
+ if migrate_one_context(token, source_id, target_id, balance_time) {
+ depths[source_index].1 = depths[source_index].1.saturating_sub(1);
+ depths[target_index].1 += 1;
+ }
+ }
+}
+
+fn wakeup_contexts(token: &mut CleanLockToken, percpu: &PercpuBlock, switch_time: u128) {
// TODO: Optimise this somehow. Perhaps using a separate timer queue?
let mut wakeups = Vec::new();
let current_context = context::current();
let Some(idle_contexts) = idle_contexts_try(token.downgrade()) else {
// other cpus may spawning or killing contexts so let's skip wakeups to avoid contention
- return wakeups;
+ return;
};
let (mut idle_contexts, mut token) = idle_contexts.into_split();
let len = idle_contexts.len();
@@ -327,15 +563,14 @@ fn wakeup_contexts(token: &mut CleanLockToken, switch_time: u128) -> Vec<(usize,
idle_contexts.push_back(context_ref);
continue;
};
- if guard.status.is_soft_blocked() {
- if let Some(wake) = guard.wake {
- if switch_time >= wake {
- let prio = guard.prio;
- drop(guard);
- wakeups.push((prio, context_ref));
- continue;
- }
- }
+ if guard.status.is_soft_blocked()
+ && let Some(wake) = guard.wake
+ && switch_time >= wake
+ {
+ let prio = guard.prio;
+ drop(guard);
+ wakeups.push((prio, context_ref));
+ continue;
}
if guard.status.is_runnable() && !guard.running {
@@ -348,43 +583,127 @@ fn wakeup_contexts(token: &mut CleanLockToken, switch_time: u128) -> Vec<(usize,
drop(guard);
idle_contexts.push_back(context_ref);
}
- wakeups
+
+ if wakeups.is_empty() {
+ return;
+ }
+
+ let mut sched_lock = SchedQueuesLock::new(&percpu.sched);
+ let run_queues = unsafe { sched_lock.queues_mut() };
+ for (prio, context_ref) in wakeups {
+ if let Some(context_lock) = context_ref.upgrade() {
+ let mut context_guard = unsafe { context_lock.write_arc() };
+ assign_context_to_cpu(&mut context_guard, percpu.cpu_id);
+ }
+ run_queues[prio].push_back(context_ref);
+ }
}
-/// This is the scheduler function which currently utilises Deficit Weighted Round Robin Scheduler
-fn select_next_context(
+fn pick_next_from_queues(
token: &mut CleanLockToken,
- percpu: &PercpuBlock,
+ contexts_list: &mut [alloc::collections::VecDeque<WeakContextRef>; RUN_QUEUE_COUNT],
cpu_id: LogicalCpuId,
switch_time: u128,
- was_idle: bool,
- prev_context_guard: &mut ArcRwLockWriteGuard<L4, Context>,
-) -> Result<Option<ArcContextLockWriteGuard>, SwitchResult> {
- let contexts_data = run_contexts(token.token());
- let (mut contexts_data, mut token) = contexts_data.into_split();
- let contexts_list = &mut contexts_data.set;
- let idle_context = percpu.switch_internals.idle_context();
- let mut balance = percpu.balance.get();
- let mut i = percpu.last_queue.get() % 40;
-
- // Lock the previous context.
- let prev_context_lock = crate::context::current();
-
+ prev_context_lock: &Arc<ContextLock>,
+ idle_context: &Arc<ContextLock>,
+ balance: &mut [usize; RUN_QUEUE_COUNT],
+ i: &mut usize,
+) -> Option<ArcContextLockWriteGuard> {
let mut empty_queues = 0;
let mut total_iters = 0;
- let mut next_context_guard_opt = None;
-
let total_contexts: usize = contexts_list.iter().map(|q| q.len()).sum();
let mut skipped_contexts = 0;
+ for prio in 0..RUN_QUEUE_COUNT {
+ let rt_contexts = contexts_list
+ .get_mut(prio)
+ .expect("prio should be between [0, 39]");
+ let len = rt_contexts.len();
+ for _ in 0..len {
+ let (rt_ref, rt_lock) = match rt_contexts.pop_front() {
+ Some(lock) => match lock.upgrade() {
+ Some(l) => (lock, l),
+ None => {
+ skipped_contexts += 1;
+ continue;
+ }
+ },
+ None => break,
+ };
+ if Arc::ptr_eq(&rt_lock, idle_context) || Arc::ptr_eq(&rt_lock, prev_context_lock) {
+ rt_contexts.push_back(rt_ref);
+ continue;
+ }
+ let rt_guard = unsafe { rt_lock.write_arc() };
+ if !rt_guard.status.is_runnable()
+ || rt_guard.running
+ || !rt_guard.sched_affinity.contains(cpu_id)
+ {
+ rt_contexts.push_back(rt_ref);
+ continue;
+ }
+ if rt_guard.sched_policy == SchedPolicy::Fifo
+ || rt_guard.sched_policy == SchedPolicy::RoundRobin
+ {
+ return Some(rt_guard);
+ }
+ rt_contexts.push_back(rt_ref);
+ }
+ }
+
+ {
+ let mut min_vruntime = u128::MAX;
+ let mut best: Option<(usize, WeakContextRef)> = None;
+ for (prio, queue) in contexts_list.iter().enumerate() {
+ for ctx_ref in queue.iter() {
+ if let Some(ctx_lock) = ctx_ref.upgrade() {
+ if Arc::ptr_eq(&ctx_lock, prev_context_lock)
+ || Arc::ptr_eq(&ctx_lock, idle_context)
+ {
+ continue;
+ }
+ if let Some(guard) = ctx_lock.try_read(token.token()) {
+ if guard.status.is_runnable()
+ && !guard.running
+ && guard.sched_affinity.contains(cpu_id)
+ && guard.sched_policy == SchedPolicy::Other
+ {
+ let mut vruntime = guard.vruntime;
+ if guard.last_cpu == Some(cpu_id) {
+ vruntime = vruntime.saturating_sub(vruntime / 8);
+ }
+ drop(guard);
+ if vruntime < min_vruntime {
+ min_vruntime = vruntime;
+ best = Some((prio, ctx_ref.clone()));
+ }
+ }
+ }
+ }
+ }
+ }
+ if let Some((best_prio, ctx_ref)) = best {
+ contexts_list[best_prio].retain(|r| !WeakContextRef::eq(r, &ctx_ref));
+ if let Some(ctx_lock) = ctx_ref.upgrade() {
+ let guard = unsafe { ctx_lock.write_arc() };
+ if guard.status.is_runnable()
+ && !guard.running
+ && guard.sched_affinity.contains(cpu_id)
+ && guard.sched_policy == SchedPolicy::Other
+ {
+ return Some(guard);
+ }
+
+ drop(guard);
+ contexts_list[best_prio].push_back(ctx_ref);
+ }
+ }
+ }
+
'priority: loop {
- i = (i + 1) % 40;
+ *i = (*i + 1) % RUN_QUEUE_COUNT;
total_iters += 1;
- // The least prioritised queue takes <5000 iters to build up
- // balance = sched_prio_to_weight[20], if we have already spent
- // that many iters and not found any context, it is better to just
- // skip for now
if total_iters >= 5000 {
break 'priority;
}
@@ -394,24 +713,21 @@ fn select_next_context(
}
let contexts = contexts_list
- .get_mut(i)
+ .get_mut(*i)
.expect("i should be between [0, 39]!");
if contexts.is_empty() {
empty_queues += 1;
- if empty_queues >= 40 {
- // If all queues are empty, just break out
+ if empty_queues >= RUN_QUEUE_COUNT {
break 'priority;
}
continue;
- } else {
- empty_queues = 0;
}
- if balance[i] < SCHED_PRIO_TO_WEIGHT[20] {
- // This queue does not have enough balance to run,
- // increment the balance!
- balance[i] += SCHED_PRIO_TO_WEIGHT[i];
+ empty_queues = 0;
+
+ if balance[*i] < SCHED_PRIO_TO_WEIGHT[20] {
+ balance[*i] += SCHED_PRIO_TO_WEIGHT[*i];
continue;
}
@@ -422,67 +738,331 @@ fn select_next_context(
Some(new_lock) => (lock, new_lock),
None => {
skipped_contexts += 1;
- continue; // Ghost Process, just continue
+ continue;
}
},
- None => break, // Empty Queue
+ None => break,
};
- if Arc::ptr_eq(&next_context_lock, &prev_context_lock) {
+ if Arc::ptr_eq(&next_context_lock, prev_context_lock)
+ || Arc::ptr_eq(&next_context_lock, idle_context)
+ {
contexts.push_back(next_context_ref);
continue;
}
- if Arc::ptr_eq(&next_context_lock, &idle_context) {
+ let mut next_context_guard = unsafe { next_context_lock.write_arc() };
+
+ let sw = unsafe { update_runnable(&mut next_context_guard, cpu_id, switch_time) };
+ if let UpdateResult::CanSwitch = sw {
+ balance[*i] -= SCHED_PRIO_TO_WEIGHT[20];
+ return Some(next_context_guard);
+ }
+
+ if matches!(sw, UpdateResult::Blocked) {
+ idle_contexts(token.downgrade()).push_back(next_context_ref);
+ } else {
+ contexts.push_back(next_context_ref);
+ }
+ skipped_contexts += 1;
+
+ if skipped_contexts >= total_contexts {
+ break 'priority;
+ }
+ }
+ }
+
+ None
+}
+
+fn pick_next_from_global_queues(
+ token: &mut LockToken<L1>,
+ contexts_list: &mut [alloc::collections::VecDeque<WeakContextRef>; RUN_QUEUE_COUNT],
+ cpu_id: LogicalCpuId,
+ switch_time: u128,
+ prev_context_lock: &Arc<ContextLock>,
+ idle_context: &Arc<ContextLock>,
+ balance: &mut [usize; RUN_QUEUE_COUNT],
+ i: &mut usize,
+) -> Option<ArcContextLockWriteGuard> {
+ let mut empty_queues = 0;
+ let mut total_iters = 0;
+ let total_contexts: usize = contexts_list.iter().map(|q| q.len()).sum();
+ let mut skipped_contexts = 0;
+
+ for prio in 0..RUN_QUEUE_COUNT {
+ let rt_contexts = contexts_list
+ .get_mut(prio)
+ .expect("prio should be between [0, 39]");
+ let len = rt_contexts.len();
+ for _ in 0..len {
+ let (rt_ref, rt_lock) = match rt_contexts.pop_front() {
+ Some(lock) => match lock.upgrade() {
+ Some(l) => (lock, l),
+ None => {
+ skipped_contexts += 1;
+ continue;
+ }
+ },
+ None => break,
+ };
+ if Arc::ptr_eq(&rt_lock, idle_context) || Arc::ptr_eq(&rt_lock, prev_context_lock) {
+ rt_contexts.push_back(rt_ref);
+ continue;
+ }
+ let rt_guard = unsafe { rt_lock.write_arc() };
+ if !rt_guard.status.is_runnable()
+ || rt_guard.running
+ || !rt_guard.sched_affinity.contains(cpu_id)
+ {
+ rt_contexts.push_back(rt_ref);
+ continue;
+ }
+ if rt_guard.sched_policy == SchedPolicy::Fifo
+ || rt_guard.sched_policy == SchedPolicy::RoundRobin
+ {
+ return Some(rt_guard);
+ }
+ rt_contexts.push_back(rt_ref);
+ }
+ }
+
+ {
+ let mut min_vruntime = u128::MAX;
+ let mut best: Option<(usize, WeakContextRef)> = None;
+ for (prio, queue) in contexts_list.iter().enumerate() {
+ for ctx_ref in queue.iter() {
+ if let Some(ctx_lock) = ctx_ref.upgrade() {
+ if Arc::ptr_eq(&ctx_lock, prev_context_lock)
+ || Arc::ptr_eq(&ctx_lock, idle_context)
+ {
+ continue;
+ }
+ if let Some(guard) = ctx_lock.try_read(token.token()) {
+ if guard.status.is_runnable()
+ && !guard.running
+ && guard.sched_affinity.contains(cpu_id)
+ && guard.sched_policy == SchedPolicy::Other
+ {
+ let mut vruntime = guard.vruntime;
+ if guard.last_cpu == Some(cpu_id) {
+ vruntime = vruntime.saturating_sub(vruntime / 8);
+ }
+ drop(guard);
+ if vruntime < min_vruntime {
+ min_vruntime = vruntime;
+ best = Some((prio, ctx_ref.clone()));
+ }
+ }
+ }
+ }
+ }
+ }
+ if let Some((best_prio, ctx_ref)) = best {
+ contexts_list[best_prio].retain(|r| !WeakContextRef::eq(r, &ctx_ref));
+ if let Some(ctx_lock) = ctx_ref.upgrade() {
+ let guard = unsafe { ctx_lock.write_arc() };
+ if guard.status.is_runnable()
+ && !guard.running
+ && guard.sched_affinity.contains(cpu_id)
+ && guard.sched_policy == SchedPolicy::Other
+ {
+ return Some(guard);
+ }
+
+ drop(guard);
+ contexts_list[best_prio].push_back(ctx_ref);
+ }
+ }
+ }
+
+ 'priority: loop {
+ *i = (*i + 1) % RUN_QUEUE_COUNT;
+ total_iters += 1;
+
+ if total_iters >= 5000 {
+ break 'priority;
+ }
+
+ if skipped_contexts > total_contexts && total_contexts > 0 {
+ break 'priority;
+ }
+
+ let contexts = contexts_list
+ .get_mut(*i)
+ .expect("i should be between [0, 39]!");
+
+ if contexts.is_empty() {
+ empty_queues += 1;
+ if empty_queues >= RUN_QUEUE_COUNT {
+ break 'priority;
+ }
+ continue;
+ }
+
+ empty_queues = 0;
+
+ if balance[*i] < SCHED_PRIO_TO_WEIGHT[20] {
+ balance[*i] += SCHED_PRIO_TO_WEIGHT[*i];
+ continue;
+ }
+
+ let len = contexts.len();
+ for _ in 0..len {
+ let (next_context_ref, next_context_lock) = match contexts.pop_front() {
+ Some(lock) => match lock.upgrade() {
+ Some(new_lock) => (lock, new_lock),
+ None => {
+ skipped_contexts += 1;
+ continue;
+ }
+ },
+ None => break,
+ };
+
+ if Arc::ptr_eq(&next_context_lock, prev_context_lock)
+ || Arc::ptr_eq(&next_context_lock, idle_context)
+ {
contexts.push_back(next_context_ref);
continue;
}
let mut next_context_guard = unsafe { next_context_lock.write_arc() };
- // Is this context runnable on this CPU?
let sw = unsafe { update_runnable(&mut next_context_guard, cpu_id, switch_time) };
if let UpdateResult::CanSwitch = sw {
- next_context_guard_opt = Some(next_context_guard);
- balance[i] -= SCHED_PRIO_TO_WEIGHT[20];
- break 'priority;
+ balance[*i] -= SCHED_PRIO_TO_WEIGHT[20];
+ return Some(next_context_guard);
+ }
+
+ if matches!(sw, UpdateResult::Blocked) {
+ idle_contexts(token.token()).push_back(next_context_ref);
} else {
- if matches!(sw, UpdateResult::Blocked) {
- idle_contexts(token.token()).push_back(next_context_ref);
- } else {
- contexts.push_back(next_context_ref);
- };
- skipped_contexts += 1;
+ contexts.push_back(next_context_ref);
+ }
+ skipped_contexts += 1;
- if skipped_contexts >= total_contexts {
- break 'priority;
- }
+ if skipped_contexts >= total_contexts {
+ break 'priority;
}
}
}
- percpu.balance.set(balance);
- percpu.last_queue.set(i);
-
- if !Arc::ptr_eq(&prev_context_lock, &idle_context) {
- // Send the old process to the back of the line (if it is still runnable)
- let prev_ctx = WeakContextRef(Arc::downgrade(&prev_context_lock));
- if prev_context_guard.status.is_runnable() {
- let prio = prev_context_guard.prio;
- contexts_list[prio].push_back(prev_ctx);
- } else {
- idle_contexts(token.token()).push_back(prev_ctx);
- }
+
+ None
+}
+
+unsafe fn update_stealable(context: &mut Context, switch_time: u128) -> UpdateResult {
+ if context.running {
+ return UpdateResult::Skip;
}
+ if context.status.is_soft_blocked()
+ && let Some(wake) = context.wake
+ && switch_time >= wake
+ {
+ context.wake = None;
+ context.unblock_no_ipi();
+ }
+ if context.status.is_runnable() {
+ UpdateResult::CanSwitch
+ } else {
+ UpdateResult::Blocked
+ }
+}
- if let Some(next_context_guard) = next_context_guard_opt {
- // We found a new process!
+/// This is the scheduler function which currently utilises Deficit Weighted Round Robin Scheduler
+fn select_next_context(
+ token: &mut CleanLockToken,
+ percpu: &PercpuBlock,
+ cpu_id: LogicalCpuId,
+ switch_time: u128,
+ was_idle: bool,
+ prev_context_guard: &mut ArcRwLockWriteGuard<L4, Context>,
+) -> Result<Option<ArcContextLockWriteGuard>, SwitchResult> {
+ let idle_context = percpu.switch_internals.idle_context();
+ let prev_context_lock = crate::context::current();
+
+ let local_next = {
+ let mut sched_lock = SchedQueuesLock::new(&percpu.sched);
+ let mut balance = percpu.sched.balance.get();
+ let mut last_queue = percpu.sched.last_queue.get() % RUN_QUEUE_COUNT;
+ let next = pick_next_from_queues(
+ token,
+ unsafe { sched_lock.queues_mut() },
+ cpu_id,
+ switch_time,
+ &prev_context_lock,
+ &idle_context,
+ &mut balance,
+ &mut last_queue,
+ );
+ percpu.sched.balance.set(balance);
+ percpu.sched.last_queue.set(last_queue);
+ next
+ };
+
+ if let Some(next_context_guard) = local_next {
+ queue_previous_context(
+ token,
+ percpu,
+ &prev_context_lock,
+ prev_context_guard,
+ &idle_context,
+ );
+ return Ok(Some(next_context_guard));
+ }
+
+ if let Some(next_context_guard) = steal_work(token, cpu_id, switch_time) {
+ queue_previous_context(
+ token,
+ percpu,
+ &prev_context_lock,
+ prev_context_guard,
+ &idle_context,
+ );
+ return Ok(Some(next_context_guard));
+ }
+
+ let global_next = {
+ let contexts_data = run_contexts(token.token());
+ let (mut contexts_data, mut contexts_token) = contexts_data.into_split();
+ let mut balance = percpu.sched.balance.get();
+ let mut last_queue = percpu.sched.last_queue.get() % RUN_QUEUE_COUNT;
+ let next = pick_next_from_global_queues(
+ &mut contexts_token,
+ &mut contexts_data.set,
+ cpu_id,
+ switch_time,
+ &prev_context_lock,
+ &idle_context,
+ &mut balance,
+ &mut last_queue,
+ );
+ percpu.sched.balance.set(balance);
+ percpu.sched.last_queue.set(last_queue);
+ next
+ };
+
+ if let Some(next_context_guard) = global_next {
+ queue_previous_context(
+ token,
+ percpu,
+ &prev_context_lock,
+ prev_context_guard,
+ &idle_context,
+ );
return Ok(Some(next_context_guard));
+ }
+
+ queue_previous_context(
+ token,
+ percpu,
+ &prev_context_lock,
+ prev_context_guard,
+ &idle_context,
+ );
+
+ if !was_idle && !Arc::ptr_eq(&prev_context_lock, &idle_context) {
+ Ok(Some(unsafe { idle_context.write_arc() }))
} else {
- if !was_idle && !Arc::ptr_eq(&prev_context_lock, &idle_context) {
- // We switch into the idle context
- Ok(Some(unsafe { idle_context.write_arc() }))
- } else {
- // We found no other process to run.
- Ok(None)
- }
+ Ok(None)
}
}
+190
View File
@@ -0,0 +1,190 @@
diff --git a/src/percpu.rs b/src/percpu.rs
--- a/src/percpu.rs
+++ b/src/percpu.rs
@@ -100,6 +100,14 @@ static ALL_PERCPU_BLOCKS: [AtomicPtr<PercpuBlock>; MAX_CPU_COUNT as usize] =
pub unsafe fn init_tlb_shootdown(id: LogicalCpuId, block: *mut PercpuBlock) {
ALL_PERCPU_BLOCKS[id.get() as usize].store(block, Ordering::Release)
}
+
+pub fn get_percpu_block(id: LogicalCpuId) -> Option<&'static PercpuBlock> {
+ unsafe {
+ ALL_PERCPU_BLOCKS[id.get() as usize]
+ .load(Ordering::Acquire)
+ .as_ref()
+ }
+}
pub fn get_all_stats() -> Vec<(LogicalCpuId, CpuStatsData)> {
diff --git a/src/context/switch.rs b/src/context/switch.rs
--- a/src/context/switch.rs
+++ b/src/context/switch.rs
@@ -7,15 +7,15 @@ use crate::{
self, arch, idle_contexts, idle_contexts_try, run_contexts, ArcContextLockWriteGuard,
Context, ContextLock, SchedPolicy, WeakContextRef, RUN_QUEUE_COUNT,
},
- cpu_set::LogicalCpuId,
+ cpu_set::{LogicalCpuId, LogicalCpuSet},
cpu_stats::{self, CpuState},
- percpu::{PerCpuSched, PercpuBlock},
+ percpu::{get_percpu_block, PerCpuSched, PercpuBlock},
sync::{ArcRwLockWriteGuard, CleanLockToken, LockToken, L1, L4},
};
use alloc::{sync::Arc, vec::Vec};
use core::{
cell::{Cell, RefCell},
hint, mem,
- sync::atomic::Ordering,
+ sync::atomic::{AtomicUsize, Ordering},
};
use syscall::PtraceFlags;
@@
+static SCHED_STEAL_COUNT: AtomicUsize = AtomicUsize::new(0);
+
+fn assign_context_to_cpu(context: &mut Context, cpu_id: LogicalCpuId) {
+ context.sched_affinity = LogicalCpuSet::empty();
+ context.sched_affinity.atomic_set(cpu_id);
+}
@@
+fn pop_movable_context(
+ token: &mut CleanLockToken,
+ queues: &mut [alloc::collections::VecDeque<WeakContextRef>; RUN_QUEUE_COUNT],
+ target_cpu: LogicalCpuId,
+ switch_time: u128,
+ idle_context: &Arc<ContextLock>,
+) -> Option<(usize, WeakContextRef)> {
+ for prio in 0..RUN_QUEUE_COUNT {
+ let len = queues[prio].len();
+ for _ in 0..len {
+ let Some(context_ref) = queues[prio].pop_front() else {
+ break;
+ };
+ let Some(context_lock) = context_ref.upgrade() else {
+ continue;
+ };
+ if Arc::ptr_eq(&context_lock, idle_context) {
+ queues[prio].push_back(context_ref);
+ continue;
+ }
+
+ let mut context_guard = unsafe { context_lock.write_arc() };
+ let sw = unsafe { update_stealable(&mut context_guard, switch_time) };
+ if let UpdateResult::CanSwitch = sw {
+ assign_context_to_cpu(&mut context_guard, target_cpu);
+ let moved_ref = WeakContextRef(Arc::downgrade(ArcContextLockWriteGuard::rwlock(
+ &context_guard,
+ )));
+ drop(context_guard);
+ return Some((prio, moved_ref));
+ }
+
+ if matches!(sw, UpdateResult::Blocked) {
+ idle_contexts(token.downgrade()).push_back(context_ref);
+ } else {
+ queues[prio].push_back(context_ref);
+ }
+ }
+ }
+
+ None
+}
+
+fn steal_work(
+ token: &mut CleanLockToken,
+ cpu_id: LogicalCpuId,
+ switch_time: u128,
+) -> Option<ArcContextLockWriteGuard> {
+ let cpu_count = crate::cpu_count();
+ if cpu_count <= 1 {
+ return None;
+ }
+
+ for offset in 1..cpu_count {
+ let victim_id = LogicalCpuId::new((cpu_id.get() + offset) % cpu_count);
+ let Some(victim) = get_percpu_block(victim_id) else {
+ continue;
+ };
+
+ let victim_idle = victim.switch_internals.idle_context();
+ let mut victim_lock = SchedQueuesLock::new(&victim.sched);
+ let victim_queues = unsafe { victim_lock.queues_mut() };
+
+ for prio in 0..RUN_QUEUE_COUNT {
+ let len = victim_queues[prio].len();
+ for _ in 0..len {
+ let Some(context_ref) = victim_queues[prio].pop_front() else {
+ break;
+ };
+ let Some(context_lock) = context_ref.upgrade() else {
+ continue;
+ };
+ if Arc::ptr_eq(&context_lock, &victim_idle) {
+ victim_queues[prio].push_back(context_ref);
+ continue;
+ }
+
+ let mut context_guard = unsafe { context_lock.write_arc() };
+ let sw = unsafe { update_stealable(&mut context_guard, switch_time) };
+ if let UpdateResult::CanSwitch = sw {
+ assign_context_to_cpu(&mut context_guard, cpu_id);
+ SCHED_STEAL_COUNT.fetch_add(1, Ordering::Relaxed);
+ return Some(context_guard);
+ }
+
+ if matches!(sw, UpdateResult::Blocked) {
+ idle_contexts(token.downgrade()).push_back(context_ref);
+ } else {
+ victim_queues[prio].push_back(context_ref);
+ }
+ }
+ }
+ }
+
+ None
+}
+
+unsafe fn update_stealable(context: &mut Context, switch_time: u128) -> UpdateResult {
+ if context.running {
+ return UpdateResult::Skip;
+ }
+ if context.status.is_soft_blocked()
+ && let Some(wake) = context.wake
+ && switch_time >= wake
+ {
+ context.wake = None;
+ context.unblock_no_ipi();
+ }
+ if context.status.is_runnable() {
+ UpdateResult::CanSwitch
+ } else {
+ UpdateResult::Blocked
+ }
+}
@@ -360,6 +469,10 @@ fn wakeup_contexts(token: &mut CleanLockToken, percpu: &PercpuBlock, switch_time
let mut sched_lock = SchedQueuesLock::new(&percpu.sched);
let run_queues = unsafe { sched_lock.queues_mut() };
for (prio, context_ref) in wakeups {
+ if let Some(context_lock) = context_ref.upgrade() {
+ let mut context_guard = unsafe { context_lock.write_arc() };
+ assign_context_to_cpu(&mut context_guard, percpu.cpu_id);
+ }
run_queues[prio].push_back(context_ref);
}
}
@@ -559,6 +672,16 @@ fn select_next_context(
);
return Ok(Some(next_context_guard));
}
+
+ if let Some(next_context_guard) = steal_work(token, cpu_id, switch_time) {
+ queue_previous_context(
+ token,
+ percpu,
+ &prev_context_lock,
+ prev_context_guard,
+ &idle_context,
+ );
+ return Ok(Some(next_context_guard));
+ }
let global_next = {
let contexts_data = run_contexts(token.token());
@@ -0,0 +1,21 @@
diff --git a/src/syscall/futex.rs b/src/syscall/futex.rs
--- a/src/syscall/futex.rs
+++ b/src/syscall/futex.rs
@@
- let futex_atomic = futex_atomic_u32(locked_physaddr);
- let mut current = futex_atomic.load(Ordering::SeqCst);
+ let futex_atomic = futex_atomic_u32(locked_physaddr);
+ let mut current = futex_atomic.load(Ordering::SeqCst);
+ let queue = futexes
+ .entry(locked_physaddr)
+ .or_insert_with(FutexQueue::default);
loop {
let owner_tid = current & FUTEX_TID_MASK;
- let queue = futexes
- .entry(locked_physaddr)
- .or_insert_with(FutexQueue::default);
let desired_waiters = if queue.waiters.is_empty() {
0
} else {
FUTEX_WAITERS
@@ -0,0 +1,68 @@
diff --git a/src/numa.rs b/src/numa.rs
new file mode 100644
index 0000000..40c5a06
--- /dev/null
+++ b/src/numa.rs
@@ -0,0 +1,62 @@
+/// NUMA topology hints for the kernel scheduler.
+/// NUMA discovery (SRAT/SLIT parsing) is performed by a userspace daemon
+/// (numad) via /scheme/acpi/, then pushed to the kernel via scheme:numa.
+/// The kernel stores a lightweight copy for O(1) scheduling lookups.
+use crate::cpu_set::{LogicalCpuId, LogicalCpuSet};
+use core::sync::atomic::{AtomicBool, Ordering};
+
+const MAX_NUMA_NODES: usize = 8;
+
+#[derive(Clone, Debug)]
+pub struct NumaHint {
+ pub node_id: u8,
+ pub cpus: LogicalCpuSet,
+}
+
+pub struct NumaTopology {
+ pub nodes: [Option<NumaHint>; MAX_NUMA_NODES],
+ pub initialized: AtomicBool,
+}
+
+impl NumaTopology {
+ pub const fn new() -> Self {
+ const NONE: Option<NumaHint> = None;
+ Self {
+ nodes: [NONE; MAX_NUMA_NODES],
+ initialized: AtomicBool::new(false),
+ }
+ }
+
+ pub fn node_for_cpu(&self, cpu: LogicalCpuId) -> Option<u8> {
+ for node in self.nodes.iter().flatten() {
+ if node.cpus.contains(cpu) {
+ return Some(node.node_id);
+ }
+ }
+ None
+ }
+
+ pub fn same_node(&self, cpu1: LogicalCpuId, cpu2: LogicalCpuId) -> bool {
+ self.node_for_cpu(cpu1) == self.node_for_cpu(cpu2)
+ }
+}
+
+static mut NUMA_TOPOLOGY: NumaTopology = NumaTopology::new();
+
+pub fn topology() -> &'static NumaTopology {
+ unsafe { &NUMA_TOPOLOGY }
+}
+
+pub fn init_default() {
+ let topo = topology();
+ if topo.initialized.swap(true, Ordering::AcqRel) {
+ return;
+ }
+ unsafe {
+ let topo_mut = &mut *core::ptr::addr_of_mut!(NUMA_TOPOLOGY);
+ topo_mut.nodes[0] = Some(NumaHint {
+ node_id: 0,
+ cpus: LogicalCpuSet::all(),
+ });
+ }
+}
@@ -0,0 +1,41 @@
diff --git a/src/scheme/proc.rs b/src/scheme/proc.rs
--- a/src/scheme/proc.rs
+++ b/src/scheme/proc.rs
@@ -450,6 +450,7 @@ impl KernelScheme for ProcScheme {
}
fn close(&self, id: usize, token: &mut CleanLockToken) -> Result<()> {
+ let mut inner_token = unsafe { CleanLockToken::new() };
let handle = HANDLES
.write(token.token())
.remove(&id)
@@ -478,9 +479,7 @@ impl KernelScheme for ProcScheme {
))]
regs.set_arg1(arg1);
- // TODO: Lock ordering violation
- let mut token = unsafe { CleanLockToken::new() };
- Ok(context.set_addr_space(Some(new), token.downgrade()))
+ Ok(context.set_addr_space(Some(new), inner_token.downgrade()))
})?;
if let Some(old_ctx) = old_ctx
&& let Some(addrspace) = Arc::into_inner(old_ctx)
@@ -518,6 +517,7 @@ impl KernelScheme for ProcScheme {
consume: bool,
token: &mut CleanLockToken,
) -> Result<usize> {
+ let mut inner_token = unsafe { CleanLockToken::new() };
let handle = HANDLES
.read(token.token())
.get(&id)
@@ -609,9 +609,7 @@ impl KernelScheme for ProcScheme {
};
// TODO: Allocated or AllocatedShared?
let addrsp = AddrSpace::current()?;
- // TODO: Lock ordering violation
- let mut token = unsafe { CleanLockToken::new() };
- let page = addrsp.acquire_write(token.downgrade()).mmap_anywhere(
+ let page = addrsp.acquire_write(inner_token.downgrade()).mmap_anywhere(
&addrsp,
NonZeroUsize::new(1).unwrap(),
MapFlags::PROT_READ | MapFlags::PROT_WRITE,