summaryrefslogtreecommitdiffstats
path: root/plugins/plugins-event/plugins-event-carrier/plugins-event-carrier-kafka/src/main
diff options
context:
space:
mode:
authorliamfallon <liam.fallon@est.tech>2019-07-05 13:41:03 +0000
committerliamfallon <liam.fallon@est.tech>2019-07-05 13:41:03 +0000
commitcad7cff7dc945eefaf27815742461d6db6ab8eac (patch)
tree4f3e6b26fadd9fff47deccfe2cbff0e76a46e948 /plugins/plugins-event/plugins-event-carrier/plugins-event-carrier-kafka/src/main
parent9f0678f3ab333949076b5b9747158cf40e08fbef (diff)
Add duplicate check, examples for kafka Properties
Added checks for dealing with duplication of specification of properties explicitly and in kafkaPropertes Added examples for kafkaProperties Added documentation for kafkaProperties Issue-ID: POLICY-1818 Change-Id: Icbe01d6f1d25d4570dcc85cc3db28588743b9c41 Signed-off-by: liamfallon <liam.fallon@est.tech>
Diffstat (limited to 'plugins/plugins-event/plugins-event-carrier/plugins-event-carrier-kafka/src/main')
-rw-r--r--plugins/plugins-event/plugins-event-carrier/plugins-event-carrier-kafka/src/main/java/org/onap/policy/apex/plugins/event/carrier/kafka/KafkaCarrierTechnologyParameters.java79
1 files changed, 55 insertions, 24 deletions
diff --git a/plugins/plugins-event/plugins-event-carrier/plugins-event-carrier-kafka/src/main/java/org/onap/policy/apex/plugins/event/carrier/kafka/KafkaCarrierTechnologyParameters.java b/plugins/plugins-event/plugins-event-carrier/plugins-event-carrier-kafka/src/main/java/org/onap/policy/apex/plugins/event/carrier/kafka/KafkaCarrierTechnologyParameters.java
index 927d79ee1..9d9acf625 100644
--- a/plugins/plugins-event/plugins-event-carrier/plugins-event-carrier-kafka/src/main/java/org/onap/policy/apex/plugins/event/carrier/kafka/KafkaCarrierTechnologyParameters.java
+++ b/plugins/plugins-event/plugins-event-carrier/plugins-event-carrier-kafka/src/main/java/org/onap/policy/apex/plugins/event/carrier/kafka/KafkaCarrierTechnologyParameters.java
@@ -154,26 +154,28 @@ public class KafkaCarrierTechnologyParameters extends CarrierTechnologyParameter
* @return the kafka producer properties
*/
public Properties getKafkaProducerProperties() {
- final Properties returnKafkaProperties = new Properties();
+ final Properties retKafkaProps = new Properties();
// Add properties from the Kafka property array
if (kafkaProperties != null) {
for (int i = 0; i < kafkaProperties.length; i++) {
- returnKafkaProperties.put(kafkaProperties[i][0], kafkaProperties[i][1]);
+ retKafkaProps.setProperty(kafkaProperties[i][0], kafkaProperties[i][1]);
}
}
- returnKafkaProperties.put(PROPERTY_BOOTSTRAP_SERVERS, bootstrapServers);
- returnKafkaProperties.put(PROPERTY_ACKS, acks);
- returnKafkaProperties.put(PROPERTY_RETRIES, retries);
- returnKafkaProperties.put(PROPERTY_BATCH_SIZE, batchSize);
- returnKafkaProperties.put(PROPERTY_LINGER_TIME, lingerTime);
- returnKafkaProperties.put(PROPERTY_BUFFER_MEMORY, bufferMemory);
- returnKafkaProperties.put(PROPERTY_KEY_SERIALIZER, keySerializer);
- returnKafkaProperties.put(PROPERTY_VALUE_SERIALIZER, valueSerializer);
- returnKafkaProperties.put(PROPERTY_PARTITIONER_CLASS, partitionerClass);
-
- return returnKafkaProperties;
+ // @formatter:off
+ putExplicitProperty(retKafkaProps, PROPERTY_BOOTSTRAP_SERVERS, bootstrapServers, DEFAULT_BOOT_SERVERS);
+ putExplicitProperty(retKafkaProps, PROPERTY_ACKS, acks, DEFAULT_ACKS);
+ putExplicitProperty(retKafkaProps, PROPERTY_RETRIES, retries, DEFAULT_RETRIES);
+ putExplicitProperty(retKafkaProps, PROPERTY_BATCH_SIZE, batchSize, DEFAULT_BATCH_SIZE);
+ putExplicitProperty(retKafkaProps, PROPERTY_LINGER_TIME, lingerTime, DEFAULT_LINGER_TIME);
+ putExplicitProperty(retKafkaProps, PROPERTY_BUFFER_MEMORY, bufferMemory, DEFAULT_BUFFER_MEMORY);
+ putExplicitProperty(retKafkaProps, PROPERTY_KEY_SERIALIZER, keySerializer, DEFAULT_STRING_SERZER);
+ putExplicitProperty(retKafkaProps, PROPERTY_VALUE_SERIALIZER, valueSerializer, DEFAULT_STRING_SERZER);
+ putExplicitProperty(retKafkaProps, PROPERTY_PARTITIONER_CLASS, partitionerClass, DEFAULT_PARTITIONR_CLASS);
+ // @formatter:on
+
+ return retKafkaProps;
}
/**
@@ -182,24 +184,26 @@ public class KafkaCarrierTechnologyParameters extends CarrierTechnologyParameter
* @return the kafka consumer properties
*/
public Properties getKafkaConsumerProperties() {
- final Properties returnKafkaProperties = new Properties();
+ final Properties retKafkaProps = new Properties();
// Add properties from the Kafka property array
if (kafkaProperties != null) {
for (int i = 0; i < kafkaProperties.length; i++) {
- returnKafkaProperties.put(kafkaProperties[i][0], kafkaProperties[i][1]);
+ retKafkaProps.setProperty(kafkaProperties[i][0], kafkaProperties[i][1]);
}
}
- returnKafkaProperties.put(PROPERTY_BOOTSTRAP_SERVERS, bootstrapServers);
- returnKafkaProperties.put(PROPERTY_GROUP_ID, groupId);
- returnKafkaProperties.put(PROPERTY_ENABLE_AUTO_COMMIT, enableAutoCommit);
- returnKafkaProperties.put(PROPERTY_AUTO_COMMIT_TIME, autoCommitTime);
- returnKafkaProperties.put(PROPERTY_SESSION_TIMEOUT, sessionTimeout);
- returnKafkaProperties.put(PROPERTY_KEY_DESERIALIZER, keyDeserializer);
- returnKafkaProperties.put(PROPERTY_VALUE_DESERIALIZER, valueDeserializer);
-
- return returnKafkaProperties;
+ // @formatter:off
+ putExplicitProperty(retKafkaProps, PROPERTY_BOOTSTRAP_SERVERS, bootstrapServers, DEFAULT_BOOT_SERVERS);
+ putExplicitProperty(retKafkaProps, PROPERTY_GROUP_ID, groupId, DEFAULT_GROUP_ID);
+ putExplicitProperty(retKafkaProps, PROPERTY_ENABLE_AUTO_COMMIT, enableAutoCommit, DEFAULT_ENABLE_AUTOCMIT);
+ putExplicitProperty(retKafkaProps, PROPERTY_AUTO_COMMIT_TIME, autoCommitTime, DEFAULT_AUTO_COMMIT_TIME);
+ putExplicitProperty(retKafkaProps, PROPERTY_SESSION_TIMEOUT, sessionTimeout, DEFAULT_SESSION_TIMEOUT);
+ putExplicitProperty(retKafkaProps, PROPERTY_KEY_DESERIALIZER, keyDeserializer, DEFAULT_STRING_DESZER);
+ putExplicitProperty(retKafkaProps, PROPERTY_VALUE_DESERIALIZER, valueDeserializer, DEFAULT_STRING_DESZER);
+ // @formatter:on
+
+ return retKafkaProps;
}
/**
@@ -285,4 +289,31 @@ public class KafkaCarrierTechnologyParameters extends CarrierTechnologyParameter
}
}
}
+
+ /**
+ * Put a property into the properties if it is not already defined and is not the default value.
+ *
+ * @param returnKafkaProperties the properties to set the value in
+ * @param property the property to put
+ * @param value the value of the property to put
+ * @param defaultValue the default value of the property to put
+ */
+ private void putExplicitProperty(final Properties returnKafkaProperties, final String property,
+ final Object value, final Object defaultValue) {
+
+ // Check if the property is already in the properties
+ if (!returnKafkaProperties.containsKey(property)) {
+ // Not found, so add it
+ returnKafkaProperties.setProperty(property, value.toString());
+ }
+ else {
+ // Found, only overwrite if the property does not have the default value
+ if (value == null) {
+ returnKafkaProperties.setProperty(property, defaultValue.toString());
+ }
+ else if (!value.toString().contentEquals(defaultValue.toString())) {
+ returnKafkaProperties.setProperty(property, value.toString());
+ }
+ }
+ }
}