aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorliamfallon <liam.fallon@est.tech>2019-06-25 16:10:25 +0000
committerliamfallon <liam.fallon@est.tech>2019-06-25 16:10:25 +0000
commit5f029543f1e673655af2d2974113069df0b6def0 (patch)
tree80b84e0e554a6625869847a4b10090c9fa1334c8
parentb8316b12e08452cda8322945ca59763e16a37a71 (diff)
Allow Kafka plugin to take arbitrary properties2.2.0
This change adds support for arbitrary Kafka properties to be passed to Kafka through the Kafka plugin. Issue-ID: POLICY-1818 Change-Id: I4389876286747b250c8abe492e9e31674a9483c9 Signed-off-by: liamfallon <liam.fallon@est.tech>
-rw-r--r--plugins/plugins-event/plugins-event-carrier/plugins-event-carrier-kafka/src/main/java/org/onap/policy/apex/plugins/event/carrier/kafka/ApexKafkaConsumer.java6
-rw-r--r--plugins/plugins-event/plugins-event-carrier/plugins-event-carrier-kafka/src/main/java/org/onap/policy/apex/plugins/event/carrier/kafka/KafkaCarrierTechnologyParameters.java386
-rw-r--r--plugins/plugins-event/plugins-event-carrier/plugins-event-carrier-kafka/src/test/java/org/onap/policy/apex/plugins/event/carrier/kafka/KafkaCarrierTechnologyParametersTest.java270
3 files changed, 353 insertions, 309 deletions
diff --git a/plugins/plugins-event/plugins-event-carrier/plugins-event-carrier-kafka/src/main/java/org/onap/policy/apex/plugins/event/carrier/kafka/ApexKafkaConsumer.java b/plugins/plugins-event/plugins-event-carrier/plugins-event-carrier-kafka/src/main/java/org/onap/policy/apex/plugins/event/carrier/kafka/ApexKafkaConsumer.java
index affd10ccb..6860bca22 100644
--- a/plugins/plugins-event/plugins-event-carrier/plugins-event-carrier-kafka/src/main/java/org/onap/policy/apex/plugins/event/carrier/kafka/ApexKafkaConsumer.java
+++ b/plugins/plugins-event/plugins-event-carrier/plugins-event-carrier-kafka/src/main/java/org/onap/policy/apex/plugins/event/carrier/kafka/ApexKafkaConsumer.java
@@ -92,7 +92,7 @@ public class ApexKafkaConsumer implements ApexEventConsumer, Runnable {
// Kick off the Kafka consumer
kafkaConsumer = new KafkaConsumer<>(kafkaConsumerProperties.getKafkaConsumerProperties());
- kafkaConsumer.subscribe(kafkaConsumerProperties.getConsumerTopicList());
+ kafkaConsumer.subscribe(kafkaConsumerProperties.getConsumerTopicListAsCollection());
if (LOGGER.isDebugEnabled()) {
LOGGER.debug("event receiver for " + this.getClass().getName() + ":" + this.name + " subscribed to topics: "
+ kafkaConsumerProperties.getConsumerTopicList());
@@ -154,7 +154,7 @@ public class ApexKafkaConsumer implements ApexEventConsumer, Runnable {
public void run() {
// Kick off the Kafka consumer
kafkaConsumer = new KafkaConsumer<>(kafkaConsumerProperties.getKafkaConsumerProperties());
- kafkaConsumer.subscribe(kafkaConsumerProperties.getConsumerTopicList());
+ kafkaConsumer.subscribe(kafkaConsumerProperties.getConsumerTopicListAsCollection());
if (LOGGER.isDebugEnabled()) {
LOGGER.debug("event receiver for " + this.getClass().getName() + ":" + this.name + " subscribed to topics: "
+ kafkaConsumerProperties.getConsumerTopicList());
@@ -181,7 +181,7 @@ public class ApexKafkaConsumer implements ApexEventConsumer, Runnable {
/**
* Trace a record if trace is enabled.
- *
+ *
* @param record the record to trace
*/
private void traceIfTraceEnabled(final ConsumerRecord<String, String> record) {
diff --git a/plugins/plugins-event/plugins-event-carrier/plugins-event-carrier-kafka/src/main/java/org/onap/policy/apex/plugins/event/carrier/kafka/KafkaCarrierTechnologyParameters.java b/plugins/plugins-event/plugins-event-carrier/plugins-event-carrier-kafka/src/main/java/org/onap/policy/apex/plugins/event/carrier/kafka/KafkaCarrierTechnologyParameters.java
index 36947ee13..f66dbfe9e 100644
--- a/plugins/plugins-event/plugins-event-carrier/plugins-event-carrier-kafka/src/main/java/org/onap/policy/apex/plugins/event/carrier/kafka/KafkaCarrierTechnologyParameters.java
+++ b/plugins/plugins-event/plugins-event-carrier/plugins-event-carrier-kafka/src/main/java/org/onap/policy/apex/plugins/event/carrier/kafka/KafkaCarrierTechnologyParameters.java
@@ -25,16 +25,24 @@ import java.util.Arrays;
import java.util.Collection;
import java.util.Properties;
+import lombok.Getter;
+import lombok.Setter;
+
+import org.apache.commons.lang3.StringUtils;
import org.apache.kafka.clients.producer.internals.DefaultPartitioner;
import org.onap.policy.apex.service.parameters.carriertechnology.CarrierTechnologyParameters;
import org.onap.policy.common.parameters.GroupValidationResult;
import org.onap.policy.common.parameters.ValidationStatus;
+import org.onap.policy.common.parameters.annotations.Min;
+import org.onap.policy.common.parameters.annotations.NotBlank;
/**
* Apex parameters for Kafka as an event carrier technology.
*
* @author Liam Fallon (liam.fallon@ericsson.com)
*/
+@Getter
+@Setter
public class KafkaCarrierTechnologyParameters extends CarrierTechnologyParameters {
// @formatter:off
/** The label of this carrier technology. */
@@ -48,6 +56,8 @@ public class KafkaCarrierTechnologyParameters extends CarrierTechnologyParameter
// Repeated strings in messages
private static final String SPECIFY_AS_STRING_MESSAGE = "not specified, must be specified as a string";
+ private static final String ENTRY = "entry ";
+ private static final String KAFKA_PROPERTIES = "kafkaProperties";
// Default parameter values
private static final String DEFAULT_ACKS = "all";
@@ -85,24 +95,44 @@ public class KafkaCarrierTechnologyParameters extends CarrierTechnologyParameter
private static final String PROPERTY_PARTITIONER_CLASS = "partitioner.class";
// kafka carrier parameters
+ @NotBlank
private String bootstrapServers = DEFAULT_BOOT_SERVERS;
+ @NotBlank
private String acks = DEFAULT_ACKS;
+ @Min(value = 0)
private int retries = DEFAULT_RETRIES;
+ @Min(value = 0)
private int batchSize = DEFAULT_BATCH_SIZE;
+ @Min(value = 0)
private int lingerTime = DEFAULT_LINGER_TIME;
+ @Min(value = 0)
private long bufferMemory = DEFAULT_BUFFER_MEMORY;
+ @NotBlank
private String groupId = DEFAULT_GROUP_ID;
private boolean enableAutoCommit = DEFAULT_ENABLE_AUTOCMIT;
+ @Min(value = 0)
private int autoCommitTime = DEFAULT_AUTO_COMMIT_TIME;
+ @Min(value = 0)
private int sessionTimeout = DEFAULT_SESSION_TIMEOUT;
+ @NotBlank
private String producerTopic = DEFAULT_PROD_TOPIC;
+ @Min(value = 0)
private int consumerPollTime = DEFAULT_CONS_POLL_TIME;
private String[] consumerTopicList = DEFAULT_CONS_TOPICLIST;
+ @NotBlank
private String keySerializer = DEFAULT_STRING_SERZER;
+ @NotBlank
private String valueSerializer = DEFAULT_STRING_SERZER;
+ @NotBlank
private String keyDeserializer = DEFAULT_STRING_DESZER;
+ @NotBlank
private String valueDeserializer = DEFAULT_STRING_DESZER;
+ @NotBlank
private String partitionerClass = DEFAULT_PARTITIONR_CLASS;
+
+ // All Kafka properties can be specified as an array of key-value pairs
+ private String[][] kafkaProperties = null;
+
// @formatter:on
/**
@@ -124,19 +154,26 @@ public class KafkaCarrierTechnologyParameters extends CarrierTechnologyParameter
* @return the kafka producer properties
*/
public Properties getKafkaProducerProperties() {
- final Properties kafkaProperties = new Properties();
-
- kafkaProperties.put(PROPERTY_BOOTSTRAP_SERVERS, bootstrapServers);
- kafkaProperties.put(PROPERTY_ACKS, acks);
- kafkaProperties.put(PROPERTY_RETRIES, retries);
- kafkaProperties.put(PROPERTY_BATCH_SIZE, batchSize);
- kafkaProperties.put(PROPERTY_LINGER_TIME, lingerTime);
- kafkaProperties.put(PROPERTY_BUFFER_MEMORY, bufferMemory);
- kafkaProperties.put(PROPERTY_KEY_SERIALIZER, keySerializer);
- kafkaProperties.put(PROPERTY_VALUE_SERIALIZER, valueSerializer);
- kafkaProperties.put(PROPERTY_PARTITIONER_CLASS, partitionerClass);
-
- return kafkaProperties;
+ final Properties returnKafkaProperties = new Properties();
+
+ // Add properties from the Kafka property array
+ if (kafkaProperties != null) {
+ for (int i = 0; i < kafkaProperties.length; i++) {
+ returnKafkaProperties.put(kafkaProperties[i][0], kafkaProperties[i][1]);
+ }
+ }
+
+ returnKafkaProperties.put(PROPERTY_BOOTSTRAP_SERVERS, bootstrapServers);
+ returnKafkaProperties.put(PROPERTY_ACKS, acks);
+ returnKafkaProperties.put(PROPERTY_RETRIES, retries);
+ returnKafkaProperties.put(PROPERTY_BATCH_SIZE, batchSize);
+ returnKafkaProperties.put(PROPERTY_LINGER_TIME, lingerTime);
+ returnKafkaProperties.put(PROPERTY_BUFFER_MEMORY, bufferMemory);
+ returnKafkaProperties.put(PROPERTY_KEY_SERIALIZER, keySerializer);
+ returnKafkaProperties.put(PROPERTY_VALUE_SERIALIZER, valueSerializer);
+ returnKafkaProperties.put(PROPERTY_PARTITIONER_CLASS, partitionerClass);
+
+ return returnKafkaProperties;
}
/**
@@ -145,125 +182,33 @@ public class KafkaCarrierTechnologyParameters extends CarrierTechnologyParameter
* @return the kafka consumer properties
*/
public Properties getKafkaConsumerProperties() {
- final Properties kafkaProperties = new Properties();
-
- kafkaProperties.put(PROPERTY_BOOTSTRAP_SERVERS, bootstrapServers);
- kafkaProperties.put(PROPERTY_GROUP_ID, groupId);
- kafkaProperties.put(PROPERTY_ENABLE_AUTO_COMMIT, enableAutoCommit);
- kafkaProperties.put(PROPERTY_AUTO_COMMIT_TIME, autoCommitTime);
- kafkaProperties.put(PROPERTY_SESSION_TIMEOUT, sessionTimeout);
- kafkaProperties.put(PROPERTY_KEY_DESERIALIZER, keyDeserializer);
- kafkaProperties.put(PROPERTY_VALUE_DESERIALIZER, valueDeserializer);
-
- return kafkaProperties;
- }
-
- /**
- * Gets the bootstrap servers.
- *
- * @return the bootstrap servers
- */
- public String getBootstrapServers() {
- return bootstrapServers;
- }
-
- /**
- * Gets the acks.
- *
- * @return the acks
- */
- public String getAcks() {
- return acks;
- }
+ final Properties returnKafkaProperties = new Properties();
- /**
- * Gets the retries.
- *
- * @return the retries
- */
- public int getRetries() {
- return retries;
- }
-
- /**
- * Gets the batch size.
- *
- * @return the batch size
- */
- public int getBatchSize() {
- return batchSize;
- }
-
- /**
- * Gets the linger time.
- *
- * @return the linger time
- */
- public int getLingerTime() {
- return lingerTime;
- }
-
- /**
- * Gets the buffer memory.
- *
- * @return the buffer memory
- */
- public long getBufferMemory() {
- return bufferMemory;
- }
-
- /**
- * Gets the group id.
- *
- * @return the group id
- */
- public String getGroupId() {
- return groupId;
- }
-
- /**
- * Checks if is enable auto commit.
- *
- * @return true, if checks if is enable auto commit
- */
- public boolean isEnableAutoCommit() {
- return enableAutoCommit;
- }
-
- /**
- * Gets the auto commit time.
- *
- * @return the auto commit time
- */
- public int getAutoCommitTime() {
- return autoCommitTime;
- }
+ // Add properties from the Kafka property array
+ if (kafkaProperties != null) {
+ for (int i = 0; i < kafkaProperties.length; i++) {
+ returnKafkaProperties.put(kafkaProperties[i][0], kafkaProperties[i][1]);
+ }
+ }
- /**
- * Gets the session timeout.
- *
- * @return the session timeout
- */
- public int getSessionTimeout() {
- return sessionTimeout;
- }
+ returnKafkaProperties.put(PROPERTY_BOOTSTRAP_SERVERS, bootstrapServers);
+ returnKafkaProperties.put(PROPERTY_GROUP_ID, groupId);
+ returnKafkaProperties.put(PROPERTY_ENABLE_AUTO_COMMIT, enableAutoCommit);
+ returnKafkaProperties.put(PROPERTY_AUTO_COMMIT_TIME, autoCommitTime);
+ returnKafkaProperties.put(PROPERTY_SESSION_TIMEOUT, sessionTimeout);
+ returnKafkaProperties.put(PROPERTY_KEY_DESERIALIZER, keyDeserializer);
+ returnKafkaProperties.put(PROPERTY_VALUE_DESERIALIZER, valueDeserializer);
- /**
- * Gets the producer topic.
- *
- * @return the producer topic
- */
- public String getProducerTopic() {
- return producerTopic;
+ return returnKafkaProperties;
}
/**
- * Gets the consumer poll time.
+ * Gets the consumer topic list.
*
- * @return the consumer poll time
+ * @return the consumer topic list
*/
- public long getConsumerPollTime() {
- return consumerPollTime;
+ public Collection<String> getConsumerTopicListAsCollection() {
+ return Arrays.asList(consumerTopicList);
}
/**
@@ -274,60 +219,6 @@ public class KafkaCarrierTechnologyParameters extends CarrierTechnologyParameter
return Duration.ofMillis(consumerPollTime);
}
- /**
- * Gets the consumer topic list.
- *
- * @return the consumer topic list
- */
- public Collection<String> getConsumerTopicList() {
- return Arrays.asList(consumerTopicList);
- }
-
- /**
- * Gets the key serializer.
- *
- * @return the key serializer
- */
- public String getKeySerializer() {
- return keySerializer;
- }
-
- /**
- * Gets the value serializer.
- *
- * @return the value serializer
- */
- public String getValueSerializer() {
- return valueSerializer;
- }
-
- /**
- * Gets the key deserializer.
- *
- * @return the key deserializer
- */
- public String getKeyDeserializer() {
- return keyDeserializer;
- }
-
- /**
- * Gets the value deserializer.
- *
- * @return the value deserializer
- */
- public String getValueDeserializer() {
- return valueDeserializer;
- }
-
- /**
- * Gets the value deserializer.
- *
- * @return the value deserializer
- */
- public String getPartitionerClass() {
- return partitionerClass;
- }
-
/*
* (non-Javadoc)
*
@@ -337,136 +228,63 @@ public class KafkaCarrierTechnologyParameters extends CarrierTechnologyParameter
public GroupValidationResult validate() {
final GroupValidationResult result = super.validate();
- validateStringParameters(result);
-
- validateNumericParameters(result);
-
validateConsumerTopicList(result);
- validateSerializersAndDeserializers(result);
+ validateKafkaProperties(result);
return result;
}
/**
- * Validate that string parameters are correct.
- *
- * @param result the result of the validation
- */
- private void validateStringParameters(final GroupValidationResult result) {
- if (isNullOrBlank(bootstrapServers)) {
- result.setResult("bootstrapServers", ValidationStatus.INVALID,
- "not specified, must be specified as a string of form host:port");
- }
-
- if (isNullOrBlank(acks)) {
- result.setResult("acks", ValidationStatus.INVALID,
- "not specified, must be specified as a string with values [0|1|all]");
- }
-
- if (isNullOrBlank(groupId)) {
- result.setResult("groupId", ValidationStatus.INVALID, SPECIFY_AS_STRING_MESSAGE);
- }
-
- if (isNullOrBlank(producerTopic)) {
- result.setResult("producerTopic", ValidationStatus.INVALID,
- SPECIFY_AS_STRING_MESSAGE);
- }
-
- if (isNullOrBlank(partitionerClass)) {
- result.setResult("partitionerClass", ValidationStatus.INVALID,
- SPECIFY_AS_STRING_MESSAGE);
- }
- }
-
- /**
- * Check if numeric parameters are valid.
- *
- * @param result the result of the validation
- */
- private void validateNumericParameters(final GroupValidationResult result) {
- if (retries < 0) {
- result.setResult(PROPERTY_RETRIES, ValidationStatus.INVALID,
- "[" + retries + "] invalid, must be specified as retries >= 0");
- }
-
- if (batchSize < 0) {
- result.setResult("batchSize", ValidationStatus.INVALID,
- "[" + batchSize + "] invalid, must be specified as batchSize >= 0");
- }
-
- if (lingerTime < 0) {
- result.setResult("lingerTime", ValidationStatus.INVALID,
- "[" + lingerTime + "] invalid, must be specified as lingerTime >= 0");
- }
-
- if (bufferMemory < 0) {
- result.setResult("bufferMemory", ValidationStatus.INVALID,
- "[" + bufferMemory + "] invalid, must be specified as bufferMemory >= 0");
- }
-
- if (autoCommitTime < 0) {
- result.setResult("autoCommitTime", ValidationStatus.INVALID,
- "[" + autoCommitTime + "] invalid, must be specified as autoCommitTime >= 0");
- }
-
- if (sessionTimeout < 0) {
- result.setResult("sessionTimeout", ValidationStatus.INVALID,
- "[" + sessionTimeout + "] invalid, must be specified as sessionTimeout >= 0");
- }
-
- if (consumerPollTime < 0) {
- result.setResult("consumerPollTime", ValidationStatus.INVALID,
- "[" + consumerPollTime + "] invalid, must be specified as consumerPollTime >= 0");
- }
- }
-
- /**
- * Validate the serializers and deserializers.
+ * Validate the consumer topic list.
*
* @param result the result of the validation.
*/
- private void validateSerializersAndDeserializers(final GroupValidationResult result) {
- if (isNullOrBlank(keySerializer)) {
- result.setResult("keySerializer", ValidationStatus.INVALID,
- SPECIFY_AS_STRING_MESSAGE);
- }
-
- if (isNullOrBlank(valueSerializer)) {
- result.setResult("valueSerializer", ValidationStatus.INVALID,
- SPECIFY_AS_STRING_MESSAGE);
- }
-
- if (isNullOrBlank(keyDeserializer)) {
- result.setResult("keyDeserializer", ValidationStatus.INVALID,
- SPECIFY_AS_STRING_MESSAGE);
- }
-
- if (isNullOrBlank(valueDeserializer)) {
- result.setResult("valueDeserializer", ValidationStatus.INVALID,
- SPECIFY_AS_STRING_MESSAGE);
- }
- }
-
private void validateConsumerTopicList(final GroupValidationResult result) {
if (consumerTopicList == null || consumerTopicList.length == 0) {
result.setResult("consumerTopicList", ValidationStatus.INVALID,
- "not specified, must be specified as a list of strings");
+ "not specified, must be specified as a list of strings");
+ return;
}
StringBuilder consumerTopicStringBuilder = new StringBuilder();
for (final String consumerTopic : consumerTopicList) {
- if (consumerTopic == null || consumerTopic.trim().length() == 0) {
+ if (StringUtils.isBlank(consumerTopic)) {
consumerTopicStringBuilder.append(consumerTopic + "/");
}
}
if (consumerTopicStringBuilder.length() > 0) {
result.setResult("consumerTopicList", ValidationStatus.INVALID,
- "invalid consumer topic list entries found: /" + consumerTopicStringBuilder.toString());
+ "invalid consumer topic list entries found: /" + consumerTopicStringBuilder.toString());
}
}
- private boolean isNullOrBlank(final String stringValue) {
- return stringValue == null || stringValue.trim().length() == 0;
+ /**
+ * Validate the kafka properties.
+ *
+ * @param result the result of the validation.
+ */
+ private void validateKafkaProperties(final GroupValidationResult result) {
+ // Kafka properties are optional
+ if (kafkaProperties == null || kafkaProperties.length == 0) {
+ return;
+ }
+
+ for (int i = 0; i < kafkaProperties.length; i++) {
+ if (kafkaProperties[i].length != 2) {
+ result.setResult(KAFKA_PROPERTIES, ValidationStatus.INVALID,
+ ENTRY + i + " invalid, kafka properties must be name-value pairs");
+ }
+
+ if (StringUtils.isBlank(kafkaProperties[i][0])) {
+ result.setResult(KAFKA_PROPERTIES, ValidationStatus.INVALID,
+ ENTRY + i + " invalid, key is null or blank");
+ }
+
+ if (StringUtils.isBlank(kafkaProperties[i][1])) {
+ result.setResult(KAFKA_PROPERTIES, ValidationStatus.INVALID,
+ ENTRY + i + " invalid, value is null or blank");
+ }
+ }
}
}
diff --git a/plugins/plugins-event/plugins-event-carrier/plugins-event-carrier-kafka/src/test/java/org/onap/policy/apex/plugins/event/carrier/kafka/KafkaCarrierTechnologyParametersTest.java b/plugins/plugins-event/plugins-event-carrier/plugins-event-carrier-kafka/src/test/java/org/onap/policy/apex/plugins/event/carrier/kafka/KafkaCarrierTechnologyParametersTest.java
index 6eca1dcf0..2f5405ba8 100644
--- a/plugins/plugins-event/plugins-event-carrier/plugins-event-carrier-kafka/src/test/java/org/onap/policy/apex/plugins/event/carrier/kafka/KafkaCarrierTechnologyParametersTest.java
+++ b/plugins/plugins-event/plugins-event-carrier/plugins-event-carrier-kafka/src/test/java/org/onap/policy/apex/plugins/event/carrier/kafka/KafkaCarrierTechnologyParametersTest.java
@@ -20,50 +20,276 @@
package org.onap.policy.apex.plugins.event.carrier.kafka;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
import java.util.Properties;
-import org.junit.Before;
+
import org.junit.Test;
-import org.onap.policy.common.parameters.GroupValidationResult;
public class KafkaCarrierTechnologyParametersTest {
-
- KafkaCarrierTechnologyParameters kafkaCarrierTechnologyParameters = null;
- Properties kafkaProducerProperties = null;
- Properties kafkaConsumerProperties = null;
- GroupValidationResult result = null;
-
- /**
- * Set up testing.
- *
- * @throws Exception on setup errors
- */
- @Before
- public void setUp() throws Exception {
- kafkaCarrierTechnologyParameters = new KafkaCarrierTechnologyParameters();
- kafkaProducerProperties = kafkaCarrierTechnologyParameters.getKafkaProducerProperties();
- kafkaConsumerProperties = kafkaCarrierTechnologyParameters.getKafkaConsumerProperties();
- }
-
@Test
public void testKafkaCarrierTechnologyParameters() {
+ KafkaCarrierTechnologyParameters kafkaCarrierTechnologyParameters = new KafkaCarrierTechnologyParameters();
assertNotNull(kafkaCarrierTechnologyParameters);
+
+ assertEquals("localhost:9092", kafkaCarrierTechnologyParameters.getBootstrapServers());
}
@Test
public void testGetKafkaProducerProperties() {
+ KafkaCarrierTechnologyParameters kafkaCarrierTechnologyParameters = new KafkaCarrierTechnologyParameters();
+
+ Properties kafkaProducerProperties = kafkaCarrierTechnologyParameters.getKafkaProducerProperties();
+ assertNotNull(kafkaProducerProperties);
+ assertEquals("localhost:9092", kafkaProducerProperties.get("bootstrap.servers"));
+ assertEquals(1, kafkaProducerProperties.get("linger.ms"));
+ assertEquals(null, kafkaProducerProperties.get("group.id"));
+ assertEquals(null, kafkaProducerProperties.get("Property0"));
+ assertEquals(null, kafkaProducerProperties.get("Property1"));
+ assertEquals(null, kafkaProducerProperties.get("Property2"));
+
+ // @formatter:off
+ String[][] kafkaProperties = {
+ {
+ "Property0", "Value0"
+ },
+ {
+ "Property1", "Value1"
+ }
+ };
+ // @formatter:on
+
+ kafkaCarrierTechnologyParameters.setKafkaProperties(kafkaProperties);
+ kafkaProducerProperties = kafkaCarrierTechnologyParameters.getKafkaProducerProperties();
assertNotNull(kafkaProducerProperties);
+ assertEquals("localhost:9092", kafkaProducerProperties.get("bootstrap.servers"));
+ assertEquals(1, kafkaProducerProperties.get("linger.ms"));
+ assertEquals(null, kafkaProducerProperties.get("group.id"));
+ assertEquals("Value0", kafkaProducerProperties.get("Property0"));
+ assertEquals("Value1", kafkaProducerProperties.get("Property1"));
+ assertEquals(null, kafkaProducerProperties.get("Property2"));
}
@Test
public void testGetKafkaConsumerProperties() {
+ KafkaCarrierTechnologyParameters kafkaCarrierTechnologyParameters = new KafkaCarrierTechnologyParameters();
+
+ Properties kafkaConsumerProperties = kafkaCarrierTechnologyParameters.getKafkaConsumerProperties();
assertNotNull(kafkaConsumerProperties);
+ assertEquals("localhost:9092", kafkaConsumerProperties.get("bootstrap.servers"));
+ assertEquals("default-group-id", kafkaConsumerProperties.get("group.id"));
+ assertEquals(null, kafkaConsumerProperties.get("linger.ms"));
+ assertEquals(null, kafkaConsumerProperties.get("Property0"));
+ assertEquals(null, kafkaConsumerProperties.get("Property1"));
+ assertEquals(null, kafkaConsumerProperties.get("Property2"));
+
+ // @formatter:off
+ String[][] kafkaProperties = {
+ {
+ "Property0", "Value0"
+ },
+ {
+ "Property1", "Value1"
+ }
+ };
+ // @formatter:on
+
+ kafkaCarrierTechnologyParameters.setKafkaProperties(kafkaProperties);
+ kafkaConsumerProperties = kafkaCarrierTechnologyParameters.getKafkaConsumerProperties();
+ assertNotNull(kafkaConsumerProperties);
+ assertEquals("localhost:9092", kafkaConsumerProperties.get("bootstrap.servers"));
+ assertEquals("default-group-id", kafkaConsumerProperties.get("group.id"));
+ assertEquals(null, kafkaConsumerProperties.get("linger.ms"));
+ assertEquals("Value0", kafkaConsumerProperties.get("Property0"));
+ assertEquals("Value1", kafkaConsumerProperties.get("Property1"));
+ assertEquals(null, kafkaConsumerProperties.get("Property2"));
}
@Test
public void testValidate() {
- result = kafkaCarrierTechnologyParameters.validate();
- assertNotNull(result);
+ KafkaCarrierTechnologyParameters kafkaCarrierTechnologyParameters = new KafkaCarrierTechnologyParameters();
+ assertNotNull(kafkaCarrierTechnologyParameters);
+
+ assertTrue(kafkaCarrierTechnologyParameters.validate().isValid());
+
+ String origStringValue = kafkaCarrierTechnologyParameters.getBootstrapServers();
+ kafkaCarrierTechnologyParameters.setBootstrapServers(" ");
+ assertFalse(kafkaCarrierTechnologyParameters.validate().isValid());
+ kafkaCarrierTechnologyParameters.setBootstrapServers(origStringValue);
+ assertTrue(kafkaCarrierTechnologyParameters.validate().isValid());
+
+ origStringValue = kafkaCarrierTechnologyParameters.getAcks();
+ kafkaCarrierTechnologyParameters.setAcks(" ");
+ assertFalse(kafkaCarrierTechnologyParameters.validate().isValid());
+ kafkaCarrierTechnologyParameters.setAcks(origStringValue);
+ assertTrue(kafkaCarrierTechnologyParameters.validate().isValid());
+
+ origStringValue = kafkaCarrierTechnologyParameters.getGroupId();
+ kafkaCarrierTechnologyParameters.setGroupId(" ");
+ assertFalse(kafkaCarrierTechnologyParameters.validate().isValid());
+ kafkaCarrierTechnologyParameters.setGroupId(origStringValue);
+ assertTrue(kafkaCarrierTechnologyParameters.validate().isValid());
+
+ origStringValue = kafkaCarrierTechnologyParameters.getProducerTopic();
+ kafkaCarrierTechnologyParameters.setProducerTopic(" ");
+ assertFalse(kafkaCarrierTechnologyParameters.validate().isValid());
+ kafkaCarrierTechnologyParameters.setProducerTopic(origStringValue);
+ assertTrue(kafkaCarrierTechnologyParameters.validate().isValid());
+
+ origStringValue = kafkaCarrierTechnologyParameters.getPartitionerClass();
+ kafkaCarrierTechnologyParameters.setPartitionerClass(" ");
+ assertFalse(kafkaCarrierTechnologyParameters.validate().isValid());
+ kafkaCarrierTechnologyParameters.setPartitionerClass(origStringValue);
+ assertTrue(kafkaCarrierTechnologyParameters.validate().isValid());
+
+ int origIntValue = kafkaCarrierTechnologyParameters.getRetries();
+ kafkaCarrierTechnologyParameters.setRetries(-1);
+ assertFalse(kafkaCarrierTechnologyParameters.validate().isValid());
+ kafkaCarrierTechnologyParameters.setRetries(origIntValue);
+ assertTrue(kafkaCarrierTechnologyParameters.validate().isValid());
+
+ origIntValue = kafkaCarrierTechnologyParameters.getBatchSize();
+ kafkaCarrierTechnologyParameters.setBatchSize(-1);
+ assertFalse(kafkaCarrierTechnologyParameters.validate().isValid());
+ kafkaCarrierTechnologyParameters.setBatchSize(origIntValue);
+ assertTrue(kafkaCarrierTechnologyParameters.validate().isValid());
+
+ origIntValue = kafkaCarrierTechnologyParameters.getLingerTime();
+ kafkaCarrierTechnologyParameters.setLingerTime(-1);
+ assertFalse(kafkaCarrierTechnologyParameters.validate().isValid());
+ kafkaCarrierTechnologyParameters.setLingerTime(origIntValue);
+ assertTrue(kafkaCarrierTechnologyParameters.validate().isValid());
+
+ long origLongValue = kafkaCarrierTechnologyParameters.getBufferMemory();
+ kafkaCarrierTechnologyParameters.setBufferMemory(-1);
+ assertFalse(kafkaCarrierTechnologyParameters.validate().isValid());
+ kafkaCarrierTechnologyParameters.setBufferMemory(origLongValue);
+ assertTrue(kafkaCarrierTechnologyParameters.validate().isValid());
+
+ origIntValue = kafkaCarrierTechnologyParameters.getAutoCommitTime();
+ kafkaCarrierTechnologyParameters.setAutoCommitTime(-1);
+ assertFalse(kafkaCarrierTechnologyParameters.validate().isValid());
+ kafkaCarrierTechnologyParameters.setAutoCommitTime(origIntValue);
+ assertTrue(kafkaCarrierTechnologyParameters.validate().isValid());
+
+ origIntValue = kafkaCarrierTechnologyParameters.getSessionTimeout();
+ kafkaCarrierTechnologyParameters.setSessionTimeout(-1);
+ assertFalse(kafkaCarrierTechnologyParameters.validate().isValid());
+ kafkaCarrierTechnologyParameters.setSessionTimeout(origIntValue);
+ assertTrue(kafkaCarrierTechnologyParameters.validate().isValid());
+
+ origIntValue = kafkaCarrierTechnologyParameters.getConsumerPollTime();
+ kafkaCarrierTechnologyParameters.setConsumerPollTime(-1);
+ assertFalse(kafkaCarrierTechnologyParameters.validate().isValid());
+ kafkaCarrierTechnologyParameters.setConsumerPollTime(origIntValue);
+ assertTrue(kafkaCarrierTechnologyParameters.validate().isValid());
+
+ origStringValue = kafkaCarrierTechnologyParameters.getKeySerializer();
+ kafkaCarrierTechnologyParameters.setKeySerializer(" ");
+ assertFalse(kafkaCarrierTechnologyParameters.validate().isValid());
+ kafkaCarrierTechnologyParameters.setKeySerializer(origStringValue);
+ assertTrue(kafkaCarrierTechnologyParameters.validate().isValid());
+
+ origStringValue = kafkaCarrierTechnologyParameters.getValueSerializer();
+ kafkaCarrierTechnologyParameters.setValueSerializer(" ");
+ assertFalse(kafkaCarrierTechnologyParameters.validate().isValid());
+ kafkaCarrierTechnologyParameters.setValueSerializer(origStringValue);
+ assertTrue(kafkaCarrierTechnologyParameters.validate().isValid());
+
+ origStringValue = kafkaCarrierTechnologyParameters.getKeyDeserializer();
+ kafkaCarrierTechnologyParameters.setKeyDeserializer(" ");
+ assertFalse(kafkaCarrierTechnologyParameters.validate().isValid());
+ kafkaCarrierTechnologyParameters.setKeyDeserializer(origStringValue);
+ assertTrue(kafkaCarrierTechnologyParameters.validate().isValid());
+
+ origStringValue = kafkaCarrierTechnologyParameters.getValueDeserializer();
+ kafkaCarrierTechnologyParameters.setValueDeserializer(" ");
+ assertFalse(kafkaCarrierTechnologyParameters.validate().isValid());
+ kafkaCarrierTechnologyParameters.setValueDeserializer(origStringValue);
+ assertTrue(kafkaCarrierTechnologyParameters.validate().isValid());
+
+ String[] origConsumerTopcList = kafkaCarrierTechnologyParameters.getConsumerTopicList();
+ kafkaCarrierTechnologyParameters.setConsumerTopicList(null);
+ assertFalse(kafkaCarrierTechnologyParameters.validate().isValid());
+ kafkaCarrierTechnologyParameters.setConsumerTopicList(origConsumerTopcList);
+ assertTrue(kafkaCarrierTechnologyParameters.validate().isValid());
+
+ kafkaCarrierTechnologyParameters.setConsumerTopicList(new String[0]);
+ assertFalse(kafkaCarrierTechnologyParameters.validate().isValid());
+ kafkaCarrierTechnologyParameters.setConsumerTopicList(origConsumerTopcList);
+ assertTrue(kafkaCarrierTechnologyParameters.validate().isValid());
+
+ String[] blankStringList = {null, ""};
+ kafkaCarrierTechnologyParameters.setConsumerTopicList(blankStringList);
+ assertFalse(kafkaCarrierTechnologyParameters.validate().isValid());
+ kafkaCarrierTechnologyParameters.setConsumerTopicList(origConsumerTopcList);
+ assertTrue(kafkaCarrierTechnologyParameters.validate().isValid());
+
+ String[][] origKafkaProperties = kafkaCarrierTechnologyParameters.getKafkaProperties();
+ kafkaCarrierTechnologyParameters.setKafkaProperties(null);
+ assertTrue(kafkaCarrierTechnologyParameters.validate().isValid());
+ kafkaCarrierTechnologyParameters.setKafkaProperties(origKafkaProperties);
+ assertTrue(kafkaCarrierTechnologyParameters.validate().isValid());
+
+ kafkaCarrierTechnologyParameters.setKafkaProperties(new String[0][0]);
+ assertTrue(kafkaCarrierTechnologyParameters.validate().isValid());
+ kafkaCarrierTechnologyParameters.setKafkaProperties(origKafkaProperties);
+ assertTrue(kafkaCarrierTechnologyParameters.validate().isValid());
+
+ // @formatter:off
+ String[][] kafkaProperties0 = {
+ {
+ null, "Value0"
+ }
+ };
+ // @formatter:on
+
+ kafkaCarrierTechnologyParameters.setKafkaProperties(kafkaProperties0);
+ assertFalse(kafkaCarrierTechnologyParameters.validate().isValid());
+ kafkaCarrierTechnologyParameters.setKafkaProperties(origKafkaProperties);
+ assertTrue(kafkaCarrierTechnologyParameters.validate().isValid());
+
+ // @formatter:off
+ String[][] kafkaProperties1 = {
+ {
+ "Property1", null
+ }
+ };
+ // @formatter:on
+
+ kafkaCarrierTechnologyParameters.setKafkaProperties(kafkaProperties1);
+ assertFalse(kafkaCarrierTechnologyParameters.validate().isValid());
+ kafkaCarrierTechnologyParameters.setKafkaProperties(origKafkaProperties);
+ assertTrue(kafkaCarrierTechnologyParameters.validate().isValid());
+
+ // @formatter:off
+ String[][] kafkaProperties2 = {
+ {
+ "Property1", null
+ }
+ };
+ // @formatter:on
+
+ kafkaCarrierTechnologyParameters.setKafkaProperties(kafkaProperties2);
+ assertFalse(kafkaCarrierTechnologyParameters.validate().isValid());
+ kafkaCarrierTechnologyParameters.setKafkaProperties(origKafkaProperties);
+ assertTrue(kafkaCarrierTechnologyParameters.validate().isValid());
+
+ // @formatter:off
+ String[][] kafkaProperties3 = {
+ {
+ "Property1", "Value0", "Value1"
+ }
+ };
+ // @formatter:on
+
+ kafkaCarrierTechnologyParameters.setKafkaProperties(kafkaProperties3);
+ assertFalse(kafkaCarrierTechnologyParameters.validate().isValid());
+ kafkaCarrierTechnologyParameters.setKafkaProperties(origKafkaProperties);
+ assertTrue(kafkaCarrierTechnologyParameters.validate().isValid());
+
}
}