diff --git a/.cljfmt.edn b/.cljfmt.edn index fd3e04d..7b61c44 100644 --- a/.cljfmt.edn +++ b/.cljfmt.edn @@ -3,8 +3,8 @@ :remove-consecutive-blank-lines? true :insert-missing-whitespace? true :align-associative? false - :indents {#re "^(?!catch-kondo-errors).*" [[:block 0]] - catch-kondo-errors [[:inner 0]]} + :extra-indents {#re "^(?!catch-kondo-errors).*" [[:block 0]] + catch-kondo-errors [[:inner 0]]} :test-code (comment (:require diff --git a/CHANGELOG.md b/CHANGELOG.md index 8dcb3de..2f02af1 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -2,6 +2,10 @@ ## Unreleased +- Add a `:response-executor` option to control on which thread responses to + server-initiated requests are run, defaulting to Promesa's `:default` + executor, i.e. `ForkJoinPool/commonPool`. + ## v1.10.0 - Add `textDocument/foldingRange` schemas. diff --git a/README.md b/README.md index 3722a52..d440db7 100644 --- a/README.md +++ b/README.md @@ -120,6 +120,10 @@ Alternatively, you can convert the request to a promesa promise, and handle it u In this case `(promesa/cancel! request)` will send `$/cancelRequest`. +Response promises are completed on Promesa's `:default` executor. You +can specify your own executor by passing the `:response-executor` option +when creating your server instance. + ### Start and stop a server The last step is to start the server you created earlier. Use `lsp4clj.server/start`. This method accepts two arguments, the server and a "context". diff --git a/deps.edn b/deps.edn index 05dcf53..04d3135 100644 --- a/deps.edn +++ b/deps.edn @@ -4,7 +4,8 @@ cheshire/cheshire {:mvn/version "5.11.0"} funcool/promesa {:mvn/version "10.0.594"}} :paths ["src" "resources"] - :aliases {:test {:extra-deps {lambdaisland/kaocha {:mvn/version "1.64.1010"}} + :aliases {:dev {:extra-paths ["test"]} + :test {:extra-deps {lambdaisland/kaocha {:mvn/version "1.64.1010"}} :extra-paths ["test"] :main-opts ["-m" "kaocha.runner"]} :build {:extra-paths ["resources"] diff --git a/src/lsp4clj/server.clj b/src/lsp4clj/server.clj index ddb0ba7..843b712 100644 --- a/src/lsp4clj/server.clj +++ b/src/lsp4clj/server.clj @@ -9,6 +9,7 @@ [lsp4clj.protocols.endpoint :as protocols.endpoint] [lsp4clj.trace :as trace] [promesa.core :as p] + [promesa.exec :as p.exec] [promesa.protocols :as p.protocols]) (:import (java.util.concurrent CancellationException))) @@ -95,7 +96,7 @@ ;; client. This cannot be `(-> (p/deferred) (p/catch))` because that returns ;; a promise which, when cancelled, does nothing because there's no ;; exception handler chained onto it. Instead, we must cancel the - ;; `(p/deffered)` promise itself. + ;; `(p/deferred)` promise itself. (p/catch p CancellationException (fn [_] (protocols.endpoint/send-notification server "$/cancelRequest" {:id id}))) @@ -196,6 +197,7 @@ trace-ch tracer* ^java.time.Clock clock + response-executor on-close request-id* pending-sent-requests* @@ -351,9 +353,19 @@ (if-let [{:keys [p started] :as req} (get pending-requests id)] (do (trace this trace/received-response req resp started now) - (if error - (p/reject! p (ex-info "Received error response" resp)) - (p/resolve! p result))) + ;; Note that we are called from the server's pipeline, a core.async + ;; go-loop, and therefore must not block. Callbacks of the pending + ;; request's promise (`p`) will be executed in the completing + ;; thread, whatever that thread is. Since the callbacks are not + ;; under our control, they are under our users' control, they could + ;; block. Therefore, we do not want the completing thread to be our + ;; thread. This is very easy for users to miss, therefore we + ;; complete the promise using an explicit executor. + (p.exec/submit! response-executor + (fn [] + (if error + (p/reject! p (ex-info "Received error response" resp)) + (p/resolve! p result))))) (trace this trace/received-unmatched-response resp now))) (catch Throwable e (log-error-receiving this e resp)))) @@ -410,9 +422,10 @@ (update server :tracer* reset! (trace/tracer-for-level trace-level))) (defn chan-server - [{:keys [output-ch input-ch log-ch trace? trace-level trace-ch clock on-close] + [{:keys [output-ch input-ch log-ch trace? trace-level trace-ch clock on-close response-executor] :or {clock (java.time.Clock/systemDefaultZone) - on-close (constantly nil)}}] + on-close (constantly nil) + response-executor :default}}] (let [;; before defaulting trace-ch, so that default is "off" tracer (trace/tracer-for-level (or trace-level (when (or trace? trace-ch) "verbose") @@ -427,6 +440,7 @@ :tracer* (atom tracer) :clock clock :on-close on-close + :response-executor response-executor :request-id* (atom 0) :pending-sent-requests* (atom {}) :pending-received-requests* (atom {}) diff --git a/test/lsp4clj/server_test.clj b/test/lsp4clj/server_test.clj index cf88638..3b3a3f0 100644 --- a/test/lsp4clj/server_test.clj +++ b/test/lsp4clj/server_test.clj @@ -481,6 +481,69 @@ (h/assert-take output-ch))) (server/shutdown server)))) +(defn- core-async-dispatch-thread? [^Thread thread] + (re-matches #"async-dispatch-\d+" (.getName thread))) + +(deftest can-determine-core-async-dispatch-thread + (testing "current thread" + (is (not (core-async-dispatch-thread? (Thread/currentThread))))) + (testing "thread running go blocks" + (let [thread (async/ (server/send-request server "req" {:body "foo"}) + (p/then (fn [_] (Thread/currentThread)))) + client-rcvd-msg (h/assert-take output-ch) + _ (async/put! input-ch (lsp.responses/response (:id client-rcvd-msg) {:result "good"})) + thread (deref thread-p 100 nil)] + (is (not (core-async-dispatch-thread? thread))) + (is (instance? java.util.concurrent.ForkJoinWorkerThread thread) + "completes on default ForkJoinPool executor") + (server/shutdown server))) + (testing "exceptional completion" + (let [input-ch (async/chan 3) + output-ch (async/chan 3) + server (server/chan-server {:output-ch output-ch + :input-ch input-ch}) + _ (server/start server nil) + thread-p (-> (server/send-request server "req" {:body "foo"}) + (p/catch (fn [_] (Thread/currentThread)))) + client-rcvd-msg (h/assert-take output-ch) + _ (async/put! input-ch + (-> (lsp.responses/response (:id client-rcvd-msg)) + (lsp.responses/error {:code 1234 + :message "Something bad" + :data {:body "foo"}}))) + thread (deref thread-p 100 nil)] + (is (not (core-async-dispatch-thread? thread))) + (is (instance? java.util.concurrent.ForkJoinWorkerThread thread) + "completes on default ForkJoinPool executor") + (server/shutdown server))) + (testing "completion with :current-thread executor for legacy behavior" + (let [input-ch (async/chan 3) + output-ch (async/chan 3) + server (server/chan-server {:output-ch output-ch + :input-ch input-ch + :response-executor :current-thread}) + _ (server/start server nil) + thread-p (-> (server/send-request server "req" {:body "foo"}) + (p/then (fn [_] (Thread/currentThread)))) + client-rcvd-msg (h/assert-take output-ch) + _ (async/put! input-ch (lsp.responses/response (:id client-rcvd-msg) {:result "good"})) + thread (deref thread-p 100 nil)] + (is (core-async-dispatch-thread? thread) "completes on core.async dispatch thread") + (server/shutdown server)))) + (def fixed-clock (-> (java.time.LocalDateTime/of 2022 03 05 13 35 23 0) (.toInstant java.time.ZoneOffset/UTC)