Skip to content

Commit

Permalink
feat(framework) Update proto files for SuperExec logstream (#3622)
Browse files Browse the repository at this point in the history
  • Loading branch information
chongshenng authored Jun 17, 2024
1 parent 8ea769c commit 64682df
Show file tree
Hide file tree
Showing 6 changed files with 95 additions and 4 deletions.
5 changes: 5 additions & 0 deletions src/proto/flwr/proto/exec.proto
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,12 @@ package flwr.proto;
service Exec {
// Start run upon request
rpc StartRun(StartRunRequest) returns (StartRunResponse) {}

// Start log stream upon request
rpc StreamLogs(StreamLogsRequest) returns (stream StreamLogsResponse) {}
}

message StartRunRequest { bytes fab_file = 1; }
message StartRunResponse { sint64 run_id = 1; }
message StreamLogsRequest { sint64 run_id = 1; }
message StreamLogsResponse { string log_output = 1; }
10 changes: 7 additions & 3 deletions src/py/flwr/proto/exec_pb2.py

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

23 changes: 23 additions & 0 deletions src/py/flwr/proto/exec_pb2.pyi
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ isort:skip_file
import builtins
import google.protobuf.descriptor
import google.protobuf.message
import typing
import typing_extensions

DESCRIPTOR: google.protobuf.descriptor.FileDescriptor
Expand All @@ -30,3 +31,25 @@ class StartRunResponse(google.protobuf.message.Message):
) -> None: ...
def ClearField(self, field_name: typing_extensions.Literal["run_id",b"run_id"]) -> None: ...
global___StartRunResponse = StartRunResponse

class StreamLogsRequest(google.protobuf.message.Message):
DESCRIPTOR: google.protobuf.descriptor.Descriptor
RUN_ID_FIELD_NUMBER: builtins.int
run_id: builtins.int
def __init__(self,
*,
run_id: builtins.int = ...,
) -> None: ...
def ClearField(self, field_name: typing_extensions.Literal["run_id",b"run_id"]) -> None: ...
global___StreamLogsRequest = StreamLogsRequest

class StreamLogsResponse(google.protobuf.message.Message):
DESCRIPTOR: google.protobuf.descriptor.Descriptor
LOG_OUTPUT_FIELD_NUMBER: builtins.int
log_output: typing.Text
def __init__(self,
*,
log_output: typing.Text = ...,
) -> None: ...
def ClearField(self, field_name: typing_extensions.Literal["log_output",b"log_output"]) -> None: ...
global___StreamLogsResponse = StreamLogsResponse
34 changes: 34 additions & 0 deletions src/py/flwr/proto/exec_pb2_grpc.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,11 @@ def __init__(self, channel):
request_serializer=flwr_dot_proto_dot_exec__pb2.StartRunRequest.SerializeToString,
response_deserializer=flwr_dot_proto_dot_exec__pb2.StartRunResponse.FromString,
)
self.StreamLogs = channel.unary_stream(
'/flwr.proto.Exec/StreamLogs',
request_serializer=flwr_dot_proto_dot_exec__pb2.StreamLogsRequest.SerializeToString,
response_deserializer=flwr_dot_proto_dot_exec__pb2.StreamLogsResponse.FromString,
)


class ExecServicer(object):
Expand All @@ -31,6 +36,13 @@ def StartRun(self, request, context):
context.set_details('Method not implemented!')
raise NotImplementedError('Method not implemented!')

def StreamLogs(self, request, context):
"""Start log stream upon request
"""
context.set_code(grpc.StatusCode.UNIMPLEMENTED)
context.set_details('Method not implemented!')
raise NotImplementedError('Method not implemented!')


def add_ExecServicer_to_server(servicer, server):
rpc_method_handlers = {
Expand All @@ -39,6 +51,11 @@ def add_ExecServicer_to_server(servicer, server):
request_deserializer=flwr_dot_proto_dot_exec__pb2.StartRunRequest.FromString,
response_serializer=flwr_dot_proto_dot_exec__pb2.StartRunResponse.SerializeToString,
),
'StreamLogs': grpc.unary_stream_rpc_method_handler(
servicer.StreamLogs,
request_deserializer=flwr_dot_proto_dot_exec__pb2.StreamLogsRequest.FromString,
response_serializer=flwr_dot_proto_dot_exec__pb2.StreamLogsResponse.SerializeToString,
),
}
generic_handler = grpc.method_handlers_generic_handler(
'flwr.proto.Exec', rpc_method_handlers)
Expand All @@ -65,3 +82,20 @@ def StartRun(request,
flwr_dot_proto_dot_exec__pb2.StartRunResponse.FromString,
options, channel_credentials,
insecure, call_credentials, compression, wait_for_ready, timeout, metadata)

@staticmethod
def StreamLogs(request,
target,
options=(),
channel_credentials=None,
call_credentials=None,
insecure=False,
compression=None,
wait_for_ready=None,
timeout=None,
metadata=None):
return grpc.experimental.unary_stream(request, target, '/flwr.proto.Exec/StreamLogs',
flwr_dot_proto_dot_exec__pb2.StreamLogsRequest.SerializeToString,
flwr_dot_proto_dot_exec__pb2.StreamLogsResponse.FromString,
options, channel_credentials,
insecure, call_credentials, compression, wait_for_ready, timeout, metadata)
14 changes: 14 additions & 0 deletions src/py/flwr/proto/exec_pb2_grpc.pyi
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ isort:skip_file
import abc
import flwr.proto.exec_pb2
import grpc
import typing

class ExecStub:
def __init__(self, channel: grpc.Channel) -> None: ...
Expand All @@ -13,6 +14,11 @@ class ExecStub:
flwr.proto.exec_pb2.StartRunResponse]
"""Start run upon request"""

StreamLogs: grpc.UnaryStreamMultiCallable[
flwr.proto.exec_pb2.StreamLogsRequest,
flwr.proto.exec_pb2.StreamLogsResponse]
"""Start log stream upon request"""


class ExecServicer(metaclass=abc.ABCMeta):
@abc.abstractmethod
Expand All @@ -23,5 +29,13 @@ class ExecServicer(metaclass=abc.ABCMeta):
"""Start run upon request"""
pass

@abc.abstractmethod
def StreamLogs(self,
request: flwr.proto.exec_pb2.StreamLogsRequest,
context: grpc.ServicerContext,
) -> typing.Iterator[flwr.proto.exec_pb2.StreamLogsResponse]:
"""Start log stream upon request"""
pass


def add_ExecServicer_to_server(servicer: ExecServicer, server: grpc.Server) -> None: ...
13 changes: 12 additions & 1 deletion src/py/flwr/superexec/exec_servicer.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@


from logging import ERROR, INFO
from typing import Dict
from typing import Any, Dict, Generator

import grpc

Expand All @@ -25,6 +25,8 @@
from flwr.proto.exec_pb2 import ( # pylint: disable=E0611
StartRunRequest,
StartRunResponse,
StreamLogsRequest,
StreamLogsResponse,
)

from .executor import Executor, RunTracker
Expand Down Expand Up @@ -52,3 +54,12 @@ def StartRun(
self.runs[run.run_id] = run

return StartRunResponse(run_id=run.run_id)

def StreamLogs(
self, request: StreamLogsRequest, context: grpc.ServicerContext
) -> Generator[StreamLogsResponse, Any, None]:
"""Get logs."""
logs = ["a", "b", "c"]
while context.is_active():
for i in range(len(logs)): # pylint: disable=C0200
yield StreamLogsResponse(log_output=logs[i])

0 comments on commit 64682df

Please sign in to comment.