diff options
Diffstat (limited to 'src')
-rw-r--r-- | src/chameleon/aai_processor.clj | 2 | ||||
-rw-r--r-- | src/chameleon/config.clj | 4 | ||||
-rw-r--r-- | src/chameleon/event.clj | 36 | ||||
-rw-r--r-- | src/chameleon/handler.clj | 14 | ||||
-rw-r--r-- | src/chameleon/kafka.clj | 97 | ||||
-rw-r--r-- | src/chameleon/logging.clj | 5 | ||||
-rw-r--r-- | src/chameleon/route.clj | 5 | ||||
-rw-r--r-- | src/chameleon/specs.clj | 2 |
8 files changed, 143 insertions, 22 deletions
diff --git a/src/chameleon/aai_processor.clj b/src/chameleon/aai_processor.clj index 4243b39..74c6da9 100644 --- a/src/chameleon/aai_processor.clj +++ b/src/chameleon/aai_processor.clj @@ -55,7 +55,7 @@ (defn from-spike "Transforms Spike-based event payloads to a format accepted by Gallifrey for vertices and relationships" [gallifrey-host payload & [error-logger audit-logger]] - (let [txpayload (map-keywords (parse-string payload)) + (let [txpayload (:body (parse-string payload true)) operation (:operation txpayload) parse-type (if (contains? txpayload :vertex) :vertex diff --git a/src/chameleon/config.clj b/src/chameleon/config.clj index 10324e2..d5a8406 100644 --- a/src/chameleon/config.clj +++ b/src/chameleon/config.clj @@ -15,8 +15,8 @@ :gallifrey-transformer from-gallifrey :loggers (ig/ref :chameleon/loggers)} :chameleon/aai-processor - {:provenance-attr "last-mod-source-of-truth" - :truth-attr "truth-time"} + {:provenance-attr :last-mod-source-of-truth + :truth-attr :truth-time} :chameleon/http-server {:port (:http-port app-config) :handler (ig/ref :chameleon/handler)}}] diff --git a/src/chameleon/event.clj b/src/chameleon/event.clj index 92f4211..2eddf85 100644 --- a/src/chameleon/event.clj +++ b/src/chameleon/event.clj @@ -1,11 +1,22 @@ (ns chameleon.event (:require [integrant.core :as ig] [clojure.string :refer [starts-with?]] - [chameleon.logging :as log]) - (:import [org.onap.aai.event.client DMaaPEventConsumer])) + [chameleon.logging :as log] + [clojure.core.async :as ca] + [chameleon.kafka :as ck]) + (:import [org.onap.aai.event.client DMaaPEventConsumer KafkaEventConsumer])) + +(declare from-dmaap from-kafka) (defmethod ig/init-key :chameleon/event [_ {:keys [event-config gallifrey-host loggers]}] + (case (:source event-config) + :dmaap (from-dmaap event-config gallifrey-host loggers) + :kafka (from-kafka event-config gallifrey-host loggers) + :error)) + +(defn- from-dmaap + [event-config gallifrey-host loggers] (let [{:keys [host topic motsid pass consumer-group consumer-id timeout batch-size type processor]} (:aai event-config) [error-logger audit-logger] loggers event-processor (DMaaPEventConsumer. host topic motsid pass consumer-group consumer-id timeout batch-size type)] @@ -23,3 +34,24 @@ (log/mdc-clear!))) (catch Exception e (println (str "Unexpected exception during processing: " (.getMessage e))))))))))))) + +(defn- from-kafka + [event-config gallifrey-host loggers] + (let [{:keys [topic consumer-group processor kafka-config]} (:aai event-config) + [error-logger audit-logger] loggers + kfc (ck/clj-kafka-consumer kafka-config consumer-group topic) + chan (ca/chan 5) + error-chan (ck/subscribe kfc chan 30000 "Polling-Kafka-Thread")] + (log/info error-logger "EVENT_PROCESSOR" + [(format "AAI created. Starting polling a KAFKA Topic '%s' on '%s'" topic (kafka-config "bootstrap.servers"))]) + (ca/go-loop [] + (let [recs (ca/<! chan)] + (log/mdc-init! "SPIKE-EVENT" "CHAMELEON" "" "" gallifrey-host) + (if recs + (do (doseq [r recs] + (log/info error-logger "EVENT_PROCESSOR" [(str "Processing " (:value r))]) + (processor gallifrey-host (:value r) error-logger audit-logger) + (log/info error-logger "EVENT_PROCESSOR" [(str "Processed Message " (:value r))]) + (log/mdc-clear!)) + (recur)) + (ca/<! error-chan)))))) diff --git a/src/chameleon/handler.clj b/src/chameleon/handler.clj index 675a34b..453aba6 100644 --- a/src/chameleon/handler.clj +++ b/src/chameleon/handler.clj @@ -78,17 +78,3 @@ (-> app-routes (wrap-defaults api-defaults) (log-reqs loggers))) - - -;;; Implementation - -(defn- serialize - [e] - (compact - (update e :_meta #(map-vals - (fn [m] - (map-vals str m)) %)))) - -(defn- de-serialize - [e] - e) diff --git a/src/chameleon/kafka.clj b/src/chameleon/kafka.clj new file mode 100644 index 0000000..ca730a1 --- /dev/null +++ b/src/chameleon/kafka.clj @@ -0,0 +1,97 @@ +(ns chameleon.kafka + (:refer-clojure :exclude [send]) + (:require [clojure.core.async :as ca]) + (:import [java.util Properties] + [org.apache.kafka.clients.consumer KafkaConsumer] + [org.apache.kafka.clients.consumer ConsumerRecord] + [org.apache.kafka.common.serialization StringDeserializer StringSerializer Serdes])) + +(declare consume run-away) + +(defrecord CljKafkaConsumer [config topic]) + + +;; Public +(def deser StringDeserializer) +(def ser StringSerializer) + +(defmacro run-away + [name & body] + `(try (.start (Thread. (fn [] ~@body) ~name)) + (catch OutOfMemoryError e# + (throw (ex-info {:error "Could not allocate resources for asynchronous execution"}))))) + +(defn clj-kafka-consumer + "Given a config, group-id, and a topic, return a CljKafkaConsumer" + [config group-id topic] + (CljKafkaConsumer. (assoc config "group.id" group-id) topic)) + +(defn subscribe + "Given a CljKafkaConsumer, a channel, and a session timeout (in + ms), return a channel. The input channel is where the messages will + be published." + ([{:keys [config topic] :as conf} channel session-timeout] + (subscribe conf channel session-timeout "Kafka-Subscriber-Thread")) + ([{:keys [config topic]} channel session-timeout thread-name] + (let [control-channel (ca/chan (ca/dropping-buffer 1))] + (run-away thread-name (consume config topic channel session-timeout control-channel)) + control-channel))) + +;; Private + +(defn- consumer-record->clojure + [consumer-record] + {:topic (.topic ^ConsumerRecord consumer-record) + :partition (.partition ^ConsumerRecord consumer-record) + :offset (.offset ^ConsumerRecord consumer-record) + :key (.key ^ConsumerRecord consumer-record) + :value (.value ^ConsumerRecord consumer-record)}) + +(defn- send-with-sla-timeout + "Consumer Records will always be a sequence. Try to put a message on + the channel before the sla-timeout. Return true if a message was put + on the channel or when there were no messages to put on the + channel. Return false if there is a timeout." + [chan sla-timeout messages] + (if (seq messages) + (let [sent? (ca/alt!! [[chan messages]] :sent + (ca/timeout sla-timeout) :timeout)] + (if (= sent? :timeout) + false + true)) + true)) + +(defn- as-properties + [m] + (reduce (fn [props [k v]] + (.setProperty ^Properties props k v) + props) + (Properties.) m)) + +(defn- consume + "Consume elements from a topic and put it on the given + channel. Block until a given timeout to put messages on the channel + or put sla-exception on the control channel (channel returned by + this function). Once the message is put on the channel, it is + considered safe to update the offset of the kafka consumer. The + `sla-timeout` is an agreement such that the data has to be taken off + the channel within the given timeout. If failed to take the data, + kaka consumer will be disconnected and an exception will be put on + the control channel." + [config topic chan sla-timeout control-channel] + (let [consumer (KafkaConsumer. ^Properties (as-properties config))] + (.subscribe ^KafkaConsumer consumer (list topic)) + (loop [] + (let [consumer-records (.poll consumer 0)] + (when (and (->> consumer-records + (map consumer-record->clojure) + (send-with-sla-timeout chan sla-timeout)) + (try (.commitSync consumer) + true + (catch Exception e + (ca/>!! control-channel (.getMessage e)) + false))) + (recur)))) + (ca/>!! control-channel :sla-timeout) + (.unsubscribe consumer) + (.close consumer))) diff --git a/src/chameleon/logging.clj b/src/chameleon/logging.clj index 15e1e7b..aa43974 100644 --- a/src/chameleon/logging.clj +++ b/src/chameleon/logging.clj @@ -76,7 +76,10 @@ :chameleon.specs/logger logger)] (if (empty? confirmed-specs) (.info logger (string->enum enum) (logfields fields) (into-array java.lang.String msgs)) - confirmed-specs))) + (.info logger (string->enum "ERROR") (logfields fields) (->> confirmed-specs + (into ["SPEC ERROR"]) + (mapv str) + (into-array java.lang.String)))))) (defn debug [^AaiLoggerAdapter logger ^String enum msgs] diff --git a/src/chameleon/route.clj b/src/chameleon/route.clj index 0918da4..707c608 100644 --- a/src/chameleon/route.clj +++ b/src/chameleon/route.clj @@ -59,6 +59,9 @@ "UPDATE" (assert-update! host actor type key body time) "DELETE" (assert-delete! host actor type key time)) {:keys [status body]} @g-assert] - (log/info e-logger "GALLIFREY_ASSERTED" [(str type) (str key)]) + (log/info e-logger "RESPONSE" (mapv str [operation key status body])) + (if (and (>= status 200) (<= status 299)) + (log/info e-logger "GALLIFREY_ASSERTED" ["SUCCEEDED" (str type) (str key)]) + (log/info e-logger "GALLIFREY_ASSERTED" ["FAILED" (str type) (str key)])) (log/info a-logger "RESPONSE" (mapv str [operation key status body]) :fields {:response-code status :response-description (hs/get-name status)}))) diff --git a/src/chameleon/specs.clj b/src/chameleon/specs.clj index 40d5768..630a4a8 100644 --- a/src/chameleon/specs.clj +++ b/src/chameleon/specs.clj @@ -67,5 +67,5 @@ ;; Logger specs (s/def ::logger (s/spec log/logger? :gen #(gen/return (log/error-logger "chameleon.specs")))) (s/def ::loggers (s/cat :e :chameleon.specs/logger :a :chameleon.specs/logger)) -(s/def :logging/msgs (s/* ::string)) +(s/def :logging/msgs (s/* string?)) (s/def :logging/valid-fields log/valid-logfields?) |