aboutsummaryrefslogtreecommitdiffstats
path: root/src/chameleon
diff options
context:
space:
mode:
Diffstat (limited to 'src/chameleon')
-rw-r--r--src/chameleon/kafka.clj4
-rw-r--r--src/chameleon/specs.clj11
2 files changed, 14 insertions, 1 deletions
diff --git a/src/chameleon/kafka.clj b/src/chameleon/kafka.clj
index ae8e77f..bdfc07d 100644
--- a/src/chameleon/kafka.clj
+++ b/src/chameleon/kafka.clj
@@ -36,6 +36,10 @@
(into ["SPEC ERROR"])
(mapv str))))))
+(defn clj-kafka-comsumer?
+ [consumer]
+ (instance? CljKafkaConsumer consumer))
+
(defn subscribe
"Given a CljKafkaConsumer, a channel, and a session timeout (in
ms), return a channel. The input channel is where the messages will
diff --git a/src/chameleon/specs.clj b/src/chameleon/specs.clj
index 4158e09..760b30a 100644
--- a/src/chameleon/specs.clj
+++ b/src/chameleon/specs.clj
@@ -5,7 +5,8 @@
[cheshire.core :as c]
[clojure.core.async :as ca]
[clojure.string :as str]
- [cheshire.core :as json])
+ [cheshire.core :as json]
+ [chameleon.kafka :as ck])
(:import [org.apache.kafka.clients.consumer ConsumerRecord]
[java.util Properties]))
@@ -114,3 +115,11 @@
(s/def :kafka/clojure-consumer-record (s/spec (s/keys :req-un [:kafka/topic :kafka/partition
:kafka/offset :kafka/key :kafka/value])))
(s/def :kafka/properties (s/spec #(instance? Properties %) :gen #(s/tuple keyword? ::string)))
+(s/def :kafka/consumer (s/spec ck/clj-kafka-comsumer? :gen #(->> (s/cat :config :kafka/config
+ :group-id ::string
+ :topic :kafka/topic
+ :logger ::logger)
+ s/gen
+ (gen/fmap (fn [[config group-id topic logger]]
+ (println [config group-id topic logger])
+ (ck/clj-kafka-consumer config group-id topic logger))))))