summaryrefslogtreecommitdiffstats
path: root/plugins/plugins-event/plugins-event-carrier/plugins-event-carrier-kafka/src/test
diff options
context:
space:
mode:
Diffstat (limited to 'plugins/plugins-event/plugins-event-carrier/plugins-event-carrier-kafka/src/test')
-rw-r--r--plugins/plugins-event/plugins-event-carrier/plugins-event-carrier-kafka/src/test/java/org/onap/policy/apex/plugins/event/carrier/kafka/KafkaCarrierTechnologyParametersTest.java90
1 files changed, 86 insertions, 4 deletions
diff --git a/plugins/plugins-event/plugins-event-carrier/plugins-event-carrier-kafka/src/test/java/org/onap/policy/apex/plugins/event/carrier/kafka/KafkaCarrierTechnologyParametersTest.java b/plugins/plugins-event/plugins-event-carrier/plugins-event-carrier-kafka/src/test/java/org/onap/policy/apex/plugins/event/carrier/kafka/KafkaCarrierTechnologyParametersTest.java
index 2f5405ba8..5b6ec33c6 100644
--- a/plugins/plugins-event/plugins-event-carrier/plugins-event-carrier-kafka/src/test/java/org/onap/policy/apex/plugins/event/carrier/kafka/KafkaCarrierTechnologyParametersTest.java
+++ b/plugins/plugins-event/plugins-event-carrier/plugins-event-carrier-kafka/src/test/java/org/onap/policy/apex/plugins/event/carrier/kafka/KafkaCarrierTechnologyParametersTest.java
@@ -1,6 +1,7 @@
/*-
* ============LICENSE_START=======================================================
* Copyright (C) 2019 Samsung. All rights reserved.
+ * Modifications Copyright (C) 2019 Nordix Foundation.
* ================================================================================
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -45,7 +46,7 @@ public class KafkaCarrierTechnologyParametersTest {
Properties kafkaProducerProperties = kafkaCarrierTechnologyParameters.getKafkaProducerProperties();
assertNotNull(kafkaProducerProperties);
assertEquals("localhost:9092", kafkaProducerProperties.get("bootstrap.servers"));
- assertEquals(1, kafkaProducerProperties.get("linger.ms"));
+ assertEquals("1", kafkaProducerProperties.get("linger.ms"));
assertEquals(null, kafkaProducerProperties.get("group.id"));
assertEquals(null, kafkaProducerProperties.get("Property0"));
assertEquals(null, kafkaProducerProperties.get("Property1"));
@@ -66,7 +67,7 @@ public class KafkaCarrierTechnologyParametersTest {
kafkaProducerProperties = kafkaCarrierTechnologyParameters.getKafkaProducerProperties();
assertNotNull(kafkaProducerProperties);
assertEquals("localhost:9092", kafkaProducerProperties.get("bootstrap.servers"));
- assertEquals(1, kafkaProducerProperties.get("linger.ms"));
+ assertEquals("1", kafkaProducerProperties.get("linger.ms"));
assertEquals(null, kafkaProducerProperties.get("group.id"));
assertEquals("Value0", kafkaProducerProperties.get("Property0"));
assertEquals("Value1", kafkaProducerProperties.get("Property1"));
@@ -222,7 +223,7 @@ public class KafkaCarrierTechnologyParametersTest {
kafkaCarrierTechnologyParameters.setConsumerTopicList(origConsumerTopcList);
assertTrue(kafkaCarrierTechnologyParameters.validate().isValid());
- String[] blankStringList = {null, ""};
+ String[] blankStringList = { null, "" };
kafkaCarrierTechnologyParameters.setConsumerTopicList(blankStringList);
assertFalse(kafkaCarrierTechnologyParameters.validate().isValid());
kafkaCarrierTechnologyParameters.setConsumerTopicList(origConsumerTopcList);
@@ -239,7 +240,7 @@ public class KafkaCarrierTechnologyParametersTest {
kafkaCarrierTechnologyParameters.setKafkaProperties(origKafkaProperties);
assertTrue(kafkaCarrierTechnologyParameters.validate().isValid());
- // @formatter:off
+ // @formatter:offkafkaCarrierTechnologyParameters
String[][] kafkaProperties0 = {
{
null, "Value0"
@@ -290,6 +291,87 @@ public class KafkaCarrierTechnologyParametersTest {
assertFalse(kafkaCarrierTechnologyParameters.validate().isValid());
kafkaCarrierTechnologyParameters.setKafkaProperties(origKafkaProperties);
assertTrue(kafkaCarrierTechnologyParameters.validate().isValid());
+ }
+
+ @Test
+ public void testExplicitImplicit() {
+ KafkaCarrierTechnologyParameters kafkaCtp = new KafkaCarrierTechnologyParameters();
+ assertNotNull(kafkaCtp);
+
+ assertTrue(kafkaCtp.validate().isValid());
+
+ // @formatter:off
+ assertEquals("localhost:9092", kafkaCtp.getKafkaConsumerProperties().get("bootstrap.servers"));
+ assertEquals("all", kafkaCtp.getKafkaProducerProperties().get("acks"));
+ assertEquals("0", kafkaCtp.getKafkaProducerProperties().get("retries"));
+ assertEquals("16384", kafkaCtp.getKafkaProducerProperties().get("batch.size"));
+ assertEquals("1", kafkaCtp.getKafkaProducerProperties().get("linger.ms"));
+ assertEquals("33554432", kafkaCtp.getKafkaProducerProperties().get("buffer.memory"));
+ assertEquals("default-group-id", kafkaCtp.getKafkaConsumerProperties().get("group.id"));
+ assertEquals("true", kafkaCtp.getKafkaConsumerProperties().get("enable.auto.commit"));
+ assertEquals("1000", kafkaCtp.getKafkaConsumerProperties().get("auto.commit.interval.ms"));
+ assertEquals("30000", kafkaCtp.getKafkaConsumerProperties().get("session.timeout.ms"));
+ // @formatter:on
+
+ assertEquals("org.apache.kafka.common.serialization.StringSerializer",
+ kafkaCtp.getKafkaProducerProperties().get("key.serializer"));
+ assertEquals("org.apache.kafka.common.serialization.StringSerializer",
+ kafkaCtp.getKafkaProducerProperties().get("value.serializer"));
+ assertEquals("org.apache.kafka.common.serialization.StringDeserializer",
+ kafkaCtp.getKafkaConsumerProperties().get("key.deserializer"));
+ assertEquals("org.apache.kafka.common.serialization.StringDeserializer",
+ kafkaCtp.getKafkaConsumerProperties().get("value.deserializer"));
+ assertEquals("org.apache.kafka.clients.producer.internals.DefaultPartitioner",
+ kafkaCtp.getKafkaProducerProperties().get("partitioner.class"));
+
+ // @formatter:off
+ String[][] kafkaProperties0 = {
+ {
+ "bootstrap.servers", "localhost:9092"
+ }
+ };
+ // @formatter:on
+
+ kafkaCtp.setBootstrapServers(null);
+ kafkaCtp.setKafkaProperties(kafkaProperties0);
+ assertEquals("localhost:9092", kafkaCtp.getKafkaConsumerProperties().get("bootstrap.servers"));
+
+ // @formatter:off
+ String[][] kafkaProperties1 = {
+ {
+ "bootstrap.servers", "localhost:9999"
+ }
+ };
+ // @formatter:on
+
+ kafkaCtp = new KafkaCarrierTechnologyParameters();
+ kafkaCtp.setKafkaProperties(kafkaProperties1);
+ assertEquals("localhost:9999", kafkaCtp.getKafkaConsumerProperties().get("bootstrap.servers"));
+
+ // @formatter:off
+ String[][] kafkaProperties2 = {
+ {
+ "bootstrap.servers", "localhost:8888"
+ }
+ };
+ // @formatter:on
+
+ kafkaCtp = new KafkaCarrierTechnologyParameters();
+ kafkaCtp.setBootstrapServers("localhost:9092");
+ kafkaCtp.setKafkaProperties(kafkaProperties2);
+ assertEquals("localhost:8888", kafkaCtp.getKafkaConsumerProperties().get("bootstrap.servers"));
+
+ // @formatter:off
+ String[][] kafkaProperties3 = {
+ {
+ "bootstrap.servers", "localhost:5555"
+ }
+ };
+ // @formatter:on
+ kafkaCtp = new KafkaCarrierTechnologyParameters();
+ kafkaCtp.setBootstrapServers("localhost:7777");
+ kafkaCtp.setKafkaProperties(kafkaProperties3);
+ assertEquals("localhost:7777", kafkaCtp.getKafkaConsumerProperties().get("bootstrap.servers"));
}
}