aboutsummaryrefslogtreecommitdiffstats
path: root/src
diff options
context:
space:
mode:
authorShwetank Dave <shwetank.dave@amdocs.com>2018-08-24 13:24:49 -0400
committerShwetank Dave <shwetank.dave@amdocs.com>2018-08-30 14:08:19 -0400
commit525c7d8dbef106245c4fce29a5b6b31b7160b21c (patch)
tree5f24deb3ca219bf7536649d1c89988abb06dbbf3 /src
parente16bda37d76e63e0f903bba13ed1dccf3b17f395 (diff)
Updated log messages to be more intuitive
Added support for kafka Updated logging.clj file to not reject messages which contain an empty substring. Updating Dockerfile to use the specified GLIBC_VERSION Adding spotify docker plugin Moving the Dockerfile to directory root. Issue-ID: AAI-1542 Change-Id: I4c41486cb6c7698a2c2736d574721f6a5237563e Signed-off-by: Shwetank Dave <shwetank.dave@amdocs.com>
Diffstat (limited to 'src')
-rw-r--r--src/chameleon/aai_processor.clj2
-rw-r--r--src/chameleon/config.clj4
-rw-r--r--src/chameleon/event.clj36
-rw-r--r--src/chameleon/handler.clj14
-rw-r--r--src/chameleon/kafka.clj97
-rw-r--r--src/chameleon/logging.clj5
-rw-r--r--src/chameleon/route.clj5
-rw-r--r--src/chameleon/specs.clj2
8 files changed, 143 insertions, 22 deletions
diff --git a/src/chameleon/aai_processor.clj b/src/chameleon/aai_processor.clj
index 4243b39..74c6da9 100644
--- a/src/chameleon/aai_processor.clj
+++ b/src/chameleon/aai_processor.clj
@@ -55,7 +55,7 @@
(defn from-spike
"Transforms Spike-based event payloads to a format accepted by Gallifrey for vertices and relationships"
[gallifrey-host payload & [error-logger audit-logger]]
- (let [txpayload (map-keywords (parse-string payload))
+ (let [txpayload (:body (parse-string payload true))
operation (:operation txpayload)
parse-type (if (contains? txpayload :vertex)
:vertex
diff --git a/src/chameleon/config.clj b/src/chameleon/config.clj
index 10324e2..d5a8406 100644
--- a/src/chameleon/config.clj
+++ b/src/chameleon/config.clj
@@ -15,8 +15,8 @@
:gallifrey-transformer from-gallifrey
:loggers (ig/ref :chameleon/loggers)}
:chameleon/aai-processor
- {:provenance-attr "last-mod-source-of-truth"
- :truth-attr "truth-time"}
+ {:provenance-attr :last-mod-source-of-truth
+ :truth-attr :truth-time}
:chameleon/http-server
{:port (:http-port app-config)
:handler (ig/ref :chameleon/handler)}}]
diff --git a/src/chameleon/event.clj b/src/chameleon/event.clj
index 92f4211..2eddf85 100644
--- a/src/chameleon/event.clj
+++ b/src/chameleon/event.clj
@@ -1,11 +1,22 @@
(ns chameleon.event
(:require [integrant.core :as ig]
[clojure.string :refer [starts-with?]]
- [chameleon.logging :as log])
- (:import [org.onap.aai.event.client DMaaPEventConsumer]))
+ [chameleon.logging :as log]
+ [clojure.core.async :as ca]
+ [chameleon.kafka :as ck])
+ (:import [org.onap.aai.event.client DMaaPEventConsumer KafkaEventConsumer]))
+
+(declare from-dmaap from-kafka)
(defmethod ig/init-key :chameleon/event
[_ {:keys [event-config gallifrey-host loggers]}]
+ (case (:source event-config)
+ :dmaap (from-dmaap event-config gallifrey-host loggers)
+ :kafka (from-kafka event-config gallifrey-host loggers)
+ :error))
+
+(defn- from-dmaap
+ [event-config gallifrey-host loggers]
(let [{:keys [host topic motsid pass consumer-group consumer-id timeout batch-size type processor]} (:aai event-config)
[error-logger audit-logger] loggers
event-processor (DMaaPEventConsumer. host topic motsid pass consumer-group consumer-id timeout batch-size type)]
@@ -23,3 +34,24 @@
(log/mdc-clear!)))
(catch Exception e
(println (str "Unexpected exception during processing: " (.getMessage e)))))))))))))
+
+(defn- from-kafka
+ [event-config gallifrey-host loggers]
+ (let [{:keys [topic consumer-group processor kafka-config]} (:aai event-config)
+ [error-logger audit-logger] loggers
+ kfc (ck/clj-kafka-consumer kafka-config consumer-group topic)
+ chan (ca/chan 5)
+ error-chan (ck/subscribe kfc chan 30000 "Polling-Kafka-Thread")]
+ (log/info error-logger "EVENT_PROCESSOR"
+ [(format "AAI created. Starting polling a KAFKA Topic '%s' on '%s'" topic (kafka-config "bootstrap.servers"))])
+ (ca/go-loop []
+ (let [recs (ca/<! chan)]
+ (log/mdc-init! "SPIKE-EVENT" "CHAMELEON" "" "" gallifrey-host)
+ (if recs
+ (do (doseq [r recs]
+ (log/info error-logger "EVENT_PROCESSOR" [(str "Processing " (:value r))])
+ (processor gallifrey-host (:value r) error-logger audit-logger)
+ (log/info error-logger "EVENT_PROCESSOR" [(str "Processed Message " (:value r))])
+ (log/mdc-clear!))
+ (recur))
+ (ca/<! error-chan))))))
diff --git a/src/chameleon/handler.clj b/src/chameleon/handler.clj
index 675a34b..453aba6 100644
--- a/src/chameleon/handler.clj
+++ b/src/chameleon/handler.clj
@@ -78,17 +78,3 @@
(-> app-routes
(wrap-defaults api-defaults)
(log-reqs loggers)))
-
-
-;;; Implementation
-
-(defn- serialize
- [e]
- (compact
- (update e :_meta #(map-vals
- (fn [m]
- (map-vals str m)) %))))
-
-(defn- de-serialize
- [e]
- e)
diff --git a/src/chameleon/kafka.clj b/src/chameleon/kafka.clj
new file mode 100644
index 0000000..ca730a1
--- /dev/null
+++ b/src/chameleon/kafka.clj
@@ -0,0 +1,97 @@
+(ns chameleon.kafka
+ (:refer-clojure :exclude [send])
+ (:require [clojure.core.async :as ca])
+ (:import [java.util Properties]
+ [org.apache.kafka.clients.consumer KafkaConsumer]
+ [org.apache.kafka.clients.consumer ConsumerRecord]
+ [org.apache.kafka.common.serialization StringDeserializer StringSerializer Serdes]))
+
+(declare consume run-away)
+
+(defrecord CljKafkaConsumer [config topic])
+
+
+;; Public
+(def deser StringDeserializer)
+(def ser StringSerializer)
+
+(defmacro run-away
+ [name & body]
+ `(try (.start (Thread. (fn [] ~@body) ~name))
+ (catch OutOfMemoryError e#
+ (throw (ex-info {:error "Could not allocate resources for asynchronous execution"})))))
+
+(defn clj-kafka-consumer
+ "Given a config, group-id, and a topic, return a CljKafkaConsumer"
+ [config group-id topic]
+ (CljKafkaConsumer. (assoc config "group.id" group-id) topic))
+
+(defn subscribe
+ "Given a CljKafkaConsumer, a channel, and a session timeout (in
+ ms), return a channel. The input channel is where the messages will
+ be published."
+ ([{:keys [config topic] :as conf} channel session-timeout]
+ (subscribe conf channel session-timeout "Kafka-Subscriber-Thread"))
+ ([{:keys [config topic]} channel session-timeout thread-name]
+ (let [control-channel (ca/chan (ca/dropping-buffer 1))]
+ (run-away thread-name (consume config topic channel session-timeout control-channel))
+ control-channel)))
+
+;; Private
+
+(defn- consumer-record->clojure
+ [consumer-record]
+ {:topic (.topic ^ConsumerRecord consumer-record)
+ :partition (.partition ^ConsumerRecord consumer-record)
+ :offset (.offset ^ConsumerRecord consumer-record)
+ :key (.key ^ConsumerRecord consumer-record)
+ :value (.value ^ConsumerRecord consumer-record)})
+
+(defn- send-with-sla-timeout
+ "Consumer Records will always be a sequence. Try to put a message on
+ the channel before the sla-timeout. Return true if a message was put
+ on the channel or when there were no messages to put on the
+ channel. Return false if there is a timeout."
+ [chan sla-timeout messages]
+ (if (seq messages)
+ (let [sent? (ca/alt!! [[chan messages]] :sent
+ (ca/timeout sla-timeout) :timeout)]
+ (if (= sent? :timeout)
+ false
+ true))
+ true))
+
+(defn- as-properties
+ [m]
+ (reduce (fn [props [k v]]
+ (.setProperty ^Properties props k v)
+ props)
+ (Properties.) m))
+
+(defn- consume
+ "Consume elements from a topic and put it on the given
+ channel. Block until a given timeout to put messages on the channel
+ or put sla-exception on the control channel (channel returned by
+ this function). Once the message is put on the channel, it is
+ considered safe to update the offset of the kafka consumer. The
+ `sla-timeout` is an agreement such that the data has to be taken off
+ the channel within the given timeout. If failed to take the data,
+ kaka consumer will be disconnected and an exception will be put on
+ the control channel."
+ [config topic chan sla-timeout control-channel]
+ (let [consumer (KafkaConsumer. ^Properties (as-properties config))]
+ (.subscribe ^KafkaConsumer consumer (list topic))
+ (loop []
+ (let [consumer-records (.poll consumer 0)]
+ (when (and (->> consumer-records
+ (map consumer-record->clojure)
+ (send-with-sla-timeout chan sla-timeout))
+ (try (.commitSync consumer)
+ true
+ (catch Exception e
+ (ca/>!! control-channel (.getMessage e))
+ false)))
+ (recur))))
+ (ca/>!! control-channel :sla-timeout)
+ (.unsubscribe consumer)
+ (.close consumer)))
diff --git a/src/chameleon/logging.clj b/src/chameleon/logging.clj
index 15e1e7b..aa43974 100644
--- a/src/chameleon/logging.clj
+++ b/src/chameleon/logging.clj
@@ -76,7 +76,10 @@
:chameleon.specs/logger logger)]
(if (empty? confirmed-specs)
(.info logger (string->enum enum) (logfields fields) (into-array java.lang.String msgs))
- confirmed-specs)))
+ (.info logger (string->enum "ERROR") (logfields fields) (->> confirmed-specs
+ (into ["SPEC ERROR"])
+ (mapv str)
+ (into-array java.lang.String))))))
(defn debug
[^AaiLoggerAdapter logger ^String enum msgs]
diff --git a/src/chameleon/route.clj b/src/chameleon/route.clj
index 0918da4..707c608 100644
--- a/src/chameleon/route.clj
+++ b/src/chameleon/route.clj
@@ -59,6 +59,9 @@
"UPDATE" (assert-update! host actor type key body time)
"DELETE" (assert-delete! host actor type key time))
{:keys [status body]} @g-assert]
- (log/info e-logger "GALLIFREY_ASSERTED" [(str type) (str key)])
+ (log/info e-logger "RESPONSE" (mapv str [operation key status body]))
+ (if (and (>= status 200) (<= status 299))
+ (log/info e-logger "GALLIFREY_ASSERTED" ["SUCCEEDED" (str type) (str key)])
+ (log/info e-logger "GALLIFREY_ASSERTED" ["FAILED" (str type) (str key)]))
(log/info a-logger "RESPONSE" (mapv str [operation key status body])
:fields {:response-code status :response-description (hs/get-name status)})))
diff --git a/src/chameleon/specs.clj b/src/chameleon/specs.clj
index 40d5768..630a4a8 100644
--- a/src/chameleon/specs.clj
+++ b/src/chameleon/specs.clj
@@ -67,5 +67,5 @@
;; Logger specs
(s/def ::logger (s/spec log/logger? :gen #(gen/return (log/error-logger "chameleon.specs"))))
(s/def ::loggers (s/cat :e :chameleon.specs/logger :a :chameleon.specs/logger))
-(s/def :logging/msgs (s/* ::string))
+(s/def :logging/msgs (s/* string?))
(s/def :logging/valid-fields log/valid-logfields?)