aboutsummaryrefslogtreecommitdiffstats
path: root/src/chameleon/kafka.clj
diff options
context:
space:
mode:
Diffstat (limited to 'src/chameleon/kafka.clj')
-rw-r--r--src/chameleon/kafka.clj54
1 files changed, 33 insertions, 21 deletions
diff --git a/src/chameleon/kafka.clj b/src/chameleon/kafka.clj
index ca730a1..ae8e77f 100644
--- a/src/chameleon/kafka.clj
+++ b/src/chameleon/kafka.clj
@@ -1,6 +1,9 @@
(ns chameleon.kafka
(:refer-clojure :exclude [send])
- (:require [clojure.core.async :as ca])
+ (:require [clojure.core.async :as ca]
+ [clojure.spec.alpha :as s]
+ [chameleon.core :as core]
+ [chameleon.logging :as log])
(:import [java.util Properties]
[org.apache.kafka.clients.consumer KafkaConsumer]
[org.apache.kafka.clients.consumer ConsumerRecord]
@@ -15,7 +18,7 @@
(def deser StringDeserializer)
(def ser StringSerializer)
-(defmacro run-away
+(defmacro threaded
[name & body]
`(try (.start (Thread. (fn [] ~@body) ~name))
(catch OutOfMemoryError e#
@@ -23,24 +26,39 @@
(defn clj-kafka-consumer
"Given a config, group-id, and a topic, return a CljKafkaConsumer"
- [config group-id topic]
- (CljKafkaConsumer. (assoc config "group.id" group-id) topic))
+ [config group-id topic error-logger]
+ (let [valid? (core/conform-multiple :kafka/config config
+ :chameleon.specs/string group-id
+ :chameleon.specs/string topic)]
+ (if-not (seq valid?)
+ (CljKafkaConsumer. (assoc config "group.id" group-id) topic)
+ (log/info error-logger "ERROR" (->> valid?
+ (into ["SPEC ERROR"])
+ (mapv str))))))
(defn subscribe
"Given a CljKafkaConsumer, a channel, and a session timeout (in
ms), return a channel. The input channel is where the messages will
be published."
- ([{:keys [config topic] :as conf} channel session-timeout]
+ ([{:keys [config topic] :as conf} channel session-timeout error-logger]
(subscribe conf channel session-timeout "Kafka-Subscriber-Thread"))
- ([{:keys [config topic]} channel session-timeout thread-name]
- (let [control-channel (ca/chan (ca/dropping-buffer 1))]
- (run-away thread-name (consume config topic channel session-timeout control-channel))
- control-channel)))
+ ([{:keys [config topic]} channel session-timeout thread-name error-logger]
+ (let [valid? (core/conform-multiple :kafka/timeout session-timeout
+ :kafka/chan channel
+ :kafka/config config
+ :chameleon.specs/string topic)
+ control-channel (ca/chan (ca/dropping-buffer 1))]
+ (if-not (seq valid?)
+ (do (threaded thread-name (consume config topic channel session-timeout control-channel))
+ control-channel)
+ (log/info error-logger "ERROR" (->> valid?
+ (into ["SPEC ERROR"])
+ (mapv str)))))))
;; Private
(defn- consumer-record->clojure
- [consumer-record]
+ [^ConsumerRecord consumer-record]
{:topic (.topic ^ConsumerRecord consumer-record)
:partition (.partition ^ConsumerRecord consumer-record)
:offset (.offset ^ConsumerRecord consumer-record)
@@ -61,13 +79,6 @@
true))
true))
-(defn- as-properties
- [m]
- (reduce (fn [props [k v]]
- (.setProperty ^Properties props k v)
- props)
- (Properties.) m))
-
(defn- consume
"Consume elements from a topic and put it on the given
channel. Block until a given timeout to put messages on the channel
@@ -79,13 +90,14 @@
kaka consumer will be disconnected and an exception will be put on
the control channel."
[config topic chan sla-timeout control-channel]
- (let [consumer (KafkaConsumer. ^Properties (as-properties config))]
+ (let [consumer (KafkaConsumer. config)]
(.subscribe ^KafkaConsumer consumer (list topic))
(loop []
(let [consumer-records (.poll consumer 0)]
- (when (and (->> consumer-records
- (map consumer-record->clojure)
- (send-with-sla-timeout chan sla-timeout))
+ (when (and (->> consumer-records
+ (s/conform :kafka/consumer-record)
+ (map consumer-record->clojure)
+ (send-with-sla-timeout chan sla-timeout))
(try (.commitSync consumer)
true
(catch Exception e