Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Use CollectorManager instead of now deprecated Collector API in search #65

Merged
merged 3 commits into from
Mar 26, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
37 changes: 33 additions & 4 deletions src/com/eldrix/hermes/impl/lucene.clj
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
(ns ^:no-doc com.eldrix.hermes.impl.lucene
(:require [clojure.core.async :as a])
(:import (java.util List ArrayList)
(org.apache.lucene.search CollectionTerminatedException IndexSearcher BooleanClause$Occur BooleanQuery$Builder Query
(org.apache.lucene.search CollectionTerminatedException CollectorManager IndexSearcher BooleanClause$Occur BooleanQuery$Builder Query
MatchAllDocsQuery BooleanQuery BooleanClause Collector LeafCollector Scorable ScoreMode)))

(set! *warn-on-reflection* true)
Expand All @@ -26,6 +26,16 @@
(.add coll (+ base-id doc-id))))))
(scoreMode [_] ScoreMode/COMPLETE_NO_SCORES))

;; A Lucene CollectorManager that can collect results in parallel and then
;; create a lazy concatenation of the results once search is complete
(deftype IntoSequenceCollectorManager []
CollectorManager
(newCollector [_]
(IntoArrayCollector. (ArrayList.)))
(reduce [_ collectors]
(mapcat #(.-coll ^IntoArrayCollector %) collectors)))

;; A Lucene Collector that puts search results onto a core.async channel
(deftype IntoChannelCollector [ch]
Collector
(getLeafCollector [_ ctx]
Expand All @@ -37,23 +47,42 @@
(throw (CollectionTerminatedException.))))))) ;; ... then prematurely terminate collection of the current leaf
(scoreMode [_] ScoreMode/COMPLETE_NO_SCORES))

(defn search-all
;; A Lucene CollectorManager that can collect results in parallel putting
;; results onto a channel, optionally closing when done.
(deftype IntoChannelCollectorManager [ch close?]
CollectorManager
(newCollector [_]
(IntoChannelCollector. ch))
(reduce [_ _]
(when close? (a/close! ch))))

(defn ^:deprecated search-all*
"Search a lucene index and return *all* results matching query specified.
Results are returned as a sequence of Lucene document ids."
[^IndexSearcher searcher ^Query q]
(let [coll (ArrayList.)]
(.search searcher q (IntoArrayCollector. coll))
(seq coll)))

(defn stream-all
(defn search-all
[^IndexSearcher searcher ^Query q]
(.search searcher q (IntoSequenceCollectorManager.)))

(defn ^:deprecated stream-all*
"Search a lucene index and return *all* results on the channel specified.
Results are returned as Lucene document ids."
([^IndexSearcher searcher ^Query q ch]
(stream-all searcher q ch true))
(stream-all* searcher q ch true))
([^IndexSearcher searcher ^Query q ch close?]
(.search searcher q (IntoChannelCollector. ch))
(when close? (a/close! ch))))

(defn stream-all
([^IndexSearcher searcher ^Query q ch]
(stream-all searcher q ch true))
([^IndexSearcher searcher ^Query q ch close?]
(.search searcher q (IntoChannelCollectorManager. ch close?))))

(defn- single-must-not-clause?
"Checks that a boolean query isn't simply a single 'must-not' clause.
Such a query will fail to return any results if used alone."
Expand Down
40 changes: 38 additions & 2 deletions test/com/eldrix/hermes/search_test.clj
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
(ns com.eldrix.hermes.search-test
(:require [clojure.spec.gen.alpha :as gen]
(:require [clojure.core.async :as async]
[clojure.spec.gen.alpha :as gen]
[clojure.test :refer [deftest is]]
[com.eldrix.hermes.core :as hermes]
[com.eldrix.hermes.impl.lucene :as lucene]
[com.eldrix.hermes.impl.search :as search]))

(def example-results-1
Expand Down Expand Up @@ -37,4 +39,38 @@
(with-open [svc (hermes/open "snomed.db")]
(let [q (search/q-descendantOrSelfOf 24700007)]
(is (= (search/do-query-for-concept-ids (:searcher svc) q)
(into #{} (map :conceptId) (search/do-query-for-results (:searcher svc) q nil)))))))
(into #{} (map :conceptId) (search/do-query-for-results (:searcher svc) q nil)))))))

(defn ch->set
"Drain the clojure.core.async channel `ch` and return results as a set."
[ch]
(loop [results (transient #{})]
(if-let [result (async/<!! ch)]
(recur (conj! results result))
(persistent! results))))

(defn test-query [svc q]
(let [searcher (.-searcher svc)
ch1 (async/chan)
ch2 (async/chan)]
(async/thread (lucene/stream-all searcher q ch1))
(async/thread (lucene/stream-all* searcher q ch2))
(is (= (set (lucene/search-all searcher q))
(set (lucene/search-all* searcher q))
(ch->set ch1)
(ch->set ch2)) (str "Query returned different results" q))))

(deftest ^:live search-parallel
(with-open [svc (hermes/open "snomed.db")]
(let [concept-ids (take 5000 (shuffle (#'hermes/get-n-concept-ids svc 1000000)))]
(doseq [concept-id concept-ids]
(test-query svc (search/q-descendantOf concept-id))))))

(comment
(def svc (hermes/open "snomed.db"))
(def searcher (.-searcher svc))
(def q (search/q-descendantOf 25700007)) ;138875005
(require '[criterium.core :as crit])
(crit/bench (lucene/search-all searcher q))
(crit/bench (lucene/search-all* searcher q)))