summaryrefslogtreecommitdiffstats
path: root/plugins/plugins-event
diff options
context:
space:
mode:
authorliamfallon <liam.fallon@est.tech>2019-04-10 12:52:37 +0000
committerliamfallon <liam.fallon@est.tech>2019-04-10 12:52:37 +0000
commit20b67a6e8cd07b7e49a7725288d95fb66dcebae0 (patch)
tree96dbcd04cab49434d84c4554064052116a842a9c /plugins/plugins-event
parentf2f9e5e8c4ce5c3ff75cbe6f6f4e2de5e4b3f3fb (diff)
Fix Kafka partiion support in APEX
The kafka producer property ‘partitioner.class’ should be configurable via the EngineConfig.json file so that a specialized or custom Kafka partitioner can be used. Issue-ID: POLICY-1627 Change-Id: Ic36ccdf3d244ab932b58c3e2ae1cd668249726c5 Signed-off-by: liamfallon <liam.fallon@est.tech>
Diffstat (limited to 'plugins/plugins-event')
-rw-r--r--plugins/plugins-event/plugins-event-carrier/plugins-event-carrier-kafka/src/main/java/org/onap/policy/apex/plugins/event/carrier/kafka/KafkaCarrierTechnologyParameters.java25
1 files changed, 22 insertions, 3 deletions
diff --git a/plugins/plugins-event/plugins-event-carrier/plugins-event-carrier-kafka/src/main/java/org/onap/policy/apex/plugins/event/carrier/kafka/KafkaCarrierTechnologyParameters.java b/plugins/plugins-event/plugins-event-carrier/plugins-event-carrier-kafka/src/main/java/org/onap/policy/apex/plugins/event/carrier/kafka/KafkaCarrierTechnologyParameters.java
index 851741611..6aa9d53e6 100644
--- a/plugins/plugins-event/plugins-event-carrier/plugins-event-carrier-kafka/src/main/java/org/onap/policy/apex/plugins/event/carrier/kafka/KafkaCarrierTechnologyParameters.java
+++ b/plugins/plugins-event/plugins-event-carrier/plugins-event-carrier-kafka/src/main/java/org/onap/policy/apex/plugins/event/carrier/kafka/KafkaCarrierTechnologyParameters.java
@@ -25,6 +25,7 @@ import java.util.Arrays;
import java.util.Collection;
import java.util.Properties;
+import org.apache.kafka.clients.producer.internals.DefaultPartitioner;
import org.onap.policy.apex.service.parameters.carriertechnology.CarrierTechnologyParameters;
import org.onap.policy.common.parameters.GroupValidationResult;
import org.onap.policy.common.parameters.ValidationStatus;
@@ -64,6 +65,7 @@ public class KafkaCarrierTechnologyParameters extends CarrierTechnologyParameter
private static final String[] DEFAULT_CONS_TOPICLIST = {"apex-in"};
private static final String DEFAULT_STRING_SERZER = "org.apache.kafka.common.serialization.StringSerializer";
private static final String DEFAULT_STRING_DESZER = "org.apache.kafka.common.serialization.StringDeserializer";
+ private static final String DEFAULT_PARTITIONR_CLASS = DefaultPartitioner.class.getCanonicalName();
// Parameter property map tokens
private static final String PROPERTY_BOOTSTRAP_SERVERS = "bootstrap.servers";
@@ -80,6 +82,7 @@ public class KafkaCarrierTechnologyParameters extends CarrierTechnologyParameter
private static final String PROPERTY_VALUE_SERIALIZER = "value.serializer";
private static final String PROPERTY_KEY_DESERIALIZER = "key.deserializer";
private static final String PROPERTY_VALUE_DESERIALIZER = "value.deserializer";
+ private static final String PROPERTY_PARTITIONER_CLASS = "partitioner.class";
// kafka carrier parameters
private String bootstrapServers = DEFAULT_BOOT_SERVERS;
@@ -99,6 +102,7 @@ public class KafkaCarrierTechnologyParameters extends CarrierTechnologyParameter
private String valueSerializer = DEFAULT_STRING_SERZER;
private String keyDeserializer = DEFAULT_STRING_DESZER;
private String valueDeserializer = DEFAULT_STRING_DESZER;
+ private String partitionerClass = DEFAULT_PARTITIONR_CLASS;
// @formatter:on
/**
@@ -130,6 +134,7 @@ public class KafkaCarrierTechnologyParameters extends CarrierTechnologyParameter
kafkaProperties.put(PROPERTY_BUFFER_MEMORY, bufferMemory);
kafkaProperties.put(PROPERTY_KEY_SERIALIZER, keySerializer);
kafkaProperties.put(PROPERTY_VALUE_SERIALIZER, valueSerializer);
+ kafkaProperties.put(PROPERTY_PARTITIONER_CLASS, partitionerClass);
return kafkaProperties;
}
@@ -314,6 +319,15 @@ public class KafkaCarrierTechnologyParameters extends CarrierTechnologyParameter
return valueDeserializer;
}
+ /**
+ * Gets the value deserializer.
+ *
+ * @return the value deserializer
+ */
+ public String getPartitionerClass() {
+ return partitionerClass;
+ }
+
/*
* (non-Javadoc)
*
@@ -336,7 +350,7 @@ public class KafkaCarrierTechnologyParameters extends CarrierTechnologyParameter
/**
* Validate that string parameters are correct.
- *
+ *
* @param result the result of the validation
*/
private void validateStringParameters(final GroupValidationResult result) {
@@ -358,11 +372,16 @@ public class KafkaCarrierTechnologyParameters extends CarrierTechnologyParameter
result.setResult("producerTopic", ValidationStatus.INVALID,
SPECIFY_AS_STRING_MESSAGE);
}
+
+ if (isNullOrBlank(partitionerClass)) {
+ result.setResult("partitionerClass", ValidationStatus.INVALID,
+ SPECIFY_AS_STRING_MESSAGE);
+ }
}
/**
* Check if numeric parameters are valid.
- *
+ *
* @param result the result of the validation
*/
private void validateNumericParameters(final GroupValidationResult result) {
@@ -404,7 +423,7 @@ public class KafkaCarrierTechnologyParameters extends CarrierTechnologyParameter
/**
* Validate the serializers and deserializers.
- *
+ *
* @param result the result of the validation.
*/
private void validateSerializersAndDeserializers(final GroupValidationResult result) {