aboutsummaryrefslogtreecommitdiffstats
path: root/src/chameleon/kafka.clj
diff options
context:
space:
mode:
authorShwetank Dave <shwetank.dave@amdocs.com>2018-08-24 13:24:49 -0400
committerShwetank Dave <shwetank.dave@amdocs.com>2018-08-30 14:08:19 -0400
commit525c7d8dbef106245c4fce29a5b6b31b7160b21c (patch)
tree5f24deb3ca219bf7536649d1c89988abb06dbbf3 /src/chameleon/kafka.clj
parente16bda37d76e63e0f903bba13ed1dccf3b17f395 (diff)
Updated log messages to be more intuitive
Added support for kafka Updated logging.clj file to not reject messages which contain an empty substring. Updating Dockerfile to use the specified GLIBC_VERSION Adding spotify docker plugin Moving the Dockerfile to directory root. Issue-ID: AAI-1542 Change-Id: I4c41486cb6c7698a2c2736d574721f6a5237563e Signed-off-by: Shwetank Dave <shwetank.dave@amdocs.com>
Diffstat (limited to 'src/chameleon/kafka.clj')
-rw-r--r--src/chameleon/kafka.clj97
1 files changed, 97 insertions, 0 deletions
diff --git a/src/chameleon/kafka.clj b/src/chameleon/kafka.clj
new file mode 100644
index 0000000..ca730a1
--- /dev/null
+++ b/src/chameleon/kafka.clj
@@ -0,0 +1,97 @@
+(ns chameleon.kafka
+ (:refer-clojure :exclude [send])
+ (:require [clojure.core.async :as ca])
+ (:import [java.util Properties]
+ [org.apache.kafka.clients.consumer KafkaConsumer]
+ [org.apache.kafka.clients.consumer ConsumerRecord]
+ [org.apache.kafka.common.serialization StringDeserializer StringSerializer Serdes]))
+
+(declare consume run-away)
+
+(defrecord CljKafkaConsumer [config topic])
+
+
+;; Public
+(def deser StringDeserializer)
+(def ser StringSerializer)
+
+(defmacro run-away
+ [name & body]
+ `(try (.start (Thread. (fn [] ~@body) ~name))
+ (catch OutOfMemoryError e#
+ (throw (ex-info {:error "Could not allocate resources for asynchronous execution"})))))
+
+(defn clj-kafka-consumer
+ "Given a config, group-id, and a topic, return a CljKafkaConsumer"
+ [config group-id topic]
+ (CljKafkaConsumer. (assoc config "group.id" group-id) topic))
+
+(defn subscribe
+ "Given a CljKafkaConsumer, a channel, and a session timeout (in
+ ms), return a channel. The input channel is where the messages will
+ be published."
+ ([{:keys [config topic] :as conf} channel session-timeout]
+ (subscribe conf channel session-timeout "Kafka-Subscriber-Thread"))
+ ([{:keys [config topic]} channel session-timeout thread-name]
+ (let [control-channel (ca/chan (ca/dropping-buffer 1))]
+ (run-away thread-name (consume config topic channel session-timeout control-channel))
+ control-channel)))
+
+;; Private
+
+(defn- consumer-record->clojure
+ [consumer-record]
+ {:topic (.topic ^ConsumerRecord consumer-record)
+ :partition (.partition ^ConsumerRecord consumer-record)
+ :offset (.offset ^ConsumerRecord consumer-record)
+ :key (.key ^ConsumerRecord consumer-record)
+ :value (.value ^ConsumerRecord consumer-record)})
+
+(defn- send-with-sla-timeout
+ "Consumer Records will always be a sequence. Try to put a message on
+ the channel before the sla-timeout. Return true if a message was put
+ on the channel or when there were no messages to put on the
+ channel. Return false if there is a timeout."
+ [chan sla-timeout messages]
+ (if (seq messages)
+ (let [sent? (ca/alt!! [[chan messages]] :sent
+ (ca/timeout sla-timeout) :timeout)]
+ (if (= sent? :timeout)
+ false
+ true))
+ true))
+
+(defn- as-properties
+ [m]
+ (reduce (fn [props [k v]]
+ (.setProperty ^Properties props k v)
+ props)
+ (Properties.) m))
+
+(defn- consume
+ "Consume elements from a topic and put it on the given
+ channel. Block until a given timeout to put messages on the channel
+ or put sla-exception on the control channel (channel returned by
+ this function). Once the message is put on the channel, it is
+ considered safe to update the offset of the kafka consumer. The
+ `sla-timeout` is an agreement such that the data has to be taken off
+ the channel within the given timeout. If failed to take the data,
+ kaka consumer will be disconnected and an exception will be put on
+ the control channel."
+ [config topic chan sla-timeout control-channel]
+ (let [consumer (KafkaConsumer. ^Properties (as-properties config))]
+ (.subscribe ^KafkaConsumer consumer (list topic))
+ (loop []
+ (let [consumer-records (.poll consumer 0)]
+ (when (and (->> consumer-records
+ (map consumer-record->clojure)
+ (send-with-sla-timeout chan sla-timeout))
+ (try (.commitSync consumer)
+ true
+ (catch Exception e
+ (ca/>!! control-channel (.getMessage e))
+ false)))
+ (recur))))
+ (ca/>!! control-channel :sla-timeout)
+ (.unsubscribe consumer)
+ (.close consumer)))