diff --git a/dune-project b/dune-project index 9856ec7..cf3a55e 100644 --- a/dune-project +++ b/dune-project @@ -29,7 +29,8 @@ (synopsis "A modular gRPC library") (description "This library builds some of the signatures and implementations of gRPC functionality. This is used in the more specialised package `grpc-lwt` which has more machinery, however this library can also be used to do some bits yourself.") - (tags (network rpc serialisation)) + (tags + (network rpc serialisation)) (depends (ocaml (>= 4.08)) @@ -44,7 +45,8 @@ (synopsis "An Lwt implementation of gRPC") (description "Functionality for building gRPC services and rpcs with `lwt`.") - (tags (network rpc serialisation)) + (tags + (network rpc serialisation)) (depends (grpc (= :version)) @@ -57,31 +59,61 @@ (synopsis "An Async implementation of gRPC") (description "Functionality for building gRPC services and rpcs with `async`.") - (tags (network rpc serialisation)) + (tags + (network rpc serialisation)) (depends (ocaml (>= 4.11)) (grpc (= :version)) - (async (>= v0.16)) + (async + (>= v0.16)) stringext)) (package (name grpc-eio) (synopsis "An Eio implementation of gRPC") (description - "Functionality for building gRPC services and rpcs with `eio`.") + "Functionality for building gRPC services and rpcs with `eio`.") (depends (grpc (= :version)) - (eio (>= 0.12)) - stringext)) + (eio + (>= 0.12)) + stringext)) + +(package + (name grpc-protoc-plugin) + (synopsis "An implementation of gRPC using ocaml-protoc-plugin") + (description + "Functionality for building gRPC services and rpcs with `ocaml-protoc-plugin`") + (depends + (grpc + (= :version)) + (ocaml-protoc-plugin + (>= 4.5)))) + +(package + (name grpc-protoc) + (synopsis "An implementation of gRPC using ocaml-protoc") + (description + "Functionality for building gRPC services and rpcs with `ocaml-protoc`") + (depends + (grpc + (= :version)) + (ocaml-protoc + (>= 3.0)) + (pbrt + (>= 3.0)) + (pbrt_services + (>= 3.0)))) (package (name grpc-examples) (synopsis "Various gRPC examples") (description "Various gRPC examples.") - (tags (network rpc serialisation)) + (tags + (network rpc serialisation)) (depends grpc-lwt h2-lwt-unix @@ -89,25 +121,36 @@ h2-async grpc-eio h2-eio - (ocaml-protoc-plugin (>= 4.5)) + (ocaml-protoc-plugin + (>= 4.5)) ppx_deriving_yojson conduit-lwt-unix cohttp-lwt-unix tls-async - (lwt_ssl (>= 1.2.0)) - (mdx (and (>= 2.2.1) :with-test)) - (eio_main (>= 0.12)) + (lwt_ssl + (>= 1.2.0)) + (mdx + (and + (>= 2.2.1) + :with-test)) + (eio_main + (>= 0.12)) stringext)) (package (name grpc-bench) (synopsis "Benchmarking package for gRPC") (description "Benchmarking package for gRPC.") - (tags (network rpc serialisation benchmark)) + (tags + (network rpc serialisation benchmark)) (depends grpc - (bechamel(>= 0.4.0)) + (bechamel + (>= 0.4.0)) notty - (bechamel-notty (>= 0.4.0)) - (bigstringaf (>= 0.9.1)) - (notty (>= 0.2.3)))) + (bechamel-notty + (>= 0.4.0)) + (bigstringaf + (>= 0.9.1)) + (notty + (>= 0.2.3)))) diff --git a/examples/greeter-client-eio/dune b/examples/greeter-client-eio/dune index 37f97bc..40151b3 100644 --- a/examples/greeter-client-eio/dune +++ b/examples/greeter-client-eio/dune @@ -1,3 +1,3 @@ (executable (name greeter_client_eio) - (libraries grpc grpc-eio ocaml-protoc-plugin eio_main greeter h2 h2-eio)) + (libraries grpc grpc-eio grpc-protoc-plugin eio_main greeter h2 h2-eio)) diff --git a/examples/greeter-client-eio/greeter_client_eio.ml b/examples/greeter-client-eio/greeter_client_eio.ml index c8b0530..bd19265 100644 --- a/examples/greeter-client-eio/greeter_client_eio.ml +++ b/examples/greeter-client-eio/greeter_client_eio.ml @@ -19,31 +19,23 @@ let main env = H2_eio.Client.create_connection ~sw ~error_handler:ignore socket in - let open Ocaml_protoc_plugin in let open Greeter.Mypackage in - let encode, decode = Service.make_client_functions Greeter.sayHello in - let encoded_request = - HelloRequest.make ~name () |> encode |> Writer.contents - in + let request = HelloRequest.make ~name () in - let f decoder = - match decoder with - | Some decoder -> ( - Reader.create decoder |> decode |> function - | Ok v -> v - | Error e -> - failwith - (Printf.sprintf "Could not decode request: %s" - (Result.show_error e))) + let f response = + match response with + | Some response -> response | None -> Greeter.SayHello.Response.make () in let result = - Grpc_eio.Client.call ~service:"mypackage.Greeter" ~rpc:"SayHello" + Grpc_eio.Client.Typed_rpc.call + (Grpc_protoc_plugin.Client_rpc.unary (module Greeter.SayHello)) ~do_request:(H2_eio.Client.request connection ~error_handler:ignore) - ~handler:(Grpc_eio.Client.Rpc.unary encoded_request ~f) + ~handler:(Grpc_eio.Client.Typed_rpc.unary request ~f) () in + Eio.Promise.await (H2_eio.Client.shutdown connection); result in diff --git a/examples/greeter-protoc-client-eio/dune b/examples/greeter-protoc-client-eio/dune new file mode 100644 index 0000000..9ba8a85 --- /dev/null +++ b/examples/greeter-protoc-client-eio/dune @@ -0,0 +1,3 @@ +(executable + (name greeter_client_eio) + (libraries grpc grpc-eio grpc-protoc eio_main greeter_protoc h2 h2-eio)) diff --git a/examples/greeter-protoc-client-eio/greeter_client_eio.ml b/examples/greeter-protoc-client-eio/greeter_client_eio.ml new file mode 100644 index 0000000..5fc1521 --- /dev/null +++ b/examples/greeter-protoc-client-eio/greeter_client_eio.ml @@ -0,0 +1,48 @@ +let main env = + let name = if Array.length Sys.argv > 1 then Sys.argv.(1) else "anonymous" in + let host = "localhost" in + let port = "8080" in + let network = Eio.Stdenv.net env in + let run sw = + let inet, port = + Eio_unix.run_in_systhread (fun () -> + Unix.getaddrinfo host port [ Unix.(AI_FAMILY PF_INET) ]) + |> List.filter_map (fun (addr : Unix.addr_info) -> + match addr.ai_addr with + | Unix.ADDR_UNIX _ -> None + | ADDR_INET (addr, port) -> Some (addr, port)) + |> List.hd + in + let addr = `Tcp (Eio_unix.Net.Ipaddr.of_unix inet, port) in + let socket = Eio.Net.connect ~sw network addr in + let connection = + H2_eio.Client.create_connection ~sw ~error_handler:ignore socket + in + + let request = Greeter_protoc.Greeter.default_hello_request ~name () in + + let f response = + match response with + | Some response -> response + | None -> Greeter_protoc.Greeter.default_hello_reply () + in + + let result = + Grpc_eio.Client.Typed_rpc.call + (Grpc_protoc.Client_rpc.unary + Greeter_protoc.Greeter.Greeter.Client.sayHello) + ~do_request:(H2_eio.Client.request connection ~error_handler:ignore) + ~handler:(Grpc_eio.Client.Typed_rpc.unary request ~f) + () + in + + Eio.Promise.await (H2_eio.Client.shutdown connection); + result + in + Eio.Switch.run run + +let () = + match Eio_main.run main with + | Ok (reply, status) -> + Eio.traceln "%s: %s" (Grpc.Status.show status) reply.message + | Error err -> Eio.traceln "Error: %a" H2.Status.pp_hum err diff --git a/examples/greeter-protoc-server-eio/dune b/examples/greeter-protoc-server-eio/dune new file mode 100644 index 0000000..b4cb2b0 --- /dev/null +++ b/examples/greeter-protoc-server-eio/dune @@ -0,0 +1,3 @@ +(executable + (name greeter_server_eio) + (libraries grpc grpc-eio grpc-protoc eio_main greeter_protoc h2 h2-eio)) diff --git a/examples/greeter-protoc-server-eio/greeter_server_eio.ml b/examples/greeter-protoc-server-eio/greeter_server_eio.ml new file mode 100644 index 0000000..ac11d5d --- /dev/null +++ b/examples/greeter-protoc-server-eio/greeter_server_eio.ml @@ -0,0 +1,59 @@ +open Grpc_eio + +let sayHello rpc = + Grpc_eio.Server.Typed_rpc.unary (Grpc_protoc.Server_rpc.unary rpc) + ~f:(fun (request : Greeter_protoc.Greeter.hello_request) -> + let message = + if request.name = "" then "You forgot your name!" + else Format.sprintf "Hello, %s!" request.name + in + let reply = Greeter_protoc.Greeter.default_hello_reply ~message () in + (Grpc.Status.(v OK), Some reply)) + +let connection_handler server sw = + let error_handler client_address ?request:_ _error start_response = + Eio.traceln "Error in request from:%a" Eio.Net.Sockaddr.pp client_address; + let response_body = start_response H2.Headers.empty in + H2.Body.Writer.write_string response_body + "There was an error handling your request.\n"; + H2.Body.Writer.close response_body + in + let request_handler client_address request_descriptor = + Eio.traceln "Handling a request from:%a" Eio.Net.Sockaddr.pp client_address; + Eio.Fiber.fork ~sw (fun () -> + Grpc_eio.Server.handle_request server request_descriptor) + in + fun socket addr -> + H2_eio.Server.create_connection_handler ?config:None ~request_handler + ~error_handler addr ~sw socket + +let serve server env = + let port = 8080 in + let net = Eio.Stdenv.net env in + let addr = `Tcp (Eio.Net.Ipaddr.V4.loopback, port) in + Eio.Switch.run @@ fun sw -> + let handler = connection_handler server sw in + let server_socket = + Eio.Net.listen net ~sw ~reuse_addr:true ~backlog:10 addr + in + let rec listen () = + Eio.Net.accept_fork ~sw server_socket + ~on_error:(fun exn -> Eio.traceln "%s" (Printexc.to_string exn)) + handler; + listen () + in + Printf.printf "Listening on port %i for grpc requests\n" port; + print_endline ""; + print_endline "Try running:"; + print_endline ""; + print_endline + {| dune exec -- examples/greeter-protoc-client-eio/greeter_client_eio.exe |}; + listen () + +let () = + let server = + Greeter_protoc.Greeter.Greeter.Server.make ~sayHello () + |> Grpc_protoc.handlers |> Server.Typed_rpc.server + in + + Eio_main.run (serve server) diff --git a/examples/greeter-protoc/dune b/examples/greeter-protoc/dune new file mode 100644 index 0000000..9ebdcdb --- /dev/null +++ b/examples/greeter-protoc/dune @@ -0,0 +1,14 @@ +(library + (name greeter_protoc) + (package grpc-examples) + (libraries ocaml-protoc pbrt pbrt_services)) + +(rule + (copy ../greeter/greeter.proto greeter.proto)) + +(rule + (targets greeter.ml greeter.mli) + (deps + (:proto greeter.proto)) + (action + (run ocaml-protoc %{proto} --binary --pp --services --ml_out .))) diff --git a/examples/greeter-server-eio/dune b/examples/greeter-server-eio/dune index 8108aa6..05f400e 100644 --- a/examples/greeter-server-eio/dune +++ b/examples/greeter-server-eio/dune @@ -1,3 +1,3 @@ (executable (name greeter_server_eio) - (libraries grpc grpc-eio ocaml-protoc-plugin eio_main greeter h2 h2-eio)) + (libraries grpc grpc-eio grpc-protoc-plugin eio_main greeter h2 h2-eio)) diff --git a/examples/greeter-server-eio/greeter_server_eio.ml b/examples/greeter-server-eio/greeter_server_eio.ml index 16aaba0..bd2a636 100644 --- a/examples/greeter-server-eio/greeter_server_eio.ml +++ b/examples/greeter-server-eio/greeter_server_eio.ml @@ -1,22 +1,16 @@ open Grpc_eio -let say_hello buffer = - let open Ocaml_protoc_plugin in - let open Greeter.Mypackage in - let decode, encode = Service.make_service_functions Greeter.sayHello in - let request = - Reader.create buffer |> decode |> function - | Ok v -> v - | Error e -> - failwith - (Printf.sprintf "Could not decode request: %s" (Result.show_error e)) - in - let message = - if request = "" then "You forgot your name!" - else Format.sprintf "Hello, %s!" request - in - let reply = Greeter.SayHello.Response.make ~message () in - (Grpc.Status.(v OK), Some (encode reply |> Writer.contents)) +let say_hello = + let module SayHello = Greeter.Mypackage.Greeter.SayHello in + Grpc_eio.Server.Typed_rpc.unary + (Grpc_protoc_plugin.Server_rpc.unary (module SayHello)) + ~f:(fun request -> + let message = + if request = "" then "You forgot your name!" + else Format.sprintf "Hello, %s!" request + in + let reply = SayHello.Response.make ~message () in + (Grpc.Status.(v OK), Some reply)) let connection_handler server sw = let error_handler client_address ?request:_ _error start_response = @@ -59,12 +53,8 @@ let serve server env = listen () let () = - let greeter_service = - Server.Service.( - v () |> add_rpc ~name:"SayHello" ~rpc:(Unary say_hello) |> handle_request) - in let server = - Server.( - v () |> add_service ~name:"mypackage.Greeter" ~service:greeter_service) + Server.Typed_rpc.server (Grpc_protoc_plugin.handlers [ say_hello ]) in + Eio_main.run (serve server) diff --git a/examples/routeguide-protoc/proto/dune b/examples/routeguide-protoc/proto/dune new file mode 100644 index 0000000..6d00d81 --- /dev/null +++ b/examples/routeguide-protoc/proto/dune @@ -0,0 +1,24 @@ +(library + (name routeguide_protoc) + (package grpc-examples) + (preprocess + (pps ppx_deriving.show ppx_deriving.eq)) + (libraries ocaml-protoc pbrt pbrt_services)) + +(rule + (copy ../../routeguide/proto/route_guide.proto route_guide.proto)) + +(rule + (targets route_guide.ml route_guide.mli) + (deps + (:proto route_guide.proto)) + (action + (run + ocaml-protoc + %{proto} + --binary + --ocaml_all_types_ppx + "deriving show { with_path = false }, eq" + --services + --ml_out + .))) diff --git a/examples/routeguide-protoc/src/client.ml b/examples/routeguide-protoc/src/client.ml new file mode 100644 index 0000000..860da38 --- /dev/null +++ b/examples/routeguide-protoc/src/client.ml @@ -0,0 +1,198 @@ +open Grpc_eio +module Route_guide = Routeguide_protoc.Route_guide + +(* $MDX part-begin=client-h2 *) +let client ~sw host port network = + let inet, port = + Eio_unix.run_in_systhread (fun () -> + Unix.getaddrinfo host port [ Unix.(AI_FAMILY PF_INET) ]) + |> List.filter_map (fun (addr : Unix.addr_info) -> + match addr.ai_addr with + | Unix.ADDR_UNIX _ -> None + | ADDR_INET (addr, port) -> Some (addr, port)) + |> List.hd + in + let addr = `Tcp (Eio_unix.Net.Ipaddr.of_unix inet, port) in + let socket = Eio.Net.connect ~sw network addr in + H2_eio.Client.create_connection ~sw ~error_handler:ignore socket + +(* $MDX part-end *) +(* $MDX part-begin=client-get-feature *) +let call_get_feature connection point = + let response = + Client.Typed_rpc.call + (Grpc_protoc.Client_rpc.unary Route_guide.RouteGuide.Client.getFeature) + ~do_request:(H2_eio.Client.request connection ~error_handler:ignore) + ~handler: + (Client.Typed_rpc.unary point ~f:(function + | Some feature -> feature + | None -> Route_guide.default_feature ())) + () + in + match response with + | Ok (res, _ok) -> + Format.printf "RESPONSE = {%s}" (Route_guide.show_feature res) + | Error _ -> Printf.printf "an error occurred" + +(* $MDX part-end *) +(* $MDX part-begin=client-list-features *) +let print_features connection = + let rectangle = + Route_guide.default_rectangle + ~lo: + (Routeguide_protoc.Route_guide.default_point ~latitude:400000000l + ~longitude:(-750000000l) () + |> Option.some) + ~hi: + (Routeguide_protoc.Route_guide.default_point ~latitude:420000000l + ~longitude:(-730000000l) () + |> Option.some) + () + in + + let stream = + Client.Typed_rpc.call + (Grpc_protoc.Client_rpc.server_streaming + Route_guide.RouteGuide.Client.listFeatures) + ~do_request:(H2_eio.Client.request connection ~error_handler:ignore) + ~handler:(Client.Typed_rpc.server_streaming rectangle ~f:Fun.id) + () + in + match stream with + | Ok (results, _ok) -> + Seq.iter + (fun f -> Format.printf "RESPONSE = {%a}" Route_guide.pp_feature f) + results + | Error e -> + failwith (Printf.sprintf "HTTP2 error: %s" (H2.Status.to_string e)) + +(* $MDX part-end *) +(* $MDX part-begin=client-random-point *) +let random_point () : Route_guide.point = + let latitude = (Random.int 180 - 90) * 10000000 |> Int32.of_int in + let longitude = (Random.int 360 - 180) * 10000000 |> Int32.of_int in + Route_guide.default_point ~latitude ~longitude () + +(* $MDX part-end *) +(* $MDX part-begin=client-record-route *) +let run_record_route connection = + let points = + Random.int 100 + |> Seq.unfold (function 0 -> None | x -> Some (random_point (), x - 1)) + in + + let response = + Client.Typed_rpc.call + (Grpc_protoc.Client_rpc.client_streaming + Route_guide.RouteGuide.Client.recordRoute) + ~do_request:(H2_eio.Client.request connection ~error_handler:ignore) + ~handler: + (Client.Typed_rpc.client_streaming ~f:(fun f response -> + (* Stream points to server. *) + Seq.iter (fun point -> Seq.write f point) points; + + (* Signal we have finished sending points. *) + Seq.close_writer f; + + (* Decode RouteSummary responses. *) + Eio.Promise.await response |> function + | Some summary -> summary + | None -> failwith (Printf.sprintf "No RouteSummary received."))) + () + in + match response with + | Ok (result, _ok) -> + Format.printf "SUMMARY = {%a}" Route_guide.pp_route_summary result + | Error e -> + failwith (Printf.sprintf "HTTP2 error: %s" (H2.Status.to_string e)) + +(* $MDX part-end *) +(* $MDX part-begin=client-route-chat-1 *) +let run_route_chat clock connection = + (* Generate locations. *) + let location_count = 5 in + Printf.printf "Generating %i locations\n" location_count; + let route_notes = + location_count + |> Seq.unfold (function + | 0 -> None + | x -> + Some + ( Route_guide.default_route_note + ~location:(random_point () |> Option.some) + ~message:(Printf.sprintf "Random Message %i" x) + (), + x - 1 )) + in + (* $MDX part-end *) + (* $MDX part-begin=client-route-chat-2 *) + let rec go writer reader notes = + match Seq.uncons notes with + | None -> + Seq.close_writer writer (* Signal no more notes from the client. *) + | Some (route_note, xs) -> ( + Seq.write writer route_note; + + (* Yield and sleep, waiting for server reply. *) + Eio.Time.sleep clock 1.0; + Eio.Fiber.yield (); + + match Seq.uncons reader with + | None -> failwith "Expecting response" + | Some (route_note, reader') -> + Format.printf "NOTE = {%s}\n" + (Route_guide.show_route_note route_note); + go writer reader' xs) + in + let result = + Client.Typed_rpc.call + (Grpc_protoc.Client_rpc.bidirectional_streaming + Route_guide.RouteGuide.Client.routeChat) + ~do_request:(H2_eio.Client.request connection ~error_handler:ignore) + ~handler: + (Client.Typed_rpc.bidirectional_streaming ~f:(fun writer reader -> + go writer reader route_notes)) + () + in + match result with + | Ok ((), _ok) -> () + | Error e -> + failwith (Printf.sprintf "HTTP2 error: %s" (H2.Status.to_string e)) + +(* $MDX part-end *) +(* $MDX part-begin=client-main *) + +let main env = + let port = "8080" in + let host = "localhost" in + let clock = Eio.Stdenv.clock env in + let network = Eio.Stdenv.net env in + let () = Random.self_init () in + + let run sw = + let connection = client ~sw host port network in + + Printf.printf "*** SIMPLE RPC ***\n"; + let request = + Route_guide.default_point ~latitude:409146138l ~longitude:(-746188906l) () + in + let result = call_get_feature connection request in + + Printf.printf "\n*** SERVER STREAMING ***\n"; + print_features connection; + + Printf.printf "\n*** CLIENT STREAMING ***\n"; + run_record_route connection; + + Printf.printf "\n*** BIDIRECTIONAL STREAMING ***\n"; + run_route_chat clock connection; + + Eio.Promise.await (H2_eio.Client.shutdown connection); + result + in + + Eio.Switch.run run + +let () = Eio_main.run main + +(* $MDX part-end *) diff --git a/examples/routeguide-protoc/src/dune b/examples/routeguide-protoc/src/dune new file mode 100644 index 0000000..b0b91be --- /dev/null +++ b/examples/routeguide-protoc/src/dune @@ -0,0 +1,14 @@ +(executables + (names server client) + (package grpc-examples) + (public_names routeguide-protoc-server routeguide-protoc-client) + (libraries + grpc-eio + grpc-protoc + eio_main + h2-eio + routeguide_protoc + yojson + ppx_deriving_yojson.runtime) + (preprocess + (pps ppx_deriving_yojson ppx_deriving.show ppx_deriving.eq))) diff --git a/examples/routeguide-protoc/src/server.ml b/examples/routeguide-protoc/src/server.ml new file mode 100644 index 0000000..3c18fb3 --- /dev/null +++ b/examples/routeguide-protoc/src/server.ml @@ -0,0 +1,245 @@ +open Grpc_eio +module Route_guide = Routeguide_protoc.Route_guide + +(* Derived data types to make reading JSON data easier. *) +type location = { latitude : int; longitude : int } [@@deriving yojson] +type feature = { location : location; name : string } [@@deriving yojson] +type feature_list = feature list [@@deriving yojson] + +(* This will act as a master state that the server is serving over RPC. *) +type t = { features : Route_guide.feature list } + +module RouteNotesMap = Hashtbl.Make (struct + type t = Route_guide.point + + let equal = Route_guide.equal_point + let hash s = Hashtbl.hash s +end) + +(** Load route_guide data from a JSON file. *) +let load_features path : Route_guide.feature list = + let json = Yojson.Safe.from_file path in + match feature_list_of_yojson json with + | Ok v -> + List.map + (fun feature -> + Route_guide.default_feature ~name:feature.name + ~location: + (Route_guide.default_point + ~longitude:(feature.location.longitude |> Int32.of_int) + ~latitude:(feature.location.latitude |> Int32.of_int) + () + |> Option.some) + ()) + v + | Error err -> failwith err + +let in_range (point : Route_guide.point) (rect : Route_guide.rectangle) : bool = + let lo = Option.get rect.lo in + let hi = Option.get rect.hi in + + let left = Int32.min lo.longitude hi.longitude in + let right = Int32.max lo.longitude hi.longitude in + let top = Int32.max lo.latitude hi.latitude in + let bottom = Int32.min lo.latitude hi.latitude in + + point.longitude >= left && point.longitude <= right + && point.latitude >= bottom && point.latitude <= top + +let pi = 4. *. atan 1. +let radians_of_degrees = ( *. ) (pi /. 180.) + +(* Calculates the distance between two points using the "haversine" formula. *) +(* This code was taken from http://www.movable-type.co.uk/scripts/latlong.html. *) +let calc_distance (p1 : Route_guide.point) (p2 : Route_guide.point) : int = + let cord_factor = 1e7 in + let r = 6_371_000.0 in + (* meters *) + let lat1 = Int32.to_float p1.latitude /. cord_factor in + let lat2 = Int32.to_float p2.latitude /. cord_factor in + let lng1 = Int32.to_float p1.longitude /. cord_factor in + let lng2 = Int32.to_float p2.longitude /. cord_factor in + + let lat_rad1 = radians_of_degrees lat1 in + let lat_rad2 = radians_of_degrees lat2 in + + let delta_lat = radians_of_degrees (lat2 -. lat1) in + let delta_lng = radians_of_degrees (lng2 -. lng1) in + + let a = + (sin (delta_lat /. 2.0) *. sin (delta_lat /. 2.0)) + +. cos lat_rad1 *. cos lat_rad2 + *. sin (delta_lng /. 2.0) + *. sin (delta_lng /. 2.0) + in + let c = 2.0 *. atan2 (sqrt a) (sqrt (1.0 -. a)) in + Float.to_int (r *. c) + +(* $MDX part-begin=server-get-feature *) +let get_feature (t : t) rpc = + Grpc_eio.Server.Typed_rpc.unary (Grpc_protoc.Server_rpc.unary rpc) + ~f:(fun point -> + Eio.traceln "GetFeature = {:%a}" Route_guide.pp_point point; + + (* Lookup the feature and if found return it. *) + let feature = + List.find_opt + (fun (f : Route_guide.feature) -> + match (f.location, point) with + | Some p1, p2 -> Route_guide.equal_point p1 p2 + | _, _ -> false) + t.features + in + Eio.traceln "Found feature %s" + (feature + |> Option.map Route_guide.show_feature + |> Option.value ~default:"Missing"); + match feature with + | Some feature -> (Grpc.Status.(v OK), Some feature) + | None -> + (* No feature was found, return an unnamed feature. *) + ( Grpc.Status.(v OK), + Some (Route_guide.default_feature ~location:(Some point) ()) )) + +(* $MDX part-end *) +(* $MDX part-begin=server-list-features *) +let list_features (t : t) rpc = + Grpc_eio.Server.Typed_rpc.server_streaming + (Grpc_protoc.Server_rpc.server_streaming rpc) ~f:(fun rectangle f -> + (* Lookup and reply with features found. *) + let () = + List.iter + (fun (feature : Route_guide.feature) -> + if in_range (Option.get feature.location) rectangle then f feature + else ()) + t.features + in + Grpc.Status.(v OK)) + +(* $MDX part-end *) +(* $MDX part-begin=server-record-route *) +let record_route (t : t) (clock : _ Eio.Time.clock) rpc = + Grpc_eio.Server.Typed_rpc.client_streaming + (Grpc_protoc.Server_rpc.client_streaming rpc) + ~f:(fun (stream : Route_guide.point Seq.t) -> + Eio.traceln "RecordRoute"; + + let last_point = ref None in + let start = Eio.Time.now clock in + + let point_count, feature_count, distance = + Seq.fold_left + (fun (point_count, feature_count, distance) point -> + Eio.traceln " ==> Point = {%s}" (Route_guide.show_point point); + + (* Increment the point count *) + let point_count = point_count + 1 in + + (* Find features *) + let feature_count = + List.find_all + (fun (feature : Route_guide.feature) -> + Route_guide.equal_point (Option.get feature.location) point) + t.features + |> fun x -> List.length x + feature_count + in + + (* Calculate the distance *) + let distance = + match !last_point with + | Some last_point -> calc_distance last_point point + | None -> distance + in + last_point := Some point; + (point_count, feature_count, distance)) + (0, 0, 0) stream + in + let stop = Eio.Time.now clock in + let elapsed_time = int_of_float (stop -. start) in + let summary = + Route_guide.default_route_summary + ~point_count:(point_count |> Int32.of_int) + ~feature_count:(feature_count |> Int32.of_int) + ~distance:(distance |> Int32.of_int) + ~elapsed_time:(elapsed_time |> Int32.of_int) + () + in + Eio.traceln "RecordRoute exit\n"; + (Grpc.Status.(v OK), Some summary)) + +(* $MDX part-end *) +(* $MDX part-begin=server-route-chat *) +let route_chat (_ : t) rpc = + Grpc_eio.Server.Typed_rpc.bidirectional_streaming + (Grpc_protoc.Server_rpc.bidirectional_streaming rpc) + ~f:(fun + (stream : Route_guide.route_note Seq.t) + (f : Route_guide.route_note -> unit) + -> + Printf.printf "RouteChat\n"; + + Seq.iter + (fun note -> + Printf.printf " ==> Note = {%s}\n" (Route_guide.show_route_note note); + f note) + stream; + + Printf.printf "RouteChat exit\n"; + Grpc.Status.(v OK)) + +(* $MDX part-end *) +(* $MDX part-begin=server-grpc *) +let server t clock = + Route_guide.RouteGuide.Server.make ~getFeature:(get_feature t) + ~listFeatures:(list_features t) ~recordRoute:(record_route t clock) + ~routeChat:(route_chat t) () + |> Grpc_protoc.handlers |> Server.Typed_rpc.server + +(* $MDX part-end *) +let connection_handler server ~sw = + let error_handler client_address ?request:_ _error start_response = + Eio.traceln "Error in request from:%a" Eio.Net.Sockaddr.pp client_address; + let response_body = start_response H2.Headers.empty in + H2.Body.Writer.write_string response_body + "There was an error handling your request.\n"; + H2.Body.Writer.close response_body + in + let request_handler _client_address request_descriptor = + Eio.Fiber.fork ~sw (fun () -> + Grpc_eio.Server.handle_request server request_descriptor) + in + fun socket addr -> + H2_eio.Server.create_connection_handler ?config:None ~request_handler + ~error_handler addr socket ~sw + +(* $MDX part-begin=server-main *) +let serve t env = + let port = 8080 in + let net = Eio.Stdenv.net env in + let clock = Eio.Stdenv.clock env in + let addr = `Tcp (Eio.Net.Ipaddr.V4.loopback, port) in + Eio.Switch.run @@ fun sw -> + let handler = connection_handler ~sw (server t clock) in + let server_socket = + Eio.Net.listen net ~sw ~reuse_addr:true ~backlog:10 addr + in + let rec listen () = + Eio.Net.accept_fork ~sw server_socket + ~on_error:(fun exn -> Eio.traceln "%s" (Printexc.to_string exn)) + handler; + listen () + in + Eio.traceln "Listening on port %i for grpc requests\n" port; + listen () + +let () = + let path = + if Array.length Sys.argv > 1 then Sys.argv.(1) + else failwith "Path to datafile required." + in + + (* Load features. *) + let t = { features = load_features path } in + + Eio_main.run (serve t) +(* $MDX part-end *) diff --git a/examples/routeguide-tutorial.md b/examples/routeguide-tutorial.md index 96128be..b99e3b3 100644 --- a/examples/routeguide-tutorial.md +++ b/examples/routeguide-tutorial.md @@ -192,20 +192,10 @@ The individual service functions from our proto definition are implemented using ```ocaml -let route_guide_service clock = - Server.Service.( - v () - |> add_rpc ~name:"GetFeature" ~rpc:(Unary get_feature) - |> add_rpc ~name:"ListFeatures" ~rpc:(Server_streaming list_features) - |> add_rpc ~name:"RecordRoute" ~rpc:(Client_streaming (record_route clock)) - |> add_rpc ~name:"RouteChat" ~rpc:(Bidirectional_streaming route_chat) - |> handle_request) - -let server clock = - Server.( - v () - |> add_service ~name:"routeguide.RouteGuide" - ~service:(route_guide_service clock)) +let server t clock = + Server.Typed_rpc.server + (Grpc_protoc_plugin.handlers + [ get_feature t; list_features t; record_route t clock; route_chat t ]) ``` ### Simple RPC @@ -214,36 +204,28 @@ Let's look at the simplest type first, `GetFeature` which just gets a `Point` fr ```ocaml -let get_feature (buffer : string) = - let decode, encode = Service.make_service_functions RouteGuide.getFeature in - (* Decode the request. *) - let point = - Reader.create buffer |> decode |> function - | Ok v -> v - | Error e -> - failwith - (Printf.sprintf "Could not decode request: %s" (Result.show_error e)) - in - Eio.traceln "GetFeature = {:%s}" (Point.show point); - - (* Lookup the feature and if found return it. *) - let feature = - List.find_opt - (fun (f : Feature.t) -> - match (f.location, point) with - | Some p1, p2 -> Point.equal p1 p2 - | _, _ -> false) - !features - in - Eio.traceln "Found feature %s" - (feature |> Option.map Feature.show |> Option.value ~default:"Missing"); - match feature with - | Some feature -> - (Grpc.Status.(v OK), Some (feature |> encode |> Writer.contents)) - | None -> - (* No feature was found, return an unnamed feature. *) - ( Grpc.Status.(v OK), - Some (Feature.make ~location:point () |> encode |> Writer.contents) ) +let get_feature (t : t) = + Grpc_eio.Server.Typed_rpc.unary + (Grpc_protoc_plugin.Server_rpc.unary (module RouteGuide.GetFeature)) + ~f:(fun point -> + Eio.traceln "GetFeature = {:%s}" (Point.show point); + + (* Lookup the feature and if found return it. *) + let feature = + List.find_opt + (fun (f : Feature.t) -> + match (f.location, point) with + | Some p1, p2 -> Point.equal p1 p2 + | _, _ -> false) + t.features + in + Eio.traceln "Found feature %s" + (feature |> Option.map Feature.show |> Option.value ~default:"Missing"); + match feature with + | Some feature -> (Grpc.Status.(v OK), Some feature) + | None -> + (* No feature was found, return an unnamed feature. *) + (Grpc.Status.(v OK), Some (Feature.make ~location:point ()))) ``` The method is passed the client's `Point` protocol buffer request. It decodes the request into a `Point.t` and uses that to look up the feature. It returns a `Feature` protocol buffer object with the response information indicating the successful response, based on the feature found or an unnamed default feature. @@ -254,27 +236,20 @@ Now let's look at one of our streaming RPCs. `list_features` is a server-side st ```ocaml -let list_features (buffer : string) (f : string -> unit) = - (* Decode request. *) - let decode, encode = Service.make_service_functions RouteGuide.listFeatures in - let rectangle = - Reader.create buffer |> decode |> function - | Ok v -> v - | Error e -> - failwith - (Printf.sprintf "Could not decode request: %s" (Result.show_error e)) - in - - (* Lookup and reply with features found. *) - let () = - List.iter - (fun (feature : Feature.t) -> - if in_range (Option.get feature.location) rectangle then - encode feature |> Writer.contents |> f - else ()) - !features - in - Grpc.Status.(v OK) +let list_features (t : t) = + Grpc_eio.Server.Typed_rpc.server_streaming + (Grpc_protoc_plugin.Server_rpc.server_streaming + (module RouteGuide.ListFeatures)) + ~f:(fun rectangle f -> + (* Lookup and reply with features found. *) + let () = + List.iter + (fun (feature : Feature.t) -> + if in_range (Option.get feature.location) rectangle then f feature + else ()) + t.features + in + Grpc.Status.(v OK)) ``` Like `get_feature` `list_feature`'s input is a single message. A `Rectangle` that is decoded from a string buffer. The `f: (string -> unit)` function is for writing the encoded responses back to the client. In the function we decode the request, lookup any matching features and stream them back to the client as we find them using `f`. Once we've looked at all the `features` we respond with an `OK` indicating the streaming has finished successfully. @@ -285,55 +260,50 @@ Now let's look at something a little more complicated: the client-side streaming ```ocaml -let record_route (clock : _ Eio.Time.clock) (stream : string Seq.t) = - Eio.traceln "RecordRoute"; - - let last_point = ref None in - let start = Eio.Time.now clock in - let decode, encode = Service.make_service_functions RouteGuide.recordRoute in - - let point_count, feature_count, distance = - Seq.fold_left - (fun (point_count, feature_count, distance) i -> - let point = - Reader.create i |> decode |> function - | Ok v -> v - | Error e -> - failwith - (Printf.sprintf "Could not decode request: %s" - (Result.show_error e)) - in - Eio.traceln " ==> Point = {%s}" (Point.show point); - - (* Increment the point count *) - let point_count = point_count + 1 in - - (* Find features *) - let feature_count = - List.find_all - (fun (feature : Feature.t) -> - Point.equal (Option.get feature.location) point) - !features - |> fun x -> List.length x + feature_count - in - - (* Calculate the distance *) - let distance = - match !last_point with - | Some last_point -> calc_distance last_point point - | None -> distance - in - last_point := Some point; - (point_count, feature_count, distance)) - (0, 0, 0) stream - in - let stop = Eio.Time.now clock in - let elapsed_time = int_of_float (stop -. start) in - let summary = - RouteSummary.make ~point_count ~feature_count ~distance ~elapsed_time () - in - Eio.traceln "RecordRoute exit\n"; - (Grpc.Status.(v OK), Some (encode summary |> Writer.contents)) +let record_route (t : t) (clock : _ Eio.Time.clock) = + Grpc_eio.Server.Typed_rpc.client_streaming + (Grpc_protoc_plugin.Server_rpc.client_streaming + (module RouteGuide.RecordRoute)) + ~f:(fun (stream : Point.t Seq.t) -> + Eio.traceln "RecordRoute"; + + let last_point = ref None in + let start = Eio.Time.now clock in + + let point_count, feature_count, distance = + Seq.fold_left + (fun (point_count, feature_count, distance) point -> + Eio.traceln " ==> Point = {%s}" (Point.show point); + + (* Increment the point count *) + let point_count = point_count + 1 in + + (* Find features *) + let feature_count = + List.find_all + (fun (feature : Feature.t) -> + Point.equal (Option.get feature.location) point) + t.features + |> fun x -> List.length x + feature_count + in + + (* Calculate the distance *) + let distance = + match !last_point with + | Some last_point -> calc_distance last_point point + | None -> distance + in + last_point := Some point; + (point_count, feature_count, distance)) + (0, 0, 0) stream + in + let stop = Eio.Time.now clock in + let elapsed_time = int_of_float (stop -. start) in + let summary = + RouteSummary.make ~point_count ~feature_count ~distance ~elapsed_time () + in + Eio.traceln "RecordRoute exit\n"; + (Grpc.Status.(v OK), Some summary)) ``` ### Bidirectional streaming RPCs @@ -342,26 +312,21 @@ Finally, let's look at our bidirectional streaming RPC `route_chat`, which recei ```ocaml -let route_chat (stream : string Seq.t) (f : string -> unit) = - Printf.printf "RouteChat\n"; - - let decode, encode = Service.make_service_functions RouteGuide.routeChat in - Seq.iter - (fun i -> - let note = - Reader.create i |> decode |> function - | Ok v -> v - | Error e -> - failwith - (Printf.sprintf "Could not decode request: %s" - (Result.show_error e)) - in - Printf.printf " ==> Note = {%s}\n" (RouteNote.show note); - encode note |> Writer.contents |> f) - stream; +let route_chat (_ : t) = + Grpc_eio.Server.Typed_rpc.bidirectional_streaming + (Grpc_protoc_plugin.Server_rpc.bidirectional_streaming + (module RouteGuide.RouteChat)) + ~f:(fun (stream : RouteNote.t Seq.t) (f : RouteNote.t -> unit) -> + Printf.printf "RouteChat\n"; + + Seq.iter + (fun note -> + Printf.printf " ==> Note = {%s}\n" (RouteNote.show note); + f note) + stream; - Printf.printf "RouteChat exit\n"; - Grpc.Status.(v OK) + Printf.printf "RouteChat exit\n"; + Grpc.Status.(v OK)) ``` `route_chat` receives a `string Seq.t` of requests which it decodes, logs to stdout to show it has received the note, and then encodes again to send back to the client. Finally it responds with an `OK` indicating it has finished. The logic is we receive one `RouteNote` and respond directly with the same `RouteNote` using the `f` function supplied. @@ -372,13 +337,13 @@ Once we've implemented all our functions, we also need to startup a gRPC server ```ocaml -let serve server env = +let serve t env = let port = 8080 in let net = Eio.Stdenv.net env in let clock = Eio.Stdenv.clock env in let addr = `Tcp (Eio.Net.Ipaddr.V4.loopback, port) in Eio.Switch.run @@ fun sw -> - let handler = connection_handler ~sw (server clock) in + let handler = connection_handler ~sw (server t clock) in let server_socket = Eio.Net.listen net ~sw ~reuse_addr:true ~backlog:10 addr in @@ -398,9 +363,9 @@ let () = in (* Load features. *) - features := load path; + let t = { features = load_features path } in - Eio_main.run (serve server) + Eio_main.run (serve t) ``` To handle requests we use `h2-lwt-unix`, an implementation of the HTTP/2 specification entirely in OCaml. What that means is we can swap in other h2 implementations like MirageOS to run in a Unikernel or Async to use JaneStreet's alternatve async implementation. Furthermore we can add TLS or SSL encryptionon to our HTTP/2 stack. @@ -437,23 +402,14 @@ Calling the simple RPC `get_feature` requires building up a `Client.call` repres ```ocaml let call_get_feature connection point = - let encode, decode = Service.make_client_functions RouteGuide.getFeature in let response = - Client.call ~service:"routeguide.RouteGuide" ~rpc:"GetFeature" + Client.Typed_rpc.call + (Grpc_protoc_plugin.Client_rpc.unary (module RouteGuide.GetFeature)) ~do_request:(H2_eio.Client.request connection ~error_handler:ignore) ~handler: - (Client.Rpc.unary - (encode point |> Writer.contents) - ~f:(fun response -> - match response with - | Some response -> ( - Reader.create response |> decode |> function - | Ok feature -> feature - | Error e -> - failwith - (Printf.sprintf "Could not decode request: %s" - (Result.show_error e))) - | None -> Feature.make ())) + (Client.Typed_rpc.unary point ~f:(function + | Some feature -> feature + | None -> Feature.make ())) () in match response with @@ -474,26 +430,12 @@ let print_features connection = () in - let encode, decode = Service.make_client_functions RouteGuide.listFeatures in let stream = - Client.call ~service:"routeguide.RouteGuide" ~rpc:"ListFeatures" + Client.Typed_rpc.call + (Grpc_protoc_plugin.Client_rpc.server_streaming + (module RouteGuide.ListFeatures)) ~do_request:(H2_eio.Client.request connection ~error_handler:ignore) - ~handler: - (Client.Rpc.server_streaming - (encode rectangle |> Writer.contents) - ~f:(fun responses -> - let stream = - Seq.map - (fun str -> - Reader.create str |> decode |> function - | Ok feature -> feature - | Error e -> - failwith - (Printf.sprintf "Could not decode request: %s" - (Result.show_error e))) - responses - in - stream)) + ~handler:(Client.Typed_rpc.server_streaming rectangle ~f:Fun.id) () in match stream with @@ -526,30 +468,22 @@ let run_record_route connection = |> Seq.unfold (function 0 -> None | x -> Some (random_point (), x - 1)) in - let encode, decode = Service.make_client_functions RouteGuide.recordRoute in let response = - Client.call ~service:"routeguide.RouteGuide" ~rpc:"RecordRoute" + Client.Typed_rpc.call + (Grpc_protoc_plugin.Client_rpc.client_streaming + (module RouteGuide.RecordRoute)) ~do_request:(H2_eio.Client.request connection ~error_handler:ignore) ~handler: - (Client.Rpc.client_streaming ~f:(fun f response -> + (Client.Typed_rpc.client_streaming ~f:(fun f response -> (* Stream points to server. *) - Seq.iter - (fun point -> - encode point |> Writer.contents |> fun x -> Seq.write f x) - points; + Seq.iter (fun point -> Seq.write f point) points; (* Signal we have finished sending points. *) Seq.close_writer f; (* Decode RouteSummary responses. *) Eio.Promise.await response |> function - | Some str -> ( - Reader.create str |> decode |> function - | Ok feature -> feature - | Error err -> - failwith - (Printf.sprintf "Could not decode request: %s" - (Result.show_error err))) + | Some summary -> summary | None -> failwith (Printf.sprintf "No RouteSummary received."))) () in @@ -587,14 +521,12 @@ let run_route_chat clock connection = We start by generating a short sequence of locations, similar to how we did for `record_route`. ```ocaml - let encode, decode = Service.make_client_functions RouteGuide.routeChat in let rec go writer reader notes = match Seq.uncons notes with | None -> Seq.close_writer writer (* Signal no more notes from the client. *) | Some (route_note, xs) -> ( - encode route_note |> Writer.contents |> fun x -> - Seq.write writer x; + Seq.write writer route_note; (* Yield and sleep, waiting for server reply. *) Eio.Time.sleep clock 1.0; @@ -602,23 +534,17 @@ We start by generating a short sequence of locations, similar to how we did for match Seq.uncons reader with | None -> failwith "Expecting response" - | Some (response, reader') -> - let route_note = - Reader.create response |> decode |> function - | Ok route_note -> route_note - | Error e -> - failwith - (Printf.sprintf "Could not decode request: %s" - (Result.show_error e)) - in + | Some (route_note, reader') -> Printf.printf "NOTE = {%s}\n" (RouteNote.show route_note); go writer reader' xs) in let result = - Client.call ~service:"routeguide.RouteGuide" ~rpc:"RouteChat" + Client.Typed_rpc.call + (Grpc_protoc_plugin.Client_rpc.bidirectional_streaming + (module RouteGuide.RouteChat)) ~do_request:(H2_eio.Client.request connection ~error_handler:ignore) ~handler: - (Client.Rpc.bidirectional_streaming ~f:(fun writer reader -> + (Client.Typed_rpc.bidirectional_streaming ~f:(fun writer reader -> go writer reader route_notes)) () in diff --git a/examples/routeguide/src/client.ml b/examples/routeguide/src/client.ml index 47d8dba..94c8167 100644 --- a/examples/routeguide/src/client.ml +++ b/examples/routeguide/src/client.ml @@ -1,6 +1,5 @@ open Grpc_eio open Routeguide.Route_guide.Routeguide -open Ocaml_protoc_plugin (* $MDX part-begin=client-h2 *) let client ~sw host port network = @@ -20,23 +19,14 @@ let client ~sw host port network = (* $MDX part-end *) (* $MDX part-begin=client-get-feature *) let call_get_feature connection point = - let encode, decode = Service.make_client_functions RouteGuide.getFeature in let response = - Client.call ~service:"routeguide.RouteGuide" ~rpc:"GetFeature" + Client.Typed_rpc.call + (Grpc_protoc_plugin.Client_rpc.unary (module RouteGuide.GetFeature)) ~do_request:(H2_eio.Client.request connection ~error_handler:ignore) ~handler: - (Client.Rpc.unary - (encode point |> Writer.contents) - ~f:(fun response -> - match response with - | Some response -> ( - Reader.create response |> decode |> function - | Ok feature -> feature - | Error e -> - failwith - (Printf.sprintf "Could not decode request: %s" - (Result.show_error e))) - | None -> Feature.make ())) + (Client.Typed_rpc.unary point ~f:(function + | Some feature -> feature + | None -> Feature.make ())) () in match response with @@ -53,26 +43,12 @@ let print_features connection = () in - let encode, decode = Service.make_client_functions RouteGuide.listFeatures in let stream = - Client.call ~service:"routeguide.RouteGuide" ~rpc:"ListFeatures" + Client.Typed_rpc.call + (Grpc_protoc_plugin.Client_rpc.server_streaming + (module RouteGuide.ListFeatures)) ~do_request:(H2_eio.Client.request connection ~error_handler:ignore) - ~handler: - (Client.Rpc.server_streaming - (encode rectangle |> Writer.contents) - ~f:(fun responses -> - let stream = - Seq.map - (fun str -> - Reader.create str |> decode |> function - | Ok feature -> feature - | Error e -> - failwith - (Printf.sprintf "Could not decode request: %s" - (Result.show_error e))) - responses - in - stream)) + ~handler:(Client.Typed_rpc.server_streaming rectangle ~f:Fun.id) () in match stream with @@ -98,30 +74,22 @@ let run_record_route connection = |> Seq.unfold (function 0 -> None | x -> Some (random_point (), x - 1)) in - let encode, decode = Service.make_client_functions RouteGuide.recordRoute in let response = - Client.call ~service:"routeguide.RouteGuide" ~rpc:"RecordRoute" + Client.Typed_rpc.call + (Grpc_protoc_plugin.Client_rpc.client_streaming + (module RouteGuide.RecordRoute)) ~do_request:(H2_eio.Client.request connection ~error_handler:ignore) ~handler: - (Client.Rpc.client_streaming ~f:(fun f response -> + (Client.Typed_rpc.client_streaming ~f:(fun f response -> (* Stream points to server. *) - Seq.iter - (fun point -> - encode point |> Writer.contents |> fun x -> Seq.write f x) - points; + Seq.iter (fun point -> Seq.write f point) points; (* Signal we have finished sending points. *) Seq.close_writer f; (* Decode RouteSummary responses. *) Eio.Promise.await response |> function - | Some str -> ( - Reader.create str |> decode |> function - | Ok feature -> feature - | Error err -> - failwith - (Printf.sprintf "Could not decode request: %s" - (Result.show_error err))) + | Some summary -> summary | None -> failwith (Printf.sprintf "No RouteSummary received."))) () in @@ -150,14 +118,12 @@ let run_route_chat clock connection = in (* $MDX part-end *) (* $MDX part-begin=client-route-chat-2 *) - let encode, decode = Service.make_client_functions RouteGuide.routeChat in let rec go writer reader notes = match Seq.uncons notes with | None -> Seq.close_writer writer (* Signal no more notes from the client. *) | Some (route_note, xs) -> ( - encode route_note |> Writer.contents |> fun x -> - Seq.write writer x; + Seq.write writer route_note; (* Yield and sleep, waiting for server reply. *) Eio.Time.sleep clock 1.0; @@ -165,23 +131,17 @@ let run_route_chat clock connection = match Seq.uncons reader with | None -> failwith "Expecting response" - | Some (response, reader') -> - let route_note = - Reader.create response |> decode |> function - | Ok route_note -> route_note - | Error e -> - failwith - (Printf.sprintf "Could not decode request: %s" - (Result.show_error e)) - in + | Some (route_note, reader') -> Printf.printf "NOTE = {%s}\n" (RouteNote.show route_note); go writer reader' xs) in let result = - Client.call ~service:"routeguide.RouteGuide" ~rpc:"RouteChat" + Client.Typed_rpc.call + (Grpc_protoc_plugin.Client_rpc.bidirectional_streaming + (module RouteGuide.RouteChat)) ~do_request:(H2_eio.Client.request connection ~error_handler:ignore) ~handler: - (Client.Rpc.bidirectional_streaming ~f:(fun writer reader -> + (Client.Typed_rpc.bidirectional_streaming ~f:(fun writer reader -> go writer reader route_notes)) () in diff --git a/examples/routeguide/src/dune b/examples/routeguide/src/dune index 9c5afaf..5da330a 100644 --- a/examples/routeguide/src/dune +++ b/examples/routeguide/src/dune @@ -4,9 +4,9 @@ (public_names routeguide-server routeguide-client) (libraries grpc-eio + grpc-protoc-plugin eio_main h2-eio - ocaml-protoc-plugin routeguide yojson ppx_deriving_yojson.runtime) diff --git a/examples/routeguide/src/server.ml b/examples/routeguide/src/server.ml index bfa30d9..557e323 100644 --- a/examples/routeguide/src/server.ml +++ b/examples/routeguide/src/server.ml @@ -1,13 +1,13 @@ open Grpc_eio open Routeguide.Route_guide.Routeguide -open Ocaml_protoc_plugin (* Derived data types to make reading JSON data easier. *) type location = { latitude : int; longitude : int } [@@deriving yojson] type feature = { location : location; name : string } [@@deriving yojson] type feature_list = feature list [@@deriving yojson] -let features : Feature.t list ref = ref [] +(* This will act as a master state that the server is serving over RPC. *) +type t = { features : Feature.t list } module RouteNotesMap = Hashtbl.Make (struct type t = Point.t @@ -17,7 +17,7 @@ module RouteNotesMap = Hashtbl.Make (struct end) (** Load route_guide data from a JSON file. *) -let load path : Feature.t list = +let load_features path : Feature.t list = let json = Yojson.Safe.from_file path in match feature_list_of_yojson json with | Ok v -> @@ -73,152 +73,117 @@ let calc_distance (p1 : Point.t) (p2 : Point.t) : int = Float.to_int (r *. c) (* $MDX part-begin=server-get-feature *) -let get_feature (buffer : string) = - let decode, encode = Service.make_service_functions RouteGuide.getFeature in - (* Decode the request. *) - let point = - Reader.create buffer |> decode |> function - | Ok v -> v - | Error e -> - failwith - (Printf.sprintf "Could not decode request: %s" (Result.show_error e)) - in - Eio.traceln "GetFeature = {:%s}" (Point.show point); - - (* Lookup the feature and if found return it. *) - let feature = - List.find_opt - (fun (f : Feature.t) -> - match (f.location, point) with - | Some p1, p2 -> Point.equal p1 p2 - | _, _ -> false) - !features - in - Eio.traceln "Found feature %s" - (feature |> Option.map Feature.show |> Option.value ~default:"Missing"); - match feature with - | Some feature -> - (Grpc.Status.(v OK), Some (feature |> encode |> Writer.contents)) - | None -> - (* No feature was found, return an unnamed feature. *) - ( Grpc.Status.(v OK), - Some (Feature.make ~location:point () |> encode |> Writer.contents) ) +let get_feature (t : t) = + Grpc_eio.Server.Typed_rpc.unary + (Grpc_protoc_plugin.Server_rpc.unary (module RouteGuide.GetFeature)) + ~f:(fun point -> + Eio.traceln "GetFeature = {:%s}" (Point.show point); + + (* Lookup the feature and if found return it. *) + let feature = + List.find_opt + (fun (f : Feature.t) -> + match (f.location, point) with + | Some p1, p2 -> Point.equal p1 p2 + | _, _ -> false) + t.features + in + Eio.traceln "Found feature %s" + (feature |> Option.map Feature.show |> Option.value ~default:"Missing"); + match feature with + | Some feature -> (Grpc.Status.(v OK), Some feature) + | None -> + (* No feature was found, return an unnamed feature. *) + (Grpc.Status.(v OK), Some (Feature.make ~location:point ()))) (* $MDX part-end *) (* $MDX part-begin=server-list-features *) -let list_features (buffer : string) (f : string -> unit) = - (* Decode request. *) - let decode, encode = Service.make_service_functions RouteGuide.listFeatures in - let rectangle = - Reader.create buffer |> decode |> function - | Ok v -> v - | Error e -> - failwith - (Printf.sprintf "Could not decode request: %s" (Result.show_error e)) - in - - (* Lookup and reply with features found. *) - let () = - List.iter - (fun (feature : Feature.t) -> - if in_range (Option.get feature.location) rectangle then - encode feature |> Writer.contents |> f - else ()) - !features - in - Grpc.Status.(v OK) +let list_features (t : t) = + Grpc_eio.Server.Typed_rpc.server_streaming + (Grpc_protoc_plugin.Server_rpc.server_streaming + (module RouteGuide.ListFeatures)) + ~f:(fun rectangle f -> + (* Lookup and reply with features found. *) + let () = + List.iter + (fun (feature : Feature.t) -> + if in_range (Option.get feature.location) rectangle then f feature + else ()) + t.features + in + Grpc.Status.(v OK)) (* $MDX part-end *) (* $MDX part-begin=server-record-route *) -let record_route (clock : _ Eio.Time.clock) (stream : string Seq.t) = - Eio.traceln "RecordRoute"; - - let last_point = ref None in - let start = Eio.Time.now clock in - let decode, encode = Service.make_service_functions RouteGuide.recordRoute in - - let point_count, feature_count, distance = - Seq.fold_left - (fun (point_count, feature_count, distance) i -> - let point = - Reader.create i |> decode |> function - | Ok v -> v - | Error e -> - failwith - (Printf.sprintf "Could not decode request: %s" - (Result.show_error e)) - in - Eio.traceln " ==> Point = {%s}" (Point.show point); - - (* Increment the point count *) - let point_count = point_count + 1 in - - (* Find features *) - let feature_count = - List.find_all - (fun (feature : Feature.t) -> - Point.equal (Option.get feature.location) point) - !features - |> fun x -> List.length x + feature_count - in - - (* Calculate the distance *) - let distance = - match !last_point with - | Some last_point -> calc_distance last_point point - | None -> distance - in - last_point := Some point; - (point_count, feature_count, distance)) - (0, 0, 0) stream - in - let stop = Eio.Time.now clock in - let elapsed_time = int_of_float (stop -. start) in - let summary = - RouteSummary.make ~point_count ~feature_count ~distance ~elapsed_time () - in - Eio.traceln "RecordRoute exit\n"; - (Grpc.Status.(v OK), Some (encode summary |> Writer.contents)) +let record_route (t : t) (clock : _ Eio.Time.clock) = + Grpc_eio.Server.Typed_rpc.client_streaming + (Grpc_protoc_plugin.Server_rpc.client_streaming + (module RouteGuide.RecordRoute)) + ~f:(fun (stream : Point.t Seq.t) -> + Eio.traceln "RecordRoute"; + + let last_point = ref None in + let start = Eio.Time.now clock in + + let point_count, feature_count, distance = + Seq.fold_left + (fun (point_count, feature_count, distance) point -> + Eio.traceln " ==> Point = {%s}" (Point.show point); + + (* Increment the point count *) + let point_count = point_count + 1 in + + (* Find features *) + let feature_count = + List.find_all + (fun (feature : Feature.t) -> + Point.equal (Option.get feature.location) point) + t.features + |> fun x -> List.length x + feature_count + in + + (* Calculate the distance *) + let distance = + match !last_point with + | Some last_point -> calc_distance last_point point + | None -> distance + in + last_point := Some point; + (point_count, feature_count, distance)) + (0, 0, 0) stream + in + let stop = Eio.Time.now clock in + let elapsed_time = int_of_float (stop -. start) in + let summary = + RouteSummary.make ~point_count ~feature_count ~distance ~elapsed_time () + in + Eio.traceln "RecordRoute exit\n"; + (Grpc.Status.(v OK), Some summary)) (* $MDX part-end *) (* $MDX part-begin=server-route-chat *) -let route_chat (stream : string Seq.t) (f : string -> unit) = - Printf.printf "RouteChat\n"; - - let decode, encode = Service.make_service_functions RouteGuide.routeChat in - Seq.iter - (fun i -> - let note = - Reader.create i |> decode |> function - | Ok v -> v - | Error e -> - failwith - (Printf.sprintf "Could not decode request: %s" - (Result.show_error e)) - in - Printf.printf " ==> Note = {%s}\n" (RouteNote.show note); - encode note |> Writer.contents |> f) - stream; - - Printf.printf "RouteChat exit\n"; - Grpc.Status.(v OK) +let route_chat (_ : t) = + Grpc_eio.Server.Typed_rpc.bidirectional_streaming + (Grpc_protoc_plugin.Server_rpc.bidirectional_streaming + (module RouteGuide.RouteChat)) + ~f:(fun (stream : RouteNote.t Seq.t) (f : RouteNote.t -> unit) -> + Printf.printf "RouteChat\n"; + + Seq.iter + (fun note -> + Printf.printf " ==> Note = {%s}\n" (RouteNote.show note); + f note) + stream; + + Printf.printf "RouteChat exit\n"; + Grpc.Status.(v OK)) (* $MDX part-end *) (* $MDX part-begin=server-grpc *) -let route_guide_service clock = - Server.Service.( - v () - |> add_rpc ~name:"GetFeature" ~rpc:(Unary get_feature) - |> add_rpc ~name:"ListFeatures" ~rpc:(Server_streaming list_features) - |> add_rpc ~name:"RecordRoute" ~rpc:(Client_streaming (record_route clock)) - |> add_rpc ~name:"RouteChat" ~rpc:(Bidirectional_streaming route_chat) - |> handle_request) - -let server clock = - Server.( - v () - |> add_service ~name:"routeguide.RouteGuide" - ~service:(route_guide_service clock)) +let server t clock = + Server.Typed_rpc.server + (Grpc_protoc_plugin.handlers + [ get_feature t; list_features t; record_route t clock; route_chat t ]) (* $MDX part-end *) let connection_handler server ~sw = @@ -238,13 +203,13 @@ let connection_handler server ~sw = ~error_handler addr socket ~sw (* $MDX part-begin=server-main *) -let serve server env = +let serve t env = let port = 8080 in let net = Eio.Stdenv.net env in let clock = Eio.Stdenv.clock env in let addr = `Tcp (Eio.Net.Ipaddr.V4.loopback, port) in Eio.Switch.run @@ fun sw -> - let handler = connection_handler ~sw (server clock) in + let handler = connection_handler ~sw (server t clock) in let server_socket = Eio.Net.listen net ~sw ~reuse_addr:true ~backlog:10 addr in @@ -264,7 +229,7 @@ let () = in (* Load features. *) - features := load path; + let t = { features = load_features path } in - Eio_main.run (serve server) + Eio_main.run (serve t) (* $MDX part-end *) diff --git a/grpc-protoc-plugin.opam b/grpc-protoc-plugin.opam new file mode 100644 index 0000000..1ddae22 --- /dev/null +++ b/grpc-protoc-plugin.opam @@ -0,0 +1,40 @@ +# This file is generated by dune, edit dune-project instead +opam-version: "2.0" +synopsis: "An implementation of gRPC using ocaml-protoc-plugin" +description: + "Functionality for building gRPC services and rpcs with `ocaml-protoc-plugin`" +maintainer: ["Daniel Quernheim "] +authors: [ + "Andrew Jeffery " + "Daniel Quernheim " + "Michael Bacarella " + "Sven Anderson " + "Tim McGilchrist " + "Wojtek Czekalski " + "dimitris.mostrous " +] +license: "BSD-3-Clause" +homepage: "https://github.com/dialohq/ocaml-grpc" +doc: "https://dialohq.github.io/ocaml-grpc" +bug-reports: "https://github.com/dialohq/ocaml-grpc/issues" +depends: [ + "dune" {>= "3.7"} + "grpc" {= version} + "ocaml-protoc-plugin" {>= "4.5"} + "odoc" {with-doc} +] +build: [ + ["dune" "subst"] {dev} + [ + "dune" + "build" + "-p" + name + "-j" + jobs + "@install" + "@runtest" {with-test} + "@doc" {with-doc} + ] +] +dev-repo: "git+https://github.com/dialohq/ocaml-grpc.git" diff --git a/grpc-protoc.opam b/grpc-protoc.opam new file mode 100644 index 0000000..fde3dfd --- /dev/null +++ b/grpc-protoc.opam @@ -0,0 +1,42 @@ +# This file is generated by dune, edit dune-project instead +opam-version: "2.0" +synopsis: "An implementation of gRPC using ocaml-protoc" +description: + "Functionality for building gRPC services and rpcs with `ocaml-protoc`" +maintainer: ["Daniel Quernheim "] +authors: [ + "Andrew Jeffery " + "Daniel Quernheim " + "Michael Bacarella " + "Sven Anderson " + "Tim McGilchrist " + "Wojtek Czekalski " + "dimitris.mostrous " +] +license: "BSD-3-Clause" +homepage: "https://github.com/dialohq/ocaml-grpc" +doc: "https://dialohq.github.io/ocaml-grpc" +bug-reports: "https://github.com/dialohq/ocaml-grpc/issues" +depends: [ + "dune" {>= "3.7"} + "grpc" {= version} + "ocaml-protoc" {>= "3.0"} + "pbrt" {>= "3.0"} + "pbrt_services" {>= "3.0"} + "odoc" {with-doc} +] +build: [ + ["dune" "subst"] {dev} + [ + "dune" + "build" + "-p" + name + "-j" + jobs + "@install" + "@runtest" {with-test} + "@doc" {with-doc} + ] +] +dev-repo: "git+https://github.com/dialohq/ocaml-grpc.git" diff --git a/lib/grpc-eio/client.ml b/lib/grpc-eio/client.ml index 4efe5cd..7d2aa47 100644 --- a/lib/grpc-eio/client.ml +++ b/lib/grpc-eio/client.ml @@ -66,20 +66,98 @@ let call ~service ~rpc ?(scheme = "https") ~handler ~(do_request : do_request) Ok (result, status) | error_status -> Error error_status +let make_handler ~encode_request ~decode_response ~f write_body read_body = + let response_reader, response_writer = Seq.create_reader_writer () in + let request_reader, request_writer = Seq.create_reader_writer () in + Connection.grpc_recv_streaming ~decode:decode_response read_body + response_writer; + let res, res_notify = Eio.Promise.create () in + Eio.Fiber.both + (fun () -> + Eio.Promise.resolve res_notify (f request_writer response_reader)) + (fun () -> + Connection.grpc_send_streaming_client ~encode:encode_request write_body + request_reader); + Eio.Promise.await res + +module Typed_rpc = struct + type ('request, 'request_mode, 'response, 'response_mode, 'a) handler = + ('request, 'request_mode, 'response, 'response_mode) Grpc.Rpc.Client_rpc.t -> + H2.Body.Writer.t -> + H2.Body.Reader.t -> + 'a + + let make_handler (type request response) + ~(rpc : (request, _, response, _) Grpc.Rpc.Client_rpc.t) ~f = + make_handler ~encode_request:rpc.encode_request + ~decode_response:rpc.decode_response ~f + + let bidirectional_streaming (type request response) ~f + (rpc : + ( request, + Grpc.Rpc.Value_mode.stream, + response, + Grpc.Rpc.Value_mode.stream ) + Grpc.Rpc.Client_rpc.t) = + make_handler ~rpc ~f + + let client_streaming (type request response) ~f + (rpc : + ( request, + Grpc.Rpc.Value_mode.stream, + response, + Grpc.Rpc.Value_mode.unary ) + Grpc.Rpc.Client_rpc.t) = + make_handler ~rpc ~f:(fun request_writer responses -> + let response, response_resolver = Eio.Promise.create () in + Eio.Fiber.pair + (fun () -> f request_writer response) + (fun () -> + Eio.Promise.resolve response_resolver + (Seq.read_and_exhaust responses)) + |> fst) + + let server_streaming (type request response) ~f (request : request) + (rpc : + ( request, + Grpc.Rpc.Value_mode.unary, + response, + Grpc.Rpc.Value_mode.stream ) + Grpc.Rpc.Client_rpc.t) = + make_handler ~rpc ~f:(fun request_writer responses -> + Seq.write request_writer request; + Seq.close_writer request_writer; + f responses) + + let unary (type request response) ~f (request : request) + (rpc : + ( request, + Grpc.Rpc.Value_mode.unary, + response, + Grpc.Rpc.Value_mode.unary ) + Grpc.Rpc.Client_rpc.t) = + make_handler ~rpc ~f:(fun request_writer responses -> + Seq.write request_writer request; + Seq.close_writer request_writer; + let response = Seq.read_and_exhaust responses in + f response) + + let call (type request request_mode response response_mode a) + (rpc : + (request, request_mode, response, response_mode) Grpc.Rpc.Client_rpc.t) + ?scheme + ~(handler : (request, request_mode, response, response_mode, a) handler) + ~do_request ?headers () = + call + ~service:(Grpc.Rpc.Service_spec.packaged_service_name rpc.service_spec) + ~rpc:rpc.rpc_name ?scheme ~handler:(handler rpc) ~do_request ?headers () +end + module Rpc = struct type 'a handler = H2.Body.Writer.t -> H2.Body.Reader.t -> 'a - let bidirectional_streaming ~f write_body read_body = - let response_reader, response_writer = Seq.create_reader_writer () in - let request_reader, request_writer = Seq.create_reader_writer () in - Connection.grpc_recv_streaming read_body response_writer; - let res, res_notify = Eio.Promise.create () in - Eio.Fiber.both - (fun () -> - Eio.Promise.resolve res_notify (f request_writer response_reader)) - (fun () -> - Connection.grpc_send_streaming_client write_body request_reader); - Eio.Promise.await res + let bidirectional_streaming ~f = + make_handler ~encode_request:Fun.id ~decode_response:Fun.id ~f let client_streaming ~f = bidirectional_streaming ~f:(fun request_writer responses -> @@ -103,4 +181,6 @@ module Rpc = struct Seq.close_writer request_writer; let response = Seq.read_and_exhaust responses in f response) + + let call = call end diff --git a/lib/grpc-eio/client.mli b/lib/grpc-eio/client.mli index 745d33c..07a3767 100644 --- a/lib/grpc-eio/client.mli +++ b/lib/grpc-eio/client.mli @@ -1,3 +1,81 @@ +type response_handler = H2.Client_connection.response_handler + +type do_request = + ?flush_headers_immediately:bool -> + ?trailers_handler:(H2.Headers.t -> unit) -> + H2.Request.t -> + response_handler:response_handler -> + H2.Body.Writer.t +(** [do_request] is the type of a function that performs the request *) + +(** {1 Typed API} *) + +module Typed_rpc : sig + (** This is an experimental API to call RPC from the client side. Compared to + {Rpc}, this interface will: + + - handle the coding/decoding of messages for you under the hood; + - use the service and RPC names provided by the rpc specification to + call the services with their expected names. *) + + type ('request, 'request_mode, 'response, 'response_mode, 'a) handler + + (** The next functions are meant to be used by the client to handle + call to RPCs. *) + + val bidirectional_streaming : + f:('request Seq.writer -> 'response Seq.t -> 'a) -> + ( 'request, + Grpc.Rpc.Value_mode.stream, + 'response, + Grpc.Rpc.Value_mode.stream, + 'a ) + handler + + val client_streaming : + f:('request Seq.writer -> 'response option Eio.Promise.t -> 'a) -> + ( 'request, + Grpc.Rpc.Value_mode.stream, + 'response, + Grpc.Rpc.Value_mode.unary, + 'a ) + handler + + val server_streaming : + f:('response Seq.t -> 'a) -> + 'request -> + ( 'request, + Grpc.Rpc.Value_mode.unary, + 'response, + Grpc.Rpc.Value_mode.stream, + 'a ) + handler + + val unary : + f:('response option -> 'a) -> + 'request -> + ( 'request, + Grpc.Rpc.Value_mode.unary, + 'response, + Grpc.Rpc.Value_mode.unary, + 'a ) + handler + + val call : + ('request, 'request_mode, 'response, 'response_mode) Grpc.Rpc.Client_rpc.t -> + ?scheme:string -> + handler:('request, 'request_mode, 'response, 'response_mode, 'a) handler -> + do_request:do_request -> + ?headers:H2.Headers.t -> + unit -> + ('a * Grpc.Status.t, H2.Status.t) result + (** The rpc specification must be provided as it is used to handle + coding/decoding of messages as well as allows referring to the service + and RPC names specified in the [.proto] file. *) +end + +(** {1 Untyped API} *) + module Rpc : sig type 'a handler = H2.Body.Writer.t -> H2.Body.Reader.t -> 'a @@ -22,17 +100,20 @@ module Rpc : sig (** [unary ~f enc write read] sets up the sending and receiving logic using [write] and [read], then sends [enc] and calls [f] with a promise for the response. *) -end -type response_handler = H2.Client_connection.response_handler - -type do_request = - ?flush_headers_immediately:bool -> - ?trailers_handler:(H2.Headers.t -> unit) -> - H2.Request.t -> - response_handler:response_handler -> - H2.Body.Writer.t -(** [do_request] is the type of a function that performs the request *) + val call : + service:string -> + rpc:string -> + ?scheme:string -> + handler:'a handler -> + do_request:do_request -> + ?headers:H2.Headers.t -> + unit -> + ('a * Grpc.Status.t, H2.Status.t) result + (** [call ~service ~rpc ~handler ~do_request ()] calls the rpc endpoint given + by [service] and [rpc] using the [do_request] function. The [handler] is + called when this request is set up to send and receive data. *) +end val call : service:string -> @@ -43,6 +124,4 @@ val call : ?headers:H2.Headers.t -> unit -> ('a * Grpc.Status.t, H2.Status.t) result -(** [call ~service ~rpc ~handler ~do_request ()] calls the rpc endpoint given - by [service] and [rpc] using the [do_request] function. The [handler] is - called when this request is set up to send and receive data. *) +(** [@@deprecated "This function was renamed [Grpc_eio.Client.Rpc.call]."] *) diff --git a/lib/grpc-eio/connection.ml b/lib/grpc-eio/connection.ml index 3de3965..31f6930 100644 --- a/lib/grpc-eio/connection.ml +++ b/lib/grpc-eio/connection.ml @@ -1,23 +1,25 @@ -let grpc_recv_streaming body message_buffer_writer = +let grpc_recv_streaming ~decode body message_buffer_writer = let request_buffer = Grpc.Buffer.v () in let on_eof () = Seq.close_writer message_buffer_writer in let rec on_read buffer ~off ~len = Grpc.Buffer.copy_from_bigstringaf ~src_off:off ~src:buffer ~dst:request_buffer ~length:len; - Grpc.Message.extract_all (Seq.write message_buffer_writer) request_buffer; + Grpc.Message.extract_all + (fun message -> Seq.write message_buffer_writer (decode message)) + request_buffer; H2.Body.Reader.schedule_read body ~on_read ~on_eof in H2.Body.Reader.schedule_read body ~on_read ~on_eof -let grpc_send_streaming_client body encoder_stream = +let grpc_send_streaming_client ~encode body encoder_stream = Seq.iter (fun encoder -> - let payload = Grpc.Message.make encoder in + let payload = Grpc.Message.make (encode encoder) in H2.Body.Writer.write_string body payload) encoder_stream; H2.Body.Writer.close body -let grpc_send_streaming request encoder_stream status_promise = +let grpc_send_streaming ~encode request encoder_stream status_promise = let body = H2.Reqd.respond_with_streaming ~flush_headers_immediately:true request (H2.Response.create @@ -27,7 +29,7 @@ let grpc_send_streaming request encoder_stream status_promise = in Seq.iter (fun input -> - let payload = Grpc.Message.make input in + let payload = Grpc.Message.make (encode input) in H2.Body.Writer.write_string body payload; H2.Body.Writer.flush body (fun () -> ())) encoder_stream; diff --git a/lib/grpc-eio/connection.mli b/lib/grpc-eio/connection.mli new file mode 100644 index 0000000..fee84be --- /dev/null +++ b/lib/grpc-eio/connection.mli @@ -0,0 +1,12 @@ +val grpc_recv_streaming : + decode:(string -> 'a) -> H2.Body.Reader.t -> 'a Seq.writer -> unit + +val grpc_send_streaming_client : + encode:('a -> string) -> H2.Body.Writer.t -> 'a Seq.reader -> unit + +val grpc_send_streaming : + encode:('a -> string) -> + H2.Reqd.t -> + 'a Seq.reader -> + Grpc.Status.t Eio.Promise.t -> + unit diff --git a/lib/grpc-eio/server.ml b/lib/grpc-eio/server.ml index ffd850c..fff6503 100644 --- a/lib/grpc-eio/server.ml +++ b/lib/grpc-eio/server.ml @@ -1,6 +1,7 @@ module ServiceMap = Map.Make (String) -type service = H2.Reqd.t -> unit +type reqd_handler = H2.Reqd.t -> unit +type service = reqd_handler type t = service ServiceMap.t let v () = ServiceMap.empty @@ -44,6 +45,176 @@ let handle_request t reqd = | None -> respond_with `Unsupported_media_type) | _ -> respond_with `Not_found +let implement_rpc ~decode_request ~encode_response ~f reqd = + let body = H2.Reqd.request_body reqd in + let request_reader, request_writer = Seq.create_reader_writer () in + let response_reader, response_writer = Seq.create_reader_writer () in + Connection.grpc_recv_streaming ~decode:decode_request body request_writer; + let status_promise, status_notify = Eio.Promise.create () in + Eio.Fiber.both + (fun () -> + let respond = Seq.write response_writer in + let status = f request_reader respond in + Seq.close_writer response_writer; + Eio.Promise.resolve status_notify status) + (fun () -> + try + Connection.grpc_send_streaming ~encode:encode_response reqd + response_reader status_promise + with exn -> + (* https://github.com/anmonteiro/ocaml-h2/issues/175 *) + Eio.traceln "%s" (Printexc.to_string exn)) + +module Typed_rpc = struct + module Service = struct + module RpcMap = Map.Make (String) + + type t = reqd_handler RpcMap.t + + let v () = RpcMap.empty + let add_rpc ~name ~rpc t = RpcMap.add name rpc t + + let handle_request (t : t) reqd = + let request = H2.Reqd.request reqd in + let respond_with code = + H2.Reqd.respond_with_string reqd (H2.Response.create code) "" + in + let parts = String.split_on_char '/' request.target in + if List.length parts > 1 then + let rpc_name = List.nth parts (List.length parts - 1) in + match RpcMap.find_opt rpc_name t with + | Some rpc -> rpc reqd + | None -> respond_with `Not_found + else respond_with `Not_found + end + + type server = t + + type ('request, 'response) unary = + 'request -> Grpc.Status.t * 'response option + + type ('request, 'response) client_streaming = + 'request Seq.t -> Grpc.Status.t * 'response option + + type ('request, 'response) server_streaming = + 'request -> ('response -> unit) -> Grpc.Status.t + + type ('request, 'response) bidirectional_streaming = + 'request Seq.t -> ('response -> unit) -> Grpc.Status.t + + type 'service_spec t = + | T : { + rpc_spec : + ( 'request, + 'request_mode, + 'response, + 'response_mode, + 'service_spec ) + Grpc.Rpc.Server_rpc.t; + rpc_impl : reqd_handler; + } + -> 'service_spec t + + let rec make_handlers handlers = + match (handlers : _ Grpc.Rpc.Handlers.t) with + | a :: tl -> List.concat (make_handlers a :: List.map make_handlers tl) + | Handlers { handlers = ts } -> ts + | With_service_spec { service_spec; handlers = ts } -> + List.map + (fun (T t) -> + T + { + t with + rpc_spec = { t.rpc_spec with service_spec = Some service_spec }; + }) + ts + + let server handlers : server = + let handlers = make_handlers handlers in + List.fold_left + (fun map (T t as packed) -> + let service_name = + match t.rpc_spec.service_spec with + | Some service_spec -> + Grpc.Rpc.Service_spec.packaged_service_name service_spec + in + let rpc_impl = + ServiceMap.find_opt service_name map |> Option.value ~default:[] + in + ServiceMap.add service_name (packed :: rpc_impl) map) + ServiceMap.empty handlers + |> ServiceMap.map (fun ts -> + let service = + List.fold_left + (fun acc (T t) -> + Service.add_rpc ~name:t.rpc_spec.rpc_name ~rpc:t.rpc_impl acc) + (Service.v ()) ts + in + Service.handle_request service) + + let implement_rpc (type request response) + ~(rpc_spec : (request, _, response, _, _) Grpc.Rpc.Server_rpc.t) ~f = + let rpc_impl = + implement_rpc ~decode_request:rpc_spec.decode_request + ~encode_response:rpc_spec.encode_response ~f + in + T { rpc_spec; rpc_impl } + + let bidirectional_streaming (type request response) + (rpc_spec : + ( request, + Grpc.Rpc.Value_mode.stream, + response, + Grpc.Rpc.Value_mode.stream, + _ ) + Grpc.Rpc.Server_rpc.t) ~f = + implement_rpc ~rpc_spec ~f + + let unary (type request response) + (rpc_spec : + ( request, + Grpc.Rpc.Value_mode.unary, + response, + Grpc.Rpc.Value_mode.unary, + _ ) + Grpc.Rpc.Server_rpc.t) ~f = + implement_rpc ~rpc_spec ~f:(fun requests respond -> + match Seq.read_and_exhaust requests with + | None -> Grpc.Status.(v OK) + | Some request -> + let status, response = f request in + (match response with + | None -> () + | Some response -> respond response); + status) + + let server_streaming (type request response) + (rpc_spec : + ( request, + Grpc.Rpc.Value_mode.unary, + response, + Grpc.Rpc.Value_mode.stream, + _ ) + Grpc.Rpc.Server_rpc.t) ~f = + implement_rpc ~rpc_spec ~f:(fun requests respond -> + match Seq.read_and_exhaust requests with + | None -> Grpc.Status.(v OK) + | Some request -> f request respond) + + let client_streaming (type request response) + (rpc_spec : + ( request, + Grpc.Rpc.Value_mode.stream, + response, + Grpc.Rpc.Value_mode.unary, + _ ) + Grpc.Rpc.Server_rpc.t) ~f = + implement_rpc ~rpc_spec ~f:(fun requests respond -> + let status, response = f requests in + (match response with None -> () | Some response -> respond response); + status) +end + module Rpc = struct type unary = string -> Grpc.Status.t * string option type client_streaming = string Seq.t -> Grpc.Status.t * string option @@ -59,22 +230,7 @@ module Rpc = struct | Bidirectional_streaming of bidirectional_streaming let bidirectional_streaming ~f reqd = - let body = H2.Reqd.request_body reqd in - let request_reader, request_writer = Seq.create_reader_writer () in - let response_reader, response_writer = Seq.create_reader_writer () in - Connection.grpc_recv_streaming body request_writer; - let status_promise, status_notify = Eio.Promise.create () in - Eio.Fiber.both - (fun () -> - let respond = Seq.write response_writer in - let status = f request_reader respond in - Seq.close_writer response_writer; - Eio.Promise.resolve status_notify status) - (fun () -> - try Connection.grpc_send_streaming reqd response_reader status_promise - with exn -> - (* https://github.com/anmonteiro/ocaml-h2/issues/175 *) - Eio.traceln "%s" (Printexc.to_string exn)) + implement_rpc ~decode_request:Fun.id ~encode_response:Fun.id ~f reqd let client_streaming ~f reqd = bidirectional_streaming reqd ~f:(fun requests respond -> @@ -101,29 +257,15 @@ module Rpc = struct end module Service = struct - module RpcMap = Map.Make (String) - - type t = Rpc.t RpcMap.t - - let v () = RpcMap.empty - let add_rpc ~name ~rpc t = RpcMap.add name rpc t + include Typed_rpc.Service - let handle_request (t : t) reqd = - let request = H2.Reqd.request reqd in - let respond_with code = - H2.Reqd.respond_with_string reqd (H2.Response.create code) "" - in - let parts = String.split_on_char '/' request.target in - if List.length parts > 1 then - let rpc_name = List.nth parts (List.length parts - 1) in - let rpc = RpcMap.find_opt rpc_name t in - match rpc with - | Some rpc -> ( - match rpc with - | Unary f -> Rpc.unary ~f reqd - | Client_streaming f -> Rpc.client_streaming ~f reqd - | Server_streaming f -> Rpc.server_streaming ~f reqd - | Bidirectional_streaming f -> Rpc.bidirectional_streaming ~f reqd) - | None -> respond_with `Not_found - else respond_with `Not_found + let add_rpc ~name ~rpc t = + add_rpc ~name + ~rpc: + (match rpc with + | Rpc.Unary f -> Rpc.unary ~f + | Client_streaming f -> Rpc.client_streaming ~f + | Server_streaming f -> Rpc.server_streaming ~f + | Bidirectional_streaming f -> Rpc.bidirectional_streaming ~f) + t end diff --git a/lib/grpc-eio/server.mli b/lib/grpc-eio/server.mli index 40961f5..1f4be5a 100644 --- a/lib/grpc-eio/server.mli +++ b/lib/grpc-eio/server.mli @@ -1,5 +1,94 @@ include Grpc.Server.S +(** {1 Typed API} *) + +module Typed_rpc : sig + (** This is an experimental API to build RPCs on the server side. Compared to + {Rpc}, this interface will: + + - handle the coding/decoding of messages for you under the hood; + - use the service and RPC names provided by the rpc specification to + register the services with their expected names. *) + + type server := t + + type ('request, 'response) unary = + 'request -> Grpc.Status.t * 'response option + (** [unary] is the type for a unary grpc rpc, one request, one response. *) + + type ('request, 'response) client_streaming = + 'request Seq.t -> Grpc.Status.t * 'response option + (** [client_streaming] is the type for an rpc where the client streams the + requests and the server responds once. *) + + type ('request, 'response) server_streaming = + 'request -> ('response -> unit) -> Grpc.Status.t + (** [server_streaming] is the type for an rpc where the client sends one + request and the server sends multiple responses. *) + + type ('request, 'response) bidirectional_streaming = + 'request Seq.t -> ('response -> unit) -> Grpc.Status.t + (** [bidirectional_streaming] is the type for an rpc where both the client and + server can send multiple messages. *) + + type 'service_spec t + (** [t] represents an implementation for an RPC on the server side. *) + + (** The next functions are meant to be used by the server to create RPC + implementations. The rpc specification that the function implements must + be provided as it is used to handle coding/decoding of messages. It also + allows to refer to the service and RPC names specified in the [.proto] + file. *) + + val unary : + ( 'request, + Grpc.Rpc.Value_mode.unary, + 'response, + Grpc.Rpc.Value_mode.unary, + 'service_spec ) + Grpc.Rpc.Server_rpc.t -> + f:('request, 'response) unary -> + 'service_spec t + + val client_streaming : + ( 'request, + Grpc.Rpc.Value_mode.stream, + 'response, + Grpc.Rpc.Value_mode.unary, + 'service_spec ) + Grpc.Rpc.Server_rpc.t -> + f:('request, 'response) client_streaming -> + 'service_spec t + + val server_streaming : + ( 'request, + Grpc.Rpc.Value_mode.unary, + 'response, + Grpc.Rpc.Value_mode.stream, + 'service_spec ) + Grpc.Rpc.Server_rpc.t -> + f:('request, 'response) server_streaming -> + 'service_spec t + + val bidirectional_streaming : + ( 'request, + Grpc.Rpc.Value_mode.stream, + 'response, + Grpc.Rpc.Value_mode.stream, + 'service_spec ) + Grpc.Rpc.Server_rpc.t -> + f:('request, 'response) bidirectional_streaming -> + 'service_spec t + + val server : (Grpc.Rpc.Service_spec.t t, unit t) Grpc.Rpc.Handlers.t -> server + (** Having built a list of RPCs you will use this function to package them up + into a server that is ready to be served over the network. This function + takes care of registering the services based on the names provided by the + protoc specification. *) +end + +(** {1 Untyped API} *) + module Rpc : sig type unary = string -> Grpc.Status.t * string option (** [unary] is the type for a unary grpc rpc, one request, one response. *) diff --git a/lib/grpc-protoc-plugin/dune b/lib/grpc-protoc-plugin/dune new file mode 100644 index 0000000..900987e --- /dev/null +++ b/lib/grpc-protoc-plugin/dune @@ -0,0 +1,4 @@ +(library + (name grpc_protoc_plugin) + (public_name grpc-protoc-plugin) + (libraries grpc ocaml-protoc-plugin)) diff --git a/lib/grpc-protoc-plugin/grpc_protoc_plugin.ml b/lib/grpc-protoc-plugin/grpc_protoc_plugin.ml new file mode 100644 index 0000000..baea692 --- /dev/null +++ b/lib/grpc-protoc-plugin/grpc_protoc_plugin.ml @@ -0,0 +1,69 @@ +module type S = Ocaml_protoc_plugin.Service.Rpc + +let encode (type a) + (module M : Ocaml_protoc_plugin.Runtime.Runtime'.Service.Message + with type t = a) (a : a) = + a |> M.to_proto |> Ocaml_protoc_plugin.Runtime.Runtime'.Writer.contents + +let decode (type a) + (module M : Ocaml_protoc_plugin.Runtime.Runtime'.Service.Message + with type t = a) buffer = + buffer |> Ocaml_protoc_plugin.Runtime.Runtime'.Reader.create |> M.from_proto + |> function + | Ok r -> r + | Error e -> + failwith + (Printf.sprintf "Could not decode request: %s" + (Ocaml_protoc_plugin.Result.show_error e)) + +let service_spec (type request response) + (module R : S with type Request.t = request and type Response.t = response) + = + { + Grpc.Rpc.Service_spec.package = R.package_name |> Option.to_list; + service_name = R.service_name; + } + +module Client_rpc = struct + let make (type request response) + (module R : S with type Request.t = request and type Response.t = response) + ~request_mode ~response_mode = + { + Grpc.Rpc.Client_rpc.service_spec = service_spec (module R); + rpc_name = R.method_name; + encode_request = encode (module R.Request); + decode_response = decode (module R.Response); + request_mode; + response_mode; + } + + let unary rpc = make rpc ~request_mode:Unary ~response_mode:Unary + let client_streaming rpc = make rpc ~request_mode:Stream ~response_mode:Unary + let server_streaming rpc = make rpc ~request_mode:Unary ~response_mode:Stream + + let bidirectional_streaming rpc = + make rpc ~request_mode:Stream ~response_mode:Stream +end + +module Server_rpc = struct + let make (type request response) + (module R : S with type Request.t = request and type Response.t = response) + ~request_mode ~response_mode = + { + Grpc.Rpc.Server_rpc.service_spec = Some (service_spec (module R)); + rpc_name = R.method_name; + decode_request = decode (module R.Request); + encode_response = encode (module R.Response); + request_mode; + response_mode; + } + + let unary rpc = make rpc ~request_mode:Unary ~response_mode:Unary + let client_streaming rpc = make rpc ~request_mode:Stream ~response_mode:Unary + let server_streaming rpc = make rpc ~request_mode:Unary ~response_mode:Stream + + let bidirectional_streaming rpc = + make rpc ~request_mode:Stream ~response_mode:Stream +end + +let handlers handlers = Grpc.Rpc.Handlers.Handlers { handlers } diff --git a/lib/grpc-protoc-plugin/grpc_protoc_plugin.mli b/lib/grpc-protoc-plugin/grpc_protoc_plugin.mli new file mode 100644 index 0000000..7bd147f --- /dev/null +++ b/lib/grpc-protoc-plugin/grpc_protoc_plugin.mli @@ -0,0 +1,75 @@ +module type S = Ocaml_protoc_plugin.Service.Rpc + +module Client_rpc : sig + val unary : + (module S with type Request.t = 'request and type Response.t = 'response) -> + ( 'request, + Grpc.Rpc.Value_mode.unary, + 'response, + Grpc.Rpc.Value_mode.unary ) + Grpc.Rpc.Client_rpc.t + + val client_streaming : + (module S with type Request.t = 'request and type Response.t = 'response) -> + ( 'request, + Grpc.Rpc.Value_mode.stream, + 'response, + Grpc.Rpc.Value_mode.unary ) + Grpc.Rpc.Client_rpc.t + + val server_streaming : + (module S with type Request.t = 'request and type Response.t = 'response) -> + ( 'request, + Grpc.Rpc.Value_mode.unary, + 'response, + Grpc.Rpc.Value_mode.stream ) + Grpc.Rpc.Client_rpc.t + + val bidirectional_streaming : + (module S with type Request.t = 'request and type Response.t = 'response) -> + ( 'request, + Grpc.Rpc.Value_mode.stream, + 'response, + Grpc.Rpc.Value_mode.stream ) + Grpc.Rpc.Client_rpc.t +end + +module Server_rpc : sig + val unary : + (module S with type Request.t = 'request and type Response.t = 'response) -> + ( 'request, + Grpc.Rpc.Value_mode.unary, + 'response, + Grpc.Rpc.Value_mode.unary, + Grpc.Rpc.Service_spec.t ) + Grpc.Rpc.Server_rpc.t + + val client_streaming : + (module S with type Request.t = 'request and type Response.t = 'response) -> + ( 'request, + Grpc.Rpc.Value_mode.stream, + 'response, + Grpc.Rpc.Value_mode.unary, + Grpc.Rpc.Service_spec.t ) + Grpc.Rpc.Server_rpc.t + + val server_streaming : + (module S with type Request.t = 'request and type Response.t = 'response) -> + ( 'request, + Grpc.Rpc.Value_mode.unary, + 'response, + Grpc.Rpc.Value_mode.stream, + Grpc.Rpc.Service_spec.t ) + Grpc.Rpc.Server_rpc.t + + val bidirectional_streaming : + (module S with type Request.t = 'request and type Response.t = 'response) -> + ( 'request, + Grpc.Rpc.Value_mode.stream, + 'response, + Grpc.Rpc.Value_mode.stream, + Grpc.Rpc.Service_spec.t ) + Grpc.Rpc.Server_rpc.t +end + +val handlers : 'a list -> ('a, _) Grpc.Rpc.Handlers.t diff --git a/lib/grpc-protoc/dune b/lib/grpc-protoc/dune new file mode 100644 index 0000000..14a98b6 --- /dev/null +++ b/lib/grpc-protoc/dune @@ -0,0 +1,4 @@ +(library + (name grpc_protoc) + (public_name grpc-protoc) + (libraries grpc ocaml-protoc pbrt pbrt_services)) diff --git a/lib/grpc-protoc/grpc_protoc.ml b/lib/grpc-protoc/grpc_protoc.ml new file mode 100644 index 0000000..5ba88aa --- /dev/null +++ b/lib/grpc-protoc/grpc_protoc.ml @@ -0,0 +1,55 @@ +let encode (type a) (encode : a -> Pbrt.Encoder.t -> unit) (a : a) = + let encoder = Pbrt.Encoder.create () in + encode a encoder; + Pbrt.Encoder.to_string encoder + +let decode (type a) (decode : Pbrt.Decoder.t -> a) buffer = + let decoder = Pbrt.Decoder.of_string buffer in + decode decoder + +module Client_rpc = struct + let make (type request response) + (rpc : (request, _, response, _) Pbrt_services.Client.rpc) ~request_mode + ~response_mode = + { + Grpc.Rpc.Client_rpc.service_spec = + { package = rpc.package; service_name = rpc.service_name }; + rpc_name = rpc.rpc_name; + encode_request = encode rpc.encode_pb_req; + decode_response = decode rpc.decode_pb_res; + request_mode; + response_mode; + } + + let unary rpc = make rpc ~request_mode:Unary ~response_mode:Unary + let client_streaming rpc = make rpc ~request_mode:Stream ~response_mode:Unary + let server_streaming rpc = make rpc ~request_mode:Unary ~response_mode:Stream + + let bidirectional_streaming rpc = + make rpc ~request_mode:Stream ~response_mode:Stream +end + +module Server_rpc = struct + let make (type request response) + (rpc : (request, _, response, _) Pbrt_services.Server.rpc) ~request_mode + ~response_mode = + { + Grpc.Rpc.Server_rpc.service_spec = None; + rpc_name = rpc.name; + decode_request = decode rpc.decode_pb_req; + encode_response = encode rpc.encode_pb_res; + request_mode; + response_mode; + } + + let unary rpc = make rpc ~request_mode:Unary ~response_mode:Unary + let client_streaming rpc = make rpc ~request_mode:Stream ~response_mode:Unary + let server_streaming rpc = make rpc ~request_mode:Unary ~response_mode:Stream + + let bidirectional_streaming rpc = + make rpc ~request_mode:Stream ~response_mode:Stream +end + +let handlers { Pbrt_services.Server.package; service_name; handlers } = + Grpc.Rpc.Handlers.With_service_spec + { service_spec = { package; service_name }; handlers } diff --git a/lib/grpc-protoc/grpc_protoc.mli b/lib/grpc-protoc/grpc_protoc.mli new file mode 100644 index 0000000..6390a10 --- /dev/null +++ b/lib/grpc-protoc/grpc_protoc.mli @@ -0,0 +1,105 @@ +module Client_rpc : sig + val unary : + ( 'request, + Pbrt_services.Value_mode.unary, + 'response, + Pbrt_services.Value_mode.unary ) + Pbrt_services.Client.rpc -> + ( 'request, + Grpc.Rpc.Value_mode.unary, + 'response, + Grpc.Rpc.Value_mode.unary ) + Grpc.Rpc.Client_rpc.t + + val client_streaming : + ( 'request, + Pbrt_services.Value_mode.stream, + 'response, + Pbrt_services.Value_mode.unary ) + Pbrt_services.Client.rpc -> + ( 'request, + Grpc.Rpc.Value_mode.stream, + 'response, + Grpc.Rpc.Value_mode.unary ) + Grpc.Rpc.Client_rpc.t + + val server_streaming : + ( 'request, + Pbrt_services.Value_mode.unary, + 'response, + Pbrt_services.Value_mode.stream ) + Pbrt_services.Client.rpc -> + ( 'request, + Grpc.Rpc.Value_mode.unary, + 'response, + Grpc.Rpc.Value_mode.stream ) + Grpc.Rpc.Client_rpc.t + + val bidirectional_streaming : + ( 'request, + Pbrt_services.Value_mode.stream, + 'response, + Pbrt_services.Value_mode.stream ) + Pbrt_services.Client.rpc -> + ( 'request, + Grpc.Rpc.Value_mode.stream, + 'response, + Grpc.Rpc.Value_mode.stream ) + Grpc.Rpc.Client_rpc.t +end + +module Server_rpc : sig + val unary : + ( 'request, + Pbrt_services.Value_mode.unary, + 'response, + Pbrt_services.Value_mode.unary ) + Pbrt_services.Server.rpc -> + ( 'request, + Grpc.Rpc.Value_mode.unary, + 'response, + Grpc.Rpc.Value_mode.unary, + unit ) + Grpc.Rpc.Server_rpc.t + + val client_streaming : + ( 'request, + Pbrt_services.Value_mode.stream, + 'response, + Pbrt_services.Value_mode.unary ) + Pbrt_services.Server.rpc -> + ( 'request, + Grpc.Rpc.Value_mode.stream, + 'response, + Grpc.Rpc.Value_mode.unary, + unit ) + Grpc.Rpc.Server_rpc.t + + val server_streaming : + ( 'request, + Pbrt_services.Value_mode.unary, + 'response, + Pbrt_services.Value_mode.stream ) + Pbrt_services.Server.rpc -> + ( 'request, + Grpc.Rpc.Value_mode.unary, + 'response, + Grpc.Rpc.Value_mode.stream, + unit ) + Grpc.Rpc.Server_rpc.t + + val bidirectional_streaming : + ( 'request, + Pbrt_services.Value_mode.stream, + 'response, + Pbrt_services.Value_mode.stream ) + Pbrt_services.Server.rpc -> + ( 'request, + Grpc.Rpc.Value_mode.stream, + 'response, + Grpc.Rpc.Value_mode.stream, + unit ) + Grpc.Rpc.Server_rpc.t +end + +val handlers : 'a Pbrt_services.Server.t -> (_, 'a) Grpc.Rpc.Handlers.t diff --git a/lib/grpc/grpc.ml b/lib/grpc/grpc.ml index 00ca697..c84744b 100644 --- a/lib/grpc/grpc.ml +++ b/lib/grpc/grpc.ml @@ -2,3 +2,4 @@ module Server = Server module Status = Status module Message = Message module Buffer = Buffer +module Rpc = Rpc diff --git a/lib/grpc/rpc.ml b/lib/grpc/rpc.ml new file mode 100644 index 0000000..487cf7e --- /dev/null +++ b/lib/grpc/rpc.ml @@ -0,0 +1,47 @@ +type buffer = string + +module Value_mode = struct + type unary + type stream + type _ t = Unary : unary t | Stream : stream t +end + +module Service_spec = struct + type t = { package : string list; service_name : string } + + let packaged_service_name t = + String.concat "." (t.package @ [ t.service_name ]) +end + +module Handlers = struct + type ('a, 'b) t = + | Handlers of { handlers : 'a list } + | With_service_spec of { handlers : 'b list; service_spec : Service_spec.t } + | ( :: ) of ('a, 'b) t * ('a, 'b) t list +end + +module Client_rpc = struct + type ('request, 'request_mode, 'response, 'response_mode) t = { + service_spec : Service_spec.t; + rpc_name : string; + encode_request : 'request -> buffer; + decode_response : buffer -> 'response; + request_mode : 'request_mode Value_mode.t; + response_mode : 'response_mode Value_mode.t; + } +end + +module Server_rpc = struct + module Service_spec = struct + type 'a t = None : unit t | Some : Service_spec.t -> Service_spec.t t + end + + type ('request, 'request_mode, 'response, 'response_mode, 'service_spec) t = { + service_spec : 'service_spec Service_spec.t; + rpc_name : string; + decode_request : buffer -> 'request; + encode_response : 'response -> buffer; + request_mode : 'request_mode Value_mode.t; + response_mode : 'response_mode Value_mode.t; + } +end diff --git a/lib/grpc/rpc.mli b/lib/grpc/rpc.mli new file mode 100644 index 0000000..4f86232 --- /dev/null +++ b/lib/grpc/rpc.mli @@ -0,0 +1,48 @@ +type buffer = string + +(** Exploring a separate client/server api that works better with [ocaml-protoc]. *) + +module Value_mode : sig + type unary + type stream + type _ t = Unary : unary t | Stream : stream t +end + +module Service_spec : sig + type t = { package : string list; service_name : string } + + val packaged_service_name : t -> string +end + +module Handlers : sig + type ('a, 'b) t = + | Handlers of { handlers : 'a list } + | With_service_spec of { handlers : 'b list; service_spec : Service_spec.t } + | ( :: ) of ('a, 'b) t * ('a, 'b) t list +end + +module Client_rpc : sig + type ('request, 'request_mode, 'response, 'response_mode) t = { + service_spec : Service_spec.t; + rpc_name : string; + encode_request : 'request -> buffer; + decode_response : buffer -> 'response; + request_mode : 'request_mode Value_mode.t; + response_mode : 'response_mode Value_mode.t; + } +end + +module Server_rpc : sig + module Service_spec : sig + type 'a t = None : unit t | Some : Service_spec.t -> Service_spec.t t + end + + type ('request, 'request_mode, 'response, 'response_mode, 'service_spec) t = { + service_spec : 'service_spec Service_spec.t; + rpc_name : string; + decode_request : buffer -> 'request; + encode_response : 'response -> buffer; + request_mode : 'request_mode Value_mode.t; + response_mode : 'response_mode Value_mode.t; + } +end