From f27996273c0af2939d596b6ac3c17e72cca72174 Mon Sep 17 00:00:00 2001 From: Nikolay Kim Date: Tue, 10 Sep 2024 19:06:11 +0500 Subject: [PATCH] wip --- ntex-async-std/src/io.rs | 464 +++++++-------------------------- ntex-compio/src/io.rs | 137 ++++------ ntex-glommio/src/io.rs | 480 +++++++--------------------------- ntex-io/src/buf.rs | 32 +-- ntex-io/src/ioref.rs | 4 - ntex-io/src/lib.rs | 18 +- ntex-io/src/tasks.rs | 378 +++++++++++---------------- ntex-io/src/testing.rs | 270 ++++++-------------- ntex-tokio/src/io.rs | 537 +++++++-------------------------------- 9 files changed, 561 insertions(+), 1759 deletions(-) diff --git a/ntex-async-std/src/io.rs b/ntex-async-std/src/io.rs index 4fb4c66d8..6ef23b0f1 100644 --- a/ntex-async-std/src/io.rs +++ b/ntex-async-std/src/io.rs @@ -1,20 +1,22 @@ -use std::future::{poll_fn, Future}; -use std::{any, cell::RefCell, io, pin::Pin, task::Context, task::Poll}; +use std::{any, cell::RefCell, future::poll_fn, io, pin::Pin, task::Context, task::Poll}; -use async_std::io::{Read, Write}; +use async_std::io::{Read as ARead, Write as AWrite}; use ntex_bytes::{Buf, BufMut, BytesVec}; -use ntex_io::{types, Handle, IoStream, ReadContext, WriteContext, WriteStatus}; -use ntex_util::{ready, time::sleep, time::Sleep}; +use ntex_io::{types, Handle, IoStream, ReadContext, WriteContext}; +use ntex_util::{future::lazy, ready}; use crate::TcpStream; impl IoStream for TcpStream { fn start(self, read: ReadContext, write: WriteContext) -> Option> { - let mut rio = ReadTask(RefCell::new(self.clone())); + let mut rio = Read(RefCell::new(self.clone())); async_std::task::spawn_local(async move { read.handle(&mut rio).await; }); - async_std::task::spawn_local(WriteTask::new(self.clone(), write)); + let mut wio = Write(RefCell::new(self.clone())); + async_std::task::spawn_local(async move { + write.handle(&mut wio).await; + }); Some(Box::new(self)) } } @@ -31,9 +33,9 @@ impl Handle for TcpStream { } /// Read io task -struct ReadTask(RefCell); +struct Read(RefCell); -impl ntex_io::AsyncRead for ReadTask { +impl ntex_io::AsyncRead for Read { async fn read(&mut self, mut buf: BytesVec) -> (BytesVec, io::Result) { // read data from socket let result = poll_fn(|cx| { @@ -45,235 +47,91 @@ impl ntex_io::AsyncRead for ReadTask { } } -#[derive(Debug)] -enum IoWriteState { - Processing(Option), - Shutdown(Sleep, Shutdown), -} - -#[derive(Debug)] -enum Shutdown { - None, - Stopping(u16), -} +struct Write(RefCell); -/// Write io task -struct WriteTask { - st: IoWriteState, - io: TcpStream, - state: WriteContext, -} - -impl WriteTask { - /// Create new write io task - fn new(io: TcpStream, state: WriteContext) -> Self { - Self { - io, - state, - st: IoWriteState::Processing(None), +impl ntex_io::AsyncWrite for Write { + #[inline] + async fn write(&mut self, mut buf: BytesVec) -> (BytesVec, io::Result<()>) { + match lazy(|cx| flush_io(&mut self.0.borrow_mut().0, &mut buf, cx)).await { + Poll::Ready(res) => (buf, res), + Poll::Pending => (buf, Ok(())), } } -} - -impl Future for WriteTask { - type Output = (); - - fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { - let this = self.as_mut().get_mut(); - - match this.st { - IoWriteState::Processing(ref mut delay) => { - match this.state.poll_ready(cx) { - Poll::Ready(WriteStatus::Ready) => { - if let Some(delay) = delay { - if delay.poll_elapsed(cx).is_ready() { - this.state.close(Some(io::Error::new( - io::ErrorKind::TimedOut, - "Operation timedout", - ))); - return Poll::Ready(()); - } - } - - // flush io stream - let io = &mut this.io.0; - match ready!(this.state.with_buf(|buf| flush_io(io, buf, cx))) { - Ok(()) => Poll::Pending, - Err(e) => { - this.state.close(Some(e)); - Poll::Ready(()) - } - } - } - Poll::Ready(WriteStatus::Timeout(time)) => { - log::trace!("initiate timeout delay for {:?}", time); - if delay.is_none() { - *delay = Some(sleep(time)); - } - self.poll(cx) - } - Poll::Ready(WriteStatus::Shutdown(time)) => { - log::trace!("write task is instructed to shutdown"); - let timeout = if let Some(delay) = delay.take() { - delay - } else { - sleep(time) - }; - - this.st = IoWriteState::Shutdown(timeout, Shutdown::None); - self.poll(cx) - } - Poll::Ready(WriteStatus::Terminate) => { - log::trace!("write task is instructed to terminate"); - - let _ = Pin::new(&mut this.io.0).poll_close(cx); - this.state.close(None); - Poll::Ready(()) - } - Poll::Pending => Poll::Pending, - } - } - IoWriteState::Shutdown(ref mut delay, ref mut st) => { - // close WRITE side and wait for disconnect on read side. - // use disconnect timeout, otherwise it could hang forever. - loop { - match st { - Shutdown::None => { - // flush write buffer - let io = &mut this.io.0; - match this.state.with_buf(|buf| flush_io(io, buf, cx)) { - Poll::Ready(Ok(())) => { - if let Err(e) = - this.io.0.shutdown(std::net::Shutdown::Write) - { - this.state.close(Some(e)); - return Poll::Ready(()); - } - *st = Shutdown::Stopping(0); - continue; - } - Poll::Ready(Err(err)) => { - log::trace!( - "write task is closed with err during flush, {:?}", - err - ); - this.state.close(Some(err)); - return Poll::Ready(()); - } - Poll::Pending => (), - } - } - Shutdown::Stopping(ref mut count) => { - // read until 0 or err - let mut buf = [0u8; 512]; - let io = &mut this.io; - loop { - match Pin::new(&mut io.0).poll_read(cx, &mut buf) { - Poll::Ready(Err(e)) => { - log::trace!("write task is stopped"); - this.state.close(Some(e)); - return Poll::Ready(()); - } - Poll::Ready(Ok(0)) => { - log::trace!("async-std socket is disconnected"); - this.state.close(None); - return Poll::Ready(()); - } - Poll::Ready(Ok(n)) => { - *count += n as u16; - if *count > 4096 { - log::trace!( - "write task is stopped, too much input" - ); - this.state.close(None); - return Poll::Ready(()); - } - } - Poll::Pending => break, - } - } - } - } + #[inline] + async fn flush(&mut self) -> io::Result<()> { + Ok(()) + } - // disconnect timeout - if delay.poll_elapsed(cx).is_pending() { - return Poll::Pending; - } - log::trace!("write task is stopped after delay"); - this.state.close(None); - let _ = Pin::new(&mut this.io.0).poll_close(cx); - return Poll::Ready(()); - } - } - } + #[inline] + async fn shutdown(&mut self) -> io::Result<()> { + self.0.borrow().0.shutdown(std::net::Shutdown::Both) } } /// Flush write buffer to underlying I/O stream. -pub(super) fn flush_io( +pub(super) fn flush_io( io: &mut T, - buf: &mut Option, + buf: &mut BytesVec, cx: &mut Context<'_>, ) -> Poll> { - if let Some(buf) = buf { - let len = buf.len(); - - if len != 0 { - // log::trace!("flushing framed transport: {:?}", buf.len()); - - let mut written = 0; - let result = loop { - break match Pin::new(&mut *io).poll_write(cx, &buf[written..]) { - Poll::Ready(Ok(n)) => { - if n == 0 { - log::trace!("Disconnected during flush, written {}", written); - Poll::Ready(Err(io::Error::new( - io::ErrorKind::WriteZero, - "failed to write frame to transport", - ))) + let len = buf.len(); + + if len != 0 { + // log::trace!("flushing framed transport: {:?}", buf.len()); + + let mut written = 0; + let result = loop { + break match Pin::new(&mut *io).poll_write(cx, &buf[written..]) { + Poll::Ready(Ok(n)) => { + if n == 0 { + log::trace!("Disconnected during flush, written {}", written); + Poll::Ready(Err(io::Error::new( + io::ErrorKind::WriteZero, + "failed to write frame to transport", + ))) + } else { + written += n; + if written == len { + buf.clear(); + Poll::Ready(Ok(())) } else { - written += n; - if written == len { - buf.clear(); - Poll::Ready(Ok(())) - } else { - continue; - } + continue; } } - Poll::Pending => { - // remove written data - buf.advance(written); - Poll::Pending - } - Poll::Ready(Err(e)) => { - log::trace!("Error during flush: {}", e); - Poll::Ready(Err(e)) - } - }; - }; - // log::trace!("flushed {} bytes", written); - - // flush - return if written > 0 { - match Pin::new(&mut *io).poll_flush(cx) { - Poll::Ready(Ok(_)) => result, - Poll::Pending => Poll::Pending, - Poll::Ready(Err(e)) => { - log::trace!("error during flush: {}", e); - Poll::Ready(Err(e)) - } } - } else { - result + Poll::Pending => { + // remove written data + buf.advance(written); + Poll::Pending + } + Poll::Ready(Err(e)) => { + log::trace!("Error during flush: {}", e); + Poll::Ready(Err(e)) + } }; + }; + // log::trace!("flushed {} bytes", written); + + // flush + if written > 0 { + match Pin::new(&mut *io).poll_flush(cx) { + Poll::Ready(Ok(_)) => result, + Poll::Pending => Poll::Pending, + Poll::Ready(Err(e)) => { + log::trace!("error during flush: {}", e); + Poll::Ready(Err(e)) + } + } + } else { + result } + } else { + Poll::Ready(Ok(())) } - Poll::Ready(Ok(())) } -pub fn poll_read_buf( +pub fn poll_read_buf( io: Pin<&mut T>, cx: &mut Context<'_>, buf: &mut BytesVec, @@ -297,19 +155,22 @@ mod unixstream { impl IoStream for UnixStream { fn start(self, read: ReadContext, write: WriteContext) -> Option> { - let mut rio = ReadTask(RefCell::new(self.clone())); + let mut rio = Read(RefCell::new(self.clone())); async_std::task::spawn_local(async move { read.handle(&mut rio).await; }); - async_std::task::spawn_local(WriteTask::new(self, write)); + let mut wio = Write(RefCell::new(self)); + async_std::task::spawn_local(async move { + write.handle(&mut wio).await; + }); None } } /// Read io task - struct ReadTask(RefCell); + struct Read(RefCell); - impl ntex_io::AsyncRead for ReadTask { + impl ntex_io::AsyncRead for Read { async fn read(&mut self, mut buf: BytesVec) -> (BytesVec, io::Result) { // read data from socket let result = poll_fn(|cx| { @@ -321,158 +182,25 @@ mod unixstream { } } - /// Write io task - struct WriteTask { - st: IoWriteState, - io: UnixStream, - state: WriteContext, - } + struct Write(RefCell); - impl WriteTask { - /// Create new write io task - fn new(io: UnixStream, state: WriteContext) -> Self { - Self { - io, - state, - st: IoWriteState::Processing(None), + impl ntex_io::AsyncWrite for Write { + #[inline] + async fn write(&mut self, mut buf: BytesVec) -> (BytesVec, io::Result<()>) { + match lazy(|cx| flush_io(&mut self.0.borrow_mut().0, &mut buf, cx)).await { + Poll::Ready(res) => (buf, res), + Poll::Pending => (buf, Ok(())), } } - } - - impl Future for WriteTask { - type Output = (); - - fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { - let this = self.as_mut().get_mut(); - match this.st { - IoWriteState::Processing(ref mut delay) => { - match this.state.poll_ready(cx) { - Poll::Ready(WriteStatus::Ready) => { - if let Some(delay) = delay { - if delay.poll_elapsed(cx).is_ready() { - this.state.close(Some(io::Error::new( - io::ErrorKind::TimedOut, - "Operation timedout", - ))); - return Poll::Ready(()); - } - } - - // flush io stream - let io = &mut this.io.0; - match ready!(this.state.with_buf(|buf| flush_io(io, buf, cx))) { - Ok(()) => Poll::Pending, - Err(e) => { - this.state.close(Some(e)); - Poll::Ready(()) - } - } - } - Poll::Ready(WriteStatus::Timeout(time)) => { - log::trace!("initiate timeout delay for {:?}", time); - if delay.is_none() { - *delay = Some(sleep(time)); - } - self.poll(cx) - } - Poll::Ready(WriteStatus::Shutdown(time)) => { - log::trace!("write task is instructed to shutdown"); - - let timeout = if let Some(delay) = delay.take() { - delay - } else { - sleep(time) - }; - - this.st = IoWriteState::Shutdown(timeout, Shutdown::None); - self.poll(cx) - } - Poll::Ready(WriteStatus::Terminate) => { - log::trace!("write task is instructed to terminate"); - - let _ = Pin::new(&mut this.io.0).poll_close(cx); - this.state.close(None); - Poll::Ready(()) - } - Poll::Pending => Poll::Pending, - } - } - IoWriteState::Shutdown(ref mut delay, ref mut st) => { - // close WRITE side and wait for disconnect on read side. - // use disconnect timeout, otherwise it could hang forever. - loop { - match st { - Shutdown::None => { - // flush write buffer - let io = &mut this.io.0; - match this.state.with_buf(|buf| flush_io(io, buf, cx)) { - Poll::Ready(Ok(())) => { - if let Err(e) = - this.io.0.shutdown(std::net::Shutdown::Write) - { - this.state.close(Some(e)); - return Poll::Ready(()); - } - *st = Shutdown::Stopping(0); - continue; - } - Poll::Ready(Err(err)) => { - log::trace!( - "write task is closed with err during flush, {:?}", - err - ); - this.state.close(Some(err)); - return Poll::Ready(()); - } - Poll::Pending => (), - } - } - Shutdown::Stopping(ref mut count) => { - // read until 0 or err - let mut buf = [0u8; 512]; - let io = &mut this.io; - loop { - match Pin::new(&mut io.0).poll_read(cx, &mut buf) { - Poll::Ready(Err(e)) => { - log::trace!("write task is stopped"); - this.state.close(Some(e)); - return Poll::Ready(()); - } - Poll::Ready(Ok(0)) => { - log::trace!( - "async-std unix socket is disconnected" - ); - this.state.close(None); - return Poll::Ready(()); - } - Poll::Ready(Ok(n)) => { - *count += n as u16; - if *count > 4096 { - log::trace!( - "write task is stopped, too much input" - ); - this.state.close(None); - return Poll::Ready(()); - } - } - Poll::Pending => break, - } - } - } - } + #[inline] + async fn flush(&mut self) -> io::Result<()> { + Ok(()) + } - // disconnect timeout - if delay.poll_elapsed(cx).is_pending() { - return Poll::Pending; - } - log::trace!("write task is stopped after delay"); - this.state.close(None); - let _ = Pin::new(&mut this.io.0).poll_close(cx); - return Poll::Ready(()); - } - } - } + #[inline] + async fn shutdown(&mut self) -> io::Result<()> { + self.0.borrow().0.shutdown(std::net::Shutdown::Both) } } } diff --git a/ntex-compio/src/io.rs b/ntex-compio/src/io.rs index 7a5d1bb84..a7f98241f 100644 --- a/ntex-compio/src/io.rs +++ b/ntex-compio/src/io.rs @@ -4,16 +4,15 @@ use compio::buf::{BufResult, IoBuf, IoBufMut, SetBufInit}; use compio::io::{AsyncRead, AsyncWrite}; use compio::net::TcpStream; use ntex_bytes::{Buf, BufMut, BytesVec}; -use ntex_io::{types, Handle, IoStream, ReadContext, WriteContext, WriteStatus}; -use ntex_util::{future::select, future::Either, time::sleep}; +use ntex_io::{types, Handle, IoStream, ReadContext, WriteContext}; impl IoStream for crate::TcpStream { fn start(self, read: ReadContext, write: WriteContext) -> Option> { let io = self.0.clone(); compio::runtime::spawn(async move { - let mut wr_io = io.clone(); + let mut wr_io = WriteIo(io.clone()); let wr_task = compio::runtime::spawn(async move { - write_task(&mut wr_io, &write).await; + write.handle(&mut wr_io).await; log::debug!("{} Write task is stopped", write.tag()); }); let mut io = ReadIo(io); @@ -41,9 +40,9 @@ impl IoStream for crate::UnixStream { fn start(self, read: ReadContext, write: WriteContext) -> Option> { let io = self.0; compio::runtime::spawn(async move { - let mut wr_io = io.clone(); + let mut wr_io = WriteIo(io.clone()); let wr_task = compio::runtime::spawn(async move { - write_task(&mut wr_io, &write).await; + write.handle(&mut wr_io).await; log::debug!("{} Write task is stopped", write.tag()); }); @@ -111,105 +110,61 @@ impl SetBufInit for CompioBuf { } } -struct ReadIo(T); +struct ReadIo(T); impl ntex_io::AsyncRead for ReadIo where T: AsyncRead, { + #[inline] async fn read(&mut self, buf: BytesVec) -> (BytesVec, io::Result) { let BufResult(result, buf) = self.0.read(CompioBuf(buf)).await; (buf.0, result) } } -/// Write io task -async fn write_task(mut io: T, state: &WriteContext) { - let mut delay = None; - - loop { - let result = if let Some(ref mut sleep) = delay { - let result = match select(sleep, state.ready()).await { - Either::Left(_) => { - state.close(Some(io::Error::new( - io::ErrorKind::TimedOut, - "Operation timedout", - ))); - return; - } - Either::Right(res) => res, - }; - delay = None; - result - } else { - state.ready().await - }; - - match result { - WriteStatus::Ready => { - // write io stream - match write(&mut io, state).await { - Ok(()) => continue, - Err(e) => { - state.close(Some(e)); - } - } - } - WriteStatus::Timeout(time) => { - log::trace!("{}: Initiate timeout delay for {:?}", state.tag(), time); - delay = Some(sleep(time)); - continue; - } - WriteStatus::Shutdown(time) => { - log::trace!("{}: Write task is instructed to shutdown", state.tag()); - - let fut = async { - write(&mut io, state).await?; - io.flush().await?; - io.shutdown().await?; - Ok(()) - }; - match select(sleep(time), fut).await { - Either::Left(_) => state.close(None), - Either::Right(res) => state.close(res.err()), - } - } - WriteStatus::Terminate => { - log::trace!("{}: Write task is instructed to terminate", state.tag()); - state.close(io.shutdown().await.err()); - } - } - break; - } -} +struct WriteIo(T); -// write to io stream -async fn write(io: &mut T, state: &WriteContext) -> io::Result<()> { - state - .with_buf_async(|buf| async { - let mut buf = CompioBuf(buf); - loop { - let BufResult(result, buf1) = io.write(buf).await; - buf = buf1; - - return match result { - Ok(0) => Err(io::Error::new( +impl ntex_io::AsyncWrite for WriteIo +where + T: AsyncWrite, +{ + #[inline] + async fn write(&mut self, buf: BytesVec) -> (BytesVec, io::Result<()>) { + let mut buf = CompioBuf(buf); + loop { + let BufResult(result, buf1) = self.0.write(buf).await; + buf = buf1; + + return match result { + Ok(0) => ( + buf.0, + Err(io::Error::new( io::ErrorKind::WriteZero, "failed to write frame to transport", )), - Ok(size) => { - if buf.0.len() == size { - // return io.flush().await; - state.memory_pool().release_write_buf(buf.0); - Ok(()) - } else { - buf.0.advance(size); - continue; - } + ), + Ok(size) => { + buf.0.advance(size); + + if buf.0.is_empty() { + (buf.0, Ok(())) + } else { + continue; } - Err(e) => Err(e), - }; - } - }) - .await + } + Err(e) => (buf.0, Err(e)), + }; + } + } + + #[inline] + async fn flush(&mut self) -> io::Result<()> { + self.0.flush().await + } + + #[inline] + async fn shutdown(&mut self) -> io::Result<()> { + self.0.shutdown().await + } } diff --git a/ntex-glommio/src/io.rs b/ntex-glommio/src/io.rs index 698719728..53166de7f 100644 --- a/ntex-glommio/src/io.rs +++ b/ntex-glommio/src/io.rs @@ -11,24 +11,23 @@ use crate::net_impl::{TcpStream, UnixStream}; impl IoStream for TcpStream { fn start(self, read: ReadContext, write: WriteContext) -> Option> { - let mut rio = ReadTask(self.clone()); - glommio::spawn_local(async move { - read.handle(&mut rio).await; - }) - .detach(); - glommio::spawn_local(WriteTask::new(self.clone(), write)).detach(); + let mut rio = Read(self.clone()); + glommio::spawn_local(async move { read.handle(&mut rio).await }).detach(); + let mut wio = Write(self.clone()); + glommio::spawn_local(async move { write.handle(&mut wio).await }).detach(); Some(Box::new(self)) } } impl IoStream for UnixStream { fn start(self, read: ReadContext, write: WriteContext) -> Option> { - let mut rio = UnixReadTask(self.clone()); + let mut rio = UnixRead(self.clone()); glommio::spawn_local(async move { read.handle(&mut rio).await; }) .detach(); - glommio::spawn_local(UnixWriteTask::new(self, write)).detach(); + let mut wio = UnixWrite(self); + glommio::spawn_local(async move { write.handle(&mut wio).await }).detach(); None } } @@ -45,9 +44,9 @@ impl Handle for TcpStream { } /// Read io task -struct ReadTask(TcpStream); +struct Read(TcpStream); -impl ntex_io::AsyncRead for ReadTask { +impl ntex_io::AsyncRead for Read { async fn read(&mut self, mut buf: BytesVec) -> (BytesVec, io::Result) { // read data from socket let result = poll_fn(|cx| { @@ -59,245 +58,88 @@ impl ntex_io::AsyncRead for ReadTask { } } -enum IoWriteState { - Processing(Option), - Shutdown(Sleep, Shutdown), -} - -enum Shutdown { - Flush, - Close(Pin>>>), - Stopping(u16), -} +struct Write(TcpStream); -/// Write io task -struct WriteTask { - st: IoWriteState, - io: TcpStream, - state: WriteContext, -} - -impl WriteTask { - /// Create new write io task - fn new(io: TcpStream, state: WriteContext) -> Self { - Self { - io, - state, - st: IoWriteState::Processing(None), +impl ntex_io::AsyncWrite for Write { + #[inline] + async fn write(&mut self, mut buf: BytesVec) -> (BytesVec, io::Result<()>) { + match lazy(|cx| flush_io(&mut *self.0.borrow_mut(), &mut buf, cx)).await { + Poll::Ready(res) => (buf, res), + Poll::Pending => (buf, Ok(())), } } -} - -impl Future for WriteTask { - type Output = (); - - fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { - let this = self.as_mut().get_mut(); - - match this.st { - IoWriteState::Processing(ref mut delay) => { - match this.state.poll_ready(cx) { - Poll::Ready(WriteStatus::Ready) => { - if let Some(delay) = delay { - if delay.poll_elapsed(cx).is_ready() { - this.state.close(Some(io::Error::new( - io::ErrorKind::TimedOut, - "Operation timedout", - ))); - return Poll::Ready(()); - } - } - - // flush io stream - match ready!(this.state.with_buf(|buf| flush_io( - &mut *this.io.0.borrow_mut(), - buf, - cx - ))) { - Ok(()) => Poll::Pending, - Err(e) => { - this.state.close(Some(e)); - Poll::Ready(()) - } - } - } - Poll::Ready(WriteStatus::Timeout(time)) => { - log::trace!("initiate timeout delay for {:?}", time); - if delay.is_none() { - *delay = Some(sleep(time)); - } - self.poll(cx) - } - Poll::Ready(WriteStatus::Shutdown(time)) => { - log::trace!("write task is instructed to shutdown"); - - let timeout = if let Some(delay) = delay.take() { - delay - } else { - sleep(time) - }; - - this.st = IoWriteState::Shutdown(timeout, Shutdown::Flush); - self.poll(cx) - } - Poll::Ready(WriteStatus::Terminate) => { - log::trace!("write task is instructed to terminate"); - let _ = Pin::new(&mut *this.io.0.borrow_mut()).poll_close(cx); - this.state.close(None); - Poll::Ready(()) - } - Poll::Pending => Poll::Pending, - } - } - IoWriteState::Shutdown(ref mut delay, ref mut st) => { - // close WRITE side and wait for disconnect on read side. - // use disconnect timeout, otherwise it could hang forever. - loop { - match st { - Shutdown::Flush => { - // flush write buffer - let mut io = this.io.0.borrow_mut(); - match this.state.with_buf(|buf| flush_io(&mut *io, buf, cx)) { - Poll::Ready(Ok(())) => { - let io = this.io.clone(); - #[allow(clippy::await_holding_refcell_ref)] - let fut = Box::pin(async move { - io.0.borrow() - .shutdown(std::net::Shutdown::Write) - .await - }); - *st = Shutdown::Close(fut); - continue; - } - Poll::Ready(Err(err)) => { - log::trace!( - "write task is closed with err during flush, {:?}", - err - ); - this.state.close(Some(err)); - return Poll::Ready(()); - } - Poll::Pending => (), - } - } - Shutdown::Close(ref mut fut) => { - if ready!(fut.poll(cx)).is_err() { - this.state.close(None); - return Poll::Ready(()); - } - *st = Shutdown::Stopping(0); - continue; - } - Shutdown::Stopping(ref mut count) => { - // read until 0 or err - let mut buf = [0u8; 512]; - let io = &mut this.io; - loop { - match Pin::new(&mut *io.0.borrow_mut()) - .poll_read(cx, &mut buf) - { - Poll::Ready(Err(e)) => { - log::trace!("write task is stopped"); - this.state.close(Some(e)); - return Poll::Ready(()); - } - Poll::Ready(Ok(0)) => { - log::trace!("glommio socket is disconnected"); - this.state.close(None); - return Poll::Ready(()); - } - Poll::Ready(Ok(n)) => { - *count += n as u16; - if *count > 4096 { - log::trace!( - "write task is stopped, too much input" - ); - this.state.close(None); - return Poll::Ready(()); - } - } - Poll::Pending => break, - } - } - } - } + #[inline] + async fn flush(&mut self) -> io::Result<()> { + Ok(()) + } - // disconnect timeout - if delay.poll_elapsed(cx).is_pending() { - return Poll::Pending; - } - log::trace!("write task is stopped after delay"); - this.state.close(None); - let _ = Pin::new(&mut *this.io.0.borrow_mut()).poll_close(cx); - return Poll::Ready(()); - } - } - } + #[inline] + async fn shutdown(&mut self) -> io::Result<()> { + poll_fn(|cx| Pin::new(&mut *self.0.borrow_mut()).poll_close(cx)).await } } /// Flush write buffer to underlying I/O stream. pub(super) fn flush_io( io: &mut T, - buf: &mut Option, + buf: &mut BytesVec, cx: &mut Context<'_>, ) -> Poll> { - if let Some(buf) = buf { - let len = buf.len(); - - if len != 0 { - // log::trace!("flushing framed transport: {:?}", buf.len()); - - let mut written = 0; - let result = loop { - break match Pin::new(&mut *io).poll_write(cx, &buf[written..]) { - Poll::Ready(Ok(n)) => { - if n == 0 { - log::trace!("Disconnected during flush, written {}", written); - Poll::Ready(Err(io::Error::new( - io::ErrorKind::WriteZero, - "failed to write frame to transport", - ))) + let len = buf.len(); + + if len != 0 { + // log::trace!("flushing framed transport: {:?}", buf.len()); + + let mut written = 0; + let result = loop { + break match Pin::new(&mut *io).poll_write(cx, &buf[written..]) { + Poll::Ready(Ok(n)) => { + if n == 0 { + log::trace!("Disconnected during flush, written {}", written); + Poll::Ready(Err(io::Error::new( + io::ErrorKind::WriteZero, + "failed to write frame to transport", + ))) + } else { + written += n; + if written == len { + buf.clear(); + Poll::Ready(Ok(())) } else { - written += n; - if written == len { - buf.clear(); - Poll::Ready(Ok(())) - } else { - continue; - } + continue; } } - Poll::Pending => { - // remove written data - buf.advance(written); - Poll::Pending - } - Poll::Ready(Err(e)) => { - log::trace!("Error during flush: {}", e); - Poll::Ready(Err(e)) - } - }; - }; - log::trace!("flushed {} bytes", written); - - // flush - return if written > 0 { - match Pin::new(&mut *io).poll_flush(cx) { - Poll::Ready(Ok(_)) => result, - Poll::Pending => Poll::Pending, - Poll::Ready(Err(e)) => { - log::trace!("error during flush: {}", e); - Poll::Ready(Err(e)) - } } - } else { - result + Poll::Pending => { + // remove written data + buf.advance(written); + Poll::Pending + } + Poll::Ready(Err(e)) => { + log::trace!("Error during flush: {}", e); + Poll::Ready(Err(e)) + } }; - } + }; + // log::trace!("flushed {} bytes", written); + + // flush + return if written > 0 { + match Pin::new(&mut *io).poll_flush(cx) { + Poll::Ready(Ok(_)) => result, + Poll::Pending => Poll::Pending, + Poll::Ready(Err(e)) => { + log::trace!("error during flush: {}", e); + Poll::Ready(Err(e)) + } + } + } else { + result + }; + } else { + Poll::Ready(Ok(())) } - Poll::Ready(Ok(())) } pub fn poll_read_buf( @@ -317,10 +159,9 @@ pub fn poll_read_buf( Poll::Ready(Ok(n)) } -/// Read io task -struct UnixReadTask(UnixStream); +struct UnixRead(UnixStream); -impl ntex_io::AsyncRead for UnixReadTask { +impl ntex_io::AsyncRead for UnixRead { async fn read(&mut self, mut buf: BytesVec) -> (BytesVec, io::Result) { // read data from socket let result = poll_fn(|cx| { @@ -332,169 +173,24 @@ impl ntex_io::AsyncRead for UnixReadTask { } } -/// Write io task -struct UnixWriteTask { - st: IoWriteState, - io: UnixStream, - state: WriteContext, -} +struct UnixWrite(UnixStream); -impl UnixWriteTask { - /// Create new write io task - fn new(io: UnixStream, state: WriteContext) -> Self { - Self { - io, - state, - st: IoWriteState::Processing(None), +impl ntex_io::AsyncWrite for UnixWrite { + #[inline] + async fn write(&mut self, mut buf: BytesVec) -> (BytesVec, io::Result<()>) { + match lazy(|cx| flush_io(&mut *self.0.borrow_mut(), &mut buf, cx)).await { + Poll::Ready(res) => (buf, res), + Poll::Pending => (buf, Ok(())), } } -} - -impl Future for UnixWriteTask { - type Output = (); - - fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { - let this = self.as_mut().get_mut(); - match this.st { - IoWriteState::Processing(ref mut delay) => { - match this.state.poll_ready(cx) { - Poll::Ready(WriteStatus::Ready) => { - if let Some(delay) = delay { - if delay.poll_elapsed(cx).is_ready() { - this.state.close(Some(io::Error::new( - io::ErrorKind::TimedOut, - "Operation timedout", - ))); - return Poll::Ready(()); - } - } - - // flush io stream - match ready!(this.state.with_buf(|buf| flush_io( - &mut *this.io.0.borrow_mut(), - buf, - cx - ))) { - Ok(()) => Poll::Pending, - Err(e) => { - this.state.close(Some(e)); - Poll::Ready(()) - } - } - } - Poll::Ready(WriteStatus::Timeout(time)) => { - log::trace!("initiate timeout delay for {:?}", time); - if delay.is_none() { - *delay = Some(sleep(time)); - } - self.poll(cx) - } - Poll::Ready(WriteStatus::Shutdown(time)) => { - log::trace!("write task is instructed to shutdown"); - - let timeout = if let Some(delay) = delay.take() { - delay - } else { - sleep(time) - }; - - this.st = IoWriteState::Shutdown(timeout, Shutdown::Flush); - self.poll(cx) - } - Poll::Ready(WriteStatus::Terminate) => { - log::trace!("write task is instructed to terminate"); - - let _ = Pin::new(&mut *this.io.0.borrow_mut()).poll_close(cx); - this.state.close(None); - Poll::Ready(()) - } - Poll::Pending => Poll::Pending, - } - } - IoWriteState::Shutdown(ref mut delay, ref mut st) => { - // close WRITE side and wait for disconnect on read side. - // use disconnect timeout, otherwise it could hang forever. - loop { - match st { - Shutdown::Flush => { - // flush write buffer - let mut io = this.io.0.borrow_mut(); - match this.state.with_buf(|buf| flush_io(&mut *io, buf, cx)) { - Poll::Ready(Ok(())) => { - let io = this.io.clone(); - #[allow(clippy::await_holding_refcell_ref)] - let fut = Box::pin(async move { - io.0.borrow() - .shutdown(std::net::Shutdown::Write) - .await - }); - *st = Shutdown::Close(fut); - continue; - } - Poll::Ready(Err(err)) => { - log::trace!( - "write task is closed with err during flush, {:?}", - err - ); - this.state.close(Some(err)); - return Poll::Ready(()); - } - Poll::Pending => (), - } - } - Shutdown::Close(ref mut fut) => { - if ready!(fut.poll(cx)).is_err() { - this.state.close(None); - return Poll::Ready(()); - } - *st = Shutdown::Stopping(0); - continue; - } - Shutdown::Stopping(ref mut count) => { - // read until 0 or err - let mut buf = [0u8; 512]; - let io = &mut this.io; - loop { - match Pin::new(&mut *io.0.borrow_mut()) - .poll_read(cx, &mut buf) - { - Poll::Ready(Err(e)) => { - log::trace!("write task is stopped"); - this.state.close(Some(e)); - return Poll::Ready(()); - } - Poll::Ready(Ok(0)) => { - log::trace!("glommio unix socket is disconnected"); - this.state.close(None); - return Poll::Ready(()); - } - Poll::Ready(Ok(n)) => { - *count += n as u16; - if *count > 4096 { - log::trace!( - "write task is stopped, too much input" - ); - this.state.close(None); - return Poll::Ready(()); - } - } - Poll::Pending => break, - } - } - } - } + #[inline] + async fn flush(&mut self) -> io::Result<()> { + Ok(()) + } - // disconnect timeout - if delay.poll_elapsed(cx).is_pending() { - return Poll::Pending; - } - log::trace!("write task is stopped after delay"); - this.state.close(None); - let _ = Pin::new(&mut *this.io.0.borrow_mut()).poll_close(cx); - return Poll::Ready(()); - } - } - } + #[inline] + async fn shutdown(&mut self) -> io::Result<()> { + poll_fn(|cx| Pin::new(&mut *self.0.borrow_mut()).poll_close(cx)).await } } diff --git a/ntex-io/src/buf.rs b/ntex-io/src/buf.rs index 478442efb..7d4624f05 100644 --- a/ntex-io/src/buf.rs +++ b/ntex-io/src/buf.rs @@ -152,27 +152,6 @@ impl Stack { } } - pub(crate) fn with_read_source(&self, io: &IoRef, f: F) -> R - where - F: FnOnce(&mut BytesVec) -> R, - { - let item = self.get_last_level(); - let mut rb = item.0.take(); - if rb.is_none() { - rb = Some(io.memory_pool().get_read_buf()); - } - - let result = f(rb.as_mut().unwrap()); - if let Some(b) = rb { - if b.is_empty() { - io.memory_pool().release_read_buf(b); - } else { - item.0.set(Some(b)); - } - } - result - } - pub(crate) fn with_read_destination(&self, io: &IoRef, f: F) -> R where F: FnOnce(&mut BytesVec) -> R, @@ -226,6 +205,17 @@ impl Stack { self.get_last_level().1.take() } + pub(crate) fn set_write_destination(&self, buf: BytesVec) -> Option { + let b = self.get_last_level().1.take(); + if b.is_some() { + self.get_last_level().1.set(b); + Some(buf) + } else { + self.get_last_level().1.set(Some(buf)); + None + } + } + pub(crate) fn with_write_destination(&self, io: &IoRef, f: F) -> R where F: FnOnce(&mut Option) -> R, diff --git a/ntex-io/src/ioref.rs b/ntex-io/src/ioref.rs index 340c03f54..02dc33766 100644 --- a/ntex-io/src/ioref.rs +++ b/ntex-io/src/ioref.rs @@ -41,10 +41,6 @@ impl IoRef { .intersects(Flags::IO_STOPPING | Flags::IO_STOPPED) } - pub(crate) fn is_io_closed(&self) -> bool { - self.0.flags.get().intersects(Flags::IO_STOPPED) - } - #[inline] /// Check if write back-pressure is enabled pub fn is_wr_backpressure(&self) -> bool { diff --git a/ntex-io/src/lib.rs b/ntex-io/src/lib.rs index 094cb2a86..2a09cdaea 100644 --- a/ntex-io/src/lib.rs +++ b/ntex-io/src/lib.rs @@ -39,6 +39,19 @@ pub use self::utils::{seal, Decoded}; pub use self::flags::Flags; #[doc(hidden)] +pub trait AsyncRead { + async fn read(&mut self, buf: BytesVec) -> (BytesVec, sio::Result); +} + +#[doc(hidden)] +pub trait AsyncWrite { + async fn write(&mut self, buf: BytesVec) -> (BytesVec, sio::Result<()>); + + async fn flush(&mut self) -> sio::Result<()>; + + async fn shutdown(&mut self) -> sio::Result<()>; +} + /// Status for read task #[derive(Copy, Clone, Debug, PartialEq, Eq, Hash)] pub enum ReadStatus { @@ -46,11 +59,6 @@ pub enum ReadStatus { Terminate, } -#[doc(hidden)] -pub trait AsyncRead { - async fn read(&mut self, buf: BytesVec) -> (BytesVec, sio::Result); -} - /// Status for write task #[derive(Copy, Clone, Debug, PartialEq, Eq, Hash)] pub enum WriteStatus { diff --git a/ntex-io/src/tasks.rs b/ntex-io/src/tasks.rs index 747d5703a..281faaafe 100644 --- a/ntex-io/src/tasks.rs +++ b/ntex-io/src/tasks.rs @@ -1,9 +1,9 @@ -use std::{future::poll_fn, future::Future, io, task::Context, task::Poll}; +use std::{future::poll_fn, io, task::Poll}; -use ntex_bytes::{BufMut, BytesVec, PoolRef}; -use ntex_util::future::{select, Either}; +use ntex_bytes::BufMut; +use ntex_util::{future::select, future::Either, time::sleep}; -use crate::{AsyncRead, Flags, IoRef, ReadStatus, WriteStatus}; +use crate::{AsyncRead, AsyncWrite, Flags, IoRef, ReadStatus, WriteStatus}; #[derive(Debug)] /// Context for io read task @@ -20,22 +20,8 @@ impl ReadContext { self.0.tag() } - #[deprecated] - #[inline] - /// Check readiness for read operations - pub async fn ready(&self) -> ReadStatus { - poll_fn(|cx| self.0.filter().poll_read_ready(cx)).await - } - - #[deprecated] - #[inline] - /// Wait when io get closed or preparing for close - pub async fn wait_for_close(&self) { - self.wait_for_close2().await - } - /// Wait when io get closed or preparing for close - async fn wait_for_close2(&self) { + async fn wait_for_close(&self) { poll_fn(|cx| { let flags = self.0.flags(); @@ -52,113 +38,7 @@ impl ReadContext { .await } - #[deprecated] - #[inline] - /// Check readiness for read operations - pub fn poll_ready(&self, cx: &mut Context<'_>) -> Poll { - self.0.filter().poll_read_ready(cx) - } - - #[deprecated] - /// Get read buffer - pub fn with_buf(&self, f: F) -> Poll<()> - where - F: FnOnce(&mut BytesVec, usize, usize) -> Poll>, - { - let inner = &self.0 .0; - let (hw, lw) = self.0.memory_pool().read_params().unpack(); - let (result, nbytes, total) = inner.buffer.with_read_source(&self.0, |buf| { - let total = buf.len(); - - // call provided callback - let result = f(buf, hw, lw); - let total2 = buf.len(); - let nbytes = if total2 > total { total2 - total } else { 0 }; - (result, nbytes, total2) - }); - - // handle buffer changes - if nbytes > 0 { - let filter = self.0.filter(); - let _ = filter - .process_read_buf(&self.0, &inner.buffer, 0, nbytes) - .and_then(|status| { - if status.nbytes > 0 { - // dest buffer has new data, wake up dispatcher - if inner.buffer.read_destination_size() >= hw { - log::trace!( - "{}: Io read buffer is too large {}, enable read back-pressure", - self.0.tag(), - total - ); - inner.insert_flags(Flags::BUF_R_READY | Flags::BUF_R_FULL); - } else { - inner.insert_flags(Flags::BUF_R_READY); - - if nbytes >= hw { - // read task is paused because of read back-pressure - // but there is no new data in top most read buffer - // so we need to wake up read task to read more data - // otherwise read task would sleep forever - inner.read_task.wake(); - } - } - log::trace!( - "{}: New {} bytes available, wakeup dispatcher", - self.0.tag(), - nbytes - ); - inner.dispatch_task.wake(); - } else { - if nbytes >= hw { - // read task is paused because of read back-pressure - // but there is no new data in top most read buffer - // so we need to wake up read task to read more data - // otherwise read task would sleep forever - inner.read_task.wake(); - } - if inner.flags.get().contains(Flags::RD_NOTIFY) { - // in case of "notify" we must wake up dispatch task - // if we read any data from source - inner.dispatch_task.wake(); - } - } - - // while reading, filter wrote some data - // in that case filters need to process write buffers - // and potentialy wake write task - if status.need_write { - filter.process_write_buf(&self.0, &inner.buffer, 0) - } else { - Ok(()) - } - }) - .map_err(|err| { - inner.dispatch_task.wake(); - inner.io_stopped(Some(err)); - inner.insert_flags(Flags::BUF_R_READY); - }); - } - - match result { - Poll::Ready(Ok(())) => { - inner.io_stopped(None); - Poll::Ready(()) - } - Poll::Ready(Err(e)) => { - inner.io_stopped(Some(e)); - Poll::Ready(()) - } - Poll::Pending => { - if inner.flags.get().contains(Flags::IO_STOPPING_FILTERS) { - shutdown_filters(&self.0); - } - Poll::Pending - } - } - } - - /// Get read buffer (async) + /// Handle read io operations pub async fn handle(&self, io: &mut T) where T: AsyncRead, @@ -192,7 +72,7 @@ impl ReadContext { let total = buf.len(); // call provided callback - let (buf, result) = match select(io.read(buf), self.wait_for_close2()).await { + let (buf, result) = match select(io.read(buf), self.wait_for_close()).await { Either::Left(res) => res, Either::Right(_) => { log::trace!("{}: Read io is closed, stop read task", self.tag()); @@ -297,127 +177,163 @@ impl WriteContext { self.0.tag() } - #[inline] - /// Return memory pool for this context - pub fn memory_pool(&self) -> PoolRef { - self.0.memory_pool() - } - - #[inline] /// Check readiness for write operations - pub async fn ready(&self) -> WriteStatus { + async fn ready(&self) -> WriteStatus { poll_fn(|cx| self.0.filter().poll_write_ready(cx)).await } - #[inline] - /// Check readiness for write operations - pub fn poll_ready(&self, cx: &mut Context<'_>) -> Poll { - self.0.filter().poll_write_ready(cx) - } - - #[inline] - /// Check if io is closed - pub fn poll_close(&self, cx: &mut Context<'_>) -> Poll<()> { - if self.0.is_io_closed() { - Poll::Ready(()) - } else { - self.0 .0.write_task.register(cx.waker()); - Poll::Pending - } + /// Indicate that write io task is stopped + fn close(&self, err: Option) { + self.0 .0.io_stopped(err); } - /// Get write buffer - pub fn with_buf(&self, f: F) -> Poll> + /// Handle write io operations + pub async fn handle(&self, io: &mut T) where - F: FnOnce(&mut Option) -> Poll>, + T: AsyncWrite, { let inner = &self.0 .0; + let mut delay = None; + let mut buf = None; - // call provided callback - let (result, len) = inner.buffer.with_write_destination(&self.0, |buf| { - let result = f(buf); - (result, buf.as_ref().map(|b| b.len()).unwrap_or(0)) - }); - - // if write buffer is smaller than high watermark value, turn off back-pressure - let mut flags = inner.flags.get(); - if len == 0 { - if flags.is_waiting_for_write() { - flags.waiting_for_write_is_done(); - inner.dispatch_task.wake(); - } - } else if flags.contains(Flags::BUF_W_BACKPRESSURE) - && len < inner.pool.get().write_params_high() << 1 - { - flags.remove(Flags::BUF_W_BACKPRESSURE); - inner.dispatch_task.wake(); - } - - match result { - Poll::Pending => flags.remove(Flags::WR_PAUSED), - Poll::Ready(Ok(())) => flags.insert(Flags::WR_PAUSED), - Poll::Ready(Err(_)) => {} - } + loop { + // check readiness + let result = if let Some(ref mut sleep) = delay { + let result = match select(sleep, self.ready()).await { + Either::Left(_) => { + self.close(Some(io::Error::new( + io::ErrorKind::TimedOut, + "Operation timedout", + ))); + return; + } + Either::Right(res) => res, + }; + delay = None; + result + } else { + self.ready().await + }; - inner.flags.set(flags); - result - } + // running + let mut flags = inner.flags.get(); + if flags.contains(Flags::WR_PAUSED) { + flags.remove(Flags::WR_PAUSED); + inner.flags.set(flags); + } - /// Get write buffer (async) - pub async fn with_buf_async(&self, f: F) -> io::Result<()> - where - F: FnOnce(BytesVec) -> R, - R: Future>, - { - let inner = &self.0 .0; + // call provided callback + match result { + WriteStatus::Ready => { + // write io stream + let (buf_result, result) = if let Some(b) = buf.take() { + io.write(b).await + } else if let Some(b) = inner.buffer.get_write_destination() { + io.write(b).await + } else { + // nothing to write, wait for wakeup + if flags.is_waiting_for_write() { + flags.waiting_for_write_is_done(); + inner.dispatch_task.wake(); + } + flags.insert(Flags::WR_PAUSED); - // running - let mut flags = inner.flags.get(); - if flags.contains(Flags::WR_PAUSED) { - flags.remove(Flags::WR_PAUSED); - inner.flags.set(flags); - } + if flags.contains(Flags::BUF_W_BACKPRESSURE) { + flags.remove( + Flags::BUF_W_BACKPRESSURE | Flags::BUF_W_MUST_FLUSH, + ); + inner.dispatch_task.wake(); + } + inner.flags.set(flags); + + continue; + }; + + match result { + Ok(_) => { + let len = if buf_result.is_empty() { + // return io.flush().await; + self.0.memory_pool().release_write_buf(buf_result); + 0 + } else if let Some(b) = + inner.buffer.set_write_destination(buf_result) + { + // write buffer is already set, we have to write + // current buffer + let l = b.len(); + buf = Some(b); + l + } else { + 0 + }; + + // if write buffer is smaller than high watermark value, turn off back-pressure + let mut flags = inner.flags.get(); + let len = len + inner.buffer.write_destination_size(); + + if len == 0 { + if flags.is_waiting_for_write() { + flags.waiting_for_write_is_done(); + inner.dispatch_task.wake(); + } + flags.insert(Flags::WR_PAUSED); + inner.flags.set(flags); + } else if flags.contains(Flags::BUF_W_BACKPRESSURE) + && len < inner.pool.get().write_params_high() << 1 + { + flags.remove(Flags::BUF_W_BACKPRESSURE); + inner.flags.set(flags); + inner.dispatch_task.wake(); + } - // buffer - let buf = inner.buffer.get_write_destination(); + continue; + } + Err(e) => self.close(Some(e)), + } + } + WriteStatus::Timeout(time) => { + log::trace!("{}: Initiate timeout delay for {:?}", self.tag(), time); + delay = Some(sleep(time)); + continue; + } + WriteStatus::Shutdown(time) => { + log::trace!("{}: Write task is instructed to shutdown", self.tag()); + + let fut = async { + // write io stream + loop { + buf = if let Some(b) = buf { + let (b, result) = io.write(b).await; + result?; + if !b.is_empty() { + Some(b) + } else { + None + } + } else { + inner.buffer.get_write_destination() + }; - // call provided callback - let result = if let Some(buf) = buf { - if !buf.is_empty() { - f(buf).await - } else { - Ok(()) - } - } else { - Ok(()) - }; - - // if write buffer is smaller than high watermark value, turn off back-pressure - let mut flags = inner.flags.get(); - let len = inner.buffer.write_destination_size(); - - if len == 0 { - if flags.is_waiting_for_write() { - flags.waiting_for_write_is_done(); - inner.dispatch_task.wake(); + if buf.is_none() { + break; + } + } + io.flush().await?; + io.shutdown().await?; + Ok(()) + }; + match select(sleep(time), fut).await { + Either::Left(_) => self.close(None), + Either::Right(res) => self.close(res.err()), + } + } + WriteStatus::Terminate => { + log::trace!("{}: Write task is instructed to terminate", self.tag()); + self.close(io.shutdown().await.err()); + } } - flags.insert(Flags::WR_PAUSED); - inner.flags.set(flags); - } else if flags.contains(Flags::BUF_W_BACKPRESSURE) - && len < inner.pool.get().write_params_high() << 1 - { - flags.remove(Flags::BUF_W_BACKPRESSURE); - inner.flags.set(flags); - inner.dispatch_task.wake(); + return; } - - result - } - - #[inline] - /// Indicate that write io task is stopped - pub fn close(&self, err: Option) { - self.0 .0.io_stopped(err); } } diff --git a/ntex-io/src/testing.rs b/ntex-io/src/testing.rs index 845f6cfea..496d143cc 100644 --- a/ntex-io/src/testing.rs +++ b/ntex-io/src/testing.rs @@ -1,14 +1,13 @@ //! utilities and helpers for testing #![allow(clippy::let_underscore_future)] -use std::future::{poll_fn, Future}; use std::sync::{Arc, Mutex}; -use std::task::{ready, Context, Poll, Waker}; -use std::{any, cell::RefCell, cmp, fmt, io, mem, net, pin::Pin, rc::Rc}; +use std::task::{Context, Poll, Waker}; +use std::{any, cell::RefCell, cmp, fmt, future::poll_fn, io, mem, net, rc::Rc}; use ntex_bytes::{Buf, BufMut, Bytes, BytesVec}; -use ntex_util::time::{sleep, Millis, Sleep}; +use ntex_util::time::{sleep, Millis}; -use crate::{types, Handle, IoStream, ReadContext, WriteContext, WriteStatus}; +use crate::{types, Handle, IoStream, ReadContext, WriteContext}; #[derive(Default)] struct AtomicWaker(Arc>>>); @@ -356,14 +355,14 @@ impl IoStream for IoTest { fn start(self, read: ReadContext, write: WriteContext) -> Option> { let io = Rc::new(self); - let mut rio = ReadTask(io.clone()); + let mut rio = Read(io.clone()); let _ = ntex_util::spawn(async move { read.handle(&mut rio).await; }); - let _ = ntex_util::spawn(WriteTask { - io: io.clone(), - state: write, - st: IoWriteState::Processing(None), + + let mut wio = Write(io.clone()); + let _ = ntex_util::spawn(async move { + write.handle(&mut wio).await; }); Some(Box::new(io)) @@ -382,9 +381,9 @@ impl Handle for Rc { } /// Read io task -struct ReadTask(Rc); +struct Read(Rc); -impl crate::AsyncRead for ReadTask { +impl crate::AsyncRead for Read { async fn read(&mut self, mut buf: BytesVec) -> (BytesVec, io::Result) { // read data from socket let result = poll_fn(|cx| self.0.poll_read_buf(cx, &mut buf)).await; @@ -392,205 +391,80 @@ impl crate::AsyncRead for ReadTask { } } -#[derive(Debug)] -enum IoWriteState { - Processing(Option), - Shutdown(Option, Shutdown), -} - -#[derive(Debug)] -enum Shutdown { - None, - Flushed, - Stopping, -} - -/// Write io task -struct WriteTask { - st: IoWriteState, - io: Rc, - state: WriteContext, -} +/// Write +struct Write(Rc); -impl Future for WriteTask { - type Output = (); - - fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { - let this = self.as_mut().get_mut(); - - match this.st { - IoWriteState::Processing(ref mut delay) => { - match this.state.poll_ready(cx) { - Poll::Ready(WriteStatus::Ready) => { - // flush framed instance - match ready!(flush_io(&this.io, &this.state, cx)) { - Ok(()) => Poll::Pending, - Err(e) => { - this.state.close(Some(e)); - Poll::Ready(()) - } - } - } - Poll::Ready(WriteStatus::Timeout(time)) => { - if delay.is_none() { - *delay = Some(sleep(time)); - } - self.poll(cx) - } - Poll::Ready(WriteStatus::Shutdown(time)) => { - log::trace!("write task is instructed to shutdown"); +impl crate::AsyncWrite for Write { + async fn write(&mut self, mut buf: BytesVec) -> (BytesVec, io::Result<()>) { + let result = poll_fn(|cx| write_io(&self.0, &mut buf, cx)).await; - let timeout = if let Some(delay) = delay.take() { - delay - } else { - sleep(time) - }; + (buf, result) + } - this.st = IoWriteState::Shutdown(Some(timeout), Shutdown::None); - self.poll(cx) - } - Poll::Ready(WriteStatus::Terminate) => { - log::trace!("write task is instructed to terminate"); - // shutdown WRITE side - this.io - .local - .lock() - .unwrap() - .borrow_mut() - .flags - .insert(IoTestFlags::CLOSED); - this.state.close(None); - Poll::Ready(()) - } - Poll::Pending => Poll::Pending, - } - } - IoWriteState::Shutdown(ref mut delay, ref mut st) => { - // close WRITE side and wait for disconnect on read side. - // use disconnect timeout, otherwise it could hang forever. - loop { - match st { - Shutdown::None => { - // flush write buffer - match flush_io(&this.io, &this.state, cx) { - Poll::Ready(Ok(())) => { - *st = Shutdown::Flushed; - continue; - } - Poll::Ready(Err(err)) => { - log::trace!( - "write task is closed with err during flush {:?}", - err - ); - this.state.close(Some(err)); - return Poll::Ready(()); - } - Poll::Pending => (), - } - } - Shutdown::Flushed => { - // shutdown WRITE side - this.io - .local - .lock() - .unwrap() - .borrow_mut() - .flags - .insert(IoTestFlags::CLOSED); - *st = Shutdown::Stopping; - continue; - } - Shutdown::Stopping => { - // read until 0 or err - let io = &this.io; - loop { - let mut buf = BytesVec::new(); - match io.poll_read_buf(cx, &mut buf) { - Poll::Ready(Err(e)) => { - this.state.close(Some(e)); - log::trace!("write task is stopped"); - return Poll::Ready(()); - } - Poll::Ready(Ok(0)) => { - this.state.close(None); - log::trace!("write task is stopped"); - return Poll::Ready(()); - } - Poll::Pending => break, - _ => (), - } - } - } - } + async fn flush(&mut self) -> io::Result<()> { + Ok(()) + } - // disconnect timeout - if let Some(ref delay) = delay { - if delay.poll_elapsed(cx).is_pending() { - return Poll::Pending; - } - } - log::trace!("write task is stopped after delay"); - this.state.close(None); - return Poll::Ready(()); - } - } - } + async fn shutdown(&mut self) -> io::Result<()> { + // shutdown WRITE side + self.0 + .local + .lock() + .unwrap() + .borrow_mut() + .flags + .insert(IoTestFlags::CLOSED); + Ok(()) } } /// Flush write buffer to underlying I/O stream. -pub(super) fn flush_io( +pub(super) fn write_io( io: &IoTest, - state: &WriteContext, + buf: &mut BytesVec, cx: &mut Context<'_>, ) -> Poll> { - state.with_buf(|buf| { - if let Some(buf) = buf { - let len = buf.len(); - - if len != 0 { - log::trace!("flushing framed transport: {}", len); - - let mut written = 0; - let result = loop { - break match io.poll_write_buf(cx, &buf[written..]) { - Poll::Ready(Ok(n)) => { - if n == 0 { - log::trace!( - "disconnected during flush, written {}", - written - ); - Poll::Ready(Err(io::Error::new( - io::ErrorKind::WriteZero, - "failed to write frame to transport", - ))) - } else { - written += n; - if written == len { - buf.clear(); - Poll::Ready(Ok(())) - } else { - continue; - } - } - } - Poll::Pending => { - // remove written data - buf.advance(written); - Poll::Pending - } - Poll::Ready(Err(e)) => { - log::trace!("error during flush: {}", e); - Poll::Ready(Err(e)) + let len = buf.len(); + + if len != 0 { + log::trace!("flushing framed transport: {}", len); + + let mut written = 0; + let result = loop { + break match io.poll_write_buf(cx, &buf[written..]) { + Poll::Ready(Ok(n)) => { + if n == 0 { + log::trace!("disconnected during flush, written {}", written); + Poll::Ready(Err(io::Error::new( + io::ErrorKind::WriteZero, + "failed to write frame to transport", + ))) + } else { + written += n; + if written == len { + buf.clear(); + Poll::Ready(Ok(())) + } else { + continue; } - }; - }; - log::trace!("flushed {} bytes", written); - return result; - } - } + } + } + Poll::Pending => { + // remove written data + buf.advance(written); + Poll::Pending + } + Poll::Ready(Err(e)) => { + log::trace!("error during flush: {}", e); + Poll::Ready(Err(e)) + } + }; + }; + log::trace!("flushed {} bytes", written); + result + } else { Poll::Ready(Ok(())) - }) + } } #[cfg(test)] diff --git a/ntex-tokio/src/io.rs b/ntex-tokio/src/io.rs index 96b1ed65a..a894b9cf7 100644 --- a/ntex-tokio/src/io.rs +++ b/ntex-tokio/src/io.rs @@ -1,12 +1,9 @@ -use std::future::{poll_fn, Future}; use std::task::{Context, Poll}; -use std::{any, cell::RefCell, cmp, io, mem, pin::Pin, rc::Rc, rc::Weak}; +use std::{any, cell::RefCell, cmp, future::poll_fn, io, mem, pin::Pin, rc::Rc, rc::Weak}; use ntex_bytes::{Buf, BufMut, BytesVec}; -use ntex_io::{ - types, Filter, Handle, Io, IoBoxed, IoStream, ReadContext, WriteContext, WriteStatus, -}; -use ntex_util::{ready, time::sleep, time::Millis, time::Sleep}; +use ntex_io::{types, Filter, Handle, Io, IoBoxed, IoStream, ReadContext, WriteContext}; +use ntex_util::{future::lazy, ready, time::Millis}; use tokio::io::{AsyncRead, AsyncWrite, ReadBuf}; use tokio::net::TcpStream; @@ -14,11 +11,14 @@ impl IoStream for crate::TcpStream { fn start(self, read: ReadContext, write: WriteContext) -> Option> { let io = Rc::new(RefCell::new(self.0)); - let mut rio = ReadTask(io.clone()); + let mut rio = Read(io.clone()); tokio::task::spawn_local(async move { read.handle(&mut rio).await; }); - tokio::task::spawn_local(WriteTask::new(io.clone(), write)); + let mut wio = Write(io.clone()); + tokio::task::spawn_local(async move { + write.handle(&mut wio).await; + }); Some(Box::new(HandleWrapper(io))) } } @@ -39,9 +39,10 @@ impl Handle for HandleWrapper { } /// Read io task -struct ReadTask(Rc>); +struct Read(Rc>); -impl ntex_io::AsyncRead for ReadTask { +impl ntex_io::AsyncRead for Read { + #[inline] async fn read(&mut self, mut buf: BytesVec) -> (BytesVec, io::Result) { // read data from socket let result = poll_fn(|cx| { @@ -53,281 +54,81 @@ impl ntex_io::AsyncRead for ReadTask { } } -#[derive(Debug)] -enum IoWriteState { - Processing(Option), - Shutdown(Sleep, Shutdown), -} +struct Write(Rc>); -#[derive(Debug)] -enum Shutdown { - None, - Flushed, - Stopping(u16), -} - -/// Write io task -struct WriteTask { - st: IoWriteState, - io: Rc>, - state: WriteContext, -} - -impl WriteTask { - /// Create new write io task - fn new(io: Rc>, state: WriteContext) -> Self { - Self { - io, - state, - st: IoWriteState::Processing(None), +impl ntex_io::AsyncWrite for Write { + #[inline] + async fn write(&mut self, mut buf: BytesVec) -> (BytesVec, io::Result<()>) { + match lazy(|cx| flush_io(&mut *self.0.borrow_mut(), &mut buf, cx)).await { + Poll::Ready(res) => (buf, res), + Poll::Pending => (buf, Ok(())), } } -} - -impl Future for WriteTask { - type Output = (); - - fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { - let this = self.as_mut().get_mut(); - - if this.state.poll_close(cx).is_ready() { - return Poll::Ready(()); - } - - match this.st { - IoWriteState::Processing(ref mut delay) => { - match ready!(this.state.poll_ready(cx)) { - WriteStatus::Ready => { - if let Some(delay) = delay { - if delay.poll_elapsed(cx).is_ready() { - this.state.close(Some(io::Error::new( - io::ErrorKind::TimedOut, - "Operation timedout", - ))); - return Poll::Ready(()); - } - } - - // flush io stream - match ready!(this.state.with_buf(|buf| flush_io( - &mut *this.io.borrow_mut(), - buf, - cx, - &this.state - ))) { - Ok(()) => Poll::Pending, - Err(e) => { - this.state.close(Some(e)); - Poll::Ready(()) - } - } - } - WriteStatus::Timeout(time) => { - log::trace!( - "{}: Initiate timeout delay for {:?}", - this.state.tag(), - time - ); - if delay.is_none() { - *delay = Some(sleep(time)); - } - self.poll(cx) - } - WriteStatus::Shutdown(time) => { - log::trace!( - "{}: Write task is instructed to shutdown", - this.state.tag() - ); - - let timeout = if let Some(delay) = delay.take() { - delay - } else { - sleep(time) - }; - this.st = IoWriteState::Shutdown(timeout, Shutdown::None); - self.poll(cx) - } - WriteStatus::Terminate => { - log::trace!( - "{}: Write task is instructed to terminate", - this.state.tag() - ); - - if !matches!( - this.io.borrow().linger(), - Ok(Some(std::time::Duration::ZERO)) - ) { - // call shutdown to prevent flushing data on terminated Io. when - // linger is set to zero, closing will reset the connection, so - // shutdown is not neccessary. - let _ = Pin::new(&mut *this.io.borrow_mut()).poll_shutdown(cx); - } - this.state.close(None); - Poll::Ready(()) - } - } - } - IoWriteState::Shutdown(ref mut delay, ref mut st) => { - // close WRITE side and wait for disconnect on read side. - // use disconnect timeout, otherwise it could hang forever. - loop { - if this.state.poll_close(cx).is_ready() { - return Poll::Ready(()); - } - match st { - Shutdown::None => { - // flush write buffer - let mut io = this.io.borrow_mut(); - match this - .state - .with_buf(|buf| flush_io(&mut *io, buf, cx, &this.state)) - { - Poll::Ready(Ok(())) => { - *st = Shutdown::Flushed; - continue; - } - Poll::Ready(Err(err)) => { - log::trace!( - "{}: Write task is closed with err during flush, {:?}", this.state.tag(), - err - ); - this.state.close(Some(err)); - return Poll::Ready(()); - } - Poll::Pending => (), - } - } - Shutdown::Flushed => { - // shutdown WRITE side - match Pin::new(&mut *this.io.borrow_mut()).poll_shutdown(cx) { - Poll::Ready(Ok(_)) => { - *st = Shutdown::Stopping(0); - continue; - } - Poll::Ready(Err(e)) => { - log::trace!( - "{}: Write task is closed with err during shutdown", - this.state.tag() - ); - this.state.close(Some(e)); - return Poll::Ready(()); - } - _ => (), - } - } - Shutdown::Stopping(ref mut count) => { - // read until 0 or err - let mut buf = [0u8; 512]; - loop { - let mut read_buf = ReadBuf::new(&mut buf); - match Pin::new(&mut *this.io.borrow_mut()) - .poll_read(cx, &mut read_buf) - { - Poll::Ready(Err(_)) | Poll::Ready(Ok(_)) - if read_buf.filled().is_empty() => - { - this.state.close(None); - log::trace!( - "{}: Tokio write task is stopped", - this.state.tag() - ); - return Poll::Ready(()); - } - Poll::Pending => { - *count += read_buf.filled().len() as u16; - if *count > 4096 { - log::trace!("{}: Tokio write task is stopped, too much input", this.state.tag()); - this.state.close(None); - return Poll::Ready(()); - } - break; - } - _ => (), - } - } - } - } + #[inline] + async fn flush(&mut self) -> io::Result<()> { + Ok(()) + } - // disconnect timeout - if delay.poll_elapsed(cx).is_pending() { - return Poll::Pending; - } - log::trace!("{}: Write task is stopped after delay", this.state.tag()); - this.state.close(None); - return Poll::Ready(()); - } - } - } + #[inline] + async fn shutdown(&mut self) -> io::Result<()> { + poll_fn(|cx| Pin::new(&mut *self.0.borrow_mut()).poll_shutdown(cx)).await } } /// Flush write buffer to underlying I/O stream. pub(super) fn flush_io( io: &mut T, - buf: &mut Option, + buf: &mut BytesVec, cx: &mut Context<'_>, - st: &WriteContext, ) -> Poll> { - if let Some(buf) = buf { - let len = buf.len(); - - if len != 0 { - // log::trace!("{}: Flushing framed transport: {:?}", st.tag(), buf.len()); - - let mut written = 0; - let result = loop { - break match Pin::new(&mut *io).poll_write(cx, &buf[written..]) { - Poll::Ready(Ok(n)) => { - if n == 0 { - log::trace!( - "{}: Disconnected during flush, written {}", - st.tag(), - written - ); - Poll::Ready(Err(io::Error::new( - io::ErrorKind::WriteZero, - "failed to write frame to transport", - ))) + let len = buf.len(); + + if len != 0 { + // log::trace!("{}: Flushing framed transport: {:?}", st.tag(), buf.len()); + + let mut written = 0; + let result = loop { + break match Pin::new(&mut *io).poll_write(cx, &buf[written..]) { + Poll::Ready(Ok(n)) => { + if n == 0 { + Poll::Ready(Err(io::Error::new( + io::ErrorKind::WriteZero, + "failed to write frame to transport", + ))) + } else { + written += n; + if written == len { + buf.clear(); + Poll::Ready(Ok(())) } else { - written += n; - if written == len { - buf.clear(); - Poll::Ready(Ok(())) - } else { - continue; - } + continue; } } - Poll::Pending => { - // remove written data - buf.advance(written); - Poll::Pending - } - Poll::Ready(Err(e)) => { - log::trace!("{}: Error during flush: {}", st.tag(), e); - Poll::Ready(Err(e)) - } - }; - }; - // log::trace!("{}: flushed {} bytes", st.tag(), written); - - // flush - return if written > 0 { - match Pin::new(&mut *io).poll_flush(cx) { - Poll::Ready(Ok(_)) => result, - Poll::Pending => Poll::Pending, - Poll::Ready(Err(e)) => { - log::trace!("{}: Error during flush: {}", st.tag(), e); - Poll::Ready(Err(e)) - } } - } else { - result + Poll::Pending => { + // remove written data + buf.advance(written); + Poll::Pending + } + Poll::Ready(Err(e)) => Poll::Ready(Err(e)), }; + }; + // log::trace!("{}: flushed {} bytes", st.tag(), written); + + // flush + if written > 0 { + match Pin::new(&mut *io).poll_flush(cx) { + Poll::Ready(Ok(_)) => result, + Poll::Pending => Poll::Pending, + Poll::Ready(Err(e)) => Poll::Ready(Err(e)), + } + } else { + result } + } else { + Poll::Ready(Ok(())) } - Poll::Ready(Ok(())) } pub struct TokioIoBoxed(IoBoxed); @@ -425,18 +226,22 @@ mod unixstream { fn start(self, read: ReadContext, write: WriteContext) -> Option> { let io = Rc::new(RefCell::new(self.0)); - let mut rio = ReadTask(io.clone()); + let mut rio = Read(io.clone()); tokio::task::spawn_local(async move { read.handle(&mut rio).await; }); - tokio::task::spawn_local(WriteTask::new(io, write)); + let mut wio = Write(io.clone()); + tokio::task::spawn_local(async move { + write.handle(&mut wio).await; + }); None } } - struct ReadTask(Rc>); + struct Read(Rc>); - impl ntex_io::AsyncRead for ReadTask { + impl ntex_io::AsyncRead for Read { + #[inline] async fn read(&mut self, mut buf: BytesVec) -> (BytesVec, io::Result) { // read data from socket let result = poll_fn(|cx| { @@ -448,191 +253,25 @@ mod unixstream { } } - /// Write io task - struct WriteTask { - st: IoWriteState, - io: Rc>, - state: WriteContext, - } + struct Write(Rc>); - impl WriteTask { - /// Create new write io task - fn new(io: Rc>, state: WriteContext) -> Self { - Self { - io, - state, - st: IoWriteState::Processing(None), + impl ntex_io::AsyncWrite for Write { + #[inline] + async fn write(&mut self, mut buf: BytesVec) -> (BytesVec, io::Result<()>) { + match lazy(|cx| flush_io(&mut *self.0.borrow_mut(), &mut buf, cx)).await { + Poll::Ready(res) => (buf, res), + Poll::Pending => (buf, Ok(())), } } - } - impl Future for WriteTask { - type Output = (); - - fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { - let this = self.as_mut().get_mut(); - - if this.state.poll_close(cx).is_ready() { - return Poll::Ready(()); - } - - match this.st { - IoWriteState::Processing(ref mut delay) => { - match this.state.poll_ready(cx) { - Poll::Ready(WriteStatus::Ready) => { - if let Some(delay) = delay { - if delay.poll_elapsed(cx).is_ready() { - this.state.close(Some(io::Error::new( - io::ErrorKind::TimedOut, - "Operation timedout", - ))); - return Poll::Ready(()); - } - } - - // flush io stream - match ready!(this.state.with_buf(|buf| flush_io( - &mut *this.io.borrow_mut(), - buf, - cx, - &this.state - ))) { - Ok(()) => Poll::Pending, - Err(e) => { - this.state.close(Some(e)); - Poll::Ready(()) - } - } - } - Poll::Ready(WriteStatus::Timeout(time)) => { - if delay.is_none() { - *delay = Some(sleep(time)); - } - self.poll(cx) - } - Poll::Ready(WriteStatus::Shutdown(time)) => { - log::trace!( - "{}: Write task is instructed to shutdown", - this.state.tag() - ); - - let timeout = if let Some(delay) = delay.take() { - delay - } else { - sleep(time) - }; - - this.st = IoWriteState::Shutdown(timeout, Shutdown::None); - self.poll(cx) - } - Poll::Ready(WriteStatus::Terminate) => { - log::trace!( - "{}: Write task is instructed to terminate", - this.state.tag() - ); - - let _ = Pin::new(&mut *this.io.borrow_mut()).poll_shutdown(cx); - this.state.close(None); - Poll::Ready(()) - } - Poll::Pending => Poll::Pending, - } - } - IoWriteState::Shutdown(ref mut delay, ref mut st) => { - // close WRITE side and wait for disconnect on read side. - // use disconnect timeout, otherwise it could hang forever. - loop { - if this.state.poll_close(cx).is_ready() { - return Poll::Ready(()); - } - match st { - Shutdown::None => { - // flush write buffer - let mut io = this.io.borrow_mut(); - match this.state.with_buf(|buf| { - flush_io(&mut *io, buf, cx, &this.state) - }) { - Poll::Ready(Ok(())) => { - *st = Shutdown::Flushed; - continue; - } - Poll::Ready(Err(err)) => { - log::trace!( - "{}: Write task is closed with err during flush, {:?}", this.state.tag(), - err - ); - this.state.close(Some(err)); - return Poll::Ready(()); - } - Poll::Pending => (), - } - } - Shutdown::Flushed => { - // shutdown WRITE side - match Pin::new(&mut *this.io.borrow_mut()).poll_shutdown(cx) - { - Poll::Ready(Ok(_)) => { - *st = Shutdown::Stopping(0); - continue; - } - Poll::Ready(Err(e)) => { - log::trace!( - "{}: Write task is closed with err during shutdown", this.state.tag() - ); - this.state.close(Some(e)); - return Poll::Ready(()); - } - _ => (), - } - } - Shutdown::Stopping(ref mut count) => { - // read until 0 or err - let mut buf = [0u8; 512]; - loop { - let mut read_buf = ReadBuf::new(&mut buf); - match Pin::new(&mut *this.io.borrow_mut()) - .poll_read(cx, &mut read_buf) - { - Poll::Ready(Err(_)) | Poll::Ready(Ok(_)) - if read_buf.filled().is_empty() => - { - this.state.close(None); - log::trace!( - "{}: Write task is stopped", - this.state.tag() - ); - return Poll::Ready(()); - } - Poll::Pending => { - *count += read_buf.filled().len() as u16; - if *count > 4096 { - log::trace!( - "{}: Write task is stopped, too much input", this.state.tag() - ); - this.state.close(None); - return Poll::Ready(()); - } - break; - } - _ => (), - } - } - } - } + #[inline] + async fn flush(&mut self) -> io::Result<()> { + Ok(()) + } - // disconnect timeout - if delay.poll_elapsed(cx).is_pending() { - return Poll::Pending; - } - log::trace!( - "{}: Write task is stopped after delay", - this.state.tag() - ); - this.state.close(None); - return Poll::Ready(()); - } - } - } + #[inline] + async fn shutdown(&mut self) -> io::Result<()> { + poll_fn(|cx| Pin::new(&mut *self.0.borrow_mut()).poll_shutdown(cx)).await } } }