diff options
author | Shwetank Dave <shwetank.dave@amdocs.com> | 2019-01-16 12:27:47 -0500 |
---|---|---|
committer | Shwetank Dave <shwetank.dave@amdocs.com> | 2019-01-16 12:30:20 -0500 |
commit | 2210ee587d73dff06ac0711280b0ea8aae3ffa10 (patch) | |
tree | d1e2eef7819642fa44abb498db3bc136f2a76b92 /src | |
parent | 694cabe2c0c5aaa05de06e731112b4bf78366de5 (diff) |
Removing conform from kafkaRecords
Change-Id: Id3648a7286556e683b50da9f3db4bcec62005794
Issue-ID: AAI-797
Signed-off-by: Shwetank Dave <shwetank.dave@amdocs.com>
Diffstat (limited to 'src')
-rw-r--r-- | src/chameleon/kafka.clj | 1 | ||||
-rw-r--r-- | src/chameleon/specs.clj | 11 |
2 files changed, 10 insertions, 2 deletions
diff --git a/src/chameleon/kafka.clj b/src/chameleon/kafka.clj index bdfc07d..357d68a 100644 --- a/src/chameleon/kafka.clj +++ b/src/chameleon/kafka.clj @@ -99,7 +99,6 @@ (loop [] (let [consumer-records (.poll consumer 0)] (when (and (->> consumer-records - (s/conform :kafka/consumer-record) (map consumer-record->clojure) (send-with-sla-timeout chan sla-timeout)) (try (.commitSync consumer) diff --git a/src/chameleon/specs.clj b/src/chameleon/specs.clj index 760b30a..5a8a896 100644 --- a/src/chameleon/specs.clj +++ b/src/chameleon/specs.clj @@ -7,7 +7,8 @@ [clojure.string :as str] [cheshire.core :as json] [chameleon.kafka :as ck]) - (:import [org.apache.kafka.clients.consumer ConsumerRecord] + (:import [org.apache.kafka.clients.consumer ConsumerRecord ConsumerRecords ] + [org.apache.kafka.common TopicPartition] [java.util Properties])) (s/def ::host string?) @@ -102,6 +103,11 @@ (s/def :kafka/key ::string) (s/def :kafka/value ::string) +(s/def :kafka/topic-partition (s/spec #(instance? TopicPartition %) :gen (fn [] (->> (s/cat :topic :kafka/topic + :partition :kafka/partition) + s/gen + (gen/fmap (fn [[topic partition]] + (TopicPartition. topic partition))))))) (s/def :kafka/consumer-record (s/spec #(instance? ConsumerRecord %) :gen (fn [] (->> (s/cat :topic :kafka/topic :partition :kafka/partition @@ -111,6 +117,9 @@ s/gen (gen/fmap (fn [[topic partition offset key value]] (ConsumerRecord. topic partition offset key value))))))) +(s/def :kafka/consumer-records (s/spec #(instance? ConsumerRecords %) + :gen (fn [] (gen/fmap (fn [k] (ConsumerRecords. k)) + (s/gen (s/map-of :kafka/topic-partition (s/coll-of :kafka/consumer-record))))))) (s/def :kafka/clojure-consumer-record (s/spec (s/keys :req-un [:kafka/topic :kafka/partition :kafka/offset :kafka/key :kafka/value]))) |