summary | refs | log | tree | commit | diff | stats
path: root/plugins/plugins-event/plugins-event-carrier/plugins-event-carrier-kafka/src/main
diff options
context:
space:
mode:
author: liamfallon <liam.fallon@est.tech> 2019-06-25 16:10:25 +0000
committer: liamfallon <liam.fallon@est.tech> 2019-06-25 16:10:25 +0000
commit5f029543f1e673655af2d2974113069df0b6def0 (patch)
tree80b84e0e554a6625869847a4b10090c9fa1334c8 /plugins/plugins-event/plugins-event-carrier/plugins-event-carrier-kafka/src/main
parentb8316b12e08452cda8322945ca59763e16a37a71 (diff)
Allow Kafka plugin to take arbitrary properties (tag: 2.2.0)
This change adds support for arbitrary Kafka properties to be passed to Kafka through the Kafka plugin.

Issue-ID: POLICY-1818
Change-Id: I4389876286747b250c8abe492e9e31674a9483c9
Signed-off-by: liamfallon <liam.fallon@est.tech>
Diffstat (limited to 'plugins/plugins-event/plugins-event-carrier/plugins-event-carrier-kafka/src/main')
-rw-r--r-- plugins/plugins-event/plugins-event-carrier/plugins-event-carrier-kafka/src/main/java/org/onap/policy/apex/plugins/event/carrier/kafka/ApexKafkaConsumer.java | 6
-rw-r--r-- plugins/plugins-event/plugins-event-carrier/plugins-event-carrier-kafka/src/main/java/org/onap/policy/apex/plugins/event/carrier/kafka/KafkaCarrierTechnologyParameters.java | 386
2 files changed, 105 insertions(+), 287 deletions(-)
diff --git a/plugins/plugins-event/plugins-event-carrier/plugins-event-carrier-kafka/src/main/java/org/onap/policy/apex/plugins/event/carrier/kafka/ApexKafkaConsumer.java b/plugins/plugins-event/plugins-event-carrier/plugins-event-carrier-kafka/src/main/java/org/onap/policy/apex/plugins/event/carrier/kafka/ApexKafkaConsumer.java
index affd10ccb..6860bca22 100644
--- a/plugins/plugins-event/plugins-event-carrier/plugins-event-carrier-kafka/src/main/java/org/onap/policy/apex/plugins/event/carrier/kafka/ApexKafkaConsumer.java
+++ b/plugins/plugins-event/plugins-event-carrier/plugins-event-carrier-kafka/src/main/java/org/onap/policy/apex/plugins/event/carrier/kafka/ApexKafkaConsumer.java
@@ -92,7 +92,7 @@ public class ApexKafkaConsumer implements ApexEventConsumer, Runnable {
// Kick off the Kafka consumer
kafkaConsumer = new KafkaConsumer<>(kafkaConsumerProperties.getKafkaConsumerProperties());
- kafkaConsumer.subscribe(kafkaConsumerProperties.getConsumerTopicList());
+ kafkaConsumer.subscribe(kafkaConsumerProperties.getConsumerTopicListAsCollection());
if (LOGGER.isDebugEnabled()) {
LOGGER.debug("event receiver for " + this.getClass().getName() + ":" + this.name + " subscribed to topics: "
+ kafkaConsumerProperties.getConsumerTopicList());
@@ -154,7 +154,7 @@ public class ApexKafkaConsumer implements ApexEventConsumer, Runnable {
public void run() {
// Kick off the Kafka consumer
kafkaConsumer = new KafkaConsumer<>(kafkaConsumerProperties.getKafkaConsumerProperties());
- kafkaConsumer.subscribe(kafkaConsumerProperties.getConsumerTopicList());
+ kafkaConsumer.subscribe(kafkaConsumerProperties.getConsumerTopicListAsCollection());
if (LOGGER.isDebugEnabled()) {
LOGGER.debug("event receiver for " + this.getClass().getName() + ":" + this.name + " subscribed to topics: "
+ kafkaConsumerProperties.getConsumerTopicList());
@@ -181,7 +181,7 @@ public class ApexKafkaConsumer implements ApexEventConsumer, Runnable {
/**
* Trace a record if trace is enabled.
- *
+ *
* @param record the record to trace
*/
private void traceIfTraceEnabled(final ConsumerRecord<String, String> record) {
diff --git a/plugins/plugins-event/plugins-event-carrier/plugins-event-carrier-kafka/src/main/java/org/onap/policy/apex/plugins/event/carrier/kafka/KafkaCarrierTechnologyParameters.java b/plugins/plugins-event/plugins-event-carrier/plugins-event-carrier-kafka/src/main/java/org/onap/policy/apex/plugins/event/carrier/kafka/KafkaCarrierTechnologyParameters.java
index 36947ee13..f66dbfe9e 100644
--- a/plugins/plugins-event/plugins-event-carrier/plugins-event-carrier-kafka/src/main/java/org/onap/policy/apex/plugins/event/carrier/kafka/KafkaCarrierTechnologyParameters.java
+++ b/plugins/plugins-event/plugins-event-carrier/plugins-event-carrier-kafka/src/main/java/org/onap/policy/apex/plugins/event/carrier/kafka/KafkaCarrierTechnologyParameters.java
@@ -25,16 +25,24 @@ import java.util.Arrays;
import java.util.Collection;
import java.util.Properties;
+import lombok.Getter;
+import lombok.Setter;
+
+import org.apache.commons.lang3.StringUtils;
import org.apache.kafka.clients.producer.internals.DefaultPartitioner;
import org.onap.policy.apex.service.parameters.carriertechnology.CarrierTechnologyParameters;
import org.onap.policy.common.parameters.GroupValidationResult;
import org.onap.policy.common.parameters.ValidationStatus;
+import org.onap.policy.common.parameters.annotations.Min;
+import org.onap.policy.common.parameters.annotations.NotBlank;
/**
* Apex parameters for Kafka as an event carrier technology.
*
* @author Liam Fallon (liam.fallon@ericsson.com)
*/
+@Getter
+@Setter
public class KafkaCarrierTechnologyParameters extends CarrierTechnologyParameters {
// @formatter:off
/** The label of this carrier technology. */
@@ -48,6 +56,8 @@ public class KafkaCarrierTechnologyParameters extends CarrierTechnologyParameter
// Repeated strings in messages
private static final String SPECIFY_AS_STRING_MESSAGE = "not specified, must be specified as a string";
+ private static final String ENTRY = "entry ";
+ private static final String KAFKA_PROPERTIES = "kafkaProperties";
// Default parameter values
private static final String DEFAULT_ACKS = "all";
@@ -85,24 +95,44 @@ public class KafkaCarrierTechnologyParameters extends CarrierTechnologyParameter
private static final String PROPERTY_PARTITIONER_CLASS = "partitioner.class";
// kafka carrier parameters
+ @NotBlank
private String bootstrapServers = DEFAULT_BOOT_SERVERS;
+ @NotBlank
private String acks = DEFAULT_ACKS;
+ @Min(value = 0)
private int retries = DEFAULT_RETRIES;
+ @Min(value = 0)
private int batchSize = DEFAULT_BATCH_SIZE;
+ @Min(value = 0)
private int lingerTime = DEFAULT_LINGER_TIME;
+ @Min(value = 0)
private long bufferMemory = DEFAULT_BUFFER_MEMORY;
+ @NotBlank
private String groupId = DEFAULT_GROUP_ID;
private boolean enableAutoCommit = DEFAULT_ENABLE_AUTOCMIT;
+ @Min(value = 0)
private int autoCommitTime = DEFAULT_AUTO_COMMIT_TIME;
+ @Min(value = 0)
private int sessionTimeout = DEFAULT_SESSION_TIMEOUT;
+ @NotBlank
private String producerTopic = DEFAULT_PROD_TOPIC;
+ @Min(value = 0)
private int consumerPollTime = DEFAULT_CONS_POLL_TIME;
private String[] consumerTopicList = DEFAULT_CONS_TOPICLIST;
+ @NotBlank
private String keySerializer = DEFAULT_STRING_SERZER;
+ @NotBlank
private String valueSerializer = DEFAULT_STRING_SERZER;
+ @NotBlank
private String keyDeserializer = DEFAULT_STRING_DESZER;
+ @NotBlank
private String valueDeserializer = DEFAULT_STRING_DESZER;
+ @NotBlank
private String partitionerClass = DEFAULT_PARTITIONR_CLASS;
+
+ // All Kafka properties can be specified as an array of key-value pairs
+ private String[][] kafkaProperties = null;
+
// @formatter:on
/**
@@ -124,19 +154,26 @@ public class KafkaCarrierTechnologyParameters extends CarrierTechnologyParameter
* @return the kafka producer properties
*/
public Properties getKafkaProducerProperties() {
- final Properties kafkaProperties = new Properties();
-
- kafkaProperties.put(PROPERTY_BOOTSTRAP_SERVERS, bootstrapServers);
- kafkaProperties.put(PROPERTY_ACKS, acks);
- kafkaProperties.put(PROPERTY_RETRIES, retries);
- kafkaProperties.put(PROPERTY_BATCH_SIZE, batchSize);
- kafkaProperties.put(PROPERTY_LINGER_TIME, lingerTime);
- kafkaProperties.put(PROPERTY_BUFFER_MEMORY, bufferMemory);
- kafkaProperties.put(PROPERTY_KEY_SERIALIZER, keySerializer);
- kafkaProperties.put(PROPERTY_VALUE_SERIALIZER, valueSerializer);
- kafkaProperties.put(PROPERTY_PARTITIONER_CLASS, partitionerClass);
-
- return kafkaProperties;
+ final Properties returnKafkaProperties = new Properties();
+
+ // Add properties from the Kafka property array
+ if (kafkaProperties != null) {
+ for (int i = 0; i < kafkaProperties.length; i++) {
+ returnKafkaProperties.put(kafkaProperties[i][0], kafkaProperties[i][1]);
+ }
+ }
+
+ returnKafkaProperties.put(PROPERTY_BOOTSTRAP_SERVERS, bootstrapServers);
+ returnKafkaProperties.put(PROPERTY_ACKS, acks);
+ returnKafkaProperties.put(PROPERTY_RETRIES, retries);
+ returnKafkaProperties.put(PROPERTY_BATCH_SIZE, batchSize);
+ returnKafkaProperties.put(PROPERTY_LINGER_TIME, lingerTime);
+ returnKafkaProperties.put(PROPERTY_BUFFER_MEMORY, bufferMemory);
+ returnKafkaProperties.put(PROPERTY_KEY_SERIALIZER, keySerializer);
+ returnKafkaProperties.put(PROPERTY_VALUE_SERIALIZER, valueSerializer);
+ returnKafkaProperties.put(PROPERTY_PARTITIONER_CLASS, partitionerClass);
+
+ return returnKafkaProperties;
}
/**
@@ -145,125 +182,33 @@ public class KafkaCarrierTechnologyParameters extends CarrierTechnologyParameter
* @return the kafka consumer properties
*/
public Properties getKafkaConsumerProperties() {
- final Properties kafkaProperties = new Properties();
-
- kafkaProperties.put(PROPERTY_BOOTSTRAP_SERVERS, bootstrapServers);
- kafkaProperties.put(PROPERTY_GROUP_ID, groupId);
- kafkaProperties.put(PROPERTY_ENABLE_AUTO_COMMIT, enableAutoCommit);
- kafkaProperties.put(PROPERTY_AUTO_COMMIT_TIME, autoCommitTime);
- kafkaProperties.put(PROPERTY_SESSION_TIMEOUT, sessionTimeout);
- kafkaProperties.put(PROPERTY_KEY_DESERIALIZER, keyDeserializer);
- kafkaProperties.put(PROPERTY_VALUE_DESERIALIZER, valueDeserializer);
-
- return kafkaProperties;
- }
-
- /**
- * Gets the bootstrap servers.
- *
- * @return the bootstrap servers
- */
- public String getBootstrapServers() {
- return bootstrapServers;
- }
-
- /**
- * Gets the acks.
- *
- * @return the acks
- */
- public String getAcks() {
- return acks;
- }
+ final Properties returnKafkaProperties = new Properties();
- /**
- * Gets the retries.
- *
- * @return the retries
- */
- public int getRetries() {
- return retries;
- }
-
- /**
- * Gets the batch size.
- *
- * @return the batch size
- */
- public int getBatchSize() {
- return batchSize;
- }
-
- /**
- * Gets the linger time.
- *
- * @return the linger time
- */
- public int getLingerTime() {
- return lingerTime;
- }
-
- /**
- * Gets the buffer memory.
- *
- * @return the buffer memory
- */
- public long getBufferMemory() {
- return bufferMemory;
- }
-
- /**
- * Gets the group id.
- *
- * @return the group id
- */
- public String getGroupId() {
- return groupId;
- }
-
- /**
- * Checks if is enable auto commit.
- *
- * @return true, if checks if is enable auto commit
- */
- public boolean isEnableAutoCommit() {
- return enableAutoCommit;
- }
-
- /**
- * Gets the auto commit time.
- *
- * @return the auto commit time
- */
- public int getAutoCommitTime() {
- return autoCommitTime;
- }
+ // Add properties from the Kafka property array
+ if (kafkaProperties != null) {
+ for (int i = 0; i < kafkaProperties.length; i++) {
+ returnKafkaProperties.put(kafkaProperties[i][0], kafkaProperties[i][1]);
+ }
+ }
- /**
- * Gets the session timeout.
- *
- * @return the session timeout
- */
- public int getSessionTimeout() {
- return sessionTimeout;
- }
+ returnKafkaProperties.put(PROPERTY_BOOTSTRAP_SERVERS, bootstrapServers);
+ returnKafkaProperties.put(PROPERTY_GROUP_ID, groupId);
+ returnKafkaProperties.put(PROPERTY_ENABLE_AUTO_COMMIT, enableAutoCommit);
+ returnKafkaProperties.put(PROPERTY_AUTO_COMMIT_TIME, autoCommitTime);
+ returnKafkaProperties.put(PROPERTY_SESSION_TIMEOUT, sessionTimeout);
+ returnKafkaProperties.put(PROPERTY_KEY_DESERIALIZER, keyDeserializer);
+ returnKafkaProperties.put(PROPERTY_VALUE_DESERIALIZER, valueDeserializer);
- /**
- * Gets the producer topic.
- *
- * @return the producer topic
- */
- public String getProducerTopic() {
- return producerTopic;
+ return returnKafkaProperties;
}
/**
- * Gets the consumer poll time.
+ * Gets the consumer topic list.
*
- * @return the consumer poll time
+ * @return the consumer topic list
*/
- public long getConsumerPollTime() {
- return consumerPollTime;
+ public Collection<String> getConsumerTopicListAsCollection() {
+ return Arrays.asList(consumerTopicList);
}
/**
@@ -274,60 +219,6 @@ public class KafkaCarrierTechnologyParameters extends CarrierTechnologyParameter
return Duration.ofMillis(consumerPollTime);
}
- /**
- * Gets the consumer topic list.
- *
- * @return the consumer topic list
- */
- public Collection<String> getConsumerTopicList() {
- return Arrays.asList(consumerTopicList);
- }
-
- /**
- * Gets the key serializer.
- *
- * @return the key serializer
- */
- public String getKeySerializer() {
- return keySerializer;
- }
-
- /**
- * Gets the value serializer.
- *
- * @return the value serializer
- */
- public String getValueSerializer() {
- return valueSerializer;
- }
-
- /**
- * Gets the key deserializer.
- *
- * @return the key deserializer
- */
- public String getKeyDeserializer() {
- return keyDeserializer;
- }
-
- /**
- * Gets the value deserializer.
- *
- * @return the value deserializer
- */
- public String getValueDeserializer() {
- return valueDeserializer;
- }
-
- /**
- * Gets the value deserializer.
- *
- * @return the value deserializer
- */
- public String getPartitionerClass() {
- return partitionerClass;
- }
-
/*
* (non-Javadoc)
*
@@ -337,136 +228,63 @@ public class KafkaCarrierTechnologyParameters extends CarrierTechnologyParameter
public GroupValidationResult validate() {
final GroupValidationResult result = super.validate();
- validateStringParameters(result);
-
- validateNumericParameters(result);
-
validateConsumerTopicList(result);
- validateSerializersAndDeserializers(result);
+ validateKafkaProperties(result);
return result;
}
/**
- * Validate that string parameters are correct.
- *
- * @param result the result of the validation
- */
- private void validateStringParameters(final GroupValidationResult result) {
- if (isNullOrBlank(bootstrapServers)) {
- result.setResult("bootstrapServers", ValidationStatus.INVALID,
- "not specified, must be specified as a string of form host:port");
- }
-
- if (isNullOrBlank(acks)) {
- result.setResult("acks", ValidationStatus.INVALID,
- "not specified, must be specified as a string with values [0|1|all]");
- }
-
- if (isNullOrBlank(groupId)) {
- result.setResult("groupId", ValidationStatus.INVALID, SPECIFY_AS_STRING_MESSAGE);
- }
-
- if (isNullOrBlank(producerTopic)) {
- result.setResult("producerTopic", ValidationStatus.INVALID,
- SPECIFY_AS_STRING_MESSAGE);
- }
-
- if (isNullOrBlank(partitionerClass)) {
- result.setResult("partitionerClass", ValidationStatus.INVALID,
- SPECIFY_AS_STRING_MESSAGE);
- }
- }
-
- /**
- * Check if numeric parameters are valid.
- *
- * @param result the result of the validation
- */
- private void validateNumericParameters(final GroupValidationResult result) {
- if (retries < 0) {
- result.setResult(PROPERTY_RETRIES, ValidationStatus.INVALID,
- "[" + retries + "] invalid, must be specified as retries >= 0");
- }
-
- if (batchSize < 0) {
- result.setResult("batchSize", ValidationStatus.INVALID,
- "[" + batchSize + "] invalid, must be specified as batchSize >= 0");
- }
-
- if (lingerTime < 0) {
- result.setResult("lingerTime", ValidationStatus.INVALID,
- "[" + lingerTime + "] invalid, must be specified as lingerTime >= 0");
- }
-
- if (bufferMemory < 0) {
- result.setResult("bufferMemory", ValidationStatus.INVALID,
- "[" + bufferMemory + "] invalid, must be specified as bufferMemory >= 0");
- }
-
- if (autoCommitTime < 0) {
- result.setResult("autoCommitTime", ValidationStatus.INVALID,
- "[" + autoCommitTime + "] invalid, must be specified as autoCommitTime >= 0");
- }
-
- if (sessionTimeout < 0) {
- result.setResult("sessionTimeout", ValidationStatus.INVALID,
- "[" + sessionTimeout + "] invalid, must be specified as sessionTimeout >= 0");
- }
-
- if (consumerPollTime < 0) {
- result.setResult("consumerPollTime", ValidationStatus.INVALID,
- "[" + consumerPollTime + "] invalid, must be specified as consumerPollTime >= 0");
- }
- }
-
- /**
- * Validate the serializers and deserializers.
+ * Validate the consumer topic list.
*
* @param result the result of the validation.
*/
- private void validateSerializersAndDeserializers(final GroupValidationResult result) {
- if (isNullOrBlank(keySerializer)) {
- result.setResult("keySerializer", ValidationStatus.INVALID,
- SPECIFY_AS_STRING_MESSAGE);
- }
-
- if (isNullOrBlank(valueSerializer)) {
- result.setResult("valueSerializer", ValidationStatus.INVALID,
- SPECIFY_AS_STRING_MESSAGE);
- }
-
- if (isNullOrBlank(keyDeserializer)) {
- result.setResult("keyDeserializer", ValidationStatus.INVALID,
- SPECIFY_AS_STRING_MESSAGE);
- }
-
- if (isNullOrBlank(valueDeserializer)) {
- result.setResult("valueDeserializer", ValidationStatus.INVALID,
- SPECIFY_AS_STRING_MESSAGE);
- }
- }
-
private void validateConsumerTopicList(final GroupValidationResult result) {
if (consumerTopicList == null || consumerTopicList.length == 0) {
result.setResult("consumerTopicList", ValidationStatus.INVALID,
- "not specified, must be specified as a list of strings");
+ "not specified, must be specified as a list of strings");
+ return;
}
StringBuilder consumerTopicStringBuilder = new StringBuilder();
for (final String consumerTopic : consumerTopicList) {
- if (consumerTopic == null || consumerTopic.trim().length() == 0) {
+ if (StringUtils.isBlank(consumerTopic)) {
consumerTopicStringBuilder.append(consumerTopic + "/");
}
}
if (consumerTopicStringBuilder.length() > 0) {
result.setResult("consumerTopicList", ValidationStatus.INVALID,
- "invalid consumer topic list entries found: /" + consumerTopicStringBuilder.toString());
+ "invalid consumer topic list entries found: /" + consumerTopicStringBuilder.toString());
}
}
- private boolean isNullOrBlank(final String stringValue) {
- return stringValue == null || stringValue.trim().length() == 0;
+ /**
+ * Validate the kafka properties.
+ *
+ * @param result the result of the validation.
+ */
+ private void validateKafkaProperties(final GroupValidationResult result) {
+ // Kafka properties are optional
+ if (kafkaProperties == null || kafkaProperties.length == 0) {
+ return;
+ }
+
+ for (int i = 0; i < kafkaProperties.length; i++) {
+ if (kafkaProperties[i].length != 2) {
+ result.setResult(KAFKA_PROPERTIES, ValidationStatus.INVALID,
+ ENTRY + i + " invalid, kafka properties must be name-value pairs");
+ }
+
+ if (StringUtils.isBlank(kafkaProperties[i][0])) {
+ result.setResult(KAFKA_PROPERTIES, ValidationStatus.INVALID,
+ ENTRY + i + " invalid, key is null or blank");
+ }
+
+ if (StringUtils.isBlank(kafkaProperties[i][1])) {
+ result.setResult(KAFKA_PROPERTIES, ValidationStatus.INVALID,
+ ENTRY + i + " invalid, value is null or blank");
+ }
+ }
}
}