Skip to content

Commit

Permalink
WIP - instrumenting
Browse files Browse the repository at this point in the history
  • Loading branch information
wilkerlucio committed Jan 12, 2024
1 parent eb64902 commit 3c6a912
Show file tree
Hide file tree
Showing 6 changed files with 221 additions and 157 deletions.
1 change: 1 addition & 0 deletions .clj-kondo/com.wsscode/pathom3/config.edn
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
{:lint-as {com.wsscode.pathom3.connect.operation/defmutation clojure.core/defn
com.wsscode.pathom3.connect.operation/defresolver clojure.core/defn
com.wsscode.pathom3.plugin/defplugin clojure.core/def
com.wsscode.pathom3.trace/with-span! clojure.core/let
com.wsscode.promesa.macros/clet clojure.core/let
com.wsscode.promesa.macros/ctry clojure.core/try}}
72 changes: 43 additions & 29 deletions src/main/com/wsscode/pathom3/connect/planner.cljc
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
[com.wsscode.pathom3.format.shape-descriptor :as pfsd]
[com.wsscode.pathom3.path :as p.path]
[com.wsscode.pathom3.placeholder :as pph]
[com.wsscode.pathom3.trace :as t]
[edn-query-language.core :as eql])
#?(:cljs
(:require-macros
Expand Down Expand Up @@ -1057,6 +1058,7 @@
(inc-snapshot-depth)
(add-resolvers-trail resolvers)
(assoc
::sub-graph-compute-reason "shape reachable?"
::optimize-graph? false
::available-data available
:edn-query-language.ast/node (pfsd/shape-descriptor->ast missing))))]
Expand Down Expand Up @@ -1176,6 +1178,7 @@
(inc-snapshot-depth)
(add-resolvers-trail (::resolvers env))
(assoc
::sub-graph-compute-reason "dynamic nested requirements"
:edn-query-language.ast/node ast
::available-data available)))
root-res (find-root-resolver-nodes graph)
Expand Down Expand Up @@ -1248,6 +1251,7 @@
(inc-snapshot-depth)
(add-resolvers-trail (::resolvers env))
(assoc
::sub-graph-compute-reason "nested requirements"
:com.wsscode.pathom3.error/lenient-mode? true
:edn-query-language.ast/node ast
::available-data (pfsd/merge-shapes available-data provide-sub))))
Expand Down Expand Up @@ -1717,7 +1721,8 @@
(if lenient-mode?
(try
(verify-plan!* env graph)
(catch #?(:clj Throwable :cljs :default) _
(catch #?(:clj Throwable :cljs :default) e
(t/log! env {::t/name ::plan-verify-failed ::t/error (ex-message e)})
(assoc graph ::verification-failed? true)))
(verify-plan!* env graph)))

Expand All @@ -1728,7 +1733,7 @@
(:children ast)))

(defn rehydrate-graph-idents
"To optimize the plan caching Pathom will remove the values of the idents at the cache
"To optimize the plan caching, Pathom will remove the values of the idents at the cache
key. But upon later usage of the cache, the cache key will hit the previous AST, but
that AST still has the initial values used on caching. This function will rehydrate
the AST replacing the cached ident values with the current ident values."
Expand Down Expand Up @@ -1806,10 +1811,12 @@
=> ::graph]
(compute-run-graph {} env))

([graph {::keys [optimize-graph?]
([graph {::keys [optimize-graph?
sub-graph-compute-reason]
:or {optimize-graph? true}
:as env}]
[(? (s/keys))
[(? (s/keys :opt [::optimize-graph?
::sub-graph-compute-reason]))
(s/keys
:req [:edn-query-language.ast/node]
:opt [::available-data
Expand All @@ -1821,31 +1828,38 @@
(add-snapshot! graph env {::snapshot-event ::snapshot-start-graph
::snapshot-message "=== Start query plan ==="})

(verify-plan!
env
(rehydrate-graph-idents
(p.cache/cached ::plan-cache* env [(hash (::pci/index-oir env))
(::available-data env)
(pf.eql/cacheable-ast (:edn-query-language.ast/node env))
(boolean optimize-graph?)]
#(let [env' (-> (merge (base-env) env)
(vary-meta assoc ::original-env env))]
(cond->
(compute-run-graph*
(merge (base-graph)
graph
{::index-ast (pf.eql/index-ast (:edn-query-language.ast/node env))
::source-ast (:edn-query-language.ast/node env)
::available-data (::available-data env)
::user-request-shape (pfsd/ast->shape-descriptor (:edn-query-language.ast/node env))})
env')

optimize-graph?
(optimize-graph env')

true
(mark-fast-placeholder-processes env'))))
(:edn-query-language.ast/node env)))))
(t/with-span! [env {::t/env env
::t/name (if sub-graph-compute-reason
(str "Secondary plan: " sub-graph-compute-reason)
"Plan")}]
(let [plan
(verify-plan!
env
(rehydrate-graph-idents
(p.cache/cached ::plan-cache* env [(hash (::pci/index-oir env))
(::available-data env)
(pf.eql/cacheable-ast (:edn-query-language.ast/node env))
(boolean optimize-graph?)]
#(let [env' (-> (merge (base-env) env)
(vary-meta assoc ::original-env env))]
(cond->
(compute-run-graph*
(merge (base-graph)
graph
{::index-ast (pf.eql/index-ast (:edn-query-language.ast/node env))
::source-ast (:edn-query-language.ast/node env)
::available-data (::available-data env)
::user-request-shape (pfsd/ast->shape-descriptor (:edn-query-language.ast/node env))})
env')

optimize-graph?
(optimize-graph env')

true
(mark-fast-placeholder-processes env'))))
(:edn-query-language.ast/node env)))]
(t/set-attributes! env {::graph plan})
plan))))

; endregion

Expand Down
130 changes: 75 additions & 55 deletions src/main/com/wsscode/pathom3/connect/runner.cljc
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,8 @@
[com.wsscode.pathom3.format.shape-descriptor :as pfsd]
[com.wsscode.pathom3.path :as p.path]
[com.wsscode.pathom3.placeholder :as pph]
[com.wsscode.pathom3.plugin :as p.plugin])
[com.wsscode.pathom3.plugin :as p.plugin]
[com.wsscode.pathom3.trace :as t])
#?(:clj
(:import
(clojure.lang
Expand Down Expand Up @@ -291,13 +292,15 @@
If there is ident data already, it gets merged with the ident value."
[env idents]
(doseq [[k v :as ident] idents]
(p.ent/swap-entity! env
(fn [entity]
(p.plugin/run-with-plugins env ::wrap-merge-attribute
(fn process-idents-merge-attr--internal [env m k v]
(assoc m k (process-attr-subquery env {} k v)))
env entity ident (assoc (get entity ident) k v))))))
(t/with-span! [env {::t/env env
::t/name "Process idents"}]
(doseq [[k v :as ident] idents]
(p.ent/swap-entity! env
(fn [entity]
(p.plugin/run-with-plugins env ::wrap-merge-attribute
(fn process-idents-merge-attr--internal [env m k v]
(assoc m k (process-attr-subquery env {} k v)))
env entity ident (assoc (get entity ident) k v)))))))

(defn run-next-node!
"Runs the next node associated with the node, in case it exists."
Expand Down Expand Up @@ -328,6 +331,8 @@
[{::keys [node-run-stats*] :as env}
{::pcp/keys [node-id]}
error]
(t/log! env {::t/name ::resolver-error
::t/error (ex-message error)})
(if node-run-stats*
(doto node-run-stats*
(refs/gswap! assoc-in [node-id ::node-error] (p.error/datafy-processor-error error))
Expand Down Expand Up @@ -499,46 +504,53 @@
{::pco/keys [op-name]
::pcp/keys [input]
:as node}]
(let [resolver (pci/resolver env op-name)
{::pco/keys [op-name batch? cache? cache-store optionals]
:or {cache? true}
:as r-config} (pco/operation-config resolver)
env (assoc env ::pcp/node node)
entity (p.ent/entity env)
input+opts (pfsd/merge-shapes input optionals)
input-data (pfsd/select-shape-filtering entity input+opts input)
input-data (enhance-dynamic-input r-config node input-data)
params (pco/params env)
cache-store (choose-cache-store env cache-store)
resolver-cache* (get env cache-store)
_ (merge-node-stats! env node
{::resolver-run-start-ms (time/now-ms)})
response (try
(let [batch-check (wait-batch-check env node entity input input+opts)]
(cond
batch-check
batch-check

batch?
(if-let [x (p.cache/cache-find resolver-cache* [op-name input-data params])]
(val x)
(if (::unsupported-batch? env)
(invoke-resolver-cached-batch
env cache? op-name resolver cache-store input-data params)
(batch-hold-token env cache? op-name node cache-store input-data params)))

:else
(invoke-resolver-cached
env cache? op-name resolver cache-store input-data params)))
(catch #?(:clj Throwable :cljs :default) e
(report-resolver-error env node e)))
finish (time/now-ms)
response (validate-response! env node response)]
(merge-node-stats! env node
(cond-> {::resolver-run-finish-ms finish}
(not (::batch-hold response))
(merge (report-resolver-io-stats env input-data response))))
response))
(t/with-span! [env {::t/env env
::t/name (str "Run resolver " op-name)
::t/attributes {::pcp/node node}}]
(let [resolver (pci/resolver env op-name)
{::pco/keys [op-name batch? cache? cache-store optionals]
:or {cache? true}
:as r-config} (pco/operation-config resolver)
env (assoc env ::pcp/node node)
entity (p.ent/entity env)
input+opts (pfsd/merge-shapes input optionals)
input-data (pfsd/select-shape-filtering entity input+opts input)
input-data (enhance-dynamic-input r-config node input-data)
params (pco/params env)
cache-store (choose-cache-store env cache-store)
resolver-cache* (get env cache-store)
_ (merge-node-stats! env node
{::resolver-run-start-ms (time/now-ms)})
_ (t/set-attributes! env (cond-> {}
(seq params) (assoc ::node-resolver-params params)
(seq input-data) (assoc ::node-resolver-input input-data)))
response (try
(let [batch-check (wait-batch-check env node entity input input+opts)]
(cond
batch-check
batch-check

batch?
(if-let [x (p.cache/cache-find resolver-cache* [op-name input-data params])]
(val x)
(if (::unsupported-batch? env)
(invoke-resolver-cached-batch
env cache? op-name resolver cache-store input-data params)
(batch-hold-token env cache? op-name node cache-store input-data params)))

:else
(invoke-resolver-cached
env cache? op-name resolver cache-store input-data params)))
(catch #?(:clj Throwable :cljs :default) e
(report-resolver-error env node e)))
finish (time/now-ms)
response (validate-response! env node response)]
(t/set-attributes! env ::node-resolver-output response)
(merge-node-stats! env node
(cond-> {::resolver-run-finish-ms finish}
(not (::batch-hold response))
(merge (report-resolver-io-stats env input-data response))))
response)))

(defn user-demand-completed? [env]
(empty? (pfsd/missing-from-data (p.ent/entity env) (-> env ::pcp/graph ::pcp/user-request-shape))))
Expand Down Expand Up @@ -817,8 +829,11 @@
(defn process-mutations!
"Runs the mutations gathered by the planner."
[{::pcp/keys [graph] :as env}]
(doseq [ast (::pcp/mutations graph)]
(invoke-mutation! env ast)))
(when (seq (::pcp/mutations graph))
(t/with-span! [env {::t/env env
::t/name "Run mutations"}]
(doseq [ast (::pcp/mutations graph)]
(invoke-mutation! env ast)))))

(defn check-entity-requires!
"Verify if entity contains all required keys from graph index-ast. This is
Expand Down Expand Up @@ -855,8 +870,7 @@
(if (-> env ::pcp/graph ::pcp/placeholders)
(merge-resolver-response! env (placeholder-merge-entity env)))
; entity ready
(p.plugin/run-with-plugins env ::wrap-entity-ready! run-graph-done!
env))
(p.plugin/run-with-plugins env ::wrap-entity-ready! run-graph-done! env))

(defn run-root-node!
[{::pcp/keys [graph] :as env}]
Expand Down Expand Up @@ -885,14 +899,18 @@

; compute nested available fields
(if-let [nested (::pcp/nested-process graph)]
(merge-resolver-response! env (select-keys (p.ent/entity env) nested)))
(t/with-span! [env {::t/env env
::t/name "Run nested fields"}]
(merge-resolver-response! env (select-keys (p.ent/entity env) nested))))

; process idents
(if-let [idents (::pcp/idents graph)]
(process-idents! env idents))

; now run the nodes
(run-root-node! env)
(t/with-span! [env {::t/env env
::t/name "Run graph nodes"}]
(run-root-node! env))

env))

Expand Down Expand Up @@ -1176,7 +1194,9 @@
[(s/keys) (s/or :ast :edn-query-language.ast/node
:graph ::pcp/graph) ::p.ent/entity-tree*
=> (s/keys)]
(run-graph-with-plugins env ast-or-graph entity-tree* run-graph-impl!))
(t/with-span! [env {::t/env env
::t/name ::trace-plan-and-run}]
(run-graph-with-plugins env ast-or-graph entity-tree* run-graph-impl!)))

(>defn with-resolver-cache
([env] [map? => map?] (with-resolver-cache env (atom {})))
Expand Down
Loading

0 comments on commit 3c6a912

Please sign in to comment.