diff --git a/.github/workflows/compile.yml b/.github/workflows/compile.yml index d35eadf..8700296 100644 --- a/.github/workflows/compile.yml +++ b/.github/workflows/compile.yml @@ -47,7 +47,6 @@ jobs: - name: Setup run: | sudo apt update && sudo apt install gcc-multilib - sudo modprobe vhost_vsock echo 'KERNEL=="kvm", GROUP="kvm", MODE="0666", OPTIONS+="static_node=kvm"' | sudo tee /etc/udev/rules.d/99-kvm4all.rules sudo udevadm control --reload-rules sudo udevadm trigger --name-match=kvm diff --git a/Cargo.lock b/Cargo.lock index 1a1a9c8..9ca4461 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1052,6 +1052,7 @@ dependencies = [ "spin 0.10.0", "talc", "time", + "user", ] [[package]] diff --git a/common/src/lib.rs b/common/src/lib.rs index 78ea10b..b771f35 100644 --- a/common/src/lib.rs +++ b/common/src/lib.rs @@ -19,7 +19,6 @@ pub mod elfloader; pub mod ipaddr; pub mod sendable; pub mod util; -pub mod vhost; #[cfg(feature = "std")] pub mod mmap; @@ -46,13 +45,47 @@ pub struct SymtabRecord { } pub mod hypercall { + use core::sync::atomic::{AtomicBool, AtomicU64, AtomicUsize}; + pub const EXIT: u64 = 0; pub const LOG: u64 = 1; pub const SYMNAME: u64 = 2; pub const MEMSET: u64 = 3; pub const MEMCLR: u64 = 4; - pub const SERVERREAD: u64 = 5; - pub const SERVERWRITE: u64 = 6; - pub const CLIENTREAD: u64 = 7; - pub const CLIENTWRITE: u64 = 8; + pub const TCP_CONNECT: u64 = 5; + pub const TCP_LISTEN: u64 = 6; + pub const TCP_ACCEPT: u64 = 7; + pub const TCP_CLOSE: u64 = 8; + pub const TCP_SEND: u64 = 9; + pub const TCP_RECV: u64 = 10; + pub const FILE_OPEN: u64 = 11; + pub const FILE_CLOSE: u64 = 12; + pub const FILE_READ: u64 = 13; + pub const FILE_WRITE: u64 = 14; + pub const FILE_SEEK: u64 = 15; + + #[derive(Debug, Default)] + pub struct TcpInfo { + pub ip: u32, + pub port: u16, + pub id: AtomicU64, + pub buf: usize, + pub len: AtomicUsize, + pub done: AtomicBool, + } + + #[derive(Debug, Default)] + pub struct FileInfo { + pub read: bool, + pub write: bool, + pub create: 
bool, + pub append: bool, + pub truncate: bool, + pub buf: usize, + pub offset: isize, + pub whence: i64, + pub len: AtomicUsize, + pub id: AtomicU64, + pub done: AtomicBool, + } } diff --git a/common/src/vhost.rs b/common/src/vhost.rs deleted file mode 100644 index 865c6d3..0000000 --- a/common/src/vhost.rs +++ /dev/null @@ -1,15 +0,0 @@ -#[repr(C)] -#[derive(Copy, Clone)] -pub struct VSockMetadata { - pub rx: VirtQueueMetadata, - pub tx: VirtQueueMetadata, -} - -#[repr(C)] -#[derive(Copy, Clone)] -pub struct VirtQueueMetadata { - pub descriptors: usize, - pub desc: usize, - pub used: usize, - pub avail: usize, -} diff --git a/fix/src/main.rs b/fix/src/main.rs index 38dc29c..c238193 100644 --- a/fix/src/main.rs +++ b/fix/src/main.rs @@ -26,7 +26,7 @@ extern crate alloc; const MODULE: &[u8] = include_bytes!(concat!(env!("OUT_DIR"), "/addblob")); #[kmain] -async fn main(_: &[usize]) { +fn main(_: &[usize]) { log::info!("creating object store"); let mut store = ObjectStore::new(); log::info!("creating fix runtime"); diff --git a/justfile b/justfile new file mode 100644 index 0000000..e4506cf --- /dev/null +++ b/justfile @@ -0,0 +1,40 @@ +alias b := build +alias r := run + +target := 'x86_64-unknown-none' + +release := if env('RELEASE', '0') != '0'{ '--release' } else { '' } + +default: + just --list + +build-all: + cargo build {{release}} + cargo build -p kernel {{release}} + cargo build -p fix {{release}} + cargo build -p user {{release}} + +test: + cargo test + cargo test -p kernel --target={{target}} + +build bin: + cargo build -p kernel --example={{bin}} --target={{target}} {{release}} + +run bin: + cargo run -p kernel --example={{bin}} --target={{target}} {{release}} + +fix: + cargo run -p fix --target={{target}} {{release}} + +fmt: + cargo fmt + cargo fmt -p kernel + cargo fmt -p fix + cargo fmt -p user + +lint: + cargo clippy + cargo clippy -p kernel + cargo clippy -p fix + cargo clippy -p user diff --git a/kernel/Cargo.toml b/kernel/Cargo.toml index 
9a757d1..e252b54 100644 --- a/kernel/Cargo.toml +++ b/kernel/Cargo.toml @@ -6,10 +6,18 @@ version = "0.1.0" edition = "2021" forced-target = "x86_64-unknown-none" -[[bin]] +[[example]] name = "hello" test = false +[[example]] +name = "threads" +test = false + +[[example]] +name = "webserver" +test = false + # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html [features] @@ -46,3 +54,4 @@ glob = "0.3.1" [dev-dependencies] postcard = { version = "1.1.3", features = ["alloc"] } +user = { path = "../user", artifact = "bin", target = "x86_64-unknown-none" } diff --git a/kernel/build.rs b/kernel/build.rs index a3fcca2..3a1f490 100644 --- a/kernel/build.rs +++ b/kernel/build.rs @@ -7,6 +7,8 @@ fn main() -> Result<()> { println!("cargo::rerun-if-changed=src/interrupts.S"); cc::Build::new().file("src/util.S").compile("util"); println!("cargo::rerun-if-changed=src/util.S"); + cc::Build::new().file("src/cswitch.S").compile("cswitch"); + println!("cargo::rerun-if-changed=src/cswitch.S"); let dir = std::env::var("CARGO_MANIFEST_DIR").unwrap(); println!("cargo::rerun-if-changed=etc/memmap.ld"); diff --git a/kernel/src/bin/hello.rs b/kernel/examples/hello.rs similarity index 67% rename from kernel/src/bin/hello.rs rename to kernel/examples/hello.rs index ac63652..24c00d1 100644 --- a/kernel/src/bin/hello.rs +++ b/kernel/examples/hello.rs @@ -2,9 +2,8 @@ #![no_main] use kernel::prelude::*; -use kernel::rt; #[kmain] -async fn main(_: &[usize]) { +fn main(_: &[usize]) { log::info!("hello, world"); } diff --git a/kernel/examples/threads.rs b/kernel/examples/threads.rs new file mode 100644 index 0000000..86b1f10 --- /dev/null +++ b/kernel/examples/threads.rs @@ -0,0 +1,24 @@ +#![no_std] +#![no_main] + +use kernel::{ + kthread::{self, yield_now}, + prelude::*, +}; + +#[kmain] +fn main(_: &[usize]) { + log::info!("about to spawn two threads"); + kthread::spawn(|| { + for i in 0..5 { + log::info!("thread 1 says {i}"); + yield_now(); + } + }); + 
kthread::spawn(|| { + for i in 100..105 { + log::info!("thread 2 says {i}"); + yield_now(); + } + }); +} diff --git a/kernel/examples/webserver.rs b/kernel/examples/webserver.rs new file mode 100644 index 0000000..d2f5890 --- /dev/null +++ b/kernel/examples/webserver.rs @@ -0,0 +1,64 @@ +#![no_std] +#![no_main] + +use common::elfloader; +use kernel::host::net::TcpListener; +use kernel::{kthread, prelude::*}; + +const HANDLER: &[u8] = include_bytes!(env!("CARGO_BIN_FILE_USER_webserver")); + +#[kmain] +fn main(_: &[usize]) { + kthread::wfi(); + let listener = TcpListener::bind(&[0, 0, 0, 0], 8080); + log::info!("listening on port 8080"); + let handler: Function = elfloader::load_elf(HANDLER).unwrap(); + loop { + let handler = handler.clone(); + let mut stream = listener.accept(); + kthread::spawn(move || { + let mut handler = handler; + loop { + let effect: Function = handler + .force() + .try_into() + .expect("handler returned non-effect"); + let mut data: Tuple = effect.read().try_into().expect("could not read effect"); + let t: Blob = data.take(0).try_into().unwrap(); + assert_eq!(&*t, b"Symbolic"); + let effect: Blob = data.take(1).try_into().expect("could not find effect name"); + let args: Tuple = data + .take(2) + .try_into() + .expect("could not find effect arguments"); + let mut args: Vec = args.into_iter().collect(); + let k: Function = args + .pop() + .and_then(|x| x.try_into().ok()) + .expect("could not find continuation"); + handler = match (&*effect, &*args) { + (b"read", &[Value::Word(fd), Value::Word(len)]) => { + assert_eq!(fd.read(), 0); + let mut v = vec![0; len.read() as usize]; + let len = stream.recv(&mut v); + v.truncate(len); + k.apply(Blob::new(v)) + } + (b"write", &[Value::Word(fd), Value::Blob(ref data)]) => { + assert_eq!(fd.read(), 1); + let len = stream.send(&*data); + k.apply(Word::new(len as u64)) + } + (b"close", &[Value::Word(_)]) => k.apply(None), + (b"exit", &[Value::Word(_)]) => { + break; + } + (e, _) => { + let e: &str = 
str::from_utf8(e).unwrap(); + panic!("unhandled effect: {e}"); + } + }; + } + }); + } +} diff --git a/kernel/examples/webserver2.rs b/kernel/examples/webserver2.rs new file mode 100644 index 0000000..aed8ae0 --- /dev/null +++ b/kernel/examples/webserver2.rs @@ -0,0 +1,77 @@ +#![no_std] +#![no_main] + +use common::elfloader; +use kernel::host::net::TcpListener; +use kernel::{kthread, prelude::*}; + +const HANDLER: &[u8] = include_bytes!(env!("CARGO_BIN_FILE_USER_webserver2")); + +#[kmain] +fn main(_: &[usize]) { + kthread::wfi(); + let listener = Arc::new(TcpListener::bind(&[0, 0, 0, 0], 8081)); + log::info!("listening on port 8081"); + + for _ in 0..kernel::ncores() { + let listener = listener.clone(); + kthread::spawn(move || { + let mut handler: Function = elfloader::load_elf(HANDLER).unwrap(); + let mut current = None; + loop { + let effect: Function = handler + .force() + .try_into() + .expect("handler returned non-effect"); + let mut data: Tuple = effect.read().try_into().expect("could not read effect"); + let t: Blob = data.take(0).try_into().unwrap(); + assert_eq!(&*t, b"Symbolic"); + let effect: Blob = data.take(1).try_into().expect("could not find effect name"); + let args: Tuple = data + .take(2) + .try_into() + .expect("could not find effect arguments"); + let mut args: Vec = args.into_iter().collect(); + let k: Function = args + .pop() + .and_then(|x| x.try_into().ok()) + .expect("could not find continuation"); + handler = match (&*effect, &*args) { + (b"accept", &[]) => { + current = Some(listener.accept()); + k.apply(None) + } + (b"recv", &[Value::Word(len)]) => { + if let Some(ref mut stream) = current { + let mut v = vec![0; len.read() as usize]; + let len = stream.recv(&mut v); + v.truncate(len); + k.apply(Blob::new(v)) + } else { + k.apply(None) + } + } + (b"send", &[Value::Blob(ref data)]) => { + if let Some(ref mut stream) = current { + let len = stream.send(&*data); + k.apply(Word::new(len as u64)) + } else { + k.apply(None) + } + } + 
(b"hangup", &[]) => { + current.take(); + k.apply(None) + } + (b"exit", &[Value::Word(_)]) => { + break; + } + (e, _) => { + let e: &str = str::from_utf8(e).unwrap(); + panic!("unhandled effect: {e}"); + } + }; + } + }); + } +} diff --git a/kernel/src/aprofile.rs b/kernel/src/aprofile.rs deleted file mode 100644 index 937506f..0000000 --- a/kernel/src/aprofile.rs +++ /dev/null @@ -1,101 +0,0 @@ -use core::{ - fmt::Write as _, - sync::atomic::{AtomicU64, Ordering}, - time::Duration, -}; - -pub use macros::profile; - -use alloc::collections::btree_map::BTreeMap; - -use crate::{debugcon, prelude::*}; - -static COUNTS: OnceLock<&'static [AtomicU64]> = OnceLock::new(); - -extern "C" { - static mut _srodata: u8; - static mut _edata: u8; -} - -pub(crate) unsafe fn init() { - COUNTS.get_or_init(|| { - let size = (&raw const _edata).offset_from_unsigned(&raw const _srodata); - let buffer = Box::new_zeroed_slice(size).assume_init(); - Box::leak(buffer) - }); -} - -pub fn reset() { - for count in *COUNTS { - count.store(0, Ordering::SeqCst); - } -} - -pub fn log_time_spent(f: &&'static str, duration: Duration) { - let ns = duration.as_nanos() as u64; - let start = &raw const _srodata; - let end = &raw const _edata; - let rip = f as *const &'static str as *const u8; - if rip < start || rip >= end { - return; - } - unsafe { - let offset = rip.offset_from_unsigned(start); - COUNTS[offset].fetch_add(ns, Ordering::SeqCst); - } -} - -pub fn entries() -> BTreeMap<&'static str, Duration> { - let start = &raw const _srodata as *const (); - - let mut entries = BTreeMap::new(); - for (i, count) in COUNTS.iter().enumerate() { - let count = count.load(Ordering::SeqCst); - - if count > 0 { - unsafe { - entries.insert( - *(start.byte_add(i) as *const &'static str), - Duration::from_nanos(count), - ); - } - } - } - entries -} - -pub fn report(entries: &mut [(&'static str, Duration)]) { - let start = &raw const _srodata; - static NULL: &str = "N/A"; - entries.fill((NULL, Duration::ZERO)); - - 
for (i, count) in COUNTS.iter().enumerate() { - let count = count.load(Ordering::SeqCst); - let dur = Duration::from_nanos(count); - - if dur > entries[0].1 { - unsafe { - entries[0] = (*(start.byte_add(i) as *const &'static str), dur); - } - } - - entries.sort_by_key(|x| x.1); - } - entries.reverse(); -} - -pub fn log(count: usize) { - let mut entries = vec![("", Duration::ZERO); count]; - report(&mut entries); - let mut console = debugcon::CONSOLE.lock(); - let _ = writeln!(&mut *console, "----- PROFILE -----"); - for (i, (p, n)) in entries.iter().enumerate() { - if *n == Duration::ZERO { - break; - } - let symname = *p; - let n = n.as_nanos(); - let _ = writeln!(&mut *console, "{i}. {n:.5e}\t{symname}"); - } - let _ = writeln!(&mut *console, "-------------------"); -} diff --git a/kernel/src/cswitch.S b/kernel/src/cswitch.S new file mode 100644 index 0000000..0652404 --- /dev/null +++ b/kernel/src/cswitch.S @@ -0,0 +1,37 @@ +.intel_syntax noprefix + +/* + * Cooperatively switches from one kthread to another. 
+ * + * params: + * rdi: new stack pointer + * rsi: pointer to old stack pointer + */ +.globl cswitch +cswitch: + cli + push rbx + push rbp + push r12 + push r13 + push r14 + push r15 + + mov [rsi], rsp + mov rsp, rdi + + pop r15 + pop r14 + pop r13 + pop r12 + pop rbp + pop rbx + + sti + ret + +.globl kthread_init +kthread_init: + pop rax + pop rdi + jmp rax diff --git a/kernel/src/host.rs b/kernel/src/host.rs index 9a742af..a45b0e6 100644 --- a/kernel/src/host.rs +++ b/kernel/src/host.rs @@ -92,26 +92,281 @@ pub fn memclr(region: *mut [u8]) { } } -pub fn server_read(region: *mut [u8]) -> usize { - let (p, n) = region.to_raw_parts(); - let p = vm::ka2pa(p); - unsafe { crate::io::hypercall2(hypercall::SERVERREAD, p as u64, n as u64) as usize } -} +pub mod net { + use core::sync::atomic::{AtomicU64, AtomicUsize, Ordering}; -pub fn server_write(region: *const [u8]) -> usize { - let (p, n) = region.to_raw_parts(); - let p = vm::ka2pa(p); - unsafe { crate::io::hypercall2(hypercall::SERVERWRITE, p as u64, n as u64) as usize } -} + use common::{ + hypercall::{self, TcpInfo}, + BuddyAllocator, + }; -pub fn client_read(region: *mut [u8]) -> usize { - let (p, n) = region.to_raw_parts(); - let p = vm::ka2pa(p); - unsafe { crate::io::hypercall2(hypercall::CLIENTREAD, p as u64, n as u64) as usize } + use crate::kthread; + + pub struct TcpListener { + id: u64, + } + + pub struct TcpStream { + id: u64, + } + + impl TcpListener { + pub fn bind(ip: &[u8; 4], port: u16) -> TcpListener { + let info = TcpInfo { + ip: u32::from_ne_bytes(*ip), + port, + ..Default::default() + }; + unsafe { + crate::io::hypercall1( + hypercall::TCP_LISTEN, + BuddyAllocator.to_offset(&info) as u64, + ) + }; + while !info.done.load(Ordering::SeqCst) { + kthread::wfi(); + } + TcpListener { + id: info.id.load(Ordering::SeqCst), + } + } + + pub fn accept(&self) -> TcpStream { + let info = TcpInfo { + id: AtomicU64::new(self.id), + ..Default::default() + }; + unsafe { + crate::io::hypercall1( + 
hypercall::TCP_ACCEPT, + BuddyAllocator.to_offset(&info) as u64, + ) + }; + while !info.done.load(Ordering::SeqCst) { + kthread::wfi(); + } + TcpStream { + id: info.id.load(Ordering::SeqCst), + } + } + } + + impl TcpStream { + pub fn connect(ip: &[u8; 4], port: u16) -> TcpStream { + let info = TcpInfo { + ip: u32::from_ne_bytes(*ip), + port, + ..Default::default() + }; + unsafe { + crate::io::hypercall1( + hypercall::TCP_CONNECT, + BuddyAllocator.to_offset(&info) as u64, + ) + }; + while !info.done.load(Ordering::SeqCst) { + kthread::wfi(); + } + TcpStream { + id: info.id.load(Ordering::SeqCst), + } + } + + pub fn send(&mut self, bytes: &[u8]) -> usize { + let info = TcpInfo { + id: AtomicU64::new(self.id), + buf: BuddyAllocator.to_offset(bytes.as_ptr()), + len: AtomicUsize::new(bytes.len()), + ..Default::default() + }; + unsafe { + crate::io::hypercall1(hypercall::TCP_SEND, BuddyAllocator.to_offset(&info) as u64) + }; + while !info.done.load(Ordering::SeqCst) { + kthread::wfi(); + } + info.len.load(Ordering::SeqCst) + } + + pub fn recv(&mut self, bytes: &mut [u8]) -> usize { + let info = TcpInfo { + id: AtomicU64::new(self.id), + buf: BuddyAllocator.to_offset(bytes.as_ptr()), + len: AtomicUsize::new(bytes.len()), + ..Default::default() + }; + unsafe { + crate::io::hypercall1(hypercall::TCP_RECV, BuddyAllocator.to_offset(&info) as u64) + }; + while !info.done.load(Ordering::SeqCst) { + kthread::wfi(); + } + info.len.load(Ordering::SeqCst) + } + + pub fn close(self) { + let _ = self; + } + } + + impl Drop for TcpStream { + fn drop(&mut self) { + let info = TcpInfo { + id: AtomicU64::new(self.id), + ..Default::default() + }; + unsafe { + crate::io::hypercall1(hypercall::TCP_CLOSE, BuddyAllocator.to_offset(&info) as u64) + }; + while !info.done.load(Ordering::SeqCst) { + kthread::wfi(); + } + } + } + + impl !Sync for TcpStream {} + + impl core::fmt::Write for TcpStream { + fn write_str(&mut self, s: &str) -> core::fmt::Result { + let mut s = s.as_bytes(); + while 
!s.is_empty() { + let len = self.send(s); + s = &s[len..]; + } + Ok(()) + } + } } -pub fn client_write(region: *const [u8]) -> usize { - let (p, n) = region.to_raw_parts(); - let p = vm::ka2pa(p); - unsafe { crate::io::hypercall2(hypercall::CLIENTWRITE, p as u64, n as u64) as usize } +pub mod fs { + use core::sync::atomic::{AtomicU64, AtomicUsize, Ordering}; + + use common::{ + hypercall::{self, FileInfo}, + BuddyAllocator, + }; + + use crate::kthread; + + pub struct File { + id: u64, + } + + impl File { + pub fn open( + path: &str, + read: bool, + write: bool, + create: bool, + append: bool, + truncate: bool, + ) -> Option { + let info = FileInfo { + buf: BuddyAllocator.to_offset(path.as_ptr()), + len: AtomicUsize::new(path.len()), + read, + write, + create, + append, + truncate, + ..Default::default() + }; + unsafe { + crate::io::hypercall1(hypercall::FILE_OPEN, BuddyAllocator.to_offset(&info) as u64) + }; + while !info.done.load(Ordering::SeqCst) { + kthread::wfi(); + } + let id = info.id.load(Ordering::SeqCst); + if id != 0 { + Some(File { id }) + } else { + None + } + } + + pub fn close(self) {} + + pub fn read(&mut self, buf: &mut [u8]) -> usize { + let info = FileInfo { + id: AtomicU64::new(self.id), + buf: BuddyAllocator.to_offset(buf.as_ptr()), + len: AtomicUsize::new(buf.len()), + ..Default::default() + }; + unsafe { + crate::io::hypercall1(hypercall::FILE_READ, BuddyAllocator.to_offset(&info) as u64) + }; + while !info.done.load(Ordering::SeqCst) { + kthread::wfi(); + } + info.len.load(Ordering::SeqCst) + } + + pub fn write(&mut self, buf: &[u8]) -> usize { + let info = FileInfo { + id: AtomicU64::new(self.id), + buf: BuddyAllocator.to_offset(buf.as_ptr()), + len: AtomicUsize::new(buf.len()), + ..Default::default() + }; + unsafe { + crate::io::hypercall1( + hypercall::FILE_WRITE, + BuddyAllocator.to_offset(&info) as u64, + ) + }; + while !info.done.load(Ordering::SeqCst) { + kthread::wfi(); + } + info.len.load(Ordering::SeqCst) + } + + pub fn seek(&mut 
self, offset: isize, whence: i64) -> usize { + let info = FileInfo { + id: AtomicU64::new(self.id), + offset, + whence, + ..Default::default() + }; + unsafe { + crate::io::hypercall1(hypercall::FILE_SEEK, BuddyAllocator.to_offset(&info) as u64) + }; + while !info.done.load(Ordering::SeqCst) { + kthread::wfi(); + } + info.len.load(Ordering::SeqCst) + } + } + + impl Drop for File { + fn drop(&mut self) { + let info = FileInfo { + id: AtomicU64::new(self.id), + ..Default::default() + }; + unsafe { + crate::io::hypercall1( + hypercall::FILE_CLOSE, + BuddyAllocator.to_offset(&info) as u64, + ) + }; + while !info.done.load(Ordering::SeqCst) { + kthread::wfi(); + } + } + } + + impl core::fmt::Write for File { + fn write_str(&mut self, s: &str) -> core::fmt::Result { + let mut s = s.as_bytes(); + while !s.is_empty() { + let len = self.write(s); + s = &s[len..]; + } + Ok(()) + } + } + + impl !Sync for File {} } diff --git a/kernel/src/interrupts.rs b/kernel/src/interrupts.rs index cd9ddfe..db9e65f 100644 --- a/kernel/src/interrupts.rs +++ b/kernel/src/interrupts.rs @@ -80,10 +80,12 @@ pub fn critical(f: impl FnOnce() -> T) -> T { y } +#[allow(unused)] pub fn must_be_enabled() { assert!(enabled()); } +#[allow(unused)] pub fn must_be_disabled() { assert!(!enabled()); } @@ -91,12 +93,6 @@ pub fn must_be_disabled() { #[no_mangle] unsafe extern "C" fn isr_entry(registers: &mut IsrRegisterFile) { must_be_disabled(); - if registers.isr == 0x30 { - // log::warn!("got interrupt from virtio"); - INTERRUPTED.store(true, Ordering::Relaxed); - crate::lapic::LAPIC.borrow_mut().clear_interrupt(); - return; - } if registers.isr == 0x31 { INTERRUPTED.store(true, Ordering::Relaxed); if kvmclock::time_since_boot() > Duration::from_secs(1) { @@ -116,11 +112,8 @@ unsafe extern "C" fn isr_entry(registers: &mut IsrRegisterFile) { }); let _ = writeln!(&mut *console, "------ PROFILE ------"); crate::iprofile::log(20); - let _ = writeln!(&mut *console, "------ RUNTIME ------"); - 
crate::rt::profile(); let _ = writeln!(&mut *console, "---------------------"); crate::iprofile::reset(); - crate::rt::reset_stats(); } // crate::shutdown(); crate::lapic::LAPIC.borrow_mut().clear_interrupt(); diff --git a/kernel/src/kthread.rs b/kernel/src/kthread.rs new file mode 100644 index 0000000..caf4135 --- /dev/null +++ b/kernel/src/kthread.rs @@ -0,0 +1,198 @@ +use core::{ + cell::RefCell, + mem::MaybeUninit, + sync::atomic::{AtomicUsize, Ordering}, +}; + +use alloc::{boxed::Box, collections::vec_deque::VecDeque}; +use common::util::{initcell::LazyLock, spinlock::SpinLock}; + +use crate::{interrupts::INTERRUPTED, page::Page2MB}; + +#[derive(Default, Debug)] +pub struct KThread { + tid: usize, + rsp: *const u8, + base: *mut MaybeUninit, + scheduler: bool, + exited: bool, + wfi: bool, + next: Option>, +} + +unsafe impl Send for KThread {} + +unsafe extern "C" { + fn cswitch(new_stack: *const u8, old_stack: *mut *const u8) -> *const u8; + fn kthread_init(); +} + +static CURRENT_TID: AtomicUsize = AtomicUsize::new(0); +static OUTSTANDING: AtomicUsize = AtomicUsize::new(0); + +static WFI: SpinLock>> = SpinLock::new(None); + +fn next_tid() -> usize { + CURRENT_TID.fetch_add(1, Ordering::SeqCst) +} + +#[core_local] +pub static SCHEDULER_THREAD: LazyLock> = LazyLock::new(|| { + RefCell::new(KThread { + tid: next_tid(), + rsp: core::ptr::null(), + base: core::ptr::null_mut(), + scheduler: true, + exited: false, + wfi: false, + next: None, + }) +}); + +#[core_local] +pub static CURRENT_THREAD: LazyLock>> = LazyLock::new(RefCell::default); + +pub fn tid() -> usize { + CURRENT_THREAD.borrow().tid +} + +pub fn scheduler() -> bool { + CURRENT_THREAD.borrow().scheduler +} + +pub fn exited() -> bool { + CURRENT_THREAD.borrow().exited +} + +pub fn init() { + LazyLock::force(&SCHEDULER_THREAD); + LazyLock::force(&CURRENT_THREAD); +} + +pub static THREAD_QUEUE: LazyLock>>> = + LazyLock::new(|| SpinLock::new(VecDeque::new())); + +pub fn spawn(f: impl FnOnce()) { + let f: 
Box> = Box::new(Box::new(f)); + let f = Box::into_raw(f); + let tid = next_tid(); + let stack = Box::::new_uninit(); + let base = Box::into_raw(stack); + let stack = unsafe { + &mut *core::mem::transmute::< + *mut MaybeUninit, + *mut [MaybeUninit; (1 << 21) / 8], + >(base) + }; + stack[stack.len() - 1].write(f as u64); // data ptr + stack[stack.len() - 2].write(start_thread as *const () as u64); // function ptr (aligned) + stack[stack.len() - 3].write(kthread_init as *const () as u64); // return address + stack[stack.len() - 4].write(0); // rbx (aligned) + stack[stack.len() - 5].write(0); // rbp + stack[stack.len() - 6].write(0); // r12 (aligned) + stack[stack.len() - 7].write(0); // r13 + stack[stack.len() - 8].write(0); // r14 (aligned) + stack[stack.len() - 9].write(0); // r15 + let rsp = &raw mut stack[stack.len() - 9] as *const u8; + let thread = KThread { + tid, + rsp, + base, + scheduler: false, + exited: false, + wfi: false, + next: None, + }; + let mut q = THREAD_QUEUE.lock(); + OUTSTANDING.fetch_add(1, Ordering::SeqCst); + q.push_back(thread.into()); +} + +/** + * Waits for tasks to be scheduled, then runs tasks until there are none left. + * + * # Safety + * + * This function should only be called from rsstart; its calls should never be nested. 
+ */ +pub(crate) unsafe fn run_scheduler() { + while OUTSTANDING.load(Ordering::SeqCst) == 0 { + // wait for something to be scheduled + core::hint::spin_loop(); + } + while OUTSTANDING.load(Ordering::SeqCst) != 0 { + let Some(next) = THREAD_QUEUE.lock().pop_front() else { + sleep(); + if INTERRUPTED.swap(false, Ordering::SeqCst) { + let mut wfi = WFI.lock(); + let mut head = core::mem::take(&mut *wfi); + let mut q = THREAD_QUEUE.lock(); + while let Some(mut current) = head { + head = core::mem::take(&mut current.next); + log::trace!("awakening {}", current.tid); + q.push_back(current); + } + } + continue; + }; + log::trace!("scheduling thread {}", next.tid); + CURRENT_THREAD.replace(next); + let mut scheduler = SCHEDULER_THREAD.borrow_mut(); + let old_rsp = &raw mut scheduler.rsp; + core::mem::drop(scheduler); + let new_rsp = CURRENT_THREAD.borrow_mut().rsp; + cswitch(new_rsp, old_rsp); + let mut next = CURRENT_THREAD.take(); + if next.exited { + log::trace!("thread {} exited", next.tid); + let old_stack = next.base; + let ptr = Box::from_raw(old_stack); + core::mem::drop(ptr); + let left = OUTSTANDING.fetch_sub(1, Ordering::SeqCst) - 1; + log::trace!("{left} threads outstanding"); + } else if next.wfi { + log::trace!("thread {} is waiting", next.tid); + let mut wfi = WFI.lock(); + next.next = core::mem::take(&mut *wfi); + wfi.replace(next); + } else { + log::trace!("thread {} yielded", next.tid); + THREAD_QUEUE.lock().push_back(next); + } + } +} + +pub fn yield_now() { + log::trace!("exiting from {} into scheduler", tid()); + let next_rsp = SCHEDULER_THREAD.borrow_mut().rsp; + let mut current = CURRENT_THREAD.borrow_mut(); + let current_rsp = &raw mut current.rsp; + core::mem::drop(current); + unsafe { + cswitch(next_rsp, current_rsp); + } +} + +fn sleep() { + unsafe { + // io::outl(0xf4, 0); + core::arch::asm!("hlt"); + } +} + +fn exit() { + CURRENT_THREAD.borrow_mut().exited = true; + yield_now(); +} + +pub fn wfi() { + CURRENT_THREAD.borrow_mut().wfi = true; + 
yield_now(); +} + +unsafe extern "C" fn start_thread(f: *mut Box) { + log::trace!("starting thread {}", tid()); + let f = unsafe { f.read() }; + f(); + exit(); +} diff --git a/kernel/src/kvmclock.rs b/kernel/src/kvmclock.rs index 51b888a..e913baf 100644 --- a/kernel/src/kvmclock.rs +++ b/kernel/src/kvmclock.rs @@ -108,9 +108,8 @@ fn info_and_tsc_to_duration(info: CpuTimeInfo, tsc: u64) -> Duration { time >> -info.tsc_shift }; let time = time.widening_mul(info.tsc_to_system_mul as u64); - let time = (time.0 >> 32) | (time.1 << 32); - let time = time.wrapping_add(info.system_time); - Duration::from_nanos(time) + let time = time.wrapping_add(info.system_time as u128); + Duration::from_nanos(time as u64) } #[inline] diff --git a/kernel/src/lapic.rs b/kernel/src/lapic.rs index 35a06a1..a0ce9ff 100644 --- a/kernel/src/lapic.rs +++ b/kernel/src/lapic.rs @@ -151,7 +151,7 @@ pub unsafe fn init() { // lapic.set_initial_count(0x400000); // 100ms - // lapic.set_initial_count(0x80000); // 10ms + lapic.set_initial_count(0x80000); // 10ms // lapic.set_initial_count(0x10000); // 1ms @@ -159,7 +159,7 @@ pub unsafe fn init() { // lapic.set_initial_count(0x2000); // 100us - lapic.set_initial_count(0x200); // 10us + // lapic.set_initial_count(0x200); // 10us // lapic.set_initial_count(0x40); // 1us diff --git a/kernel/src/lib.rs b/kernel/src/lib.rs index 48aba55..7e9c83c 100644 --- a/kernel/src/lib.rs +++ b/kernel/src/lib.rs @@ -9,6 +9,7 @@ #![feature(negative_impls)] #![feature(never_type)] #![feature(ptr_metadata)] +#![feature(atomic_ptr_null)] #![feature(custom_test_frameworks)] #![test_runner(testing::harness)] #![reexport_test_harness_main = "test_main"] @@ -22,20 +23,18 @@ use common::hypercall; pub use macros::core_local; pub mod allocator; -pub mod aprofile; pub mod cpu; pub mod debugcon; pub mod host; pub mod io; pub mod iprofile; +pub mod kthread; pub mod kvmclock; pub mod page; pub mod paging; pub mod prelude; -pub mod rt; pub mod tsc; pub mod types; -pub mod virtio; pub 
mod vm; mod gdt; diff --git a/kernel/src/rsstart.rs b/kernel/src/rsstart.rs index 55e49f8..d8ff4de 100644 --- a/kernel/src/rsstart.rs +++ b/kernel/src/rsstart.rs @@ -1,7 +1,7 @@ use core::{ arch::asm, ptr::{addr_of, addr_of_mut, NonNull}, - sync::atomic::{AtomicBool, Ordering}, + sync::atomic::Ordering, }; use alloc::boxed::Box; @@ -18,7 +18,7 @@ use crate::{ vm, }; -use common::{buddy::BuddyAllocatorRawData, vhost::VSockMetadata, BuddyAllocator}; +use common::{buddy::BuddyAllocatorRawData, BuddyAllocator}; extern "C" { fn kmain(argc: usize, argv: *const usize); @@ -57,8 +57,6 @@ pub(crate) static KERNEL_MAPPINGS: LazyLock = LazyLock::new(Executor::new); - -pub static SHUTDOWN: AtomicBool = AtomicBool::new(false); - -pub struct Executor { - todo_rx: Receiver>, - todo_tx: Sender>, - sleeping: Trie<2, Waker>, - wfi_rx: Receiver, - wfi_tx: Sender, - active: AtomicUsize, -} - -type BoxFuture = Pin + Send>>; - -#[core_local] -static LAST_TIME: SpinLock = SpinLock::new(0); - -static TIME_SLEEPING: AtomicUsize = AtomicUsize::new(0); -static TIME_WORKING: AtomicUsize = AtomicUsize::new(0); -static TIME_SCHEDULING: AtomicUsize = AtomicUsize::new(0); - -enum Entry { - Nothing, - Finished(Option), - Waiting(Waker), -} - -impl Executor { - fn new() -> Executor { - let (todo_tx, todo_rx) = channel::bounded(4096); - let (wfi_tx, wfi_rx) = channel::bounded(4096); - Executor { - todo_rx, - todo_tx, - sleeping: Trie::default(), - wfi_rx, - wfi_tx, - active: AtomicUsize::new(0), - } - } - - #[inline(never)] - async fn resolve(future: F, entry: Arc>>) - where - F: Future + Send, - T: Send, - { - let result = future.await; - let mut entry = entry.write(); - let mut replacement = Entry::Finished(Some(result)); - core::mem::swap(&mut *entry, &mut replacement); - match replacement { - Entry::Nothing => {} - Entry::Finished(_) => unreachable!(), - Entry::Waiting(waker) => { - waker.wake(); - } - } - } - - pub fn spawn(&self, future: F) -> JoinHandle - where - F: Future + Send + 'static, - 
T: Send + 'static, - { - let entry = Arc::new(RwLock::new(Entry::Nothing)); - let future = Self::resolve(future, entry.clone()); - let task = Arc::new(Task { - future: SpinLock::new(Box::pin(future)), - todo: self.todo_tx.clone(), - }); - let handle = JoinHandle { entry }; - self.active.fetch_add(1, Ordering::AcqRel); - self.todo_tx.try_send(task).unwrap(); - handle - } - - pub fn spawn_blocking<'a, F, T>(&self, future: F) -> T - where - F: Future + Send + 'a, - T: Send + 'a, - { - let entry = Arc::new(RwLock::new(Entry::Nothing)); - let future = Self::resolve(future, entry.clone()); - unsafe { - let pin: Pin + Send + 'a>> = Box::pin(future); - let pin: BoxFuture = core::mem::transmute(pin); - let task = Arc::new(Task { - future: SpinLock::new(pin), - todo: self.todo_tx.clone(), - }); - let handle = JoinHandle { entry }; - self.active.fetch_add(1, Ordering::AcqRel); - self.todo_tx.try_send(task).unwrap(); - handle.join() - } - } - - #[inline(never)] - fn wake_sleeping(&self) -> bool { - let Ok(Some(key)) = self.sleeping.try_first_key() else { - // No first key found. If we missed one because of a race it's not the end of the world anyway. - return false; - }; - let now = kvmclock::time_since_boot(); - let timestamp = now.as_nanos() as u64; - if key >= timestamp { - // The first task is still in the future. - return false; - } - let Some(value) = self.sleeping.remove(key) else { - // We saw a first key, but someone else grabbed it first. They can deal with this. 
- return false; - }; - value.wake(); - true - } - - fn diff(&self) -> usize { - let mut time = LAST_TIME.lock(); - let now = kvmclock::time_since_boot().as_nanos(); - let diff = now.wrapping_sub(*time); - *time = now; - diff as usize - } - - #[inline(never)] - fn run_pending(&self) -> bool { - let Ok(task) = self.todo_rx.try_recv() else { - return false; - }; - TIME_SCHEDULING.fetch_add(self.diff(), Ordering::SeqCst); - let result = task.poll(); - TIME_WORKING.fetch_add(self.diff(), Ordering::SeqCst); - if result.is_ready() { - self.active.fetch_sub(1, Ordering::AcqRel); - } - true - } - - #[inline(never)] - fn sleep(&self) { - if !INTERRUPTED.load(Ordering::Relaxed) { - TIME_SCHEDULING.fetch_add(self.diff(), Ordering::SeqCst); - unsafe { - io::outl(0xf4, 0); - core::arch::asm!("hlt"); - } - TIME_SLEEPING.fetch_add(self.diff(), Ordering::SeqCst); - } - } - - #[inline(never)] - fn wake_interrupted(&self) -> bool { - let mut anything = false; - if INTERRUPTED - .compare_exchange(true, false, Ordering::Relaxed, Ordering::Relaxed) - .is_ok() - { - while let Ok(waker) = self.wfi_rx.try_recv() { - anything = true; - waker.wake(); - } - } - anything - } - - pub fn tick(&self) { - crate::interrupts::must_be_enabled(); - let mut anything = false; - anything |= self.wake_interrupted(); - anything |= self.wake_sleeping(); - anything |= self.run_pending(); - if !anything { - self.sleep(); - } - } - - pub fn run(&self) { - self.diff(); - while self.active.load(Ordering::Acquire) != 0 { - self.tick(); - } - } -} - -pub struct JoinHandle { - entry: Arc>>, -} - -impl JoinHandle { - fn join(self) -> T { - loop { - if let Some(mut entry) = self.entry.try_write() { - if let Entry::Finished(value) = &mut *entry { - return value.take().unwrap(); - } - } - EXECUTOR.tick(); - } - } -} - -impl Future for JoinHandle { - type Output = T; - - fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { - let mut result = self.entry.write(); - match &mut *result { - Entry::Nothing | 
Entry::Waiting(_) => { - *result = Entry::Waiting(cx.waker().clone()); - Poll::Pending - } - Entry::Finished(value) => Poll::Ready(value.take().unwrap()), - } - } -} - -pub fn spawn(future: F) -> JoinHandle -where - F: Future + Send + 'static, - T: Send + 'static, -{ - EXECUTOR.spawn(future) -} - -pub fn spawn_blocking(future: F) -> T -where - F: Future + Send, - T: Send, -{ - EXECUTOR.spawn_blocking(future) -} - -pub fn run() { - EXECUTOR.run(); -} - -struct Task { - future: SpinLock, - todo: Sender>, -} - -impl Task { - fn poll(self: Arc) -> Poll<()> { - let waker = self.clone().into(); - let mut cx = Context::from_waker(&waker); - let mut task_future = self.future.lock(); - task_future.as_mut().poll(&mut cx) - } -} - -impl Wake for Task { - fn wake(self: Arc) { - let todo = self.todo.clone(); - todo.try_send(self).unwrap(); - } -} - -struct Yield(AtomicBool); - -impl Future for Yield { - type Output = (); - - fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { - let done = self.0.load(Ordering::Acquire); - match done { - true => Poll::Ready(()), - false => { - self.0.store(true, Ordering::Release); - cx.waker().wake_by_ref(); - Poll::Pending - } - } - } -} - -pub fn yield_now() -> impl Future { - Yield(AtomicBool::new(false)) -} - -struct Delay(Duration); - -impl Future for Delay { - type Output = (); - - fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { - let now = kvmclock::time_since_boot(); - if now >= self.0 { - return Poll::Ready(()); - } - let waker = cx.waker().clone(); - EXECUTOR.sleeping.insert(self.0.as_nanos() as u64, waker); - Poll::Pending - } -} - -pub fn delay_for(duration: Duration) -> impl Future { - delay_until(kvmclock::time_since_boot() + duration) -} - -pub fn delay_until(time: Duration) -> impl Future { - Delay(time) -} - -pub fn reset_stats() { - TIME_SLEEPING.store(0, Ordering::SeqCst); - TIME_WORKING.store(0, Ordering::SeqCst); - TIME_SCHEDULING.store(0, Ordering::SeqCst); -} - -pub fn profile() { - let scheduling 
= TIME_SCHEDULING.load(Ordering::SeqCst) as f64; - let working = TIME_WORKING.load(Ordering::SeqCst) as f64; - let sleeping = TIME_SLEEPING.load(Ordering::SeqCst) as f64; - let total = scheduling + working + sleeping; - use core::fmt::Write as _; - let mut console = CONSOLE.lock(); - writeln!(console, "***** RUNTIME *****").unwrap(); - writeln!( - console, - "time spent sleeping: {sleeping:12} ({:3.2}%)", - sleeping * 100. / total - ) - .unwrap(); - writeln!( - console, - "time spent working: {working:12} ({:3.2}%)", - working * 100. / total - ) - .unwrap(); - writeln!( - console, - "time spent scheduling: {scheduling:12} ({:3.2}%)", - scheduling * 100. / total - ) - .unwrap(); - writeln!(console, "*******************").unwrap(); -} - -struct WaitForInterrupt { - done: AtomicBool, - wfi: Sender, -} - -impl Future for WaitForInterrupt { - type Output = (); - - fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { - let done = self.done.load(Ordering::Acquire); - match done { - true => Poll::Ready(()), - false => { - self.done.store(true, Ordering::Release); - self.wfi.try_send(cx.waker().clone()).unwrap(); - Poll::Pending - } - } - } -} - -pub fn wfi() -> impl Future { - WaitForInterrupt { - done: AtomicBool::new(false), - wfi: EXECUTOR.wfi_tx.clone(), - } -} diff --git a/kernel/src/testing.rs b/kernel/src/testing.rs index cf133fd..49922af 100644 --- a/kernel/src/testing.rs +++ b/kernel/src/testing.rs @@ -8,6 +8,6 @@ pub(crate) fn harness(tests: &[&dyn Fn()]) { } #[kmain] -async fn main(_: &[usize]) { +fn main(_: &[usize]) { super::test_main(); } diff --git a/kernel/src/virtio.rs b/kernel/src/virtio.rs deleted file mode 100644 index cdf3be4..0000000 --- a/kernel/src/virtio.rs +++ /dev/null @@ -1,2 +0,0 @@ -pub mod virtqueue; -pub mod vsock; diff --git a/kernel/src/virtio/virtqueue.rs b/kernel/src/virtio/virtqueue.rs deleted file mode 100644 index 9d0e004..0000000 --- a/kernel/src/virtio/virtqueue.rs +++ /dev/null @@ -1,273 +0,0 @@ -use core::{ - 
future::Future, - task::{Poll, Waker}, -}; - -use crate::{io, prelude::*}; -use common::util::rwlock::RwLock; - -mod desc; -mod idx; -mod vring; - -use common::vhost::VirtQueueMetadata; -use desc::*; -use idx::*; -use vring::*; - -#[repr(C, packed)] -#[derive(Debug, Copy, Clone)] -pub struct UsedElement { - id: u32, - len: u32, -} - -#[derive(Default)] -pub struct NotificationChannel { - waker: Option, - data: Option, -} - -pub struct Notification { - channel: Arc>, -} - -impl Future for Notification { - type Output = usize; - - fn poll( - self: core::pin::Pin<&mut Self>, - cx: &mut core::task::Context<'_>, - ) -> Poll { - let mut channel = self.channel.lock(); - if let Some(value) = channel.data.take() { - Poll::Ready(value) - } else { - channel.waker = Some(cx.waker().clone()); - Poll::Pending - } - } -} - -pub struct VirtQueue { - _name: &'static str, - notifications: Box<[Arc>]>, - desc: RwLock, - used: RwLock>, - avail: RwLock>, -} - -impl VirtQueue { - /// # Safety - /// - /// `info` must describe a valid VirtQueue with attached device, where the negotiated features - /// match those available in this driver. The VirtQueue must not be attached to any other - /// driver. 
- pub unsafe fn new(name: &'static str, info: VirtQueueMetadata) -> Self { - let desc = core::ptr::from_raw_parts_mut( - vm::pa2ka::<()>(BuddyAllocator.from_offset::<()>(info.desc) as usize), - info.descriptors, - ); - let used = core::ptr::from_raw_parts_mut( - vm::pa2ka::<()>(BuddyAllocator.from_offset::<()>(info.used) as usize), - info.descriptors, - ); - let avail = core::ptr::from_raw_parts_mut( - vm::pa2ka::<()>(BuddyAllocator.from_offset::<()>(info.avail) as usize), - info.descriptors, - ); - let mut notifications = vec![]; - notifications.resize_with(info.descriptors, Default::default); - VirtQueue { - _name: name, - notifications: notifications.into(), - desc: RwLock::new(DescTable::new(name, desc)), - used: RwLock::new(DeviceRing::new(name, used)), - avail: RwLock::new(DriverRing::new(name, avail)), - } - } -} - -#[derive(Debug)] -pub struct BufferChain<'a> { - car: Buffer<'a>, - cdr: Option<&'a BufferChain<'a>>, -} - -impl<'a> BufferChain<'a> { - pub fn new(x: &'a [u8]) -> Self { - BufferChain { - car: Buffer::Immutable(x), - cdr: None, - } - } - - pub fn new_mut(x: &'a mut [u8]) -> Self { - BufferChain { - car: Buffer::Mutable(x), - cdr: None, - } - } - - pub fn cons<'b>(x: &'a [u8], other: Option<&'b BufferChain<'b>>) -> Self - where - 'b: 'a, - { - BufferChain { - car: Buffer::Immutable(x), - cdr: other, - } - } - - pub fn cons_mut<'b>(x: &'a mut [u8], other: Option<&'b BufferChain<'b>>) -> Self - where - 'b: 'a, - { - BufferChain { - car: Buffer::Mutable(x), - cdr: other, - } - } - - pub fn len(&self) -> usize { - 1 + match self.cdr { - Some(x) => x.len(), - None => 0, - } - } - - #[must_use] - pub fn is_empty(&self) -> bool { - self.len() == 0 - } - - pub fn size(&self) -> usize { - self.car.len() - + match self.cdr { - Some(x) => x.size(), - None => 0, - } - } -} - -#[derive(Debug)] -pub enum Buffer<'a> { - Immutable(&'a [u8]), - Mutable(&'a mut [u8]), -} - -impl Buffer<'_> { - pub fn len(&self) -> usize { - match self { - Buffer::Immutable(items) 
=> items.len(), - Buffer::Mutable(items) => items.len(), - } - } - - #[must_use] - pub fn is_empty(&self) -> bool { - self.len() == 0 - } -} - -impl VirtQueue { - async fn load(&self, bufs: &BufferChain<'_>) -> DescriptorIndex { - unsafe { - let mut buf = Box::new_uninit_slice(bufs.len()); - let descs = loop { - let descs = { - let mut desc = self.desc.lock(); - desc.try_allocate_many(&mut buf) - }; - if let Some(descs) = descs { - break descs; - } - // TODO: handle descriptor unavailability better - log::warn!("out of descriptors"); - crate::rt::wfi().await; - }; - let mut head = None; - let mut previous = None; - let mut current = Some(bufs); - let mut i = 0; - while let Some(x) = current { - let idx = descs[i]; - let desc = self.desc.read(); - desc.get_mut_unchecked(idx).modify(|d| { - let (p, w) = match &x.car { - Buffer::Immutable(x) => (*x as *const [u8], false), - Buffer::Mutable(x) => (*x as *const [u8], true), - }; - let (addr, len) = p.to_raw_parts(); - d.addr = addr as *mut (); - d.len = len; - d.next = None; - d.device_writeable = w; - }); - if let Some(previous) = previous { - desc.get_mut_unchecked(previous).modify(|d| { - d.next = Some(idx); - }); - } - previous = Some(idx); - - if head.is_none() { - head = Some(idx); - } - current = x.cdr; - i += 1; - } - head.unwrap() - } - } - - async unsafe fn notification(&self, head: DescriptorIndex) -> usize { - Notification { - channel: self.notifications[head.get() as usize].clone(), - } - .await - } - - async unsafe fn mark_avail(&self, head: DescriptorIndex) { - let mut avail = self.avail.lock(); - avail.send(head); - } - - async unsafe fn mark_free(&self, head: DescriptorIndex) { - let mut desc = self.desc.lock(); - desc.liberate(head); - } - - async unsafe fn send_interrupt(&self) { - if !self.used.read().avail_notifications_suppressed() { - io::outl(0xf4, 0); - } - } - - pub async fn send(&self, bufs: &BufferChain<'_>) -> usize { - unsafe { - let head = self.load(bufs).await; - 
self.mark_avail(head).await; - self.send_interrupt().await; - let result = self.notification(head).await; - self.mark_free(head).await; - result - } - } - - pub fn poll(&self) { - let Some(mut used) = self.used.try_lock() else { - return; - }; - unsafe { - while let Some(used) = used.recv() { - let mut notification = self.notifications[used.id as usize].lock(); - notification.data = Some(used.len as usize); - if let Some(waker) = notification.waker.take() { - core::mem::drop(notification); - waker.wake(); - } - } - } - } -} diff --git a/kernel/src/virtio/virtqueue/desc.rs b/kernel/src/virtio/virtqueue/desc.rs deleted file mode 100644 index d27cb76..0000000 --- a/kernel/src/virtio/virtqueue/desc.rs +++ /dev/null @@ -1,155 +0,0 @@ -use core::{ - mem::MaybeUninit, - sync::atomic::{AtomicUsize, Ordering}, -}; - -use super::*; - -#[derive(Debug)] -pub struct DescTable { - name: &'static str, - free_index: Option, - free_descs: AtomicUsize, - table: *mut [Desc], -} - -unsafe impl Send for DescTable {} - -impl DescTable { - pub unsafe fn new(name: &'static str, p: *mut [Desc]) -> Self { - let mut x = Self { - name, - free_index: None, - free_descs: AtomicUsize::new(0), - table: p, - }; - for i in 0..p.len() { - x.liberate(i.into()); - } - x - } - - pub fn get_mut(&mut self, index: DescriptorIndex) -> &mut Desc { - unsafe { &mut (*self.table)[usize::from(index)] } - } - - #[allow(clippy::mut_from_ref)] - pub unsafe fn get_mut_unchecked(&self, index: DescriptorIndex) -> &mut Desc { - unsafe { &mut (*self.table)[usize::from(index)] } - } - - pub fn try_allocate(&mut self) -> Option { - let idx = self.free_index?; - // safety: since this was on the free list, the device isn't using it - self.free_index = self.get_mut(idx).modify(|desc| { - let next = desc.next; - desc.next = None; - next - }); - log::debug!("{} allocated {idx:?}", self.name); - self.free_descs.fetch_sub(1, Ordering::SeqCst); - Some(idx) - } - - pub fn try_allocate_many<'buf>( - &mut self, - buf: &'buf mut 
[MaybeUninit], - ) -> Option<&'buf mut [DescriptorIndex]> { - if buf.is_empty() { - return Some(unsafe { buf.assume_init_mut() }); - } - if self.free_descs.load(Ordering::SeqCst) < buf.len() { - return None; - } - let idx = self.try_allocate()?; - if self.try_allocate_many(&mut buf[1..]).is_none() { - self.liberate(idx); - None - } else { - let buf = unsafe { buf.assume_init_mut() }; - buf[0] = idx; - Some(buf) - } - } - - pub fn liberate(&mut self, idx: DescriptorIndex) { - let head = self.free_index; - log::debug!("{} liberated {idx:?}", self.name); - let current = self.get_mut(idx); - let mut tbd = None; - current.modify(|current| { - current.addr = core::ptr::null_mut(); - current.len = 0; - tbd = current.next; - current.next = head; - }); - self.free_index = Some(idx); - self.free_descs.fetch_add(1, Ordering::SeqCst); - if let Some(tbd) = tbd { - self.liberate(tbd); - } - } -} - -mod inner { - use super::*; - - #[repr(C)] - #[derive(Debug, Copy, Clone)] - pub struct Desc { - addr: u64, - len: u32, - flags: u16, - next: u16, - } - - pub struct DescData { - pub addr: *mut (), - pub len: usize, - pub device_writeable: bool, - pub next: Option, - } - - impl Desc { - pub fn get(&self) -> DescData { - let Desc { - addr, - len, - flags, - next, - } = unsafe { (&raw const *self).read_volatile() }; - DescData { - addr: vm::pa2ka(addr as usize), - len: len as usize, - device_writeable: flags & 2 == 2, - next: if flags & 1 == 1 { - Some(next.into()) - } else { - None - }, - } - } - - pub fn set(&mut self, value: DescData) { - let desc = Desc { - addr: vm::ka2pa(value.addr) as u64, - len: value.len as u32, - flags: if value.device_writeable { 2 } else { 0 } - | if value.next.is_some() { 1 } else { 0 }, - next: value.next.map(|x| x.get()).unwrap_or(0), - }; - unsafe { (&raw mut *self).write_volatile(desc) } - } - - pub fn modify(&mut self, f: impl FnOnce(&mut DescData) -> T) -> T { - let mut desc = self.get(); - let result = f(&mut desc); - self.set(desc); - result - } - } 
- - // TODO: is this necessary? - unsafe impl Sync for Desc {} -} -use inner::*; diff --git a/kernel/src/virtio/virtqueue/idx.rs b/kernel/src/virtio/virtqueue/idx.rs deleted file mode 100644 index afd6e39..0000000 --- a/kernel/src/virtio/virtqueue/idx.rs +++ /dev/null @@ -1,55 +0,0 @@ -#[derive(Copy, Clone, Debug, Hash, Eq, PartialEq)] -#[repr(transparent)] -pub struct DescriptorIndex(u16); - -impl DescriptorIndex { - pub unsafe fn from(&self, idx: u16) -> Self { - Self(idx) - } - - pub fn get(&self) -> u16 { - self.0 - } -} - -impl From for DescriptorIndex { - fn from(value: u16) -> Self { - DescriptorIndex(value) - } -} - -impl From for DescriptorIndex { - fn from(value: u32) -> Self { - DescriptorIndex(value as u16) - } -} - -impl From for DescriptorIndex { - fn from(value: usize) -> Self { - DescriptorIndex(value as u16) - } -} - -impl From for u16 { - fn from(value: DescriptorIndex) -> Self { - value.0 - } -} - -impl From for u32 { - fn from(value: DescriptorIndex) -> Self { - value.0 as u32 - } -} - -impl From for usize { - fn from(value: DescriptorIndex) -> Self { - value.0 as usize - } -} - -impl AsRef for DescriptorIndex { - fn as_ref(&self) -> &u16 { - &self.0 - } -} diff --git a/kernel/src/virtio/virtqueue/vring.rs b/kernel/src/virtio/virtqueue/vring.rs deleted file mode 100644 index 41a113d..0000000 --- a/kernel/src/virtio/virtqueue/vring.rs +++ /dev/null @@ -1,119 +0,0 @@ -#[repr(C)] -pub struct Ring { - flags: u16, - idx: u16, - ring: [T], -} - -impl Ring { - pub fn raw_index(&self) -> u16 { - unsafe { (&raw const self.idx).read_volatile() } - } - - pub fn len(&self) -> usize { - self.ring.len() - } - - pub fn get(&self, idx: u16) -> T { - unsafe { (&raw const self.ring[idx as usize % self.len()]).read_volatile() } - } - - pub fn set(&mut self, idx: u16, value: T) { - unsafe { - (&raw mut self.ring[idx as usize % self.len()]).write_volatile(value); - } - } - - pub fn inc(&mut self) { - unsafe { - (&raw mut 
self.idx).write_volatile(self.raw_index().wrapping_add(1)); - } - } - - pub fn flags(&self) -> u16 { - unsafe { (&raw const self.flags).read_volatile() } - } - - pub fn set_flags(&mut self, value: u16) { - unsafe { (&raw mut self.flags).write_volatile(value) } - } -} - -#[derive(Debug)] -pub struct DeviceRing { - name: &'static str, - next_read: u16, - ring: *mut Ring, -} - -impl DeviceRing { - pub unsafe fn new(name: &'static str, ring: *mut Ring) -> Self { - let mut this = Self { - name, - next_read: 0, - ring, - }; - this.ring_mut().set_flags(1); // we don't need notifications from Linux - this - } - - fn ring(&self) -> &Ring { - unsafe { &*self.ring } - } - - fn ring_mut(&mut self) -> &mut Ring { - unsafe { &mut *self.ring } - } - - pub unsafe fn recv(&mut self) -> Option { - if self.next_read != self.ring().raw_index() { - log::debug!("{} is receiving", self.name); - let value = self.ring().get(self.next_read); - self.next_read = self.next_read.wrapping_add(1); - Some(value) - } else { - None - } - } - - pub fn avail_notifications_suppressed(&self) -> bool { - self.ring().flags() != 0 - } -} - -#[derive(Debug)] -pub struct DriverRing { - name: &'static str, - ring: *mut Ring, -} - -impl DriverRing { - pub unsafe fn new(name: &'static str, ring: *mut Ring) -> Self { - Self { name, ring } - } - - fn ring(&self) -> &Ring { - unsafe { &*self.ring } - } - - fn ring_mut(&mut self) -> &mut Ring { - unsafe { &mut *self.ring } - } - - pub unsafe fn send(&mut self, value: T) { - log::debug!("{} is sending", self.name); - let idx = self.ring().raw_index(); - self.ring_mut().set(idx, value); - self.ring_mut().inc(); - } - - #[allow(unused)] - pub fn suppress_used_notifications(&mut self, suppressed: bool) { - self.ring_mut().set_flags(if suppressed { 1 } else { 0 }); - } -} - -unsafe impl Send for DeviceRing {} -unsafe impl Send for DriverRing {} -unsafe impl Sync for DeviceRing {} -unsafe impl Sync for DriverRing {} diff --git a/kernel/src/virtio/vsock.rs 
b/kernel/src/virtio/vsock.rs deleted file mode 100644 index 984ab04..0000000 --- a/kernel/src/virtio/vsock.rs +++ /dev/null @@ -1,87 +0,0 @@ -use crate::prelude::*; - -pub mod addr; -pub(crate) mod driver; -pub mod flow; -pub mod header; -pub mod listener; -pub mod message; -pub mod stream; - -pub use addr::*; -use common::util::channel::{RecvError, SendError}; -pub use driver::*; -pub use flow::*; -pub use header::*; -pub use listener::*; -pub use message::*; -pub use stream::*; - -use async_lock::RwLock; - -#[derive(Debug)] -pub enum SocketError { - InvalidAddress, - AddressInUse(SocketAddr), - ConnectionReset, - ConnectionClosed, - ConnectionFailed, -} - -pub type Result = core::result::Result; - -impl From> for SocketError { - fn from(_: SendError) -> Self { - SocketError::ConnectionClosed - } -} - -impl From for SocketError { - fn from(_: RecvError) -> Self { - SocketError::ConnectionClosed - } -} - -pub(crate) static DRIVER: OnceLock> = OnceLock::new(); - -pub(crate) async fn listen(addr: SocketAddr) -> Arc> { - let driver = &DRIVER; - driver.listen(addr).await -} - -pub(crate) async fn accept(flow: Flow) -> Arc> { - let driver = &DRIVER; - driver.accept(flow).await -} - -pub(crate) async fn connect(flow: Flow) -> Arc> { - let driver = &DRIVER; - driver.connect(flow).await -} - -pub(crate) async fn send(flow: Flow, buf: &[u8]) -> usize { - let driver = &DRIVER; - driver.send(flow, buf).await -} - -pub(crate) async fn shutdown(flow: Flow, rx: bool, tx: bool) { - let driver = &DRIVER; - driver.shutdown(flow, rx, tx).await -} - -pub(crate) async fn rst(flow: Flow) { - let driver = &DRIVER; - driver.rst(flow).await -} - -#[allow(unused)] -pub(crate) async fn listeners() -> Vec { - let driver = &DRIVER; - driver.listeners().await -} - -#[allow(unused)] -pub(crate) async fn streams() -> Vec { - let driver = &DRIVER; - driver.streams().await -} diff --git a/kernel/src/virtio/vsock/addr.rs b/kernel/src/virtio/vsock/addr.rs deleted file mode 100644 index 
c6c5629..0000000 --- a/kernel/src/virtio/vsock/addr.rs +++ /dev/null @@ -1,34 +0,0 @@ -use core::str::FromStr; - -#[derive(Debug, Copy, Clone, Eq, PartialEq, Hash, PartialOrd, Ord)] -pub struct SocketAddr { - pub cid: u64, - pub port: u32, -} - -impl SocketAddr { - pub fn new(cid: u64, port: u32) -> SocketAddr { - Self { cid, port } - } -} - -pub struct InvalidSockAddr; - -impl FromStr for SocketAddr { - type Err = InvalidSockAddr; - - fn from_str(s: &str) -> Result { - let (cid, port) = s.split_once(":").ok_or(InvalidSockAddr)?; - let cid: u64 = str::parse(cid).map_err(|_| InvalidSockAddr)?; - let port: u32 = str::parse(port).map_err(|_| InvalidSockAddr)?; - Ok(SocketAddr { cid, port }) - } -} - -impl TryFrom<&str> for SocketAddr { - type Error = InvalidSockAddr; - - fn try_from(value: &str) -> Result { - SocketAddr::from_str(value) - } -} diff --git a/kernel/src/virtio/vsock/driver.rs b/kernel/src/virtio/vsock/driver.rs deleted file mode 100644 index 06b7c64..0000000 --- a/kernel/src/virtio/vsock/driver.rs +++ /dev/null @@ -1,421 +0,0 @@ -use core::sync::atomic::{AtomicUsize, Ordering}; - -use alloc::collections::btree_map::BTreeMap; -use alloc::collections::vec_deque::VecDeque; -use alloc::sync::Arc; -use common::vhost::VSockMetadata; - -use super::*; -use crate::rt; -use crate::virtio::virtqueue::*; -use async_lock::RwLock; - -pub(crate) struct Driver { - rx: VirtQueue, - tx: VirtQueue, - status: Status, - listeners: RwLock>>>, - streams: RwLock>>>, -} - -#[derive(Debug, Default)] -struct Status { - peer_buf_alloc: AtomicUsize, - peer_fwd_cnt: AtomicUsize, - tx_cnt: AtomicUsize, - buf_alloc: AtomicUsize, - fwd_cnt: AtomicUsize, -} - -impl Driver { - /// # Safety - /// - /// `info` must describe a valid VSock, which has an attached device but no driver. Feature - /// negotiation must have completed and match the features available in this driver. 
- pub unsafe fn new(info: VSockMetadata) -> Arc { - let len = info.rx.descriptors; - let rx = VirtQueue::new("rx", info.rx); - let tx = VirtQueue::new("tx", info.tx); - let this = Arc::new(Self { - rx, - tx, - status: Status::default(), - listeners: Default::default(), - streams: Default::default(), - }); - for _ in 0..len / 2 { - crate::rt::spawn(this.clone().recv_task()); - } - crate::rt::spawn(this.clone().poll_task()); - - this - } - - #[rt::profile] - pub async fn listen(&self, addr: SocketAddr) -> Arc> { - let mut addr = addr; - - let mut listeners = self.listeners.write().await; - - if addr.port == 0 { - let mut found = false; - for port in 49152..=65535 { - addr.port = port; - if !listeners.contains_key(&addr) { - found = true; - break; - } - } - if !found { - panic!("out of ports"); - } - } - - let socket = Arc::new(RwLock::new(ListenSocket { - addr, - waker: None, - queue: VecDeque::new(), - })); - let weak = Arc::downgrade(&socket); - if let Some(x) = listeners.get(&addr) { - if x.strong_count() >= 1 { - panic!("addr {addr:?} in use!"); - } - } - listeners.insert(addr, weak); - core::mem::drop(listeners); - socket - } - - #[rt::profile] - pub async fn accept(&self, flow: Flow) -> Arc> { - let mut streams = self.streams.write().await; - assert!(!streams.contains_key(&flow.reverse())); - let socket = Arc::new(RwLock::new(StreamSocket { - outbound: flow, - waker: None, - queue: VecDeque::new(), - })); - streams.insert(flow.reverse(), Arc::downgrade(&socket)); - core::mem::drop(streams); - self.response(flow).await; - socket - } - - #[rt::profile] - pub async fn connect(&self, flow: Flow) -> Arc> { - let mut flow = flow; - let mut streams = self.streams.write().await; - - if flow.src.port == 0 { - let mut found = false; - for port in 49152..=65535 { - flow.src.port = port; - if !streams.contains_key(&flow.reverse()) { - found = true; - break; - } - } - if !found { - panic!("out of ports"); - } - } - - let socket = Arc::new(RwLock::new(StreamSocket { - 
outbound: flow, - waker: None, - queue: VecDeque::new(), - })); - let weak = Arc::downgrade(&socket); - if let Some(x) = streams.get(&flow.reverse()) { - if x.strong_count() >= 1 { - panic!("flow {flow:?} in use!"); - } - } - streams.insert(flow.reverse(), weak); - core::mem::drop(streams); - self.request(flow).await; - socket - } - - #[rt::profile] - async fn send_message(&self, msg: OutgoingMessage, buffers: Option<&BufferChain<'_>>) -> usize { - unsafe { - log::debug!("-> {msg:?}"); - let mut header: Header = msg.into(); - let len = buffers.map(|x| x.size()).unwrap_or(0); - let mut waiting = false; - loop { - let status = &self.status; - let peer_buf_alloc = status.peer_buf_alloc.load(Ordering::SeqCst); - let peer_fwd_count = status.peer_fwd_cnt.load(Ordering::SeqCst); - let tx_cnt = status.tx_cnt.load(Ordering::SeqCst); - - let tx_free = peer_buf_alloc.wrapping_sub(tx_cnt.wrapping_sub(peer_fwd_count)); - if tx_free < len { - waiting = true; - log::info!("waiting for rx capacity {tx_free} {len}"); - crate::rt::wfi().await; - continue; - } - - if waiting { - log::warn!("rx okay"); - } - - let buf_alloc = status.buf_alloc.load(Ordering::SeqCst); - let fwd_count = status.fwd_cnt.load(Ordering::SeqCst); - - header.buf_alloc = buf_alloc as u32; - header.fwd_cnt = fwd_count as u32; - - status.tx_cnt.fetch_sub(len, Ordering::SeqCst); - - let header_buf: &mut [u8; 44] = core::mem::transmute(&mut header); - let buffers = BufferChain::cons(header_buf, buffers); - return self.tx.send(&buffers).await; - } - } - } - - #[rt::profile] - pub async fn rst(&self, flow: Flow) { - self.send_message( - OutgoingMessage { - flow, - message: Outgoing::Rst, - }, - None, - ) - .await; - } - - #[rt::profile] - pub async fn response(&self, flow: Flow) { - self.send_message( - OutgoingMessage { - flow, - message: Outgoing::Response, - }, - None, - ) - .await; - } - - #[rt::profile] - pub async fn request(&self, flow: Flow) { - self.send_message( - OutgoingMessage { - flow, - message: 
Outgoing::Request, - }, - None, - ) - .await; - } - - #[rt::profile] - pub async fn send(&self, flow: Flow, buf: &[u8]) -> usize { - let chain = BufferChain::new(buf); - self.send_message( - OutgoingMessage { - flow, - message: Outgoing::Write(buf.len()), - }, - Some(&chain), - ) - .await - } - - #[rt::profile] - pub async fn shutdown(&self, flow: Flow, rx: bool, tx: bool) { - self.send_message( - OutgoingMessage { - flow, - message: Outgoing::Shutdown { rx, tx }, - }, - None, - ) - .await; - } - - pub async fn update(&self, flow: Flow) { - self.send_message( - OutgoingMessage { - flow, - message: Outgoing::CreditUpdate, - }, - None, - ) - .await; - } - - #[rt::profile] - async fn recv_task(self: Arc) { - unsafe { - loop { - let mut payload_buf = vec![0; 65536]; - payload_buf.resize(65536, 0); - let mut header = Header::default(); - let status = &self.status; - let len = payload_buf.len(); - status.buf_alloc.fetch_add(len, Ordering::SeqCst); - - let header_buf: &mut [u8; 44] = core::mem::transmute(&mut header); - let payload_chain = BufferChain::new_mut(&mut payload_buf); - let chain = BufferChain::cons_mut(header_buf, Some(&payload_chain)); - - let read = self.rx.send(&chain).await; - - status.buf_alloc.fetch_sub(len, Ordering::SeqCst); - status.fwd_cnt.fetch_add(read - 44, Ordering::SeqCst); - status - .peer_buf_alloc - .store(header.buf_alloc as usize, Ordering::SeqCst); - status - .peer_fwd_cnt - .store(header.fwd_cnt as usize, Ordering::SeqCst); - - assert!(len >= 44); - let incoming = header.into(); - log::debug!("<- {incoming:?}"); - let IncomingMessage { flow, message } = incoming; - - match message { - Incoming::Invalid(_) => self.rst(flow).await, - Incoming::Request => { - let listeners = self.listeners.read().await; - if let Some(socket) = listeners.get(&flow.dst).and_then(Weak::upgrade) { - core::mem::drop(listeners); - let mut socket = socket.write().await; - socket.queue.push_back(flow.reverse()); - if let Some(waker) = socket.waker.take() { - 
core::mem::drop(socket); - waker.wake(); - } - } else { - core::mem::drop(listeners); - log::warn!("got incoming request {flow:?}, but no listener"); - log::warn!("current listeners: {:?}", self.listeners().await); - self.rst(flow.reverse()).await - }; - } - Incoming::Response => { - let streams = self.streams.read().await; - if let Some(socket) = streams.get(&flow).and_then(Weak::upgrade) { - core::mem::drop(streams); - let mut socket = socket.write().await; - socket.queue.push_back(StreamEvent::Connect); - if let Some(waker) = socket.waker.take() { - core::mem::drop(socket); - waker.wake(); - } - } else { - log::warn!("got response {flow:?}, but no stream"); - self.rst(flow.reverse()).await - }; - } - Incoming::Rst => { - let streams = self.streams.read().await; - if let Some(socket) = streams.get(&flow).and_then(Weak::upgrade) { - core::mem::drop(streams); - let mut socket = socket.write().await; - socket.queue.push_back(StreamEvent::Reset); - if let Some(waker) = socket.waker.take() { - core::mem::drop(socket); - waker.wake(); - } - } - } - Incoming::Shutdown { rx, tx } => { - let streams = self.streams.read().await; - if let Some(socket) = streams.get(&flow).and_then(Weak::upgrade) { - core::mem::drop(streams); - let mut socket = socket.write().await; - socket.queue.push_back(StreamEvent::Shutdown { rx, tx }); - if let Some(waker) = socket.waker.take() { - core::mem::drop(socket); - waker.wake(); - } - } else { - // log::warn!("got shutdown for {flow:?}, but no stream"); - }; - } - Incoming::Read(len) => { - let streams = self.streams.read().await; - payload_buf.truncate(len); - if let Some(socket) = streams.get(&flow).and_then(Weak::upgrade) { - core::mem::drop(streams); - let mut socket = socket.write().await; - socket - .queue - .push_back(StreamEvent::Data { data: payload_buf }); - if let Some(waker) = socket.waker.take() { - core::mem::drop(socket); - waker.wake(); - } - } else { - log::warn!("got data for {flow:?}, but no stream"); - 
self.rst(flow.reverse()).await - }; - } - Incoming::CreditUpdate => {} - Incoming::CreditRequest => self.update(flow.reverse()).await, - } - } - } - } - - async fn poll_task(self: Arc) { - loop { - self.poll(); - crate::rt::wfi().await; - } - } - - pub fn poll(&self) { - self.rx.poll(); - self.tx.poll(); - } - - pub async fn listeners(&self) -> Vec { - self.listeners - .read() - .await - .iter() - .filter_map(|(k, v)| { - if v.strong_count() >= 1 { - Some(*k) - } else { - None - } - }) - .collect() - } - - pub async fn streams(&self) -> Vec { - self.streams - .read() - .await - .iter() - .filter_map(|(k, v)| { - if v.strong_count() >= 1 { - Some(*k) - } else { - None - } - }) - .collect() - } -} - -#[derive(Debug)] -pub enum StreamEvent { - Reset, - Connect, - Shutdown { rx: bool, tx: bool }, - Data { data: Vec }, -} - -// TODO: why do we need this? -unsafe impl Sync for Driver {} diff --git a/kernel/src/virtio/vsock/flow.rs b/kernel/src/virtio/vsock/flow.rs deleted file mode 100644 index 9d622f7..0000000 --- a/kernel/src/virtio/vsock/flow.rs +++ /dev/null @@ -1,16 +0,0 @@ -use super::*; - -#[derive(Debug, Copy, Clone, Eq, PartialEq, Hash, PartialOrd, Ord)] -pub struct Flow { - pub src: SocketAddr, - pub dst: SocketAddr, -} - -impl Flow { - pub fn reverse(&self) -> Flow { - Flow { - dst: self.src, - src: self.dst, - } - } -} diff --git a/kernel/src/virtio/vsock/header.rs b/kernel/src/virtio/vsock/header.rs deleted file mode 100644 index e850612..0000000 --- a/kernel/src/virtio/vsock/header.rs +++ /dev/null @@ -1,18 +0,0 @@ -#[derive(Debug, Copy, Clone, Default)] -#[repr(C, packed)] -pub struct Header { - pub src_cid: u64, - pub dst_cid: u64, - pub src_port: u32, - pub dst_port: u32, - pub len: u32, - pub ptype: u16, - pub op: u16, - pub flags: u32, - pub buf_alloc: u32, - pub fwd_cnt: u32, -} - -const _: () = const { - assert!(core::mem::size_of::
() == 44); -}; diff --git a/kernel/src/virtio/vsock/listener.rs b/kernel/src/virtio/vsock/listener.rs deleted file mode 100644 index f4b106b..0000000 --- a/kernel/src/virtio/vsock/listener.rs +++ /dev/null @@ -1,81 +0,0 @@ -use core::{ - future::Future, - pin::pin, - task::{Context, Poll, Waker}, -}; - -use alloc::collections::vec_deque::VecDeque; - -use crate::rt; - -use super::*; - -pub(crate) struct ListenSocket { - pub addr: SocketAddr, - pub waker: Option, - pub queue: VecDeque, -} - -impl Drop for ListenSocket { - fn drop(&mut self) { - panic!("who dropped a ListenSocket?"); - } -} - -pub struct StreamListener { - socket: Arc>, - pub addr: SocketAddr, -} - -impl Drop for StreamListener { - fn drop(&mut self) { - panic!("who dropped a StreamListener?"); - } -} - -struct Accept { - socket: Arc>, -} - -impl StreamListener { - #[rt::profile] - pub async fn bind(cid: u64, port: u32) -> Result { - let rx = listen(SocketAddr { cid, port }).await; - let addr = rx.read().await.addr; - Ok(StreamListener { socket: rx, addr }) - } - - #[rt::profile] - pub async fn accept(&self) -> Result { - let flow = Accept { - socket: self.socket.clone(), - } - .await; - let socket = accept(flow).await; - Ok(Stream::new(socket).await) - } -} - -impl Future for Accept { - type Output = Flow; - - fn poll( - self: core::pin::Pin<&mut Self>, - cx: &mut core::task::Context<'_>, - ) -> core::task::Poll { - let mut fake_cx = Context::from_waker(Waker::noop()); - let mut socket = loop { - let socket = pin! 
{self.socket.write()}; - if let Poll::Ready(result) = Future::poll(socket, &mut fake_cx) { - break result; - } - core::hint::spin_loop(); - }; - if let Some(x) = socket.queue.pop_front() { - Poll::Ready(x) - } else { - socket.waker = Some(cx.waker().clone()); - Poll::Pending - } - } -} diff --git a/kernel/src/virtio/vsock/message.rs b/kernel/src/virtio/vsock/message.rs deleted file mode 100644 index 558c085..0000000 --- a/kernel/src/virtio/vsock/message.rs +++ /dev/null @@ -1,123 +0,0 @@ -use super::*; - -#[repr(u8)] -#[derive(Debug, Copy, Clone, Eq, PartialEq, Hash)] -pub enum PacketOperation { - Request = 1, - Response = 2, - Rst = 3, - Shutdown = 4, - ReadWrite = 5, - CreditUpdate = 6, - CreditRequest = 7, -} - -#[derive(Debug, Copy, Clone, Eq, PartialEq, Hash)] -pub enum Outgoing { - Request, - Response, - Rst, - Shutdown { rx: bool, tx: bool }, - Write(usize), - CreditUpdate, - CreditRequest, -} - -impl From for PacketOperation { - fn from(value: Outgoing) -> Self { - match value { - Outgoing::Request => PacketOperation::Request, - Outgoing::Response => PacketOperation::Response, - Outgoing::Rst => PacketOperation::Rst, - Outgoing::Shutdown { rx: _, tx: _ } => PacketOperation::Shutdown, - Outgoing::Write(_) => PacketOperation::ReadWrite, - Outgoing::CreditUpdate => PacketOperation::CreditUpdate, - Outgoing::CreditRequest => PacketOperation::CreditRequest, - } - } -} - -#[derive(Debug, Clone, Eq, PartialEq, Hash)] -pub struct OutgoingMessage { - pub flow: Flow, - pub message: Outgoing, -} - -#[derive(Debug, Clone, Eq, PartialEq, Hash)] -pub enum Incoming { - Invalid(u16), - Request, - Response, - Rst, - Shutdown { rx: bool, tx: bool }, - Read(usize), - CreditUpdate, - CreditRequest, -} - -#[derive(Debug, Clone, Eq, PartialEq, Hash)] -pub struct IncomingMessage { - pub flow: Flow, - pub message: Incoming, -} - -impl From
for IncomingMessage { - fn from(value: Header) -> Self { - let flow = Flow { - src: SocketAddr { - cid: value.src_cid, - port: value.src_port, - }, - dst: SocketAddr { - cid: value.dst_cid, - port: value.dst_port, - }, - }; - let message = match value.op { - 1 => Incoming::Request, - 2 => Incoming::Response, - 3 => Incoming::Rst, - 4 => Incoming::Shutdown { - rx: value.flags & 1 == 1, - tx: value.flags & 2 == 2, - }, - 5 => Incoming::Read(value.len as usize), - 6 => Incoming::CreditUpdate, - 7 => Incoming::CreditRequest, - x => Incoming::Invalid(x), - }; - IncomingMessage { flow, message } - } -} - -impl From for Header { - fn from(value: OutgoingMessage) -> Self { - if value.flow.dst.cid != 2 { - log::error!("trying to send outgoing message to {:?}", value.flow); - } - Header { - src_cid: value.flow.src.cid, - src_port: value.flow.src.port, - dst_cid: value.flow.dst.cid, - dst_port: value.flow.dst.port, - len: if let Outgoing::Write(x) = value.message { - x as u32 - } else { - 0 - }, - ptype: 1, // TODO: seqpacket support - op: PacketOperation::from(value.message) as u16, - flags: if let Outgoing::Shutdown { rx: true, tx: _ } = value.message { - 1 - } else { - 0 - } | if let Outgoing::Shutdown { rx: _, tx: true } = value.message { - 2 - } else { - 0 - }, - buf_alloc: 0, - fwd_cnt: 0, - } - } -} diff --git a/kernel/src/virtio/vsock/stream.rs b/kernel/src/virtio/vsock/stream.rs deleted file mode 100644 index fbb1214..0000000 --- a/kernel/src/virtio/vsock/stream.rs +++ /dev/null @@ -1,199 +0,0 @@ -use core::{ - future::Future, - pin::pin, - sync::atomic::{AtomicBool, Ordering}, - task::{Context, Poll, Waker}, -}; - -use alloc::collections::vec_deque::VecDeque; - -use crate::rt; - -use super::*; - -pub(crate) struct StreamSocket { - pub outbound: Flow, - pub waker: Option, - pub queue: VecDeque, -} - -pub struct Stream { - socket: Arc>, - peer_rx_closed: AtomicBool, - peer_tx_closed: AtomicBool, - closed: AtomicBool, - last_read: Mutex>>, - outbound: Flow, -} - -pub 
struct Read { - socket: Arc>, -} - -impl Stream { - pub(crate) async fn new(socket: Arc>) -> Stream { - let outbound = socket.read().await.outbound; - Stream { - socket, - peer_rx_closed: false.into(), - peer_tx_closed: false.into(), - closed: false.into(), - last_read: Mutex::new(None), - outbound, - } - } - - #[rt::profile] - pub async fn connect(cid: u64, peer: impl TryInto) -> Result { - let local = SocketAddr { cid, port: 0 }; - let outbound = Flow { - src: local, - dst: peer.try_into().map_err(|_| SocketError::InvalidAddress)?, - }; - let socket = connect(outbound).await; - let result = Read { - socket: socket.clone(), - } - .await; - let StreamEvent::Connect = result else { - return Err(SocketError::ConnectionFailed); - }; - Ok(Stream::new(socket).await) - } - - #[rt::profile] - pub async fn recv(&self, bytes: &mut [u8]) -> Result { - if bytes.is_empty() { - return Ok(0); - } - let mut last_read = self.last_read.lock().await; - if let Some(rest) = last_read.as_mut() { - let n = core::cmp::min(bytes.len(), rest.len()); - for item in bytes.iter_mut().take(n) { - *item = rest.pop_front().unwrap(); - } - if rest.is_empty() { - *last_read = None; - } - Ok(n) - } else { - loop { - let result = Read { - socket: self.socket.clone(), - } - .await; - return match result { - StreamEvent::Reset => Err(SocketError::ConnectionReset), - StreamEvent::Connect => Err(SocketError::ConnectionReset), - StreamEvent::Shutdown { rx, tx } => { - let rx = if rx { - self.peer_rx_closed.store(true, Ordering::SeqCst); - true - } else { - self.peer_rx_closed.load(Ordering::SeqCst) - }; - let tx = if tx { - self.peer_tx_closed.store(true, Ordering::SeqCst); - true - } else { - self.peer_tx_closed.load(Ordering::SeqCst) - }; - if rx && tx { - rst(self.outbound).await; - } - if tx { - Err(SocketError::ConnectionClosed) - } else { - continue; - } - } - StreamEvent::Data { data } => { - let n = core::cmp::min(data.len(), bytes.len()); - let mut rest: VecDeque = data.into(); - for item in 
bytes.iter_mut().take(n) { - *item = rest.pop_front().unwrap(); - } - if !rest.is_empty() { - *last_read = Some(rest); - } - return Ok(n); - } - }; - } - } - } - - #[rt::profile] - pub async fn send(&self, buf: &[u8]) -> Result { - if self.peer_rx_closed.load(Ordering::SeqCst) { - Err(SocketError::ConnectionClosed) - } else { - send(self.outbound, buf).await; - Ok(buf.len()) - } - } - - #[rt::profile] - async fn close_internal(&mut self) -> Result<()> { - if self.closed.fetch_or(true, Ordering::SeqCst) { - return Ok(()); - } - shutdown(self.outbound, true, true).await; - - loop { - let result = Read { - socket: self.socket.clone(), - } - .await; - if let StreamEvent::Reset = result { - return Ok(()); - }; - } - } - - #[rt::profile] - pub async fn close(mut self) -> Result<()> { - self.close_internal().await?; - Ok(()) - } - - pub fn peer(&self) -> SocketAddr { - self.outbound.dst - } - - pub fn local(&self) -> SocketAddr { - self.outbound.src - } -} - -impl Drop for Stream { - fn drop(&mut self) { - if !self.closed.load(Ordering::SeqCst) { - let _ = crate::rt::spawn_blocking(self.close_internal()); - } - } -} - -impl Future for Read { - type Output = StreamEvent; - - fn poll( - self: core::pin::Pin<&mut Self>, - cx: &mut core::task::Context<'_>, - ) -> Poll { - let mut fake_cx = Context::from_waker(Waker::noop()); - let mut socket = loop { - let socket = pin! 
{self.socket.write()}; - if let Poll::Ready(result) = Future::poll(socket, &mut fake_cx) { - break result; - } - core::hint::spin_loop(); - }; - if let Some(x) = socket.queue.pop_front() { - Poll::Ready(x) - } else { - socket.waker = Some(cx.waker().clone()); - Poll::Pending - } - } -} diff --git a/macros/src/util.rs b/macros/src/util.rs index 108c33a..8cc983c 100644 --- a/macros/src/util.rs +++ b/macros/src/util.rs @@ -1,22 +1,10 @@ use proc_macro::TokenStream; -use proc_macro2::Span; -use proc_macro_crate::{crate_name, FoundCrate}; -use quote::{format_ident, quote}; -use syn::{parse_macro_input, Ident, ItemFn}; - -fn kernel_ident() -> Ident { - let found_crate = crate_name("kernel").expect("kernel is present in `Cargo.toml`"); - - match found_crate { - FoundCrate::Itself => format_ident!("crate"), - FoundCrate::Name(name) => Ident::new(&name, Span::call_site()), - } -} +use quote::quote; +use syn::{parse_macro_input, ItemFn}; pub fn kmain(_: TokenStream, item: TokenStream) -> TokenStream { let item = parse_macro_input!(item as ItemFn); let ident = item.sig.ident.clone(); - let kernel = kernel_ident(); quote! { #[no_mangle] extern "C" fn kmain(argc: usize, argv: *const usize) { @@ -26,10 +14,7 @@ pub fn kmain(_: TokenStream, item: TokenStream) -> TokenStream { core::slice::from_raw_parts(argv, argc) }; - #kernel::rt::spawn(async { - #ident(slice).await; - #kernel::shutdown(); - }); + #ident(slice); } } diff --git a/user/src/bin/webserver.rs b/user/src/bin/webserver.rs new file mode 100644 index 0000000..f5f9209 --- /dev/null +++ b/user/src/bin/webserver.rs @@ -0,0 +1,27 @@ +#![no_std] +#![no_main] + +extern crate user; + +use user::{io::File, prelude::*, print, println}; + +/// A simple webserver program; expects a standard filesystem effect interface +/// (open/close/read/write), where stdin is connected to the HTTP request and stdout is connected +/// to the HTTP response. +#[unsafe(no_mangle)] +pub extern "C" fn _rsstart() -> ! 
{ + let mut stdin = File::STDIN; + let mut bytes: [u8; 1024] = [0; 1024]; + stdin.read(&mut bytes).unwrap(); + + let message = "hello, world"; + let len = message.len(); + + print!("HTTP/1.1 200 OK\r\n"); + print!("Content-Length: {len}\r\n"); + print!("Content-Type: text/html\r\n"); + print!("\r\n"); + + println!("{message}"); + io::exit(0) +} diff --git a/user/src/bin/webserver2.rs b/user/src/bin/webserver2.rs new file mode 100644 index 0000000..2336b23 --- /dev/null +++ b/user/src/bin/webserver2.rs @@ -0,0 +1,72 @@ +#![no_std] +#![no_main] + +extern crate user; + +use core::fmt::Write; + +use user::prelude::*; + +fn accept() { + Function::symbolic("accept").call_with_current_continuation(); +} + +#[derive(Debug)] +struct Error; + +fn recv(bytes: &mut [u8]) -> Result { + let result: Blob = Function::symbolic("recv") + .apply(bytes.len()) + .call_with_current_continuation() + .try_into() + .map_err(|_| Error)?; + Ok(result.read(0, bytes)) +} + +fn send(bytes: &[u8]) -> Result { + let result: Word = Function::symbolic("send") + .apply(bytes) + .call_with_current_continuation() + .try_into() + .map_err(|_| Error)?; + let result = result.read() as i64; + if result < 0 { + Err(Error) + } else { + Ok(result as usize) + } +} + +fn hangup() { + Function::symbolic("hangup").call_with_current_continuation(); +} + +struct Stream; +impl Write for Stream { + fn write_str(&mut self, s: &str) -> core::fmt::Result { + send(s.as_bytes()).map(|_| ()).map_err(|_| core::fmt::Error) + } +} + +/// A simple webserver program; expects a standard filesystem effect interface +/// (open/close/read/write), where stdin is connected to the HTTP request and stdout is connected +/// to the HTTP response. +#[unsafe(no_mangle)] +pub extern "C" fn _rsstart() -> ! 
{ + loop { + accept(); + let mut bytes: [u8; 1024] = [0; 1024]; + recv(&mut bytes).unwrap(); + + let message = "hello, world"; + let len = message.len(); + + write!(Stream, "HTTP/1.1 200 OK\r\n").unwrap(); + write!(Stream, "Content-Length: {len}\r\n").unwrap(); + write!(Stream, "Content-Type: text/html\r\n").unwrap(); + write!(Stream, "\r\n").unwrap(); + + write!(Stream, "{message}").unwrap(); + hangup(); + } +} diff --git a/user/src/io.rs b/user/src/io.rs index d7f6ddd..762ec54 100644 --- a/user/src/io.rs +++ b/user/src/io.rs @@ -9,7 +9,7 @@ use crate::prelude::*; extern crate alloc; pub struct File { - pub content: Table, + pub fd: u32, } #[derive(Clone, Debug)] @@ -34,99 +34,94 @@ pub enum SeekFrom { } impl File { + pub fn open(file: &str) -> Result { + OpenOptions::default().read(true).open(file) + } + + pub const STDIN: File = File { fd: 0 }; + pub const STDOUT: File = File { fd: 1 }; + pub const STDERR: File = File { fd: 2 }; + pub fn options() -> OpenOptions { OpenOptions::default() } - //pub fn read(&mut self, bytes: &mut [u8]) -> Result { - // let result: Blob = Function::symbolic("read") - // .apply(self.fd) - // .apply(bytes.len()) - // .call_with_current_continuation() - // .try_into() - // .map_err(|_| Error)?; - // Ok(result.read(0, bytes)) - //} - - //pub fn write(&mut self, bytes: &[u8]) -> Result { - // let result: Word = Function::symbolic("write") - // .apply(self.fd) - // .apply(bytes) - // .call_with_current_continuation() - // .try_into() - // .map_err(|_| Error)?; - // let result = result.read() as i64; - // if result < 0 { - // Err(Error) - // } else { - // Ok(result as usize) - // } - //} - - //pub fn seek(&mut self, from: SeekFrom) -> Result { - // let (whence, offset) = match from { - // SeekFrom::Start(offset) => (arcane::SEEK_SET, offset as usize), - // SeekFrom::End(offset) => (arcane::SEEK_END, offset as usize), - // SeekFrom::Current(offset) => (arcane::SEEK_CUR, offset as usize), - // }; - // let result: Word = 
Function::symbolic("seek") - // .apply(self.fd) - // .apply(offset) - // .apply(whence) - // .call_with_current_continuation() - // .try_into() - // .map_err(|_| Error)?; - // let result = result.read() as i64; - // if result < 0 { - // Err(Error) - // } else { - // Ok(result as usize) - // } - //} + pub fn read(&mut self, bytes: &mut [u8]) -> Result { + let result: Blob = Function::symbolic("read") + .apply(self.fd) + .apply(bytes.len()) + .call_with_current_continuation() + .try_into() + .map_err(|_| Error)?; + Ok(result.read(0, bytes)) + } + + pub fn write(&mut self, bytes: &[u8]) -> Result { + let result: Word = Function::symbolic("write") + .apply(self.fd) + .apply(bytes) + .call_with_current_continuation() + .try_into() + .map_err(|_| Error)?; + let result = result.read() as i64; + if result < 0 { + Err(Error) + } else { + Ok(result as usize) + } + } + + pub fn seek(&mut self, from: SeekFrom) -> Result { + let (whence, offset) = match from { + SeekFrom::Start(offset) => (arcane::SEEK_SET, offset as usize), + SeekFrom::End(offset) => (arcane::SEEK_END, offset as usize), + SeekFrom::Current(offset) => (arcane::SEEK_CUR, offset as usize), + }; + let result: Word = Function::symbolic("seek") + .apply(self.fd) + .apply(offset) + .apply(whence) + .call_with_current_continuation() + .try_into() + .map_err(|_| Error)?; + let result = result.read() as i64; + if result < 0 { + Err(Error) + } else { + Ok(result as usize) + } + } +} + +impl Drop for File { + fn drop(&mut self) { + Function::symbolic("close") + .apply(self.fd) + .call_with_current_continuation(); + } +} + +impl Write for File { + fn write_str(&mut self, s: &str) -> core::fmt::Result { + self.write(s.as_bytes()).map_err(|_| core::fmt::Error)?; + Ok(()) + } } -//impl Clone for File { -// fn clone(&self) -> Self { -// let result: Word = Function::symbolic("dup") -// .apply(self.fd) -// .call_with_current_continuation() -// .try_into() -// .unwrap(); -// let fd = result.read() as i64; -// assert!(fd >= 0); 
-// let fd = fd as u32; -// Self { fd } -// } -//} -// -//impl Drop for File { -// fn drop(&mut self) { -// Function::symbolic("close") -// .apply(self.fd) -// .call_with_current_continuation(); -// } -//} - -//impl Write for File { -// fn write_str(&mut self, s: &str) -> core::fmt::Result { -// self.write(s.as_bytes()).map_err(|_| core::fmt::Error)?; -// Ok(()) -// } -//} -// impl Iterator for File { -// type Item = u8; -// -// fn next(&mut self) -> Option { -// let mut bytes = [0]; -// crate::error::log("reading"); -// if let Ok(1) = self.read(&mut bytes) { -// crate::error::log_int("read", bytes[0] as u64); -// Some(bytes[0]) -// } else { -// None -// } -// } -// } +impl Iterator for File { + type Item = u8; + + fn next(&mut self) -> Option { + let mut bytes = [0]; + crate::error::log("reading"); + if let Ok(1) = self.read(&mut bytes) { + crate::error::log_int("read", bytes[0] as u64); + Some(bytes[0]) + } else { + None + } + } +} impl OpenOptions { pub fn read(&mut self, read: bool) -> &mut Self { @@ -174,20 +169,19 @@ impl OpenOptions { flags |= arcane::O_TRUNC; } let mode = 0o655; - let result: Table = Function::symbolic("open") + let result: Word = Function::symbolic("open") .apply(path) .apply(flags) .apply(mode) .call_with_current_continuation() .try_into() .map_err(|_| Error)?; - // let result = result.read() as i64; - //if result < 0 { - // Err(Error) - //} else { - // Ok(File { fd: result as u32 }) - //} - Ok(File { content: result }) + let result = result.read() as i64; + if result < 0 { + Err(Error) + } else { + Ok(File { fd: result as u32 }) + } } } @@ -207,133 +201,111 @@ pub fn fork() -> Result> { Ok(NonZeroUsize::new(result)) } -pub struct Monitor { - fd: u64, -} +#[cfg(feature = "allocator")] +mod buf { + extern crate alloc; + use core::ops::{Deref, DerefMut}; -impl Monitor { - pub fn new() -> Monitor { - let result: Word = Function::symbolic("monitor-new") - .call_with_current_continuation() - .try_into() - .unwrap(); - Monitor { fd: 
result.read() } + use super::*; + use alloc::vec::Vec; + + pub struct Buffered { + file: File, + pending: Vec, } - pub fn enter(&self, f: impl FnOnce(&mut MonitorContext) -> Value) -> Value { - if let Ok(k) = os::continuation() { - Function::symbolic("monitor-enter") - .apply(self.fd) - .apply(k) - .call_with_current_continuation() - } else { - let mut ctx = MonitorContext(PhantomData); - let value = f(&mut ctx); - Function::symbolic("exit") - .apply(value) - .call_with_current_continuation() + impl Buffered { + pub fn new(file: File) -> Self { + Self { + file, + pending: Vec::new(), + } } - } - pub fn set(&self, value: impl Into) -> Value { - self.enter(|ctx| ctx.set(value.into())) - } + pub fn read(&mut self, buf: &mut [u8]) -> Result { + let n = core::cmp::min(buf.len(), self.pending.len()); + buf[..n].copy_from_slice(&self.pending[..n]); + self.pending = self.pending[n..].to_vec(); + self.file.read(&mut buf[n..]) + } - pub fn get(&self) -> Value { - self.enter(|ctx| ctx.get()) + pub fn read_until(&mut self, end: u8) -> Result> { + let mut buffer = [0; 1024]; + loop { + if let Some(i) = self.pending.iter().position(|x| *x == end) { + let head = self.pending[..i + 1].to_vec(); + let rest = self.pending[i + 1..].to_vec(); + self.pending = rest; + return Ok(head); + } + let n = self.file.read(&mut buffer)?; + let slice = &buffer[..n]; + self.pending.extend_from_slice(slice); + } + } } -} -pub struct MonitorContext(PhantomData<()>); + impl Deref for Buffered { + type Target = File; -impl MonitorContext { - pub fn get(&self) -> Value { - Function::symbolic("get").call_with_current_continuation() + fn deref(&self) -> &Self::Target { + &self.file + } } - pub fn set(&mut self, value: impl Into) -> Value { - Function::symbolic("set") - .apply(value) - .call_with_current_continuation() + impl DerefMut for Buffered { + fn deref_mut(&mut self) -> &mut Self::Target { + &mut self.file + } } +} - pub fn wait(&self) { - 
Function::symbolic("wait").call_with_current_continuation(); - } +pub use buf::Buffered; + +#[macro_export] +macro_rules! print { + ($($arg:tt)*) => {{ + use ::core::fmt::Write; + ::core::write!(File::STDOUT, $($arg)*).unwrap(); + }}; } -impl Default for Monitor { - fn default() -> Self { - Self::new() - } +#[macro_export] +macro_rules! println { + () => { + { + use ::core::fmt::Write; + ::core::writeln!(File::STDOUT).unwrap() + } + }; + ($($arg:tt)*) => { + { + use ::core::fmt::Write; + ::core::writeln!(File::STDOUT, $($arg)*).unwrap() + } + }; } -impl Drop for Monitor { - fn drop(&mut self) { - Function::symbolic("close") - .apply(self.fd) - .call_with_current_continuation(); - } +#[macro_export] +macro_rules! eprint { + ($($arg:tt)*) => {{ + use ::core::fmt::Write; + ::core::write!(File::STDERR).unwrap(); + }}; } -//#[cfg(feature = "allocator")] -//mod buf { -// extern crate alloc; -// use core::ops::{Deref, DerefMut}; -// -// use super::*; -// use alloc::vec::Vec; -// -// #[derive(Clone)] -// pub struct Buffered { -// file: File, -// pending: Vec, -// } -// -// impl Buffered { -// pub fn new(file: File) -> Self { -// Self { -// file, -// pending: Vec::new(), -// } -// } -// -// pub fn read(&mut self, buf: &mut [u8]) -> Result { -// let n = core::cmp::min(buf.len(), self.pending.len()); -// buf[..n].copy_from_slice(&self.pending[..n]); -// self.pending = self.pending[n..].to_vec(); -// self.file.read(&mut buf[n..]) -// } -// -// pub fn read_until(&mut self, end: u8) -> Result> { -// let mut buffer = [0; 1024]; -// loop { -// if let Some(i) = self.pending.iter().position(|x| *x == end) { -// let head = self.pending[..i + 1].to_vec(); -// let rest = self.pending[i + 1..].to_vec(); -// self.pending = rest; -// return Ok(head); -// } -// let n = self.file.read(&mut buffer)?; -// let slice = &buffer[..n]; -// self.pending.extend_from_slice(slice); -// } -// } -// } -// -// impl Deref for Buffered { -// type Target = File; -// -// fn deref(&self) -> &Self::Target { 
-// &self.file -// } -// } -// -// impl DerefMut for Buffered { -// fn deref_mut(&mut self) -> &mut Self::Target { -// &mut self.file -// } -// } -//} -// -//pub use buf::Buffered; +#[macro_export] +macro_rules! eprintln { + () => { + { + use ::core::fmt::Write; + ::core::writeln!(File::STDERR).unwrap() + } + }; + ($($arg:tt)*) => { + { + use ::core::fmt::Write; + ::core::writeln!(File::STDERR, $($arg)*).unwrap() + } + }; +} diff --git a/vmm/Cargo.toml b/vmm/Cargo.toml index edc2bf2..d082ece 100644 --- a/vmm/Cargo.toml +++ b/vmm/Cargo.toml @@ -26,7 +26,7 @@ nix = { version = "0.30.1", features = ["mman", "socket"] } vsock = "0.5.1" thread-priority = "3.0.0" elevate = "0.6.1" -clap = { version = "4.5.48", features = ["derive"] } +clap = { version = "4.5.48", features = ["derive", "env"] } rlimit = "0.10.2" core_affinity = "0.8.3" diff --git a/vmm/build.rs b/vmm/build.rs deleted file mode 100644 index 10be670..0000000 --- a/vmm/build.rs +++ /dev/null @@ -1,18 +0,0 @@ -use std::{env, path::PathBuf}; - -use anyhow::Result; -fn main() -> Result<()> { - let bindings = bindgen::Builder::default() - .header("src/vhost_bindings.h") - .default_enum_style(bindgen::EnumVariation::ModuleConsts) - .parse_callbacks(Box::new(bindgen::CargoCallbacks::new())) - .clang_macro_fallback() - .generate() - .expect("Unable to generate bindings"); - - let out_path = PathBuf::from(env::var("OUT_DIR").unwrap()); - bindings - .write_to_file(out_path.join("vhost_bindings.rs")) - .expect("Couldn't write bindings!"); - Ok(()) -} diff --git a/vmm/src/lib.rs b/vmm/src/lib.rs index 040802e..858e25c 100644 --- a/vmm/src/lib.rs +++ b/vmm/src/lib.rs @@ -6,4 +6,3 @@ #![feature(cstr_display)] pub mod runtime; -pub mod vhost; diff --git a/vmm/src/main.rs b/vmm/src/main.rs index 245b201..42c7d06 100644 --- a/vmm/src/main.rs +++ b/vmm/src/main.rs @@ -6,10 +6,8 @@ use vmm::runtime::Runtime; #[derive(Parser, Debug)] struct Args { kernel: PathBuf, - #[arg(short, long)] + #[arg(short, long, env = "ARCA_SMP")] 
smp: Option, - #[arg(short, long, default_value = "3")] - cid: usize, } fn main() -> anyhow::Result<()> { @@ -18,12 +16,11 @@ fn main() -> anyhow::Result<()> { let args = Args::parse(); let smp = args .smp - // .or_else(|| std::thread::available_parallelism().ok().map(|x| x.get())) + .or_else(|| std::thread::available_parallelism().ok().map(|x| x.get())) .unwrap_or(1); - let cid = args.cid; let bin = std::fs::read(args.kernel)?; - let mut rt = Runtime::new(cid, smp, 1 << 34, bin.into()); + let mut rt = Runtime::new(smp, 1 << 34, bin.into()); rt.run(&[]); Ok(()) diff --git a/vmm/src/runtime.rs b/vmm/src/runtime.rs index 3a5b0b0..c066e22 100644 --- a/vmm/src/runtime.rs +++ b/vmm/src/runtime.rs @@ -1,7 +1,10 @@ use std::{ collections::HashMap, - io::{self, Read}, + fs::{File, OpenOptions}, + io::{self, Read, Seek, SeekFrom, Write}, + net::SocketAddr, process::ExitCode, + slice, sync::{ atomic::{AtomicBool, Ordering}, Arc, @@ -10,17 +13,19 @@ use std::{ time::{Duration, Instant}, }; -use common::{hypercall, BuddyAllocator}; +use common::{ + hypercall::{self, FileInfo, TcpInfo}, + BuddyAllocator, +}; use elf::{endian::AnyEndian, segment::ProgramHeader, ElfBytes}; use kvm_bindings::{kvm_userspace_memory_region, CpuId, KVM_MAX_CPUID_ENTRIES}; use kvm_ioctls::{IoEventAddress, Kvm, NoDatamatch, VcpuExit, VcpuFd, VmFd}; pub use common::mmap::Mmap; use libc::EFD_NONBLOCK; +use std::net::{TcpListener, TcpStream}; use vmm_sys_util::eventfd::EventFd; -use crate::vhost::VSockBackend; - const MEM_BASE: u64 = 0x1_0000_0000; fn new_cpu<'scope>( @@ -286,6 +291,143 @@ fn run_cpu(mut vcpu_fd: VcpuFd, elf: &ElfBytes, exit: Arc regs.rax = mem.as_ptr() as u64; } } + hypercall::TCP_LISTEN => { + let info: &TcpInfo = + unsafe { &*BuddyAllocator.from_offset(args[0] as usize) }; + let ip = u32::to_ne_bytes(info.ip); + let port = info.port; + let listener = + Box::new(TcpListener::bind(SocketAddr::from((ip, port))).unwrap()); + let listener_ptr = Box::into_raw(listener); + 
info.id.store(listener_ptr as u64, Ordering::SeqCst); + info.done.store(true, Ordering::SeqCst); + regs.rax = listener_ptr as u64; + } + hypercall::TCP_ACCEPT => { + let info: &TcpInfo = + unsafe { &*BuddyAllocator.from_offset(args[0] as usize) }; + let listener_ptr = + info.id.load(Ordering::SeqCst) as usize as *mut TcpListener; + let listener = unsafe { &*listener_ptr }; + let (stream, _) = listener.accept().unwrap(); + info.id + .store(Box::into_raw(Box::new(stream)) as u64, Ordering::SeqCst); + info.done.store(true, Ordering::SeqCst); + } + hypercall::TCP_SEND => { + let info: &TcpInfo = + unsafe { &*BuddyAllocator.from_offset(args[0] as usize) }; + let stream_ptr = + info.id.load(Ordering::SeqCst) as usize as *mut TcpStream; + let stream = unsafe { &mut *stream_ptr }; + let (ptr, len) = ( + BuddyAllocator.from_offset(info.buf), + info.len.load(Ordering::SeqCst), + ); + let slice = unsafe { slice::from_raw_parts(ptr, len) }; + let len = stream.write(slice).unwrap_or(0); + // assert_eq!(len, slice.len()); + info.len.store(len, Ordering::SeqCst); + info.done.store(true, Ordering::SeqCst); + } + hypercall::TCP_RECV => { + let info: &TcpInfo = + unsafe { &*BuddyAllocator.from_offset(args[0] as usize) }; + let stream_ptr = + info.id.load(Ordering::SeqCst) as usize as *mut TcpStream; + let stream = unsafe { &mut *stream_ptr }; + let (ptr, len) = ( + BuddyAllocator.from_offset(info.buf), + info.len.load(Ordering::SeqCst), + ); + let slice = unsafe { slice::from_raw_parts_mut(ptr, len) }; + let len = stream.read(slice).unwrap(); + info.len.store(len, Ordering::SeqCst); + info.done.store(true, Ordering::SeqCst); + } + hypercall::TCP_CLOSE => { + let info: &TcpInfo = + unsafe { &*BuddyAllocator.from_offset(args[0] as usize) }; + let stream_ptr = + info.id.load(Ordering::SeqCst) as usize as *mut TcpStream; + let mut stream = *unsafe { Box::from_raw(stream_ptr) }; + stream.flush().unwrap(); + let _ = stream; + info.done.store(true, Ordering::SeqCst); + } + 
hypercall::FILE_OPEN => { + let info: &FileInfo = + unsafe { &*BuddyAllocator.from_offset(args[0] as usize) }; + let (ptr, len) = ( + BuddyAllocator.from_offset(info.buf), + info.len.load(Ordering::SeqCst), + ); + let slice = unsafe { slice::from_raw_parts(ptr, len) }; + let s = str::from_utf8(slice).unwrap(); + let f = OpenOptions::new() + .read(info.read) + .write(info.write) + .create(info.create) + .append(info.append) + .truncate(info.truncate) + .open(s) + .ok() + .map(Box::new) + .map(Box::into_raw); + info.id + .store(f.unwrap_or(core::ptr::null_mut()) as u64, Ordering::SeqCst); + info.done.store(true, Ordering::SeqCst); + } + hypercall::FILE_READ => { + let info: &FileInfo = + unsafe { &*BuddyAllocator.from_offset(args[0] as usize) }; + let file_ptr = info.id.load(Ordering::SeqCst) as *mut File; + let file = unsafe { &mut *file_ptr }; + let (ptr, len) = ( + BuddyAllocator.from_offset(info.buf), + info.len.load(Ordering::SeqCst), + ); + let slice = unsafe { slice::from_raw_parts_mut(ptr, len) }; + let size = file.read(slice).unwrap(); + info.len.store(size, Ordering::SeqCst); + info.done.store(true, Ordering::SeqCst); + } + hypercall::FILE_WRITE => { + let info: &FileInfo = + unsafe { &*BuddyAllocator.from_offset(args[0] as usize) }; + let file_ptr = info.id.load(Ordering::SeqCst) as *mut File; + let file = unsafe { &mut *file_ptr }; + let (ptr, len) = ( + BuddyAllocator.from_offset(info.buf), + info.len.load(Ordering::SeqCst), + ); + let slice = unsafe { slice::from_raw_parts(ptr, len) }; + let size = file.write(slice).unwrap(); + info.len.store(size, Ordering::SeqCst); + info.done.store(true, Ordering::SeqCst); + } + hypercall::FILE_SEEK => { + let info: &FileInfo = + unsafe { &*BuddyAllocator.from_offset(args[0] as usize) }; + let file_ptr = info.id.load(Ordering::SeqCst) as *mut File; + let file = unsafe { &mut *file_ptr }; + let seek_from = match info.whence { + 0 => SeekFrom::Start(info.offset as u64), + 1 => SeekFrom::Current(info.offset as i64), + 
-1 => SeekFrom::End(info.offset as i64), + _ => panic!("invalid whence: {}", info.whence), + }; + let pos = file.seek(seek_from).unwrap(); + info.len.store(pos as usize, Ordering::SeqCst); + info.done.store(true, Ordering::SeqCst); + } + hypercall::FILE_CLOSE => { + let info: &FileInfo = + unsafe { &*BuddyAllocator.from_offset(args[0] as usize) }; + let file_ptr = info.id.load(Ordering::SeqCst) as *mut File; + let file = *unsafe { Box::from_raw(file_ptr) }; + let _ = file; + } x => unimplemented!("hypercall {x}"), }; vcpu_fd.set_regs(&regs).unwrap(); @@ -328,13 +470,12 @@ fn run_cpu(mut vcpu_fd: VcpuFd, elf: &ElfBytes, exit: Arc pub struct Runtime { kvm: Kvm, vm: VmFd, - vsock: VSockBackend, cores: usize, elf: Arc<[u8]>, } impl Runtime { - pub fn new(cid: usize, cores: usize, ram: usize, elf: Arc<[u8]>) -> Self { + pub fn new(cores: usize, ram: usize, elf: Arc<[u8]>) -> Self { let kvm = Kvm::new().unwrap(); let vm = kvm.create_vm().unwrap(); vm.create_irq_chip().unwrap(); @@ -376,12 +517,9 @@ impl Runtime { }) .unwrap(); - let vsock = VSockBackend::new(cid as u64, 1024, kick, call).unwrap(); - let mut x = Self { kvm, vm, - vsock, cores, elf: elf.clone(), }; @@ -449,7 +587,6 @@ impl Runtime { } pub fn run(&mut self, args: &[usize]) { - self.vsock.set_running(true).unwrap(); let elf = ElfBytes::<AnyEndian>::minimal_parse(&self.elf) .expect("could not read kernel elf file"); @@ -457,11 +594,7 @@ impl Runtime { let allocator_raw = Box::into_raw_with_allocator(Box::new_in(allocator_raw, BuddyAllocator)).0; - // let args = args.to_vec_in(BuddyAllocator); - let mut inner_args = Vec::new_in(BuddyAllocator); - let vsock_meta = Box::new_in(self.vsock.metadata(), BuddyAllocator); - inner_args.push(BuddyAllocator.to_offset(Box::into_raw_with_allocator(vsock_meta).0)); - inner_args.extend_from_slice(args); + let args = args.to_vec_in(BuddyAllocator); std::thread::scope(|s| { let mut cpus = vec![]; @@ -482,8 +615,11 @@ impl Runtime { vcpu_fd.set_msrs(&msrs).unwrap(); let allocator_raw_offset 
= BuddyAllocator.to_offset(allocator_raw); - assert!(!inner_args.is_empty()); - let inner_args_offset = BuddyAllocator.to_offset(inner_args.as_ptr()); + let args_offset = if args.is_empty() { + 0 + } else { + BuddyAllocator.to_offset(args.as_ptr()) + }; cpus.push(new_cpu( i, s, @@ -492,8 +628,8 @@ impl Runtime { &[ self.cores as u64, allocator_raw_offset as u64, - inner_args.len() as u64, - inner_args_offset as u64, + args.len() as u64, + args_offset as u64, 0, 0, ], @@ -505,8 +641,4 @@ impl Runtime { } }); } - - pub fn cid(&self) -> u64 { - self.vsock.cid() - } } diff --git a/vmm/src/vhost.rs b/vmm/src/vhost.rs deleted file mode 100644 index b04b315..0000000 --- a/vmm/src/vhost.rs +++ /dev/null @@ -1,276 +0,0 @@ -use std::{ - alloc::{Allocator, Global, Layout}, - ffi::CStr, - fs::File, - os::fd::{AsFd, AsRawFd, BorrowedFd, FromRawFd, IntoRawFd, OwnedFd}, - ptr::NonNull, -}; - -use common::{ - vhost::{VSockMetadata, VirtQueueMetadata}, - BuddyAllocator, -}; - -use anyhow::{anyhow, Result}; -use vmm_sys_util::eventfd::EventFd; - -mod bindings { - #![allow(non_upper_case_globals)] - #![allow(non_camel_case_types)] - #![allow(unused)] - - include!(concat!(env!("OUT_DIR"), "/vhost_bindings.rs")); -} - -macro_rules! 
check_syscall { - ($e: expr) => {{ - let result = $e; - if result == -1 { - let errno = libc::__errno_location().read(); - let error = CStr::from_ptr(libc::strerror(errno)); - log::error!("system call returned {errno}"); - Err(anyhow::anyhow!( - "system call {} failed: {}", - stringify!($e), - error.display() - )) - } else { - Ok(()) - } - }}; -} - -pub struct VSockBackend { - fd: OwnedFd, - cid: u64, - rx: VirtQueue, - tx: VirtQueue, - _kick: EventFd, - _call: EventFd, -} - -impl VSockBackend { - pub fn new(cid: u64, descriptors: usize, kick: EventFd, call: EventFd) -> Result { - let file = File::options() - .read(true) - .write(true) - .open("/dev/vhost-vsock") - .expect("could not open /dev/vhost-vsock"); - let owned = unsafe { - let raw = file.into_raw_fd(); - OwnedFd::from_raw_fd(raw) - }; - - unsafe { - check_syscall!(libc::ioctl( - owned.as_raw_fd(), - bindings::VHOST_SET_OWNER as u64, - ))?; - - check_syscall!(libc::ioctl( - owned.as_raw_fd(), - bindings::VHOST_VSOCK_SET_GUEST_CID as u64, - &cid, - ))?; - - let mut features: u64 = 0; - check_syscall!(libc::ioctl( - owned.as_raw_fd(), - bindings::VHOST_GET_FEATURES as u64, - &mut features - ))?; - - if features & (1 << 1) == 0 { - return Err(anyhow!("This host does not support seqpacket vsock.")); - } - if features & (1 << 2) != 0 { - return Err(anyhow!("This host does not support stream vsock.")); - } - - features = (1 << 1) // supports seqpacket - | (1 << 28) // supports indirect descriptors - | (1 << 32); - - check_syscall!(libc::ioctl( - owned.as_raw_fd(), - bindings::VHOST_SET_FEATURES as u64, - &features - ))?; - - let layout = Layout::new::() - .extend(Layout::new::()) - .unwrap() - .0; - let ptr = Global.allocate_zeroed(layout).unwrap(); - - let ptr: *mut bindings::vhost_memory = - std::mem::transmute(&raw mut (*NonNull::as_ptr(ptr))[0]); - let mut memory = Box::from_raw(ptr); - memory.nregions = 1; - let regions = memory.regions.as_mut_slice(1); - regions[0] = bindings::vhost_memory_region { - 
guest_phys_addr: 0, - memory_size: BuddyAllocator.len() as u64, - userspace_addr: BuddyAllocator.base() as u64, - flags_padding: 0, - }; - - check_syscall!(libc::ioctl( - owned.as_raw_fd(), - bindings::VHOST_SET_MEM_TABLE as u64, - memory - ))?; - } - - let mut rx = VirtQueue::new(descriptors); - let mut tx = VirtQueue::new(descriptors); - - rx.attach(owned.as_fd(), VirtQueueType::Receive, &kick, &call)?; - tx.attach(owned.as_fd(), VirtQueueType::Transmit, &kick, &call)?; - - Ok(Self { - fd: owned, - cid, - rx, - tx, - _kick: kick, - _call: call, - }) - } - - pub fn cid(&self) -> u64 { - self.cid - } - - pub fn set_running(&mut self, running: bool) -> Result<()> { - unsafe { - let running: std::ffi::c_int = if running { 1 } else { 0 }; - check_syscall!(libc::ioctl( - self.fd.as_raw_fd(), - bindings::VHOST_VSOCK_SET_RUNNING as u64, - &running - ))?; - } - Ok(()) - } - - pub fn metadata(&self) -> VSockMetadata { - VSockMetadata { - rx: self.rx.metadata(), - tx: self.tx.metadata(), - } - } -} - -impl Drop for VSockBackend { - fn drop(&mut self) { - self.set_running(false).unwrap(); - } -} - -#[repr(u32)] -#[derive(Copy, Clone, Debug, Eq, PartialEq)] -enum VirtQueueType { - Receive = 0, - Transmit = 1, -} - -#[derive(Clone, Debug)] -struct VirtQueue { - pub descriptors: usize, - pub desc: Box<[u8], BuddyAllocator>, - pub used: Box<[u8], BuddyAllocator>, - pub avail: Box<[u8], BuddyAllocator>, -} - -impl VirtQueue { - pub fn new(descriptors: usize) -> Self { - let new_buffer = |size: usize| { - let slice = Box::new_zeroed_slice_in(2 * size, BuddyAllocator); - unsafe { slice.assume_init() } - }; - let desc = new_buffer(descriptors * 16); - let used = new_buffer(descriptors * 8 + 6); - let avail = new_buffer(descriptors * 2 + 6); - - VirtQueue { - descriptors, - desc, - used, - avail, - } - } - - pub fn metadata(&self) -> VirtQueueMetadata { - let desc = (&raw const *self.desc).to_raw_parts(); - let used = (&raw const *self.used).to_raw_parts(); - let avail = (&raw const 
*self.avail).to_raw_parts(); - - let desc = BuddyAllocator.to_offset(desc.0); - let used = BuddyAllocator.to_offset(used.0); - let avail = BuddyAllocator.to_offset(avail.0); - - VirtQueueMetadata { - descriptors: self.descriptors, - desc, - used, - avail, - } - } - - pub fn attach( - &mut self, - fd: BorrowedFd, - qtype: VirtQueueType, - kick: &EventFd, - call: &EventFd, - ) -> Result<()> { - unsafe { - let vring_addr = bindings::vhost_vring_addr { - index: qtype as u32, - flags: 0, - desc_user_addr: self.desc.as_ptr() as u64, - used_user_addr: self.used.as_ptr() as u64, - avail_user_addr: self.avail.as_ptr() as u64, - log_guest_addr: 0, // logging is disabled - }; - - check_syscall!(libc::ioctl( - fd.as_raw_fd(), - bindings::VHOST_SET_VRING_ADDR as u64, - &vring_addr - ))?; - - let vring_num = bindings::vhost_vring_state { - index: qtype as u32, - num: self.descriptors as u32, - }; - check_syscall!(libc::ioctl( - fd.as_raw_fd(), - bindings::VHOST_SET_VRING_NUM as u64, - &vring_num - ))?; - - let vring_file = bindings::vhost_vring_file { - index: qtype as u32, - fd: kick.as_raw_fd(), - }; - check_syscall!(libc::ioctl( - fd.as_raw_fd(), - bindings::VHOST_SET_VRING_KICK as u64, - &vring_file - ))?; - - let vring_file = bindings::vhost_vring_file { - index: qtype as u32, - fd: call.as_raw_fd(), - }; - check_syscall!(libc::ioctl( - fd.as_raw_fd(), - bindings::VHOST_SET_VRING_CALL as u64, - &vring_file - ))?; - } - Ok(()) - } -} diff --git a/vmm/src/vhost_bindings.h b/vmm/src/vhost_bindings.h deleted file mode 100644 index 60854af..0000000 --- a/vmm/src/vhost_bindings.h +++ /dev/null @@ -1,3 +0,0 @@ -#include -#include -#include