aboutsummaryrefslogtreecommitdiffstats
path: root/src/gallifrey/store.clj
blob: 85a17d7bfba4ec72de6856b387d1189d74dc6be5 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
(ns gallifrey.store
  (:require [utilis.map :refer [compact map-keys map-vals]]
            [utilis.types.keyword :refer [->keyword]]
            [rethinkdb.query :as r]
            [inflections.core :refer [hyphenate underscore]]
            [clj-time.core :as t]
            [integrant.core :as ig])
  (:import [clojure.lang ExceptionInfo]))

;; Forward declarations: these private helpers are defined near the end of
;; this namespace but are called by the init-key method just below.
(declare ensure-db ensure-table ensure-index)

;; Integrant initializer for the store component.
;;
;; Opens a RethinkDB connection (host defaults to "localhost", port to 28015),
;; makes sure the database, the assertions table with its entity-id index and
;; the lifecycle table exist, and returns the store map used by the functions
;; in this namespace. If any of the setup steps throws, the freshly opened
;; connection is closed before the exception is rethrown so it is not leaked.
(defmethod ig/init-key :gallifrey/store [_ {:keys [db-server db-name]}]
  (let [{:keys [host port] :or {host "localhost" port 28015}} db-server
        connection (r/connect :host host :port port :db db-name)
        assertions-table "assertions"
        lifecycle-table "lifecycle"
        entity-id-index "entity-id-index"]
    (try
      (ensure-db connection db-name)
      (ensure-table connection db-name assertions-table)
      (ensure-index connection db-name assertions-table entity-id-index :entity-id)
      (ensure-table connection db-name lifecycle-table)
      (catch Throwable e
        ;; Nobody else holds a reference to the connection yet, so release it
        ;; here; the original failure is what the caller needs to see.
        (.close connection)
        (throw e)))
    {:db-server db-server
     :db-name db-name
     :assertions-table assertions-table
     :lifecycle-table lifecycle-table
     :entity-id-index entity-id-index
     :connection connection
     ;; entity-id -> monitor object, filled lazily by entity-lock
     :entity-locks (atom {})}))

;; Integrant finalizer for the store component: closes the database
;; connection held in the store map (if there is one) and returns nil.
(defmethod ig/halt-key! :gallifrey/store [_ {:keys [connection]}]
  (when connection
    (.close connection))
  nil)


;; Forward declarations for helpers that are defined in the implementation
;; section further down but are used by the API functions that follow.
(declare entity-lock entity-lifecycle update-lifecycle entity-assertions)

(defn get-entity
  "Build the document for the entity referenced by entity-id from the
  assertions that are in effect at time 't-k' ('now' when 't-k' is omitted).

  Every assertion contributes its value under the keywordized attribute name;
  the remaining assertion fields (everything except :id, :attribute, :value
  and :entity-id) are collected under [:_meta <attribute>]. Returns nil when
  there are no matching assertions."
  [store entity-id & {:keys [t-k]}]
  (let [as-of (or t-k (t/now))
        fold-assertion (fn [doc assertion]
                         (let [attr (keyword (:attribute assertion))
                               details (dissoc assertion
                                               :id :attribute :value :entity-id)]
                           (-> doc
                               (assoc attr (:value assertion))
                               (assoc-in [:_meta attr] details))))
        assertions (entity-assertions store entity-id :t-k as-of)]
    (not-empty (reduce fold-assertion {} assertions))))

(defn entity-existed?
  "Return true when the lifecycle record of the entity referenced by
  entity-id contains at least one deletion timestamp strictly before 't-k',
  i.e. the entity existed at some point and was deleted prior to 't-k'.
  Returns false otherwise. 't-k' defaults to 'now' if it is not provided."
  [store entity-id & {:keys [t-k]}]
  (let [cutoff (or t-k (t/now))
        deletions (:deleted (entity-lifecycle store entity-id))
        before-cutoff? (fn [deleted-at] (t/before? deleted-at cutoff))]
    (boolean (some before-cutoff? deletions))))

(defn entity-lifespan
  "Return a vector of hash maps, one per recorded creation of the entity,
  each holding the :created timestamp, the matching :deleted timestamp (when
  there is one) and the :updated timestamps that fall strictly between the
  two. Creations are paired with deletions by position; a creation without a
  corresponding deletion is treated as still open."
  [store entity-id]
  (let [{:keys [created deleted updated]} (entity-lifecycle store entity-id)
        ;; pad the deletions with nils so a trailing, still-open creation
        ;; is kept as [created nil]
        ranges (map vector created (concat deleted (repeat nil)))
        within? (fn [[c d] u]
                  (and (t/before? c u)
                       (or (nil? d)
                           (t/before? u d))))
        ;; the first range containing the update, or the update itself when
        ;; no range contains it
        owning-range (fn [u]
                       (or (first (filter #(within? % u) ranges))
                           u))
        lifespans (group-by owning-range updated)]
    (mapv (fn [[c d :as span]]
            (compact {:created c
                      :deleted d
                      :updated (get lifespans span)}))
          ranges)))

(defn put-entity
  "Stores the assertions included in 'doc' and automatically retracts any prior
  assertions against the attributes in 'doc'. If 'changes-only' is set to true
  (the default), only attributes included in 'doc' that are asserting new
  values are captured; attributes whose current value equals the one in 'doc'
  are left untouched.

  The 'actor' making the assertions must be provided as a string. An 'id' key
  in 'doc' is never stored as an attribute.

  Returns {:created? true} when the entity had no lifecycle record or was
  deleted and is being created again. Otherwise returns a map with :updated
  (the attribute names that were asserted) and, when some attributes were
  skipped as unchanged, :ignored (those attribute names)."
  [store actor entity-id doc & {:keys [changes-only]
                                :or {changes-only true}}]
  ;; One writer per entity at a time: the lifecycle record is read, modified
  ;; and written back below.
  (locking (entity-lock store entity-id)
    (let [entity (entity-lifecycle store entity-id)
          now (t/now)
          ;; attribute names are handled as strings; 'id' is not an attribute
          doc (dissoc (map-keys name doc) "id")
          attributes (->> doc keys set)
          assertions (->> doc
                          (map (fn [[k v]]
                                 (compact
                                  {:entity-id (str entity-id)
                                   :attribute k
                                   :value v
                                   :k-start now
                                   :k-start-actor actor})))
                          (remove nil?))
          write-assertions (fn [assertions]
                             (-> (r/db (:db-name store)) (r/table (:assertions-table store))
                                 (r/insert assertions {:conflict :replace})
                                 (r/run (:connection store))))]
      (if (or (not entity)
              ;; the most recent creation has been followed by a deletion
              (and (not-empty (:deleted entity))
                   (t/before? (last (:created entity)) (last (:deleted entity)))
                   (t/before? (last (:deleted entity)) now)))
        (do (write-assertions assertions)
            (update-lifecycle store (-> entity (assoc :id entity-id)
                                        (update :created conj now)))
            {:created? true})
        (let [existing-assertions (entity-assertions store entity-id :t-k now)
              duplicates (set
                          (when changes-only
                            (->> existing-assertions
                                 ;; only attributes actually present in 'doc'
                                 ;; can be duplicates; without this guard a
                                 ;; stored assertion lacking :value would
                                 ;; match an attribute 'doc' never mentioned
                                 (filter (fn [{:keys [attribute value]}]
                                           (and (contains? doc attribute)
                                                (= value (get doc attribute)))))
                                 (map :attribute))))
              attributes (->> attributes (remove duplicates) set)
              ;; close the currently open assertion of every attribute that
              ;; is about to be re-asserted
              retractions (->> existing-assertions
                               (filter #(attributes (:attribute %)))
                               (map #(assoc % :k-end now :k-end-actor actor)))
              assertions (remove #(duplicates (:attribute %)) assertions)]
          (when (not-empty assertions)
            (write-assertions (concat retractions assertions))
            (update-lifecycle store (update entity :updated conj now)))
          (merge
           {:updated (mapv :attribute assertions)}
           (when (not-empty duplicates)
             {:ignored (vec duplicates)})))))))

(defn delete-entity
  "Automatically retracts all the assertions made against the entity referenced
  by 'entity-id' and records the deletion time in its lifecycle record. The
  'actor' is stored on every retraction as :k-end-actor.

  Runs under the same per-entity lock as put-entity, because both read and
  then rewrite the lifecycle record. Returns the result of writing the
  retractions, or nil (without writing anything) when the entity has no
  lifecycle record."
  [store actor entity-id]
  (locking (entity-lock store entity-id)
    ;; Without a lifecycle record there is nothing to delete; updating a nil
    ;; record would insert a lifecycle row that has no :id.
    (when-let [lifecycle (entity-lifecycle store entity-id)]
      (let [now (t/now)
            retractions (->> (entity-assertions store entity-id :t-k now)
                             (map #(assoc % :k-end now :k-end-actor actor)))]
        (update-lifecycle store (update lifecycle :deleted (fnil conj []) now))
        (-> (r/db (:db-name store)) (r/table (:assertions-table store))
            (r/insert retractions {:conflict :replace})
            (r/run (:connection store)))))))

;;; Implementation

(defn- entity-lock
  "Return the monitor object associated with entity-id in the store's
  :entity-locks atom, creating and registering a new one on first use. The
  lookup and registration happen while holding the monitor of the atom itself,
  so repeated calls for the same entity-id yield the same object."
  [store entity-id]
  (let [locks (:entity-locks store)]
    (locking locks
      (or (get @locks entity-id)
          (let [fresh (Object.)]
            (swap! locks assoc entity-id fresh)
            fresh)))))

(defn- entity-lifecycle
  "Fetch the record whose primary key is entity-id from the store's lifecycle
  table and return whatever the query yields for it."
  [store entity-id]
  (let [lifecycle-table (r/table (r/db (:db-name store))
                                 (:lifecycle-table store))
        query (r/get lifecycle-table entity-id)]
    (r/run query (:connection store))))

(defn- update-lifecycle
  "Write lifecycle-record to the store's lifecycle table. The insert uses the
  :update conflict strategy, so a record with an existing primary key is
  updated rather than rejected. Returns the result of running the query."
  [store lifecycle-record]
  (let [lifecycle-table (r/table (r/db (:db-name store))
                                 (:lifecycle-table store))
        query (r/insert lifecycle-table lifecycle-record {:conflict :update})]
    (r/run query (:connection store))))

(defn entity-assertions
  "Return the assertions stored for the entity referenced by entity-id, looked
  up through the store's entity-id index.

  When 't-k' is provided, only assertions in effect at that time are
  returned: those whose k-start is at or before 't-k' and that either have no
  k-end or have a k-end after 't-k'. Without 't-k' every assertion for the
  entity is returned, retracted ones included."
  [store entity-id & {:keys [t-k]}]
  (-> (r/db (:db-name store)) (r/table (:assertions-table store))
      ;; put-entity stores :entity-id as (str entity-id), so the index has to
      ;; be queried with the same string form or non-string ids never match
      (r/get-all [(str entity-id)] {:index (:entity-id-index store)})
      (cond-> t-k (r/filter (r/fn [row]
                              (r/and
                               (r/le (r/get-field row "k-start") t-k)
                               (r/or (r/not (r/has-fields row "k-end"))
                                     (r/gt (r/get-field row "k-end") t-k))))))
      (r/run (:connection store))))

(defn- ensure-db
  "Create the database named db-name over conn unless the server's database
  list already contains it."
  [conn db-name]
  (let [existing (set (r/run (r/db-list) conn))]
    (when-not (contains? existing db-name)
      (r/run (r/db-create db-name) conn))))

(defn- ensure-table
  "Create the table named by assertions-table in database db-name unless the
  database's table list already contains it. Despite the parameter name, the
  function works for any table name. The :init-fn option is accepted but not
  used by this implementation."
  [conn db-name assertions-table & {:keys [init-fn]}]
  (let [db (r/db db-name)
        existing (set (r/run (r/table-list db) conn))]
    (when-not (contains? existing assertions-table)
      (r/run (r/table-create db assertions-table) conn))))

(defn- ensure-index
  "Create the secondary index index-name on the table named by
  assertions-table in database db-name, indexing the given field, unless the
  table's index list already contains it. After creating the index, block
  until the server reports it ready so that queries issued right after
  initialization can use it. Returns the result of the index creation, or nil
  when the index already existed."
  [conn db-name assertions-table index-name field]
  (let [table (-> (r/db db-name) (r/table assertions-table))]
    (when-not ((set (-> table (r/index-list) (r/run conn))) index-name)
      (let [result (-> table
                       (r/index-create index-name (r/fn [row]
                                                    (r/get-field row (->keyword field))))
                       (r/run conn))]
        ;; RethinkDB builds secondary indexes in the background and rejects
        ;; queries against an index that is still under construction.
        (-> table (r/index-wait index-name) (r/run conn))
        result))))