diff --git a/src/event.rs b/src/event.rs
index 7398145a..f4f57c23 100644
--- a/src/event.rs
+++ b/src/event.rs
@@ -1,5 +1,5 @@
 use alloc::sync::Arc;
-use core::sync::atomic::{AtomicUsize, Ordering};
+use core::sync::atomic::{AtomicU64, AtomicUsize, Ordering};
 use hashbrown::{hash_map::DefaultHashBuilder, HashMap};
 use smallvec::SmallVec;
 use syscall::data::GlobalSchemes;
@@ -23,6 +23,7 @@ int_like!(EventQueueId, AtomicEventQueueId, usize, AtomicUsize);
 pub struct EventQueue {
     id: EventQueueId,
     queue: WaitQueue,
+    pub eventfd: Option<(AtomicU64, bool)>, // (counter, semaphore_mode)
 }
 
 impl EventQueue {
@@ -30,6 +31,15 @@ impl EventQueue {
         EventQueue {
             id,
             queue: WaitQueue::new(),
+            eventfd: None,
+        }
+    }
+
+    // Construct a queue that carries an eventfd counter instead of plain events.
+    pub fn new_eventfd(id: EventQueueId, initval: u64, semaphore: bool) -> EventQueue {
+        EventQueue {
+            id,
+            queue: WaitQueue::new(),
+            eventfd: Some((AtomicU64::new(initval), semaphore)),
         }
     }
diff --git a/src/scheme/event.rs b/src/scheme/event.rs
index 36efe5b2..62e46c99 100644
--- a/src/scheme/event.rs
+++ b/src/scheme/event.rs
@@ -25,12 +25,26 @@ impl KernelScheme for EventScheme {
     fn kopenat(
         &self,
        id: usize,
-        _user_buf: StrOrBytes,
+        user_buf: StrOrBytes,
         _flags: usize,
         _fcntl_flags: u32,
         _ctx: CallerCtx,
         token: &mut CleanLockToken,
     ) -> Result {
+        let path = match &user_buf {
+            StrOrBytes::Str(s) | StrOrBytes::Bytes(s) => {
+                core::str::from_utf8(s).unwrap_or("")
+            }
+        };
+        if path.starts_with("eventfd/") {
+            let rest = &path["eventfd/".len()..]; // strip the "eventfd/" prefix
+            let mut parts = rest.split('/');
+            let initval: u64 = parts.next().and_then(|s| s.parse().ok()).unwrap_or(0);
+            let sem: bool = parts.next().and_then(|s| s.parse().ok()).unwrap_or(false);
+            let id = next_queue_id();
+            queues_mut(token.token()).insert(id, Arc::new(EventQueue::new_eventfd(id, initval, sem)));
+            return Ok(OpenResult::SchemeLocal(id.get(), InternalFlags::empty()));
+        }
         if id != SCHEME_ROOT_ID {
             return Err(Error::new(EACCES));
         }
@@ -67,6 +81,31 @@ impl KernelScheme for EventScheme {
handle.clone() }; + if let Some((ref counter, semaphore)) = queue.eventfd { + let is_nonblock = flags & O_NONBLOCK as u32 != 0; + if semaphore { + let val = counter.load(Ordering::Acquire); + if val == 0 { + if is_nonblock { return Err(Error::new(EAGAIN)); } + // Blocking wait not implemented for eventfd in kernel + return Err(Error::new(EAGAIN)); + } + if counter.compare_exchange(val, val - 1, Ordering::AcqRel, Ordering::Relaxed).is_ok() { + let one: u64 = 1; + buf.copy_from_slice(unsafe { core::slice::from_raw_parts(&one as *const u64 as *const u8, 8) })?; + return Ok(8); + } + return Err(Error::new(EAGAIN)); + } else { + let val = counter.swap(0, Ordering::AcqRel); + if val == 0 && is_nonblock { + return Err(Error::new(EAGAIN)); + } + buf.copy_from_slice(unsafe { core::slice::from_raw_parts(&val as *const u64 as *const u8, 8) })?; + return Ok(8); + } + } + queue.read(buf, flags & O_NONBLOCK as u32 == 0, token) } @@ -85,6 +124,19 @@ impl KernelScheme for EventScheme { let handle = handles.get(&id).ok_or(Error::new(EBADF))?; handle.clone() }; + + if let Some((ref counter, _semaphore)) = queue.eventfd { + if buf.len() >= 8 { + let mut bytes = [0u8; 8]; + buf.copy_to_slice(&mut bytes)?; + let val = u64::from_ne_bytes(bytes); + if val == u64::MAX { return Err(Error::new(EINVAL)); } + counter.fetch_add(val, Ordering::AcqRel); + return Ok(8); + } + return Err(Error::new(EINVAL)); + } + let mut events_written = 0; for chunk in buf.in_exact_chunks(size_of::()) {