Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
306 changes: 205 additions & 101 deletions README.es.md

Large diffs are not rendered by default.

313 changes: 211 additions & 102 deletions README.fr.md

Large diffs are not rendered by default.

194 changes: 141 additions & 53 deletions README.ja.md

Large diffs are not rendered by default.

138 changes: 113 additions & 25 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -11,12 +11,12 @@ Language: **English** | [简体中文](./README.zh-CN.md) | [Español](./README.

## Overview

`uring` handles the kernel-facing boundary for Linux `io_uring`. It creates and starts rings, prepares SQEs, decodes
CQEs, carries submission identity through `user_data`, and provides buffer registration, multishot operations, and
listener-setup primitives.
`uring` is the kernel-facing boundary for Linux `io_uring`. It creates and starts rings, prepares SQEs, decodes CQEs,
carries submission identity through `user_data`, and exposes buffer registration, multishot operations, and
listener-setup primitives without turning them into a scheduler.

`uring` draws a clear boundary: kernel-facing mechanics and observable completion facts live at the API edge; policy and
composition stay above it. Caller-side runtime code owns completion correlation, retry/backoff, handler and session
The package keeps the boundary explicit: kernel mechanics and observable completion facts live here; policy and
composition live above it. Caller-side runtime code owns completion correlation, retry/backoff, handler and session
routing, connection lifecycle, and terminal resource release.

The primary surfaces are:
Expand All @@ -34,6 +34,9 @@ The primary surfaces are:
uname -r
```

`uring` assumes the 6.18+ baseline and carries no fallback branches for older kernels. Boot a supported kernel instead
of expecting compatibility shims inside this package.

Debian 13's stable kernel track may still be below 6.18. See [Debian 13 kernel upgrade](#debian-13-kernel-upgrade) for
the backports path to a kernel that meets the requirement.

Expand All @@ -54,8 +57,8 @@ Container runtimes block `io_uring` syscalls by default. See [SETUP.md](./SETUP.
## Ring lifecycle

`New` returns an unstarted ring and eagerly constructs the context pools. Call `Start` before submitting operations; it
registers ring resources and enables the ring. `uring` assumes the 6.18+ baseline and carries no fallback branches for
older kernels.
registers ring resources and enables the ring. The example below submits a read, waits for the matching CQE, and uses
`iox.Classify` so `ErrWouldBlock` stays a semantic no-progress result rather than a failure.

```go
ring, err := uring.New(func(o *uring.Options) {
Expand All @@ -68,26 +71,51 @@ if err != nil {
if err := ring.Start(); err != nil {
return err
}
defer ring.Stop()

cqes := make([]uring.CQEView, 64)
n, err := ring.Wait(cqes)
if err != nil && !errors.Is(err, iox.ErrWouldBlock) {
fd := iofd.NewFD(int(file.Fd()))
buf := make([]byte, 4096)
ctx := uring.PackDirect(uring.IORING_OP_READ, 0, 0, 0).WithFD(fd)
if err := ring.Read(ctx, buf); err != nil {
return err
}

for i := range n {
cqe := cqes[i]
if cqe.Res < 0 {
return fmt.Errorf("completion failed: op=%d fd=%d res=%d", cqe.Op(), cqe.FD(), cqe.Res)
cqes := make([]uring.CQEView, 64)
var backoff iox.Backoff

for {
n, err := ring.Wait(cqes)
switch iox.Classify(err) {
case iox.OutcomeWouldBlock:
backoff.Wait()
continue
case iox.OutcomeFailure:
return err
}
if n == 0 {
backoff.Wait()
continue
}

backoff.Reset()
for i := range n {
cqe := cqes[i]
if cqe.Op() != uring.IORING_OP_READ || cqe.FD() != fd {
Comment thread
hayabusa-cloud marked this conversation as resolved.
continue
}
Comment thread
hayabusa-cloud marked this conversation as resolved.
if cqe.Res < 0 {
return fmt.Errorf("uring read failed: res=%d", cqe.Res)
}
handle(buf[:int(cqe.Res)])
return nil
}
fmt.Printf("completed op=%d on fd=%d with res=%d\n", cqe.Op(), cqe.FD(), cqe.Res)
}
```

`Wait` flushes pending submissions, then reaps completions. On single-issuer rings it also issues the kernel enter that
keeps deferred task work moving once the SQ drains; the caller must serialize `Wait`/`enter` with submit-state
operations. `iox.ErrWouldBlock` signals that no completion is currently observable at the boundary. The error is defined
in `code.hybscloud.com/iox`.
operations. If `iox.Classify(err)` yields `iox.OutcomeWouldBlock`, no completion is currently observable at the
boundary.

`Start` and `Stop` form the ring lifecycle pair. `Stop` is idempotent and renders the ring permanently unusable; call it
only after you have drained all in-flight operations, reaped outstanding CQEs, and quiesced live multishot
Expand Down Expand Up @@ -183,9 +211,15 @@ recover the original submission token.
cqes := make([]uring.CQEView, 64)

n, err := ring.Wait(cqes)
if err != nil && !errors.Is(err, iox.ErrWouldBlock) {
switch iox.Classify(err) {
case iox.OutcomeWouldBlock:
return iox.ErrWouldBlock
case iox.OutcomeFailure:
return err
}
if n == 0 {
return iox.ErrWouldBlock
}

for i := 0; i < n; i++ {
cqe := cqes[i]
Expand Down Expand Up @@ -213,10 +247,13 @@ borrowed buffers must not outlive their documented lifetimes.

## Buffer provisioning

Two receive-buffer strategies are supported:
`uring` has three practical buffer paths. Registered buffers are pinned during ring setup and used by fixed-buffer file
I/O. Provided buffer rings let the kernel choose a receive buffer and report the selected buffer ID in the CQE. Bundle
receives consume a contiguous logical range of provided buffers and expose that range through `BundleIterator`.

- fixed-size provided buffers through `ReadBufferSize` and `ReadBufferNum`
- multi-size buffer groups through `MultiSizeBuffer`
- registered fixed buffers through `LockedBufferMem`, `RegisteredBuffer`, `ReadFixed`, and `WriteFixed`

For most systems the configuration helpers are the easiest entry point:

Expand All @@ -228,7 +265,58 @@ ring, err := uring.New(func(o *uring.Options) {
```

Use `OptionsForBudget` to start from an explicit memory budget, or `BufferConfigForBudget` to inspect the tier layout
chosen for a given budget.
chosen for a given budget:

```go
cfg, scale := uring.BufferConfigForBudget(256 * uring.MiB)
fmt.Printf("buffer tiers=%+v scale=%d\n", cfg, scale)
```

Fixed-buffer I/O uses a registered buffer by index. The returned slice is ring-owned memory; keep it live until the
fixed operation completes:

```go
buf := ring.RegisteredBuffer(0)
copy(buf, payload)

fd := iofd.NewFD(int(file.Fd()))
ctx := uring.PackDirect(uring.IORING_OP_WRITE_FIXED, 0, 0, 0).WithFD(fd)
if err := ring.WriteFixed(ctx, 0, len(payload)); err != nil {
return err
}
```

For socket receive with kernel buffer selection, pass `nil` as the receive buffer and request the size class you want.
The completion reports which buffer was selected:

```go
recvCtx := uring.PackDirect(uring.IORING_OP_RECV, 0, 0, 0)

if err := ring.Receive(recvCtx, &socketFD, nil, uring.WithReadBufferSize(uring.BufferSizeSmall)); err != nil {
return err
}

// Later, after Wait returns the matching CQE:
if cqe.HasBuffer() {
fmt.Printf("kernel selected group=%d id=%d\n", cqe.BufGroup(), cqe.BufID())
}
```

Bundle receives use the same provided-buffer storage but may consume more than one buffer in a single CQE. Process the
iterator, then recycle the consumed slots:

```go
if err := ring.ReceiveBundle(recvCtx, &socketFD, uring.WithReadBufferSize(uring.BufferSizeSmall)); err != nil {
return err
}

if it, ok := ring.BundleIterator(cqe, cqe.BufGroup()); ok {
for buf := range it.All() {
handle(buf)
}
it.Recycle(ring)
}
```

Registered buffers require pinned memory. If large buffer registration fails, increase `RLIMIT_MEMLOCK` or use a smaller memory budget.

Expand Down Expand Up @@ -289,11 +377,11 @@ func runLoop(ring *uring.Uring, stop <-chan struct{}) error {
}

n, err := ring.Wait(cqes)
if errors.Is(err, iox.ErrWouldBlock) {
switch iox.Classify(err) {
case iox.OutcomeWouldBlock:
backoff.Wait()
continue
}
if err != nil {
case iox.OutcomeFailure:
return err
}
if n == 0 {
Expand All @@ -311,8 +399,8 @@ func runLoop(ring *uring.Uring, stop <-chan struct{}) error {

All ring methods, including `Send`, `Receive`, `AcceptMultishot`, and `Wait`, run on this goroutine. Work from other
goroutines enters the loop through a channel or a lock-free queue, not by calling ring methods directly. `iox.Backoff`
stays caller-owned: call `backoff.Wait()` on `iox.ErrWouldBlock` or when `Wait` returns no CQEs, and `backoff.Reset()`
after any batch with `n > 0`.
stays caller-owned: call `backoff.Wait()` on `iox.OutcomeWouldBlock` or when `Wait` returns no CQEs, and
`backoff.Reset()` after any batch with `n > 0`.

### Multishot subscription lifecycle

Expand Down
Loading