aboutsummaryrefslogtreecommitdiffstats
path: root/src/chameleon/event.clj
diff options
context:
space:
mode:
authorShwetank Dave <shwetank.dave@amdocs.com>2018-08-24 13:24:49 -0400
committerShwetank Dave <shwetank.dave@amdocs.com>2018-08-30 14:08:19 -0400
commit525c7d8dbef106245c4fce29a5b6b31b7160b21c (patch)
tree5f24deb3ca219bf7536649d1c89988abb06dbbf3 /src/chameleon/event.clj
parente16bda37d76e63e0f903bba13ed1dccf3b17f395 (diff)
Updated log messages to be more intuitive
Added support for kafka Updated logging.clj file to not reject messages which contain an empty substring. Updating Dockerfile to use the specified GLIBC_VERSION Adding spotify docker plugin Moving the Dockerfile to directory root. Issue-ID: AAI-1542 Change-Id: I4c41486cb6c7698a2c2736d574721f6a5237563e Signed-off-by: Shwetank Dave <shwetank.dave@amdocs.com>
Diffstat (limited to 'src/chameleon/event.clj')
-rw-r--r--src/chameleon/event.clj36
1 files changed, 34 insertions, 2 deletions
diff --git a/src/chameleon/event.clj b/src/chameleon/event.clj
index 92f4211..2eddf85 100644
--- a/src/chameleon/event.clj
+++ b/src/chameleon/event.clj
@@ -1,11 +1,22 @@
(ns chameleon.event
(:require [integrant.core :as ig]
[clojure.string :refer [starts-with?]]
- [chameleon.logging :as log])
- (:import [org.onap.aai.event.client DMaaPEventConsumer]))
+ [chameleon.logging :as log]
+ [clojure.core.async :as ca]
+ [chameleon.kafka :as ck])
+ (:import [org.onap.aai.event.client DMaaPEventConsumer KafkaEventConsumer]))
+
+(declare from-dmaap from-kafka)
(defmethod ig/init-key :chameleon/event
[_ {:keys [event-config gallifrey-host loggers]}]
+ (case (:source event-config)
+ :dmaap (from-dmaap event-config gallifrey-host loggers)
+ :kafka (from-kafka event-config gallifrey-host loggers)
+ :error))
+
+(defn- from-dmaap
+ [event-config gallifrey-host loggers]
(let [{:keys [host topic motsid pass consumer-group consumer-id timeout batch-size type processor]} (:aai event-config)
[error-logger audit-logger] loggers
event-processor (DMaaPEventConsumer. host topic motsid pass consumer-group consumer-id timeout batch-size type)]
@@ -23,3 +34,24 @@
(log/mdc-clear!)))
(catch Exception e
(println (str "Unexpected exception during processing: " (.getMessage e)))))))))))))
+
+(defn- from-kafka
+ [event-config gallifrey-host loggers]
+ (let [{:keys [topic consumer-group processor kafka-config]} (:aai event-config)
+ [error-logger audit-logger] loggers
+ kfc (ck/clj-kafka-consumer kafka-config consumer-group topic)
+ chan (ca/chan 5)
+ error-chan (ck/subscribe kfc chan 30000 "Polling-Kafka-Thread")]
+ (log/info error-logger "EVENT_PROCESSOR"
+ [(format "AAI created. Starting polling a KAFKA Topic '%s' on '%s'" topic (kafka-config "bootstrap.servers"))])
+ (ca/go-loop []
+ (let [recs (ca/<! chan)]
+ (log/mdc-init! "SPIKE-EVENT" "CHAMELEON" "" "" gallifrey-host)
+ (if recs
+ (do (doseq [r recs]
+ (log/info error-logger "EVENT_PROCESSOR" [(str "Processing " (:value r))])
+ (processor gallifrey-host (:value r) error-logger audit-logger)
+ (log/info error-logger "EVENT_PROCESSOR" [(str "Processed Message " (:value r))])
+ (log/mdc-clear!))
+ (recur))
+ (ca/<! error-chan))))))