From 20b67a6e8cd07b7e49a7725288d95fb66dcebae0 Mon Sep 17 00:00:00 2001 From: liamfallon Date: Wed, 10 Apr 2019 12:52:37 +0000 Subject: Fix Kafka partiion support in APEX MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit The kafka producer property ‘partitioner.class’ should be configurable via the EngineConfig.json file so that a specialized or custom Kafka partitioner can be used. Issue-ID: POLICY-1627 Change-Id: Ic36ccdf3d244ab932b58c3e2ae1cd668249726c5 Signed-off-by: liamfallon --- .../kafka/KafkaCarrierTechnologyParameters.java | 25 +++++++++++++++++++--- 1 file changed, 22 insertions(+), 3 deletions(-) (limited to 'plugins/plugins-event/plugins-event-carrier/plugins-event-carrier-kafka') diff --git a/plugins/plugins-event/plugins-event-carrier/plugins-event-carrier-kafka/src/main/java/org/onap/policy/apex/plugins/event/carrier/kafka/KafkaCarrierTechnologyParameters.java b/plugins/plugins-event/plugins-event-carrier/plugins-event-carrier-kafka/src/main/java/org/onap/policy/apex/plugins/event/carrier/kafka/KafkaCarrierTechnologyParameters.java index 851741611..6aa9d53e6 100644 --- a/plugins/plugins-event/plugins-event-carrier/plugins-event-carrier-kafka/src/main/java/org/onap/policy/apex/plugins/event/carrier/kafka/KafkaCarrierTechnologyParameters.java +++ b/plugins/plugins-event/plugins-event-carrier/plugins-event-carrier-kafka/src/main/java/org/onap/policy/apex/plugins/event/carrier/kafka/KafkaCarrierTechnologyParameters.java @@ -25,6 +25,7 @@ import java.util.Arrays; import java.util.Collection; import java.util.Properties; +import org.apache.kafka.clients.producer.internals.DefaultPartitioner; import org.onap.policy.apex.service.parameters.carriertechnology.CarrierTechnologyParameters; import org.onap.policy.common.parameters.GroupValidationResult; import org.onap.policy.common.parameters.ValidationStatus; @@ -64,6 +65,7 @@ public class KafkaCarrierTechnologyParameters extends CarrierTechnologyParameter private static final String[] DEFAULT_CONS_TOPICLIST = {"apex-in"}; private static final String DEFAULT_STRING_SERZER = "org.apache.kafka.common.serialization.StringSerializer"; private static final String DEFAULT_STRING_DESZER = "org.apache.kafka.common.serialization.StringDeserializer"; + private static final String DEFAULT_PARTITIONR_CLASS = DefaultPartitioner.class.getCanonicalName(); // Parameter property map tokens private static final String PROPERTY_BOOTSTRAP_SERVERS = "bootstrap.servers"; @@ -80,6 +82,7 @@ public class KafkaCarrierTechnologyParameters extends CarrierTechnologyParameter private static final String PROPERTY_VALUE_SERIALIZER = "value.serializer"; private static final String PROPERTY_KEY_DESERIALIZER = "key.deserializer"; private static final String PROPERTY_VALUE_DESERIALIZER = "value.deserializer"; + private static final String PROPERTY_PARTITIONER_CLASS = "partitioner.class"; // kafka carrier parameters private String bootstrapServers = DEFAULT_BOOT_SERVERS; @@ -99,6 +102,7 @@ public class KafkaCarrierTechnologyParameters extends CarrierTechnologyParameter private String valueSerializer = DEFAULT_STRING_SERZER; private String keyDeserializer = DEFAULT_STRING_DESZER; private String valueDeserializer = DEFAULT_STRING_DESZER; + private String partitionerClass = DEFAULT_PARTITIONR_CLASS; // @formatter:on /** @@ -130,6 +134,7 @@ public class KafkaCarrierTechnologyParameters extends CarrierTechnologyParameter kafkaProperties.put(PROPERTY_BUFFER_MEMORY, bufferMemory); kafkaProperties.put(PROPERTY_KEY_SERIALIZER, keySerializer); kafkaProperties.put(PROPERTY_VALUE_SERIALIZER, valueSerializer); + kafkaProperties.put(PROPERTY_PARTITIONER_CLASS, partitionerClass); return kafkaProperties; } @@ -314,6 +319,15 @@ public class KafkaCarrierTechnologyParameters extends CarrierTechnologyParameter return valueDeserializer; } + /** + * Gets the value deserializer. + * + * @return the value deserializer + */ + public String getPartitionerClass() { + return partitionerClass; + } + /* * (non-Javadoc) * @@ -336,7 +350,7 @@ public class KafkaCarrierTechnologyParameters extends CarrierTechnologyParameter /** * Validate that string parameters are correct. - * + * * @param result the result of the validation */ private void validateStringParameters(final GroupValidationResult result) { @@ -358,11 +372,16 @@ public class KafkaCarrierTechnologyParameters extends CarrierTechnologyParameter result.setResult("producerTopic", ValidationStatus.INVALID, SPECIFY_AS_STRING_MESSAGE); } + + if (isNullOrBlank(partitionerClass)) { + result.setResult("partitionerClass", ValidationStatus.INVALID, + SPECIFY_AS_STRING_MESSAGE); + } } /** * Check if numeric parameters are valid. - * + * * @param result the result of the validation */ private void validateNumericParameters(final GroupValidationResult result) { @@ -404,7 +423,7 @@ public class KafkaCarrierTechnologyParameters extends CarrierTechnologyParameter /** * Validate the serializers and deserializers. - * + * * @param result the result of the validation. */ private void validateSerializersAndDeserializers(final GroupValidationResult result) { -- cgit 1.2.3-korg