Skip to content

Commit

Permalink
wip
Browse files Browse the repository at this point in the history
  • Loading branch information
fafhrd91 committed Sep 10, 2024
1 parent eecfcc6 commit f279962
Show file tree
Hide file tree
Showing 9 changed files with 561 additions and 1,759 deletions.
464 changes: 96 additions & 368 deletions ntex-async-std/src/io.rs

Large diffs are not rendered by default.

137 changes: 46 additions & 91 deletions ntex-compio/src/io.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,16 +4,15 @@ use compio::buf::{BufResult, IoBuf, IoBufMut, SetBufInit};
use compio::io::{AsyncRead, AsyncWrite};
use compio::net::TcpStream;
use ntex_bytes::{Buf, BufMut, BytesVec};
use ntex_io::{types, Handle, IoStream, ReadContext, WriteContext, WriteStatus};
use ntex_util::{future::select, future::Either, time::sleep};
use ntex_io::{types, Handle, IoStream, ReadContext, WriteContext};

impl IoStream for crate::TcpStream {
fn start(self, read: ReadContext, write: WriteContext) -> Option<Box<dyn Handle>> {
let io = self.0.clone();
compio::runtime::spawn(async move {
let mut wr_io = io.clone();
let mut wr_io = WriteIo(io.clone());
let wr_task = compio::runtime::spawn(async move {
write_task(&mut wr_io, &write).await;
write.handle(&mut wr_io).await;
log::debug!("{} Write task is stopped", write.tag());
});
let mut io = ReadIo(io);
Expand Down Expand Up @@ -41,9 +40,9 @@ impl IoStream for crate::UnixStream {
fn start(self, read: ReadContext, write: WriteContext) -> Option<Box<dyn Handle>> {
let io = self.0;
compio::runtime::spawn(async move {
let mut wr_io = io.clone();
let mut wr_io = WriteIo(io.clone());
let wr_task = compio::runtime::spawn(async move {
write_task(&mut wr_io, &write).await;
write.handle(&mut wr_io).await;
log::debug!("{} Write task is stopped", write.tag());
});

Expand Down Expand Up @@ -111,105 +110,61 @@ impl SetBufInit for CompioBuf {
}
}

struct ReadIo<T: AsyncRead>(T);
struct ReadIo<T>(T);

impl<T> ntex_io::AsyncRead for ReadIo<T>
where
T: AsyncRead,
{
#[inline]
async fn read(&mut self, buf: BytesVec) -> (BytesVec, io::Result<usize>) {
let BufResult(result, buf) = self.0.read(CompioBuf(buf)).await;
(buf.0, result)
}
}

/// Write io task
async fn write_task<T: AsyncWrite>(mut io: T, state: &WriteContext) {
let mut delay = None;

loop {
let result = if let Some(ref mut sleep) = delay {
let result = match select(sleep, state.ready()).await {
Either::Left(_) => {
state.close(Some(io::Error::new(
io::ErrorKind::TimedOut,
"Operation timedout",
)));
return;
}
Either::Right(res) => res,
};
delay = None;
result
} else {
state.ready().await
};

match result {
WriteStatus::Ready => {
// write io stream
match write(&mut io, state).await {
Ok(()) => continue,
Err(e) => {
state.close(Some(e));
}
}
}
WriteStatus::Timeout(time) => {
log::trace!("{}: Initiate timeout delay for {:?}", state.tag(), time);
delay = Some(sleep(time));
continue;
}
WriteStatus::Shutdown(time) => {
log::trace!("{}: Write task is instructed to shutdown", state.tag());

let fut = async {
write(&mut io, state).await?;
io.flush().await?;
io.shutdown().await?;
Ok(())
};
match select(sleep(time), fut).await {
Either::Left(_) => state.close(None),
Either::Right(res) => state.close(res.err()),
}
}
WriteStatus::Terminate => {
log::trace!("{}: Write task is instructed to terminate", state.tag());
state.close(io.shutdown().await.err());
}
}
break;
}
}
struct WriteIo<T>(T);

// write to io stream
async fn write<T: AsyncWrite>(io: &mut T, state: &WriteContext) -> io::Result<()> {
state
.with_buf_async(|buf| async {
let mut buf = CompioBuf(buf);
loop {
let BufResult(result, buf1) = io.write(buf).await;
buf = buf1;

return match result {
Ok(0) => Err(io::Error::new(
impl<T> ntex_io::AsyncWrite for WriteIo<T>
where
T: AsyncWrite,
{
#[inline]
async fn write(&mut self, buf: BytesVec) -> (BytesVec, io::Result<()>) {
let mut buf = CompioBuf(buf);
loop {
let BufResult(result, buf1) = self.0.write(buf).await;
buf = buf1;

return match result {
Ok(0) => (
buf.0,
Err(io::Error::new(
io::ErrorKind::WriteZero,
"failed to write frame to transport",
)),
Ok(size) => {
if buf.0.len() == size {
// return io.flush().await;
state.memory_pool().release_write_buf(buf.0);
Ok(())
} else {
buf.0.advance(size);
continue;
}
),
Ok(size) => {
buf.0.advance(size);

if buf.0.is_empty() {
(buf.0, Ok(()))
} else {
continue;
}
Err(e) => Err(e),
};
}
})
.await
}
Err(e) => (buf.0, Err(e)),
};
}
}

#[inline]
async fn flush(&mut self) -> io::Result<()> {
self.0.flush().await
}

#[inline]
async fn shutdown(&mut self) -> io::Result<()> {
self.0.shutdown().await
}
}
Loading

0 comments on commit f279962

Please sign in to comment.