Skip to content

Commit

Permalink
wip
Browse files Browse the repository at this point in the history
  • Loading branch information
fafhrd91 committed Sep 10, 2024
1 parent cf6797a commit eecfcc6
Show file tree
Hide file tree
Showing 3 changed files with 21 additions and 67 deletions.
1 change: 1 addition & 0 deletions ntex-io/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ pub use self::utils::{seal, Decoded};
#[doc(hidden)]
pub use self::flags::Flags;

#[doc(hidden)]
/// Status for read task
#[derive(Copy, Clone, Debug, PartialEq, Eq, Hash)]
pub enum ReadStatus {
Expand Down
11 changes: 10 additions & 1 deletion ntex-io/src/tasks.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,15 +20,22 @@ impl ReadContext {
self.0.tag()
}

#[deprecated]
#[inline]
/// Check readiness for read operations
pub async fn ready(&self) -> ReadStatus {
poll_fn(|cx| self.0.filter().poll_read_ready(cx)).await
}

#[deprecated]
#[inline]
/// Wait when io get closed or preparing for close
pub async fn wait_for_close(&self) {
self.wait_for_close2().await
}

Check warning on line 35 in ntex-io/src/tasks.rs

View check run for this annotation

Codecov / codecov/patch

ntex-io/src/tasks.rs#L34-L35

Added lines #L34 - L35 were not covered by tests

/// Wait when io get closed or preparing for close
async fn wait_for_close2(&self) {
poll_fn(|cx| {
let flags = self.0.flags();

Expand All @@ -45,12 +52,14 @@ impl ReadContext {
.await
}

#[deprecated]
#[inline]
/// Check readiness for read operations
pub fn poll_ready(&self, cx: &mut Context<'_>) -> Poll<ReadStatus> {
self.0.filter().poll_read_ready(cx)
}

#[deprecated]
/// Get read buffer
pub fn with_buf<F>(&self, f: F) -> Poll<()>
where
Expand Down Expand Up @@ -183,7 +192,7 @@ impl ReadContext {
let total = buf.len();

// call provided callback
let (buf, result) = match select(io.read(buf), self.wait_for_close()).await {
let (buf, result) = match select(io.read(buf), self.wait_for_close2()).await {
Either::Left(res) => res,
Either::Right(_) => {
log::trace!("{}: Read io is closed, stop read task", self.tag());
Expand Down
76 changes: 10 additions & 66 deletions ntex-io/src/testing.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ use std::{any, cell::RefCell, cmp, fmt, io, mem, net, pin::Pin, rc::Rc};
use ntex_bytes::{Buf, BufMut, Bytes, BytesVec};
use ntex_util::time::{sleep, Millis, Sleep};

use crate::{types, Handle, IoStream, ReadContext, ReadStatus, WriteContext, WriteStatus};
use crate::{types, Handle, IoStream, ReadContext, WriteContext, WriteStatus};

#[derive(Default)]
struct AtomicWaker(Arc<Mutex<RefCell<Option<Waker>>>>);
Expand Down Expand Up @@ -356,9 +356,9 @@ impl IoStream for IoTest {
fn start(self, read: ReadContext, write: WriteContext) -> Option<Box<dyn Handle>> {
let io = Rc::new(self);

let _ = ntex_util::spawn(ReadTask {
io: io.clone(),
state: read,
let mut rio = ReadTask(io.clone());
let _ = ntex_util::spawn(async move {
read.handle(&mut rio).await;
});
let _ = ntex_util::spawn(WriteTask {
io: io.clone(),
Expand All @@ -382,69 +382,13 @@ impl Handle for Rc<IoTest> {
}

/// Read io task
struct ReadTask {
io: Rc<IoTest>,
state: ReadContext,
}

impl Future for ReadTask {
type Output = ();

fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let this = self.as_ref();

this.state.with_buf(|buf, hw, lw| {
match this.state.poll_ready(cx) {
Poll::Ready(ReadStatus::Terminate) => {
log::trace!("read task is instructed to terminate");
Poll::Ready(Ok(()))
}
Poll::Ready(ReadStatus::Ready) => {
let io = &this.io;

// read data from socket
let mut new_bytes = 0;
loop {
// make sure we've got room
let remaining = buf.remaining_mut();
if remaining < lw {
buf.reserve(hw - remaining);
}
match io.poll_read_buf(cx, buf) {
Poll::Pending => {
log::trace!(
"no more data in io stream, read: {:?}",
new_bytes
);
break;
}
Poll::Ready(Ok(n)) => {
if n == 0 {
log::trace!("io stream is disconnected");
return Poll::Ready(Ok(()));
} else {
new_bytes += n;
if buf.len() >= hw {
log::trace!(
"high water mark pause reading, read: {:?}",
new_bytes
);
break;
}
}
}
Poll::Ready(Err(err)) => {
log::trace!("read task failed on io {:?}", err);
return Poll::Ready(Err(err));
}
}
}
struct ReadTask(Rc<IoTest>);

Poll::Pending
}
Poll::Pending => Poll::Pending,
}
})
impl crate::AsyncRead for ReadTask {
async fn read(&mut self, mut buf: BytesVec) -> (BytesVec, io::Result<usize>) {
// read data from socket
let result = poll_fn(|cx| self.0.poll_read_buf(cx, &mut buf)).await;
(buf, result)
}
}

Expand Down

0 comments on commit eecfcc6

Please sign in to comment.