-
-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathinterface_linux.go
More file actions
1789 lines (1597 loc) · 68.4 KB
/
interface_linux.go
File metadata and controls
1789 lines (1597 loc) · 68.4 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
987
988
989
990
991
992
993
994
995
996
997
998
999
1000
// ©Hayabusa Cloud Co., Ltd. 2026. All rights reserved.
// Use of this source code is governed by a MIT-style
// license that can be found in the LICENSE file.
//go:build linux
package uring
// This file is part of the `uring` package refactored from `code.hybscloud.com/sox`.
import (
"encoding/binary"
"errors"
"fmt"
"sync/atomic"
"time"
"unsafe"
"code.hybscloud.com/iofd"
"code.hybscloud.com/zcall"
)
// Uring entry count constants define submission queue sizes.
// Each successive constant is four times the previous one (the shift grows by
// two per step), covering 8, 32, 128, 512, 2048, 8192, and 32768 entries.
const (
	EntriesPico   = 1 << (3 + 2*iota) // 8 entries
	EntriesNano                       // 32 entries
	EntriesMicro                      // 128 entries
	EntriesSmall                      // 512 entries
	EntriesMedium                     // 2048 entries
	EntriesLarge                      // 8192 entries
	EntriesHuge                       // 32768 entries
)
// Options configures the io_uring instance behavior.
// All fields have sensible defaults if not specified: New starts from the
// package defaults and applies each OptionFunc on top of them.
type Options struct {
	// Entries specifies the number of SQE slots (use Entries* constants).
	// New also uses it as the capacity of the IndirectSQE/ExtSQE context pools.
	Entries int
	// LockedBufferMem is the total memory for registered buffers (bytes).
	// Start registers min(registerBufferNum, LockedBufferMem/registerBufferSize)
	// fixed buffers, and calls maximizeMemoryLock first when this exceeds 64 KiB.
	LockedBufferMem int
	// ReadBufferSize is the size of each read buffer (bytes).
	// New uses it only when MultiSizeBuffer is not positive.
	ReadBufferSize int
	// ReadBufferNum is the number of read buffers to allocate.
	// New uses it only when MultiSizeBuffer is not positive.
	ReadBufferNum int
	// ReadBufferGidOffset is the base group ID for read buffers. Provided
	// buffer group IDs start at this value; BundleIterator subtracts it to map
	// a group ID back to its local index.
	ReadBufferGidOffset int
	// WriteBufferSize is the size of each write buffer (bytes).
	WriteBufferSize int
	// WriteBufferNum is the number of write buffers to allocate.
	WriteBufferNum int
	// MultiSizeBuffer enables multiple buffer size groups when > 0. In that
	// case New allocates buffer groups instead of single-size read buffers.
	MultiSizeBuffer int
	// MultiIssuers enables the shared-submit configuration for rings that accept
	// submissions from multiple goroutines. When false, New requests
	// SINGLE_ISSUER + DEFER_TASKRUN and callers must serialize submit-state
	// operations such as submit, Wait, WaitDirect, WaitExtended, Stop, and ring
	// resize so the default fast path can skip shared synchronization. When
	// true, it requests COOP_TASKRUN and keeps the shared-submit synchronization
	// path.
	MultiIssuers bool
	// NotifySucceed ensures CQEs are generated for all successful operations.
	// When false, New adds IOSQE_CQE_SKIP_SUCCESS to write-like operations
	// (e.g. Send), so a successful write-like operation posts no CQE; failed
	// ones still do.
	NotifySucceed bool
	// IndirectSubmissionQueue enables the SQ array. When false, New requests
	// IORING_SETUP_NO_SQARRAY to reduce ring memory in the direct-index submit path.
	IndirectSubmissionQueue bool
	// SQE128 requests 128-byte SQE slots for the ring. This is required for
	// Nop128 and UringCmd128. When false, the ring keeps the default 64-byte SQE
	// layout. If the kernel rejects SQE128 while accepting the same setup with
	// 64-byte SQEs, New returns ErrNotSupported.
	SQE128 bool
	// HybridPolling enables hybrid I/O polling mode (IORING_SETUP_HYBRID_IOPOLL).
	// This delays polling to reduce CPU usage while maintaining low latency.
	// Requires: O_DIRECT files on polling-capable storage devices (e.g., NVMe).
	// Available since kernel 6.13.
	// NOTE(review): the kernel only accepts HYBRID_IOPOLL together with
	// IORING_SETUP_IOPOLL; confirm ioUringHybridIoPollOptions sets both.
	HybridPolling bool
}
// New creates a new io_uring instance configured by the given option functions.
//
// The returned ring is not started: call Start() to initialize buffers and
// enable it. When SQE128 is requested and the kernel rejects the setup with
// ErrInvalidParam, New probes the same configuration with 64-byte SQEs. If
// that probe ring can be created, the failure is attributed to missing SQE128
// support and ErrNotSupported is returned. Otherwise the original error is
// returned.
func New(options ...OptionFunc) (*Uring, error) {
	opt := defaultOptions
	opt.Apply(options...)

	r, err := newIoUring(opt.Entries, newSetupOptions(opt, opt.SQE128)...)
	if err != nil {
		if !opt.SQE128 || !errors.Is(err, ErrInvalidParam) {
			return nil, err
		}
		probe, probeErr := newIoUring(opt.Entries, newSetupOptions(opt, false)...)
		if probeErr != nil {
			return nil, err
		}
		// The probe ring only answers the capability question; its teardown
		// result does not matter to the caller.
		_ = probe.stop()
		return nil, ErrNotSupported
	}

	// Read-like operations carry no extra SQE flags. Write-like operations
	// skip success CQEs unless the caller asked to see every completion.
	var writeFlags uint8
	if !opt.NotifySucceed {
		writeFlags |= IOSQE_CQE_SKIP_SUCCESS
	}

	ur := &Uring{
		ioUring:          r,
		Options:          &opt,
		bufferRings:      newUringBufferRings(),
		ctxPools:         NewContextPools(opt.Entries),
		writeLikeOpFlags: writeFlags,
	}

	// Exactly one provided-buffer flavour is prepared: multi-size groups or a
	// single-size pool.
	if opt.MultiSizeBuffer > 0 {
		ur.bufferGroups = newUringBufferGroups(opt.MultiSizeBuffer)
		ur.bufferGroups.setGIDOffset(opt.ReadBufferGidOffset)
	} else {
		ur.buffers = newUringProvideBuffers(opt.ReadBufferSize, opt.ReadBufferNum)
		ur.buffers.setGIDOffset(opt.ReadBufferGidOffset)
	}

	order := binary.ByteOrder(binary.LittleEndian)
	if isBigEndian {
		order = binary.BigEndian
	}
	ur.Features = &Features{
		SQEntries:         int(r.params.sqEntries),
		CQEntries:         int(r.params.cqEntries),
		SQEBytes:          sqeBytesFromFlags(r.params.flags),
		UserDataByteOrder: order,
	}
	return ur, nil
}
// Features reports per-ring sizing and metadata returned at creation time.
// New fills it from the parameters the kernel reported for the ring, so the
// sizes may differ from what Options requested.
type Features struct {
	// SQEntries is the actual number of SQ entries allocated by the kernel.
	SQEntries int
	// CQEntries is the actual number of CQ entries allocated by the kernel.
	CQEntries int
	// SQEBytes is the width of each mapped SQE slot in bytes, derived from the
	// ring's setup flags via sqeBytesFromFlags.
	SQEBytes int
	// UserDataByteOrder is the byte order for user_data field interpretation.
	// New sets it to binary.BigEndian on big-endian hosts and
	// binary.LittleEndian otherwise.
	UserDataByteOrder binary.ByteOrder
}
// newSetupOptions converts Options into the ordered list of ioUringParams
// mutators passed to newIoUring. sqe128 is separate from opt.SQE128 so that
// New can build the same configuration with and without 128-byte SQEs.
//
// Order: disabled-at-creation, task-run mode, NO_SQARRAY (unless the indirect
// SQ is requested), SQE128 (if sqe128), hybrid I/O polling (if requested).
func newSetupOptions(opt Options, sqe128 bool) []func(params *ioUringParams) {
	// Default: a single submitter with task work deferred to the waiter.
	taskRun := func(params *ioUringParams) {
		params.flags |= IORING_SETUP_SINGLE_ISSUER
		params.flags |= IORING_SETUP_DEFER_TASKRUN
	}
	if opt.MultiIssuers {
		// Shared submission: cooperative task running instead.
		taskRun = func(params *ioUringParams) {
			params.flags |= IORING_SETUP_COOP_TASKRUN
		}
	}

	mutators := []func(params *ioUringParams){ioUringDisabledOptions, taskRun}
	if !opt.IndirectSubmissionQueue {
		mutators = append(mutators, ioUringNoSQArrayOptions)
	}
	if sqe128 {
		mutators = append(mutators, ioUringSQE128Options)
	}
	if opt.HybridPolling {
		mutators = append(mutators, ioUringHybridIoPollOptions)
	}
	return mutators
}
// Uring is the main io_uring interface for submitting and completing I/O operations.
// It wraps the kernel io_uring instance with buffer management and typed operations.
// Default rings use the single-issuer fast path, so submit-state operations
// are not safe for concurrent use by multiple goroutines; caller must
// serialize submit, Wait, WaitDirect, WaitExtended, Stop, and ResizeRings
// unless MultiIssuers is enabled.
//
// Create a Uring with New, call Start before submitting, and call Stop to
// release it.
type Uring struct {
	*ioUring
	*Options
	// Features reports actual ring sizing and userdata metadata.
	Features *Features
	// buffers holds single-size provided read buffers. New sets it when
	// MultiSizeBuffer is not positive; otherwise bufferGroups is set instead.
	buffers *uringProvideBuffers
	// bufferGroups holds multi-size provided buffer groups (MultiSizeBuffer > 0).
	bufferGroups *uringProvideBufferGroups
	// bufferRings receives buffers or bufferGroups during Start and is
	// released by Stop.
	bufferRings *uringBufferRings
	// registeredBufRings keeps explicit RegisterBufRing* registrations reachable
	// so Stop can unregister them and release any extra mappings.
	registeredBufRings []registeredBufRing
	// buffersPool backs the fixed buffers that Start registers with the kernel.
	buffersPool *RegisterBufferPool
	// ctxPools provides lock-free pools for IndirectSQE and ExtSQE contexts.
	// Capacity matches SQ entries for natural backpressure.
	ctxPools *ContextPools
	// readLikeOpFlags and writeLikeOpFlags are OR'd into the SQE flags of
	// read-like (Receive*) and write-like (Send) operations. New leaves
	// readLikeOpFlags at 0 and puts IOSQE_CQE_SKIP_SUCCESS in
	// writeLikeOpFlags unless NotifySucceed is set.
	readLikeOpFlags  uint8
	writeLikeOpFlags uint8
}
// registeredBufRing records one explicit RegisterBufRing* registration so Stop
// can release it.
type registeredBufRing struct {
	// groupID is the buffer group ID; it is used in release error messages.
	groupID uint16
	// backing is held only to keep the ring memory reachable while tracked.
	backing []byte
	// mmapPtr/mmapSize describe an extra mapping to munmap on release. The
	// entry is skipped when either is zero.
	mmapPtr  unsafe.Pointer
	mmapSize uintptr
}
// Start initializes the io_uring instance with buffers and enables the ring.
//
// In order, it:
//   - registers an io_uring probe;
//   - registers fixed buffers sized from LockedBufferMem, calling
//     maximizeMemoryLock first when more than 64 KiB is requested;
//   - registers the provided-buffer rings prepared by New (single-size
//     buffers or multi-size groups) and advances them;
//   - enables the ring (New requests it disabled via ioUringDisabledOptions).
//
// Context pools are constructed eagerly by New and are intentionally not
// reset here so any SQEs borrowed before Start remain valid.
//
// Start returns ErrClosed after the ring has been closed, and ErrExists if
// Start was already called. On any other failure the ring is stopped and
// becomes unusable. Any error from that Stop is joined to the returned error.
func (ur *Uring) Start() (err error) {
	if ur.closed.Load() {
		return ErrClosed
	}
	if !ur.started.CompareAndSwap(false, true) {
		return ErrExists
	}
	defer func() {
		if err != nil {
			err = errors.Join(err, ur.Stop())
		}
	}()

	// The probe is registered as placeholder infrastructure for future
	// post-baseline capability branches. Linux 6.18+ startup does not derive
	// any feature state from the probed ops, so the result is not consulted.
	var probe ioUringProbe
	if err = ur.registerProbe(&probe); err != nil {
		return err
	}

	// Fixed (registered) buffers.
	if ur.LockedBufferMem > (1 << 16) {
		maximizeMemoryLock()
	}
	// NOTE(review): when LockedBufferMem < registerBufferSize this count is 0;
	// the kernel's IORING_REGISTER_BUFFERS rejects nr_args == 0, so confirm
	// registerBuffers tolerates an empty set.
	regBufNum := min(registerBufferNum, ur.LockedBufferMem/registerBufferSize)
	ur.buffersPool = NewRegisterBufferPool(regBufNum)
	ur.buffersPool.Fill(func() RegisterBuffer { return RegisterBuffer{} })
	regBufAddr := unsafe.Pointer(unsafe.SliceData(ur.buffersPool.items))
	if err = ur.registerBuffers(regBufAddr, regBufNum, registerBufferSize); err != nil {
		return err
	}

	// Provided buffer rings: New prepares exactly one of buffers/bufferGroups.
	switch {
	case ur.buffers != nil:
		err = ur.bufferRings.registerBuffers(ur.ioUring, ur.buffers)
	case ur.bufferGroups != nil:
		err = ur.bufferRings.registerGroups(ur.ioUring, ur.bufferGroups)
	}
	if err != nil {
		return err
	}
	ur.bufferRings.advance(ur.ioUring)

	return ur.enable()
}
// Stop tears down ring-owned resources and makes the ring permanently unusable.
// It is idempotent. Caller must drain all in-flight operations, reap
// outstanding CQEs, and quiesce live subscriptions before calling Stop. Stop
// is not safe for concurrent use.
//
// Every teardown step is attempted even if an earlier one fails. Each failure
// is wrapped with the name of its step, and all failures are joined into the
// returned error. The steps run in this order:
//  1. release the ring core (ioUring.stop);
//  2. munmap mappings recorded for explicit RegisterBufRing* registrations;
//  3. release the provided buffer rings registered by Start.
//
// Finally the references to the registered-buffer pool and the
// provided-buffer bookkeeping are dropped.
func (ur *Uring) Stop() error {
	var err error
	if stopErr := ur.ioUring.stop(); stopErr != nil {
		err = errors.Join(err, fmt.Errorf("release ring core: %w", stopErr))
	}
	if stopErr := ur.releaseRegisteredBufRings(); stopErr != nil {
		err = errors.Join(err, fmt.Errorf("release manual buf rings: %w", stopErr))
	}
	// NOTE(review): release receives ur.ioUring after the core was stopped
	// above; presumably it only unmaps/frees and needs no live ring fd —
	// confirm. bufferRings is not cleared here, so repeated Stop calls rely
	// on release being safe to call more than once.
	if ur.bufferRings != nil {
		if stopErr := ur.bufferRings.release(ur.ioUring); stopErr != nil {
			err = errors.Join(err, fmt.Errorf("release provided buf rings: %w", stopErr))
		}
	}
	ur.buffersPool = nil
	ur.buffers = nil
	ur.bufferGroups = nil
	return err
}
// trackRegisteredBufRing remembers an explicit buffer-ring registration so
// Stop can release it later. Keeping backing in the record keeps the ring
// memory reachable while the registration is tracked.
func (ur *Uring) trackRegisteredBufRing(groupID uint16, backing []byte, mmapPtr unsafe.Pointer, mmapSize uintptr) {
	entry := registeredBufRing{groupID: groupID, backing: backing, mmapPtr: mmapPtr, mmapSize: mmapSize}
	ur.registeredBufRings = append(ur.registeredBufRings, entry)
}
// releaseRegisteredBufRings unmaps the extra mapping of every tracked buffer
// ring, newest first, and then forgets all tracked entries.
//
// Entries with no mapping are skipped. Unmap failures are collected into one
// joined error. Entries are dropped regardless, so a failed unmap is not
// retried by a later call.
func (ur *Uring) releaseRegisteredBufRings() error {
	var errs error
	for idx := len(ur.registeredBufRings); idx > 0; idx-- {
		reg := &ur.registeredBufRings[idx-1]
		if reg.mmapPtr == nil || reg.mmapSize == 0 {
			continue
		}
		if errno := zcall.Munmap(reg.mmapPtr, reg.mmapSize); errno != 0 {
			errs = errors.Join(errs, fmt.Errorf("munmap manual buf ring gid %d: %w", reg.groupID, errFromErrno(errno)))
		}
	}
	ur.registeredBufRings = nil
	return errs
}
// Wait flushes pending submissions, drives deferred task work when needed,
// and copies ready completion events into cqes.
//
// On single-issuer rings Wait must not run concurrently with submit, Wait,
// WaitDirect, WaitExtended, Stop, or ResizeRings; the caller serializes them.
// On IOPOLL rings Wait also performs the nonblocking poll enter that makes
// completions visible.
//
// It returns the number of events received. It returns ErrCQOverflow when
// the ring is in CQ overflow and no CQEs can be claimed immediately, and
// `iox.ErrWouldBlock` when the CQ is empty.
//
// Each CQEView exposes Res and Flags directly, plus methods for reaching the
// submission context by mode (Direct, Indirect, Extended).
//
// Example:
//
//	cqes := make([]CQEView, 64)
//	n, err := ring.Wait(cqes)
//	for i := range n {
//		cqe := &cqes[i]
//		if err := cqe.Err(); err != nil {
//			handleCompletionError(cqe.Op(), err)
//			continue
//		}
//		if cqe.Extended() {
//			ext := cqe.ExtSQE()
//			ctx := ViewCtx(ext).Vals1()
//			seq := ctx.Val1
//			handleCompletion(cqe.Res, seq)
//		}
//	}
func (ur *Uring) Wait(cqes []CQEView) (n int, err error) {
	if err = ur.ioUring.enter(); err != nil {
		return 0, err
	}
	// One batch claim (a single CAS) hands out as many ready CQEs as fit.
	return ur.ioUring.waitBatch(cqes)
}
// ========================================
// Context Pool Accessors
// ========================================
// ExtSQE borrows an ExtSQE from the context pool for Extended-mode
// submissions. A nil result means the pool is empty. The pool is sized to the
// ring, so nil is natural backpressure: the ring is full.
// The ExtSQE stays borrowed until PutExtSQE is called after its CQE has been
// processed. Do not keep pointers into its SQE or UserData past that point.
//
//go:nosplit
func (ur *Uring) ExtSQE() *ExtSQE {
	pools := ur.ctxPools
	return pools.Extended()
}

// PutExtSQE hands an ExtSQE back to the pool once its completion has been
// handled. Call it exactly once per borrowed ExtSQE to keep the pool balanced.
// Afterwards the ExtSQE, any typed context view, and any raw CastUserData
// overlay derived from it are invalid.
//
//go:nosplit
func (ur *Uring) PutExtSQE(sqe *ExtSQE) {
	pools := ur.ctxPools
	pools.PutExtended(sqe)
}

// IndirectSQE borrows an IndirectSQE from the context pool for Indirect-mode
// submissions. It returns nil when the pool is empty. The IndirectSQE stays
// borrowed until PutIndirectSQE is called.
//
//go:nosplit
func (ur *Uring) IndirectSQE() *IndirectSQE {
	pools := ur.ctxPools
	return pools.Indirect()
}

// PutIndirectSQE hands an IndirectSQE back to the pool. After this call the
// IndirectSQE is invalid and must not be reused.
//
//go:nosplit
func (ur *Uring) PutIndirectSQE(sqe *IndirectSQE) {
	pools := ur.ctxPools
	pools.PutIndirect(sqe)
}
// SubmitExtended queues the SQE held by an Extended-mode context.
// Fill in the ExtSQE's SQE fields before calling. The io_uring.user_data
// field is set to sqeCtx itself (pointer plus mode bits).
func (ur *Uring) SubmitExtended(sqeCtx SQEContext) error {
	core := ur.ioUring
	return core.submitExtended(sqeCtx)
}
// ========================================
// Registered Buffer Access
// ========================================
// RegisteredBuffer returns the registered (fixed) buffer at index, or nil
// when index is negative or not below RegisteredBufferCount.
// The slice aliases memory registered with the kernel. Writes to it are
// visible to zero-copy operations that use the same buffer index.
//
//go:nosplit
func (ur *Uring) RegisteredBuffer(index int) []byte {
	bufs := ur.bufs
	// A single unsigned comparison rejects both negative and too-large indexes.
	if uint(index) >= uint(len(bufs)) {
		return nil
	}
	return bufs[index]
}

// RegisteredBufferCount reports how many registered buffers are available.
//
//go:nosplit
func (ur *Uring) RegisteredBufferCount() int {
	count := len(ur.bufs)
	return count
}
// ========================================
// Ring Introspection (for backpressure)
// ========================================
// SQAvailable reports how many SQE slots are free for submission: the ring's
// entry count minus the entries still pending. Runtime code can use it for
// admission control and backpressure. It returns 0 once the ring is closed or
// while any SQ ring pointer is unset. On single-issuer rings it is not safe
// for concurrent use with ResizeRings; the caller must serialize the two.
func (ur *Uring) SQAvailable() int {
	ur.lockSubmitState()
	defer ur.unlockSubmitState()
	switch {
	case ur.closed.Load(), ur.sq.kRingEntries == nil, ur.sq.kHead == nil, ur.sq.kTail == nil:
		return 0
	}
	capacity := int(*ur.sq.kRingEntries)
	return capacity - ur.sqCount()
}
// CQPending reports how many CQEs are waiting to be reaped. Runtime code can
// use it to decide when to drain completions. It returns 0 once the ring is
// closed or while the CQ head/tail pointers are unset. On single-issuer rings
// it is not safe for concurrent use with ResizeRings; the caller must
// serialize the two.
func (ur *Uring) CQPending() int {
	ur.lockSubmitState()
	defer ur.unlockSubmitState()
	switch {
	case ur.closed.Load(), ur.cq.kHead == nil, ur.cq.kTail == nil:
		return 0
	}
	head := atomic.LoadUint32(ur.cq.kHead)
	tail := atomic.LoadUint32(ur.cq.kTail)
	// Unsigned subtraction stays correct across uint32 wraparound.
	return int(tail - head)
}
// RingFD exposes the io_uring instance's file descriptor, which other rings
// need as the target of IORING_OP_MSG_RING.
//
//go:nosplit
func (ur *Uring) RingFD() iofd.FD {
	fd := ur.ringFd
	return iofd.FD(fd)
}
// ========================================
// Socket Operations
// ========================================
// SocketRaw creates a socket using io_uring (IORING_OP_SOCKET) with the given
// domain, type, and protocol.
//
// Any fd already carried by sqeCtx is not used as a descriptor. For
// IORING_OP_SOCKET the SQE's fd field holds the address family, which the
// kernel reads as the domain, so the submission carries domain there instead.
// Flags and an optional file index come from the op options. On success the
// CQE res holds the new descriptor, or the fixed-file slot when a file index
// was requested.
func (ur *Uring) SocketRaw(sqeCtx SQEContext, domain, typ, proto int, options ...OpOptionFunc) error {
	flags, fileIndex := ur.socketOptions(options)
	return ur.socket(sqeCtx.OrFlags(flags), domain, typ, proto, fileIndex)
}
// TCP4Socket creates a close-on-exec TCP socket for IPv4 (AF_INET, SOCK_STREAM).
func (ur *Uring) TCP4Socket(sqeCtx SQEContext, options ...OpOptionFunc) error {
	return ur.SocketRaw(sqeCtx, AF_INET, SOCK_CLOEXEC|SOCK_STREAM, IPPROTO_TCP, options...)
}

// TCP6Socket creates a close-on-exec TCP socket for IPv6 (AF_INET6, SOCK_STREAM).
func (ur *Uring) TCP6Socket(sqeCtx SQEContext, options ...OpOptionFunc) error {
	return ur.SocketRaw(sqeCtx, AF_INET6, SOCK_CLOEXEC|SOCK_STREAM, IPPROTO_TCP, options...)
}

// UDP4Socket creates a close-on-exec UDP socket for IPv4 (AF_INET, SOCK_DGRAM).
func (ur *Uring) UDP4Socket(sqeCtx SQEContext, options ...OpOptionFunc) error {
	return ur.SocketRaw(sqeCtx, AF_INET, SOCK_CLOEXEC|SOCK_DGRAM, IPPROTO_UDP, options...)
}

// UDP6Socket creates a close-on-exec UDP socket for IPv6 (AF_INET6, SOCK_DGRAM).
func (ur *Uring) UDP6Socket(sqeCtx SQEContext, options ...OpOptionFunc) error {
	return ur.SocketRaw(sqeCtx, AF_INET6, SOCK_CLOEXEC|SOCK_DGRAM, IPPROTO_UDP, options...)
}

// UDPLITE4Socket creates a close-on-exec UDP-Lite socket for IPv4.
func (ur *Uring) UDPLITE4Socket(sqeCtx SQEContext, options ...OpOptionFunc) error {
	return ur.SocketRaw(sqeCtx, AF_INET, SOCK_CLOEXEC|SOCK_DGRAM, IPPROTO_UDPLITE, options...)
}

// UDPLITE6Socket creates a close-on-exec UDP-Lite socket for IPv6.
func (ur *Uring) UDPLITE6Socket(sqeCtx SQEContext, options ...OpOptionFunc) error {
	return ur.SocketRaw(sqeCtx, AF_INET6, SOCK_CLOEXEC|SOCK_DGRAM, IPPROTO_UDPLITE, options...)
}

// SCTP4Socket creates a close-on-exec one-to-one style SCTP socket for IPv4
// (SOCK_SEQPACKET).
func (ur *Uring) SCTP4Socket(sqeCtx SQEContext, options ...OpOptionFunc) error {
	return ur.SocketRaw(sqeCtx, AF_INET, SOCK_CLOEXEC|SOCK_SEQPACKET, IPPROTO_SCTP, options...)
}

// SCTP6Socket creates a close-on-exec one-to-one style SCTP socket for IPv6
// (SOCK_SEQPACKET).
func (ur *Uring) SCTP6Socket(sqeCtx SQEContext, options ...OpOptionFunc) error {
	return ur.SocketRaw(sqeCtx, AF_INET6, SOCK_CLOEXEC|SOCK_SEQPACKET, IPPROTO_SCTP, options...)
}

// UnixSocket creates a close-on-exec Unix domain socket of type
// SOCK_SEQPACKET (AF_LOCAL, default protocol).
func (ur *Uring) UnixSocket(sqeCtx SQEContext, options ...OpOptionFunc) error {
	return ur.SocketRaw(sqeCtx, AF_LOCAL, SOCK_CLOEXEC|SOCK_SEQPACKET, 0, options...)
}
// SocketDirect creates a socket straight into a registered file table slot.
// fileIndex selects the slot (0-based). Pass IORING_FILE_INDEX_ALLOC to let
// the kernel pick one; the chosen index is then returned in CQE res.
// A registered file table (RegisterFiles or RegisterFilesSparse) is required.
// Only the flags from the op options are used; the explicit fileIndex
// argument takes precedence over any file index they carry.
func (ur *Uring) SocketDirect(sqeCtx SQEContext, domain, typ, proto int, fileIndex uint32, options ...OpOptionFunc) error {
	flags, _ := ur.socketOptions(options)
	return ur.socketDirect(sqeCtx.OrFlags(flags), domain, typ, proto, fileIndex)
}
// TCP4SocketDirect creates a nonblocking IPv4 TCP socket in an auto-allocated
// registered file slot (IORING_FILE_INDEX_ALLOC); CQE res holds the slot index.
// SOCK_CLOEXEC is omitted because the kernel rejects it for direct slots.
func (ur *Uring) TCP4SocketDirect(sqeCtx SQEContext, options ...OpOptionFunc) error {
	return ur.SocketDirect(sqeCtx, AF_INET, SOCK_NONBLOCK|SOCK_STREAM, IPPROTO_TCP, IORING_FILE_INDEX_ALLOC, options...)
}

// TCP6SocketDirect creates a nonblocking IPv6 TCP socket in an auto-allocated
// registered file slot; CQE res holds the slot index.
func (ur *Uring) TCP6SocketDirect(sqeCtx SQEContext, options ...OpOptionFunc) error {
	return ur.SocketDirect(sqeCtx, AF_INET6, SOCK_NONBLOCK|SOCK_STREAM, IPPROTO_TCP, IORING_FILE_INDEX_ALLOC, options...)
}

// UDP4SocketDirect creates a nonblocking IPv4 UDP socket in an auto-allocated
// registered file slot; CQE res holds the slot index.
func (ur *Uring) UDP4SocketDirect(sqeCtx SQEContext, options ...OpOptionFunc) error {
	return ur.SocketDirect(sqeCtx, AF_INET, SOCK_NONBLOCK|SOCK_DGRAM, IPPROTO_UDP, IORING_FILE_INDEX_ALLOC, options...)
}

// UDP6SocketDirect creates a nonblocking IPv6 UDP socket in an auto-allocated
// registered file slot; CQE res holds the slot index.
func (ur *Uring) UDP6SocketDirect(sqeCtx SQEContext, options ...OpOptionFunc) error {
	return ur.SocketDirect(sqeCtx, AF_INET6, SOCK_NONBLOCK|SOCK_DGRAM, IPPROTO_UDP, IORING_FILE_INDEX_ALLOC, options...)
}

// UDPLITE4SocketDirect creates a nonblocking IPv4 UDP-Lite socket in an
// auto-allocated registered file slot; CQE res holds the slot index.
func (ur *Uring) UDPLITE4SocketDirect(sqeCtx SQEContext, options ...OpOptionFunc) error {
	return ur.SocketDirect(sqeCtx, AF_INET, SOCK_NONBLOCK|SOCK_DGRAM, IPPROTO_UDPLITE, IORING_FILE_INDEX_ALLOC, options...)
}

// UDPLITE6SocketDirect creates a nonblocking IPv6 UDP-Lite socket in an
// auto-allocated registered file slot; CQE res holds the slot index.
func (ur *Uring) UDPLITE6SocketDirect(sqeCtx SQEContext, options ...OpOptionFunc) error {
	return ur.SocketDirect(sqeCtx, AF_INET6, SOCK_NONBLOCK|SOCK_DGRAM, IPPROTO_UDPLITE, IORING_FILE_INDEX_ALLOC, options...)
}

// SCTP4SocketDirect creates a nonblocking IPv4 SCTP (SOCK_SEQPACKET) socket in
// an auto-allocated registered file slot; CQE res holds the slot index.
func (ur *Uring) SCTP4SocketDirect(sqeCtx SQEContext, options ...OpOptionFunc) error {
	return ur.SocketDirect(sqeCtx, AF_INET, SOCK_NONBLOCK|SOCK_SEQPACKET, IPPROTO_SCTP, IORING_FILE_INDEX_ALLOC, options...)
}

// SCTP6SocketDirect creates a nonblocking IPv6 SCTP (SOCK_SEQPACKET) socket in
// an auto-allocated registered file slot; CQE res holds the slot index.
func (ur *Uring) SCTP6SocketDirect(sqeCtx SQEContext, options ...OpOptionFunc) error {
	return ur.SocketDirect(sqeCtx, AF_INET6, SOCK_NONBLOCK|SOCK_SEQPACKET, IPPROTO_SCTP, IORING_FILE_INDEX_ALLOC, options...)
}

// UnixSocketDirect creates a nonblocking SOCK_SEQPACKET Unix domain socket in
// an auto-allocated registered file slot; CQE res holds the slot index.
func (ur *Uring) UnixSocketDirect(sqeCtx SQEContext, options ...OpOptionFunc) error {
	return ur.SocketDirect(sqeCtx, AF_LOCAL, SOCK_NONBLOCK|SOCK_SEQPACKET, 0, IORING_FILE_INDEX_ALLOC, options...)
}
// Bind assigns addr to a socket; presumably the socket is the fd carried by
// sqeCtx. The address is converted with AddrToSockaddr, and SQE flags come
// from the op options.
func (ur *Uring) Bind(sqeCtx SQEContext, addr Addr, options ...OpOptionFunc) error {
	flags := ur.bindOptions(options)
	return ur.bind(sqeCtx.OrFlags(flags), AddrToSockaddr(addr))
}

// Listen marks a socket as passive so it can accept connections. Presumably
// the socket is the fd carried by sqeCtx. The backlog and SQE flags come from
// the op options.
func (ur *Uring) Listen(sqeCtx SQEContext, options ...OpOptionFunc) error {
	flags, backlog := ur.listenOptions(options)
	return ur.listen(sqeCtx.OrFlags(flags), backlog)
}
// Accept takes one pending connection from a listening socket.
// Set the fd in sqeCtx to the listener. SQE flags and the accept ioprio bits
// come from the op options.
func (ur *Uring) Accept(sqeCtx SQEContext, options ...OpOptionFunc) error {
	flags, ioprio := ur.acceptOptions(options)
	return ur.accept(sqeCtx.OrFlags(flags), ioprio)
}

// SubmitAcceptMultishot submits the raw kernel multishot accept opcode: the
// same request as Accept with IORING_ACCEPT_MULTISHOT added to ioprio.
// For the managed subscription helper, use [Uring.AcceptMultishot].
func (ur *Uring) SubmitAcceptMultishot(sqeCtx SQEContext, options ...OpOptionFunc) error {
	flags, ioprio := ur.acceptOptions(options)
	return ur.accept(sqeCtx.OrFlags(flags), ioprio|IORING_ACCEPT_MULTISHOT)
}
// AcceptDirect accepts a connection straight into a registered file table
// slot. fileIndex selects the slot (0-based). Pass IORING_FILE_INDEX_ALLOC to
// let the kernel pick one; the chosen index is returned in CQE res.
// SOCK_CLOEXEC is not supported with direct accept.
// A registered file table (RegisterFiles or RegisterFilesSparse) is required.
func (ur *Uring) AcceptDirect(sqeCtx SQEContext, fileIndex uint32, options ...OpOptionFunc) error {
	flags, ioprio := ur.acceptOptions(options)
	return ur.acceptDirect(sqeCtx.OrFlags(flags), ioprio, fileIndex)
}

// SubmitAcceptDirectMultishot submits the raw kernel multishot accept-direct
// opcode. Each accepted connection takes the next auto-allocated slot, so
// fileIndex must be IORING_FILE_INDEX_ALLOC; the kernel rejects multishot
// direct accept into a fixed slot.
func (ur *Uring) SubmitAcceptDirectMultishot(sqeCtx SQEContext, fileIndex uint32, options ...OpOptionFunc) error {
	flags, ioprio := ur.acceptOptions(options)
	return ur.acceptDirect(sqeCtx.OrFlags(flags), ioprio|IORING_ACCEPT_MULTISHOT, fileIndex)
}
// Connect starts connecting a socket to remote. The address is converted with
// AddrToSockaddr, and SQE flags come from the op options.
func (ur *Uring) Connect(sqeCtx SQEContext, remote Addr, options ...OpOptionFunc) error {
	flags := ur.connectOptions(options)
	return ur.connect(sqeCtx.OrFlags(flags), AddrToSockaddr(remote))
}
// Receive performs a socket receive operation with MSG_WAITALL semantics on
// pollFD.
//
// With a non-nil b, data lands in b, and b must stay valid until the
// operation completes. With b == nil, the kernel selects a buffer from the
// provided buffer ring. Either way the ring's read-like SQE flags are added.
func (ur *Uring) Receive(sqeCtx SQEContext, pollFD PollFd, b []byte, options ...OpOptionFunc) error {
	ctx := sqeCtx.WithFD(iofd.FD(pollFD.Fd()))
	if b != nil {
		flags, ioprio, offset, n := ur.receiveOptions(b, options)
		return ur.receive(ctx.OrFlags(flags|ur.readLikeOpFlags), ioprio, b, uint64(offset), n)
	}
	flags, ioprio, bufSize, bufGroup := ur.receiveWithBufferSelectOptions(pollFD, options)
	return ur.receiveWithBufferSelect(ctx.OrFlags(flags|ur.readLikeOpFlags), ioprio, bufSize, bufGroup)
}
// SubmitReceiveMultishot submits the raw kernel multishot receive opcode.
// For the managed subscription helper, use [Uring.ReceiveMultishot].
//
// Multishot receive must draw its buffers from a provided buffer ring: the
// kernel rejects IORING_RECV_MULTISHOT without buffer selection with -EINVAL.
// b must therefore be nil. A non-nil b returns ErrInvalidParam immediately
// rather than queueing an SQE that can only complete with -EINVAL.
func (ur *Uring) SubmitReceiveMultishot(sqeCtx SQEContext, pollFD PollFd, b []byte, options ...OpOptionFunc) error {
	if b != nil {
		return ErrInvalidParam
	}
	ctx := sqeCtx.WithFD(iofd.FD(pollFD.Fd()))
	flags, ioprio, bufSize, bufGroup := ur.receiveWithBufferSelectOptions(pollFD, options)
	ctx = ctx.OrFlags(flags | ur.readLikeOpFlags)
	return ur.receiveWithBufferSelect(ctx, ioprio|IORING_RECV_MULTISHOT, bufSize, bufGroup)
}
// ReceiveBundle performs a bundle receive on pollFD. A single operation may
// fill several contiguous buffers from the provided buffer group. CQE res is
// the byte count; use BundleBuffers() to find the buffer range consumed.
// Buffer selection from the kernel-provided ring is always used.
func (ur *Uring) ReceiveBundle(sqeCtx SQEContext, pollFD PollFd, options ...OpOptionFunc) error {
	ctx := sqeCtx.WithFD(iofd.FD(pollFD.Fd()))
	flags, ioprio, bufSize, bufGroup := ur.receiveWithBufferSelectOptions(pollFD, options)
	return ur.receiveBundle(ctx.OrFlags(flags|ur.readLikeOpFlags), ioprio, bufSize, bufGroup)
}

// SubmitReceiveBundleMultishot submits the raw kernel multishot bundle
// receive opcode. It is ReceiveBundle with IORING_RECV_MULTISHOT added to
// ioprio, for high-throughput raw CQE flows.
func (ur *Uring) SubmitReceiveBundleMultishot(sqeCtx SQEContext, pollFD PollFd, options ...OpOptionFunc) error {
	ctx := sqeCtx.WithFD(iofd.FD(pollFD.Fd()))
	flags, ioprio, bufSize, bufGroup := ur.receiveWithBufferSelectOptions(pollFD, options)
	return ur.receiveBundle(ctx.OrFlags(flags|ur.readLikeOpFlags), ioprio|IORING_RECV_MULTISHOT, bufSize, bufGroup)
}
// BundleIterator builds a BundleIterator over the buffers a bundle CQE used.
// group is the group ID as registered, i.e. including ReadBufferGidOffset.
// It returns a zero BundleIterator and false when the group is not registered
// or the CQE carries no data.
//
// The iterator borrows ring-owned backing memory. Finish with the buffers
// before calling Recycle. Call Recycle from a single goroutine, without
// racing other buffer-ring recycle/advance activity on the same Uring.
func (ur *Uring) BundleIterator(cqe CQEView, group uint16) (BundleIterator, bool) {
	base := ur.ReadBufferGidOffset
	local := int(group) - base
	return ur.bufferRings.bundleIterator(cqe, local, uint16(base), group)
}
// Send writes p to the socket behind pollFD, adding the ring's write-like SQE
// flags. p must stay valid until the operation completes.
func (ur *Uring) Send(sqeCtx SQEContext, pollFD PollFd, p []byte, options ...OpOptionFunc) error {
	flags, ioprio, offset, n := ur.sendOptions(p, options)
	ctx := sqeCtx.WithFD(iofd.FD(pollFD.Fd()))
	ctx = ctx.OrFlags(flags | ur.writeLikeOpFlags)
	return ur.send(ctx, ioprio, p, uint64(offset), n)
}
// SendTargets represents a set of target sockets for multicast/broadcast,
// such as the destinations passed to Multicast.
type SendTargets interface {
	// Count returns the number of targets. Multicast returns nil without
	// submitting anything when Count is 0.
	Count() int
	// FD returns the file descriptor at index i (presumably 0 <= i < Count()).
	FD(i int) iofd.FD
}
// Multicast sends the same payload to every socket in targets. It picks, per
// call, between a zero-copy fixed-buffer send, a regular fixed-buffer send and
// a plain send of p.
//
// Strategy selection (conservative thresholds, based on Linux 6.18 measurements).
// An io_uring cycle costs ~523ns, and zero-copy needs two cycles (~1046ns
// overhead). Zero-copy is therefore used only when the memcpy savings clearly
// exceed that overhead. N is targets.Count():
//   - N < 8:     >= 8 KiB uses zero-copy (high bar, overhead not amortized)
//   - N < 64:    >= 4 KiB uses zero-copy
//   - N < 512:   >= 3 KiB uses zero-copy
//   - N < 4096:  >= 2 KiB uses zero-copy
//   - N >= 4096: >= 1.5 KiB uses zero-copy (fully amortized)
//
// Zero-copy is only ever chosen for registered buffers (bufIndex >= 0). For
// more aggressive zero-copy usage, use MulticastZeroCopy instead.
//
// Buffer lifetime notes:
//   - When bufIndex < 0, p is sent with regular sends and must remain valid
//     until every target send completes.
//   - Zero-copy sends produce two CQEs per target: the completion (with
//     IORING_CQE_F_MORE) and a later notification. The registered buffer must
//     not be modified until the notification CQE arrives.
//   - Zero-copy requires TCP sockets. It may fail with EOPNOTSUPP on Unix
//     sockets or loopback.
//
// Parameters:
//   - sqeCtx: base context; its FD is replaced per target
//   - targets: collection of target sockets
//   - bufIndex: registered buffer index, or -1 to send from p
//   - p: payload data (only used when bufIndex < 0)
//   - offset: byte offset into the registered buffer, or into p when bufIndex < 0
//   - n: number of bytes to send, starting at offset
//
// Returns nil without validating the other arguments when targets is empty.
// Returns ErrInvalidParam when:
//   - n is negative,
//   - bufIndex names no registered buffer, or
//   - offset or n fall outside the buffer.
//
// Otherwise every target is attempted. The errors returned for individual
// targets are combined with errors.Join.
func (ur *Uring) Multicast(sqeCtx SQEContext, targets SendTargets, bufIndex int, p []byte, offset int64, n int, options ...OpOptionFunc) error {
	count := targets.Count()
	if count == 0 {
		return nil
	}
	if n < 0 {
		return ErrInvalidParam
	}
	flags := ur.operationOptions(options)
	ctx := sqeCtx.OrFlags(flags | ur.writeLikeOpFlags)

	offsetInt := 0
	userPayload := p
	if bufIndex >= 0 {
		buf := ur.RegisteredBuffer(bufIndex)
		if buf == nil || offset < 0 || offset > int64(len(buf)) {
			return ErrInvalidParam
		}
		offsetInt = int(offset)
		if n > len(buf)-offsetInt {
			return ErrInvalidParam
		}
	} else {
		if offset < 0 || offset > int64(len(p)) {
			return ErrInvalidParam
		}
		offsetInt = int(offset)
		if n > len(p)-offsetInt {
			return ErrInvalidParam
		}
		// Plain sends take the payload slice directly, so the offset is
		// applied here. When offset == len(p), n is necessarily 0 (checked
		// above) and p is passed unsliced.
		if offsetInt < len(p) {
			userPayload = p[offsetInt:]
		}
	}

	// More destinations amortize the zero-copy overhead (pinning, two CQEs),
	// so the size threshold drops as the target count grows.
	var threshold int
	switch {
	case count >= 4096:
		threshold = 1536
	case count >= 512:
		threshold = 2048
	case count >= 64:
		threshold = 3072
	case count >= 8:
		threshold = 4096
	default:
		threshold = 8192
	}
	useZeroCopy := bufIndex >= 0 && n >= threshold

	// Collect per-target failures and join them once at the end. Calling
	// errors.Join inside the loop nests one join per failure, which makes
	// Error() on the result quadratic in the number of failed targets.
	var errs []error
	for i := range count {
		targetCtx := ctx.WithFD(targets.FD(i))
		var sendErr error
		switch {
		case useZeroCopy:
			sendErr = ur.sendZeroCopyFixed(targetCtx, bufIndex, uint64(offsetInt), n, 0)
		case bufIndex >= 0:
			sendErr = ur.sendFixed(targetCtx, bufIndex, uint64(offsetInt), n, 0)
		default:
			sendErr = ur.send(targetCtx, 0, userPayload, 0, n)
		}
		if sendErr != nil {
			errs = append(errs, sendErr)
		}
	}
	return errors.Join(errs...)
}
// MulticastZeroCopy sends the same registered-buffer payload to every socket
// in targets. It favours zero-copy sends and uses very aggressive thresholds,
// because the caller explicitly asked for zero-copy.
//
// Thresholds (zero-copy whenever there is any reasonable chance of benefit).
// N is targets.Count():
//   - N < 4:    >= 1.5 KiB uses zero-copy (minimal bar)
//   - N < 16:   >= 1 KiB uses zero-copy
//   - N < 64:   >= 512 B uses zero-copy
//   - N < 256:  >= 128 B uses zero-copy
//   - N >= 256: any size uses zero-copy (fully amortized)
//
// Below the threshold a regular fixed-buffer send is used instead. For
// conservative zero-copy usage, use Multicast.
//
// Prerequisites:
//   - The buffer must be registered via IORING_REGISTER_BUFFERS2.
//   - bufIndex must be a valid registered buffer index.
//
// Use this for:
//   - Live streaming (same video/audio frame to thousands of viewers)
//   - Real-time gaming (same game state to many players)
//   - Any scenario with an O(1) payload and O(N) targets
//
// Zero-copy notes:
//   - Each zero-copy send produces two CQEs: the completion (with
//     IORING_CQE_F_MORE) and a later notification.
//   - The registered buffer must not be modified until the notification CQE
//     arrives.
//   - May return EOPNOTSUPP on Unix sockets or loopback.
//
// Returns nil without validating the other arguments when targets is empty.
// Returns ErrInvalidParam when:
//   - n is negative,
//   - bufIndex names no registered buffer, or
//   - offset or n fall outside that buffer.
//
// Otherwise every target is attempted. The errors returned for individual
// targets are combined with errors.Join.
func (ur *Uring) MulticastZeroCopy(sqeCtx SQEContext, targets SendTargets, bufIndex int, offset int64, n int, options ...OpOptionFunc) error {
	count := targets.Count()
	if count == 0 {
		return nil
	}
	if n < 0 {
		return ErrInvalidParam
	}
	buf := ur.RegisteredBuffer(bufIndex)
	if buf == nil || offset < 0 || offset > int64(len(buf)) {
		return ErrInvalidParam
	}
	offsetInt := int(offset)
	if n > len(buf)-offsetInt {
		return ErrInvalidParam
	}
	flags := ur.operationOptions(options)
	ctx := sqeCtx.OrFlags(flags | ur.writeLikeOpFlags)

	// The size threshold shrinks as the target count grows, since more
	// destinations amortize the zero-copy overhead.
	var threshold int
	switch {
	case count >= 256:
		threshold = 0 // any size
	case count >= 64:
		threshold = 128
	case count >= 16:
		threshold = 512
	case count >= 4:
		threshold = 1024
	default:
		threshold = 1536
	}
	// Below the threshold, fall back to sendFixed.
	useZeroCopy := n >= threshold

	// Collect per-target failures and join them once at the end. Calling
	// errors.Join inside the loop nests one join per failure, which makes
	// Error() on the result quadratic in the number of failed targets.
	var errs []error
	for i := range count {
		targetCtx := ctx.WithFD(targets.FD(i))
		var sendErr error
		if useZeroCopy {
			sendErr = ur.sendZeroCopyFixed(targetCtx, bufIndex, uint64(offsetInt), n, 0)
		} else {
			sendErr = ur.sendFixed(targetCtx, bufIndex, uint64(offsetInt), n, 0)
		}
		if sendErr != nil {
			errs = append(errs, sendErr)
		}
	}
	return errors.Join(errs...)
}
// Timeout submits a timeout request that expires after d.
//
// The SQE flags, completion count and timeout flags are resolved from options
// via timeoutOptions.
//
// A negative d is treated as zero ("already expired"), matching how Go's time
// APIs treat negative durations. Without the clamp, splitting a negative d
// with / and % yields a timespec with negative fields. The kernel rejects
// such a timespec with EINVAL.
func (ur *Uring) Timeout(sqeCtx SQEContext, d time.Duration, options ...OpOptionFunc) error {
	flags, cnt, timeoutFlags := ur.timeoutOptions(options)
	ctx := sqeCtx.OrFlags(flags)
	if d < 0 {
		d = 0
	}
	ts := &Timespec{
		Sec:  int64(d / time.Second),
		Nsec: int64(d % time.Second),
	}
	return ur.timeout(ctx, cnt, ts, int(timeoutFlags))
}
// Shutdown submits a shutdown request for the socket carried in sqeCtx.
//
// how selects which direction(s) to shut down and is forwarded unchanged
// (presumably SHUT_RD / SHUT_WR / SHUT_RDWR — confirm against callers). SQE
// flags are resolved from options via shutdownOptions. This does not close
// the descriptor itself.
func (ur *Uring) Shutdown(sqeCtx SQEContext, how int, options ...OpOptionFunc) error {
	return ur.shutdown(sqeCtx.OrFlags(ur.shutdownOptions(options)), how)
}
// Nop submits a request that performs no operation. Only the common operation
// options (resolved by operationOptions) are applied to sqeCtx.
func (ur *Uring) Nop(sqeCtx SQEContext, options ...OpOptionFunc) error {
	ctx := sqeCtx.OrFlags(ur.operationOptions(options))
	return ur.nop(ctx)
}
// Close submits `IORING_OP_CLOSE` for the descriptor carried in sqeCtx.
//
// When sqeCtx carries IOSQE_FIXED_FILE, its FD field is interpreted as a
// registered-file slot, and that slot is closed with close-direct semantics.
// Only the target descriptor is closed; the Uring instance stays usable.
func (ur *Uring) Close(sqeCtx SQEContext, options ...OpOptionFunc) error {
	ctx := sqeCtx.OrFlags(ur.closeOptions(options))
	return ur.close(ctx)
}
// Read submits a read into b from the descriptor carried in sqeCtx.
//
// readOptions resolves the SQE flags, I/O priority, offset and byte count from
// options. The read-like op flags configured on ur are always added. The
// caller must keep b valid until the operation completes.
func (ur *Uring) Read(sqeCtx SQEContext, b []byte, options ...OpOptionFunc) error {
	sqeFlags, prio, off, length := ur.readOptions(b, options)
	return ur.read(sqeCtx.OrFlags(sqeFlags|ur.readLikeOpFlags), prio, b, uint64(off), length)
}
// Write submits a write of b to the descriptor carried in sqeCtx.
//
// writeOptions resolves the SQE flags, I/O priority, offset and byte count
// from options. The write-like op flags configured on ur are always added.
// The caller must keep b valid until the operation completes.
func (ur *Uring) Write(sqeCtx SQEContext, b []byte, options ...OpOptionFunc) error {
	sqeFlags, prio, off, length := ur.writeOptions(b, options)
	return ur.write(sqeCtx.OrFlags(sqeFlags|ur.writeLikeOpFlags), prio, b, uint64(off), length)
}
// Splice submits a splice of n bytes from fdIn to the descriptor carried in
// sqeCtx.
//
// Both pointer arguments passed to ur.splice are nil (presumably the in/out
// offsets — confirm). The middle value returned by spliceOptions is ignored
// here. The splice flags from options are forwarded as an int.
func (ur *Uring) Splice(sqeCtx SQEContext, fdIn int, n int, options ...OpOptionFunc) error {
	sqeFlags, _, sf := ur.spliceOptions(options)
	return ur.splice(sqeCtx.OrFlags(sqeFlags), fdIn, nil, nil, n, int(sf))
}
// Tee submits a tee request that duplicates length bytes from the pipe fdIn
// to the pipe carried in sqeCtx.
//
// The middle value returned by teeOptions is ignored here. The splice flags
// from options are forwarded as an int.
func (ur *Uring) Tee(sqeCtx SQEContext, fdIn int, length int, options ...OpOptionFunc) error {
	sqeFlags, _, tf := ur.teeOptions(options)
	return ur.tee(sqeCtx.OrFlags(sqeFlags), fdIn, length, int(tf))
}
// Sync submits an fsync of the descriptor carried in sqeCtx.
//
// syncOptions resolves both the SQE flags and the fsync-specific flags from
// options (the latter presumably e.g. a datasync bit — confirm in
// syncOptions).
func (ur *Uring) Sync(sqeCtx SQEContext, options ...OpOptionFunc) error {
	sqeFlags, syncFlags := ur.syncOptions(options)
	ctx := sqeCtx.OrFlags(sqeFlags)
	return ur.fsync(ctx, syncFlags)
}
// ReadV submits a vectored read into iovs from the descriptor carried in
// sqeCtx.
//
// The common operation options are combined with the read-like op flags
// configured on ur. The caller must keep iovs and every backing buffer valid
// until the operation completes.
func (ur *Uring) ReadV(sqeCtx SQEContext, iovs [][]byte, options ...OpOptionFunc) error {
	return ur.readv(sqeCtx.OrFlags(ur.operationOptions(options)|ur.readLikeOpFlags), iovs)
}
// WriteV submits a vectored write of iovs to the descriptor carried in sqeCtx.
//
// The common operation options are combined with the write-like op flags
// configured on ur. The caller must keep iovs and every backing buffer valid
// until the operation completes.
func (ur *Uring) WriteV(sqeCtx SQEContext, iovs [][]byte, options ...OpOptionFunc) error {
	return ur.writev(sqeCtx.OrFlags(ur.operationOptions(options)|ur.writeLikeOpFlags), iovs)
}