summaryrefslogtreecommitdiffstats
path: root/plugins/plugins-event/plugins-event-carrier/plugins-event-carrier-kafka
diff options
context:
space:
mode:
authorliamfallon <liam.fallon@est.tech>2019-06-25 16:10:25 +0000
committerliamfallon <liam.fallon@est.tech>2019-06-25 16:10:25 +0000
commit5f029543f1e673655af2d2974113069df0b6def0 (patch)
tree80b84e0e554a6625869847a4b10090c9fa1334c8 /plugins/plugins-event/plugins-event-carrier/plugins-event-carrier-kafka
parentb8316b12e08452cda8322945ca59763e16a37a71 (diff)
Allow Kafka plugin to take arbitrary properties2.2.0
This change adds support for arbitrary Kafka properties to be passed to Kafka through the Kafka plugin. Issue-ID: POLICY-1818 Change-Id: I4389876286747b250c8abe492e9e31674a9483c9 Signed-off-by: liamfallon <liam.fallon@est.tech>
Diffstat (limited to 'plugins/plugins-event/plugins-event-carrier/plugins-event-carrier-kafka')
-rw-r--r--plugins/plugins-event/plugins-event-carrier/plugins-event-carrier-kafka/src/main/java/org/onap/policy/apex/plugins/event/carrier/kafka/ApexKafkaConsumer.java6
-rw-r--r--plugins/plugins-event/plugins-event-carrier/plugins-event-carrier-kafka/src/main/java/org/onap/policy/apex/plugins/event/carrier/kafka/KafkaCarrierTechnologyParameters.java386
-rw-r--r--plugins/plugins-event/plugins-event-carrier/plugins-event-carrier-kafka/src/test/java/org/onap/policy/apex/plugins/event/carrier/kafka/KafkaCarrierTechnologyParametersTest.java270
3 files changed, 353 insertions, 309 deletions
diff --git a/plugins/plugins-event/plugins-event-carrier/plugins-event-carrier-kafka/src/main/java/org/onap/policy/apex/plugins/event/carrier/kafka/ApexKafkaConsumer.java b/plugins/plugins-event/plugins-event-carrier/plugins-event-carrier-kafka/src/main/java/org/onap/policy/apex/plugins/event/carrier/kafka/ApexKafkaConsumer.java
index affd10ccb..6860bca22 100644
--- a/plugins/plugins-event/plugins-event-carrier/plugins-event-carrier-kafka/src/main/java/org/onap/policy/apex/plugins/event/carrier/kafka/ApexKafkaConsumer.java
+++ b/plugins/plugins-event/plugins-event-carrier/plugins-event-carrier-kafka/src/main/java/org/onap/policy/apex/plugins/event/carrier/kafka/ApexKafkaConsumer.java
@@ -92,7 +92,7 @@ public class ApexKafkaConsumer implements ApexEventConsumer, Runnable {
// Kick off the Kafka consumer
kafkaConsumer = new KafkaConsumer<>(kafkaConsumerProperties.getKafkaConsumerProperties());
- kafkaConsumer.subscribe(kafkaConsumerProperties.getConsumerTopicList());
+ kafkaConsumer.subscribe(kafkaConsumerProperties.getConsumerTopicListAsCollection());
if (LOGGER.isDebugEnabled()) {
LOGGER.debug("event receiver for " + this.getClass().getName() + ":" + this.name + " subscribed to topics: "
+ kafkaConsumerProperties.getConsumerTopicList());
@@ -154,7 +154,7 @@ public class ApexKafkaConsumer implements ApexEventConsumer, Runnable {
public void run() {
// Kick off the Kafka consumer
kafkaConsumer = new KafkaConsumer<>(kafkaConsumerProperties.getKafkaConsumerProperties());
- kafkaConsumer.subscribe(kafkaConsumerProperties.getConsumerTopicList());
+ kafkaConsumer.subscribe(kafkaConsumerProperties.getConsumerTopicListAsCollection());
if (LOGGER.isDebugEnabled()) {
LOGGER.debug("event receiver for " + this.getClass().getName() + ":" + this.name + " subscribed to topics: "
+ kafkaConsumerProperties.getConsumerTopicList());
@@ -181,7 +181,7 @@ public class ApexKafkaConsumer implements ApexEventConsumer, Runnable {
/**
* Trace a record if trace is enabled.
- *
+ *
* @param record the record to trace
*/
private void traceIfTraceEnabled(final ConsumerRecord<String, String> record) {
diff --git a/plugins/plugins-event/plugins-event-carrier/plugins-event-carrier-kafka/src/main/java/org/onap/policy/apex/plugins/event/carrier/kafka/KafkaCarrierTechnologyParameters.java b/plugins/plugins-event/plugins-event-carrier/plugins-event-carrier-kafka/src/main/java/org/onap/policy/apex/plugins/event/carrier/kafka/KafkaCarrierTechnologyParameters.java
index 36947ee13..f66dbfe9e 100644
--- a/plugins/plugins-event/plugins-event-carrier/plugins-event-carrier-kafka/src/main/java/org/onap/policy/apex/plugins/event/carrier/kafka/KafkaCarrierTechnologyParameters.java
+++ b/plugins/plugins-event/plugins-event-carrier/plugins-event-carrier-kafka/src/main/java/org/onap/policy/apex/plugins/event/carrier/kafka/KafkaCarrierTechnologyParameters.java
@@ -25,16 +25,24 @@ import java.util.Arrays;
import java.util.Collection;
import java.util.Properties;
+import lombok.Getter;
+import lombok.Setter;
+
+import org.apache.commons.lang3.StringUtils;
import org.apache.kafka.clients.producer.internals.DefaultPartitioner;
import org.onap.policy.apex.service.parameters.carriertechnology.CarrierTechnologyParameters;
import org.onap.policy.common.parameters.GroupValidationResult;
import org.onap.policy.common.parameters.ValidationStatus;
+import org.onap.policy.common.parameters.annotations.Min;
+import org.onap.policy.common.parameters.annotations.NotBlank;
/**
* Apex parameters for Kafka as an event carrier technology.
*
* @author Liam Fallon (liam.fallon@ericsson.com)
*/
+@Getter
+@Setter
public class KafkaCarrierTechnologyParameters extends CarrierTechnologyParameters {
// @formatter:off
/** The label of this carrier technology. */
@@ -48,6 +56,8 @@ public class KafkaCarrierTechnologyParameters extends CarrierTechnologyParameter
// Repeated strings in messages
private static final String SPECIFY_AS_STRING_MESSAGE = "not specified, must be specified as a string";
+ private static final String ENTRY = "entry ";
+ private static final String KAFKA_PROPERTIES = "kafkaProperties";
// Default parameter values
private static final String DEFAULT_ACKS = "all";
@@ -85,24 +95,44 @@ public class KafkaCarrierTechnologyParameters extends CarrierTechnologyParameter
private static final String PROPERTY_PARTITIONER_CLASS = "partitioner.class";
// kafka carrier parameters
+ @NotBlank
private String bootstrapServers = DEFAULT_BOOT_SERVERS;
+ @NotBlank
private String acks = DEFAULT_ACKS;
+ @Min(value = 0)
private int retries = DEFAULT_RETRIES;
+ @Min(value = 0)
private int batchSize = DEFAULT_BATCH_SIZE;
+ @Min(value = 0)
private int lingerTime = DEFAULT_LINGER_TIME;
+ @Min(value = 0)
private long bufferMemory = DEFAULT_BUFFER_MEMORY;
+ @NotBlank
private String groupId = DEFAULT_GROUP_ID;
private boolean enableAutoCommit = DEFAULT_ENABLE_AUTOCMIT;
+ @Min(value = 0)
private int autoCommitTime = DEFAULT_AUTO_COMMIT_TIME;
+ @Min(value = 0)
private int sessionTimeout = DEFAULT_SESSION_TIMEOUT;
+ @NotBlank
private String producerTopic = DEFAULT_PROD_TOPIC;
+ @Min(value = 0)
private int consumerPollTime = DEFAULT_CONS_POLL_TIME;
private String[] consumerTopicList = DEFAULT_CONS_TOPICLIST;
+ @NotBlank
private String keySerializer = DEFAULT_STRING_SERZER;
+ @NotBlank
private String valueSerializer = DEFAULT_STRING_SERZER;
+ @NotBlank
private String keyDeserializer = DEFAULT_STRING_DESZER;
+ @NotBlank
private String valueDeserializer = DEFAULT_STRING_DESZER;
+ @NotBlank
private String partitionerClass = DEFAULT_PARTITIONR_CLASS;
+
+ // All Kafka properties can be specified as an array of key-value pairs
+ private String[][] kafkaProperties = null;
+
// @formatter:on
/**
@@ -124,19 +154,26 @@ public class KafkaCarrierTechnologyParameters extends CarrierTechnologyParameter
* @return the kafka producer properties
*/
public Properties getKafkaProducerProperties() {
- final Properties kafkaProperties = new Properties();
-
- kafkaProperties.put(PROPERTY_BOOTSTRAP_SERVERS, bootstrapServers);
- kafkaProperties.put(PROPERTY_ACKS, acks);
- kafkaProperties.put(PROPERTY_RETRIES, retries);
- kafkaProperties.put(PROPERTY_BATCH_SIZE, batchSize);
- kafkaProperties.put(PROPERTY_LINGER_TIME, lingerTime);
- kafkaProperties.put(PROPERTY_BUFFER_MEMORY, bufferMemory);
- kafkaProperties.put(PROPERTY_KEY_SERIALIZER, keySerializer);
- kafkaProperties.put(PROPERTY_VALUE_SERIALIZER, valueSerializer);
- kafkaProperties.put(PROPERTY_PARTITIONER_CLASS, partitionerClass);
-
- return kafkaProperties;
+ final Properties returnKafkaProperties = new Properties();
+
+ // Add properties from the Kafka property array
+ if (kafkaProperties != null) {
+ for (int i = 0; i < kafkaProperties.length; i++) {
+ returnKafkaProperties.put(kafkaProperties[i][0], kafkaProperties[i][1]);
+ }
+ }
+
+ returnKafkaProperties.put(PROPERTY_BOOTSTRAP_SERVERS, bootstrapServers);
+ returnKafkaProperties.put(PROPERTY_ACKS, acks);
+ returnKafkaProperties.put(PROPERTY_RETRIES, retries);
+ returnKafkaProperties.put(PROPERTY_BATCH_SIZE, batchSize);
+ returnKafkaProperties.put(PROPERTY_LINGER_TIME, lingerTime);
+ returnKafkaProperties.put(PROPERTY_BUFFER_MEMORY, bufferMemory);
+ returnKafkaProperties.put(PROPERTY_KEY_SERIALIZER, keySerializer);
+ returnKafkaProperties.put(PROPERTY_VALUE_SERIALIZER, valueSerializer);
+ returnKafkaProperties.put(PROPERTY_PARTITIONER_CLASS, partitionerClass);
+
+ return returnKafkaProperties;
}
/**
@@ -145,125 +182,33 @@ public class KafkaCarrierTechnologyParameters extends CarrierTechnologyParameter
* @return the kafka consumer properties
*/
public Properties getKafkaConsumerProperties() {
- final Properties kafkaProperties = new Properties();
-
- kafkaProperties.put(PROPERTY_BOOTSTRAP_SERVERS, bootstrapServers);
- kafkaProperties.put(PROPERTY_GROUP_ID, groupId);
- kafkaProperties.put(PROPERTY_ENABLE_AUTO_COMMIT, enableAutoCommit);
- kafkaProperties.put(PROPERTY_AUTO_COMMIT_TIME, autoCommitTime);
- kafkaProperties.put(PROPERTY_SESSION_TIMEOUT, sessionTimeout);
- kafkaProperties.put(PROPERTY_KEY_DESERIALIZER, keyDeserializer);
- kafkaProperties.put(PROPERTY_VALUE_DESERIALIZER, valueDeserializer);
-
- return kafkaProperties;
- }
-
- /**
- * Gets the bootstrap servers.
- *
- * @return the bootstrap servers
- */
- public String getBootstrapServers() {
- return bootstrapServers;
- }
-
- /**
- * Gets the acks.
- *
- * @return the acks
- */
- public String getAcks() {
- return acks;
- }
+ final Properties returnKafkaProperties = new Properties();
- /**
- * Gets the retries.
- *
- * @return the retries
- */
- public int getRetries() {
- return retries;
- }
-
- /**
- * Gets the batch size.
- *
- * @return the batch size
- */
- public int getBatchSize() {
- return batchSize;
- }
-
- /**
- * Gets the linger time.
- *
- * @return the linger time
- */
- public int getLingerTime() {
- return lingerTime;
- }
-
- /**
- * Gets the buffer memory.
- *
- * @return the buffer memory
- */
- public long getBufferMemory() {
- return bufferMemory;
- }
-
- /**
- * Gets the group id.
- *
- * @return the group id
- */
- public String getGroupId() {
- return groupId;
- }
-
- /**
- * Checks if is enable auto commit.
- *
- * @return true, if checks if is enable auto commit
- */
- public boolean isEnableAutoCommit() {
- return enableAutoCommit;
- }
-
- /**
- * Gets the auto commit time.
- *
- * @return the auto commit time
- */
- public int getAutoCommitTime() {
- return autoCommitTime;
- }
+ // Add properties from the Kafka property array
+ if (kafkaProperties != null) {
+ for (int i = 0; i < kafkaProperties.length; i++) {
+ returnKafkaProperties.put(kafkaProperties[i][0], kafkaProperties[i][1]);
+ }
+ }
- /**
- * Gets the session timeout.
- *
- * @return the session timeout
- */
- public int getSessionTimeout() {
- return sessionTimeout;
- }
+ returnKafkaProperties.put(PROPERTY_BOOTSTRAP_SERVERS, bootstrapServers);
+ returnKafkaProperties.put(PROPERTY_GROUP_ID, groupId);
+ returnKafkaProperties.put(PROPERTY_ENABLE_AUTO_COMMIT, enableAutoCommit);
+ returnKafkaProperties.put(PROPERTY_AUTO_COMMIT_TIME, autoCommitTime);
+ returnKafkaProperties.put(PROPERTY_SESSION_TIMEOUT, sessionTimeout);
+ returnKafkaProperties.put(PROPERTY_KEY_DESERIALIZER, keyDeserializer);
+ returnKafkaProperties.put(PROPERTY_VALUE_DESERIALIZER, valueDeserializer);
- /**
- * Gets the producer topic.
- *
- * @return the producer topic
- */
- public String getProducerTopic() {
- return producerTopic;
+ return returnKafkaProperties;
}
/**
- * Gets the consumer poll time.
+ * Gets the consumer topic list.
*
- * @return the consumer poll time
+ * @return the consumer topic list
*/
- public long getConsumerPollTime() {
- return consumerPollTime;
+ public Collection<String> getConsumerTopicListAsCollection() {
+ return Arrays.asList(consumerTopicList);
}
/**
@@ -274,60 +219,6 @@ public class KafkaCarrierTechnologyParameters extends CarrierTechnologyParameter
return Duration.ofMillis(consumerPollTime);
}
- /**
- * Gets the consumer topic list.
- *
- * @return the consumer topic list
- */
- public Collection<String> getConsumerTopicList() {
- return Arrays.asList(consumerTopicList);
- }
-
- /**
- * Gets the key serializer.
- *
- * @return the key serializer
- */
- public String getKeySerializer() {
- return keySerializer;
- }
-
- /**
- * Gets the value serializer.
- *
- * @return the value serializer
- */
- public String getValueSerializer() {
- return valueSerializer;
- }
-
- /**
- * Gets the key deserializer.
- *
- * @return the key deserializer
- */
- public String getKeyDeserializer() {
- return keyDeserializer;
- }
-
- /**
- * Gets the value deserializer.
- *
- * @return the value deserializer
- */
- public String getValueDeserializer() {
- return valueDeserializer;
- }
-
- /**
- * Gets the value deserializer.
- *
- * @return the value deserializer
- */
- public String getPartitionerClass() {
- return partitionerClass;
- }
-
/*
* (non-Javadoc)
*
@@ -337,136 +228,63 @@ public class KafkaCarrierTechnologyParameters extends CarrierTechnologyParameter
public GroupValidationResult validate() {
final GroupValidationResult result = super.validate();
- validateStringParameters(result);
-
- validateNumericParameters(result);
-
validateConsumerTopicList(result);
- validateSerializersAndDeserializers(result);
+ validateKafkaProperties(result);
return result;
}
/**
- * Validate that string parameters are correct.
- *
- * @param result the result of the validation
- */
- private void validateStringParameters(final GroupValidationResult result) {
- if (isNullOrBlank(bootstrapServers)) {
- result.setResult("bootstrapServers", ValidationStatus.INVALID,
- "not specified, must be specified as a string of form host:port");
- }
-
- if (isNullOrBlank(acks)) {
- result.setResult("acks", ValidationStatus.INVALID,
- "not specified, must be specified as a string with values [0|1|all]");
- }
-
- if (isNullOrBlank(groupId)) {
- result.setResult("groupId", ValidationStatus.INVALID, SPECIFY_AS_STRING_MESSAGE);
- }
-
- if (isNullOrBlank(producerTopic)) {
- result.setResult("producerTopic", ValidationStatus.INVALID,
- SPECIFY_AS_STRING_MESSAGE);
- }
-
- if (isNullOrBlank(partitionerClass)) {
- result.setResult("partitionerClass", ValidationStatus.INVALID,
- SPECIFY_AS_STRING_MESSAGE);
- }
- }
-
- /**
- * Check if numeric parameters are valid.
- *
- * @param result the result of the validation
- */
- private void validateNumericParameters(final GroupValidationResult result) {
- if (retries < 0) {
- result.setResult(PROPERTY_RETRIES, ValidationStatus.INVALID,
- "[" + retries + "] invalid, must be specified as retries >= 0");
- }
-
- if (batchSize < 0) {
- result.setResult("batchSize", ValidationStatus.INVALID,
- "[" + batchSize + "] invalid, must be specified as batchSize >= 0");
- }
-
- if (lingerTime < 0) {
- result.setResult("lingerTime", ValidationStatus.INVALID,
- "[" + lingerTime + "] invalid, must be specified as lingerTime >= 0");
- }
-
- if (bufferMemory < 0) {
- result.setResult("bufferMemory", ValidationStatus.INVALID,
- "[" + bufferMemory + "] invalid, must be specified as bufferMemory >= 0");
- }
-
- if (autoCommitTime < 0) {
- result.setResult("autoCommitTime", ValidationStatus.INVALID,
- "[" + autoCommitTime + "] invalid, must be specified as autoCommitTime >= 0");
- }
-
- if (sessionTimeout < 0) {
- result.setResult("sessionTimeout", ValidationStatus.INVALID,
- "[" + sessionTimeout + "] invalid, must be specified as sessionTimeout >= 0");
- }
-
- if (consumerPollTime < 0) {
- result.setResult("consumerPollTime", ValidationStatus.INVALID,
- "[" + consumerPollTime + "] invalid, must be specified as consumerPollTime >= 0");
- }
- }
-
- /**
- * Validate the serializers and deserializers.
+ * Validate the consumer topic list.
*
* @param result the result of the validation.
*/
- private void validateSerializersAndDeserializers(final GroupValidationResult result) {
- if (isNullOrBlank(keySerializer)) {
- result.setResult("keySerializer", ValidationStatus.INVALID,
- SPECIFY_AS_STRING_MESSAGE);
- }
-
- if (isNullOrBlank(valueSerializer)) {
- result.setResult("valueSerializer", ValidationStatus.INVALID,
- SPECIFY_AS_STRING_MESSAGE);
- }
-
- if (isNullOrBlank(keyDeserializer)) {
- result.setResult("keyDeserializer", ValidationStatus.INVALID,
- SPECIFY_AS_STRING_MESSAGE);
- }
-
- if (isNullOrBlank(valueDeserializer)) {
- result.setResult("valueDeserializer", ValidationStatus.INVALID,
- SPECIFY_AS_STRING_MESSAGE);
- }
- }
-
private void validateConsumerTopicList(final GroupValidationResult result) {
if (consumerTopicList == null || consumerTopicList.length == 0) {
result.setResult("consumerTopicList", ValidationStatus.INVALID,
- "not specified, must be specified as a list of strings");
+ "not specified, must be specified as a list of strings");
+ return;
}
StringBuilder consumerTopicStringBuilder = new StringBuilder();
for (final String consumerTopic : consumerTopicList) {
- if (consumerTopic == null || consumerTopic.trim().length() == 0) {
+ if (StringUtils.isBlank(consumerTopic)) {
consumerTopicStringBuilder.append(consumerTopic + "/");
}
}
if (consumerTopicStringBuilder.length() > 0) {
result.setResult("consumerTopicList", ValidationStatus.INVALID,
- "invalid consumer topic list entries found: /" + consumerTopicStringBuilder.toString());
+ "invalid consumer topic list entries found: /" + consumerTopicStringBuilder.toString());
}
}
- private boolean isNullOrBlank(final String stringValue) {
- return stringValue == null || stringValue.trim().length() == 0;
+ /**
+ * Validate the kafka properties.
+ *
+ * @param result the result of the validation.
+ */
+ private void validateKafkaProperties(final GroupValidationResult result) {
+ // Kafka properties are optional
+ if (kafkaProperties == null || kafkaProperties.length == 0) {
+ return;
+ }
+
+ for (int i = 0; i < kafkaProperties.length; i++) {
+ if (kafkaProperties[i].length != 2) {
+ result.setResult(KAFKA_PROPERTIES, ValidationStatus.INVALID,
+ ENTRY + i + " invalid, kafka properties must be name-value pairs");
+ }
+
+ if (StringUtils.isBlank(kafkaProperties[i][0])) {
+ result.setResult(KAFKA_PROPERTIES, ValidationStatus.INVALID,
+ ENTRY + i + " invalid, key is null or blank");
+ }
+
+ if (StringUtils.isBlank(kafkaProperties[i][1])) {
+ result.setResult(KAFKA_PROPERTIES, ValidationStatus.INVALID,
+ ENTRY + i + " invalid, value is null or blank");
+ }
+ }
}
}
diff --git a/plugins/plugins-event/plugins-event-carrier/plugins-event-carrier-kafka/src/test/java/org/onap/policy/apex/plugins/event/carrier/kafka/KafkaCarrierTechnologyParametersTest.java b/plugins/plugins-event/plugins-event-carrier/plugins-event-carrier-kafka/src/test/java/org/onap/policy/apex/plugins/event/carrier/kafka/KafkaCarrierTechnologyParametersTest.java
index 6eca1dcf0..2f5405ba8 100644
--- a/plugins/plugins-event/plugins-event-carrier/plugins-event-carrier-kafka/src/test/java/org/onap/policy/apex/plugins/event/carrier/kafka/KafkaCarrierTechnologyParametersTest.java
+++ b/plugins/plugins-event/plugins-event-carrier/plugins-event-carrier-kafka/src/test/java/org/onap/policy/apex/plugins/event/carrier/kafka/KafkaCarrierTechnologyParametersTest.java
@@ -20,50 +20,276 @@
package org.onap.policy.apex.plugins.event.carrier.kafka;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
import java.util.Properties;
-import org.junit.Before;
+
import org.junit.Test;
-import org.onap.policy.common.parameters.GroupValidationResult;
public class KafkaCarrierTechnologyParametersTest {
-
- KafkaCarrierTechnologyParameters kafkaCarrierTechnologyParameters = null;
- Properties kafkaProducerProperties = null;
- Properties kafkaConsumerProperties = null;
- GroupValidationResult result = null;
-
- /**
- * Set up testing.
- *
- * @throws Exception on setup errors
- */
- @Before
- public void setUp() throws Exception {
- kafkaCarrierTechnologyParameters = new KafkaCarrierTechnologyParameters();
- kafkaProducerProperties = kafkaCarrierTechnologyParameters.getKafkaProducerProperties();
- kafkaConsumerProperties = kafkaCarrierTechnologyParameters.getKafkaConsumerProperties();
- }
-
@Test
public void testKafkaCarrierTechnologyParameters() {
+ KafkaCarrierTechnologyParameters kafkaCarrierTechnologyParameters = new KafkaCarrierTechnologyParameters();
assertNotNull(kafkaCarrierTechnologyParameters);
+
+ assertEquals("localhost:9092", kafkaCarrierTechnologyParameters.getBootstrapServers());
}
@Test
public void testGetKafkaProducerProperties() {
+ KafkaCarrierTechnologyParameters kafkaCarrierTechnologyParameters = new KafkaCarrierTechnologyParameters();
+
+ Properties kafkaProducerProperties = kafkaCarrierTechnologyParameters.getKafkaProducerProperties();
+ assertNotNull(kafkaProducerProperties);
+ assertEquals("localhost:9092", kafkaProducerProperties.get("bootstrap.servers"));
+ assertEquals(1, kafkaProducerProperties.get("linger.ms"));
+ assertEquals(null, kafkaProducerProperties.get("group.id"));
+ assertEquals(null, kafkaProducerProperties.get("Property0"));
+ assertEquals(null, kafkaProducerProperties.get("Property1"));
+ assertEquals(null, kafkaProducerProperties.get("Property2"));
+
+ // @formatter:off
+ String[][] kafkaProperties = {
+ {
+ "Property0", "Value0"
+ },
+ {
+ "Property1", "Value1"
+ }
+ };
+ // @formatter:on
+
+ kafkaCarrierTechnologyParameters.setKafkaProperties(kafkaProperties);
+ kafkaProducerProperties = kafkaCarrierTechnologyParameters.getKafkaProducerProperties();
assertNotNull(kafkaProducerProperties);
+ assertEquals("localhost:9092", kafkaProducerProperties.get("bootstrap.servers"));
+ assertEquals(1, kafkaProducerProperties.get("linger.ms"));
+ assertEquals(null, kafkaProducerProperties.get("group.id"));
+ assertEquals("Value0", kafkaProducerProperties.get("Property0"));
+ assertEquals("Value1", kafkaProducerProperties.get("Property1"));
+ assertEquals(null, kafkaProducerProperties.get("Property2"));
}
@Test
public void testGetKafkaConsumerProperties() {
+ KafkaCarrierTechnologyParameters kafkaCarrierTechnologyParameters = new KafkaCarrierTechnologyParameters();
+
+ Properties kafkaConsumerProperties = kafkaCarrierTechnologyParameters.getKafkaConsumerProperties();
assertNotNull(kafkaConsumerProperties);
+ assertEquals("localhost:9092", kafkaConsumerProperties.get("bootstrap.servers"));
+ assertEquals("default-group-id", kafkaConsumerProperties.get("group.id"));
+ assertEquals(null, kafkaConsumerProperties.get("linger.ms"));
+ assertEquals(null, kafkaConsumerProperties.get("Property0"));
+ assertEquals(null, kafkaConsumerProperties.get("Property1"));
+ assertEquals(null, kafkaConsumerProperties.get("Property2"));
+
+ // @formatter:off
+ String[][] kafkaProperties = {
+ {
+ "Property0", "Value0"
+ },
+ {
+ "Property1", "Value1"
+ }
+ };
+ // @formatter:on
+
+ kafkaCarrierTechnologyParameters.setKafkaProperties(kafkaProperties);
+ kafkaConsumerProperties = kafkaCarrierTechnologyParameters.getKafkaConsumerProperties();
+ assertNotNull(kafkaConsumerProperties);
+ assertEquals("localhost:9092", kafkaConsumerProperties.get("bootstrap.servers"));
+ assertEquals("default-group-id", kafkaConsumerProperties.get("group.id"));
+ assertEquals(null, kafkaConsumerProperties.get("linger.ms"));
+ assertEquals("Value0", kafkaConsumerProperties.get("Property0"));
+ assertEquals("Value1", kafkaConsumerProperties.get("Property1"));
+ assertEquals(null, kafkaConsumerProperties.get("Property2"));
}
@Test
public void testValidate() {
- result = kafkaCarrierTechnologyParameters.validate();
- assertNotNull(result);
+ KafkaCarrierTechnologyParameters kafkaCarrierTechnologyParameters = new KafkaCarrierTechnologyParameters();
+ assertNotNull(kafkaCarrierTechnologyParameters);
+
+ assertTrue(kafkaCarrierTechnologyParameters.validate().isValid());
+
+ String origStringValue = kafkaCarrierTechnologyParameters.getBootstrapServers();
+ kafkaCarrierTechnologyParameters.setBootstrapServers(" ");
+ assertFalse(kafkaCarrierTechnologyParameters.validate().isValid());
+ kafkaCarrierTechnologyParameters.setBootstrapServers(origStringValue);
+ assertTrue(kafkaCarrierTechnologyParameters.validate().isValid());
+
+ origStringValue = kafkaCarrierTechnologyParameters.getAcks();
+ kafkaCarrierTechnologyParameters.setAcks(" ");
+ assertFalse(kafkaCarrierTechnologyParameters.validate().isValid());
+ kafkaCarrierTechnologyParameters.setAcks(origStringValue);
+ assertTrue(kafkaCarrierTechnologyParameters.validate().isValid());
+
+ origStringValue = kafkaCarrierTechnologyParameters.getGroupId();
+ kafkaCarrierTechnologyParameters.setGroupId(" ");
+ assertFalse(kafkaCarrierTechnologyParameters.validate().isValid());
+ kafkaCarrierTechnologyParameters.setGroupId(origStringValue);
+ assertTrue(kafkaCarrierTechnologyParameters.validate().isValid());
+
+ origStringValue = kafkaCarrierTechnologyParameters.getProducerTopic();
+ kafkaCarrierTechnologyParameters.setProducerTopic(" ");
+ assertFalse(kafkaCarrierTechnologyParameters.validate().isValid());
+ kafkaCarrierTechnologyParameters.setProducerTopic(origStringValue);
+ assertTrue(kafkaCarrierTechnologyParameters.validate().isValid());
+
+ origStringValue = kafkaCarrierTechnologyParameters.getPartitionerClass();
+ kafkaCarrierTechnologyParameters.setPartitionerClass(" ");
+ assertFalse(kafkaCarrierTechnologyParameters.validate().isValid());
+ kafkaCarrierTechnologyParameters.setPartitionerClass(origStringValue);
+ assertTrue(kafkaCarrierTechnologyParameters.validate().isValid());
+
+ int origIntValue = kafkaCarrierTechnologyParameters.getRetries();
+ kafkaCarrierTechnologyParameters.setRetries(-1);
+ assertFalse(kafkaCarrierTechnologyParameters.validate().isValid());
+ kafkaCarrierTechnologyParameters.setRetries(origIntValue);
+ assertTrue(kafkaCarrierTechnologyParameters.validate().isValid());
+
+ origIntValue = kafkaCarrierTechnologyParameters.getBatchSize();
+ kafkaCarrierTechnologyParameters.setBatchSize(-1);
+ assertFalse(kafkaCarrierTechnologyParameters.validate().isValid());
+ kafkaCarrierTechnologyParameters.setBatchSize(origIntValue);
+ assertTrue(kafkaCarrierTechnologyParameters.validate().isValid());
+
+ origIntValue = kafkaCarrierTechnologyParameters.getLingerTime();
+ kafkaCarrierTechnologyParameters.setLingerTime(-1);
+ assertFalse(kafkaCarrierTechnologyParameters.validate().isValid());
+ kafkaCarrierTechnologyParameters.setLingerTime(origIntValue);
+ assertTrue(kafkaCarrierTechnologyParameters.validate().isValid());
+
+ long origLongValue = kafkaCarrierTechnologyParameters.getBufferMemory();
+ kafkaCarrierTechnologyParameters.setBufferMemory(-1);
+ assertFalse(kafkaCarrierTechnologyParameters.validate().isValid());
+ kafkaCarrierTechnologyParameters.setBufferMemory(origLongValue);
+ assertTrue(kafkaCarrierTechnologyParameters.validate().isValid());
+
+ origIntValue = kafkaCarrierTechnologyParameters.getAutoCommitTime();
+ kafkaCarrierTechnologyParameters.setAutoCommitTime(-1);
+ assertFalse(kafkaCarrierTechnologyParameters.validate().isValid());
+ kafkaCarrierTechnologyParameters.setAutoCommitTime(origIntValue);
+ assertTrue(kafkaCarrierTechnologyParameters.validate().isValid());
+
+ origIntValue = kafkaCarrierTechnologyParameters.getSessionTimeout();
+ kafkaCarrierTechnologyParameters.setSessionTimeout(-1);
+ assertFalse(kafkaCarrierTechnologyParameters.validate().isValid());
+ kafkaCarrierTechnologyParameters.setSessionTimeout(origIntValue);
+ assertTrue(kafkaCarrierTechnologyParameters.validate().isValid());
+
+ origIntValue = kafkaCarrierTechnologyParameters.getConsumerPollTime();
+ kafkaCarrierTechnologyParameters.setConsumerPollTime(-1);
+ assertFalse(kafkaCarrierTechnologyParameters.validate().isValid());
+ kafkaCarrierTechnologyParameters.setConsumerPollTime(origIntValue);
+ assertTrue(kafkaCarrierTechnologyParameters.validate().isValid());
+
+ origStringValue = kafkaCarrierTechnologyParameters.getKeySerializer();
+ kafkaCarrierTechnologyParameters.setKeySerializer(" ");
+ assertFalse(kafkaCarrierTechnologyParameters.validate().isValid());
+ kafkaCarrierTechnologyParameters.setKeySerializer(origStringValue);
+ assertTrue(kafkaCarrierTechnologyParameters.validate().isValid());
+
+ origStringValue = kafkaCarrierTechnologyParameters.getValueSerializer();
+ kafkaCarrierTechnologyParameters.setValueSerializer(" ");
+ assertFalse(kafkaCarrierTechnologyParameters.validate().isValid());
+ kafkaCarrierTechnologyParameters.setValueSerializer(origStringValue);
+ assertTrue(kafkaCarrierTechnologyParameters.validate().isValid());
+
+ origStringValue = kafkaCarrierTechnologyParameters.getKeyDeserializer();
+ kafkaCarrierTechnologyParameters.setKeyDeserializer(" ");
+ assertFalse(kafkaCarrierTechnologyParameters.validate().isValid());
+ kafkaCarrierTechnologyParameters.setKeyDeserializer(origStringValue);
+ assertTrue(kafkaCarrierTechnologyParameters.validate().isValid());
+
+ origStringValue = kafkaCarrierTechnologyParameters.getValueDeserializer();
+ kafkaCarrierTechnologyParameters.setValueDeserializer(" ");
+ assertFalse(kafkaCarrierTechnologyParameters.validate().isValid());
+ kafkaCarrierTechnologyParameters.setValueDeserializer(origStringValue);
+ assertTrue(kafkaCarrierTechnologyParameters.validate().isValid());
+
+ String[] origConsumerTopcList = kafkaCarrierTechnologyParameters.getConsumerTopicList();
+ kafkaCarrierTechnologyParameters.setConsumerTopicList(null);
+ assertFalse(kafkaCarrierTechnologyParameters.validate().isValid());
+ kafkaCarrierTechnologyParameters.setConsumerTopicList(origConsumerTopcList);
+ assertTrue(kafkaCarrierTechnologyParameters.validate().isValid());
+
+ kafkaCarrierTechnologyParameters.setConsumerTopicList(new String[0]);
+ assertFalse(kafkaCarrierTechnologyParameters.validate().isValid());
+ kafkaCarrierTechnologyParameters.setConsumerTopicList(origConsumerTopcList);
+ assertTrue(kafkaCarrierTechnologyParameters.validate().isValid());
+
+ String[] blankStringList = {null, ""};
+ kafkaCarrierTechnologyParameters.setConsumerTopicList(blankStringList);
+ assertFalse(kafkaCarrierTechnologyParameters.validate().isValid());
+ kafkaCarrierTechnologyParameters.setConsumerTopicList(origConsumerTopcList);
+ assertTrue(kafkaCarrierTechnologyParameters.validate().isValid());
+
+ String[][] origKafkaProperties = kafkaCarrierTechnologyParameters.getKafkaProperties();
+ kafkaCarrierTechnologyParameters.setKafkaProperties(null);
+ assertTrue(kafkaCarrierTechnologyParameters.validate().isValid());
+ kafkaCarrierTechnologyParameters.setKafkaProperties(origKafkaProperties);
+ assertTrue(kafkaCarrierTechnologyParameters.validate().isValid());
+
+ kafkaCarrierTechnologyParameters.setKafkaProperties(new String[0][0]);
+ assertTrue(kafkaCarrierTechnologyParameters.validate().isValid());
+ kafkaCarrierTechnologyParameters.setKafkaProperties(origKafkaProperties);
+ assertTrue(kafkaCarrierTechnologyParameters.validate().isValid());
+
+ // @formatter:off
+ String[][] kafkaProperties0 = {
+ {
+ null, "Value0"
+ }
+ };
+ // @formatter:on
+
+ kafkaCarrierTechnologyParameters.setKafkaProperties(kafkaProperties0);
+ assertFalse(kafkaCarrierTechnologyParameters.validate().isValid());
+ kafkaCarrierTechnologyParameters.setKafkaProperties(origKafkaProperties);
+ assertTrue(kafkaCarrierTechnologyParameters.validate().isValid());
+
+ // @formatter:off
+ String[][] kafkaProperties1 = {
+ {
+ "Property1", null
+ }
+ };
+ // @formatter:on
+
+ kafkaCarrierTechnologyParameters.setKafkaProperties(kafkaProperties1);
+ assertFalse(kafkaCarrierTechnologyParameters.validate().isValid());
+ kafkaCarrierTechnologyParameters.setKafkaProperties(origKafkaProperties);
+ assertTrue(kafkaCarrierTechnologyParameters.validate().isValid());
+
+ // @formatter:off
+ String[][] kafkaProperties2 = {
+ {
+ "Property1", null
+ }
+ };
+ // @formatter:on
+
+ kafkaCarrierTechnologyParameters.setKafkaProperties(kafkaProperties2);
+ assertFalse(kafkaCarrierTechnologyParameters.validate().isValid());
+ kafkaCarrierTechnologyParameters.setKafkaProperties(origKafkaProperties);
+ assertTrue(kafkaCarrierTechnologyParameters.validate().isValid());
+
+ // @formatter:off
+ String[][] kafkaProperties3 = {
+ {
+ "Property1", "Value0", "Value1"
+ }
+ };
+ // @formatter:on
+
+ kafkaCarrierTechnologyParameters.setKafkaProperties(kafkaProperties3);
+ assertFalse(kafkaCarrierTechnologyParameters.validate().isValid());
+ kafkaCarrierTechnologyParameters.setKafkaProperties(origKafkaProperties);
+ assertTrue(kafkaCarrierTechnologyParameters.validate().isValid());
+
}
}