relibc: P5-robust-mutexes — apply Phase 0e patch
Re-apply P5-robust-mutexes.patch from local/patches/relibc/ to the local fork. Multi-threading plan Phase 0e.
This commit is contained in:
+211
-63
@@ -1,3 +1,4 @@
|
||||
use alloc::boxed::Box;
|
||||
use core::{
|
||||
cell::Cell,
|
||||
sync::atomic::{AtomicU32 as AtomicUint, Ordering},
|
||||
@@ -6,10 +7,9 @@ use core::{
|
||||
use crate::{
|
||||
error::Errno,
|
||||
header::{bits_timespec::timespec, errno::*, pthread::*},
|
||||
platform::{Pal, Sys, types::c_int},
|
||||
};
|
||||
|
||||
use crate::platform::{Pal, Sys, types::c_int};
|
||||
|
||||
use super::FutexWaitResult;
|
||||
|
||||
pub struct RlctMutex {
|
||||
@@ -21,15 +21,22 @@ pub struct RlctMutex {
|
||||
robust: bool,
|
||||
}
|
||||
|
||||
pub struct RobustMutexNode {
|
||||
pub next: *mut RobustMutexNode,
|
||||
pub prev: *mut RobustMutexNode,
|
||||
pub mutex: *const RlctMutex,
|
||||
}
|
||||
|
||||
const STATE_UNLOCKED: u32 = 0;
|
||||
const WAITING_BIT: u32 = 1 << 31;
|
||||
const INDEX_MASK: u32 = !WAITING_BIT;
|
||||
const FUTEX_OWNER_DIED: u32 = 1 << 30;
|
||||
const INDEX_MASK: u32 = !(WAITING_BIT | FUTEX_OWNER_DIED);
|
||||
|
||||
// TODO: Lower limit is probably better.
|
||||
const RECURSIVE_COUNT_MAX_INCLUSIVE: u32 = u32::MAX;
|
||||
// TODO: How many spins should we do before it becomes more time-economical to enter kernel mode
|
||||
// via futexes?
|
||||
const SPIN_COUNT: usize = 0;
|
||||
const SPIN_COUNT: usize = 100;
|
||||
|
||||
impl RlctMutex {
|
||||
pub(crate) fn new(attr: &RlctMutexAttr) -> Result<Self, Errno> {
|
||||
@@ -69,13 +76,25 @@ impl RlctMutex {
|
||||
Ok(0)
|
||||
}
|
||||
pub fn make_consistent(&self) -> Result<(), Errno> {
|
||||
todo_skip!(0, "pthread robust mutexes: not implemented");
|
||||
Ok(())
|
||||
debug_assert!(self.robust, "make_consistent called on non-robust mutex");
|
||||
|
||||
if !self.robust {
|
||||
return Err(Errno(EINVAL));
|
||||
}
|
||||
|
||||
let current = self.inner.load(Ordering::Relaxed);
|
||||
let owner = current & INDEX_MASK;
|
||||
|
||||
if owner == os_tid_invalid_after_fork() && current & FUTEX_OWNER_DIED != 0 {
|
||||
self.inner.store(0, Ordering::Release);
|
||||
Ok(())
|
||||
} else {
|
||||
Err(Errno(EINVAL))
|
||||
}
|
||||
}
|
||||
fn lock_inner(&self, deadline: Option<&timespec>) -> Result<(), Errno> {
|
||||
let this_thread = os_tid_invalid_after_fork();
|
||||
|
||||
//let mut spins_left = SPIN_COUNT;
|
||||
let mut spins_left = SPIN_COUNT;
|
||||
|
||||
loop {
|
||||
let result = self.inner.compare_exchange_weak(
|
||||
@@ -86,45 +105,59 @@ impl RlctMutex {
|
||||
);
|
||||
|
||||
match result {
|
||||
// CAS succeeded
|
||||
Ok(_) => {
|
||||
if self.ty == Ty::Recursive {
|
||||
self.increment_recursive_count()?;
|
||||
}
|
||||
return Ok(());
|
||||
}
|
||||
// CAS failed, but the mutex was recursive and we already own the lock.
|
||||
Ok(_) => return self.finish_lock_acquire(false),
|
||||
Err(thread) if thread & INDEX_MASK == this_thread && self.ty == Ty::Recursive => {
|
||||
self.increment_recursive_count()?;
|
||||
return Ok(());
|
||||
}
|
||||
// CAS failed, but the mutex was error-checking and we already own the lock.
|
||||
Err(thread) if thread & INDEX_MASK == this_thread && self.ty == Ty::Errck => {
|
||||
return Err(Errno(EAGAIN));
|
||||
return Err(Errno(EDEADLK));
|
||||
}
|
||||
// CAS spuriously failed, simply retry the CAS. TODO: Use core::hint::spin_loop()?
|
||||
Err(thread) if thread & INDEX_MASK == 0 => {
|
||||
continue;
|
||||
Err(thread) if thread & FUTEX_OWNER_DIED != 0 && thread & INDEX_MASK == 0 => {
|
||||
return Err(Errno(ENOTRECOVERABLE));
|
||||
}
|
||||
// CAS failed because some other thread owned the lock. We must now wait.
|
||||
Err(thread) if thread & FUTEX_OWNER_DIED != 0 => {
|
||||
if !self.robust {
|
||||
return Err(Errno(ENOTRECOVERABLE));
|
||||
}
|
||||
|
||||
let new_value = (thread & WAITING_BIT) | FUTEX_OWNER_DIED | this_thread;
|
||||
match self.inner.compare_exchange(
|
||||
thread,
|
||||
new_value,
|
||||
Ordering::Acquire,
|
||||
Ordering::Relaxed,
|
||||
) {
|
||||
Ok(_) => return self.finish_lock_acquire(true),
|
||||
Err(_) => continue,
|
||||
}
|
||||
}
|
||||
Err(thread) if thread & INDEX_MASK == 0 => continue,
|
||||
Err(thread) => {
|
||||
/*if spins_left > 0 {
|
||||
// TODO: Faster to spin trying to load the flag, compared to CAS?
|
||||
let owner = thread & INDEX_MASK;
|
||||
|
||||
if !crate::pthread::mutex_owner_id_is_live(owner) {
|
||||
if !self.robust {
|
||||
return Err(Errno(ENOTRECOVERABLE));
|
||||
}
|
||||
|
||||
let new_value = (thread & WAITING_BIT) | FUTEX_OWNER_DIED | this_thread;
|
||||
match self.inner.compare_exchange(
|
||||
thread,
|
||||
new_value,
|
||||
Ordering::Acquire,
|
||||
Ordering::Relaxed,
|
||||
) {
|
||||
Ok(_) => return self.finish_lock_acquire(true),
|
||||
Err(_) => continue,
|
||||
}
|
||||
}
|
||||
|
||||
if spins_left > 0 {
|
||||
spins_left -= 1;
|
||||
core::hint::spin_loop();
|
||||
continue;
|
||||
}
|
||||
|
||||
spins_left = SPIN_COUNT;
|
||||
|
||||
let inner = self.inner.fetch_or(WAITING_BIT, Ordering::Relaxed);
|
||||
|
||||
if inner == STATE_UNLOCKED {
|
||||
continue;
|
||||
}*/
|
||||
|
||||
// If the mutex is not robust, simply futex_wait until unblocked.
|
||||
//crate::sync::futex_wait(&self.inner, inner | WAITING_BIT, None);
|
||||
if crate::sync::futex_wait(&self.inner, thread, deadline)
|
||||
== FutexWaitResult::TimedOut
|
||||
{
|
||||
@@ -140,6 +173,20 @@ impl RlctMutex {
|
||||
pub fn lock_with_timeout(&self, deadline: ×pec) -> Result<(), Errno> {
|
||||
self.lock_inner(Some(deadline))
|
||||
}
|
||||
fn finish_lock_acquire(&self, owner_dead: bool) -> Result<(), Errno> {
|
||||
if self.ty == Ty::Recursive {
|
||||
self.increment_recursive_count()?;
|
||||
}
|
||||
if self.robust {
|
||||
add_to_robust_list(self);
|
||||
}
|
||||
|
||||
if owner_dead {
|
||||
Err(Errno(EOWNERDEAD))
|
||||
} else {
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
fn increment_recursive_count(&self) -> Result<(), Errno> {
|
||||
// We don't have to worry about asynchronous signals here, since pthread_mutex_trylock
|
||||
// is not async-signal-safe.
|
||||
@@ -161,41 +208,65 @@ impl RlctMutex {
|
||||
pub fn try_lock(&self) -> Result<(), Errno> {
|
||||
let this_thread = os_tid_invalid_after_fork();
|
||||
|
||||
// TODO: If recursive, omitting CAS may be faster if it is already owned by this thread.
|
||||
let result = self.inner.compare_exchange(
|
||||
STATE_UNLOCKED,
|
||||
this_thread,
|
||||
Ordering::Acquire,
|
||||
Ordering::Relaxed,
|
||||
);
|
||||
loop {
|
||||
let current = self.inner.load(Ordering::Relaxed);
|
||||
|
||||
if self.ty == Ty::Recursive {
|
||||
match result {
|
||||
Err(index) if index & INDEX_MASK != this_thread => return Err(Errno(EBUSY)),
|
||||
_ => (),
|
||||
if current == STATE_UNLOCKED {
|
||||
match self.inner.compare_exchange(
|
||||
STATE_UNLOCKED,
|
||||
this_thread,
|
||||
Ordering::Acquire,
|
||||
Ordering::Relaxed,
|
||||
) {
|
||||
Ok(_) => return self.finish_lock_acquire(false),
|
||||
Err(_) => continue,
|
||||
}
|
||||
}
|
||||
|
||||
self.increment_recursive_count()?;
|
||||
let owner = current & INDEX_MASK;
|
||||
|
||||
return Ok(());
|
||||
}
|
||||
|
||||
match result {
|
||||
Ok(_) => Ok(()),
|
||||
Err(index) if index & INDEX_MASK == this_thread && self.ty == Ty::Errck => {
|
||||
Err(Errno(EDEADLK))
|
||||
if owner == this_thread && self.ty == Ty::Recursive {
|
||||
self.increment_recursive_count()?;
|
||||
return Ok(());
|
||||
}
|
||||
Err(_) => Err(Errno(EBUSY)),
|
||||
|
||||
if owner == this_thread && self.ty == Ty::Errck {
|
||||
return Err(Errno(EDEADLK));
|
||||
}
|
||||
|
||||
if current & FUTEX_OWNER_DIED != 0 && owner == 0 {
|
||||
return Err(Errno(ENOTRECOVERABLE));
|
||||
}
|
||||
|
||||
if current & FUTEX_OWNER_DIED != 0 || (owner != 0 && !crate::pthread::mutex_owner_id_is_live(owner)) {
|
||||
if !self.robust {
|
||||
return Err(Errno(ENOTRECOVERABLE));
|
||||
}
|
||||
|
||||
let new_value = (current & WAITING_BIT) | FUTEX_OWNER_DIED | this_thread;
|
||||
match self.inner.compare_exchange(
|
||||
current,
|
||||
new_value,
|
||||
Ordering::Acquire,
|
||||
Ordering::Relaxed,
|
||||
) {
|
||||
Ok(_) => return self.finish_lock_acquire(true),
|
||||
Err(_) => continue,
|
||||
}
|
||||
}
|
||||
|
||||
return Err(Errno(EBUSY));
|
||||
}
|
||||
}
|
||||
// Safe because we are not protecting any data.
|
||||
pub fn unlock(&self) -> Result<(), Errno> {
|
||||
let current = self.inner.load(Ordering::Relaxed);
|
||||
|
||||
if self.robust || matches!(self.ty, Ty::Recursive | Ty::Errck) {
|
||||
if self.inner.load(Ordering::Relaxed) & INDEX_MASK != os_tid_invalid_after_fork() {
|
||||
if current & INDEX_MASK != os_tid_invalid_after_fork() {
|
||||
return Err(Errno(EPERM));
|
||||
}
|
||||
|
||||
// TODO: Is this fence correct?
|
||||
core::sync::atomic::fence(Ordering::Acquire);
|
||||
}
|
||||
|
||||
@@ -208,18 +279,47 @@ impl RlctMutex {
|
||||
}
|
||||
}
|
||||
|
||||
self.inner.store(STATE_UNLOCKED, Ordering::Release);
|
||||
crate::sync::futex_wake(&self.inner, i32::MAX);
|
||||
/*let was_waiting = self.inner.swap(STATE_UNLOCKED, Ordering::Release) & WAITING_BIT != 0;
|
||||
if self.robust {
|
||||
remove_from_robust_list(self);
|
||||
}
|
||||
|
||||
if was_waiting {
|
||||
let _ = crate::sync::futex_wake(&self.inner, 1);
|
||||
}*/
|
||||
let new_state = if self.robust && current & FUTEX_OWNER_DIED != 0 {
|
||||
FUTEX_OWNER_DIED
|
||||
} else {
|
||||
STATE_UNLOCKED
|
||||
};
|
||||
|
||||
self.inner.store(new_state, Ordering::Release);
|
||||
crate::sync::futex_wake(&self.inner, i32::MAX);
|
||||
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
pub(crate) unsafe fn mark_robust_mutexes_dead(thread: &crate::pthread::Pthread) {
|
||||
let head = thread.robust_list_head.get();
|
||||
let this_thread = os_tid_invalid_after_fork();
|
||||
let mut node = unsafe { *head };
|
||||
|
||||
unsafe { *head = core::ptr::null_mut() };
|
||||
|
||||
while !node.is_null() {
|
||||
let next = unsafe { (*node).next };
|
||||
let mutex = unsafe { &*(*node).mutex };
|
||||
let current = mutex.inner.load(Ordering::Relaxed);
|
||||
|
||||
if current & INDEX_MASK == this_thread {
|
||||
mutex
|
||||
.inner
|
||||
.store((current & WAITING_BIT) | FUTEX_OWNER_DIED | this_thread, Ordering::Release);
|
||||
crate::sync::futex_wake(&mutex.inner, i32::MAX);
|
||||
}
|
||||
|
||||
unsafe { drop(Box::from_raw(node)) };
|
||||
node = next;
|
||||
}
|
||||
}
|
||||
|
||||
#[repr(u8)]
|
||||
#[derive(PartialEq)]
|
||||
enum Ty {
|
||||
@@ -237,6 +337,54 @@ enum Ty {
|
||||
#[thread_local]
|
||||
static CACHED_OS_TID_INVALID_AFTER_FORK: Cell<u32> = Cell::new(0);
|
||||
|
||||
fn add_to_robust_list(mutex: &RlctMutex) {
|
||||
let thread = crate::pthread::current_thread().expect("current thread not present");
|
||||
let node_ptr = Box::into_raw(Box::new(RobustMutexNode {
|
||||
next: core::ptr::null_mut(),
|
||||
prev: core::ptr::null_mut(),
|
||||
mutex: core::ptr::from_ref(mutex),
|
||||
}));
|
||||
|
||||
unsafe {
|
||||
let head = thread.robust_list_head.get();
|
||||
if !(*head).is_null() {
|
||||
(**head).prev = node_ptr;
|
||||
}
|
||||
(*node_ptr).next = *head;
|
||||
*head = node_ptr;
|
||||
}
|
||||
}
|
||||
|
||||
fn remove_from_robust_list(mutex: &RlctMutex) {
|
||||
let thread = match crate::pthread::current_thread() {
|
||||
Some(thread) => thread,
|
||||
None => return,
|
||||
};
|
||||
|
||||
unsafe {
|
||||
let mut node = *thread.robust_list_head.get();
|
||||
|
||||
while !node.is_null() {
|
||||
if core::ptr::eq((*node).mutex, core::ptr::from_ref(mutex)) {
|
||||
if !(*node).prev.is_null() {
|
||||
(*(*node).prev).next = (*node).next;
|
||||
} else {
|
||||
*thread.robust_list_head.get() = (*node).next;
|
||||
}
|
||||
|
||||
if !(*node).next.is_null() {
|
||||
(*(*node).next).prev = (*node).prev;
|
||||
}
|
||||
|
||||
drop(Box::from_raw(node));
|
||||
return;
|
||||
}
|
||||
|
||||
node = (*node).next;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Assumes TIDs are unique between processes, which I only know is true for Redox.
|
||||
fn os_tid_invalid_after_fork() -> u32 {
|
||||
// TODO: Coordinate better if using shared == PTHREAD_PROCESS_SHARED, with up to 2^32 separate
|
||||
|
||||
Reference in New Issue
Block a user