aboutsummaryrefslogtreecommitdiffstats
path: root/plugins/plugins-event/plugins-event-carrier/plugins-event-carrier-kafka/src/test/java
diff options
context:
space:
mode:
authorliamfallon <liam.fallon@est.tech>2019-07-05 13:41:03 +0000
committerliamfallon <liam.fallon@est.tech>2019-07-05 13:41:03 +0000
commitcad7cff7dc945eefaf27815742461d6db6ab8eac (patch)
tree4f3e6b26fadd9fff47deccfe2cbff0e76a46e948 /plugins/plugins-event/plugins-event-carrier/plugins-event-carrier-kafka/src/test/java
parent9f0678f3ab333949076b5b9747158cf40e08fbef (diff)
Add duplicate check, examples for kafka Properties
Added checks for dealing with duplication of specification of properties explicitly and in kafkaPropertes Added examples for kafkaProperties Added documentation for kafkaProperties Issue-ID: POLICY-1818 Change-Id: Icbe01d6f1d25d4570dcc85cc3db28588743b9c41 Signed-off-by: liamfallon <liam.fallon@est.tech>
Diffstat (limited to 'plugins/plugins-event/plugins-event-carrier/plugins-event-carrier-kafka/src/test/java')
-rw-r--r--plugins/plugins-event/plugins-event-carrier/plugins-event-carrier-kafka/src/test/java/org/onap/policy/apex/plugins/event/carrier/kafka/KafkaCarrierTechnologyParametersTest.java90
1 files changed, 86 insertions, 4 deletions
diff --git a/plugins/plugins-event/plugins-event-carrier/plugins-event-carrier-kafka/src/test/java/org/onap/policy/apex/plugins/event/carrier/kafka/KafkaCarrierTechnologyParametersTest.java b/plugins/plugins-event/plugins-event-carrier/plugins-event-carrier-kafka/src/test/java/org/onap/policy/apex/plugins/event/carrier/kafka/KafkaCarrierTechnologyParametersTest.java
index 2f5405ba8..5b6ec33c6 100644
--- a/plugins/plugins-event/plugins-event-carrier/plugins-event-carrier-kafka/src/test/java/org/onap/policy/apex/plugins/event/carrier/kafka/KafkaCarrierTechnologyParametersTest.java
+++ b/plugins/plugins-event/plugins-event-carrier/plugins-event-carrier-kafka/src/test/java/org/onap/policy/apex/plugins/event/carrier/kafka/KafkaCarrierTechnologyParametersTest.java
@@ -1,6 +1,7 @@
/*-
* ============LICENSE_START=======================================================
* Copyright (C) 2019 Samsung. All rights reserved.
+ * Modifications Copyright (C) 2019 Nordix Foundation.
* ================================================================================
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -45,7 +46,7 @@ public class KafkaCarrierTechnologyParametersTest {
Properties kafkaProducerProperties = kafkaCarrierTechnologyParameters.getKafkaProducerProperties();
assertNotNull(kafkaProducerProperties);
assertEquals("localhost:9092", kafkaProducerProperties.get("bootstrap.servers"));
- assertEquals(1, kafkaProducerProperties.get("linger.ms"));
+ assertEquals("1", kafkaProducerProperties.get("linger.ms"));
assertEquals(null, kafkaProducerProperties.get("group.id"));
assertEquals(null, kafkaProducerProperties.get("Property0"));
assertEquals(null, kafkaProducerProperties.get("Property1"));
@@ -66,7 +67,7 @@ public class KafkaCarrierTechnologyParametersTest {
kafkaProducerProperties = kafkaCarrierTechnologyParameters.getKafkaProducerProperties();
assertNotNull(kafkaProducerProperties);
assertEquals("localhost:9092", kafkaProducerProperties.get("bootstrap.servers"));
- assertEquals(1, kafkaProducerProperties.get("linger.ms"));
+ assertEquals("1", kafkaProducerProperties.get("linger.ms"));
assertEquals(null, kafkaProducerProperties.get("group.id"));
assertEquals("Value0", kafkaProducerProperties.get("Property0"));
assertEquals("Value1", kafkaProducerProperties.get("Property1"));
@@ -222,7 +223,7 @@ public class KafkaCarrierTechnologyParametersTest {
kafkaCarrierTechnologyParameters.setConsumerTopicList(origConsumerTopcList);
assertTrue(kafkaCarrierTechnologyParameters.validate().isValid());
- String[] blankStringList = {null, ""};
+ String[] blankStringList = { null, "" };
kafkaCarrierTechnologyParameters.setConsumerTopicList(blankStringList);
assertFalse(kafkaCarrierTechnologyParameters.validate().isValid());
kafkaCarrierTechnologyParameters.setConsumerTopicList(origConsumerTopcList);
@@ -239,7 +240,7 @@ public class KafkaCarrierTechnologyParametersTest {
kafkaCarrierTechnologyParameters.setKafkaProperties(origKafkaProperties);
assertTrue(kafkaCarrierTechnologyParameters.validate().isValid());
- // @formatter:off
+ // @formatter:offkafkaCarrierTechnologyParameters
String[][] kafkaProperties0 = {
{
null, "Value0"
@@ -290,6 +291,87 @@ public class KafkaCarrierTechnologyParametersTest {
assertFalse(kafkaCarrierTechnologyParameters.validate().isValid());
kafkaCarrierTechnologyParameters.setKafkaProperties(origKafkaProperties);
assertTrue(kafkaCarrierTechnologyParameters.validate().isValid());
+ }
+
+ @Test
+ public void testExplicitImplicit() {
+ KafkaCarrierTechnologyParameters kafkaCtp = new KafkaCarrierTechnologyParameters();
+ assertNotNull(kafkaCtp);
+
+ assertTrue(kafkaCtp.validate().isValid());
+
+ // @formatter:off
+ assertEquals("localhost:9092", kafkaCtp.getKafkaConsumerProperties().get("bootstrap.servers"));
+ assertEquals("all", kafkaCtp.getKafkaProducerProperties().get("acks"));
+ assertEquals("0", kafkaCtp.getKafkaProducerProperties().get("retries"));
+ assertEquals("16384", kafkaCtp.getKafkaProducerProperties().get("batch.size"));
+ assertEquals("1", kafkaCtp.getKafkaProducerProperties().get("linger.ms"));
+ assertEquals("33554432", kafkaCtp.getKafkaProducerProperties().get("buffer.memory"));
+ assertEquals("default-group-id", kafkaCtp.getKafkaConsumerProperties().get("group.id"));
+ assertEquals("true", kafkaCtp.getKafkaConsumerProperties().get("enable.auto.commit"));
+ assertEquals("1000", kafkaCtp.getKafkaConsumerProperties().get("auto.commit.interval.ms"));
+ assertEquals("30000", kafkaCtp.getKafkaConsumerProperties().get("session.timeout.ms"));
+ // @formatter:on
+
+ assertEquals("org.apache.kafka.common.serialization.StringSerializer",
+ kafkaCtp.getKafkaProducerProperties().get("key.serializer"));
+ assertEquals("org.apache.kafka.common.serialization.StringSerializer",
+ kafkaCtp.getKafkaProducerProperties().get("value.serializer"));
+ assertEquals("org.apache.kafka.common.serialization.StringDeserializer",
+ kafkaCtp.getKafkaConsumerProperties().get("key.deserializer"));
+ assertEquals("org.apache.kafka.common.serialization.StringDeserializer",
+ kafkaCtp.getKafkaConsumerProperties().get("value.deserializer"));
+ assertEquals("org.apache.kafka.clients.producer.internals.DefaultPartitioner",
+ kafkaCtp.getKafkaProducerProperties().get("partitioner.class"));
+
+ // @formatter:off
+ String[][] kafkaProperties0 = {
+ {
+ "bootstrap.servers", "localhost:9092"
+ }
+ };
+ // @formatter:on
+
+ kafkaCtp.setBootstrapServers(null);
+ kafkaCtp.setKafkaProperties(kafkaProperties0);
+ assertEquals("localhost:9092", kafkaCtp.getKafkaConsumerProperties().get("bootstrap.servers"));
+
+ // @formatter:off
+ String[][] kafkaProperties1 = {
+ {
+ "bootstrap.servers", "localhost:9999"
+ }
+ };
+ // @formatter:on
+
+ kafkaCtp = new KafkaCarrierTechnologyParameters();
+ kafkaCtp.setKafkaProperties(kafkaProperties1);
+ assertEquals("localhost:9999", kafkaCtp.getKafkaConsumerProperties().get("bootstrap.servers"));
+
+ // @formatter:off
+ String[][] kafkaProperties2 = {
+ {
+ "bootstrap.servers", "localhost:8888"
+ }
+ };
+ // @formatter:on
+
+ kafkaCtp = new KafkaCarrierTechnologyParameters();
+ kafkaCtp.setBootstrapServers("localhost:9092");
+ kafkaCtp.setKafkaProperties(kafkaProperties2);
+ assertEquals("localhost:8888", kafkaCtp.getKafkaConsumerProperties().get("bootstrap.servers"));
+
+ // @formatter:off
+ String[][] kafkaProperties3 = {
+ {
+ "bootstrap.servers", "localhost:5555"
+ }
+ };
+ // @formatter:on
+ kafkaCtp = new KafkaCarrierTechnologyParameters();
+ kafkaCtp.setBootstrapServers("localhost:7777");
+ kafkaCtp.setKafkaProperties(kafkaProperties3);
+ assertEquals("localhost:7777", kafkaCtp.getKafkaConsumerProperties().get("bootstrap.servers"));
}
}