diff --git a/src/sync/pthread_mutex.rs b/src/sync/pthread_mutex.rs
index 29bad63..af0c429 100644
--- a/src/sync/pthread_mutex.rs
+++ b/src/sync/pthread_mutex.rs
@@ -1,3 +1,4 @@
+use alloc::boxed::Box;
 use core::{
     cell::Cell,
     sync::atomic::{AtomicU32 as AtomicUint, Ordering},
@@ -6,10 +7,9 @@ use core::{
 use crate::{
     error::Errno,
     header::{bits_timespec::timespec, errno::*, pthread::*},
+    platform::{Pal, Sys, types::c_int},
 };
 
-use crate::platform::{Pal, Sys, types::c_int};
-
 use super::FutexWaitResult;
 
 pub struct RlctMutex {
@@ -21,15 +21,22 @@ pub struct RlctMutex {
     robust: bool,
 }
 
+pub struct RobustMutexNode {
+    pub next: *mut RobustMutexNode,
+    pub prev: *mut RobustMutexNode,
+    pub mutex: *const RlctMutex,
+}
+
 const STATE_UNLOCKED: u32 = 0;
 const WAITING_BIT: u32 = 1 << 31;
-const INDEX_MASK: u32 = !WAITING_BIT;
+const FUTEX_OWNER_DIED: u32 = 1 << 30;
+const INDEX_MASK: u32 = !(WAITING_BIT | FUTEX_OWNER_DIED);
 
 // TODO: Lower limit is probably better.
 const RECURSIVE_COUNT_MAX_INCLUSIVE: u32 = u32::MAX;
 
 // TODO: How many spins should we do before it becomes more time-economical to enter kernel mode
 // via futexes?
-const SPIN_COUNT: usize = 0;
+const SPIN_COUNT: usize = 100;
 
 impl RlctMutex {
     pub(crate) fn new(attr: &RlctMutexAttr) -> Result<Self, Errno> {
@@ -69,13 +76,25 @@ impl RlctMutex {
         Ok(0)
     }
     pub fn make_consistent(&self) -> Result<(), Errno> {
-        todo_skip!(0, "pthread robust mutexes: not implemented");
-        Ok(())
+        if !self.robust {
+            return Err(Errno(EINVAL));
+        }
+
+        let current = self.inner.load(Ordering::Relaxed);
+        let owner = current & INDEX_MASK;
+
+        if owner == os_tid_invalid_after_fork() && current & FUTEX_OWNER_DIED != 0 {
+            // Clear only the owner-died flag. The calling thread still holds
+            // the mutex and must release it with pthread_mutex_unlock().
+            self.inner.store(current & !FUTEX_OWNER_DIED, Ordering::Relaxed);
+            Ok(())
+        } else {
+            Err(Errno(EINVAL))
+        }
     }
     fn lock_inner(&self, deadline: Option<&timespec>) -> Result<(), Errno> {
         let this_thread = os_tid_invalid_after_fork();
-
-        //let mut spins_left = SPIN_COUNT;
+        let mut spins_left = SPIN_COUNT;
 
         loop {
             let result = self.inner.compare_exchange_weak(
@@ -86,45 +105,59 @@
             );
 
             match result {
-                // CAS succeeded
-                Ok(_) => {
-                    if self.ty == Ty::Recursive {
-                        self.increment_recursive_count()?;
-                    }
-                    return Ok(());
-                }
-                // CAS failed, but the mutex was recursive and we already own the lock.
+                Ok(_) => return self.finish_lock_acquire(false),
                 Err(thread) if thread & INDEX_MASK == this_thread && self.ty == Ty::Recursive => {
                     self.increment_recursive_count()?;
                     return Ok(());
                 }
-                // CAS failed, but the mutex was error-checking and we already own the lock.
                 Err(thread) if thread & INDEX_MASK == this_thread && self.ty == Ty::Errck => {
-                    return Err(Errno(EAGAIN));
+                    return Err(Errno(EDEADLK));
                 }
-                // CAS spuriously failed, simply retry the CAS. TODO: Use core::hint::spin_loop()?
-                Err(thread) if thread & INDEX_MASK == 0 => {
-                    continue;
+                Err(thread) if thread & FUTEX_OWNER_DIED != 0 && thread & INDEX_MASK == 0 => {
+                    return Err(Errno(ENOTRECOVERABLE));
                 }
-                // CAS failed because some other thread owned the lock. We must now wait.
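+                // Owner-died takeover: the previous holder of this robust
+                // mutex died while holding it. Claim ownership with a single
+                // CAS, preserving the waiters bit and keeping FUTEX_OWNER_DIED
+                // set so the caller sees EOWNERDEAD until it calls
+                // pthread_mutex_consistent().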
+                Err(thread)
+                    if thread & FUTEX_OWNER_DIED != 0
+                        && !crate::pthread::mutex_owner_id_is_live(thread & INDEX_MASK) =>
+                {
+                    if !self.robust {
+                        return Err(Errno(ENOTRECOVERABLE));
+                    }
+
+                    let new_value = (thread & WAITING_BIT) | FUTEX_OWNER_DIED | this_thread;
+                    match self.inner.compare_exchange(
+                        thread,
+                        new_value,
+                        Ordering::Acquire,
+                        Ordering::Relaxed,
+                    ) {
+                        Ok(_) => return self.finish_lock_acquire(true),
+                        Err(_) => continue,
+                    }
+                }
+                Err(thread) if thread & INDEX_MASK == 0 => continue,
                 Err(thread) => {
-                    /*if spins_left > 0 {
-                        // TODO: Faster to spin trying to load the flag, compared to CAS?
+                    let owner = thread & INDEX_MASK;
+
+                    // A live owner that still carries the owner-died flag
+                    // (EOWNERDEAD was returned, but consistent() has not been
+                    // called yet) falls through here and is waited on like any
+                    // other owner.
+                    if !crate::pthread::mutex_owner_id_is_live(owner) {
+                        if !self.robust {
+                            return Err(Errno(ENOTRECOVERABLE));
+                        }
+
+                        let new_value = (thread & WAITING_BIT) | FUTEX_OWNER_DIED | this_thread;
+                        match self.inner.compare_exchange(
+                            thread,
+                            new_value,
+                            Ordering::Acquire,
+                            Ordering::Relaxed,
+                        ) {
+                            Ok(_) => return self.finish_lock_acquire(true),
+                            Err(_) => continue,
+                        }
+                    }
+
+                    if spins_left > 0 {
                         spins_left -= 1;
+                        core::hint::spin_loop();
                         continue;
                     }
-
-                    spins_left = SPIN_COUNT;
-
-                    let inner = self.inner.fetch_or(WAITING_BIT, Ordering::Relaxed);
-
-                    if inner == STATE_UNLOCKED {
-                        continue;
-                    }*/
-
-                    // If the mutex is not robust, simply futex_wait until unblocked.
-                    //crate::sync::futex_wait(&self.inner, inner | WAITING_BIT, None);
                     if crate::sync::futex_wait(&self.inner, thread, deadline)
                         == FutexWaitResult::TimedOut
                     {
@@ -140,6 +173,20 @@ impl RlctMutex {
     pub fn lock_with_timeout(&self, deadline: &timespec) -> Result<(), Errno> {
         self.lock_inner(Some(deadline))
     }
+    fn finish_lock_acquire(&self, owner_dead: bool) -> Result<(), Errno> {
+        if self.ty == Ty::Recursive {
+            self.increment_recursive_count()?;
+        }
+        if self.robust {
+            add_to_robust_list(self);
+        }
+
+        if owner_dead {
+            Err(Errno(EOWNERDEAD))
+        } else {
+            Ok(())
+        }
+    }
     fn increment_recursive_count(&self) -> Result<(), Errno> {
        // We don't have to worry about asynchronous signals here, since pthread_mutex_trylock
        // is not async-signal-safe.
@@ -161,41 +208,65 @@ impl RlctMutex {
     pub fn try_lock(&self) -> Result<(), Errno> {
         let this_thread = os_tid_invalid_after_fork();
 
-        // TODO: If recursive, omitting CAS may be faster if it is already owned by this thread.
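+        // A single CAS against STATE_UNLOCKED is no longer sufficient here:
+        // try_lock must also recognize recursive/error-checking relocks by
+        // this thread and dead-owner takeover before it can report EBUSY,
+        // hence the load-then-CAS retry loop below.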
-        let result = self.inner.compare_exchange(
-            STATE_UNLOCKED,
-            this_thread,
-            Ordering::Acquire,
-            Ordering::Relaxed,
-        );
+        loop {
+            let current = self.inner.load(Ordering::Relaxed);
+
+            if current == STATE_UNLOCKED {
+                match self.inner.compare_exchange(
+                    STATE_UNLOCKED,
+                    this_thread,
+                    Ordering::Acquire,
+                    Ordering::Relaxed,
+                ) {
+                    Ok(_) => return self.finish_lock_acquire(false),
+                    Err(_) => continue,
+                }
+            }
 
-        if self.ty == Ty::Recursive {
-            match result {
-                Err(index) if index & INDEX_MASK != this_thread => return Err(Errno(EBUSY)),
-                _ => (),
-            }
+            let owner = current & INDEX_MASK;
 
-            self.increment_recursive_count()?;
+            if owner == this_thread && self.ty == Ty::Recursive {
+                self.increment_recursive_count()?;
+                return Ok(());
+            }
+            if owner == this_thread && self.ty == Ty::Errck {
+                return Err(Errno(EDEADLK));
+            }
 
-            return Ok(());
-        }
+            if current & FUTEX_OWNER_DIED != 0 && owner == 0 {
+                return Err(Errno(ENOTRECOVERABLE));
+            }
 
-        match result {
-            Ok(_) => Ok(()),
-            Err(index) if index & INDEX_MASK == this_thread && self.ty == Ty::Errck => {
-                Err(Errno(EDEADLK))
-            }
-            Err(_) => Err(Errno(EBUSY)),
-        }
+            // Take over only if the recorded owner has actually died; a live
+            // owner that still carries the owner-died flag keeps the lock, so
+            // it is reported as EBUSY like any other holder.
+            if owner != 0 && !crate::pthread::mutex_owner_id_is_live(owner) {
+                if !self.robust {
+                    return Err(Errno(ENOTRECOVERABLE));
+                }
+
+                let new_value = (current & WAITING_BIT) | FUTEX_OWNER_DIED | this_thread;
+                match self.inner.compare_exchange(
+                    current,
+                    new_value,
+                    Ordering::Acquire,
+                    Ordering::Relaxed,
+                ) {
+                    Ok(_) => return self.finish_lock_acquire(true),
+                    Err(_) => continue,
+                }
+            }
+
+            return Err(Errno(EBUSY));
+        }
     }
 
     // Safe because we are not protecting any data.
     pub fn unlock(&self) -> Result<(), Errno> {
+        let current = self.inner.load(Ordering::Relaxed);
+
         if self.robust || matches!(self.ty, Ty::Recursive | Ty::Errck) {
-            if self.inner.load(Ordering::Relaxed) & INDEX_MASK != os_tid_invalid_after_fork() {
+            if current & INDEX_MASK != os_tid_invalid_after_fork() {
                 return Err(Errno(EPERM));
             }
-            // TODO: Is this fence correct?
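+            // This fence conservatively upgrades the Relaxed ownership load
+            // above to Acquire before the recursive-count bookkeeping below.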
             core::sync::atomic::fence(Ordering::Acquire);
         }
 
@@ -208,18 +279,47 @@ impl RlctMutex {
             }
         }
 
-        self.inner.store(STATE_UNLOCKED, Ordering::Release);
-        crate::sync::futex_wake(&self.inner, i32::MAX);
-        /*let was_waiting = self.inner.swap(STATE_UNLOCKED, Ordering::Release) & WAITING_BIT != 0;
+        if self.robust {
+            remove_from_robust_list(self);
+        }
 
-        if was_waiting {
-            let _ = crate::sync::futex_wake(&self.inner, 1);
-        }*/
+        let new_state = if self.robust && current & FUTEX_OWNER_DIED != 0 {
+            FUTEX_OWNER_DIED
+        } else {
+            STATE_UNLOCKED
+        };
+
+        self.inner.store(new_state, Ordering::Release);
+        crate::sync::futex_wake(&self.inner, i32::MAX);
 
         Ok(())
     }
 }
 
+pub(crate) unsafe fn mark_robust_mutexes_dead(thread: &crate::pthread::Pthread) {
+    let head = thread.robust_list_head.get();
+    let this_thread = os_tid_invalid_after_fork();
+    let mut node = unsafe { *head };
+
+    unsafe { *head = core::ptr::null_mut() };
+
+    while !node.is_null() {
+        let next = unsafe { (*node).next };
+        let mutex = unsafe { &*(*node).mutex };
+        let current = mutex.inner.load(Ordering::Relaxed);
+
+        if current & INDEX_MASK == this_thread {
+            mutex.inner.store(
+                (current & WAITING_BIT) | FUTEX_OWNER_DIED | this_thread,
+                Ordering::Release,
+            );
+            crate::sync::futex_wake(&mutex.inner, i32::MAX);
+        }
+
+        unsafe { drop(Box::from_raw(node)) };
+        node = next;
+    }
+}
+
 #[repr(u8)]
 #[derive(PartialEq)]
 enum Ty {
@@ -237,6 +337,54 @@ enum Ty {
 
 #[thread_local]
 static CACHED_OS_TID_INVALID_AFTER_FORK: Cell<u32> = Cell::new(0);
 
+fn add_to_robust_list(mutex: &RlctMutex) {
+    let thread = crate::pthread::current_thread().expect("current thread not present");
+    let node_ptr = Box::into_raw(Box::new(RobustMutexNode {
+        next: core::ptr::null_mut(),
+        prev: core::ptr::null_mut(),
+        mutex: core::ptr::from_ref(mutex),
+    }));
+
+    unsafe {
+        let head = thread.robust_list_head.get();
+        if !(*head).is_null() {
+            (**head).prev = node_ptr;
+        }
+        (*node_ptr).next = *head;
+        *head = node_ptr;
+    }
+}
+
+fn remove_from_robust_list(mutex: &RlctMutex) {
+    let thread = match crate::pthread::current_thread() {
+        Some(thread) => thread,
+        None => return,
+    };
+
+    unsafe {
+        let mut node = *thread.robust_list_head.get();
+
+        while !node.is_null() {
+            if core::ptr::eq((*node).mutex, core::ptr::from_ref(mutex)) {
+                if !(*node).prev.is_null() {
+                    (*(*node).prev).next = (*node).next;
+                } else {
+                    *thread.robust_list_head.get() = (*node).next;
+                }
+
+                if !(*node).next.is_null() {
+                    (*(*node).next).prev = (*node).prev;
+                }
+
+                drop(Box::from_raw(node));
+                return;
+            }
+
+            node = (*node).next;
+        }
+    }
+}
+
 // Assumes TIDs are unique between processes, which I only know is true for Redox.
 fn os_tid_invalid_after_fork() -> u32 {
     // TODO: Coordinate better if using shared == PTHREAD_PROCESS_SHARED, with up to 2^32 separate