diff options
Diffstat (limited to 'src/chameleon/kafka.clj')
-rw-r--r-- | src/chameleon/kafka.clj | 97 |
1 files changed, 97 insertions, 0 deletions
;; Origin: diff --git a/src/chameleon/kafka.clj b/src/chameleon/kafka.clj
;;         new file mode 100644, index 0000000..ca730a1, hunk @@ -0,0 +1,97 @@
(ns chameleon.kafka
  (:refer-clojure :exclude [send])
  (:require [clojure.core.async :as ca])
  (:import [java.util Properties]
           [org.apache.kafka.clients.consumer KafkaConsumer]
           [org.apache.kafka.clients.consumer ConsumerRecord]
           [org.apache.kafka.common.serialization StringDeserializer StringSerializer Serdes]))

;; `consume` is defined in the private section below but is referenced by
;; `subscribe`, so it needs a forward declaration.
(declare consume)

;; Plain data holder pairing a consumer configuration map with a topic name.
(defrecord CljKafkaConsumer [config topic])


;; Public

;; Serializer / deserializer classes exposed for use as configuration values.
(def deser StringDeserializer)
(def ser StringSerializer)

(defmacro run-away
  "Evaluate `body` on a freshly started thread named `name`.

  Expands to a call that starts the thread and yields nil. If starting
  the thread raises an OutOfMemoryError, an ex-info is thrown instead
  whose data map carries the description under :error and whose cause
  is the original error."
  [name & body]
  `(try
     (.start (Thread. (fn [] ~@body) ~name))
     (catch OutOfMemoryError e#
       ;; ex-info takes a message and a data map; the cause is attached so
       ;; the original error is not lost.
       (throw (ex-info "Could not allocate resources for asynchronous execution"
                       {:error "Could not allocate resources for asynchronous execution"}
                       e#)))))

(defn clj-kafka-consumer
  "Given a config, group-id, and a topic, return a CljKafkaConsumer
  whose config has \"group.id\" set to `group-id`."
  [config group-id topic]
  (CljKafkaConsumer. (assoc config "group.id" group-id) topic))

(defn subscribe
  "Start consuming the topic of the given CljKafkaConsumer on a
  background thread and return a control channel.

  Batches of messages are put on `channel`; each batch must be accepted
  within `session-timeout` milliseconds. The returned control channel
  (dropping buffer of size 1) receives :sla-timeout when a batch was not
  accepted in time, or a description of the failure when committing
  offsets failed. `thread-name` names the background thread and defaults
  to \"Kafka-Subscriber-Thread\"."
  ([consumer channel session-timeout]
   (subscribe consumer channel session-timeout "Kafka-Subscriber-Thread"))
  ([{:keys [config topic]} channel session-timeout thread-name]
   (let [control-channel (ca/chan (ca/dropping-buffer 1))]
     (run-away thread-name
               (consume config topic channel session-timeout control-channel))
     control-channel)))

;; Private

(defn- consumer-record->clojure
  "Convert a ConsumerRecord into a map with the keys :topic, :partition,
  :offset, :key and :value."
  [^ConsumerRecord consumer-record]
  {:topic     (.topic consumer-record)
   :partition (.partition consumer-record)
   :offset    (.offset consumer-record)
   :key       (.key consumer-record)
   :value     (.value consumer-record)})

(defn- send-with-sla-timeout
  "Try to put `messages` (always a sequence) on `chan` as a single value
  before `sla-timeout` milliseconds elapse.

  Return true if the batch was put on the channel or if there was
  nothing to put. Return false if the timeout fired first."
  [chan sla-timeout messages]
  (if (seq messages)
    ;; NOTE(review): a put on a closed channel also completes immediately
    ;; and is reported as :sent here -- confirm whether callers ever close
    ;; `chan` while the consumer is running.
    (let [outcome (ca/alt!!
                    [[chan messages]] :sent
                    (ca/timeout sla-timeout) :timeout)]
      (not= outcome :timeout))
    true))

(defn- property-string
  "Render a configuration key or value as the string stored in
  java.util.Properties. Strings pass through unchanged, classes become
  their fully qualified name, keywords and symbols their name, and
  anything else its `str` form. nil stays nil so that the Properties
  object rejects it, exactly as it would a raw nil."
  [x]
  (cond
    (nil? x)                         nil
    (string? x)                      x
    (class? x)                       (.getName ^Class x)
    (instance? clojure.lang.Named x) (name x)
    :else                            (str x)))

(defn- as-properties
  "Build a java.util.Properties from the map `m`.

  Keys and values are converted with `property-string`, so non-string
  entries (numbers, booleans, keywords, or classes such as `deser` and
  `ser`) are accepted instead of failing the String cast performed by
  Properties.setProperty."
  [m]
  (reduce (fn [^Properties props [k v]]
            (.setProperty props (property-string k) (property-string v))
            props)
          (Properties.)
          m))

(defn- commit-offsets!
  "Synchronously commit the consumer's offsets. Return true on success.
  On failure, put a description of the exception on `control-channel`
  and return false."
  [^KafkaConsumer consumer control-channel]
  (try
    (.commitSync consumer)
    true
    (catch Exception e
      ;; nil cannot be put on a channel, and getMessage may return nil.
      (ca/>!! control-channel (or (.getMessage e) (str e)))
      false)))

(defn- consume
  "Consume elements from a topic and put them on the given channel.

  Each polled batch is converted to Clojure maps and offered to `chan`;
  the put must be accepted within `sla-timeout` milliseconds. Once a
  batch is on the channel it is considered safe to commit the offsets of
  the kafka consumer. The `sla-timeout` is an agreement that the data
  has to be taken off the channel within the given timeout.

  The loop stops when either
    * a batch was not taken in time, in which case :sla-timeout is put
      on `control-channel`, or
    * committing offsets failed, in which case a description of the
      exception is put on `control-channel`.

  The kafka consumer is always unsubscribed and closed on the way out,
  including when an unexpected exception escapes the loop."
  [config topic chan sla-timeout control-channel]
  (let [consumer (KafkaConsumer. ^Properties (as-properties config))]
    (try
      (.subscribe ^KafkaConsumer consumer (list topic))
      ;; NOTE(review): a poll timeout of 0 returns immediately, so this loop
      ;; spins when the topic is idle -- consider a small positive timeout.
      (loop []
        (let [records (->> (.poll consumer 0)
                           (map consumer-record->clojure))]
          (cond
            (not (send-with-sla-timeout chan sla-timeout records))
            (ca/>!! control-channel :sla-timeout)

            (commit-offsets! consumer control-channel)
            (recur))))
      (finally
        ;; Close even if unsubscribe itself fails.
        (try
          (.unsubscribe consumer)
          (finally
            (.close consumer)))))))