From 9e72dbce18ff6cd16e198f1a4f0f8e785218d62a Mon Sep 17 00:00:00 2001 From: Nikolay Kim Date: Tue, 14 Nov 2023 13:38:35 +0600 Subject: [PATCH] Refactor http/1 keep-alive timer --- ntex/CHANGES.md | 4 + ntex/src/http/config.rs | 127 +++++++++++++++++++++---- ntex/src/http/h1/codec.rs | 17 ++-- ntex/src/http/h1/dispatcher.rs | 169 ++++++++++++++++++++------------- 4 files changed, 224 insertions(+), 93 deletions(-) diff --git a/ntex/CHANGES.md b/ntex/CHANGES.md index ddea9fb91..f7bc9d49d 100644 --- a/ntex/CHANGES.md +++ b/ntex/CHANGES.md @@ -1,5 +1,9 @@ # Changes +## [0.7.11] - 2023-11-xx + +* Refactor http/1 timeouts + ## [0.7.10] - 2023-11-12 * Start http client timeout after sending body diff --git a/ntex/src/http/config.rs b/ntex/src/http/config.rs index 5ec41f8fe..a93a8428f 100644 --- a/ntex/src/http/config.rs +++ b/ntex/src/http/config.rs @@ -40,25 +40,27 @@ impl From> for KeepAlive { } } -#[derive(Debug)] +#[derive(Debug, Clone)] /// Http service configuration -pub struct ServiceConfig(pub(super) Rc); +pub struct ServiceConfig(pub(super) Inner); -#[derive(Debug)] +#[derive(Clone, Copy, Debug, Eq, PartialEq)] +pub(super) struct ReadRate { + pub(super) rate: u16, + pub(super) timeout: time::Duration, + pub(super) max_timeout: time::Duration, +} + +#[derive(Debug, Clone)] pub(super) struct Inner { pub(super) keep_alive: Millis, - pub(super) client_timeout: Millis, pub(super) client_disconnect: Seconds, pub(super) ka_enabled: bool, pub(super) timer: DateService, pub(super) ssl_handshake_timeout: Millis, pub(super) h2config: h2::Config, -} - -impl Clone for ServiceConfig { - fn clone(&self) -> Self { - ServiceConfig(self.0.clone()) - } + pub(super) headers_read_rate: Option, + pub(super) payload_read_rate: Option, } impl Default for ServiceConfig { @@ -89,15 +91,94 @@ impl ServiceConfig { }; let keep_alive = if ka_enabled { keep_alive } else { Millis::ZERO }; - ServiceConfig(Rc::new(Inner { - keep_alive, - ka_enabled, - client_timeout, + ServiceConfig(Inner { client_disconnect, ssl_handshake_timeout, h2config, + keep_alive, + ka_enabled, timer: DateService::new(), - })) + headers_read_rate: Some(ReadRate { + rate: 256, + timeout: client_timeout.into(), + max_timeout: (client_timeout + Millis(3_000)).into(), + }), + payload_read_rate: None, + }) + } + + /// Set keep-alive timeout in seconds. + /// + /// To disable timeout set value to 0. + /// + /// By default keep-alive timeout is set to 30 seconds. + pub fn keepalive_timeout(mut self, timeout: Seconds) -> Self { + self.0.keep_alive = timeout.into(); + self.0.ka_enabled = !timeout.is_zero(); + self + } + + /// Set connection disconnect timeout. + /// + /// Defines a timeout for disconnect connection. If a disconnect procedure does not complete + /// within this time, the connection get dropped. + /// + /// To disable timeout set value to 0. + /// + /// By default disconnect timeout is set to 1 seconds. + pub fn disconnect_timeout(mut self, timeout: Seconds) -> Self { + self.0.client_disconnect = timeout; + self + } + + /// Set read rate parameters for request headers. + /// + /// Set max timeout for reading request headers. If the client + /// sends `rate` amount of data, increase the timeout by 1 second for every. + /// But no more than `max_timeout` timeout. + /// + /// By default headers read rate is set to 1sec with max timeout 5sec. + pub fn headers_read_rate( + mut self, + timeout: Seconds, + max_timeout: Seconds, + rate: u16, + ) -> Self { + if timeout.is_zero() { + self.0.headers_read_rate = Some(ReadRate { + rate, + timeout: timeout.into(), + max_timeout: max_timeout.into(), + }); + } else { + self.0.headers_read_rate = None; + } + self + } + + /// Set read rate parameters for request's payload. + /// + /// Set max timeout for reading payload. If the client + /// sends `rate` amount of data, increase the timeout by 1 second for every. + /// But no more than `max_timeout` timeout. + /// + /// By default payload read rate is disabled. + pub fn payload_read_rate( + mut self, + timeout: Seconds, + max_timeout: Seconds, + rate: u16, + ) -> Self { + if timeout.is_zero() { + self.0.payload_read_rate = Some(ReadRate { + rate, + timeout: timeout.into(), + max_timeout: max_timeout.into(), + }); + } else { + self.0.payload_read_rate = None; + } + self } } @@ -108,10 +189,11 @@ pub(super) struct DispatcherConfig { pub(super) expect: Pipeline, pub(super) upgrade: Option>, pub(super) keep_alive: Duration, - pub(super) client_timeout: Duration, pub(super) client_disconnect: Seconds, pub(super) h2config: h2::Config, pub(super) ka_enabled: bool, + pub(super) headers_read_rate: Option, + pub(super) payload_read_rate: Option, pub(super) timer: DateService, pub(super) on_request: Option>, } @@ -130,9 +212,10 @@ impl DispatcherConfig { upgrade: upgrade.map(|v| v.into()), on_request: on_request.map(|v| v.into()), keep_alive: Duration::from(cfg.0.keep_alive), - client_timeout: Duration::from(cfg.0.client_timeout), - client_disconnect: cfg.0.client_disconnect, + client_disconnect: cfg.0.client_disconnect.into(), ka_enabled: cfg.0.ka_enabled, + headers_read_rate: cfg.0.headers_read_rate, + payload_read_rate: cfg.0.payload_read_rate, h2config: cfg.0.h2config.clone(), timer: cfg.0.timer.clone(), } @@ -142,6 +225,14 @@ impl DispatcherConfig { pub(super) fn keep_alive_enabled(&self) -> bool { self.ka_enabled } + + pub(super) fn headers_read_rate(&self) -> Option<&ReadRate> { + self.headers_read_rate.as_ref() + } + + pub(super) fn payload_read_rate(&self) -> Option<&ReadRate> { + self.payload_read_rate.as_ref() + } } const DATE_VALUE_LENGTH_HDR: usize = 39; diff --git a/ntex/src/http/h1/codec.rs b/ntex/src/http/h1/codec.rs index 13e7941a0..eec7f09c7 100644 --- a/ntex/src/http/h1/codec.rs +++ b/ntex/src/http/h1/codec.rs @@ -82,7 +82,7 @@ impl Codec { flags: Cell::new(flags), decoder: decoder::MessageDecoder::default(), version: Cell::new(Version::HTTP_11), - ctype: Cell::new(ConnectionType::Close), + ctype: Cell::new(ConnectionType::KeepAlive), encoder: encoder::MessageEncoder::default(), } } @@ -99,12 +99,6 @@ impl Codec { self.ctype.get() == ConnectionType::KeepAlive } - #[inline] - /// Check if keep-alive enabled on server level - pub fn keepalive_enabled(&self) -> bool { - self.flags.get().contains(Flags::KEEPALIVE_ENABLED) - } - pub(super) fn set_ctype(&self, ctype: ConnectionType) { self.ctype.set(ctype) } @@ -139,11 +133,14 @@ impl Decoder for Codec { flags.set(Flags::HEAD, head.method == Method::HEAD); self.flags.set(flags); self.version.set(head.version); - self.ctype.set(head.connection_type()); - if self.ctype.get() == ConnectionType::KeepAlive + + let ctype = head.connection_type(); + if ctype == ConnectionType::KeepAlive && !flags.contains(Flags::KEEPALIVE_ENABLED) { self.ctype.set(ConnectionType::Close) + } else { + self.ctype.set(ctype) } if let PayloadType::Stream(_) = payload { @@ -249,6 +246,6 @@ mod tests { ); let _item = codec.decode(&mut buf).unwrap().unwrap(); assert!(codec.upgrade()); - assert!(!codec.keepalive_enabled()); + assert!(!codec.keepalive()); } } diff --git a/ntex/src/http/h1/dispatcher.rs b/ntex/src/http/h1/dispatcher.rs index f0cdbe3b7..cb2aa5fa8 100644 --- a/ntex/src/http/h1/dispatcher.rs +++ b/ntex/src/http/h1/dispatcher.rs @@ -1,9 +1,12 @@ //! Framed transport dispatcher use std::task::{Context, Poll}; -use std::{cell::RefCell, error::Error, future::Future, io, marker, pin::Pin, rc::Rc}; +use std::{ + cell::RefCell, error::Error, future::Future, io, marker, pin::Pin, rc::Rc, time, +}; use crate::io::{Filter, Io, IoBoxed, IoRef, IoStatusUpdate, RecvError}; use crate::service::{Pipeline, PipelineCall, Service}; +use crate::time::now; use crate::util::{ready, Bytes}; use crate::http; @@ -18,21 +21,22 @@ use super::decoder::{PayloadDecoder, PayloadItem, PayloadType}; use super::payload::{Payload, PayloadSender, PayloadStatus}; use super::{codec::Codec, Message}; +const ONE_SEC: time::Duration = time::Duration::from_secs(1); + bitflags::bitflags! { #[derive(Copy, Clone, Debug, Eq, PartialEq, Ord, PartialOrd, Hash)] - pub struct Flags: u16 { - /// We parsed one complete request message - const STARTED = 0b0000_0001; - /// Keep-alive is enabled on current connection - const KEEPALIVE = 0b0000_0010; - /// Keep-alive is registered - const KEEPALIVE_REG = 0b0000_0100; + pub struct Flags: u8 { /// Upgrade request - const UPGRADE = 0b0000_1000; + const UPGRADE = 0b0000_0001; /// Handling upgrade - const UPGRADE_HND = 0b0001_0000; + const UPGRADE_HND = 0b0000_0010; /// Stop after sending payload - const SENDPAYLOAD_AND_STOP = 0b0010_0000; + const SENDPAYLOAD_AND_STOP = 0b0000_0100; + + /// Read headers timer is enabled + const READ_HDRS_TIMEOUT = 0b0001_0000; + /// Read headers payload is enabled + const READ_PL_TIMEOUT = 0b0010_0000; } } @@ -54,8 +58,6 @@ enum State { Call, #[error("State::ReadRequest")] ReadRequest, - #[error("State::ReadFirstRequest")] - ReadFirstRequest, #[error("State::ReadPayload")] ReadPayload, #[error("State::SendPayload")] @@ -93,6 +95,8 @@ struct DispatcherInner { config: Rc>, error: Option, payload: Option<(PayloadDecoder, PayloadSender)>, + read_bytes: u32, + read_max_timeout: time::Instant, _t: marker::PhantomData<(S, B)>, } @@ -113,16 +117,16 @@ where io.set_disconnect_timeout(config.client_disconnect); // slow-request timer - let flags = if config.client_timeout.is_zero() { - Flags::empty() + let flags = if let Some(cfg) = config.headers_read_rate() { + io.start_timer(cfg.timeout); + Flags::READ_HDRS_TIMEOUT } else { - io.start_timer(config.client_timeout); - Flags::KEEPALIVE_REG + Flags::empty() }; Dispatcher { call: CallState::None, - st: State::ReadFirstRequest, + st: State::ReadRequest, inner: DispatcherInner { io, flags, @@ -130,6 +134,8 @@ where config, error: None, payload: None, + read_bytes: 0, + read_max_timeout: now(), _t: marker::PhantomData, }, } @@ -307,7 +313,7 @@ where *this.st = State::Stop; this.inner.error = Some(e); } else { - *this.st = this.inner.switch_to_read_request(); + *this.st = State::ReadRequest; } } // send response body @@ -374,11 +380,6 @@ where } } } - // read first request and call service - State::ReadFirstRequest => { - *this.st = ready!(this.inner.read_request(cx, &mut this.call)); - this.inner.flags.insert(Flags::STARTED); - } // stop io tasks and call upgrade service State::Upgrade(ref mut req) => { let io = this.inner.io.take(); @@ -401,7 +402,7 @@ where } // prepare to shutdown State::Stop => { - this.inner.unregister_keepalive(); + this.inner.io.stop_timer(); return if let Err(e) = ready!(this.inner.io.poll_shutdown(cx)) { // get io error @@ -432,29 +433,6 @@ where B: MessageBody, X: Service, { - fn switch_to_read_request(&mut self) -> State { - // connection is not keep-alive, disconnect - if !self.flags.contains(Flags::KEEPALIVE) || !self.codec.keepalive_enabled() { - self.io.close(); - State::Stop - } else { - // register keep-alive timer - if self.flags.contains(Flags::KEEPALIVE) { - self.flags.remove(Flags::KEEPALIVE); - self.flags.insert(Flags::KEEPALIVE_REG); - self.io.start_timer(self.config.keep_alive); - } - State::ReadRequest - } - } - - fn unregister_keepalive(&mut self) { - if self.flags.contains(Flags::KEEPALIVE_REG) { - self.io.stop_keepalive_timer(); - self.flags.remove(Flags::KEEPALIVE | Flags::KEEPALIVE_REG); - } - } - fn handle_error(&mut self, err: E, critical: bool) -> State where E: ResponseError + 'static, @@ -477,27 +455,28 @@ where } } + /// Handle normal requests fn service_call(&self, req: Request) -> CallState { - // Handle normal requests CallState::Service { fut: self.config.service.call_nowait(req), } } + /// Handle filter fut fn service_filter(&self, req: Request, f: &Pipeline) -> CallState { - // Handle filter fut CallState::Filter { fut: f.call_nowait((req, self.io.get_ref())), } } + /// Handle normal requests with EXPECT: 100-Continue` header fn service_expect(&self, req: Request) -> CallState { - // Handle normal requests with EXPECT: 100-Continue` header CallState::Expect { fut: self.config.expect.call_nowait(req), } } + /// Handle upgrade requests fn service_upgrade(&mut self, mut req: Request) -> CallState { // Move io into request let io: IoBoxed = self.io.take().into(); @@ -505,7 +484,6 @@ where io.get_ref(), RefCell::new(Some(Box::new((io, self.codec.clone())))), ))); - // Handle upgrade requests CallState::ServiceUpgrade { fut: self.config.service.call_nowait(req), } @@ -519,19 +497,28 @@ where log::trace!("trying to read http message"); loop { - let result = ready!(self.io.poll_recv(&self.codec, cx)); + // let result = ready!(self.io.poll_recv(&self.codec, cx)); + let result = match self.io.poll_recv_decode(&self.codec, cx) { + Ok(decoded) => { + if let Some(st) = + self.update_timer(decoded.item.is_some(), decoded.remains) + { + return Poll::Ready(st); + } + if let Some(item) = decoded.item { + Ok(item) + } else { + return Poll::Pending; + } + } + Err(err) => Err(err), + }; // decode incoming bytes stream return match result { Ok((mut req, pl)) => { log::trace!("http message is received: {:?} and payload {:?}", req, pl); - // keep-alive timer - if self.flags.contains(Flags::KEEPALIVE_REG) { - self.flags.remove(Flags::KEEPALIVE_REG); - self.io.stop_keepalive_timer(); - } - // configure request payload let upgrade = match pl { PayloadType::None => false, @@ -602,12 +589,13 @@ where Poll::Ready(State::Stop) } Err(RecvError::KeepAlive) => { - // keep-alive timeout - if !self.flags.contains(Flags::STARTED) { + if self.flags.contains(Flags::READ_HDRS_TIMEOUT) { log::trace!("slow request timeout"); let (req, body) = Response::RequestTimeout().finish().into_parts(); let _ = self.send_response(req, body.into_body()); self.error = Some(DispatchError::SlowRequestTimeout); + } else if self.flags.contains(Flags::READ_PL_TIMEOUT) { + log::trace!("slow payload timeout"); } else { log::trace!("keep-alive timeout, close connection"); } @@ -638,8 +626,6 @@ where if result.is_err() { State::Stop } else { - self.flags.set(Flags::KEEPALIVE, self.codec.keepalive()); - match body.size() { BodySize::None | BodySize::Empty => { if self.error.is_some() { @@ -647,7 +633,7 @@ where } else if self.payload.is_some() { State::ReadPayload } else { - self.switch_to_read_request() + State::ReadRequest } } _ => State::SendPayload { body }, @@ -681,7 +667,7 @@ where } else if self.payload.is_some() { Some(State::ReadPayload) } else { - Some(self.switch_to_read_request()) + Some(State::ReadRequest) } } Some(Err(e)) => { @@ -712,6 +698,59 @@ where Poll::Ready(IoStatusUpdate::WriteBackpressure) => false, } } + + fn update_timer(&mut self, received: bool, remains: usize) -> Option> { + // we got parsed frame + if received { + // remove all timers + self.flags + .remove(Flags::READ_HDRS_TIMEOUT | Flags::READ_PL_TIMEOUT); + self.io.stop_timer(); + } else if self.flags.contains(Flags::READ_HDRS_TIMEOUT) { + // update read timer + if let Some(ref cfg) = self.config.headers_read_rate { + let bytes = remains as u32; + let delta = (bytes - self.read_bytes).try_into().unwrap_or(u16::MAX); + + if delta >= cfg.rate { + let n = now(); + let next = self.io.timer_deadline() + ONE_SEC; + let new_timeout = if n >= next { ONE_SEC } else { next - n }; + + // max timeout + if cfg.max_timeout.is_zero() + || (n + new_timeout) <= self.read_max_timeout + { + self.read_bytes = bytes; + self.io.stop_timer(); + self.io.start_timer(new_timeout); + } + } + } + } else { + // no new data then start keep-alive timer + if remains == 0 { + if self.codec.keepalive() { + self.io.start_timer(self.config.keep_alive); + } else { + self.io.close(); + return Some(State::Stop); + } + } else if let Some(ref cfg) = self.config.headers_read_rate { + // we got new data but not enough to parse single frame + // start read timer + self.flags.insert(Flags::READ_HDRS_TIMEOUT); + + self.read_bytes = remains as u32; + self.io.start_timer(cfg.timeout); + if !cfg.max_timeout.is_zero() { + self.read_max_timeout = now() + cfg.max_timeout; + } + } + } + + None + } } /// Process request's payload