From 3373b3819663bde95b2cd96565072ce94f03fdae Mon Sep 17 00:00:00 2001 From: Charles Beauville Date: Sat, 17 Feb 2024 11:11:50 +0100 Subject: [PATCH 01/22] Add client connection retry handling --- src/py/flwr/client/app.py | 23 ++++++++++++++- src/py/flwr/client/grpc_client/connection.py | 7 +++++ .../client/grpc_rere_client/connection.py | 29 ++++++++++++++++--- src/py/flwr/client/rest_client/connection.py | 28 +++++++++++++++--- 4 files changed, 78 insertions(+), 9 deletions(-) diff --git a/src/py/flwr/client/app.py b/src/py/flwr/client/app.py index 15f7c5057a2..eded55838a6 100644 --- a/src/py/flwr/client/app.py +++ b/src/py/flwr/client/app.py @@ -36,6 +36,9 @@ ) from flwr.common.logger import log, warn_deprecated_feature, warn_experimental_feature from flwr.common.message import Message +from flwr.common.retry_invoker import RetryInvoker, exponential +from grpc import RpcError +from requests.exceptions import ConnectionError from .clientapp import load_client_app from .grpc_client.connection import grpc_connection @@ -272,6 +275,8 @@ def _start_client_internal( root_certificates: Optional[Union[bytes, str]] = None, insecure: Optional[bool] = None, transport: Optional[str] = None, + retry_max_tries: Optional[int] = None, + retry_max_time: Optional[float] = None, ) -> None: """Start a Flower client node which connects to a Flower server. @@ -299,7 +304,7 @@ class `flwr.client.Client` (default: None) The PEM-encoded root certificates as a byte string or a path string. If provided, a secure connection using the certificates will be established to an SSL-enabled Flower server. - insecure : bool (default: True) + insecure : Optional[bool] (default: None) Starts an insecure gRPC connection when True. Enables HTTPS connection when False, using system certificates if `root_certificates` is None. transport : Optional[str] (default: None) @@ -307,6 +312,14 @@ class `flwr.client.Client` (default: None) - 'grpc-bidi': gRPC, bidirectional streaming - 'grpc-rere': gRPC, request-response (experimental) - 'rest': HTTP (experimental) + retry_max_tries: Optional[int] + The maximum number of times the client will try to reconnect to the + server before giving up in case of gRPC error. If set to None, there + is no limit to the number of tries. + retry_max_time: Optional[float] + The maximum total amount of time before the client stops trying to + reconnect to the server before giving up in case of gRPC error. If set + to None, there is no limit to the total time. """ if insecure is None: insecure = root_certificates is None @@ -340,6 +353,13 @@ def _load_client_app() -> ClientApp: # Initialize connection context manager connection, address = _init_connection(transport, server_address) + retry_invoker = RetryInvoker( + exponential, + ConnectionError if transport == "rest" else RpcError, + max_tries=retry_max_tries, + max_time=retry_max_time, + ) + node_state = NodeState() while True: @@ -349,6 +369,7 @@ def _load_client_app() -> ClientApp: insecure, grpc_max_message_length, root_certificates, + retry_invoker, ) as conn: receive, send, create_node, delete_node = conn diff --git a/src/py/flwr/client/grpc_client/connection.py b/src/py/flwr/client/grpc_client/connection.py index e6d21963fcb..4c5d1d65b59 100644 --- a/src/py/flwr/client/grpc_client/connection.py +++ b/src/py/flwr/client/grpc_client/connection.py @@ -36,6 +36,7 @@ from flwr.common.logger import log from flwr.common.message import Message, Metadata from flwr.common.recordset import RecordSet +from flwr.common.retry_invoker import RetryInvoker from flwr.proto.transport_pb2 import ( # pylint: disable=E0611 ClientMessage, Reason, @@ -61,6 +62,7 @@ def grpc_connection( # pylint: disable=R0915 insecure: bool, max_message_length: int = GRPC_MAX_MESSAGE_LENGTH, root_certificates: Optional[Union[bytes, str]] = None, + retry_invoker: Optional[RetryInvoker] = None, # pylint: disable=unused-argument ) -> Iterator[ Tuple[ Callable[[], Optional[Message]], @@ -77,6 +79,9 @@ def grpc_connection( # pylint: disable=R0915 The IPv4 or IPv6 address of the server. If the Flower server runs on the same machine on port 8080, then `server_address` would be `"0.0.0.0:8080"` or `"[::]:8080"`. + insecure : bool + Starts an insecure gRPC connection when True. Enables HTTPS connection + when False, using system certificates if `root_certificates` is None. max_message_length : int The maximum length of gRPC messages that can be exchanged with the Flower server. The default should be sufficient for most models. Users who train @@ -89,6 +94,8 @@ def grpc_connection( # pylint: disable=R0915 The PEM-encoded root certificates as a byte string or a path string. If provided, a secure connection using the certificates will be established to an SSL-enabled Flower server. + retry_invoker: Optional[RetryInvoker] (default: None) + Unused argument present for compatibilty. Returns ------- diff --git a/src/py/flwr/client/grpc_rere_client/connection.py b/src/py/flwr/client/grpc_rere_client/connection.py index 07635d00272..c2cea6d193e 100644 --- a/src/py/flwr/client/grpc_rere_client/connection.py +++ b/src/py/flwr/client/grpc_rere_client/connection.py @@ -20,6 +20,7 @@ from pathlib import Path from typing import Callable, Dict, Iterator, Optional, Tuple, Union, cast +import grpc from flwr.client.message_handler.task_handler import ( configure_task_res, get_task_ins, @@ -30,6 +31,7 @@ from flwr.common.grpc import create_channel from flwr.common.logger import log, warn_experimental_feature from flwr.common.message import Message +from flwr.common.retry_invoker import RetryInvoker, exponential from flwr.common.serde import message_from_taskins, message_to_taskres from flwr.proto.fleet_pb2 import ( # pylint: disable=E0611 CreateNodeRequest, @@ -56,6 +58,7 @@ def grpc_request_response( insecure: bool, max_message_length: int = GRPC_MAX_MESSAGE_LENGTH, # pylint: disable=W0613 root_certificates: Optional[Union[bytes, str]] = None, + retry_invoker: Optional[RetryInvoker] = None, # pylint: disable=unused-argument ) -> Iterator[ Tuple[ Callable[[], Optional[Message]], @@ -75,12 +78,18 @@ def grpc_request_response( The IPv6 address of the server with `http://` or `https://`. If the Flower server runs on the same machine on port 8080, then `server_address` would be `"http://[::]:8080"`. + insecure : bool + Starts an insecure gRPC connection when True. Enables HTTPS connection + when False, using system certificates if `root_certificates` is None. max_message_length : int Ignored, only present to preserve API-compatibility. root_certificates : Optional[Union[bytes, str]] (default: None) Path of the root certificate. If provided, a secure connection using the certificates will be established to an SSL-enabled Flower server. Bytes won't work for the REST API. + retry_invoker: Optional[RetryInvoker] (default: None) + `RetryInvoker` object that will try to reconnect the client to the server + after gRPC errors. Returns ------- @@ -109,6 +118,17 @@ def grpc_request_response( # Enable create_node and delete_node to store node node_store: Dict[str, Optional[Node]] = {KEY_NODE: None} + retry_invoker = ( + RetryInvoker( + exponential, + grpc.RpcError, + max_tries=1, + max_time=None, + ) + if retry_invoker is None + else retry_invoker + ) + ########################################################################### # receive/send functions ########################################################################### @@ -116,7 +136,8 @@ def grpc_request_response( def create_node() -> None: """Set create_node.""" create_node_request = CreateNodeRequest() - create_node_response = stub.CreateNode( + create_node_response = retry_invoker.invoke( + stub.CreateNode, request=create_node_request, ) node_store[KEY_NODE] = create_node_response.node @@ -130,7 +151,7 @@ def delete_node() -> None: node: Node = cast(Node, node_store[KEY_NODE]) delete_node_request = DeleteNodeRequest(node=node) - stub.DeleteNode(request=delete_node_request) + retry_invoker.invoke(stub.DeleteNode, request=delete_node_request) del node_store[KEY_NODE] @@ -144,7 +165,7 @@ def receive() -> Optional[Message]: # Request instructions (task) from server request = PullTaskInsRequest(node=node) - response = stub.PullTaskIns(request=request) + response = retry_invoker.invoke(stub.PullTaskIns, request=request) # Get the current TaskIns task_ins: Optional[TaskIns] = get_task_ins(response) @@ -186,7 +207,7 @@ def send(message: Message) -> None: # Serialize ProtoBuf to bytes request = PushTaskResRequest(task_res_list=[task_res]) - _ = stub.PushTaskRes(request) + _ = retry_invoker.invoke(stub.PushTaskRes, request) state[KEY_TASK_INS] = None diff --git a/src/py/flwr/client/rest_client/connection.py b/src/py/flwr/client/rest_client/connection.py index a5c8ea0957d..9caaf57ecb2 100644 --- a/src/py/flwr/client/rest_client/connection.py +++ b/src/py/flwr/client/rest_client/connection.py @@ -30,6 +30,7 @@ from flwr.common.constant import MISSING_EXTRA_REST from flwr.common.logger import log from flwr.common.message import Message +from flwr.common.retry_invoker import RetryInvoker, exponential from flwr.common.serde import message_from_taskins, message_to_taskres from flwr.proto.fleet_pb2 import ( # pylint: disable=E0611 CreateNodeRequest, @@ -68,6 +69,7 @@ def http_request_response( root_certificates: Optional[ Union[bytes, str] ] = None, # pylint: disable=unused-argument + retry_invoker: Optional[RetryInvoker] = None, # pylint: disable=unused-argument ) -> Iterator[ Tuple[ Callable[[], Optional[Message]], @@ -87,12 +89,16 @@ def http_request_response( The IPv6 address of the server with `http://` or `https://`. If the Flower server runs on the same machine on port 8080, then `server_address` would be `"http://[::]:8080"`. + insecure : bool + Unused argument present for compatibilty. max_message_length : int Ignored, only present to preserve API-compatibility. root_certificates : Optional[Union[bytes, str]] (default: None) Path of the root certificate. If provided, a secure connection using the certificates will be established to an SSL-enabled Flower server. Bytes won't work for the REST API. + retry_invoker: Optional[RetryInvoker] (default: None) + Unused argument present for compatibilty. Returns ------- @@ -128,6 +134,16 @@ def http_request_response( # Enable create_node and delete_node to store node node_store: Dict[str, Optional[Node]] = {KEY_NODE: None} + retry_invoker = ( + RetryInvoker( + exponential, + requests.exceptions.ConnectionError, + max_tries=1, + max_time=None, + ) + if retry_invoker is None + else retry_invoker + ) ########################################################################### # receive/send functions ########################################################################### @@ -137,7 +153,8 @@ def create_node() -> None: create_node_req_proto = CreateNodeRequest() create_node_req_bytes: bytes = create_node_req_proto.SerializeToString() - res = requests.post( + res = retry_invoker.invoke( + requests.post, url=f"{base_url}/{PATH_CREATE_NODE}", headers={ "Accept": "application/protobuf", @@ -180,7 +197,8 @@ def delete_node() -> None: node: Node = cast(Node, node_store[KEY_NODE]) delete_node_req_proto = DeleteNodeRequest(node=node) delete_node_req_req_bytes: bytes = delete_node_req_proto.SerializeToString() - res = requests.post( + res = retry_invoker.invoke( + requests.post, url=f"{base_url}/{PATH_DELETE_NODE}", headers={ "Accept": "application/protobuf", @@ -221,7 +239,8 @@ def receive() -> Optional[Message]: pull_task_ins_req_bytes: bytes = pull_task_ins_req_proto.SerializeToString() # Request instructions (task) from server - res = requests.post( + res = retry_invoker.invoke( + requests.post, url=f"{base_url}/{PATH_PULL_TASK_INS}", headers={ "Accept": "application/protobuf", @@ -303,7 +322,8 @@ def send(message: Message) -> None: ) # Send ClientMessage to server - res = requests.post( + res = retry_invoker.invoke( + requests.post, url=f"{base_url}/{PATH_PUSH_TASK_RES}", headers={ "Accept": "application/protobuf", From 8a6661ad0ac3e3f74c647f6db323beb64f0f95ff Mon Sep 17 00:00:00 2001 From: Charles Beauville Date: Sat, 17 Feb 2024 11:25:27 +0100 Subject: [PATCH 02/22] Format files --- src/py/flwr/client/app.py | 5 +++-- src/py/flwr/client/grpc_rere_client/connection.py | 1 + 2 files changed, 4 insertions(+), 2 deletions(-) diff --git a/src/py/flwr/client/app.py b/src/py/flwr/client/app.py index eded55838a6..3ffa0f7b2fb 100644 --- a/src/py/flwr/client/app.py +++ b/src/py/flwr/client/app.py @@ -22,6 +22,9 @@ from pathlib import Path from typing import Callable, ContextManager, Optional, Tuple, Union +from grpc import RpcError +from requests.exceptions import ConnectionError + from flwr.client.client import Client from flwr.client.clientapp import ClientApp from flwr.client.typing import ClientFn @@ -37,8 +40,6 @@ from flwr.common.logger import log, warn_deprecated_feature, warn_experimental_feature from flwr.common.message import Message from flwr.common.retry_invoker import RetryInvoker, exponential -from grpc import RpcError -from requests.exceptions import ConnectionError from .clientapp import load_client_app from .grpc_client.connection import grpc_connection diff --git a/src/py/flwr/client/grpc_rere_client/connection.py b/src/py/flwr/client/grpc_rere_client/connection.py index c2cea6d193e..b41fb7f4e0c 100644 --- a/src/py/flwr/client/grpc_rere_client/connection.py +++ b/src/py/flwr/client/grpc_rere_client/connection.py @@ -21,6 +21,7 @@ from typing import Callable, Dict, Iterator, Optional, Tuple, Union, cast import grpc + from flwr.client.message_handler.task_handler import ( configure_task_res, get_task_ins, From 397c7f1b77b1468c10ec8816cd1d676825d3805a Mon Sep 17 00:00:00 2001 From: Charles Beauville Date: Sat, 17 Feb 2024 11:41:37 +0100 Subject: [PATCH 03/22] Fix typing --- src/py/flwr/client/app.py | 1 + 1 file changed, 1 insertion(+) diff --git a/src/py/flwr/client/app.py b/src/py/flwr/client/app.py index 3ffa0f7b2fb..471f5bed269 100644 --- a/src/py/flwr/client/app.py +++ b/src/py/flwr/client/app.py @@ -540,6 +540,7 @@ def _init_connection( Callable[[Message], None], Optional[Callable[[], None]], Optional[Callable[[], None]], + Optional[RetryInvoker], ] ], ], From a9bf7fff8a5e10bbd29885aef15d20de79c2ae30 Mon Sep 17 00:00:00 2001 From: Charles Beauville Date: Sun, 18 Feb 2024 17:10:42 +0100 Subject: [PATCH 04/22] Fix type --- src/py/flwr/client/app.py | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/src/py/flwr/client/app.py b/src/py/flwr/client/app.py index 471f5bed269..000e0ea58ab 100644 --- a/src/py/flwr/client/app.py +++ b/src/py/flwr/client/app.py @@ -533,14 +533,13 @@ def _init_connection( transport: Optional[str], server_address: str ) -> Tuple[ Callable[ - [str, bool, int, Union[bytes, str, None]], + [str, bool, int, Union[bytes, str, None], Optional[RetryInvoker]], ContextManager[ Tuple[ Callable[[], Optional[Message]], Callable[[Message], None], Optional[Callable[[], None]], Optional[Callable[[], None]], - Optional[RetryInvoker], ] ], ], From fbc580d582b2d4940b59541b31489c5ac2bfec40 Mon Sep 17 00:00:00 2001 From: Charles Beauville Date: Sun, 18 Feb 2024 17:17:10 +0100 Subject: [PATCH 05/22] Rename connection error --- src/py/flwr/client/app.py | 10 +++++++--- 1 file changed, 7 insertions(+), 3 deletions(-) diff --git a/src/py/flwr/client/app.py b/src/py/flwr/client/app.py index 000e0ea58ab..389ecd1d227 100644 --- a/src/py/flwr/client/app.py +++ b/src/py/flwr/client/app.py @@ -23,12 +23,16 @@ from typing import Callable, ContextManager, Optional, Tuple, Union from grpc import RpcError -from requests.exceptions import ConnectionError +from requests.exceptions import ConnectionError as RequestsConnectionError from flwr.client.client import Client from flwr.client.clientapp import ClientApp from flwr.client.typing import ClientFn -from flwr.common import GRPC_MAX_MESSAGE_LENGTH, EventType, event +from flwr.common import ( + GRPC_MAX_MESSAGE_LENGTH, + EventType, + event as RequestsConnectionError, +) from flwr.common.address import parse_address from flwr.common.constant import ( MISSING_EXTRA_REST, @@ -356,7 +360,7 @@ def _load_client_app() -> ClientApp: retry_invoker = RetryInvoker( exponential, - ConnectionError if transport == "rest" else RpcError, + RequestsConnectionError if transport == "rest" else RpcError, max_tries=retry_max_tries, max_time=retry_max_time, ) From 8ceabe0d8adfdb87ffeb3f8076fcdb4fd65f2e3c Mon Sep 17 00:00:00 2001 From: Charles Beauville Date: Sun, 18 Feb 2024 17:19:01 +0100 Subject: [PATCH 06/22] Fix name error --- src/py/flwr/client/app.py | 6 +----- 1 file changed, 1 insertion(+), 5 deletions(-) diff --git a/src/py/flwr/client/app.py b/src/py/flwr/client/app.py index 389ecd1d227..5570105b4c2 100644 --- a/src/py/flwr/client/app.py +++ b/src/py/flwr/client/app.py @@ -28,11 +28,7 @@ from flwr.client.client import Client from flwr.client.clientapp import ClientApp from flwr.client.typing import ClientFn -from flwr.common import ( - GRPC_MAX_MESSAGE_LENGTH, - EventType, - event as RequestsConnectionError, -) +from flwr.common import GRPC_MAX_MESSAGE_LENGTH, EventType, event from flwr.common.address import parse_address from flwr.common.constant import ( MISSING_EXTRA_REST, From 3f71762dffd77e778d46068948163c92cd94e845 Mon Sep 17 00:00:00 2001 From: Charles Beauville Date: Sun, 18 Feb 2024 17:30:48 +0100 Subject: [PATCH 07/22] Correct docstrings --- src/py/flwr/client/app.py | 8 ++++---- src/py/flwr/client/grpc_rere_client/connection.py | 3 ++- src/py/flwr/client/rest_client/connection.py | 4 +++- 3 files changed, 9 insertions(+), 6 deletions(-) diff --git a/src/py/flwr/client/app.py b/src/py/flwr/client/app.py index 5570105b4c2..91a025490e3 100644 --- a/src/py/flwr/client/app.py +++ b/src/py/flwr/client/app.py @@ -315,12 +315,12 @@ class `flwr.client.Client` (default: None) - 'rest': HTTP (experimental) retry_max_tries: Optional[int] The maximum number of times the client will try to reconnect to the - server before giving up in case of gRPC error. If set to None, there - is no limit to the number of tries. + server before giving up in case of a connection error. If set to None, + there is no limit to the number of tries. retry_max_time: Optional[float] The maximum total amount of time before the client stops trying to - reconnect to the server before giving up in case of gRPC error. If set - to None, there is no limit to the total time. + reconnect to the server before giving up in case of connection error. + If set to None, there is no limit to the total time. """ if insecure is None: insecure = root_certificates is None diff --git a/src/py/flwr/client/grpc_rere_client/connection.py b/src/py/flwr/client/grpc_rere_client/connection.py index b41fb7f4e0c..75d1ba425c1 100644 --- a/src/py/flwr/client/grpc_rere_client/connection.py +++ b/src/py/flwr/client/grpc_rere_client/connection.py @@ -90,7 +90,8 @@ def grpc_request_response( Flower server. Bytes won't work for the REST API. retry_invoker: Optional[RetryInvoker] (default: None) `RetryInvoker` object that will try to reconnect the client to the server - after gRPC errors. + after gRPC errors. If None, the client will only try to + reconnect once after a failure. Returns ------- diff --git a/src/py/flwr/client/rest_client/connection.py b/src/py/flwr/client/rest_client/connection.py index 9caaf57ecb2..dd91de14b2d 100644 --- a/src/py/flwr/client/rest_client/connection.py +++ b/src/py/flwr/client/rest_client/connection.py @@ -98,7 +98,9 @@ def http_request_response( connection using the certificates will be established to an SSL-enabled Flower server. Bytes won't work for the REST API. retry_invoker: Optional[RetryInvoker] (default: None) - Unused argument present for compatibilty. + `RetryInvoker` object that will try to reconnect the client to the server + after REST connection errors. If None, the client will only try to + reconnect once after a failure. Returns ------- From 61a0bd08304ec73e32e46f58ad16e8fc38228e32 Mon Sep 17 00:00:00 2001 From: Charles Beauville Date: Sun, 18 Feb 2024 17:52:56 +0100 Subject: [PATCH 08/22] Remove unused import --- src/py/flwr/client/grpc_client/connection.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/py/flwr/client/grpc_client/connection.py b/src/py/flwr/client/grpc_client/connection.py index ae84f484555..87170be30d6 100644 --- a/src/py/flwr/client/grpc_client/connection.py +++ b/src/py/flwr/client/grpc_client/connection.py @@ -34,7 +34,7 @@ ) from flwr.common.grpc import create_channel from flwr.common.logger import log -from flwr.common.retry_invoker import RetryInvoker, exponential +from flwr.common.retry_invoker import RetryInvoker from flwr.proto.transport_pb2 import ( # pylint: disable=E0611 ClientMessage, Reason, From 4e582d52a24057d4ee4f17812eb4cece67d40c50 Mon Sep 17 00:00:00 2001 From: Charles Beauville Date: Tue, 20 Feb 2024 15:56:56 +0100 Subject: [PATCH 09/22] Add arguments --- src/py/flwr/client/app.py | 31 ++++++++++++++++++++++++++++++- 1 file changed, 30 insertions(+), 1 deletion(-) diff --git a/src/py/flwr/client/app.py b/src/py/flwr/client/app.py index f9e1ed80a2e..50c428c41d1 100644 --- a/src/py/flwr/client/app.py +++ b/src/py/flwr/client/app.py @@ -107,6 +107,8 @@ def _load() -> ClientApp: transport="rest" if args.rest else "grpc-rere", root_certificates=root_certificates, insecure=args.insecure, + retry_max_tries=args.retry_max_tries, + retry_max_time=args.retry_max_time, ) event(EventType.RUN_CLIENT_APP_LEAVE) @@ -144,6 +146,21 @@ def _parse_args_run_client_app() -> argparse.ArgumentParser: default="0.0.0.0:9092", help="Server address", ) + parser.add_argument( + "--retry_max_tries", + type=int, + default=1, + help="The maximum number of times the client will try to reconnect to the" + "server before giving up in case of a connection error. If set to None," + "there is no limit to the number of tries.", + ) + parser.add_argument( + "--retry_max_time", + type=float, + help="The maximum total amount of time before the client stops trying to" + "reconnect to the server before giving up in case of connection error." + "If set to None, there is no limit to the total time.", + ) parser.add_argument( "--dir", default="", @@ -183,6 +200,8 @@ def start_client( root_certificates: Optional[Union[bytes, str]] = None, insecure: Optional[bool] = None, transport: Optional[str] = None, + retry_max_tries: Optional[int] = 1, + retry_max_time: Optional[float] = None, ) -> None: """Start a Flower client node which connects to a Flower server. @@ -216,6 +235,14 @@ class `flwr.client.Client` (default: None) - 'grpc-bidi': gRPC, bidirectional streaming - 'grpc-rere': gRPC, request-response (experimental) - 'rest': HTTP (experimental) + retry_max_tries: Optional[int] + The maximum number of times the client will try to reconnect to the + server before giving up in case of a connection error. If set to None, + there is no limit to the number of tries. + retry_max_time: Optional[float] + The maximum total amount of time before the client stops trying to + reconnect to the server before giving up in case of connection error. + If set to None, there is no limit to the total time. Examples -------- @@ -257,6 +284,8 @@ class `flwr.client.Client` (default: None) root_certificates=root_certificates, insecure=insecure, transport=transport, + retry_max_tries=retry_max_tries, + retry_max_time=retry_max_time, ) event(EventType.START_CLIENT_LEAVE) @@ -275,7 +304,7 @@ def _start_client_internal( root_certificates: Optional[Union[bytes, str]] = None, insecure: Optional[bool] = None, transport: Optional[str] = None, - retry_max_tries: Optional[int] = None, + retry_max_tries: Optional[int] = 1, retry_max_time: Optional[float] = None, ) -> None: """Start a Flower client node which connects to a Flower server. From 2e05148b0c6d2c665284f38a4d34674aa26d4563 Mon Sep 17 00:00:00 2001 From: Charles Beauville Date: Wed, 28 Feb 2024 14:49:59 +0100 Subject: [PATCH 10/22] Add logging and make retry_invoker none optional --- src/py/flwr/client/app.py | 27 +++++++++++++++++-- src/py/flwr/client/grpc_client/connection.py | 6 ++--- .../client/grpc_rere_client/connection.py | 21 ++++----------- src/py/flwr/client/rest_client/connection.py | 22 +++++---------- 4 files changed, 39 insertions(+), 37 deletions(-) diff --git a/src/py/flwr/client/app.py b/src/py/flwr/client/app.py index af0fa114715..6af628c59af 100644 --- a/src/py/flwr/client/app.py +++ b/src/py/flwr/client/app.py @@ -388,6 +388,27 @@ def _load_client_app() -> ClientApp: RequestsConnectionError if transport == "rest" else RpcError, max_tries=retry_max_tries, max_time=retry_max_time, + on_giveup=lambda retry_state: log( + DEBUG, + "Giving up reconnection after %.2f seconds and %s tries.", + retry_state.elapsed_time, + retry_state.tries, + ) + if retry_state.tries > 1 + else None, + on_success=lambda retry_state: log( + DEBUG, + "Reconnection successful after %.2f seconds and %s tries.", + retry_state.elapsed_time, + retry_state.tries, + ) + if retry_state.tries > 1 + else None, + on_backoff=lambda retry_state: log( + DEBUG, + "Reconnection attempt failed, retrying in %.2f seconds", + retry_state.actual_wait, + ), ) node_state = NodeState() @@ -397,9 +418,9 @@ def _load_client_app() -> ClientApp: with connection( address, insecure, + retry_invoker, grpc_max_message_length, root_certificates, - retry_invoker, ) as conn: receive, send, create_node, delete_node = conn @@ -558,7 +579,9 @@ def start_numpy_client( ) -def _init_connection(transport: Optional[str], server_address: str) -> Tuple[ +def _init_connection( + transport: Optional[str], server_address: str +) -> Tuple[ Callable[ [str, bool, int, Union[bytes, str, None], Optional[RetryInvoker]], ContextManager[ diff --git a/src/py/flwr/client/grpc_client/connection.py b/src/py/flwr/client/grpc_client/connection.py index fa1889f680f..ddbb5336b2a 100644 --- a/src/py/flwr/client/grpc_client/connection.py +++ b/src/py/flwr/client/grpc_client/connection.py @@ -63,9 +63,9 @@ def on_channel_state_change(channel_connectivity: str) -> None: def grpc_connection( # pylint: disable=R0915 server_address: str, insecure: bool, + retry_invoker: RetryInvoker, # pylint: disable=unused-argument max_message_length: int = GRPC_MAX_MESSAGE_LENGTH, root_certificates: Optional[Union[bytes, str]] = None, - retry_invoker: Optional[RetryInvoker] = None, # pylint: disable=unused-argument ) -> Iterator[ Tuple[ Callable[[], Optional[Message]], @@ -85,6 +85,8 @@ def grpc_connection( # pylint: disable=R0915 insecure : bool Starts an insecure gRPC connection when True. Enables HTTPS connection when False, using system certificates if `root_certificates` is None. + retry_invoker: RetryInvoker + Unused argument present for compatibilty. max_message_length : int The maximum length of gRPC messages that can be exchanged with the Flower server. The default should be sufficient for most models. Users who train @@ -97,8 +99,6 @@ def grpc_connection( # pylint: disable=R0915 The PEM-encoded root certificates as a byte string or a path string. If provided, a secure connection using the certificates will be established to an SSL-enabled Flower server. - retry_invoker: Optional[RetryInvoker] (default: None) - Unused argument present for compatibilty. Returns ------- diff --git a/src/py/flwr/client/grpc_rere_client/connection.py b/src/py/flwr/client/grpc_rere_client/connection.py index 064d85aa2aa..332edcfa812 100644 --- a/src/py/flwr/client/grpc_rere_client/connection.py +++ b/src/py/flwr/client/grpc_rere_client/connection.py @@ -54,9 +54,9 @@ def on_channel_state_change(channel_connectivity: str) -> None: def grpc_request_response( server_address: str, insecure: bool, + retry_invoker: RetryInvoker, max_message_length: int = GRPC_MAX_MESSAGE_LENGTH, # pylint: disable=W0613 root_certificates: Optional[Union[bytes, str]] = None, - retry_invoker: Optional[RetryInvoker] = None, # pylint: disable=unused-argument ) -> Iterator[ Tuple[ Callable[[], Optional[Message]], @@ -79,16 +79,16 @@ def grpc_request_response( insecure : bool Starts an insecure gRPC connection when True. Enables HTTPS connection when False, using system certificates if `root_certificates` is None. + retry_invoker: RetryInvoker + `RetryInvoker` object that will try to reconnect the client to the server + after gRPC errors. If None, the client will only try to + reconnect once after a failure. max_message_length : int Ignored, only present to preserve API-compatibility. root_certificates : Optional[Union[bytes, str]] (default: None) Path of the root certificate. If provided, a secure connection using the certificates will be established to an SSL-enabled Flower server. Bytes won't work for the REST API. - retry_invoker: Optional[RetryInvoker] (default: None) - `RetryInvoker` object that will try to reconnect the client to the server - after gRPC errors. If None, the client will only try to - reconnect once after a failure. Returns ------- @@ -117,17 +117,6 @@ def grpc_request_response( # Enable create_node and delete_node to store node node_store: Dict[str, Optional[Node]] = {KEY_NODE: None} - retry_invoker = ( - RetryInvoker( - exponential, - grpc.RpcError, - max_tries=1, - max_time=None, - ) - if retry_invoker is None - else retry_invoker - ) - ########################################################################### # receive/send functions ########################################################################### diff --git a/src/py/flwr/client/rest_client/connection.py b/src/py/flwr/client/rest_client/connection.py index 724d66ad4f1..d2cc71ba3b3 100644 --- a/src/py/flwr/client/rest_client/connection.py +++ b/src/py/flwr/client/rest_client/connection.py @@ -27,7 +27,7 @@ from flwr.common.constant import MISSING_EXTRA_REST from flwr.common.logger import log from flwr.common.message import Message, Metadata -from flwr.common.retry_invoker import RetryInvoker, exponential +from flwr.common.retry_invoker import RetryInvoker from flwr.common.serde import message_from_taskins, message_to_taskres from flwr.proto.fleet_pb2 import ( # pylint: disable=E0611 CreateNodeRequest, @@ -62,11 +62,11 @@ def http_request_response( server_address: str, insecure: bool, # pylint: disable=unused-argument + retry_invoker: RetryInvoker, max_message_length: int = GRPC_MAX_MESSAGE_LENGTH, # pylint: disable=W0613 root_certificates: Optional[ Union[bytes, str] ] = None, # pylint: disable=unused-argument - retry_invoker: Optional[RetryInvoker] = None, # pylint: disable=unused-argument ) -> Iterator[ Tuple[ Callable[[], Optional[Message]], @@ -88,16 +88,16 @@ def http_request_response( on port 8080, then `server_address` would be `"http://[::]:8080"`. insecure : bool Unused argument present for compatibilty. + retry_invoker: RetryInvoker + `RetryInvoker` object that will try to reconnect the client to the server + after REST connection errors. If None, the client will only try to + reconnect once after a failure. max_message_length : int Ignored, only present to preserve API-compatibility. root_certificates : Optional[Union[bytes, str]] (default: None) Path of the root certificate. If provided, a secure connection using the certificates will be established to an SSL-enabled Flower server. Bytes won't work for the REST API. - retry_invoker: Optional[RetryInvoker] (default: None) - `RetryInvoker` object that will try to reconnect the client to the server - after REST connection errors. If None, the client will only try to - reconnect once after a failure. Returns ------- @@ -133,16 +133,6 @@ def http_request_response( # Enable create_node and delete_node to store node node_store: Dict[str, Optional[Node]] = {KEY_NODE: None} - retry_invoker = ( - RetryInvoker( - exponential, - requests.exceptions.ConnectionError, - max_tries=1, - max_time=None, - ) - if retry_invoker is None - else retry_invoker - ) ########################################################################### # receive/send functions ########################################################################### From 4b004c6177180b29d0a8dc115d2f4ba01f4f5a9f Mon Sep 17 00:00:00 2001 From: Charles Beauville Date: Wed, 28 Feb 2024 15:21:15 +0100 Subject: [PATCH 11/22] Reformat --- src/py/flwr/client/app.py | 40 ++++++++++++++++++++------------------- 1 file changed, 21 insertions(+), 19 deletions(-) diff --git a/src/py/flwr/client/app.py b/src/py/flwr/client/app.py index 6af628c59af..22bde9d3448 100644 --- a/src/py/flwr/client/app.py +++ b/src/py/flwr/client/app.py @@ -388,22 +388,26 @@ def _load_client_app() -> ClientApp: RequestsConnectionError if transport == "rest" else RpcError, max_tries=retry_max_tries, max_time=retry_max_time, - on_giveup=lambda retry_state: log( - DEBUG, - "Giving up reconnection after %.2f seconds and %s tries.", - retry_state.elapsed_time, - retry_state.tries, - ) - if retry_state.tries > 1 - else None, - on_success=lambda retry_state: log( - DEBUG, - "Reconnection successful after %.2f seconds and %s tries.", - retry_state.elapsed_time, - retry_state.tries, - ) - if retry_state.tries > 1 - else None, + on_giveup=lambda retry_state: ( + log( + DEBUG, + "Giving up reconnection after %.2f seconds and %s tries.", + retry_state.elapsed_time, + retry_state.tries, + ) + if retry_state.tries > 1 + else None + ), + on_success=lambda retry_state: ( + log( + DEBUG, + "Reconnection successful after %.2f seconds and %s tries.", + retry_state.elapsed_time, + retry_state.tries, + ) + if retry_state.tries > 1 + else None + ), on_backoff=lambda retry_state: log( DEBUG, "Reconnection attempt failed, retrying in %.2f seconds", @@ -579,9 +583,7 @@ def start_numpy_client( ) -def _init_connection( - transport: Optional[str], server_address: str -) -> Tuple[ +def _init_connection(transport: Optional[str], server_address: str) -> Tuple[ Callable[ [str, bool, int, Union[bytes, str, None], Optional[RetryInvoker]], ContextManager[ From 3f07bb7a13da6f3f113ae6fd5e1842147cfb5894 Mon Sep 17 00:00:00 2001 From: Charles Beauville Date: Wed, 28 Feb 2024 17:48:16 +0100 Subject: [PATCH 12/22] Remove unused imports --- src/py/flwr/client/grpc_rere_client/connection.py | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/src/py/flwr/client/grpc_rere_client/connection.py b/src/py/flwr/client/grpc_rere_client/connection.py index 332edcfa812..e6e22998b94 100644 --- a/src/py/flwr/client/grpc_rere_client/connection.py +++ b/src/py/flwr/client/grpc_rere_client/connection.py @@ -21,15 +21,13 @@ from pathlib import Path from typing import Callable, Dict, Iterator, Optional, Tuple, Union, cast -import grpc - from flwr.client.message_handler.message_handler import validate_out_message from flwr.client.message_handler.task_handler import get_task_ins, validate_task_ins from flwr.common import GRPC_MAX_MESSAGE_LENGTH from flwr.common.grpc import create_channel from flwr.common.logger import log, warn_experimental_feature from flwr.common.message import Message, Metadata -from flwr.common.retry_invoker import RetryInvoker, exponential +from flwr.common.retry_invoker import RetryInvoker from flwr.common.serde import message_from_taskins, message_to_taskres from flwr.proto.fleet_pb2 import ( # pylint: disable=E0611 CreateNodeRequest, From 5e0bcc9bbe2283b3e8a67fb863bafb0a4dffe9f1 Mon Sep 17 00:00:00 2001 From: Charles Beauville Date: Wed, 28 Feb 2024 17:57:22 +0100 Subject: [PATCH 13/22] Fix type hints --- src/py/flwr/client/app.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/py/flwr/client/app.py b/src/py/flwr/client/app.py index 22bde9d3448..e727f04e905 100644 --- a/src/py/flwr/client/app.py +++ b/src/py/flwr/client/app.py @@ -585,7 +585,7 @@ def start_numpy_client( def _init_connection(transport: Optional[str], server_address: str) -> Tuple[ Callable[ - [str, bool, int, Union[bytes, str, None], Optional[RetryInvoker]], + [str, bool, RetryInvoker, int, Union[bytes, str, None]], ContextManager[ Tuple[ Callable[[], Optional[Message]], From b083edaf81bb67de6befb83d8b05757cffd39895 Mon Sep 17 00:00:00 2001 From: Charles Beauville Date: Wed, 28 Feb 2024 18:05:46 +0100 Subject: [PATCH 14/22] Add retry_invoker in test --- src/py/flwr/client/grpc_client/connection_test.py | 12 +++++++++++- 1 file changed, 11 insertions(+), 1 deletion(-) diff --git a/src/py/flwr/client/grpc_client/connection_test.py b/src/py/flwr/client/grpc_client/connection_test.py index 30bff068b60..1f2218a55fb 100644 --- a/src/py/flwr/client/grpc_client/connection_test.py +++ b/src/py/flwr/client/grpc_client/connection_test.py @@ -33,6 +33,7 @@ ) from flwr.server.client_manager import SimpleClientManager from flwr.server.superlink.fleet.grpc_bidi.grpc_server import start_grpc_server +from py.flwr.common.retry_invoker import RetryInvoker, exponential from .connection import grpc_connection @@ -127,7 +128,16 @@ def test_integration_connection() -> None: def run_client() -> int: messages_received: int = 0 - with grpc_connection(server_address=f"[::]:{port}", insecure=True) as conn: + with grpc_connection( + server_address=f"[::]:{port}", + insecure=True, + retry_invoker=RetryInvoker( + wait_factory=exponential, + recoverable_exceptions=grpc.RpcError, + max_tries=1, + max_time=None, + ), + ) as conn: receive, send, _, _ = conn # Setup processing loop From ae0bfcb7e412ff130814917b1a8efb1f1bd7d941 Mon Sep 17 00:00:00 2001 From: Charles Beauville Date: Wed, 28 Feb 2024 18:15:30 +0100 Subject: [PATCH 15/22] Fix import --- src/py/flwr/client/grpc_client/connection_test.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/py/flwr/client/grpc_client/connection_test.py b/src/py/flwr/client/grpc_client/connection_test.py index 1f2218a55fb..28e03979fd6 100644 --- a/src/py/flwr/client/grpc_client/connection_test.py +++ b/src/py/flwr/client/grpc_client/connection_test.py @@ -26,6 +26,7 @@ from flwr.common import ConfigsRecord, Message, Metadata, RecordSet from flwr.common import recordset_compat as compat from flwr.common.constant import MESSAGE_TYPE_GET_PROPERTIES +from flwr.common.retry_invoker import RetryInvoker, exponential from flwr.common.typing import Code, GetPropertiesRes, Status from flwr.proto.transport_pb2 import ( # pylint: disable=E0611 ClientMessage, @@ -33,7 +34,6 @@ ) from flwr.server.client_manager import SimpleClientManager from flwr.server.superlink.fleet.grpc_bidi.grpc_server import start_grpc_server -from py.flwr.common.retry_invoker import RetryInvoker, exponential from .connection import grpc_connection From 73ee50ccca2d5a8818148566066c45d28f4caee0 Mon Sep 17 00:00:00 2001 From: Charles Beauville Date: Thu, 29 Feb 2024 10:57:13 +0100 Subject: [PATCH 16/22] Update docstrings and args --- src/py/flwr/client/app.py | 18 +++++++++--------- 1 file changed, 9 insertions(+), 9 deletions(-) diff --git a/src/py/flwr/client/app.py b/src/py/flwr/client/app.py index e727f04e905..98a57204a7d 100644 --- a/src/py/flwr/client/app.py +++ b/src/py/flwr/client/app.py @@ -151,15 +151,15 @@ def _parse_args_run_client_app() -> argparse.ArgumentParser: "--retry_max_tries", type=int, default=1, - help="The maximum number of times the client will try to reconnect to the" + help="The maximum number of times the client will try to connect to the" "server before giving up in case of a connection error. If set to None," "there is no limit to the number of tries.", ) parser.add_argument( "--retry_max_time", type=float, - help="The maximum total amount of time before the client stops trying to" - "reconnect to the server before giving up in case of connection error." + help="The maximum duration before the client stops trying to" + "connect to the server in case of connection error." "If set to None, there is no limit to the total time.", ) parser.add_argument( @@ -237,12 +237,12 @@ class `flwr.client.Client` (default: None) - 'grpc-rere': gRPC, request-response (experimental) - 'rest': HTTP (experimental) retry_max_tries: Optional[int] - The maximum number of times the client will try to reconnect to the + The maximum number of times the client will try to connect to the server before giving up in case of a connection error. If set to None, there is no limit to the number of tries. retry_max_time: Optional[float] - The maximum total amount of time before the client stops trying to - reconnect to the server before giving up in case of connection error. + The maximum duration before the client stops trying to + connect to the server in case of connection error. If set to None, there is no limit to the total time. Examples @@ -343,12 +343,12 @@ class `flwr.client.Client` (default: None) - 'grpc-rere': gRPC, request-response (experimental) - 'rest': HTTP (experimental) retry_max_tries: Optional[int] - The maximum number of times the client will try to reconnect to the + The maximum number of times the client will try to connect to the server before giving up in case of a connection error. If set to None, there is no limit to the number of tries. retry_max_time: Optional[float] - The maximum total amount of time before the client stops trying to - reconnect to the server before giving up in case of connection error. + The maximum duration before the client stops trying to + connect to the server in case of connection error. If set to None, there is no limit to the total time. """ if insecure is None: From 0ba49e367261b283e07e88b8439583971d2a9458 Mon Sep 17 00:00:00 2001 From: Charles Beauville Date: Thu, 29 Feb 2024 11:48:58 +0100 Subject: [PATCH 17/22] Improve logging --- src/py/flwr/client/app.py | 18 +++++++++++------- 1 file changed, 11 insertions(+), 7 deletions(-) diff --git a/src/py/flwr/client/app.py b/src/py/flwr/client/app.py index 98a57204a7d..465ae233520 100644 --- a/src/py/flwr/client/app.py +++ b/src/py/flwr/client/app.py @@ -390,7 +390,7 @@ def _load_client_app() -> ClientApp: max_time=retry_max_time, on_giveup=lambda retry_state: ( log( - DEBUG, + WARN, "Giving up reconnection after %.2f seconds and %s tries.", retry_state.elapsed_time, retry_state.tries, @@ -400,18 +400,22 @@ def _load_client_app() -> ClientApp: ), on_success=lambda retry_state: ( log( - DEBUG, - "Reconnection successful after %.2f seconds and %s tries.", + INFO, + "Connection successful after %.2f seconds and %s tries.", retry_state.elapsed_time, retry_state.tries, ) if retry_state.tries > 1 else None ), - on_backoff=lambda retry_state: log( - DEBUG, - "Reconnection attempt failed, retrying in %.2f seconds", - retry_state.actual_wait, + on_backoff=lambda retry_state: ( + log(WARN, "Connection attempt failed, retrying...") + if retry_state.tries == 1 + else log( + DEBUG, + "Connection attempt failed, retrying in %.2f seconds", + retry_state.actual_wait, + ) ), ) From 9983cd62cddb4bd8676ed0412dccc453ccf3c11a Mon Sep 17 00:00:00 2001 From: Charles Beauville Date: Mon, 4 Mar 2024 21:42:37 +0100 Subject: [PATCH 18/22] Use better names --- src/py/flwr/client/app.py | 32 ++++++++++++++++---------------- 1 file changed, 16 insertions(+), 16 deletions(-) diff --git a/src/py/flwr/client/app.py b/src/py/flwr/client/app.py index 465ae233520..1d2d0320299 100644 --- a/src/py/flwr/client/app.py +++ b/src/py/flwr/client/app.py @@ -108,8 +108,8 @@ def _load() -> ClientApp: transport="rest" if args.rest else "grpc-rere", root_certificates=root_certificates, insecure=args.insecure, - retry_max_tries=args.retry_max_tries, - retry_max_time=args.retry_max_time, + max_retries=args.max_retries, + max_wait_time=args.max_wait_time, ) register_exit_handlers(event_type=EventType.RUN_CLIENT_APP_LEAVE) @@ -148,7 +148,7 @@ def _parse_args_run_client_app() -> argparse.ArgumentParser: help="Server address", ) parser.add_argument( - "--retry_max_tries", + "--max-retries", type=int, default=1, help="The maximum number of times the client will try to connect to the" @@ -156,7 +156,7 @@ def _parse_args_run_client_app() -> argparse.ArgumentParser: "there is no limit to the number of tries.", ) parser.add_argument( - "--retry_max_time", + "--max-wait-time", type=float, help="The maximum duration before the client stops trying to" "connect to the server in case of connection error." @@ -201,8 +201,8 @@ def start_client( root_certificates: Optional[Union[bytes, str]] = None, insecure: Optional[bool] = None, transport: Optional[str] = None, - retry_max_tries: Optional[int] = 1, - retry_max_time: Optional[float] = None, + max_retries: Optional[int] = 1, + max_wait_time: Optional[float] = None, ) -> None: """Start a Flower client node which connects to a Flower server. @@ -236,11 +236,11 @@ class `flwr.client.Client` (default: None) - 'grpc-bidi': gRPC, bidirectional streaming - 'grpc-rere': gRPC, request-response (experimental) - 'rest': HTTP (experimental) - retry_max_tries: Optional[int] + max_retries: Optional[int] The maximum number of times the client will try to connect to the server before giving up in case of a connection error. If set to None, there is no limit to the number of tries. - retry_max_time: Optional[float] + max_wait_time: Optional[float] The maximum duration before the client stops trying to connect to the server in case of connection error. If set to None, there is no limit to the total time. @@ -285,8 +285,8 @@ class `flwr.client.Client` (default: None) root_certificates=root_certificates, insecure=insecure, transport=transport, - retry_max_tries=retry_max_tries, - retry_max_time=retry_max_time, + max_retries=max_retries, + max_wait_time=max_wait_time, ) event(EventType.START_CLIENT_LEAVE) @@ -305,8 +305,8 @@ def _start_client_internal( root_certificates: Optional[Union[bytes, str]] = None, insecure: Optional[bool] = None, transport: Optional[str] = None, - retry_max_tries: Optional[int] = 1, - retry_max_time: Optional[float] = None, + max_retries: Optional[int] = 1, + max_wait_time: Optional[float] = None, ) -> None: """Start a Flower client node which connects to a Flower server. @@ -342,11 +342,11 @@ class `flwr.client.Client` (default: None) - 'grpc-bidi': gRPC, bidirectional streaming - 'grpc-rere': gRPC, request-response (experimental) - 'rest': HTTP (experimental) - retry_max_tries: Optional[int] + max_retries: Optional[int] The maximum number of times the client will try to connect to the server before giving up in case of a connection error. If set to None, there is no limit to the number of tries. - retry_max_time: Optional[float] + max_wait_time: Optional[float] The maximum duration before the client stops trying to connect to the server in case of connection error. If set to None, there is no limit to the total time. @@ -386,8 +386,8 @@ def _load_client_app() -> ClientApp: retry_invoker = RetryInvoker( exponential, RequestsConnectionError if transport == "rest" else RpcError, - max_tries=retry_max_tries, - max_time=retry_max_time, + max_tries=max_retries, + max_time=max_wait_time, on_giveup=lambda retry_state: ( log( WARN, From 5ed73480f8f0d4f21577629cb892bfe7f2645a75 Mon Sep 17 00:00:00 2001 From: Charles Beauville Date: Tue, 5 Mar 2024 14:27:39 +0100 Subject: [PATCH 19/22] Apply suggestions from code review Co-authored-by: Daniel J. Beutel --- src/py/flwr/client/app.py | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/src/py/flwr/client/app.py b/src/py/flwr/client/app.py index 1d2d0320299..2a4e67678a9 100644 --- a/src/py/flwr/client/app.py +++ b/src/py/flwr/client/app.py @@ -236,11 +236,11 @@ class `flwr.client.Client` (default: None) - 'grpc-bidi': gRPC, bidirectional streaming - 'grpc-rere': gRPC, request-response (experimental) - 'rest': HTTP (experimental) - max_retries: Optional[int] + max_retries: Optional[int] (default: 1) The maximum number of times the client will try to connect to the server before giving up in case of a connection error. If set to None, there is no limit to the number of tries. - max_wait_time: Optional[float] + max_wait_time: Optional[float] (default: None) The maximum duration before the client stops trying to connect to the server in case of connection error. If set to None, there is no limit to the total time. @@ -342,11 +342,11 @@ class `flwr.client.Client` (default: None) - 'grpc-bidi': gRPC, bidirectional streaming - 'grpc-rere': gRPC, request-response (experimental) - 'rest': HTTP (experimental) - max_retries: Optional[int] + max_retries: Optional[int] (default: 1) The maximum number of times the client will try to connect to the server before giving up in case of a connection error. If set to None, there is no limit to the number of tries. - max_wait_time: Optional[float] + max_wait_time: Optional[float] (default: None) The maximum duration before the client stops trying to connect to the server in case of connection error. If set to None, there is no limit to the total time. From d72580f66b8146a488e0f4f1b02d8a3cacb0b9a5 Mon Sep 17 00:00:00 2001 From: Charles Beauville Date: Tue, 5 Mar 2024 14:41:35 +0100 Subject: [PATCH 20/22] Make infinite retries the default and solve imports --- src/py/flwr/client/app.py | 39 ++++++++++++++++++++++----------------- 1 file changed, 22 insertions(+), 17 deletions(-) diff --git a/src/py/flwr/client/app.py b/src/py/flwr/client/app.py index 2a4e67678a9..f7034402c06 100644 --- a/src/py/flwr/client/app.py +++ b/src/py/flwr/client/app.py @@ -23,7 +23,6 @@ from typing import Callable, ContextManager, Optional, Tuple, Union from grpc import RpcError -from requests.exceptions import ConnectionError as RequestsConnectionError from flwr.client.client import Client from flwr.client.client_app import ClientApp @@ -150,17 +149,18 @@ def _parse_args_run_client_app() -> argparse.ArgumentParser: parser.add_argument( "--max-retries", type=int, - default=1, + default=None, help="The maximum number of times the client will try to connect to the" - "server before giving up in case of a connection error. If set to None," - "there is no limit to the number of tries.", + "server before giving up in case of a connection error. By default," + "it is set to None, meaning there is no limit to the number of tries.", ) parser.add_argument( "--max-wait-time", type=float, + default=None, help="The maximum duration before the client stops trying to" - "connect to the server in case of connection error." - "If set to None, there is no limit to the total time.", + "connect to the server in case of connection error. By default, it" + "is set to None, meaning there is no limit to the total time.", ) parser.add_argument( "--dir", @@ -201,7 +201,7 @@ def start_client( root_certificates: Optional[Union[bytes, str]] = None, insecure: Optional[bool] = None, transport: Optional[str] = None, - max_retries: Optional[int] = 1, + max_retries: Optional[int] = None, max_wait_time: Optional[float] = None, ) -> None: """Start a Flower client node which connects to a Flower server. @@ -236,7 +236,7 @@ class `flwr.client.Client` (default: None) - 'grpc-bidi': gRPC, bidirectional streaming - 'grpc-rere': gRPC, request-response (experimental) - 'rest': HTTP (experimental) - max_retries: Optional[int] (default: 1) + max_retries: Optional[int] (default: None) The maximum number of times the client will try to connect to the server before giving up in case of a connection error. If set to None, there is no limit to the number of tries. @@ -305,7 +305,7 @@ def _start_client_internal( root_certificates: Optional[Union[bytes, str]] = None, insecure: Optional[bool] = None, transport: Optional[str] = None, - max_retries: Optional[int] = 1, + max_retries: Optional[int] = None, max_wait_time: Optional[float] = None, ) -> None: """Start a Flower client node which connects to a Flower server. @@ -342,7 +342,7 @@ class `flwr.client.Client` (default: None) - 'grpc-bidi': gRPC, bidirectional streaming - 'grpc-rere': gRPC, request-response (experimental) - 'rest': HTTP (experimental) - max_retries: Optional[int] (default: 1) + max_retries: Optional[int] (default: None) The maximum number of times the client will try to connect to the server before giving up in case of a connection error. If set to None, there is no limit to the number of tries. @@ -381,11 +381,13 @@ def _load_client_app() -> ClientApp: # Both `client` and `client_fn` must not be used directly # Initialize connection context manager - connection, address = _init_connection(transport, server_address) + connection, address, connection_error_type = _init_connection( + transport, server_address + ) retry_invoker = RetryInvoker( - exponential, - RequestsConnectionError if transport == "rest" else RpcError, + wait_factory=exponential, + recoverable_exceptions=connection_error_type, max_tries=max_retries, max_time=max_wait_time, on_giveup=lambda retry_state: ( @@ -600,6 +602,7 @@ def _init_connection(transport: Optional[str], server_address: str) -> Tuple[ ], ], str, + type[Exception], ]: # Parse IP address parsed_address = parse_address(server_address) @@ -615,6 +618,8 @@ def _init_connection(transport: Optional[str], server_address: str) -> Tuple[ # Use either gRPC bidirectional streaming or REST request/response if transport == TRANSPORT_TYPE_REST: try: + from requests.exceptions import ConnectionError as RequestsConnectionError + from .rest_client.connection import http_request_response except ModuleNotFoundError: sys.exit(MISSING_EXTRA_REST) @@ -623,14 +628,14 @@ def _init_connection(transport: Optional[str], server_address: str) -> Tuple[ "When using the REST API, please provide `https://` or " "`http://` before the server address (e.g. `http://127.0.0.1:8080`)" ) - connection = http_request_response + connection, error_type = http_request_response, RequestsConnectionError elif transport == TRANSPORT_TYPE_GRPC_RERE: - connection = grpc_request_response + connection, error_type = grpc_request_response, RpcError elif transport == TRANSPORT_TYPE_GRPC_BIDI: - connection = grpc_connection + connection, error_type = grpc_connection, RpcError else: raise ValueError( f"Unknown transport type: {transport} (possible: {TRANSPORT_TYPES})" ) - return connection, address + return connection, address, error_type From d6585101b6dfec8ef78101716ff46a96b05a082d Mon Sep 17 00:00:00 2001 From: Charles Beauville Date: Tue, 5 Mar 2024 14:45:20 +0100 Subject: [PATCH 21/22] Use correct Type --- src/py/flwr/client/app.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/py/flwr/client/app.py b/src/py/flwr/client/app.py index f7034402c06..f333698ac72 100644 --- a/src/py/flwr/client/app.py +++ b/src/py/flwr/client/app.py @@ -602,7 +602,7 @@ def _init_connection(transport: Optional[str], server_address: str) -> Tuple[ ], ], str, - type[Exception], + Type[Exception], ]: # Parse IP address parsed_address = parse_address(server_address) From 56ebc0c187535cf023ee02de044ff84736505e44 Mon Sep 17 00:00:00 2001 From: Charles Beauville Date: Tue, 5 Mar 2024 14:49:27 +0100 Subject: [PATCH 22/22] Add necessary import --- src/py/flwr/client/app.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/py/flwr/client/app.py b/src/py/flwr/client/app.py index f333698ac72..43781776f78 100644 --- a/src/py/flwr/client/app.py +++ b/src/py/flwr/client/app.py @@ -20,7 +20,7 @@ import time from logging import DEBUG, INFO, WARN from pathlib import Path -from typing import Callable, ContextManager, Optional, Tuple, Union +from typing import Callable, ContextManager, Optional, Tuple, Type, Union from grpc import RpcError