blob: 2b32d2698fa1211a07b1a9dbd8f57f9af095426a (
plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
|
(ns chameleon.event
(:require [chameleon.txform]
[chameleon.route]
[integrant.core :as ig]
[clojure.string :refer [starts-with?]])
(:import [org.onap.aai.event.client DMaaPEventConsumer]))
;; Integrant initializer for the :chameleon/event component.
;;
;; Reads the :aai entry of `event-config`, builds a DMaaPEventConsumer from
;; its connection settings and starts a thread that polls the consumer
;; forever. Every consumed event that does not start with "DMAAP" is handed
;; to the configured `processor` as (processor gallifrey-host event).
;;
;; Config keys read from (:aai event-config):
;;   host topic motsid pass consumer-group consumer-id timeout batch-size type
;;     - passed positionally to the DMaaPEventConsumer constructor
;;   processor
;;     - function of two arguments: gallifrey-host and the event
;;
;; Returns nil (the result of Thread.start); the polling thread is not
;; exposed to the caller.
(defmethod ig/init-key :chameleon/event
  [_ {:keys [event-config gallifrey-host]}]
  (let [{:keys [host topic motsid pass consumer-group consumer-id timeout batch-size type processor]} (:aai event-config)
        event-processor (DMaaPEventConsumer. host topic motsid pass consumer-group consumer-id timeout batch-size type)
        ;; One poll cycle: fetch a batch and dispatch each event. A failure
        ;; on a single event is logged and does not abort the rest of the batch.
        poll-once
        (fn []
          (let [it (.iterator (.consume event-processor))]
            (println "Polling...")
            (while (.hasNext it)
              (try
                (let [event (.next it)]
                  ;; Temporarily added for current version of dmaap client
                  (when-not (starts-with? event "DMAAP")
                    (processor gallifrey-host event)))
                (catch Exception e
                  (println (str "Unexpected exception during processing: " (.getMessage e))))))))]
    (println "Event processor for AAI created. Starting event polling on " host topic)
    (.start
     (Thread.
      (fn []
        (while true
          ;; Guard the whole cycle: without this, an exception thrown by
          ;; .consume would escape the loop and end the polling thread,
          ;; silently stopping all event consumption.
          (try
            (poll-once)
            (catch Exception e
              (println (str "Unexpected exception during polling: " (.getMessage e)))
              ;; Brief pause so a persistently failing consumer does not
              ;; spin in a tight loop. The 1s value is arbitrary.
              (Thread/sleep 1000)))))))))
|