34360e1e4f
P0-P2: Barrier SMP, sigmask/pthread_kill races, robust mutexes, RT scheduling, POSIX sched API P3: PerCpuSched struct, per-CPU wiring, work stealing, load balancing, initial placement P4: 64-shard futex table, REQUEUE, PI futexes (LOCK_PI/UNLOCK_PI/TRYLOCK_PI), robust futexes, vruntime tracking, min-vruntime SCHED_OTHER selection P5: setpriority/getpriority, pthread_setaffinity_np, pthread_setname_np, pthread_setschedparam (Redox) P6: Cache-affine scheduling (last_cpu + vruntime bonus), NUMA topology kernel hints + numad userspace daemon Stability fixes: make_consistent stores 0 (dead TID fix), cond.rs error propagation, SPIN_COUNT adaptive spinning, Sys::open &str fix, PI futex CAS race, proc.rs lock ordering, barrier destroy Patches: 33 kernel + 58 relibc patches, all tracked in recipes Docs: KERNEL-SCHEDULER-MULTITHREAD-IMPROVEMENT-PLAN.md updated, SCHEDULER-REVIEW-FINAL.md created Architecture: NUMA topology parsing stays userspace (numad daemon), kernel stores lightweight NumaTopology hints
283 lines
11 KiB
Diff
283 lines
11 KiB
Diff
diff --git a/src/syscall/debug.rs b/src/syscall/debug.rs
|
|
--- a/src/syscall/debug.rs
|
|
+++ b/src/syscall/debug.rs
|
|
@@
|
|
- SYS_FUTEX => format!(
|
|
- "futex({:#X} [{:?}], {}, {}, {}, {})",
|
|
+ SYS_FUTEX => format!(
|
|
+ "futex({:#X} [{:?}], {}, {}, {}, {}, {})",
|
|
b,
|
|
UserSlice::ro(b, 4).and_then(|buf| buf.read_u32()),
|
|
c,
|
|
d,
|
|
e,
|
|
- f
|
|
+ f,
|
|
+ g,
|
|
),
|
|
diff --git a/src/syscall/futex.rs b/src/syscall/futex.rs
|
|
--- a/src/syscall/futex.rs
|
|
+++ b/src/syscall/futex.rs
|
|
@@
|
|
-use crate::syscall::{
|
|
- data::TimeSpec,
|
|
- error::{Error, Result, EAGAIN, EFAULT, EINVAL, ETIMEDOUT},
|
|
- flag::{FUTEX_WAIT, FUTEX_WAIT64, FUTEX_WAKE},
|
|
-};
|
|
+use crate::syscall::{
|
|
+ data::TimeSpec,
|
|
+ error::{Error, Result, EAGAIN, EFAULT, EINVAL, ETIMEDOUT},
|
|
+ flag::{FUTEX_REQUEUE, FUTEX_WAIT, FUTEX_WAIT64, FUTEX_WAKE},
|
|
+};
|
|
+
|
|
+const FUTEX_CMP_REQUEUE: usize = 4;
|
|
@@
|
|
pub struct FutexEntry {
|
|
@@
|
|
}
|
|
+
|
|
+fn validate_futex_u32_addr(addr: usize) -> Result<VirtualAddress> {
|
|
+ if !addr.is_multiple_of(4) {
|
|
+ return Err(Error::new(EINVAL));
|
|
+ }
|
|
+ Ok(VirtualAddress::new(addr))
|
|
+}
|
|
+
|
|
+fn lock_futex_pair<R>(
|
|
+ first_shard: usize,
|
|
+ second_shard: usize,
|
|
+ token: &mut CleanLockToken,
|
|
+ f: impl FnOnce(&mut FutexList, Option<&mut FutexList>, crate::sync::LockToken<'_, L1>) -> R,
|
|
+) -> R {
|
|
+ if first_shard == second_shard {
|
|
+ let mut guard = FUTEXES[first_shard].lock(token.token());
|
|
+ let (map, map_token) = guard.token_split();
|
|
+ return f(map, None, map_token);
|
|
+ }
|
|
+
|
|
+ let low = core::cmp::min(first_shard, second_shard);
|
|
+ let high = core::cmp::max(first_shard, second_shard);
|
|
+
|
|
+ let mut low_guard = FUTEXES[low].lock(token.token());
|
|
+ let (low_map, low_token) = low_guard.token_split();
|
|
+ let mut high_guard = unsafe { FUTEXES[high].relock(low_token) };
|
|
+ let (high_map, high_token) = high_guard.token_split();
|
|
+
|
|
+ if first_shard == low {
|
|
+ f(low_map, Some(high_map), high_token)
|
|
+ } else {
|
|
+ f(high_map, Some(low_map), high_token)
|
|
+ }
|
|
+}
|
|
@@
|
|
-pub fn futex(
|
|
- addr: usize,
|
|
- op: usize,
|
|
- val: usize,
|
|
- val2: usize,
|
|
- _addr2: usize,
|
|
- token: &mut CleanLockToken,
|
|
-) -> Result<usize> {
|
|
+pub fn futex(
|
|
+ addr: usize,
|
|
+ op: usize,
|
|
+ val: usize,
|
|
+ val2: usize,
|
|
+ addr2: usize,
|
|
+ val3: usize,
|
|
+ token: &mut CleanLockToken,
|
|
+) -> Result<usize> {
|
|
@@
|
|
- {
|
|
- // TODO: Lock ordering violation
|
|
- let mut token = unsafe { CleanLockToken::new() };
|
|
- let mut futexes = FUTEXES[futex_shard(target_physaddr)].lock(token.token());
|
|
- let (futexes, mut token) = futexes.token_split();
|
|
+ loop {
|
|
+ let shard = futex_shard(target_physaddr);
|
|
+ let queued = {
|
|
+ let mut futexes = FUTEXES[shard].lock(token.token());
|
|
+ let (futexes, mut futex_token) = futexes.token_split();
|
|
+ let addr_space_guard = current_addrsp.acquire_read(futex_token.downgrade());
|
|
+ let locked_physaddr = validate_and_translate_virt(&addr_space_guard, target_virtaddr)
|
|
+ .ok_or(Error::new(EFAULT))?;
|
|
+ if locked_physaddr != target_physaddr {
|
|
+ false
|
|
+ } else {
|
|
+ drop(addr_space_guard);
|
|
@@
|
|
- futexes
|
|
- .entry(target_physaddr)
|
|
- .or_insert_with(Vec::new)
|
|
- .push(FutexEntry {
|
|
- target_virtaddr,
|
|
- context_lock: context_lock.clone(),
|
|
- addr_space: Arc::downgrade(&current_addrsp),
|
|
- });
|
|
- }
|
|
+ futexes
|
|
+ .entry(locked_physaddr)
|
|
+ .or_insert_with(Vec::new)
|
|
+ .push(FutexEntry {
|
|
+ target_virtaddr,
|
|
+ context_lock: context_lock.clone(),
|
|
+ addr_space: Arc::downgrade(&current_addrsp),
|
|
+ });
|
|
+ true
|
|
+ }
|
|
+ };
|
|
+
|
|
+ if queued {
|
|
+ break;
|
|
+ }
|
|
+ }
|
|
@@
|
|
- drop(addr_space_guard);
|
|
-
|
|
context::switch(token);
|
|
@@
|
|
FUTEX_WAKE => {
|
|
@@
|
|
Ok(woken)
|
|
}
|
|
+ FUTEX_REQUEUE | FUTEX_CMP_REQUEUE => {
|
|
+ let _ = validate_futex_u32_addr(addr)?;
|
|
+ let target2_virtaddr = validate_futex_u32_addr(addr2)?;
|
|
+ let target2_physaddr = {
|
|
+ let addr_space_guard = current_addrsp.acquire_read(token.downgrade());
|
|
+ validate_and_translate_virt(&addr_space_guard, target2_virtaddr)
|
|
+ .ok_or(Error::new(EFAULT))?
|
|
+ };
|
|
+ let source_shard = futex_shard(target_physaddr);
|
|
+ let target_shard = futex_shard(target2_physaddr);
|
|
+ let current_addrsp_weak = Arc::downgrade(&current_addrsp);
|
|
+
|
|
+ let affected = lock_futex_pair(
|
|
+ source_shard,
|
|
+ target_shard,
|
|
+ token,
|
|
+ |source_map, target_map_opt, mut futex_token| {
|
|
+ let addr_space_guard = current_addrsp.acquire_read(futex_token.downgrade());
|
|
+ let locked_source_physaddr = validate_and_translate_virt(&addr_space_guard, target_virtaddr)
|
|
+ .ok_or(Error::new(EFAULT))?;
|
|
+ let locked_target_physaddr = validate_and_translate_virt(&addr_space_guard, target2_virtaddr)
|
|
+ .ok_or(Error::new(EFAULT))?;
|
|
+ drop(addr_space_guard);
|
|
+
|
|
+ if locked_source_physaddr != target_physaddr || locked_target_physaddr != target2_physaddr {
|
|
+ return Err(Error::new(EAGAIN));
|
|
+ }
|
|
+
|
|
+ if op == FUTEX_CMP_REQUEUE {
|
|
+ let accessible_addr = crate::memory::RmmA::phys_to_virt(locked_source_physaddr).data();
|
|
+ let current = u64::from(unsafe {
|
|
+ (*(accessible_addr as *const AtomicU32)).load(Ordering::SeqCst)
|
|
+ });
|
|
+ if current != u64::from(val3 as u32) {
|
|
+ return Err(Error::new(EAGAIN));
|
|
+ }
|
|
+ }
|
|
+
|
|
+ let mut source_waiters = source_map.remove(&locked_source_physaddr).unwrap_or_default();
|
|
+ let mut total_woken = 0;
|
|
+ let mut total_requeued = 0;
|
|
+
|
|
+ let wake_from = |waiters: &mut Vec<FutexEntry>, limit: usize, token: &mut crate::sync::LockToken<'_, L1>| {
|
|
+ let mut woken = 0;
|
|
+ let mut i = 0;
|
|
+ while i < waiters.len() && woken < limit {
|
|
+ let waiter = match waiters.get(i) {
|
|
+ Some(waiter) => waiter,
|
|
+ None => break,
|
|
+ };
|
|
+ if waiter.target_virtaddr != target_virtaddr || !current_addrsp_weak.ptr_eq(&waiter.addr_space) {
|
|
+ i += 1;
|
|
+ continue;
|
|
+ }
|
|
+ let waiter = waiters.swap_remove(i);
|
|
+ waiter.context_lock.write(token.token()).unblock();
|
|
+ woken += 1;
|
|
+ }
|
|
+ woken
|
|
+ };
|
|
+
|
|
+ total_woken = wake_from(&mut source_waiters, val, &mut futex_token);
|
|
+
|
|
+ if let Some(target_map) = target_map_opt {
|
|
+ let mut target_waiters = target_map.remove(&locked_target_physaddr).unwrap_or_default();
|
|
+ let mut i = 0;
|
|
+ while i < source_waiters.len() && total_requeued < val2 {
|
|
+ let should_move = source_waiters
|
|
+ .get(i)
|
|
+ .map(|waiter| {
|
|
+ waiter.target_virtaddr == target_virtaddr
|
|
+ && current_addrsp_weak.ptr_eq(&waiter.addr_space)
|
|
+ })
|
|
+ .unwrap_or(false);
|
|
+ if !should_move {
|
|
+ i += 1;
|
|
+ continue;
|
|
+ }
|
|
+ let mut waiter = source_waiters.swap_remove(i);
|
|
+ waiter.target_virtaddr = target2_virtaddr;
|
|
+ target_waiters.push(waiter);
|
|
+ total_requeued += 1;
|
|
+ }
|
|
+ if !target_waiters.is_empty() {
|
|
+ target_map.insert(locked_target_physaddr, target_waiters);
|
|
+ }
|
|
+ } else if locked_source_physaddr == locked_target_physaddr {
|
|
+ for waiter in source_waiters.iter_mut() {
|
|
+ if total_requeued >= val2 {
|
|
+ break;
|
|
+ }
|
|
+ if waiter.target_virtaddr == target_virtaddr && current_addrsp_weak.ptr_eq(&waiter.addr_space) {
|
|
+ waiter.target_virtaddr = target2_virtaddr;
|
|
+ total_requeued += 1;
|
|
+ }
|
|
+ }
|
|
+ } else {
|
|
+ let mut target_waiters = source_map.remove(&locked_target_physaddr).unwrap_or_default();
|
|
+ let mut i = 0;
|
|
+ while i < source_waiters.len() && total_requeued < val2 {
|
|
+ let should_move = source_waiters
|
|
+ .get(i)
|
|
+ .map(|waiter| {
|
|
+ waiter.target_virtaddr == target_virtaddr
|
|
+ && current_addrsp_weak.ptr_eq(&waiter.addr_space)
|
|
+ })
|
|
+ .unwrap_or(false);
|
|
+ if !should_move {
|
|
+ i += 1;
|
|
+ continue;
|
|
+ }
|
|
+ let mut waiter = source_waiters.swap_remove(i);
|
|
+ waiter.target_virtaddr = target2_virtaddr;
|
|
+ target_waiters.push(waiter);
|
|
+ total_requeued += 1;
|
|
+ }
|
|
+ if !target_waiters.is_empty() {
|
|
+ source_map.insert(locked_target_physaddr, target_waiters);
|
|
+ }
|
|
+ }
|
|
+
|
|
+ if !source_waiters.is_empty() {
|
|
+ source_map.insert(locked_source_physaddr, source_waiters);
|
|
+ }
|
|
+
|
|
+ Ok(total_woken + total_requeued)
|
|
+ },
|
|
+ )?;
|
|
+
|
|
+ Ok(affected)
|
|
+ }
|
|
_ => Err(Error::new(EINVAL)),
|
|
}
|
|
}
|
|
diff --git a/src/syscall/mod.rs b/src/syscall/mod.rs
|
|
--- a/src/syscall/mod.rs
|
|
+++ b/src/syscall/mod.rs
|
|
@@
|
|
- SYS_FUTEX => futex(b, c, d, e, f, token),
|
|
+ SYS_FUTEX => futex(b, c, d, e, f, g, token),
|