Files
RedBear-OS/local/patches/kernel/P8-work-stealing.patch
T
vasilito 34360e1e4f feat: comprehensive P0-P6 kernel scheduler + relibc threading implementation
P0-P2: Barrier SMP, sigmask/pthread_kill races, robust mutexes, RT scheduling, POSIX sched API
P3: PerCpuSched struct, per-CPU wiring, work stealing, load balancing, initial placement
P4: 64-shard futex table, REQUEUE, PI futexes (LOCK_PI/UNLOCK_PI/TRYLOCK_PI), robust futexes, vruntime tracking, min-vruntime SCHED_OTHER selection
P5: setpriority/getpriority, pthread_setaffinity_np, pthread_setname_np, pthread_setschedparam (Redox)
P6: Cache-affine scheduling (last_cpu + vruntime bonus), NUMA topology kernel hints + numad userspace daemon

Stability fixes: make_consistent stores 0 (dead TID fix), cond.rs error propagation, SPIN_COUNT adaptive spinning, Sys::open &str fix, PI futex CAS race, proc.rs lock ordering, barrier destroy

Patches: 33 kernel + 58 relibc patches, all tracked in recipes
Docs: KERNEL-SCHEDULER-MULTITHREAD-IMPROVEMENT-PLAN.md updated, SCHEDULER-REVIEW-FINAL.md created
Architecture: NUMA topology parsing stays userspace (numad daemon), kernel stores lightweight NumaTopology hints
2026-04-30 18:21:48 +01:00

191 lines
6.5 KiB
Diff

diff --git a/src/percpu.rs b/src/percpu.rs
--- a/src/percpu.rs
+++ b/src/percpu.rs
@@ -100,6 +100,14 @@ static ALL_PERCPU_BLOCKS: [AtomicPtr<PercpuBlock>; MAX_CPU_COUNT as usize] =
pub unsafe fn init_tlb_shootdown(id: LogicalCpuId, block: *mut PercpuBlock) {
ALL_PERCPU_BLOCKS[id.get() as usize].store(block, Ordering::Release)
}
+
+/// Returns the `PercpuBlock` registered for CPU `id`, or `None` if no block
+/// has been stored for that slot yet.
+///
+/// The `Acquire` load pairs with the `Release` store in `init_tlb_shootdown`,
+/// so the block's initialization is visible before the returned reference is
+/// dereferenced.
+// SAFETY of the `'static` borrow: assumes pointers stored via
+// init_tlb_shootdown remain valid for the kernel's lifetime — TODO confirm
+// blocks are never freed after registration.
+pub fn get_percpu_block(id: LogicalCpuId) -> Option<&'static PercpuBlock> {
+ unsafe {
+ ALL_PERCPU_BLOCKS[id.get() as usize]
+ .load(Ordering::Acquire)
+ .as_ref()
+ }
+}
pub fn get_all_stats() -> Vec<(LogicalCpuId, CpuStatsData)> {
diff --git a/src/context/switch.rs b/src/context/switch.rs
--- a/src/context/switch.rs
+++ b/src/context/switch.rs
@@ -7,15 +7,15 @@ use crate::{
self, arch, idle_contexts, idle_contexts_try, run_contexts, ArcContextLockWriteGuard,
Context, ContextLock, SchedPolicy, WeakContextRef, RUN_QUEUE_COUNT,
},
- cpu_set::LogicalCpuId,
+ cpu_set::{LogicalCpuId, LogicalCpuSet},
cpu_stats::{self, CpuState},
- percpu::{PerCpuSched, PercpuBlock},
+ percpu::{get_percpu_block, PerCpuSched, PercpuBlock},
sync::{ArcRwLockWriteGuard, CleanLockToken, LockToken, L1, L4},
};
use alloc::{sync::Arc, vec::Vec};
use core::{
cell::{Cell, RefCell},
hint, mem,
- sync::atomic::Ordering,
+ sync::atomic::{AtomicUsize, Ordering},
};
use syscall::PtraceFlags;
@@
+// Number of successful cross-CPU steals since boot; statistics only, hence
+// Relaxed increments in steal_work.
+static SCHED_STEAL_COUNT: AtomicUsize = AtomicUsize::new(0);
+
+/// Pins `context` to `cpu_id` by replacing its whole affinity mask with a
+/// single-CPU set.
+///
+/// NOTE(review): this overwrites any previously configured affinity (e.g. one
+/// set via pthread_setaffinity_np) rather than intersecting with it — confirm
+/// that migration paths (stealing, wakeup) are meant to discard the user mask.
+fn assign_context_to_cpu(context: &mut Context, cpu_id: LogicalCpuId) {
+ context.sched_affinity = LogicalCpuSet::empty();
+ context.sched_affinity.atomic_set(cpu_id);
+}
@@
+/// Scans `queues` in ascending priority-index order for the first context
+/// that can run now, detaches it, and re-pins it to `target_cpu`.
+///
+/// Dead weak references are dropped, the idle context is skipped, and
+/// soft-blocked contexts whose wake deadline has passed are woken in place by
+/// `update_stealable`. Contexts found blocked are parked on the idle list;
+/// everything else is pushed back to the tail of its queue.
+///
+/// Returns the source priority index together with a fresh weak reference to
+/// the selected context, or `None` if nothing is runnable.
+///
+/// NOTE(review): the context's existing sched_affinity is never consulted
+/// before assign_context_to_cpu overwrites it — confirm pinned contexts may
+/// legitimately be moved by this path.
+fn pop_movable_context(
+ token: &mut CleanLockToken,
+ queues: &mut [alloc::collections::VecDeque<WeakContextRef>; RUN_QUEUE_COUNT],
+ target_cpu: LogicalCpuId,
+ switch_time: u128,
+ idle_context: &Arc<ContextLock>,
+) -> Option<(usize, WeakContextRef)> {
+ for prio in 0..RUN_QUEUE_COUNT {
+ // Bound the scan by the queue's current length so entries we push back
+ // to the tail are not re-examined in this pass.
+ let len = queues[prio].len();
+ for _ in 0..len {
+ let Some(context_ref) = queues[prio].pop_front() else {
+ break;
+ };
+ // Stale weak ref: the context was reaped; drop the entry.
+ let Some(context_lock) = context_ref.upgrade() else {
+ continue;
+ };
+ // Never hand out the idle context; rotate it to the back.
+ if Arc::ptr_eq(&context_lock, idle_context) {
+ queues[prio].push_back(context_ref);
+ continue;
+ }
+
+ let mut context_guard = unsafe { context_lock.write_arc() };
+ let sw = unsafe { update_stealable(&mut context_guard, switch_time) };
+ if let UpdateResult::CanSwitch = sw {
+ // Runnable: re-pin to the destination CPU and return a weak ref
+ // rebuilt from the guarded lock; the guard is released first so
+ // the caller can take its own lock.
+ assign_context_to_cpu(&mut context_guard, target_cpu);
+ let moved_ref = WeakContextRef(Arc::downgrade(ArcContextLockWriteGuard::rwlock(
+ &context_guard,
+ )));
+ drop(context_guard);
+ return Some((prio, moved_ref));
+ }
+
+ if matches!(sw, UpdateResult::Blocked) {
+ // Not runnable and not merely running elsewhere: park it on the
+ // idle list instead of leaving it in a run queue.
+ idle_contexts(token.downgrade()).push_back(context_ref);
+ } else {
+ // Currently running (Skip): keep it, rotated to the back.
+ queues[prio].push_back(context_ref);
+ }
+ }
+ }
+
+ None
+}
+
+/// Work stealing: when this CPU has nothing to run, try to take one runnable
+/// context from another CPU's run queues.
+///
+/// Victims are probed round-robin starting at `cpu_id + 1` (wrapping), so
+/// load spreads instead of every idle CPU hammering CPU 0. On success the
+/// stolen context is re-pinned to `cpu_id`, `SCHED_STEAL_COUNT` is bumped,
+/// and the still-held write guard is returned for the caller to switch to.
+///
+/// NOTE(review): the per-queue scan below largely duplicates
+/// pop_movable_context — consider sharing the inner loop.
+fn steal_work(
+ token: &mut CleanLockToken,
+ cpu_id: LogicalCpuId,
+ switch_time: u128,
+) -> Option<ArcContextLockWriteGuard> {
+ // Uniprocessor: nothing to steal from.
+ let cpu_count = crate::cpu_count();
+ if cpu_count <= 1 {
+ return None;
+ }
+
+ for offset in 1..cpu_count {
+ let victim_id = LogicalCpuId::new((cpu_id.get() + offset) % cpu_count);
+ // CPU slot not initialized yet (no percpu block registered): skip.
+ let Some(victim) = get_percpu_block(victim_id) else {
+ continue;
+ };
+
+ let victim_idle = victim.switch_internals.idle_context();
+ let mut victim_lock = SchedQueuesLock::new(&victim.sched);
+ let victim_queues = unsafe { victim_lock.queues_mut() };
+
+ for prio in 0..RUN_QUEUE_COUNT {
+ // Bound the scan so entries pushed back to the tail are not
+ // re-examined in this pass.
+ let len = victim_queues[prio].len();
+ for _ in 0..len {
+ let Some(context_ref) = victim_queues[prio].pop_front() else {
+ break;
+ };
+ // Stale weak ref: context already reaped.
+ let Some(context_lock) = context_ref.upgrade() else {
+ continue;
+ };
+ // Never steal the victim's idle context.
+ if Arc::ptr_eq(&context_lock, &victim_idle) {
+ victim_queues[prio].push_back(context_ref);
+ continue;
+ }
+
+ // NOTE(review): the context write lock is acquired while the
+ // victim's SchedQueuesLock is still held — verify this ordering
+ // matches every other queue-lock/context-lock site, otherwise a
+ // steal racing the victim's own scheduler could deadlock.
+ let mut context_guard = unsafe { context_lock.write_arc() };
+ let sw = unsafe { update_stealable(&mut context_guard, switch_time) };
+ if let UpdateResult::CanSwitch = sw {
+ // Migrate: re-pin to this CPU, count the steal (Relaxed —
+ // stats only), and return with the guard still held.
+ // NOTE(review): existing sched_affinity is not consulted
+ // first — a user-pinned context could be migrated here.
+ assign_context_to_cpu(&mut context_guard, cpu_id);
+ SCHED_STEAL_COUNT.fetch_add(1, Ordering::Relaxed);
+ return Some(context_guard);
+ }
+
+ if matches!(sw, UpdateResult::Blocked) {
+ // NOTE(review): blocked victims are pushed onto
+ // idle_contexts obtained from *our* token — confirm that
+ // list is global rather than the victim's per-CPU list.
+ idle_contexts(token.downgrade()).push_back(context_ref);
+ } else {
+ // Currently running (Skip): leave it with the victim.
+ victim_queues[prio].push_back(context_ref);
+ }
+ }
+ }
+ }
+
+ None
+}
+
+/// Classifies `context` for migration at time `switch_time`.
+///
+/// Returns `Skip` while the context is actively running on some CPU.
+/// Otherwise, a soft-blocked context whose wake deadline has expired is
+/// unblocked in place (without an IPI — the caller is about to queue or run
+/// it locally), then the result is `CanSwitch` if the context is runnable and
+/// `Blocked` if not.
+///
+/// # Safety
+/// All call sites invoke this while holding the context's write lock
+/// (obtained via `write_arc`); the precise unsafety contract is not visible
+/// here — TODO document which invariant makes the `unsafe` necessary.
+unsafe fn update_stealable(context: &mut Context, switch_time: u128) -> UpdateResult {
+ if context.running {
+ return UpdateResult::Skip;
+ }
+ // Expired sleep/timeout: clear the deadline and wake without an IPI.
+ if context.status.is_soft_blocked()
+ && let Some(wake) = context.wake
+ && switch_time >= wake
+ {
+ context.wake = None;
+ context.unblock_no_ipi();
+ }
+ if context.status.is_runnable() {
+ UpdateResult::CanSwitch
+ } else {
+ UpdateResult::Blocked
+ }
+}
@@ -360,6 +469,10 @@ fn wakeup_contexts(token: &mut CleanLockToken, percpu: &PercpuBlock, switch_time
let mut sched_lock = SchedQueuesLock::new(&percpu.sched);
let run_queues = unsafe { sched_lock.queues_mut() };
for (prio, context_ref) in wakeups {
+ if let Some(context_lock) = context_ref.upgrade() {
+ let mut context_guard = unsafe { context_lock.write_arc() };
+ assign_context_to_cpu(&mut context_guard, percpu.cpu_id);
+ }
run_queues[prio].push_back(context_ref);
}
}
@@ -559,6 +672,16 @@ fn select_next_context(
);
return Ok(Some(next_context_guard));
}
+
+ if let Some(next_context_guard) = steal_work(token, cpu_id, switch_time) {
+ queue_previous_context(
+ token,
+ percpu,
+ &prev_context_lock,
+ prev_context_guard,
+ &idle_context,
+ );
+ return Ok(Some(next_context_guard));
+ }
let global_next = {
let contexts_data = run_contexts(token.token());