Skip to content

Commit

Permalink
Replay for impact should stop processing when baseline query times out (
Browse files Browse the repository at this point in the history
#5)

* feat: stop the RFI if baseline query times out
* test: test _rank_eval query construction
  • Loading branch information
dainiusjocas authored Mar 23, 2021
1 parent 56ca4d2 commit 68e6a78
Show file tree
Hide file tree
Showing 6 changed files with 208 additions and 73 deletions.
2 changes: 1 addition & 1 deletion dockerfiles/Dockerfile.executable-builder
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ FROM findepi/graalvm:java11-native as BUILDER

ENV GRAALVM_HOME=/graalvm
ENV JAVA_HOME=/graalvm
ENV CLOJURE_VERSION=1.10.2.774
ENV CLOJURE_VERSION=1.10.3.814

RUN apt-get install -y curl \
&& gu install native-image \
Expand Down
2 changes: 1 addition & 1 deletion dockerfiles/Dockerfile.test
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ FROM findepi/graalvm:java11-native as BUILDER

ENV GRAALVM_HOME=/graalvm
ENV JAVA_HOME=/graalvm
ENV CLOJURE_VERSION=1.10.2.774
ENV CLOJURE_VERSION=1.10.3.814

RUN apt-get install -y curl \
&& gu install native-image \
Expand Down
2 changes: 1 addition & 1 deletion dockerfiles/docker-compose.es.test.yml
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
version: '3'
services:
elasticsearch:
image: docker.elastic.co/elasticsearch/elasticsearch:${ES_VERSION:-7.10.2}
image: docker.elastic.co/elasticsearch/elasticsearch:${ES_VERSION:-7.11.2}
environment:
- discovery.type=single-node
- bootstrap.memory_lock=true
Expand Down
4 changes: 2 additions & 2 deletions dockerfiles/docker-compose.es.yml
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
version: '3'
services:
elasticsearch:
image: docker.elastic.co/elasticsearch/elasticsearch:${ES_VERSION:-7.10.2}
image: docker.elastic.co/elasticsearch/elasticsearch:${ES_VERSION:-7.11.2}
environment:
- discovery.type=single-node
- bootstrap.memory_lock=true
Expand All @@ -18,7 +18,7 @@ services:
- 9200:9200

kibana:
image: docker.elastic.co/kibana/kibana:${ES_VERSION:-7.10.2}
image: docker.elastic.co/kibana/kibana:${ES_VERSION:-7.11.2}
environment:
SERVER_NAME: kibana
LOGGING_QUIET: 'true'
Expand Down
161 changes: 93 additions & 68 deletions src/replay/impact.clj
Original file line number Diff line number Diff line change
Expand Up @@ -17,77 +17,101 @@
(defn get-index-or-alias
  "Returns the text captured between the leading `/` and `/_search` in
  `endpoint`, or nil when `endpoint` does not match that shape."
  [endpoint]
  (let [[_ index-or-alias] (re-find #"^/(.*)/_search" endpoint)]
    index-or-alias))

(defn prepare-endpoint
  "Prepares the endpoint for PIT queries: remove preference, routing, index name.

  The rules are handed to `transform.uri/transform-uri` in the order listed
  below, ending with the rule that drops a trailing `?`."
  [^String endpoint]
  (transform.uri/transform-uri
    endpoint
    [{:match "preference=[^&]*&?" ;; Remove preference string
      :replacement ""}
     {:match "routing=[^&]*&?" ;; Remove routing parameter
      :replacement ""}
     {:match "^(/.*)(/.*)" ;; Remove original index because PIT doesn't allow it
      :replacement "$2"}
     {:match "\\?$" ;; Remove trailing question mark
      :replacement ""}]))

(defn generate-queries
  "Delegates to `impact-transform/generate-queries`, passing `query-body` and
  the transforms found under `[:replay :query-transforms]` in `opts`."
  [opts query-body]
  (let [transforms (get-in opts [:replay :query-transforms])]
    (impact-transform/generate-queries query-body transforms)))

(defn get-baseline-resp
  "Executes the baseline query: a GET to `url` whose body is `query-body`
  with `:pit` set to `pit` and `:size` set to `k`. Uses the default
  exponential backoff params with `:keywordize?` enabled and returns whatever
  `r/execute-request` returns."
  [^String url query-body pit k]
  (let [request-body (assoc query-body :pit pit :size k)
        request-opts (assoc r/default-exponential-backoff-params :keywordize? true)]
    (r/execute-request
      {:method  :get
       :url     url
       :headers r/default-headers
       :body    request-body
       :opts    request-opts})))

(defn get-baseline-ratings
  "Runs the baseline query and turns every hit into a rating entry
  `{:_index .. :_id .. :rating 1}`.

  Throws an `Exception` when the response has a truthy `:timed_out` and
  `ignore-timeouts` is falsy; otherwise returns a lazy seq of rating maps
  built from `[:hits :hits]` of the response."
  [url query-body pit k ignore-timeouts]
  (let [resp        (get-baseline-resp url query-body pit k)
        timed-out?  (:timed_out resp)
        hit->rating (fn [hit]
                      (-> hit
                          (select-keys [:_index :_id])
                          (assoc :rating 1)))]
    (if (or ignore-timeouts (not timed-out?))
      (map hit->rating (get-in resp [:hits :hits]))
      ;; A timed-out baseline is not trusted as a source of ratings.
      (throw (Exception. (format "Request to get baseline ratings timed-out. %s" resp))))))

(defn get-grouped-query-variations
  "Generates the query variations for `query-body`, sets `:size` to `k` on
  each variation's `:request`, and groups the variations by the JSON encoding
  of their `:variation` value."
  [query-body opts k]
  (let [variation-key (fn [query-variation]
                        (json/encode (:variation query-variation)))
        sized-queries (for [query-variation (generate-queries opts query-body)]
                        (update query-variation :request assoc :size k))]
    (group-by variation-key sized-queries)))

(defn prepare-rank-eval-request
  "Builds the `_rank_eval` request body.

  For each `[id variations]` entry of `grouped-variations` one request is
  emitted: it uses the `:request` of the FIRST variation in the group with
  `:pit` attached, and carries the shared `ratings`. `metric` is passed
  through under `:metric`."
  [ratings grouped-variations metric pit]
  {:metric   metric
   :requests (for [[variation-id variations] grouped-variations
                   :let [request (:request (first variations))]]
               {:id      variation-id
                :ratings ratings
                :request (assoc request :pit pit)})})

(defn query-rank-eval-api
  "Sends the rank-eval request for `grouped-variations` to
  `<target-es-host>/<target-index>/_rank_eval` as a GET with a body, using the
  default exponential backoff params with `:keywordize?` enabled. Returns
  whatever `r/execute-request` returns."
  [target-es-host target-index ratings grouped-variations metric pit]
  (let [rank-eval-url  (format "%s/%s/_rank_eval" target-es-host target-index)
        rank-eval-body (prepare-rank-eval-request ratings grouped-variations metric pit)
        request-opts   (assoc r/default-exponential-backoff-params :keywordize? true)]
    (r/execute-request
      {:method  :get
       :url     rank-eval-url
       :headers r/default-headers
       :body    rank-eval-body
       :opts    request-opts})))

(defn construct-rfi-records
  "Builds one record per key of `:details` in `rank-eval-resp`.

  Each record is `query-log-entry` with:
    - `:_id` suffixed with \"-\" and the hash of the variation id,
    - `[:_source :query_log_entry_id]` set to the original `:_id`,
    - `[:_source :impact]` set to a map describing the variation and its
      scores (impact is computed as `1 - metric_score`).

  The variation data is taken from the first element of the matching group in
  `grouped-variations`, looked up by the name of the variation id."
  [rank-eval-resp query-log-entry grouped-variations baseline-ratings k]
  (let [details         (:details rank-eval-resp)
        failures        (:failures rank-eval-resp)
        average-score   (:metric_score rank-eval-resp)
        source-entry-id (get query-log-entry :_id)
        stringify-value (fn [variation-map] (update variation-map :value str))]
    (for [variation-id (keys details)
          :let [variation-name (name variation-id)
                detail         (variation-id details)
                variation      (first (get grouped-variations variation-name))
                impact         {:top-k              k
                                :variation-id       variation-name
                                :variation          (map stringify-value (:variation variation))
                                :query              (json/encode (:request variation))
                                :failures           (json/encode failures)
                                :impact             (float (- 1 (:metric_score detail)))
                                :average-impact     (float (- 1 average-score))
                                :hit-count          (count (:hits detail))
                                :unrelated-count    (count (:unrated_docs detail))
                                :metric-score       (:metric_score detail)
                                :original-hit-count (count baseline-ratings)
                                :details            (json/encode detail)}]]
      (-> query-log-entry
          (assoc-in [:_source :query_log_entry_id] source-entry-id)
          (assoc-in [:_source :impact] impact)
          ;; Make the record id unique per variation of the same log entry.
          (update :_id str "-" (hash variation-id))))))

(defn measure-impact
  "Measures the impact of the configured query transforms for one
  `query-log-entry` and returns the records built by `construct-rfi-records`.

  Steps:
    1. Resolve the target index: `[:replay :target-index]` from `opts` when
       set, otherwise the index/alias parsed from the logged `:uri`.
    2. Initialise a point-in-time via `pit/init` with `:keep_alive` \"30s\".
    3. Run the logged query as the baseline and turn its hits into ratings.
       `get-baseline-ratings` throws when the baseline times out, unless
       `[:replay :ignore-timeouts]` is truthy, so no rank-eval request is sent
       for a timed-out baseline.
    4. Evaluate all query variations against those ratings with `_rank_eval`
       using a precision@k metric, `k` being `[:replay :top-k]`."
  [opts query-log-entry]
  (let [target-es-host (get-in opts [:replay :connection.url])
        raw-endpoint (get-in query-log-entry [:_source :uri])
        target-index (or (get-in opts [:replay :target-index]) (get-index-or-alias raw-endpoint))
        k (get-in opts [:replay :top-k])
        query-body (json/decode (get-in query-log-entry [:_source :request]))
        metric {:precision {:k k :relevant_rating_threshold 1 :ignore_unlabeled false}}
        pit (assoc (pit/init target-es-host target-index opts) :keep_alive "30s")
        ;; The baseline runs against the PIT, so the endpoint is stripped of
        ;; the index name, preference and routing first.
        baseline-ratings-url (format "%s%s" target-es-host (prepare-endpoint raw-endpoint))
        baseline-ratings (get-baseline-ratings baseline-ratings-url query-body pit k (get-in opts [:replay :ignore-timeouts]))
        grouped-variations (get-grouped-query-variations query-body opts k)
        rank-eval-resp (query-rank-eval-api target-es-host target-index baseline-ratings grouped-variations metric pit)]
    (construct-rfi-records rank-eval-resp query-log-entry grouped-variations baseline-ratings k)))

(def defaults
{:max_docs 1
Expand All @@ -102,7 +126,9 @@
:top-k 10
:query-transforms []
:connection.url "http://localhost:9200"
:concurrency 1}
:target-index nil
:concurrency 1
:ignore-timeouts false}
:sink {:connection.url "http://localhost:9200"
:dest.index "impact_sink_index"
:batch.size 50}})
Expand All @@ -126,16 +152,15 @@

(comment
(replay.impact/execute
{:max_docs 100
{:max_docs 1
:source {:remote {:host "http://localhost:9200"}
:index "query_logs"
:query {:query {:bool
{:filter
[{:term {:query_from {:value 0}}}
{:term {:stats {:value "some value"}}}
{:range {:header.timestamp {:gte "now-2d"}}}
{:match {:request "multi_match"}}
{:prefix {:uri.keyword "/index_name/_search"}}]
{:prefix {:uri.keyword "/index-name/_search"}}]
:must_not
[{:exists {:field "query_sort"}}]}}
:sort [{:header.timestamp {:order :asc}}]
Expand All @@ -144,13 +169,13 @@
:replay {:connection.url "http://localhost:9200"
:concurrency 10
:top-k 100
:query-transforms [{:id "test"
:lang :sci
:script "(fn [query boost]\n (let [query-string (-> query\n (get-in [:query :bool :must])\n first\n (get-in [:constant_score :filter :multi_match :query]))\n clause-to-add {:constant_score {:boost boost\n :filter {:match {:title.folded {:_name \"boost_on_exactness\"\n :operator \"and\"\n :query query-string}}}}}]\n (update-in query [:query :bool :should] conj clause-to-add)))"
:vals [0.00001 0.0001 0.001 0.01 0.1 1 10 100 1000 10000]}
:query-transforms [{:id "jq-test"
:lang :jq
:script ". as [$query, $value] | $query | .size = $value"
:vals [1 10 100]}
{:id "test2"
:lang :sci
:script "(fn [query boost] query)"
:lang :js
:script "(query, value) => { query['from'] = value; return query; }"
:vals [123]}]}
:sink {:connection.url "http://localhost:9200"
:dest.index "impact_sink_index"
Expand Down
Loading

0 comments on commit 68e6a78

Please sign in to comment.