path: root/src
diff options
Diffstat (limited to 'src')
8 files changed, 143 insertions, 22 deletions
diff --git a/src/chameleon/aai_processor.clj b/src/chameleon/aai_processor.clj
index 4243b39..74c6da9 100644
--- a/src/chameleon/aai_processor.clj
+++ b/src/chameleon/aai_processor.clj
@@ -55,7 +55,7 @@
(defn from-spike
"Transforms Spike-based event payloads to a format accepted by Gallifrey for vertices and relationships"
[gallifrey-host payload & [error-logger audit-logger]]
- (let [txpayload (map-keywords (parse-string payload))
+ (let [txpayload (:body (parse-string payload true))
operation (:operation txpayload)
parse-type (if (contains? txpayload :vertex)
diff --git a/src/chameleon/config.clj b/src/chameleon/config.clj
index 10324e2..d5a8406 100644
--- a/src/chameleon/config.clj
+++ b/src/chameleon/config.clj
@@ -15,8 +15,8 @@
:gallifrey-transformer from-gallifrey
:loggers (ig/ref :chameleon/loggers)}
- {:provenance-attr "last-mod-source-of-truth"
- :truth-attr "truth-time"}
+ {:provenance-attr :last-mod-source-of-truth
+ :truth-attr :truth-time}
{:port (:http-port app-config)
:handler (ig/ref :chameleon/handler)}}]
diff --git a/src/chameleon/event.clj b/src/chameleon/event.clj
index 92f4211..2eddf85 100644
--- a/src/chameleon/event.clj
+++ b/src/chameleon/event.clj
@@ -1,11 +1,22 @@
(ns chameleon.event
(:require [integrant.core :as ig]
[clojure.string :refer [starts-with?]]
- [chameleon.logging :as log])
- (:import [org.onap.aai.event.client DMaaPEventConsumer]))
+ [chameleon.logging :as log]
+ [clojure.core.async :as ca]
+ [chameleon.kafka :as ck])
+ (:import [org.onap.aai.event.client DMaaPEventConsumer KafkaEventConsumer]))
+(declare from-dmaap from-kafka)
(defmethod ig/init-key :chameleon/event
[_ {:keys [event-config gallifrey-host loggers]}]
+ (case (:source event-config)
+ :dmaap (from-dmaap event-config gallifrey-host loggers)
+ :kafka (from-kafka event-config gallifrey-host loggers)
+ :error))
+(defn- from-dmaap
+ [event-config gallifrey-host loggers]
(let [{:keys [host topic motsid pass consumer-group consumer-id timeout batch-size type processor]} (:aai event-config)
[error-logger audit-logger] loggers
event-processor (DMaaPEventConsumer. host topic motsid pass consumer-group consumer-id timeout batch-size type)]
@@ -23,3 +34,24 @@
(catch Exception e
(println (str "Unexpected exception during processing: " (.getMessage e)))))))))))))
+(defn- from-kafka
+ [event-config gallifrey-host loggers]
+ (let [{:keys [topic consumer-group processor kafka-config]} (:aai event-config)
+ [error-logger audit-logger] loggers
+ kfc (ck/clj-kafka-consumer kafka-config consumer-group topic)
+ chan (ca/chan 5)
+ error-chan (ck/subscribe kfc chan 30000 "Polling-Kafka-Thread")]
+ (log/info error-logger "EVENT_PROCESSOR"
+ [(format "AAI created. Starting polling a KAFKA Topic '%s' on '%s'" topic (kafka-config "bootstrap.servers"))])
+ (ca/go-loop []
+ (let [recs (ca/<! chan)]
+ (log/mdc-init! "SPIKE-EVENT" "CHAMELEON" "" "" gallifrey-host)
+ (if recs
+ (do (doseq [r recs]
+ (log/info error-logger "EVENT_PROCESSOR" [(str "Processing " (:value r))])
+ (processor gallifrey-host (:value r) error-logger audit-logger)
+ (log/info error-logger "EVENT_PROCESSOR" [(str "Processed Message " (:value r))])
+ (log/mdc-clear!))
+ (recur))
+ (ca/<! error-chan))))))
diff --git a/src/chameleon/handler.clj b/src/chameleon/handler.clj
index 675a34b..453aba6 100644
--- a/src/chameleon/handler.clj
+++ b/src/chameleon/handler.clj
@@ -78,17 +78,3 @@
(-> app-routes
(wrap-defaults api-defaults)
(log-reqs loggers)))
-;;; Implementation
-(defn- serialize
- [e]
- (compact
- (update e :_meta #(map-vals
- (fn [m]
- (map-vals str m)) %))))
-(defn- de-serialize
- [e]
- e)
diff --git a/src/chameleon/kafka.clj b/src/chameleon/kafka.clj
new file mode 100644
index 0000000..ca730a1
--- /dev/null
+++ b/src/chameleon/kafka.clj
@@ -0,0 +1,97 @@
+(ns chameleon.kafka
+ (:refer-clojure :exclude [send])
+ (:require [clojure.core.async :as ca])
+ (:import [java.util Properties]
+ [org.apache.kafka.clients.consumer KafkaConsumer]
+ [org.apache.kafka.clients.consumer ConsumerRecord]
+ [org.apache.kafka.common.serialization StringDeserializer StringSerializer Serdes]))
+(declare consume run-away)
+(defrecord CljKafkaConsumer [config topic])
+;; Public
+(def deser StringDeserializer)
+(def ser StringSerializer)
+(defmacro run-away
+ [name & body]
+ `(try (.start (Thread. (fn [] ~@body) ~name))
+ (catch OutOfMemoryError e#
+ (throw (ex-info {:error "Could not allocate resources for asynchronous execution"})))))
+(defn clj-kafka-consumer
+ "Given a config, group-id, and a topic, return a CljKafkaConsumer"
+ [config group-id topic]
+ (CljKafkaConsumer. (assoc config "group.id" group-id) topic))
+(defn subscribe
+ "Given a CljKafkaConsumer, a channel, and a session timeout (in
+ ms), return a channel. The input channel is where the messages will
+ be published."
+ ([{:keys [config topic] :as conf} channel session-timeout]
+ (subscribe conf channel session-timeout "Kafka-Subscriber-Thread"))
+ ([{:keys [config topic]} channel session-timeout thread-name]
+ (let [control-channel (ca/chan (ca/dropping-buffer 1))]
+ (run-away thread-name (consume config topic channel session-timeout control-channel))
+ control-channel)))
+;; Private
+(defn- consumer-record->clojure
+ [consumer-record]
+ {:topic (.topic ^ConsumerRecord consumer-record)
+ :partition (.partition ^ConsumerRecord consumer-record)
+ :offset (.offset ^ConsumerRecord consumer-record)
+ :key (.key ^ConsumerRecord consumer-record)
+ :value (.value ^ConsumerRecord consumer-record)})
+(defn- send-with-sla-timeout
+ "Consumer Records will always be a sequence. Try to put a message on
+ the channel before the sla-timeout. Return true if a message was put
+ on the channel or when there were no messages to put on the
+ channel. Return false if there is a timeout."
+ [chan sla-timeout messages]
+ (if (seq messages)
+ (let [sent? (ca/alt!! [[chan messages]] :sent
+ (ca/timeout sla-timeout) :timeout)]
+ (if (= sent? :timeout)
+ false
+ true))
+ true))
+(defn- as-properties
+ [m]
+ (reduce (fn [props [k v]]
+ (.setProperty ^Properties props k v)
+ props)
+ (Properties.) m))
+(defn- consume
+ "Consume elements from a topic and put it on the given
+ channel. Block until a given timeout to put messages on the channel
+ or put sla-exception on the control channel (channel returned by
+ this function). Once the message is put on the channel, it is
+ considered safe to update the offset of the kafka consumer. The
+ `sla-timeout` is an agreement such that the data has to be taken off
+ the channel within the given timeout. If failed to take the data,
+ kaka consumer will be disconnected and an exception will be put on
+ the control channel."
+ [config topic chan sla-timeout control-channel]
+ (let [consumer (KafkaConsumer. ^Properties (as-properties config))]
+ (.subscribe ^KafkaConsumer consumer (list topic))
+ (loop []
+ (let [consumer-records (.poll consumer 0)]
+ (when (and (->> consumer-records
+ (map consumer-record->clojure)
+ (send-with-sla-timeout chan sla-timeout))
+ (try (.commitSync consumer)
+ true
+ (catch Exception e
+ (ca/>!! control-channel (.getMessage e))
+ false)))
+ (recur))))
+ (ca/>!! control-channel :sla-timeout)
+ (.unsubscribe consumer)
+ (.close consumer)))
diff --git a/src/chameleon/logging.clj b/src/chameleon/logging.clj
index 15e1e7b..aa43974 100644
--- a/src/chameleon/logging.clj
+++ b/src/chameleon/logging.clj
@@ -76,7 +76,10 @@
:chameleon.specs/logger logger)]
(if (empty? confirmed-specs)
(.info logger (string->enum enum) (logfields fields) (into-array java.lang.String msgs))
- confirmed-specs)))
+ (.info logger (string->enum "ERROR") (logfields fields) (->> confirmed-specs
+ (into ["SPEC ERROR"])
+ (mapv str)
+ (into-array java.lang.String))))))
(defn debug
[^AaiLoggerAdapter logger ^String enum msgs]
diff --git a/src/chameleon/route.clj b/src/chameleon/route.clj
index 0918da4..707c608 100644
--- a/src/chameleon/route.clj
+++ b/src/chameleon/route.clj
@@ -59,6 +59,9 @@
"UPDATE" (assert-update! host actor type key body time)
"DELETE" (assert-delete! host actor type key time))
{:keys [status body]} @g-assert]
- (log/info e-logger "GALLIFREY_ASSERTED" [(str type) (str key)])
+ (log/info e-logger "RESPONSE" (mapv str [operation key status body]))
+ (if (and (>= status 200) (<= status 299))
+ (log/info e-logger "GALLIFREY_ASSERTED" ["SUCCEEDED" (str type) (str key)])
+ (log/info e-logger "GALLIFREY_ASSERTED" ["FAILED" (str type) (str key)]))
(log/info a-logger "RESPONSE" (mapv str [operation key status body])
:fields {:response-code status :response-description (hs/get-name status)})))
diff --git a/src/chameleon/specs.clj b/src/chameleon/specs.clj
index 40d5768..630a4a8 100644
--- a/src/chameleon/specs.clj
+++ b/src/chameleon/specs.clj
@@ -67,5 +67,5 @@
;; Logger specs
(s/def ::logger (s/spec log/logger? :gen #(gen/return (log/error-logger "chameleon.specs"))))
(s/def ::loggers (s/cat :e :chameleon.specs/logger :a :chameleon.specs/logger))
-(s/def :logging/msgs (s/* ::string))
+(s/def :logging/msgs (s/* string?))
(s/def :logging/valid-fields log/valid-logfields?)