diff --git a/ntex-async-std/src/io.rs b/ntex-async-std/src/io.rs index 6ef23b0f..7180aeae 100644 --- a/ntex-async-std/src/io.rs +++ b/ntex-async-std/src/io.rs @@ -1,9 +1,11 @@ -use std::{any, cell::RefCell, future::poll_fn, io, pin::Pin, task::Context, task::Poll}; +use std::{ + any, cell::RefCell, future::poll_fn, io, pin::Pin, task::ready, task::Context, + task::Poll, +}; use async_std::io::{Read as ARead, Write as AWrite}; use ntex_bytes::{Buf, BufMut, BytesVec}; -use ntex_io::{types, Handle, IoStream, ReadContext, WriteContext}; -use ntex_util::{future::lazy, ready}; +use ntex_io::{types, Handle, IoStream, ReadContext, WriteContext, WriteContextBuf}; use crate::TcpStream; @@ -51,11 +53,17 @@ struct Write(RefCell); impl ntex_io::AsyncWrite for Write { #[inline] - async fn write(&mut self, mut buf: BytesVec) -> (BytesVec, io::Result<()>) { - match lazy(|cx| flush_io(&mut self.0.borrow_mut().0, &mut buf, cx)).await { - Poll::Ready(res) => (buf, res), - Poll::Pending => (buf, Ok(())), - } + async fn write(&mut self, buf: &mut WriteContextBuf) -> io::Result<()> { + poll_fn(|cx| { + if let Some(mut b) = buf.take() { + let result = flush_io(&mut self.0.borrow_mut().0, &mut b, cx); + buf.set(b); + result + } else { + Poll::Ready(Ok(())) + } + }) + .await } #[inline] @@ -186,11 +194,17 @@ mod unixstream { impl ntex_io::AsyncWrite for Write { #[inline] - async fn write(&mut self, mut buf: BytesVec) -> (BytesVec, io::Result<()>) { - match lazy(|cx| flush_io(&mut self.0.borrow_mut().0, &mut buf, cx)).await { - Poll::Ready(res) => (buf, res), - Poll::Pending => (buf, Ok(())), - } + async fn write(&mut self, buf: &mut WriteContextBuf) -> io::Result<()> { + poll_fn(|cx| { + if let Some(mut b) = buf.take() { + let result = flush_io(&mut self.0.borrow_mut().0, &mut b, cx); + buf.set(b); + result + } else { + Poll::Ready(Ok(())) + } + }) + .await } #[inline] diff --git a/ntex-compio/CHANGES.md b/ntex-compio/CHANGES.md index 46374448..e4b381bc 100644 --- a/ntex-compio/CHANGES.md +++ b/ntex-compio/CHANGES.md @@ -1,5 +1,9 @@ # Changes +## [0.1.2] - 2024-09-11 + +* Use new io api + ## [0.1.1] - 2024-09-05 * Tune write task diff --git a/ntex-compio/src/io.rs b/ntex-compio/src/io.rs index a7f98241..c27e6759 100644 --- a/ntex-compio/src/io.rs +++ b/ntex-compio/src/io.rs @@ -4,27 +4,14 @@ use compio::buf::{BufResult, IoBuf, IoBufMut, SetBufInit}; use compio::io::{AsyncRead, AsyncWrite}; use compio::net::TcpStream; use ntex_bytes::{Buf, BufMut, BytesVec}; -use ntex_io::{types, Handle, IoStream, ReadContext, WriteContext}; +use ntex_io::{types, Handle, IoStream, ReadContext, WriteContext, WriteContextBuf}; impl IoStream for crate::TcpStream { fn start(self, read: ReadContext, write: WriteContext) -> Option> { let io = self.0.clone(); compio::runtime::spawn(async move { - let mut wr_io = WriteIo(io.clone()); - let wr_task = compio::runtime::spawn(async move { - write.handle(&mut wr_io).await; - log::debug!("{} Write task is stopped", write.tag()); - }); - let mut io = ReadIo(io); - - read.handle(&mut io).await; - log::debug!("{} Read task is stopped", read.tag()); - - if !wr_task.is_finished() { - let _ = wr_task.await; - } - - match io.0.close().await { + run(io.clone(), &read, write).await; + match io.close().await { Ok(_) => log::debug!("{} Stream is closed", read.tag()), Err(e) => log::error!("{} Stream is closed, {:?}", read.tag(), e), } @@ -38,24 +25,9 @@ impl IoStream for crate::TcpStream { #[cfg(unix)] impl IoStream for crate::UnixStream { fn start(self, read: ReadContext, write: WriteContext) -> Option> { - let io = self.0; compio::runtime::spawn(async move { - let mut wr_io = WriteIo(io.clone()); - let wr_task = compio::runtime::spawn(async move { - write.handle(&mut wr_io).await; - log::debug!("{} Write task is stopped", write.tag()); - }); - - let mut io = ReadIo(io); - - read.handle(&mut io).await; - log::debug!("{} Read task is stopped", read.tag()); - - if !wr_task.is_finished() { - let _ = wr_task.await; - } - - match io.0.close().await { + run(self.0.clone(), &read, write).await; + match self.0.close().await { Ok(_) => log::debug!("{} Unix stream is closed", read.tag()), Err(e) => log::error!("{} Unix stream is closed, {:?}", read.tag(), e), } @@ -110,6 +82,26 @@ impl SetBufInit for CompioBuf { } } +async fn run( + io: T, + read: &ReadContext, + write: WriteContext, +) { + let mut wr_io = WriteIo(io.clone()); + let wr_task = compio::runtime::spawn(async move { + write.handle(&mut wr_io).await; + log::debug!("{} Write task is stopped", write.tag()); + }); + let mut io = ReadIo(io); + + read.handle(&mut io).await; + log::debug!("{} Read task is stopped", read.tag()); + + if !wr_task.is_finished() { + let _ = wr_task.await; + } +} + struct ReadIo(T); impl ntex_io::AsyncRead for ReadIo @@ -130,31 +122,33 @@ where T: AsyncWrite, { #[inline] - async fn write(&mut self, buf: BytesVec) -> (BytesVec, io::Result<()>) { - let mut buf = CompioBuf(buf); - loop { - let BufResult(result, buf1) = self.0.write(buf).await; - buf = buf1; - - return match result { - Ok(0) => ( - buf.0, - Err(io::Error::new( + async fn write(&mut self, wbuf: &mut WriteContextBuf) -> io::Result<()> { + if let Some(b) = wbuf.take() { + let mut buf = CompioBuf(b); + loop { + let BufResult(result, buf1) = self.0.write(buf).await; + buf = buf1; + + let result = match result { + Ok(0) => Err(io::Error::new( io::ErrorKind::WriteZero, "failed to write frame to transport", )), - ), - Ok(size) => { - buf.0.advance(size); - - if buf.0.is_empty() { - (buf.0, Ok(())) - } else { - continue; + Ok(size) => { + buf.0.advance(size); + if buf.0.is_empty() { + Ok(()) + } else { + continue; + } } - } - Err(e) => (buf.0, Err(e)), - }; + Err(e) => Err(e), + }; + wbuf.set(buf.0); + return result; + } + } else { + Ok(()) } } diff --git a/ntex-glommio/src/io.rs b/ntex-glommio/src/io.rs index 53166de7..09fc0616 100644 --- a/ntex-glommio/src/io.rs +++ b/ntex-glommio/src/io.rs @@ -1,11 +1,8 @@ -use std::task::{Context, Poll}; -use std::{any, future::poll_fn, future::Future, io, pin::Pin}; +use std::{any, future::poll_fn, io, pin::Pin, task::ready, task::Context, task::Poll}; -use futures_lite::future::FutureExt; use futures_lite::io::{AsyncRead, AsyncWrite}; use ntex_bytes::{Buf, BufMut, BytesVec}; -use ntex_io::{types, Handle, IoStream, ReadContext, WriteContext, WriteStatus}; -use ntex_util::{ready, time::sleep, time::Sleep}; +use ntex_io::{types, Handle, IoStream, ReadContext, WriteContext, WriteContextBuf}; use crate::net_impl::{TcpStream, UnixStream}; @@ -62,11 +59,59 @@ struct Write(TcpStream); impl ntex_io::AsyncWrite for Write { #[inline] - async fn write(&mut self, mut buf: BytesVec) -> (BytesVec, io::Result<()>) { - match lazy(|cx| flush_io(&mut *self.0.borrow_mut(), &mut buf, cx)).await { - Poll::Ready(res) => (buf, res), - Poll::Pending => (buf, Ok(())), - } + async fn write(&mut self, buf: &mut WriteContextBuf) -> io::Result<()> { + poll_fn(|cx| { + if let Some(mut b) = buf.take() { + let result = flush_io(&mut *self.0 .0.borrow_mut(), &mut b, cx); + buf.set(b); + result + } else { + Poll::Ready(Ok(())) + } + }) + .await + } + + #[inline] + async fn flush(&mut self) -> io::Result<()> { + Ok(()) + } + + #[inline] + async fn shutdown(&mut self) -> io::Result<()> { + poll_fn(|cx| Pin::new(&mut *self.0 .0.borrow_mut()).poll_close(cx)).await + } +} + +struct UnixRead(UnixStream); + +impl ntex_io::AsyncRead for UnixRead { + async fn read(&mut self, mut buf: BytesVec) -> (BytesVec, io::Result) { + // read data from socket + let result = poll_fn(|cx| { + let mut io = self.0 .0.borrow_mut(); + poll_read_buf(Pin::new(&mut *io), cx, &mut buf) + }) + .await; + (buf, result) + } +} + +struct UnixWrite(UnixStream); + +impl ntex_io::AsyncWrite for UnixWrite { + #[inline] + async fn write(&mut self, buf: &mut WriteContextBuf) -> io::Result<()> { + poll_fn(|cx| { + if let Some(mut b) = buf.take() { + let result = flush_io(&mut *self.0 .0.borrow_mut(), &mut b, cx); + buf.set(b); + result + } else { + Poll::Ready(Ok(())) + } + }) + .await } #[inline] @@ -76,7 +121,7 @@ impl ntex_io::AsyncWrite for Write { #[inline] async fn shutdown(&mut self) -> io::Result<()> { - poll_fn(|cx| Pin::new(&mut *self.0.borrow_mut()).poll_close(cx)).await + poll_fn(|cx| Pin::new(&mut *self.0 .0.borrow_mut()).poll_close(cx)).await } } @@ -125,7 +170,7 @@ pub(super) fn flush_io( // log::trace!("flushed {} bytes", written); // flush - return if written > 0 { + if written > 0 { match Pin::new(&mut *io).poll_flush(cx) { Poll::Ready(Ok(_)) => result, Poll::Pending => Poll::Pending, @@ -136,7 +181,7 @@ pub(super) fn flush_io( } } else { result - }; + } } else { Poll::Ready(Ok(())) } @@ -158,39 +203,3 @@ pub fn poll_read_buf( Poll::Ready(Ok(n)) } - -struct UnixRead(UnixStream); - -impl ntex_io::AsyncRead for UnixRead { - async fn read(&mut self, mut buf: BytesVec) -> (BytesVec, io::Result) { - // read data from socket - let result = poll_fn(|cx| { - let mut io = self.0 .0.borrow_mut(); - poll_read_buf(Pin::new(&mut *io), cx, &mut buf) - }) - .await; - (buf, result) - } -} - -struct UnixWrite(UnixStream); - -impl ntex_io::AsyncWrite for UnixWrite { - #[inline] - async fn write(&mut self, mut buf: BytesVec) -> (BytesVec, io::Result<()>) { - match lazy(|cx| flush_io(&mut *self.0.borrow_mut(), &mut buf, cx)).await { - Poll::Ready(res) => (buf, res), - Poll::Pending => (buf, Ok(())), - } - } - - #[inline] - async fn flush(&mut self) -> io::Result<()> { - Ok(()) - } - - #[inline] - async fn shutdown(&mut self) -> io::Result<()> { - poll_fn(|cx| Pin::new(&mut *self.0.borrow_mut()).poll_close(cx)).await - } -} diff --git a/ntex-io/src/filter.rs b/ntex-io/src/filter.rs index f74e057c..b452429b 100644 --- a/ntex-io/src/filter.rs +++ b/ntex-io/src/filter.rs @@ -95,7 +95,7 @@ impl Filter for Base { fn poll_write_ready(&self, cx: &mut Context<'_>) -> Poll { let mut flags = self.0.flags(); - if flags.contains(Flags::IO_STOPPED) { + if flags.is_stopped() { Poll::Ready(WriteStatus::Terminate) } else { self.0 .0.write_task.register(cx.waker()); diff --git a/ntex-io/src/flags.rs b/ntex-io/src/flags.rs index 82056ff5..029d891b 100644 --- a/ntex-io/src/flags.rs +++ b/ntex-io/src/flags.rs @@ -36,6 +36,10 @@ bitflags::bitflags! { } impl Flags { + pub(crate) fn is_stopped(&self) -> bool { + self.intersects(Flags::IO_STOPPED) + } + pub(crate) fn is_waiting_for_write(&self) -> bool { self.intersects(Flags::BUF_W_MUST_FLUSH | Flags::BUF_W_BACKPRESSURE) } diff --git a/ntex-io/src/io.rs b/ntex-io/src/io.rs index ded12141..8bedd1df 100644 --- a/ntex-io/src/io.rs +++ b/ntex-io/src/io.rs @@ -165,7 +165,7 @@ impl Io { let inner = Rc::new(IoState { filter: FilterPtr::null(), pool: Cell::new(pool), - flags: Cell::new(Flags::empty()), + flags: Cell::new(Flags::WR_PAUSED), error: Cell::new(None), dispatch_task: LocalWaker::new(), read_task: LocalWaker::new(), @@ -421,7 +421,7 @@ impl Io { let st = self.st(); let mut flags = st.flags.get(); - if flags.contains(Flags::IO_STOPPED) { + if flags.is_stopped() { Poll::Ready(self.error().map(Err).unwrap_or(Ok(None))) } else { st.dispatch_task.register(cx.waker()); @@ -531,7 +531,7 @@ impl Io { } else { let st = self.st(); let flags = st.flags.get(); - if flags.contains(Flags::IO_STOPPED) { + if flags.is_stopped() { Err(RecvError::PeerGone(self.error())) } else if flags.contains(Flags::DSP_STOP) { st.remove_flags(Flags::DSP_STOP); @@ -568,7 +568,7 @@ impl Io { pub fn poll_flush(&self, cx: &mut Context<'_>, full: bool) -> Poll> { let flags = self.flags(); - if flags.contains(Flags::IO_STOPPED) { + if flags.is_stopped() { Poll::Ready(self.error().map(Err).unwrap_or(Ok(()))) } else { let st = self.st(); @@ -595,7 +595,7 @@ impl Io { let st = self.st(); let flags = st.flags.get(); - if flags.intersects(Flags::IO_STOPPED) { + if flags.is_stopped() { if let Some(err) = self.error() { Poll::Ready(Err(err)) } else { @@ -700,7 +700,7 @@ impl Drop for Io { if st.filter.is_set() { // filter is unsafe and must be dropped explicitly, // and wont be dropped without special attention - if !st.flags.get().contains(Flags::IO_STOPPED) { + if !st.flags.get().is_stopped() { log::trace!( "{}: Io is dropped, force stopping io streams {:?}", st.tag.get(), @@ -884,7 +884,7 @@ pub struct OnDisconnect { impl OnDisconnect { pub(super) fn new(inner: Rc) -> Self { - Self::new_inner(inner.flags.get().contains(Flags::IO_STOPPED), inner) + Self::new_inner(inner.flags.get().is_stopped(), inner) } fn new_inner(disconnected: bool, inner: Rc) -> Self { @@ -909,7 +909,7 @@ impl OnDisconnect { #[inline] /// Check if connection is disconnected pub fn poll_ready(&self, cx: &mut Context<'_>) -> Poll<()> { - if self.token == usize::MAX || self.inner.flags.get().contains(Flags::IO_STOPPED) { + if self.token == usize::MAX || self.inner.flags.get().is_stopped() { Poll::Ready(()) } else if let Some(on_disconnect) = self.inner.on_disconnect.take() { on_disconnect[self.token].register(cx.waker()); diff --git a/ntex-io/src/lib.rs b/ntex-io/src/lib.rs index 2a09cdae..ee3ede96 100644 --- a/ntex-io/src/lib.rs +++ b/ntex-io/src/lib.rs @@ -31,7 +31,7 @@ pub use self::filter::{Base, Filter, Layer}; pub use self::framed::Framed; pub use self::io::{Io, IoRef, OnDisconnect}; pub use self::seal::{IoBoxed, Sealed}; -pub use self::tasks::{ReadContext, WriteContext}; +pub use self::tasks::{ReadContext, WriteContext, WriteContextBuf}; pub use self::timer::TimerHandle; pub use self::utils::{seal, Decoded}; @@ -45,7 +45,7 @@ pub trait AsyncRead { #[doc(hidden)] pub trait AsyncWrite { - async fn write(&mut self, buf: BytesVec) -> (BytesVec, sio::Result<()>); + async fn write(&mut self, buf: &mut WriteContextBuf) -> sio::Result<()>; async fn flush(&mut self) -> sio::Result<()>; diff --git a/ntex-io/src/tasks.rs b/ntex-io/src/tasks.rs index 281faaaf..b6ff33c2 100644 --- a/ntex-io/src/tasks.rs +++ b/ntex-io/src/tasks.rs @@ -1,6 +1,6 @@ use std::{future::poll_fn, io, task::Poll}; -use ntex_bytes::BufMut; +use ntex_bytes::{BufMut, BytesVec}; use ntex_util::{future::select, future::Either, time::sleep}; use crate::{AsyncRead, AsyncWrite, Flags, IoRef, ReadStatus, WriteStatus}; @@ -166,6 +166,13 @@ impl ReadContext { /// Context for io write task pub struct WriteContext(IoRef); +#[derive(Debug)] +/// Context buf for io write task +pub struct WriteContextBuf { + io: IoRef, + buf: Option, +} + impl WriteContext { pub(crate) fn new(io: &IoRef) -> Self { Self(io.clone()) @@ -187,6 +194,19 @@ impl WriteContext { self.0 .0.io_stopped(err); } + /// Check if io is closed + async fn when_stopped(&self) { + poll_fn(|cx| { + if self.0.flags().is_stopped() { + Poll::Ready(()) + } else { + self.0 .0.write_task.register(cx.waker()); + Poll::Pending + } + }) + .await + } + /// Handle write io operations pub async fn handle(&self, io: &mut T) where @@ -194,7 +214,10 @@ impl WriteContext { { let inner = &self.0 .0; let mut delay = None; - let mut buf = None; + let mut buf = WriteContextBuf { + io: self.0.clone(), + buf: None, + }; loop { // check readiness @@ -222,73 +245,14 @@ impl WriteContext { inner.flags.set(flags); } - // call provided callback + // handle write match result { WriteStatus::Ready => { // write io stream - let (buf_result, result) = if let Some(b) = buf.take() { - io.write(b).await - } else if let Some(b) = inner.buffer.get_write_destination() { - io.write(b).await - } else { - // nothing to write, wait for wakeup - if flags.is_waiting_for_write() { - flags.waiting_for_write_is_done(); - inner.dispatch_task.wake(); - } - flags.insert(Flags::WR_PAUSED); - - if flags.contains(Flags::BUF_W_BACKPRESSURE) { - flags.remove( - Flags::BUF_W_BACKPRESSURE | Flags::BUF_W_MUST_FLUSH, - ); - inner.dispatch_task.wake(); - } - inner.flags.set(flags); - - continue; - }; - - match result { - Ok(_) => { - let len = if buf_result.is_empty() { - // return io.flush().await; - self.0.memory_pool().release_write_buf(buf_result); - 0 - } else if let Some(b) = - inner.buffer.set_write_destination(buf_result) - { - // write buffer is already set, we have to write - // current buffer - let l = b.len(); - buf = Some(b); - l - } else { - 0 - }; - - // if write buffer is smaller than high watermark value, turn off back-pressure - let mut flags = inner.flags.get(); - let len = len + inner.buffer.write_destination_size(); - - if len == 0 { - if flags.is_waiting_for_write() { - flags.waiting_for_write_is_done(); - inner.dispatch_task.wake(); - } - flags.insert(Flags::WR_PAUSED); - inner.flags.set(flags); - } else if flags.contains(Flags::BUF_W_BACKPRESSURE) - && len < inner.pool.get().write_params_high() << 1 - { - flags.remove(Flags::BUF_W_BACKPRESSURE); - inner.flags.set(flags); - inner.dispatch_task.wake(); - } - - continue; - } - Err(e) => self.close(Some(e)), + match select(io.write(&mut buf), self.when_stopped()).await { + Either::Left(Ok(_)) => continue, + Either::Left(Err(e)) => self.close(Some(e)), + Either::Right(_) => return, } } WriteStatus::Timeout(time) => { @@ -301,23 +265,7 @@ impl WriteContext { let fut = async { // write io stream - loop { - buf = if let Some(b) = buf { - let (b, result) = io.write(b).await; - result?; - if !b.is_empty() { - Some(b) - } else { - None - } - } else { - inner.buffer.get_write_destination() - }; - - if buf.is_none() { - break; - } - } + io.write(&mut buf).await?; io.flush().await?; io.shutdown().await?; Ok(()) @@ -337,6 +285,50 @@ impl WriteContext { } } +impl WriteContextBuf { + pub fn set(&mut self, mut buf: BytesVec) { + if buf.is_empty() { + self.io.memory_pool().release_write_buf(buf); + } else if let Some(b) = self.buf.take() { + buf.extend_from_slice(&b); + self.io.memory_pool().release_write_buf(b); + self.buf = Some(buf); + } else if let Some(b) = self.io.0.buffer.set_write_destination(buf) { + // write buffer is already set + self.buf = Some(b); + } + + // if write buffer is smaller than high watermark value, turn off back-pressure + let inner = &self.io.0; + let len = self.buf.as_ref().map(|b| b.len()).unwrap_or_default() + + inner.buffer.write_destination_size(); + let mut flags = inner.flags.get(); + + if len == 0 { + if flags.is_waiting_for_write() { + flags.waiting_for_write_is_done(); + inner.dispatch_task.wake(); + } + flags.insert(Flags::WR_PAUSED); + inner.flags.set(flags); + } else if flags.contains(Flags::BUF_W_BACKPRESSURE) + && len < inner.pool.get().write_params_high() << 1 + { + flags.remove(Flags::BUF_W_BACKPRESSURE); + inner.flags.set(flags); + inner.dispatch_task.wake(); + } + } + + pub fn take(&mut self) -> Option { + if let Some(buf) = self.buf.take() { + Some(buf) + } else { + self.io.0.buffer.get_write_destination() + } + } +} + fn shutdown_filters(io: &IoRef) { let st = &io.0; let flags = st.flags.get(); diff --git a/ntex-io/src/testing.rs b/ntex-io/src/testing.rs index 496d143c..63c3f759 100644 --- a/ntex-io/src/testing.rs +++ b/ntex-io/src/testing.rs @@ -7,7 +7,7 @@ use std::{any, cell::RefCell, cmp, fmt, future::poll_fn, io, mem, net, rc::Rc}; use ntex_bytes::{Buf, BufMut, Bytes, BytesVec}; use ntex_util::time::{sleep, Millis}; -use crate::{types, Handle, IoStream, ReadContext, WriteContext}; +use crate::{types, Handle, IoStream, ReadContext, WriteContext, WriteContextBuf}; #[derive(Default)] struct AtomicWaker(Arc>>>); @@ -395,10 +395,17 @@ impl crate::AsyncRead for Read { struct Write(Rc); impl crate::AsyncWrite for Write { - async fn write(&mut self, mut buf: BytesVec) -> (BytesVec, io::Result<()>) { - let result = poll_fn(|cx| write_io(&self.0, &mut buf, cx)).await; - - (buf, result) + async fn write(&mut self, buf: &mut WriteContextBuf) -> io::Result<()> { + poll_fn(|cx| { + if let Some(mut b) = buf.take() { + let result = write_io(&self.0, &mut b, cx); + buf.set(b); + result + } else { + Poll::Ready(Ok(())) + } + }) + .await } async fn flush(&mut self) -> io::Result<()> { diff --git a/ntex-tokio/CHANGES.md b/ntex-tokio/CHANGES.md index ceaa64a8..08b453b7 100644 --- a/ntex-tokio/CHANGES.md +++ b/ntex-tokio/CHANGES.md @@ -1,5 +1,9 @@ # Changes +## [0.5.2] - 2024-09-11 + +* Use new io api + ## [0.5.1] - 2024-09-06 * Stop write task if io is closed diff --git a/ntex-tokio/src/io.rs b/ntex-tokio/src/io.rs index a894b9cf..a0c52f3b 100644 --- a/ntex-tokio/src/io.rs +++ b/ntex-tokio/src/io.rs @@ -2,8 +2,11 @@ use std::task::{Context, Poll}; use std::{any, cell::RefCell, cmp, future::poll_fn, io, mem, pin::Pin, rc::Rc, rc::Weak}; use ntex_bytes::{Buf, BufMut, BytesVec}; -use ntex_io::{types, Filter, Handle, Io, IoBoxed, IoStream, ReadContext, WriteContext}; -use ntex_util::{future::lazy, ready, time::Millis}; +use ntex_io::{ + types, Filter, Handle, Io, IoBoxed, IoStream, ReadContext, WriteContext, + WriteContextBuf, +}; +use ntex_util::{ready, time::Millis}; use tokio::io::{AsyncRead, AsyncWrite, ReadBuf}; use tokio::net::TcpStream; @@ -58,11 +61,17 @@ struct Write(Rc>); impl ntex_io::AsyncWrite for Write { #[inline] - async fn write(&mut self, mut buf: BytesVec) -> (BytesVec, io::Result<()>) { - match lazy(|cx| flush_io(&mut *self.0.borrow_mut(), &mut buf, cx)).await { - Poll::Ready(res) => (buf, res), - Poll::Pending => (buf, Ok(())), - } + async fn write(&mut self, buf: &mut WriteContextBuf) -> io::Result<()> { + poll_fn(|cx| { + if let Some(mut b) = buf.take() { + let result = flush_io(&mut *self.0.borrow_mut(), &mut b, cx); + buf.set(b); + result + } else { + Poll::Ready(Ok(())) + } + }) + .await } #[inline] @@ -257,11 +266,17 @@ mod unixstream { impl ntex_io::AsyncWrite for Write { #[inline] - async fn write(&mut self, mut buf: BytesVec) -> (BytesVec, io::Result<()>) { - match lazy(|cx| flush_io(&mut *self.0.borrow_mut(), &mut buf, cx)).await { - Poll::Ready(res) => (buf, res), - Poll::Pending => (buf, Ok(())), - } + async fn write(&mut self, buf: &mut WriteContextBuf) -> io::Result<()> { + poll_fn(|cx| { + if let Some(mut b) = buf.take() { + let result = flush_io(&mut *self.0.borrow_mut(), &mut b, cx); + buf.set(b); + result + } else { + Poll::Ready(Ok(())) + } + }) + .await } #[inline] diff --git a/ntex-tokio/src/lib.rs b/ntex-tokio/src/lib.rs index 79d538e2..8916e200 100644 --- a/ntex-tokio/src/lib.rs +++ b/ntex-tokio/src/lib.rs @@ -4,10 +4,8 @@ use ntex_bytes::PoolRef; use ntex_io::Io; mod io; -mod signals; pub use self::io::{SocketOptions, TokioIoBoxed}; -pub use self::signals::{signal, Signal}; struct TcpStream(tokio::net::TcpStream); diff --git a/ntex-tokio/src/signals.rs b/ntex-tokio/src/signals.rs deleted file mode 100644 index 87ff229d..00000000 --- a/ntex-tokio/src/signals.rs +++ /dev/null @@ -1,138 +0,0 @@ -use std::{ - cell::RefCell, future::Future, mem, pin::Pin, rc::Rc, task::Context, task::Poll, -}; - -use tokio::sync::oneshot; -use tokio::task::spawn_local; - -thread_local! { - static SRUN: RefCell = const { RefCell::new(false) }; - static SHANDLERS: Rc>>> = Default::default(); -} - -/// Different types of process signals -#[derive(PartialEq, Eq, Clone, Copy, Debug)] -pub enum Signal { - /// SIGHUP - Hup, - /// SIGINT - Int, - /// SIGTERM - Term, - /// SIGQUIT - Quit, -} - -/// Register signal handler. -/// -/// Signals are handled by oneshots, you have to re-register -/// after each signal. -pub fn signal() -> Option> { - if !SRUN.with(|v| *v.borrow()) { - spawn_local(Signals::new()); - } - SHANDLERS.with(|handlers| { - let (tx, rx) = oneshot::channel(); - handlers.borrow_mut().push(tx); - Some(rx) - }) -} - -struct Signals { - #[cfg(not(unix))] - signal: Pin>>>, - #[cfg(unix)] - signals: Vec<( - Signal, - tokio::signal::unix::Signal, - tokio::signal::unix::SignalKind, - )>, -} - -impl Signals { - fn new() -> Signals { - SRUN.with(|h| *h.borrow_mut() = true); - - #[cfg(not(unix))] - { - Signals { - signal: Box::pin(tokio::signal::ctrl_c()), - } - } - - #[cfg(unix)] - { - use tokio::signal::unix; - - let sig_map = [ - (unix::SignalKind::interrupt(), Signal::Int), - (unix::SignalKind::hangup(), Signal::Hup), - (unix::SignalKind::terminate(), Signal::Term), - (unix::SignalKind::quit(), Signal::Quit), - ]; - - let mut signals = Vec::new(); - for (kind, sig) in sig_map.iter() { - match unix::signal(*kind) { - Ok(stream) => signals.push((*sig, stream, *kind)), - Err(e) => log::error!( - "Cannot initialize stream handler for {:?} err: {}", - sig, - e - ), - } - } - - Signals { signals } - } - } -} - -impl Drop for Signals { - fn drop(&mut self) { - SRUN.with(|h| *h.borrow_mut() = false); - } -} - -impl Future for Signals { - type Output = (); - - fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { - #[cfg(not(unix))] - { - if self.signal.as_mut().poll(cx).is_ready() { - let handlers = SHANDLERS.with(|h| mem::take(&mut *h.borrow_mut())); - for sender in handlers { - let _ = sender.send(Signal::Int); - } - } - Poll::Pending - } - #[cfg(unix)] - { - for (sig, stream, kind) in self.signals.iter_mut() { - loop { - if Pin::new(&mut *stream).poll_recv(cx).is_ready() { - let handlers = SHANDLERS.with(|h| mem::take(&mut *h.borrow_mut())); - for sender in handlers { - let _ = sender.send(*sig); - } - match tokio::signal::unix::signal(*kind) { - Ok(s) => { - *stream = s; - continue; - } - Err(e) => log::error!( - "Cannot initialize stream handler for {:?} err: {}", - sig, - e - ), - } - } - break; - } - } - Poll::Pending - } - } -} diff --git a/ntex/Cargo.toml b/ntex/Cargo.toml index 46e88c43..28881625 100644 --- a/ntex/Cargo.toml +++ b/ntex/Cargo.toml @@ -71,7 +71,7 @@ ntex-bytes = "0.1.27" ntex-server = "2.3" ntex-h2 = "1.1" ntex-rt = "0.4.15" -ntex-io = "2.4" +ntex-io = "2.5" ntex-net = "2.1" ntex-tls = "2.1"