diff options
Diffstat (limited to 'src/gallifrey/store.clj')
-rw-r--r-- | src/gallifrey/store.clj | 197 |
1 files changed, 197 insertions, 0 deletions
diff --git a/src/gallifrey/store.clj b/src/gallifrey/store.clj new file mode 100644 index 0000000..85a17d7 --- /dev/null +++ b/src/gallifrey/store.clj @@ -0,0 +1,197 @@ +(ns gallifrey.store + (:require [utilis.map :refer [compact map-keys map-vals]] + [utilis.types.keyword :refer [->keyword]] + [rethinkdb.query :as r] + [inflections.core :refer [hyphenate underscore]] + [clj-time.core :as t] + [integrant.core :as ig]) + (:import [clojure.lang ExceptionInfo])) + +(declare ensure-db ensure-table ensure-index) + +(defmethod ig/init-key :gallifrey/store [_ {:keys [db-server db-name]}] + (let [{:keys [host port] :or {host "localhost" port 28015}} db-server + connection (r/connect :host host :port port :db db-name) + assertions-table "assertions" + lifecycle-table "lifecycle" + entity-id-index "entity-id-index"] + (ensure-db connection db-name) + (ensure-table connection db-name assertions-table) + (ensure-index connection db-name assertions-table entity-id-index :entity-id) + (ensure-table connection db-name lifecycle-table) + {:db-server db-server + :db-name db-name + :assertions-table assertions-table + :lifecycle-table lifecycle-table + :entity-id-index entity-id-index + :connection connection + :entity-locks (atom {})})) + +(defmethod ig/halt-key! :gallifrey/store [_ {:keys [connection subscriptions]}] + (.close connection) + nil) + + +(declare entity-lock entity-lifecycle update-lifecycle entity-assertions) + +(defn get-entity + "Get all the non-retracted attributes associated with an entity referenced by + entity-id at time 't-k'. 't-k' defaults to 'now' if it is not provided." + [store entity-id & {:keys [t-k]}] + (let [t-k (or t-k (t/now))] + (->> (entity-assertions store entity-id :t-k t-k) + (reduce (fn [doc {:keys [attribute value] :as a}] + (-> doc + (assoc (keyword attribute) value) + (assoc-in [:_meta (keyword attribute)] + (dissoc a :id :attribute :value :entity-id)))) + {}) + not-empty))) + +(defn entity-existed? + "Return a boolean indicating whether an entity referenced by entity-id + ever existed prior to 't-k'. 't-k' defaults to 'now' if it is not provided." + [store entity-id & {:keys [t-k]}] + (let [t-k (or t-k (t/now)) + deleted (:deleted (entity-lifecycle store entity-id))] + (boolean (some #(t/before? % t-k) deleted)))) + +(defn entity-lifespan + "Return a collection of hash map containing the :created, :updated and + :deleted timestamps for the entity as a whole." + [store entity-id] + (let [lifecycle (entity-lifecycle store entity-id) + ranges (map (fn [c d] [c d]) + (:created lifecycle) + (concat (:deleted lifecycle) (repeat nil))) + lifespans (group-by (fn [u] + (reduce (fn [u [c d]] + (if (and (t/before? c u) + (or (nil? d) + (t/before? u d))) + (reduced [c d]) + u)) u ranges)) + (:updated lifecycle))] + (mapv (fn [[c d]] + (compact {:created c :deleted d + :updated (get lifespans [c d])})) ranges))) + +(defn put-entity + "Stores the assertions included in 'doc' and automatically retracts any prior + assertions against the attributes in 'doc'. If 'changes-only' is set to true, + only attributes included in 'doc' that are asserting new values are captured. + + The 'actor' making the assertions must be provided as a string." + [store actor entity-id doc & {:keys [changes-only] + :or {changes-only true}}] + (locking (entity-lock store entity-id) + (let [entity (entity-lifecycle store entity-id) + now (t/now) + doc (map-keys name doc) + attributes (->> doc keys set) + assertions (->> (dissoc doc "id") + (map (fn [[k v]] + (compact + {:entity-id (str entity-id) + :attribute k + :value v + :k-start now + :k-start-actor actor}))) + (remove nil?)) + write-assertions (fn [assertions] + (-> (r/db (:db-name store)) (r/table (:assertions-table store)) + (r/insert assertions {:conflict :replace}) + (r/run (:connection store))))] + (if (or (not entity) + (and (not-empty (:deleted entity)) + (t/before? (last (:created entity)) (last (:deleted entity))) + (t/before? (last (:deleted entity)) now))) + (do (write-assertions assertions) + (update-lifecycle store (-> entity (assoc :id entity-id) + (update :created conj now))) + {:created? true}) + (let [existing-assertions (entity-assertions store entity-id :t-k now) + duplicates (set + (when changes-only + (->> existing-assertions + (filter (fn [{:keys [attribute value]}] + (= value (get doc attribute)))) + (map :attribute)))) + attributes (->> attributes (remove duplicates) set) + retractions (->> existing-assertions + (filter #(attributes (:attribute %))) + (map #(assoc % :k-end now :k-end-actor actor))) + assertions (remove #(duplicates (:attribute %)) assertions)] + (when (not-empty assertions) + (write-assertions (concat retractions assertions)) + (update-lifecycle store (update entity :updated conj now))) + (merge + {:updated (mapv :attribute assertions)} + (when (not-empty duplicates) + {:ignored (vec duplicates)}))))))) + +(defn delete-entity + "Automatically retracts all the assertions made against the entity referenced + by 'entity-id'" + [store actor entity-id] + (let [now (t/now) + retractions (->> (entity-assertions store entity-id :t-k now) + (map #(assoc % :k-end now :k-end-actor actor)))] + (update-lifecycle store (update (entity-lifecycle store entity-id) :deleted conj now)) + (-> (r/db (:db-name store)) (r/table (:assertions-table store)) + (r/insert retractions {:conflict :replace}) + (r/run (:connection store))))) + +;;; Implementation + +(defn- entity-lock + [store entity-id] + (locking (:entity-locks store) + (if-let [l (get @(:entity-locks store) entity-id)] + l + (let [l (Object.)] + (swap! (:entity-locks store) assoc entity-id l) + l)))) + +(defn- entity-lifecycle + [store entity-id] + (-> (r/db (:db-name store)) (r/table (:lifecycle-table store)) + (r/get entity-id) + (r/run (:connection store)))) + +(defn- update-lifecycle + [store lifecycle-record] + (-> (r/db (:db-name store)) (r/table (:lifecycle-table store)) + (r/insert lifecycle-record {:conflict :update}) + (r/run (:connection store)))) + +(defn entity-assertions + [store entity-id & {:keys [t-k]}] + (-> (r/db (:db-name store)) (r/table (:assertions-table store)) + (r/get-all [entity-id] {:index (:entity-id-index store)}) + (cond-> t-k (r/filter (r/fn [row] + (r/and + (r/le (r/get-field row "k-start") t-k) + (r/or (r/not (r/has-fields row "k-end")) + (r/gt (r/get-field row "k-end") t-k)))))) + (r/run (:connection store)))) + +(defn- ensure-db + [conn db-name] + (when-not ((set (r/run (r/db-list) conn)) db-name) + (r/run (r/db-create db-name) conn))) + +(defn- ensure-table + [conn db-name assertions-table & {:keys [init-fn]}] + (when-not ((set (r/run (r/table-list (r/db db-name)) conn)) assertions-table) + (-> (r/db db-name) (r/table-create assertions-table) (r/run conn)))) + +(defn- ensure-index + [conn db-name assertions-table index-name field] + (when-not ((set (-> (r/db db-name) (r/table assertions-table) + (r/index-list) (r/run conn))) index-name) + (-> (r/db db-name) + (r/table assertions-table) + (r/index-create index-name (r/fn [row] + (r/get-field row (->keyword field)))) + (r/run conn)))) |