summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorJames Forsyth <jf2512@att.com>2018-04-19 13:22:38 +0000
committerGerrit Code Review <gerrit@onap.org>2018-04-19 13:22:38 +0000
commit236c2ccbe6ae61afd7677b409ff52fd7a6ce9298 (patch)
tree494a36e63e81a35e2636bc9601731f8543875f69
parentdc4797502d3620b5cab12f867163ac8f23cb2167 (diff)
parent73558f1cf1accc20b58e728f44703404a50aad57 (diff)
Merge "Fix graphwalk capabilities"
-rw-r--r--project.clj2
-rw-r--r--src/gallifrey/handler.clj128
-rw-r--r--src/gallifrey/store.clj360
3 files changed, 342 insertions, 148 deletions
diff --git a/project.clj b/project.clj
index d520863..8637676 100644
--- a/project.clj
+++ b/project.clj
@@ -17,6 +17,8 @@
[integrant "0.6.2"]
[clojure-future-spec "1.9.0-beta4"]
+ [metrics-clojure "2.10.0"]
+ [metrics-clojure-ring "2.10.0"]
[yogthos/config "0.9"]]
:min-lein-version "2.5.3"
:profiles {:dev {:source-paths ["dev"]
diff --git a/src/gallifrey/handler.clj b/src/gallifrey/handler.clj
index 52d59c7..61bdd16 100644
--- a/src/gallifrey/handler.clj
+++ b/src/gallifrey/handler.clj
@@ -1,6 +1,7 @@
(ns gallifrey.handler
(:require [gallifrey.store :as store]
[utilis.map :refer [map-vals compact]]
+ [utilis.fn :refer [fsafe apply-kw]]
[liberator.core :refer [defresource]]
[liberator.representation :refer [as-response ring-response]]
[compojure.core :refer [GET PUT PATCH ANY defroutes]]
@@ -11,6 +12,8 @@
[ring.middleware.session :refer [wrap-session]]
[cheshire.core :as json]
[clj-time.format :as tf]
+ [metrics.ring.instrument :refer [instrument]]
+ [metrics.ring.expose :refer [expose-metrics-as-json]]
[integrant.core :as ig]))
(declare handler)
@@ -24,81 +27,132 @@
(defmethod ig/halt-key! :gallifrey/handler [_ _]
(reset! the-store nil))
-(declare serialize de-serialize)
+(declare parse-ts serialize serialize-entity serialize-relationship)
(defn entity-existed?
- [id & {:keys [t-k]}]
- (store/entity-existed? @the-store id :t-k t-k))
+ [type id & {:keys [t-k]}]
+ (store/entity-existed? @the-store type id :t-k t-k))
-(defresource entity-endpoint [id]
+(defresource entity-endpoint [type id]
:allowed-methods [:get :put :delete]
:available-media-types ["application/json"]
:malformed? (fn [ctx]
(when (#{:put :delete} (-> ctx :request :request-method))
(-> ctx :request :params :actor empty?)))
:exists? (fn [ctx]
- (if-let [resource (->> (when-let [t-k (-> ctx :request :params :t-k)]
- (tf/parse t-k))
- (store/get-entity @the-store id :t-k)
- serialize)]
- {::resource resource}
+ (if-let [resource (store/get-entity @the-store type id
+ :t-t (parse-ts ctx :t-t) :t-k (parse-ts ctx :t-k))]
+ {::resource ((case type
+ "entity" serialize-entity
+ "relationship" serialize-relationship) resource)}
(when (and (= :put (-> ctx :request :request-method))
(-> ctx :request :params :create not-empty))
true)))
- :existed? (fn [ctx]
- (entity-existed? id :t-k (when-let [t-k (-> ctx :request :params :t-k)]
- (tf/parse t-k))))
+ :existed? (fn [ctx] (entity-existed? type id :t-k (parse-ts ctx :t-k)))
:handle-ok ::resource
:can-put-to-missing? false
:handle-not-implemented (fn [{{m :request-method} :request :as ctx}]
(when (= :put m)
(-> (as-response "Resource not found" ctx)
- (assoc :status (if (entity-existed? id) 410 404))
- (ring-response))))
+ (assoc :status (if (entity-existed? type id) 410 404))
+ ring-response)))
:put! (fn [ctx]
(let [body (json/parse-string (slurp (get-in ctx [:request :body])))
+ properties (get body "properties")
+ source {"source.type" (get-in body ["source" "type"])
+ "source.id" (get-in body ["source" "id"])}
+ target {"target.type" (get-in body ["target" "type"])
+ "target.id" (get-in body ["target" "id"])}
actor (-> ctx :request :params :actor)
- changes-only (when-let [c (-> ctx :request :params :changes-only)]
- (boolean c))]
- {::created? (:created? (store/put-entity @the-store actor id body
+ changes-only (boolean (-> ctx :request :params :changes-only))]
+ {::created? (:created? (store/put-entity @the-store actor type id
+ (compact (merge properties source target))
+ :t-t (parse-ts ctx :t-t)
:changes-only changes-only))}))
:delete! (fn [ctx]
- (let [actor (-> ctx :request :params :actor)]
- (store/delete-entity @the-store actor id)))
+ (store/delete-entity @the-store (-> ctx :request :params :actor) type id))
:new? ::created?)
-(defresource entity-lifespan-endpoint [id]
+(defresource entity-history-endpoint [type id]
:allowed-methods [:get]
:available-media-types ["application/json"]
:exists? (fn [ctx]
(when-let [resource (not-empty
- (store/entity-lifespan @the-store id))]
- {::resource (-> resource
- (update :created str)
- (update :updated (partial map str))
- (update :deleted str)
- compact)}))
+ (store/entity-history @the-store type id))]
+ {::resource (map-vals (partial map #(-> %
+ (update :k-start str)
+ (update :k-end str)
+ (update :t-start str)
+ (update :t-end str)
+ compact)) resource)}))
:handle-ok ::resource)
+(defresource entity-lifespan-endpoint [type id]
+ :allowed-methods [:get]
+ :available-media-types ["application/json"]
+ :exists? (fn [ctx]
+ (when-let [resource (not-empty
+ (store/entity-lifespan @the-store type id))]
+ {::resource (map #(-> %
+ (update :created str)
+ (update :updated (partial map str))
+ (update :deleted str)
+ compact) resource)}))
+ :handle-ok ::resource)
+
+(defresource entity-aggregation-endpoint [type]
+ :allowed-methods [:get]
+ :available-media-types ["application/json"]
+ :handle-ok #(store/aggregate-entities @the-store type
+ :filters (->> (dissoc (get-in % [:request :params])
+ :properties :t-t :t-k
+ :gallifrey-type)
+ (map (fn [[k v]]
+ (cond
+ (= "null" v) [k nil]
+ :else [k v])))
+ (into {}))
+ :properties (get-in % [:request :params :properties])
+ :t-t (parse-ts % :t-t) :t-k (parse-ts % :t-k)))
+
(defroutes app-routes
- (ANY "/entity/:id" [id] (entity-endpoint id))
- (GET "/entity/:id/lifespan" [id] (entity-lifespan-endpoint id))
+ (GET "/:gallifrey-type/aggregations" [gallifrey-type] (entity-aggregation-endpoint gallifrey-type))
+ (ANY "/:gallifrey-type/:id" [gallifrey-type id] (entity-endpoint gallifrey-type id))
+ (GET "/:gallifrey-type/:id/history" [gallifrey-type id] (entity-history-endpoint gallifrey-type id))
+ (GET "/:gallifrey-type/:id/lifespan" [gallifrey-type id] (entity-lifespan-endpoint gallifrey-type id))
(resources "/"))
(def handler
(-> app-routes
- (wrap-defaults api-defaults)))
+ (wrap-defaults api-defaults)
+ instrument
+ expose-metrics-as-json))
;;; Implementation
-(defn- serialize
- [e]
+(defn- parse-ts
+ [ctx key]
+ (when-let [ts (get-in ctx [:request :params key])]
+ (tf/parse ts)))
+
+(defn- serialize-relationship
+ [resource]
(compact
- (update e :_meta #(map-vals
- (fn [m]
- (map-vals str m)) %))))
+ {:properties (dissoc resource :_id :_meta
+ :source.type :source.id
+ :target.type :target.id)
+ :source {:type (:source.type resource)
+ :id (:source.id resource)}
+ :target {:type (:target.type resource)
+ :id (:target.id resource)}
+ :_id (:_id resource)
+ :_meta (map-vals (partial map-vals str) (:_meta resource))}))
-(defn- de-serialize
- [e]
- e)
+(defn- serialize-entity
+ [resource]
+ (compact
+ {:properties (dissoc resource :_id :_meta :_relationships)
+ :relationships (map serialize-relationship (:_relationships resource))
+ :_id (:_id resource)
+ :_meta (map-vals (partial map-vals str) (:_meta resource))}))
diff --git a/src/gallifrey/store.clj b/src/gallifrey/store.clj
index 85a17d7..4f10f7c 100644
--- a/src/gallifrey/store.clj
+++ b/src/gallifrey/store.clj
@@ -1,12 +1,34 @@
+
(ns gallifrey.store
(:require [utilis.map :refer [compact map-keys map-vals]]
[utilis.types.keyword :refer [->keyword]]
[rethinkdb.query :as r]
[inflections.core :refer [hyphenate underscore]]
[clj-time.core :as t]
- [integrant.core :as ig])
+ [metrics.core :refer [default-registry]]
+ [metrics.timers :refer [deftimer time!]]
+ [integrant.core :as ig]
+ [clojure.string :as st])
(:import [clojure.lang ExceptionInfo]))
+(deftimer default-registry ["gallifrey" "store" get-entity-time])
+(deftimer default-registry ["gallifrey" "store" put-entity-time])
+(deftimer default-registry ["gallifrey" "store" delete-entity-time])
+(deftimer default-registry ["gallifrey" "store" aggregate-entities-time])
+
+(deftimer default-registry ["gallifrey" "store" entity-lifespan-time])
+(deftimer default-registry ["gallifrey" "store" entity-history-time])
+
+(deftimer default-registry ["gallifrey" "store" entity-existed-time])
+
+(deftimer default-registry ["gallifrey" "store" entity-assertions-time])
+(deftimer default-registry ["gallifrey" "store" entity-relationships-time])
+
+(deftimer default-registry ["gallifrey" "store" entity-lifecycle-time])
+(deftimer default-registry ["gallifrey" "store" update-lifecycle-time])
+
+(deftimer default-registry ["gallifrey" "store" entity-lock-time])
+
(declare ensure-db ensure-table ensure-index)
(defmethod ig/init-key :gallifrey/store [_ {:keys [db-server db-name]}]
@@ -14,16 +36,25 @@
connection (r/connect :host host :port port :db db-name)
assertions-table "assertions"
lifecycle-table "lifecycle"
- entity-id-index "entity-id-index"]
+ entity-id-index "entity-id-index"
+ entity-type-index "entity-type-index"
+ attribute-index "attribute-index"
+ value-index "value-index"]
(ensure-db connection db-name)
(ensure-table connection db-name assertions-table)
(ensure-index connection db-name assertions-table entity-id-index :entity-id)
+ (ensure-index connection db-name assertions-table entity-type-index :_type)
+ (ensure-index connection db-name assertions-table attribute-index :attribute)
+ (ensure-index connection db-name assertions-table value-index :value)
(ensure-table connection db-name lifecycle-table)
{:db-server db-server
:db-name db-name
:assertions-table assertions-table
:lifecycle-table lifecycle-table
:entity-id-index entity-id-index
+ :entity-type-index entity-type-index
+ :attribute-index attribute-index
+ :value-index value-index
:connection connection
:entity-locks (atom {})}))
@@ -31,50 +62,82 @@
(.close connection)
nil)
-
-(declare entity-lock entity-lifecycle update-lifecycle entity-assertions)
+(declare entity-lock entity-lifecycle update-lifecycle
+ entity-assertions entity-relationships)
(defn get-entity
"Get all the non-retracted attributes associated with an entity referenced by
- entity-id at time 't-k'. 't-k' defaults to 'now' if it is not provided."
- [store entity-id & {:keys [t-k]}]
- (let [t-k (or t-k (t/now))]
- (->> (entity-assertions store entity-id :t-k t-k)
- (reduce (fn [doc {:keys [attribute value] :as a}]
- (-> doc
- (assoc (keyword attribute) value)
- (assoc-in [:_meta (keyword attribute)]
- (dissoc a :id :attribute :value :entity-id))))
- {})
- not-empty)))
+ entity-id at time 't-t' or 't-k'. 't-k' defaults to 'now' if it is not provided."
+ [store entity-type entity-id & {:keys [t-t t-k]}]
+ (time!
+ get-entity-time
+ (let [t-k (or t-k (t/now))]
+ (->> (apply entity-assertions store entity-type entity-id (if t-t [:t-t t-t] [:t-k t-k]))
+ (reduce (fn [doc {:keys [attribute value] :as a}]
+ (-> doc
+ (assoc (keyword attribute) value)
+ (assoc :_id entity-id :_type entity-type)
+ (assoc-in [:_meta (keyword attribute)]
+ (dissoc a :id :_type :attribute :value :entity-id))
+ (assoc :_relationships (when (= "entity" entity-type)
+ (apply entity-relationships store entity-id
+ (if t-t [:t-t t-t] [:t-k t-k]))))))
+ {})
+ not-empty))))
(defn entity-existed?
"Return a boolean indicating whether an entity referenced by entity-id
- ever existed prior to 't-k'. 't-k' defaults to 'now' if it is not provided."
- [store entity-id & {:keys [t-k]}]
- (let [t-k (or t-k (t/now))
- deleted (:deleted (entity-lifecycle store entity-id))]
- (boolean (some #(t/before? % t-k) deleted))))
+ ever existed prior to 't-t' or 't-k'. 't-k' defaults to 'now' if it is
+ not provided."
+ [store entity-type entity-id & {:keys [t-t t-k]}]
+ (time!
+ entity-existed-time
+ (let [t-k (or t-k (t/now))
+ deleted (:deleted (entity-lifecycle store entity-type entity-id))]
+ (boolean (some #(not (t/after? % (or t-t t-k))) deleted)))))
+
+(defn entity-history
+ [store entity-type entity-id]
+ "Return a map containing the history of changes to the entity attributes"
+ (time!
+ entity-history-time
+ (when-let [assertions (-> (r/db (:db-name store)) (r/table (:assertions-table store))
+ (r/get-all [entity-id] {:index (:entity-id-index store)})
+ (r/filter (r/fn [row] (r/eq (r/get-field row "_type") entity-type)))
+ (r/run (:connection store)))]
+ (->> assertions
+ (group-by :attribute)
+ (map-vals #(->> (map (fn [a] (compact
+ (select-keys a [:value
+ :k-start :k-end
+ :k-start-actor
+ :k-end-actor
+ :t-start :t-end
+ :t-start-actor
+ :t-end-actor]))) %)
+ (sort-by :k-start)))))))
(defn entity-lifespan
- "Return a collection of hash map containing the :created, :updated and
+ "Return a collection of hash maps containing the :created, :updated and
:deleted timestamps for the entity as a whole."
- [store entity-id]
- (let [lifecycle (entity-lifecycle store entity-id)
- ranges (map (fn [c d] [c d])
- (:created lifecycle)
- (concat (:deleted lifecycle) (repeat nil)))
- lifespans (group-by (fn [u]
- (reduce (fn [u [c d]]
- (if (and (t/before? c u)
- (or (nil? d)
- (t/before? u d)))
- (reduced [c d])
- u)) u ranges))
- (:updated lifecycle))]
- (mapv (fn [[c d]]
- (compact {:created c :deleted d
- :updated (get lifespans [c d])})) ranges)))
+ [store entity-type entity-id]
+ (time!
+ entity-lifespan-time
+ (let [lifecycle (entity-lifecycle store entity-type entity-id)
+ ranges (map (fn [c d] [c d])
+ (:created lifecycle)
+ (concat (:deleted lifecycle) (repeat nil)))
+ lifespans (group-by (fn [u]
+ (reduce (fn [u [c d]]
+ (if (and (t/before? c u)
+ (or (nil? d)
+ (t/before? u d)))
+ (reduced [c d])
+ u)) u ranges))
+ (:updated lifecycle))]
+ (mapv (fn [[c d]]
+ (compact {:created c :deleted d
+ :updated (get lifespans [c d])})) ranges))))
(defn put-entity
"Stores the assertions included in 'doc' and automatically retracts any prior
@@ -82,100 +145,175 @@
only attributes included in 'doc' that are asserting new values are captured.
The 'actor' making the assertions must be provided as a string."
- [store actor entity-id doc & {:keys [changes-only]
- :or {changes-only true}}]
- (locking (entity-lock store entity-id)
- (let [entity (entity-lifecycle store entity-id)
- now (t/now)
- doc (map-keys name doc)
- attributes (->> doc keys set)
- assertions (->> (dissoc doc "id")
- (map (fn [[k v]]
- (compact
- {:entity-id (str entity-id)
- :attribute k
- :value v
- :k-start now
- :k-start-actor actor})))
- (remove nil?))
- write-assertions (fn [assertions]
- (-> (r/db (:db-name store)) (r/table (:assertions-table store))
- (r/insert assertions {:conflict :replace})
- (r/run (:connection store))))]
- (if (or (not entity)
- (and (not-empty (:deleted entity))
- (t/before? (last (:created entity)) (last (:deleted entity)))
- (t/before? (last (:deleted entity)) now)))
- (do (write-assertions assertions)
- (update-lifecycle store (-> entity (assoc :id entity-id)
- (update :created conj now)))
- {:created? true})
- (let [existing-assertions (entity-assertions store entity-id :t-k now)
- duplicates (set
- (when changes-only
- (->> existing-assertions
- (filter (fn [{:keys [attribute value]}]
- (= value (get doc attribute))))
- (map :attribute))))
- attributes (->> attributes (remove duplicates) set)
- retractions (->> existing-assertions
- (filter #(attributes (:attribute %)))
- (map #(assoc % :k-end now :k-end-actor actor)))
- assertions (remove #(duplicates (:attribute %)) assertions)]
- (when (not-empty assertions)
- (write-assertions (concat retractions assertions))
- (update-lifecycle store (update entity :updated conj now)))
- (merge
- {:updated (mapv :attribute assertions)}
- (when (not-empty duplicates)
- {:ignored (vec duplicates)})))))))
+ [store actor entity-type entity-id doc & {:keys [t-t changes-only]
+ :or {changes-only true}}]
+ (time!
+ put-entity-time
+ (locking (entity-lock store entity-type entity-id)
+ (let [entity (entity-lifecycle store entity-type entity-id)
+ now (t/now)
+ doc (map-keys name doc)
+ attributes (->> doc keys set)
+ assertions (->> (dissoc doc "id")
+ (map (fn [[k v]]
+ (compact
+ {:entity-id (str entity-id)
+ :_type entity-type
+ :attribute k
+ :value v
+ :k-start now
+ :k-start-actor actor
+ :t-start t-t})))
+ (remove nil?))
+ write-assertions (fn [assertions]
+ (-> (r/db (:db-name store)) (r/table (:assertions-table store))
+ (r/insert assertions {:conflict :replace})
+ (r/run (:connection store))))]
+ (if (or (not entity)
+ (and (not-empty (:deleted entity))
+ (t/before? (last (:created entity)) (last (:deleted entity)))
+ (t/before? (last (:deleted entity)) now)))
+ (do (write-assertions assertions)
+ (update-lifecycle store entity-type (-> entity (assoc :id entity-id)
+ (update :created conj now)))
+ {:created? true})
+ (let [existing-assertions (entity-assertions store entity-type entity-id :t-k now)
+ duplicates (set
+ (when changes-only
+ (->> existing-assertions
+ (filter (fn [{:keys [attribute value]}]
+ (= value (get doc attribute))))
+ (map :attribute))))
+ attributes (->> attributes (remove duplicates) set)
+ retractions (->> existing-assertions
+ (filter #(attributes (:attribute %)))
+ (map #(-> (assoc % :k-end now :k-end-actor actor
+ :t-end t-t)
+ (assoc :t-end-actor (when t-t actor))
+ compact)))
+ assertions (remove #(duplicates (:attribute %)) assertions)]
+ (when (not-empty assertions)
+ (write-assertions (concat retractions assertions))
+ (update-lifecycle store entity-type (update entity :updated conj now)))
+ (merge
+ {:updated (mapv :attribute assertions)}
+ (when (not-empty duplicates)
+ {:ignored (vec duplicates)}))))))))
(defn delete-entity
"Automatically retracts all the assertions made against the entity referenced
by 'entity-id'"
- [store actor entity-id]
- (let [now (t/now)
- retractions (->> (entity-assertions store entity-id :t-k now)
- (map #(assoc % :k-end now :k-end-actor actor)))]
- (update-lifecycle store (update (entity-lifecycle store entity-id) :deleted conj now))
- (-> (r/db (:db-name store)) (r/table (:assertions-table store))
- (r/insert retractions {:conflict :replace})
- (r/run (:connection store)))))
+ [store actor entity-type entity-id]
+ (time!
+ delete-entity-time
+ (let [now (t/now)
+ retractions (->> (entity-assertions store entity-type entity-id :t-k now)
+ (map #(assoc % :k-end now :k-end-actor actor)))]
+ (update-lifecycle store entity-type
+ (update (entity-lifecycle store entity-type entity-id) :deleted conj now))
+ (-> (r/db (:db-name store)) (r/table (:assertions-table store))
+ (r/insert retractions {:conflict :replace})
+ (r/run (:connection store))))))
+
+(defn aggregate-entities
+ [store entity-type & {:keys [filters properties t-t t-k]}]
+ (time!
+ aggregate-entities-time
+ (let [t-k (or t-k (t/now))
+ properties (map keyword (st/split properties #","))
+ entities (-> (r/db (:db-name store)) (r/table (:assertions-table store))
+ (r/get-all [entity-type] {:index (:entity-type-index store)})
+ (cond-> t-t (r/filter (r/fn [row]
+ (r/and
+ (r/le (r/get-field row "t-start") t-t)
+ (r/or (r/not (r/has-fields row "t-end"))
+ (r/gt (r/get-field row "t-end") t-t)))))
+ t-k (r/filter (r/fn [row]
+ (r/and
+ (r/le (r/get-field row "k-start") t-k)
+ (r/or (r/not (r/has-fields row "k-end"))
+ (r/gt (r/get-field row "k-end") t-k))))))
+ (r/run (:connection store))
+ (->> (reduce (fn [entities {:keys [entity-id attribute value]}]
+ (assoc-in entities
+ [entity-id (keyword attribute)]
+ value)) {})
+ (map (fn [[id e]]
+ (assoc e :_id id :_type entity-type))))
+ (cond->> (not-empty filters)
+ (filter #(every? (fn [k] (= (get % k) (get filters k))) (keys filters)))))]
+ (cond-> {:total (count entities)}
+ (not-empty properties)
+ (merge {:aggregations (->> properties
+ (map (fn [p]
+ [p (->> entities (group-by #(get % p))
+ (map (fn [[value entities]]
+ {:key value
+ :doc_count (count entities)})))]))
+ (into {}))})))))
;;; Implementation
(defn- entity-lock
- [store entity-id]
- (locking (:entity-locks store)
- (if-let [l (get @(:entity-locks store) entity-id)]
- l
- (let [l (Object.)]
- (swap! (:entity-locks store) assoc entity-id l)
- l))))
+ [store entity-type entity-id]
+ (time!
+ entity-lock-time
+ (locking (:entity-locks store)
+ (if-let [l (get @(:entity-locks store) [entity-type entity-id])]
+ l
+ (let [l (Object.)]
+ (swap! (:entity-locks store) assoc [entity-type entity-id] l)
+ l)))))
(defn- entity-lifecycle
- [store entity-id]
- (-> (r/db (:db-name store)) (r/table (:lifecycle-table store))
- (r/get entity-id)
- (r/run (:connection store))))
+ [store entity-type entity-id]
+ (time!
+ entity-lifecycle-time
+ (-> (r/db (:db-name store)) (r/table (:lifecycle-table store))
+ (r/get entity-id)
+ (r/run (:connection store)))))
(defn- update-lifecycle
- [store lifecycle-record]
- (-> (r/db (:db-name store)) (r/table (:lifecycle-table store))
- (r/insert lifecycle-record {:conflict :update})
- (r/run (:connection store))))
+ [store entity-type lifecycle-record]
+ (time!
+ update-lifecycle-time
+ (-> (r/db (:db-name store)) (r/table (:lifecycle-table store))
+ (r/insert (assoc lifecycle-record :entity-type entity-type) {:conflict :update})
+ (r/run (:connection store)))))
-(defn entity-assertions
- [store entity-id & {:keys [t-k]}]
+(defn fetch-assertions
+ [store entity-type id index & {:keys [t-t t-k]}]
(-> (r/db (:db-name store)) (r/table (:assertions-table store))
- (r/get-all [entity-id] {:index (:entity-id-index store)})
- (cond-> t-k (r/filter (r/fn [row]
+ (r/get-all [id] {:index index})
+ (r/filter (r/fn [row] (r/eq (r/get-field row "_type") entity-type)))
+ (cond-> t-t (r/filter (r/fn [row]
+ (r/and
+ (r/le (r/get-field row "t-start") t-t)
+ (r/or (r/not (r/has-fields row "t-end"))
+ (r/gt (r/get-field row "t-end") t-t)))))
+ t-k (r/filter (r/fn [row]
(r/and
(r/le (r/get-field row "k-start") t-k)
(r/or (r/not (r/has-fields row "k-end"))
(r/gt (r/get-field row "k-end") t-k))))))
(r/run (:connection store))))
+(defn entity-assertions
+ [store entity-type entity-id & {:keys [t-t t-k]}]
+ (time!
+ entity-assertions-time
+ (fetch-assertions store entity-type entity-id (:entity-id-index store) :t-t t-t :t-k t-k)))
+
+(defn entity-relationships
+ [store entity-id & {:keys [t-t t-k]}]
+ (time!
+ entity-relationships-time
+ (->> (fetch-assertions store "relationship" entity-id (:value-index store)
+ :t-t t-t :t-k t-k)
+ (map :entity-id) distinct
+ (pmap #(get-entity store "relationship" % :t-t t-t :t-k t-k))
+ compact)))
+
(defn- ensure-db
[conn db-name]
(when-not ((set (r/run (r/db-list) conn)) db-name)