diff options
-rw-r--r-- | src/chameleon/kafka.clj | 4 | ||||
-rw-r--r-- | src/chameleon/specs.clj | 11 | ||||
-rw-r--r-- | test/chameleon/testing.clj | 6 |
3 files changed, 17 insertions, 4 deletions
diff --git a/src/chameleon/kafka.clj b/src/chameleon/kafka.clj index ae8e77f..bdfc07d 100644 --- a/src/chameleon/kafka.clj +++ b/src/chameleon/kafka.clj @@ -36,6 +36,10 @@ (into ["SPEC ERROR"]) (mapv str)))))) +(defn clj-kafka-comsumer? + [consumer] + (instance? CljKafkaConsumer consumer)) + (defn subscribe "Given a CljKafkaConsumer, a channel, and a session timeout (in ms), return a channel. The input channel is where the messages will diff --git a/src/chameleon/specs.clj b/src/chameleon/specs.clj index 4158e09..760b30a 100644 --- a/src/chameleon/specs.clj +++ b/src/chameleon/specs.clj @@ -5,7 +5,8 @@ [cheshire.core :as c] [clojure.core.async :as ca] [clojure.string :as str] - [cheshire.core :as json]) + [cheshire.core :as json] + [chameleon.kafka :as ck]) (:import [org.apache.kafka.clients.consumer ConsumerRecord] [java.util Properties])) @@ -114,3 +115,11 @@ (s/def :kafka/clojure-consumer-record (s/spec (s/keys :req-un [:kafka/topic :kafka/partition :kafka/offset :kafka/key :kafka/value]))) (s/def :kafka/properties (s/spec #(instance? Properties %) :gen #(s/tuple keyword? ::string))) +(s/def :kafka/consumer (s/spec ck/clj-kafka-comsumer? :gen #(->> (s/cat :config :kafka/config + :group-id ::string + :topic :kafka/topic + :logger ::logger) + s/gen + (gen/fmap (fn [[config group-id topic logger]] + (println [config group-id topic logger]) + (ck/clj-kafka-consumer config group-id topic logger)))))) diff --git a/test/chameleon/testing.clj b/test/chameleon/testing.clj index 35f897d..90de58b 100644 --- a/test/chameleon/testing.clj +++ b/test/chameleon/testing.clj @@ -9,8 +9,8 @@ [chameleon.aai-processor :as cai] [chameleon.config :as cc] [integrant.core :as ic] - [integrant.core :as ig]) - (:import [chameleon.kafka CljKafkaConsumer])) + [integrant.core :as ig] + [chameleon.aai-processor])) ;; STUBS (s/fdef chameleon.route/assert-gallifrey! @@ -72,7 +72,7 @@ (s/fdef chameleon.kafka/clj-kafka-consumer :args (s/cat :config :kafka/config :group-id :chameleon.specs/string :topic :chameleon.specs/string :logger :chameleon.specs/logger) - :ret #(instance? CljKafkaConsumer %)) + :ret :kafka/consumer) (s/fdef chameleon.kafka/consumer-record->clojure :args (s/cat :consumer-record :kafka/consumer-record) |