Skip to content

Commit

Permalink
Async write support
Browse files Browse the repository at this point in the history
  • Loading branch information
fafhrd91 committed Sep 10, 2024
1 parent f279962 commit a940756
Show file tree
Hide file tree
Showing 15 changed files with 271 additions and 368 deletions.
40 changes: 27 additions & 13 deletions ntex-async-std/src/io.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,11 @@
use std::{any, cell::RefCell, future::poll_fn, io, pin::Pin, task::Context, task::Poll};
use std::{
any, cell::RefCell, future::poll_fn, io, pin::Pin, task::ready, task::Context,
task::Poll,
};

use async_std::io::{Read as ARead, Write as AWrite};
use ntex_bytes::{Buf, BufMut, BytesVec};
use ntex_io::{types, Handle, IoStream, ReadContext, WriteContext};
use ntex_util::{future::lazy, ready};
use ntex_io::{types, Handle, IoStream, ReadContext, WriteContext, WriteContextBuf};

use crate::TcpStream;

Expand Down Expand Up @@ -51,11 +53,17 @@ struct Write(RefCell<TcpStream>);

impl ntex_io::AsyncWrite for Write {
#[inline]
async fn write(&mut self, mut buf: BytesVec) -> (BytesVec, io::Result<()>) {
match lazy(|cx| flush_io(&mut self.0.borrow_mut().0, &mut buf, cx)).await {
Poll::Ready(res) => (buf, res),
Poll::Pending => (buf, Ok(())),
}
async fn write(&mut self, buf: &mut WriteContextBuf) -> io::Result<()> {
poll_fn(|cx| {
if let Some(mut b) = buf.take() {
let result = flush_io(&mut self.0.borrow_mut().0, &mut b, cx);
buf.set(b);
result
} else {
Poll::Ready(Ok(()))
}
})
.await
}

#[inline]
Expand Down Expand Up @@ -186,11 +194,17 @@ mod unixstream {

impl ntex_io::AsyncWrite for Write {
#[inline]
async fn write(&mut self, mut buf: BytesVec) -> (BytesVec, io::Result<()>) {
match lazy(|cx| flush_io(&mut self.0.borrow_mut().0, &mut buf, cx)).await {
Poll::Ready(res) => (buf, res),
Poll::Pending => (buf, Ok(())),
}
async fn write(&mut self, buf: &mut WriteContextBuf) -> io::Result<()> {
poll_fn(|cx| {
if let Some(mut b) = buf.take() {
let result = flush_io(&mut self.0.borrow_mut().0, &mut b, cx);
buf.set(b);
result
} else {
Poll::Ready(Ok(()))
}
})
.await
}

#[inline]
Expand Down
4 changes: 4 additions & 0 deletions ntex-compio/CHANGES.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,9 @@
# Changes

## [0.1.2] - 2024-09-11

* Use new io api

## [0.1.1] - 2024-09-05

* Tune write task
Expand Down
102 changes: 48 additions & 54 deletions ntex-compio/src/io.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,27 +4,14 @@ use compio::buf::{BufResult, IoBuf, IoBufMut, SetBufInit};
use compio::io::{AsyncRead, AsyncWrite};
use compio::net::TcpStream;
use ntex_bytes::{Buf, BufMut, BytesVec};
use ntex_io::{types, Handle, IoStream, ReadContext, WriteContext};
use ntex_io::{types, Handle, IoStream, ReadContext, WriteContext, WriteContextBuf};

impl IoStream for crate::TcpStream {
fn start(self, read: ReadContext, write: WriteContext) -> Option<Box<dyn Handle>> {
let io = self.0.clone();
compio::runtime::spawn(async move {
let mut wr_io = WriteIo(io.clone());
let wr_task = compio::runtime::spawn(async move {
write.handle(&mut wr_io).await;
log::debug!("{} Write task is stopped", write.tag());
});
let mut io = ReadIo(io);

read.handle(&mut io).await;
log::debug!("{} Read task is stopped", read.tag());

if !wr_task.is_finished() {
let _ = wr_task.await;
}

match io.0.close().await {
run(io.clone(), &read, write).await;
match io.close().await {
Ok(_) => log::debug!("{} Stream is closed", read.tag()),
Err(e) => log::error!("{} Stream is closed, {:?}", read.tag(), e),
}
Expand All @@ -38,24 +25,9 @@ impl IoStream for crate::TcpStream {
#[cfg(unix)]
impl IoStream for crate::UnixStream {
fn start(self, read: ReadContext, write: WriteContext) -> Option<Box<dyn Handle>> {
let io = self.0;
compio::runtime::spawn(async move {
let mut wr_io = WriteIo(io.clone());
let wr_task = compio::runtime::spawn(async move {
write.handle(&mut wr_io).await;
log::debug!("{} Write task is stopped", write.tag());
});

let mut io = ReadIo(io);

read.handle(&mut io).await;
log::debug!("{} Read task is stopped", read.tag());

if !wr_task.is_finished() {
let _ = wr_task.await;
}

match io.0.close().await {
run(self.0.clone(), &read, write).await;
match self.0.close().await {
Ok(_) => log::debug!("{} Unix stream is closed", read.tag()),
Err(e) => log::error!("{} Unix stream is closed, {:?}", read.tag(), e),
}
Expand Down Expand Up @@ -110,6 +82,26 @@ impl SetBufInit for CompioBuf {
}
}

async fn run<T: AsyncRead + AsyncWrite + Clone + 'static>(
io: T,
read: &ReadContext,
write: WriteContext,
) {
let mut wr_io = WriteIo(io.clone());
let wr_task = compio::runtime::spawn(async move {
write.handle(&mut wr_io).await;
log::debug!("{} Write task is stopped", write.tag());
});
let mut io = ReadIo(io);

read.handle(&mut io).await;
log::debug!("{} Read task is stopped", read.tag());

if !wr_task.is_finished() {
let _ = wr_task.await;
}
}

struct ReadIo<T>(T);

impl<T> ntex_io::AsyncRead for ReadIo<T>
Expand All @@ -130,31 +122,33 @@ where
T: AsyncWrite,
{
#[inline]
async fn write(&mut self, buf: BytesVec) -> (BytesVec, io::Result<()>) {
let mut buf = CompioBuf(buf);
loop {
let BufResult(result, buf1) = self.0.write(buf).await;
buf = buf1;

return match result {
Ok(0) => (
buf.0,
Err(io::Error::new(
async fn write(&mut self, wbuf: &mut WriteContextBuf) -> io::Result<()> {
if let Some(b) = wbuf.take() {
let mut buf = CompioBuf(b);
loop {
let BufResult(result, buf1) = self.0.write(buf).await;
buf = buf1;

let result = match result {
Ok(0) => Err(io::Error::new(
io::ErrorKind::WriteZero,
"failed to write frame to transport",
)),
),
Ok(size) => {
buf.0.advance(size);

if buf.0.is_empty() {
(buf.0, Ok(()))
} else {
continue;
Ok(size) => {
buf.0.advance(size);
if buf.0.is_empty() {
Ok(())
} else {
continue;
}
}
}
Err(e) => (buf.0, Err(e)),
};
Err(e) => Err(e),
};
wbuf.set(buf.0);
return result;
}
} else {
Ok(())
}
}

Expand Down
107 changes: 58 additions & 49 deletions ntex-glommio/src/io.rs
Original file line number Diff line number Diff line change
@@ -1,11 +1,8 @@
use std::task::{Context, Poll};
use std::{any, future::poll_fn, future::Future, io, pin::Pin};
use std::{any, future::poll_fn, io, pin::Pin, task::ready, task::Context, task::Poll};

use futures_lite::future::FutureExt;
use futures_lite::io::{AsyncRead, AsyncWrite};
use ntex_bytes::{Buf, BufMut, BytesVec};
use ntex_io::{types, Handle, IoStream, ReadContext, WriteContext, WriteStatus};
use ntex_util::{ready, time::sleep, time::Sleep};
use ntex_io::{types, Handle, IoStream, ReadContext, WriteContext, WriteContextBuf};

use crate::net_impl::{TcpStream, UnixStream};

Expand Down Expand Up @@ -62,11 +59,59 @@ struct Write(TcpStream);

impl ntex_io::AsyncWrite for Write {
#[inline]
async fn write(&mut self, mut buf: BytesVec) -> (BytesVec, io::Result<()>) {
match lazy(|cx| flush_io(&mut *self.0.borrow_mut(), &mut buf, cx)).await {
Poll::Ready(res) => (buf, res),
Poll::Pending => (buf, Ok(())),
}
async fn write(&mut self, buf: &mut WriteContextBuf) -> io::Result<()> {
poll_fn(|cx| {
if let Some(mut b) = buf.take() {
let result = flush_io(&mut *self.0 .0.borrow_mut(), &mut b, cx);
buf.set(b);
result
} else {
Poll::Ready(Ok(()))
}
})
.await
}

#[inline]
async fn flush(&mut self) -> io::Result<()> {
Ok(())
}

#[inline]
async fn shutdown(&mut self) -> io::Result<()> {
poll_fn(|cx| Pin::new(&mut *self.0 .0.borrow_mut()).poll_close(cx)).await
}
}

struct UnixRead(UnixStream);

impl ntex_io::AsyncRead for UnixRead {
async fn read(&mut self, mut buf: BytesVec) -> (BytesVec, io::Result<usize>) {
// read data from socket
let result = poll_fn(|cx| {
let mut io = self.0 .0.borrow_mut();
poll_read_buf(Pin::new(&mut *io), cx, &mut buf)
})
.await;
(buf, result)
}
}

struct UnixWrite(UnixStream);

impl ntex_io::AsyncWrite for UnixWrite {
#[inline]
async fn write(&mut self, buf: &mut WriteContextBuf) -> io::Result<()> {
poll_fn(|cx| {
if let Some(mut b) = buf.take() {
let result = flush_io(&mut *self.0 .0.borrow_mut(), &mut b, cx);
buf.set(b);
result
} else {
Poll::Ready(Ok(()))
}
})
.await
}

#[inline]
Expand All @@ -76,7 +121,7 @@ impl ntex_io::AsyncWrite for Write {

#[inline]
async fn shutdown(&mut self) -> io::Result<()> {
poll_fn(|cx| Pin::new(&mut *self.0.borrow_mut()).poll_close(cx)).await
poll_fn(|cx| Pin::new(&mut *self.0 .0.borrow_mut()).poll_close(cx)).await
}
}

Expand Down Expand Up @@ -125,7 +170,7 @@ pub(super) fn flush_io<T: AsyncRead + AsyncWrite + Unpin>(
// log::trace!("flushed {} bytes", written);

// flush
return if written > 0 {
if written > 0 {
match Pin::new(&mut *io).poll_flush(cx) {
Poll::Ready(Ok(_)) => result,
Poll::Pending => Poll::Pending,
Expand All @@ -136,7 +181,7 @@ pub(super) fn flush_io<T: AsyncRead + AsyncWrite + Unpin>(
}
} else {
result
};
}
} else {
Poll::Ready(Ok(()))
}
Expand All @@ -158,39 +203,3 @@ pub fn poll_read_buf<T: AsyncRead>(

Poll::Ready(Ok(n))
}

struct UnixRead(UnixStream);

impl ntex_io::AsyncRead for UnixRead {
async fn read(&mut self, mut buf: BytesVec) -> (BytesVec, io::Result<usize>) {
// read data from socket
let result = poll_fn(|cx| {
let mut io = self.0 .0.borrow_mut();
poll_read_buf(Pin::new(&mut *io), cx, &mut buf)
})
.await;
(buf, result)
}
}

struct UnixWrite(UnixStream);

impl ntex_io::AsyncWrite for UnixWrite {
#[inline]
async fn write(&mut self, mut buf: BytesVec) -> (BytesVec, io::Result<()>) {
match lazy(|cx| flush_io(&mut *self.0.borrow_mut(), &mut buf, cx)).await {
Poll::Ready(res) => (buf, res),
Poll::Pending => (buf, Ok(())),
}
}

#[inline]
async fn flush(&mut self) -> io::Result<()> {
Ok(())
}

#[inline]
async fn shutdown(&mut self) -> io::Result<()> {
poll_fn(|cx| Pin::new(&mut *self.0.borrow_mut()).poll_close(cx)).await
}
}
2 changes: 1 addition & 1 deletion ntex-io/src/filter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,7 @@ impl Filter for Base {
fn poll_write_ready(&self, cx: &mut Context<'_>) -> Poll<WriteStatus> {
let mut flags = self.0.flags();

if flags.contains(Flags::IO_STOPPED) {
if flags.is_stopped() {
Poll::Ready(WriteStatus::Terminate)
} else {
self.0 .0.write_task.register(cx.waker());
Expand Down
4 changes: 4 additions & 0 deletions ntex-io/src/flags.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,10 @@ bitflags::bitflags! {
}

impl Flags {
pub(crate) fn is_stopped(&self) -> bool {
self.intersects(Flags::IO_STOPPED)
}

pub(crate) fn is_waiting_for_write(&self) -> bool {
self.intersects(Flags::BUF_W_MUST_FLUSH | Flags::BUF_W_BACKPRESSURE)
}
Expand Down
Loading

0 comments on commit a940756

Please sign in to comment.