aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--src/chameleon/kafka.clj4
-rw-r--r--src/chameleon/specs.clj11
-rw-r--r--test/chameleon/testing.clj6
3 files changed, 17 insertions, 4 deletions
diff --git a/src/chameleon/kafka.clj b/src/chameleon/kafka.clj
index ae8e77f..bdfc07d 100644
--- a/src/chameleon/kafka.clj
+++ b/src/chameleon/kafka.clj
@@ -36,6 +36,10 @@
(into ["SPEC ERROR"])
(mapv str))))))
+(defn clj-kafka-comsumer?
+ [consumer]
+ (instance? CljKafkaConsumer consumer))
+
(defn subscribe
"Given a CljKafkaConsumer, a channel, and a session timeout (in
ms), return a channel. The input channel is where the messages will
diff --git a/src/chameleon/specs.clj b/src/chameleon/specs.clj
index 4158e09..760b30a 100644
--- a/src/chameleon/specs.clj
+++ b/src/chameleon/specs.clj
@@ -5,7 +5,8 @@
[cheshire.core :as c]
[clojure.core.async :as ca]
[clojure.string :as str]
- [cheshire.core :as json])
+ [cheshire.core :as json]
+ [chameleon.kafka :as ck])
(:import [org.apache.kafka.clients.consumer ConsumerRecord]
[java.util Properties]))
@@ -114,3 +115,11 @@
(s/def :kafka/clojure-consumer-record (s/spec (s/keys :req-un [:kafka/topic :kafka/partition
:kafka/offset :kafka/key :kafka/value])))
(s/def :kafka/properties (s/spec #(instance? Properties %) :gen #(s/tuple keyword? ::string)))
+(s/def :kafka/consumer (s/spec ck/clj-kafka-comsumer? :gen #(->> (s/cat :config :kafka/config
+ :group-id ::string
+ :topic :kafka/topic
+ :logger ::logger)
+ s/gen
+ (gen/fmap (fn [[config group-id topic logger]]
+ (println [config group-id topic logger])
+ (ck/clj-kafka-consumer config group-id topic logger))))))
diff --git a/test/chameleon/testing.clj b/test/chameleon/testing.clj
index 35f897d..90de58b 100644
--- a/test/chameleon/testing.clj
+++ b/test/chameleon/testing.clj
@@ -9,8 +9,8 @@
[chameleon.aai-processor :as cai]
[chameleon.config :as cc]
[integrant.core :as ic]
- [integrant.core :as ig])
- (:import [chameleon.kafka CljKafkaConsumer]))
+ [integrant.core :as ig]
+ [chameleon.aai-processor]))
;; STUBS
(s/fdef chameleon.route/assert-gallifrey!
@@ -72,7 +72,7 @@
(s/fdef chameleon.kafka/clj-kafka-consumer
:args (s/cat :config :kafka/config :group-id :chameleon.specs/string
:topic :chameleon.specs/string :logger :chameleon.specs/logger)
- :ret #(instance? CljKafkaConsumer %))
+ :ret :kafka/consumer)
(s/fdef chameleon.kafka/consumer-record->clojure
:args (s/cat :consumer-record :kafka/consumer-record)