summaryrefslogtreecommitdiffstats
path: root/plugins/plugins-event/plugins-event-carrier/plugins-event-carrier-kafka/src
diff options
context:
space:
mode:
authorliamfallon <liam.fallon@est.tech>2019-07-05 13:41:03 +0000
committerliamfallon <liam.fallon@est.tech>2019-07-05 13:41:03 +0000
commitcad7cff7dc945eefaf27815742461d6db6ab8eac (patch)
tree4f3e6b26fadd9fff47deccfe2cbff0e76a46e948 /plugins/plugins-event/plugins-event-carrier/plugins-event-carrier-kafka/src
parent9f0678f3ab333949076b5b9747158cf40e08fbef (diff)
Add duplicate check, examples for kafka Properties
Added checks for dealing with duplication of specification of properties explicitly and in kafkaPropertes Added examples for kafkaProperties Added documentation for kafkaProperties Issue-ID: POLICY-1818 Change-Id: Icbe01d6f1d25d4570dcc85cc3db28588743b9c41 Signed-off-by: liamfallon <liam.fallon@est.tech>
Diffstat (limited to 'plugins/plugins-event/plugins-event-carrier/plugins-event-carrier-kafka/src')
-rw-r--r--plugins/plugins-event/plugins-event-carrier/plugins-event-carrier-kafka/src/main/java/org/onap/policy/apex/plugins/event/carrier/kafka/KafkaCarrierTechnologyParameters.java79
-rw-r--r--plugins/plugins-event/plugins-event-carrier/plugins-event-carrier-kafka/src/site-docs/adoc/fragments/ct-kafka-io.adoc16
-rw-r--r--plugins/plugins-event/plugins-event-carrier/plugins-event-carrier-kafka/src/test/java/org/onap/policy/apex/plugins/event/carrier/kafka/KafkaCarrierTechnologyParametersTest.java90
3 files changed, 155 insertions, 30 deletions
diff --git a/plugins/plugins-event/plugins-event-carrier/plugins-event-carrier-kafka/src/main/java/org/onap/policy/apex/plugins/event/carrier/kafka/KafkaCarrierTechnologyParameters.java b/plugins/plugins-event/plugins-event-carrier/plugins-event-carrier-kafka/src/main/java/org/onap/policy/apex/plugins/event/carrier/kafka/KafkaCarrierTechnologyParameters.java
index 927d79ee1..9d9acf625 100644
--- a/plugins/plugins-event/plugins-event-carrier/plugins-event-carrier-kafka/src/main/java/org/onap/policy/apex/plugins/event/carrier/kafka/KafkaCarrierTechnologyParameters.java
+++ b/plugins/plugins-event/plugins-event-carrier/plugins-event-carrier-kafka/src/main/java/org/onap/policy/apex/plugins/event/carrier/kafka/KafkaCarrierTechnologyParameters.java
@@ -154,26 +154,28 @@ public class KafkaCarrierTechnologyParameters extends CarrierTechnologyParameter
* @return the kafka producer properties
*/
public Properties getKafkaProducerProperties() {
- final Properties returnKafkaProperties = new Properties();
+ final Properties retKafkaProps = new Properties();
// Add properties from the Kafka property array
if (kafkaProperties != null) {
for (int i = 0; i < kafkaProperties.length; i++) {
- returnKafkaProperties.put(kafkaProperties[i][0], kafkaProperties[i][1]);
+ retKafkaProps.setProperty(kafkaProperties[i][0], kafkaProperties[i][1]);
}
}
- returnKafkaProperties.put(PROPERTY_BOOTSTRAP_SERVERS, bootstrapServers);
- returnKafkaProperties.put(PROPERTY_ACKS, acks);
- returnKafkaProperties.put(PROPERTY_RETRIES, retries);
- returnKafkaProperties.put(PROPERTY_BATCH_SIZE, batchSize);
- returnKafkaProperties.put(PROPERTY_LINGER_TIME, lingerTime);
- returnKafkaProperties.put(PROPERTY_BUFFER_MEMORY, bufferMemory);
- returnKafkaProperties.put(PROPERTY_KEY_SERIALIZER, keySerializer);
- returnKafkaProperties.put(PROPERTY_VALUE_SERIALIZER, valueSerializer);
- returnKafkaProperties.put(PROPERTY_PARTITIONER_CLASS, partitionerClass);
-
- return returnKafkaProperties;
+ // @formatter:off
+ putExplicitProperty(retKafkaProps, PROPERTY_BOOTSTRAP_SERVERS, bootstrapServers, DEFAULT_BOOT_SERVERS);
+ putExplicitProperty(retKafkaProps, PROPERTY_ACKS, acks, DEFAULT_ACKS);
+ putExplicitProperty(retKafkaProps, PROPERTY_RETRIES, retries, DEFAULT_RETRIES);
+ putExplicitProperty(retKafkaProps, PROPERTY_BATCH_SIZE, batchSize, DEFAULT_BATCH_SIZE);
+ putExplicitProperty(retKafkaProps, PROPERTY_LINGER_TIME, lingerTime, DEFAULT_LINGER_TIME);
+ putExplicitProperty(retKafkaProps, PROPERTY_BUFFER_MEMORY, bufferMemory, DEFAULT_BUFFER_MEMORY);
+ putExplicitProperty(retKafkaProps, PROPERTY_KEY_SERIALIZER, keySerializer, DEFAULT_STRING_SERZER);
+ putExplicitProperty(retKafkaProps, PROPERTY_VALUE_SERIALIZER, valueSerializer, DEFAULT_STRING_SERZER);
+ putExplicitProperty(retKafkaProps, PROPERTY_PARTITIONER_CLASS, partitionerClass, DEFAULT_PARTITIONR_CLASS);
+ // @formatter:on
+
+ return retKafkaProps;
}
/**
@@ -182,24 +184,26 @@ public class KafkaCarrierTechnologyParameters extends CarrierTechnologyParameter
* @return the kafka consumer properties
*/
public Properties getKafkaConsumerProperties() {
- final Properties returnKafkaProperties = new Properties();
+ final Properties retKafkaProps = new Properties();
// Add properties from the Kafka property array
if (kafkaProperties != null) {
for (int i = 0; i < kafkaProperties.length; i++) {
- returnKafkaProperties.put(kafkaProperties[i][0], kafkaProperties[i][1]);
+ retKafkaProps.setProperty(kafkaProperties[i][0], kafkaProperties[i][1]);
}
}
- returnKafkaProperties.put(PROPERTY_BOOTSTRAP_SERVERS, bootstrapServers);
- returnKafkaProperties.put(PROPERTY_GROUP_ID, groupId);
- returnKafkaProperties.put(PROPERTY_ENABLE_AUTO_COMMIT, enableAutoCommit);
- returnKafkaProperties.put(PROPERTY_AUTO_COMMIT_TIME, autoCommitTime);
- returnKafkaProperties.put(PROPERTY_SESSION_TIMEOUT, sessionTimeout);
- returnKafkaProperties.put(PROPERTY_KEY_DESERIALIZER, keyDeserializer);
- returnKafkaProperties.put(PROPERTY_VALUE_DESERIALIZER, valueDeserializer);
-
- return returnKafkaProperties;
+ // @formatter:off
+ putExplicitProperty(retKafkaProps, PROPERTY_BOOTSTRAP_SERVERS, bootstrapServers, DEFAULT_BOOT_SERVERS);
+ putExplicitProperty(retKafkaProps, PROPERTY_GROUP_ID, groupId, DEFAULT_GROUP_ID);
+ putExplicitProperty(retKafkaProps, PROPERTY_ENABLE_AUTO_COMMIT, enableAutoCommit, DEFAULT_ENABLE_AUTOCMIT);
+ putExplicitProperty(retKafkaProps, PROPERTY_AUTO_COMMIT_TIME, autoCommitTime, DEFAULT_AUTO_COMMIT_TIME);
+ putExplicitProperty(retKafkaProps, PROPERTY_SESSION_TIMEOUT, sessionTimeout, DEFAULT_SESSION_TIMEOUT);
+ putExplicitProperty(retKafkaProps, PROPERTY_KEY_DESERIALIZER, keyDeserializer, DEFAULT_STRING_DESZER);
+ putExplicitProperty(retKafkaProps, PROPERTY_VALUE_DESERIALIZER, valueDeserializer, DEFAULT_STRING_DESZER);
+ // @formatter:on
+
+ return retKafkaProps;
}
/**
@@ -285,4 +289,31 @@ public class KafkaCarrierTechnologyParameters extends CarrierTechnologyParameter
}
}
}
+
+ /**
+ * Put a property into the properties if it is not already defined and is not the default value.
+ *
+ * @param returnKafkaProperties the properties to set the value in
+ * @param property the property to put
+ * @param value the value of the property to put
+ * @param defaultValue the default value of the property to put
+ */
+ private void putExplicitProperty(final Properties returnKafkaProperties, final String property,
+ final Object value, final Object defaultValue) {
+
+ // Check if the property is already in the properties
+ if (!returnKafkaProperties.containsKey(property)) {
+ // Not found, so add it
+ returnKafkaProperties.setProperty(property, value.toString());
+ }
+ else {
+ // Found, only overwrite if the property does not have the default value
+ if (value == null) {
+ returnKafkaProperties.setProperty(property, defaultValue.toString());
+ }
+ else if (!value.toString().contentEquals(defaultValue.toString())) {
+ returnKafkaProperties.setProperty(property, value.toString());
+ }
+ }
+ }
}
diff --git a/plugins/plugins-event/plugins-event-carrier/plugins-event-carrier-kafka/src/site-docs/adoc/fragments/ct-kafka-io.adoc b/plugins/plugins-event/plugins-event-carrier/plugins-event-carrier-kafka/src/site-docs/adoc/fragments/ct-kafka-io.adoc
index 03e1139a0..94586e9bd 100644
--- a/plugins/plugins-event/plugins-event-carrier/plugins-event-carrier-kafka/src/site-docs/adoc/fragments/ct-kafka-io.adoc
+++ b/plugins/plugins-event/plugins-event-carrier/plugins-event-carrier-kafka/src/site-docs/adoc/fragments/ct-kafka-io.adoc
@@ -1,6 +1,7 @@
//
// ============LICENSE_START=======================================================
// Copyright (C) 2016-2018 Ericsson. All rights reserved.
+// Modifications Copyright (C) 2019 Nordix Foundation.
// ================================================================================
// This file is licensed under the CREATIVE COMMONS ATTRIBUTION 4.0 INTERNATIONAL LICENSE
// Full license text at https://creativecommons.org/licenses/by/4.0/legalcode
@@ -45,7 +46,8 @@ The input is uni-directional, an engine will only receive events from the input
"keyDeserializer" :
"org.apache.kafka.common.serialization.StringDeserializer", <9>
"valueDeserializer" :
- "org.apache.kafka.common.serialization.StringDeserializer" <10>
+ "org.apache.kafka.common.serialization.StringDeserializer", <10>
+ "kafkaProperties" : [] <11>
}
}
----
@@ -60,6 +62,8 @@ The input is uni-directional, an engine will only receive events from the input
<8> consumer topic list
<9> key for the Kafka de-serializer
<10> value for the Kafka de-serializer
+<11> an optional list of name value pairs of properties to be passed transparently to Kafka.
+This field need not be specified, can be set to null, or to an empty list as here.
=== Kafka Output
@@ -85,7 +89,11 @@ The output is uni-directional, an engine will send events to the output but not
"keySerializer" :
"org.apache.kafka.common.serialization.StringSerializer", <9>
"valueSerializer" :
- "org.apache.kafka.common.serialization.StringSerializer" <10>
+ "org.apache.kafka.common.serialization.StringSerializer", <10>
+ "kafkaProperties": [ <11>
+ ["message.max.bytes", 1000000],
+ ["compression.codec", "none"]
+ ]
}
}
----
@@ -100,3 +108,7 @@ The output is uni-directional, an engine will send events to the output but not
<8> producer topic
<9> key for the Kafka serializer
<10> value for the Kafka serializer
+<11> an optional list of name value pairs of properties to be passed transparently to Kafka. If a property appears in
+the _kafkaProperties_ field and is also explicitly specified to a non-default value (such as _lingerTime_
+and _linger.ms_) the explictly specified value of the property is used rather than the value specified in the
+_kafkaProperties_ list.
diff --git a/plugins/plugins-event/plugins-event-carrier/plugins-event-carrier-kafka/src/test/java/org/onap/policy/apex/plugins/event/carrier/kafka/KafkaCarrierTechnologyParametersTest.java b/plugins/plugins-event/plugins-event-carrier/plugins-event-carrier-kafka/src/test/java/org/onap/policy/apex/plugins/event/carrier/kafka/KafkaCarrierTechnologyParametersTest.java
index 2f5405ba8..5b6ec33c6 100644
--- a/plugins/plugins-event/plugins-event-carrier/plugins-event-carrier-kafka/src/test/java/org/onap/policy/apex/plugins/event/carrier/kafka/KafkaCarrierTechnologyParametersTest.java
+++ b/plugins/plugins-event/plugins-event-carrier/plugins-event-carrier-kafka/src/test/java/org/onap/policy/apex/plugins/event/carrier/kafka/KafkaCarrierTechnologyParametersTest.java
@@ -1,6 +1,7 @@
/*-
* ============LICENSE_START=======================================================
* Copyright (C) 2019 Samsung. All rights reserved.
+ * Modifications Copyright (C) 2019 Nordix Foundation.
* ================================================================================
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -45,7 +46,7 @@ public class KafkaCarrierTechnologyParametersTest {
Properties kafkaProducerProperties = kafkaCarrierTechnologyParameters.getKafkaProducerProperties();
assertNotNull(kafkaProducerProperties);
assertEquals("localhost:9092", kafkaProducerProperties.get("bootstrap.servers"));
- assertEquals(1, kafkaProducerProperties.get("linger.ms"));
+ assertEquals("1", kafkaProducerProperties.get("linger.ms"));
assertEquals(null, kafkaProducerProperties.get("group.id"));
assertEquals(null, kafkaProducerProperties.get("Property0"));
assertEquals(null, kafkaProducerProperties.get("Property1"));
@@ -66,7 +67,7 @@ public class KafkaCarrierTechnologyParametersTest {
kafkaProducerProperties = kafkaCarrierTechnologyParameters.getKafkaProducerProperties();
assertNotNull(kafkaProducerProperties);
assertEquals("localhost:9092", kafkaProducerProperties.get("bootstrap.servers"));
- assertEquals(1, kafkaProducerProperties.get("linger.ms"));
+ assertEquals("1", kafkaProducerProperties.get("linger.ms"));
assertEquals(null, kafkaProducerProperties.get("group.id"));
assertEquals("Value0", kafkaProducerProperties.get("Property0"));
assertEquals("Value1", kafkaProducerProperties.get("Property1"));
@@ -222,7 +223,7 @@ public class KafkaCarrierTechnologyParametersTest {
kafkaCarrierTechnologyParameters.setConsumerTopicList(origConsumerTopcList);
assertTrue(kafkaCarrierTechnologyParameters.validate().isValid());
- String[] blankStringList = {null, ""};
+ String[] blankStringList = { null, "" };
kafkaCarrierTechnologyParameters.setConsumerTopicList(blankStringList);
assertFalse(kafkaCarrierTechnologyParameters.validate().isValid());
kafkaCarrierTechnologyParameters.setConsumerTopicList(origConsumerTopcList);
@@ -239,7 +240,7 @@ public class KafkaCarrierTechnologyParametersTest {
kafkaCarrierTechnologyParameters.setKafkaProperties(origKafkaProperties);
assertTrue(kafkaCarrierTechnologyParameters.validate().isValid());
- // @formatter:off
+ // @formatter:offkafkaCarrierTechnologyParameters
String[][] kafkaProperties0 = {
{
null, "Value0"
@@ -290,6 +291,87 @@ public class KafkaCarrierTechnologyParametersTest {
assertFalse(kafkaCarrierTechnologyParameters.validate().isValid());
kafkaCarrierTechnologyParameters.setKafkaProperties(origKafkaProperties);
assertTrue(kafkaCarrierTechnologyParameters.validate().isValid());
+ }
+
+ @Test
+ public void testExplicitImplicit() {
+ KafkaCarrierTechnologyParameters kafkaCtp = new KafkaCarrierTechnologyParameters();
+ assertNotNull(kafkaCtp);
+
+ assertTrue(kafkaCtp.validate().isValid());
+
+ // @formatter:off
+ assertEquals("localhost:9092", kafkaCtp.getKafkaConsumerProperties().get("bootstrap.servers"));
+ assertEquals("all", kafkaCtp.getKafkaProducerProperties().get("acks"));
+ assertEquals("0", kafkaCtp.getKafkaProducerProperties().get("retries"));
+ assertEquals("16384", kafkaCtp.getKafkaProducerProperties().get("batch.size"));
+ assertEquals("1", kafkaCtp.getKafkaProducerProperties().get("linger.ms"));
+ assertEquals("33554432", kafkaCtp.getKafkaProducerProperties().get("buffer.memory"));
+ assertEquals("default-group-id", kafkaCtp.getKafkaConsumerProperties().get("group.id"));
+ assertEquals("true", kafkaCtp.getKafkaConsumerProperties().get("enable.auto.commit"));
+ assertEquals("1000", kafkaCtp.getKafkaConsumerProperties().get("auto.commit.interval.ms"));
+ assertEquals("30000", kafkaCtp.getKafkaConsumerProperties().get("session.timeout.ms"));
+ // @formatter:on
+
+ assertEquals("org.apache.kafka.common.serialization.StringSerializer",
+ kafkaCtp.getKafkaProducerProperties().get("key.serializer"));
+ assertEquals("org.apache.kafka.common.serialization.StringSerializer",
+ kafkaCtp.getKafkaProducerProperties().get("value.serializer"));
+ assertEquals("org.apache.kafka.common.serialization.StringDeserializer",
+ kafkaCtp.getKafkaConsumerProperties().get("key.deserializer"));
+ assertEquals("org.apache.kafka.common.serialization.StringDeserializer",
+ kafkaCtp.getKafkaConsumerProperties().get("value.deserializer"));
+ assertEquals("org.apache.kafka.clients.producer.internals.DefaultPartitioner",
+ kafkaCtp.getKafkaProducerProperties().get("partitioner.class"));
+
+ // @formatter:off
+ String[][] kafkaProperties0 = {
+ {
+ "bootstrap.servers", "localhost:9092"
+ }
+ };
+ // @formatter:on
+
+ kafkaCtp.setBootstrapServers(null);
+ kafkaCtp.setKafkaProperties(kafkaProperties0);
+ assertEquals("localhost:9092", kafkaCtp.getKafkaConsumerProperties().get("bootstrap.servers"));
+
+ // @formatter:off
+ String[][] kafkaProperties1 = {
+ {
+ "bootstrap.servers", "localhost:9999"
+ }
+ };
+ // @formatter:on
+
+ kafkaCtp = new KafkaCarrierTechnologyParameters();
+ kafkaCtp.setKafkaProperties(kafkaProperties1);
+ assertEquals("localhost:9999", kafkaCtp.getKafkaConsumerProperties().get("bootstrap.servers"));
+
+ // @formatter:off
+ String[][] kafkaProperties2 = {
+ {
+ "bootstrap.servers", "localhost:8888"
+ }
+ };
+ // @formatter:on
+
+ kafkaCtp = new KafkaCarrierTechnologyParameters();
+ kafkaCtp.setBootstrapServers("localhost:9092");
+ kafkaCtp.setKafkaProperties(kafkaProperties2);
+ assertEquals("localhost:8888", kafkaCtp.getKafkaConsumerProperties().get("bootstrap.servers"));
+
+ // @formatter:off
+ String[][] kafkaProperties3 = {
+ {
+ "bootstrap.servers", "localhost:5555"
+ }
+ };
+ // @formatter:on
+ kafkaCtp = new KafkaCarrierTechnologyParameters();
+ kafkaCtp.setBootstrapServers("localhost:7777");
+ kafkaCtp.setKafkaProperties(kafkaProperties3);
+ assertEquals("localhost:7777", kafkaCtp.getKafkaConsumerProperties().get("bootstrap.servers"));
}
}