# eventfd kernel support — EventCounter implementation and scheme dispatch
# Adds EventCounter struct with blocking read/write, semaphore mode, and wait conditions
# Extends EventScheme to handle eventfd path-based open, read, write, close, fevent, kfpath
diff --git a/src/event.rs b/src/event.rs
index 7398145a..92e5793c 100644
--- a/src/event.rs
+++ b/src/event.rs
@@ -8,13 +8,14 @@
 use crate::{
     context,
     scheme::{self, SchemeExt, SchemeId},
     sync::{
-        CleanLockToken, LockToken, RwLock, RwLockReadGuard, RwLockWriteGuard, WaitQueue, L0, L1, L2,
+        CleanLockToken, LockToken, Mutex, RwLock, RwLockReadGuard, RwLockWriteGuard,
+        WaitCondition, WaitQueue, L0, L1, L2,
     },
     syscall::{
         data::Event,
-        error::{Error, Result, EBADF},
-        flag::EventFlags,
-        usercopy::UserSliceWo,
+        error::{Error, Result, EAGAIN, EBADF, EINTR, EINVAL},
+        flag::{EVENT_READ, EVENT_WRITE, EventFlags},
+        usercopy::{UserSliceRo, UserSliceWo},
     },
 };
@@ -25,6 +26,17 @@ pub struct EventQueue {
     queue: WaitQueue<Event>,
 }
 
+// Counter saturates below u64::MAX, matching eventfd(2): the maximum
+// stored value is 0xffff_ffff_ffff_fffe.
+const EVENTFD_COUNTER_MAX: u64 = u64::MAX - 1;
+const EVENTFD_TAG_BIT: usize = 1usize << (usize::BITS - 1);
+
+pub struct EventCounter {
+    id: usize,
+    counter: Mutex<u64>,
+    read_condition: WaitCondition,
+    write_condition: WaitCondition,
+    semaphore: bool,
+}
+
 impl EventQueue {
     pub fn new(id: EventQueueId) -> EventQueue {
         EventQueue {
@@ -91,19 +103,146 @@ impl EventQueue {
     }
 }
 
+impl EventCounter {
+    pub fn new(id: usize, init: u64, semaphore: bool) -> EventCounter {
+        EventCounter {
+            id,
+            counter: Mutex::new(init),
+            read_condition: WaitCondition::new(),
+            write_condition: WaitCondition::new(),
+            semaphore,
+        }
+    }
+
+    pub fn is_readable(&self, token: &mut CleanLockToken) -> bool {
+        *self.counter.lock(token.token()) > 0
+    }
+
+    pub fn is_writable(&self, token: &mut CleanLockToken) -> bool {
+        *self.counter.lock(token.token()) < EVENTFD_COUNTER_MAX
+    }
+
+    pub fn read(&self, buf: UserSliceWo, block: bool, token: &mut CleanLockToken) -> Result<usize> {
+        if buf.len() < core::mem::size_of::<u64>() {
+            return Err(Error::new(EINVAL));
+        }
+
+        loop {
+            let counter = self.counter.lock(token.token());
+            let (mut counter, mut token) = counter.into_split();
+
+            if *counter > 0 {
+                let value = if self.semaphore {
+                    *counter -= 1;
+                    1
+                } else {
+                    let value = *counter;
+                    *counter = 0;
+                    value
+                };
+
+                buf.limit(core::mem::size_of::<u64>())
+                    .ok_or(Error::new(EINVAL))?
+                    .copy_from_slice(&value.to_ne_bytes())?;
+
+                trigger_locked(
+                    GlobalSchemes::Event.scheme_id(),
+                    self.id,
+                    EVENT_WRITE,
+                    token.token(),
+                );
+                self.write_condition.notify_locked(token.token());
+
+                return Ok(core::mem::size_of::<u64>());
+            }
+
+            if !block {
+                return Err(Error::new(EAGAIN));
+            }
+
+            if !self
+                .read_condition
+                .wait(counter, "EventCounter::read", &mut token)
+            {
+                return Err(Error::new(EINTR));
+            }
+        }
+    }
+
+    pub fn write(&self, buf: UserSliceRo, block: bool, token: &mut CleanLockToken) -> Result<usize> {
+        if buf.len() != core::mem::size_of::<u64>() {
+            return Err(Error::new(EINVAL));
+        }
+
+        let value = unsafe { buf.read_exact::<u64>()? };
+        if value == u64::MAX {
+            return Err(Error::new(EINVAL));
+        }
+
+        loop {
+            let counter = self.counter.lock(token.token());
+            let (mut counter, mut token) = counter.into_split();
+
+            if EVENTFD_COUNTER_MAX - *counter >= value {
+                let was_zero = *counter == 0;
+                *counter += value;
+
+                if was_zero && value != 0 {
+                    trigger_locked(
+                        GlobalSchemes::Event.scheme_id(),
+                        self.id,
+                        EVENT_READ,
+                        token.token(),
+                    );
+                    self.read_condition.notify_locked(token.token());
+                }
+
+                return Ok(core::mem::size_of::<u64>());
+            }
+
+            if !block {
+                return Err(Error::new(EAGAIN));
+            }
+
+            if !self
+                .write_condition
+                .wait(counter, "EventCounter::write", &mut token)
+            {
+                return Err(Error::new(EINTR));
+            }
+        }
+    }
+
+    pub fn into_drop(self, _token: LockToken<'_, L1>) {
+        drop(self);
+    }
+}
+
 pub type EventQueueList = HashMap<EventQueueId, Arc<EventQueue>>;
+pub type EventCounterList = HashMap<usize, Arc<EventCounter>>;
 
 // Next queue id
 static NEXT_QUEUE_ID: AtomicUsize = AtomicUsize::new(0);
+static NEXT_COUNTER_ID: AtomicUsize = AtomicUsize::new(0);
 
 /// Get next queue id
 pub fn next_queue_id() -> EventQueueId {
     EventQueueId::from(NEXT_QUEUE_ID.fetch_add(1, Ordering::SeqCst))
 }
 
+pub fn next_counter_id() -> usize {
+    EVENTFD_TAG_BIT | NEXT_COUNTER_ID.fetch_add(1, Ordering::SeqCst)
+}
+
+pub fn is_counter_id(id: usize) -> bool {
+    id & EVENTFD_TAG_BIT != 0
+}
+
 // Current event queues
 static QUEUES: RwLock<EventQueueList> =
     RwLock::new(EventQueueList::with_hasher(DefaultHashBuilder::new()));
+static COUNTERS: RwLock<EventCounterList> =
+    RwLock::new(EventCounterList::with_hasher(DefaultHashBuilder::new()));
 
 /// Get the event queues list, const
 pub fn queues(token: LockToken<'_, L0>) -> RwLockReadGuard<'_, L2, EventQueueList> {
@@ -115,6 +254,14 @@ pub fn queues_mut(token: LockToken<'_, L0>) -> RwLockWriteGuard<'_, L2, EventQueueList> {
     QUEUES.write(token)
 }
 
+pub fn counters(token: LockToken<'_, L0>) -> RwLockReadGuard<'_, L2, EventCounterList> {
+    COUNTERS.read(token)
+}
+
+pub fn counters_mut(token: LockToken<'_, L0>) -> RwLockWriteGuard<'_, L2, EventCounterList> {
+    COUNTERS.write(token)
+}
+
 #[derive(Debug, PartialEq, Eq, Hash, PartialOrd, Ord)]
 pub struct RegKey {
     pub scheme: SchemeId,
diff --git a/src/scheme/event.rs b/src/scheme/event.rs
index 36efe5b2..c64b6bd0 100644
--- a/src/scheme/event.rs
+++ b/src/scheme/event.rs
@@ -1,9 +1,12 @@
-use alloc::sync::Arc;
+use alloc::{sync::Arc, vec::Vec};
 use syscall::{EventFlags, O_NONBLOCK};
 
 use crate::{
     context::file::InternalFlags,
-    event::{next_queue_id, queues, queues_mut, EventQueue, EventQueueId},
+    event::{
+        EventCounter, EventQueue, EventQueueId, counters, counters_mut, is_counter_id,
+        next_counter_id, next_queue_id, queues, queues_mut,
+    },
     sync::CleanLockToken,
     syscall::{
         data::Event,
@@ -25,7 +28,7 @@ impl KernelScheme for EventScheme {
     fn kopenat(
         &self,
         id: usize,
-        _user_buf: StrOrBytes,
+        user_buf: StrOrBytes,
         _flags: usize,
         _fcntl_flags: u32,
         _ctx: CallerCtx,
@@ -34,13 +37,53 @@ impl KernelScheme for EventScheme {
         if id != SCHEME_ROOT_ID {
             return Err(Error::new(EACCES));
         }
-        let id = next_queue_id();
-        queues_mut(token.token()).insert(id, Arc::new(EventQueue::new(id)));
-        Ok(OpenResult::SchemeLocal(id.get(), InternalFlags::empty()))
+        let path = user_buf.as_str().map_err(|_| Error::new(EINVAL))?;
+        let path = path.trim_matches('/');
+
+        if path.is_empty() {
+            let id = next_queue_id();
+            queues_mut(token.token()).insert(id, Arc::new(EventQueue::new(id)));
+            return Ok(OpenResult::SchemeLocal(id.get(), InternalFlags::empty()));
+        }
+
+        let parts: Vec<&str> = path.split('/').collect();
+        if matches!(parts.first(), Some(&"eventfd")) {
+            let init = match parts.get(1) {
+                Some(value) => value.parse::<u64>().map_err(|_| Error::new(EINVAL))?,
+                None => 0_u64,
+            };
+            if init > u32::MAX as u64 {
+                return Err(Error::new(EINVAL));
+            }
+            let semaphore = match parts.get(2) {
+                Some(value) => match *value {
+                    "0" => Ok(false),
+                    "1" => Ok(true),
+                    _ => Err(Error::new(EINVAL)),
+                }?,
+                None => false,
+            };
+
+            let id = next_counter_id();
+            counters_mut(token.token()).insert(id, Arc::new(EventCounter::new(id, init, semaphore)));
+            return Ok(OpenResult::SchemeLocal(id, InternalFlags::empty()));
+        }
+
+        Err(Error::new(ENOENT))
     }
 
     fn close(&self, id: usize, token: &mut CleanLockToken) -> Result<()> {
+        if is_counter_id(id) {
+            let counter = counters_mut(token.token())
+                .remove(&id)
+                .ok_or(Error::new(EBADF))?;
+            if let Some(counter) = Arc::into_inner(counter) {
+                counter.into_drop(token.downgrade());
+            }
+            return Ok(());
+        }
+
         let id = EventQueueId::from(id);
         let queue = queues_mut(token.token())
             .remove(&id)
             .ok_or(Error::new(EBADF))?;
@@ -59,6 +102,15 @@
         _stored_flags: u32,
         token: &mut CleanLockToken,
     ) -> Result<usize> {
+        if is_counter_id(id) {
+            let counter = {
+                let handles = counters(token.token());
+                let handle = handles.get(&id).ok_or(Error::new(EBADF))?;
+                handle.clone()
+            };
+            return counter.read(buf, flags & O_NONBLOCK as u32 == 0, token);
+        }
+
         let id = EventQueueId::from(id);
 
         let queue = {
@@ -74,10 +126,19 @@
         &self,
         id: usize,
         buf: UserSliceRo,
-        _flags: u32,
+        flags: u32,
         _stored_flags: u32,
         token: &mut CleanLockToken,
     ) -> Result<usize> {
+        if is_counter_id(id) {
+            let counter = {
+                let handles = counters(token.token());
+                let handle = handles.get(&id).ok_or(Error::new(EBADF))?;
+                handle.clone()
+            };
+            return counter.write(buf, flags & O_NONBLOCK as u32 == 0, token);
+        }
+
         let id = EventQueueId::from(id);
 
         let queue = {
@@ -98,8 +159,12 @@
         Ok(events_written * size_of::<Event>())
     }
 
-    fn kfpath(&self, _id: usize, buf: UserSliceWo, _token: &mut CleanLockToken) -> Result<usize> {
-        buf.copy_common_bytes_from_slice(b"/scheme/event/")
+    fn kfpath(&self, id: usize, buf: UserSliceWo, _token: &mut CleanLockToken) -> Result<usize> {
+        if is_counter_id(id) {
+            buf.copy_common_bytes_from_slice(b"/scheme/event/eventfd")
+        } else {
+            buf.copy_common_bytes_from_slice(b"/scheme/event/")
+        }
     }
 
     fn fevent(
@@ -108,6 +173,23 @@ fn fevent(
         flags: EventFlags,
         token: &mut CleanLockToken,
     ) -> Result<EventFlags> {
+        if is_counter_id(id) {
+            let counter = {
+                let handles = counters(token.token());
+                let handle = handles.get(&id).ok_or(Error::new(EBADF))?;
+                handle.clone()
+            };
+
+            let mut ready = EventFlags::empty();
+            if flags.contains(EventFlags::EVENT_READ) && counter.is_readable(token) {
+                ready |= EventFlags::EVENT_READ;
+            }
+            if flags.contains(EventFlags::EVENT_WRITE) && counter.is_writable(token) {
+                ready |= EventFlags::EVENT_WRITE;
+            }
+            return Ok(ready);
+        }
+
         let id = EventQueueId::from(id);
 
         let queue = {