Skip to content

Commit

Permalink
Run un-readiness check in separate task (#464)
Browse files Browse the repository at this point in the history
  • Loading branch information
fafhrd91 authored Nov 10, 2024
1 parent a97059a commit c657b9c
Show file tree
Hide file tree
Showing 7 changed files with 114 additions and 26 deletions.
2 changes: 2 additions & 0 deletions ntex-io/CHANGES.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@

* Check service readiness once per decoded item

* Run un-readiness check in separate task

## [2.8.2] - 2024-11-05

* Do not rely on not_ready(), always check service readiness
Expand Down
3 changes: 2 additions & 1 deletion ntex-io/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,8 @@ path = "src/lib.rs"
ntex-codec = "0.6"
ntex-bytes = "0.1"
ntex-util = "2.5"
ntex-service = "3.3"
ntex-service = "3.3.3"
ntex-rt = "0.4"

bitflags = "2"
log = "0.4"
Expand Down
62 changes: 45 additions & 17 deletions ntex-io/src/dispatcher.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
//! Framed transport dispatcher
#![allow(clippy::let_underscore_future)]
use std::task::{ready, Context, Poll};
use std::{cell::Cell, future::Future, pin::Pin, rc::Rc};
use std::{cell::Cell, future::poll_fn, future::Future, pin::Pin, rc::Rc};

use ntex_codec::{Decoder, Encoder};
use ntex_service::{IntoService, Pipeline, PipelineBinding, PipelineCall, Service};
Expand Down Expand Up @@ -126,12 +126,12 @@ pin_project_lite::pin_project! {
bitflags::bitflags! {
#[derive(Copy, Clone, Debug, Eq, PartialEq, Ord, PartialOrd, Hash)]
struct Flags: u8 {
const READY_ERR = 0b000001;
const IO_ERR = 0b000010;
const KA_ENABLED = 0b000100;
const KA_TIMEOUT = 0b001000;
const READ_TIMEOUT = 0b010000;
const READY = 0b100000;
const READY_ERR = 0b0000001;
const IO_ERR = 0b0000010;
const KA_ENABLED = 0b0000100;
const KA_TIMEOUT = 0b0001000;
const READ_TIMEOUT = 0b0010000;
const READY_TASK = 0b1000000;
}
}

Expand Down Expand Up @@ -160,7 +160,8 @@ where
codec: U,
service: PipelineBinding<S, DispatchItem<U>>,
error: Cell<Option<DispatcherError<S::Error, <U as Encoder>::Error>>>,
inflight: Cell<usize>,
inflight: Cell<u32>,
ready: Cell<bool>,
}

#[derive(Copy, Clone, Debug)]
Expand Down Expand Up @@ -222,6 +223,7 @@ where
codec,
error: Cell::new(None),
inflight: Cell::new(0),
ready: Cell::new(false),
service: Pipeline::new(service.into_service()).bind(),
});

Expand Down Expand Up @@ -282,6 +284,12 @@ where
}
}

// ready task
if slf.flags.contains(Flags::READY_TASK) {
slf.flags.insert(Flags::READY_TASK);
ntex_rt::spawn(not_ready(slf.shared.clone()));
}

loop {
match slf.st {
DispatcherState::Processing => {
Expand Down Expand Up @@ -342,7 +350,7 @@ where
PollService::Continue => continue,
};

slf.flags.remove(Flags::READY);
slf.shared.ready.set(false);
slf.call_service(cx, item);
}
// handle write back-pressure
Expand Down Expand Up @@ -472,19 +480,14 @@ where
}

fn poll_service(&mut self, cx: &mut Context<'_>) -> Poll<PollService<U>> {
if self.flags.contains(Flags::READY) {
if self.shared.service.poll_not_ready(cx).is_ready() {
self.flags.remove(Flags::READY);
} else {
return Poll::Ready(self.check_error());
}
if self.shared.ready.get() {
return Poll::Ready(self.check_error());
}

// wait until service becomes ready
match self.shared.service.poll_ready(cx) {
Poll::Ready(Ok(_)) => {
self.flags.insert(Flags::READY);
let _ = self.shared.service.poll_not_ready(cx);
self.shared.ready.set(true);
Poll::Ready(self.check_error())
}
// pause io read task
Expand Down Expand Up @@ -625,6 +628,30 @@ where
}
}

async fn not_ready<S, U>(slf: Rc<DispatcherShared<S, U>>)
where
S: Service<DispatchItem<U>, Response = Option<Response<U>>> + 'static,
U: Encoder + Decoder + 'static,
{
let pl = slf.service.clone();
loop {
if !pl.is_shutdown() {
if let Err(err) = poll_fn(|cx| pl.poll_ready(cx)).await {
log::trace!("{}: Service readiness check failed, stopping", slf.io.tag());
slf.error.set(Some(DispatcherError::Service(err)));
break;
}
if !pl.is_shutdown() {
poll_fn(|cx| pl.poll_not_ready(cx)).await;
slf.ready.set(false);
slf.io.wake();
continue;
}
}
break;
}
}

#[cfg(test)]
mod tests {
use std::sync::{atomic::AtomicBool, atomic::Ordering::Relaxed, Arc, Mutex};
Expand Down Expand Up @@ -724,6 +751,7 @@ mod tests {
io: state.into(),
error: Cell::new(None),
inflight: Cell::new(0),
ready: Cell::new(false),
service: Pipeline::new(service).bind(),
});

Expand Down
4 changes: 4 additions & 0 deletions ntex-service/CHANGES.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,9 @@
# Changes

## [3.3.3] - 2024-11-10

* Add Pipeline::is_shutdown() helper

## [3.3.2] - 2024-11-10

* Fix un-needed wakeups for unready future
Expand Down
2 changes: 1 addition & 1 deletion ntex-service/Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "ntex-service"
version = "3.3.2"
version = "3.3.3"
authors = ["ntex contributors <team@ntex.rs>"]
description = "ntex service"
keywords = ["network", "framework", "async", "futures"]
Expand Down
26 changes: 26 additions & 0 deletions ntex-service/src/ctx.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,17 +12,23 @@ pub struct ServiceCtx<'a, S: ?Sized> {
#[derive(Debug)]
pub(crate) struct WaitersRef {
cur: cell::Cell<u32>,
shutdown: cell::Cell<bool>,
wakers: cell::UnsafeCell<Vec<u32>>,
indexes: cell::UnsafeCell<slab::Slab<Option<Waker>>>,
}

impl WaitersRef {
pub(crate) fn new() -> (u32, Self) {
let mut waiters = slab::Slab::new();

// first insert for wake ups from services
let _ = waiters.insert(None);

(
waiters.insert(Default::default()) as u32,
WaitersRef {
cur: cell::Cell::new(u32::MAX),
shutdown: cell::Cell::new(false),
indexes: cell::UnsafeCell::new(waiters),
wakers: cell::UnsafeCell::new(Vec::default()),
},
Expand Down Expand Up @@ -62,6 +68,18 @@ impl WaitersRef {
self.get()[idx as usize] = Some(cx.waker().clone());
}

pub(crate) fn register_unready(&self, cx: &mut Context<'_>) {
self.get()[0] = Some(cx.waker().clone());
}

pub(crate) fn notify_unready(&self) {
if let Some(item) = self.get().get_mut(0) {
if let Some(waker) = item.take() {
waker.wake();
}
}
}

pub(crate) fn notify(&self) {
let wakers = self.get_wakers();
if !wakers.is_empty() {
Expand Down Expand Up @@ -90,6 +108,14 @@ impl WaitersRef {
false
}
}

pub(crate) fn shutdown(&self) {
self.shutdown.set(true);
}

pub(crate) fn is_shutdown(&self) -> bool {
self.shutdown.get()
}
}

impl<'a, S> ServiceCtx<'a, S> {
Expand Down
41 changes: 34 additions & 7 deletions ntex-service/src/pipeline.rs
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,12 @@ impl<S> Pipeline<S> {
}
}

#[inline]
/// Check if shutdown is initiated.
pub fn is_shutdown(&self) -> bool {
self.state.waiters.is_shutdown()
}

#[inline]
/// Shutdown enclosed service.
pub async fn shutdown<R>(&self)
Expand Down Expand Up @@ -202,6 +208,12 @@ where
&self.pl.state.svc
}

#[inline]
/// Get pipeline
pub fn pipeline(&self) -> Pipeline<S> {
self.pl.clone()
}

#[inline]
/// Returns `Ready` when the pipeline is able to process requests.
///
Expand Down Expand Up @@ -263,6 +275,8 @@ where
// `self` is alive
let pl: &'static Pipeline<S> = unsafe { std::mem::transmute(&self.pl) };
*st = State::Shutdown(Box::pin(async move { pl.shutdown().await }));
pl.state.waiters.shutdown();
pl.state.waiters.notify_unready();
self.poll_shutdown(cx)
}
State::Shutdown(ref mut fut) => Pin::new(fut).poll(cx),
Expand Down Expand Up @@ -300,6 +314,12 @@ where
}
}

#[inline]
/// Check if shutdown is initiated.
pub fn is_shutdown(&self) -> bool {
self.pl.state.waiters.is_shutdown()
}

#[inline]
/// Shutdown enclosed service.
pub async fn shutdown(&self) {
Expand Down Expand Up @@ -449,26 +469,33 @@ struct CheckUnReadiness<S: 'static, F, Fut> {

impl<S, F, Fut> Unpin for CheckUnReadiness<S, F, Fut> {}

impl<T, S, F, Fut> Future for CheckUnReadiness<S, F, Fut>
impl<S, F, Fut> Future for CheckUnReadiness<S, F, Fut>
where
F: Fn(&'static Pipeline<S>) -> Fut,
Fut: Future<Output = T>,
Fut: Future<Output = ()>,
{
type Output = T;
type Output = ();

fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<T> {
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> {
let mut slf = self.as_mut();

if slf.fut.is_none() {
slf.fut = Some((slf.f)(slf.pl));
}
let fut = slf.fut.as_mut().unwrap();
match unsafe { Pin::new_unchecked(fut) }.poll(cx) {
Poll::Pending => Poll::Pending,
Poll::Ready(res) => {
Poll::Pending => {
if slf.pl.state.waiters.is_shutdown() {
Poll::Ready(())
} else {
slf.pl.state.waiters.register_unready(cx);
Poll::Pending
}
}
Poll::Ready(()) => {
let _ = slf.fut.take();
slf.pl.state.waiters.notify();
Poll::Ready(res)
Poll::Ready(())
}
}
}
Expand Down

0 comments on commit c657b9c

Please sign in to comment.