aboutsummaryrefslogtreecommitdiffstats
path: root/plugins/plugins-event/plugins-event-carrier/plugins-event-carrier-kafka/src/main/java/org/onap/policy/apex/plugins/event/carrier/kafka/KafkaCarrierTechnologyParameters.java
diff options
context:
space:
mode:
Diffstat (limited to 'plugins/plugins-event/plugins-event-carrier/plugins-event-carrier-kafka/src/main/java/org/onap/policy/apex/plugins/event/carrier/kafka/KafkaCarrierTechnologyParameters.java')
-rw-r--r--plugins/plugins-event/plugins-event-carrier/plugins-event-carrier-kafka/src/main/java/org/onap/policy/apex/plugins/event/carrier/kafka/KafkaCarrierTechnologyParameters.java79
1 files changed, 55 insertions, 24 deletions
diff --git a/plugins/plugins-event/plugins-event-carrier/plugins-event-carrier-kafka/src/main/java/org/onap/policy/apex/plugins/event/carrier/kafka/KafkaCarrierTechnologyParameters.java b/plugins/plugins-event/plugins-event-carrier/plugins-event-carrier-kafka/src/main/java/org/onap/policy/apex/plugins/event/carrier/kafka/KafkaCarrierTechnologyParameters.java
index 927d79ee1..9d9acf625 100644
--- a/plugins/plugins-event/plugins-event-carrier/plugins-event-carrier-kafka/src/main/java/org/onap/policy/apex/plugins/event/carrier/kafka/KafkaCarrierTechnologyParameters.java
+++ b/plugins/plugins-event/plugins-event-carrier/plugins-event-carrier-kafka/src/main/java/org/onap/policy/apex/plugins/event/carrier/kafka/KafkaCarrierTechnologyParameters.java
@@ -154,26 +154,28 @@ public class KafkaCarrierTechnologyParameters extends CarrierTechnologyParameter
* @return the kafka producer properties
*/
public Properties getKafkaProducerProperties() {
- final Properties returnKafkaProperties = new Properties();
+ final Properties retKafkaProps = new Properties();
// Add properties from the Kafka property array
if (kafkaProperties != null) {
for (int i = 0; i < kafkaProperties.length; i++) {
- returnKafkaProperties.put(kafkaProperties[i][0], kafkaProperties[i][1]);
+ retKafkaProps.setProperty(kafkaProperties[i][0], kafkaProperties[i][1]);
}
}
- returnKafkaProperties.put(PROPERTY_BOOTSTRAP_SERVERS, bootstrapServers);
- returnKafkaProperties.put(PROPERTY_ACKS, acks);
- returnKafkaProperties.put(PROPERTY_RETRIES, retries);
- returnKafkaProperties.put(PROPERTY_BATCH_SIZE, batchSize);
- returnKafkaProperties.put(PROPERTY_LINGER_TIME, lingerTime);
- returnKafkaProperties.put(PROPERTY_BUFFER_MEMORY, bufferMemory);
- returnKafkaProperties.put(PROPERTY_KEY_SERIALIZER, keySerializer);
- returnKafkaProperties.put(PROPERTY_VALUE_SERIALIZER, valueSerializer);
- returnKafkaProperties.put(PROPERTY_PARTITIONER_CLASS, partitionerClass);
-
- return returnKafkaProperties;
+ // @formatter:off
+ putExplicitProperty(retKafkaProps, PROPERTY_BOOTSTRAP_SERVERS, bootstrapServers, DEFAULT_BOOT_SERVERS);
+ putExplicitProperty(retKafkaProps, PROPERTY_ACKS, acks, DEFAULT_ACKS);
+ putExplicitProperty(retKafkaProps, PROPERTY_RETRIES, retries, DEFAULT_RETRIES);
+ putExplicitProperty(retKafkaProps, PROPERTY_BATCH_SIZE, batchSize, DEFAULT_BATCH_SIZE);
+ putExplicitProperty(retKafkaProps, PROPERTY_LINGER_TIME, lingerTime, DEFAULT_LINGER_TIME);
+ putExplicitProperty(retKafkaProps, PROPERTY_BUFFER_MEMORY, bufferMemory, DEFAULT_BUFFER_MEMORY);
+ putExplicitProperty(retKafkaProps, PROPERTY_KEY_SERIALIZER, keySerializer, DEFAULT_STRING_SERZER);
+ putExplicitProperty(retKafkaProps, PROPERTY_VALUE_SERIALIZER, valueSerializer, DEFAULT_STRING_SERZER);
+ putExplicitProperty(retKafkaProps, PROPERTY_PARTITIONER_CLASS, partitionerClass, DEFAULT_PARTITIONR_CLASS);
+ // @formatter:on
+
+ return retKafkaProps;
}
/**
@@ -182,24 +184,26 @@ public class KafkaCarrierTechnologyParameters extends CarrierTechnologyParameter
* @return the kafka consumer properties
*/
public Properties getKafkaConsumerProperties() {
- final Properties returnKafkaProperties = new Properties();
+ final Properties retKafkaProps = new Properties();
// Add properties from the Kafka property array
if (kafkaProperties != null) {
for (int i = 0; i < kafkaProperties.length; i++) {
- returnKafkaProperties.put(kafkaProperties[i][0], kafkaProperties[i][1]);
+ retKafkaProps.setProperty(kafkaProperties[i][0], kafkaProperties[i][1]);
}
}
- returnKafkaProperties.put(PROPERTY_BOOTSTRAP_SERVERS, bootstrapServers);
- returnKafkaProperties.put(PROPERTY_GROUP_ID, groupId);
- returnKafkaProperties.put(PROPERTY_ENABLE_AUTO_COMMIT, enableAutoCommit);
- returnKafkaProperties.put(PROPERTY_AUTO_COMMIT_TIME, autoCommitTime);
- returnKafkaProperties.put(PROPERTY_SESSION_TIMEOUT, sessionTimeout);
- returnKafkaProperties.put(PROPERTY_KEY_DESERIALIZER, keyDeserializer);
- returnKafkaProperties.put(PROPERTY_VALUE_DESERIALIZER, valueDeserializer);
-
- return returnKafkaProperties;
+ // @formatter:off
+ putExplicitProperty(retKafkaProps, PROPERTY_BOOTSTRAP_SERVERS, bootstrapServers, DEFAULT_BOOT_SERVERS);
+ putExplicitProperty(retKafkaProps, PROPERTY_GROUP_ID, groupId, DEFAULT_GROUP_ID);
+ putExplicitProperty(retKafkaProps, PROPERTY_ENABLE_AUTO_COMMIT, enableAutoCommit, DEFAULT_ENABLE_AUTOCMIT);
+ putExplicitProperty(retKafkaProps, PROPERTY_AUTO_COMMIT_TIME, autoCommitTime, DEFAULT_AUTO_COMMIT_TIME);
+ putExplicitProperty(retKafkaProps, PROPERTY_SESSION_TIMEOUT, sessionTimeout, DEFAULT_SESSION_TIMEOUT);
+ putExplicitProperty(retKafkaProps, PROPERTY_KEY_DESERIALIZER, keyDeserializer, DEFAULT_STRING_DESZER);
+ putExplicitProperty(retKafkaProps, PROPERTY_VALUE_DESERIALIZER, valueDeserializer, DEFAULT_STRING_DESZER);
+ // @formatter:on
+
+ return retKafkaProps;
}
/**
@@ -285,4 +289,31 @@ public class KafkaCarrierTechnologyParameters extends CarrierTechnologyParameter
}
}
}
+
+ /**
+ * Put a property into the properties if it is not already defined and is not the default value.
+ *
+ * @param returnKafkaProperties the properties to set the value in
+ * @param property the property to put
+ * @param value the value of the property to put
+ * @param defaultValue the default value of the property to put
+ */
+ private void putExplicitProperty(final Properties returnKafkaProperties, final String property,
+ final Object value, final Object defaultValue) {
+
+ // Check if the property is already in the properties
+ if (!returnKafkaProperties.containsKey(property)) {
+ // Not found, so add it
+ returnKafkaProperties.setProperty(property, value.toString());
+ }
+ else {
+ // Found, only overwrite if the property does not have the default value
+ if (value == null) {
+ returnKafkaProperties.setProperty(property, defaultValue.toString());
+ }
+ else if (!value.toString().contentEquals(defaultValue.toString())) {
+ returnKafkaProperties.setProperty(property, value.toString());
+ }
+ }
+ }
}