From 73558f1cf1accc20b58e728f44703404a50aad57 Mon Sep 17 00:00:00 2001 From: Adrian Batos-Parac Date: Fri, 13 Apr 2018 11:10:14 -0400 Subject: Fix graphwalk capabilities Fixes the issue where graphwalk was not working properly in the gallifrey code within ONAP. Change-Id: I43f7471e335972563769f3806f889884c2e46753 Issue-ID: AAI-797 Signed-off-by: Adrian Batos-Parac --- project.clj | 2 + src/gallifrey/handler.clj | 128 ++++++++++++----- src/gallifrey/store.clj | 360 ++++++++++++++++++++++++++++++++-------------- 3 files changed, 342 insertions(+), 148 deletions(-) diff --git a/project.clj b/project.clj index d520863..8637676 100644 --- a/project.clj +++ b/project.clj @@ -17,6 +17,8 @@ [integrant "0.6.2"] [clojure-future-spec "1.9.0-beta4"] + [metrics-clojure "2.10.0"] + [metrics-clojure-ring "2.10.0"] [yogthos/config "0.9"]] :min-lein-version "2.5.3" :profiles {:dev {:source-paths ["dev"] diff --git a/src/gallifrey/handler.clj b/src/gallifrey/handler.clj index 52d59c7..61bdd16 100644 --- a/src/gallifrey/handler.clj +++ b/src/gallifrey/handler.clj @@ -1,6 +1,7 @@ (ns gallifrey.handler (:require [gallifrey.store :as store] [utilis.map :refer [map-vals compact]] + [utilis.fn :refer [fsafe apply-kw]] [liberator.core :refer [defresource]] [liberator.representation :refer [as-response ring-response]] [compojure.core :refer [GET PUT PATCH ANY defroutes]] @@ -11,6 +12,8 @@ [ring.middleware.session :refer [wrap-session]] [cheshire.core :as json] [clj-time.format :as tf] + [metrics.ring.instrument :refer [instrument]] + [metrics.ring.expose :refer [expose-metrics-as-json]] [integrant.core :as ig])) (declare handler) @@ -24,81 +27,132 @@ (defmethod ig/halt-key! :gallifrey/handler [_ _] (reset! the-store nil)) -(declare serialize de-serialize) +(declare parse-ts serialize serialize-entity serialize-relationship) (defn entity-existed? - [id & {:keys [t-k]}] - (store/entity-existed? @the-store id :t-k t-k)) + [type id & {:keys [t-k]}] + (store/entity-existed? @the-store type id :t-k t-k)) -(defresource entity-endpoint [id] +(defresource entity-endpoint [type id] :allowed-methods [:get :put :delete] :available-media-types ["application/json"] :malformed? (fn [ctx] (when (#{:put :delete} (-> ctx :request :request-method)) (-> ctx :request :params :actor empty?))) :exists? (fn [ctx] - (if-let [resource (->> (when-let [t-k (-> ctx :request :params :t-k)] - (tf/parse t-k)) - (store/get-entity @the-store id :t-k) - serialize)] - {::resource resource} + (if-let [resource (store/get-entity @the-store type id + :t-t (parse-ts ctx :t-t) :t-k (parse-ts ctx :t-k))] + {::resource ((case type + "entity" serialize-entity + "relationship" serialize-relationship) resource)} (when (and (= :put (-> ctx :request :request-method)) (-> ctx :request :params :create not-empty)) true))) - :existed? (fn [ctx] - (entity-existed? id :t-k (when-let [t-k (-> ctx :request :params :t-k)] - (tf/parse t-k)))) + :existed? (fn [ctx] (entity-existed? type id :t-k (parse-ts ctx :t-k))) :handle-ok ::resource :can-put-to-missing? false :handle-not-implemented (fn [{{m :request-method} :request :as ctx}] (when (= :put m) (-> (as-response "Resource not found" ctx) - (assoc :status (if (entity-existed? id) 410 404)) - (ring-response)))) + (assoc :status (if (entity-existed? type id) 410 404)) + ring-response))) :put! (fn [ctx] (let [body (json/parse-string (slurp (get-in ctx [:request :body]))) + properties (get body "properties") + source {"source.type" (get-in body ["source" "type"]) + "source.id" (get-in body ["source" "id"])} + target {"target.type" (get-in body ["target" "type"]) + "target.id" (get-in body ["target" "id"])} actor (-> ctx :request :params :actor) - changes-only (when-let [c (-> ctx :request :params :changes-only)] - (boolean c))] - {::created? (:created? (store/put-entity @the-store actor id body + changes-only (boolean (-> ctx :request :params :changes-only))] + {::created? (:created? (store/put-entity @the-store actor type id + (compact (merge properties source target)) + :t-t (parse-ts ctx :t-t) :changes-only changes-only))})) :delete! (fn [ctx] - (let [actor (-> ctx :request :params :actor)] - (store/delete-entity @the-store actor id))) + (store/delete-entity @the-store (-> ctx :request :params :actor) type id)) :new? ::created?) -(defresource entity-lifespan-endpoint [id] +(defresource entity-history-endpoint [type id] :allowed-methods [:get] :available-media-types ["application/json"] :exists? (fn [ctx] (when-let [resource (not-empty - (store/entity-lifespan @the-store id))] - {::resource (-> resource - (update :created str) - (update :updated (partial map str)) - (update :deleted str) - compact)})) + (store/entity-history @the-store type id))] + {::resource (map-vals (partial map #(-> % + (update :k-start str) + (update :k-end str) + (update :t-start str) + (update :t-end str) + compact)) resource)})) :handle-ok ::resource) +(defresource entity-lifespan-endpoint [type id] + :allowed-methods [:get] + :available-media-types ["application/json"] + :exists? (fn [ctx] + (when-let [resource (not-empty + (store/entity-lifespan @the-store type id))] + {::resource (map #(-> % + (update :created str) + (update :updated (partial map str)) + (update :deleted str) + compact) resource)})) + :handle-ok ::resource) + +(defresource entity-aggregation-endpoint [type] + :allowed-methods [:get] + :available-media-types ["application/json"] + :handle-ok #(store/aggregate-entities @the-store type + :filters (->> (dissoc (get-in % [:request :params]) + :properties :t-t :t-k + :gallifrey-type) + (map (fn [[k v]] + (cond + (= "null" v) [k nil] + :else [k v]))) + (into {})) + :properties (get-in % [:request :params :properties]) + :t-t (parse-ts % :t-t) :t-k (parse-ts % :t-k))) + (defroutes app-routes - (ANY "/entity/:id" [id] (entity-endpoint id)) - (GET "/entity/:id/lifespan" [id] (entity-lifespan-endpoint id)) + (GET "/:gallifrey-type/aggregations" [gallifrey-type] (entity-aggregation-endpoint gallifrey-type)) + (ANY "/:gallifrey-type/:id" [gallifrey-type id] (entity-endpoint gallifrey-type id)) + (GET "/:gallifrey-type/:id/history" [gallifrey-type id] (entity-history-endpoint gallifrey-type id)) + (GET "/:gallifrey-type/:id/lifespan" [gallifrey-type id] (entity-lifespan-endpoint gallifrey-type id)) (resources "/")) (def handler (-> app-routes - (wrap-defaults api-defaults))) + (wrap-defaults api-defaults) + instrument + expose-metrics-as-json)) ;;; Implementation -(defn- serialize - [e] +(defn- parse-ts + [ctx key] + (when-let [ts (get-in ctx [:request :params key])] + (tf/parse ts))) + +(defn- serialize-relationship + [resource] (compact - (update e :_meta #(map-vals - (fn [m] - (map-vals str m)) %)))) + {:properties (dissoc resource :_id :_meta + :source.type :source.id + :target.type :target.id) + :source {:type (:source.type resource) + :id (:source.id resource)} + :target {:type (:target.type resource) + :id (:target.id resource)} + :_id (:_id resource) + :_meta (map-vals (partial map-vals str) (:_meta resource))})) -(defn- de-serialize - [e] - e) +(defn- serialize-entity + [resource] + (compact + {:properties (dissoc resource :_id :_meta :_relationships) + :relationships (map serialize-relationship (:_relationships resource)) + :_id (:_id resource) + :_meta (map-vals (partial map-vals str) (:_meta resource))})) diff --git a/src/gallifrey/store.clj b/src/gallifrey/store.clj index 85a17d7..4f10f7c 100644 --- a/src/gallifrey/store.clj +++ b/src/gallifrey/store.clj @@ -1,12 +1,34 @@ + (ns gallifrey.store (:require [utilis.map :refer [compact map-keys map-vals]] [utilis.types.keyword :refer [->keyword]] [rethinkdb.query :as r] [inflections.core :refer [hyphenate underscore]] [clj-time.core :as t] - [integrant.core :as ig]) + [metrics.core :refer [default-registry]] + [metrics.timers :refer [deftimer time!]] + [integrant.core :as ig] + [clojure.string :as st]) (:import [clojure.lang ExceptionInfo])) +(deftimer default-registry ["gallifrey" "store" get-entity-time]) +(deftimer default-registry ["gallifrey" "store" put-entity-time]) +(deftimer default-registry ["gallifrey" "store" delete-entity-time]) +(deftimer default-registry ["gallifrey" "store" aggregate-entities-time]) + +(deftimer default-registry ["gallifrey" "store" entity-lifespan-time]) +(deftimer default-registry ["gallifrey" "store" entity-history-time]) + +(deftimer default-registry ["gallifrey" "store" entity-existed-time]) + +(deftimer default-registry ["gallifrey" "store" entity-assertions-time]) +(deftimer default-registry ["gallifrey" "store" entity-relationships-time]) + +(deftimer default-registry ["gallifrey" "store" entity-lifecycle-time]) +(deftimer default-registry ["gallifrey" "store" update-lifecycle-time]) + +(deftimer default-registry ["gallifrey" "store" entity-lock-time]) + (declare ensure-db ensure-table ensure-index) (defmethod ig/init-key :gallifrey/store [_ {:keys [db-server db-name]}] @@ -14,16 +36,25 @@ connection (r/connect :host host :port port :db db-name) assertions-table "assertions" lifecycle-table "lifecycle" - entity-id-index "entity-id-index"] + entity-id-index "entity-id-index" + entity-type-index "entity-type-index" + attribute-index "attribute-index" + value-index "value-index"] (ensure-db connection db-name) (ensure-table connection db-name assertions-table) (ensure-index connection db-name assertions-table entity-id-index :entity-id) + (ensure-index connection db-name assertions-table entity-type-index :_type) + (ensure-index connection db-name assertions-table attribute-index :attribute) + (ensure-index connection db-name assertions-table value-index :value) (ensure-table connection db-name lifecycle-table) {:db-server db-server :db-name db-name :assertions-table assertions-table :lifecycle-table lifecycle-table :entity-id-index entity-id-index + :entity-type-index entity-type-index + :attribute-index attribute-index + :value-index value-index :connection connection :entity-locks (atom {})})) @@ -31,50 +62,82 @@ (.close connection) nil) - -(declare entity-lock entity-lifecycle update-lifecycle entity-assertions) +(declare entity-lock entity-lifecycle update-lifecycle + entity-assertions entity-relationships) (defn get-entity "Get all the non-retracted attributes associated with an entity referenced by - entity-id at time 't-k'. 't-k' defaults to 'now' if it is not provided." - [store entity-id & {:keys [t-k]}] - (let [t-k (or t-k (t/now))] - (->> (entity-assertions store entity-id :t-k t-k) - (reduce (fn [doc {:keys [attribute value] :as a}] - (-> doc - (assoc (keyword attribute) value) - (assoc-in [:_meta (keyword attribute)] - (dissoc a :id :attribute :value :entity-id)))) - {}) - not-empty))) + entity-id at time 't-t' or 't-k'. 't-k' defaults to 'now' if it is not provided." + [store entity-type entity-id & {:keys [t-t t-k]}] + (time! + get-entity-time + (let [t-k (or t-k (t/now))] + (->> (apply entity-assertions store entity-type entity-id (if t-t [:t-t t-t] [:t-k t-k])) + (reduce (fn [doc {:keys [attribute value] :as a}] + (-> doc + (assoc (keyword attribute) value) + (assoc :_id entity-id :_type entity-type) + (assoc-in [:_meta (keyword attribute)] + (dissoc a :id :_type :attribute :value :entity-id)) + (assoc :_relationships (when (= "entity" entity-type) + (apply entity-relationships store entity-id + (if t-t [:t-t t-t] [:t-k t-k])))))) + {}) + not-empty)))) (defn entity-existed? "Return a boolean indicating whether an entity referenced by entity-id - ever existed prior to 't-k'. 't-k' defaults to 'now' if it is not provided." - [store entity-id & {:keys [t-k]}] - (let [t-k (or t-k (t/now)) - deleted (:deleted (entity-lifecycle store entity-id))] - (boolean (some #(t/before? % t-k) deleted)))) + ever existed prior to 't-t' or 't-k'. 't-k' defaults to 'now' if it is + not provided." + [store entity-type entity-id & {:keys [t-t t-k]}] + (time! + entity-existed-time + (let [t-k (or t-k (t/now)) + deleted (:deleted (entity-lifecycle store entity-type entity-id))] + (boolean (some #(not (t/after? % (or t-t t-k))) deleted))))) + +(defn entity-history + [store entity-type entity-id] + "Return a map containing the history of changes to the entity attributes" + (time! + entity-history-time + (when-let [assertions (-> (r/db (:db-name store)) (r/table (:assertions-table store)) + (r/get-all [entity-id] {:index (:entity-id-index store)}) + (r/filter (r/fn [row] (r/eq (r/get-field row "_type") entity-type))) + (r/run (:connection store)))] + (->> assertions + (group-by :attribute) + (map-vals #(->> (map (fn [a] (compact + (select-keys a [:value + :k-start :k-end + :k-start-actor + :k-end-actor + :t-start :t-end + :t-start-actor + :t-end-actor]))) %) + (sort-by :k-start))))))) (defn entity-lifespan - "Return a collection of hash map containing the :created, :updated and + "Return a collection of hash maps containing the :created, :updated and :deleted timestamps for the entity as a whole." - [store entity-id] - (let [lifecycle (entity-lifecycle store entity-id) - ranges (map (fn [c d] [c d]) - (:created lifecycle) - (concat (:deleted lifecycle) (repeat nil))) - lifespans (group-by (fn [u] - (reduce (fn [u [c d]] - (if (and (t/before? c u) - (or (nil? d) - (t/before? u d))) - (reduced [c d]) - u)) u ranges)) - (:updated lifecycle))] - (mapv (fn [[c d]] - (compact {:created c :deleted d - :updated (get lifespans [c d])})) ranges))) + [store entity-type entity-id] + (time! + entity-lifespan-time + (let [lifecycle (entity-lifecycle store entity-type entity-id) + ranges (map (fn [c d] [c d]) + (:created lifecycle) + (concat (:deleted lifecycle) (repeat nil))) + lifespans (group-by (fn [u] + (reduce (fn [u [c d]] + (if (and (t/before? c u) + (or (nil? d) + (t/before? u d))) + (reduced [c d]) + u)) u ranges)) + (:updated lifecycle))] + (mapv (fn [[c d]] + (compact {:created c :deleted d + :updated (get lifespans [c d])})) ranges)))) (defn put-entity "Stores the assertions included in 'doc' and automatically retracts any prior @@ -82,100 +145,175 @@ only attributes included in 'doc' that are asserting new values are captured. The 'actor' making the assertions must be provided as a string." - [store actor entity-id doc & {:keys [changes-only] - :or {changes-only true}}] - (locking (entity-lock store entity-id) - (let [entity (entity-lifecycle store entity-id) - now (t/now) - doc (map-keys name doc) - attributes (->> doc keys set) - assertions (->> (dissoc doc "id") - (map (fn [[k v]] - (compact - {:entity-id (str entity-id) - :attribute k - :value v - :k-start now - :k-start-actor actor}))) - (remove nil?)) - write-assertions (fn [assertions] - (-> (r/db (:db-name store)) (r/table (:assertions-table store)) - (r/insert assertions {:conflict :replace}) - (r/run (:connection store))))] - (if (or (not entity) - (and (not-empty (:deleted entity)) - (t/before? (last (:created entity)) (last (:deleted entity))) - (t/before? (last (:deleted entity)) now))) - (do (write-assertions assertions) - (update-lifecycle store (-> entity (assoc :id entity-id) - (update :created conj now))) - {:created? true}) - (let [existing-assertions (entity-assertions store entity-id :t-k now) - duplicates (set - (when changes-only - (->> existing-assertions - (filter (fn [{:keys [attribute value]}] - (= value (get doc attribute)))) - (map :attribute)))) - attributes (->> attributes (remove duplicates) set) - retractions (->> existing-assertions - (filter #(attributes (:attribute %))) - (map #(assoc % :k-end now :k-end-actor actor))) - assertions (remove #(duplicates (:attribute %)) assertions)] - (when (not-empty assertions) - (write-assertions (concat retractions assertions)) - (update-lifecycle store (update entity :updated conj now))) - (merge - {:updated (mapv :attribute assertions)} - (when (not-empty duplicates) - {:ignored (vec duplicates)}))))))) + [store actor entity-type entity-id doc & {:keys [t-t changes-only] + :or {changes-only true}}] + (time! + put-entity-time + (locking (entity-lock store entity-type entity-id) + (let [entity (entity-lifecycle store entity-type entity-id) + now (t/now) + doc (map-keys name doc) + attributes (->> doc keys set) + assertions (->> (dissoc doc "id") + (map (fn [[k v]] + (compact + {:entity-id (str entity-id) + :_type entity-type + :attribute k + :value v + :k-start now + :k-start-actor actor + :t-start t-t}))) + (remove nil?)) + write-assertions (fn [assertions] + (-> (r/db (:db-name store)) (r/table (:assertions-table store)) + (r/insert assertions {:conflict :replace}) + (r/run (:connection store))))] + (if (or (not entity) + (and (not-empty (:deleted entity)) + (t/before? (last (:created entity)) (last (:deleted entity))) + (t/before? (last (:deleted entity)) now))) + (do (write-assertions assertions) + (update-lifecycle store entity-type (-> entity (assoc :id entity-id) + (update :created conj now))) + {:created? true}) + (let [existing-assertions (entity-assertions store entity-type entity-id :t-k now) + duplicates (set + (when changes-only + (->> existing-assertions + (filter (fn [{:keys [attribute value]}] + (= value (get doc attribute)))) + (map :attribute)))) + attributes (->> attributes (remove duplicates) set) + retractions (->> existing-assertions + (filter #(attributes (:attribute %))) + (map #(-> (assoc % :k-end now :k-end-actor actor + :t-end t-t) + (assoc :t-end-actor (when t-t actor)) + compact))) + assertions (remove #(duplicates (:attribute %)) assertions)] + (when (not-empty assertions) + (write-assertions (concat retractions assertions)) + (update-lifecycle store entity-type (update entity :updated conj now))) + (merge + {:updated (mapv :attribute assertions)} + (when (not-empty duplicates) + {:ignored (vec duplicates)})))))))) (defn delete-entity "Automatically retracts all the assertions made against the entity referenced by 'entity-id'" - [store actor entity-id] - (let [now (t/now) - retractions (->> (entity-assertions store entity-id :t-k now) - (map #(assoc % :k-end now :k-end-actor actor)))] - (update-lifecycle store (update (entity-lifecycle store entity-id) :deleted conj now)) - (-> (r/db (:db-name store)) (r/table (:assertions-table store)) - (r/insert retractions {:conflict :replace}) - (r/run (:connection store))))) + [store actor entity-type entity-id] + (time! + delete-entity-time + (let [now (t/now) + retractions (->> (entity-assertions store entity-type entity-id :t-k now) + (map #(assoc % :k-end now :k-end-actor actor)))] + (update-lifecycle store entity-type + (update (entity-lifecycle store entity-type entity-id) :deleted conj now)) + (-> (r/db (:db-name store)) (r/table (:assertions-table store)) + (r/insert retractions {:conflict :replace}) + (r/run (:connection store)))))) + +(defn aggregate-entities + [store entity-type & {:keys [filters properties t-t t-k]}] + (time! + aggregate-entities-time + (let [t-k (or t-k (t/now)) + properties (map keyword (st/split properties #",")) + entities (-> (r/db (:db-name store)) (r/table (:assertions-table store)) + (r/get-all [entity-type] {:index (:entity-type-index store)}) + (cond-> t-t (r/filter (r/fn [row] + (r/and + (r/le (r/get-field row "t-start") t-t) + (r/or (r/not (r/has-fields row "t-end")) + (r/gt (r/get-field row "t-end") t-t))))) + t-k (r/filter (r/fn [row] + (r/and + (r/le (r/get-field row "k-start") t-k) + (r/or (r/not (r/has-fields row "k-end")) + (r/gt (r/get-field row "k-end") t-k)))))) + (r/run (:connection store)) + (->> (reduce (fn [entities {:keys [entity-id attribute value]}] + (assoc-in entities + [entity-id (keyword attribute)] + value)) {}) + (map (fn [[id e]] + (assoc e :_id id :_type entity-type)))) + (cond->> (not-empty filters) + (filter #(every? (fn [k] (= (get % k) (get filters k))) (keys filters)))))] + (cond-> {:total (count entities)} + (not-empty properties) + (merge {:aggregations (->> properties + (map (fn [p] + [p (->> entities (group-by #(get % p)) + (map (fn [[value entities]] + {:key value + :doc_count (count entities)})))])) + (into {}))}))))) ;;; Implementation (defn- entity-lock - [store entity-id] - (locking (:entity-locks store) - (if-let [l (get @(:entity-locks store) entity-id)] - l - (let [l (Object.)] - (swap! (:entity-locks store) assoc entity-id l) - l)))) + [store entity-type entity-id] + (time! + entity-lock-time + (locking (:entity-locks store) + (if-let [l (get @(:entity-locks store) [entity-type entity-id])] + l + (let [l (Object.)] + (swap! (:entity-locks store) assoc [entity-type entity-id] l) + l))))) (defn- entity-lifecycle - [store entity-id] - (-> (r/db (:db-name store)) (r/table (:lifecycle-table store)) - (r/get entity-id) - (r/run (:connection store)))) + [store entity-type entity-id] + (time! + entity-lifecycle-time + (-> (r/db (:db-name store)) (r/table (:lifecycle-table store)) + (r/get entity-id) + (r/run (:connection store))))) (defn- update-lifecycle - [store lifecycle-record] - (-> (r/db (:db-name store)) (r/table (:lifecycle-table store)) - (r/insert lifecycle-record {:conflict :update}) - (r/run (:connection store)))) + [store entity-type lifecycle-record] + (time! + update-lifecycle-time + (-> (r/db (:db-name store)) (r/table (:lifecycle-table store)) + (r/insert (assoc lifecycle-record :entity-type entity-type) {:conflict :update}) + (r/run (:connection store))))) -(defn entity-assertions - [store entity-id & {:keys [t-k]}] +(defn fetch-assertions + [store entity-type id index & {:keys [t-t t-k]}] (-> (r/db (:db-name store)) (r/table (:assertions-table store)) - (r/get-all [entity-id] {:index (:entity-id-index store)}) - (cond-> t-k (r/filter (r/fn [row] + (r/get-all [id] {:index index}) + (r/filter (r/fn [row] (r/eq (r/get-field row "_type") entity-type))) + (cond-> t-t (r/filter (r/fn [row] + (r/and + (r/le (r/get-field row "t-start") t-t) + (r/or (r/not (r/has-fields row "t-end")) + (r/gt (r/get-field row "t-end") t-t))))) + t-k (r/filter (r/fn [row] (r/and (r/le (r/get-field row "k-start") t-k) (r/or (r/not (r/has-fields row "k-end")) (r/gt (r/get-field row "k-end") t-k)))))) (r/run (:connection store)))) +(defn entity-assertions + [store entity-type entity-id & {:keys [t-t t-k]}] + (time! + entity-assertions-time + (fetch-assertions store entity-type entity-id (:entity-id-index store) :t-t t-t :t-k t-k))) + +(defn entity-relationships + [store entity-id & {:keys [t-t t-k]}] + (time! + entity-relationships-time + (->> (fetch-assertions store "relationship" entity-id (:value-index store) + :t-t t-t :t-k t-k) + (map :entity-id) distinct + (pmap #(get-entity store "relationship" % :t-t t-t :t-k t-k)) + compact))) + (defn- ensure-db [conn db-name] (when-not ((set (r/run (r/db-list) conn)) db-name) -- cgit 1.2.3-korg