aboutsummaryrefslogtreecommitdiffstats
path: root/plugins/plugins-event/plugins-event-carrier/plugins-event-carrier-kafka/src/test/java/org
diff options
context:
space:
mode:
authorliamfallon <liam.fallon@est.tech>2019-06-25 16:10:25 +0000
committerliamfallon <liam.fallon@est.tech>2019-06-25 16:10:25 +0000
commit5f029543f1e673655af2d2974113069df0b6def0 (patch)
tree80b84e0e554a6625869847a4b10090c9fa1334c8 /plugins/plugins-event/plugins-event-carrier/plugins-event-carrier-kafka/src/test/java/org
parentb8316b12e08452cda8322945ca59763e16a37a71 (diff)
Allow Kafka plugin to take arbitrary properties2.2.0
This change adds support for arbitrary Kafka properties to be passed to Kafka through the Kafka plugin. Issue-ID: POLICY-1818 Change-Id: I4389876286747b250c8abe492e9e31674a9483c9 Signed-off-by: liamfallon <liam.fallon@est.tech>
Diffstat (limited to 'plugins/plugins-event/plugins-event-carrier/plugins-event-carrier-kafka/src/test/java/org')
-rw-r--r--plugins/plugins-event/plugins-event-carrier/plugins-event-carrier-kafka/src/test/java/org/onap/policy/apex/plugins/event/carrier/kafka/KafkaCarrierTechnologyParametersTest.java270
1 files changed, 248 insertions, 22 deletions
diff --git a/plugins/plugins-event/plugins-event-carrier/plugins-event-carrier-kafka/src/test/java/org/onap/policy/apex/plugins/event/carrier/kafka/KafkaCarrierTechnologyParametersTest.java b/plugins/plugins-event/plugins-event-carrier/plugins-event-carrier-kafka/src/test/java/org/onap/policy/apex/plugins/event/carrier/kafka/KafkaCarrierTechnologyParametersTest.java
index 6eca1dcf0..2f5405ba8 100644
--- a/plugins/plugins-event/plugins-event-carrier/plugins-event-carrier-kafka/src/test/java/org/onap/policy/apex/plugins/event/carrier/kafka/KafkaCarrierTechnologyParametersTest.java
+++ b/plugins/plugins-event/plugins-event-carrier/plugins-event-carrier-kafka/src/test/java/org/onap/policy/apex/plugins/event/carrier/kafka/KafkaCarrierTechnologyParametersTest.java
@@ -20,50 +20,276 @@
package org.onap.policy.apex.plugins.event.carrier.kafka;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
import java.util.Properties;
-import org.junit.Before;
+
import org.junit.Test;
-import org.onap.policy.common.parameters.GroupValidationResult;
public class KafkaCarrierTechnologyParametersTest {
-
- KafkaCarrierTechnologyParameters kafkaCarrierTechnologyParameters = null;
- Properties kafkaProducerProperties = null;
- Properties kafkaConsumerProperties = null;
- GroupValidationResult result = null;
-
- /**
- * Set up testing.
- *
- * @throws Exception on setup errors
- */
- @Before
- public void setUp() throws Exception {
- kafkaCarrierTechnologyParameters = new KafkaCarrierTechnologyParameters();
- kafkaProducerProperties = kafkaCarrierTechnologyParameters.getKafkaProducerProperties();
- kafkaConsumerProperties = kafkaCarrierTechnologyParameters.getKafkaConsumerProperties();
- }
-
@Test
public void testKafkaCarrierTechnologyParameters() {
+ KafkaCarrierTechnologyParameters kafkaCarrierTechnologyParameters = new KafkaCarrierTechnologyParameters();
assertNotNull(kafkaCarrierTechnologyParameters);
+
+ assertEquals("localhost:9092", kafkaCarrierTechnologyParameters.getBootstrapServers());
}
@Test
public void testGetKafkaProducerProperties() {
+ KafkaCarrierTechnologyParameters kafkaCarrierTechnologyParameters = new KafkaCarrierTechnologyParameters();
+
+ Properties kafkaProducerProperties = kafkaCarrierTechnologyParameters.getKafkaProducerProperties();
+ assertNotNull(kafkaProducerProperties);
+ assertEquals("localhost:9092", kafkaProducerProperties.get("bootstrap.servers"));
+ assertEquals(1, kafkaProducerProperties.get("linger.ms"));
+ assertEquals(null, kafkaProducerProperties.get("group.id"));
+ assertEquals(null, kafkaProducerProperties.get("Property0"));
+ assertEquals(null, kafkaProducerProperties.get("Property1"));
+ assertEquals(null, kafkaProducerProperties.get("Property2"));
+
+ // @formatter:off
+ String[][] kafkaProperties = {
+ {
+ "Property0", "Value0"
+ },
+ {
+ "Property1", "Value1"
+ }
+ };
+ // @formatter:on
+
+ kafkaCarrierTechnologyParameters.setKafkaProperties(kafkaProperties);
+ kafkaProducerProperties = kafkaCarrierTechnologyParameters.getKafkaProducerProperties();
assertNotNull(kafkaProducerProperties);
+ assertEquals("localhost:9092", kafkaProducerProperties.get("bootstrap.servers"));
+ assertEquals(1, kafkaProducerProperties.get("linger.ms"));
+ assertEquals(null, kafkaProducerProperties.get("group.id"));
+ assertEquals("Value0", kafkaProducerProperties.get("Property0"));
+ assertEquals("Value1", kafkaProducerProperties.get("Property1"));
+ assertEquals(null, kafkaProducerProperties.get("Property2"));
}
@Test
public void testGetKafkaConsumerProperties() {
+ KafkaCarrierTechnologyParameters kafkaCarrierTechnologyParameters = new KafkaCarrierTechnologyParameters();
+
+ Properties kafkaConsumerProperties = kafkaCarrierTechnologyParameters.getKafkaConsumerProperties();
assertNotNull(kafkaConsumerProperties);
+ assertEquals("localhost:9092", kafkaConsumerProperties.get("bootstrap.servers"));
+ assertEquals("default-group-id", kafkaConsumerProperties.get("group.id"));
+ assertEquals(null, kafkaConsumerProperties.get("linger.ms"));
+ assertEquals(null, kafkaConsumerProperties.get("Property0"));
+ assertEquals(null, kafkaConsumerProperties.get("Property1"));
+ assertEquals(null, kafkaConsumerProperties.get("Property2"));
+
+ // @formatter:off
+ String[][] kafkaProperties = {
+ {
+ "Property0", "Value0"
+ },
+ {
+ "Property1", "Value1"
+ }
+ };
+ // @formatter:on
+
+ kafkaCarrierTechnologyParameters.setKafkaProperties(kafkaProperties);
+ kafkaConsumerProperties = kafkaCarrierTechnologyParameters.getKafkaConsumerProperties();
+ assertNotNull(kafkaConsumerProperties);
+ assertEquals("localhost:9092", kafkaConsumerProperties.get("bootstrap.servers"));
+ assertEquals("default-group-id", kafkaConsumerProperties.get("group.id"));
+ assertEquals(null, kafkaConsumerProperties.get("linger.ms"));
+ assertEquals("Value0", kafkaConsumerProperties.get("Property0"));
+ assertEquals("Value1", kafkaConsumerProperties.get("Property1"));
+ assertEquals(null, kafkaConsumerProperties.get("Property2"));
}
@Test
public void testValidate() {
- result = kafkaCarrierTechnologyParameters.validate();
- assertNotNull(result);
+ KafkaCarrierTechnologyParameters kafkaCarrierTechnologyParameters = new KafkaCarrierTechnologyParameters();
+ assertNotNull(kafkaCarrierTechnologyParameters);
+
+ assertTrue(kafkaCarrierTechnologyParameters.validate().isValid());
+
+ String origStringValue = kafkaCarrierTechnologyParameters.getBootstrapServers();
+ kafkaCarrierTechnologyParameters.setBootstrapServers(" ");
+ assertFalse(kafkaCarrierTechnologyParameters.validate().isValid());
+ kafkaCarrierTechnologyParameters.setBootstrapServers(origStringValue);
+ assertTrue(kafkaCarrierTechnologyParameters.validate().isValid());
+
+ origStringValue = kafkaCarrierTechnologyParameters.getAcks();
+ kafkaCarrierTechnologyParameters.setAcks(" ");
+ assertFalse(kafkaCarrierTechnologyParameters.validate().isValid());
+ kafkaCarrierTechnologyParameters.setAcks(origStringValue);
+ assertTrue(kafkaCarrierTechnologyParameters.validate().isValid());
+
+ origStringValue = kafkaCarrierTechnologyParameters.getGroupId();
+ kafkaCarrierTechnologyParameters.setGroupId(" ");
+ assertFalse(kafkaCarrierTechnologyParameters.validate().isValid());
+ kafkaCarrierTechnologyParameters.setGroupId(origStringValue);
+ assertTrue(kafkaCarrierTechnologyParameters.validate().isValid());
+
+ origStringValue = kafkaCarrierTechnologyParameters.getProducerTopic();
+ kafkaCarrierTechnologyParameters.setProducerTopic(" ");
+ assertFalse(kafkaCarrierTechnologyParameters.validate().isValid());
+ kafkaCarrierTechnologyParameters.setProducerTopic(origStringValue);
+ assertTrue(kafkaCarrierTechnologyParameters.validate().isValid());
+
+ origStringValue = kafkaCarrierTechnologyParameters.getPartitionerClass();
+ kafkaCarrierTechnologyParameters.setPartitionerClass(" ");
+ assertFalse(kafkaCarrierTechnologyParameters.validate().isValid());
+ kafkaCarrierTechnologyParameters.setPartitionerClass(origStringValue);
+ assertTrue(kafkaCarrierTechnologyParameters.validate().isValid());
+
+ int origIntValue = kafkaCarrierTechnologyParameters.getRetries();
+ kafkaCarrierTechnologyParameters.setRetries(-1);
+ assertFalse(kafkaCarrierTechnologyParameters.validate().isValid());
+ kafkaCarrierTechnologyParameters.setRetries(origIntValue);
+ assertTrue(kafkaCarrierTechnologyParameters.validate().isValid());
+
+ origIntValue = kafkaCarrierTechnologyParameters.getBatchSize();
+ kafkaCarrierTechnologyParameters.setBatchSize(-1);
+ assertFalse(kafkaCarrierTechnologyParameters.validate().isValid());
+ kafkaCarrierTechnologyParameters.setBatchSize(origIntValue);
+ assertTrue(kafkaCarrierTechnologyParameters.validate().isValid());
+
+ origIntValue = kafkaCarrierTechnologyParameters.getLingerTime();
+ kafkaCarrierTechnologyParameters.setLingerTime(-1);
+ assertFalse(kafkaCarrierTechnologyParameters.validate().isValid());
+ kafkaCarrierTechnologyParameters.setLingerTime(origIntValue);
+ assertTrue(kafkaCarrierTechnologyParameters.validate().isValid());
+
+ long origLongValue = kafkaCarrierTechnologyParameters.getBufferMemory();
+ kafkaCarrierTechnologyParameters.setBufferMemory(-1);
+ assertFalse(kafkaCarrierTechnologyParameters.validate().isValid());
+ kafkaCarrierTechnologyParameters.setBufferMemory(origLongValue);
+ assertTrue(kafkaCarrierTechnologyParameters.validate().isValid());
+
+ origIntValue = kafkaCarrierTechnologyParameters.getAutoCommitTime();
+ kafkaCarrierTechnologyParameters.setAutoCommitTime(-1);
+ assertFalse(kafkaCarrierTechnologyParameters.validate().isValid());
+ kafkaCarrierTechnologyParameters.setAutoCommitTime(origIntValue);
+ assertTrue(kafkaCarrierTechnologyParameters.validate().isValid());
+
+ origIntValue = kafkaCarrierTechnologyParameters.getSessionTimeout();
+ kafkaCarrierTechnologyParameters.setSessionTimeout(-1);
+ assertFalse(kafkaCarrierTechnologyParameters.validate().isValid());
+ kafkaCarrierTechnologyParameters.setSessionTimeout(origIntValue);
+ assertTrue(kafkaCarrierTechnologyParameters.validate().isValid());
+
+ origIntValue = kafkaCarrierTechnologyParameters.getConsumerPollTime();
+ kafkaCarrierTechnologyParameters.setConsumerPollTime(-1);
+ assertFalse(kafkaCarrierTechnologyParameters.validate().isValid());
+ kafkaCarrierTechnologyParameters.setConsumerPollTime(origIntValue);
+ assertTrue(kafkaCarrierTechnologyParameters.validate().isValid());
+
+ origStringValue = kafkaCarrierTechnologyParameters.getKeySerializer();
+ kafkaCarrierTechnologyParameters.setKeySerializer(" ");
+ assertFalse(kafkaCarrierTechnologyParameters.validate().isValid());
+ kafkaCarrierTechnologyParameters.setKeySerializer(origStringValue);
+ assertTrue(kafkaCarrierTechnologyParameters.validate().isValid());
+
+ origStringValue = kafkaCarrierTechnologyParameters.getValueSerializer();
+ kafkaCarrierTechnologyParameters.setValueSerializer(" ");
+ assertFalse(kafkaCarrierTechnologyParameters.validate().isValid());
+ kafkaCarrierTechnologyParameters.setValueSerializer(origStringValue);
+ assertTrue(kafkaCarrierTechnologyParameters.validate().isValid());
+
+ origStringValue = kafkaCarrierTechnologyParameters.getKeyDeserializer();
+ kafkaCarrierTechnologyParameters.setKeyDeserializer(" ");
+ assertFalse(kafkaCarrierTechnologyParameters.validate().isValid());
+ kafkaCarrierTechnologyParameters.setKeyDeserializer(origStringValue);
+ assertTrue(kafkaCarrierTechnologyParameters.validate().isValid());
+
+ origStringValue = kafkaCarrierTechnologyParameters.getValueDeserializer();
+ kafkaCarrierTechnologyParameters.setValueDeserializer(" ");
+ assertFalse(kafkaCarrierTechnologyParameters.validate().isValid());
+ kafkaCarrierTechnologyParameters.setValueDeserializer(origStringValue);
+ assertTrue(kafkaCarrierTechnologyParameters.validate().isValid());
+
+ String[] origConsumerTopcList = kafkaCarrierTechnologyParameters.getConsumerTopicList();
+ kafkaCarrierTechnologyParameters.setConsumerTopicList(null);
+ assertFalse(kafkaCarrierTechnologyParameters.validate().isValid());
+ kafkaCarrierTechnologyParameters.setConsumerTopicList(origConsumerTopcList);
+ assertTrue(kafkaCarrierTechnologyParameters.validate().isValid());
+
+ kafkaCarrierTechnologyParameters.setConsumerTopicList(new String[0]);
+ assertFalse(kafkaCarrierTechnologyParameters.validate().isValid());
+ kafkaCarrierTechnologyParameters.setConsumerTopicList(origConsumerTopcList);
+ assertTrue(kafkaCarrierTechnologyParameters.validate().isValid());
+
+ String[] blankStringList = {null, ""};
+ kafkaCarrierTechnologyParameters.setConsumerTopicList(blankStringList);
+ assertFalse(kafkaCarrierTechnologyParameters.validate().isValid());
+ kafkaCarrierTechnologyParameters.setConsumerTopicList(origConsumerTopcList);
+ assertTrue(kafkaCarrierTechnologyParameters.validate().isValid());
+
+ String[][] origKafkaProperties = kafkaCarrierTechnologyParameters.getKafkaProperties();
+ kafkaCarrierTechnologyParameters.setKafkaProperties(null);
+ assertTrue(kafkaCarrierTechnologyParameters.validate().isValid());
+ kafkaCarrierTechnologyParameters.setKafkaProperties(origKafkaProperties);
+ assertTrue(kafkaCarrierTechnologyParameters.validate().isValid());
+
+ kafkaCarrierTechnologyParameters.setKafkaProperties(new String[0][0]);
+ assertTrue(kafkaCarrierTechnologyParameters.validate().isValid());
+ kafkaCarrierTechnologyParameters.setKafkaProperties(origKafkaProperties);
+ assertTrue(kafkaCarrierTechnologyParameters.validate().isValid());
+
+ // @formatter:off
+ String[][] kafkaProperties0 = {
+ {
+ null, "Value0"
+ }
+ };
+ // @formatter:on
+
+ kafkaCarrierTechnologyParameters.setKafkaProperties(kafkaProperties0);
+ assertFalse(kafkaCarrierTechnologyParameters.validate().isValid());
+ kafkaCarrierTechnologyParameters.setKafkaProperties(origKafkaProperties);
+ assertTrue(kafkaCarrierTechnologyParameters.validate().isValid());
+
+ // @formatter:off
+ String[][] kafkaProperties1 = {
+ {
+ "Property1", null
+ }
+ };
+ // @formatter:on
+
+ kafkaCarrierTechnologyParameters.setKafkaProperties(kafkaProperties1);
+ assertFalse(kafkaCarrierTechnologyParameters.validate().isValid());
+ kafkaCarrierTechnologyParameters.setKafkaProperties(origKafkaProperties);
+ assertTrue(kafkaCarrierTechnologyParameters.validate().isValid());
+
+ // @formatter:off
+ String[][] kafkaProperties2 = {
+ {
+ "Property1", null
+ }
+ };
+ // @formatter:on
+
+ kafkaCarrierTechnologyParameters.setKafkaProperties(kafkaProperties2);
+ assertFalse(kafkaCarrierTechnologyParameters.validate().isValid());
+ kafkaCarrierTechnologyParameters.setKafkaProperties(origKafkaProperties);
+ assertTrue(kafkaCarrierTechnologyParameters.validate().isValid());
+
+ // @formatter:off
+ String[][] kafkaProperties3 = {
+ {
+ "Property1", "Value0", "Value1"
+ }
+ };
+ // @formatter:on
+
+ kafkaCarrierTechnologyParameters.setKafkaProperties(kafkaProperties3);
+ assertFalse(kafkaCarrierTechnologyParameters.validate().isValid());
+ kafkaCarrierTechnologyParameters.setKafkaProperties(origKafkaProperties);
+ assertTrue(kafkaCarrierTechnologyParameters.validate().isValid());
+
}
}