From d0dceef6a3504a376cc689afc119ef15a56b5f7a Mon Sep 17 00:00:00 2001 From: "C.T. Paterson" Date: Tue, 4 Sep 2018 14:05:24 -0400 Subject: Adding version.properties for build job, tests Issue-ID: AAI-1547 Change-Id: I70ecf0f427eb30da8df1ec3e2a0cec383e0063fb Signed-off-by: C.T. Paterson --- src/chameleon/core.clj | 13 +++++++++ src/chameleon/event.clj | 4 +-- src/chameleon/kafka.clj | 54 +++++++++++++++++++++++--------------- src/chameleon/logging.clj | 18 +++---------- src/chameleon/route.clj | 42 ++++++++++++++--------------- src/chameleon/specs.clj | 67 +++++++++++++++++++++++++++++++++++++++-------- 6 files changed, 129 insertions(+), 69 deletions(-) create mode 100644 src/chameleon/core.clj (limited to 'src') diff --git a/src/chameleon/core.clj b/src/chameleon/core.clj new file mode 100644 index 0000000..adcb223 --- /dev/null +++ b/src/chameleon/core.clj @@ -0,0 +1,13 @@ +(ns chameleon.core + (:require [clojure.spec.alpha :as s])) + +(defn conform-multiple + [& spec-form-pair] + (if (s/valid? :chameleon.specs/spec-form-pair spec-form-pair) + (->> spec-form-pair + (partition 2) + (map (fn [[sp form]] + (when (s/invalid? (s/conform sp form)) + (s/explain-data sp form)))) + (remove nil?)) + (s/explain-data ::spec-form-pair spec-form-pair))) diff --git a/src/chameleon/event.clj b/src/chameleon/event.clj index 2eddf85..40592dc 100644 --- a/src/chameleon/event.clj +++ b/src/chameleon/event.clj @@ -39,9 +39,9 @@ [event-config gallifrey-host loggers] (let [{:keys [topic consumer-group processor kafka-config]} (:aai event-config) [error-logger audit-logger] loggers - kfc (ck/clj-kafka-consumer kafka-config consumer-group topic) + kfc (ck/clj-kafka-consumer kafka-config consumer-group topic error-logger) chan (ca/chan 5) - error-chan (ck/subscribe kfc chan 30000 "Polling-Kafka-Thread")] + error-chan (ck/subscribe kfc chan 30000 "Polling-Kafka-Thread" error-logger)] (log/info error-logger "EVENT_PROCESSOR" [(format "AAI created. Starting polling a KAFKA Topic '%s' on '%s'" topic (kafka-config "bootstrap.servers"))]) (ca/go-loop [] diff --git a/src/chameleon/kafka.clj b/src/chameleon/kafka.clj index ca730a1..ae8e77f 100644 --- a/src/chameleon/kafka.clj +++ b/src/chameleon/kafka.clj @@ -1,6 +1,9 @@ (ns chameleon.kafka (:refer-clojure :exclude [send]) - (:require [clojure.core.async :as ca]) + (:require [clojure.core.async :as ca] + [clojure.spec.alpha :as s] + [chameleon.core :as core] + [chameleon.logging :as log]) (:import [java.util Properties] [org.apache.kafka.clients.consumer KafkaConsumer] [org.apache.kafka.clients.consumer ConsumerRecord] @@ -15,7 +18,7 @@ (def deser StringDeserializer) (def ser StringSerializer) -(defmacro run-away +(defmacro threaded [name & body] `(try (.start (Thread. (fn [] ~@body) ~name)) (catch OutOfMemoryError e# @@ -23,24 +26,39 @@ (defn clj-kafka-consumer "Given a config, group-id, and a topic, return a CljKafkaConsumer" - [config group-id topic] - (CljKafkaConsumer. (assoc config "group.id" group-id) topic)) + [config group-id topic error-logger] + (let [valid? (core/conform-multiple :kafka/config config + :chameleon.specs/string group-id + :chameleon.specs/string topic)] + (if-not (seq valid?) + (CljKafkaConsumer. (assoc config "group.id" group-id) topic) + (log/info error-logger "ERROR" (->> valid? + (into ["SPEC ERROR"]) + (mapv str)))))) (defn subscribe "Given a CljKafkaConsumer, a channel, and a session timeout (in ms), return a channel. The input channel is where the messages will be published." - ([{:keys [config topic] :as conf} channel session-timeout] + ([{:keys [config topic] :as conf} channel session-timeout error-logger] (subscribe conf channel session-timeout "Kafka-Subscriber-Thread")) - ([{:keys [config topic]} channel session-timeout thread-name] - (let [control-channel (ca/chan (ca/dropping-buffer 1))] - (run-away thread-name (consume config topic channel session-timeout control-channel)) - control-channel))) + ([{:keys [config topic]} channel session-timeout thread-name error-logger] + (let [valid? (core/conform-multiple :kafka/timeout session-timeout + :kafka/chan channel + :kafka/config config + :chameleon.specs/string topic) + control-channel (ca/chan (ca/dropping-buffer 1))] + (if-not (seq valid?) + (do (threaded thread-name (consume config topic channel session-timeout control-channel)) + control-channel) + (log/info error-logger "ERROR" (->> valid? + (into ["SPEC ERROR"]) + (mapv str))))))) ;; Private (defn- consumer-record->clojure - [consumer-record] + [^ConsumerRecord consumer-record] {:topic (.topic ^ConsumerRecord consumer-record) :partition (.partition ^ConsumerRecord consumer-record) :offset (.offset ^ConsumerRecord consumer-record) @@ -61,13 +79,6 @@ true)) true)) -(defn- as-properties - [m] - (reduce (fn [props [k v]] - (.setProperty ^Properties props k v) - props) - (Properties.) m)) - (defn- consume "Consume elements from a topic and put it on the given channel. Block until a given timeout to put messages on the channel @@ -79,13 +90,14 @@ kaka consumer will be disconnected and an exception will be put on the control channel." [config topic chan sla-timeout control-channel] - (let [consumer (KafkaConsumer. ^Properties (as-properties config))] + (let [consumer (KafkaConsumer. config)] (.subscribe ^KafkaConsumer consumer (list topic)) (loop [] (let [consumer-records (.poll consumer 0)] - (when (and (->> consumer-records - (map consumer-record->clojure) - (send-with-sla-timeout chan sla-timeout)) + (when (and (->> consumer-records + (s/conform :kafka/consumer-record) + (map consumer-record->clojure) + (send-with-sla-timeout chan sla-timeout)) (try (.commitSync consumer) true (catch Exception e diff --git a/src/chameleon/logging.clj b/src/chameleon/logging.clj index aa43974..8ac078c 100644 --- a/src/chameleon/logging.clj +++ b/src/chameleon/logging.clj @@ -3,7 +3,8 @@ [camel-snake-kebab.extras :refer [transform-keys]] [clojure.java.io :as io] [integrant.core :as ig] - [clojure.spec.alpha :as s]) + [clojure.spec.alpha :as s] + [chameleon.core :as core]) (:import [org.onap.aai.cl.api Logger LogFields LogLine] [org.onap.aai.cl.eelf LoggerFactory LogMessageEnum AaiLoggerAdapter AuditLogLine] [org.onap.aai.cl.mdc MdcContext MdcOverride] @@ -20,17 +21,6 @@ (EELFResourceManager/loadMessageBundle logmsgs) [(error-logger "chameleon.loggging") (audit-logger "chameleon.loggging")]) -(defn conform-multiple - [& spec-form-pair] - (if (s/valid? :chameleon.specs/spec-form-pair spec-form-pair) - (->> spec-form-pair - (partition 2) - (map (fn [[sp form]] - (when (s/invalid? (s/conform sp form)) - (s/explain-data sp form)))) - (remove nil?)) - (s/explain-data :chameleon.specs/spec-form-pair spec-form-pair))) - (defn mdc-set! "Sets the global MDC context for the current thread." [m] @@ -72,8 +62,8 @@ (defn info [^AaiLoggerAdapter logger ^String enum msgs & {:keys [fields] :or {fields {}}}] - (let [confirmed-specs (conform-multiple :logging/valid-fields fields :logging/msgs msgs - :chameleon.specs/logger logger)] + (let [confirmed-specs (core/conform-multiple :logging/valid-fields fields :logging/msgs msgs + :chameleon.specs/logger logger)] (if (empty? confirmed-specs) (.info logger (string->enum enum) (logfields fields) (into-array java.lang.String msgs)) (.info logger (string->enum "ERROR") (logfields fields) (->> confirmed-specs diff --git a/src/chameleon/route.clj b/src/chameleon/route.clj index 707c608..bd74df5 100644 --- a/src/chameleon/route.clj +++ b/src/chameleon/route.clj @@ -19,34 +19,34 @@ (defn assert-create! "Creates an entity in Gallifrey with an initial set of assertions coming from the provided payload" [host actor type key payload & [time-dimensions]] - (kitclient/request {:url (str "https://" host "/" type "/" key) - :method :put - :query-params (into {"actor" actor "create" "true"} time-dimensions) - :body payload - :insecure? true - :keepalive 300 - :timeout 1000})) + @(kitclient/request {:url (str "https://" host "/" type "/" key) + :method :put + :query-params (into {"actor" actor "create" "true"} time-dimensions) + :body payload + :insecure? true + :keepalive 300 + :timeout 1000})) (defn assert-update! "Update an entity in Gallifrey with a set of assertions coming from the provided payload" [host actor type key payload & [time-dimensions]] - (kitclient/request {:url (str "https://" host "/" type "/" key) - :method :put - :query-params (into {"actor" actor "changes-only" "true"} time-dimensions) - :body payload - :insecure? true - :keepalive 300 - :timeout 1000})) + @(kitclient/request {:url (str "https://" host "/" type "/" key) + :method :put + :query-params (into {"actor" actor "changes-only" "true"} time-dimensions) + :body payload + :insecure? true + :keepalive 300 + :timeout 1000})) (defn assert-delete! "Assert a deletion for an entity in Gallifrey based on the provided key." [host actor type key & [time-dimensions]] - (kitclient/request {:url (str "https://" host "/" type "/" key) - :method :delete - :query-params (into {"actor" actor} time-dimensions) - :insecure? true - :keepalive 300 - :timeout 1000})) + @(kitclient/request {:url (str "https://" host "/" type "/" key) + :method :delete + :query-params (into {"actor" actor} time-dimensions) + :insecure? true + :keepalive 300 + :timeout 1000})) (defn assert-gallifrey! [host actor type payload & [e-logger a-logger]] @@ -58,7 +58,7 @@ "CREATE" (assert-create! host actor type key body time) "UPDATE" (assert-update! host actor type key body time) "DELETE" (assert-delete! host actor type key time)) - {:keys [status body]} @g-assert] + {:keys [status body]} g-assert] (log/info e-logger "RESPONSE" (mapv str [operation key status body])) (if (and (>= status 200) (<= status 299)) (log/info e-logger "GALLIFREY_ASSERTED" ["SUCCEEDED" (str type) (str key)]) diff --git a/src/chameleon/specs.clj b/src/chameleon/specs.clj index 630a4a8..4158e09 100644 --- a/src/chameleon/specs.clj +++ b/src/chameleon/specs.clj @@ -3,12 +3,17 @@ [chameleon.logging :as log] [clojure.spec.gen.alpha :as gen] [cheshire.core :as c] - [clojure.string :as str])) + [clojure.core.async :as ca] + [clojure.string :as str] + [cheshire.core :as json]) + (:import [org.apache.kafka.clients.consumer ConsumerRecord] + [java.util Properties])) (s/def ::host string?) (s/def ::provenance string?) (s/def ::payload map?) (s/def ::string (s/spec (s/and string? (complement str/blank?)) :gen gen/string-alphanumeric)) +(s/def ::int (s/spec (s/and int? #(< 0 %) #(> 999999999 %)))) (s/def ::type (s/spec ::string :gen #(gen/elements ["vserver" "pserver" "generic-vnf"]))) (s/def ::id uuid?) (s/def ::source (s/keys :req-un [::type ::id])) @@ -38,25 +43,24 @@ :spike/operation :spike/timestamp])) (s/def :spike/payload (s/spec string? :gen #(gen/fmap (partial c/generate-string) (s/gen :spike/event)))) -(s/def :gallifrey/k-end-actor ::string) ;; gallifrey response -(s/def :gallifrey/k-start-actor ::string) -(s/def :gallifrey/k-end inst?) -(s/def :gallifrey/k-start inst?) -(s/def :gallifrey/history (s/keys :req-un [:gallifrey/k-end-actor :gallifrey/k-end - :gallifrey/k-start-actor :gallifrey/k-start])) (s/def :relationship/_meta (s/map-of ::string :gallifrey/history)) (s/def :relationship/_type (s/spec string? :gen #(gen/return "relationship"))) -(s/def :gallifrey/_type (s/spec ::string :gen #(gen/return "entity"))) -(s/def :gallifrey/properties (s/keys :req-un [:gallifrey/_type ::type])) -(s/def :relationship/type (s/spec string? :gen #(->> (gen/string-alphanumeric) - (gen/such-that (complement str/blank?)) +(s/def :relationship/type (s/spec string? :gen #(->> (s/gen ::string) (gen/fmap (partial str "tosca.relationship."))))) (s/def :relationship/properties (s/keys :req-un [:relationship/_type :relationship/type])) (s/def ::relationship (s/keys :req-un [:relationship/properties ::source ::target :relationship/_meta ::_id])) (s/def ::relationships (s/coll-of ::relationship :gen-max 8)) +(s/def :gallifrey/k-start-actor ::string) +(s/def :gallifrey/k-end inst?) +(s/def :gallifrey/k-start inst?) +(s/def :gallifrey/k-end-actor ::string) +(s/def :gallifrey/history (s/keys :req-un [:gallifrey/k-start-actor :gallifrey/k-start] + :opt [:gallifrey/k-end-actor :gallifrey/k-end])) +(s/def :gallifrey/_type (s/spec ::string :gen #(gen/return "entity"))) +(s/def :gallifrey/properties (s/keys :req-un [:gallifrey/_type ::type])) (s/def :gallifrey/payload (s/spec map? :gen #(->> [::_id :gallifrey/properties :gallifrey/properties ::relationships] @@ -64,8 +68,49 @@ s/gen (gen/fmap (partial clojure.walk/stringify-keys))))) +(s/def :gallifrey/response (s/spec ::string)) + + +;; REST Requests + +(s/def :route/response (s/spec #(instance? clojure.lang.IDeref %) :gen (fn [] (->> (s/gen ::string) + (gen/fmap #(future %)))))) + +(s/def :stubbed/gallifrey-response + (s/spec string?)) + ;; Logger specs (s/def ::logger (s/spec log/logger? :gen #(gen/return (log/error-logger "chameleon.specs")))) (s/def ::loggers (s/cat :e :chameleon.specs/logger :a :chameleon.specs/logger)) (s/def :logging/msgs (s/* string?)) (s/def :logging/valid-fields log/valid-logfields?) + +;; Kafka Specs +(defn channel? + [x] + (and (satisfies? clojure.core.async.impl.protocols/Channel x) + (satisfies? clojure.core.async.impl.protocols/WritePort x) + (satisfies? clojure.core.async.impl.protocols/ReadPort x))) + +(s/def :kafka/config (s/map-of ::string ::string)) +(s/def :kafka/timeout ::int) +(s/def :kafka/chan (s/spec channel? :gen #(gen/return (ca/chan)))) +(s/def :kafka/topic ::string) +(s/def :kafka/partition ::int) +(s/def :kafka/offset ::int) +(s/def :kafka/key ::string) +(s/def :kafka/value ::string) + +(s/def :kafka/consumer-record (s/spec #(instance? ConsumerRecord %) + :gen (fn [] (->> (s/cat :topic :kafka/topic + :partition :kafka/partition + :offset :kafka/offset + :key :kafka/key + :value :kafka/value) + s/gen + (gen/fmap (fn [[topic partition offset key value]] + (ConsumerRecord. topic partition offset key value))))))) + +(s/def :kafka/clojure-consumer-record (s/spec (s/keys :req-un [:kafka/topic :kafka/partition + :kafka/offset :kafka/key :kafka/value]))) +(s/def :kafka/properties (s/spec #(instance? Properties %) :gen #(s/tuple keyword? ::string))) -- cgit 1.2.3-korg