From cad7cff7dc945eefaf27815742461d6db6ab8eac Mon Sep 17 00:00:00 2001 From: liamfallon Date: Fri, 5 Jul 2019 13:41:03 +0000 Subject: Add duplicate check, examples for kafka Properties Added checks for dealing with duplication of specification of properties explicitly and in kafkaPropertes Added examples for kafkaProperties Added documentation for kafkaProperties Issue-ID: POLICY-1818 Change-Id: Icbe01d6f1d25d4570dcc85cc3db28588743b9c41 Signed-off-by: liamfallon --- .../kafka/KafkaCarrierTechnologyParameters.java | 79 +++++++++++++++------- 1 file changed, 55 insertions(+), 24 deletions(-) (limited to 'plugins/plugins-event/plugins-event-carrier/plugins-event-carrier-kafka/src/main') diff --git a/plugins/plugins-event/plugins-event-carrier/plugins-event-carrier-kafka/src/main/java/org/onap/policy/apex/plugins/event/carrier/kafka/KafkaCarrierTechnologyParameters.java b/plugins/plugins-event/plugins-event-carrier/plugins-event-carrier-kafka/src/main/java/org/onap/policy/apex/plugins/event/carrier/kafka/KafkaCarrierTechnologyParameters.java index 927d79ee1..9d9acf625 100644 --- a/plugins/plugins-event/plugins-event-carrier/plugins-event-carrier-kafka/src/main/java/org/onap/policy/apex/plugins/event/carrier/kafka/KafkaCarrierTechnologyParameters.java +++ b/plugins/plugins-event/plugins-event-carrier/plugins-event-carrier-kafka/src/main/java/org/onap/policy/apex/plugins/event/carrier/kafka/KafkaCarrierTechnologyParameters.java @@ -154,26 +154,28 @@ public class KafkaCarrierTechnologyParameters extends CarrierTechnologyParameter * @return the kafka producer properties */ public Properties getKafkaProducerProperties() { - final Properties returnKafkaProperties = new Properties(); + final Properties retKafkaProps = new Properties(); // Add properties from the Kafka property array if (kafkaProperties != null) { for (int i = 0; i < kafkaProperties.length; i++) { - returnKafkaProperties.put(kafkaProperties[i][0], kafkaProperties[i][1]); + retKafkaProps.setProperty(kafkaProperties[i][0], kafkaProperties[i][1]); } } - returnKafkaProperties.put(PROPERTY_BOOTSTRAP_SERVERS, bootstrapServers); - returnKafkaProperties.put(PROPERTY_ACKS, acks); - returnKafkaProperties.put(PROPERTY_RETRIES, retries); - returnKafkaProperties.put(PROPERTY_BATCH_SIZE, batchSize); - returnKafkaProperties.put(PROPERTY_LINGER_TIME, lingerTime); - returnKafkaProperties.put(PROPERTY_BUFFER_MEMORY, bufferMemory); - returnKafkaProperties.put(PROPERTY_KEY_SERIALIZER, keySerializer); - returnKafkaProperties.put(PROPERTY_VALUE_SERIALIZER, valueSerializer); - returnKafkaProperties.put(PROPERTY_PARTITIONER_CLASS, partitionerClass); - - return returnKafkaProperties; + // @formatter:off + putExplicitProperty(retKafkaProps, PROPERTY_BOOTSTRAP_SERVERS, bootstrapServers, DEFAULT_BOOT_SERVERS); + putExplicitProperty(retKafkaProps, PROPERTY_ACKS, acks, DEFAULT_ACKS); + putExplicitProperty(retKafkaProps, PROPERTY_RETRIES, retries, DEFAULT_RETRIES); + putExplicitProperty(retKafkaProps, PROPERTY_BATCH_SIZE, batchSize, DEFAULT_BATCH_SIZE); + putExplicitProperty(retKafkaProps, PROPERTY_LINGER_TIME, lingerTime, DEFAULT_LINGER_TIME); + putExplicitProperty(retKafkaProps, PROPERTY_BUFFER_MEMORY, bufferMemory, DEFAULT_BUFFER_MEMORY); + putExplicitProperty(retKafkaProps, PROPERTY_KEY_SERIALIZER, keySerializer, DEFAULT_STRING_SERZER); + putExplicitProperty(retKafkaProps, PROPERTY_VALUE_SERIALIZER, valueSerializer, DEFAULT_STRING_SERZER); + putExplicitProperty(retKafkaProps, PROPERTY_PARTITIONER_CLASS, partitionerClass, DEFAULT_PARTITIONR_CLASS); + // @formatter:on + + return retKafkaProps; } /** @@ -182,24 +184,26 @@ public class KafkaCarrierTechnologyParameters extends CarrierTechnologyParameter * @return the kafka consumer properties */ public Properties getKafkaConsumerProperties() { - final Properties returnKafkaProperties = new Properties(); + final Properties retKafkaProps = new Properties(); // Add properties from the Kafka property array if (kafkaProperties != null) { for (int i = 0; i < kafkaProperties.length; i++) { - returnKafkaProperties.put(kafkaProperties[i][0], kafkaProperties[i][1]); + retKafkaProps.setProperty(kafkaProperties[i][0], kafkaProperties[i][1]); } } - returnKafkaProperties.put(PROPERTY_BOOTSTRAP_SERVERS, bootstrapServers); - returnKafkaProperties.put(PROPERTY_GROUP_ID, groupId); - returnKafkaProperties.put(PROPERTY_ENABLE_AUTO_COMMIT, enableAutoCommit); - returnKafkaProperties.put(PROPERTY_AUTO_COMMIT_TIME, autoCommitTime); - returnKafkaProperties.put(PROPERTY_SESSION_TIMEOUT, sessionTimeout); - returnKafkaProperties.put(PROPERTY_KEY_DESERIALIZER, keyDeserializer); - returnKafkaProperties.put(PROPERTY_VALUE_DESERIALIZER, valueDeserializer); - - return returnKafkaProperties; + // @formatter:off + putExplicitProperty(retKafkaProps, PROPERTY_BOOTSTRAP_SERVERS, bootstrapServers, DEFAULT_BOOT_SERVERS); + putExplicitProperty(retKafkaProps, PROPERTY_GROUP_ID, groupId, DEFAULT_GROUP_ID); + putExplicitProperty(retKafkaProps, PROPERTY_ENABLE_AUTO_COMMIT, enableAutoCommit, DEFAULT_ENABLE_AUTOCMIT); + putExplicitProperty(retKafkaProps, PROPERTY_AUTO_COMMIT_TIME, autoCommitTime, DEFAULT_AUTO_COMMIT_TIME); + putExplicitProperty(retKafkaProps, PROPERTY_SESSION_TIMEOUT, sessionTimeout, DEFAULT_SESSION_TIMEOUT); + putExplicitProperty(retKafkaProps, PROPERTY_KEY_DESERIALIZER, keyDeserializer, DEFAULT_STRING_DESZER); + putExplicitProperty(retKafkaProps, PROPERTY_VALUE_DESERIALIZER, valueDeserializer, DEFAULT_STRING_DESZER); + // @formatter:on + + return retKafkaProps; } /** @@ -285,4 +289,31 @@ public class KafkaCarrierTechnologyParameters extends CarrierTechnologyParameter } } } + + /** + * Put a property into the properties if it is not already defined and is not the default value. + * + * @param returnKafkaProperties the properties to set the value in + * @param property the property to put + * @param value the value of the property to put + * @param defaultValue the default value of the property to put + */ + private void putExplicitProperty(final Properties returnKafkaProperties, final String property, + final Object value, final Object defaultValue) { + + // Check if the property is already in the properties + if (!returnKafkaProperties.containsKey(property)) { + // Not found, so add it + returnKafkaProperties.setProperty(property, value.toString()); + } + else { + // Found, only overwrite if the property does not have the default value + if (value == null) { + returnKafkaProperties.setProperty(property, defaultValue.toString()); + } + else if (!value.toString().contentEquals(defaultValue.toString())) { + returnKafkaProperties.setProperty(property, value.toString()); + } + } + } } -- cgit 1.2.3-korg