diff options
author | liamfallon <liam.fallon@est.tech> | 2019-04-10 12:52:37 +0000 |
---|---|---|
committer | liamfallon <liam.fallon@est.tech> | 2019-04-10 12:52:37 +0000 |
commit | 20b67a6e8cd07b7e49a7725288d95fb66dcebae0 (patch) | |
tree | 96dbcd04cab49434d84c4554064052116a842a9c /plugins/plugins-event/plugins-event-carrier/plugins-event-carrier-kafka | |
parent | f2f9e5e8c4ce5c3ff75cbe6f6f4e2de5e4b3f3fb (diff) |
Fix Kafka partiion support in APEX
The kafka producer property ‘partitioner.class’ should be configurable via
the EngineConfig.json file so that a specialized or custom Kafka partitioner
can be used.
Issue-ID: POLICY-1627
Change-Id: Ic36ccdf3d244ab932b58c3e2ae1cd668249726c5
Signed-off-by: liamfallon <liam.fallon@est.tech>
Diffstat (limited to 'plugins/plugins-event/plugins-event-carrier/plugins-event-carrier-kafka')
-rw-r--r-- | plugins/plugins-event/plugins-event-carrier/plugins-event-carrier-kafka/src/main/java/org/onap/policy/apex/plugins/event/carrier/kafka/KafkaCarrierTechnologyParameters.java | 25 |
1 files changed, 22 insertions, 3 deletions
diff --git a/plugins/plugins-event/plugins-event-carrier/plugins-event-carrier-kafka/src/main/java/org/onap/policy/apex/plugins/event/carrier/kafka/KafkaCarrierTechnologyParameters.java b/plugins/plugins-event/plugins-event-carrier/plugins-event-carrier-kafka/src/main/java/org/onap/policy/apex/plugins/event/carrier/kafka/KafkaCarrierTechnologyParameters.java index 851741611..6aa9d53e6 100644 --- a/plugins/plugins-event/plugins-event-carrier/plugins-event-carrier-kafka/src/main/java/org/onap/policy/apex/plugins/event/carrier/kafka/KafkaCarrierTechnologyParameters.java +++ b/plugins/plugins-event/plugins-event-carrier/plugins-event-carrier-kafka/src/main/java/org/onap/policy/apex/plugins/event/carrier/kafka/KafkaCarrierTechnologyParameters.java @@ -25,6 +25,7 @@ import java.util.Arrays; import java.util.Collection; import java.util.Properties; +import org.apache.kafka.clients.producer.internals.DefaultPartitioner; import org.onap.policy.apex.service.parameters.carriertechnology.CarrierTechnologyParameters; import org.onap.policy.common.parameters.GroupValidationResult; import org.onap.policy.common.parameters.ValidationStatus; @@ -64,6 +65,7 @@ public class KafkaCarrierTechnologyParameters extends CarrierTechnologyParameter private static final String[] DEFAULT_CONS_TOPICLIST = {"apex-in"}; private static final String DEFAULT_STRING_SERZER = "org.apache.kafka.common.serialization.StringSerializer"; private static final String DEFAULT_STRING_DESZER = "org.apache.kafka.common.serialization.StringDeserializer"; + private static final String DEFAULT_PARTITIONR_CLASS = DefaultPartitioner.class.getCanonicalName(); // Parameter property map tokens private static final String PROPERTY_BOOTSTRAP_SERVERS = "bootstrap.servers"; @@ -80,6 +82,7 @@ public class KafkaCarrierTechnologyParameters extends CarrierTechnologyParameter private static final String PROPERTY_VALUE_SERIALIZER = "value.serializer"; private static final String PROPERTY_KEY_DESERIALIZER = "key.deserializer"; private static final String PROPERTY_VALUE_DESERIALIZER = "value.deserializer"; + private static final String PROPERTY_PARTITIONER_CLASS = "partitioner.class"; // kafka carrier parameters private String bootstrapServers = DEFAULT_BOOT_SERVERS; @@ -99,6 +102,7 @@ public class KafkaCarrierTechnologyParameters extends CarrierTechnologyParameter private String valueSerializer = DEFAULT_STRING_SERZER; private String keyDeserializer = DEFAULT_STRING_DESZER; private String valueDeserializer = DEFAULT_STRING_DESZER; + private String partitionerClass = DEFAULT_PARTITIONR_CLASS; // @formatter:on /** @@ -130,6 +134,7 @@ public class KafkaCarrierTechnologyParameters extends CarrierTechnologyParameter kafkaProperties.put(PROPERTY_BUFFER_MEMORY, bufferMemory); kafkaProperties.put(PROPERTY_KEY_SERIALIZER, keySerializer); kafkaProperties.put(PROPERTY_VALUE_SERIALIZER, valueSerializer); + kafkaProperties.put(PROPERTY_PARTITIONER_CLASS, partitionerClass); return kafkaProperties; } @@ -314,6 +319,15 @@ public class KafkaCarrierTechnologyParameters extends CarrierTechnologyParameter return valueDeserializer; } + /** + * Gets the value deserializer. + * + * @return the value deserializer + */ + public String getPartitionerClass() { + return partitionerClass; + } + /* * (non-Javadoc) * @@ -336,7 +350,7 @@ public class KafkaCarrierTechnologyParameters extends CarrierTechnologyParameter /** * Validate that string parameters are correct. - * + * * @param result the result of the validation */ private void validateStringParameters(final GroupValidationResult result) { @@ -358,11 +372,16 @@ public class KafkaCarrierTechnologyParameters extends CarrierTechnologyParameter result.setResult("producerTopic", ValidationStatus.INVALID, SPECIFY_AS_STRING_MESSAGE); } + + if (isNullOrBlank(partitionerClass)) { + result.setResult("partitionerClass", ValidationStatus.INVALID, + SPECIFY_AS_STRING_MESSAGE); + } } /** * Check if numeric parameters are valid. - * + * * @param result the result of the validation */ private void validateNumericParameters(final GroupValidationResult result) { @@ -404,7 +423,7 @@ public class KafkaCarrierTechnologyParameters extends CarrierTechnologyParameter /** * Validate the serializers and deserializers. - * + * * @param result the result of the validation. */ private void validateSerializersAndDeserializers(final GroupValidationResult result) { |