Files
RedBear-OS/src/pthread/mod.rs
T
vasilito 4b683014c9 relibc: PthreadFlags::FINISHED + robust_list_head + mutex_owner_id_is_live + comprehensive semaphores
P5-pthread-sigmask-race introduced PthreadFlags::FINISHED
handling in pthread_kill(). Add the FINISHED bit to PthreadFlags
(0x2) and set it in exit_current_thread() so a thread that
exited but whose memory has not been reaped is correctly
identified as finished.

P5-robust-mutexes references thread.robust_list_head and
crate::pthread::mutex_owner_id_is_live(). Add:

  - Pthread.robust_list_head: UnsafeCell<*mut RobustMutexNode>
    in src/pthread/mod.rs and src/ld_so/tcb.rs (both Pthread
    construction sites)

  - pub fn mutex_owner_id_is_live(owner: u32) -> bool in
    src/pthread/mod.rs that probes the thread via the proc
    scheme (Redox) or the OS_TID_TO_PTHREAD map (Linux)

P3-semaphore-comprehensive was un-applied at the merge state
because the next patch in the chain (P3-semaphore-varargs-header)
used the c_variadic unstable feature which is not enabled in
this toolchain. Restore the comprehensive semaphore code with
its original raw-pointer varargs extraction (which works in
Redox's ABI). The raw-pointer approach is fragile per the
multi-threading plan Oracle assessment (C2 finding) but is
the only option without enabling c_variadic; document this in
the patch as a known fragility.

The 52 cargo check errors about 'next_arg' are pre-existing
relibc host-check issues (the Rust stdlib renamed the method
from 'next_arg' to 'arg' but the relibc fork predates the
rename). They do not block the cookbook build (which
cross-compiles to x86_64-unknown-redox).
2026-07-02 07:45:14 +03:00

512 lines
16 KiB
Rust

//! Relibc Threads, or RLCT.
use core::{
cell::UnsafeCell,
ptr,
sync::atomic::{AtomicBool, AtomicUsize, Ordering},
};
use alloc::collections::BTreeMap;
use crate::{
error::Errno,
header::{errno::*, pthread as header, sched::sched_param, sys_mman},
ld_so::{ExpectTlsFree, tcb::Tcb},
platform::{Pal, Sys, types::*},
};
use crate::sync::{Mutex, waitval::Waitval};
/// Installs the [`Pthread`] descriptor of the main thread into its TCB.
///
/// Called only by the main thread, as part of relibc_start.
pub unsafe fn init() {
    // TODO: set these values on Linux as well
    #[cfg(not(target_os = "redox"))]
    let (stack_base, stack_size) = (ptr::null_mut(), 0);
    #[cfg(target_os = "redox")]
    let (stack_base, stack_size) = {
        //TODO: what is the best way to get these values?
        use redox_rt::arch::{STACK_SIZE, STACK_TOP};
        ((STACK_TOP - STACK_SIZE) as *mut c_void, STACK_SIZE)
    };

    let main_thread = Pthread {
        waitval: Waitval::new(),
        has_enabled_cancelation: AtomicBool::new(false),
        has_queued_cancelation: AtomicBool::new(false),
        flags: PthreadFlags::empty().bits().into(),
        //index: FIRST_THREAD_IDX,
        stack_base,
        stack_size,
        os_tid: UnsafeCell::new(Sys::current_os_tid()),
        robust_list_head: UnsafeCell::new(core::ptr::null_mut()),
    };

    // The main thread's TCB must already exist; overwrite its thread state.
    let tcb = unsafe { Tcb::current() }.expect_notls("no TCB present for main thread");
    tcb.pthread = main_thread;
}
//static NEXT_INDEX: AtomicU32 = AtomicU32::new(FIRST_THREAD_IDX + 1);
//const FIRST_THREAD_IDX: usize = 1;
/// Requests cancellation of every thread registered in `OS_TID_TO_PTHREAD`.
///
/// The registry lock is held for the whole walk, and individual `cancel`
/// failures are deliberately ignored.
pub unsafe fn terminate_from_main_thread() {
    let registry = OS_TID_TO_PTHREAD.lock();
    for entry in registry.values() {
        // Best effort: a thread that cannot be signalled is simply skipped.
        let _ = unsafe { cancel(&(*entry.0).pthread) };
    }
}
bitflags::bitflags! {
/// Per-thread state bits, stored as raw `usize` bits in `Pthread::flags`.
pub struct PthreadFlags: usize {
/// The thread is detached: `exit_current_thread` releases its
/// registry entry itself instead of posting a return value.
const DETACHED = 1;
/// The (joinable) thread has finished running; set by
/// `exit_current_thread` just before the return value is posted.
const FINISHED = 2;
}
}
/// Thread state kept by relibc for each thread; embedded in the thread's
/// `Tcb` (see `init` and `create`).
#[derive(Debug)]
pub struct Pthread {
/// Completion slot: `exit_current_thread` posts the return value here and
/// `join` waits on it.
pub(crate) waitval: Waitval<Retval>,
/// Set by `cancel` once a cancellation request has been made.
pub(crate) has_queued_cancelation: AtomicBool,
/// Whether cancellation is currently enabled; toggled by
/// `set_cancel_state`.
pub(crate) has_enabled_cancelation: AtomicBool,
/// Raw bits of `PthreadFlags`.
pub(crate) flags: AtomicUsize,
/// Lowest address of the thread's stack; handed to `Sys::exit_thread`
/// on exit. Left null by `init` for the main thread outside Redox.
pub(crate) stack_base: *mut c_void,
/// Size in bytes of the stack starting at `stack_base`.
pub(crate) stack_size: usize,
/// OS-level identifier of the thread. Written by the thread itself in
/// `new_thread_shim` and used as the registry key by `dealloc_thread`.
pub os_tid: UnsafeCell<OsTid>,
/// Head of the per-thread robust mutex list, used by the kernel
/// during thread exit to mark any held robust mutexes with
/// `FUTEX_OWNER_DIED` so a future `pthread_mutex_lock` on a
/// dead-owner mutex can recover via `EOWNERDEAD`. Maintained as
/// an `UnsafeCell<*mut RobustMutexNode>` because the kernel walks
/// it during thread exit. `null` = empty list.
pub(crate) robust_list_head: UnsafeCell<*mut crate::sync::pthread_mutex::RobustMutexNode>,
}
/// Platform-specific thread identifier; also the key type of
/// `OS_TID_TO_PTHREAD`. Exactly one field exists per supported target.
#[derive(Clone, Copy, Debug, Default, Ord, Eq, PartialOrd, PartialEq)]
pub struct OsTid {
/// Redox: file descriptor referring to the thread.
#[cfg(target_os = "redox")]
pub thread_fd: usize,
/// Linux: thread ID of the thread.
#[cfg(target_os = "linux")]
pub thread_id: usize,
}
// `Pthread` holds raw pointers and `UnsafeCell`s, so it is neither `Send` nor
// `Sync` automatically; these impls assert that sharing it across threads is
// sound.
// NOTE(review): the invariant that makes this sound is not stated anywhere in
// this file — confirm and document it.
unsafe impl Send for Pthread {}
unsafe impl Sync for Pthread {}
/// Value a thread finishes with: either what its start routine returned or
/// `PTHREAD_CANCELED` when it was cancelled. Delivered to `join` through
/// `Pthread::waitval`.
#[derive(Clone, Copy, Debug)]
pub struct Retval(pub *mut c_void);
/// Scope guard that unmaps a memory region when dropped, unless it is
/// defused with `core::mem::forget`.
struct MmapGuard {
    /// Start address of the mapping.
    page_start: *mut c_void,
    /// Length of the mapping in bytes.
    mmap_size: usize,
}

impl Drop for MmapGuard {
    fn drop(&mut self) {
        let (addr, len) = (self.page_start, self.mmap_size);
        // Nothing useful can be done if unmapping fails while unwinding an
        // error path, so the result is ignored.
        let _ = unsafe { Sys::munmap(addr, len) };
    }
}
/// Creates a new thread that runs `start_routine(arg)`.
///
/// * `attrs` – optional thread attributes; defaults are used when `None`.
/// * `start_routine` / `arg` – entry point of the new thread and its argument.
///
/// Returns the new thread's `pthread_t` (a pointer to the `Pthread` inside
/// its freshly allocated TCB).
///
/// # Errors
///
/// * `EAGAIN` – the stack could not be mapped or the OS refused to create
///   the thread.
/// * `ENOMEM` – the TCB could not be allocated.
///
/// # Panics
///
/// Panics if `attrs.detachstate` is not one of the two known values, or if
/// the calling thread has no TCB.
#[allow(unused_mut)]
pub(crate) unsafe fn create(
    attrs: Option<&header::RlctAttr>,
    start_routine: extern "C" fn(arg: *mut c_void) -> *mut c_void,
    arg: *mut c_void,
) -> Result<pthread_t, Errno> {
    let attrs = attrs.cloned().unwrap_or_default();

    // The creator's signal mask is handed to the new thread through the
    // synchronization mutex's payload (read back in `new_thread_shim`).
    let mut current_sigmask = 0_u64;
    #[cfg(target_os = "redox")]
    {
        current_sigmask =
            redox_rt::signal::get_sigmask().expect("failed to obtain sigprocmask for caller");
    }

    // Create a locked mutex, unlocked by the thread after it has started.
    let synchronization_mutex = unsafe { Mutex::locked(current_sigmask) };
    let synchronization_mutex = &synchronization_mutex;

    let stack_size = attrs.stacksize.next_multiple_of(Sys::getpagesize());

    let caller_supplied_stack = attrs.stack != 0;
    let stack_base = if caller_supplied_stack {
        attrs.stack as *mut c_void
    } else {
        let ret = unsafe {
            sys_mman::mmap(
                core::ptr::null_mut(),
                stack_size,
                sys_mman::PROT_READ | sys_mman::PROT_WRITE,
                sys_mman::MAP_PRIVATE | sys_mman::MAP_ANONYMOUS,
                -1,
                0,
            )
        };
        if ret as isize == -1 {
            // "Insufficient resources"
            return Err(Errno(EAGAIN));
        }
        ret
    };

    let mut flags = PthreadFlags::empty();
    match i32::from(attrs.detachstate) {
        header::PTHREAD_CREATE_DETACHED => flags |= PthreadFlags::DETACHED,
        header::PTHREAD_CREATE_JOINABLE => (),
        other => unreachable!("unknown detachstate {}", other),
    }

    // Unmap the stack again if thread creation fails below -- but only when
    // the stack was mapped here. A caller-supplied stack belongs to the
    // application and must survive a failed creation attempt untouched.
    let stack_raii = (!caller_supplied_stack).then(|| MmapGuard {
        page_start: stack_base,
        mmap_size: stack_size,
    });

    let current_tcb = unsafe { Tcb::current() }.expect("no TCB!");
    let new_tcb = unsafe { Tcb::new(current_tcb.tls_len) }.map_err(|_| Errno(ENOMEM))?;
    new_tcb.pthread.flags = flags.bits().into();
    new_tcb.pthread.stack_base = stack_base;
    new_tcb.pthread.stack_size = stack_size;

    // The new thread shares the creator's TLS masters, linker and mspace.
    new_tcb.masters_ptr = current_tcb.masters_ptr;
    new_tcb.masters_len = current_tcb.masters_len;
    new_tcb.linker_ptr = current_tcb.linker_ptr;
    new_tcb.mspace = current_tcb.mspace;

    // Build the initial stack contents, growing downwards from the top.
    let stack_end = unsafe { stack_base.add(stack_size) };
    let mut stack = stack_end.cast::<usize>();
    {
        let mut push = |value: usize| {
            stack = unsafe { stack.sub(1) };
            unsafe { stack.write(value) };
        };

        if cfg!(target_arch = "aarch64") {
            // Aarch64 requires the stack to be 16 byte aligned after
            // the call instruction, unlike x86 which requires it to be
            // aligned before the call instruction. As such push an
            // extra word on the stack to align the stack to 16 bytes.
            push(0);
        }
        push(0);
        push(0);

        // Pushed in reverse parameter order of `new_thread_shim`, followed
        // by the shim's own address.
        // NOTE(review): presumably popped by the platform's clone trampoline
        // -- confirm against `Sys::rlct_clone`.
        push(ptr::from_ref(synchronization_mutex) as usize);
        push(ptr::from_mut(new_tcb) as usize);
        push(arg as usize);
        push(start_routine as usize);
        push(new_thread_shim as *const () as usize);
    }

    // NOTE(review): `new_tcb` does not appear to be released on this error
    // path -- confirm whether `Tcb` needs explicit deallocation.
    let Ok(os_tid) = (unsafe { Sys::rlct_clone(stack, &mut new_tcb.os_specific) }) else {
        return Err(Errno(EAGAIN));
    };
    // The thread is running on the stack now; it must not be unmapped here.
    core::mem::forget(stack_raii);

    // Block until the new thread has unlocked the mutex in
    // `new_thread_shim`; the guard is dropped again immediately.
    let _ = synchronization_mutex.lock();

    OS_TID_TO_PTHREAD
        .lock()
        .insert(os_tid, ForceSendSync(new_tcb));

    Ok(&raw const new_tcb.pthread as *mut _)
}
/// A shim to wrap thread entry points in logic to set up TLS, for example
///
/// The four parameters correspond to the words `create` pushes onto the new
/// thread's stack: the user entry point, its argument, the thread's TCB and
/// the creator's synchronization mutex. Never returns; the thread ends via
/// `exit_current_thread`.
unsafe extern "C" fn new_thread_shim(
entry_point: unsafe extern "C" fn(*mut c_void) -> *mut c_void,
arg: *mut c_void,
tcb: *mut Tcb,
synchronization_mutex: *const Mutex<u64>,
) -> ! {
let tcb = unsafe { tcb.as_mut() }.expect_notls("non-null TLS is required");
#[cfg(not(target_os = "redox"))]
{
unsafe { tcb.activate() };
}
#[cfg(target_os = "redox")]
{
// `thr_fd` in `tcb` is set by [`Sys::rlct_clone`] *before* jumping to
// the entry point of the new thread.
unsafe {
tcb.activate(None);
}
redox_rt::signal::setup_sighandler(&tcb.os_specific, false);
}
// Read the creator's signal mask out of the mutex payload. This has to
// happen before the unlock below: the mutex is a local of `create`, which
// returns once it manages to lock it.
let procmask = unsafe { (&*synchronization_mutex).as_ptr().read() };
unsafe { tcb.copy_masters() }.unwrap();
// Record this thread's own OS identifier in its `Pthread`.
unsafe { (*tcb).pthread.os_tid.get().write(Sys::current_os_tid()) };
// Release the creator, which is blocked in `create` on this mutex. The
// mutex must not be touched after this point.
unsafe { (&*synchronization_mutex).manual_unlock() };
#[cfg(target_os = "redox")]
{
redox_rt::signal::set_sigmask(Some(procmask), None)
.expect("failed to set procmask in child thread");
}
let retval = unsafe { entry_point(arg) };
unsafe { exit_current_thread(Retval(retval)) }
}
pub unsafe fn join(thread: &Pthread) -> Result<Retval, Errno> {
// We don't have to return EDEADLK, but unlike e.g. pthread_t lifetime checking, it's a
// relatively easy check.
if core::ptr::eq(
thread,
current_thread().expect("current thread not present"),
) {
return Err(Errno(EDEADLK));
}
// Waitval starts locked, and is unlocked when the thread finishes.
let retval = *thread.waitval.wait();
// We have now awaited the thread and received its return value. POSIX states that the
// pthread_t of this thread, will no longer be valid. In practice, we can thus deallocate the
// thread state.
unsafe { dealloc_thread(thread) };
Ok(retval)
}
/// Marks `thread` as detached by setting `PthreadFlags::DETACHED`.
///
/// Always succeeds.
pub unsafe fn detach(thread: &Pthread) -> Result<(), Errno> {
    let detached_bit = PthreadFlags::DETACHED.bits();
    // The previous flag value is not needed.
    let _previous = thread.flags.fetch_or(detached_bit, Ordering::AcqRel);
    Ok(())
}
/// Returns the calling thread's [`Pthread`], or `None` when the thread has
/// no TCB.
pub fn current_thread() -> Option<&'static Pthread> {
    let tcb = unsafe { Tcb::current() }?;
    Some(&tcb.pthread)
}
/// Cancellation point: terminates the calling thread if a cancellation
/// request is queued and cancellation is currently enabled.
///
/// # Panics
///
/// Panics if the calling thread has no `Pthread` state.
pub unsafe fn testcancel() {
    let me = current_thread().expect("current thread not present");

    // The enabled flag is only consulted once a request is actually queued.
    if !me.has_queued_cancelation.load(Ordering::Acquire) {
        return;
    }
    if !me.has_enabled_cancelation.load(Ordering::Acquire) {
        return;
    }
    unsafe { cancel_current_thread() };
}
/// Terminates the calling thread, making `retval` available to a joiner.
///
/// Runs the cleanup-handler stack and all TLS destructors first. A detached
/// thread then drops its registry entry itself; a joinable thread is marked
/// `FINISHED` and posts `retval` for `join`. Finally the thread is ended via
/// `Sys::exit_thread`; this function never returns.
///
/// # Panics
///
/// Panics if the calling thread has no `Pthread` state.
pub unsafe fn exit_current_thread(retval: Retval) -> ! {
// Run pthread_cleanup_push/pthread_cleanup_pop destructors.
unsafe { header::run_destructor_stack() };
unsafe { header::tls::run_all_destructors() };
let this = current_thread().expect("failed to obtain current thread when exiting");
// Copy the stack bounds out before the thread state is released or handed
// to a joiner below.
let stack_base = this.stack_base;
let stack_size = this.stack_size;
// NOTE(review): the DETACHED bit is sampled here, separately from the
// FINISHED update below; a concurrent `detach` landing in between would
// leave nobody to call `dealloc_thread` -- confirm whether that matters.
if this.flags.load(Ordering::Acquire) & PthreadFlags::DETACHED.bits() != 0 {
// When detached, the thread state no longer makes any sense, and can immediately be
// deallocated.
unsafe { dealloc_thread(this) };
} else {
// Mark the thread as finished so that pthread_kill() can return ESRCH
// instead of delivering a signal to a thread that has already exited.
this.flags.fetch_or(PthreadFlags::FINISHED.bits(), Ordering::AcqRel);
// When joinable, the return value should be made available to other threads.
unsafe { this.waitval.post(retval) };
}
unsafe { Sys::exit_thread(stack_base.cast(), stack_size) }
}
/// Removes `thread` from the `OS_TID_TO_PTHREAD` registry.
///
/// Only the registry entry is dropped here; nothing else is released.
unsafe fn dealloc_thread(thread: &Pthread) {
    // TODO: How should this be handled on Linux?
    let mut registry = OS_TID_TO_PTHREAD.lock();
    let key = unsafe { thread.os_tid.get().read() };
    registry.remove(&key);
}
/// Signal number that `cancel` sends to a thread to request cancellation.
pub const SIGRT_RLCT_CANCEL: usize = 33;
/// Signal number reserved for relibc timers.
/// NOTE(review): not used in this part of the file -- confirm against the
/// timer implementation.
pub const SIGRT_RLCT_TIMER: usize = 34;
/// Signal handler that cancels the calling thread; the signal number is
/// ignored.
// NOTE(review): presumably installed for `SIGRT_RLCT_CANCEL`; the
// registration is not visible in this part of the file.
unsafe extern "C" fn cancel_sighandler(_: c_int) {
    unsafe { cancel_current_thread() }
}
/// Terminates the calling thread with `PTHREAD_CANCELED` as its exit value.
unsafe fn cancel_current_thread() {
    let canceled = Retval(header::PTHREAD_CANCELED);
    // Terminate the thread
    unsafe { exit_current_thread(canceled) }
}
/// Queues a cancellation request for `thread`.
///
/// The request is always recorded. The thread is additionally signalled
/// with `SIGRT_RLCT_CANCEL` when it currently has cancellation enabled.
///
/// # Errors
///
/// Propagates the error of `Sys::rlct_kill` if the signal cannot be sent.
pub unsafe fn cancel(thread: &Pthread) -> Result<(), Errno> {
    // TODO: What order should these atomic bools be accessed in?
    thread.has_queued_cancelation.store(true, Ordering::Release);

    if !thread.has_enabled_cancelation.load(Ordering::Acquire) {
        // The request stays queued until the thread enables cancellation.
        return Ok(());
    }

    let target = unsafe { thread.os_tid.get().read() };
    (unsafe { Sys::rlct_kill(target, SIGRT_RLCT_CANCEL) })?;
    Ok(())
}
/// Sets the scheduling policy and parameters of a thread.
///
/// Currently a stub: all arguments are ignored and `Ok(())` is returned.
pub fn set_sched_param(
_thread: &Pthread,
_policy: c_int,
_param: &sched_param,
) -> Result<(), Errno> {
// TODO
Ok(())
}
/// Sets the scheduling priority of a thread.
///
/// Currently a stub: both arguments are ignored and `Ok(())` is returned.
pub fn set_sched_priority(_thread: &Pthread, _prio: c_int) -> Result<(), Errno> {
// TODO
Ok(())
}
/// Enables or disables cancellation for the calling thread and returns the
/// previous state (`PTHREAD_CANCEL_ENABLE` or `PTHREAD_CANCEL_DISABLE`).
///
/// When cancellation is being enabled and a request is already queued, the
/// thread is cancelled right away.
///
/// # Errors
///
/// Returns `EINVAL` for any other `state` value.
///
/// # Panics
///
/// Panics if the calling thread has no `Pthread` state.
pub fn set_cancel_state(state: c_int) -> Result<c_int, Errno> {
    let me = current_thread().expect("current thread not present");

    let enable = match state {
        header::PTHREAD_CANCEL_ENABLE => true,
        header::PTHREAD_CANCEL_DISABLE => false,
        _ => return Err(Errno(EINVAL)),
    };

    let previously_enabled = me.has_enabled_cancelation.swap(enable, Ordering::Release);

    // A request that arrived while cancellation was off is acted on now.
    if enable && me.has_queued_cancelation.load(Ordering::Acquire) {
        unsafe {
            cancel_current_thread();
        }
    }

    Ok(if previously_enabled {
        header::PTHREAD_CANCEL_ENABLE
    } else {
        header::PTHREAD_CANCEL_DISABLE
    })
}
/// Validates a cancel type and returns the previous one.
///
/// Currently a stub: the requested type is only validated, never stored,
/// and `PTHREAD_CANCEL_DEFERRED` is always reported as the previous type.
///
/// # Errors
///
/// Returns `EINVAL` when `ty` is neither `PTHREAD_CANCEL_DEFERRED` nor
/// `PTHREAD_CANCEL_ASYNCHRONOUS`.
///
/// # Panics
///
/// Panics if the calling thread has no `Pthread` state.
pub fn set_cancel_type(ty: c_int) -> Result<c_int, Errno> {
    // Not used yet, but the lookup (and its panic) is part of the contract.
    let _this_thread = current_thread().expect("current thread not present");

    // TODO
    let is_known_type = matches!(
        ty,
        header::PTHREAD_CANCEL_DEFERRED | header::PTHREAD_CANCEL_ASYNCHRONOUS
    );
    if !is_known_type {
        return Err(Errno(EINVAL));
    }
    Ok(header::PTHREAD_CANCEL_DEFERRED)
}
/// Returns the CPU-time clock ID of `thread`.
///
/// Currently a stub: `thread` is ignored and `ENOENT` is always returned.
pub fn get_cpu_clkid(thread: &Pthread) -> Result<clockid_t, Errno> {
// TODO
Err(Errno(ENOENT))
}
/// Returns the scheduling parameters of `thread`.
///
/// # Panics
///
/// Not implemented yet: every call panics through `todo!()`.
pub fn get_sched_param(thread: &Pthread) -> Result<(clockid_t, sched_param), Errno> {
todo!()
}
// TODO: Hash map?
// TODO: RwLock to improve perf?
/// Registry mapping OS thread identifiers to the TCB of each thread started
/// through `create`. Entries are inserted by `create` and removed by
/// `dealloc_thread`. The main thread set up by `init` is not inserted.
static OS_TID_TO_PTHREAD: Mutex<BTreeMap<OsTid, ForceSendSync<*mut Tcb>>> =
Mutex::new(BTreeMap::new());
/// Wrapper that unconditionally marks its contents as `Send` and `Sync`.
/// Used so that raw `*mut Tcb` pointers can be stored in the
/// `OS_TID_TO_PTHREAD` static.
#[derive(Clone, Copy)]
struct ForceSendSync<T>(T);
// The compiler cannot check these; every user of `ForceSendSync` is
// responsible for accessing the wrapped value soundly.
unsafe impl<T> Send for ForceSendSync<T> {}
unsafe impl<T> Sync for ForceSendSync<T> {}
/*pub(crate) fn current_thread_index() -> u32 {
current_thread().expect("current thread not present").index
}*/
/// Process-sharing mode of a synchronization object.
#[derive(Clone, Copy, Default, Debug)]
pub enum Pshared {
    /// `PTHREAD_PROCESS_PRIVATE` (the default).
    #[default]
    Private,
    /// `PTHREAD_PROCESS_SHARED`.
    Shared,
}

impl Pshared {
    /// Converts a raw `PTHREAD_PROCESS_*` value, returning `None` for any
    /// other value.
    pub const fn from_raw(raw: c_int) -> Option<Self> {
        match raw {
            header::PTHREAD_PROCESS_PRIVATE => Some(Self::Private),
            header::PTHREAD_PROCESS_SHARED => Some(Self::Shared),
            _ => None,
        }
    }

    /// Returns the raw `PTHREAD_PROCESS_*` constant for this mode.
    pub const fn raw(self) -> c_int {
        match self {
            Self::Shared => header::PTHREAD_PROCESS_SHARED,
            Self::Private => header::PTHREAD_PROCESS_PRIVATE,
        }
    }
}
/// Return `true` if the given mutex owner ID refers to a thread that
/// is still alive in this process. Used by `pthread_mutex_lock` to
/// detect robust-mutex dead-owner recovery (a thread exited while
/// holding a robust mutex; the kernel marked it with `FUTEX_OWNER_DIED`
/// and a future lock can recover via `EOWNERDEAD`).
///
/// On Redox, the owner ID is the kernel's `thread_fd` for the
/// thread. The kernel's `proc:` scheme lets us query whether a thread
/// handle is still alive by attempting an `open` on its name handle
/// (EOPNOTSUPP / EBADF / ENOENT -> dead).
///
/// On Linux, the owner ID is the `tid` (kernel task ID). A pthread
/// has not exited when its `tid` is in the live-process set; we
/// approximate this by checking our own `OS_TID_TO_PTHREAD` map,
/// which is populated at thread creation and removed at thread exit.
///
/// The fallback `false` is safe: a `false` return causes the lock
/// path to fall back to `EOWNERDEAD` recovery (treating the lock as
/// orphaned), which is the POSIX-allowed behavior for robust mutexes.
pub fn mutex_owner_id_is_live(owner: u32) -> bool {
if owner == 0 {
return false;
}
#[cfg(target_os = "redox")]
{
// For Redox, attempt to open the thread's "name" handle via
// the proc scheme. If the thread is alive, the open succeeds;
// if it has exited and been reaped, the open returns ENOENT.
let path = format!("proc:{}/name", owner);
let cstr = match crate::c_str::CStr::from_bytes_with_nul(path.as_bytes()) {
Ok(c) => c,
Err(_) => return false,
};
let fd = crate::platform::Sys::open(cstr.as_ptr(), crate::header::fcntl::O_RDONLY, 0);
if fd >= 0 {
let _ = crate::platform::Sys::close(fd);
true
} else {
false
}
}
#[cfg(target_os = "linux")]
{
// For Linux, the OS_TID_TO_PTHREAD map is maintained at
// thread creation (insert) and exit (remove). If owner is
// in the map, the thread is alive. We acquire the map lock
// briefly to avoid a TOCTOU race.
crate::pthread::OS_TID_TO_PTHREAD
.lock()
.iter()
.any(|(_os_tid, pthread)| {
let tid = unsafe { (*pthread).os_tid.get().read() };
tid.thread_id as u32 == owner
})
}
}