aboutsummaryrefslogtreecommitdiffstats
path: root/src
diff options
context:
space:
mode:
authorC.T. Paterson <ctpaterson+onap@gmail.com>2018-09-04 14:05:24 -0400
committerC.T. Paterson <ctpaterson+onap@gmail.com>2018-09-04 14:49:55 -0400
commitd0dceef6a3504a376cc689afc119ef15a56b5f7a (patch)
tree6403bf7e70a014860e426589bdeec9fc11401607 /src
parent1586036fd345c240982237b76469a1f5d04d122d (diff)
Adding version.properties for build job, tests
Issue-ID: AAI-1547 Change-Id: I70ecf0f427eb30da8df1ec3e2a0cec383e0063fb Signed-off-by: C.T. Paterson <ctpaterson+onap@gmail.com>
Diffstat (limited to 'src')
-rw-r--r--src/chameleon/core.clj13
-rw-r--r--src/chameleon/event.clj4
-rw-r--r--src/chameleon/kafka.clj54
-rw-r--r--src/chameleon/logging.clj18
-rw-r--r--src/chameleon/route.clj42
-rw-r--r--src/chameleon/specs.clj67
6 files changed, 129 insertions, 69 deletions
diff --git a/src/chameleon/core.clj b/src/chameleon/core.clj
new file mode 100644
index 0000000..adcb223
--- /dev/null
+++ b/src/chameleon/core.clj
@@ -0,0 +1,13 @@
+(ns chameleon.core
+ (:require [clojure.spec.alpha :as s]))
+
+(defn conform-multiple
+ [& spec-form-pair]
+ (if (s/valid? :chameleon.specs/spec-form-pair spec-form-pair)
+ (->> spec-form-pair
+ (partition 2)
+ (map (fn [[sp form]]
+ (when (s/invalid? (s/conform sp form))
+ (s/explain-data sp form))))
+ (remove nil?))
+ (s/explain-data ::spec-form-pair spec-form-pair)))
diff --git a/src/chameleon/event.clj b/src/chameleon/event.clj
index 2eddf85..40592dc 100644
--- a/src/chameleon/event.clj
+++ b/src/chameleon/event.clj
@@ -39,9 +39,9 @@
[event-config gallifrey-host loggers]
(let [{:keys [topic consumer-group processor kafka-config]} (:aai event-config)
[error-logger audit-logger] loggers
- kfc (ck/clj-kafka-consumer kafka-config consumer-group topic)
+ kfc (ck/clj-kafka-consumer kafka-config consumer-group topic error-logger)
chan (ca/chan 5)
- error-chan (ck/subscribe kfc chan 30000 "Polling-Kafka-Thread")]
+ error-chan (ck/subscribe kfc chan 30000 "Polling-Kafka-Thread" error-logger)]
(log/info error-logger "EVENT_PROCESSOR"
[(format "AAI created. Starting polling a KAFKA Topic '%s' on '%s'" topic (kafka-config "bootstrap.servers"))])
(ca/go-loop []
diff --git a/src/chameleon/kafka.clj b/src/chameleon/kafka.clj
index ca730a1..ae8e77f 100644
--- a/src/chameleon/kafka.clj
+++ b/src/chameleon/kafka.clj
@@ -1,6 +1,9 @@
(ns chameleon.kafka
(:refer-clojure :exclude [send])
- (:require [clojure.core.async :as ca])
+ (:require [clojure.core.async :as ca]
+ [clojure.spec.alpha :as s]
+ [chameleon.core :as core]
+ [chameleon.logging :as log])
(:import [java.util Properties]
[org.apache.kafka.clients.consumer KafkaConsumer]
[org.apache.kafka.clients.consumer ConsumerRecord]
@@ -15,7 +18,7 @@
(def deser StringDeserializer)
(def ser StringSerializer)
-(defmacro run-away
+(defmacro threaded
[name & body]
`(try (.start (Thread. (fn [] ~@body) ~name))
(catch OutOfMemoryError e#
@@ -23,24 +26,39 @@
(defn clj-kafka-consumer
"Given a config, group-id, and a topic, return a CljKafkaConsumer"
- [config group-id topic]
- (CljKafkaConsumer. (assoc config "group.id" group-id) topic))
+ [config group-id topic error-logger]
+ (let [valid? (core/conform-multiple :kafka/config config
+ :chameleon.specs/string group-id
+ :chameleon.specs/string topic)]
+ (if-not (seq valid?)
+ (CljKafkaConsumer. (assoc config "group.id" group-id) topic)
+ (log/info error-logger "ERROR" (->> valid?
+ (into ["SPEC ERROR"])
+ (mapv str))))))
(defn subscribe
"Given a CljKafkaConsumer, a channel, and a session timeout (in
ms), return a channel. The input channel is where the messages will
be published."
- ([{:keys [config topic] :as conf} channel session-timeout]
+ ([{:keys [config topic] :as conf} channel session-timeout error-logger]
(subscribe conf channel session-timeout "Kafka-Subscriber-Thread"))
- ([{:keys [config topic]} channel session-timeout thread-name]
- (let [control-channel (ca/chan (ca/dropping-buffer 1))]
- (run-away thread-name (consume config topic channel session-timeout control-channel))
- control-channel)))
+ ([{:keys [config topic]} channel session-timeout thread-name error-logger]
+ (let [valid? (core/conform-multiple :kafka/timeout session-timeout
+ :kafka/chan channel
+ :kafka/config config
+ :chameleon.specs/string topic)
+ control-channel (ca/chan (ca/dropping-buffer 1))]
+ (if-not (seq valid?)
+ (do (threaded thread-name (consume config topic channel session-timeout control-channel))
+ control-channel)
+ (log/info error-logger "ERROR" (->> valid?
+ (into ["SPEC ERROR"])
+ (mapv str)))))))
;; Private
(defn- consumer-record->clojure
- [consumer-record]
+ [^ConsumerRecord consumer-record]
{:topic (.topic ^ConsumerRecord consumer-record)
:partition (.partition ^ConsumerRecord consumer-record)
:offset (.offset ^ConsumerRecord consumer-record)
@@ -61,13 +79,6 @@
true))
true))
-(defn- as-properties
- [m]
- (reduce (fn [props [k v]]
- (.setProperty ^Properties props k v)
- props)
- (Properties.) m))
-
(defn- consume
"Consume elements from a topic and put it on the given
channel. Block until a given timeout to put messages on the channel
@@ -79,13 +90,14 @@
kaka consumer will be disconnected and an exception will be put on
the control channel."
[config topic chan sla-timeout control-channel]
- (let [consumer (KafkaConsumer. ^Properties (as-properties config))]
+ (let [consumer (KafkaConsumer. config)]
(.subscribe ^KafkaConsumer consumer (list topic))
(loop []
(let [consumer-records (.poll consumer 0)]
- (when (and (->> consumer-records
- (map consumer-record->clojure)
- (send-with-sla-timeout chan sla-timeout))
+ (when (and (->> consumer-records
+ (s/conform :kafka/consumer-record)
+ (map consumer-record->clojure)
+ (send-with-sla-timeout chan sla-timeout))
(try (.commitSync consumer)
true
(catch Exception e
diff --git a/src/chameleon/logging.clj b/src/chameleon/logging.clj
index aa43974..8ac078c 100644
--- a/src/chameleon/logging.clj
+++ b/src/chameleon/logging.clj
@@ -3,7 +3,8 @@
[camel-snake-kebab.extras :refer [transform-keys]]
[clojure.java.io :as io]
[integrant.core :as ig]
- [clojure.spec.alpha :as s])
+ [clojure.spec.alpha :as s]
+ [chameleon.core :as core])
(:import [org.onap.aai.cl.api Logger LogFields LogLine]
[org.onap.aai.cl.eelf LoggerFactory LogMessageEnum AaiLoggerAdapter AuditLogLine]
[org.onap.aai.cl.mdc MdcContext MdcOverride]
@@ -20,17 +21,6 @@
(EELFResourceManager/loadMessageBundle logmsgs)
[(error-logger "chameleon.loggging") (audit-logger "chameleon.loggging")])
-(defn conform-multiple
- [& spec-form-pair]
- (if (s/valid? :chameleon.specs/spec-form-pair spec-form-pair)
- (->> spec-form-pair
- (partition 2)
- (map (fn [[sp form]]
- (when (s/invalid? (s/conform sp form))
- (s/explain-data sp form))))
- (remove nil?))
- (s/explain-data :chameleon.specs/spec-form-pair spec-form-pair)))
-
(defn mdc-set!
"Sets the global MDC context for the current thread."
[m]
@@ -72,8 +62,8 @@
(defn info
[^AaiLoggerAdapter logger ^String enum msgs & {:keys [fields] :or {fields {}}}]
- (let [confirmed-specs (conform-multiple :logging/valid-fields fields :logging/msgs msgs
- :chameleon.specs/logger logger)]
+ (let [confirmed-specs (core/conform-multiple :logging/valid-fields fields :logging/msgs msgs
+ :chameleon.specs/logger logger)]
(if (empty? confirmed-specs)
(.info logger (string->enum enum) (logfields fields) (into-array java.lang.String msgs))
(.info logger (string->enum "ERROR") (logfields fields) (->> confirmed-specs
diff --git a/src/chameleon/route.clj b/src/chameleon/route.clj
index 707c608..bd74df5 100644
--- a/src/chameleon/route.clj
+++ b/src/chameleon/route.clj
@@ -19,34 +19,34 @@
(defn assert-create!
"Creates an entity in Gallifrey with an initial set of assertions coming from the provided payload"
[host actor type key payload & [time-dimensions]]
- (kitclient/request {:url (str "https://" host "/" type "/" key)
- :method :put
- :query-params (into {"actor" actor "create" "true"} time-dimensions)
- :body payload
- :insecure? true
- :keepalive 300
- :timeout 1000}))
+ @(kitclient/request {:url (str "https://" host "/" type "/" key)
+ :method :put
+ :query-params (into {"actor" actor "create" "true"} time-dimensions)
+ :body payload
+ :insecure? true
+ :keepalive 300
+ :timeout 1000}))
(defn assert-update!
"Update an entity in Gallifrey with a set of assertions coming from the provided payload"
[host actor type key payload & [time-dimensions]]
- (kitclient/request {:url (str "https://" host "/" type "/" key)
- :method :put
- :query-params (into {"actor" actor "changes-only" "true"} time-dimensions)
- :body payload
- :insecure? true
- :keepalive 300
- :timeout 1000}))
+ @(kitclient/request {:url (str "https://" host "/" type "/" key)
+ :method :put
+ :query-params (into {"actor" actor "changes-only" "true"} time-dimensions)
+ :body payload
+ :insecure? true
+ :keepalive 300
+ :timeout 1000}))
(defn assert-delete!
"Assert a deletion for an entity in Gallifrey based on the provided key."
[host actor type key & [time-dimensions]]
- (kitclient/request {:url (str "https://" host "/" type "/" key)
- :method :delete
- :query-params (into {"actor" actor} time-dimensions)
- :insecure? true
- :keepalive 300
- :timeout 1000}))
+ @(kitclient/request {:url (str "https://" host "/" type "/" key)
+ :method :delete
+ :query-params (into {"actor" actor} time-dimensions)
+ :insecure? true
+ :keepalive 300
+ :timeout 1000}))
(defn assert-gallifrey!
[host actor type payload & [e-logger a-logger]]
@@ -58,7 +58,7 @@
"CREATE" (assert-create! host actor type key body time)
"UPDATE" (assert-update! host actor type key body time)
"DELETE" (assert-delete! host actor type key time))
- {:keys [status body]} @g-assert]
+ {:keys [status body]} g-assert]
(log/info e-logger "RESPONSE" (mapv str [operation key status body]))
(if (and (>= status 200) (<= status 299))
(log/info e-logger "GALLIFREY_ASSERTED" ["SUCCEEDED" (str type) (str key)])
diff --git a/src/chameleon/specs.clj b/src/chameleon/specs.clj
index 630a4a8..4158e09 100644
--- a/src/chameleon/specs.clj
+++ b/src/chameleon/specs.clj
@@ -3,12 +3,17 @@
[chameleon.logging :as log]
[clojure.spec.gen.alpha :as gen]
[cheshire.core :as c]
- [clojure.string :as str]))
+ [clojure.core.async :as ca]
+ [clojure.string :as str]
+ [cheshire.core :as json])
+ (:import [org.apache.kafka.clients.consumer ConsumerRecord]
+ [java.util Properties]))
(s/def ::host string?)
(s/def ::provenance string?)
(s/def ::payload map?)
(s/def ::string (s/spec (s/and string? (complement str/blank?)) :gen gen/string-alphanumeric))
+(s/def ::int (s/spec (s/and int? #(< 0 %) #(> 999999999 %))))
(s/def ::type (s/spec ::string :gen #(gen/elements ["vserver" "pserver" "generic-vnf"])))
(s/def ::id uuid?)
(s/def ::source (s/keys :req-un [::type ::id]))
@@ -38,25 +43,24 @@
:spike/operation :spike/timestamp]))
(s/def :spike/payload (s/spec string? :gen #(gen/fmap (partial c/generate-string)
(s/gen :spike/event))))
-(s/def :gallifrey/k-end-actor ::string)
;; gallifrey response
-(s/def :gallifrey/k-start-actor ::string)
-(s/def :gallifrey/k-end inst?)
-(s/def :gallifrey/k-start inst?)
-(s/def :gallifrey/history (s/keys :req-un [:gallifrey/k-end-actor :gallifrey/k-end
- :gallifrey/k-start-actor :gallifrey/k-start]))
(s/def :relationship/_meta (s/map-of ::string :gallifrey/history))
(s/def :relationship/_type (s/spec string? :gen #(gen/return "relationship")))
-(s/def :gallifrey/_type (s/spec ::string :gen #(gen/return "entity")))
-(s/def :gallifrey/properties (s/keys :req-un [:gallifrey/_type ::type]))
-(s/def :relationship/type (s/spec string? :gen #(->> (gen/string-alphanumeric)
- (gen/such-that (complement str/blank?))
+(s/def :relationship/type (s/spec string? :gen #(->> (s/gen ::string)
(gen/fmap (partial str "tosca.relationship.")))))
(s/def :relationship/properties (s/keys :req-un [:relationship/_type :relationship/type]))
(s/def ::relationship (s/keys :req-un [:relationship/properties ::source
::target :relationship/_meta ::_id]))
(s/def ::relationships (s/coll-of ::relationship :gen-max 8))
+(s/def :gallifrey/k-start-actor ::string)
+(s/def :gallifrey/k-end inst?)
+(s/def :gallifrey/k-start inst?)
+(s/def :gallifrey/k-end-actor ::string)
+(s/def :gallifrey/history (s/keys :req-un [:gallifrey/k-start-actor :gallifrey/k-start]
+ :opt [:gallifrey/k-end-actor :gallifrey/k-end]))
+(s/def :gallifrey/_type (s/spec ::string :gen #(gen/return "entity")))
+(s/def :gallifrey/properties (s/keys :req-un [:gallifrey/_type ::type]))
(s/def :gallifrey/payload (s/spec map?
:gen #(->> [::_id :gallifrey/properties
:gallifrey/properties ::relationships]
@@ -64,8 +68,49 @@
s/gen
(gen/fmap (partial clojure.walk/stringify-keys)))))
+(s/def :gallifrey/response (s/spec ::string))
+
+
+;; REST Requests
+
+(s/def :route/response (s/spec #(instance? clojure.lang.IDeref %) :gen (fn [] (->> (s/gen ::string)
+ (gen/fmap #(future %))))))
+
+(s/def :stubbed/gallifrey-response
+ (s/spec string?))
+
;; Logger specs
(s/def ::logger (s/spec log/logger? :gen #(gen/return (log/error-logger "chameleon.specs"))))
(s/def ::loggers (s/cat :e :chameleon.specs/logger :a :chameleon.specs/logger))
(s/def :logging/msgs (s/* string?))
(s/def :logging/valid-fields log/valid-logfields?)
+
+;; Kafka Specs
+(defn channel?
+ [x]
+ (and (satisfies? clojure.core.async.impl.protocols/Channel x)
+ (satisfies? clojure.core.async.impl.protocols/WritePort x)
+ (satisfies? clojure.core.async.impl.protocols/ReadPort x)))
+
+(s/def :kafka/config (s/map-of ::string ::string))
+(s/def :kafka/timeout ::int)
+(s/def :kafka/chan (s/spec channel? :gen #(gen/return (ca/chan))))
+(s/def :kafka/topic ::string)
+(s/def :kafka/partition ::int)
+(s/def :kafka/offset ::int)
+(s/def :kafka/key ::string)
+(s/def :kafka/value ::string)
+
+(s/def :kafka/consumer-record (s/spec #(instance? ConsumerRecord %)
+ :gen (fn [] (->> (s/cat :topic :kafka/topic
+ :partition :kafka/partition
+ :offset :kafka/offset
+ :key :kafka/key
+ :value :kafka/value)
+ s/gen
+ (gen/fmap (fn [[topic partition offset key value]]
+ (ConsumerRecord. topic partition offset key value)))))))
+
+(s/def :kafka/clojure-consumer-record (s/spec (s/keys :req-un [:kafka/topic :kafka/partition
+ :kafka/offset :kafka/key :kafka/value])))
+(s/def :kafka/properties (s/spec #(instance? Properties %) :gen #(s/tuple keyword? ::string)))