From 64682df42168d7cfa7f4fb32bde30db0d3496855 Mon Sep 17 00:00:00 2001 From: Chong Shen Ng Date: Mon, 17 Jun 2024 22:12:24 +0100 Subject: [PATCH] feat(framework) Update proto files for SuperExec logstream (#3622) --- src/proto/flwr/proto/exec.proto | 5 ++++ src/py/flwr/proto/exec_pb2.py | 10 +++++--- src/py/flwr/proto/exec_pb2.pyi | 23 +++++++++++++++++ src/py/flwr/proto/exec_pb2_grpc.py | 34 ++++++++++++++++++++++++++ src/py/flwr/proto/exec_pb2_grpc.pyi | 14 +++++++++++ src/py/flwr/superexec/exec_servicer.py | 13 +++++++++- 6 files changed, 95 insertions(+), 4 deletions(-) diff --git a/src/proto/flwr/proto/exec.proto b/src/proto/flwr/proto/exec.proto index 05885c9ceed..8e5f53b02ca 100644 --- a/src/proto/flwr/proto/exec.proto +++ b/src/proto/flwr/proto/exec.proto @@ -20,7 +20,12 @@ package flwr.proto; service Exec { // Start run upon request rpc StartRun(StartRunRequest) returns (StartRunResponse) {} + + // Start log stream upon request + rpc StreamLogs(StreamLogsRequest) returns (stream StreamLogsResponse) {} } message StartRunRequest { bytes fab_file = 1; } message StartRunResponse { sint64 run_id = 1; } +message StreamLogsRequest { sint64 run_id = 1; } +message StreamLogsResponse { string log_output = 1; } diff --git a/src/py/flwr/proto/exec_pb2.py b/src/py/flwr/proto/exec_pb2.py index a1d1f24af7d..7b037a9454c 100644 --- a/src/py/flwr/proto/exec_pb2.py +++ b/src/py/flwr/proto/exec_pb2.py @@ -14,7 +14,7 @@ -DESCRIPTOR = _descriptor_pool.Default().AddSerializedFile(b'\n\x15\x66lwr/proto/exec.proto\x12\nflwr.proto\"#\n\x0fStartRunRequest\x12\x10\n\x08\x66\x61\x62_file\x18\x01 \x01(\x0c\"\"\n\x10StartRunResponse\x12\x0e\n\x06run_id\x18\x01 \x01(\x12\x32O\n\x04\x45xec\x12G\n\x08StartRun\x12\x1b.flwr.proto.StartRunRequest\x1a\x1c.flwr.proto.StartRunResponse\"\x00\x62\x06proto3') +DESCRIPTOR = _descriptor_pool.Default().AddSerializedFile(b'\n\x15\x66lwr/proto/exec.proto\x12\nflwr.proto\"#\n\x0fStartRunRequest\x12\x10\n\x08\x66\x61\x62_file\x18\x01 \x01(\x0c\"\"\n\x10StartRunResponse\x12\x0e\n\x06run_id\x18\x01 \x01(\x12\"#\n\x11StreamLogsRequest\x12\x0e\n\x06run_id\x18\x01 \x01(\x12\"(\n\x12StreamLogsResponse\x12\x12\n\nlog_output\x18\x01 \x01(\t2\xa0\x01\n\x04\x45xec\x12G\n\x08StartRun\x12\x1b.flwr.proto.StartRunRequest\x1a\x1c.flwr.proto.StartRunResponse\"\x00\x12O\n\nStreamLogs\x12\x1d.flwr.proto.StreamLogsRequest\x1a\x1e.flwr.proto.StreamLogsResponse\"\x00\x30\x01\x62\x06proto3') _globals = globals() _builder.BuildMessageAndEnumDescriptors(DESCRIPTOR, _globals) @@ -25,6 +25,10 @@ _globals['_STARTRUNREQUEST']._serialized_end=72 _globals['_STARTRUNRESPONSE']._serialized_start=74 _globals['_STARTRUNRESPONSE']._serialized_end=108 - _globals['_EXEC']._serialized_start=110 - _globals['_EXEC']._serialized_end=189 + _globals['_STREAMLOGSREQUEST']._serialized_start=110 + _globals['_STREAMLOGSREQUEST']._serialized_end=145 + _globals['_STREAMLOGSRESPONSE']._serialized_start=147 + _globals['_STREAMLOGSRESPONSE']._serialized_end=187 + _globals['_EXEC']._serialized_start=190 + _globals['_EXEC']._serialized_end=350 # @@protoc_insertion_point(module_scope) diff --git a/src/py/flwr/proto/exec_pb2.pyi b/src/py/flwr/proto/exec_pb2.pyi index 8a0122062dc..466812808da 100644 --- a/src/py/flwr/proto/exec_pb2.pyi +++ b/src/py/flwr/proto/exec_pb2.pyi @@ -5,6 +5,7 @@ isort:skip_file import builtins import google.protobuf.descriptor import google.protobuf.message +import typing import typing_extensions DESCRIPTOR: google.protobuf.descriptor.FileDescriptor @@ -30,3 +31,25 @@ class StartRunResponse(google.protobuf.message.Message): ) -> None: ... def ClearField(self, field_name: typing_extensions.Literal["run_id",b"run_id"]) -> None: ... global___StartRunResponse = StartRunResponse + +class StreamLogsRequest(google.protobuf.message.Message): + DESCRIPTOR: google.protobuf.descriptor.Descriptor + RUN_ID_FIELD_NUMBER: builtins.int + run_id: builtins.int + def __init__(self, + *, + run_id: builtins.int = ..., + ) -> None: ... + def ClearField(self, field_name: typing_extensions.Literal["run_id",b"run_id"]) -> None: ... +global___StreamLogsRequest = StreamLogsRequest + +class StreamLogsResponse(google.protobuf.message.Message): + DESCRIPTOR: google.protobuf.descriptor.Descriptor + LOG_OUTPUT_FIELD_NUMBER: builtins.int + log_output: typing.Text + def __init__(self, + *, + log_output: typing.Text = ..., + ) -> None: ... + def ClearField(self, field_name: typing_extensions.Literal["log_output",b"log_output"]) -> None: ... +global___StreamLogsResponse = StreamLogsResponse diff --git a/src/py/flwr/proto/exec_pb2_grpc.py b/src/py/flwr/proto/exec_pb2_grpc.py index 349148eb992..8cf4ce52a30 100644 --- a/src/py/flwr/proto/exec_pb2_grpc.py +++ b/src/py/flwr/proto/exec_pb2_grpc.py @@ -19,6 +19,11 @@ def __init__(self, channel): request_serializer=flwr_dot_proto_dot_exec__pb2.StartRunRequest.SerializeToString, response_deserializer=flwr_dot_proto_dot_exec__pb2.StartRunResponse.FromString, ) + self.StreamLogs = channel.unary_stream( + '/flwr.proto.Exec/StreamLogs', + request_serializer=flwr_dot_proto_dot_exec__pb2.StreamLogsRequest.SerializeToString, + response_deserializer=flwr_dot_proto_dot_exec__pb2.StreamLogsResponse.FromString, + ) class ExecServicer(object): @@ -31,6 +36,13 @@ def StartRun(self, request, context): context.set_details('Method not implemented!') raise NotImplementedError('Method not implemented!') + def StreamLogs(self, request, context): + """Start log stream upon request + """ + context.set_code(grpc.StatusCode.UNIMPLEMENTED) + context.set_details('Method not implemented!') + raise NotImplementedError('Method not implemented!') + def add_ExecServicer_to_server(servicer, server): rpc_method_handlers = { @@ -39,6 +51,11 @@ def add_ExecServicer_to_server(servicer, server): request_deserializer=flwr_dot_proto_dot_exec__pb2.StartRunRequest.FromString, response_serializer=flwr_dot_proto_dot_exec__pb2.StartRunResponse.SerializeToString, ), + 'StreamLogs': grpc.unary_stream_rpc_method_handler( + servicer.StreamLogs, + request_deserializer=flwr_dot_proto_dot_exec__pb2.StreamLogsRequest.FromString, + response_serializer=flwr_dot_proto_dot_exec__pb2.StreamLogsResponse.SerializeToString, + ), } generic_handler = grpc.method_handlers_generic_handler( 'flwr.proto.Exec', rpc_method_handlers) @@ -65,3 +82,20 @@ def StartRun(request, flwr_dot_proto_dot_exec__pb2.StartRunResponse.FromString, options, channel_credentials, insecure, call_credentials, compression, wait_for_ready, timeout, metadata) + + @staticmethod + def StreamLogs(request, + target, + options=(), + channel_credentials=None, + call_credentials=None, + insecure=False, + compression=None, + wait_for_ready=None, + timeout=None, + metadata=None): + return grpc.experimental.unary_stream(request, target, '/flwr.proto.Exec/StreamLogs', + flwr_dot_proto_dot_exec__pb2.StreamLogsRequest.SerializeToString, + flwr_dot_proto_dot_exec__pb2.StreamLogsResponse.FromString, + options, channel_credentials, + insecure, call_credentials, compression, wait_for_ready, timeout, metadata) diff --git a/src/py/flwr/proto/exec_pb2_grpc.pyi b/src/py/flwr/proto/exec_pb2_grpc.pyi index 6cab594babd..20da3a53f4a 100644 --- a/src/py/flwr/proto/exec_pb2_grpc.pyi +++ b/src/py/flwr/proto/exec_pb2_grpc.pyi @@ -5,6 +5,7 @@ isort:skip_file import abc import flwr.proto.exec_pb2 import grpc +import typing class ExecStub: def __init__(self, channel: grpc.Channel) -> None: ... @@ -13,6 +14,11 @@ class ExecStub: flwr.proto.exec_pb2.StartRunResponse] """Start run upon request""" + StreamLogs: grpc.UnaryStreamMultiCallable[ + flwr.proto.exec_pb2.StreamLogsRequest, + flwr.proto.exec_pb2.StreamLogsResponse] + """Start log stream upon request""" + class ExecServicer(metaclass=abc.ABCMeta): @abc.abstractmethod @@ -23,5 +29,13 @@ class ExecServicer(metaclass=abc.ABCMeta): """Start run upon request""" pass + @abc.abstractmethod + def StreamLogs(self, + request: flwr.proto.exec_pb2.StreamLogsRequest, + context: grpc.ServicerContext, + ) -> typing.Iterator[flwr.proto.exec_pb2.StreamLogsResponse]: + """Start log stream upon request""" + pass + def add_ExecServicer_to_server(servicer: ExecServicer, server: grpc.Server) -> None: ... diff --git a/src/py/flwr/superexec/exec_servicer.py b/src/py/flwr/superexec/exec_servicer.py index aa8172c1870..e5ef2bd59a7 100644 --- a/src/py/flwr/superexec/exec_servicer.py +++ b/src/py/flwr/superexec/exec_servicer.py @@ -16,7 +16,7 @@ from logging import ERROR, INFO -from typing import Dict +from typing import Any, Dict, Generator import grpc @@ -25,6 +25,8 @@ from flwr.proto.exec_pb2 import ( # pylint: disable=E0611 StartRunRequest, StartRunResponse, + StreamLogsRequest, + StreamLogsResponse, ) from .executor import Executor, RunTracker @@ -52,3 +54,12 @@ def StartRun( self.runs[run.run_id] = run return StartRunResponse(run_id=run.run_id) + + def StreamLogs( + self, request: StreamLogsRequest, context: grpc.ServicerContext + ) -> Generator[StreamLogsResponse, Any, None]: + """Get logs.""" + logs = ["a", "b", "c"] + while context.is_active(): + for i in range(len(logs)): # pylint: disable=C0200 + yield StreamLogsResponse(log_output=logs[i])