From f00bd0b4da9a5d63a373dbde7a694babc380b330 Mon Sep 17 00:00:00 2001 From: Ximon Eighteen <3304436+ximon18@users.noreply.github.com> Date: Fri, 13 Sep 2024 13:33:11 +0200 Subject: [PATCH 1/6] Remove unnecessary Unpin constraint. --- src/net/server/message.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/net/server/message.rs b/src/net/server/message.rs index 84889a45b..8dddd053e 100644 --- a/src/net/server/message.rs +++ b/src/net/server/message.rs @@ -192,7 +192,7 @@ where impl Request where - Octs: AsRef<[u8]> + Send + Sync + Unpin, + Octs: AsRef<[u8]> + Send + Sync, { /// Creates a new request wrapper around a message along with its context. pub fn new( From 2bae2e1d6945033512c26689e7845c146991603d Mon Sep 17 00:00:00 2001 From: Ximon Eighteen <3304436+ximon18@users.noreply.github.com> Date: Fri, 13 Sep 2024 14:07:59 +0200 Subject: [PATCH 2/6] Handle ServiceError by halting response processing for a request and returning an appropriate DNS error message. Factor common service invoking and response processing code out to new trait ServiceInvoker and implement the network server specific parts for the UDP and TCP servers. Also simplifies some trait bounds. --- src/net/server/connection.rs | 306 ++++++++++++++++++++-------------- src/net/server/dgram.rs | 314 +++++++++++++++++++++-------------- src/net/server/dispatcher.rs | 174 +++++++++++++++++++ src/net/server/mod.rs | 1 + 4 files changed, 551 insertions(+), 244 deletions(-) create mode 100644 src/net/server/dispatcher.rs diff --git a/src/net/server/connection.rs b/src/net/server/connection.rs index a805b1b17..7019b6daa 100644 --- a/src/net/server/connection.rs +++ b/src/net/server/connection.rs @@ -8,7 +8,6 @@ use std::net::SocketAddr; use std::sync::Arc; use arc_swap::ArcSwap; -use futures::StreamExt; use octseq::Octets; use tokio::io::{ AsyncRead, AsyncReadExt, AsyncWrite, AsyncWriteExt, ReadHalf, WriteHalf, @@ -26,10 +25,11 @@ use crate::base::{Message, StreamTarget}; use crate::net::server::buf::BufSource; use crate::net::server::message::Request; use crate::net::server::metrics::ServerMetrics; -use crate::net::server::service::{Service, ServiceError, ServiceFeedback}; +use crate::net::server::service::{Service, ServiceError}; use crate::net::server::util::to_pcap_text; use crate::utils::config::DefMinMax; +use super::dispatcher::{InvokerStatus, ServiceInvoker}; use super::message::{NonUdpTransportContext, TransportSpecificContext}; use super::stream::Config as ServerConfig; use super::ServerCommand; @@ -221,9 +221,10 @@ impl Clone for Config { /// A handler for a single stream connection between client and server. pub struct Connection where - Buf: BufSource, - Buf::Output: Send + Sync + Unpin, - Svc: Service + Clone, + Buf: BufSource + Clone + Send + Sync + 'static, + Buf::Output: Octets + Send + Sync + Unpin, + Svc: Service + Clone + Send + Sync + 'static, + Svc::Target: Composer + Default + Send, { /// Flag used by the Drop impl to track if the metric count has to be /// decreased or not. @@ -266,6 +267,9 @@ where /// [`ServerMetrics`] describing the status of the server. metrics: Arc, + + /// Dispatches requests to the service and enqueues responses for sending. + request_dispatcher: ServiceResponseHandler, } /// Creation @@ -273,9 +277,12 @@ where impl Connection where Stream: AsyncRead + AsyncWrite, - Buf: BufSource, + Buf: BufSource + Clone + Send + Sync, Buf::Output: Octets + Send + Sync + Unpin, - Svc: Service + Clone, + Svc: Service + Clone + Send + Sync, + Svc::Target: Composer + Default + Send, + Svc::Stream: Send, + Svc::Future: Send, { /// Creates a new handler for an accepted stream connection. #[must_use] @@ -322,6 +329,12 @@ where // uses of self we have to do while running. let stream_rx = Some(stream_rx); + let request_dispatcher = ServiceResponseHandler::new( + config.clone(), + result_q_tx.clone(), + metrics.clone(), + ); + Self { active: false, buf, @@ -334,6 +347,7 @@ where service, idle_timer, metrics, + request_dispatcher, } } } @@ -346,8 +360,9 @@ where Buf: BufSource + Send + Sync + Clone + 'static, Buf::Output: Octets + Send + Sync + Unpin, Svc: Service + Clone + Send + Sync + 'static, - Svc::Target: Composer + Send, + Svc::Target: Composer + Default + Send, Svc::Stream: Send, + Svc::Future: Send, { /// Start reading requests and writing responses to the stream. /// @@ -363,9 +378,7 @@ where pub async fn run( mut self, command_rx: watch::Receiver>, - ) where - Svc::Future: Send, - { + ) { self.metrics.inc_num_connections(); // Flag that we have to decrease the metric count on Drop. @@ -383,7 +396,7 @@ where Buf: BufSource + Send + Sync + Clone + 'static, Buf::Output: Octets + Send + Sync + Unpin, Svc: Service + Clone + Send + Sync + 'static, - Svc::Target: Composer + Send, + Svc::Target: Composer + Default + Send, Svc::Future: Send, Svc::Stream: Send, { @@ -536,10 +549,7 @@ where } /// Stop queueing new responses and process those already in the queue. - async fn flush_write_queue(&mut self) - // where - // Target: Composer, - { + async fn flush_write_queue(&mut self) { debug!("Flushing connection write queue."); // Stop accepting new response messages (should we check for in-flight // messages that haven't generated a response yet but should be @@ -564,10 +574,7 @@ where async fn process_queued_result( &mut self, response: Option>>, - ) -> Result<(), ConnectionEvent> -// where - // Target: Composer, - { + ) -> Result<(), ConnectionEvent> { // If we failed to read the results of requests processed by the // service because the queue holding those results is empty and can no // longer be read from, then there is no point continuing to read from @@ -592,14 +599,11 @@ where async fn write_response_to_stream( &mut self, msg: StreamTarget, - ) - // where - // Target: AsRef<[u8]>, - { + ) { if enabled!(Level::TRACE) { let bytes = msg.as_dgram_slice(); let pcap_text = to_pcap_text(bytes, bytes.len()); - trace!(addr = %self.addr, pcap_text, "Sending response"); + trace!(addr = %self.addr, pcap_text, "Sending {} bytes of response tp {}", self.addr, bytes.len()); } match timeout( @@ -653,6 +657,7 @@ where ) -> Result<(), ConnectionEvent> where Svc::Stream: Send, + Svc::Target: Default, { match res { Ok(buf) => { @@ -682,112 +687,24 @@ where let ctx = NonUdpTransportContext::new(Some( self.config.load().idle_timeout, )); - let ctx = TransportSpecificContext::NonUdp(ctx); + let request = Request::new( self.addr, received_at, msg, - ctx, + TransportSpecificContext::NonUdp(ctx), (), ); - let svc = self.service.clone(); - let result_q_tx = self.result_q_tx.clone(); - let metrics = self.metrics.clone(); - let config = self.config.clone(); - trace!( "Spawning task to handle new message with id {}", request.message().header().id() ); + + let mut dispatcher = self.request_dispatcher.clone(); + let service = self.service.clone(); tokio::spawn(async move { - let request_id = request.message().header().id(); - trace!( - "Calling service for request id {request_id}" - ); - let mut stream = svc.call(request).await; - let mut in_transaction = false; - - trace!("Awaiting service call results for request id {request_id}"); - while let Some(Ok(call_result)) = - stream.next().await - { - trace!("Processing service call result for request id {request_id}"); - let (response, feedback) = - call_result.into_inner(); - - if let Some(feedback) = feedback { - match feedback { - ServiceFeedback::Reconfigure { - idle_timeout, - } => { - if let Some(idle_timeout) = - idle_timeout - { - debug!( - "Reconfigured connection timeout to {idle_timeout:?}" - ); - let guard = config.load(); - let mut new_config = **guard; - new_config.idle_timeout = - idle_timeout; - config.store(Arc::new( - new_config, - )); - } - } - - ServiceFeedback::BeginTransaction => { - in_transaction = true; - } - - ServiceFeedback::EndTransaction => { - in_transaction = false; - } - } - } - - if let Some(mut response) = response { - loop { - match result_q_tx.try_send(response) { - Ok(()) => { - let pending_writes = - result_q_tx - .max_capacity() - - result_q_tx - .capacity(); - trace!("Queued message for sending: # pending writes={pending_writes}"); - metrics - .set_num_pending_writes( - pending_writes, - ); - break; - } - - Err(TrySendError::Closed(_)) => { - error!("Unable to queue message for sending: server is shutting down."); - break; - } - - Err(TrySendError::Full( - unused_response, - )) => { - if in_transaction { - // Wait until there is space in the message queue. - tokio::task::yield_now() - .await; - response = - unused_response; - } else { - error!("Unable to queue message for sending: queue is full."); - break; - } - } - } - } - } - } - trace!("Finished processing service call results for request id {request_id}"); + dispatcher.dispatch(request, service, ()).await }); } } @@ -804,9 +721,10 @@ where impl Drop for Connection where - Buf: BufSource, - Buf::Output: Send + Sync + Unpin, - Svc: Service + Clone, + Buf: BufSource + Clone + Send + Sync, + Buf::Output: Octets + Send + Sync + Unpin, + Svc: Service + Clone + Send + Sync, + Svc::Target: Composer + Default + Send, { fn drop(&mut self) { if self.active { @@ -1075,3 +993,147 @@ impl IdleTimer { self.reset_idle_timer() } } + +//------------ ServiceResponseHandler ----------------------------------------- + +/// Handles responses from the [`Service`] impl. +struct ServiceResponseHandler +where + RequestOctets: Octets + Send + Sync, + Svc: Service + Clone + Send + Sync + 'static, + Svc::Target: Composer + Default + Send, +{ + /// User supplied settings that influence our behaviour. + /// + /// May updated during request and response processing based on received + /// [`ServiceFeedback`]. + config: Arc>, + + /// The writer for pushing ready responses onto the queue waiting + /// to be written back the client. + result_q_tx: mpsc::Sender>>, + + /// [`ServerMetrics`] describing the status of the server. + metrics: Arc, + + /// The status of the service invoker. + status: InvokerStatus, +} + +impl ServiceResponseHandler +where + RequestOctets: Octets + Send + Sync, + Svc: Service + Clone + Send + Sync, + Svc::Target: Composer + Default + Send, +{ + /// Creates a new instance of the service response handler. + fn new( + config: Arc>, + result_q_tx: mpsc::Sender< + AdditionalBuilder>, + >, + metrics: Arc, + ) -> Self { + Self { + config, + result_q_tx, + metrics, + status: InvokerStatus::Normal, + } + } + + /// Apply changes to our configuration as requested by the [`Service`] + /// impl. + fn update_config(&self, idle_timeout: Option) { + if let Some(idle_timeout) = idle_timeout { + debug!("Reconfigured connection timeout to {idle_timeout:?}"); + let guard = self.config.load(); + let mut new_config = **guard; + new_config.idle_timeout = idle_timeout; + self.config.store(Arc::new(new_config)); + } + } + + /// Enqueue a response from the [`Service`] impl for writing back to the + /// client. + async fn do_enqueue_response( + &self, + mut response: AdditionalBuilder>, + ) { + loop { + match self.result_q_tx.try_send(response) { + Ok(()) => { + let pending_writes = self.result_q_tx.max_capacity() + - self.result_q_tx.capacity(); + trace!("Queued message for sending: # pending writes={pending_writes}"); + self.metrics.set_num_pending_writes(pending_writes); + break; + } + + Err(TrySendError::Closed(_)) => { + error!("Unable to queue message for sending: server is shutting down."); + break; + } + + Err(TrySendError::Full(unused_response)) => { + if matches!(self.status, InvokerStatus::InTransaction) { + // Wait until there is space in the message queue. + tokio::task::yield_now().await; + response = unused_response; + } else { + error!("Unable to queue message for sending: queue is full."); + break; + } + } + } + } + } +} + +//--- Clone + +impl Clone for ServiceResponseHandler +where + RequestOctets: Octets + Send + Sync, + Svc: Service + Clone + Send + Sync + 'static, + Svc::Target: Composer + Default + Send, +{ + fn clone(&self) -> Self { + Self { + config: self.config.clone(), + result_q_tx: self.result_q_tx.clone(), + metrics: self.metrics.clone(), + status: InvokerStatus::Normal, + } + } +} + +//--- ServiceInvoker + +impl ServiceInvoker + for ServiceResponseHandler +where + RequestOctets: Octets + Send + Sync, + Svc: Service + Clone + Send + Sync, + Svc::Target: Composer + Default + Send, +{ + fn status(&self) -> InvokerStatus { + self.status + } + + fn set_status(&mut self, status: InvokerStatus) { + self.status = status; + } + + fn reconfigure(&self, idle_timeout: Option) { + self.update_config(idle_timeout); + } + + async fn enqueue_response( + &self, + response: AdditionalBuilder>, + _meta: &(), + ) { + self.do_enqueue_response(response).await + } +} diff --git a/src/net/server/dgram.rs b/src/net/server/dgram.rs index 752b8f4ca..c718a1678 100644 --- a/src/net/server/dgram.rs +++ b/src/net/server/dgram.rs @@ -21,7 +21,6 @@ use std::string::ToString; use std::sync::{Arc, Mutex}; use arc_swap::ArcSwap; -use futures::prelude::stream::StreamExt; use octseq::Octets; use tokio::io::ReadBuf; use tokio::net::UdpSocket; @@ -34,18 +33,20 @@ use tracing::warn; use tracing::Level; use tracing::{enabled, error, trace}; +use crate::base::message_builder::AdditionalBuilder; use crate::base::wire::Composer; -use crate::base::Message; +use crate::base::{Message, StreamTarget}; use crate::net::server::buf::BufSource; use crate::net::server::error::Error; use crate::net::server::message::Request; use crate::net::server::metrics::ServerMetrics; -use crate::net::server::service::{Service, ServiceFeedback}; +use crate::net::server::service::Service; use crate::net::server::sock::AsyncDgramSock; use crate::net::server::util::to_pcap_text; use crate::utils::config::DefMinMax; use super::buf::VecBufSource; +use super::dispatcher::{InvokerStatus, ServiceInvoker}; use super::message::{TransportSpecificContext, UdpTransportContext}; use super::ServerCommand; @@ -252,15 +253,11 @@ pub struct DgramServer where Sock: AsyncDgramSock + Send + Sync + 'static, Buf: BufSource + Send + Sync, - ::Output: Octets + Send + Sync + Unpin + 'static, - Svc: Clone - + Service<::Output, ()> - + Send - + Sync - + 'static, - ::Output, ()>>::Future: Send, - ::Output, ()>>::Stream: Send, - ::Output, ()>>::Target: Composer + Send, + Buf::Output: Octets + Send + Sync + Unpin + 'static, + Svc: Service + Clone + Send + Sync + 'static, + Svc::Future: Send, + Svc::Stream: Send, + Svc::Target: Composer + Default + Send, { /// The configuration of the server. config: Arc>, @@ -288,6 +285,9 @@ where /// [`ServerMetrics`] describing the status of the server. metrics: Arc, + + /// Dispatches requests to the service and enqueues responses for sending. + request_dispatcher: ServiceResponseHandler, } /// Creation @@ -296,11 +296,11 @@ impl DgramServer where Sock: AsyncDgramSock + Send + Sync, Buf: BufSource + Send + Sync, - ::Output: Octets + Send + Sync + Unpin, - Svc: Clone + Service<::Output, ()> + Send + Sync, - ::Output, ()>>::Future: Send, - ::Output, ()>>::Stream: Send, - ::Output, ()>>::Target: Composer + Send, + Buf::Output: Octets + Send + Sync + Unpin, + Svc: Service + Clone + Send + Sync + 'static, + Svc::Future: Send, + Svc::Stream: Send, + Svc::Target: Composer + Default + Send, { /// Constructs a new [`DgramServer`] with default configuration. /// @@ -334,15 +334,23 @@ where let command_tx = Arc::new(Mutex::new(command_tx)); let metrics = Arc::new(ServerMetrics::connection_less()); let config = Arc::new(ArcSwap::from_pointee(config)); + let sock = Arc::new(sock); + + let request_dispatcher = ServiceResponseHandler::new( + config.clone(), + sock.clone(), + metrics.clone(), + ); DgramServer { config, command_tx, command_rx, - sock: sock.into(), + sock, buf, service, metrics, + request_dispatcher, } } } @@ -353,11 +361,11 @@ impl DgramServer where Sock: AsyncDgramSock + Send + Sync, Buf: BufSource + Send + Sync, - ::Output: Octets + Send + Sync + Unpin, - Svc: Clone + Service<::Output, ()> + Send + Sync, - ::Output, ()>>::Future: Send, - ::Output, ()>>::Stream: Send, - ::Output, ()>>::Target: Composer + Send, + Buf::Output: Octets + Send + Sync + Unpin, + Svc: Service + Clone + Send + Sync + 'static, + Svc::Future: Send, + Svc::Stream: Send, + Svc::Target: Composer + Default + Send, { /// Get a reference to the network source being used to receive messages. #[must_use] @@ -378,15 +386,11 @@ impl DgramServer where Sock: AsyncDgramSock + Send + Sync + 'static, Buf: BufSource + Send + Sync, - ::Output: Octets + Send + Sync + 'static + Unpin, - Svc: Clone - + Service<::Output, ()> - + Send - + Sync - + 'static, - ::Output, ()>>::Future: Send, - ::Output, ()>>::Stream: Send, - ::Output, ()>>::Target: Composer + Send, + Buf::Output: Octets + Send + Sync + Unpin + 'static, + Svc: Service + Clone + Send + Sync + 'static, + Svc::Future: Send, + Svc::Stream: Send, + Svc::Target: Composer + Default + Send, { /// Start the server. /// @@ -466,11 +470,11 @@ impl DgramServer where Sock: AsyncDgramSock + Send + Sync, Buf: BufSource + Send + Sync, - ::Output: Octets + Send + Sync + Unpin, - Svc: Clone + Service<::Output, ()> + Send + Sync, - ::Output, ()>>::Future: Send, - ::Output, ()>>::Stream: Send, - ::Output, ()>>::Target: Composer + Send, + Buf::Output: Octets + Send + Sync + Unpin + 'static, + Svc: Service + Clone + Send + Sync + 'static, + Svc::Future: Send, + Svc::Stream: Send, + Svc::Target: Composer + Default + Send, { /// Receive incoming messages until shutdown or fatal error. async fn run_until_error(&self) -> Result<(), String> { @@ -502,74 +506,28 @@ where trace!(%addr, pcap_text, "Received message"); } - let svc = self.service.clone(); - let cfg = self.config.clone(); - let metrics = self.metrics.clone(); - let cloned_sock = self.sock.clone(); - let write_timeout = self.config.load().write_timeout; - - tokio::spawn(async move { - match Message::from_octets(buf) { - Err(err) => { - tracing::warn!("Failed while parsing request message: {err}"); - } - - Ok(msg) => { - let ctx = UdpTransportContext::new(cfg.load().max_response_size); - let ctx = TransportSpecificContext::Udp(ctx); - let request = Request::new(addr, received_at, msg, ctx, ()); - let mut stream = svc.call(request).await; - while let Some(Ok(call_result)) = stream.next().await { - let (response, feedback) = call_result.into_inner(); - - if let Some(feedback) = feedback { - match feedback { - ServiceFeedback::Reconfigure { - idle_timeout: _, // N/A - only applies to connection-oriented transports - } => { - // Nothing to do. - } - - ServiceFeedback::BeginTransaction|ServiceFeedback::EndTransaction => { - // Nothing to do. - } - } - } - - // Process the DNS response message, if any. - if let Some(response) = response { - // Convert the DNS response message into bytes. - let target = response.finish(); - let bytes = target.as_dgram_slice(); - - // Logging - if enabled!(Level::TRACE) { - let pcap_text = to_pcap_text(bytes, bytes.len()); - trace!(%addr, pcap_text, "Sending response"); - } - - metrics.inc_num_pending_writes(); - - // Actually write the DNS response message bytes to the UDP - // socket. - if let Err(err) = Self::send_to( - &cloned_sock, - bytes, - &addr, - write_timeout, - ) - .await - { - warn!(%addr, "Failed to send response: {err}"); - } - - metrics.dec_num_pending_writes(); - metrics.inc_num_sent_responses(); - } - } - } + match Message::from_octets(buf) { + Err(err) => { + tracing::warn!("Failed while parsing request message: {err}"); } - }); + + Ok(msg) => { + let ctx = UdpTransportContext::new(self.config.load().max_response_size); + let ctx = TransportSpecificContext::Udp(ctx); + let request = Request::new(addr, received_at, msg, ctx, ()); + + trace!( + "Spawning task to handle new message with id {}", + request.message().header().id() + ); + + let mut dispatcher = self.request_dispatcher.clone(); + let service = self.service.clone(); + tokio::spawn(async move { + dispatcher.dispatch(request, service, addr).await + }); + } + } } } } @@ -636,9 +594,101 @@ where .try_recv_buf_from(&mut buf) .map(|(bytes_read, addr)| (msg, addr, bytes_read)) } +} + +//--- Drop + +impl Drop for DgramServer +where + Sock: AsyncDgramSock + Send + Sync + 'static, + Buf: BufSource + Send + Sync, + Buf::Output: Octets + Send + Sync + Unpin + 'static, + Svc: Service + Clone + Send + Sync + 'static, + Svc::Future: Send, + Svc::Stream: Send, + Svc::Target: Composer + Default + Send, +{ + fn drop(&mut self) { + // Shutdown the DgramServer. Don't handle the failure case here as + // I'm not sure if it's safe to log or write to stderr from a Drop + // impl. + let _ = self.shutdown(); + } +} + +//------------ ServiceResponseHandler ----------------------------------------- + +/// Handles responses from the [`Service`] impl. +struct ServiceResponseHandler { + /// User supplied settings that influence our behaviour. + /// + /// May updated during request and response processing based on received + /// [`ServiceFeedback`]. + config: Arc>, + + /// The network socket to which responses will be sent. + sock: Arc, + + /// [`ServerMetrics`] describing the status of the server. + metrics: Arc, + + /// The status of the service invoker. + status: InvokerStatus, +} + +impl ServiceResponseHandler +where + Sock: AsyncDgramSock + Send + Sync + 'static, +{ + /// Creates a new instance of the service response handler. + fn new( + config: Arc>, + sock: Arc, + metrics: Arc, + ) -> Self { + Self { + config, + sock, + metrics, + status: InvokerStatus::Normal, + } + } + + /// Send a response from the [`Service`] impl to the client. + async fn send_response( + &self, + addr: SocketAddr, + response: AdditionalBuilder>, + ) { + // Convert the DNS response message into bytes. + let target = response.finish(); + let bytes = target.as_dgram_slice(); + + // Logging + if enabled!(Level::TRACE) { + let pcap_text = to_pcap_text(bytes, bytes.len()); + trace!(%addr, pcap_text, "Sending {} bytes of response tp {addr}", bytes.len()); + } + + self.metrics.inc_num_pending_writes(); + + let write_timeout = self.config.load().write_timeout; + + // Actually write the DNS response message bytes to the UDP + // socket. + if let Err(err) = + Self::write_to_network(&self.sock, bytes, &addr, write_timeout) + .await + { + warn!(%addr, "Failed to send response: {err}"); + } + + self.metrics.dec_num_pending_writes(); + self.metrics.inc_num_sent_responses(); + } /// Send a single datagram using the user supplied network socket. - async fn send_to( + async fn write_to_network( sock: &Sock, data: &[u8], dest: &SocketAddr, @@ -662,26 +712,46 @@ where } } -//--- Drop +//--- Clone -impl Drop for DgramServer +impl Clone for ServiceResponseHandler { + fn clone(&self) -> Self { + Self { + config: self.config.clone(), + sock: self.sock.clone(), + metrics: self.metrics.clone(), + status: InvokerStatus::Normal, + } + } +} + +//--- ServiceInvoker + +impl ServiceInvoker + for ServiceResponseHandler where Sock: AsyncDgramSock + Send + Sync + 'static, - Buf: BufSource + Send + Sync, - ::Output: Octets + Send + Sync + Unpin + 'static, - Svc: Clone - + Service<::Output, ()> - + Send - + Sync - + 'static, - ::Output, ()>>::Future: Send, - ::Output, ()>>::Stream: Send, - ::Output, ()>>::Target: Composer + Send, + RequestOctets: Octets + Send + Sync, + Svc: Service + Clone + Send + Sync, + Svc::Target: Composer + Default + Send, { - fn drop(&mut self) { - // Shutdown the DgramServer. Don't handle the failure case here as - // I'm not sure if it's safe to log or write to stderr from a Drop - // impl. - let _ = self.shutdown(); + fn status(&self) -> InvokerStatus { + self.status + } + + fn set_status(&mut self, status: InvokerStatus) { + self.status = status; + } + + fn reconfigure(&self, _idle_timeout: Option) { + // N/A + } + + async fn enqueue_response( + &self, + response: AdditionalBuilder>, + addr: &SocketAddr, + ) { + self.send_response(*addr, response).await } } diff --git a/src/net/server/dispatcher.rs b/src/net/server/dispatcher.rs new file mode 100644 index 000000000..f84b60017 --- /dev/null +++ b/src/net/server/dispatcher.rs @@ -0,0 +1,174 @@ +/// Common service invoking logic for network servers. +/// +/// Used by [`stream::Connection`][net::server::stream::Connection] and +/// [`dgram::Dgram`][net::server::dgram::Dgram]. +use core::clone::Clone; +use core::default::Default; +use core::time::Duration; + +use futures_util::StreamExt; +use octseq::Octets; +use tracing::trace; + +use crate::base::message_builder::AdditionalBuilder; +use crate::base::wire::Composer; +use crate::base::{Message, StreamTarget}; + +use super::message::Request; +use super::service::{Service, ServiceFeedback, ServiceResult}; +use super::util::mk_error_response; + +//------------ InvokerStatus -------------------------------------------------- + +/// The current status of the service invoker. +#[derive(Copy, Clone, Debug, PartialEq, Eq)] +pub enum InvokerStatus { + /// Processing independent responses. + Normal, + + /// Processing related responses. + InTransaction, + + /// No more responses to the current request will be processed. + Aborting, +} + +//------------ ServiceInvoker ------------------------------------------------- + +/// Dispatch requests to a [`Service`] and do common response processing. +/// +/// Response streams will be split into individual responses and passed to the +/// trait implementer for writing back to the network. +/// +/// If the [`Service`] impl returns a [`ServiceError`] a corresponding DNS +/// error response will be created and no further responses from the service +/// for the current request will be processed and the service response stream +/// will be dropped. +/// +/// Also handles [`ServiceFeedback`] by invoking fn impls on the trait +/// implementing type. +#[allow(async_fn_in_trait)] +pub trait ServiceInvoker +where + Svc: Service, + Svc::Target: Composer + Default, + RequestOctets: Octets + Send + Sync, +{ + /// Dispatch a request and process the responses. + /// + /// Dispatches the given request to the given [`Service`] impl and + /// processes the stream of resulting responses, passing them to the trait + /// impl'd [`enqueue_response`] function with the provided metadata for + /// writing back to the network. until no more responses exist or the + /// trait impl'd [`status`] function reports that the state is + /// [`InvokerStatus::Aborting`]. + /// + /// On [`ServiceFeedback::Reconfigure`] passes the new configuration data + /// to the trait impl'd [`reconfugure`] function. + async fn dispatch( + &mut self, + request: Request, + svc: Svc, + enqueue_meta: EnqueueMeta, + ) { + let req_msg = request.message().clone(); + let request_id = request.message().header().id(); + + // Dispatch the request to the service for processing. + trace!("Calling service for request id {request_id}"); + let mut stream = svc.call(request).await; + + // Handle the resulting stream of responses, most likely just one as + // only XFR requests potentially result in multiple responses. + trace!("Awaiting service call results for request id {request_id}"); + while let Some(item) = stream.next().await { + trace!( + "Processing service call result for request id {request_id}" + ); + + let response = self.process_response_stream_item(item, &req_msg); + + if let Some(response) = response { + self.enqueue_response(response, &enqueue_meta).await; + } + + if matches!(self.status(), InvokerStatus::Aborting) { + trace!("Aborting response stream processing for request id {request_id}"); + break; + } + } + trace!("Finished processing service call results for request id {request_id}"); + } + + /// Processing a single response stream item. + /// + /// Calls [`process_feedback`] if necessary. Extracts any response for + /// further processing by the caller. + /// + /// On [`ServiceError`] calls the trait impl'd [`set_status`] function + /// with `InvokerStatus::Aborting` and returns a generated error response + /// instead of the response from the service. + fn process_response_stream_item( + &mut self, + stream_item: ServiceResult, + req_msg: &Message, + ) -> Option>> { + match stream_item { + Ok(call_result) => { + let (response, feedback) = call_result.into_inner(); + if let Some(feedback) = feedback { + self.process_feedback(feedback); + } + response + } + + Err(err) => { + self.set_status(InvokerStatus::Aborting); + Some(mk_error_response(req_msg, err.rcode().into())) + } + } + } + + //// Acts on [`ServiceFeedback`] received from the [`Service`]. + /// + /// Calls the trait impl'd [`reconfigure`] on + /// [`ServiceFeedback::Reconfigure`]. + /// + /// Calls the trait impl'd [`set_status`] on + /// [`ServiceFeedback::BeginTransaction`] with + /// [`InvokerStatus::InTransaction`]. + /// + /// Calls the trait impl'd [`set_status`] on + /// [`ServiceFeedback::EndTransaction`] with [`InvokerStatus::Normal`]. + fn process_feedback(&mut self, feedback: ServiceFeedback) { + match feedback { + ServiceFeedback::Reconfigure { idle_timeout } => { + self.reconfigure(idle_timeout); + } + + ServiceFeedback::BeginTransaction => { + self.set_status(InvokerStatus::InTransaction); + } + + ServiceFeedback::EndTransaction => { + self.set_status(InvokerStatus::Normal); + } + } + } + + /// Returns the current status of the service invoker. + fn status(&self) -> InvokerStatus; + + /// Sets the status of the service invoker to the given status. + fn set_status(&mut self, status: InvokerStatus); + + /// Reconfigures the network server with new settings. + fn reconfigure(&self, idle_timeout: Option); + + /// Enqueues a response for writing back to the client. + async fn enqueue_response( + &self, + response: AdditionalBuilder>, + meta: &EnqueueMeta, + ); +} diff --git a/src/net/server/mod.rs b/src/net/server/mod.rs index ab7a3b7dc..341b2ba5c 100644 --- a/src/net/server/mod.rs +++ b/src/net/server/mod.rs @@ -182,6 +182,7 @@ pub use connection::Config as ConnectionConfig; pub mod buf; pub mod dgram; +pub mod dispatcher; pub mod error; pub mod message; pub mod metrics; From 890c4c8f77d35528f56d817e311b6c8ddffe2170 Mon Sep 17 00:00:00 2001 From: Ximon Eighteen <3304436+ximon18@users.noreply.github.com> Date: Fri, 13 Sep 2024 21:59:11 +0200 Subject: [PATCH 3/6] Rename module from dispatcher to invoker. --- src/net/server/connection.rs | 2 +- src/net/server/dgram.rs | 2 +- src/net/server/{dispatcher.rs => invoker.rs} | 0 src/net/server/mod.rs | 2 +- 4 files changed, 3 insertions(+), 3 deletions(-) rename src/net/server/{dispatcher.rs => invoker.rs} (100%) diff --git a/src/net/server/connection.rs b/src/net/server/connection.rs index 7019b6daa..a2af0fd7a 100644 --- a/src/net/server/connection.rs +++ b/src/net/server/connection.rs @@ -29,7 +29,7 @@ use crate::net::server::service::{Service, ServiceError}; use crate::net::server::util::to_pcap_text; use crate::utils::config::DefMinMax; -use super::dispatcher::{InvokerStatus, ServiceInvoker}; +use super::invoker::{InvokerStatus, ServiceInvoker}; use super::message::{NonUdpTransportContext, TransportSpecificContext}; use super::stream::Config as ServerConfig; use super::ServerCommand; diff --git a/src/net/server/dgram.rs b/src/net/server/dgram.rs index c718a1678..8231b43ae 100644 --- a/src/net/server/dgram.rs +++ b/src/net/server/dgram.rs @@ -46,7 +46,7 @@ use crate::net::server::util::to_pcap_text; use crate::utils::config::DefMinMax; use super::buf::VecBufSource; -use super::dispatcher::{InvokerStatus, ServiceInvoker}; +use super::invoker::{InvokerStatus, ServiceInvoker}; use super::message::{TransportSpecificContext, UdpTransportContext}; use super::ServerCommand; diff --git a/src/net/server/dispatcher.rs b/src/net/server/invoker.rs similarity index 100% rename from src/net/server/dispatcher.rs rename to src/net/server/invoker.rs diff --git a/src/net/server/mod.rs b/src/net/server/mod.rs index 341b2ba5c..e1cffa7c0 100644 --- a/src/net/server/mod.rs +++ b/src/net/server/mod.rs @@ -182,7 +182,7 @@ pub use connection::Config as ConnectionConfig; pub mod buf; pub mod dgram; -pub mod dispatcher; +pub mod invoker; pub mod error; pub mod message; pub mod metrics; From f53310c3e91f0d8327625ddd25d563bc8e2128bd Mon Sep 17 00:00:00 2001 From: Ximon Eighteen <3304436+ximon18@users.noreply.github.com> Date: Fri, 13 Sep 2024 22:25:11 +0200 Subject: [PATCH 4/6] Fix compilation to be copatible with `cargo +nightly update -Z minimal-versions`. --- src/net/server/connection.rs | 11 ++++-- src/net/server/dgram.rs | 17 +++++---- src/net/server/invoker.rs | 74 +++++++++++++++++++++--------------- src/net/server/mod.rs | 2 +- 4 files changed, 62 insertions(+), 42 deletions(-) diff --git a/src/net/server/connection.rs b/src/net/server/connection.rs index a2af0fd7a..a9b37f38d 100644 --- a/src/net/server/connection.rs +++ b/src/net/server/connection.rs @@ -1,7 +1,10 @@ //! Support for stream based connections. +use core::future::Future; use core::ops::{ControlFlow, Deref}; +use core::pin::Pin; use core::time::Duration; +use std::boxed::Box; use std::fmt::Display; use std::io; use std::net::SocketAddr; @@ -1113,7 +1116,7 @@ where impl ServiceInvoker for ServiceResponseHandler where - RequestOctets: Octets + Send + Sync, + RequestOctets: Octets + Send + Sync + 'static, Svc: Service + Clone + Send + Sync, Svc::Target: Composer + Default + Send, { @@ -1129,11 +1132,11 @@ where self.update_config(idle_timeout); } - async fn enqueue_response( + fn enqueue_response( &self, response: AdditionalBuilder>, _meta: &(), - ) { - self.do_enqueue_response(response).await + ) -> Pin + Send + '_>> { + Box::pin(async move { self.do_enqueue_response(response).await }) } } diff --git a/src/net/server/dgram.rs b/src/net/server/dgram.rs index 8231b43ae..0183eb837 100644 --- a/src/net/server/dgram.rs +++ b/src/net/server/dgram.rs @@ -11,9 +11,12 @@ //! [Datagram]: https://en.wikipedia.org/wiki/Datagram use core::fmt::Debug; use core::future::poll_fn; +use core::future::Future; use core::ops::Deref; +use core::pin::Pin; use core::time::Duration; +use std::boxed::Box; use std::io; use std::net::SocketAddr; use std::string::String; @@ -731,8 +734,8 @@ impl ServiceInvoker for ServiceResponseHandler where Sock: AsyncDgramSock + Send + Sync + 'static, - RequestOctets: Octets + Send + Sync, - Svc: Service + Clone + Send + Sync, + RequestOctets: Octets + Send + Sync + 'static, + Svc: Service + Clone + Send + Sync + 'static, Svc::Target: Composer + Default + Send, { fn status(&self) -> InvokerStatus { @@ -747,11 +750,11 @@ where // N/A } - async fn enqueue_response( - &self, + fn enqueue_response<'a>( + &'a self, response: AdditionalBuilder>, - addr: &SocketAddr, - ) { - self.send_response(*addr, response).await + addr: &'a SocketAddr, + ) -> Pin + Send + '_>> { + Box::pin(async move { self.send_response(*addr, response).await }) } } diff --git a/src/net/server/invoker.rs b/src/net/server/invoker.rs index f84b60017..b5f2617f7 100644 --- a/src/net/server/invoker.rs +++ b/src/net/server/invoker.rs @@ -4,7 +4,10 @@ /// [`dgram::Dgram`][net::server::dgram::Dgram]. use core::clone::Clone; use core::default::Default; +use core::future::Future; +use core::pin::Pin; use core::time::Duration; +use std::boxed::Box; use futures_util::StreamExt; use octseq::Octets; @@ -47,12 +50,12 @@ pub enum InvokerStatus { /// /// Also handles [`ServiceFeedback`] by invoking fn impls on the trait /// implementing type. -#[allow(async_fn_in_trait)] pub trait ServiceInvoker where - Svc: Service, + Svc: Service + Send + Sync + 'static, Svc::Target: Composer + Default, - RequestOctets: Octets + Send + Sync, + RequestOctets: Octets + Send + Sync + 'static, + EnqueueMeta: Send + Sync + 'static, { /// Dispatch a request and process the responses. /// @@ -65,39 +68,50 @@ where /// /// On [`ServiceFeedback::Reconfigure`] passes the new configuration data /// to the trait impl'd [`reconfugure`] function. - async fn dispatch( + fn dispatch( &mut self, request: Request, svc: Svc, enqueue_meta: EnqueueMeta, - ) { - let req_msg = request.message().clone(); - let request_id = request.message().header().id(); - - // Dispatch the request to the service for processing. - trace!("Calling service for request id {request_id}"); - let mut stream = svc.call(request).await; - - // Handle the resulting stream of responses, most likely just one as - // only XFR requests potentially result in multiple responses. - trace!("Awaiting service call results for request id {request_id}"); - while let Some(item) = stream.next().await { + ) -> Pin + Send + '_>> + where + Self: Send + Sync, + Svc::Target: Send, + Svc::Stream: Send, + Svc::Future: Send, + { + Box::pin(async move { + let req_msg = request.message().clone(); + let request_id = request.message().header().id(); + + // Dispatch the request to the service for processing. + trace!("Calling service for request id {request_id}"); + let mut stream = svc.call(request).await; + + // Handle the resulting stream of responses, most likely just one as + // only XFR requests potentially result in multiple responses. trace!( - "Processing service call result for request id {request_id}" + "Awaiting service call results for request id {request_id}" ); + while let Some(item) = stream.next().await { + trace!( + "Processing service call result for request id {request_id}" + ); - let response = self.process_response_stream_item(item, &req_msg); + let response = + self.process_response_stream_item(item, &req_msg); - if let Some(response) = response { - self.enqueue_response(response, &enqueue_meta).await; - } + if let Some(response) = response { + self.enqueue_response(response, &enqueue_meta).await; + } - if matches!(self.status(), InvokerStatus::Aborting) { - trace!("Aborting response stream processing for request id {request_id}"); - break; + if matches!(self.status(), InvokerStatus::Aborting) { + trace!("Aborting response stream processing for request id {request_id}"); + break; + } } - } - trace!("Finished processing service call results for request id {request_id}"); + trace!("Finished processing service call results for request id {request_id}"); + }) } /// Processing a single response stream item. @@ -166,9 +180,9 @@ where fn reconfigure(&self, idle_timeout: Option); /// Enqueues a response for writing back to the client. - async fn enqueue_response( - &self, + fn enqueue_response<'a>( + &'a self, response: AdditionalBuilder>, - meta: &EnqueueMeta, - ); + meta: &'a EnqueueMeta, + ) -> Pin + Send + '_>>; } diff --git a/src/net/server/mod.rs b/src/net/server/mod.rs index e1cffa7c0..5cb615f63 100644 --- a/src/net/server/mod.rs +++ b/src/net/server/mod.rs @@ -182,8 +182,8 @@ pub use connection::Config as ConnectionConfig; pub mod buf; pub mod dgram; -pub mod invoker; pub mod error; +pub mod invoker; pub mod message; pub mod metrics; pub mod middleware; From 7e955ce24d7e008a75a030d85f8c1c291ec0d839 Mon Sep 17 00:00:00 2001 From: Ximon Eighteen <3304436+ximon18@users.noreply.github.com> Date: Fri, 13 Sep 2024 22:29:18 +0200 Subject: [PATCH 5/6] Compilation fix for elided named lifetime. --- src/net/server/dgram.rs | 2 +- src/net/server/invoker.rs | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/src/net/server/dgram.rs b/src/net/server/dgram.rs index 0183eb837..3af4786a6 100644 --- a/src/net/server/dgram.rs +++ b/src/net/server/dgram.rs @@ -754,7 +754,7 @@ where &'a self, response: AdditionalBuilder>, addr: &'a SocketAddr, - ) -> Pin + Send + '_>> { + ) -> Pin + Send + 'a>> { Box::pin(async move { self.send_response(*addr, response).await }) } } diff --git a/src/net/server/invoker.rs b/src/net/server/invoker.rs index b5f2617f7..db6e706ba 100644 --- a/src/net/server/invoker.rs +++ b/src/net/server/invoker.rs @@ -184,5 +184,5 @@ where &'a self, response: AdditionalBuilder>, meta: &'a EnqueueMeta, - ) -> Pin + Send + '_>>; + ) -> Pin + Send + 'a>>; } From 183ae91fe7a450d8b829956554692d6f901dd530 Mon Sep 17 00:00:00 2001 From: Ximon Eighteen <3304436+ximon18@users.noreply.github.com> Date: Thu, 3 Oct 2024 15:00:50 +0200 Subject: [PATCH 6/6] Added a comment. --- src/net/server/connection.rs | 3 +++ 1 file changed, 3 insertions(+) diff --git a/src/net/server/connection.rs b/src/net/server/connection.rs index a35acf23d..06f8b61a0 100644 --- a/src/net/server/connection.rs +++ b/src/net/server/connection.rs @@ -680,6 +680,9 @@ where tracing::warn!( "Failed while parsing request message: {err}" ); + // Consider the client to be a threat to us if it is + // sending garbage that we can't parse: disconnect it + // immediately. return Err(ConnectionEvent::DisconnectWithoutFlush); }