diff options
-rw-r--r-- | .gitignore | 94 | ||||
-rw-r--r-- | .gitreview | 4 | ||||
-rw-r--r-- | README.md | 3 | ||||
-rw-r--r-- | dev/dev.clj | 22 | ||||
-rw-r--r-- | dev/user.clj | 7 | ||||
-rw-r--r-- | devops/docker-compose.yml | 20 | ||||
-rw-r--r-- | devops/gallifrey.service | 13 | ||||
-rw-r--r-- | devops/gallifrey/Dockerfile | 45 | ||||
-rw-r--r-- | devops/gallifrey/build-gallifrey | 2 | ||||
-rw-r--r-- | devops/gallifrey/docker-entrypoint.sh | 5 | ||||
-rw-r--r-- | devops/install-service | 6 | ||||
-rw-r--r-- | devops/nginx/Dockerfile | 9 | ||||
-rw-r--r-- | devops/nginx/default.conf | 22 | ||||
-rw-r--r-- | devops/nginx/nginx.conf | 33 | ||||
-rw-r--r-- | prod/gallifrey/server.clj | 14 | ||||
-rw-r--r-- | project.clj | 28 | ||||
-rw-r--r-- | src/gallifrey/config.clj | 17 | ||||
-rw-r--r-- | src/gallifrey/handler.clj | 104 | ||||
-rw-r--r-- | src/gallifrey/http_server.clj | 9 | ||||
-rw-r--r-- | src/gallifrey/store.clj | 197 |
20 files changed, 654 insertions, 0 deletions
diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..c0914f2 --- /dev/null +++ b/.gitignore @@ -0,0 +1,94 @@ +# Created by https://www.gitignore.io/api/clojure,emacs,osx,java + +### Clojure ### +pom.xml +pom.xml.asc +*jar +lib/ +classes/ +target/ +out/ +checkouts/ +.lein-deps-sum +.lein-repl-history +.lein-plugins/ +.lein-failures +.nrepl-port + + +### Emacs ### +# -*- mode: gitignore; -*- +*~ +\#*\# +/.emacs.desktop +/.emacs.desktop.lock +*.elc +auto-save-list +tramp +.\#* + +# Org-mode +.org-id-locations +*_archive + +# flymake-mode +*_flymake.* + +# eshell files +/eshell/history +/eshell/lastdir + +# elpa packages +/elpa/ + +# reftex files +*.rel + +# AUCTeX auto folder +/auto/ + +# cask packages +.cask/ + + +### OSX ### +.DS_Store +.AppleDouble +.LSOverride + +# Icon must end with two \r +Icon + + +# Thumbnails +._* + +# Files that might appear in the root of a volume +.DocumentRevisions-V100 +.fseventsd +.Spotlight-V100 +.TemporaryItems +.Trashes +.VolumeIcon.icns + +# Directories potentially created on remote AFP share +.AppleDB +.AppleDesktop +Network Trash Folder +Temporary Items +.apdisk + + +### Java ### +*.class + +# Mobile Tools for Java (J2ME) +.mtj.tmp/ + +# Package Files # +*.jar +*.war +*.ear + +# virtual machine crash logs, see http://www.java.com/en/download/help/error_hotspot.xml +hs_err_pid* diff --git a/.gitreview b/.gitreview new file mode 100644 index 0000000..a757c2e --- /dev/null +++ b/.gitreview @@ -0,0 +1,4 @@ +[gerrit] +host=gerrit.openecomp.org +port=29418 +project=gallifrey.git diff --git a/README.md b/README.md new file mode 100644 index 0000000..b3413c9 --- /dev/null +++ b/README.md @@ -0,0 +1,3 @@ +# gallifrey + +The time travelling database diff --git a/dev/dev.clj b/dev/dev.clj new file mode 100644 index 0000000..63ebec6 --- /dev/null +++ b/dev/dev.clj @@ -0,0 +1,22 @@ +(ns dev + "Tools for interactive development with the REPL. This file should + not be included in a production build of the application." + (:require [gallifrey.config :refer [config]] + [gallifrey.handler :refer [handler]] + [gallifrey.store :as gs] + + [integrant.core :as ig] + + [integrant.repl :refer [clear go halt init reset reset-all]] + [integrant.repl.state :refer [system]] + + [clojure.tools.namespace.repl :refer [refresh refresh-all disable-reload!]] + [clojure.repl :refer [apropos dir doc find-doc pst source]] + [clojure.test :refer [run-tests run-all-tests]] + [clojure.pprint :refer [pprint]] + [clojure.reflect :refer [reflect]])) + +(disable-reload! (find-ns 'integrant.core)) + +(integrant.repl/set-prep! (constantly (config {:db-server {:host "localhost" :port 28015} + :http-port 3449}))) diff --git a/dev/user.clj b/dev/user.clj new file mode 100644 index 0000000..9a2f701 --- /dev/null +++ b/dev/user.clj @@ -0,0 +1,7 @@ +(ns user) + +(defn dev + "Load and switch to the 'dev' namespace." + [] + (require 'dev) + (in-ns 'dev)) diff --git a/devops/docker-compose.yml b/devops/docker-compose.yml new file mode 100644 index 0000000..4ebd291 --- /dev/null +++ b/devops/docker-compose.yml @@ -0,0 +1,20 @@ +rethinkdb: + image: rethinkdb + container_name: rethinkdb + volumes: + - /opt/gallifrey/db:/data + +gallifrey: + build: ./gallifrey + container_name: gallifrey + links: + - rethinkdb + +nginx: + build: ./nginx + container_name: nginx + links: + - gallifrey + ports: + - 80:80 + - 443:443 diff --git a/devops/gallifrey.service b/devops/gallifrey.service new file mode 100644 index 0000000..43b3404 --- /dev/null +++ b/devops/gallifrey.service @@ -0,0 +1,13 @@ +[Unit] +Description=Gallifrey container +After=docker.socket early-docker.target network.target network-online.target +Wants=network-online.target +BindsTo=docker.service + +[Service] +Restart=always +ExecStart=/usr/bin/docker-compose -f /opt/gallifrey/docker-compose.yml -p gallifrey up +ExecStop=/usr/bin/docker-compose -f /opt/gallifrey/docker-compose.yml -p gallifrey down + +[Install] +WantedBy=multi-user.target diff --git a/devops/gallifrey/Dockerfile b/devops/gallifrey/Dockerfile new file mode 100644 index 0000000..4df1112 --- /dev/null +++ b/devops/gallifrey/Dockerfile @@ -0,0 +1,45 @@ +FROM alpine:3.6 + +# Java Version +ENV JAVA_VERSION_MAJOR 8 +ENV JAVA_VERSION_MINOR 131 +ENV JAVA_VERSION_BUILD 11 +ENV JAVA_PACKAGE jre +ENV GLIBC_VERSION 2.25-r0 + +ENV JAVA_8_BASE_URL http://download.oracle.com/otn-pub/java/jdk/${JAVA_VERSION_MAJOR}u${JAVA_VERSION_MINOR}-b${JAVA_VERSION_BUILD}/d54c1d3a095b4ff2b6607d096fa80163/${JAVA_PACKAGE}-${JAVA_VERSION_MAJOR}u${JAVA_VERSION_MINOR} + +RUN apk update +RUN apk add curl + +# Install glibc (required for java) +RUN curl -jsSL -o /etc/apk/keys/sgerrand.rsa.pub https://raw.githubusercontent.com/sgerrand/alpine-pkg-glibc/master/sgerrand.rsa.pub && \ + curl -jsSL -O https://github.com/sgerrand/alpine-pkg-glibc/releases/download/${GLIBC_VERSION}/glibc-${GLIBC_VERSION}.apk && \ + apk add glibc-${GLIBC_VERSION}.apk && \ + rm -f glibc-${GLIBC_VERSION}.apk + +# Install Java +RUN mkdir /opt && \ + curl -jsSL -H "Cookie: oraclelicense=accept-securebackup-cookie" ${JAVA_8_BASE_URL}-linux-x64.tar.gz | tar -xzf - -C /opt && \ + ln -s /opt/${JAVA_PACKAGE}1.${JAVA_VERSION_MAJOR}.0_${JAVA_VERSION_MINOR} /opt/${JAVA_PACKAGE} && \ + cd /opt/${JAVA_PACKAGE}/ && rm -rf *src.zip lib/missioncontrol lib/visualvm lib/plugin.jar plugin lib/*javafx* lib/*jfx* lib/ext/jfxrt.jar bin/javaws lib/javaws.jar lib/desktop lib/deploy* lib/amd64/libdecora_sse.so lib/amd64/libprism_*.so lib/amd64/libfxplugins.so lib/amd64/libglass.so lib/amd64/libgstreamer-lite.so lib/amd64/libjavafx*.so lib/amd64/libjfx*. + +# Install Java Cryptography Extension (JCE) Unlimited Strength +RUN curl -jksSLH "Cookie: oraclelicense=accept-securebackup-cookie" -o /tmp/jce_policy.zip \ + http://download.oracle.com/otn-pub/java/jce/${JAVA_VERSION_MAJOR}/jce_policy-${JAVA_VERSION_MAJOR}.zip && \ + unzip -o -d /opt/${JAVA_PACKAGE}/lib/security /tmp/jce_policy.zip && rm -f /tmp/jce_policy.zip + +RUN apk del curl + +ENV PATH /opt/jre/bin:${PATH} + +RUN apk add zip openssh-keygen openssh-client + +EXPOSE 8080 + +RUN mkdir -p /opt/gallifrey +COPY gallifrey.jar /opt/gallifrey + +COPY ./docker-entrypoint.sh / +RUN chmod 700 /docker-entrypoint.sh +ENTRYPOINT ["/docker-entrypoint.sh"] diff --git a/devops/gallifrey/build-gallifrey b/devops/gallifrey/build-gallifrey new file mode 100644 index 0000000..82257b9 --- /dev/null +++ b/devops/gallifrey/build-gallifrey @@ -0,0 +1,2 @@ +lein uberjar +cp -f ../../target/gallifrey.jar . diff --git a/devops/gallifrey/docker-entrypoint.sh b/devops/gallifrey/docker-entrypoint.sh new file mode 100644 index 0000000..177d979 --- /dev/null +++ b/devops/gallifrey/docker-entrypoint.sh @@ -0,0 +1,5 @@ +#!/bin/sh + +set -e + +java -jar /opt/gallifrey/gallifrey.jar diff --git a/devops/install-service b/devops/install-service new file mode 100644 index 0000000..826df18 --- /dev/null +++ b/devops/install-service @@ -0,0 +1,6 @@ +sudo cp gallifrey.service /etc/systemd/system +sudo chown root:root /etc/systemd/system/gallifrey.service +sudo chmod 644 /etc/systemd/system/gallifrey.service + +sudo systemctl enable gallifrey +sudo systemctl daemon-reload diff --git a/devops/nginx/Dockerfile b/devops/nginx/Dockerfile new file mode 100644 index 0000000..4f2ba9f --- /dev/null +++ b/devops/nginx/Dockerfile @@ -0,0 +1,9 @@ +FROM nginx:alpine + +COPY ssl-cert-snakeoil.pem /etc/ssl/certs/ +COPY ssl-cert-snakeoil.key /etc/ssl/private/ +RUN chown -R nginx:nginx /etc/ssl +RUN chmod 640 /etc/ssl/private/ssl-cert-snakeoil.key +RUN chmod 750 /etc/ssl/private + +COPY default.conf /etc/nginx/conf.d/ diff --git a/devops/nginx/default.conf b/devops/nginx/default.conf new file mode 100644 index 0000000..ea9980f --- /dev/null +++ b/devops/nginx/default.conf @@ -0,0 +1,22 @@ +server { + # Listen on 80 and 443 + listen 80; + listen 443 ssl; + # Self-signed certificate. + ssl_certificate /etc/ssl/certs/ssl-cert-snakeoil.pem; + ssl_certificate_key /etc/ssl/private/ssl-cert-snakeoil.key; + + # Redirect all non-SSL traffic to SSL. + if ($ssl_protocol = "") { + rewrite ^ https://$host$request_uri? permanent; + } + + # Split off traffic to gallifrey, and make sure that websockets + # are managed correctly. + location / { + proxy_pass http://gallifrey:8081; + proxy_http_version 1.1; + proxy_set_header Upgrade websocket; + proxy_set_header Connection upgrade; + } +} diff --git a/devops/nginx/nginx.conf b/devops/nginx/nginx.conf new file mode 100644 index 0000000..3ebc618 --- /dev/null +++ b/devops/nginx/nginx.conf @@ -0,0 +1,33 @@ + +user nginx; +worker_processes 1; + +error_log /var/log/nginx/error.log warn; +pid /var/run/nginx.pid; + + +events { + worker_connections 1024; +} + + +http { + include /etc/nginx/mime.types; + default_type application/octet-stream; + + log_format main '$remote_addr - $remote_user [$time_local] "$request" ' + '$status $body_bytes_sent "$http_referer" ' + '"$http_user_agent" "$http_x_forwarded_for"'; + + access_log /var/log/nginx/access.log main; + + sendfile on; + #tcp_nopush on; + + keepalive_timeout 65; + + #gzip on; + + include /etc/nginx/conf.d/*.conf; + include /etc/nginx/sites-available/*.conf; +} diff --git a/prod/gallifrey/server.clj b/prod/gallifrey/server.clj new file mode 100644 index 0000000..cefa6b5 --- /dev/null +++ b/prod/gallifrey/server.clj @@ -0,0 +1,14 @@ +(ns gallifrey.server + (:require [gallifrey.config :refer [config]] + [gallifrey.handler :refer [handler]] + [config.core :refer [env]] + [org.httpkit.server :refer [run-server]] + [integrant.core :as ig]) + (:gen-class)) + +(defn -main [& args] + (let [port (Integer/parseInt (or (env :http-port) "8081"))] + (println "Listening on port" port) + (ig/init (config {:db-server {:host (or (env :db-host) "rethinkdb") + :port 28015} + :http-port port})))) diff --git a/project.clj b/project.clj new file mode 100644 index 0000000..d520863 --- /dev/null +++ b/project.clj @@ -0,0 +1,28 @@ +(defproject gallifrey "0.4.0" + :dependencies [[org.clojure/clojure "1.8.0"] + + [com.7theta/utilis "1.0.4"] + + [http-kit "2.2.0"] + [ring/ring-core "1.6.3"] + [ring/ring-defaults "0.3.1"] + [ring/ring-anti-forgery "1.1.0"] + [compojure "1.6.0"] + [liberator "0.15.1"] + [cheshire "5.7.1"] + + [com.apa512/rethinkdb "0.15.26"] + [inflections "0.13.0"] + [clj-time "0.14.2"] + + [integrant "0.6.2"] + [clojure-future-spec "1.9.0-beta4"] + [yogthos/config "0.9"]] + :min-lein-version "2.5.3" + :profiles {:dev {:source-paths ["dev"] + :dependencies [[ring/ring-devel "1.6.3"] + [integrant/repl "0.2.0"]]} + :uberjar {:source-paths ["prod"] + :main gallifrey.server + :uberjar-name "gallifrey.jar"}} + :prep-tasks ["compile"]) diff --git a/src/gallifrey/config.clj b/src/gallifrey/config.clj new file mode 100644 index 0000000..fd9e5ee --- /dev/null +++ b/src/gallifrey/config.clj @@ -0,0 +1,17 @@ +(ns gallifrey.config + (:require [integrant.core :as ig])) + +(defn config + [app-config] + (let [conf {:gallifrey/store + {:db-server (:db-server app-config) + :db-name "gallifrey"} + + :gallifrey/handler + {:store (ig/ref :gallifrey/store)} + + :gallifrey/http-server + {:port (:http-port app-config) + :handler (ig/ref :gallifrey/handler)}}] + (ig/load-namespaces conf) + conf)) diff --git a/src/gallifrey/handler.clj b/src/gallifrey/handler.clj new file mode 100644 index 0000000..52d59c7 --- /dev/null +++ b/src/gallifrey/handler.clj @@ -0,0 +1,104 @@ +(ns gallifrey.handler + (:require [gallifrey.store :as store] + [utilis.map :refer [map-vals compact]] + [liberator.core :refer [defresource]] + [liberator.representation :refer [as-response ring-response]] + [compojure.core :refer [GET PUT PATCH ANY defroutes]] + [compojure.route :refer [resources]] + [ring.util.response :refer [resource-response content-type]] + [ring.middleware.defaults :refer [wrap-defaults api-defaults]] + [ring.middleware.anti-forgery :refer [wrap-anti-forgery]] + [ring.middleware.session :refer [wrap-session]] + [cheshire.core :as json] + [clj-time.format :as tf] + [integrant.core :as ig])) + +(declare handler) + +(defonce ^:private the-store (atom nil)) + +(defmethod ig/init-key :gallifrey/handler [_ {:keys [store]}] + (reset! the-store store) + handler) + +(defmethod ig/halt-key! :gallifrey/handler [_ _] + (reset! the-store nil)) + +(declare serialize de-serialize) + +(defn entity-existed? + [id & {:keys [t-k]}] + (store/entity-existed? @the-store id :t-k t-k)) + +(defresource entity-endpoint [id] + :allowed-methods [:get :put :delete] + :available-media-types ["application/json"] + :malformed? (fn [ctx] + (when (#{:put :delete} (-> ctx :request :request-method)) + (-> ctx :request :params :actor empty?))) + :exists? (fn [ctx] + (if-let [resource (->> (when-let [t-k (-> ctx :request :params :t-k)] + (tf/parse t-k)) + (store/get-entity @the-store id :t-k) + serialize)] + {::resource resource} + (when (and (= :put (-> ctx :request :request-method)) + (-> ctx :request :params :create not-empty)) + true))) + :existed? (fn [ctx] + (entity-existed? id :t-k (when-let [t-k (-> ctx :request :params :t-k)] + (tf/parse t-k)))) + :handle-ok ::resource + :can-put-to-missing? false + :handle-not-implemented (fn [{{m :request-method} :request :as ctx}] + (when (= :put m) + (-> (as-response "Resource not found" ctx) + (assoc :status (if (entity-existed? id) 410 404)) + (ring-response)))) + :put! (fn [ctx] + (let [body (json/parse-string (slurp (get-in ctx [:request :body]))) + actor (-> ctx :request :params :actor) + changes-only (when-let [c (-> ctx :request :params :changes-only)] + (boolean c))] + {::created? (:created? (store/put-entity @the-store actor id body + :changes-only changes-only))})) + :delete! (fn [ctx] + (let [actor (-> ctx :request :params :actor)] + (store/delete-entity @the-store actor id))) + :new? ::created?) + +(defresource entity-lifespan-endpoint [id] + :allowed-methods [:get] + :available-media-types ["application/json"] + :exists? (fn [ctx] + (when-let [resource (not-empty + (store/entity-lifespan @the-store id))] + {::resource (-> resource + (update :created str) + (update :updated (partial map str)) + (update :deleted str) + compact)})) + :handle-ok ::resource) + +(defroutes app-routes + (ANY "/entity/:id" [id] (entity-endpoint id)) + (GET "/entity/:id/lifespan" [id] (entity-lifespan-endpoint id)) + (resources "/")) + +(def handler + (-> app-routes + (wrap-defaults api-defaults))) + + +;;; Implementation + +(defn- serialize + [e] + (compact + (update e :_meta #(map-vals + (fn [m] + (map-vals str m)) %)))) + +(defn- de-serialize + [e] + e) diff --git a/src/gallifrey/http_server.clj b/src/gallifrey/http_server.clj new file mode 100644 index 0000000..96b5757 --- /dev/null +++ b/src/gallifrey/http_server.clj @@ -0,0 +1,9 @@ +(ns gallifrey.http-server + (:require [org.httpkit.server :refer [run-server]] + [integrant.core :as ig])) + +(defmethod ig/init-key :gallifrey/http-server [_ {:keys [port handler]}] + (run-server handler {:port port})) + +(defmethod ig/halt-key! :gallifrey/http-server [_ server] + (server :timeout 100)) diff --git a/src/gallifrey/store.clj b/src/gallifrey/store.clj new file mode 100644 index 0000000..85a17d7 --- /dev/null +++ b/src/gallifrey/store.clj @@ -0,0 +1,197 @@ +(ns gallifrey.store + (:require [utilis.map :refer [compact map-keys map-vals]] + [utilis.types.keyword :refer [->keyword]] + [rethinkdb.query :as r] + [inflections.core :refer [hyphenate underscore]] + [clj-time.core :as t] + [integrant.core :as ig]) + (:import [clojure.lang ExceptionInfo])) + +(declare ensure-db ensure-table ensure-index) + +(defmethod ig/init-key :gallifrey/store [_ {:keys [db-server db-name]}] + (let [{:keys [host port] :or {host "localhost" port 28015}} db-server + connection (r/connect :host host :port port :db db-name) + assertions-table "assertions" + lifecycle-table "lifecycle" + entity-id-index "entity-id-index"] + (ensure-db connection db-name) + (ensure-table connection db-name assertions-table) + (ensure-index connection db-name assertions-table entity-id-index :entity-id) + (ensure-table connection db-name lifecycle-table) + {:db-server db-server + :db-name db-name + :assertions-table assertions-table + :lifecycle-table lifecycle-table + :entity-id-index entity-id-index + :connection connection + :entity-locks (atom {})})) + +(defmethod ig/halt-key! :gallifrey/store [_ {:keys [connection subscriptions]}] + (.close connection) + nil) + + +(declare entity-lock entity-lifecycle update-lifecycle entity-assertions) + +(defn get-entity + "Get all the non-retracted attributes associated with an entity referenced by + entity-id at time 't-k'. 't-k' defaults to 'now' if it is not provided." + [store entity-id & {:keys [t-k]}] + (let [t-k (or t-k (t/now))] + (->> (entity-assertions store entity-id :t-k t-k) + (reduce (fn [doc {:keys [attribute value] :as a}] + (-> doc + (assoc (keyword attribute) value) + (assoc-in [:_meta (keyword attribute)] + (dissoc a :id :attribute :value :entity-id)))) + {}) + not-empty))) + +(defn entity-existed? + "Return a boolean indicating whether an entity referenced by entity-id + ever existed prior to 't-k'. 't-k' defaults to 'now' if it is not provided." + [store entity-id & {:keys [t-k]}] + (let [t-k (or t-k (t/now)) + deleted (:deleted (entity-lifecycle store entity-id))] + (boolean (some #(t/before? % t-k) deleted)))) + +(defn entity-lifespan + "Return a collection of hash map containing the :created, :updated and + :deleted timestamps for the entity as a whole." + [store entity-id] + (let [lifecycle (entity-lifecycle store entity-id) + ranges (map (fn [c d] [c d]) + (:created lifecycle) + (concat (:deleted lifecycle) (repeat nil))) + lifespans (group-by (fn [u] + (reduce (fn [u [c d]] + (if (and (t/before? c u) + (or (nil? d) + (t/before? u d))) + (reduced [c d]) + u)) u ranges)) + (:updated lifecycle))] + (mapv (fn [[c d]] + (compact {:created c :deleted d + :updated (get lifespans [c d])})) ranges))) + +(defn put-entity + "Stores the assertions included in 'doc' and automatically retracts any prior + assertions against the attributes in 'doc'. If 'changes-only' is set to true, + only attributes included in 'doc' that are asserting new values are captured. + + The 'actor' making the assertions must be provided as a string." + [store actor entity-id doc & {:keys [changes-only] + :or {changes-only true}}] + (locking (entity-lock store entity-id) + (let [entity (entity-lifecycle store entity-id) + now (t/now) + doc (map-keys name doc) + attributes (->> doc keys set) + assertions (->> (dissoc doc "id") + (map (fn [[k v]] + (compact + {:entity-id (str entity-id) + :attribute k + :value v + :k-start now + :k-start-actor actor}))) + (remove nil?)) + write-assertions (fn [assertions] + (-> (r/db (:db-name store)) (r/table (:assertions-table store)) + (r/insert assertions {:conflict :replace}) + (r/run (:connection store))))] + (if (or (not entity) + (and (not-empty (:deleted entity)) + (t/before? (last (:created entity)) (last (:deleted entity))) + (t/before? (last (:deleted entity)) now))) + (do (write-assertions assertions) + (update-lifecycle store (-> entity (assoc :id entity-id) + (update :created conj now))) + {:created? true}) + (let [existing-assertions (entity-assertions store entity-id :t-k now) + duplicates (set + (when changes-only + (->> existing-assertions + (filter (fn [{:keys [attribute value]}] + (= value (get doc attribute)))) + (map :attribute)))) + attributes (->> attributes (remove duplicates) set) + retractions (->> existing-assertions + (filter #(attributes (:attribute %))) + (map #(assoc % :k-end now :k-end-actor actor))) + assertions (remove #(duplicates (:attribute %)) assertions)] + (when (not-empty assertions) + (write-assertions (concat retractions assertions)) + (update-lifecycle store (update entity :updated conj now))) + (merge + {:updated (mapv :attribute assertions)} + (when (not-empty duplicates) + {:ignored (vec duplicates)}))))))) + +(defn delete-entity + "Automatically retracts all the assertions made against the entity referenced + by 'entity-id'" + [store actor entity-id] + (let [now (t/now) + retractions (->> (entity-assertions store entity-id :t-k now) + (map #(assoc % :k-end now :k-end-actor actor)))] + (update-lifecycle store (update (entity-lifecycle store entity-id) :deleted conj now)) + (-> (r/db (:db-name store)) (r/table (:assertions-table store)) + (r/insert retractions {:conflict :replace}) + (r/run (:connection store))))) + +;;; Implementation + +(defn- entity-lock + [store entity-id] + (locking (:entity-locks store) + (if-let [l (get @(:entity-locks store) entity-id)] + l + (let [l (Object.)] + (swap! (:entity-locks store) assoc entity-id l) + l)))) + +(defn- entity-lifecycle + [store entity-id] + (-> (r/db (:db-name store)) (r/table (:lifecycle-table store)) + (r/get entity-id) + (r/run (:connection store)))) + +(defn- update-lifecycle + [store lifecycle-record] + (-> (r/db (:db-name store)) (r/table (:lifecycle-table store)) + (r/insert lifecycle-record {:conflict :update}) + (r/run (:connection store)))) + +(defn entity-assertions + [store entity-id & {:keys [t-k]}] + (-> (r/db (:db-name store)) (r/table (:assertions-table store)) + (r/get-all [entity-id] {:index (:entity-id-index store)}) + (cond-> t-k (r/filter (r/fn [row] + (r/and + (r/le (r/get-field row "k-start") t-k) + (r/or (r/not (r/has-fields row "k-end")) + (r/gt (r/get-field row "k-end") t-k)))))) + (r/run (:connection store)))) + +(defn- ensure-db + [conn db-name] + (when-not ((set (r/run (r/db-list) conn)) db-name) + (r/run (r/db-create db-name) conn))) + +(defn- ensure-table + [conn db-name assertions-table & {:keys [init-fn]}] + (when-not ((set (r/run (r/table-list (r/db db-name)) conn)) assertions-table) + (-> (r/db db-name) (r/table-create assertions-table) (r/run conn)))) + +(defn- ensure-index + [conn db-name assertions-table index-name field] + (when-not ((set (-> (r/db db-name) (r/table assertions-table) + (r/index-list) (r/run conn))) index-name) + (-> (r/db db-name) + (r/table assertions-table) + (r/index-create index-name (r/fn [row] + (r/get-field row (->keyword field)))) + (r/run conn)))) |