diff --git a/io-uring-test/src/tests/register_buf_ring.rs b/io-uring-test/src/tests/register_buf_ring.rs
index a3eb112..c84b98f 100644
--- a/io-uring-test/src/tests/register_buf_ring.rs
+++ b/io-uring-test/src/tests/register_buf_ring.rs
@@ -2,127 +2,34 @@
 // The entry point in this file can be found by searching for 'pub'.
 use crate::Test;
-use io_uring::types;
-use io_uring::types::BufRingEntry;
-use io_uring::{cqueue, opcode, squeue, IoUring};
+use io_uring::buf_ring::BufRing;
+use io_uring::{cqueue, opcode, squeue, CompletionQueue, IoUring, SubmissionQueue};
+use io_uring::{types, Submitter};
-use std::cell::Cell;
+use std::cell::UnsafeCell;
 use std::fmt;
 use std::io;
+use std::mem::ManuallyDrop;
 use std::os::unix::io::AsRawFd;
-use std::ptr;
 use std::rc::Rc;
-use std::sync::atomic::{self, AtomicU16};
 
 type Bgid = u16; // Buffer group id
 type Bid = u16; // Buffer id
 
-/// An anonymous region of memory mapped using `mmap(2)`, not backed by a file
-/// but that is guaranteed to be page-aligned and zero-filled.
-pub struct AnonymousMmap {
-    addr: ptr::NonNull<libc::c_void>,
-    len: usize,
-}
-
-impl AnonymousMmap {
-    /// Allocate `len` bytes that are page aligned and zero-filled.
-    pub fn new(len: usize) -> io::Result<AnonymousMmap> {
-        unsafe {
-            match libc::mmap(
-                ptr::null_mut(),
-                len,
-                libc::PROT_READ | libc::PROT_WRITE,
-                libc::MAP_ANONYMOUS | libc::MAP_SHARED | libc::MAP_POPULATE,
-                -1,
-                0,
-            ) {
-                libc::MAP_FAILED => Err(io::Error::last_os_error()),
-                addr => {
-                    // here, `mmap` will never return null
-                    let addr = ptr::NonNull::new_unchecked(addr);
-                    Ok(AnonymousMmap { addr, len })
-                }
-            }
-        }
-    }
-
-    /// Do not make the stored memory accessible by child processes after a `fork`.
-    pub fn dontfork(&self) -> io::Result<()> {
-        match unsafe { libc::madvise(self.addr.as_ptr(), self.len, libc::MADV_DONTFORK) } {
-            0 => Ok(()),
-            _ => Err(io::Error::last_os_error()),
-        }
-    }
-
-    /// Get a pointer to the memory.
-    #[inline]
-    pub fn as_ptr(&self) -> *const libc::c_void {
-        self.addr.as_ptr()
-    }
-
-    /// Get a mut pointer to the memory.
-    #[inline]
-    pub fn as_ptr_mut(&self) -> *mut libc::c_void {
-        self.addr.as_ptr()
-    }
-
-    /// Get a pointer to the data at the given offset.
-    #[inline]
-    #[allow(dead_code)]
-    pub unsafe fn offset(&self, offset: u32) -> *const libc::c_void {
-        self.as_ptr().add(offset as usize)
-    }
-
-    /// Get a mut pointer to the data at the given offset.
-    #[inline]
-    #[allow(dead_code)]
-    pub unsafe fn offset_mut(&self, offset: u32) -> *mut libc::c_void {
-        self.as_ptr_mut().add(offset as usize)
-    }
-}
-
-impl Drop for AnonymousMmap {
-    fn drop(&mut self) {
-        unsafe {
-            libc::munmap(self.addr.as_ptr(), self.len);
-        }
-    }
-}
-
-struct InnerBufRing {
-    // All these fields are constant once the struct is instantiated except the one of type Cell.
-    bgid: Bgid,
-
-    ring_entries_mask: u16, // Invariant one less than ring_entries which is > 0, power of 2, max 2^15 (32768).
-
-    buf_cnt: u16,   // Invariants: > 0, <= ring_entries.
-    buf_len: usize, // Invariant: > 0.
-
-    // `ring_start` holds the memory allocated for the buf_ring, the ring of entries describing
-    // the buffers being made available to the uring interface for this buf group id.
-    ring_start: AnonymousMmap,
-
-    buf_list: Vec<Vec<u8>>,
-
-    // `local_tail` is the copy of the tail index that we update when a buffer is dropped and
-    // therefore its buffer id is released and added back to the ring. It also serves for adding
-    // buffers to the ring during init but that's not as interesting.
-    local_tail: Cell<u16>,
-
-    // `shared_tail` points to the u16 memory inside the rings that the uring interface uses as the
-    // tail field. It is where the application writes new tail values and the kernel reads the tail
-    // value from time to time. The address could be computed from ring_start when needed. This
-    // might be here for no good reason any more.
-    shared_tail: *const AtomicU16,
+struct InnerBufRing<'a> {
+    buf_ring: ManuallyDrop<UnsafeCell<BufRing<'a>>>,
+    buf_list: ManuallyDrop<Vec<Vec<u8>>>,
+    buf_len: usize,
 }
 
-impl InnerBufRing {
+impl<'a> InnerBufRing<'a> {
     fn new(
+        submitter: &Submitter<'a>,
         bgid: Bgid,
         ring_entries: u16,
        buf_cnt: u16,
         buf_len: usize,
-    ) -> io::Result<InnerBufRing> {
+    ) -> io::Result<InnerBufRing<'a>> {
         // Check that none of the important args are zero and the ring_entries is at least large
         // enough to hold all the buffers and that ring_entries is a power of 2.
         if (buf_cnt == 0)
@@ -133,76 +40,14 @@ impl InnerBufRing {
             return Err(io::Error::from(io::ErrorKind::InvalidInput));
         }
 
-        // entry_size is 16 bytes.
-        let entry_size = std::mem::size_of::<BufRingEntry>();
-        assert_eq!(entry_size, 16);
-        let ring_size = entry_size * (ring_entries as usize);
-
-        // The memory is required to be page aligned and zero-filled by the uring buf_ring
-        // interface. Anonymous mmap promises both of those things.
-        // https://man7.org/linux/man-pages/man2/mmap.2.html
-        let ring_start = AnonymousMmap::new(ring_size).unwrap();
-        ring_start.dontfork()?;
-
-        // Probably some functional way to do this.
-        let buf_list: Vec<Vec<u8>> = {
-            let mut bp = Vec::with_capacity(buf_cnt as _);
-            for _ in 0..buf_cnt {
-                bp.push(vec![0; buf_len]);
-            }
-            bp
-        };
-
-        let shared_tail =
-            unsafe { types::BufRingEntry::tail(ring_start.as_ptr() as *const BufRingEntry) }
-                as *const AtomicU16;
-
-        let ring_entries_mask = ring_entries - 1;
-        assert!((ring_entries & ring_entries_mask) == 0);
-
-        let buf_ring = InnerBufRing {
-            bgid,
-            ring_entries_mask,
-            buf_cnt,
-            buf_len,
-            ring_start,
-            buf_list,
-            local_tail: Cell::new(0),
-            shared_tail,
-        };
-
-        Ok(buf_ring)
-    }
-
-    // Register the buffer ring with the uring interface.
-    // Normally this is done automatically when building a BufRing.
-    //
-    // Warning: requires the CURRENT driver is already in place or will panic.
-    fn register<S, C>(&self, ring: &mut IoUring<S, C>) -> io::Result<()>
-    where
-        S: squeue::EntryMarker,
-        C: cqueue::EntryMarker,
-    {
-        let bgid = self.bgid;
-
-        // Safety: The ring, represented by the ring_start and the ring_entries remains valid until
-        // it is unregistered. The backing store is an AnonymousMmap which remains valid until it
-        // is dropped which in this case, is when Self is dropped.
-        let res = unsafe {
-            ring.submitter().register_buf_ring(
-                self.ring_start.as_ptr() as _,
-                self.ring_entries(),
-                bgid,
-            )
-        };
-
-        if let Err(e) = res {
-            match e.raw_os_error() {
+        let res = submitter.setup_buf_ring(ring_entries, bgid);
+        let mut buf_ring = match res {
+            Err(e) => match e.raw_os_error() {
                 Some(libc::EINVAL) => {
                     // using buf_ring requires kernel 5.19 or greater.
                    return Err(io::Error::new(
                        io::ErrorKind::Other,
-                        format!("buf_ring.register returned {}, most likely indicating this kernel is not 5.19+", e),
+                        format!("setup_buf_ring returned {}, most likely indicating this kernel is not 5.19+", e),
                    ));
                }
                Some(libc::EEXIST) => {
@@ -213,7 +58,7 @@ impl InnerBufRing {
                    return Err(io::Error::new(
                        io::ErrorKind::Other,
                        format!(
-                            "buf_ring.register returned `{}`, indicating the attempted buffer group id {} was already registered",
+                            "setup_buf_ring returned `{}`, indicating the attempted buffer group id {} was already registered",
                        e,
                        bgid),
                    ));
@@ -221,42 +66,62 @@ impl InnerBufRing {
                _ => {
                    return Err(io::Error::new(
                        io::ErrorKind::Other,
-                        format!("buf_ring.register returned `{}` for group id {}", e, bgid),
+                        format!("setup_buf_ring returned `{}` for group id {}", e, bgid),
                    ));
                }
-            }
+            },
+            Ok(buf_ring) => buf_ring,
        };
 
-        // Add the buffers after the registration. Really seems it could be done earlier too.
-
-        for bid in 0..self.buf_cnt {
-            self.buf_ring_push(bid);
+        // Probably some functional way to do this.
+        let mut buf_list: Vec<Vec<u8>> = {
+            let mut bp = Vec::with_capacity(buf_cnt as _);
+            for _ in 0..buf_cnt {
+                bp.push(vec![0; buf_len]);
+            }
+            bp
+        };
+        unsafe {
+            buf_ring.push_multiple(buf_list.iter_mut().enumerate().map(|(i, b)| {
+                (
+                    i as u16,
+                    std::slice::from_raw_parts_mut(b.as_mut_ptr().cast(), b.capacity()),
+                )
+            }));
        }
-        self.buf_ring_sync();
 
-        res
-    }
-
-    // Unregister the buffer ring from the io_uring.
-    // Normally this is done automatically when the BufRing goes out of scope.
-    fn unregister<S, C>(&self, ring: &mut IoUring<S, C>) -> io::Result<()>
-    where
-        S: squeue::EntryMarker,
-        C: cqueue::EntryMarker,
-    {
-        let bgid = self.bgid;
+        let buf_ring = InnerBufRing {
+            buf_ring: ManuallyDrop::new(UnsafeCell::new(buf_ring)),
+            buf_list: ManuallyDrop::new(buf_list),
+            buf_len,
+        };
 
-        ring.submitter().unregister_buf_ring(bgid)
+        Ok(buf_ring)
    }
 
+    fn unregister(mut self) -> io::Result<()> {
+        unsafe {
+            ManuallyDrop::into_inner(std::ptr::read(&self.buf_ring))
+                .into_inner()
+                .unregister()?;
+            ManuallyDrop::drop(&mut self.buf_list);
+        }
+        std::mem::forget(self);
+        Ok(())
+    }
+
    // Returns the buffer group id.
    fn bgid(&self) -> Bgid {
-        self.bgid
+        unsafe { &*self.buf_ring.get() }.bgid()
    }
 
    // Returns the buffer the uring interface picked from the buf_ring for the completion result
    // represented by the res and flags.
-    fn get_buf(&self, buf_ring: FixedSizeBufRing, res: u32, flags: u32) -> io::Result<GBuf> {
+    fn get_buf(
+        &self,
+        buf_ring: FixedSizeBufRing<'a>,
+        res: u32,
+        flags: u32,
+    ) -> io::Result<GBuf<'a>> {
        // This fn does the odd thing of having self as the BufRing and taking an argument that is
        // the same BufRing but wrapped in Rc<_> so the wrapped buf_ring can be passed to the
        // outgoing GBuf.
@@ -274,7 +139,6 @@ impl InnerBufRing {
    // as the kernel could use the same buffer for different data concurrently.
    unsafe fn dropping_bid(&self, bid: Bid) {
        self.buf_ring_push(bid);
-        self.buf_ring_sync();
    }
 
    fn buf_capacity(&self) -> usize {
@@ -285,58 +149,39 @@ impl InnerBufRing {
        self.buf_list[bid as usize].as_ptr()
    }
 
-    fn ring_entries(&self) -> u16 {
-        self.ring_entries_mask + 1
-    }
-
-    fn mask(&self) -> u16 {
-        self.ring_entries_mask
-    }
-
    // Push the `bid` buffer to the buf_ring tail.
    // This test version does not safeguard against a duplicate
    // `bid` being pushed.
    fn buf_ring_push(&self, bid: Bid) {
-        assert!(bid < self.buf_cnt);
-
-        // N.B. The uring buf_ring indexing mechanism calls for the tail values to exceed the
-        // actual number of ring entries. This allows the uring interface to distinguish between
-        // empty and full buf_rings. As a result, the ring mask is only applied to the index used
-        // for computing the ring entry, not to the tail value itself.
+        assert!((bid as usize) < self.buf_list.len());
 
-        let old_tail = self.local_tail.get();
-        self.local_tail.set(old_tail.wrapping_add(1));
-        let ring_idx = old_tail & self.mask();
-
-        let entries = self.ring_start.as_ptr_mut() as *mut BufRingEntry;
-        let re = unsafe { &mut *entries.add(ring_idx as usize) };
-
-        re.set_addr(self.stable_ptr(bid) as _);
-        re.set_len(self.buf_len as _);
-        re.set_bid(bid);
-
-        // Also note, we have not updated the tail as far as the kernel is concerned.
-        // That is done with buf_ring_sync.
+        let buf = &self.buf_list[bid as usize];
+        unsafe {
+            (*self.buf_ring.get()).push(
+                bid,
+                std::slice::from_raw_parts_mut(buf.as_ptr().cast_mut().cast(), buf.capacity()),
+            );
+        }
    }
+}
 
-    // Make 'local_tail' visible to the kernel. Called after buf_ring_push() has been
-    // called to fill in new buffers.
-    fn buf_ring_sync(&self) {
+impl Drop for InnerBufRing<'_> {
+    fn drop(&mut self) {
        unsafe {
-            (*self.shared_tail).store(self.local_tail.get(), atomic::Ordering::Release);
+            std::ptr::read(self).unregister().ok();
        }
    }
 }
 
 #[derive(Clone)]
-struct FixedSizeBufRing {
+struct FixedSizeBufRing<'a> {
    // The BufRing is reference counted because each buffer handed out has a reference back to its
    // buffer group, or in this case, to its buffer ring.
-    rc: Rc<InnerBufRing>,
+    rc: Rc<InnerBufRing<'a>>,
 }
 
-impl FixedSizeBufRing {
-    fn new(buf_ring: InnerBufRing) -> Self {
+impl<'a> FixedSizeBufRing<'a> {
+    fn new(buf_ring: InnerBufRing<'a>) -> Self {
        FixedSizeBufRing {
            rc: Rc::new(buf_ring),
        }
@@ -391,7 +236,7 @@ impl Builder {
    }
 
    // Return a FixedSizeBufRing.
-    fn build(&self) -> io::Result<FixedSizeBufRing> {
+    fn build<'a>(&self, submitter: &Submitter<'a>) -> io::Result<FixedSizeBufRing<'a>> {
        let mut b: Builder = *self;
 
        // Two cases where both buf_cnt and ring_entries are set to the max of the two.
@@ -416,20 +261,20 @@ impl Builder {
        // wrap calculation trivial.
        b.ring_entries = b.ring_entries.next_power_of_two();
 
-        let inner = InnerBufRing::new(b.bgid, b.ring_entries, b.buf_cnt, b.buf_len)?;
+        let inner = InnerBufRing::new(submitter, b.bgid, b.ring_entries, b.buf_cnt, b.buf_len)?;
 
        Ok(FixedSizeBufRing::new(inner))
    }
 }
 
 // This tracks a buffer that has been filled in by the kernel, having gotten the memory
 // from a buffer ring, and returned to userland via a cqe entry.
-struct GBuf {
-    bufgroup: FixedSizeBufRing,
+struct GBuf<'a> {
+    bufgroup: FixedSizeBufRing<'a>,
    len: usize,
    bid: Bid,
 }
 
-impl fmt::Debug for GBuf {
+impl fmt::Debug for GBuf<'_> {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.debug_struct("GBuf")
            .field("bgid", &self.bufgroup.rc.bgid())
@@ -440,8 +285,8 @@ impl fmt::Debug for GBuf {
    }
 }
 
-impl GBuf {
-    fn new(bufgroup: FixedSizeBufRing, bid: Bid, len: usize) -> Self {
+impl<'a> GBuf<'a> {
+    fn new(bufgroup: FixedSizeBufRing<'a>, bid: Bid, len: usize) -> Self {
        assert!(len <= bufgroup.rc.buf_len);
 
        Self { bufgroup, len, bid }
@@ -478,7 +323,7 @@ impl GBuf {
    }
 }
 
-impl Drop for GBuf {
+impl Drop for GBuf<'_> {
    fn drop(&mut self) {
        // Add the buffer back to the bufgroup, for the kernel to reuse.
        unsafe { self.bufgroup.rc.dropping_bid(self.bid) };
@@ -488,11 +333,7 @@
 // Begin of test functions.
 
 // Verify register and unregister of a buf_ring.
-fn buf_ring_reg_and_unreg<S, C>(ring: &mut IoUring<S, C>, _test: &Test) -> io::Result<()>
-where
-    S: squeue::EntryMarker,
-    C: cqueue::EntryMarker,
-{
+fn buf_ring_reg_and_unreg(submitter: &Submitter, _test: &Test) -> io::Result<()> {
    // Create a BufRing
    // Register it
    // Unregister it
@@ -503,24 +344,26 @@
    // Try to unregister it again
    // Drop it
 
-    let buf_ring = Builder::new(777).ring_entries(16).buf_len(4096).build()?;
-
-    buf_ring.rc.register(ring)?;
-    buf_ring.rc.unregister(ring)?;
+    let buf_ring = Builder::new(777)
+        .ring_entries(16)
+        .buf_len(4096)
+        .build(submitter)?;
 
-    // Register twice.
-    buf_ring.rc.register(ring)?;
-    assert!(buf_ring.rc.register(ring).is_err());
-
-    // Unregister twice.
-    buf_ring.rc.unregister(ring)?;
-    assert!(buf_ring.rc.unregister(ring).is_err());
+    Rc::try_unwrap(buf_ring.rc)
+        .unwrap_or_else(|_| unreachable!())
+        .unregister()?;
 
    Ok(())
 }
 
 // Write sample to file descriptor.
-fn write_text_to_file<S, C>(ring: &mut IoUring<S, C>, fd: types::Fd, text: &[u8]) -> io::Result<()>
+fn write_text_to_file<S, C>(
+    submitter: &Submitter,
+    sq: &mut SubmissionQueue<S>,
+    cq: &mut CompletionQueue<C>,
+    fd: types::Fd,
+    text: &[u8],
+) -> io::Result<()>
 where
    S: squeue::EntryMarker,
    C: cqueue::EntryMarker,
@@ -528,17 +371,18 @@ where
    let write_e = opcode::Write::new(fd, text.as_ptr(), text.len() as _);
 
    unsafe {
-        let mut queue = ring.submission();
        let write_e = write_e
            .build()
            .user_data(0x01)
            .flags(squeue::Flags::IO_LINK)
            .into();
-        queue.push(&write_e).expect("queue is full");
+        sq.push(&write_e).expect("queue is full");
+        sq.sync();
    }
 
-    assert_eq!(ring.submit_and_wait(1)?, 1);
+    assert_eq!(submitter.submit_and_wait(1)?, 1);
 
-    let cqes: Vec<cqueue::Entry> = ring.completion().map(Into::into).collect();
+    cq.sync();
+    let cqes: Vec<cqueue::Entry> = cq.map(Into::into).collect();
    assert_eq!(cqes.len(), 1);
    assert_eq!(cqes[0].user_data(), 0x01);
    assert_eq!(cqes[0].result(), text.len() as i32);
@@ -546,12 +390,14 @@
 }
 
 // Read from file descriptor, returning a buffer from the buf_ring.
-fn buf_ring_read<S, C>(
-    ring: &mut IoUring<S, C>,
-    buf_ring: &FixedSizeBufRing,
+fn buf_ring_read<'a, S, C>(
+    submitter: &Submitter,
+    sq: &mut SubmissionQueue<S>,
+    cq: &mut CompletionQueue<C>,
+    buf_ring: &FixedSizeBufRing<'a>,
    fd: types::Fd,
    len: u32,
-) -> io::Result<GBuf>
+) -> io::Result<GBuf<'a>>
 where
    S: squeue::EntryMarker,
    C: cqueue::EntryMarker,
@@ -561,20 +407,20 @@ where
        .buf_group(buf_ring.rc.bgid());
 
    unsafe {
-        let mut queue = ring.submission();
-        queue
-            .push(
-                &read_e
-                    .build()
-                    .user_data(0x02)
-                    .flags(squeue::Flags::BUFFER_SELECT)
-                    .into(),
-            )
-            .expect("queue is full");
+        sq.push(
+            &read_e
+                .build()
+                .user_data(0x02)
+                .flags(squeue::Flags::BUFFER_SELECT)
+                .into(),
+        )
+        .expect("queue is full");
+        sq.sync();
    }
 
-    assert_eq!(ring.submit_and_wait(1)?, 1);
+    assert_eq!(submitter.submit_and_wait(1)?, 1);
 
-    let cqes: Vec<cqueue::Entry> = ring.completion().map(Into::into).collect();
+    cq.sync();
+    let cqes: Vec<cqueue::Entry> = cq.map(Into::into).collect();
    assert_eq!(cqes.len(), 1);
    assert_eq!(cqes[0].user_data(), 0x02);
 
@@ -624,33 +470,33 @@ fn buf_ring_play(
    // and two buffers so the ring starts completely full.
    // Then register it with the uring interface.
 
+    let (submitter, mut sq, mut cq) = ring.split();
+
    let buf_ring = Builder::new(888)
        .ring_entries(2)
        .buf_cnt(2)
        .buf_len(128)
-        .build()?;
-
-    buf_ring.rc.register(ring)?;
+        .build(&submitter)?;
 
    // Create a temporary file with a short sample text we will be reading multiple times.
    let fd = tempfile::tempfile()?;
    let fd = types::Fd(fd.as_raw_fd());
 
-    write_text_to_file(ring, fd, text)?;
+    write_text_to_file(&submitter, &mut sq, &mut cq, fd, text)?;
 
    // Use the uring buf_ring feature to have two buffers taken from the buf_ring and read into,
    // from the file, returning the buffer here. The read function is designed to read the same
    // text each time - not normal, but sufficient for this unit test.
 
-    let buf0 = buf_ring_read(ring, &buf_ring, fd, len)?;
-    let buf1 = buf_ring_read(ring, &buf_ring, fd, len)?;
+    let buf0 = buf_ring_read(&submitter, &mut sq, &mut cq, &buf_ring, fd, len)?;
+    let buf1 = buf_ring_read(&submitter, &mut sq, &mut cq, &buf_ring, fd, len)?;
 
    normal_check(&buf0, 0);
    normal_check(&buf1, 1);
 
    // Expect next read to fail because the ring started with two buffers and those buffer wrappers
    // haven't been dropped yet so the ring should be empty.
-    let res2 = buf_ring_read(ring, &buf_ring, fd, len);
+    let res2 = buf_ring_read(&submitter, &mut sq, &mut cq, &buf_ring, fd, len);
    assert_eq!(Some(libc::ENOBUFS), res2.unwrap_err().raw_os_error());
 
    // Drop in reverse order and see that the two are then used in that reverse order by the uring
@@ -659,8 +505,8 @@ fn buf_ring_play(
    std::mem::drop(buf1);
    std::mem::drop(buf0);
 
-    let buf3 = buf_ring_read(ring, &buf_ring, fd, len)?;
-    let buf4 = buf_ring_read(ring, &buf_ring, fd, len)?;
+    let buf3 = buf_ring_read(&submitter, &mut sq, &mut cq, &buf_ring, fd, len)?;
+    let buf4 = buf_ring_read(&submitter, &mut sq, &mut cq, &buf_ring, fd, len)?;
 
    normal_check(&buf3, 1); // bid 1 should come back first.
    normal_check(&buf4, 0); // bid 0 should come back second.
@@ -670,13 +516,9 @@ fn buf_ring_play(
    // Now we loop u16::MAX times to ensure proper behavior when the tail
    // overflows the bounds of a u16.
    for _ in 0..=u16::MAX {
-        let _ = buf_ring_read(ring, &buf_ring, fd, len)?;
+        let _ = buf_ring_read(&submitter, &mut sq, &mut cq, &buf_ring, fd, len)?;
    }
 
-    // Be nice. In this test, the buf_ring is manually unregistered.
-    // There is no need to ensure the buffers have been dropped first.
-    buf_ring.rc.unregister(ring)?;
-
    Ok(())
 }
 
@@ -697,7 +539,7 @@ pub fn test_register_buf_ring(
 
    println!("test register_buf_ring");
 
-    buf_ring_reg_and_unreg(ring, test)?;
+    buf_ring_reg_and_unreg(&ring.submitter(), test)?;
 
    buf_ring_play(ring, test)?;
 
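Note for readers: everything the old test-local `AnonymousMmap` + `register_buf_ring` plumbing did by hand is now behind `Submitter::setup_buf_ring`, introduced in the new `src/buf_ring.rs` below. A minimal round trip with the new API might look like the following sketch; the 16-entry ring, group id 42, and single 4096-byte buffer are arbitrary illustration values, and a 5.19+ kernel is assumed:

```rust
use std::mem::MaybeUninit;

use io_uring::IoUring;

fn main() -> std::io::Result<()> {
    let ring = IoUring::new(8)?;
    let submitter = ring.submitter();

    // One call replaces the old AnonymousMmap + register_buf_ring dance.
    let mut buf_ring = submitter.setup_buf_ring(16, 42)?;

    // Hand a buffer to the kernel as bid 0. The caller must keep it alive
    // (and otherwise leave it alone) until the ring is unregistered.
    let mut buf = vec![MaybeUninit::<u8>::uninit(); 4096];
    unsafe { buf_ring.push(0, buf.as_mut_slice()) };

    // Consuming unregister; dropping the BufRing without calling this would
    // also unregister it, via the Drop impl in the new module below.
    buf_ring.unregister()?;
    Ok(())
}
```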
diff --git a/src/buf_ring.rs b/src/buf_ring.rs
new file mode 100644
index 0000000..5d9339b
--- /dev/null
+++ b/src/buf_ring.rs
@@ -0,0 +1,231 @@
+use std::{
+    io,
+    mem::{self, MaybeUninit},
+    os::fd::{AsRawFd, RawFd},
+    ptr, slice,
+    sync::atomic::{AtomicU16, Ordering},
+};
+
+use crate::{
+    register::execute,
+    sys,
+    types::BufRingEntry,
+    util::{cast_ptr, OwnedFd},
+};
+
+pub(crate) fn register(fd: RawFd, ring_addr: u64, ring_entries: u16, bgid: u16) -> io::Result<()> {
+    // The interface type for ring_entries is u32 but the same interface only allows a u16 for
+    // the tail to be specified, so to try and avoid further confusion, we limit the
+    // ring_entries to u16 here too. The value is actually limited to 2^15 (32768) but we can
+    // let the kernel enforce that.
+    let arg = sys::io_uring_buf_reg {
+        ring_addr,
+        ring_entries: ring_entries as _,
+        bgid,
+        ..Default::default()
+    };
+    execute(
+        fd,
+        sys::IORING_REGISTER_PBUF_RING,
+        cast_ptr::<sys::io_uring_buf_reg>(&arg).cast(),
+        1,
+    )
+    .map(drop)
+}
+
+pub(crate) fn unregister(fd: RawFd, bgid: u16) -> io::Result<()> {
+    let arg = sys::io_uring_buf_reg {
+        ring_addr: 0,
+        ring_entries: 0,
+        bgid,
+        ..Default::default()
+    };
+    execute(
+        fd,
+        sys::IORING_UNREGISTER_PBUF_RING,
+        cast_ptr::<sys::io_uring_buf_reg>(&arg).cast(),
+        1,
+    )
+    .map(drop)
+}
+
+/// An anonymous region of memory mapped using `mmap(2)`, not backed by a file
+/// but that is guaranteed to be page-aligned and zero-filled.
+struct AnonymousMmap {
+    addr: ptr::NonNull<libc::c_void>,
+    len: usize,
+}
+
+impl AnonymousMmap {
+    /// Allocate `len` bytes that are page aligned and zero-filled.
+    pub fn new(len: usize) -> io::Result<AnonymousMmap> {
+        unsafe {
+            match libc::mmap(
+                ptr::null_mut(),
+                len,
+                libc::PROT_READ | libc::PROT_WRITE,
+                libc::MAP_ANONYMOUS | libc::MAP_SHARED | libc::MAP_POPULATE,
+                -1,
+                0,
+            ) {
+                libc::MAP_FAILED => Err(io::Error::last_os_error()),
+                addr => {
+                    // here, `mmap` will never return null
+                    let addr = ptr::NonNull::new_unchecked(addr);
+                    Ok(AnonymousMmap { addr, len })
+                }
+            }
+        }
+    }
+
+    /// Do not make the stored memory accessible by child processes after a `fork`.
+    pub fn dontfork(&self) -> io::Result<()> {
+        match unsafe { libc::madvise(self.addr.as_ptr(), self.len, libc::MADV_DONTFORK) } {
+            0 => Ok(()),
+            _ => Err(io::Error::last_os_error()),
+        }
+    }
+
+    /// Get a pointer to the memory.
+    #[inline]
+    pub fn as_ptr(&self) -> *const libc::c_void {
+        self.addr.as_ptr()
+    }
+
+    /// Get a mut pointer to the memory.
+    #[inline]
+    pub fn as_ptr_mut(&self) -> *mut libc::c_void {
+        self.addr.as_ptr()
+    }
+}
+
+impl Drop for AnonymousMmap {
+    fn drop(&mut self) {
+        unsafe {
+            libc::munmap(self.addr.as_ptr(), self.len);
+        }
+    }
+}
+
+/// A wrapper for a buffer ring. The entries are allocated with `mmap`.
+///
+/// This struct doesn't own the buffers themselves, only the ring of entries.
+pub struct BufRing<'a> {
+    fd: &'a OwnedFd,
+    entries: mem::ManuallyDrop<AnonymousMmap>,
+    len: u16,
+    bgid: u16,
+}
+
+impl<'a> BufRing<'a> {
+    pub(crate) fn new(fd: &'a OwnedFd, len: u16, bgid: u16) -> io::Result<Self> {
+        let entries = AnonymousMmap::new((len as usize) * mem::size_of::<BufRingEntry>())?;
+        entries.dontfork()?;
+        register(fd.as_raw_fd(), entries.as_ptr() as _, len, bgid)?;
+        // SAFETY: no one uses the tail at this moment
+        unsafe {
+            *BufRingEntry::tail(entries.as_ptr().cast()).cast_mut() = 0;
+        }
+
+        Ok(Self {
+            fd,
+            entries: mem::ManuallyDrop::new(entries),
+            len,
+            bgid,
+        })
+    }
+
+    /// Unregister the buffer ring.
+    ///
+    /// If it fails to unregister, the inner memory will be leaked.
+    pub fn unregister(mut self) -> io::Result<()> {
+        let res = unregister(self.fd.as_raw_fd(), self.bgid);
+        if res.is_ok() {
+            // SAFETY: unregistered successfully, so the kernel no longer reads the entries
+            unsafe {
+                mem::ManuallyDrop::drop(&mut self.entries);
+            }
+        }
+        // Forget self either way so Drop cannot re-enter unregister; on failure
+        // this leaks the mapped entries, as documented above.
+        mem::forget(self);
+        res
+    }
+
+    /// Get the number of allocated entries.
+    #[inline]
+    pub const fn capacity(&self) -> usize {
+        self.len as _
+    }
+
+    /// Get the buffer group id of the current ring.
+    #[inline]
+    pub const fn bgid(&self) -> u16 {
+        self.bgid
+    }
+
+    fn as_slice_uninit_mut(&mut self) -> &mut [MaybeUninit<BufRingEntry>] {
+        // SAFETY: the pointer is valid
+        unsafe { slice::from_raw_parts_mut(self.entries.as_ptr_mut().cast(), self.capacity()) }
+    }
+
+    #[inline]
+    const fn mask(&self) -> u16 {
+        self.len - 1
+    }
+
+    #[inline]
+    fn atomic_tail(&self) -> &AtomicU16 {
+        // SAFETY: no one reads or writes the tail pointer without an atomic operation after init
+        unsafe { AtomicU16::from_ptr(BufRingEntry::tail(self.entries.as_ptr().cast()).cast_mut()) }
+    }
+
+    unsafe fn push_inner(&mut self, bid: u16, buf: &mut [MaybeUninit<u8>], offset: u16) {
+        let mask = self.mask();
+        let tail = self.atomic_tail();
+        // The tail is free-running and may wrap a u16, so add with wrapping.
+        let index = (tail.load(Ordering::Acquire).wrapping_add(offset) & mask) as usize;
+
+        // SAFETY: only write plain data here
+        let buf_ring_entry = self.as_slice_uninit_mut()[index].assume_init_mut();
+
+        buf_ring_entry.set_addr(buf.as_mut_ptr() as _);
+        buf_ring_entry.set_len(buf.len() as _);
+        buf_ring_entry.set_bid(bid);
+    }
+
+    unsafe fn advance(&self, count: u16) {
+        self.atomic_tail().fetch_add(count, Ordering::Release);
+    }
+
+    /// Push a buffer entry into the ring.
+    ///
+    /// # Safety
+    ///
+    /// Developers must ensure that the buffer remains valid until the ring is unregistered.
+    pub unsafe fn push(&mut self, bid: u16, buf: &mut [MaybeUninit<u8>]) {
+        self.push_inner(bid, buf, 0);
+        self.advance(1);
+    }
+
+    /// Push several buffer entries into the ring.
+    ///
+    /// # Safety
+    ///
+    /// Developers must ensure that the buffers remain valid until the ring is unregistered.
+    pub unsafe fn push_multiple<'b>(
+        &mut self,
+        bufs: impl IntoIterator<Item = (u16, &'b mut [MaybeUninit<u8>])>,
+    ) {
+        let mut len = 0u16;
+        for (i, (bid, buf)) in bufs.into_iter().enumerate() {
+            self.push_inner(bid, buf, i as _);
+            len += 1;
+        }
+        self.advance(len);
+    }
+}
+
+impl Drop for BufRing<'_> {
+    fn drop(&mut self) {
+        // SAFETY: `unregister` consumes the copy and forgets it, so the fields
+        // are not used again afterwards.
+        unsafe {
+            ptr::read(self).unregister().ok();
+        }
+    }
+}
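The tail handling in `push_inner`/`advance` above mirrors the kernel's buf_ring convention: the tail is a free-running `u16`, and the mask is applied only when computing the slot index, never to the tail itself, which is how the kernel distinguishes an empty ring from a full one (and why the u16-overflow loop in the reworked test passes). A standalone model of that arithmetic, with hypothetical sizes and no io_uring involved:

```rust
// Standalone model of the free-running tail used by BufRing::push_inner and
// BufRing::advance: the mask applies to the slot index, never to the tail
// itself, so the tail may wrap a u16 without corrupting the ring.
fn main() {
    let ring_entries: u16 = 8; // must be a power of two
    let mask = ring_entries - 1;

    let mut tail: u16 = u16::MAX - 2; // tail about to wrap
    for bid in 0..6u16 {
        let slot = (tail & mask) as usize;
        println!("bid {bid} -> slot {slot} (tail {tail})");
        tail = tail.wrapping_add(1); // `advance` publishes this with Release ordering
    }
}
```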
diff --git a/src/lib.rs b/src/lib.rs
index 317d61e..2e6dac2 100644
--- a/src/lib.rs
+++ b/src/lib.rs
@@ -6,6 +6,7 @@
 #[macro_use]
 mod util;
 
+pub mod buf_ring;
 pub mod cqueue;
 pub mod opcode;
 pub mod register;
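With the module exported from the crate root, downstream code can name `BufRing` in its own signatures. A sketch under the assumption that `setup_buf_ring` returns a `BufRing<'a>` borrowing the ring's fd, as reconstructed above; the helper `setup_group`, the 4-entry ring, and group id 11 are hypothetical:

```rust
use io_uring::{buf_ring::BufRing, IoUring, Submitter};

// A hypothetical helper that sets up a buffer group and hands the handle back
// to the caller; the handle borrows the ring's fd, so it cannot outlive the ring.
fn setup_group<'a>(
    submitter: &Submitter<'a>,
    entries: u16,
    bgid: u16,
) -> std::io::Result<BufRing<'a>> {
    submitter.setup_buf_ring(entries, bgid)
}

fn main() -> std::io::Result<()> {
    let ring = IoUring::new(8)?;
    let submitter = ring.submitter();

    let group = setup_group(&submitter, 4, 11)?;
    assert_eq!(group.capacity(), 4);
    assert_eq!(group.bgid(), 11);

    group.unregister()?;
    Ok(())
}
```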
diff --git a/src/submit.rs b/src/submit.rs
index 97589eb..39971e3 100644
--- a/src/submit.rs
+++ b/src/submit.rs
@@ -3,10 +3,10 @@
 use std::sync::atomic;
 use std::{io, mem, ptr};
 
 use crate::register::{execute, Probe};
-use crate::sys;
 use crate::types::{CancelBuilder, Timespec};
 use crate::util::{cast_ptr, OwnedFd};
 use crate::Parameters;
+use crate::{buf_ring, sys};
 
 use crate::register::Restriction;
 
@@ -464,42 +464,24 @@ impl<'a> Submitter<'a> {
        ring_entries: u16,
        bgid: u16,
    ) -> io::Result<()> {
-        // The interface type for ring_entries is u32 but the same interface only allows a u16 for
-        // the tail to be specified, so to try and avoid further confusion, we limit the
-        // ring_entries to u16 here too. The value is actually limited to 2^15 (32768) but we can
-        // let the kernel enforce that.
-        let arg = sys::io_uring_buf_reg {
-            ring_addr,
-            ring_entries: ring_entries as _,
-            bgid,
-            ..Default::default()
-        };
-        execute(
-            self.fd.as_raw_fd(),
-            sys::IORING_REGISTER_PBUF_RING,
-            cast_ptr::<sys::io_uring_buf_reg>(&arg).cast(),
-            1,
-        )
-        .map(drop)
+        buf_ring::register(self.fd.as_raw_fd(), ring_addr, ring_entries, bgid)
    }
 
    /// Unregister a previously registered buffer ring.
    ///
    /// Available since 5.19.
    pub fn unregister_buf_ring(&self, bgid: u16) -> io::Result<()> {
-        let arg = sys::io_uring_buf_reg {
-            ring_addr: 0,
-            ring_entries: 0,
-            bgid,
-            ..Default::default()
-        };
-        execute(
-            self.fd.as_raw_fd(),
-            sys::IORING_UNREGISTER_PBUF_RING,
-            cast_ptr::<sys::io_uring_buf_reg>(&arg).cast(),
-            1,
-        )
-        .map(drop)
+        buf_ring::unregister(self.fd.as_raw_fd(), bgid)
+    }
+
+    /// Set up and register a buffer ring. See [`Submitter::register_buf_ring`] for
+    /// the requirements on `ring_entries`.
+    pub fn setup_buf_ring(
+        &self,
+        ring_entries: u16,
+        bgid: u16,
+    ) -> io::Result<buf_ring::BufRing<'a>> {
+        buf_ring::BufRing::new(self.fd, ring_entries, bgid)
    }
 
    /// Performs a synchronous cancellation request, similar to [AsyncCancel](crate::opcode::AsyncCancel),
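Finally, an end-to-end sketch of the new API in the shape the reworked test uses it: a `BUFFER_SELECT` read whose buffer comes from the registered group. Group id 9, the two 128-byte buffers, and the `tempfile` dependency are illustration choices, and a 5.19+ kernel is assumed:

```rust
use std::io::Write;
use std::mem::MaybeUninit;
use std::os::unix::io::AsRawFd;

use io_uring::{cqueue, opcode, squeue, types, IoUring};

fn main() -> std::io::Result<()> {
    let mut ring = IoUring::new(8)?;
    let (submitter, mut sq, mut cq) = ring.split();

    // Register a two-entry ring under group id 9 and hand both buffers to the kernel.
    let mut buf_ring = submitter.setup_buf_ring(2, 9)?;
    let mut bufs = vec![vec![MaybeUninit::<u8>::uninit(); 128]; 2];
    unsafe {
        buf_ring.push_multiple(
            bufs.iter_mut()
                .enumerate()
                .map(|(i, b)| (i as u16, b.as_mut_slice())),
        );
    }

    let mut file = tempfile::tempfile()?;
    file.write_all(b"hello buf_ring")?;
    let fd = types::Fd(file.as_raw_fd());

    // The read carries no buffer of its own: BUFFER_SELECT tells the kernel to
    // take one from group 9, and the cqe flags report which bid it picked.
    let read_e = opcode::Read::new(fd, std::ptr::null_mut(), 128)
        .offset(0)
        .buf_group(9)
        .build()
        .user_data(1)
        .flags(squeue::Flags::BUFFER_SELECT);
    unsafe { sq.push(&read_e).expect("queue is full") };
    sq.sync();

    submitter.submit_and_wait(1)?;
    cq.sync();
    let cqe = cq.next().expect("one completion");
    let bid = cqueue::buffer_select(cqe.flags()).expect("kernel picked a buffer");
    println!("read {} bytes into bid {}", cqe.result(), bid);

    buf_ring.unregister()?;
    Ok(())
}
```

As in the test, `cqueue::buffer_select` recovers the buffer id from the completion flags, and the buffers must stay alive until `unregister` returns.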