summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--.gitignore94
-rw-r--r--.gitreview4
-rw-r--r--README.md3
-rw-r--r--dev/dev.clj22
-rw-r--r--dev/user.clj7
-rw-r--r--devops/docker-compose.yml20
-rw-r--r--devops/gallifrey.service13
-rw-r--r--devops/gallifrey/Dockerfile45
-rw-r--r--devops/gallifrey/build-gallifrey2
-rw-r--r--devops/gallifrey/docker-entrypoint.sh5
-rw-r--r--devops/install-service6
-rw-r--r--devops/nginx/Dockerfile9
-rw-r--r--devops/nginx/default.conf22
-rw-r--r--devops/nginx/nginx.conf33
-rw-r--r--prod/gallifrey/server.clj14
-rw-r--r--project.clj28
-rw-r--r--src/gallifrey/config.clj17
-rw-r--r--src/gallifrey/handler.clj104
-rw-r--r--src/gallifrey/http_server.clj9
-rw-r--r--src/gallifrey/store.clj197
20 files changed, 654 insertions, 0 deletions
diff --git a/.gitignore b/.gitignore
new file mode 100644
index 0000000..c0914f2
--- /dev/null
+++ b/.gitignore
@@ -0,0 +1,94 @@
+# Created by https://www.gitignore.io/api/clojure,emacs,osx,java
+
+### Clojure ###
+pom.xml
+pom.xml.asc
+*jar
+lib/
+classes/
+target/
+out/
+checkouts/
+.lein-deps-sum
+.lein-repl-history
+.lein-plugins/
+.lein-failures
+.nrepl-port
+
+
+### Emacs ###
+# -*- mode: gitignore; -*-
+*~
+\#*\#
+/.emacs.desktop
+/.emacs.desktop.lock
+*.elc
+auto-save-list
+tramp
+.\#*
+
+# Org-mode
+.org-id-locations
+*_archive
+
+# flymake-mode
+*_flymake.*
+
+# eshell files
+/eshell/history
+/eshell/lastdir
+
+# elpa packages
+/elpa/
+
+# reftex files
+*.rel
+
+# AUCTeX auto folder
+/auto/
+
+# cask packages
+.cask/
+
+
+### OSX ###
+.DS_Store
+.AppleDouble
+.LSOverride
+
+# Icon must end with two \r
+Icon
+
+
+# Thumbnails
+._*
+
+# Files that might appear in the root of a volume
+.DocumentRevisions-V100
+.fseventsd
+.Spotlight-V100
+.TemporaryItems
+.Trashes
+.VolumeIcon.icns
+
+# Directories potentially created on remote AFP share
+.AppleDB
+.AppleDesktop
+Network Trash Folder
+Temporary Items
+.apdisk
+
+
+### Java ###
+*.class
+
+# Mobile Tools for Java (J2ME)
+.mtj.tmp/
+
+# Package Files #
+*.jar
+*.war
+*.ear
+
+# virtual machine crash logs, see http://www.java.com/en/download/help/error_hotspot.xml
+hs_err_pid*
diff --git a/.gitreview b/.gitreview
new file mode 100644
index 0000000..a757c2e
--- /dev/null
+++ b/.gitreview
@@ -0,0 +1,4 @@
+[gerrit]
+host=gerrit.openecomp.org
+port=29418
+project=gallifrey.git
diff --git a/README.md b/README.md
new file mode 100644
index 0000000..b3413c9
--- /dev/null
+++ b/README.md
@@ -0,0 +1,3 @@
+# gallifrey
+
+The time travelling database
diff --git a/dev/dev.clj b/dev/dev.clj
new file mode 100644
index 0000000..63ebec6
--- /dev/null
+++ b/dev/dev.clj
@@ -0,0 +1,22 @@
+(ns dev
+ "Tools for interactive development with the REPL. This file should
+ not be included in a production build of the application."
+ (:require [gallifrey.config :refer [config]]
+ [gallifrey.handler :refer [handler]]
+ [gallifrey.store :as gs]
+
+ [integrant.core :as ig]
+
+ [integrant.repl :refer [clear go halt init reset reset-all]]
+ [integrant.repl.state :refer [system]]
+
+ [clojure.tools.namespace.repl :refer [refresh refresh-all disable-reload!]]
+ [clojure.repl :refer [apropos dir doc find-doc pst source]]
+ [clojure.test :refer [run-tests run-all-tests]]
+ [clojure.pprint :refer [pprint]]
+ [clojure.reflect :refer [reflect]]))
+
+(disable-reload! (find-ns 'integrant.core))
+
+(integrant.repl/set-prep! (constantly (config {:db-server {:host "localhost" :port 28015}
+ :http-port 3449})))
diff --git a/dev/user.clj b/dev/user.clj
new file mode 100644
index 0000000..9a2f701
--- /dev/null
+++ b/dev/user.clj
@@ -0,0 +1,7 @@
+(ns user)
+
+(defn dev
+ "Load and switch to the 'dev' namespace."
+ []
+ (require 'dev)
+ (in-ns 'dev))
diff --git a/devops/docker-compose.yml b/devops/docker-compose.yml
new file mode 100644
index 0000000..4ebd291
--- /dev/null
+++ b/devops/docker-compose.yml
@@ -0,0 +1,20 @@
+rethinkdb:
+ image: rethinkdb
+ container_name: rethinkdb
+ volumes:
+ - /opt/gallifrey/db:/data
+
+gallifrey:
+ build: ./gallifrey
+ container_name: gallifrey
+ links:
+ - rethinkdb
+
+nginx:
+ build: ./nginx
+ container_name: nginx
+ links:
+ - gallifrey
+ ports:
+ - 80:80
+ - 443:443
diff --git a/devops/gallifrey.service b/devops/gallifrey.service
new file mode 100644
index 0000000..43b3404
--- /dev/null
+++ b/devops/gallifrey.service
@@ -0,0 +1,13 @@
+[Unit]
+Description=Gallifrey container
+After=docker.socket early-docker.target network.target network-online.target
+Wants=network-online.target
+BindsTo=docker.service
+
+[Service]
+Restart=always
+ExecStart=/usr/bin/docker-compose -f /opt/gallifrey/docker-compose.yml -p gallifrey up
+ExecStop=/usr/bin/docker-compose -f /opt/gallifrey/docker-compose.yml -p gallifrey down
+
+[Install]
+WantedBy=multi-user.target
diff --git a/devops/gallifrey/Dockerfile b/devops/gallifrey/Dockerfile
new file mode 100644
index 0000000..4df1112
--- /dev/null
+++ b/devops/gallifrey/Dockerfile
@@ -0,0 +1,45 @@
+FROM alpine:3.6
+
+# Java Version
+ENV JAVA_VERSION_MAJOR 8
+ENV JAVA_VERSION_MINOR 131
+ENV JAVA_VERSION_BUILD 11
+ENV JAVA_PACKAGE jre
+ENV GLIBC_VERSION 2.25-r0
+
+ENV JAVA_8_BASE_URL http://download.oracle.com/otn-pub/java/jdk/${JAVA_VERSION_MAJOR}u${JAVA_VERSION_MINOR}-b${JAVA_VERSION_BUILD}/d54c1d3a095b4ff2b6607d096fa80163/${JAVA_PACKAGE}-${JAVA_VERSION_MAJOR}u${JAVA_VERSION_MINOR}
+
+RUN apk update
+RUN apk add curl
+
+# Install glibc (required for java)
+RUN curl -jsSL -o /etc/apk/keys/sgerrand.rsa.pub https://raw.githubusercontent.com/sgerrand/alpine-pkg-glibc/master/sgerrand.rsa.pub && \
+ curl -jsSL -O https://github.com/sgerrand/alpine-pkg-glibc/releases/download/${GLIBC_VERSION}/glibc-${GLIBC_VERSION}.apk && \
+ apk add glibc-${GLIBC_VERSION}.apk && \
+ rm -f glibc-${GLIBC_VERSION}.apk
+
+# Install Java
+RUN mkdir /opt && \
+ curl -jsSL -H "Cookie: oraclelicense=accept-securebackup-cookie" ${JAVA_8_BASE_URL}-linux-x64.tar.gz | tar -xzf - -C /opt && \
+ ln -s /opt/${JAVA_PACKAGE}1.${JAVA_VERSION_MAJOR}.0_${JAVA_VERSION_MINOR} /opt/${JAVA_PACKAGE} && \
+ cd /opt/${JAVA_PACKAGE}/ && rm -rf *src.zip lib/missioncontrol lib/visualvm lib/plugin.jar plugin lib/*javafx* lib/*jfx* lib/ext/jfxrt.jar bin/javaws lib/javaws.jar lib/desktop lib/deploy* lib/amd64/libdecora_sse.so lib/amd64/libprism_*.so lib/amd64/libfxplugins.so lib/amd64/libglass.so lib/amd64/libgstreamer-lite.so lib/amd64/libjavafx*.so lib/amd64/libjfx*.
+
+# Install Java Cryptography Extension (JCE) Unlimited Strength
+RUN curl -jksSLH "Cookie: oraclelicense=accept-securebackup-cookie" -o /tmp/jce_policy.zip \
+ http://download.oracle.com/otn-pub/java/jce/${JAVA_VERSION_MAJOR}/jce_policy-${JAVA_VERSION_MAJOR}.zip && \
+ unzip -o -d /opt/${JAVA_PACKAGE}/lib/security /tmp/jce_policy.zip && rm -f /tmp/jce_policy.zip
+
+RUN apk del curl
+
+ENV PATH /opt/jre/bin:${PATH}
+
+RUN apk add zip openssh-keygen openssh-client
+
+EXPOSE 8080
+
+RUN mkdir -p /opt/gallifrey
+COPY gallifrey.jar /opt/gallifrey
+
+COPY ./docker-entrypoint.sh /
+RUN chmod 700 /docker-entrypoint.sh
+ENTRYPOINT ["/docker-entrypoint.sh"]
diff --git a/devops/gallifrey/build-gallifrey b/devops/gallifrey/build-gallifrey
new file mode 100644
index 0000000..82257b9
--- /dev/null
+++ b/devops/gallifrey/build-gallifrey
@@ -0,0 +1,2 @@
+lein uberjar
+cp -f ../../target/gallifrey.jar .
diff --git a/devops/gallifrey/docker-entrypoint.sh b/devops/gallifrey/docker-entrypoint.sh
new file mode 100644
index 0000000..177d979
--- /dev/null
+++ b/devops/gallifrey/docker-entrypoint.sh
@@ -0,0 +1,5 @@
+#!/bin/sh
+
+set -e
+
+java -jar /opt/gallifrey/gallifrey.jar
diff --git a/devops/install-service b/devops/install-service
new file mode 100644
index 0000000..826df18
--- /dev/null
+++ b/devops/install-service
@@ -0,0 +1,6 @@
+sudo cp gallifrey.service /etc/systemd/system
+sudo chown root:root /etc/systemd/system/gallifrey.service
+sudo chmod 644 /etc/systemd/system/gallifrey.service
+
+sudo systemctl enable gallifrey
+sudo systemctl daemon-reload
diff --git a/devops/nginx/Dockerfile b/devops/nginx/Dockerfile
new file mode 100644
index 0000000..4f2ba9f
--- /dev/null
+++ b/devops/nginx/Dockerfile
@@ -0,0 +1,9 @@
+FROM nginx:alpine
+
+COPY ssl-cert-snakeoil.pem /etc/ssl/certs/
+COPY ssl-cert-snakeoil.key /etc/ssl/private/
+RUN chown -R nginx:nginx /etc/ssl
+RUN chmod 640 /etc/ssl/private/ssl-cert-snakeoil.key
+RUN chmod 750 /etc/ssl/private
+
+COPY default.conf /etc/nginx/conf.d/
diff --git a/devops/nginx/default.conf b/devops/nginx/default.conf
new file mode 100644
index 0000000..ea9980f
--- /dev/null
+++ b/devops/nginx/default.conf
@@ -0,0 +1,22 @@
+server {
+ # Listen on 80 and 443
+ listen 80;
+ listen 443 ssl;
+ # Self-signed certificate.
+ ssl_certificate /etc/ssl/certs/ssl-cert-snakeoil.pem;
+ ssl_certificate_key /etc/ssl/private/ssl-cert-snakeoil.key;
+
+ # Redirect all non-SSL traffic to SSL.
+ if ($ssl_protocol = "") {
+ rewrite ^ https://$host$request_uri? permanent;
+ }
+
+ # Split off traffic to gallifrey, and make sure that websockets
+ # are managed correctly.
+ location / {
+ proxy_pass http://gallifrey:8081;
+ proxy_http_version 1.1;
+ proxy_set_header Upgrade websocket;
+ proxy_set_header Connection upgrade;
+ }
+}
diff --git a/devops/nginx/nginx.conf b/devops/nginx/nginx.conf
new file mode 100644
index 0000000..3ebc618
--- /dev/null
+++ b/devops/nginx/nginx.conf
@@ -0,0 +1,33 @@
+
+user nginx;
+worker_processes 1;
+
+error_log /var/log/nginx/error.log warn;
+pid /var/run/nginx.pid;
+
+
+events {
+ worker_connections 1024;
+}
+
+
+http {
+ include /etc/nginx/mime.types;
+ default_type application/octet-stream;
+
+ log_format main '$remote_addr - $remote_user [$time_local] "$request" '
+ '$status $body_bytes_sent "$http_referer" '
+ '"$http_user_agent" "$http_x_forwarded_for"';
+
+ access_log /var/log/nginx/access.log main;
+
+ sendfile on;
+ #tcp_nopush on;
+
+ keepalive_timeout 65;
+
+ #gzip on;
+
+ include /etc/nginx/conf.d/*.conf;
+ include /etc/nginx/sites-available/*.conf;
+}
diff --git a/prod/gallifrey/server.clj b/prod/gallifrey/server.clj
new file mode 100644
index 0000000..cefa6b5
--- /dev/null
+++ b/prod/gallifrey/server.clj
@@ -0,0 +1,14 @@
+(ns gallifrey.server
+ (:require [gallifrey.config :refer [config]]
+ [gallifrey.handler :refer [handler]]
+ [config.core :refer [env]]
+ [org.httpkit.server :refer [run-server]]
+ [integrant.core :as ig])
+ (:gen-class))
+
+(defn -main [& args]
+ (let [port (Integer/parseInt (or (env :http-port) "8081"))]
+ (println "Listening on port" port)
+ (ig/init (config {:db-server {:host (or (env :db-host) "rethinkdb")
+ :port 28015}
+ :http-port port}))))
diff --git a/project.clj b/project.clj
new file mode 100644
index 0000000..d520863
--- /dev/null
+++ b/project.clj
@@ -0,0 +1,28 @@
+(defproject gallifrey "0.4.0"
+ :dependencies [[org.clojure/clojure "1.8.0"]
+
+ [com.7theta/utilis "1.0.4"]
+
+ [http-kit "2.2.0"]
+ [ring/ring-core "1.6.3"]
+ [ring/ring-defaults "0.3.1"]
+ [ring/ring-anti-forgery "1.1.0"]
+ [compojure "1.6.0"]
+ [liberator "0.15.1"]
+ [cheshire "5.7.1"]
+
+ [com.apa512/rethinkdb "0.15.26"]
+ [inflections "0.13.0"]
+ [clj-time "0.14.2"]
+
+ [integrant "0.6.2"]
+ [clojure-future-spec "1.9.0-beta4"]
+ [yogthos/config "0.9"]]
+ :min-lein-version "2.5.3"
+ :profiles {:dev {:source-paths ["dev"]
+ :dependencies [[ring/ring-devel "1.6.3"]
+ [integrant/repl "0.2.0"]]}
+ :uberjar {:source-paths ["prod"]
+ :main gallifrey.server
+ :uberjar-name "gallifrey.jar"}}
+ :prep-tasks ["compile"])
diff --git a/src/gallifrey/config.clj b/src/gallifrey/config.clj
new file mode 100644
index 0000000..fd9e5ee
--- /dev/null
+++ b/src/gallifrey/config.clj
@@ -0,0 +1,17 @@
+(ns gallifrey.config
+ (:require [integrant.core :as ig]))
+
+(defn config
+ [app-config]
+ (let [conf {:gallifrey/store
+ {:db-server (:db-server app-config)
+ :db-name "gallifrey"}
+
+ :gallifrey/handler
+ {:store (ig/ref :gallifrey/store)}
+
+ :gallifrey/http-server
+ {:port (:http-port app-config)
+ :handler (ig/ref :gallifrey/handler)}}]
+ (ig/load-namespaces conf)
+ conf))
diff --git a/src/gallifrey/handler.clj b/src/gallifrey/handler.clj
new file mode 100644
index 0000000..52d59c7
--- /dev/null
+++ b/src/gallifrey/handler.clj
@@ -0,0 +1,104 @@
+(ns gallifrey.handler
+ (:require [gallifrey.store :as store]
+ [utilis.map :refer [map-vals compact]]
+ [liberator.core :refer [defresource]]
+ [liberator.representation :refer [as-response ring-response]]
+ [compojure.core :refer [GET PUT PATCH ANY defroutes]]
+ [compojure.route :refer [resources]]
+ [ring.util.response :refer [resource-response content-type]]
+ [ring.middleware.defaults :refer [wrap-defaults api-defaults]]
+ [ring.middleware.anti-forgery :refer [wrap-anti-forgery]]
+ [ring.middleware.session :refer [wrap-session]]
+ [cheshire.core :as json]
+ [clj-time.format :as tf]
+ [integrant.core :as ig]))
+
+(declare handler)
+
+(defonce ^:private the-store (atom nil))
+
+(defmethod ig/init-key :gallifrey/handler [_ {:keys [store]}]
+ (reset! the-store store)
+ handler)
+
+(defmethod ig/halt-key! :gallifrey/handler [_ _]
+ (reset! the-store nil))
+
+(declare serialize de-serialize)
+
+(defn entity-existed?
+ [id & {:keys [t-k]}]
+ (store/entity-existed? @the-store id :t-k t-k))
+
+(defresource entity-endpoint [id]
+ :allowed-methods [:get :put :delete]
+ :available-media-types ["application/json"]
+ :malformed? (fn [ctx]
+ (when (#{:put :delete} (-> ctx :request :request-method))
+ (-> ctx :request :params :actor empty?)))
+ :exists? (fn [ctx]
+ (if-let [resource (->> (when-let [t-k (-> ctx :request :params :t-k)]
+ (tf/parse t-k))
+ (store/get-entity @the-store id :t-k)
+ serialize)]
+ {::resource resource}
+ (when (and (= :put (-> ctx :request :request-method))
+ (-> ctx :request :params :create not-empty))
+ true)))
+ :existed? (fn [ctx]
+ (entity-existed? id :t-k (when-let [t-k (-> ctx :request :params :t-k)]
+ (tf/parse t-k))))
+ :handle-ok ::resource
+ :can-put-to-missing? false
+ :handle-not-implemented (fn [{{m :request-method} :request :as ctx}]
+ (when (= :put m)
+ (-> (as-response "Resource not found" ctx)
+ (assoc :status (if (entity-existed? id) 410 404))
+ (ring-response))))
+ :put! (fn [ctx]
+ (let [body (json/parse-string (slurp (get-in ctx [:request :body])))
+ actor (-> ctx :request :params :actor)
+ changes-only (when-let [c (-> ctx :request :params :changes-only)]
+ (boolean c))]
+ {::created? (:created? (store/put-entity @the-store actor id body
+ :changes-only changes-only))}))
+ :delete! (fn [ctx]
+ (let [actor (-> ctx :request :params :actor)]
+ (store/delete-entity @the-store actor id)))
+ :new? ::created?)
+
+(defresource entity-lifespan-endpoint [id]
+ :allowed-methods [:get]
+ :available-media-types ["application/json"]
+ :exists? (fn [ctx]
+ (when-let [resource (not-empty
+ (store/entity-lifespan @the-store id))]
+ {::resource (-> resource
+ (update :created str)
+ (update :updated (partial map str))
+ (update :deleted str)
+ compact)}))
+ :handle-ok ::resource)
+
+(defroutes app-routes
+ (ANY "/entity/:id" [id] (entity-endpoint id))
+ (GET "/entity/:id/lifespan" [id] (entity-lifespan-endpoint id))
+ (resources "/"))
+
+(def handler
+ (-> app-routes
+ (wrap-defaults api-defaults)))
+
+
+;;; Implementation
+
+(defn- serialize
+ [e]
+ (compact
+ (update e :_meta #(map-vals
+ (fn [m]
+ (map-vals str m)) %))))
+
+(defn- de-serialize
+ [e]
+ e)
diff --git a/src/gallifrey/http_server.clj b/src/gallifrey/http_server.clj
new file mode 100644
index 0000000..96b5757
--- /dev/null
+++ b/src/gallifrey/http_server.clj
@@ -0,0 +1,9 @@
+(ns gallifrey.http-server
+ (:require [org.httpkit.server :refer [run-server]]
+ [integrant.core :as ig]))
+
+(defmethod ig/init-key :gallifrey/http-server [_ {:keys [port handler]}]
+ (run-server handler {:port port}))
+
+(defmethod ig/halt-key! :gallifrey/http-server [_ server]
+ (server :timeout 100))
diff --git a/src/gallifrey/store.clj b/src/gallifrey/store.clj
new file mode 100644
index 0000000..85a17d7
--- /dev/null
+++ b/src/gallifrey/store.clj
@@ -0,0 +1,197 @@
+(ns gallifrey.store
+ (:require [utilis.map :refer [compact map-keys map-vals]]
+ [utilis.types.keyword :refer [->keyword]]
+ [rethinkdb.query :as r]
+ [inflections.core :refer [hyphenate underscore]]
+ [clj-time.core :as t]
+ [integrant.core :as ig])
+ (:import [clojure.lang ExceptionInfo]))
+
+(declare ensure-db ensure-table ensure-index)
+
+(defmethod ig/init-key :gallifrey/store [_ {:keys [db-server db-name]}]
+ (let [{:keys [host port] :or {host "localhost" port 28015}} db-server
+ connection (r/connect :host host :port port :db db-name)
+ assertions-table "assertions"
+ lifecycle-table "lifecycle"
+ entity-id-index "entity-id-index"]
+ (ensure-db connection db-name)
+ (ensure-table connection db-name assertions-table)
+ (ensure-index connection db-name assertions-table entity-id-index :entity-id)
+ (ensure-table connection db-name lifecycle-table)
+ {:db-server db-server
+ :db-name db-name
+ :assertions-table assertions-table
+ :lifecycle-table lifecycle-table
+ :entity-id-index entity-id-index
+ :connection connection
+ :entity-locks (atom {})}))
+
+(defmethod ig/halt-key! :gallifrey/store [_ {:keys [connection subscriptions]}]
+ (.close connection)
+ nil)
+
+
+(declare entity-lock entity-lifecycle update-lifecycle entity-assertions)
+
+(defn get-entity
+ "Get all the non-retracted attributes associated with an entity referenced by
+ entity-id at time 't-k'. 't-k' defaults to 'now' if it is not provided."
+ [store entity-id & {:keys [t-k]}]
+ (let [t-k (or t-k (t/now))]
+ (->> (entity-assertions store entity-id :t-k t-k)
+ (reduce (fn [doc {:keys [attribute value] :as a}]
+ (-> doc
+ (assoc (keyword attribute) value)
+ (assoc-in [:_meta (keyword attribute)]
+ (dissoc a :id :attribute :value :entity-id))))
+ {})
+ not-empty)))
+
+(defn entity-existed?
+ "Return a boolean indicating whether an entity referenced by entity-id
+ ever existed prior to 't-k'. 't-k' defaults to 'now' if it is not provided."
+ [store entity-id & {:keys [t-k]}]
+ (let [t-k (or t-k (t/now))
+ deleted (:deleted (entity-lifecycle store entity-id))]
+ (boolean (some #(t/before? % t-k) deleted))))
+
+(defn entity-lifespan
+ "Return a collection of hash map containing the :created, :updated and
+ :deleted timestamps for the entity as a whole."
+ [store entity-id]
+ (let [lifecycle (entity-lifecycle store entity-id)
+ ranges (map (fn [c d] [c d])
+ (:created lifecycle)
+ (concat (:deleted lifecycle) (repeat nil)))
+ lifespans (group-by (fn [u]
+ (reduce (fn [u [c d]]
+ (if (and (t/before? c u)
+ (or (nil? d)
+ (t/before? u d)))
+ (reduced [c d])
+ u)) u ranges))
+ (:updated lifecycle))]
+ (mapv (fn [[c d]]
+ (compact {:created c :deleted d
+ :updated (get lifespans [c d])})) ranges)))
+
+(defn put-entity
+ "Stores the assertions included in 'doc' and automatically retracts any prior
+ assertions against the attributes in 'doc'. If 'changes-only' is set to true,
+ only attributes included in 'doc' that are asserting new values are captured.
+
+ The 'actor' making the assertions must be provided as a string."
+ [store actor entity-id doc & {:keys [changes-only]
+ :or {changes-only true}}]
+ (locking (entity-lock store entity-id)
+ (let [entity (entity-lifecycle store entity-id)
+ now (t/now)
+ doc (map-keys name doc)
+ attributes (->> doc keys set)
+ assertions (->> (dissoc doc "id")
+ (map (fn [[k v]]
+ (compact
+ {:entity-id (str entity-id)
+ :attribute k
+ :value v
+ :k-start now
+ :k-start-actor actor})))
+ (remove nil?))
+ write-assertions (fn [assertions]
+ (-> (r/db (:db-name store)) (r/table (:assertions-table store))
+ (r/insert assertions {:conflict :replace})
+ (r/run (:connection store))))]
+ (if (or (not entity)
+ (and (not-empty (:deleted entity))
+ (t/before? (last (:created entity)) (last (:deleted entity)))
+ (t/before? (last (:deleted entity)) now)))
+ (do (write-assertions assertions)
+ (update-lifecycle store (-> entity (assoc :id entity-id)
+ (update :created conj now)))
+ {:created? true})
+ (let [existing-assertions (entity-assertions store entity-id :t-k now)
+ duplicates (set
+ (when changes-only
+ (->> existing-assertions
+ (filter (fn [{:keys [attribute value]}]
+ (= value (get doc attribute))))
+ (map :attribute))))
+ attributes (->> attributes (remove duplicates) set)
+ retractions (->> existing-assertions
+ (filter #(attributes (:attribute %)))
+ (map #(assoc % :k-end now :k-end-actor actor)))
+ assertions (remove #(duplicates (:attribute %)) assertions)]
+ (when (not-empty assertions)
+ (write-assertions (concat retractions assertions))
+ (update-lifecycle store (update entity :updated conj now)))
+ (merge
+ {:updated (mapv :attribute assertions)}
+ (when (not-empty duplicates)
+ {:ignored (vec duplicates)})))))))
+
+(defn delete-entity
+ "Automatically retracts all the assertions made against the entity referenced
+ by 'entity-id'"
+ [store actor entity-id]
+ (let [now (t/now)
+ retractions (->> (entity-assertions store entity-id :t-k now)
+ (map #(assoc % :k-end now :k-end-actor actor)))]
+ (update-lifecycle store (update (entity-lifecycle store entity-id) :deleted conj now))
+ (-> (r/db (:db-name store)) (r/table (:assertions-table store))
+ (r/insert retractions {:conflict :replace})
+ (r/run (:connection store)))))
+
+;;; Implementation
+
+(defn- entity-lock
+ [store entity-id]
+ (locking (:entity-locks store)
+ (if-let [l (get @(:entity-locks store) entity-id)]
+ l
+ (let [l (Object.)]
+ (swap! (:entity-locks store) assoc entity-id l)
+ l))))
+
+(defn- entity-lifecycle
+ [store entity-id]
+ (-> (r/db (:db-name store)) (r/table (:lifecycle-table store))
+ (r/get entity-id)
+ (r/run (:connection store))))
+
+(defn- update-lifecycle
+ [store lifecycle-record]
+ (-> (r/db (:db-name store)) (r/table (:lifecycle-table store))
+ (r/insert lifecycle-record {:conflict :update})
+ (r/run (:connection store))))
+
+(defn entity-assertions
+ [store entity-id & {:keys [t-k]}]
+ (-> (r/db (:db-name store)) (r/table (:assertions-table store))
+ (r/get-all [entity-id] {:index (:entity-id-index store)})
+ (cond-> t-k (r/filter (r/fn [row]
+ (r/and
+ (r/le (r/get-field row "k-start") t-k)
+ (r/or (r/not (r/has-fields row "k-end"))
+ (r/gt (r/get-field row "k-end") t-k))))))
+ (r/run (:connection store))))
+
+(defn- ensure-db
+ [conn db-name]
+ (when-not ((set (r/run (r/db-list) conn)) db-name)
+ (r/run (r/db-create db-name) conn)))
+
+(defn- ensure-table
+ [conn db-name assertions-table & {:keys [init-fn]}]
+ (when-not ((set (r/run (r/table-list (r/db db-name)) conn)) assertions-table)
+ (-> (r/db db-name) (r/table-create assertions-table) (r/run conn))))
+
+(defn- ensure-index
+ [conn db-name assertions-table index-name field]
+ (when-not ((set (-> (r/db db-name) (r/table assertions-table)
+ (r/index-list) (r/run conn))) index-name)
+ (-> (r/db db-name)
+ (r/table assertions-table)
+ (r/index-create index-name (r/fn [row]
+ (r/get-field row (->keyword field))))
+ (r/run conn))))