Skip to content

Commit

Permalink
Notify readiness waiters if ready call get dropped
Browse files Browse the repository at this point in the history
  • Loading branch information
fafhrd91 committed Sep 29, 2024
1 parent 0d6f348 commit a4c57d1
Show file tree
Hide file tree
Showing 5 changed files with 129 additions and 24 deletions.
4 changes: 4 additions & 0 deletions ntex-service/CHANGES.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,9 @@
# Changes

## [3.1.0] - 2024-09-29

* Notify readiness waiters if ready call get dropped

## [3.0.0] - 2024-05-28

* Use "async fn" for Service::ready() and Service::shutdown() methods
Expand Down
2 changes: 1 addition & 1 deletion ntex-service/Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "ntex-service"
version = "3.0.0"
version = "3.1.0"
authors = ["ntex contributors <team@ntex.rs>"]
description = "ntex service"
keywords = ["network", "framework", "async", "futures"]
Expand Down
136 changes: 114 additions & 22 deletions ntex-service/src/ctx.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use std::{cell, fmt, future::poll_fn, future::Future, marker, pin, rc::Rc, task};
use std::{cell, fmt, future::Future, marker, pin::Pin, rc::Rc, task};

use crate::Service;

Expand Down Expand Up @@ -146,26 +146,15 @@ impl<'a, S> ServiceCtx<'a, S> {
T: Service<R>,
{
// check readiness and notify waiters
let mut fut = svc.ready(ServiceCtx {
idx: self.idx,
waiters: self.waiters,
_t: marker::PhantomData,
});

poll_fn(|cx| {
if self.waiters.can_check(self.idx, cx) {
// SAFETY: `fut` never moves
let p = unsafe { pin::Pin::new_unchecked(&mut fut) };
match p.poll(cx) {
task::Poll::Pending => self.waiters.register(self.idx, cx),
task::Poll::Ready(res) => {
self.waiters.notify();
return task::Poll::Ready(res);
}
}
}
task::Poll::Pending
})
ReadyCall {
completed: false,
fut: svc.ready(ServiceCtx {
idx: self.idx,
waiters: self.waiters,
_t: marker::PhantomData,
}),
ctx: *self,
}
.await
}

Expand Down Expand Up @@ -230,11 +219,51 @@ impl<'a, S> fmt::Debug for ServiceCtx<'a, S> {
}
}

struct ReadyCall<'a, S: ?Sized, F: Future> {
completed: bool,
fut: F,
ctx: ServiceCtx<'a, S>,
}

impl<'a, S: ?Sized, F: Future> Drop for ReadyCall<'a, S, F> {
fn drop(&mut self) {
if !self.completed {
self.ctx.waiters.notify();
}
}
}

impl<'a, S: ?Sized, F: Future> Unpin for ReadyCall<'a, S, F> {}

impl<'a, S: ?Sized, F: Future> Future for ReadyCall<'a, S, F> {
type Output = F::Output;

fn poll(
mut self: Pin<&mut Self>,
cx: &mut task::Context<'_>,
) -> task::Poll<Self::Output> {
if self.ctx.waiters.can_check(self.ctx.idx, cx) {
// SAFETY: `fut` never moves
let result = unsafe { Pin::new_unchecked(&mut self.as_mut().fut).poll(cx) };
match result {
task::Poll::Pending => self.ctx.waiters.register(self.ctx.idx, cx),
task::Poll::Ready(res) => {
self.completed = true;
self.ctx.waiters.notify();
return task::Poll::Ready(res);
}
}
}
task::Poll::Pending
}
}

#[cfg(test)]
mod tests {
use std::{cell::Cell, cell::RefCell, future::poll_fn, task::Poll};

use ntex_util::{channel::condition, future::lazy, time};
use ntex_util::channel::{condition, oneshot};
use ntex_util::{future::lazy, future::select, spawn, time};

use super::*;
use crate::Pipeline;
Expand Down Expand Up @@ -299,6 +328,69 @@ mod tests {
assert_eq!(cnt.get(), 3);
}

#[ntex::test]
async fn test_ready_on_drop() {
let cnt = Rc::new(Cell::new(0));
let con = condition::Condition::new();
let srv = Pipeline::from(Srv(cnt.clone(), con.wait()));

let srv1 = srv.clone();
let srv2 = srv1.clone().bind();

let (tx, rx) = oneshot::channel();
spawn(async move {
select(rx, srv1.ready()).await;
time::sleep(time::Millis(25000)).await;
drop(srv1);
});

Check warning on line 345 in ntex-service/src/ctx.rs

View check run for this annotation

Codecov / codecov/patch

ntex-service/src/ctx.rs#L344-L345

Added lines #L344 - L345 were not covered by tests
time::sleep(time::Millis(250)).await;

let res = lazy(|cx| srv2.poll_ready(cx)).await;
assert_eq!(res, Poll::Pending);

let _ = tx.send(());
time::sleep(time::Millis(250)).await;

let res = lazy(|cx| srv2.poll_ready(cx)).await;
assert_eq!(res, Poll::Pending);

con.notify();
let res = lazy(|cx| srv2.poll_ready(cx)).await;
assert_eq!(res, Poll::Ready(Ok(())));
}

#[ntex::test]
async fn test_ready_after_shutdown() {
let cnt = Rc::new(Cell::new(0));
let con = condition::Condition::new();
let srv = Pipeline::from(Srv(cnt.clone(), con.wait()));

let srv1 = srv.clone().bind();
let srv2 = srv1.clone();

let (tx, rx) = oneshot::channel();
spawn(async move {
select(rx, poll_fn(|cx| srv1.poll_ready(cx))).await;
poll_fn(|cx| srv1.poll_shutdown(cx)).await;
time::sleep(time::Millis(25000)).await;
drop(srv1);
});

Check warning on line 377 in ntex-service/src/ctx.rs

View check run for this annotation

Codecov / codecov/patch

ntex-service/src/ctx.rs#L376-L377

Added lines #L376 - L377 were not covered by tests
time::sleep(time::Millis(250)).await;

let res = lazy(|cx| srv2.poll_ready(cx)).await;
assert_eq!(res, Poll::Pending);

let _ = tx.send(());
time::sleep(time::Millis(250)).await;

let res = lazy(|cx| srv2.poll_ready(cx)).await;
assert_eq!(res, Poll::Pending);

con.notify();
let res = lazy(|cx| srv2.poll_ready(cx)).await;
assert_eq!(res, Poll::Ready(Ok(())));
}

#[ntex::test]
async fn test_shared_call() {
let data = Rc::new(RefCell::new(Vec::new()));
Expand Down
9 changes: 9 additions & 0 deletions ntex-service/src/pipeline.rs
Original file line number Diff line number Diff line change
Expand Up @@ -318,6 +318,15 @@ struct CheckReadiness<S: 'static, F, Fut> {

impl<S, F, Fut> Unpin for CheckReadiness<S, F, Fut> {}

impl<S, F, Fut> Drop for CheckReadiness<S, F, Fut> {
fn drop(&mut self) {
// future fot dropped during polling, we must notify other waiters
if self.fut.is_some() {
self.pl.waiters.notify();
}
}
}

impl<T, S, F, Fut> Future for CheckReadiness<S, F, Fut>
where
F: Fn(&'static Pipeline<S>) -> Fut,
Expand Down
2 changes: 1 addition & 1 deletion ntex/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ brotli = ["dep:brotli2"]
ntex-codec = "0.6.2"
ntex-http = "0.1.12"
ntex-router = "0.5.3"
ntex-service = "3.0"
ntex-service = "3.1"
ntex-macros = "0.1.3"
ntex-util = "2"
ntex-bytes = "0.1.27"
Expand Down

0 comments on commit a4c57d1

Please sign in to comment.