Skip to content

Commit

Permalink
Start http client timeout after sending body
Browse files Browse the repository at this point in the history
  • Loading branch information
fafhrd91 committed Nov 12, 2023
1 parent c6b2612 commit 9a82a78
Show file tree
Hide file tree
Showing 7 changed files with 66 additions and 33 deletions.
4 changes: 4 additions & 0 deletions ntex/CHANGES.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,9 @@
# Changes

## [0.7.10] - 2023-11-12

* Start http client timeout after sending body

## [0.7.9] - 2023-11-11

* Update ntex io
Expand Down
6 changes: 3 additions & 3 deletions ntex/Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "ntex"
version = "0.7.9"
version = "0.7.10"
authors = ["ntex contributors <team@ntex.rs>"]
description = "Framework for composable network services"
readme = "README.md"
Expand Down Expand Up @@ -72,7 +72,7 @@ log = "0.4"
nanorand = { version = "0.7", default-features = false, features = ["std", "wyrand"] }
polling = "3.3"
pin-project-lite = "0.2"
regex = { version = "1.10.1", default-features = false, features = ["std"] }
regex = { version = "1.10", default-features = false, features = ["std"] }
sha-1 = "0.10"
serde = { version = "1.0", features=["derive"] }
socket2 = "0.5"
Expand All @@ -94,7 +94,7 @@ tls-openssl = { version="0.10", package = "openssl", optional = true }

# rustls
tls-rustls = { version = "0.21", package = "rustls", optional = true }
webpki-roots = { version = "0.25.2", optional = true }
webpki-roots = { version = "0.25", optional = true }

# compression
brotli2 = { version="0.3.2", optional = true }
Expand Down
7 changes: 4 additions & 3 deletions ntex/src/http/client/connect.rs
Original file line number Diff line number Diff line change
@@ -1,13 +1,12 @@
use std::{fmt, net};

use crate::http::{body::Body, RequestHeadType};
use crate::{service::Pipeline, service::Service, util::BoxFuture};
use crate::{service::Pipeline, service::Service, time::Millis, util::BoxFuture};

use super::error::{ConnectError, SendRequestError};
use super::response::ClientResponse;
use super::{Connect as ClientConnect, Connection};

// #[derive(Debug)]
pub(super) struct ConnectorWrapper<T>(pub(crate) Pipeline<T>);

impl<T> fmt::Debug for ConnectorWrapper<T>
Expand All @@ -27,6 +26,7 @@ pub(super) trait Connect: fmt::Debug {
head: RequestHeadType,
body: Body,
addr: Option<net::SocketAddr>,
timeout: Millis,
) -> BoxFuture<'_, Result<ClientResponse, SendRequestError>>;
}

Expand All @@ -39,6 +39,7 @@ where
head: RequestHeadType,
body: Body,
addr: Option<net::SocketAddr>,
timeout: Millis,
) -> BoxFuture<'_, Result<ClientResponse, SendRequestError>> {
Box::pin(async move {
// connect to the host
Expand All @@ -51,7 +52,7 @@ where

// send request
connection
.send_request(head, body)
.send_request(head, body, timeout)
.await
.map(|(head, payload)| ClientResponse::new(head, payload))
})
Expand Down
16 changes: 14 additions & 2 deletions ntex/src/http/client/connection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ use crate::http::body::MessageBody;
use crate::http::message::{RequestHeadType, ResponseHead};
use crate::http::payload::Payload;
use crate::io::{types::HttpProtocol, IoBoxed};
use crate::time::Millis;

use super::{error::SendRequestError, h1proto, h2proto, pool::Acquired};

Expand Down Expand Up @@ -81,12 +82,23 @@ impl Connection {
mut self,
head: H,
body: B,
timeout: Millis,
) -> Result<(ResponseHead, Payload), SendRequestError> {
match self.io.take().unwrap() {
ConnectionType::H1(io) => {
h1proto::send_request(io, head.into(), body, self.created, self.pool).await
h1proto::send_request(
io,
head.into(),
body,
self.created,
timeout,
self.pool,
)
.await
}
ConnectionType::H2(io) => {
h2proto::send_request(io, head.into(), body, timeout).await
}
ConnectionType::H2(io) => h2proto::send_request(io, head.into(), body).await,
}
}
}
27 changes: 18 additions & 9 deletions ntex/src/http/client/h1proto.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ use crate::http::header::{HeaderMap, HeaderValue, HOST};
use crate::http::message::{RequestHeadType, ResponseHead};
use crate::http::payload::{Payload, PayloadStream};
use crate::io::{IoBoxed, RecvError};
use crate::time::{timeout_checked, Millis};
use crate::util::{poll_fn, ready, BufMut, Bytes, BytesMut, Stream};

use super::connection::{Connection, ConnectionType};
Expand All @@ -18,6 +19,7 @@ pub(super) async fn send_request<B>(
mut head: RequestHeadType,
body: B,
created: Instant,
timeout: Millis,
pool: Option<Acquired>,
) -> Result<(ResponseHead, Payload), SendRequestError>
where
Expand Down Expand Up @@ -73,17 +75,24 @@ where
log::trace!("reading http1 response");

// read response and init read body
let head = if let Some(result) = io.recv(&codec).await? {
log::trace!(
"http1 response is received, type: {:?}, response: {:#?}",
codec.message_type(),
result
);
result
} else {
return Err(SendRequestError::from(ConnectError::Disconnected(None)));
let fut = async {
if let Some(result) = io.recv(&codec).await? {
log::trace!(
"http1 response is received, type: {:?}, response: {:#?}",
codec.message_type(),

Check warning on line 82 in ntex/src/http/client/h1proto.rs

View check run for this annotation

Codecov / codecov/patch

ntex/src/http/client/h1proto.rs#L81-L82

Added lines #L81 - L82 were not covered by tests
result
);
Ok(result)
} else {
Err(SendRequestError::from(ConnectError::Disconnected(None)))

Check warning on line 87 in ntex/src/http/client/h1proto.rs

View check run for this annotation

Codecov / codecov/patch

ntex/src/http/client/h1proto.rs#L87

Added line #L87 was not covered by tests
}
};

let head = timeout_checked(timeout, fut)
.await
.map_err(|_| SendRequestError::Timeout)
.and_then(|res| res)?;

match codec.message_type() {
h1::MessageType::None => {
release_connection(io, !codec.keepalive(), created, pool);
Expand Down
14 changes: 13 additions & 1 deletion ntex/src/http/client/h2proto.rs
Original file line number Diff line number Diff line change
@@ -1,11 +1,13 @@
use std::io;

use ntex_h2::{self as h2, client::SimpleClient, frame};
use ntex_h2::client::{RecvStream, SimpleClient};
use ntex_h2::{self as h2, frame};

use crate::http::body::{BodySize, MessageBody};
use crate::http::header::{self, HeaderMap, HeaderValue};
use crate::http::message::{RequestHeadType, ResponseHead};
use crate::http::{h2::payload, payload::Payload, Method, Version};
use crate::time::{timeout_checked, Millis};
use crate::util::{poll_fn, ByteString, Bytes};

use super::error::{ConnectError, SendRequestError};
Expand All @@ -14,6 +16,7 @@ pub(super) async fn send_request<B>(
client: H2Client,
head: RequestHeadType,
body: B,
timeout: Millis,
) -> Result<(ResponseHead, Payload), SendRequestError>
where
B: MessageBody,
Expand Down Expand Up @@ -85,6 +88,15 @@ where
});
}

timeout_checked(timeout, get_response(rcv_stream))
.await
.map_err(|_| SendRequestError::Timeout)
.and_then(|res| res)
}

async fn get_response(
rcv_stream: RecvStream,
) -> Result<(ResponseHead, Payload), SendRequestError> {
let h2::Message { stream, kind } = rcv_stream
.recv()
.await
Expand Down
25 changes: 10 additions & 15 deletions ntex/src/http/client/sender.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ use crate::http::body::{Body, BodyStream};
use crate::http::error::HttpError;
use crate::http::header::{self, HeaderMap, HeaderName, HeaderValue};
use crate::http::RequestHeadType;
use crate::time::{sleep, Millis, Sleep};
use crate::time::Millis;
use crate::util::{BoxFuture, Bytes, Stream};

#[cfg(feature = "compress")]
Expand Down Expand Up @@ -50,7 +50,6 @@ impl From<PrepForSendingError> for SendRequestError {
pub enum SendClientRequest {
Fut(
BoxFuture<'static, Result<ClientResponse, SendRequestError>>,
Option<Sleep>,
bool,
),
Err(Option<SendRequestError>),
Expand All @@ -60,9 +59,8 @@ impl SendClientRequest {
pub(crate) fn new(
send: BoxFuture<'static, Result<ClientResponse, SendRequestError>>,
response_decompress: bool,
timeout: Millis,
) -> SendClientRequest {
SendClientRequest::Fut(send, timeout.map(sleep), response_decompress)
SendClientRequest::Fut(send, response_decompress)
}
}

Expand All @@ -73,14 +71,7 @@ impl Future for SendClientRequest {
let this = self.get_mut();

match this {
SendClientRequest::Fut(send, delay, _response_decompress) => {
if delay.is_some() {
match delay.as_ref().unwrap().poll_elapsed(cx) {
Poll::Pending => (),
_ => return Poll::Ready(Err(SendRequestError::Timeout)),
}
}

SendClientRequest::Fut(send, _response_decompress) => {
let res = match Pin::new(send).poll(cx) {
Poll::Ready(res) => res,
Poll::Pending => return Poll::Pending,
Expand Down Expand Up @@ -143,10 +134,14 @@ impl RequestHeadType {
}
let body = body.into();

let fut =
Box::pin(async move { config.connector.send_request(self, body, addr).await });
let fut = Box::pin(async move {
config
.connector
.send_request(self, body, addr, timeout)
.await
});

SendClientRequest::new(fut, response_decompress, timeout)
SendClientRequest::new(fut, response_decompress)
}

pub(super) fn send_json<T: Serialize>(
Expand Down

0 comments on commit 9a82a78

Please sign in to comment.