-
Notifications
You must be signed in to change notification settings - Fork 20
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Initial Eio port #284
Initial Eio port #284
Conversation
c6898a8
to
6f553aa
Compare
56d4530
to
2b4caf2
Compare
This switches capnp-rpc from Lwt to Eio. One particularly nice side effect of this is that `Service.return_lwt` has gone, as there is no distinction now between concurrent and non-concurrent service methods.
| Ok (`Data data) -> | ||
Log.debug (fun f -> f "Read %d bytes" (Cstruct.length data)); | ||
Capnp.Codecs.FramedStream.add_fragment t.decoder (Cstruct.to_string data); | ||
let buf = Cstruct.create 4096 in (* TODO: make this efficient *) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Note: there will be a follow-on PR replacing most of endpoint.ml with a buffered system.
@@ -8,36 +7,37 @@ let () = | |||
Logs.set_reporter (Logs_fmt.reporter ()) | |||
|
|||
let run_client service = | |||
let n = 100000 in | |||
(* let n = 100000 in *) (* XXX: improve speed *) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Will be fixed in a follow-on PR.
If an application forgets to release a resource and it gets GC'd then we want to | ||
log a warning and clean up (so forgotten refs don't build up over time). | ||
|
||
Because GC finalizers can run at any time and from any thread, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The Lwt version didn't handle the case of a finalizer running from a different thread. This is more important with multiple domains too.
(* todo: we stop waiting and we send a finish message, but we don't currently | ||
abort the service operation. *) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The Lwt version doesn't cancel service operations either, so this isn't a regression.
Capability.await_settled_exn service >>= fun () -> | ||
Capability.await_settled_exn service2 >>= fun () -> |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Depending on timing, these could fail if the vat is shut down before returning the results. So, use the non-exn version.
Lwt.wakeup set_service @@ Capnp_rpc_net.Restorer.grant @@ Echo.local (); | ||
service >>= fun _ -> |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Changed this to have the test wait for the disconnected error before letting the server continue to ensure it really is testing this case.
val add_connection : t -> switch:Lwt_switch.t -> mode:[`Accept|`Connect] -> Endpoint.t -> CapTP.t Lwt.t | ||
(** [add_connection t ~switch ~mode endpoint] runs the CapTP protocol over [endpoint], | ||
val run_connection : t -> mode:[`Accept|`Connect] -> Endpoint.t -> (CapTP.t -> unit) -> unit | ||
(** [run_connection t ~mode endpoint r] runs the protocol over [endpoint], |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This simplifies FD management, since endpoint can always be closed when the function returns.
Lwt_switch.add_hook switch (fun () -> | ||
let ex = Capnp_rpc.Exception.v ~ty:`Disconnected "Vat shut down" in | ||
ID_map.bindings t.connections |> Lwt_list.iter_p (fun (_, c) -> CapTP.disconnect c ex) >>= fun () -> | ||
t.connections <- ID_map.empty; | ||
Lwt_list.iter_p (fun c -> CapTP.disconnect c ex) t.anon_connections >|= fun () -> | ||
t.anon_connections <- []; | ||
ID_map.iter (fun _ th -> Lwt.cancel th) t.connecting; | ||
t.connecting <- ID_map.empty; | ||
); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
There's no need to clean up the connections now, because each connection has an owning daemon fiber that will close it when it ends.
let connect_anon t addr ~service = | ||
let switch = Lwt_switch.create () in | ||
Network.connect t.network ~switch ~secret_key:t.secret_key addr >>= function | ||
| Error (`Msg m) -> Lwt.return @@ Error (Capnp_rpc.Exception.v m) | ||
| Ok ep -> | ||
add_connection t ~switch ep ~mode:`Connect >|= fun conn -> | ||
Ok (CapTP.bootstrap conn service) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This logic got merged into initiate_connection
.
|
This switches capnp-rpc from Lwt to Eio. One particularly nice side effect of this is that
Service.return_lwt
has gone, as there is no distinction now between concurrent and non-concurrent service methods.#256 was supposed to be the initial support, but extra things got added to it and now people are using that branch, so this is a new "minimal" PR.
Once this is merged, some follow-on PRs will be needed: