From fe73a2cff34b18d3f745191b8197a7818d96a3f6 Mon Sep 17 00:00:00 2001 From: Ferdinand Beyer Date: Fri, 19 Jul 2024 13:47:35 +0200 Subject: [PATCH 1/7] Fix `.cljfmt.edn` --- .cljfmt.edn | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/.cljfmt.edn b/.cljfmt.edn index fd3e04d..7b61c44 100644 --- a/.cljfmt.edn +++ b/.cljfmt.edn @@ -3,8 +3,8 @@ :remove-consecutive-blank-lines? true :insert-missing-whitespace? true :align-associative? false - :indents {#re "^(?!catch-kondo-errors).*" [[:block 0]] - catch-kondo-errors [[:inner 0]]} + :extra-indents {#re "^(?!catch-kondo-errors).*" [[:block 0]] + catch-kondo-errors [[:inner 0]]} :test-code (comment (:require From 85d1a8a95967f97fd46c29c81db2ed66aaf1e025 Mon Sep 17 00:00:00 2001 From: Ferdinand Beyer Date: Fri, 19 Jul 2024 13:49:54 +0200 Subject: [PATCH 2/7] Complete pending request promise on default executor --- src/lsp4clj/server.clj | 15 +++++++++++---- 1 file changed, 11 insertions(+), 4 deletions(-) diff --git a/src/lsp4clj/server.clj b/src/lsp4clj/server.clj index ddb0ba7..62021a8 100644 --- a/src/lsp4clj/server.clj +++ b/src/lsp4clj/server.clj @@ -95,7 +95,7 @@ ;; client. This cannot be `(-> (p/deferred) (p/catch))` because that returns ;; a promise which, when cancelled, does nothing because there's no ;; exception handler chained onto it. Instead, we must cancel the - ;; `(p/deffered)` promise itself. + ;; `(p/deferred)` promise itself. (p/catch p CancellationException (fn [_] (protocols.endpoint/send-notification server "$/cancelRequest" {:id id}))) @@ -351,9 +351,16 @@ (if-let [{:keys [p started] :as req} (get pending-requests id)] (do (trace this trace/received-response req resp started now) - (if error - (p/reject! p (ex-info "Received error response" resp)) - (p/resolve! p result))) + ;; Note that we are called from the server's pipeline, a core.async + ;; go-loop, and therefore must not block. Callbacks of the pending + ;; request's promise will be executed in the completing thread, + ;; which should not be our thread. This is very easy for users to + ;; miss, therefore we complete the promise on the default executor. + (p/thread-call :default + (fn [] + (if error + (p/reject! p (ex-info "Received error response" resp)) + (p/resolve! p result))))) (trace this trace/received-unmatched-response resp now))) (catch Throwable e (log-error-receiving this e resp)))) From 6add493783030ba59c3a87a29f8f5127acad4d77 Mon Sep 17 00:00:00 2001 From: Ferdinand Beyer Date: Thu, 25 Jul 2024 19:56:25 +0200 Subject: [PATCH 3/7] Improve comment Co-authored-by: Jacob Maine --- src/lsp4clj/server.clj | 9 ++++++--- 1 file changed, 6 insertions(+), 3 deletions(-) diff --git a/src/lsp4clj/server.clj b/src/lsp4clj/server.clj index 62021a8..bd11072 100644 --- a/src/lsp4clj/server.clj +++ b/src/lsp4clj/server.clj @@ -352,9 +352,12 @@ (do (trace this trace/received-response req resp started now) ;; Note that we are called from the server's pipeline, a core.async - ;; go-loop, and therefore must not block. Callbacks of the pending - ;; request's promise will be executed in the completing thread, - ;; which should not be our thread. This is very easy for users to + ;; go-loop, and therefore must not block. Callbacks of the pending + ;; request's promise (`p`) will be executed in the completing + ;; thread, whatever that thread is. Since the callbacks are not + ;; under our control, they are under our users' control, they could + ;; block. Therefore, we do not want the completing thread to be our + ;; thread. This is very easy for users to ;; miss, therefore we complete the promise on the default executor. (p/thread-call :default (fn [] From 58139e1d1ef33c53a995028cf559e3520a54c0a1 Mon Sep 17 00:00:00 2001 From: Ferdinand Beyer Date: Tue, 6 Aug 2024 17:41:12 +0200 Subject: [PATCH 4/7] Support user-defined response executor --- src/lsp4clj/server.clj | 22 +++++++----- test/lsp4clj/server_test.clj | 67 ++++++++++++++++++++++++++++++++++++ 2 files changed, 80 insertions(+), 9 deletions(-) diff --git a/src/lsp4clj/server.clj b/src/lsp4clj/server.clj index bd11072..843b712 100644 --- a/src/lsp4clj/server.clj +++ b/src/lsp4clj/server.clj @@ -9,6 +9,7 @@ [lsp4clj.protocols.endpoint :as protocols.endpoint] [lsp4clj.trace :as trace] [promesa.core :as p] + [promesa.exec :as p.exec] [promesa.protocols :as p.protocols]) (:import (java.util.concurrent CancellationException))) @@ -196,6 +197,7 @@ trace-ch tracer* ^java.time.Clock clock + response-executor on-close request-id* pending-sent-requests* @@ -357,13 +359,13 @@ ;; thread, whatever that thread is. Since the callbacks are not ;; under our control, they are under our users' control, they could ;; block. Therefore, we do not want the completing thread to be our - ;; thread. This is very easy for users to - ;; miss, therefore we complete the promise on the default executor. - (p/thread-call :default - (fn [] - (if error - (p/reject! p (ex-info "Received error response" resp)) - (p/resolve! p result))))) + ;; thread. This is very easy for users to miss, therefore we + ;; complete the promise using an explicit executor. + (p.exec/submit! response-executor + (fn [] + (if error + (p/reject! p (ex-info "Received error response" resp)) + (p/resolve! p result))))) (trace this trace/received-unmatched-response resp now))) (catch Throwable e (log-error-receiving this e resp)))) @@ -420,9 +422,10 @@ (update server :tracer* reset! (trace/tracer-for-level trace-level))) (defn chan-server - [{:keys [output-ch input-ch log-ch trace? trace-level trace-ch clock on-close] + [{:keys [output-ch input-ch log-ch trace? trace-level trace-ch clock on-close response-executor] :or {clock (java.time.Clock/systemDefaultZone) - on-close (constantly nil)}}] + on-close (constantly nil) + response-executor :default}}] (let [;; before defaulting trace-ch, so that default is "off" tracer (trace/tracer-for-level (or trace-level (when (or trace? trace-ch) "verbose") @@ -437,6 +440,7 @@ :tracer* (atom tracer) :clock clock :on-close on-close + :response-executor response-executor :request-id* (atom 0) :pending-sent-requests* (atom {}) :pending-received-requests* (atom {}) diff --git a/test/lsp4clj/server_test.clj b/test/lsp4clj/server_test.clj index cf88638..09c5fe7 100644 --- a/test/lsp4clj/server_test.clj +++ b/test/lsp4clj/server_test.clj @@ -481,6 +481,73 @@ (h/assert-take output-ch))) (server/shutdown server)))) +(defn- core-async-dispatch-thread? [^Thread thread] + (re-matches #"async-dispatch-\d+" (.getName thread))) + +(deftest can-determine-core-async-dispatch-thread + (testing "current thread" + (is (not (core-async-dispatch-thread? (Thread/currentThread))))) + (testing "thread running go blocks" + (let [ch (async/chan) + _ (async/go (async/>! ch (Thread/currentThread))) + thread (async/!! ch (Thread/currentThread))) + thread (async/ (server/send-request server "req" {:body "foo"}) + (p/then (fn [_] (Thread/currentThread)))) + client-rcvd-msg (h/assert-take output-ch) + _ (async/put! input-ch (lsp.responses/response (:id client-rcvd-msg) {:result "good"})) + thread (deref thread-p 100 nil)] + (is (not (core-async-dispatch-thread? thread))) + (is (instance? java.util.concurrent.ForkJoinWorkerThread thread) + "completes on default ForkJoinPool executor") + (server/shutdown server))) + (testing "exceptional completion" + (let [input-ch (async/chan 3) + output-ch (async/chan 3) + server (server/chan-server {:output-ch output-ch + :input-ch input-ch}) + _ (server/start server nil) + thread-p (-> (server/send-request server "req" {:body "foo"}) + (p/catch (fn [_] (Thread/currentThread)))) + client-rcvd-msg (h/assert-take output-ch) + _ (async/put! input-ch + (-> (lsp.responses/response (:id client-rcvd-msg)) + (lsp.responses/error {:code 1234 + :message "Something bad" + :data {:body "foo"}}))) + thread (deref thread-p 100 nil)] + (is (not (core-async-dispatch-thread? thread))) + (is (instance? java.util.concurrent.ForkJoinWorkerThread thread) + "completes on default ForkJoinPool executor") + (server/shutdown server))) + (testing "completion with :current-thread executor for legacy behavior" + (let [input-ch (async/chan 3) + output-ch (async/chan 3) + server (server/chan-server {:output-ch output-ch + :input-ch input-ch + :response-executor :current-thread}) + _ (server/start server nil) + thread-p (-> (server/send-request server "req" {:body "foo"}) + (p/then (fn [_] (Thread/currentThread)))) + client-rcvd-msg (h/assert-take output-ch) + _ (async/put! input-ch (lsp.responses/response (:id client-rcvd-msg) {:result "good"})) + thread (deref thread-p 100 nil)] + (is (core-async-dispatch-thread? thread) "completes on core.async dispatch thread") + (server/shutdown server)))) + (def fixed-clock (-> (java.time.LocalDateTime/of 2022 03 05 13 35 23 0) (.toInstant java.time.ZoneOffset/UTC) From 6aaf08643d4f85daeeb2aae0f9c509e434cdf77d Mon Sep 17 00:00:00 2001 From: Ferdinand Beyer Date: Tue, 6 Aug 2024 17:50:52 +0200 Subject: [PATCH 5/7] Simplify test (go blocks return channels) --- test/lsp4clj/server_test.clj | 8 ++------ 1 file changed, 2 insertions(+), 6 deletions(-) diff --git a/test/lsp4clj/server_test.clj b/test/lsp4clj/server_test.clj index 09c5fe7..3b3a3f0 100644 --- a/test/lsp4clj/server_test.clj +++ b/test/lsp4clj/server_test.clj @@ -488,14 +488,10 @@ (testing "current thread" (is (not (core-async-dispatch-thread? (Thread/currentThread))))) (testing "thread running go blocks" - (let [ch (async/chan) - _ (async/go (async/>! ch (Thread/currentThread))) - thread (async/!! ch (Thread/currentThread))) - thread (async/ Date: Fri, 9 Aug 2024 10:56:36 +0200 Subject: [PATCH 6/7] Update readme and changelog --- CHANGELOG.md | 4 ++++ README.md | 4 ++++ 2 files changed, 8 insertions(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index 8dcb3de..2f02af1 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -2,6 +2,10 @@ ## Unreleased +- Add a `:response-executor` option to control on which thread responses to + server-initiated requests are run, defaulting to Promesa's `:default` + executor, i.e. `ForkJoinPool/commonPool`. + ## v1.10.0 - Add `textDocument/foldingRange` schemas. diff --git a/README.md b/README.md index 3722a52..d440db7 100644 --- a/README.md +++ b/README.md @@ -120,6 +120,10 @@ Alternatively, you can convert the request to a promesa promise, and handle it u In this case `(promesa/cancel! request)` will send `$/cancelRequest`. +Response promises are completed on Promesa's `:default` executor. You +can specify your own executor by passing the `:response-executor` option +when creating your server instance. + ### Start and stop a server The last step is to start the server you created earlier. Use `lsp4clj.server/start`. This method accepts two arguments, the server and a "context". From cb24771ee9fd076c037dfecd8865cbbcbc11fea8 Mon Sep 17 00:00:00 2001 From: Ferdinand Beyer Date: Fri, 9 Aug 2024 10:58:48 +0200 Subject: [PATCH 7/7] Add `:dev` alias During development, you usually want the `test` directory on your classpath. With Calva, I cannot use the existing `:test` alias since it has `main-opts`. Alternatively, we could extract the Kaocha runner from the `:test` alias to its own. --- deps.edn | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/deps.edn b/deps.edn index 05dcf53..04d3135 100644 --- a/deps.edn +++ b/deps.edn @@ -4,7 +4,8 @@ cheshire/cheshire {:mvn/version "5.11.0"} funcool/promesa {:mvn/version "10.0.594"}} :paths ["src" "resources"] - :aliases {:test {:extra-deps {lambdaisland/kaocha {:mvn/version "1.64.1010"}} + :aliases {:dev {:extra-paths ["test"]} + :test {:extra-deps {lambdaisland/kaocha {:mvn/version "1.64.1010"}} :extra-paths ["test"] :main-opts ["-m" "kaocha.runner"]} :build {:extra-paths ["resources"]