From e023325d3e76a71ac795ebbdb74f5a89756040a7 Mon Sep 17 00:00:00 2001 From: Adrian Batos-Parac Date: Thu, 22 Feb 2018 14:43:42 -0500 Subject: Initial Commit of Chameleon Commit the initial set of code for the Chameleon offering to ONAP Change-Id: Ia58bd49eafc0a3702c17c9cab34d666ed1627ba5 Issue-ID: AAI-797 Signed-off-by: Adrian Batos-Parac --- src/chameleon/aai_processor.clj | 26 +++++++++++++++ src/chameleon/config.clj | 18 +++++++++++ src/chameleon/event.clj | 18 +++++++++++ src/chameleon/handler.clj | 61 +++++++++++++++++++++++++++++++++++ src/chameleon/http_server.clj | 9 ++++++ src/chameleon/route.clj | 71 +++++++++++++++++++++++++++++++++++++++++ src/chameleon/txform.clj | 19 +++++++++++ 7 files changed, 222 insertions(+) create mode 100644 src/chameleon/aai_processor.clj create mode 100644 src/chameleon/config.clj create mode 100644 src/chameleon/event.clj create mode 100644 src/chameleon/handler.clj create mode 100644 src/chameleon/http_server.clj create mode 100644 src/chameleon/route.clj create mode 100644 src/chameleon/txform.clj (limited to 'src') diff --git a/src/chameleon/aai_processor.clj b/src/chameleon/aai_processor.clj new file mode 100644 index 0000000..c709ed1 --- /dev/null +++ b/src/chameleon/aai_processor.clj @@ -0,0 +1,26 @@ +(ns chameleon.aai-processor + (:require [chameleon.txform :refer :all] + [chameleon.route :refer :all] + [cheshire.core :refer :all])) + +(defn from-gallifrey + "Transforms Gallifrey response payloads into a format consumable by AAI-centric clients" + [body] + (->> body + (map (fn [[k v]] [(clojure.string/split k #"\.") v])) + ((fn [x] (reduce #(assoc-in %1 (first %2) (second %2) ) {} x))))) + +(defn from-spike + "Transforms Spike-based event payloads to a format accepted by Gallifrey for vertices and relationships" + [gallifrey-host payload] + (let [txpayload (map-keywords (parse-string payload)) + operation (:operation txpayload) + entity-type (if (contains? txpayload :vertex) + :vertex + :relationship) + entity (map-keywords (entity-type txpayload)) + key (:key entity) + properties (assoc (:properties entity) :type (:type entity))] + (assert-gallifrey gallifrey-host "aai" (if (= entity-type :vertex) + {:meta {:key key :operation operation} :body (generate-string properties)} + {:meta {:key key :operation operation} :body (generate-string (conj properties (flatten-entry entity :source) (flatten-entry entity :target)))})))) diff --git a/src/chameleon/config.clj b/src/chameleon/config.clj new file mode 100644 index 0000000..4982f4a --- /dev/null +++ b/src/chameleon/config.clj @@ -0,0 +1,18 @@ +(ns chameleon.config + (:require [integrant.core :as ig] + [chameleon.aai-processor :refer :all])) + +(defn config + [app-config] + (let [conf { + :chameleon/event + {:event-config (assoc-in (:event-config app-config) + [:aai :processor] from-spike) + :gallifrey-host (:gallifrey-host app-config)} + :chameleon/handler + {:gallifrey-host (:gallifrey-host app-config)} + :chameleon/http-server + {:port (:http-port app-config) + :handler (ig/ref :chameleon/handler)}}] + (ig/load-namespaces conf) + conf)) diff --git a/src/chameleon/event.clj b/src/chameleon/event.clj new file mode 100644 index 0000000..8201acb --- /dev/null +++ b/src/chameleon/event.clj @@ -0,0 +1,18 @@ +(ns chameleon.event + (:require [chameleon.txform] + [chameleon.route] + [integrant.core :as ig]) + (:import [org.onap.aai.event.client DMaaPEventConsumer])) + +(defmethod ig/init-key :chameleon/event + [_ {:keys [event-config gallifrey-host]}] + (let [{:keys [host topic motsid pass consumer-group consumer-id timeout batch-size type processor]} (:aai event-config) + event-processor (DMaaPEventConsumer. host topic motsid pass consumer-group consumer-id timeout batch-size type)] + (println "Event processor for AAI created. Starting event polling on " host topic) + (.start (Thread. (fn [] (while true + (let [it (.iterator (.consume event-processor))] + (println "Polling...") + (while (.hasNext it) + (let [event (.next it)] + (processor gallifrey-host event)))))))) + )) diff --git a/src/chameleon/handler.clj b/src/chameleon/handler.clj new file mode 100644 index 0000000..7a4a5fe --- /dev/null +++ b/src/chameleon/handler.clj @@ -0,0 +1,61 @@ +(ns chameleon.handler + (:require [chameleon.route :as c-route] + [chameleon.aai-processor] + [utilis.map :refer [map-vals compact]] + [liberator.core :refer [defresource]] + [compojure.core :refer [GET PUT PATCH ANY defroutes]] + [compojure.route :refer [resources]] + [ring.util.response :refer [resource-response content-type]] + [ring.middleware.defaults :refer [wrap-defaults api-defaults]] + [ring.middleware.anti-forgery :refer [wrap-anti-forgery]] + [ring.middleware.session :refer [wrap-session]] + [cheshire.core :as json] + [clj-time.format :as tf] + [integrant.core :as ig])) + +(declare handler) + +(defonce ^:private g-host (atom nil)) + +(defmethod ig/init-key :chameleon/handler [_ {:keys [gallifrey-host]}] + (reset! g-host gallifrey-host) + handler) + +(defmethod ig/halt-key! :chameleon/handler [_ _] + (reset! g-host nil)) + +(declare serialize de-serialize) + +(defresource entity-endpoint [id] + :allowed-methods [:get] + :available-media-types ["application/json"] + :exists? (fn [ctx] + (let [resource (-> (c-route/query @g-host id (-> ctx :request :params :t-k)))] + (when (= (:status resource) 200) + {::resource (-> resource :body json/parse-string (dissoc "_meta") (chameleon.aai-processor/from-gallifrey))}))) + :existed? (fn [ctx] + (when-let [status (-> (c-route/query @g-host id (-> ctx :request :params :t-k)) :status)] + (= status 410))) + :handle-ok ::resource) + +(defroutes app-routes + (GET "/entity/:id" [id] (entity-endpoint id)) + (resources "/")) + +(def handler + (-> app-routes + (wrap-defaults api-defaults))) + + +;;; Implementation + +(defn- serialize + [e] + (compact + (update e :_meta #(map-vals + (fn [m] + (map-vals str m)) %)))) + +(defn- de-serialize + [e] + e) diff --git a/src/chameleon/http_server.clj b/src/chameleon/http_server.clj new file mode 100644 index 0000000..cf8ae07 --- /dev/null +++ b/src/chameleon/http_server.clj @@ -0,0 +1,9 @@ +(ns chameleon.http-server + (:require [org.httpkit.server :refer [run-server]] + [integrant.core :as ig])) + +(defmethod ig/init-key :chameleon/http-server [_ {:keys [port handler]}] + (run-server handler {:port port})) + +(defmethod ig/halt-key! :chameleon/http-server [_ server] + (server :timeout 100)) diff --git a/src/chameleon/route.clj b/src/chameleon/route.clj new file mode 100644 index 0000000..2b30f42 --- /dev/null +++ b/src/chameleon/route.clj @@ -0,0 +1,71 @@ +(ns chameleon.route + (:require [org.httpkit.client :as kitclient])) + +(defn- interpret-response + "Print out the response from the Gallifrey server" + [key response] + (let [{:keys [status body]}@response] + (println "Response for request with key " key " resulted in status " status + " with body " body ))) + +(defn query + "Retrieve an entity referenced by id at the provided host. Optionally provide + a time 't-k' that defines a query based on when the system knew about the + state of the entity." + [host key & [k]] + @(kitclient/request { + :url (str "https://" host "/entity/" key) + :method :get + :query-params (if-let [t-k k] {"t-k" t-k}) + :insecure? true + :keepalive 300 + :timeout 1000})) + +(defn assert-create + "Creates an entity in Gallifrey with an initial set of assertions coming from the provided payload" + [host actor key payload] + (print "Final: " payload) + (kitclient/request { + :url (str "https://" host "/entity/" key) + :method :put + :query-params {"actor" actor "create" "true"} + :body payload + :insecure? true + :keepalive 300 + :timeout 1000})) + +(defn assert-update + "Update an entity in Gallifrey with a set of assertions coming from the provided payload" + [host actor key payload] + (kitclient/request { + :url (str "https://" host "/entity/" key) + :method :put + :query-params {"actor" actor "changes-only" "true"} + :body payload + :insecure? true + :keepalive 300 + :timeout 1000})) + +(defn assert-delete + "Assert a deletion for an entity in Gallifrey based on the provided key." + [host actor key] + (kitclient/request { + :url (str "https://" host "/entity/" key) + :method :delete + :query-params {"actor" actor} + :insecure? true + :keepalive 300 + :timeout 1000})) + +(defn assert-gallifrey [host actor payload] + "Propagates an assertion to Gallifrey based off of an event payload coming in from the event service." + (let [{:keys [meta body]} payload + {:keys [key operation]} meta] + (println operation " entity with key " key) + (interpret-response key (case operation + "CREATE" + (assert-create host actor key body) + "UPDATE" + (assert-update host actor key body) + "DELETE" + (assert-delete host actor key))))) diff --git a/src/chameleon/txform.clj b/src/chameleon/txform.clj new file mode 100644 index 0000000..95f357f --- /dev/null +++ b/src/chameleon/txform.clj @@ -0,0 +1,19 @@ +(ns chameleon.txform + (:require [cheshire.core :refer :all] + [clojure.string :as str])) + +(defn map-keywords + "Maps all string based keys to keywords" + [kmap] + (into {} (for [[key value] kmap] [(keyword key) value]))) + +(defn flatten-key + "Maps a parent-child pair to a period separated keyword" + [parent child] + (keyword (str (name parent) "." (name child)))) + +(defn flatten-entry + "Flattens a nested map entry to a period separated keyword entry" + [map key] + (reduce #(assoc %1 (flatten-key key (first %2)) (second %2)) + {} (seq (key map)))) -- cgit 1.2.3-korg