From 5f029543f1e673655af2d2974113069df0b6def0 Mon Sep 17 00:00:00 2001 From: liamfallon Date: Tue, 25 Jun 2019 16:10:25 +0000 Subject: Allow Kafka plugin to take arbitrary properties This change adds support for arbitrary Kafka properties to be passed to Kafka through the Kafka plugin. Issue-ID: POLICY-1818 Change-Id: I4389876286747b250c8abe492e9e31674a9483c9 Signed-off-by: liamfallon --- .../KafkaCarrierTechnologyParametersTest.java | 270 +++++++++++++++++++-- 1 file changed, 248 insertions(+), 22 deletions(-) (limited to 'plugins/plugins-event/plugins-event-carrier/plugins-event-carrier-kafka/src/test/java/org') diff --git a/plugins/plugins-event/plugins-event-carrier/plugins-event-carrier-kafka/src/test/java/org/onap/policy/apex/plugins/event/carrier/kafka/KafkaCarrierTechnologyParametersTest.java b/plugins/plugins-event/plugins-event-carrier/plugins-event-carrier-kafka/src/test/java/org/onap/policy/apex/plugins/event/carrier/kafka/KafkaCarrierTechnologyParametersTest.java index 6eca1dcf0..2f5405ba8 100644 --- a/plugins/plugins-event/plugins-event-carrier/plugins-event-carrier-kafka/src/test/java/org/onap/policy/apex/plugins/event/carrier/kafka/KafkaCarrierTechnologyParametersTest.java +++ b/plugins/plugins-event/plugins-event-carrier/plugins-event-carrier-kafka/src/test/java/org/onap/policy/apex/plugins/event/carrier/kafka/KafkaCarrierTechnologyParametersTest.java @@ -20,50 +20,276 @@ package org.onap.policy.apex.plugins.event.carrier.kafka; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertTrue; import java.util.Properties; -import org.junit.Before; + import org.junit.Test; -import org.onap.policy.common.parameters.GroupValidationResult; public class KafkaCarrierTechnologyParametersTest { - - KafkaCarrierTechnologyParameters kafkaCarrierTechnologyParameters = null; - Properties kafkaProducerProperties = null; - Properties kafkaConsumerProperties = null; - GroupValidationResult result = null; - - /** - * Set up testing. - * - * @throws Exception on setup errors - */ - @Before - public void setUp() throws Exception { - kafkaCarrierTechnologyParameters = new KafkaCarrierTechnologyParameters(); - kafkaProducerProperties = kafkaCarrierTechnologyParameters.getKafkaProducerProperties(); - kafkaConsumerProperties = kafkaCarrierTechnologyParameters.getKafkaConsumerProperties(); - } - @Test public void testKafkaCarrierTechnologyParameters() { + KafkaCarrierTechnologyParameters kafkaCarrierTechnologyParameters = new KafkaCarrierTechnologyParameters(); assertNotNull(kafkaCarrierTechnologyParameters); + + assertEquals("localhost:9092", kafkaCarrierTechnologyParameters.getBootstrapServers()); } @Test public void testGetKafkaProducerProperties() { + KafkaCarrierTechnologyParameters kafkaCarrierTechnologyParameters = new KafkaCarrierTechnologyParameters(); + + Properties kafkaProducerProperties = kafkaCarrierTechnologyParameters.getKafkaProducerProperties(); + assertNotNull(kafkaProducerProperties); + assertEquals("localhost:9092", kafkaProducerProperties.get("bootstrap.servers")); + assertEquals(1, kafkaProducerProperties.get("linger.ms")); + assertEquals(null, kafkaProducerProperties.get("group.id")); + assertEquals(null, kafkaProducerProperties.get("Property0")); + assertEquals(null, kafkaProducerProperties.get("Property1")); + assertEquals(null, kafkaProducerProperties.get("Property2")); + + // @formatter:off + String[][] kafkaProperties = { + { + "Property0", "Value0" + }, + { + "Property1", "Value1" + } + }; + // @formatter:on + + kafkaCarrierTechnologyParameters.setKafkaProperties(kafkaProperties); + kafkaProducerProperties = kafkaCarrierTechnologyParameters.getKafkaProducerProperties(); assertNotNull(kafkaProducerProperties); + assertEquals("localhost:9092", kafkaProducerProperties.get("bootstrap.servers")); + assertEquals(1, kafkaProducerProperties.get("linger.ms")); + assertEquals(null, kafkaProducerProperties.get("group.id")); + assertEquals("Value0", kafkaProducerProperties.get("Property0")); + assertEquals("Value1", kafkaProducerProperties.get("Property1")); + assertEquals(null, kafkaProducerProperties.get("Property2")); } @Test public void testGetKafkaConsumerProperties() { + KafkaCarrierTechnologyParameters kafkaCarrierTechnologyParameters = new KafkaCarrierTechnologyParameters(); + + Properties kafkaConsumerProperties = kafkaCarrierTechnologyParameters.getKafkaConsumerProperties(); assertNotNull(kafkaConsumerProperties); + assertEquals("localhost:9092", kafkaConsumerProperties.get("bootstrap.servers")); + assertEquals("default-group-id", kafkaConsumerProperties.get("group.id")); + assertEquals(null, kafkaConsumerProperties.get("linger.ms")); + assertEquals(null, kafkaConsumerProperties.get("Property0")); + assertEquals(null, kafkaConsumerProperties.get("Property1")); + assertEquals(null, kafkaConsumerProperties.get("Property2")); + + // @formatter:off + String[][] kafkaProperties = { + { + "Property0", "Value0" + }, + { + "Property1", "Value1" + } + }; + // @formatter:on + + kafkaCarrierTechnologyParameters.setKafkaProperties(kafkaProperties); + kafkaConsumerProperties = kafkaCarrierTechnologyParameters.getKafkaConsumerProperties(); + assertNotNull(kafkaConsumerProperties); + assertEquals("localhost:9092", kafkaConsumerProperties.get("bootstrap.servers")); + assertEquals("default-group-id", kafkaConsumerProperties.get("group.id")); + assertEquals(null, kafkaConsumerProperties.get("linger.ms")); + assertEquals("Value0", kafkaConsumerProperties.get("Property0")); + assertEquals("Value1", kafkaConsumerProperties.get("Property1")); + assertEquals(null, kafkaConsumerProperties.get("Property2")); } @Test public void testValidate() { - result = kafkaCarrierTechnologyParameters.validate(); - assertNotNull(result); + KafkaCarrierTechnologyParameters kafkaCarrierTechnologyParameters = new KafkaCarrierTechnologyParameters(); + assertNotNull(kafkaCarrierTechnologyParameters); + + assertTrue(kafkaCarrierTechnologyParameters.validate().isValid()); + + String origStringValue = kafkaCarrierTechnologyParameters.getBootstrapServers(); + kafkaCarrierTechnologyParameters.setBootstrapServers(" "); + assertFalse(kafkaCarrierTechnologyParameters.validate().isValid()); + kafkaCarrierTechnologyParameters.setBootstrapServers(origStringValue); + assertTrue(kafkaCarrierTechnologyParameters.validate().isValid()); + + origStringValue = kafkaCarrierTechnologyParameters.getAcks(); + kafkaCarrierTechnologyParameters.setAcks(" "); + assertFalse(kafkaCarrierTechnologyParameters.validate().isValid()); + kafkaCarrierTechnologyParameters.setAcks(origStringValue); + assertTrue(kafkaCarrierTechnologyParameters.validate().isValid()); + + origStringValue = kafkaCarrierTechnologyParameters.getGroupId(); + kafkaCarrierTechnologyParameters.setGroupId(" "); + assertFalse(kafkaCarrierTechnologyParameters.validate().isValid()); + kafkaCarrierTechnologyParameters.setGroupId(origStringValue); + assertTrue(kafkaCarrierTechnologyParameters.validate().isValid()); + + origStringValue = kafkaCarrierTechnologyParameters.getProducerTopic(); + kafkaCarrierTechnologyParameters.setProducerTopic(" "); + assertFalse(kafkaCarrierTechnologyParameters.validate().isValid()); + kafkaCarrierTechnologyParameters.setProducerTopic(origStringValue); + assertTrue(kafkaCarrierTechnologyParameters.validate().isValid()); + + origStringValue = kafkaCarrierTechnologyParameters.getPartitionerClass(); + kafkaCarrierTechnologyParameters.setPartitionerClass(" "); + assertFalse(kafkaCarrierTechnologyParameters.validate().isValid()); + kafkaCarrierTechnologyParameters.setPartitionerClass(origStringValue); + assertTrue(kafkaCarrierTechnologyParameters.validate().isValid()); + + int origIntValue = kafkaCarrierTechnologyParameters.getRetries(); + kafkaCarrierTechnologyParameters.setRetries(-1); + assertFalse(kafkaCarrierTechnologyParameters.validate().isValid()); + kafkaCarrierTechnologyParameters.setRetries(origIntValue); + assertTrue(kafkaCarrierTechnologyParameters.validate().isValid()); + + origIntValue = kafkaCarrierTechnologyParameters.getBatchSize(); + kafkaCarrierTechnologyParameters.setBatchSize(-1); + assertFalse(kafkaCarrierTechnologyParameters.validate().isValid()); + kafkaCarrierTechnologyParameters.setBatchSize(origIntValue); + assertTrue(kafkaCarrierTechnologyParameters.validate().isValid()); + + origIntValue = kafkaCarrierTechnologyParameters.getLingerTime(); + kafkaCarrierTechnologyParameters.setLingerTime(-1); + assertFalse(kafkaCarrierTechnologyParameters.validate().isValid()); + kafkaCarrierTechnologyParameters.setLingerTime(origIntValue); + assertTrue(kafkaCarrierTechnologyParameters.validate().isValid()); + + long origLongValue = kafkaCarrierTechnologyParameters.getBufferMemory(); + kafkaCarrierTechnologyParameters.setBufferMemory(-1); + assertFalse(kafkaCarrierTechnologyParameters.validate().isValid()); + kafkaCarrierTechnologyParameters.setBufferMemory(origLongValue); + assertTrue(kafkaCarrierTechnologyParameters.validate().isValid()); + + origIntValue = kafkaCarrierTechnologyParameters.getAutoCommitTime(); + kafkaCarrierTechnologyParameters.setAutoCommitTime(-1); + assertFalse(kafkaCarrierTechnologyParameters.validate().isValid()); + kafkaCarrierTechnologyParameters.setAutoCommitTime(origIntValue); + assertTrue(kafkaCarrierTechnologyParameters.validate().isValid()); + + origIntValue = kafkaCarrierTechnologyParameters.getSessionTimeout(); + kafkaCarrierTechnologyParameters.setSessionTimeout(-1); + assertFalse(kafkaCarrierTechnologyParameters.validate().isValid()); + kafkaCarrierTechnologyParameters.setSessionTimeout(origIntValue); + assertTrue(kafkaCarrierTechnologyParameters.validate().isValid()); + + origIntValue = kafkaCarrierTechnologyParameters.getConsumerPollTime(); + kafkaCarrierTechnologyParameters.setConsumerPollTime(-1); + assertFalse(kafkaCarrierTechnologyParameters.validate().isValid()); + kafkaCarrierTechnologyParameters.setConsumerPollTime(origIntValue); + assertTrue(kafkaCarrierTechnologyParameters.validate().isValid()); + + origStringValue = kafkaCarrierTechnologyParameters.getKeySerializer(); + kafkaCarrierTechnologyParameters.setKeySerializer(" "); + assertFalse(kafkaCarrierTechnologyParameters.validate().isValid()); + kafkaCarrierTechnologyParameters.setKeySerializer(origStringValue); + assertTrue(kafkaCarrierTechnologyParameters.validate().isValid()); + + origStringValue = kafkaCarrierTechnologyParameters.getValueSerializer(); + kafkaCarrierTechnologyParameters.setValueSerializer(" "); + assertFalse(kafkaCarrierTechnologyParameters.validate().isValid()); + kafkaCarrierTechnologyParameters.setValueSerializer(origStringValue); + assertTrue(kafkaCarrierTechnologyParameters.validate().isValid()); + + origStringValue = kafkaCarrierTechnologyParameters.getKeyDeserializer(); + kafkaCarrierTechnologyParameters.setKeyDeserializer(" "); + assertFalse(kafkaCarrierTechnologyParameters.validate().isValid()); + kafkaCarrierTechnologyParameters.setKeyDeserializer(origStringValue); + assertTrue(kafkaCarrierTechnologyParameters.validate().isValid()); + + origStringValue = kafkaCarrierTechnologyParameters.getValueDeserializer(); + kafkaCarrierTechnologyParameters.setValueDeserializer(" "); + assertFalse(kafkaCarrierTechnologyParameters.validate().isValid()); + kafkaCarrierTechnologyParameters.setValueDeserializer(origStringValue); + assertTrue(kafkaCarrierTechnologyParameters.validate().isValid()); + + String[] origConsumerTopcList = kafkaCarrierTechnologyParameters.getConsumerTopicList(); + kafkaCarrierTechnologyParameters.setConsumerTopicList(null); + assertFalse(kafkaCarrierTechnologyParameters.validate().isValid()); + kafkaCarrierTechnologyParameters.setConsumerTopicList(origConsumerTopcList); + assertTrue(kafkaCarrierTechnologyParameters.validate().isValid()); + + kafkaCarrierTechnologyParameters.setConsumerTopicList(new String[0]); + assertFalse(kafkaCarrierTechnologyParameters.validate().isValid()); + kafkaCarrierTechnologyParameters.setConsumerTopicList(origConsumerTopcList); + assertTrue(kafkaCarrierTechnologyParameters.validate().isValid()); + + String[] blankStringList = {null, ""}; + kafkaCarrierTechnologyParameters.setConsumerTopicList(blankStringList); + assertFalse(kafkaCarrierTechnologyParameters.validate().isValid()); + kafkaCarrierTechnologyParameters.setConsumerTopicList(origConsumerTopcList); + assertTrue(kafkaCarrierTechnologyParameters.validate().isValid()); + + String[][] origKafkaProperties = kafkaCarrierTechnologyParameters.getKafkaProperties(); + kafkaCarrierTechnologyParameters.setKafkaProperties(null); + assertTrue(kafkaCarrierTechnologyParameters.validate().isValid()); + kafkaCarrierTechnologyParameters.setKafkaProperties(origKafkaProperties); + assertTrue(kafkaCarrierTechnologyParameters.validate().isValid()); + + kafkaCarrierTechnologyParameters.setKafkaProperties(new String[0][0]); + assertTrue(kafkaCarrierTechnologyParameters.validate().isValid()); + kafkaCarrierTechnologyParameters.setKafkaProperties(origKafkaProperties); + assertTrue(kafkaCarrierTechnologyParameters.validate().isValid()); + + // @formatter:off + String[][] kafkaProperties0 = { + { + null, "Value0" + } + }; + // @formatter:on + + kafkaCarrierTechnologyParameters.setKafkaProperties(kafkaProperties0); + assertFalse(kafkaCarrierTechnologyParameters.validate().isValid()); + kafkaCarrierTechnologyParameters.setKafkaProperties(origKafkaProperties); + assertTrue(kafkaCarrierTechnologyParameters.validate().isValid()); + + // @formatter:off + String[][] kafkaProperties1 = { + { + "Property1", null + } + }; + // @formatter:on + + kafkaCarrierTechnologyParameters.setKafkaProperties(kafkaProperties1); + assertFalse(kafkaCarrierTechnologyParameters.validate().isValid()); + kafkaCarrierTechnologyParameters.setKafkaProperties(origKafkaProperties); + assertTrue(kafkaCarrierTechnologyParameters.validate().isValid()); + + // @formatter:off + String[][] kafkaProperties2 = { + { + "Property1", null + } + }; + // @formatter:on + + kafkaCarrierTechnologyParameters.setKafkaProperties(kafkaProperties2); + assertFalse(kafkaCarrierTechnologyParameters.validate().isValid()); + kafkaCarrierTechnologyParameters.setKafkaProperties(origKafkaProperties); + assertTrue(kafkaCarrierTechnologyParameters.validate().isValid()); + + // @formatter:off + String[][] kafkaProperties3 = { + { + "Property1", "Value0", "Value1" + } + }; + // @formatter:on + + kafkaCarrierTechnologyParameters.setKafkaProperties(kafkaProperties3); + assertFalse(kafkaCarrierTechnologyParameters.validate().isValid()); + kafkaCarrierTechnologyParameters.setKafkaProperties(origKafkaProperties); + assertTrue(kafkaCarrierTechnologyParameters.validate().isValid()); + } } -- cgit 1.2.3-korg