Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Complete pending server-initiated request promises on default executor #43

Merged
merged 7 commits into from
Aug 10, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions .cljfmt.edn
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,8 @@
:remove-consecutive-blank-lines? true
:insert-missing-whitespace? true
:align-associative? false
:indents {#re "^(?!catch-kondo-errors).*" [[:block 0]]
catch-kondo-errors [[:inner 0]]}
:extra-indents {#re "^(?!catch-kondo-errors).*" [[:block 0]]
catch-kondo-errors [[:inner 0]]}
:test-code
(comment
(:require
Expand Down
4 changes: 4 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,10 @@

## Unreleased

- Add a `:response-executor` option to control on which thread responses to
server-initiated requests are run, defaulting to Promesa's `:default`
executor, i.e. `ForkJoinPool/commonPool`.

## v1.10.0

- Add `textDocument/foldingRange` schemas.
Expand Down
4 changes: 4 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -120,6 +120,10 @@ Alternatively, you can convert the request to a promesa promise, and handle it u

In this case `(promesa/cancel! request)` will send `$/cancelRequest`.

Response promises are completed on Promesa's `:default` executor. You
can specify your own executor by passing the `:response-executor` option
when creating your server instance.

### Start and stop a server

The last step is to start the server you created earlier. Use `lsp4clj.server/start`. This method accepts two arguments, the server and a "context".
Expand Down
3 changes: 2 additions & 1 deletion deps.edn
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,8 @@
cheshire/cheshire {:mvn/version "5.11.0"}
funcool/promesa {:mvn/version "10.0.594"}}
:paths ["src" "resources"]
:aliases {:test {:extra-deps {lambdaisland/kaocha {:mvn/version "1.64.1010"}}
:aliases {:dev {:extra-paths ["test"]}
:test {:extra-deps {lambdaisland/kaocha {:mvn/version "1.64.1010"}}
:extra-paths ["test"]
:main-opts ["-m" "kaocha.runner"]}
:build {:extra-paths ["resources"]
Expand Down
26 changes: 20 additions & 6 deletions src/lsp4clj/server.clj
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
[lsp4clj.protocols.endpoint :as protocols.endpoint]
[lsp4clj.trace :as trace]
[promesa.core :as p]
[promesa.exec :as p.exec]
[promesa.protocols :as p.protocols])
(:import
(java.util.concurrent CancellationException)))
Expand Down Expand Up @@ -95,7 +96,7 @@
;; client. This cannot be `(-> (p/deferred) (p/catch))` because that returns
;; a promise which, when cancelled, does nothing because there's no
;; exception handler chained onto it. Instead, we must cancel the
;; `(p/deffered)` promise itself.
;; `(p/deferred)` promise itself.
(p/catch p CancellationException
(fn [_]
(protocols.endpoint/send-notification server "$/cancelRequest" {:id id})))
Expand Down Expand Up @@ -196,6 +197,7 @@
trace-ch
tracer*
^java.time.Clock clock
response-executor
on-close
request-id*
pending-sent-requests*
Expand Down Expand Up @@ -351,9 +353,19 @@
(if-let [{:keys [p started] :as req} (get pending-requests id)]
(do
(trace this trace/received-response req resp started now)
(if error
(p/reject! p (ex-info "Received error response" resp))
(p/resolve! p result)))
;; Note that we are called from the server's pipeline, a core.async
;; go-loop, and therefore must not block. Callbacks of the pending
;; request's promise (`p`) will be executed in the completing
;; thread, whatever that thread is. Since the callbacks are not
;; under our control, they are under our users' control, they could
;; block. Therefore, we do not want the completing thread to be our
;; thread. This is very easy for users to miss, therefore we
;; complete the promise using an explicit executor.
(p.exec/submit! response-executor
(fn []
(if error
(p/reject! p (ex-info "Received error response" resp))
(p/resolve! p result)))))
(trace this trace/received-unmatched-response resp now)))
(catch Throwable e
(log-error-receiving this e resp))))
Expand Down Expand Up @@ -410,9 +422,10 @@
(update server :tracer* reset! (trace/tracer-for-level trace-level)))

(defn chan-server
[{:keys [output-ch input-ch log-ch trace? trace-level trace-ch clock on-close]
[{:keys [output-ch input-ch log-ch trace? trace-level trace-ch clock on-close response-executor]
:or {clock (java.time.Clock/systemDefaultZone)
on-close (constantly nil)}}]
on-close (constantly nil)
response-executor :default}}]
(let [;; before defaulting trace-ch, so that default is "off"
tracer (trace/tracer-for-level (or trace-level
(when (or trace? trace-ch) "verbose")
Expand All @@ -427,6 +440,7 @@
:tracer* (atom tracer)
:clock clock
:on-close on-close
:response-executor response-executor
:request-id* (atom 0)
:pending-sent-requests* (atom {})
:pending-received-requests* (atom {})
Expand Down
63 changes: 63 additions & 0 deletions test/lsp4clj/server_test.clj
Original file line number Diff line number Diff line change
Expand Up @@ -481,6 +481,69 @@
(h/assert-take output-ch)))
(server/shutdown server))))

(defn- core-async-dispatch-thread? [^Thread thread]
(re-matches #"async-dispatch-\d+" (.getName thread)))

(deftest can-determine-core-async-dispatch-thread
(testing "current thread"
(is (not (core-async-dispatch-thread? (Thread/currentThread)))))
(testing "thread running go blocks"
(let [thread (async/<!! (async/go (Thread/currentThread)))]
(is (core-async-dispatch-thread? thread))))
(testing "thread running core.async thread macro"
(let [thread (async/<!! (async/thread (Thread/currentThread)))]
(is (not (core-async-dispatch-thread? thread))))))

(deftest request-should-complete-on-a-suitable-executor
(testing "successful completion"
(let [input-ch (async/chan 3)
output-ch (async/chan 3)
server (server/chan-server {:output-ch output-ch
:input-ch input-ch})
_ (server/start server nil)
thread-p (-> (server/send-request server "req" {:body "foo"})
(p/then (fn [_] (Thread/currentThread))))
client-rcvd-msg (h/assert-take output-ch)
_ (async/put! input-ch (lsp.responses/response (:id client-rcvd-msg) {:result "good"}))
thread (deref thread-p 100 nil)]
(is (not (core-async-dispatch-thread? thread)))
(is (instance? java.util.concurrent.ForkJoinWorkerThread thread)
"completes on default ForkJoinPool executor")
(server/shutdown server)))
(testing "exceptional completion"
(let [input-ch (async/chan 3)
output-ch (async/chan 3)
server (server/chan-server {:output-ch output-ch
:input-ch input-ch})
_ (server/start server nil)
thread-p (-> (server/send-request server "req" {:body "foo"})
(p/catch (fn [_] (Thread/currentThread))))
client-rcvd-msg (h/assert-take output-ch)
_ (async/put! input-ch
(-> (lsp.responses/response (:id client-rcvd-msg))
(lsp.responses/error {:code 1234
:message "Something bad"
:data {:body "foo"}})))
thread (deref thread-p 100 nil)]
(is (not (core-async-dispatch-thread? thread)))
(is (instance? java.util.concurrent.ForkJoinWorkerThread thread)
"completes on default ForkJoinPool executor")
(server/shutdown server)))
(testing "completion with :current-thread executor for legacy behavior"
(let [input-ch (async/chan 3)
output-ch (async/chan 3)
server (server/chan-server {:output-ch output-ch
:input-ch input-ch
:response-executor :current-thread})
_ (server/start server nil)
thread-p (-> (server/send-request server "req" {:body "foo"})
(p/then (fn [_] (Thread/currentThread))))
client-rcvd-msg (h/assert-take output-ch)
_ (async/put! input-ch (lsp.responses/response (:id client-rcvd-msg) {:result "good"}))
thread (deref thread-p 100 nil)]
(is (core-async-dispatch-thread? thread) "completes on core.async dispatch thread")
(server/shutdown server))))

(def fixed-clock
(-> (java.time.LocalDateTime/of 2022 03 05 13 35 23 0)
(.toInstant java.time.ZoneOffset/UTC)
Expand Down