diff options
Diffstat (limited to 'src/gallifrey/store.clj')
-rw-r--r-- | src/gallifrey/store.clj | 360 |
1 files changed, 249 insertions, 111 deletions
diff --git a/src/gallifrey/store.clj b/src/gallifrey/store.clj index 85a17d7..4f10f7c 100644 --- a/src/gallifrey/store.clj +++ b/src/gallifrey/store.clj @@ -1,12 +1,34 @@ + (ns gallifrey.store (:require [utilis.map :refer [compact map-keys map-vals]] [utilis.types.keyword :refer [->keyword]] [rethinkdb.query :as r] [inflections.core :refer [hyphenate underscore]] [clj-time.core :as t] - [integrant.core :as ig]) + [metrics.core :refer [default-registry]] + [metrics.timers :refer [deftimer time!]] + [integrant.core :as ig] + [clojure.string :as st]) (:import [clojure.lang ExceptionInfo])) +(deftimer default-registry ["gallifrey" "store" get-entity-time]) +(deftimer default-registry ["gallifrey" "store" put-entity-time]) +(deftimer default-registry ["gallifrey" "store" delete-entity-time]) +(deftimer default-registry ["gallifrey" "store" aggregate-entities-time]) + +(deftimer default-registry ["gallifrey" "store" entity-lifespan-time]) +(deftimer default-registry ["gallifrey" "store" entity-history-time]) + +(deftimer default-registry ["gallifrey" "store" entity-existed-time]) + +(deftimer default-registry ["gallifrey" "store" entity-assertions-time]) +(deftimer default-registry ["gallifrey" "store" entity-relationships-time]) + +(deftimer default-registry ["gallifrey" "store" entity-lifecycle-time]) +(deftimer default-registry ["gallifrey" "store" update-lifecycle-time]) + +(deftimer default-registry ["gallifrey" "store" entity-lock-time]) + (declare ensure-db ensure-table ensure-index) (defmethod ig/init-key :gallifrey/store [_ {:keys [db-server db-name]}] @@ -14,16 +36,25 @@ connection (r/connect :host host :port port :db db-name) assertions-table "assertions" lifecycle-table "lifecycle" - entity-id-index "entity-id-index"] + entity-id-index "entity-id-index" + entity-type-index "entity-type-index" + attribute-index "attribute-index" + value-index "value-index"] (ensure-db connection db-name) (ensure-table connection db-name assertions-table) (ensure-index connection db-name assertions-table entity-id-index :entity-id) + (ensure-index connection db-name assertions-table entity-type-index :_type) + (ensure-index connection db-name assertions-table attribute-index :attribute) + (ensure-index connection db-name assertions-table value-index :value) (ensure-table connection db-name lifecycle-table) {:db-server db-server :db-name db-name :assertions-table assertions-table :lifecycle-table lifecycle-table :entity-id-index entity-id-index + :entity-type-index entity-type-index + :attribute-index attribute-index + :value-index value-index :connection connection :entity-locks (atom {})})) @@ -31,50 +62,82 @@ (.close connection) nil) - -(declare entity-lock entity-lifecycle update-lifecycle entity-assertions) +(declare entity-lock entity-lifecycle update-lifecycle + entity-assertions entity-relationships) (defn get-entity "Get all the non-retracted attributes associated with an entity referenced by - entity-id at time 't-k'. 't-k' defaults to 'now' if it is not provided." - [store entity-id & {:keys [t-k]}] - (let [t-k (or t-k (t/now))] - (->> (entity-assertions store entity-id :t-k t-k) - (reduce (fn [doc {:keys [attribute value] :as a}] - (-> doc - (assoc (keyword attribute) value) - (assoc-in [:_meta (keyword attribute)] - (dissoc a :id :attribute :value :entity-id)))) - {}) - not-empty))) + entity-id at time 't-t' or 't-k'. 't-k' defaults to 'now' if it is not provided." + [store entity-type entity-id & {:keys [t-t t-k]}] + (time! + get-entity-time + (let [t-k (or t-k (t/now))] + (->> (apply entity-assertions store entity-type entity-id (if t-t [:t-t t-t] [:t-k t-k])) + (reduce (fn [doc {:keys [attribute value] :as a}] + (-> doc + (assoc (keyword attribute) value) + (assoc :_id entity-id :_type entity-type) + (assoc-in [:_meta (keyword attribute)] + (dissoc a :id :_type :attribute :value :entity-id)) + (assoc :_relationships (when (= "entity" entity-type) + (apply entity-relationships store entity-id + (if t-t [:t-t t-t] [:t-k t-k])))))) + {}) + not-empty)))) (defn entity-existed? "Return a boolean indicating whether an entity referenced by entity-id - ever existed prior to 't-k'. 't-k' defaults to 'now' if it is not provided." - [store entity-id & {:keys [t-k]}] - (let [t-k (or t-k (t/now)) - deleted (:deleted (entity-lifecycle store entity-id))] - (boolean (some #(t/before? % t-k) deleted)))) + ever existed prior to 't-t' or 't-k'. 't-k' defaults to 'now' if it is + not provided." + [store entity-type entity-id & {:keys [t-t t-k]}] + (time! + entity-existed-time + (let [t-k (or t-k (t/now)) + deleted (:deleted (entity-lifecycle store entity-type entity-id))] + (boolean (some #(not (t/after? % (or t-t t-k))) deleted))))) + +(defn entity-history + [store entity-type entity-id] + "Return a map containing the history of changes to the entity attributes" + (time! + entity-history-time + (when-let [assertions (-> (r/db (:db-name store)) (r/table (:assertions-table store)) + (r/get-all [entity-id] {:index (:entity-id-index store)}) + (r/filter (r/fn [row] (r/eq (r/get-field row "_type") entity-type))) + (r/run (:connection store)))] + (->> assertions + (group-by :attribute) + (map-vals #(->> (map (fn [a] (compact + (select-keys a [:value + :k-start :k-end + :k-start-actor + :k-end-actor + :t-start :t-end + :t-start-actor + :t-end-actor]))) %) + (sort-by :k-start))))))) (defn entity-lifespan - "Return a collection of hash map containing the :created, :updated and + "Return a collection of hash maps containing the :created, :updated and :deleted timestamps for the entity as a whole." - [store entity-id] - (let [lifecycle (entity-lifecycle store entity-id) - ranges (map (fn [c d] [c d]) - (:created lifecycle) - (concat (:deleted lifecycle) (repeat nil))) - lifespans (group-by (fn [u] - (reduce (fn [u [c d]] - (if (and (t/before? c u) - (or (nil? d) - (t/before? u d))) - (reduced [c d]) - u)) u ranges)) - (:updated lifecycle))] - (mapv (fn [[c d]] - (compact {:created c :deleted d - :updated (get lifespans [c d])})) ranges))) + [store entity-type entity-id] + (time! + entity-lifespan-time + (let [lifecycle (entity-lifecycle store entity-type entity-id) + ranges (map (fn [c d] [c d]) + (:created lifecycle) + (concat (:deleted lifecycle) (repeat nil))) + lifespans (group-by (fn [u] + (reduce (fn [u [c d]] + (if (and (t/before? c u) + (or (nil? d) + (t/before? u d))) + (reduced [c d]) + u)) u ranges)) + (:updated lifecycle))] + (mapv (fn [[c d]] + (compact {:created c :deleted d + :updated (get lifespans [c d])})) ranges)))) (defn put-entity "Stores the assertions included in 'doc' and automatically retracts any prior @@ -82,100 +145,175 @@ only attributes included in 'doc' that are asserting new values are captured. The 'actor' making the assertions must be provided as a string." - [store actor entity-id doc & {:keys [changes-only] - :or {changes-only true}}] - (locking (entity-lock store entity-id) - (let [entity (entity-lifecycle store entity-id) - now (t/now) - doc (map-keys name doc) - attributes (->> doc keys set) - assertions (->> (dissoc doc "id") - (map (fn [[k v]] - (compact - {:entity-id (str entity-id) - :attribute k - :value v - :k-start now - :k-start-actor actor}))) - (remove nil?)) - write-assertions (fn [assertions] - (-> (r/db (:db-name store)) (r/table (:assertions-table store)) - (r/insert assertions {:conflict :replace}) - (r/run (:connection store))))] - (if (or (not entity) - (and (not-empty (:deleted entity)) - (t/before? (last (:created entity)) (last (:deleted entity))) - (t/before? (last (:deleted entity)) now))) - (do (write-assertions assertions) - (update-lifecycle store (-> entity (assoc :id entity-id) - (update :created conj now))) - {:created? true}) - (let [existing-assertions (entity-assertions store entity-id :t-k now) - duplicates (set - (when changes-only - (->> existing-assertions - (filter (fn [{:keys [attribute value]}] - (= value (get doc attribute)))) - (map :attribute)))) - attributes (->> attributes (remove duplicates) set) - retractions (->> existing-assertions - (filter #(attributes (:attribute %))) - (map #(assoc % :k-end now :k-end-actor actor))) - assertions (remove #(duplicates (:attribute %)) assertions)] - (when (not-empty assertions) - (write-assertions (concat retractions assertions)) - (update-lifecycle store (update entity :updated conj now))) - (merge - {:updated (mapv :attribute assertions)} - (when (not-empty duplicates) - {:ignored (vec duplicates)}))))))) + [store actor entity-type entity-id doc & {:keys [t-t changes-only] + :or {changes-only true}}] + (time! + put-entity-time + (locking (entity-lock store entity-type entity-id) + (let [entity (entity-lifecycle store entity-type entity-id) + now (t/now) + doc (map-keys name doc) + attributes (->> doc keys set) + assertions (->> (dissoc doc "id") + (map (fn [[k v]] + (compact + {:entity-id (str entity-id) + :_type entity-type + :attribute k + :value v + :k-start now + :k-start-actor actor + :t-start t-t}))) + (remove nil?)) + write-assertions (fn [assertions] + (-> (r/db (:db-name store)) (r/table (:assertions-table store)) + (r/insert assertions {:conflict :replace}) + (r/run (:connection store))))] + (if (or (not entity) + (and (not-empty (:deleted entity)) + (t/before? (last (:created entity)) (last (:deleted entity))) + (t/before? (last (:deleted entity)) now))) + (do (write-assertions assertions) + (update-lifecycle store entity-type (-> entity (assoc :id entity-id) + (update :created conj now))) + {:created? true}) + (let [existing-assertions (entity-assertions store entity-type entity-id :t-k now) + duplicates (set + (when changes-only + (->> existing-assertions + (filter (fn [{:keys [attribute value]}] + (= value (get doc attribute)))) + (map :attribute)))) + attributes (->> attributes (remove duplicates) set) + retractions (->> existing-assertions + (filter #(attributes (:attribute %))) + (map #(-> (assoc % :k-end now :k-end-actor actor + :t-end t-t) + (assoc :t-end-actor (when t-t actor)) + compact))) + assertions (remove #(duplicates (:attribute %)) assertions)] + (when (not-empty assertions) + (write-assertions (concat retractions assertions)) + (update-lifecycle store entity-type (update entity :updated conj now))) + (merge + {:updated (mapv :attribute assertions)} + (when (not-empty duplicates) + {:ignored (vec duplicates)})))))))) (defn delete-entity "Automatically retracts all the assertions made against the entity referenced by 'entity-id'" - [store actor entity-id] - (let [now (t/now) - retractions (->> (entity-assertions store entity-id :t-k now) - (map #(assoc % :k-end now :k-end-actor actor)))] - (update-lifecycle store (update (entity-lifecycle store entity-id) :deleted conj now)) - (-> (r/db (:db-name store)) (r/table (:assertions-table store)) - (r/insert retractions {:conflict :replace}) - (r/run (:connection store))))) + [store actor entity-type entity-id] + (time! + delete-entity-time + (let [now (t/now) + retractions (->> (entity-assertions store entity-type entity-id :t-k now) + (map #(assoc % :k-end now :k-end-actor actor)))] + (update-lifecycle store entity-type + (update (entity-lifecycle store entity-type entity-id) :deleted conj now)) + (-> (r/db (:db-name store)) (r/table (:assertions-table store)) + (r/insert retractions {:conflict :replace}) + (r/run (:connection store)))))) + +(defn aggregate-entities + [store entity-type & {:keys [filters properties t-t t-k]}] + (time! + aggregate-entities-time + (let [t-k (or t-k (t/now)) + properties (map keyword (st/split properties #",")) + entities (-> (r/db (:db-name store)) (r/table (:assertions-table store)) + (r/get-all [entity-type] {:index (:entity-type-index store)}) + (cond-> t-t (r/filter (r/fn [row] + (r/and + (r/le (r/get-field row "t-start") t-t) + (r/or (r/not (r/has-fields row "t-end")) + (r/gt (r/get-field row "t-end") t-t))))) + t-k (r/filter (r/fn [row] + (r/and + (r/le (r/get-field row "k-start") t-k) + (r/or (r/not (r/has-fields row "k-end")) + (r/gt (r/get-field row "k-end") t-k)))))) + (r/run (:connection store)) + (->> (reduce (fn [entities {:keys [entity-id attribute value]}] + (assoc-in entities + [entity-id (keyword attribute)] + value)) {}) + (map (fn [[id e]] + (assoc e :_id id :_type entity-type)))) + (cond->> (not-empty filters) + (filter #(every? (fn [k] (= (get % k) (get filters k))) (keys filters)))))] + (cond-> {:total (count entities)} + (not-empty properties) + (merge {:aggregations (->> properties + (map (fn [p] + [p (->> entities (group-by #(get % p)) + (map (fn [[value entities]] + {:key value + :doc_count (count entities)})))])) + (into {}))}))))) ;;; Implementation (defn- entity-lock - [store entity-id] - (locking (:entity-locks store) - (if-let [l (get @(:entity-locks store) entity-id)] - l - (let [l (Object.)] - (swap! (:entity-locks store) assoc entity-id l) - l)))) + [store entity-type entity-id] + (time! + entity-lock-time + (locking (:entity-locks store) + (if-let [l (get @(:entity-locks store) [entity-type entity-id])] + l + (let [l (Object.)] + (swap! (:entity-locks store) assoc [entity-type entity-id] l) + l))))) (defn- entity-lifecycle - [store entity-id] - (-> (r/db (:db-name store)) (r/table (:lifecycle-table store)) - (r/get entity-id) - (r/run (:connection store)))) + [store entity-type entity-id] + (time! + entity-lifecycle-time + (-> (r/db (:db-name store)) (r/table (:lifecycle-table store)) + (r/get entity-id) + (r/run (:connection store))))) (defn- update-lifecycle - [store lifecycle-record] - (-> (r/db (:db-name store)) (r/table (:lifecycle-table store)) - (r/insert lifecycle-record {:conflict :update}) - (r/run (:connection store)))) + [store entity-type lifecycle-record] + (time! + update-lifecycle-time + (-> (r/db (:db-name store)) (r/table (:lifecycle-table store)) + (r/insert (assoc lifecycle-record :entity-type entity-type) {:conflict :update}) + (r/run (:connection store))))) -(defn entity-assertions - [store entity-id & {:keys [t-k]}] +(defn fetch-assertions + [store entity-type id index & {:keys [t-t t-k]}] (-> (r/db (:db-name store)) (r/table (:assertions-table store)) - (r/get-all [entity-id] {:index (:entity-id-index store)}) - (cond-> t-k (r/filter (r/fn [row] + (r/get-all [id] {:index index}) + (r/filter (r/fn [row] (r/eq (r/get-field row "_type") entity-type))) + (cond-> t-t (r/filter (r/fn [row] + (r/and + (r/le (r/get-field row "t-start") t-t) + (r/or (r/not (r/has-fields row "t-end")) + (r/gt (r/get-field row "t-end") t-t))))) + t-k (r/filter (r/fn [row] (r/and (r/le (r/get-field row "k-start") t-k) (r/or (r/not (r/has-fields row "k-end")) (r/gt (r/get-field row "k-end") t-k)))))) (r/run (:connection store)))) +(defn entity-assertions + [store entity-type entity-id & {:keys [t-t t-k]}] + (time! + entity-assertions-time + (fetch-assertions store entity-type entity-id (:entity-id-index store) :t-t t-t :t-k t-k))) + +(defn entity-relationships + [store entity-id & {:keys [t-t t-k]}] + (time! + entity-relationships-time + (->> (fetch-assertions store "relationship" entity-id (:value-index store) + :t-t t-t :t-k t-k) + (map :entity-id) distinct + (pmap #(get-entity store "relationship" % :t-t t-t :t-k t-k)) + compact))) + (defn- ensure-db [conn db-name] (when-not ((set (r/run (r/db-list) conn)) db-name) |