--- a/src/scheme/pipe.rs +++ b/src/scheme/pipe.rs @@ -1,5 +1,10 @@ -use alloc::{collections::VecDeque, sync::Arc, vec::Vec}; -use core::sync::atomic::{AtomicBool, AtomicUsize, Ordering}; +use alloc::{ + collections::VecDeque, + string::{String, ToString}, + sync::Arc, + vec::Vec, +}; +use core::sync::atomic::{AtomicUsize, Ordering}; use syscall::{data::GlobalSchemes, CallFlags}; @@ -14,100 +19,262 @@ sync::{CleanLockToken, Mutex, RwLock, WaitCondition, L1}, syscall::{ data::Stat, - error::{Error, Result, EAGAIN, EBADF, EINTR, EINVAL, ENOENT, EPIPE}, - flag::{EventFlags, EVENT_READ, EVENT_WRITE, MODE_FIFO, O_NONBLOCK}, + error::{ + Error, Result, EAGAIN, EBADF, EEXIST, EINVAL, EINTR, ENOENT, ENOTDIR, EPIPE, + }, + flag::{ + EventFlags, EVENT_READ, EVENT_WRITE, MODE_FIFO, O_ACCMODE, O_DIRECTORY, + O_NONBLOCK, O_RDONLY, O_RDWR, O_STAT, O_WRONLY, + }, usercopy::{UserSliceRo, UserSliceRw, UserSliceWo}, }, }; use super::{CallerCtx, KernelScheme, OpenResult, SchemeExt, StrOrBytes}; -// TODO: Preallocate a number of scheme IDs, since there can only be *one* root namespace, and -// therefore only *one* pipe scheme. -static PIPE_NEXT_ID: AtomicUsize = AtomicUsize::new(0); - +static PIPE_NEXT_ID: AtomicUsize = AtomicUsize::new(1); + +#[derive(Clone)] enum Handle { - Pipe(Arc), + Endpoint(EndpointHandle), SchemeRoot, } -// TODO: SLOB? -static PIPES: RwLock> = +#[derive(Clone, Copy, Eq, PartialEq)] +enum EndpointKind { + Read, + Write, + ReadWrite, +} + +impl EndpointKind { + fn can_read(self) -> bool { + matches!(self, Self::Read | Self::ReadWrite) + } + + fn can_write(self) -> bool { + matches!(self, Self::Write | Self::ReadWrite) + } +} + +#[derive(Clone)] +struct EndpointHandle { + pipe: Arc, + kind: EndpointKind, + named: Option>, +} + +struct NamedPipe { + path: String, + mode: u16, + active: Mutex>>, +} + +static HANDLES: RwLock> = RwLock::new(HashMap::with_hasher(DefaultHashBuilder::new())); +static NAMED_PIPES: RwLock>> = + RwLock::new(HashMap::with_hasher(DefaultHashBuilder::new())); const MAX_QUEUE_SIZE: usize = 65536; -// In almost all places where Rust (and LLVM) uses pointers, they are limited to nonnegative isize, -// so this is fine. -const WRITE_NOT_READ_BIT: usize = 1; - -fn from_raw_id(id: usize) -> (bool, usize) { - (id & WRITE_NOT_READ_BIT != 0, id & !WRITE_NOT_READ_BIT) -} - -pub fn pipe(token: &mut CleanLockToken) -> Result<(usize, usize)> { - // Bit 0 is used for WRITE_NOT_READ_BIT - let id = PIPE_NEXT_ID.fetch_add(2, Ordering::Relaxed); - - PIPES.write(token.token()).insert( - id, - Handle::Pipe(Arc::new(Pipe { - queue: Mutex::new(VecDeque::new()), - read_condition: WaitCondition::new(), - write_condition: WaitCondition::new(), - writer_is_alive: AtomicBool::new(true), - reader_is_alive: AtomicBool::new(true), - has_run_dup: AtomicBool::new(false), - fd_queue: Mutex::new(VecDeque::new()), - })), - ); - - Ok((id, id | WRITE_NOT_READ_BIT)) -} - -pub struct PipeScheme; - -impl PipeScheme { - fn get_pipe(key: usize, token: &mut CleanLockToken) -> Result> { - PIPES - .read(token.token()) - .get(&key) - .and_then(|handle| match handle { - Handle::Pipe(pipe) => Some(Arc::clone(pipe)), +fn next_id() -> usize { + PIPE_NEXT_ID.fetch_add(1, Ordering::Relaxed) +} + +fn endpoint_kind_from_flags(flags: usize) -> Result { + match flags & O_ACCMODE { + O_RDONLY => Ok(EndpointKind::Read), + O_WRONLY => Ok(EndpointKind::Write), + O_RDWR => Ok(EndpointKind::ReadWrite), + _ => Err(Error::new(EINVAL)), + } +} + +fn validate_named_fifo_open(flags: usize) -> Result<()> { + if flags & O_DIRECTORY == O_DIRECTORY && flags & O_STAT != O_STAT { + return Err(Error::new(ENOTDIR)); + } + + let _ = endpoint_kind_from_flags(flags)?; + Ok(()) +} + +fn trigger_matching( + pipe: &Arc, + require_read: bool, + require_write: bool, + flags: EventFlags, + token: &mut CleanLockToken, +) { + let ids = { + let handles = HANDLES.read(token.token()); + handles + .iter() + .filter_map(|(id, handle)| match handle { + Handle::Endpoint(endpoint) + if Arc::ptr_eq(&endpoint.pipe, pipe) + && (!require_read || endpoint.kind.can_read()) + && (!require_write || endpoint.kind.can_write()) => + { + Some(*id) + } _ => None, }) + .collect::>() + }; + + for id in ids { + event::trigger(GlobalSchemes::Pipe.scheme_id(), id, flags, token); + } +} + +fn open_endpoint( + pipe: Arc, + kind: EndpointKind, + named: Option>, + token: &mut CleanLockToken, +) -> usize { + if kind.can_read() { + pipe.reader_count.fetch_add(1, Ordering::SeqCst); + } + if kind.can_write() { + pipe.writer_count.fetch_add(1, Ordering::SeqCst); + } + + let id = next_id(); + HANDLES.write(token.token()).insert( + id, + Handle::Endpoint(EndpointHandle { pipe, kind, named }), + ); + id +} + +fn drop_wait_conditions_if_possible(pipe: Arc, token: &mut CleanLockToken) { + if let Some(pipe) = Arc::into_inner(pipe) { + { + pipe.read_condition.into_drop(token); + } + { + pipe.write_condition.into_drop(token); + } + } +} + +pub fn pipe(token: &mut CleanLockToken) -> Result<(usize, usize)> { + let pipe = Arc::new(Pipe::new()); + let read_id = open_endpoint(Arc::clone(&pipe), EndpointKind::Read, None, token); + let write_id = open_endpoint(pipe, EndpointKind::Write, None, token); + + Ok((read_id, write_id)) +} + +pub fn named_pipe_exists(path: &str, token: &mut CleanLockToken) -> bool { + NAMED_PIPES.read(token.token()).contains_key(path) +} + +pub fn create_named_pipe( + path: &str, + display_path: &str, + mode: u16, + flags: usize, + token: &mut CleanLockToken, +) -> Result { + validate_named_fifo_open(flags)?; + + let named = { + let mut named_pipes = NAMED_PIPES.write(token.token()); + if named_pipes.contains_key(path) { + return Err(Error::new(EEXIST)); + } + + let named = Arc::new(NamedPipe { + path: display_path.to_string(), + mode, + active: Mutex::new(None), + }); + named_pipes.insert(path.to_string(), Arc::clone(&named)); + named + }; + + let kind = endpoint_kind_from_flags(flags)?; + let pipe = Arc::new(Pipe::new()); + *named.active.lock(token.token()) = Some(Arc::clone(&pipe)); + + Ok(open_endpoint(pipe, kind, Some(named), token)) +} + +pub fn open_named_pipe(path: &str, flags: usize, token: &mut CleanLockToken) -> Result> { + validate_named_fifo_open(flags)?; + + let named = match NAMED_PIPES.read(token.token()).get(path) { + Some(named) => Arc::clone(named), + None => return Ok(None), + }; + + let kind = endpoint_kind_from_flags(flags)?; + let pipe = { + let mut active = named.active.lock(token.token()); + match active.as_ref() { + Some(pipe) => Arc::clone(pipe), + None => { + let pipe = Arc::new(Pipe::new()); + *active = Some(Arc::clone(&pipe)); + pipe + } + } + }; + + Ok(Some(open_endpoint(pipe, kind, Some(named), token))) +} + +pub fn unlink_named_pipe(path: &str, token: &mut CleanLockToken) -> bool { + NAMED_PIPES.write(token.token()).remove(path).is_some() +} + +pub struct PipeScheme; + +impl PipeScheme { + fn get_endpoint(id: usize, token: &mut CleanLockToken) -> Result { + HANDLES + .read(token.token()) + .get(&id) + .and_then(|handle| match handle { + Handle::Endpoint(endpoint) => Some(endpoint.clone()), + Handle::SchemeRoot => None, + }) .ok_or(Error::new(EBADF)) } } impl KernelScheme for PipeScheme { fn scheme_root(&self, token: &mut CleanLockToken) -> Result { - let id = PIPE_NEXT_ID.fetch_add(2, Ordering::Relaxed); - PIPES.write(token.token()).insert(id, Handle::SchemeRoot); + let id = next_id(); + HANDLES.write(token.token()).insert(id, Handle::SchemeRoot); Ok(id) } + fn fevent( &self, id: usize, flags: EventFlags, token: &mut CleanLockToken, ) -> Result { - let (is_writer_not_reader, key) = from_raw_id(id); - let pipe = Self::get_pipe(key, token)?; + let endpoint = Self::get_endpoint(id, token)?; let mut ready = EventFlags::empty(); - if is_writer_not_reader + if endpoint.kind.can_write() && flags.contains(EVENT_WRITE) - && (pipe.queue.lock(token.token()).len() <= MAX_QUEUE_SIZE - || !pipe.reader_is_alive.load(Ordering::Acquire)) + && (endpoint.pipe.queue.lock(token.token()).len() <= MAX_QUEUE_SIZE + || endpoint.pipe.reader_count.load(Ordering::Acquire) == 0) { ready |= EventFlags::EVENT_WRITE; } - if !is_writer_not_reader + + if endpoint.kind.can_read() && flags.contains(EVENT_READ) - && (!pipe.queue.lock(token.token()).is_empty() - || !pipe.writer_is_alive.load(Ordering::Acquire)) + && (!endpoint.pipe.queue.lock(token.token()).is_empty() + || endpoint.pipe.writer_count.load(Ordering::Acquire) == 0) { ready |= EventFlags::EVENT_READ; } @@ -116,46 +283,48 @@ } fn close(&self, id: usize, token: &mut CleanLockToken) -> Result<()> { - let (is_write_not_read, key) = from_raw_id(id); - - let pipe = Self::get_pipe(key, token)?; - let scheme_id = GlobalSchemes::Pipe.scheme_id(); - - let can_remove = if is_write_not_read { - pipe.writer_is_alive.store(false, Ordering::SeqCst); - event::trigger(scheme_id, key, EVENT_READ, token); - pipe.read_condition.notify(token); - - !pipe.reader_is_alive.load(Ordering::SeqCst) - } else { - pipe.reader_is_alive.store(false, Ordering::SeqCst); - event::trigger(scheme_id, key | WRITE_NOT_READ_BIT, EVENT_WRITE, token); - pipe.write_condition.notify(token); - - !pipe.writer_is_alive.load(Ordering::SeqCst) + let handle = HANDLES + .write(token.token()) + .remove(&id) + .ok_or(Error::new(EBADF))?; + + let Handle::Endpoint(endpoint) = handle else { + return Ok(()); }; - if can_remove { - let handle = PIPES.write(token.token()).remove(&key); - if let Some(Handle::Pipe(pipe)) = handle - && let Some(pipe) = Arc::into_inner(pipe) - { + let mut last_reader = false; + let mut last_writer = false; + + if endpoint.kind.can_read() { + last_reader = endpoint.pipe.reader_count.fetch_sub(1, Ordering::SeqCst) == 1; + } + if endpoint.kind.can_write() { + last_writer = endpoint.pipe.writer_count.fetch_sub(1, Ordering::SeqCst) == 1; + } + + if last_writer { + trigger_matching(&endpoint.pipe, true, false, EVENT_READ, token); + endpoint.pipe.read_condition.notify(token); + } + if last_reader { + trigger_matching(&endpoint.pipe, false, true, EVENT_WRITE, token); + endpoint.pipe.write_condition.notify(token); + } + + let no_readers = endpoint.pipe.reader_count.load(Ordering::SeqCst) == 0; + let no_writers = endpoint.pipe.writer_count.load(Ordering::SeqCst) == 0; + if no_readers && no_writers { + if let Some(named) = endpoint.named { + let mut active = named.active.lock(token.token()); + if active + .as_ref() + .is_some_and(|active_pipe| Arc::ptr_eq(active_pipe, &endpoint.pipe)) { - pipe.read_condition.into_drop(token); + *active = None; } - { - pipe.write_condition.into_drop(token); - } - } - } - - if let Some(pipe) = Arc::into_inner(pipe) { - { - pipe.read_condition.into_drop(token); - } - { - pipe.write_condition.into_drop(token); - } + } + + drop_wait_conditions_if_possible(endpoint.pipe, token); } Ok(()) @@ -168,9 +337,9 @@ _ctx: CallerCtx, token: &mut CleanLockToken, ) -> Result { - let (is_writer_not_reader, key) = from_raw_id(old_id); - - if is_writer_not_reader { + let endpoint = Self::get_endpoint(old_id, token)?; + + if !endpoint.kind.can_read() { return Err(Error::new(EBADF)); } @@ -180,17 +349,17 @@ return Err(Error::new(EINVAL)); } - let pipe = Self::get_pipe(key, token)?; - - if pipe.has_run_dup.swap(true, Ordering::SeqCst) { - return Err(Error::new(EBADF)); - } - Ok(OpenResult::SchemeLocal( - key | WRITE_NOT_READ_BIT, + open_endpoint( + Arc::clone(&endpoint.pipe), + EndpointKind::Write, + endpoint.named, + token, + ), InternalFlags::empty(), )) } + fn kopenat( &self, id: usize, @@ -200,40 +369,47 @@ _ctx: CallerCtx, token: &mut CleanLockToken, ) -> Result { - let (_, key) = from_raw_id(id); - - { - let guard = PIPES.read(token.token()); - if let Some(Handle::SchemeRoot) = guard.get(&key) { - } else if let Some(Handle::Pipe(pipe_arc)) = guard.get(&key) { - let pipe = Arc::clone(pipe_arc); - drop(guard); - - if user_buf.as_bytes() == b"write" { - return Err(Error::new(EINVAL)); + let is_scheme_root = { + let handles = HANDLES.read(token.token()); + match handles.get(&id) { + Some(Handle::SchemeRoot) => true, + Some(Handle::Endpoint(_)) => false, + None => return Err(Error::new(EBADF)), + } + }; + + if is_scheme_root { + let path = user_buf.as_str().or(Err(Error::new(EINVAL)))?; + if !path.trim_start_matches('/').is_empty() { + return Err(Error::new(ENOENT)); } - if pipe.has_run_dup.swap(true, Ordering::SeqCst) { - return Err(Error::new(EBADF)); - } - + let pipe = Arc::new(Pipe::new()); return Ok(OpenResult::SchemeLocal( - key | WRITE_NOT_READ_BIT, + open_endpoint(pipe, EndpointKind::Read, None, token), InternalFlags::empty(), )); - } else { - return Err(Error::new(EBADF)); - } - } - - let path = user_buf.as_str().or(Err(Error::new(EINVAL)))?; - if !path.trim_start_matches('/').is_empty() { - return Err(Error::new(ENOENT)); - } - - let (read_id, _) = pipe(token)?; - - Ok(OpenResult::SchemeLocal(read_id, InternalFlags::empty())) + } + + let endpoint = Self::get_endpoint(id, token)?; + if !endpoint.kind.can_read() { + return Err(Error::new(EBADF)); + } + + let path = user_buf.as_bytes(); + if !path.is_empty() && path != b"write" { + return Err(Error::new(EINVAL)); + } + + Ok(OpenResult::SchemeLocal( + open_endpoint( + Arc::clone(&endpoint.pipe), + EndpointKind::Write, + endpoint.named, + token, + ), + InternalFlags::empty(), + )) } fn kread( @@ -244,16 +420,15 @@ _stored_flags: u32, token: &mut CleanLockToken, ) -> Result { - let (is_write_not_read, key) = from_raw_id(id); - - if is_write_not_read { + let endpoint = Self::get_endpoint(id, token)?; + + if !endpoint.kind.can_read() { return Err(Error::new(EBADF)); } - let pipe = Self::get_pipe(key, token)?; loop { - let vec = pipe.queue.lock(token.token()); - let (mut vec, mut token) = vec.into_split(); + let vec = endpoint.pipe.queue.lock(token.token()); + let (mut vec, mut lock_token) = vec.into_split(); let (s1, s2) = vec.as_slices(); let s1_count = core::cmp::min(user_buf.len(), s1.len()); @@ -273,28 +448,34 @@ let _ = vec.drain(..bytes_read); if bytes_read > 0 { - event::trigger_locked( - GlobalSchemes::Pipe.scheme_id(), - key | WRITE_NOT_READ_BIT, - EVENT_WRITE, - token.token(), - ); - pipe.write_condition.notify_locked(token.token()); + drop(vec); + drop(lock_token); + trigger_matching(&endpoint.pipe, false, true, EVENT_WRITE, token); + endpoint.pipe.write_condition.notify(token); return Ok(bytes_read); - } else if user_buf.is_empty() { + } + + if user_buf.is_empty() { return Ok(0); } - if !pipe.writer_is_alive.load(Ordering::SeqCst) { + if endpoint.pipe.writer_count.load(Ordering::SeqCst) == 0 { return Ok(0); - } else if fcntl_flags & O_NONBLOCK as u32 != 0 { + } + if fcntl_flags & O_NONBLOCK as u32 != 0 { return Err(Error::new(EAGAIN)); - } else if !pipe.read_condition.wait(vec, "PipeRead::read", &mut token) { + } + if !endpoint + .pipe + .read_condition + .wait(vec, "PipeRead::read", &mut lock_token) + { return Err(Error::new(EINTR)); } } } + fn kwrite( &self, id: usize, @@ -303,18 +484,17 @@ _stored_flags: u32, token: &mut CleanLockToken, ) -> Result { - let (is_write_not_read, key) = from_raw_id(id); - - if !is_write_not_read { + let endpoint = Self::get_endpoint(id, token)?; + + if !endpoint.kind.can_write() { return Err(Error::new(EBADF)); } - let pipe = Self::get_pipe(key, token)?; loop { - let vec = pipe.queue.lock(token.token()); - let (mut vec, mut token) = vec.into_split(); - - if !pipe.reader_is_alive.load(Ordering::Relaxed) { + let vec = endpoint.pipe.queue.lock(token.token()); + let (mut vec, mut lock_token) = vec.into_split(); + + if endpoint.pipe.reader_count.load(Ordering::Relaxed) == 0 { return Err(Error::new(EPIPE)); } @@ -329,7 +509,6 @@ let mut bytes_written = 0; - // TODO: Modify VecDeque so that the unwritten portions can be accessed directly? for (idx, chunk) in src_buf.in_variable_chunks(TMPBUF_SIZE).enumerate() { let chunk_byte_count = match chunk.copy_common_bytes_to_slice(&mut tmp_buf) { Ok(c) => c, @@ -341,41 +520,52 @@ } if bytes_written > 0 { - event::trigger_locked( - GlobalSchemes::Pipe.scheme_id(), - key, - EVENT_READ, - token.token(), - ); - pipe.read_condition.notify_locked(token.token()); + drop(vec); + drop(lock_token); + trigger_matching(&endpoint.pipe, true, false, EVENT_READ, token); + endpoint.pipe.read_condition.notify(token); return Ok(bytes_written); - } else if user_buf.is_empty() { + } + + if user_buf.is_empty() { return Ok(0); } if fcntl_flags & O_NONBLOCK as u32 != 0 { return Err(Error::new(EAGAIN)); - } else if !pipe + } + if !endpoint + .pipe .write_condition - .wait(vec, "PipeWrite::write", &mut token) + .wait(vec, "PipeWrite::write", &mut lock_token) { return Err(Error::new(EINTR)); } } } - fn kfpath(&self, _id: usize, buf: UserSliceWo, _token: &mut CleanLockToken) -> Result { - //TODO: construct useful path? - buf.copy_common_bytes_from_slice("/scheme/pipe/".as_bytes()) - } - fn kfstat(&self, _id: usize, buf: UserSliceWo, _token: &mut CleanLockToken) -> Result<()> { + + fn kfpath(&self, id: usize, buf: UserSliceWo, token: &mut CleanLockToken) -> Result { + let endpoint = Self::get_endpoint(id, token)?; + if let Some(named) = endpoint.named { + buf.copy_common_bytes_from_slice(named.path.as_bytes()) + } else { + buf.copy_common_bytes_from_slice("/scheme/pipe/".as_bytes()) + } + } + + fn kfstat(&self, id: usize, buf: UserSliceWo, token: &mut CleanLockToken) -> Result<()> { + let endpoint = Self::get_endpoint(id, token)?; + let mode = endpoint.named.map_or(0o666, |named| named.mode); + buf.copy_exactly(&Stat { - st_mode: MODE_FIFO | 0o666, + st_mode: MODE_FIFO | mode, ..Default::default() })?; Ok(()) } + fn kfdwrite( &self, id: usize, @@ -385,23 +575,17 @@ _metadata: &[u64], token: &mut CleanLockToken, ) -> Result { - let (is_write_not_read, key) = from_raw_id(id); - - if !is_write_not_read { + let endpoint = Self::get_endpoint(id, token)?; + + if !endpoint.kind.can_write() { return Err(Error::new(EBADF)); } - let pipe = match Self::get_pipe(key, token) { - Ok(p) => p, - Err(e) => { - return Err(e); - } - }; loop { - let vec = pipe.fd_queue.lock(token.token()); - let (mut vec, mut token) = vec.into_split(); - - if !pipe.reader_is_alive.load(Ordering::Relaxed) { + let vec = endpoint.pipe.fd_queue.lock(token.token()); + let (mut vec, mut lock_token) = vec.into_split(); + + if endpoint.pipe.reader_count.load(Ordering::Relaxed) == 0 { return Err(Error::new(EPIPE)); } if descs.is_empty() { @@ -421,25 +605,24 @@ let fds_written = vec.len() - before_len; if fds_written > 0 { - event::trigger_locked( - GlobalSchemes::Pipe.scheme_id(), - key, - EVENT_READ, - token.token(), - ); - pipe.read_condition.notify_locked(token.token()); + drop(vec); + drop(lock_token); + trigger_matching(&endpoint.pipe, true, false, EVENT_READ, token); + endpoint.pipe.read_condition.notify(token); return Ok(fds_written); } - if !pipe + if !endpoint + .pipe .write_condition - .wait(vec, "PipeWrite::write", &mut token) + .wait(vec, "PipeWrite::write", &mut lock_token) { return Err(Error::new(EINTR)); } } } + fn kfdread( &self, id: usize, @@ -448,25 +631,19 @@ _metadata: &[u64], token: &mut CleanLockToken, ) -> Result { - let (is_write_not_read, key) = from_raw_id(id); - - if is_write_not_read { + let endpoint = Self::get_endpoint(id, token)?; + + if !endpoint.kind.can_read() { return Err(Error::new(EBADF)); } - let pipe = match Self::get_pipe(key, token) { - Ok(p) => p, - Err(e) => { - return Err(e); - } - }; if payload.is_empty() { return Ok(0); } loop { - let vec = pipe.fd_queue.lock(token.token()); - let (mut vec, mut token) = vec.into_split(); + let vec = endpoint.pipe.fd_queue.lock(token.token()); + let (mut vec, mut lock_token) = vec.into_split(); let fds_available = vec.len(); let max_fds_read = payload.len() / size_of::(); @@ -479,31 +656,33 @@ fds_to_transfer, payload, flags.contains(CallFlags::FD_CLOEXEC), - &mut token, + &mut lock_token, )?; } else { bulk_add_fds( fds_to_transfer, payload, flags.contains(CallFlags::FD_CLOEXEC), - &mut token, + &mut lock_token, )?; } - event::trigger_locked( - GlobalSchemes::Pipe.scheme_id(), - key | WRITE_NOT_READ_BIT, - EVENT_WRITE, - token.token(), - ); - pipe.write_condition.notify_locked(token.token()); + drop(vec); + drop(lock_token); + trigger_matching(&endpoint.pipe, false, true, EVENT_WRITE, token); + endpoint.pipe.write_condition.notify(token); return Ok(fds_to_read); } - if !pipe.writer_is_alive.load(Ordering::SeqCst) { + if endpoint.pipe.writer_count.load(Ordering::SeqCst) == 0 { return Ok(0); - } else if !pipe.read_condition.wait(vec, "PipeRead::read", &mut token) { + } + if !endpoint + .pipe + .read_condition + .wait(vec, "PipeRead::read", &mut lock_token) + { return Err(Error::new(EINTR)); } } @@ -511,11 +690,23 @@ } pub struct Pipe { - read_condition: WaitCondition, // signals whether there are available bytes to read - write_condition: WaitCondition, // signals whether there is room for additional bytes + read_condition: WaitCondition, + write_condition: WaitCondition, queue: Mutex>, - reader_is_alive: AtomicBool, // starts set, unset when reader closes - writer_is_alive: AtomicBool, // starts set, unset when writer closes - has_run_dup: AtomicBool, + reader_count: AtomicUsize, + writer_count: AtomicUsize, fd_queue: Mutex>>, } + +impl Pipe { + fn new() -> Self { + Self { + read_condition: WaitCondition::new(), + write_condition: WaitCondition::new(), + queue: Mutex::new(VecDeque::new()), + reader_count: AtomicUsize::new(0), + writer_count: AtomicUsize::new(0), + fd_queue: Mutex::new(VecDeque::new()), + } + } +} --- a/src/syscall/fs.rs +++ b/src/syscall/fs.rs @@ -2,7 +2,7 @@ use core::num::NonZeroUsize; -use alloc::{string::String, sync::Arc, vec::Vec}; +use alloc::{format, string::{String, ToString}, sync::Arc, vec::Vec}; use redox_path::RedoxPath; use crate::{ @@ -12,9 +12,9 @@ memory::{AddrSpace, GenericFlusher, Grant, PageSpan, TlbShootdownActions}, }, memory::{Page, VirtualAddress, PAGE_SIZE}, - scheme::{self, FileHandle, KernelScheme, OpenResult, StrOrBytes}, + scheme::{self, pipe, FileHandle, KernelScheme, OpenResult, SchemeExt, StrOrBytes}, sync::{CleanLockToken, RwLock}, - syscall::{data::Stat, error::*, flag::*}, + syscall::{data::{GlobalSchemes, Stat}, error::*, flag::*}, }; use super::usercopy::{UserSlice, UserSliceRo, UserSliceRw, UserSliceWo}; @@ -62,55 +62,29 @@ // TODO: Define elsewhere const PATH_MAX: usize = PAGE_SIZE; -pub fn openat( - fh: FileHandle, - raw_path: UserSliceRo, +fn fifo_path_key(scheme_id: scheme::SchemeId, number: usize, path: &str) -> String { + if path.starts_with('/') { + path.to_string() + } else { + format!("@fifo:{}:{}:{}", scheme_id.get(), number, path) + } +} + +fn install_open_result( + scheme_id: scheme::SchemeId, flags: usize, - fcntl_flags: u32, - euid: u32, - egid: u32, + open_result: OpenResult, token: &mut CleanLockToken, ) -> Result { - let path_buf = copy_path_to_buf(raw_path, PATH_MAX)?; - - let (scheme_id, number) = { - let current_lock = context::current(); - let mut current = current_lock.read(token.token()); - let (context, mut token) = current.token_split(); - let pipe = context.get_file(fh, &mut token).ok_or(Error::new(EBADF))?; - let desc = pipe.description.read(token.token()); - (desc.scheme, desc.number) - }; - - let caller_ctx = context::current() - .read(token.token()) - .caller_ctx() - .filter_uid_gid(euid, egid); - - let new_description = { - let scheme = scheme::get_scheme(token.token(), scheme_id)?; - - let res = scheme.kopenat( + let new_description = match open_result { + OpenResult::SchemeLocal(number, internal_flags) => Arc::new(RwLock::new(FileDescription { + offset: 0, + internal_flags, + scheme: scheme_id, number, - StrOrBytes::from_str(&path_buf), - flags, - fcntl_flags, - caller_ctx, - token, - ); - - match res? { - OpenResult::SchemeLocal(number, internal_flags) => { - Arc::new(RwLock::new(FileDescription { - offset: 0, - internal_flags, - scheme: scheme_id, - number, - flags: (flags & !O_CLOEXEC) as u32, - })) - } - OpenResult::External(desc) => desc, - } + flags: (flags & !O_CLOEXEC) as u32, + })), + OpenResult::External(desc) => desc, }; let current_lock = context::current(); @@ -126,6 +100,100 @@ ) .ok_or(Error::new(EMFILE)) } + +fn path_exists_in_scheme( + scheme: &dyn KernelScheme, + number: usize, + path: &str, + caller_ctx: scheme::CallerCtx, + token: &mut CleanLockToken, +) -> Result { + match scheme.kopenat(number, StrOrBytes::from_str(path), O_STAT, 0, caller_ctx, token) { + Ok(OpenResult::SchemeLocal(number, _)) => { + let _ = scheme.close(number, token); + Ok(true) + } + Ok(OpenResult::External(_)) => Ok(true), + Err(err) if err.errno == ENOENT => Ok(false), + Err(err) => Err(err), + } +} + +pub fn openat( + fh: FileHandle, + raw_path: UserSliceRo, + flags: usize, + fcntl_flags: u32, + euid: u32, + egid: u32, + token: &mut CleanLockToken, +) -> Result { + let path_buf = copy_path_to_buf(raw_path, PATH_MAX)?; + + let (scheme_id, number) = { + let current_lock = context::current(); + let mut current = current_lock.read(token.token()); + let (context, mut token) = current.token_split(); + let pipe = context.get_file(fh, &mut token).ok_or(Error::new(EBADF))?; + let desc = pipe.description.read(token.token()); + (desc.scheme, desc.number) + }; + + let caller_ctx = context::current() + .read(token.token()) + .caller_ctx() + .filter_uid_gid(euid, egid); + + let fifo_mode_requested = flags & MODE_FIFO as usize == MODE_FIFO as usize; + let fifo_key = fifo_path_key(scheme_id, number, &path_buf); + + if pipe::named_pipe_exists(&fifo_key, token) { + if flags & O_EXCL == O_EXCL && flags & O_CREAT == O_CREAT { + return Err(Error::new(EEXIST)); + } + if fifo_mode_requested && flags & O_CREAT == O_CREAT { + return Err(Error::new(EEXIST)); + } + + let pipe_number = pipe::open_named_pipe(&fifo_key, flags, token)?.ok_or(Error::new(ENOENT))?; + return install_open_result( + GlobalSchemes::Pipe.scheme_id(), + flags, + OpenResult::SchemeLocal(pipe_number, InternalFlags::empty()), + token, + ); + } + + let scheme = scheme::get_scheme(token.token(), scheme_id)?; + + if fifo_mode_requested && flags & O_CREAT == O_CREAT { + if path_exists_in_scheme(&*scheme, number, &path_buf, caller_ctx, token)? { + return Err(Error::new(EEXIST)); + } + + let mode = u16::try_from(flags & 0o7777).map_err(|_| Error::new(EINVAL))?; + let pipe_number = + pipe::create_named_pipe(&fifo_key, &path_buf, mode, flags, token)?; + + return install_open_result( + GlobalSchemes::Pipe.scheme_id(), + flags, + OpenResult::SchemeLocal(pipe_number, InternalFlags::empty()), + token, + ); + } + + let open_result = scheme.kopenat( + number, + StrOrBytes::from_str(&path_buf), + flags, + fcntl_flags, + caller_ctx, + token, + )?; + + install_open_result(scheme_id, flags, open_result, token) +} /// Unlinkat syscall pub fn unlinkat( fh: FileHandle, @@ -152,6 +220,10 @@ .read(token.token()) .caller_ctx() .filter_uid_gid(euid, egid); + + if pipe::unlink_named_pipe(&fifo_path_key(scheme_id, number, &path_buf), token) { + return Ok(()); + } /* let mut path_buf = BorrowedHtBuf::head()?;