Skip to content

Commit

Permalink
wip
Browse files Browse the repository at this point in the history
  • Loading branch information
fafhrd91 committed Sep 11, 2024
1 parent 568df1c commit 15503f5
Showing 1 changed file with 68 additions and 30 deletions.
98 changes: 68 additions & 30 deletions ntex-tokio/src/io.rs
Original file line number Diff line number Diff line change
Expand Up @@ -49,10 +49,29 @@ impl ntex_io::AsyncRead for Read {
async fn read(&mut self, mut buf: BytesVec) -> (BytesVec, io::Result<usize>) {
// read data from socket
let result = poll_fn(|cx| {
let mut n = 0;
let mut io = self.0.borrow_mut();
poll_read_buf(Pin::new(&mut *io), cx, &mut buf)
loop {
return match poll_read_buf(Pin::new(&mut *io), cx, &mut buf)? {
Poll::Pending => {
if n > 0 {
Poll::Ready(Ok(n))
} else {
Poll::Pending
}
}
Poll::Ready(size) => {
n += size;
if n > 0 && buf.remaining_mut() > 0 {
continue;
}
Poll::Ready(Ok(n))
}
};
}
})
.await;

(buf, result)
}
}
Expand Down Expand Up @@ -85,6 +104,34 @@ impl ntex_io::AsyncWrite for Write {
}
}

pub fn poll_read_buf<T: AsyncRead>(
io: Pin<&mut T>,
cx: &mut Context<'_>,
buf: &mut BytesVec,
) -> Poll<io::Result<usize>> {
let n = {
let dst =
unsafe { &mut *(buf.chunk_mut() as *mut _ as *mut [mem::MaybeUninit<u8>]) };
let mut buf = ReadBuf::uninit(dst);
let ptr = buf.filled().as_ptr();
if io.poll_read(cx, &mut buf)?.is_pending() {
return Poll::Pending;
}

// Ensure the pointer does not change from under us
assert_eq!(ptr, buf.filled().as_ptr());
buf.filled().len()
};

// Safety: This is guaranteed to be the number of initialized (and read)
// bytes due to the invariants provided by `ReadBuf::filled`.
unsafe {
buf.advance_mut(n);
}

Poll::Ready(Ok(n))
}

/// Flush write buffer to underlying I/O stream.
pub(super) fn flush_io<T: AsyncRead + AsyncWrite + Unpin>(
io: &mut T,
Expand Down Expand Up @@ -254,10 +301,29 @@ mod unixstream {
async fn read(&mut self, mut buf: BytesVec) -> (BytesVec, io::Result<usize>) {
// read data from socket
let result = poll_fn(|cx| {
let mut n = 0;
let mut io = self.0.borrow_mut();
poll_read_buf(Pin::new(&mut *io), cx, &mut buf)
loop {
return match poll_read_buf(Pin::new(&mut *io), cx, &mut buf)? {
Poll::Pending => {
if n > 0 {
Poll::Ready(Ok(n))
} else {
Poll::Pending
}
}
Poll::Ready(size) => {
n += size;
if n > 0 && buf.remaining_mut() > 0 {
continue;
}
Poll::Ready(Ok(n))
}
};
}
})
.await;

(buf, result)
}
}
Expand Down Expand Up @@ -290,31 +356,3 @@ mod unixstream {
}
}
}

pub fn poll_read_buf<T: AsyncRead>(
io: Pin<&mut T>,
cx: &mut Context<'_>,
buf: &mut BytesVec,
) -> Poll<io::Result<usize>> {
let n = {
let dst =
unsafe { &mut *(buf.chunk_mut() as *mut _ as *mut [mem::MaybeUninit<u8>]) };
let mut buf = ReadBuf::uninit(dst);
let ptr = buf.filled().as_ptr();
if io.poll_read(cx, &mut buf)?.is_pending() {
return Poll::Pending;
}

// Ensure the pointer does not change from under us
assert_eq!(ptr, buf.filled().as_ptr());
buf.filled().len()
};

// Safety: This is guaranteed to be the number of initialized (and read)
// bytes due to the invariants provided by `ReadBuf::filled`.
unsafe {
buf.advance_mut(n);
}

Poll::Ready(Ok(n))
}

0 comments on commit 15503f5

Please sign in to comment.