diff --git a/src/percpu.rs b/src/percpu.rs
--- a/src/percpu.rs
+++ b/src/percpu.rs
@@ -29,15 +29,18 @@ pub struct PerCpuSched {
     pub run_queues_lock: AtomicBool,
     pub balance: Cell<[usize; RUN_QUEUE_COUNT]>,
     pub last_queue: Cell<usize>,
+    /// Monotonic timestamp (nanoseconds) of the last load-balancing pass.
+    pub last_balance_time: Cell<u128>,
 }
 
 impl PerCpuSched {
     pub const fn new() -> Self {
         const EMPTY: VecDeque<ContextRef> = VecDeque::new();
         Self {
             run_queues: SyncUnsafeCell::new([EMPTY; RUN_QUEUE_COUNT]),
             run_queues_lock: AtomicBool::new(false),
             balance: Cell::new([0; RUN_QUEUE_COUNT]),
             last_queue: Cell::new(0),
+            last_balance_time: Cell::new(0),
         }
     }
diff --git a/src/context/switch.rs b/src/context/switch.rs
--- a/src/context/switch.rs
+++ b/src/context/switch.rs
@@ -33,4 +33,7 @@ const SCHED_PRIO_TO_WEIGHT: [usize; 40] = [
     70, 56, 45, 36, 29, 23, 18, 15,
 ];
 
+/// Minimum interval between load-balancing passes: 100 ms, in nanoseconds.
+const LOAD_BALANCE_INTERVAL_NS: u128 = 100_000_000;
+
 static SCHED_STEAL_COUNT: AtomicUsize = AtomicUsize::new(0);
@@ -101,6 +103,10 @@ pub fn tick(token: &mut CleanLockToken) {
     let new_ticks = ticks_cell.get() + 1;
     ticks_cell.set(new_ticks);
+    // Rate-limited run-queue balancing; a no-op on everything but the BSP.
+    let balance_time = crate::time::monotonic(token);
+    maybe_balance_queues(token, percpu, balance_time);
+
     // Trigger a context switch after every 3 ticks.
     if new_ticks >= 3 {
         switch(token);
     }
@@ -427,3 +432,112 @@ fn steal_work(
 
     None
 }
+
+/// Total number of contexts queued across all of one CPU's run queues.
+fn queue_depth(percpu: &PercpuBlock) -> usize {
+    let mut sched_lock = SchedQueuesLock::new(&percpu.sched);
+    unsafe {
+        sched_lock
+            .queues_mut()
+            .iter()
+            .map(|queue| queue.len())
+            .sum()
+    }
+}
+
+/// Try to move one migratable context from `source_id`'s run queues to `target_id`.
+fn migrate_one_context(
+    token: &mut CleanLockToken,
+    source_id: LogicalCpuId,
+    target_id: LogicalCpuId,
+    switch_time: u128,
+) -> bool {
+    let Some(source) = get_percpu_block(source_id) else {
+        return false;
+    };
+    let Some(target) = get_percpu_block(target_id) else {
+        return false;
+    };
+
+    // Hold the source lock only long enough to pop a context, so the two
+    // queue locks are never held at the same time.
+    let source_idle = source.switch_internals.idle_context();
+    let moved = {
+        let mut source_lock = SchedQueuesLock::new(&source.sched);
+        let source_queues = unsafe { source_lock.queues_mut() };
+        pop_movable_context(token, source_queues, target_id, switch_time, &source_idle)
+    };
+
+    let Some((prio, context_ref)) = moved else {
+        return false;
+    };
+
+    let mut target_lock = SchedQueuesLock::new(&target.sched);
+    unsafe {
+        target_lock.queues_mut()[prio].push_back(context_ref);
+    }
+    true
+}
+
+/// Runs from the BSP's timer tick at most once per `LOAD_BALANCE_INTERVAL_NS`:
+/// every idle CPU receives one context from the most loaded CPU.
+fn maybe_balance_queues(token: &mut CleanLockToken, percpu: &PercpuBlock, balance_time: u128) {
+    if crate::cpu_count() <= 1 || percpu.cpu_id != LogicalCpuId::BSP {
+        return;
+    }
+    if balance_time.saturating_sub(percpu.sched.last_balance_time.get()) < LOAD_BALANCE_INTERVAL_NS
+    {
+        return;
+    }
+
+    percpu.sched.last_balance_time.set(balance_time);
+
+    let mut depths = Vec::new();
+    let mut total_depth = 0usize;
+    for raw_id in 0..crate::cpu_count() {
+        let cpu_id = LogicalCpuId::new(raw_id);
+        let Some(cpu_percpu) = get_percpu_block(cpu_id) else {
+            continue;
+        };
+        let depth = queue_depth(cpu_percpu);
+        total_depth += depth;
+        depths.push((cpu_id, depth));
+    }
+
+    if depths.len() <= 1 || total_depth == 0 {
+        return;
+    }
+
+    // Ceiling division, so a queue must be strictly above the true average
+    // before it qualifies as a migration source.
+    let avg_depth = (total_depth + depths.len().saturating_sub(1)) / depths.len();
+
+    for target_index in 0..depths.len() {
+        if depths[target_index].1 != 0 {
+            continue;
+        }
+
+        let mut source_index = None;
+        let mut source_depth = 0usize;
+        for (idx, &(_, depth)) in depths.iter().enumerate() {
+            if idx == target_index {
+                continue;
+            }
+            if depth > avg_depth + 1 && depth > source_depth {
+                source_index = Some(idx);
+                source_depth = depth;
+            }
+        }
+
+        let Some(source_index) = source_index else {
+            continue;
+        };
+
+        let source_id = depths[source_index].0;
+        let target_id = depths[target_index].0;
+        if migrate_one_context(token, source_id, target_id, balance_time) {
+            depths[source_index].1 = depths[source_index].1.saturating_sub(1);
+            depths[target_index].1 += 1;
+        }
+    }
+}
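A quick way to sanity-check the balancing policy outside the kernel is to replay it on plain `VecDeque`s. The sketch below is illustrative only: the `Cpu` struct and `u32` context IDs are hypothetical stand-ins for the per-CPU scheduler state and `ContextRef`, and `pop_front` stands in for `pop_movable_context`, which additionally checks whether a context may run on the target CPU.

```rust
use std::collections::VecDeque;

/// Hypothetical stand-in for one CPU's run-queue state.
struct Cpu {
    queue: VecDeque<u32>, // context IDs, standing in for ContextRef
}

/// Same policy as maybe_balance_queues: each idle CPU receives one context
/// from the deepest queue that is strictly above the ceiling average.
fn balance(cpus: &mut [Cpu]) {
    let mut depths: Vec<usize> = cpus.iter().map(|c| c.queue.len()).collect();
    let total: usize = depths.iter().sum();
    if cpus.len() <= 1 || total == 0 {
        return;
    }
    // Ceiling division, matching avg_depth in the patch.
    let avg = (total + cpus.len() - 1) / cpus.len();

    for target in 0..cpus.len() {
        if depths[target] != 0 {
            continue; // only feed completely idle CPUs
        }
        // Deepest queue more than one above the average, if any.
        let source = (0..cpus.len())
            .filter(|&i| i != target && depths[i] > avg + 1)
            .max_by_key(|&i| depths[i]);
        if let Some(source) = source {
            if let Some(ctx) = cpus[source].queue.pop_front() {
                cpus[target].queue.push_back(ctx);
                depths[source] -= 1;
                depths[target] += 1;
            }
        }
    }
}

fn main() {
    let mut cpus = vec![
        Cpu { queue: VecDeque::from([1, 2, 3, 4, 5]) },
        Cpu { queue: VecDeque::new() },
        Cpu { queue: VecDeque::new() },
    ];
    balance(&mut cpus);
    let depths: Vec<usize> = cpus.iter().map(|c| c.queue.len()).collect();
    // avg = ceil(5 / 3) = 2; depth 5 > avg + 1, so both idle CPUs get one context.
    assert_eq!(depths, vec![3, 1, 1]);
}
```

Because the average is rounded up and a source must exceed it by more than one, a pass never drains a queue that is only marginally above average, which should keep the balancer from ping-ponging contexts between CPUs.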