Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions common/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ pub mod bitpack;
pub mod controlreg;
pub mod elfloader;
pub mod ipaddr;
pub mod pipe;
pub mod sendable;
pub mod util;
pub mod vhost;
Expand Down
26 changes: 26 additions & 0 deletions common/src/pipe.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
//! # arca-pipe
//!
//! A `no_std`-compatible, lock-free bidirectional byte pipe built from two
//! single-producer, single-consumer (SPSC) ring buffers over shared memory.
//!
//! This is the lowest-level transport primitive in arca-networking — both the
//! control protocol and per-connection data streams are built on top of it.
//!
//! The pipe is a raw byte stream with no message framing. Higher layers
//! (control protocol, data protocol) add their own framing on top.

mod bidirectional_pipe;
mod error;
mod ring;
mod ring_consumer;
mod ring_producer;
mod shared_memory_region;
mod traits;

pub use bidirectional_pipe::{BidirectionalPipe, Side};
pub use error::PipeError;
pub use ring::{RingData, RingHeader};
pub use ring_consumer::RingConsumer;
pub use ring_producer::RingProducer;
pub use shared_memory_region::SharedMemoryRegion;
pub use traits::{Read, Write};
116 changes: 116 additions & 0 deletions common/src/pipe/Pipe.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,116 @@
# Arca Data Pipe Protocol

This doc specifies the **data pipe**: how bytes move between Arca and
the Linux monitor for an established connection. The control protocol
(`PROTOCOL.md`) handles session setup and hands off a `DataPipeInfo`
(SHM handle + ring size); this doc covers everything after that handoff.

---

## 1. Overview

Each connection gets its own **bidirectional pipe**, a pair of
single-producer/single-consumer (SPSC) ring buffers in a shared-memory
region. One ring carries bytes Arca→Linux (outgoing), the other Linux→Arca
(incoming). The monitor relays bytes between the ring and the kernel
`TcpStream`; Arca reads and writes through `SyncStream` / `AsyncStream`.

```
Arca shared memory Monitor
──── ───────────── ───────
SyncStream ──write──► Ring A (A→B) ──read──► pipe_to_tcp → TcpStream
SyncStream ◄──read── Ring B (B→A) ◄──write── tcp_to_pipe ← TcpStream
```

---

## 2. Ring layer (`arca-pipe`)

### 2.1 Ring buffer

Each ring is a fixed-capacity byte buffer in shared memory with a header
and a data region:

```
[RingHeader: 24 bytes][Data: ring_size bytes]
```

`RingHeader` (stored in shared memory, all fields atomic):

```
read_cursor: AtomicU64 — monotonically increasing logical read position
write_cursor: AtomicU64 — monotonically increasing logical write position
writer_closed: AtomicBool — set by the producer when it will write no more
reader_closed: AtomicBool — set by the consumer when it will read no more
```

Physical position is `cursor % ring_size`. Cursors never reset; wrapping
arithmetic handles overflow.

### 2.2 Read and write

Both operations are **non-blocking**, return immediately if the ring
is full or empty.

**`RingProducer::write(buf)`**
- Writes `min(buf.len(), free_space)` bytes into the ring.
- Returns the number of bytes actually written OR returns `WouldBlock`
if the ring is full (`free_space == 0`).

**`RingConsumer::read(buf)`**
- Reads `min(buf.len(), used_space)` bytes from the ring.
- Returns the number of bytes actually read OR returns `WouldBlock` if the
ring is empty (`used_space == 0`).

Callers that need blocking behavior loop on `WouldBlock` (see §3).

### 2.3 Close flags

Each ring has two close flags in its header, set independently by each end:

| Flag | Set by | Meaning |
| --------------- | -------- | -------------------------------------------- |
| `writer_closed` | Producer | No more bytes will be written to this ring. |
| `reader_closed` | Consumer | No more bytes will be read from this ring. |

A ring is **closed** when both flags are set. A `BidirectionalPipe` is
**fully closed** when both of its rings are closed (all four flags set).

### 2.4 `BidirectionalPipe` layout and API

Total shared-memory size for one pipe:

```
required_size(ring_size) = 2 × (24 + ring_size) bytes
```

Layout: `[HeaderA][DataA: ring_size][HeaderB][DataB: ring_size]`

Side A's producer writes to Ring A; Side B's consumer reads from Ring A,
and vice versa for Ring B.

**Close API on `BidirectionalPipe`:**

| Method | What it does |
| ------------------------ | ---------------------------------------------------------- |
| `close_write()` | Sets `writer_closed` on the outgoing ring. |
| `close_read()` | Sets `reader_closed` on the incoming ring. |
| `is_peer_write_closed()` | Reads `writer_closed` on the incoming ring (peer set it). |
| `is_peer_read_closed()` | Reads `reader_closed` on the outgoing ring (peer set it). |
| `is_closed()` | True when all four flags across both rings are set. |

---

## 3. Where the code lives

```
arca/common/src/
├── pipe/ # ring buffers, BidirectionalPipe
│ └── src/
│ ├── Pipe.md
│ ├── traits.rs # Read, Write traits; Write::write_all default
│ ├── ring.rs # RingHeader (cursors + close flags), RingData
│ ├── ring_producer.rs# RingProducer — write, close_writer, is_reader_closed
│ ├── ring_consumer.rs# RingConsumer — read, close_reader, is_writer_closed
│ └── bidirectional_pipe.rs # BidirectionalPipe — close_write/read, is_closed
```
205 changes: 205 additions & 0 deletions common/src/pipe/bidirectional_pipe.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,205 @@
use crate::pipe::error::PipeError;
use crate::pipe::ring::{RingData, RingHeader};
use crate::pipe::ring_consumer::RingConsumer;
use crate::pipe::ring_producer::RingProducer;
use crate::pipe::shared_memory_region::SharedMemoryRegion;
use crate::pipe::traits;

#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum Side {
A,
B,
}

/// One endpoint of a bidirectional pipe.
///
/// Memory layout: `[HeaderA][Ring A->B data][HeaderB][Ring B->A data]`.
pub struct BidirectionalPipe<'a> {
writer: RingProducer<'a>,
reader: RingConsumer<'a>,
}

const HEADER_SIZE: u64 = core::mem::size_of::<RingHeader>() as u64;

impl<'a> BidirectionalPipe<'a> {
/// Total bytes of shared memory needed for a given `ring_size`.
pub const fn required_size(ring_size: u64) -> u64 {
2 * (HEADER_SIZE + ring_size)
}

/// Create a pipe endpoint over a shared memory region.
///
/// Caller must ensure the region is zero-initialized before the first side
/// is constructed, and that exactly one `Side::A` and one `Side::B` are
/// created per region.
pub fn new(region: &'a SharedMemoryRegion, ring_size: u64, side: Side) -> Self {
Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Given that the SharedMemoryRegion right now doesn't really "own" the memory, the reference here also don't quite make sense to me...if the concern is that it may have different implementation on the vmm side and the Arca side, we could keep it as a trait here, and let the BidirectionalPipe own it? Then on the vmm-side, this would be mmap::Mmap, on the Arca-side, it holds a raw pointer and size and does nothing on Drop?

assert!(region.len() >= Self::required_size(ring_size));
assert!(
ring_size.is_multiple_of(core::mem::align_of::<RingHeader>() as u64),
"ring_size must be a multiple of 8 for header alignment"
);
let base = region.as_ptr();
assert!(
base.align_offset(core::mem::align_of::<RingHeader>()) == 0,
"shared memory region must be 8-byte aligned"
);

// Layout: [HeaderA (16)] [DataA (ring_size)] [HeaderB (16)] [DataB (ring_size)]
// Interleaved so each header is adjacent to its data (cache locality)
// and headers are separated by ring_size (avoids false sharing).
let header_a = unsafe { &*(base as *const RingHeader) };
let data_a = unsafe { base.add(HEADER_SIZE as usize) };
let header_b = unsafe { &*(data_a.add(ring_size as usize) as *const RingHeader) };
let data_b = unsafe { data_a.add(ring_size as usize + HEADER_SIZE as usize) };

let (writer_header, writer_data, reader_header, reader_data) = match side {
Side::A => (header_a, data_a, header_b, data_b),
Side::B => (header_b, data_b, header_a, data_a),
};

let writer = RingProducer::new(writer_header, unsafe {
RingData::new(writer_data, ring_size)
});
let reader = RingConsumer::new(reader_header, unsafe {
RingData::new(reader_data, ring_size)
});
Self { writer, reader }
}

/// Split into independent read and write halves (like `TcpStream::split`).
pub fn split(&mut self) -> (&mut RingConsumer<'a>, &mut RingProducer<'a>) {
(&mut self.reader, &mut self.writer)
}

/// Close this side's outgoing (write) direction.
pub fn close_write(&self) {
Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Also these functions should also be &mut self? If the user want to be able to handle the consumer and producer side independently, they should split.

self.writer.close_writer();
}

/// Close this side's incoming (read) direction.
pub fn close_read(&self) {
self.reader.close_reader();
}

/// True if the peer has closed their write side (no more data incoming).
pub fn is_peer_write_closed(&self) -> bool {
self.reader.is_writer_closed()
}

/// True if the peer has closed their read side (they will not read more data we send).
pub fn is_peer_read_closed(&self) -> bool {
self.writer.is_reader_closed()
}

/// True when both unidirectional rings are fully closed (all four flags set).
pub fn is_closed(&self) -> bool {
self.writer.is_closed() && self.reader.is_closed()
}
}

impl<'a> traits::Read for BidirectionalPipe<'a> {
fn read(&mut self, buf: &mut [u8]) -> Result<usize, PipeError> {
self.reader.read(buf)
}
}

impl<'a> traits::Write for BidirectionalPipe<'a> {
fn write(&mut self, buf: &[u8]) -> Result<usize, PipeError> {
self.writer.write(buf)
}
}

#[cfg(test)]
mod tests {
use super::*;
use crate::traits::{Read, Write};

Check failure on line 115 in common/src/pipe/bidirectional_pipe.rs

View workflow job for this annotation

GitHub Actions / build

unresolved import `crate::traits`

Check failure on line 115 in common/src/pipe/bidirectional_pipe.rs

View workflow job for this annotation

GitHub Actions / build

unresolved import `crate::traits`

#[repr(align(8))]
struct Aligned<const N: usize>([u8; N]);

macro_rules! pipe_pair {
($ring:expr, $mem:ident, $a:ident, $b:ident) => {
let mut $mem = Aligned([0u8; BidirectionalPipe::required_size($ring as u64) as usize]);
let region =
unsafe { SharedMemoryRegion::from_raw($mem.0.as_mut_ptr(), $mem.0.len() as u64) };
let mut $a = BidirectionalPipe::new(&region, $ring, Side::A);
let mut $b = BidirectionalPipe::new(&region, $ring, Side::B);
};
}

#[test]
fn required_size_matches_layout() {
assert_eq!(BidirectionalPipe::required_size(64), 2 * (24 + 64));
}

#[test]
fn round_trip_a_to_b() {
pipe_pair!(64, mem, a, b);
assert_eq!(a.write(b"ping").unwrap(), 4);

Check failure on line 138 in common/src/pipe/bidirectional_pipe.rs

View workflow job for this annotation

GitHub Actions / build

no method named `write` found for struct `bidirectional_pipe::BidirectionalPipe<'a>` in the current scope

Check failure on line 138 in common/src/pipe/bidirectional_pipe.rs

View workflow job for this annotation

GitHub Actions / build

no method named `write` found for struct `bidirectional_pipe::BidirectionalPipe<'a>` in the current scope
let mut out = [0u8; 4];
assert_eq!(b.read(&mut out).unwrap(), 4);

Check failure on line 140 in common/src/pipe/bidirectional_pipe.rs

View workflow job for this annotation

GitHub Actions / build

no method named `read` found for struct `bidirectional_pipe::BidirectionalPipe<'a>` in the current scope

Check failure on line 140 in common/src/pipe/bidirectional_pipe.rs

View workflow job for this annotation

GitHub Actions / build

no method named `read` found for struct `bidirectional_pipe::BidirectionalPipe<'a>` in the current scope
assert_eq!(&out, b"ping");
}

#[test]
fn round_trip_b_to_a() {
pipe_pair!(32, mem, a, b);
assert_eq!(b.write(b"pong!!").unwrap(), 6);

Check failure on line 147 in common/src/pipe/bidirectional_pipe.rs

View workflow job for this annotation

GitHub Actions / build

no method named `write` found for struct `bidirectional_pipe::BidirectionalPipe<'a>` in the current scope

Check failure on line 147 in common/src/pipe/bidirectional_pipe.rs

View workflow job for this annotation

GitHub Actions / build

no method named `write` found for struct `bidirectional_pipe::BidirectionalPipe<'a>` in the current scope
let mut out = [0u8; 6];
assert_eq!(a.read(&mut out).unwrap(), 6);

Check failure on line 149 in common/src/pipe/bidirectional_pipe.rs

View workflow job for this annotation

GitHub Actions / build

no method named `read` found for struct `bidirectional_pipe::BidirectionalPipe<'a>` in the current scope

Check failure on line 149 in common/src/pipe/bidirectional_pipe.rs

View workflow job for this annotation

GitHub Actions / build

no method named `read` found for struct `bidirectional_pipe::BidirectionalPipe<'a>` in the current scope
assert_eq!(&out, b"pong!!");
}

#[test]
fn both_directions_independent() {
pipe_pair!(32, mem, a, b);
a.write(b"hello").unwrap();

Check failure on line 156 in common/src/pipe/bidirectional_pipe.rs

View workflow job for this annotation

GitHub Actions / build

no method named `write` found for struct `bidirectional_pipe::BidirectionalPipe<'a>` in the current scope

Check failure on line 156 in common/src/pipe/bidirectional_pipe.rs

View workflow job for this annotation

GitHub Actions / build

no method named `write` found for struct `bidirectional_pipe::BidirectionalPipe<'a>` in the current scope
b.write(b"world").unwrap();

Check failure on line 157 in common/src/pipe/bidirectional_pipe.rs

View workflow job for this annotation

GitHub Actions / build

no method named `write` found for struct `bidirectional_pipe::BidirectionalPipe<'a>` in the current scope

Check failure on line 157 in common/src/pipe/bidirectional_pipe.rs

View workflow job for this annotation

GitHub Actions / build

no method named `write` found for struct `bidirectional_pipe::BidirectionalPipe<'a>` in the current scope

let mut from_a = [0u8; 5];
let mut from_b = [0u8; 5];
b.read(&mut from_a).unwrap();

Check failure on line 161 in common/src/pipe/bidirectional_pipe.rs

View workflow job for this annotation

GitHub Actions / build

no method named `read` found for struct `bidirectional_pipe::BidirectionalPipe<'a>` in the current scope

Check failure on line 161 in common/src/pipe/bidirectional_pipe.rs

View workflow job for this annotation

GitHub Actions / build

no method named `read` found for struct `bidirectional_pipe::BidirectionalPipe<'a>` in the current scope
a.read(&mut from_b).unwrap();

Check failure on line 162 in common/src/pipe/bidirectional_pipe.rs

View workflow job for this annotation

GitHub Actions / build

no method named `read` found for struct `bidirectional_pipe::BidirectionalPipe<'a>` in the current scope

Check failure on line 162 in common/src/pipe/bidirectional_pipe.rs

View workflow job for this annotation

GitHub Actions / build

no method named `read` found for struct `bidirectional_pipe::BidirectionalPipe<'a>` in the current scope
assert_eq!(&from_a, b"hello");
assert_eq!(&from_b, b"world");
}

#[test]
fn multi_lap() {
pipe_pair!(8, mem, a, b);
for i in 0u8..20 {
assert_eq!(a.write(&[i]).unwrap(), 1);

Check failure on line 171 in common/src/pipe/bidirectional_pipe.rs

View workflow job for this annotation

GitHub Actions / build

no method named `write` found for struct `bidirectional_pipe::BidirectionalPipe<'a>` in the current scope

Check failure on line 171 in common/src/pipe/bidirectional_pipe.rs

View workflow job for this annotation

GitHub Actions / build

no method named `write` found for struct `bidirectional_pipe::BidirectionalPipe<'a>` in the current scope
let mut out = [0u8; 1];
assert_eq!(b.read(&mut out).unwrap(), 1);
assert_eq!(out[0], i);
}
}

#[test]
fn fill_drain_refill() {
pipe_pair!(8, mem, a, b);
assert_eq!(a.write(b"12345678").unwrap(), 8);
let mut out = [0u8; 8];
assert_eq!(b.read(&mut out).unwrap(), 8);
assert_eq!(&out, b"12345678");

assert_eq!(a.write(b"abcdefgh").unwrap(), 8);
assert_eq!(b.read(&mut out).unwrap(), 8);
assert_eq!(&out, b"abcdefgh");
}

#[test]
fn interleaved_both_directions() {
pipe_pair!(16, mem, a, b);
a.write(b"aa").unwrap();
b.write(b"bb").unwrap();
a.write(b"cc").unwrap();
b.write(b"dd").unwrap();

let mut out = [0u8; 4];
b.read(&mut out).unwrap();
assert_eq!(&out, b"aacc");
a.read(&mut out).unwrap();
assert_eq!(&out, b"bbdd");
}
}
16 changes: 16 additions & 0 deletions common/src/pipe/error.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
use core::fmt;

/// Errors returned by pipe operations.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum PipeError {
/// Ring buffer is empty (read) or full (write). Try again later.
WouldBlock,
}

impl fmt::Display for PipeError {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
match self {
PipeError::WouldBlock => write!(f, "operation would block"),
}
}
}
Loading
Loading