34360e1e4f
P0-P2: Barrier SMP, sigmask/pthread_kill races, robust mutexes, RT scheduling, POSIX sched API
P3: PerCpuSched struct, per-CPU wiring, work stealing, load balancing, initial placement
P4: 64-shard futex table, REQUEUE, PI futexes (LOCK_PI/UNLOCK_PI/TRYLOCK_PI), robust futexes, vruntime tracking, min-vruntime SCHED_OTHER selection
P5: setpriority/getpriority, pthread_setaffinity_np, pthread_setname_np, pthread_setschedparam (Redox)
P6: Cache-affine scheduling (last_cpu + vruntime bonus), NUMA topology kernel hints + numad userspace daemon
Stability fixes: make_consistent stores 0 (dead-TID fix), cond.rs error propagation, SPIN_COUNT adaptive spinning, Sys::open &str fix, PI futex CAS race, proc.rs lock ordering, barrier destroy
Patches: 33 kernel + 58 relibc patches, all tracked in recipes
Docs: KERNEL-SCHEDULER-MULTITHREAD-IMPROVEMENT-PLAN.md updated, SCHEDULER-REVIEW-FINAL.md created
Architecture: NUMA topology parsing stays userspace (numad daemon), kernel stores lightweight NumaTopology hints
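For context on the robust-mutex work in the diff below: a minimal sketch of the standard POSIX recovery flow these patches implement (the pthread_mutexattr_setrobust / EOWNERDEAD / pthread_mutex_consistent API is POSIX, not anything added here; error handling is mostly omitted).

    #include <errno.h>
    #include <pthread.h>
    #include <stdio.h>

    static pthread_mutex_t mutex;

    /* Thread that dies while holding the robust mutex. */
    static void *dying_thread(void *arg) {
        (void)arg;
        pthread_mutex_lock(&mutex);
        return NULL; /* exits without unlocking */
    }

    int main(void) {
        pthread_mutexattr_t attr;
        pthread_mutexattr_init(&attr);
        pthread_mutexattr_setrobust(&attr, PTHREAD_MUTEX_ROBUST);
        pthread_mutex_init(&mutex, &attr);

        pthread_t t;
        pthread_create(&t, NULL, dying_thread, NULL);
        pthread_join(t, NULL);

        /* The next lock succeeds but reports that the previous owner died. */
        int r = pthread_mutex_lock(&mutex);
        if (r == EOWNERDEAD) {
            /* We now own the lock, but the protected state may be
               inconsistent. Repair it, then mark the mutex consistent
               before unlocking. */
            pthread_mutex_consistent(&mutex);
        }
        pthread_mutex_unlock(&mutex);
        printf("lock returned %d\n", r);
        return 0;
    }

In the relibc diff, the EOWNERDEAD path corresponds to finish_lock_acquire(true) after a waiter CAS-claims a mutex whose owner bit carries FUTEX_OWNER_DIED or whose owner TID is no longer live.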
diff --git a/src/sync/pthread_mutex.rs b/src/sync/pthread_mutex.rs
index 29bad63..af0c429 100644
--- a/src/sync/pthread_mutex.rs
+++ b/src/sync/pthread_mutex.rs
@@ -1,3 +1,4 @@
+use alloc::boxed::Box;
 use core::{
     cell::Cell,
     sync::atomic::{AtomicU32 as AtomicUint, Ordering},
@@ -6,10 +7,9 @@ use core::{
 use crate::{
     error::Errno,
     header::{bits_timespec::timespec, errno::*, pthread::*},
+    platform::{Pal, Sys, types::c_int},
 };
 
-use crate::platform::{Pal, Sys, types::c_int};
-
 use super::FutexWaitResult;
 
 pub struct RlctMutex {
@@ -21,15 +21,22 @@ pub struct RlctMutex {
     robust: bool,
 }
 
+pub struct RobustMutexNode {
+    pub next: *mut RobustMutexNode,
+    pub prev: *mut RobustMutexNode,
+    pub mutex: *const RlctMutex,
+}
+
 const STATE_UNLOCKED: u32 = 0;
 const WAITING_BIT: u32 = 1 << 31;
-const INDEX_MASK: u32 = !WAITING_BIT;
+const FUTEX_OWNER_DIED: u32 = 1 << 30;
+const INDEX_MASK: u32 = !(WAITING_BIT | FUTEX_OWNER_DIED);
 
 // TODO: Lower limit is probably better.
 const RECURSIVE_COUNT_MAX_INCLUSIVE: u32 = u32::MAX;
 // TODO: How many spins should we do before it becomes more time-economical to enter kernel mode
 // via futexes?
-const SPIN_COUNT: usize = 0;
+const SPIN_COUNT: usize = 100;
 
 impl RlctMutex {
     pub(crate) fn new(attr: &RlctMutexAttr) -> Result<Self, Errno> {
@@ -69,13 +76,25 @@ impl RlctMutex {
         Ok(0)
     }
     pub fn make_consistent(&self) -> Result<(), Errno> {
-        todo_skip!(0, "pthread robust mutexes: not implemented");
-        Ok(())
+        debug_assert!(self.robust, "make_consistent called on non-robust mutex");
+
+        if !self.robust {
+            return Err(Errno(EINVAL));
+        }
+
+        let current = self.inner.load(Ordering::Relaxed);
+        let owner = current & INDEX_MASK;
+
+        if owner == os_tid_invalid_after_fork() && current & FUTEX_OWNER_DIED != 0 {
+            self.inner.store(0, Ordering::Release);
+            Ok(())
+        } else {
+            Err(Errno(EINVAL))
+        }
     }
     fn lock_inner(&self, deadline: Option<&timespec>) -> Result<(), Errno> {
         let this_thread = os_tid_invalid_after_fork();
-
-        //let mut spins_left = SPIN_COUNT;
+        let mut spins_left = SPIN_COUNT;
 
         loop {
             let result = self.inner.compare_exchange_weak(
@@ -86,45 +105,59 @@ impl RlctMutex {
             );
 
             match result {
-                // CAS succeeded
-                Ok(_) => {
-                    if self.ty == Ty::Recursive {
-                        self.increment_recursive_count()?;
-                    }
-                    return Ok(());
-                }
-                // CAS failed, but the mutex was recursive and we already own the lock.
+                Ok(_) => return self.finish_lock_acquire(false),
                 Err(thread) if thread & INDEX_MASK == this_thread && self.ty == Ty::Recursive => {
                     self.increment_recursive_count()?;
                     return Ok(());
                 }
-                // CAS failed, but the mutex was error-checking and we already own the lock.
                 Err(thread) if thread & INDEX_MASK == this_thread && self.ty == Ty::Errck => {
-                    return Err(Errno(EAGAIN));
+                    return Err(Errno(EDEADLK));
                 }
-                // CAS spuriously failed, simply retry the CAS. TODO: Use core::hint::spin_loop()?
-                Err(thread) if thread & INDEX_MASK == 0 => {
-                    continue;
+                Err(thread) if thread & FUTEX_OWNER_DIED != 0 && thread & INDEX_MASK == 0 => {
+                    return Err(Errno(ENOTRECOVERABLE));
                 }
-                // CAS failed because some other thread owned the lock. We must now wait.
+                Err(thread) if thread & FUTEX_OWNER_DIED != 0 => {
+                    if !self.robust {
+                        return Err(Errno(ENOTRECOVERABLE));
+                    }
+
+                    let new_value = (thread & WAITING_BIT) | FUTEX_OWNER_DIED | this_thread;
+                    match self.inner.compare_exchange(
+                        thread,
+                        new_value,
+                        Ordering::Acquire,
+                        Ordering::Relaxed,
+                    ) {
+                        Ok(_) => return self.finish_lock_acquire(true),
+                        Err(_) => continue,
+                    }
+                }
+                Err(thread) if thread & INDEX_MASK == 0 => continue,
                 Err(thread) => {
-                    /*if spins_left > 0 {
-                        // TODO: Faster to spin trying to load the flag, compared to CAS?
+                    let owner = thread & INDEX_MASK;
+
+                    if !crate::pthread::mutex_owner_id_is_live(owner) {
+                        if !self.robust {
+                            return Err(Errno(ENOTRECOVERABLE));
+                        }
+
+                        let new_value = (thread & WAITING_BIT) | FUTEX_OWNER_DIED | this_thread;
+                        match self.inner.compare_exchange(
+                            thread,
+                            new_value,
+                            Ordering::Acquire,
+                            Ordering::Relaxed,
+                        ) {
+                            Ok(_) => return self.finish_lock_acquire(true),
+                            Err(_) => continue,
+                        }
+                    }
+
+                    if spins_left > 0 {
                         spins_left -= 1;
                         core::hint::spin_loop();
                         continue;
                     }
-
-                    spins_left = SPIN_COUNT;
-
-                    let inner = self.inner.fetch_or(WAITING_BIT, Ordering::Relaxed);
-
-                    if inner == STATE_UNLOCKED {
-                        continue;
-                    }*/
-
-                    // If the mutex is not robust, simply futex_wait until unblocked.
-                    //crate::sync::futex_wait(&self.inner, inner | WAITING_BIT, None);
                     if crate::sync::futex_wait(&self.inner, thread, deadline)
                         == FutexWaitResult::TimedOut
                     {
@@ -140,6 +173,20 @@ impl RlctMutex {
     pub fn lock_with_timeout(&self, deadline: &timespec) -> Result<(), Errno> {
         self.lock_inner(Some(deadline))
     }
+    fn finish_lock_acquire(&self, owner_dead: bool) -> Result<(), Errno> {
+        if self.ty == Ty::Recursive {
+            self.increment_recursive_count()?;
+        }
+        if self.robust {
+            add_to_robust_list(self);
+        }
+
+        if owner_dead {
+            Err(Errno(EOWNERDEAD))
+        } else {
+            Ok(())
+        }
+    }
     fn increment_recursive_count(&self) -> Result<(), Errno> {
         // We don't have to worry about asynchronous signals here, since pthread_mutex_trylock
         // is not async-signal-safe.
@@ -161,41 +208,65 @@ impl RlctMutex {
     pub fn try_lock(&self) -> Result<(), Errno> {
         let this_thread = os_tid_invalid_after_fork();
 
-        // TODO: If recursive, omitting CAS may be faster if it is already owned by this thread.
-        let result = self.inner.compare_exchange(
-            STATE_UNLOCKED,
-            this_thread,
-            Ordering::Acquire,
-            Ordering::Relaxed,
-        );
+        loop {
+            let current = self.inner.load(Ordering::Relaxed);
+
+            if current == STATE_UNLOCKED {
+                match self.inner.compare_exchange(
+                    STATE_UNLOCKED,
+                    this_thread,
+                    Ordering::Acquire,
+                    Ordering::Relaxed,
+                ) {
+                    Ok(_) => return self.finish_lock_acquire(false),
+                    Err(_) => continue,
+                }
+            }
 
-        if self.ty == Ty::Recursive {
-            match result {
-                Err(index) if index & INDEX_MASK != this_thread => return Err(Errno(EBUSY)),
-                _ => (),
+            let owner = current & INDEX_MASK;
+
+            if owner == this_thread && self.ty == Ty::Recursive {
+                self.increment_recursive_count()?;
+                return Ok(());
             }
 
-            self.increment_recursive_count()?;
+            if owner == this_thread && self.ty == Ty::Errck {
+                return Err(Errno(EDEADLK));
+            }
 
-            return Ok(());
-        }
+            if current & FUTEX_OWNER_DIED != 0 && owner == 0 {
+                return Err(Errno(ENOTRECOVERABLE));
+            }
 
-        match result {
-            Ok(_) => Ok(()),
-            Err(index) if index & INDEX_MASK == this_thread && self.ty == Ty::Errck => {
-                Err(Errno(EDEADLK))
+            if current & FUTEX_OWNER_DIED != 0 || (owner != 0 && !crate::pthread::mutex_owner_id_is_live(owner)) {
+                if !self.robust {
+                    return Err(Errno(ENOTRECOVERABLE));
+                }
+
+                let new_value = (current & WAITING_BIT) | FUTEX_OWNER_DIED | this_thread;
+                match self.inner.compare_exchange(
+                    current,
+                    new_value,
+                    Ordering::Acquire,
+                    Ordering::Relaxed,
+                ) {
+                    Ok(_) => return self.finish_lock_acquire(true),
+                    Err(_) => continue,
+                }
             }
-            Err(_) => Err(Errno(EBUSY)),
+
+            return Err(Errno(EBUSY));
         }
     }
     // Safe because we are not protecting any data.
     pub fn unlock(&self) -> Result<(), Errno> {
+        let current = self.inner.load(Ordering::Relaxed);
+
         if self.robust || matches!(self.ty, Ty::Recursive | Ty::Errck) {
-            if self.inner.load(Ordering::Relaxed) & INDEX_MASK != os_tid_invalid_after_fork() {
+            if current & INDEX_MASK != os_tid_invalid_after_fork() {
                return Err(Errno(EPERM));
             }
 
-            // TODO: Is this fence correct?
             core::sync::atomic::fence(Ordering::Acquire);
         }
 
@@ -208,18 +279,47 @@ impl RlctMutex {
             }
         }
 
-        self.inner.store(STATE_UNLOCKED, Ordering::Release);
-        crate::sync::futex_wake(&self.inner, i32::MAX);
-        /*let was_waiting = self.inner.swap(STATE_UNLOCKED, Ordering::Release) & WAITING_BIT != 0;
+        if self.robust {
+            remove_from_robust_list(self);
+        }
 
-        if was_waiting {
-            let _ = crate::sync::futex_wake(&self.inner, 1);
-        }*/
+        let new_state = if self.robust && current & FUTEX_OWNER_DIED != 0 {
+            FUTEX_OWNER_DIED
+        } else {
+            STATE_UNLOCKED
+        };
+
+        self.inner.store(new_state, Ordering::Release);
+        crate::sync::futex_wake(&self.inner, i32::MAX);
 
         Ok(())
     }
 }
 
+pub(crate) unsafe fn mark_robust_mutexes_dead(thread: &crate::pthread::Pthread) {
+    let head = thread.robust_list_head.get();
+    let this_thread = os_tid_invalid_after_fork();
+    let mut node = unsafe { *head };
+
+    unsafe { *head = core::ptr::null_mut() };
+
+    while !node.is_null() {
+        let next = unsafe { (*node).next };
+        let mutex = unsafe { &*(*node).mutex };
+        let current = mutex.inner.load(Ordering::Relaxed);
+
+        if current & INDEX_MASK == this_thread {
+            mutex
+                .inner
+                .store((current & WAITING_BIT) | FUTEX_OWNER_DIED | this_thread, Ordering::Release);
+            crate::sync::futex_wake(&mutex.inner, i32::MAX);
+        }
+
+        unsafe { drop(Box::from_raw(node)) };
+        node = next;
+    }
+}
+
 #[repr(u8)]
 #[derive(PartialEq)]
 enum Ty {
@@ -237,6 +337,54 @@ enum Ty {
 #[thread_local]
 static CACHED_OS_TID_INVALID_AFTER_FORK: Cell<u32> = Cell::new(0);
 
+fn add_to_robust_list(mutex: &RlctMutex) {
+    let thread = crate::pthread::current_thread().expect("current thread not present");
+    let node_ptr = Box::into_raw(Box::new(RobustMutexNode {
+        next: core::ptr::null_mut(),
+        prev: core::ptr::null_mut(),
+        mutex: core::ptr::from_ref(mutex),
+    }));
+
+    unsafe {
+        let head = thread.robust_list_head.get();
+        if !(*head).is_null() {
+            (**head).prev = node_ptr;
+        }
+        (*node_ptr).next = *head;
+        *head = node_ptr;
+    }
+}
+
+fn remove_from_robust_list(mutex: &RlctMutex) {
+    let thread = match crate::pthread::current_thread() {
+        Some(thread) => thread,
+        None => return,
+    };
+
+    unsafe {
+        let mut node = *thread.robust_list_head.get();
+
+        while !node.is_null() {
+            if core::ptr::eq((*node).mutex, core::ptr::from_ref(mutex)) {
+                if !(*node).prev.is_null() {
+                    (*(*node).prev).next = (*node).next;
+                } else {
+                    *thread.robust_list_head.get() = (*node).next;
+                }
+
+                if !(*node).next.is_null() {
+                    (*(*node).next).prev = (*node).prev;
+                }
+
+                drop(Box::from_raw(node));
+                return;
+            }
+
+            node = (*node).next;
+        }
+    }
+}
+
 // Assumes TIDs are unique between processes, which I only know is true for Redox.
 fn os_tid_invalid_after_fork() -> u32 {
     // TODO: Coordinate better if using shared == PTHREAD_PROCESS_SHARED, with up to 2^32 separate
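A note on the EAGAIN -> EDEADLK change in lock_inner: POSIX specifies that relocking a PTHREAD_MUTEX_ERRORCHECK mutex from the thread that already owns it fails with EDEADLK, which is what the patched arm now returns. A minimal sketch exercising that path (standard POSIX API; assumed to run against the patched relibc):

    #include <errno.h>
    #include <pthread.h>
    #include <stdio.h>

    int main(void) {
        pthread_mutexattr_t attr;
        pthread_mutex_t mutex;

        pthread_mutexattr_init(&attr);
        pthread_mutexattr_settype(&attr, PTHREAD_MUTEX_ERRORCHECK);
        pthread_mutex_init(&mutex, &attr);

        pthread_mutex_lock(&mutex);
        /* Second lock from the owning thread: error-checking mutexes
           must report the self-deadlock instead of hanging. */
        int r = pthread_mutex_lock(&mutex);
        printf("second lock: %d (EDEADLK = %d)\n", r, EDEADLK);

        pthread_mutex_unlock(&mutex);
        pthread_mutex_destroy(&mutex);
        pthread_mutexattr_destroy(&attr);
        return 0;
    }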