Files
RedBear-OS/local/patches/kernel/P1-mkfifo-fifo-support.patch
T
vasilito 7706617e7f cub: full AUR package manager + Phase 1-5 native build tools
cub redesign (local/recipes/system/cub/):
- AUR RPC v5 client (serde_json) with search/info
- ~/.cub/ user-local recipe/source/repo storage
- Enhanced PKGBUILD parser: optdepends, .SRCINFO, split packages, 19 linuxism patterns
- Recipe generation: host: prefix on dev-deps, shallow_clone, cargopath, installs, optional-packages
- Dependency resolver: scans build errors for missing commands/headers/libs/pkgconfig, maps to packages
- Dependency installation: checks installed packages, fetches AUR deps, interactive prompt
- ~110 Arch→Redox dependency mappings
- ratatui TUI: search, info, install, build, query views
- 14 Arch-style CLI switches (-S/-Si/-Syu/-G/-R/-Q/-Qi/-Ql)
- 65 tests, 0 failures, clean build

Phase 1-5 native build tools (local/recipes/dev/):
- P1 Substrate: tar, m4, diffutils (gnulib bypass), mkfifo kernel patch (1085 lines)
- P2 Build Systems: bison, flex, meson (standalone wrapper), ninja-build, libtool
- P3 Native GCC: gcc-native, binutils-native (cross-compiled for redox host)
- P4 Native LLVM: llvm-native (clang + lld from monorepo)
- P5 Native Rust: rust-native (rustc + cargo)
- Groups: build-essential-native, dev-essential expanded

Config:
- redbear-mini: +7 tools (diffutils, tar, bison, flex, meson, ninja, m4)
- redbear-full: +4 native tools (gcc, binutils, llvm, rust)
- All recipes moved to local/ with symlinks for cookbook discovery (Red Bear policy)

Docs:
- BUILD-TOOLS-PORTING-PLAN.md: phased porting roadmap
- CUB-WORKFLOW-ASSESSMENT.md: gap analysis and integration assessment
2026-05-08 00:13:31 +01:00

1086 lines
34 KiB
Diff

--- a/src/scheme/pipe.rs
+++ b/src/scheme/pipe.rs
@@ -1,5 +1,10 @@
-use alloc::{collections::VecDeque, sync::Arc, vec::Vec};
-use core::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
+use alloc::{
+ collections::VecDeque,
+ string::{String, ToString},
+ sync::Arc,
+ vec::Vec,
+};
+use core::sync::atomic::{AtomicUsize, Ordering};
use syscall::{data::GlobalSchemes, CallFlags};
@@ -14,100 +19,262 @@
sync::{CleanLockToken, Mutex, RwLock, WaitCondition, L1},
syscall::{
data::Stat,
- error::{Error, Result, EAGAIN, EBADF, EINTR, EINVAL, ENOENT, EPIPE},
- flag::{EventFlags, EVENT_READ, EVENT_WRITE, MODE_FIFO, O_NONBLOCK},
+ error::{
+ Error, Result, EAGAIN, EBADF, EEXIST, EINVAL, EINTR, ENOENT, ENOTDIR, EPIPE,
+ },
+ flag::{
+ EventFlags, EVENT_READ, EVENT_WRITE, MODE_FIFO, O_ACCMODE, O_DIRECTORY,
+ O_NONBLOCK, O_RDONLY, O_RDWR, O_STAT, O_WRONLY,
+ },
usercopy::{UserSliceRo, UserSliceRw, UserSliceWo},
},
};
use super::{CallerCtx, KernelScheme, OpenResult, SchemeExt, StrOrBytes};
-// TODO: Preallocate a number of scheme IDs, since there can only be *one* root namespace, and
-// therefore only *one* pipe scheme.
-static PIPE_NEXT_ID: AtomicUsize = AtomicUsize::new(0);
-
+static PIPE_NEXT_ID: AtomicUsize = AtomicUsize::new(1);
+
+#[derive(Clone)]
enum Handle {
- Pipe(Arc<Pipe>),
+ Endpoint(EndpointHandle),
SchemeRoot,
}
-// TODO: SLOB?
-static PIPES: RwLock<L1, HashMap<usize, Handle>> =
+#[derive(Clone, Copy, Eq, PartialEq)]
+enum EndpointKind {
+ Read,
+ Write,
+ ReadWrite,
+}
+
+impl EndpointKind {
+ fn can_read(self) -> bool {
+ matches!(self, Self::Read | Self::ReadWrite)
+ }
+
+ fn can_write(self) -> bool {
+ matches!(self, Self::Write | Self::ReadWrite)
+ }
+}
+
+#[derive(Clone)]
+struct EndpointHandle {
+ pipe: Arc<Pipe>,
+ kind: EndpointKind,
+ named: Option<Arc<NamedPipe>>,
+}
+
+struct NamedPipe {
+ path: String,
+ mode: u16,
+ active: Mutex<L1, Option<Arc<Pipe>>>,
+}
+
+static HANDLES: RwLock<L1, HashMap<usize, Handle>> =
RwLock::new(HashMap::with_hasher(DefaultHashBuilder::new()));
+static NAMED_PIPES: RwLock<L1, HashMap<String, Arc<NamedPipe>>> =
+ RwLock::new(HashMap::with_hasher(DefaultHashBuilder::new()));
const MAX_QUEUE_SIZE: usize = 65536;
-// In almost all places where Rust (and LLVM) uses pointers, they are limited to nonnegative isize,
-// so this is fine.
-const WRITE_NOT_READ_BIT: usize = 1;
-
-fn from_raw_id(id: usize) -> (bool, usize) {
- (id & WRITE_NOT_READ_BIT != 0, id & !WRITE_NOT_READ_BIT)
-}
-
-pub fn pipe(token: &mut CleanLockToken) -> Result<(usize, usize)> {
- // Bit 0 is used for WRITE_NOT_READ_BIT
- let id = PIPE_NEXT_ID.fetch_add(2, Ordering::Relaxed);
-
- PIPES.write(token.token()).insert(
- id,
- Handle::Pipe(Arc::new(Pipe {
- queue: Mutex::new(VecDeque::new()),
- read_condition: WaitCondition::new(),
- write_condition: WaitCondition::new(),
- writer_is_alive: AtomicBool::new(true),
- reader_is_alive: AtomicBool::new(true),
- has_run_dup: AtomicBool::new(false),
- fd_queue: Mutex::new(VecDeque::new()),
- })),
- );
-
- Ok((id, id | WRITE_NOT_READ_BIT))
-}
-
-pub struct PipeScheme;
-
-impl PipeScheme {
- fn get_pipe(key: usize, token: &mut CleanLockToken) -> Result<Arc<Pipe>> {
- PIPES
- .read(token.token())
- .get(&key)
- .and_then(|handle| match handle {
- Handle::Pipe(pipe) => Some(Arc::clone(pipe)),
+fn next_id() -> usize {
+ PIPE_NEXT_ID.fetch_add(1, Ordering::Relaxed)
+}
+
+fn endpoint_kind_from_flags(flags: usize) -> Result<EndpointKind> {
+ match flags & O_ACCMODE {
+ O_RDONLY => Ok(EndpointKind::Read),
+ O_WRONLY => Ok(EndpointKind::Write),
+ O_RDWR => Ok(EndpointKind::ReadWrite),
+ _ => Err(Error::new(EINVAL)),
+ }
+}
+
+fn validate_named_fifo_open(flags: usize) -> Result<()> {
+ if flags & O_DIRECTORY == O_DIRECTORY && flags & O_STAT != O_STAT {
+ return Err(Error::new(ENOTDIR));
+ }
+
+ let _ = endpoint_kind_from_flags(flags)?;
+ Ok(())
+}
+
+fn trigger_matching(
+ pipe: &Arc<Pipe>,
+ require_read: bool,
+ require_write: bool,
+ flags: EventFlags,
+ token: &mut CleanLockToken,
+) {
+ let ids = {
+ let handles = HANDLES.read(token.token());
+ handles
+ .iter()
+ .filter_map(|(id, handle)| match handle {
+ Handle::Endpoint(endpoint)
+ if Arc::ptr_eq(&endpoint.pipe, pipe)
+ && (!require_read || endpoint.kind.can_read())
+ && (!require_write || endpoint.kind.can_write()) =>
+ {
+ Some(*id)
+ }
_ => None,
})
+ .collect::<Vec<_>>()
+ };
+
+ for id in ids {
+ event::trigger(GlobalSchemes::Pipe.scheme_id(), id, flags, token);
+ }
+}
+
+fn open_endpoint(
+ pipe: Arc<Pipe>,
+ kind: EndpointKind,
+ named: Option<Arc<NamedPipe>>,
+ token: &mut CleanLockToken,
+) -> usize {
+ if kind.can_read() {
+ pipe.reader_count.fetch_add(1, Ordering::SeqCst);
+ }
+ if kind.can_write() {
+ pipe.writer_count.fetch_add(1, Ordering::SeqCst);
+ }
+
+ let id = next_id();
+ HANDLES.write(token.token()).insert(
+ id,
+ Handle::Endpoint(EndpointHandle { pipe, kind, named }),
+ );
+ id
+}
+
+fn drop_wait_conditions_if_possible(pipe: Arc<Pipe>, token: &mut CleanLockToken) {
+ if let Some(pipe) = Arc::into_inner(pipe) {
+ {
+ pipe.read_condition.into_drop(token);
+ }
+ {
+ pipe.write_condition.into_drop(token);
+ }
+ }
+}
+
+pub fn pipe(token: &mut CleanLockToken) -> Result<(usize, usize)> {
+ let pipe = Arc::new(Pipe::new());
+ let read_id = open_endpoint(Arc::clone(&pipe), EndpointKind::Read, None, token);
+ let write_id = open_endpoint(pipe, EndpointKind::Write, None, token);
+
+ Ok((read_id, write_id))
+}
+
+pub fn named_pipe_exists(path: &str, token: &mut CleanLockToken) -> bool {
+ NAMED_PIPES.read(token.token()).contains_key(path)
+}
+
+pub fn create_named_pipe(
+ path: &str,
+ display_path: &str,
+ mode: u16,
+ flags: usize,
+ token: &mut CleanLockToken,
+) -> Result<usize> {
+ validate_named_fifo_open(flags)?;
+
+ let named = {
+ let mut named_pipes = NAMED_PIPES.write(token.token());
+ if named_pipes.contains_key(path) {
+ return Err(Error::new(EEXIST));
+ }
+
+ let named = Arc::new(NamedPipe {
+ path: display_path.to_string(),
+ mode,
+ active: Mutex::new(None),
+ });
+ named_pipes.insert(path.to_string(), Arc::clone(&named));
+ named
+ };
+
+ let kind = endpoint_kind_from_flags(flags)?;
+ let pipe = Arc::new(Pipe::new());
+ *named.active.lock(token.token()) = Some(Arc::clone(&pipe));
+
+ Ok(open_endpoint(pipe, kind, Some(named), token))
+}
+
+pub fn open_named_pipe(path: &str, flags: usize, token: &mut CleanLockToken) -> Result<Option<usize>> {
+ validate_named_fifo_open(flags)?;
+
+ let named = match NAMED_PIPES.read(token.token()).get(path) {
+ Some(named) => Arc::clone(named),
+ None => return Ok(None),
+ };
+
+ let kind = endpoint_kind_from_flags(flags)?;
+ let pipe = {
+ let mut active = named.active.lock(token.token());
+ match active.as_ref() {
+ Some(pipe) => Arc::clone(pipe),
+ None => {
+ let pipe = Arc::new(Pipe::new());
+ *active = Some(Arc::clone(&pipe));
+ pipe
+ }
+ }
+ };
+
+ Ok(Some(open_endpoint(pipe, kind, Some(named), token)))
+}
+
+pub fn unlink_named_pipe(path: &str, token: &mut CleanLockToken) -> bool {
+ NAMED_PIPES.write(token.token()).remove(path).is_some()
+}
+
+pub struct PipeScheme;
+
+impl PipeScheme {
+ fn get_endpoint(id: usize, token: &mut CleanLockToken) -> Result<EndpointHandle> {
+ HANDLES
+ .read(token.token())
+ .get(&id)
+ .and_then(|handle| match handle {
+ Handle::Endpoint(endpoint) => Some(endpoint.clone()),
+ Handle::SchemeRoot => None,
+ })
.ok_or(Error::new(EBADF))
}
}
impl KernelScheme for PipeScheme {
fn scheme_root(&self, token: &mut CleanLockToken) -> Result<usize> {
- let id = PIPE_NEXT_ID.fetch_add(2, Ordering::Relaxed);
- PIPES.write(token.token()).insert(id, Handle::SchemeRoot);
+ let id = next_id();
+ HANDLES.write(token.token()).insert(id, Handle::SchemeRoot);
Ok(id)
}
+
fn fevent(
&self,
id: usize,
flags: EventFlags,
token: &mut CleanLockToken,
) -> Result<EventFlags> {
- let (is_writer_not_reader, key) = from_raw_id(id);
- let pipe = Self::get_pipe(key, token)?;
+ let endpoint = Self::get_endpoint(id, token)?;
let mut ready = EventFlags::empty();
- if is_writer_not_reader
+ if endpoint.kind.can_write()
&& flags.contains(EVENT_WRITE)
- && (pipe.queue.lock(token.token()).len() <= MAX_QUEUE_SIZE
- || !pipe.reader_is_alive.load(Ordering::Acquire))
+ && (endpoint.pipe.queue.lock(token.token()).len() <= MAX_QUEUE_SIZE
+ || endpoint.pipe.reader_count.load(Ordering::Acquire) == 0)
{
ready |= EventFlags::EVENT_WRITE;
}
- if !is_writer_not_reader
+
+ if endpoint.kind.can_read()
&& flags.contains(EVENT_READ)
- && (!pipe.queue.lock(token.token()).is_empty()
- || !pipe.writer_is_alive.load(Ordering::Acquire))
+ && (!endpoint.pipe.queue.lock(token.token()).is_empty()
+ || endpoint.pipe.writer_count.load(Ordering::Acquire) == 0)
{
ready |= EventFlags::EVENT_READ;
}
@@ -116,46 +283,48 @@
}
fn close(&self, id: usize, token: &mut CleanLockToken) -> Result<()> {
- let (is_write_not_read, key) = from_raw_id(id);
-
- let pipe = Self::get_pipe(key, token)?;
- let scheme_id = GlobalSchemes::Pipe.scheme_id();
-
- let can_remove = if is_write_not_read {
- pipe.writer_is_alive.store(false, Ordering::SeqCst);
- event::trigger(scheme_id, key, EVENT_READ, token);
- pipe.read_condition.notify(token);
-
- !pipe.reader_is_alive.load(Ordering::SeqCst)
- } else {
- pipe.reader_is_alive.store(false, Ordering::SeqCst);
- event::trigger(scheme_id, key | WRITE_NOT_READ_BIT, EVENT_WRITE, token);
- pipe.write_condition.notify(token);
-
- !pipe.writer_is_alive.load(Ordering::SeqCst)
+ let handle = HANDLES
+ .write(token.token())
+ .remove(&id)
+ .ok_or(Error::new(EBADF))?;
+
+ let Handle::Endpoint(endpoint) = handle else {
+ return Ok(());
};
- if can_remove {
- let handle = PIPES.write(token.token()).remove(&key);
- if let Some(Handle::Pipe(pipe)) = handle
- && let Some(pipe) = Arc::into_inner(pipe)
- {
+ let mut last_reader = false;
+ let mut last_writer = false;
+
+ if endpoint.kind.can_read() {
+ last_reader = endpoint.pipe.reader_count.fetch_sub(1, Ordering::SeqCst) == 1;
+ }
+ if endpoint.kind.can_write() {
+ last_writer = endpoint.pipe.writer_count.fetch_sub(1, Ordering::SeqCst) == 1;
+ }
+
+ if last_writer {
+ trigger_matching(&endpoint.pipe, true, false, EVENT_READ, token);
+ endpoint.pipe.read_condition.notify(token);
+ }
+ if last_reader {
+ trigger_matching(&endpoint.pipe, false, true, EVENT_WRITE, token);
+ endpoint.pipe.write_condition.notify(token);
+ }
+
+ let no_readers = endpoint.pipe.reader_count.load(Ordering::SeqCst) == 0;
+ let no_writers = endpoint.pipe.writer_count.load(Ordering::SeqCst) == 0;
+ if no_readers && no_writers {
+ if let Some(named) = endpoint.named {
+ let mut active = named.active.lock(token.token());
+ if active
+ .as_ref()
+ .is_some_and(|active_pipe| Arc::ptr_eq(active_pipe, &endpoint.pipe))
{
- pipe.read_condition.into_drop(token);
+ *active = None;
}
- {
- pipe.write_condition.into_drop(token);
- }
- }
- }
-
- if let Some(pipe) = Arc::into_inner(pipe) {
- {
- pipe.read_condition.into_drop(token);
- }
- {
- pipe.write_condition.into_drop(token);
- }
+ }
+
+ drop_wait_conditions_if_possible(endpoint.pipe, token);
}
Ok(())
@@ -168,9 +337,9 @@
_ctx: CallerCtx,
token: &mut CleanLockToken,
) -> Result<OpenResult> {
- let (is_writer_not_reader, key) = from_raw_id(old_id);
-
- if is_writer_not_reader {
+ let endpoint = Self::get_endpoint(old_id, token)?;
+
+ if !endpoint.kind.can_read() {
return Err(Error::new(EBADF));
}
@@ -180,17 +349,17 @@
return Err(Error::new(EINVAL));
}
- let pipe = Self::get_pipe(key, token)?;
-
- if pipe.has_run_dup.swap(true, Ordering::SeqCst) {
- return Err(Error::new(EBADF));
- }
-
Ok(OpenResult::SchemeLocal(
- key | WRITE_NOT_READ_BIT,
+ open_endpoint(
+ Arc::clone(&endpoint.pipe),
+ EndpointKind::Write,
+ endpoint.named,
+ token,
+ ),
InternalFlags::empty(),
))
}
+
fn kopenat(
&self,
id: usize,
@@ -200,40 +369,47 @@
_ctx: CallerCtx,
token: &mut CleanLockToken,
) -> Result<OpenResult> {
- let (_, key) = from_raw_id(id);
-
- {
- let guard = PIPES.read(token.token());
- if let Some(Handle::SchemeRoot) = guard.get(&key) {
- } else if let Some(Handle::Pipe(pipe_arc)) = guard.get(&key) {
- let pipe = Arc::clone(pipe_arc);
- drop(guard);
-
- if user_buf.as_bytes() == b"write" {
- return Err(Error::new(EINVAL));
+ let is_scheme_root = {
+ let handles = HANDLES.read(token.token());
+ match handles.get(&id) {
+ Some(Handle::SchemeRoot) => true,
+ Some(Handle::Endpoint(_)) => false,
+ None => return Err(Error::new(EBADF)),
+ }
+ };
+
+ if is_scheme_root {
+ let path = user_buf.as_str().or(Err(Error::new(EINVAL)))?;
+ if !path.trim_start_matches('/').is_empty() {
+ return Err(Error::new(ENOENT));
}
- if pipe.has_run_dup.swap(true, Ordering::SeqCst) {
- return Err(Error::new(EBADF));
- }
-
+ let pipe = Arc::new(Pipe::new());
return Ok(OpenResult::SchemeLocal(
- key | WRITE_NOT_READ_BIT,
+ open_endpoint(pipe, EndpointKind::Read, None, token),
InternalFlags::empty(),
));
- } else {
- return Err(Error::new(EBADF));
- }
- }
-
- let path = user_buf.as_str().or(Err(Error::new(EINVAL)))?;
- if !path.trim_start_matches('/').is_empty() {
- return Err(Error::new(ENOENT));
- }
-
- let (read_id, _) = pipe(token)?;
-
- Ok(OpenResult::SchemeLocal(read_id, InternalFlags::empty()))
+ }
+
+ let endpoint = Self::get_endpoint(id, token)?;
+ if !endpoint.kind.can_read() {
+ return Err(Error::new(EBADF));
+ }
+
+ let path = user_buf.as_bytes();
+ if !path.is_empty() && path != b"write" {
+ return Err(Error::new(EINVAL));
+ }
+
+ Ok(OpenResult::SchemeLocal(
+ open_endpoint(
+ Arc::clone(&endpoint.pipe),
+ EndpointKind::Write,
+ endpoint.named,
+ token,
+ ),
+ InternalFlags::empty(),
+ ))
}
fn kread(
@@ -244,16 +420,15 @@
_stored_flags: u32,
token: &mut CleanLockToken,
) -> Result<usize> {
- let (is_write_not_read, key) = from_raw_id(id);
-
- if is_write_not_read {
+ let endpoint = Self::get_endpoint(id, token)?;
+
+ if !endpoint.kind.can_read() {
return Err(Error::new(EBADF));
}
- let pipe = Self::get_pipe(key, token)?;
loop {
- let vec = pipe.queue.lock(token.token());
- let (mut vec, mut token) = vec.into_split();
+ let vec = endpoint.pipe.queue.lock(token.token());
+ let (mut vec, mut lock_token) = vec.into_split();
let (s1, s2) = vec.as_slices();
let s1_count = core::cmp::min(user_buf.len(), s1.len());
@@ -273,28 +448,34 @@
let _ = vec.drain(..bytes_read);
if bytes_read > 0 {
- event::trigger_locked(
- GlobalSchemes::Pipe.scheme_id(),
- key | WRITE_NOT_READ_BIT,
- EVENT_WRITE,
- token.token(),
- );
- pipe.write_condition.notify_locked(token.token());
+ drop(vec);
+ drop(lock_token);
+ trigger_matching(&endpoint.pipe, false, true, EVENT_WRITE, token);
+ endpoint.pipe.write_condition.notify(token);
return Ok(bytes_read);
- } else if user_buf.is_empty() {
+ }
+
+ if user_buf.is_empty() {
return Ok(0);
}
- if !pipe.writer_is_alive.load(Ordering::SeqCst) {
+ if endpoint.pipe.writer_count.load(Ordering::SeqCst) == 0 {
return Ok(0);
- } else if fcntl_flags & O_NONBLOCK as u32 != 0 {
+ }
+ if fcntl_flags & O_NONBLOCK as u32 != 0 {
return Err(Error::new(EAGAIN));
- } else if !pipe.read_condition.wait(vec, "PipeRead::read", &mut token) {
+ }
+ if !endpoint
+ .pipe
+ .read_condition
+ .wait(vec, "PipeRead::read", &mut lock_token)
+ {
return Err(Error::new(EINTR));
}
}
}
+
fn kwrite(
&self,
id: usize,
@@ -303,18 +484,17 @@
_stored_flags: u32,
token: &mut CleanLockToken,
) -> Result<usize> {
- let (is_write_not_read, key) = from_raw_id(id);
-
- if !is_write_not_read {
+ let endpoint = Self::get_endpoint(id, token)?;
+
+ if !endpoint.kind.can_write() {
return Err(Error::new(EBADF));
}
- let pipe = Self::get_pipe(key, token)?;
loop {
- let vec = pipe.queue.lock(token.token());
- let (mut vec, mut token) = vec.into_split();
-
- if !pipe.reader_is_alive.load(Ordering::Relaxed) {
+ let vec = endpoint.pipe.queue.lock(token.token());
+ let (mut vec, mut lock_token) = vec.into_split();
+
+ if endpoint.pipe.reader_count.load(Ordering::Relaxed) == 0 {
return Err(Error::new(EPIPE));
}
@@ -329,7 +509,6 @@
let mut bytes_written = 0;
- // TODO: Modify VecDeque so that the unwritten portions can be accessed directly?
for (idx, chunk) in src_buf.in_variable_chunks(TMPBUF_SIZE).enumerate() {
let chunk_byte_count = match chunk.copy_common_bytes_to_slice(&mut tmp_buf) {
Ok(c) => c,
@@ -341,41 +520,52 @@
}
if bytes_written > 0 {
- event::trigger_locked(
- GlobalSchemes::Pipe.scheme_id(),
- key,
- EVENT_READ,
- token.token(),
- );
- pipe.read_condition.notify_locked(token.token());
+ drop(vec);
+ drop(lock_token);
+ trigger_matching(&endpoint.pipe, true, false, EVENT_READ, token);
+ endpoint.pipe.read_condition.notify(token);
return Ok(bytes_written);
- } else if user_buf.is_empty() {
+ }
+
+ if user_buf.is_empty() {
return Ok(0);
}
if fcntl_flags & O_NONBLOCK as u32 != 0 {
return Err(Error::new(EAGAIN));
- } else if !pipe
+ }
+ if !endpoint
+ .pipe
.write_condition
- .wait(vec, "PipeWrite::write", &mut token)
+ .wait(vec, "PipeWrite::write", &mut lock_token)
{
return Err(Error::new(EINTR));
}
}
}
- fn kfpath(&self, _id: usize, buf: UserSliceWo, _token: &mut CleanLockToken) -> Result<usize> {
- //TODO: construct useful path?
- buf.copy_common_bytes_from_slice("/scheme/pipe/".as_bytes())
- }
- fn kfstat(&self, _id: usize, buf: UserSliceWo, _token: &mut CleanLockToken) -> Result<()> {
+
+ fn kfpath(&self, id: usize, buf: UserSliceWo, token: &mut CleanLockToken) -> Result<usize> {
+ let endpoint = Self::get_endpoint(id, token)?;
+ if let Some(named) = endpoint.named {
+ buf.copy_common_bytes_from_slice(named.path.as_bytes())
+ } else {
+ buf.copy_common_bytes_from_slice("/scheme/pipe/".as_bytes())
+ }
+ }
+
+ fn kfstat(&self, id: usize, buf: UserSliceWo, token: &mut CleanLockToken) -> Result<()> {
+ let endpoint = Self::get_endpoint(id, token)?;
+ let mode = endpoint.named.map_or(0o666, |named| named.mode);
+
buf.copy_exactly(&Stat {
- st_mode: MODE_FIFO | 0o666,
+ st_mode: MODE_FIFO | mode,
..Default::default()
})?;
Ok(())
}
+
fn kfdwrite(
&self,
id: usize,
@@ -385,23 +575,17 @@
_metadata: &[u64],
token: &mut CleanLockToken,
) -> Result<usize> {
- let (is_write_not_read, key) = from_raw_id(id);
-
- if !is_write_not_read {
+ let endpoint = Self::get_endpoint(id, token)?;
+
+ if !endpoint.kind.can_write() {
return Err(Error::new(EBADF));
}
- let pipe = match Self::get_pipe(key, token) {
- Ok(p) => p,
- Err(e) => {
- return Err(e);
- }
- };
loop {
- let vec = pipe.fd_queue.lock(token.token());
- let (mut vec, mut token) = vec.into_split();
-
- if !pipe.reader_is_alive.load(Ordering::Relaxed) {
+ let vec = endpoint.pipe.fd_queue.lock(token.token());
+ let (mut vec, mut lock_token) = vec.into_split();
+
+ if endpoint.pipe.reader_count.load(Ordering::Relaxed) == 0 {
return Err(Error::new(EPIPE));
}
if descs.is_empty() {
@@ -421,25 +605,24 @@
let fds_written = vec.len() - before_len;
if fds_written > 0 {
- event::trigger_locked(
- GlobalSchemes::Pipe.scheme_id(),
- key,
- EVENT_READ,
- token.token(),
- );
- pipe.read_condition.notify_locked(token.token());
+ drop(vec);
+ drop(lock_token);
+ trigger_matching(&endpoint.pipe, true, false, EVENT_READ, token);
+ endpoint.pipe.read_condition.notify(token);
return Ok(fds_written);
}
- if !pipe
+ if !endpoint
+ .pipe
.write_condition
- .wait(vec, "PipeWrite::write", &mut token)
+ .wait(vec, "PipeWrite::write", &mut lock_token)
{
return Err(Error::new(EINTR));
}
}
}
+
fn kfdread(
&self,
id: usize,
@@ -448,25 +631,19 @@
_metadata: &[u64],
token: &mut CleanLockToken,
) -> Result<usize> {
- let (is_write_not_read, key) = from_raw_id(id);
-
- if is_write_not_read {
+ let endpoint = Self::get_endpoint(id, token)?;
+
+ if !endpoint.kind.can_read() {
return Err(Error::new(EBADF));
}
- let pipe = match Self::get_pipe(key, token) {
- Ok(p) => p,
- Err(e) => {
- return Err(e);
- }
- };
if payload.is_empty() {
return Ok(0);
}
loop {
- let vec = pipe.fd_queue.lock(token.token());
- let (mut vec, mut token) = vec.into_split();
+ let vec = endpoint.pipe.fd_queue.lock(token.token());
+ let (mut vec, mut lock_token) = vec.into_split();
let fds_available = vec.len();
let max_fds_read = payload.len() / size_of::<usize>();
@@ -479,31 +656,33 @@
fds_to_transfer,
payload,
flags.contains(CallFlags::FD_CLOEXEC),
- &mut token,
+ &mut lock_token,
)?;
} else {
bulk_add_fds(
fds_to_transfer,
payload,
flags.contains(CallFlags::FD_CLOEXEC),
- &mut token,
+ &mut lock_token,
)?;
}
- event::trigger_locked(
- GlobalSchemes::Pipe.scheme_id(),
- key | WRITE_NOT_READ_BIT,
- EVENT_WRITE,
- token.token(),
- );
- pipe.write_condition.notify_locked(token.token());
+ drop(vec);
+ drop(lock_token);
+ trigger_matching(&endpoint.pipe, false, true, EVENT_WRITE, token);
+ endpoint.pipe.write_condition.notify(token);
return Ok(fds_to_read);
}
- if !pipe.writer_is_alive.load(Ordering::SeqCst) {
+ if endpoint.pipe.writer_count.load(Ordering::SeqCst) == 0 {
return Ok(0);
- } else if !pipe.read_condition.wait(vec, "PipeRead::read", &mut token) {
+ }
+ if !endpoint
+ .pipe
+ .read_condition
+ .wait(vec, "PipeRead::read", &mut lock_token)
+ {
return Err(Error::new(EINTR));
}
}
@@ -511,11 +690,23 @@
}
pub struct Pipe {
- read_condition: WaitCondition, // signals whether there are available bytes to read
- write_condition: WaitCondition, // signals whether there is room for additional bytes
+ read_condition: WaitCondition,
+ write_condition: WaitCondition,
queue: Mutex<L1, VecDeque<u8>>,
- reader_is_alive: AtomicBool, // starts set, unset when reader closes
- writer_is_alive: AtomicBool, // starts set, unset when writer closes
- has_run_dup: AtomicBool,
+ reader_count: AtomicUsize,
+ writer_count: AtomicUsize,
fd_queue: Mutex<L1, VecDeque<Arc<LockedFileDescription>>>,
}
+
+impl Pipe {
+ fn new() -> Self {
+ Self {
+ read_condition: WaitCondition::new(),
+ write_condition: WaitCondition::new(),
+ queue: Mutex::new(VecDeque::new()),
+ reader_count: AtomicUsize::new(0),
+ writer_count: AtomicUsize::new(0),
+ fd_queue: Mutex::new(VecDeque::new()),
+ }
+ }
+}
--- a/src/syscall/fs.rs
+++ b/src/syscall/fs.rs
@@ -2,7 +2,7 @@
use core::num::NonZeroUsize;
-use alloc::{string::String, sync::Arc, vec::Vec};
+use alloc::{format, string::{String, ToString}, sync::Arc, vec::Vec};
use redox_path::RedoxPath;
use crate::{
@@ -12,9 +12,9 @@
memory::{AddrSpace, GenericFlusher, Grant, PageSpan, TlbShootdownActions},
},
memory::{Page, VirtualAddress, PAGE_SIZE},
- scheme::{self, FileHandle, KernelScheme, OpenResult, StrOrBytes},
+ scheme::{self, pipe, FileHandle, KernelScheme, OpenResult, SchemeExt, StrOrBytes},
sync::{CleanLockToken, RwLock},
- syscall::{data::Stat, error::*, flag::*},
+ syscall::{data::{GlobalSchemes, Stat}, error::*, flag::*},
};
use super::usercopy::{UserSlice, UserSliceRo, UserSliceRw, UserSliceWo};
@@ -62,55 +62,29 @@
// TODO: Define elsewhere
const PATH_MAX: usize = PAGE_SIZE;
-pub fn openat(
- fh: FileHandle,
- raw_path: UserSliceRo,
+fn fifo_path_key(scheme_id: scheme::SchemeId, number: usize, path: &str) -> String {
+ if path.starts_with('/') {
+ path.to_string()
+ } else {
+ format!("@fifo:{}:{}:{}", scheme_id.get(), number, path)
+ }
+}
+
+fn install_open_result(
+ scheme_id: scheme::SchemeId,
flags: usize,
- fcntl_flags: u32,
- euid: u32,
- egid: u32,
+ open_result: OpenResult,
token: &mut CleanLockToken,
) -> Result<FileHandle> {
- let path_buf = copy_path_to_buf(raw_path, PATH_MAX)?;
-
- let (scheme_id, number) = {
- let current_lock = context::current();
- let mut current = current_lock.read(token.token());
- let (context, mut token) = current.token_split();
- let pipe = context.get_file(fh, &mut token).ok_or(Error::new(EBADF))?;
- let desc = pipe.description.read(token.token());
- (desc.scheme, desc.number)
- };
-
- let caller_ctx = context::current()
- .read(token.token())
- .caller_ctx()
- .filter_uid_gid(euid, egid);
-
- let new_description = {
- let scheme = scheme::get_scheme(token.token(), scheme_id)?;
-
- let res = scheme.kopenat(
+ let new_description = match open_result {
+ OpenResult::SchemeLocal(number, internal_flags) => Arc::new(RwLock::new(FileDescription {
+ offset: 0,
+ internal_flags,
+ scheme: scheme_id,
number,
- StrOrBytes::from_str(&path_buf),
- flags,
- fcntl_flags,
- caller_ctx,
- token,
- );
-
- match res? {
- OpenResult::SchemeLocal(number, internal_flags) => {
- Arc::new(RwLock::new(FileDescription {
- offset: 0,
- internal_flags,
- scheme: scheme_id,
- number,
- flags: (flags & !O_CLOEXEC) as u32,
- }))
- }
- OpenResult::External(desc) => desc,
- }
+ flags: (flags & !O_CLOEXEC) as u32,
+ })),
+ OpenResult::External(desc) => desc,
};
let current_lock = context::current();
@@ -126,6 +100,100 @@
)
.ok_or(Error::new(EMFILE))
}
+
+fn path_exists_in_scheme(
+ scheme: &dyn KernelScheme,
+ number: usize,
+ path: &str,
+ caller_ctx: scheme::CallerCtx,
+ token: &mut CleanLockToken,
+) -> Result<bool> {
+ match scheme.kopenat(number, StrOrBytes::from_str(path), O_STAT, 0, caller_ctx, token) {
+ Ok(OpenResult::SchemeLocal(number, _)) => {
+ let _ = scheme.close(number, token);
+ Ok(true)
+ }
+ Ok(OpenResult::External(_)) => Ok(true),
+ Err(err) if err.errno == ENOENT => Ok(false),
+ Err(err) => Err(err),
+ }
+}
+
+pub fn openat(
+ fh: FileHandle,
+ raw_path: UserSliceRo,
+ flags: usize,
+ fcntl_flags: u32,
+ euid: u32,
+ egid: u32,
+ token: &mut CleanLockToken,
+) -> Result<FileHandle> {
+ let path_buf = copy_path_to_buf(raw_path, PATH_MAX)?;
+
+ let (scheme_id, number) = {
+ let current_lock = context::current();
+ let mut current = current_lock.read(token.token());
+ let (context, mut token) = current.token_split();
+ let pipe = context.get_file(fh, &mut token).ok_or(Error::new(EBADF))?;
+ let desc = pipe.description.read(token.token());
+ (desc.scheme, desc.number)
+ };
+
+ let caller_ctx = context::current()
+ .read(token.token())
+ .caller_ctx()
+ .filter_uid_gid(euid, egid);
+
+ let fifo_mode_requested = flags & MODE_FIFO as usize == MODE_FIFO as usize;
+ let fifo_key = fifo_path_key(scheme_id, number, &path_buf);
+
+ if pipe::named_pipe_exists(&fifo_key, token) {
+ if flags & O_EXCL == O_EXCL && flags & O_CREAT == O_CREAT {
+ return Err(Error::new(EEXIST));
+ }
+ if fifo_mode_requested && flags & O_CREAT == O_CREAT {
+ return Err(Error::new(EEXIST));
+ }
+
+ let pipe_number = pipe::open_named_pipe(&fifo_key, flags, token)?.ok_or(Error::new(ENOENT))?;
+ return install_open_result(
+ GlobalSchemes::Pipe.scheme_id(),
+ flags,
+ OpenResult::SchemeLocal(pipe_number, InternalFlags::empty()),
+ token,
+ );
+ }
+
+ let scheme = scheme::get_scheme(token.token(), scheme_id)?;
+
+ if fifo_mode_requested && flags & O_CREAT == O_CREAT {
+ if path_exists_in_scheme(&*scheme, number, &path_buf, caller_ctx, token)? {
+ return Err(Error::new(EEXIST));
+ }
+
+ let mode = u16::try_from(flags & 0o7777).map_err(|_| Error::new(EINVAL))?;
+ let pipe_number =
+ pipe::create_named_pipe(&fifo_key, &path_buf, mode, flags, token)?;
+
+ return install_open_result(
+ GlobalSchemes::Pipe.scheme_id(),
+ flags,
+ OpenResult::SchemeLocal(pipe_number, InternalFlags::empty()),
+ token,
+ );
+ }
+
+ let open_result = scheme.kopenat(
+ number,
+ StrOrBytes::from_str(&path_buf),
+ flags,
+ fcntl_flags,
+ caller_ctx,
+ token,
+ )?;
+
+ install_open_result(scheme_id, flags, open_result, token)
+}
/// Unlinkat syscall
pub fn unlinkat(
fh: FileHandle,
@@ -152,6 +220,10 @@
.read(token.token())
.caller_ctx()
.filter_uid_gid(euid, egid);
+
+ if pipe::unlink_named_pipe(&fifo_path_key(scheme_id, number, &path_buf), token) {
+ return Ok(());
+ }
/*
let mut path_buf = BorrowedHtBuf::head()?;