Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[catnap] Allow chained DemiBuffers buffers for push and pop #1140

Draft
wants to merge 1 commit into
base: dev
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
65 changes: 53 additions & 12 deletions src/rust/catnap/win/socket.rs
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,19 @@ const SOCKADDR_BUF_SIZE: usize = std::mem::size_of::<SOCKADDR_STORAGE>() + 16;
// AcceptEx buffer returns two addresses.
const ACCEPT_BUFFER_LEN: usize = (SOCKADDR_BUF_SIZE) * 2;

//======================================================================================================================
// Enums
//======================================================================================================================

/// Helper structure for converting between DemiBuffers and WSABUF slices.
enum WsaBufs {
/// Only one buffer present.
One(WSABUF),

/// More than one buffer in a chain. Buffers are sequential for vectored I/O.
Many(Vec<WSABUF>),
}

//======================================================================================================================
// Structures
//======================================================================================================================
Expand Down Expand Up @@ -129,6 +142,40 @@ pub struct PopState {
// Associated Functions
//======================================================================================================================

impl WsaBufs {
/// Convert a single DemiBuffer into a WSABUF.
fn convert_buf(buf: &mut DemiBuffer) -> WSABUF {
WSABUF {
len: buf.len() as u32,
buf: PSTR::from_raw(buf.as_mut_ptr()),
}
}

/// Map one or more DemiBuffers in a chain into a WsaBufs enum.
pub fn new(buf: &mut DemiBuffer) -> WsaBufs {
if buf.is_multi_segment() {
let result: Vec<WSABUF> = std::iter::once(Self::convert_buf(buf))
.chain(
std::iter::successors(buf.next(), DemiBuffer::next)
.map(|mut buf: DemiBuffer| Self::convert_buf(&mut buf)),
)
.collect::<Vec<WSABUF>>();

WsaBufs::Many(result)
} else {
WsaBufs::One(Self::convert_buf(buf))
}
}

/// Translate the WsaBufs enum into a slice of WSABUFs usable by the windows API.
pub fn as_slice(&self) -> &[WSABUF] {
match self {
WsaBufs::One(buf) => std::slice::from_ref(buf),
WsaBufs::Many(vec) => vec.as_slice(),
}
}
}

impl AcceptState {
/// Create a new, empty `AcceptState``.
pub fn new() -> Self {
Expand Down Expand Up @@ -453,17 +500,14 @@ impl Socket {
let mut bytes_transferred: u32 = 0;
let mut flags: u32 = 0;
let success: bool = unsafe {
let wsa_buffer: WSABUF = WSABUF {
len: pop_state.buffer.len() as u32,
// Safety: loading the buffer pointer won't violate pinning invariants.
buf: PSTR::from_raw(pop_state.as_mut().get_unchecked_mut().buffer.as_mut_ptr()),
};
// Safety: loading the buffer pointer won't violate pinning invariants.
let wsa_bufs: WsaBufs = WsaBufs::new(&mut pop_state.as_mut().get_unchecked_mut().buffer);

// NB winsock service providers are required to capture the entire WSABUF array inline with the call, so
// wsa_buffer and the derivative slice can safely drop after the call.
let result: i32 = WSARecvFrom(
self.s,
std::slice::from_ref(&wsa_buffer),
wsa_bufs.as_slice(),
Some(&mut bytes_transferred),
&mut flags,
Some(pop_state.as_mut().get_unchecked_mut().address.as_mut_ptr() as *mut SOCKADDR),
Expand Down Expand Up @@ -521,11 +565,8 @@ impl Socket {
) -> Result<(), Fail> {
let mut bytes_transferred: u32 = 0;
let success: bool = unsafe {
let wsa_buffer: WSABUF = WSABUF {
len: buffer.len() as u32,
// Safety: loading the buffer pointer won't violate pinning invariants.
buf: PSTR::from_raw(buffer.get_unchecked_mut().as_mut_ptr()),
};
// Safety: loading the buffer pointer won't violate pinning invariants.
let wsa_bufs: WsaBufs = WsaBufs::new(buffer.get_unchecked_mut());

let addr: Option<socket2::SockAddr> = addr.map(socket2::SockAddr::from);

Expand All @@ -535,7 +576,7 @@ impl Socket {
// functions equivalently to WSASend.
let result: i32 = WSASendTo(
self.s,
std::slice::from_ref(&wsa_buffer),
wsa_bufs.as_slice(),
Some(&mut bytes_transferred),
0,
addr.as_ref()
Expand Down
54 changes: 47 additions & 7 deletions src/rust/runtime/memory/demibuffer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -179,7 +179,7 @@ impl MetaData {
while md.next.is_some() {
// Safety: The call to as_mut is safe, as the pointer is aligned and dereferenceable, and the MetaData
// struct it points to is initialized properly.
md = unsafe { md.next.unwrap().as_mut() };
md = unsafe { md.next.as_mut().unwrap().as_mut() };
}
&mut *md
}
Expand Down Expand Up @@ -251,7 +251,7 @@ impl DemiBuffer {

// Implementation Note:
// This function is replacing the new() function of DataBuffer, which could return failure. However, the only
// failure it actually reported was if the new DataBuffer request was for zero size. A seperate empty() function
// failure it actually reported was if the new DataBuffer request was for zero size. A separate empty() function
// was provided to allocate zero-size buffers. This new implementation does not have a special case for this,
// instead, zero is a valid argument to new(). So we no longer need the failure return case of this function.
//
Expand Down Expand Up @@ -292,7 +292,7 @@ impl DemiBuffer {
}

// Embed the buffer type into the lower bits of the pointer.
let tagged: NonNull<MetaData> = temp.with_addr(temp.addr() | Tag::Heap);
let tagged: NonNull<MetaData> = tag_ptr(temp, Tag::Heap);

// Return the new DemiBuffer.
DemiBuffer {
Expand Down Expand Up @@ -324,7 +324,7 @@ impl DemiBuffer {
pub unsafe fn from_mbuf(mbuf_ptr: *mut rte_mbuf) -> Self {
// Convert the raw pointer into a NonNull and add a tag indicating it is a DPDK buffer (i.e. a MBuf).
let temp: NonNull<MetaData> = NonNull::new_unchecked(mbuf_ptr as *mut _);
let tagged: NonNull<MetaData> = temp.with_addr(temp.addr() | Tag::Dpdk);
let tagged: NonNull<MetaData> = tag_ptr(temp, Tag::Dpdk);

DemiBuffer {
tagged_ptr: tagged,
Expand Down Expand Up @@ -353,6 +353,41 @@ impl DemiBuffer {
self.as_metadata().data_len as usize
}

/// Get the next buffer in the chain, if any.
pub fn next(&self) -> Option<DemiBuffer> {
match self.get_tag() {
Tag::Heap => match self.as_metadata().next {
Some(metadata) => {
let next: DemiBuffer = Self {
tagged_ptr: tag_ptr(metadata, self.get_tag()),
_phantom: PhantomData,
};
next.as_metadata().inc_refcnt();
Some(next)
},

None => None,
},

#[cfg(feature = "libdpdk")]
Tag::Dpdk => {
let mbuf: *mut rte_mbuf = self.as_mbuf();
let next_mbuf: *mut rte_mbuf = unsafe {
// Safety: The `mbuf` dereference below is safe, as it is aligned and dereferenceable.
(*mbuf).next
};

if next_mbuf == std::ptr::null_mut() {
None
} else {
let result: DemiBuffer = DemiBuffer::from_mbuf(next_mbuf);
result.as_metadata().inc_refcnt();
Some(result)
}
},
}
}

/// Removes `nbytes` bytes from the beginning of the `DemiBuffer` chain.
// Note: If `nbytes` is greater than the length of the first segment in the chain, then this function will fail and
// return an error, rather than remove the remaining bytes from subsequent segments in the chain. This is to match
Expand Down Expand Up @@ -630,7 +665,7 @@ impl DemiBuffer {
///
/// If the target [DemiBuffer] has multiple segments, `true` is returned. Otherwise, `false` is returned instead.
///
fn is_multi_segment(&self) -> bool {
pub fn is_multi_segment(&self) -> bool {
match self.get_tag() {
Tag::Heap => {
let md_front: &MetaData = self.as_metadata();
Expand All @@ -650,6 +685,11 @@ impl DemiBuffer {
// Helper Functions
// ----------------

// Add a tag to a MetaData address.
fn tag_ptr(metadata: NonNull<MetaData>, tag: Tag) -> NonNull<MetaData> {
metadata.with_addr(metadata.addr() | tag)
}

// Allocates the MetaData (plus the space for any directly attached data) for a new heap-allocated DemiBuffer.
fn allocate_metadata_data(direct_data_size: u16) -> NonNull<MetaData> {
// We need space for the MetaData struct, plus any extra memory for directly attached data.
Expand Down Expand Up @@ -793,7 +833,7 @@ impl Clone for DemiBuffer {
}

// Embed the buffer type into the lower bits of the pointer.
let tagged: NonNull<MetaData> = head.with_addr(head.addr() | Tag::Heap);
let tagged: NonNull<MetaData> = tag_ptr(head, Tag::Heap);

// Return the new DemiBuffer.
DemiBuffer {
Expand Down Expand Up @@ -983,7 +1023,7 @@ impl TryFrom<&[u8]> for DemiBuffer {
}

// Embed the buffer type into the lower bits of the pointer.
let tagged: NonNull<MetaData> = temp.with_addr(temp.addr() | Tag::Heap);
let tagged: NonNull<MetaData> = tag_ptr(temp, Tag::Heap);

// Return the new DemiBuffer.
Ok(DemiBuffer {
Expand Down