aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorSteven Blimkie <Steven.Blimkie@amdocs.com>2019-01-16 20:11:14 +0000
committerGerrit Code Review <gerrit@onap.org>2019-01-16 20:11:14 +0000
commit4b0c75f9e509399fe9a3540741e2c3b3c24db897 (patch)
tree5d93476be811f7dbd611383a4eea66d7863e910e
parent549532a41cfc041af86477a8c0409190eb4c4e8d (diff)
parent2210ee587d73dff06ac0711280b0ea8aae3ffa10 (diff)
Merge "Removing conform from kafkaRecords"
-rw-r--r--src/chameleon/specs.clj11
1 files changed, 10 insertions, 1 deletions
diff --git a/src/chameleon/specs.clj b/src/chameleon/specs.clj
index 760b30a..5a8a896 100644
--- a/src/chameleon/specs.clj
+++ b/src/chameleon/specs.clj
@@ -7,7 +7,8 @@
[clojure.string :as str]
[cheshire.core :as json]
[chameleon.kafka :as ck])
- (:import [org.apache.kafka.clients.consumer ConsumerRecord]
+ (:import [org.apache.kafka.clients.consumer ConsumerRecord ConsumerRecords ]
+ [org.apache.kafka.common TopicPartition]
[java.util Properties]))
(s/def ::host string?)
@@ -102,6 +103,11 @@
(s/def :kafka/key ::string)
(s/def :kafka/value ::string)
+(s/def :kafka/topic-partition (s/spec #(instance? TopicPartition %) :gen (fn [] (->> (s/cat :topic :kafka/topic
+ :partition :kafka/partition)
+ s/gen
+ (gen/fmap (fn [[topic partition]]
+ (TopicPartition. topic partition)))))))
(s/def :kafka/consumer-record (s/spec #(instance? ConsumerRecord %)
:gen (fn [] (->> (s/cat :topic :kafka/topic
:partition :kafka/partition
@@ -111,6 +117,9 @@
s/gen
(gen/fmap (fn [[topic partition offset key value]]
(ConsumerRecord. topic partition offset key value)))))))
+(s/def :kafka/consumer-records (s/spec #(instance? ConsumerRecords %)
+ :gen (fn [] (gen/fmap (fn [k] (ConsumerRecords. k))
+ (s/gen (s/map-of :kafka/topic-partition (s/coll-of :kafka/consumer-record)))))))
(s/def :kafka/clojure-consumer-record (s/spec (s/keys :req-un [:kafka/topic :kafka/partition
:kafka/offset :kafka/key :kafka/value])))