relibc: P3-barrier-smp-futex — apply Phase 0e patch
Re-apply P3-barrier-smp-futex.patch from local/patches/relibc/ to the local fork. Multi-threading plan Phase 0e.
This commit is contained in:
+40
-49
@@ -1,18 +1,34 @@
|
||||
use core::num::NonZeroU32;
|
||||
use core::{
|
||||
num::NonZeroU32,
|
||||
sync::atomic::{AtomicU32, Ordering},
|
||||
};
|
||||
|
||||
pub struct Barrier {
|
||||
original_count: NonZeroU32,
|
||||
// 4
|
||||
lock: crate::sync::Mutex<Inner>,
|
||||
// 16
|
||||
cvar: crate::header::pthread::RlctCond,
|
||||
cvar: FutexState,
|
||||
// 24
|
||||
}
|
||||
#[derive(Debug)]
|
||||
struct Inner {
|
||||
count: u32,
|
||||
// TODO: Overflows might be problematic... 64-bit?
|
||||
gen_id: u32,
|
||||
_unused0: u32,
|
||||
_unused1: u32,
|
||||
}
|
||||
|
||||
struct FutexState {
|
||||
count: AtomicU32,
|
||||
sense: AtomicU32,
|
||||
}
|
||||
|
||||
impl FutexState {
|
||||
const fn new(count: u32) -> Self {
|
||||
Self {
|
||||
count: AtomicU32::new(count),
|
||||
sense: AtomicU32::new(0),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
pub enum WaitResult {
|
||||
@@ -25,61 +41,36 @@ impl Barrier {
|
||||
Self {
|
||||
original_count: count,
|
||||
lock: crate::sync::Mutex::new(Inner {
|
||||
count: 0,
|
||||
gen_id: 0,
|
||||
_unused0: 0,
|
||||
_unused1: 0,
|
||||
}),
|
||||
cvar: crate::header::pthread::RlctCond::new(),
|
||||
cvar: FutexState::new(count.get()),
|
||||
}
|
||||
}
|
||||
pub fn wait(&self) -> WaitResult {
|
||||
let mut guard = self.lock.lock();
|
||||
let gen_id = guard.gen_id;
|
||||
let _ = &self.lock;
|
||||
let sense = self.cvar.sense.load(Ordering::Acquire);
|
||||
|
||||
guard.count += 1;
|
||||
|
||||
if guard.count == self.original_count.get() {
|
||||
guard.gen_id = guard.gen_id.wrapping_add(1);
|
||||
guard.count = 0;
|
||||
if let Ok(()) = self.cvar.broadcast() {}; // TODO handle error
|
||||
|
||||
drop(guard);
|
||||
if self.cvar.count.fetch_sub(1, Ordering::AcqRel) == 1 {
|
||||
self.cvar
|
||||
.count
|
||||
.store(self.original_count.get(), Ordering::Relaxed);
|
||||
self.cvar
|
||||
.sense
|
||||
.store(sense.wrapping_add(1), Ordering::Release);
|
||||
crate::sync::futex_wake(&self.cvar.sense, i32::MAX);
|
||||
|
||||
WaitResult::NotifiedAll
|
||||
} else {
|
||||
while guard.gen_id == gen_id {
|
||||
guard = self.cvar.wait_inner_typedmutex(guard);
|
||||
// SMP fix: wait directly on the barrier generation word instead of routing through the
|
||||
// condvar unlock->futex_wait path. If the last thread flips `sense` after we load it
|
||||
// but before our futex wait starts, the futex observes a stale value and returns
|
||||
// immediately instead of sleeping forever after a missed broadcast wakeup.
|
||||
while self.cvar.sense.load(Ordering::Acquire) == sense {
|
||||
let _ = crate::sync::futex_wait(&self.cvar.sense, sense, None);
|
||||
}
|
||||
|
||||
WaitResult::Waited
|
||||
}
|
||||
/*
|
||||
let mut guard = self.lock.lock();
|
||||
let Inner { count, gen_id } = *guard;
|
||||
|
||||
let last = self.original_count.get() - 1;
|
||||
|
||||
if count == last {
|
||||
eprintln!("last {:?}", *guard);
|
||||
guard.gen_id = guard.gen_id.wrapping_add(1);
|
||||
guard.count = 0;
|
||||
|
||||
drop(guard);
|
||||
|
||||
self.cvar.broadcast();
|
||||
|
||||
WaitResult::NotifiedAll
|
||||
} else {
|
||||
guard.count += 1;
|
||||
|
||||
while guard.count != last && guard.gen_id == gen_id {
|
||||
eprintln!("before {:?}", *guard);
|
||||
guard = self.cvar.wait_inner_typedmutex(guard);
|
||||
eprintln!("after {:?}", *guard);
|
||||
}
|
||||
|
||||
WaitResult::Waited
|
||||
}
|
||||
*/
|
||||
}
|
||||
}
|
||||
static LOCK: crate::sync::Mutex<()> = crate::sync::Mutex::new(());
|
||||
|
||||
Reference in New Issue
Block a user