From 5f029543f1e673655af2d2974113069df0b6def0 Mon Sep 17 00:00:00 2001 From: liamfallon Date: Tue, 25 Jun 2019 16:10:25 +0000 Subject: Allow Kafka plugin to take arbitrary properties This change adds support for arbitrary Kafka properties to be passed to Kafka through the Kafka plugin. Issue-ID: POLICY-1818 Change-Id: I4389876286747b250c8abe492e9e31674a9483c9 Signed-off-by: liamfallon --- .../event/carrier/kafka/ApexKafkaConsumer.java | 6 +- .../kafka/KafkaCarrierTechnologyParameters.java | 386 ++++++--------------- 2 files changed, 105 insertions(+), 287 deletions(-) (limited to 'plugins/plugins-event/plugins-event-carrier/plugins-event-carrier-kafka/src/main/java') diff --git a/plugins/plugins-event/plugins-event-carrier/plugins-event-carrier-kafka/src/main/java/org/onap/policy/apex/plugins/event/carrier/kafka/ApexKafkaConsumer.java b/plugins/plugins-event/plugins-event-carrier/plugins-event-carrier-kafka/src/main/java/org/onap/policy/apex/plugins/event/carrier/kafka/ApexKafkaConsumer.java index affd10ccb..6860bca22 100644 --- a/plugins/plugins-event/plugins-event-carrier/plugins-event-carrier-kafka/src/main/java/org/onap/policy/apex/plugins/event/carrier/kafka/ApexKafkaConsumer.java +++ b/plugins/plugins-event/plugins-event-carrier/plugins-event-carrier-kafka/src/main/java/org/onap/policy/apex/plugins/event/carrier/kafka/ApexKafkaConsumer.java @@ -92,7 +92,7 @@ public class ApexKafkaConsumer implements ApexEventConsumer, Runnable { // Kick off the Kafka consumer kafkaConsumer = new KafkaConsumer<>(kafkaConsumerProperties.getKafkaConsumerProperties()); - kafkaConsumer.subscribe(kafkaConsumerProperties.getConsumerTopicList()); + kafkaConsumer.subscribe(kafkaConsumerProperties.getConsumerTopicListAsCollection()); if (LOGGER.isDebugEnabled()) { LOGGER.debug("event receiver for " + this.getClass().getName() + ":" + this.name + " subscribed to topics: " + kafkaConsumerProperties.getConsumerTopicList()); @@ -154,7 +154,7 @@ public class ApexKafkaConsumer implements ApexEventConsumer, Runnable { public void run() { // Kick off the Kafka consumer kafkaConsumer = new KafkaConsumer<>(kafkaConsumerProperties.getKafkaConsumerProperties()); - kafkaConsumer.subscribe(kafkaConsumerProperties.getConsumerTopicList()); + kafkaConsumer.subscribe(kafkaConsumerProperties.getConsumerTopicListAsCollection()); if (LOGGER.isDebugEnabled()) { LOGGER.debug("event receiver for " + this.getClass().getName() + ":" + this.name + " subscribed to topics: " + kafkaConsumerProperties.getConsumerTopicList()); @@ -181,7 +181,7 @@ public class ApexKafkaConsumer implements ApexEventConsumer, Runnable { /** * Trace a record if trace is enabled. - * + * * @param record the record to trace */ private void traceIfTraceEnabled(final ConsumerRecord record) { diff --git a/plugins/plugins-event/plugins-event-carrier/plugins-event-carrier-kafka/src/main/java/org/onap/policy/apex/plugins/event/carrier/kafka/KafkaCarrierTechnologyParameters.java b/plugins/plugins-event/plugins-event-carrier/plugins-event-carrier-kafka/src/main/java/org/onap/policy/apex/plugins/event/carrier/kafka/KafkaCarrierTechnologyParameters.java index 36947ee13..f66dbfe9e 100644 --- a/plugins/plugins-event/plugins-event-carrier/plugins-event-carrier-kafka/src/main/java/org/onap/policy/apex/plugins/event/carrier/kafka/KafkaCarrierTechnologyParameters.java +++ b/plugins/plugins-event/plugins-event-carrier/plugins-event-carrier-kafka/src/main/java/org/onap/policy/apex/plugins/event/carrier/kafka/KafkaCarrierTechnologyParameters.java @@ -25,16 +25,24 @@ import java.util.Arrays; import java.util.Collection; import java.util.Properties; +import lombok.Getter; +import lombok.Setter; + +import org.apache.commons.lang3.StringUtils; import org.apache.kafka.clients.producer.internals.DefaultPartitioner; import org.onap.policy.apex.service.parameters.carriertechnology.CarrierTechnologyParameters; import org.onap.policy.common.parameters.GroupValidationResult; import org.onap.policy.common.parameters.ValidationStatus; +import org.onap.policy.common.parameters.annotations.Min; +import org.onap.policy.common.parameters.annotations.NotBlank; /** * Apex parameters for Kafka as an event carrier technology. * * @author Liam Fallon (liam.fallon@ericsson.com) */ +@Getter +@Setter public class KafkaCarrierTechnologyParameters extends CarrierTechnologyParameters { // @formatter:off /** The label of this carrier technology. */ @@ -48,6 +56,8 @@ public class KafkaCarrierTechnologyParameters extends CarrierTechnologyParameter // Repeated strings in messages private static final String SPECIFY_AS_STRING_MESSAGE = "not specified, must be specified as a string"; + private static final String ENTRY = "entry "; + private static final String KAFKA_PROPERTIES = "kafkaProperties"; // Default parameter values private static final String DEFAULT_ACKS = "all"; @@ -85,24 +95,44 @@ public class KafkaCarrierTechnologyParameters extends CarrierTechnologyParameter private static final String PROPERTY_PARTITIONER_CLASS = "partitioner.class"; // kafka carrier parameters + @NotBlank private String bootstrapServers = DEFAULT_BOOT_SERVERS; + @NotBlank private String acks = DEFAULT_ACKS; + @Min(value = 0) private int retries = DEFAULT_RETRIES; + @Min(value = 0) private int batchSize = DEFAULT_BATCH_SIZE; + @Min(value = 0) private int lingerTime = DEFAULT_LINGER_TIME; + @Min(value = 0) private long bufferMemory = DEFAULT_BUFFER_MEMORY; + @NotBlank private String groupId = DEFAULT_GROUP_ID; private boolean enableAutoCommit = DEFAULT_ENABLE_AUTOCMIT; + @Min(value = 0) private int autoCommitTime = DEFAULT_AUTO_COMMIT_TIME; + @Min(value = 0) private int sessionTimeout = DEFAULT_SESSION_TIMEOUT; + @NotBlank private String producerTopic = DEFAULT_PROD_TOPIC; + @Min(value = 0) private int consumerPollTime = DEFAULT_CONS_POLL_TIME; private String[] consumerTopicList = DEFAULT_CONS_TOPICLIST; + @NotBlank private String keySerializer = DEFAULT_STRING_SERZER; + @NotBlank private String valueSerializer = DEFAULT_STRING_SERZER; + @NotBlank private String keyDeserializer = DEFAULT_STRING_DESZER; + @NotBlank private String valueDeserializer = DEFAULT_STRING_DESZER; + @NotBlank private String partitionerClass = DEFAULT_PARTITIONR_CLASS; + + // All Kafka properties can be specified as an array of key-value pairs + private String[][] kafkaProperties = null; + // @formatter:on /** @@ -124,19 +154,26 @@ public class KafkaCarrierTechnologyParameters extends CarrierTechnologyParameter * @return the kafka producer properties */ public Properties getKafkaProducerProperties() { - final Properties kafkaProperties = new Properties(); - - kafkaProperties.put(PROPERTY_BOOTSTRAP_SERVERS, bootstrapServers); - kafkaProperties.put(PROPERTY_ACKS, acks); - kafkaProperties.put(PROPERTY_RETRIES, retries); - kafkaProperties.put(PROPERTY_BATCH_SIZE, batchSize); - kafkaProperties.put(PROPERTY_LINGER_TIME, lingerTime); - kafkaProperties.put(PROPERTY_BUFFER_MEMORY, bufferMemory); - kafkaProperties.put(PROPERTY_KEY_SERIALIZER, keySerializer); - kafkaProperties.put(PROPERTY_VALUE_SERIALIZER, valueSerializer); - kafkaProperties.put(PROPERTY_PARTITIONER_CLASS, partitionerClass); - - return kafkaProperties; + final Properties returnKafkaProperties = new Properties(); + + // Add properties from the Kafka property array + if (kafkaProperties != null) { + for (int i = 0; i < kafkaProperties.length; i++) { + returnKafkaProperties.put(kafkaProperties[i][0], kafkaProperties[i][1]); + } + } + + returnKafkaProperties.put(PROPERTY_BOOTSTRAP_SERVERS, bootstrapServers); + returnKafkaProperties.put(PROPERTY_ACKS, acks); + returnKafkaProperties.put(PROPERTY_RETRIES, retries); + returnKafkaProperties.put(PROPERTY_BATCH_SIZE, batchSize); + returnKafkaProperties.put(PROPERTY_LINGER_TIME, lingerTime); + returnKafkaProperties.put(PROPERTY_BUFFER_MEMORY, bufferMemory); + returnKafkaProperties.put(PROPERTY_KEY_SERIALIZER, keySerializer); + returnKafkaProperties.put(PROPERTY_VALUE_SERIALIZER, valueSerializer); + returnKafkaProperties.put(PROPERTY_PARTITIONER_CLASS, partitionerClass); + + return returnKafkaProperties; } /** @@ -145,125 +182,33 @@ public class KafkaCarrierTechnologyParameters extends CarrierTechnologyParameter * @return the kafka consumer properties */ public Properties getKafkaConsumerProperties() { - final Properties kafkaProperties = new Properties(); - - kafkaProperties.put(PROPERTY_BOOTSTRAP_SERVERS, bootstrapServers); - kafkaProperties.put(PROPERTY_GROUP_ID, groupId); - kafkaProperties.put(PROPERTY_ENABLE_AUTO_COMMIT, enableAutoCommit); - kafkaProperties.put(PROPERTY_AUTO_COMMIT_TIME, autoCommitTime); - kafkaProperties.put(PROPERTY_SESSION_TIMEOUT, sessionTimeout); - kafkaProperties.put(PROPERTY_KEY_DESERIALIZER, keyDeserializer); - kafkaProperties.put(PROPERTY_VALUE_DESERIALIZER, valueDeserializer); - - return kafkaProperties; - } - - /** - * Gets the bootstrap servers. - * - * @return the bootstrap servers - */ - public String getBootstrapServers() { - return bootstrapServers; - } - - /** - * Gets the acks. - * - * @return the acks - */ - public String getAcks() { - return acks; - } + final Properties returnKafkaProperties = new Properties(); - /** - * Gets the retries. - * - * @return the retries - */ - public int getRetries() { - return retries; - } - - /** - * Gets the batch size. - * - * @return the batch size - */ - public int getBatchSize() { - return batchSize; - } - - /** - * Gets the linger time. - * - * @return the linger time - */ - public int getLingerTime() { - return lingerTime; - } - - /** - * Gets the buffer memory. - * - * @return the buffer memory - */ - public long getBufferMemory() { - return bufferMemory; - } - - /** - * Gets the group id. - * - * @return the group id - */ - public String getGroupId() { - return groupId; - } - - /** - * Checks if is enable auto commit. - * - * @return true, if checks if is enable auto commit - */ - public boolean isEnableAutoCommit() { - return enableAutoCommit; - } - - /** - * Gets the auto commit time. - * - * @return the auto commit time - */ - public int getAutoCommitTime() { - return autoCommitTime; - } + // Add properties from the Kafka property array + if (kafkaProperties != null) { + for (int i = 0; i < kafkaProperties.length; i++) { + returnKafkaProperties.put(kafkaProperties[i][0], kafkaProperties[i][1]); + } + } - /** - * Gets the session timeout. - * - * @return the session timeout - */ - public int getSessionTimeout() { - return sessionTimeout; - } + returnKafkaProperties.put(PROPERTY_BOOTSTRAP_SERVERS, bootstrapServers); + returnKafkaProperties.put(PROPERTY_GROUP_ID, groupId); + returnKafkaProperties.put(PROPERTY_ENABLE_AUTO_COMMIT, enableAutoCommit); + returnKafkaProperties.put(PROPERTY_AUTO_COMMIT_TIME, autoCommitTime); + returnKafkaProperties.put(PROPERTY_SESSION_TIMEOUT, sessionTimeout); + returnKafkaProperties.put(PROPERTY_KEY_DESERIALIZER, keyDeserializer); + returnKafkaProperties.put(PROPERTY_VALUE_DESERIALIZER, valueDeserializer); - /** - * Gets the producer topic. - * - * @return the producer topic - */ - public String getProducerTopic() { - return producerTopic; + return returnKafkaProperties; } /** - * Gets the consumer poll time. + * Gets the consumer topic list. * - * @return the consumer poll time + * @return the consumer topic list */ - public long getConsumerPollTime() { - return consumerPollTime; + public Collection getConsumerTopicListAsCollection() { + return Arrays.asList(consumerTopicList); } /** @@ -274,60 +219,6 @@ public class KafkaCarrierTechnologyParameters extends CarrierTechnologyParameter return Duration.ofMillis(consumerPollTime); } - /** - * Gets the consumer topic list. - * - * @return the consumer topic list - */ - public Collection getConsumerTopicList() { - return Arrays.asList(consumerTopicList); - } - - /** - * Gets the key serializer. - * - * @return the key serializer - */ - public String getKeySerializer() { - return keySerializer; - } - - /** - * Gets the value serializer. - * - * @return the value serializer - */ - public String getValueSerializer() { - return valueSerializer; - } - - /** - * Gets the key deserializer. - * - * @return the key deserializer - */ - public String getKeyDeserializer() { - return keyDeserializer; - } - - /** - * Gets the value deserializer. - * - * @return the value deserializer - */ - public String getValueDeserializer() { - return valueDeserializer; - } - - /** - * Gets the value deserializer. - * - * @return the value deserializer - */ - public String getPartitionerClass() { - return partitionerClass; - } - /* * (non-Javadoc) * @@ -337,136 +228,63 @@ public class KafkaCarrierTechnologyParameters extends CarrierTechnologyParameter public GroupValidationResult validate() { final GroupValidationResult result = super.validate(); - validateStringParameters(result); - - validateNumericParameters(result); - validateConsumerTopicList(result); - validateSerializersAndDeserializers(result); + validateKafkaProperties(result); return result; } /** - * Validate that string parameters are correct. - * - * @param result the result of the validation - */ - private void validateStringParameters(final GroupValidationResult result) { - if (isNullOrBlank(bootstrapServers)) { - result.setResult("bootstrapServers", ValidationStatus.INVALID, - "not specified, must be specified as a string of form host:port"); - } - - if (isNullOrBlank(acks)) { - result.setResult("acks", ValidationStatus.INVALID, - "not specified, must be specified as a string with values [0|1|all]"); - } - - if (isNullOrBlank(groupId)) { - result.setResult("groupId", ValidationStatus.INVALID, SPECIFY_AS_STRING_MESSAGE); - } - - if (isNullOrBlank(producerTopic)) { - result.setResult("producerTopic", ValidationStatus.INVALID, - SPECIFY_AS_STRING_MESSAGE); - } - - if (isNullOrBlank(partitionerClass)) { - result.setResult("partitionerClass", ValidationStatus.INVALID, - SPECIFY_AS_STRING_MESSAGE); - } - } - - /** - * Check if numeric parameters are valid. - * - * @param result the result of the validation - */ - private void validateNumericParameters(final GroupValidationResult result) { - if (retries < 0) { - result.setResult(PROPERTY_RETRIES, ValidationStatus.INVALID, - "[" + retries + "] invalid, must be specified as retries >= 0"); - } - - if (batchSize < 0) { - result.setResult("batchSize", ValidationStatus.INVALID, - "[" + batchSize + "] invalid, must be specified as batchSize >= 0"); - } - - if (lingerTime < 0) { - result.setResult("lingerTime", ValidationStatus.INVALID, - "[" + lingerTime + "] invalid, must be specified as lingerTime >= 0"); - } - - if (bufferMemory < 0) { - result.setResult("bufferMemory", ValidationStatus.INVALID, - "[" + bufferMemory + "] invalid, must be specified as bufferMemory >= 0"); - } - - if (autoCommitTime < 0) { - result.setResult("autoCommitTime", ValidationStatus.INVALID, - "[" + autoCommitTime + "] invalid, must be specified as autoCommitTime >= 0"); - } - - if (sessionTimeout < 0) { - result.setResult("sessionTimeout", ValidationStatus.INVALID, - "[" + sessionTimeout + "] invalid, must be specified as sessionTimeout >= 0"); - } - - if (consumerPollTime < 0) { - result.setResult("consumerPollTime", ValidationStatus.INVALID, - "[" + consumerPollTime + "] invalid, must be specified as consumerPollTime >= 0"); - } - } - - /** - * Validate the serializers and deserializers. + * Validate the consumer topic list. * * @param result the result of the validation. */ - private void validateSerializersAndDeserializers(final GroupValidationResult result) { - if (isNullOrBlank(keySerializer)) { - result.setResult("keySerializer", ValidationStatus.INVALID, - SPECIFY_AS_STRING_MESSAGE); - } - - if (isNullOrBlank(valueSerializer)) { - result.setResult("valueSerializer", ValidationStatus.INVALID, - SPECIFY_AS_STRING_MESSAGE); - } - - if (isNullOrBlank(keyDeserializer)) { - result.setResult("keyDeserializer", ValidationStatus.INVALID, - SPECIFY_AS_STRING_MESSAGE); - } - - if (isNullOrBlank(valueDeserializer)) { - result.setResult("valueDeserializer", ValidationStatus.INVALID, - SPECIFY_AS_STRING_MESSAGE); - } - } - private void validateConsumerTopicList(final GroupValidationResult result) { if (consumerTopicList == null || consumerTopicList.length == 0) { result.setResult("consumerTopicList", ValidationStatus.INVALID, - "not specified, must be specified as a list of strings"); + "not specified, must be specified as a list of strings"); + return; } StringBuilder consumerTopicStringBuilder = new StringBuilder(); for (final String consumerTopic : consumerTopicList) { - if (consumerTopic == null || consumerTopic.trim().length() == 0) { + if (StringUtils.isBlank(consumerTopic)) { consumerTopicStringBuilder.append(consumerTopic + "/"); } } if (consumerTopicStringBuilder.length() > 0) { result.setResult("consumerTopicList", ValidationStatus.INVALID, - "invalid consumer topic list entries found: /" + consumerTopicStringBuilder.toString()); + "invalid consumer topic list entries found: /" + consumerTopicStringBuilder.toString()); } } - private boolean isNullOrBlank(final String stringValue) { - return stringValue == null || stringValue.trim().length() == 0; + /** + * Validate the kafka properties. + * + * @param result the result of the validation. + */ + private void validateKafkaProperties(final GroupValidationResult result) { + // Kafka properties are optional + if (kafkaProperties == null || kafkaProperties.length == 0) { + return; + } + + for (int i = 0; i < kafkaProperties.length; i++) { + if (kafkaProperties[i].length != 2) { + result.setResult(KAFKA_PROPERTIES, ValidationStatus.INVALID, + ENTRY + i + " invalid, kafka properties must be name-value pairs"); + } + + if (StringUtils.isBlank(kafkaProperties[i][0])) { + result.setResult(KAFKA_PROPERTIES, ValidationStatus.INVALID, + ENTRY + i + " invalid, key is null or blank"); + } + + if (StringUtils.isBlank(kafkaProperties[i][1])) { + result.setResult(KAFKA_PROPERTIES, ValidationStatus.INVALID, + ENTRY + i + " invalid, value is null or blank"); + } + } } } -- cgit 1.2.3-korg