Files
RedBear-OS/local/patches/kernel/P8-futex-pi.patch
T
vasilito 34360e1e4f feat: P0-P6 kernel scheduler + relibc threading comprehensive implementation
P0-P2: Barrier SMP, sigmask/pthread_kill races, robust mutexes, RT scheduling, POSIX sched API
P3: PerCpuSched struct, per-CPU wiring, work stealing, load balancing, initial placement
P4: 64-shard futex table, REQUEUE, PI futexes (LOCK_PI/UNLOCK_PI/TRYLOCK_PI), robust futexes, vruntime tracking, min-vruntime SCHED_OTHER selection
P5: setpriority/getpriority, pthread_setaffinity_np, pthread_setname_np, pthread_setschedparam (Redox)
P6: Cache-affine scheduling (last_cpu + vruntime bonus), NUMA topology kernel hints + numad userspace daemon

Stability fixes: make_consistent stores 0 (dead TID fix), cond.rs error propagation, SPIN_COUNT adaptive spinning, Sys::open &str fix, PI futex CAS race, proc.rs lock ordering, barrier destroy

Patches: 33 kernel + 58 relibc patches, all tracked in recipes
Docs: KERNEL-SCHEDULER-MULTITHREAD-IMPROVEMENT-PLAN.md updated, SCHEDULER-REVIEW-FINAL.md created
Architecture: NUMA topology parsing stays userspace (numad daemon), kernel stores lightweight NumaTopology hints
2026-04-30 18:21:48 +01:00

365 lines
16 KiB
Diff

diff --git a/src/syscall/futex.rs b/src/syscall/futex.rs
--- a/src/syscall/futex.rs
+++ b/src/syscall/futex.rs
@@
-use crate::syscall::{
- data::TimeSpec,
- error::{Error, Result, EAGAIN, EFAULT, EINVAL, ETIMEDOUT},
- flag::{FUTEX_REQUEUE, FUTEX_WAIT, FUTEX_WAIT64, FUTEX_WAKE},
-};
+use crate::syscall::{
+ data::TimeSpec,
+ error::{Error, Result, EAGAIN, EDEADLK, EFAULT, EINTR, EINVAL, EPERM, ETIMEDOUT},
+ flag::{FUTEX_REQUEUE, FUTEX_WAIT, FUTEX_WAIT64, FUTEX_WAKE},
+};
+
+const FUTEX_LOCK_PI: usize = 6;
+const FUTEX_UNLOCK_PI: usize = 7;
+const FUTEX_TRYLOCK_PI: usize = 8;
+
+const FUTEX_WAITERS: u32 = 0x8000_0000;
+const FUTEX_OWNER_DIED: u32 = 0x4000_0000;
+const FUTEX_TID_MASK: u32 = 0x3FFF_FFFF;
@@
-type FutexList = HashMap<PhysicalAddress, Vec<FutexEntry>>;
+type FutexList = HashMap<PhysicalAddress, FutexQueue>;
+
+#[derive(Clone, Copy, Debug, Eq, PartialEq)]
+enum FutexWaitKind {
+ Regular,
+ PriorityInheritance,
+}
+
+#[derive(Default)]
+struct FutexQueue {
+ waiters: Vec<FutexEntry>,
+ pi_owner: Option<Weak<ContextLock>>,
+}
+
+impl FutexQueue {
+ fn is_empty(&self) -> bool {
+ self.waiters.is_empty() && self.pi_owner.is_none()
+ }
+}
@@
pub struct FutexEntry {
@@
// address space to check against if virt matches but not phys
addr_space: Weak<AddrSpaceWrapper>,
+ kind: FutexWaitKind,
}
@@
+fn context_futex_tid(context: &crate::context::Context) -> u32 {
+ let tid = u32::try_from(context.pid).unwrap_or(context.debug_id) & FUTEX_TID_MASK;
+ if tid == 0 {
+ context.debug_id & FUTEX_TID_MASK
+ } else {
+ tid
+ }
+}
+
+fn current_context_futex_tid(context_lock: &Arc<ContextLock>, token: &mut CleanLockToken) -> u32 {
+ let context = context_lock.read(token.token());
+ context_futex_tid(&context)
+}
+
+fn push_owner_waiter(owner: &mut crate::context::Context, phys: PhysicalAddress) {
+ if !owner.futex_pi_waiters.iter().any(|waiter| *waiter == phys) {
+ owner.futex_pi_waiters.push(phys);
+ }
+}
+
+fn pop_owner_waiter(owner: &mut crate::context::Context, phys: PhysicalAddress) {
+ owner.futex_pi_waiters.retain(|waiter| *waiter != phys);
+}
+
+fn boost_pi_owner(
+ owner_lock: &Arc<ContextLock>,
+ waiter_prio: usize,
+ phys: PhysicalAddress,
+ token: &mut crate::sync::LockToken<'_, L1>,
+) {
+ let mut owner = owner_lock.write(token.token());
+ push_owner_waiter(&mut owner, phys);
+ if owner.prio > waiter_prio {
+ if !owner.futex_pi_boost {
+ owner.futex_pi_original_prio = owner.prio;
+ }
+ owner.futex_pi_boost = true;
+ owner.prio = owner.prio.min(waiter_prio);
+ }
+}
+
+fn restore_pi_owner(owner: &mut crate::context::Context, phys: PhysicalAddress) {
+ pop_owner_waiter(owner, phys);
+ if owner.futex_pi_boost && owner.futex_pi_waiters.is_empty() {
+ owner.futex_pi_boost = false;
+ owner.prio = owner.futex_pi_original_prio;
+ }
+}
+
+fn queue_waiter(
+ queue: &mut FutexQueue,
+ target_virtaddr: VirtualAddress,
+ context_lock: &Arc<ContextLock>,
+ addr_space: &Arc<AddrSpaceWrapper>,
+ kind: FutexWaitKind,
+) {
+ queue.waiters.push(FutexEntry {
+ target_virtaddr,
+ context_lock: Arc::clone(context_lock),
+ addr_space: Arc::downgrade(addr_space),
+ kind,
+ });
+}
@@
- futexes
- .entry(locked_physaddr)
- .or_insert_with(Vec::new)
- .push(FutexEntry {
- target_virtaddr,
- context_lock: context_lock.clone(),
- addr_space: Arc::downgrade(&current_addrsp),
- });
+ let queue = futexes.entry(locked_physaddr).or_insert_with(FutexQueue::default);
+ queue_waiter(
+ queue,
+ target_virtaddr,
+ &context_lock,
+ &current_addrsp,
+ FutexWaitKind::Regular,
+ );
@@
- let remove_queue = if let Some(futexes) = futexes_map.get_mut(&target_physaddr) {
- let mut i = 0;
- let current_addrsp_weak = Arc::downgrade(&current_addrsp);
- while i < futexes.len() && woken < val {
- let futex = unsafe { futexes.get_unchecked_mut(i) };
- if futex.target_virtaddr != target_virtaddr
- || !current_addrsp_weak.ptr_eq(&futex.addr_space)
- {
- i += 1;
- continue;
- }
- futex.context_lock.write(futex_token.token()).unblock();
- futexes.swap_remove(i);
- woken += 1;
- }
- futexes.is_empty()
+ let remove_queue = if let Some(queue) = futexes_map.get_mut(&target_physaddr) {
+ let mut i = 0;
+ let current_addrsp_weak = Arc::downgrade(&current_addrsp);
+ while i < queue.waiters.len() && woken < val {
+ let waiter = match queue.waiters.get(i) {
+ Some(waiter) => waiter,
+ None => break,
+ };
+ if waiter.kind != FutexWaitKind::Regular
+ || waiter.target_virtaddr != target_virtaddr
+ || !current_addrsp_weak.ptr_eq(&waiter.addr_space)
+ {
+ i += 1;
+ continue;
+ }
+ let waiter = queue.waiters.swap_remove(i);
+ waiter.context_lock.write(futex_token.token()).unblock();
+ woken += 1;
+ }
+ queue.is_empty()
} else {
false
};
@@
- let mut source_waiters = source_map.remove(&locked_source_physaddr).unwrap_or_default();
+ let mut source_queue = source_map.remove(&locked_source_physaddr).unwrap_or_default();
@@
- total_woken = wake_from(&mut source_waiters, val, &mut futex_token);
+ total_woken = wake_from(&mut source_queue.waiters, val, &mut futex_token);
@@
- let mut target_waiters = target_map.remove(&locked_target_physaddr).unwrap_or_default();
- let mut i = 0;
- while i < source_waiters.len() && total_requeued < val2 {
- let should_move = source_waiters
+ let mut target_queue = target_map.remove(&locked_target_physaddr).unwrap_or_default();
+ let mut i = 0;
+ while i < source_queue.waiters.len() && total_requeued < val2 {
+ let should_move = source_queue
+ .waiters
.get(i)
.map(|waiter| {
- waiter.target_virtaddr == target_virtaddr
+ waiter.kind == FutexWaitKind::Regular
+ && waiter.target_virtaddr == target_virtaddr
&& current_addrsp_weak.ptr_eq(&waiter.addr_space)
})
.unwrap_or(false);
@@
- let mut waiter = source_waiters.swap_remove(i);
- waiter.target_virtaddr = target2_virtaddr;
- target_waiters.push(waiter);
+ let mut waiter = source_queue.waiters.swap_remove(i);
+ waiter.target_virtaddr = target2_virtaddr;
+ target_queue.waiters.push(waiter);
total_requeued += 1;
}
- if !target_waiters.is_empty() {
- target_map.insert(locked_target_physaddr, target_waiters);
+ if !target_queue.is_empty() {
+ target_map.insert(locked_target_physaddr, target_queue);
}
@@
- if !source_waiters.is_empty() {
- source_map.insert(locked_source_physaddr, source_waiters);
+ if !source_queue.is_empty() {
+ source_map.insert(locked_source_physaddr, source_queue);
}
@@
+ FUTEX_LOCK_PI | FUTEX_TRYLOCK_PI => {
+ let _ = validate_futex_u32_addr(addr)?;
+ let context_lock = context::current();
+ let current_tid = current_context_futex_tid(&context_lock, token);
+ let current_prio = context_lock.read(token.token()).prio;
+
+ loop {
+ let outcome = {
+ let shard = futex_shard(target_physaddr);
+ let mut futexes = FUTEXES[shard].lock(token.token());
+ let (futexes, mut futex_token) = futexes.token_split();
+ let addr_space_guard = current_addrsp.acquire_read(futex_token.downgrade());
+ let locked_physaddr = validate_and_translate_virt(&addr_space_guard, target_virtaddr)
+ .ok_or(Error::new(EFAULT))?;
+ if locked_physaddr != target_physaddr {
+ None
+ } else {
+ drop(addr_space_guard);
+ let futex_atomic = futex_atomic_u32(locked_physaddr);
+ let mut current = futex_atomic.load(Ordering::SeqCst);
+ loop {
+ let owner_tid = current & FUTEX_TID_MASK;
+ let queue = futexes.entry(locked_physaddr).or_insert_with(FutexQueue::default);
+ let desired_waiters = if queue.waiters.is_empty() { 0 } else { FUTEX_WAITERS };
+
+ if owner_tid == 0 {
+ let desired = current_tid | desired_waiters;
+ match futex_atomic.compare_exchange(current, desired, Ordering::SeqCst, Ordering::SeqCst) {
+ Ok(_) => {
+ queue.pi_owner = Some(Arc::downgrade(&context_lock));
+ break Some(Ok(Ok(0)));
+ }
+ Err(actual) => current = actual,
+ }
+ continue;
+ }
+
+ if owner_tid == current_tid {
+ break Some(Ok(Err(Error::new(EDEADLK))));
+ }
+
+ if op == FUTEX_TRYLOCK_PI {
+ break Some(Ok(Err(Error::new(EAGAIN))));
+ }
+
+ if let Some(owner_lock) = queue.pi_owner.as_ref().and_then(Weak::upgrade) {
+ boost_pi_owner(&owner_lock, current_prio, locked_physaddr, &mut futex_token);
+ }
+
+ {
+ let mut context = context_lock.write(futex_token.token());
+ if let Some((tctl, pctl, _)) = context.sigcontrol()
+ && tctl.currently_pending_unblocked(pctl) != 0
+ {
+ break Some(Ok(Err(Error::new(EINTR))));
+ }
+ context.wake = None;
+ context.block("futex_pi");
+ }
+
+ queue_waiter(
+ queue,
+ target_virtaddr,
+ &context_lock,
+ &current_addrsp,
+ FutexWaitKind::PriorityInheritance,
+ );
+ futex_atomic.fetch_or(FUTEX_WAITERS, Ordering::SeqCst);
+ break Some(Ok(Ok(1)));
+ }
+ }
+ };
+
+ match outcome {
+ None => continue,
+ Some(Ok(Ok(0))) => return Ok(0),
+ Some(Ok(Ok(_))) => context::switch(token),
+ Some(Ok(Err(err))) => return Err(err),
+ Some(Err(err)) => return Err(err),
+ }
+ }
+ }
+ FUTEX_UNLOCK_PI => {
+ let _ = validate_futex_u32_addr(addr)?;
+ let context_lock = context::current();
+ let current_tid = current_context_futex_tid(&context_lock, token);
+ let shard = futex_shard(target_physaddr);
+ let current_addrsp_weak = Arc::downgrade(&current_addrsp);
+
+ let unlocked = {
+ let mut futexes = FUTEXES[shard].lock(token.token());
+ let (futexes, mut futex_token) = futexes.token_split();
+ let addr_space_guard = current_addrsp.acquire_read(futex_token.downgrade());
+ let locked_physaddr = validate_and_translate_virt(&addr_space_guard, target_virtaddr)
+ .ok_or(Error::new(EFAULT))?;
+ if locked_physaddr != target_physaddr {
+ return Err(Error::new(EAGAIN));
+ }
+ drop(addr_space_guard);
+
+ let futex_atomic = futex_atomic_u32(locked_physaddr);
+ let current = futex_atomic.load(Ordering::SeqCst);
+ if (current & FUTEX_TID_MASK) != current_tid {
+ return Err(Error::new(EPERM));
+ }
+
+ let mut wake_one = None;
+ let mut new = current & !(FUTEX_TID_MASK | FUTEX_OWNER_DIED);
+ if let Some(queue) = futexes.get_mut(&locked_physaddr) {
+ queue.pi_owner = None;
+ let mut best = None;
+ for (idx, waiter) in queue.waiters.iter().enumerate() {
+ if waiter.kind != FutexWaitKind::PriorityInheritance
+ || waiter.target_virtaddr != target_virtaddr
+ || !current_addrsp_weak.ptr_eq(&waiter.addr_space)
+ {
+ continue;
+ }
+ let prio = waiter.context_lock.read(futex_token.token()).prio;
+ match best {
+ Some((_, best_prio)) if prio >= best_prio => {}
+ _ => best = Some((idx, prio)),
+ }
+ }
+ if let Some((waiter_idx, _)) = best {
+ wake_one = Some(queue.waiters.swap_remove(waiter_idx));
+ }
+ if !queue.waiters.is_empty() {
+ new |= FUTEX_WAITERS;
+ }
+ }
+
+ futex_atomic.store(new, Ordering::SeqCst);
+ {
+ let mut context = context_lock.write(futex_token.token());
+ restore_pi_owner(&mut context, locked_physaddr);
+ }
+ if let Some(waiter) = wake_one {
+ waiter.context_lock.write(futex_token.token()).unblock();
+ }
+ true
+ };
+
+ Ok(usize::from(unlocked))
+ }
_ => Err(Error::new(EINVAL)),
}
}