7dfb749b3d
Absorb redundant kernel patches into v2 carriers, remove debug and suspend patches no longer needed. Wire v2 patches in recipe.toml.
1151 lines
36 KiB
Diff
1151 lines
36 KiB
Diff
--- a/src/scheme/pipe.rs
|
|
+++ b/src/scheme/pipe.rs
|
|
@@ -1,5 +1,10 @@
|
|
-use alloc::{collections::VecDeque, sync::Arc, vec::Vec};
|
|
-use core::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
|
|
+use alloc::{
|
|
+ collections::VecDeque,
|
|
+ string::{String, ToString},
|
|
+ sync::Arc,
|
|
+ vec::Vec,
|
|
+};
|
|
+use core::sync::atomic::{AtomicUsize, Ordering};
|
|
|
|
use syscall::{data::GlobalSchemes, CallFlags};
|
|
|
|
@@ -14,100 +19,262 @@
|
|
sync::{CleanLockToken, Mutex, RwLock, WaitCondition, L1},
|
|
syscall::{
|
|
data::Stat,
|
|
- error::{Error, Result, EAGAIN, EBADF, EINTR, EINVAL, ENOENT, EPIPE},
|
|
- flag::{EventFlags, EVENT_READ, EVENT_WRITE, MODE_FIFO, O_NONBLOCK},
|
|
+ error::{
|
|
+ Error, Result, EAGAIN, EBADF, EEXIST, EINVAL, EINTR, ENOENT, ENOTDIR, EPIPE,
|
|
+ },
|
|
+ flag::{
|
|
+ EventFlags, EVENT_READ, EVENT_WRITE, MODE_FIFO, O_ACCMODE, O_DIRECTORY,
|
|
+ O_NONBLOCK, O_RDONLY, O_RDWR, O_STAT, O_WRONLY,
|
|
+ },
|
|
usercopy::{UserSliceRo, UserSliceRw, UserSliceWo},
|
|
},
|
|
};
|
|
|
|
use super::{CallerCtx, KernelScheme, OpenResult, SchemeExt, StrOrBytes};
|
|
|
|
-// TODO: Preallocate a number of scheme IDs, since there can only be *one* root namespace, and
|
|
-// therefore only *one* pipe scheme.
|
|
-static PIPE_NEXT_ID: AtomicUsize = AtomicUsize::new(0);
|
|
-
|
|
+static PIPE_NEXT_ID: AtomicUsize = AtomicUsize::new(1);
|
|
+
|
|
+#[derive(Clone)]
|
|
enum Handle {
|
|
- Pipe(Arc<Pipe>),
|
|
+ Endpoint(EndpointHandle),
|
|
SchemeRoot,
|
|
}
|
|
|
|
-// TODO: SLOB?
|
|
-static PIPES: RwLock<L1, HashMap<usize, Handle>> =
|
|
+#[derive(Clone, Copy, Eq, PartialEq)]
|
|
+enum EndpointKind {
|
|
+ Read,
|
|
+ Write,
|
|
+ ReadWrite,
|
|
+}
|
|
+
|
|
+impl EndpointKind {
|
|
+ fn can_read(self) -> bool {
|
|
+ matches!(self, Self::Read | Self::ReadWrite)
|
|
+ }
|
|
+
|
|
+ fn can_write(self) -> bool {
|
|
+ matches!(self, Self::Write | Self::ReadWrite)
|
|
+ }
|
|
+}
|
|
+
|
|
+#[derive(Clone)]
|
|
+struct EndpointHandle {
|
|
+ pipe: Arc<Pipe>,
|
|
+ kind: EndpointKind,
|
|
+ named: Option<Arc<NamedPipe>>,
|
|
+}
|
|
+
|
|
+struct NamedPipe {
|
|
+ path: String,
|
|
+ mode: u16,
|
|
+ active: Mutex<L1, Option<Arc<Pipe>>>,
|
|
+}
|
|
+
|
|
+static HANDLES: RwLock<L1, HashMap<usize, Handle>> =
|
|
RwLock::new(HashMap::with_hasher(DefaultHashBuilder::new()));
|
|
+static NAMED_PIPES: RwLock<L1, HashMap<String, Arc<NamedPipe>>> =
|
|
+ RwLock::new(HashMap::with_hasher(DefaultHashBuilder::new()));
|
|
|
|
const MAX_QUEUE_SIZE: usize = 65536;
|
|
|
|
-// In almost all places where Rust (and LLVM) uses pointers, they are limited to nonnegative isize,
|
|
-// so this is fine.
|
|
-const WRITE_NOT_READ_BIT: usize = 1;
|
|
-
|
|
-fn from_raw_id(id: usize) -> (bool, usize) {
|
|
- (id & WRITE_NOT_READ_BIT != 0, id & !WRITE_NOT_READ_BIT)
|
|
-}
|
|
-
|
|
-pub fn pipe(token: &mut CleanLockToken) -> Result<(usize, usize)> {
|
|
- // Bit 0 is used for WRITE_NOT_READ_BIT
|
|
- let id = PIPE_NEXT_ID.fetch_add(2, Ordering::Relaxed);
|
|
-
|
|
- PIPES.write(token.token()).insert(
|
|
- id,
|
|
- Handle::Pipe(Arc::new(Pipe {
|
|
- queue: Mutex::new(VecDeque::new()),
|
|
- read_condition: WaitCondition::new(),
|
|
- write_condition: WaitCondition::new(),
|
|
- writer_is_alive: AtomicBool::new(true),
|
|
- reader_is_alive: AtomicBool::new(true),
|
|
- has_run_dup: AtomicBool::new(false),
|
|
- fd_queue: Mutex::new(VecDeque::new()),
|
|
- })),
|
|
- );
|
|
-
|
|
- Ok((id, id | WRITE_NOT_READ_BIT))
|
|
-}
|
|
-
|
|
-pub struct PipeScheme;
|
|
-
|
|
-impl PipeScheme {
|
|
- fn get_pipe(key: usize, token: &mut CleanLockToken) -> Result<Arc<Pipe>> {
|
|
- PIPES
|
|
- .read(token.token())
|
|
- .get(&key)
|
|
- .and_then(|handle| match handle {
|
|
- Handle::Pipe(pipe) => Some(Arc::clone(pipe)),
|
|
+fn next_id() -> usize {
|
|
+ PIPE_NEXT_ID.fetch_add(1, Ordering::Relaxed)
|
|
+}
|
|
+
|
|
+fn endpoint_kind_from_flags(flags: usize) -> Result<EndpointKind> {
|
|
+ match flags & O_ACCMODE {
|
|
+ O_RDONLY => Ok(EndpointKind::Read),
|
|
+ O_WRONLY => Ok(EndpointKind::Write),
|
|
+ O_RDWR => Ok(EndpointKind::ReadWrite),
|
|
+ _ => Err(Error::new(EINVAL)),
|
|
+ }
|
|
+}
|
|
+
|
|
+fn validate_named_fifo_open(flags: usize) -> Result<()> {
|
|
+ if flags & O_DIRECTORY == O_DIRECTORY && flags & O_STAT != O_STAT {
|
|
+ return Err(Error::new(ENOTDIR));
|
|
+ }
|
|
+
|
|
+ let _ = endpoint_kind_from_flags(flags)?;
|
|
+ Ok(())
|
|
+}
|
|
+
|
|
+fn trigger_matching(
|
|
+ pipe: &Arc<Pipe>,
|
|
+ require_read: bool,
|
|
+ require_write: bool,
|
|
+ flags: EventFlags,
|
|
+ token: &mut CleanLockToken,
|
|
+) {
|
|
+ let ids = {
|
|
+ let handles = HANDLES.read(token.token());
|
|
+ handles
|
|
+ .iter()
|
|
+ .filter_map(|(id, handle)| match handle {
|
|
+ Handle::Endpoint(endpoint)
|
|
+ if Arc::ptr_eq(&endpoint.pipe, pipe)
|
|
+ && (!require_read || endpoint.kind.can_read())
|
|
+ && (!require_write || endpoint.kind.can_write()) =>
|
|
+ {
|
|
+ Some(*id)
|
|
+ }
|
|
_ => None,
|
|
})
|
|
+ .collect::<Vec<_>>()
|
|
+ };
|
|
+
|
|
+ for id in ids {
|
|
+ event::trigger(GlobalSchemes::Pipe.scheme_id(), id, flags, token);
|
|
+ }
|
|
+}
|
|
+
|
|
+fn open_endpoint(
|
|
+ pipe: Arc<Pipe>,
|
|
+ kind: EndpointKind,
|
|
+ named: Option<Arc<NamedPipe>>,
|
|
+ token: &mut CleanLockToken,
|
|
+) -> usize {
|
|
+ if kind.can_read() {
|
|
+ pipe.reader_count.fetch_add(1, Ordering::SeqCst);
|
|
+ }
|
|
+ if kind.can_write() {
|
|
+ pipe.writer_count.fetch_add(1, Ordering::SeqCst);
|
|
+ }
|
|
+
|
|
+ let id = next_id();
|
|
+ HANDLES.write(token.token()).insert(
|
|
+ id,
|
|
+ Handle::Endpoint(EndpointHandle { pipe, kind, named }),
|
|
+ );
|
|
+ id
|
|
+}
|
|
+
|
|
+fn drop_wait_conditions_if_possible(pipe: Arc<Pipe>, token: &mut CleanLockToken) {
|
|
+ if let Some(pipe) = Arc::into_inner(pipe) {
|
|
+ {
|
|
+ pipe.read_condition.into_drop(token);
|
|
+ }
|
|
+ {
|
|
+ pipe.write_condition.into_drop(token);
|
|
+ }
|
|
+ }
|
|
+}
|
|
+
|
|
+pub fn pipe(token: &mut CleanLockToken) -> Result<(usize, usize)> {
|
|
+ let pipe = Arc::new(Pipe::new());
|
|
+ let read_id = open_endpoint(Arc::clone(&pipe), EndpointKind::Read, None, token);
|
|
+ let write_id = open_endpoint(pipe, EndpointKind::Write, None, token);
|
|
+
|
|
+ Ok((read_id, write_id))
|
|
+}
|
|
+
|
|
+pub fn named_pipe_exists(path: &str, token: &mut CleanLockToken) -> bool {
|
|
+ NAMED_PIPES.read(token.token()).contains_key(path)
|
|
+}
|
|
+
|
|
+pub fn create_named_pipe(
|
|
+ path: &str,
|
|
+ display_path: &str,
|
|
+ mode: u16,
|
|
+ flags: usize,
|
|
+ token: &mut CleanLockToken,
|
|
+) -> Result<usize> {
|
|
+ validate_named_fifo_open(flags)?;
|
|
+
|
|
+ let named = {
|
|
+ let mut named_pipes = NAMED_PIPES.write(token.token());
|
|
+ if named_pipes.contains_key(path) {
|
|
+ return Err(Error::new(EEXIST));
|
|
+ }
|
|
+
|
|
+ let named = Arc::new(NamedPipe {
|
|
+ path: display_path.to_string(),
|
|
+ mode,
|
|
+ active: Mutex::new(None),
|
|
+ });
|
|
+ named_pipes.insert(path.to_string(), Arc::clone(&named));
|
|
+ named
|
|
+ };
|
|
+
|
|
+ let kind = endpoint_kind_from_flags(flags)?;
|
|
+ let pipe = Arc::new(Pipe::new());
|
|
+ *named.active.lock(token.token()) = Some(Arc::clone(&pipe));
|
|
+
|
|
+ Ok(open_endpoint(pipe, kind, Some(named), token))
|
|
+}
|
|
+
|
|
+pub fn open_named_pipe(path: &str, flags: usize, token: &mut CleanLockToken) -> Result<Option<usize>> {
|
|
+ validate_named_fifo_open(flags)?;
|
|
+
|
|
+ let named = match NAMED_PIPES.read(token.token()).get(path) {
|
|
+ Some(named) => Arc::clone(named),
|
|
+ None => return Ok(None),
|
|
+ };
|
|
+
|
|
+ let kind = endpoint_kind_from_flags(flags)?;
|
|
+ let pipe = {
|
|
+ let mut active = named.active.lock(token.token());
|
|
+ match active.as_ref() {
|
|
+ Some(pipe) => Arc::clone(pipe),
|
|
+ None => {
|
|
+ let pipe = Arc::new(Pipe::new());
|
|
+ *active = Some(Arc::clone(&pipe));
|
|
+ pipe
|
|
+ }
|
|
+ }
|
|
+ };
|
|
+
|
|
+ Ok(Some(open_endpoint(pipe, kind, Some(named), token)))
|
|
+}
|
|
+
|
|
+pub fn unlink_named_pipe(path: &str, token: &mut CleanLockToken) -> bool {
|
|
+ NAMED_PIPES.write(token.token()).remove(path).is_some()
|
|
+}
|
|
+
|
|
+pub struct PipeScheme;
|
|
+
|
|
+impl PipeScheme {
|
|
+ fn get_endpoint(id: usize, token: &mut CleanLockToken) -> Result<EndpointHandle> {
|
|
+ HANDLES
|
|
+ .read(token.token())
|
|
+ .get(&id)
|
|
+ .and_then(|handle| match handle {
|
|
+ Handle::Endpoint(endpoint) => Some(endpoint.clone()),
|
|
+ Handle::SchemeRoot => None,
|
|
+ })
|
|
.ok_or(Error::new(EBADF))
|
|
}
|
|
}
|
|
|
|
impl KernelScheme for PipeScheme {
|
|
fn scheme_root(&self, token: &mut CleanLockToken) -> Result<usize> {
|
|
- let id = PIPE_NEXT_ID.fetch_add(2, Ordering::Relaxed);
|
|
- PIPES.write(token.token()).insert(id, Handle::SchemeRoot);
|
|
+ let id = next_id();
|
|
+ HANDLES.write(token.token()).insert(id, Handle::SchemeRoot);
|
|
Ok(id)
|
|
}
|
|
+
|
|
fn fevent(
|
|
&self,
|
|
id: usize,
|
|
flags: EventFlags,
|
|
token: &mut CleanLockToken,
|
|
) -> Result<EventFlags> {
|
|
- let (is_writer_not_reader, key) = from_raw_id(id);
|
|
- let pipe = Self::get_pipe(key, token)?;
|
|
+ let endpoint = Self::get_endpoint(id, token)?;
|
|
|
|
let mut ready = EventFlags::empty();
|
|
|
|
- if is_writer_not_reader
|
|
+ if endpoint.kind.can_write()
|
|
&& flags.contains(EVENT_WRITE)
|
|
- && (pipe.queue.lock(token.token()).len() <= MAX_QUEUE_SIZE
|
|
- || !pipe.reader_is_alive.load(Ordering::Acquire))
|
|
+ && (endpoint.pipe.queue.lock(token.token()).len() <= MAX_QUEUE_SIZE
|
|
+ || endpoint.pipe.reader_count.load(Ordering::Acquire) == 0)
|
|
{
|
|
ready |= EventFlags::EVENT_WRITE;
|
|
}
|
|
- if !is_writer_not_reader
|
|
+
|
|
+ if endpoint.kind.can_read()
|
|
&& flags.contains(EVENT_READ)
|
|
- && (!pipe.queue.lock(token.token()).is_empty()
|
|
- || !pipe.writer_is_alive.load(Ordering::Acquire))
|
|
+ && (!endpoint.pipe.queue.lock(token.token()).is_empty()
|
|
+ || endpoint.pipe.writer_count.load(Ordering::Acquire) == 0)
|
|
{
|
|
ready |= EventFlags::EVENT_READ;
|
|
}
|
|
@@ -116,46 +283,48 @@
|
|
}
|
|
|
|
fn close(&self, id: usize, token: &mut CleanLockToken) -> Result<()> {
|
|
- let (is_write_not_read, key) = from_raw_id(id);
|
|
-
|
|
- let pipe = Self::get_pipe(key, token)?;
|
|
- let scheme_id = GlobalSchemes::Pipe.scheme_id();
|
|
-
|
|
- let can_remove = if is_write_not_read {
|
|
- pipe.writer_is_alive.store(false, Ordering::SeqCst);
|
|
- event::trigger(scheme_id, key, EVENT_READ, token);
|
|
- pipe.read_condition.notify(token);
|
|
-
|
|
- !pipe.reader_is_alive.load(Ordering::SeqCst)
|
|
- } else {
|
|
- pipe.reader_is_alive.store(false, Ordering::SeqCst);
|
|
- event::trigger(scheme_id, key | WRITE_NOT_READ_BIT, EVENT_WRITE, token);
|
|
- pipe.write_condition.notify(token);
|
|
-
|
|
- !pipe.writer_is_alive.load(Ordering::SeqCst)
|
|
+ let handle = HANDLES
|
|
+ .write(token.token())
|
|
+ .remove(&id)
|
|
+ .ok_or(Error::new(EBADF))?;
|
|
+
|
|
+ let Handle::Endpoint(endpoint) = handle else {
|
|
+ return Ok(());
|
|
};
|
|
|
|
- if can_remove {
|
|
- let handle = PIPES.write(token.token()).remove(&key);
|
|
- if let Some(Handle::Pipe(pipe)) = handle
|
|
- && let Some(pipe) = Arc::into_inner(pipe)
|
|
- {
|
|
+ let mut last_reader = false;
|
|
+ let mut last_writer = false;
|
|
+
|
|
+ if endpoint.kind.can_read() {
|
|
+ last_reader = endpoint.pipe.reader_count.fetch_sub(1, Ordering::SeqCst) == 1;
|
|
+ }
|
|
+ if endpoint.kind.can_write() {
|
|
+ last_writer = endpoint.pipe.writer_count.fetch_sub(1, Ordering::SeqCst) == 1;
|
|
+ }
|
|
+
|
|
+ if last_writer {
|
|
+ trigger_matching(&endpoint.pipe, true, false, EVENT_READ, token);
|
|
+ endpoint.pipe.read_condition.notify(token);
|
|
+ }
|
|
+ if last_reader {
|
|
+ trigger_matching(&endpoint.pipe, false, true, EVENT_WRITE, token);
|
|
+ endpoint.pipe.write_condition.notify(token);
|
|
+ }
|
|
+
|
|
+ let no_readers = endpoint.pipe.reader_count.load(Ordering::SeqCst) == 0;
|
|
+ let no_writers = endpoint.pipe.writer_count.load(Ordering::SeqCst) == 0;
|
|
+ if no_readers && no_writers {
|
|
+ if let Some(named) = endpoint.named {
|
|
+ let mut active = named.active.lock(token.token());
|
|
+ if active
|
|
+ .as_ref()
|
|
+ .is_some_and(|active_pipe| Arc::ptr_eq(active_pipe, &endpoint.pipe))
|
|
{
|
|
- pipe.read_condition.into_drop(token);
|
|
+ *active = None;
|
|
}
|
|
- {
|
|
- pipe.write_condition.into_drop(token);
|
|
- }
|
|
- }
|
|
- }
|
|
-
|
|
- if let Some(pipe) = Arc::into_inner(pipe) {
|
|
- {
|
|
- pipe.read_condition.into_drop(token);
|
|
- }
|
|
- {
|
|
- pipe.write_condition.into_drop(token);
|
|
- }
|
|
+ }
|
|
+
|
|
+ drop_wait_conditions_if_possible(endpoint.pipe, token);
|
|
}
|
|
|
|
Ok(())
|
|
@@ -168,9 +337,9 @@
|
|
_ctx: CallerCtx,
|
|
token: &mut CleanLockToken,
|
|
) -> Result<OpenResult> {
|
|
- let (is_writer_not_reader, key) = from_raw_id(old_id);
|
|
-
|
|
- if is_writer_not_reader {
|
|
+ let endpoint = Self::get_endpoint(old_id, token)?;
|
|
+
|
|
+ if !endpoint.kind.can_read() {
|
|
return Err(Error::new(EBADF));
|
|
}
|
|
|
|
@@ -180,17 +349,17 @@
|
|
return Err(Error::new(EINVAL));
|
|
}
|
|
|
|
- let pipe = Self::get_pipe(key, token)?;
|
|
-
|
|
- if pipe.has_run_dup.swap(true, Ordering::SeqCst) {
|
|
- return Err(Error::new(EBADF));
|
|
- }
|
|
-
|
|
Ok(OpenResult::SchemeLocal(
|
|
- key | WRITE_NOT_READ_BIT,
|
|
+ open_endpoint(
|
|
+ Arc::clone(&endpoint.pipe),
|
|
+ EndpointKind::Write,
|
|
+ endpoint.named,
|
|
+ token,
|
|
+ ),
|
|
InternalFlags::empty(),
|
|
))
|
|
}
|
|
+
|
|
fn kopenat(
|
|
&self,
|
|
id: usize,
|
|
@@ -200,40 +369,47 @@
|
|
_ctx: CallerCtx,
|
|
token: &mut CleanLockToken,
|
|
) -> Result<OpenResult> {
|
|
- let (_, key) = from_raw_id(id);
|
|
-
|
|
- {
|
|
- let guard = PIPES.read(token.token());
|
|
- if let Some(Handle::SchemeRoot) = guard.get(&key) {
|
|
- } else if let Some(Handle::Pipe(pipe_arc)) = guard.get(&key) {
|
|
- let pipe = Arc::clone(pipe_arc);
|
|
- drop(guard);
|
|
-
|
|
- if user_buf.as_bytes() == b"write" {
|
|
- return Err(Error::new(EINVAL));
|
|
+ let is_scheme_root = {
|
|
+ let handles = HANDLES.read(token.token());
|
|
+ match handles.get(&id) {
|
|
+ Some(Handle::SchemeRoot) => true,
|
|
+ Some(Handle::Endpoint(_)) => false,
|
|
+ None => return Err(Error::new(EBADF)),
|
|
+ }
|
|
+ };
|
|
+
|
|
+ if is_scheme_root {
|
|
+ let path = user_buf.as_str().or(Err(Error::new(EINVAL)))?;
|
|
+ if !path.trim_start_matches('/').is_empty() {
|
|
+ return Err(Error::new(ENOENT));
|
|
}
|
|
|
|
- if pipe.has_run_dup.swap(true, Ordering::SeqCst) {
|
|
- return Err(Error::new(EBADF));
|
|
- }
|
|
-
|
|
+ let pipe = Arc::new(Pipe::new());
|
|
return Ok(OpenResult::SchemeLocal(
|
|
- key | WRITE_NOT_READ_BIT,
|
|
+ open_endpoint(pipe, EndpointKind::Read, None, token),
|
|
InternalFlags::empty(),
|
|
));
|
|
- } else {
|
|
- return Err(Error::new(EBADF));
|
|
- }
|
|
- }
|
|
-
|
|
- let path = user_buf.as_str().or(Err(Error::new(EINVAL)))?;
|
|
- if !path.trim_start_matches('/').is_empty() {
|
|
- return Err(Error::new(ENOENT));
|
|
- }
|
|
-
|
|
- let (read_id, _) = pipe(token)?;
|
|
-
|
|
- Ok(OpenResult::SchemeLocal(read_id, InternalFlags::empty()))
|
|
+ }
|
|
+
|
|
+ let endpoint = Self::get_endpoint(id, token)?;
|
|
+ if !endpoint.kind.can_read() {
|
|
+ return Err(Error::new(EBADF));
|
|
+ }
|
|
+
|
|
+ let path = user_buf.as_bytes();
|
|
+ if !path.is_empty() && path != b"write" {
|
|
+ return Err(Error::new(EINVAL));
|
|
+ }
|
|
+
|
|
+ Ok(OpenResult::SchemeLocal(
|
|
+ open_endpoint(
|
|
+ Arc::clone(&endpoint.pipe),
|
|
+ EndpointKind::Write,
|
|
+ endpoint.named,
|
|
+ token,
|
|
+ ),
|
|
+ InternalFlags::empty(),
|
|
+ ))
|
|
}
|
|
|
|
fn kread(
|
|
@@ -244,16 +420,15 @@
|
|
_stored_flags: u32,
|
|
token: &mut CleanLockToken,
|
|
) -> Result<usize> {
|
|
- let (is_write_not_read, key) = from_raw_id(id);
|
|
-
|
|
- if is_write_not_read {
|
|
+ let endpoint = Self::get_endpoint(id, token)?;
|
|
+
|
|
+ if !endpoint.kind.can_read() {
|
|
return Err(Error::new(EBADF));
|
|
}
|
|
- let pipe = Self::get_pipe(key, token)?;
|
|
|
|
loop {
|
|
- let vec = pipe.queue.lock(token.token());
|
|
- let (mut vec, mut token) = vec.into_split();
|
|
+ let vec = endpoint.pipe.queue.lock(token.token());
|
|
+ let (mut vec, mut lock_token) = vec.into_split();
|
|
|
|
let (s1, s2) = vec.as_slices();
|
|
let s1_count = core::cmp::min(user_buf.len(), s1.len());
|
|
@@ -273,28 +448,34 @@
|
|
let _ = vec.drain(..bytes_read);
|
|
|
|
if bytes_read > 0 {
|
|
- event::trigger_locked(
|
|
- GlobalSchemes::Pipe.scheme_id(),
|
|
- key | WRITE_NOT_READ_BIT,
|
|
- EVENT_WRITE,
|
|
- token.token(),
|
|
- );
|
|
- pipe.write_condition.notify_locked(token.token());
|
|
+ drop(vec);
|
|
+ drop(lock_token);
|
|
+ trigger_matching(&endpoint.pipe, false, true, EVENT_WRITE, token);
|
|
+ endpoint.pipe.write_condition.notify(token);
|
|
|
|
return Ok(bytes_read);
|
|
- } else if user_buf.is_empty() {
|
|
+ }
|
|
+
|
|
+ if user_buf.is_empty() {
|
|
return Ok(0);
|
|
}
|
|
|
|
- if !pipe.writer_is_alive.load(Ordering::SeqCst) {
|
|
+ if endpoint.pipe.writer_count.load(Ordering::SeqCst) == 0 {
|
|
return Ok(0);
|
|
- } else if fcntl_flags & O_NONBLOCK as u32 != 0 {
|
|
+ }
|
|
+ if fcntl_flags & O_NONBLOCK as u32 != 0 {
|
|
return Err(Error::new(EAGAIN));
|
|
- } else if !pipe.read_condition.wait(vec, "PipeRead::read", &mut token) {
|
|
+ }
|
|
+ if !endpoint
|
|
+ .pipe
|
|
+ .read_condition
|
|
+ .wait(vec, "PipeRead::read", &mut lock_token)
|
|
+ {
|
|
return Err(Error::new(EINTR));
|
|
}
|
|
}
|
|
}
|
|
+
|
|
fn kwrite(
|
|
&self,
|
|
id: usize,
|
|
@@ -303,18 +484,17 @@
|
|
_stored_flags: u32,
|
|
token: &mut CleanLockToken,
|
|
) -> Result<usize> {
|
|
- let (is_write_not_read, key) = from_raw_id(id);
|
|
-
|
|
- if !is_write_not_read {
|
|
+ let endpoint = Self::get_endpoint(id, token)?;
|
|
+
|
|
+ if !endpoint.kind.can_write() {
|
|
return Err(Error::new(EBADF));
|
|
}
|
|
- let pipe = Self::get_pipe(key, token)?;
|
|
|
|
loop {
|
|
- let vec = pipe.queue.lock(token.token());
|
|
- let (mut vec, mut token) = vec.into_split();
|
|
-
|
|
- if !pipe.reader_is_alive.load(Ordering::Relaxed) {
|
|
+ let vec = endpoint.pipe.queue.lock(token.token());
|
|
+ let (mut vec, mut lock_token) = vec.into_split();
|
|
+
|
|
+ if endpoint.pipe.reader_count.load(Ordering::Relaxed) == 0 {
|
|
return Err(Error::new(EPIPE));
|
|
}
|
|
|
|
@@ -329,7 +509,6 @@
|
|
|
|
let mut bytes_written = 0;
|
|
|
|
- // TODO: Modify VecDeque so that the unwritten portions can be accessed directly?
|
|
for (idx, chunk) in src_buf.in_variable_chunks(TMPBUF_SIZE).enumerate() {
|
|
let chunk_byte_count = match chunk.copy_common_bytes_to_slice(&mut tmp_buf) {
|
|
Ok(c) => c,
|
|
@@ -341,41 +520,52 @@
|
|
}
|
|
|
|
if bytes_written > 0 {
|
|
- event::trigger_locked(
|
|
- GlobalSchemes::Pipe.scheme_id(),
|
|
- key,
|
|
- EVENT_READ,
|
|
- token.token(),
|
|
- );
|
|
- pipe.read_condition.notify_locked(token.token());
|
|
+ drop(vec);
|
|
+ drop(lock_token);
|
|
+ trigger_matching(&endpoint.pipe, true, false, EVENT_READ, token);
|
|
+ endpoint.pipe.read_condition.notify(token);
|
|
|
|
return Ok(bytes_written);
|
|
- } else if user_buf.is_empty() {
|
|
+ }
|
|
+
|
|
+ if user_buf.is_empty() {
|
|
return Ok(0);
|
|
}
|
|
|
|
if fcntl_flags & O_NONBLOCK as u32 != 0 {
|
|
return Err(Error::new(EAGAIN));
|
|
- } else if !pipe
|
|
+ }
|
|
+ if !endpoint
|
|
+ .pipe
|
|
.write_condition
|
|
- .wait(vec, "PipeWrite::write", &mut token)
|
|
+ .wait(vec, "PipeWrite::write", &mut lock_token)
|
|
{
|
|
return Err(Error::new(EINTR));
|
|
}
|
|
}
|
|
}
|
|
- fn kfpath(&self, _id: usize, buf: UserSliceWo, _token: &mut CleanLockToken) -> Result<usize> {
|
|
- //TODO: construct useful path?
|
|
- buf.copy_common_bytes_from_slice("/scheme/pipe/".as_bytes())
|
|
- }
|
|
- fn kfstat(&self, _id: usize, buf: UserSliceWo, _token: &mut CleanLockToken) -> Result<()> {
|
|
+
|
|
+ fn kfpath(&self, id: usize, buf: UserSliceWo, token: &mut CleanLockToken) -> Result<usize> {
|
|
+ let endpoint = Self::get_endpoint(id, token)?;
|
|
+ if let Some(named) = endpoint.named {
|
|
+ buf.copy_common_bytes_from_slice(named.path.as_bytes())
|
|
+ } else {
|
|
+ buf.copy_common_bytes_from_slice("/scheme/pipe/".as_bytes())
|
|
+ }
|
|
+ }
|
|
+
|
|
+ fn kfstat(&self, id: usize, buf: UserSliceWo, token: &mut CleanLockToken) -> Result<()> {
|
|
+ let endpoint = Self::get_endpoint(id, token)?;
|
|
+ let mode = endpoint.named.map_or(0o666, |named| named.mode);
|
|
+
|
|
buf.copy_exactly(&Stat {
|
|
- st_mode: MODE_FIFO | 0o666,
|
|
+ st_mode: MODE_FIFO | mode,
|
|
..Default::default()
|
|
})?;
|
|
|
|
Ok(())
|
|
}
|
|
+
|
|
fn kfdwrite(
|
|
&self,
|
|
id: usize,
|
|
@@ -385,23 +575,17 @@
|
|
_metadata: &[u64],
|
|
token: &mut CleanLockToken,
|
|
) -> Result<usize> {
|
|
- let (is_write_not_read, key) = from_raw_id(id);
|
|
-
|
|
- if !is_write_not_read {
|
|
+ let endpoint = Self::get_endpoint(id, token)?;
|
|
+
|
|
+ if !endpoint.kind.can_write() {
|
|
return Err(Error::new(EBADF));
|
|
}
|
|
- let pipe = match Self::get_pipe(key, token) {
|
|
- Ok(p) => p,
|
|
- Err(e) => {
|
|
- return Err(e);
|
|
- }
|
|
- };
|
|
|
|
loop {
|
|
- let vec = pipe.fd_queue.lock(token.token());
|
|
- let (mut vec, mut token) = vec.into_split();
|
|
-
|
|
- if !pipe.reader_is_alive.load(Ordering::Relaxed) {
|
|
+ let vec = endpoint.pipe.fd_queue.lock(token.token());
|
|
+ let (mut vec, mut lock_token) = vec.into_split();
|
|
+
|
|
+ if endpoint.pipe.reader_count.load(Ordering::Relaxed) == 0 {
|
|
return Err(Error::new(EPIPE));
|
|
}
|
|
if descs.is_empty() {
|
|
@@ -421,25 +605,24 @@
|
|
let fds_written = vec.len() - before_len;
|
|
|
|
if fds_written > 0 {
|
|
- event::trigger_locked(
|
|
- GlobalSchemes::Pipe.scheme_id(),
|
|
- key,
|
|
- EVENT_READ,
|
|
- token.token(),
|
|
- );
|
|
- pipe.read_condition.notify_locked(token.token());
|
|
+ drop(vec);
|
|
+ drop(lock_token);
|
|
+ trigger_matching(&endpoint.pipe, true, false, EVENT_READ, token);
|
|
+ endpoint.pipe.read_condition.notify(token);
|
|
|
|
return Ok(fds_written);
|
|
}
|
|
|
|
- if !pipe
|
|
+ if !endpoint
|
|
+ .pipe
|
|
.write_condition
|
|
- .wait(vec, "PipeWrite::write", &mut token)
|
|
+ .wait(vec, "PipeWrite::write", &mut lock_token)
|
|
{
|
|
return Err(Error::new(EINTR));
|
|
}
|
|
}
|
|
}
|
|
+
|
|
fn kfdread(
|
|
&self,
|
|
id: usize,
|
|
@@ -448,25 +631,19 @@
|
|
_metadata: &[u64],
|
|
token: &mut CleanLockToken,
|
|
) -> Result<usize> {
|
|
- let (is_write_not_read, key) = from_raw_id(id);
|
|
-
|
|
- if is_write_not_read {
|
|
+ let endpoint = Self::get_endpoint(id, token)?;
|
|
+
|
|
+ if !endpoint.kind.can_read() {
|
|
return Err(Error::new(EBADF));
|
|
}
|
|
- let pipe = match Self::get_pipe(key, token) {
|
|
- Ok(p) => p,
|
|
- Err(e) => {
|
|
- return Err(e);
|
|
- }
|
|
- };
|
|
|
|
if payload.is_empty() {
|
|
return Ok(0);
|
|
}
|
|
|
|
loop {
|
|
- let vec = pipe.fd_queue.lock(token.token());
|
|
- let (mut vec, mut token) = vec.into_split();
|
|
+ let vec = endpoint.pipe.fd_queue.lock(token.token());
|
|
+ let (mut vec, mut lock_token) = vec.into_split();
|
|
|
|
let fds_available = vec.len();
|
|
let max_fds_read = payload.len() / size_of::<usize>();
|
|
@@ -479,31 +656,33 @@
|
|
fds_to_transfer,
|
|
payload,
|
|
flags.contains(CallFlags::FD_CLOEXEC),
|
|
- &mut token,
|
|
+ &mut lock_token,
|
|
)?;
|
|
} else {
|
|
bulk_add_fds(
|
|
fds_to_transfer,
|
|
payload,
|
|
flags.contains(CallFlags::FD_CLOEXEC),
|
|
- &mut token,
|
|
+ &mut lock_token,
|
|
)?;
|
|
}
|
|
|
|
- event::trigger_locked(
|
|
- GlobalSchemes::Pipe.scheme_id(),
|
|
- key | WRITE_NOT_READ_BIT,
|
|
- EVENT_WRITE,
|
|
- token.token(),
|
|
- );
|
|
- pipe.write_condition.notify_locked(token.token());
|
|
+ drop(vec);
|
|
+ drop(lock_token);
|
|
+ trigger_matching(&endpoint.pipe, false, true, EVENT_WRITE, token);
|
|
+ endpoint.pipe.write_condition.notify(token);
|
|
|
|
return Ok(fds_to_read);
|
|
}
|
|
|
|
- if !pipe.writer_is_alive.load(Ordering::SeqCst) {
|
|
+ if endpoint.pipe.writer_count.load(Ordering::SeqCst) == 0 {
|
|
return Ok(0);
|
|
- } else if !pipe.read_condition.wait(vec, "PipeRead::read", &mut token) {
|
|
+ }
|
|
+ if !endpoint
|
|
+ .pipe
|
|
+ .read_condition
|
|
+ .wait(vec, "PipeRead::read", &mut lock_token)
|
|
+ {
|
|
return Err(Error::new(EINTR));
|
|
}
|
|
}
|
|
@@ -511,11 +690,23 @@
|
|
}
|
|
|
|
pub struct Pipe {
|
|
- read_condition: WaitCondition, // signals whether there are available bytes to read
|
|
- write_condition: WaitCondition, // signals whether there is room for additional bytes
|
|
+ read_condition: WaitCondition,
|
|
+ write_condition: WaitCondition,
|
|
queue: Mutex<L1, VecDeque<u8>>,
|
|
- reader_is_alive: AtomicBool, // starts set, unset when reader closes
|
|
- writer_is_alive: AtomicBool, // starts set, unset when writer closes
|
|
- has_run_dup: AtomicBool,
|
|
+ reader_count: AtomicUsize,
|
|
+ writer_count: AtomicUsize,
|
|
fd_queue: Mutex<L1, VecDeque<Arc<LockedFileDescription>>>,
|
|
}
|
|
+
|
|
+impl Pipe {
|
|
+ fn new() -> Self {
|
|
+ Self {
|
|
+ read_condition: WaitCondition::new(),
|
|
+ write_condition: WaitCondition::new(),
|
|
+ queue: Mutex::new(VecDeque::new()),
|
|
+ reader_count: AtomicUsize::new(0),
|
|
+ writer_count: AtomicUsize::new(0),
|
|
+ fd_queue: Mutex::new(VecDeque::new()),
|
|
+ }
|
|
+ }
|
|
+}
|
|
--- a/src/syscall/fs.rs
|
|
+++ b/src/syscall/fs.rs
|
|
@@ -1,29 +1,29 @@
|
|
//! Filesystem syscalls
|
|
|
|
use core::num::NonZeroUsize;
|
|
|
|
-use alloc::{string::String, sync::Arc, vec::Vec};
|
|
+use alloc::{format, string::{String, ToString}, sync::Arc, vec::Vec};
|
|
use redox_path::RedoxPath;
|
|
|
|
use crate::{
|
|
context::{
|
|
self,
|
|
file::{FileDescription, FileDescriptor, InternalFlags, LockedFileDescription},
|
|
memory::{AddrSpace, GenericFlusher, Grant, PageSpan, TlbShootdownActions},
|
|
},
|
|
memory::{Page, VirtualAddress, PAGE_SIZE},
|
|
- scheme::{FileHandle, KernelScheme, OpenResult, StrOrBytes},
|
|
+ scheme::{self, pipe, FileHandle, KernelScheme, OpenResult, SchemeExt, StrOrBytes},
|
|
sync::{CleanLockToken, RwLock},
|
|
- syscall::{data::Stat, error::*, flag::*},
|
|
+ syscall::{data::{GlobalSchemes, Stat}, error::*, flag::*},
|
|
};
|
|
|
|
use super::usercopy::{UserSlice, UserSliceRo, UserSliceRw, UserSliceWo};
|
|
|
|
pub fn file_op_generic<T>(
|
|
fd: FileHandle,
|
|
token: &mut CleanLockToken,
|
|
op: impl FnOnce(&dyn KernelScheme, usize, &mut CleanLockToken) -> Result<T>,
|
|
) -> Result<T> {
|
|
file_op_generic_ext(fd, token, |s, _, desc, token| op(s, desc.number, token))
|
|
}
|
|
pub fn file_op_generic_ext<T>(
|
|
@@ -53,91 +53,161 @@
|
|
let mut path_buf = vec![0_u8; max_len];
|
|
if raw_path.len() > path_buf.len() {
|
|
return Err(Error::new(ENAMETOOLONG));
|
|
}
|
|
let path_len = raw_path.copy_common_bytes_to_slice(&mut path_buf)?;
|
|
path_buf.truncate(path_len);
|
|
String::from_utf8(path_buf).map_err(|_| Error::new(EINVAL))
|
|
//core::str::from_utf8(&path_buf[..path_len]).map_err(|_| Error::new(EINVAL))
|
|
}
|
|
// TODO: Define elsewhere
|
|
const PATH_MAX: usize = PAGE_SIZE;
|
|
|
|
-pub fn openat(
|
|
- fh: FileHandle,
|
|
- raw_path: UserSliceRo,
|
|
+fn fifo_path_key(scheme_id: scheme::SchemeId, number: usize, path: &str) -> String {
|
|
+ if path.starts_with('/') {
|
|
+ path.to_string()
|
|
+ } else {
|
|
+ format!("@fifo:{}:{}:{}", scheme_id.get(), number, path)
|
|
+ }
|
|
+}
|
|
+
|
|
+fn install_open_result(
|
|
+ scheme_id: scheme::SchemeId,
|
|
flags: usize,
|
|
- fcntl_flags: u32,
|
|
- euid: u32,
|
|
- egid: u32,
|
|
+ open_result: OpenResult,
|
|
token: &mut CleanLockToken,
|
|
) -> Result<FileHandle> {
|
|
- let path_buf = copy_path_to_buf(raw_path, PATH_MAX)?;
|
|
-
|
|
- let desc = {
|
|
- let current_lock = context::current();
|
|
- let mut current = current_lock.read(token.token());
|
|
- let (context, mut context_token) = current.token_split();
|
|
- let pipe = context
|
|
- .get_file(fh, &mut context_token)
|
|
- .ok_or(Error::new(EBADF))?;
|
|
- *pipe.description.read(context_token.token())
|
|
- };
|
|
- let scheme = desc.get_scheme(token)?;
|
|
- let number = desc.number;
|
|
- let scheme_id = desc.scheme;
|
|
-
|
|
- let caller_ctx = context::current()
|
|
- .read(token.token())
|
|
- .caller_ctx()
|
|
- .filter_uid_gid(euid, egid);
|
|
-
|
|
- let new_description = {
|
|
- let res = scheme.kopenat(
|
|
- number,
|
|
- StrOrBytes::from_str(&path_buf),
|
|
- flags,
|
|
- fcntl_flags,
|
|
- caller_ctx,
|
|
- token,
|
|
- );
|
|
-
|
|
- match res? {
|
|
- OpenResult::SchemeLocal(number, internal_flags) => {
|
|
- Arc::new(RwLock::new(FileDescription::new(
|
|
- scheme_id,
|
|
- number,
|
|
- 0,
|
|
- (flags & !O_CLOEXEC) as u32,
|
|
- internal_flags,
|
|
- token,
|
|
- )))
|
|
- }
|
|
- OpenResult::External(desc) => desc,
|
|
- }
|
|
+ let new_description = match open_result {
|
|
+ OpenResult::SchemeLocal(number, internal_flags) => Arc::new(RwLock::new(
|
|
+ FileDescription::new(
|
|
+ scheme_id,
|
|
+ number,
|
|
+ 0,
|
|
+ (flags & !O_CLOEXEC) as u32,
|
|
+ internal_flags,
|
|
+ token,
|
|
+ ),
|
|
+ )),
|
|
+ OpenResult::External(desc) => desc,
|
|
};
|
|
|
|
let current_lock = context::current();
|
|
let mut current = current_lock.read(token.token());
|
|
let (context, mut token) = current.token_split();
|
|
context
|
|
.add_file(
|
|
FileDescriptor {
|
|
description: new_description,
|
|
cloexec: flags & O_CLOEXEC == O_CLOEXEC,
|
|
},
|
|
&mut token,
|
|
)
|
|
.ok_or(Error::new(EMFILE))
|
|
}
|
|
+
|
|
+fn path_exists_in_scheme(
|
|
+ scheme: &dyn KernelScheme,
|
|
+ number: usize,
|
|
+ path: &str,
|
|
+ caller_ctx: scheme::CallerCtx,
|
|
+ token: &mut CleanLockToken,
|
|
+) -> Result<bool> {
|
|
+ match scheme.kopenat(number, StrOrBytes::from_str(path), O_STAT, 0, caller_ctx, token) {
|
|
+ Ok(OpenResult::SchemeLocal(number, _)) => {
|
|
+ let _ = scheme.close(number, token);
|
|
+ Ok(true)
|
|
+ }
|
|
+ Ok(OpenResult::External(_)) => Ok(true),
|
|
+ Err(err) if err.errno == ENOENT => Ok(false),
|
|
+ Err(err) => Err(err),
|
|
+ }
|
|
+}
|
|
+
|
|
+pub fn openat(
|
|
+ fh: FileHandle,
|
|
+ raw_path: UserSliceRo,
|
|
+ flags: usize,
|
|
+ fcntl_flags: u32,
|
|
+ euid: u32,
|
|
+ egid: u32,
|
|
+ token: &mut CleanLockToken,
|
|
+) -> Result<FileHandle> {
|
|
+ let path_buf = copy_path_to_buf(raw_path, PATH_MAX)?;
|
|
+
|
|
+ let desc = {
|
|
+ let current_lock = context::current();
|
|
+ let mut current = current_lock.read(token.token());
|
|
+ let (context, mut context_token) = current.token_split();
|
|
+ let pipe = context
|
|
+ .get_file(fh, &mut context_token)
|
|
+ .ok_or(Error::new(EBADF))?;
|
|
+ *pipe.description.read(context_token.token())
|
|
+ };
|
|
+ let scheme = desc.get_scheme(token)?;
|
|
+ let number = desc.number;
|
|
+ let scheme_id = desc.scheme;
|
|
+
|
|
+ let caller_ctx = context::current()
|
|
+ .read(token.token())
|
|
+ .caller_ctx()
|
|
+ .filter_uid_gid(euid, egid);
|
|
+
|
|
+ let fifo_mode_requested = flags & MODE_FIFO as usize == MODE_FIFO as usize;
|
|
+ let fifo_key = fifo_path_key(scheme_id, number, &path_buf);
|
|
+
|
|
+ if pipe::named_pipe_exists(&fifo_key, token) {
|
|
+ if flags & O_EXCL == O_EXCL && flags & O_CREAT == O_CREAT {
|
|
+ return Err(Error::new(EEXIST));
|
|
+ }
|
|
+ if fifo_mode_requested && flags & O_CREAT == O_CREAT {
|
|
+ return Err(Error::new(EEXIST));
|
|
+ }
|
|
+
|
|
+ let pipe_number = pipe::open_named_pipe(&fifo_key, flags, token)?
|
|
+ .ok_or(Error::new(ENOENT))?;
|
|
+ return install_open_result(
|
|
+ GlobalSchemes::Pipe.scheme_id(),
|
|
+ flags,
|
|
+ OpenResult::SchemeLocal(pipe_number, InternalFlags::empty()),
|
|
+ token,
|
|
+ );
|
|
+ }
|
|
+
|
|
+ if fifo_mode_requested && flags & O_CREAT == O_CREAT {
|
|
+ if path_exists_in_scheme(&*scheme, number, &path_buf, caller_ctx, token)? {
|
|
+ return Err(Error::new(EEXIST));
|
|
+ }
|
|
+
|
|
+ let mode = u16::try_from(flags & 0o7777).map_err(|_| Error::new(EINVAL))?;
|
|
+ let pipe_number = pipe::create_named_pipe(&fifo_key, &path_buf, mode, flags, token)?;
|
|
+
|
|
+ return install_open_result(
|
|
+ GlobalSchemes::Pipe.scheme_id(),
|
|
+ flags,
|
|
+ OpenResult::SchemeLocal(pipe_number, InternalFlags::empty()),
|
|
+ token,
|
|
+ );
|
|
+ }
|
|
+
|
|
+ let open_result = scheme.kopenat(
|
|
+ number,
|
|
+ StrOrBytes::from_str(&path_buf),
|
|
+ flags,
|
|
+ fcntl_flags,
|
|
+ caller_ctx,
|
|
+ token,
|
|
+ )?;
|
|
+
|
|
+ install_open_result(scheme_id, flags, open_result, token)
|
|
+}
|
|
/// Unlinkat syscall
|
|
pub fn unlinkat(
|
|
fh: FileHandle,
|
|
raw_path: UserSliceRo,
|
|
flags: usize,
|
|
euid: u32,
|
|
egid: u32,
|
|
token: &mut CleanLockToken,
|
|
) -> Result<()> {
|
|
let path_buf = copy_path_to_buf(raw_path, PATH_MAX)?;
|
|
|
|
let desc = {
|
|
@@ -147,24 +217,28 @@
|
|
let pipe = context
|
|
.get_file(fh, &mut context_token)
|
|
.ok_or(Error::new(EBADF))?;
|
|
*pipe.description.read(context_token.token())
|
|
};
|
|
let number = desc.number;
|
|
let scheme = desc.get_scheme(token)?;
|
|
|
|
let caller_ctx = context::current()
|
|
.read(token.token())
|
|
.caller_ctx()
|
|
.filter_uid_gid(euid, egid);
|
|
+
|
|
+ if pipe::unlink_named_pipe(&fifo_path_key(desc.scheme, number, &path_buf), token) {
|
|
+ return Ok(());
|
|
+ }
|
|
|
|
/*
|
|
let mut path_buf = BorrowedHtBuf::head()?;
|
|
let path = path_buf.use_for_string(raw_path)?;
|
|
*/
|
|
scheme.unlinkat(number, &path_buf, flags, caller_ctx, token)
|
|
}
|
|
|
|
/// Close syscall
|
|
pub fn close(fd: FileHandle, token: &mut CleanLockToken) -> Result<()> {
|
|
let file = {
|
|
let current_lock = context::current();
|