Skip to content

Commit

Permalink
feat: add configuration for idle timeout and max idle connections (#1121
Browse files Browse the repository at this point in the history
)
  • Loading branch information
nadin-Starkware authored Oct 6, 2024
1 parent 54765ac commit 1b5af9e
Show file tree
Hide file tree
Showing 4 changed files with 122 additions and 13 deletions.
65 changes: 65 additions & 0 deletions config/mempool/default_config.json
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,19 @@
"privacy": "TemporaryValue",
"value": true
},
"components.batcher.remote_config.idle_connections": {
"description": "The maximum number of idle connections to keep alive.",
"privacy": "Public",
"value": 18446744073709551615
},
"components.batcher.remote_config.idle_timeout": {
"description": "The duration to keep an idle connection open before closing.",
"privacy": "Public",
"value": {
"nanos": 0,
"secs": 90
}
},
"components.batcher.remote_config.retries": {
"description": "The max number of retries for sending a message.",
"privacy": "Public",
Expand Down Expand Up @@ -119,6 +132,19 @@
"privacy": "TemporaryValue",
"value": true
},
"components.consensus_manager.remote_config.idle_connections": {
"description": "The maximum number of idle connections to keep alive.",
"privacy": "Public",
"value": 18446744073709551615
},
"components.consensus_manager.remote_config.idle_timeout": {
"description": "The duration to keep an idle connection open before closing.",
"privacy": "Public",
"value": {
"nanos": 0,
"secs": 90
}
},
"components.consensus_manager.remote_config.retries": {
"description": "The max number of retries for sending a message.",
"privacy": "Public",
Expand Down Expand Up @@ -154,6 +180,19 @@
"privacy": "TemporaryValue",
"value": true
},
"components.gateway.remote_config.idle_connections": {
"description": "The maximum number of idle connections to keep alive.",
"privacy": "Public",
"value": 18446744073709551615
},
"components.gateway.remote_config.idle_timeout": {
"description": "The duration to keep an idle connection open before closing.",
"privacy": "Public",
"value": {
"nanos": 0,
"secs": 90
}
},
"components.gateway.remote_config.retries": {
"description": "The max number of retries for sending a message.",
"privacy": "Public",
Expand Down Expand Up @@ -189,6 +228,19 @@
"privacy": "TemporaryValue",
"value": false
},
"components.http_server.remote_config.idle_connections": {
"description": "The maximum number of idle connections to keep alive.",
"privacy": "Public",
"value": 18446744073709551615
},
"components.http_server.remote_config.idle_timeout": {
"description": "The duration to keep an idle connection open before closing.",
"privacy": "Public",
"value": {
"nanos": 0,
"secs": 90
}
},
"components.http_server.remote_config.retries": {
"description": "The max number of retries for sending a message.",
"privacy": "Public",
Expand Down Expand Up @@ -224,6 +276,19 @@
"privacy": "TemporaryValue",
"value": true
},
"components.mempool.remote_config.idle_connections": {
"description": "The maximum number of idle connections to keep alive.",
"privacy": "Public",
"value": 18446744073709551615
},
"components.mempool.remote_config.idle_timeout": {
"description": "The duration to keep an idle connection open before closing.",
"privacy": "Public",
"value": {
"nanos": 0,
"secs": 90
}
},
"components.mempool.remote_config.retries": {
"description": "The max number of retries for sending a message.",
"privacy": "Public",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,8 @@ use crate::serde_utils::BincodeSerdeWrapper;
/// ```rust
/// // Example usage of the RemoteComponentClient
///
/// use std::time::Duration;
///
/// use serde::{Deserialize, Serialize};
///
/// use crate::starknet_mempool_infra::component_client::RemoteComponentClient;
Expand All @@ -53,7 +55,12 @@ use crate::serde_utils::BincodeSerdeWrapper;
/// let ip_address = std::net::IpAddr::V6(std::net::Ipv6Addr::new(0, 0, 0, 0, 0, 0, 0, 1));
/// let port: u16 = 8080;
/// let socket = std::net::SocketAddr::new(ip_address, port);
/// let config = RemoteComponentCommunicationConfig { socket, retries: 3 };
/// let config = RemoteComponentCommunicationConfig {
/// socket,
/// retries: 3,
/// idle_connections: usize::MAX,
/// idle_timeout: Duration::from_secs(90),
/// };
/// let client = RemoteComponentClient::<MyRequest, MyResponse>::new(config);
///
/// // Instantiate a request.
Expand Down Expand Up @@ -94,8 +101,11 @@ where
};
// TODO(Tsabary): Add a configuration for the maximum number of idle connections.
// TODO(Tsabary): Add a configuration for "keep-alive" time of idle connections.
let client =
Client::builder().http2_only(true).pool_max_idle_per_host(usize::MAX).build_http();
let client = Client::builder()
.http2_only(true)
.pool_max_idle_per_host(config.idle_connections)
.pool_idle_timeout(config.idle_timeout)
.build_http();
Self { uri, client, config, _req: PhantomData, _res: PhantomData }
}

Expand Down
24 changes: 23 additions & 1 deletion crates/mempool_infra/src/component_definitions.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ use std::any::type_name;
use std::collections::BTreeMap;
use std::fmt::Debug;
use std::net::{IpAddr, Ipv4Addr, SocketAddr};
use std::time::Duration;

use async_trait::async_trait;
use papyrus_config::dumping::{ser_param, SerializeConfig};
Expand All @@ -17,6 +18,8 @@ use crate::errors::ComponentError;
pub const APPLICATION_OCTET_STREAM: &str = "application/octet-stream";
const DEFAULT_CHANNEL_BUFFER_SIZE: usize = 32;
const DEFAULT_RETRIES: usize = 3;
const DEFAULT_IDLE_CONNECTIONS: usize = usize::MAX;
const DEFAULT_IDLE_TIMEOUT: Duration = Duration::from_secs(90);

#[async_trait]
pub trait ComponentRequestHandler<Request, Response> {
Expand Down Expand Up @@ -93,6 +96,8 @@ impl Default for LocalComponentCommunicationConfig {
pub struct RemoteComponentCommunicationConfig {
pub socket: SocketAddr,
pub retries: usize,
pub idle_connections: usize,
pub idle_timeout: Duration,
}

impl SerializeConfig for RemoteComponentCommunicationConfig {
Expand All @@ -110,13 +115,30 @@ impl SerializeConfig for RemoteComponentCommunicationConfig {
"The max number of retries for sending a message.",
ParamPrivacyInput::Public,
),
ser_param(
"idle_connections",
&self.idle_connections,
"The maximum number of idle connections to keep alive.",
ParamPrivacyInput::Public,
),
ser_param(
"idle_timeout",
&self.idle_timeout,
"The duration to keep an idle connection open before closing.",
ParamPrivacyInput::Public,
),
])
}
}

impl Default for RemoteComponentCommunicationConfig {
fn default() -> Self {
let socket = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(0, 0, 0, 0)), 8080);
Self { socket, retries: DEFAULT_RETRIES }
Self {
socket,
retries: DEFAULT_RETRIES,
idle_connections: DEFAULT_IDLE_CONNECTIONS,
idle_timeout: DEFAULT_IDLE_TIMEOUT,
}
}
}
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
use std::fmt::Debug;
use std::net::SocketAddr;
use std::sync::Arc;
use std::time::Duration;

use async_trait::async_trait;
use hyper::body::to_bytes;
Expand Down Expand Up @@ -53,7 +54,8 @@ use crate::tests::{
type ComponentAClient = RemoteComponentClient<ComponentARequest, ComponentAResponse>;
type ComponentBClient = RemoteComponentClient<ComponentBRequest, ComponentBResponse>;

const MAX_RETRIES: usize = 0;
const MAX_IDLE_CONNECTION: usize = usize::MAX;
const IDLE_TIMEOUT: Duration = Duration::from_secs(90);
const MOCK_SERVER_ERROR: &str = "mock server error";
const ARBITRARY_DATA: &str = "arbitrary data";
// ServerError::RequestDeserializationFailure error message.
Expand Down Expand Up @@ -140,13 +142,13 @@ where
// Ensure the server starts running.
task::yield_now().await;

let config = RemoteComponentCommunicationConfig { socket, retries: MAX_RETRIES };
let config = RemoteComponentCommunicationConfig { socket, ..Default::default() };
ComponentAClient::new(config)
}

async fn setup_for_tests(setup_value: ValueB, a_socket: SocketAddr, b_socket: SocketAddr) {
let a_config = RemoteComponentCommunicationConfig { socket: a_socket, retries: MAX_RETRIES };
let b_config = RemoteComponentCommunicationConfig { socket: b_socket, retries: MAX_RETRIES };
let a_config = RemoteComponentCommunicationConfig { socket: a_socket, ..Default::default() };
let b_config = RemoteComponentCommunicationConfig { socket: b_socket, ..Default::default() };

let a_remote_client = ComponentAClient::new(a_config);
let b_remote_client = ComponentBClient::new(b_config);
Expand Down Expand Up @@ -196,8 +198,8 @@ async fn test_proper_setup() {
let b_socket = get_available_socket().await;

setup_for_tests(setup_value, a_socket, b_socket).await;
let a_config = RemoteComponentCommunicationConfig { socket: a_socket, retries: MAX_RETRIES };
let b_config = RemoteComponentCommunicationConfig { socket: b_socket, retries: MAX_RETRIES };
let a_config = RemoteComponentCommunicationConfig { socket: a_socket, ..Default::default() };
let b_config = RemoteComponentCommunicationConfig { socket: b_socket, ..Default::default() };

let a_remote_client = ComponentAClient::new(a_config);
let b_remote_client = ComponentBClient::new(b_config);
Expand Down Expand Up @@ -242,7 +244,7 @@ async fn test_faulty_client_setup() {
#[tokio::test]
async fn test_unconnected_server() {
let socket = get_available_socket().await;
let config = RemoteComponentCommunicationConfig { socket, retries: MAX_RETRIES };
let config = RemoteComponentCommunicationConfig { socket, ..Default::default() };
let client = ComponentAClient::new(config);
let expected_error_contained_keywords = ["Connection refused"];
verify_error(client, &expected_error_contained_keywords).await;
Expand Down Expand Up @@ -311,12 +313,22 @@ async fn test_retry_request() {
// The initial server state is 'false', hence the first attempt returns an error and
// sets the server state to 'true'. The second attempt (first retry) therefore returns a
// 'success', while setting the server state to 'false' yet again.
let retry_config = RemoteComponentCommunicationConfig { socket, retries: 1 };
let retry_config = RemoteComponentCommunicationConfig {
socket,
retries: 1,
idle_connections: MAX_IDLE_CONNECTION,
idle_timeout: IDLE_TIMEOUT,
};
let a_client_retry = ComponentAClient::new(retry_config);
assert_eq!(a_client_retry.a_get_value().await.unwrap(), VALID_VALUE_A);

// The current server state is 'false', hence the first and only attempt returns an error.
let no_retry_config = RemoteComponentCommunicationConfig { socket, retries: 0 };
let no_retry_config = RemoteComponentCommunicationConfig {
socket,
retries: 0,
idle_connections: MAX_IDLE_CONNECTION,
idle_timeout: IDLE_TIMEOUT,
};
let a_client_no_retry = ComponentAClient::new(no_retry_config);
let expected_error_contained_keywords = [StatusCode::IM_A_TEAPOT.as_str()];
verify_error(a_client_no_retry.clone(), &expected_error_contained_keywords).await;
Expand Down

0 comments on commit 1b5af9e

Please sign in to comment.