diff options
author | C.T. Paterson <ctpaterson+onap@gmail.com> | 2018-09-04 14:05:24 -0400 |
---|---|---|
committer | C.T. Paterson <ctpaterson+onap@gmail.com> | 2018-09-04 14:49:55 -0400 |
commit | d0dceef6a3504a376cc689afc119ef15a56b5f7a (patch) | |
tree | 6403bf7e70a014860e426589bdeec9fc11401607 /src/chameleon/kafka.clj | |
parent | 1586036fd345c240982237b76469a1f5d04d122d (diff) |
Adding version.properties for build job, tests
Issue-ID: AAI-1547
Change-Id: I70ecf0f427eb30da8df1ec3e2a0cec383e0063fb
Signed-off-by: C.T. Paterson <ctpaterson+onap@gmail.com>
Diffstat (limited to 'src/chameleon/kafka.clj')
-rw-r--r-- | src/chameleon/kafka.clj | 54 |
1 files changed, 33 insertions, 21 deletions
diff --git a/src/chameleon/kafka.clj b/src/chameleon/kafka.clj index ca730a1..ae8e77f 100644 --- a/src/chameleon/kafka.clj +++ b/src/chameleon/kafka.clj @@ -1,6 +1,9 @@ (ns chameleon.kafka (:refer-clojure :exclude [send]) - (:require [clojure.core.async :as ca]) + (:require [clojure.core.async :as ca] + [clojure.spec.alpha :as s] + [chameleon.core :as core] + [chameleon.logging :as log]) (:import [java.util Properties] [org.apache.kafka.clients.consumer KafkaConsumer] [org.apache.kafka.clients.consumer ConsumerRecord] @@ -15,7 +18,7 @@ (def deser StringDeserializer) (def ser StringSerializer) -(defmacro run-away +(defmacro threaded [name & body] `(try (.start (Thread. (fn [] ~@body) ~name)) (catch OutOfMemoryError e# @@ -23,24 +26,39 @@ (defn clj-kafka-consumer "Given a config, group-id, and a topic, return a CljKafkaConsumer" - [config group-id topic] - (CljKafkaConsumer. (assoc config "group.id" group-id) topic)) + [config group-id topic error-logger] + (let [valid? (core/conform-multiple :kafka/config config + :chameleon.specs/string group-id + :chameleon.specs/string topic)] + (if-not (seq valid?) + (CljKafkaConsumer. (assoc config "group.id" group-id) topic) + (log/info error-logger "ERROR" (->> valid? + (into ["SPEC ERROR"]) + (mapv str)))))) (defn subscribe "Given a CljKafkaConsumer, a channel, and a session timeout (in ms), return a channel. The input channel is where the messages will be published." - ([{:keys [config topic] :as conf} channel session-timeout] + ([{:keys [config topic] :as conf} channel session-timeout error-logger] (subscribe conf channel session-timeout "Kafka-Subscriber-Thread")) - ([{:keys [config topic]} channel session-timeout thread-name] - (let [control-channel (ca/chan (ca/dropping-buffer 1))] - (run-away thread-name (consume config topic channel session-timeout control-channel)) - control-channel))) + ([{:keys [config topic]} channel session-timeout thread-name error-logger] + (let [valid? (core/conform-multiple :kafka/timeout session-timeout + :kafka/chan channel + :kafka/config config + :chameleon.specs/string topic) + control-channel (ca/chan (ca/dropping-buffer 1))] + (if-not (seq valid?) + (do (threaded thread-name (consume config topic channel session-timeout control-channel)) + control-channel) + (log/info error-logger "ERROR" (->> valid? + (into ["SPEC ERROR"]) + (mapv str))))))) ;; Private (defn- consumer-record->clojure - [consumer-record] + [^ConsumerRecord consumer-record] {:topic (.topic ^ConsumerRecord consumer-record) :partition (.partition ^ConsumerRecord consumer-record) :offset (.offset ^ConsumerRecord consumer-record) @@ -61,13 +79,6 @@ true)) true)) -(defn- as-properties - [m] - (reduce (fn [props [k v]] - (.setProperty ^Properties props k v) - props) - (Properties.) m)) - (defn- consume "Consume elements from a topic and put it on the given channel. Block until a given timeout to put messages on the channel @@ -79,13 +90,14 @@ kaka consumer will be disconnected and an exception will be put on the control channel." [config topic chan sla-timeout control-channel] - (let [consumer (KafkaConsumer. ^Properties (as-properties config))] + (let [consumer (KafkaConsumer. config)] (.subscribe ^KafkaConsumer consumer (list topic)) (loop [] (let [consumer-records (.poll consumer 0)] - (when (and (->> consumer-records - (map consumer-record->clojure) - (send-with-sla-timeout chan sla-timeout)) + (when (and (->> consumer-records + (s/conform :kafka/consumer-record) + (map consumer-record->clojure) + (send-with-sla-timeout chan sla-timeout)) (try (.commitSync consumer) true (catch Exception e |