diff --git a/common/src/lib.rs b/common/src/lib.rs index 78ea10b..516ecf7 100644 --- a/common/src/lib.rs +++ b/common/src/lib.rs @@ -17,6 +17,7 @@ pub mod bitpack; pub mod controlreg; pub mod elfloader; pub mod ipaddr; +pub mod pipe; pub mod sendable; pub mod util; pub mod vhost; diff --git a/common/src/pipe.rs b/common/src/pipe.rs new file mode 100644 index 0000000..9c45aa0 --- /dev/null +++ b/common/src/pipe.rs @@ -0,0 +1,26 @@ +//! # arca-pipe +//! +//! A `no_std`-compatible, lock-free bidirectional byte pipe built from two +//! single-producer, single-consumer (SPSC) ring buffers over shared memory. +//! +//! This is the lowest-level transport primitive in arca-networking — both the +//! control protocol and per-connection data streams are built on top of it. +//! +//! The pipe is a raw byte stream with no message framing. Higher layers +//! (control protocol, data protocol) add their own framing on top. + +mod bidirectional_pipe; +mod error; +mod ring; +mod ring_consumer; +mod ring_producer; +mod shared_memory_region; +mod traits; + +pub use bidirectional_pipe::{BidirectionalPipe, Side}; +pub use error::PipeError; +pub use ring::{RingData, RingHeader}; +pub use ring_consumer::RingConsumer; +pub use ring_producer::RingProducer; +pub use shared_memory_region::SharedMemoryRegion; +pub use traits::{Read, Write}; diff --git a/common/src/pipe/Pipe.md b/common/src/pipe/Pipe.md new file mode 100644 index 0000000..74c60a6 --- /dev/null +++ b/common/src/pipe/Pipe.md @@ -0,0 +1,116 @@ +# Arca Data Pipe Protocol + +This doc specifies the **data pipe**: how bytes move between Arca and +the Linux monitor for an established connection. The control protocol +(`PROTOCOL.md`) handles session setup and hands off a `DataPipeInfo` +(SHM handle + ring size); this doc covers everything after that handoff. + +--- + +## 1. Overview + +Each connection gets its own **bidirectional pipe**, a pair of +single-producer/single-consumer (SPSC) ring buffers in a shared-memory +region. 
One ring carries bytes Arca→Linux (outgoing), the other Linux→Arca
+(incoming). The monitor relays bytes between the ring and the kernel
+`TcpStream`; Arca reads and writes through `SyncStream` / `AsyncStream`.
+
+```
+  Arca                   shared memory                  Monitor
+  ────                   ─────────────                  ───────
+  SyncStream ──write──►  Ring A (A→B)  ──read──►   pipe_to_tcp → TcpStream
+  SyncStream ◄──read──   Ring B (B→A)  ◄──write──  tcp_to_pipe ← TcpStream
+```
+
+---
+
+## 2. Ring layer (`arca-pipe`)
+
+### 2.1 Ring buffer
+
+Each ring is a fixed-capacity byte buffer in shared memory with a header
+and a data region:
+
+```
+[RingHeader: 24 bytes][Data: ring_size bytes]
+```
+
+The header holds 18 bytes of fields; its `#[repr(C)]` layout pads it to
+24 bytes so that it stays 8-byte aligned.
+
+`RingHeader` (stored in shared memory, all fields atomic):
+
+```
+read_cursor:   AtomicU64  — monotonically increasing logical read position
+write_cursor:  AtomicU64  — monotonically increasing logical write position
+writer_closed: AtomicBool — set by the producer when it will write no more
+reader_closed: AtomicBool — set by the consumer when it will read no more
+```
+
+Physical position is `cursor % ring_size`. Cursors never reset; wrapping
+arithmetic handles overflow.
+
+### 2.2 Read and write
+
+Both operations are **non-blocking**: they return immediately, even if
+the ring is full or empty.
+
+**`RingProducer::write(buf)`**
+- Writes `min(buf.len(), free_space)` bytes into the ring.
+- Returns the number of bytes actually written, or `WouldBlock`
+  if the ring is full (`free_space == 0`).
+- An empty `buf` returns `Ok(0)` without touching the ring, even when
+  the ring is full.
+
+**`RingConsumer::read(buf)`**
+- Reads `min(buf.len(), used_space)` bytes from the ring.
+- Returns the number of bytes actually read, or `WouldBlock` if the
+  ring is empty (`used_space == 0`).
+- An empty `buf` returns `Ok(0)` without touching the ring, even when
+  the ring is empty.
+
+Callers that need blocking behavior loop on `WouldBlock`; for writes,
+`Write::write_all` already does this by spinning until every byte has
+been accepted.
+ +### 2.3 Close flags + +Each ring has two close flags in its header, set independently by each end: + +| Flag | Set by | Meaning | +| --------------- | -------- | -------------------------------------------- | +| `writer_closed` | Producer | No more bytes will be written to this ring. | +| `reader_closed` | Consumer | No more bytes will be read from this ring. | + +A ring is **closed** when both flags are set. A `BidirectionalPipe` is +**fully closed** when both of its rings are closed (all four flags set). + +### 2.4 `BidirectionalPipe` layout and API + +Total shared-memory size for one pipe: + +``` +required_size(ring_size) = 2 × (24 + ring_size) bytes +``` + +Layout: `[HeaderA][DataA: ring_size][HeaderB][DataB: ring_size]` + +Side A's producer writes to Ring A; Side B's consumer reads from Ring A, +and vice versa for Ring B. + +**Close API on `BidirectionalPipe`:** + +| Method | What it does | +| ------------------------ | ---------------------------------------------------------- | +| `close_write()` | Sets `writer_closed` on the outgoing ring. | +| `close_read()` | Sets `reader_closed` on the incoming ring. | +| `is_peer_write_closed()` | Reads `writer_closed` on the incoming ring (peer set it). | +| `is_peer_read_closed()` | Reads `reader_closed` on the outgoing ring (peer set it). | +| `is_closed()` | True when all four flags across both rings are set. | + +--- + +## 3. 
Where the code lives + +``` +arca/common/src/ +├── pipe/ # ring buffers, BidirectionalPipe +│ └── src/ +│ ├── Pipe.md +│ ├── traits.rs # Read, Write traits; Write::write_all default +│ ├── ring.rs # RingHeader (cursors + close flags), RingData +│ ├── ring_producer.rs# RingProducer — write, close_writer, is_reader_closed +│ ├── ring_consumer.rs# RingConsumer — read, close_reader, is_writer_closed +│ └── bidirectional_pipe.rs # BidirectionalPipe — close_write/read, is_closed +``` diff --git a/common/src/pipe/bidirectional_pipe.rs b/common/src/pipe/bidirectional_pipe.rs new file mode 100644 index 0000000..d49f9a7 --- /dev/null +++ b/common/src/pipe/bidirectional_pipe.rs @@ -0,0 +1,205 @@ +use crate::pipe::error::PipeError; +use crate::pipe::ring::{RingData, RingHeader}; +use crate::pipe::ring_consumer::RingConsumer; +use crate::pipe::ring_producer::RingProducer; +use crate::pipe::shared_memory_region::SharedMemoryRegion; +use crate::pipe::traits; + +#[derive(Debug, Clone, Copy, PartialEq, Eq)] +pub enum Side { + A, + B, +} + +/// One endpoint of a bidirectional pipe. +/// +/// Memory layout: `[HeaderA][Ring A->B data][HeaderB][Ring B->A data]`. +pub struct BidirectionalPipe<'a> { + writer: RingProducer<'a>, + reader: RingConsumer<'a>, +} + +const HEADER_SIZE: u64 = core::mem::size_of::() as u64; + +impl<'a> BidirectionalPipe<'a> { + /// Total bytes of shared memory needed for a given `ring_size`. + pub const fn required_size(ring_size: u64) -> u64 { + 2 * (HEADER_SIZE + ring_size) + } + + /// Create a pipe endpoint over a shared memory region. + /// + /// Caller must ensure the region is zero-initialized before the first side + /// is constructed, and that exactly one `Side::A` and one `Side::B` are + /// created per region. 
+ pub fn new(region: &'a SharedMemoryRegion, ring_size: u64, side: Side) -> Self { + assert!(region.len() >= Self::required_size(ring_size)); + assert!( + ring_size.is_multiple_of(core::mem::align_of::() as u64), + "ring_size must be a multiple of 8 for header alignment" + ); + let base = region.as_ptr(); + assert!( + base.align_offset(core::mem::align_of::()) == 0, + "shared memory region must be 8-byte aligned" + ); + + // Layout: [HeaderA (16)] [DataA (ring_size)] [HeaderB (16)] [DataB (ring_size)] + // Interleaved so each header is adjacent to its data (cache locality) + // and headers are separated by ring_size (avoids false sharing). + let header_a = unsafe { &*(base as *const RingHeader) }; + let data_a = unsafe { base.add(HEADER_SIZE as usize) }; + let header_b = unsafe { &*(data_a.add(ring_size as usize) as *const RingHeader) }; + let data_b = unsafe { data_a.add(ring_size as usize + HEADER_SIZE as usize) }; + + let (writer_header, writer_data, reader_header, reader_data) = match side { + Side::A => (header_a, data_a, header_b, data_b), + Side::B => (header_b, data_b, header_a, data_a), + }; + + let writer = RingProducer::new(writer_header, unsafe { + RingData::new(writer_data, ring_size) + }); + let reader = RingConsumer::new(reader_header, unsafe { + RingData::new(reader_data, ring_size) + }); + Self { writer, reader } + } + + /// Split into independent read and write halves (like `TcpStream::split`). + pub fn split(&mut self) -> (&mut RingConsumer<'a>, &mut RingProducer<'a>) { + (&mut self.reader, &mut self.writer) + } + + /// Close this side's outgoing (write) direction. + pub fn close_write(&self) { + self.writer.close_writer(); + } + + /// Close this side's incoming (read) direction. + pub fn close_read(&self) { + self.reader.close_reader(); + } + + /// True if the peer has closed their write side (no more data incoming). 
+ pub fn is_peer_write_closed(&self) -> bool { + self.reader.is_writer_closed() + } + + /// True if the peer has closed their read side (they will not read more data we send). + pub fn is_peer_read_closed(&self) -> bool { + self.writer.is_reader_closed() + } + + /// True when both unidirectional rings are fully closed (all four flags set). + pub fn is_closed(&self) -> bool { + self.writer.is_closed() && self.reader.is_closed() + } +} + +impl<'a> traits::Read for BidirectionalPipe<'a> { + fn read(&mut self, buf: &mut [u8]) -> Result { + self.reader.read(buf) + } +} + +impl<'a> traits::Write for BidirectionalPipe<'a> { + fn write(&mut self, buf: &[u8]) -> Result { + self.writer.write(buf) + } +} + +#[cfg(test)] +mod tests { + use super::*; + use crate::traits::{Read, Write}; + + #[repr(align(8))] + struct Aligned([u8; N]); + + macro_rules! pipe_pair { + ($ring:expr, $mem:ident, $a:ident, $b:ident) => { + let mut $mem = Aligned([0u8; BidirectionalPipe::required_size($ring as u64) as usize]); + let region = + unsafe { SharedMemoryRegion::from_raw($mem.0.as_mut_ptr(), $mem.0.len() as u64) }; + let mut $a = BidirectionalPipe::new(®ion, $ring, Side::A); + let mut $b = BidirectionalPipe::new(®ion, $ring, Side::B); + }; + } + + #[test] + fn required_size_matches_layout() { + assert_eq!(BidirectionalPipe::required_size(64), 2 * (24 + 64)); + } + + #[test] + fn round_trip_a_to_b() { + pipe_pair!(64, mem, a, b); + assert_eq!(a.write(b"ping").unwrap(), 4); + let mut out = [0u8; 4]; + assert_eq!(b.read(&mut out).unwrap(), 4); + assert_eq!(&out, b"ping"); + } + + #[test] + fn round_trip_b_to_a() { + pipe_pair!(32, mem, a, b); + assert_eq!(b.write(b"pong!!").unwrap(), 6); + let mut out = [0u8; 6]; + assert_eq!(a.read(&mut out).unwrap(), 6); + assert_eq!(&out, b"pong!!"); + } + + #[test] + fn both_directions_independent() { + pipe_pair!(32, mem, a, b); + a.write(b"hello").unwrap(); + b.write(b"world").unwrap(); + + let mut from_a = [0u8; 5]; + let mut from_b = [0u8; 5]; + 
b.read(&mut from_a).unwrap(); + a.read(&mut from_b).unwrap(); + assert_eq!(&from_a, b"hello"); + assert_eq!(&from_b, b"world"); + } + + #[test] + fn multi_lap() { + pipe_pair!(8, mem, a, b); + for i in 0u8..20 { + assert_eq!(a.write(&[i]).unwrap(), 1); + let mut out = [0u8; 1]; + assert_eq!(b.read(&mut out).unwrap(), 1); + assert_eq!(out[0], i); + } + } + + #[test] + fn fill_drain_refill() { + pipe_pair!(8, mem, a, b); + assert_eq!(a.write(b"12345678").unwrap(), 8); + let mut out = [0u8; 8]; + assert_eq!(b.read(&mut out).unwrap(), 8); + assert_eq!(&out, b"12345678"); + + assert_eq!(a.write(b"abcdefgh").unwrap(), 8); + assert_eq!(b.read(&mut out).unwrap(), 8); + assert_eq!(&out, b"abcdefgh"); + } + + #[test] + fn interleaved_both_directions() { + pipe_pair!(16, mem, a, b); + a.write(b"aa").unwrap(); + b.write(b"bb").unwrap(); + a.write(b"cc").unwrap(); + b.write(b"dd").unwrap(); + + let mut out = [0u8; 4]; + b.read(&mut out).unwrap(); + assert_eq!(&out, b"aacc"); + a.read(&mut out).unwrap(); + assert_eq!(&out, b"bbdd"); + } +} diff --git a/common/src/pipe/error.rs b/common/src/pipe/error.rs new file mode 100644 index 0000000..82a381c --- /dev/null +++ b/common/src/pipe/error.rs @@ -0,0 +1,16 @@ +use core::fmt; + +/// Errors returned by pipe operations. +#[derive(Debug, Clone, Copy, PartialEq, Eq)] +pub enum PipeError { + /// Ring buffer is empty (read) or full (write). Try again later. + WouldBlock, +} + +impl fmt::Display for PipeError { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + match self { + PipeError::WouldBlock => write!(f, "operation would block"), + } + } +} diff --git a/common/src/pipe/ring.rs b/common/src/pipe/ring.rs new file mode 100644 index 0000000..1f98082 --- /dev/null +++ b/common/src/pipe/ring.rs @@ -0,0 +1,157 @@ +use core::sync::atomic::{AtomicBool, AtomicU64, Ordering}; + +/// Header for a single SPSC ring buffer, stored in shared memory. +/// +/// Cursors are monotonically increasing logical offsets. 
Physical positions +/// are `cursor % ring_size`. The close flags signal orderly shutdown: the +/// producer sets `writer_closed`; the consumer sets `reader_closed`. +#[repr(C)] +pub struct RingHeader { + pub read_cursor: AtomicU64, + pub write_cursor: AtomicU64, + pub writer_closed: AtomicBool, + pub reader_closed: AtomicBool, +} + +impl RingHeader { + /// Bytes available to read. Called by the consumer. + pub fn used_space(&self) -> u64 { + let write = self.write_cursor.load(Ordering::Acquire); + let read = self.read_cursor.load(Ordering::Relaxed); + + // Cursors are monotonically increasing and never reset, but they can + // overflow u64. wrapping_sub gives the correct delta regardless, + // also avoids panic on debug-mode subtraction overflow + write.wrapping_sub(read) + } + + /// Bytes available to write. Called by the producer. + pub fn free_space(&self, capacity: u64) -> u64 { + let write = self.write_cursor.load(Ordering::Relaxed); + let read = self.read_cursor.load(Ordering::Acquire); + + // See used_space — wrapping_sub handles cursor overflow correctly + capacity - write.wrapping_sub(read) + } +} + +/// Raw data region of a single SPSC ring buffer. +/// +/// Owns `(ptr, size)` together so call sites don't juggle them. +/// Wrap-around is handled inside `write_at` / `read_at`. +pub struct RingData { + ptr: *mut u8, + size: u64, +} + +impl RingData { + /// # Safety + /// - `ptr` must point to a valid region of `size` bytes. + /// - Caller must guarantee SPSC discipline on top of this region. + pub unsafe fn new(ptr: *mut u8, size: u64) -> Self { + Self { ptr, size } + } + + pub fn size(&self) -> u64 { + self.size + } + + /// Write `buf` starting at physical offset `cursor % size`, wrapping if needed. + /// Caller must ensure `buf.len() <= free space`. 
+ pub fn write_at(&mut self, cursor: u64, buf: &[u8]) { + let size = self.size as usize; + let offset = (cursor % self.size) as usize; + let first = core::cmp::min(buf.len(), size - offset); + unsafe { + core::ptr::copy_nonoverlapping(buf.as_ptr(), self.ptr.add(offset), first); + if buf.len() > first { + core::ptr::copy_nonoverlapping( + buf.as_ptr().add(first), + self.ptr, + buf.len() - first, + ); + } + } + } + + /// Read into `buf` starting at physical offset `cursor % size`, wrapping if needed. + /// Caller must ensure `buf.len() <= used space`. + pub fn read_at(&self, cursor: u64, buf: &mut [u8]) { + let size = self.size as usize; + let offset = (cursor % self.size) as usize; + let first = core::cmp::min(buf.len(), size - offset); + unsafe { + core::ptr::copy_nonoverlapping(self.ptr.add(offset), buf.as_mut_ptr(), first); + if buf.len() > first { + core::ptr::copy_nonoverlapping( + self.ptr, + buf.as_mut_ptr().add(first), + buf.len() - first, + ); + } + } + } +} + +#[cfg(test)] +mod tests { + use super::*; + + fn header_with(read: u64, write: u64) -> RingHeader { + RingHeader { + read_cursor: AtomicU64::new(read), + write_cursor: AtomicU64::new(write), + writer_closed: AtomicBool::new(false), + reader_closed: AtomicBool::new(false), + } + } + + #[test] + fn empty_ring() { + let h = header_with(0, 0); + assert_eq!(h.used_space(), 0); + assert_eq!(h.free_space(64), 64); + } + + #[test] + fn partial_fill() { + let h = header_with(10, 40); + assert_eq!(h.used_space(), 30); + assert_eq!(h.free_space(64), 34); + } + + #[test] + fn full_ring() { + let h = header_with(100, 164); + assert_eq!(h.used_space(), 64); + assert_eq!(h.free_space(64), 0); + } + + #[test] + fn data_write_then_read_no_wrap() { + let mut mem = [0u8; 8]; + let mut rd = unsafe { RingData::new(mem.as_mut_ptr(), 8) }; + rd.write_at(0, b"abcd"); + let mut out = [0u8; 4]; + rd.read_at(0, &mut out); + assert_eq!(&out, b"abcd"); + } + + #[test] + fn data_write_wraps() { + let mut mem = [0u8; 8]; + let mut 
rd = unsafe { RingData::new(mem.as_mut_ptr(), 8) }; + rd.write_at(6, b"XYZW"); + assert_eq!(&mem[6..8], b"XY"); + assert_eq!(&mem[..2], b"ZW"); + } + + #[test] + fn data_read_wraps() { + let mut mem = *b"cdEFabXY"; + let rd = unsafe { RingData::new(mem.as_mut_ptr(), 8) }; + let mut out = [0u8; 6]; + rd.read_at(6, &mut out); + assert_eq!(&out, b"XYcdEF"); + } +} diff --git a/common/src/pipe/ring_consumer.rs b/common/src/pipe/ring_consumer.rs new file mode 100644 index 0000000..ed2a087 --- /dev/null +++ b/common/src/pipe/ring_consumer.rs @@ -0,0 +1,137 @@ +use crate::pipe::error::PipeError; +use crate::pipe::ring::{RingData, RingHeader}; +use crate::pipe::traits; +use core::sync::atomic::Ordering; + +/// Consumer (read) end of a single SPSC ring buffer. +pub struct RingConsumer<'a> { + header: &'a RingHeader, + data: RingData, +} + +impl<'a> RingConsumer<'a> { + pub fn new(header: &'a RingHeader, data: RingData) -> Self { + Self { header, data } + } + + /// Signal that this consumer will read no more bytes. + pub fn close_reader(&self) { + self.header.reader_closed.store(true, Ordering::Release); + } + + /// True if the producer has closed its write end. + pub fn is_writer_closed(&self) -> bool { + self.header.writer_closed.load(Ordering::Acquire) + } + + /// True when both ends of this ring are closed. 
+    pub fn is_closed(&self) -> bool {
+        self.header.writer_closed.load(Ordering::Acquire)
+            && self.header.reader_closed.load(Ordering::Acquire)
+    }
+}
+
+impl<'a> traits::Read for RingConsumer<'a> {
+    /// Copy up to `buf.len()` unread bytes out of the ring without blocking.
+    ///
+    /// Returns `Ok(0)` for an empty `buf`, `Err(PipeError::WouldBlock)` when
+    /// nothing is readable, and otherwise `Ok(n)` with the byte count copied.
+    fn read(&mut self, buf: &mut [u8]) -> Result<usize, PipeError> {
+        if buf.is_empty() {
+            return Ok(0);
+        }
+        // The header lives in shared memory: never trust it for more bytes
+        // than the ring can physically hold, otherwise a corrupted cursor
+        // could drive the copy below past the data region.
+        let readable = core::cmp::min(self.header.used_space(), self.data.size());
+        if readable == 0 {
+            return Err(PipeError::WouldBlock);
+        }
+
+        let n = core::cmp::min(buf.len() as u64, readable) as usize;
+        let cursor = self.header.read_cursor.load(Ordering::Relaxed);
+        self.data.read_at(cursor, &mut buf[..n]);
+
+        // No standalone fence needed, release on the store guarantees the
+        // preceding read_at is visible before the cursor update.
+        // wrapping_add: cursors are documented to wrap at u64::MAX, so the
+        // advance must not panic in builds with overflow checks.
+        self.header
+            .read_cursor
+            .store(cursor.wrapping_add(n as u64), Ordering::Release);
+        Ok(n)
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+    use crate::pipe::traits::Read;
+    use core::sync::atomic::AtomicU64;
+
+    fn header(read: u64, write: u64) -> RingHeader {
+        use core::sync::atomic::AtomicBool;
+        RingHeader {
+            read_cursor: AtomicU64::new(read),
+            write_cursor: AtomicU64::new(write),
+            writer_closed: AtomicBool::new(false),
+            reader_closed: AtomicBool::new(false),
+        }
+    }
+
+    #[test]
+    fn simple_read() {
+        let mut mem = *b"hello...";
+        let h = header(0, 5);
+        let data = unsafe { RingData::new(mem.as_mut_ptr(), 8) };
+        let mut c = RingConsumer::new(&h, data);
+        let mut out = [0u8; 8];
+        assert_eq!(c.read(&mut out).unwrap(), 5);
+        assert_eq!(&out[..5], b"hello");
+        assert_eq!(h.read_cursor.load(Ordering::Relaxed), 5);
+    }
+
+    #[test]
+    fn partial_read() {
+        let mut mem = *b"abcdefgh";
+        let h = header(0, 8);
+        let data = unsafe { RingData::new(mem.as_mut_ptr(), 8) };
+        let mut c = RingConsumer::new(&h, data);
+        let mut out = [0u8; 3];
+        assert_eq!(c.read(&mut out).unwrap(), 3);
+        assert_eq!(&out, b"abc");
+    }
+
+    #[test]
+    fn wrap_around() {
+        let mut mem = *b"WXYZabcd";
+        let h = header(5, 12);
+        let data = unsafe { RingData::new(mem.as_mut_ptr(), 8) };
+        let mut c = RingConsumer::new(&h,
data); + let mut out = [0u8; 8]; + assert_eq!(c.read(&mut out).unwrap(), 7); + assert_eq!(&out[..7], b"bcdWXYZ"); + } + + #[test] + fn empty_ring_blocks() { + let mut mem = [0u8; 4]; + let h = header(4, 4); + let data = unsafe { RingData::new(mem.as_mut_ptr(), 4) }; + let mut c = RingConsumer::new(&h, data); + let mut out = [0u8; 4]; + assert!(matches!(c.read(&mut out), Err(PipeError::WouldBlock))); + } + + #[test] + fn zero_length_read_non_empty() { + let mut mem = *b"data"; + let h = header(0, 4); + let data = unsafe { RingData::new(mem.as_mut_ptr(), 4) }; + let mut c = RingConsumer::new(&h, data); + let mut out = [0u8; 0]; + assert_eq!(c.read(&mut out).unwrap(), 0); + assert_eq!(h.read_cursor.load(Ordering::Relaxed), 0); + } + + #[test] + fn zero_length_read_empty() { + let mut mem = [0u8; 4]; + let h = header(0, 0); + let data = unsafe { RingData::new(mem.as_mut_ptr(), 4) }; + let mut c = RingConsumer::new(&h, data); + let mut out = [0u8; 0]; + assert_eq!(c.read(&mut out).unwrap(), 0); + } +} diff --git a/common/src/pipe/ring_producer.rs b/common/src/pipe/ring_producer.rs new file mode 100644 index 0000000..a6c0da9 --- /dev/null +++ b/common/src/pipe/ring_producer.rs @@ -0,0 +1,174 @@ +use crate::pipe::error::PipeError; +use crate::pipe::ring::{RingData, RingHeader}; +use crate::pipe::traits; +use core::sync::atomic::Ordering; + +/// Producer (write) end of a single SPSC ring buffer. +pub struct RingProducer<'a> { + header: &'a RingHeader, + data: RingData, +} + +impl<'a> RingProducer<'a> { + pub fn new(header: &'a RingHeader, data: RingData) -> Self { + Self { header, data } + } + + /// Bytes written by this producer that the consumer has not yet read. + /// Uses Acquire on read_cursor so this can be called cross-thread safely. 
+ pub fn bytes_pending(&self) -> u64 { + let write = self.header.write_cursor.load(Ordering::Relaxed); + let read = self.header.read_cursor.load(Ordering::Acquire); + write.wrapping_sub(read) + } + + /// Signal that this producer will write no more bytes. + pub fn close_writer(&self) { + self.header.writer_closed.store(true, Ordering::Release); + } + + /// True if the consumer has closed its read end. + pub fn is_reader_closed(&self) -> bool { + self.header.reader_closed.load(Ordering::Acquire) + } + + /// True when both ends of this ring are closed. + pub fn is_closed(&self) -> bool { + self.header.writer_closed.load(Ordering::Acquire) + && self.header.reader_closed.load(Ordering::Acquire) + } +} + +impl<'a> traits::Write for RingProducer<'a> { + fn write(&mut self, buf: &[u8]) -> Result { + if buf.is_empty() { + return Ok(0); + } + let free = self.header.free_space(self.data.size()); + if free == 0 { + return Err(PipeError::WouldBlock); + } + + let n = core::cmp::min(buf.len() as u64, free) as usize; + let cursor = self.header.write_cursor.load(Ordering::Relaxed); + self.data.write_at(cursor, &buf[..n]); + + // No standalone fence needed, release on the store guarantees the + // preceding write_at is visible before the cursor update + self.header + .write_cursor + .store(cursor + n as u64, Ordering::Release); + Ok(n) + } +} + +#[cfg(test)] +mod tests { + use super::*; + use crate::pipe::traits::Write; + use core::sync::atomic::AtomicU64; + + fn header() -> RingHeader { + use core::sync::atomic::AtomicBool; + RingHeader { + read_cursor: AtomicU64::new(0), + write_cursor: AtomicU64::new(0), + writer_closed: AtomicBool::new(false), + reader_closed: AtomicBool::new(false), + } + } + + #[test] + fn simple_write() { + let h = header(); + let mut mem = [0u8; 16]; + let data = unsafe { RingData::new(mem.as_mut_ptr(), 16) }; + let mut p = RingProducer::new(&h, data); + assert_eq!(p.write(b"hello").unwrap(), 5); + assert_eq!(&mem[..5], b"hello"); + 
assert_eq!(h.write_cursor.load(Ordering::Relaxed), 5); + } + + #[test] + fn fill_to_full() { + let h = header(); + let mut mem = [0u8; 8]; + let data = unsafe { RingData::new(mem.as_mut_ptr(), 8) }; + let mut p = RingProducer::new(&h, data); + assert_eq!(p.write(b"abcdefghij").unwrap(), 8); + assert_eq!(&mem, b"abcdefgh"); + } + + #[test] + fn wrap_around() { + let h = header(); + h.read_cursor.store(5, Ordering::Relaxed); + h.write_cursor.store(5, Ordering::Relaxed); + let mut mem = [0u8; 8]; + let data = unsafe { RingData::new(mem.as_mut_ptr(), 8) }; + let mut p = RingProducer::new(&h, data); + assert_eq!(p.write(b"XYZW").unwrap(), 4); + assert_eq!(&mem[5..8], b"XYZ"); + assert_eq!(&mem[..1], b"W"); + } + + #[test] + fn full_ring_blocks() { + let h = header(); + h.write_cursor.store(4, Ordering::Relaxed); + let mut mem = [0u8; 4]; + let data = unsafe { RingData::new(mem.as_mut_ptr(), 4) }; + let mut p = RingProducer::new(&h, data); + assert!(matches!(p.write(b"x"), Err(PipeError::WouldBlock))); + } + + #[test] + fn zero_length_write_non_full() { + let h = header(); + let mut mem = [0u8; 8]; + let data = unsafe { RingData::new(mem.as_mut_ptr(), 8) }; + let mut p = RingProducer::new(&h, data); + assert_eq!(p.write(b"").unwrap(), 0); + assert_eq!(h.write_cursor.load(Ordering::Relaxed), 0); + } + + #[test] + fn zero_length_write_full() { + let h = header(); + h.write_cursor.store(8, Ordering::Relaxed); + let mut mem = [0u8; 8]; + let data = unsafe { RingData::new(mem.as_mut_ptr(), 8) }; + let mut p = RingProducer::new(&h, data); + assert_eq!(p.write(b"").unwrap(), 0); + } + + #[test] + fn bytes_pending_empty() { + let h = header(); + let mut mem = [0u8; 8]; + let data = unsafe { RingData::new(mem.as_mut_ptr(), 8) }; + let p = RingProducer::new(&h, data); + assert_eq!(p.bytes_pending(), 0); + } + + #[test] + fn bytes_pending_after_write() { + let h = header(); + let mut mem = [0u8; 8]; + let data = unsafe { RingData::new(mem.as_mut_ptr(), 8) }; + let mut p = 
RingProducer::new(&h, data); + p.write(b"hello").unwrap(); + assert_eq!(p.bytes_pending(), 5); + } + + #[test] + fn bytes_pending_zero_after_full_read() { + let h = header(); + let mut mem = [0u8; 8]; + let data = unsafe { RingData::new(mem.as_mut_ptr(), 8) }; + let mut p = RingProducer::new(&h, data); + p.write(b"hello").unwrap(); + h.read_cursor.store(5, Ordering::Release); + assert_eq!(p.bytes_pending(), 0); + } +} diff --git a/common/src/pipe/shared_memory_region.rs b/common/src/pipe/shared_memory_region.rs new file mode 100644 index 0000000..4b9ec64 --- /dev/null +++ b/common/src/pipe/shared_memory_region.rs @@ -0,0 +1,66 @@ +/// Owns a reference to a shared memory region. +/// +/// Instead of exposing raw pointers and `unsafe` at every call site, we wrap +/// the shared memory region in a type that guarantees validity — pushing the +/// `unsafe` into a single place. After constructing a `SharedMemoryRegion`, +/// all pipe construction and usage is safe. +/// +/// How the shared memory pointer is obtained (hypervisor mapping, POSIX shm, +/// etc.) is outside this type's scope — we assume both sides have a way to +/// get it. +pub struct SharedMemoryRegion { + ptr: *mut u8, + len: u64, +} + +impl SharedMemoryRegion { + /// Create a new shared memory region from a raw pointer. + /// + /// This is the one and only unsafe entry point for the pipe library. + /// + /// # Safety + /// - `ptr` must point to a valid, read-write memory region of at least `len` bytes. + /// - The memory must remain valid for the lifetime of this `SharedMemoryRegion`. + /// - The memory must be shared between both sides of the pipe (e.g. via + /// hypervisor page mapping or POSIX shared memory). + /// - The memory must be zero-initialized before the first pipe is created from it. + pub unsafe fn from_raw(ptr: *mut u8, len: u64) -> Self { + Self { ptr, len } + } + + /// Returns a raw pointer to the start of the shared memory region. 
+ pub fn as_ptr(&self) -> *mut u8 { + self.ptr + } + + /// Returns the length of the shared memory region in bytes. + pub fn len(&self) -> u64 { + self.len + } + + /// Returns true if the shared memory region has zero length. + pub fn is_empty(&self) -> bool { + self.len == 0 + } +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn from_raw_stores_ptr_and_len() { + let mut buf = [0u8; 16]; + let region = unsafe { SharedMemoryRegion::from_raw(buf.as_mut_ptr(), buf.len() as u64) }; + assert_eq!(region.as_ptr(), buf.as_mut_ptr()); + assert_eq!(region.len(), 16); + assert!(!region.is_empty()); + } + + #[test] + fn zero_length_is_empty() { + let region = unsafe { SharedMemoryRegion::from_raw(core::ptr::null_mut(), 0) }; + assert_eq!(region.len(), 0); + assert!(region.is_empty()); + } +} diff --git a/common/src/pipe/traits.rs b/common/src/pipe/traits.rs new file mode 100644 index 0000000..2cc11e5 --- /dev/null +++ b/common/src/pipe/traits.rs @@ -0,0 +1,35 @@ +use crate::pipe::error::PipeError; + +/// Read bytes from a pipe. Analogous to std::io::Read. +/// +/// Partial reads are normal — `read` may return fewer bytes than `buf.len()`. +/// The caller loops if it needs more. This matches `std::io` semantics. +pub trait Read { + /// Try to read bytes into `buf`. + /// + /// Returns `Ok(n)` where `n > 0` is the number of bytes read, + /// or `Err(WouldBlock)` if no data is currently available. + fn read(&mut self, buf: &mut [u8]) -> Result; +} + +/// Write bytes to a pipe. Analogous to std::io::Write. +/// +/// Partial writes are normal — `write` may accept fewer bytes than `buf.len()`. +/// The caller loops if it needs to write more. This matches `std::io` semantics. +pub trait Write { + /// Try to write bytes from `buf`. + /// + /// Returns `Ok(n)` where `n > 0` is the number of bytes written, + /// or `Err(WouldBlock)` if the ring is currently full. 
+ fn write(&mut self, buf: &[u8]) -> Result; + + /// Write all bytes in `src`, spinning on `WouldBlock` until every byte is written. + fn write_all(&mut self, mut src: &[u8]) { + while !src.is_empty() { + match self.write(src) { + Ok(n) => src = &src[n..], + Err(PipeError::WouldBlock) => core::hint::spin_loop(), + } + } + } +}