summaryrefslogtreecommitdiffstats
path: root/plugins/plugins-event/plugins-event-carrier/plugins-event-carrier-kafka
diff options
context:
space:
mode:
Diffstat (limited to 'plugins/plugins-event/plugins-event-carrier/plugins-event-carrier-kafka')
-rw-r--r--plugins/plugins-event/plugins-event-carrier/plugins-event-carrier-kafka/src/main/java/org/onap/policy/apex/plugins/event/carrier/kafka/KafkaCarrierTechnologyParameters.java79
-rw-r--r--plugins/plugins-event/plugins-event-carrier/plugins-event-carrier-kafka/src/site-docs/adoc/fragments/ct-kafka-io.adoc16
-rw-r--r--plugins/plugins-event/plugins-event-carrier/plugins-event-carrier-kafka/src/test/java/org/onap/policy/apex/plugins/event/carrier/kafka/KafkaCarrierTechnologyParametersTest.java90
3 files changed, 155 insertions, 30 deletions
diff --git a/plugins/plugins-event/plugins-event-carrier/plugins-event-carrier-kafka/src/main/java/org/onap/policy/apex/plugins/event/carrier/kafka/KafkaCarrierTechnologyParameters.java b/plugins/plugins-event/plugins-event-carrier/plugins-event-carrier-kafka/src/main/java/org/onap/policy/apex/plugins/event/carrier/kafka/KafkaCarrierTechnologyParameters.java
index 927d79ee1..9d9acf625 100644
--- a/plugins/plugins-event/plugins-event-carrier/plugins-event-carrier-kafka/src/main/java/org/onap/policy/apex/plugins/event/carrier/kafka/KafkaCarrierTechnologyParameters.java
+++ b/plugins/plugins-event/plugins-event-carrier/plugins-event-carrier-kafka/src/main/java/org/onap/policy/apex/plugins/event/carrier/kafka/KafkaCarrierTechnologyParameters.java
@@ -154,26 +154,28 @@ public class KafkaCarrierTechnologyParameters extends CarrierTechnologyParameter
* @return the kafka producer properties
*/
public Properties getKafkaProducerProperties() {
- final Properties returnKafkaProperties = new Properties();
+ final Properties retKafkaProps = new Properties();
// Add properties from the Kafka property array
if (kafkaProperties != null) {
for (int i = 0; i < kafkaProperties.length; i++) {
- returnKafkaProperties.put(kafkaProperties[i][0], kafkaProperties[i][1]);
+ retKafkaProps.setProperty(kafkaProperties[i][0], kafkaProperties[i][1]);
}
}
- returnKafkaProperties.put(PROPERTY_BOOTSTRAP_SERVERS, bootstrapServers);
- returnKafkaProperties.put(PROPERTY_ACKS, acks);
- returnKafkaProperties.put(PROPERTY_RETRIES, retries);
- returnKafkaProperties.put(PROPERTY_BATCH_SIZE, batchSize);
- returnKafkaProperties.put(PROPERTY_LINGER_TIME, lingerTime);
- returnKafkaProperties.put(PROPERTY_BUFFER_MEMORY, bufferMemory);
- returnKafkaProperties.put(PROPERTY_KEY_SERIALIZER, keySerializer);
- returnKafkaProperties.put(PROPERTY_VALUE_SERIALIZER, valueSerializer);
- returnKafkaProperties.put(PROPERTY_PARTITIONER_CLASS, partitionerClass);
-
- return returnKafkaProperties;
+ // @formatter:off
+ putExplicitProperty(retKafkaProps, PROPERTY_BOOTSTRAP_SERVERS, bootstrapServers, DEFAULT_BOOT_SERVERS);
+ putExplicitProperty(retKafkaProps, PROPERTY_ACKS, acks, DEFAULT_ACKS);
+ putExplicitProperty(retKafkaProps, PROPERTY_RETRIES, retries, DEFAULT_RETRIES);
+ putExplicitProperty(retKafkaProps, PROPERTY_BATCH_SIZE, batchSize, DEFAULT_BATCH_SIZE);
+ putExplicitProperty(retKafkaProps, PROPERTY_LINGER_TIME, lingerTime, DEFAULT_LINGER_TIME);
+ putExplicitProperty(retKafkaProps, PROPERTY_BUFFER_MEMORY, bufferMemory, DEFAULT_BUFFER_MEMORY);
+ putExplicitProperty(retKafkaProps, PROPERTY_KEY_SERIALIZER, keySerializer, DEFAULT_STRING_SERZER);
+ putExplicitProperty(retKafkaProps, PROPERTY_VALUE_SERIALIZER, valueSerializer, DEFAULT_STRING_SERZER);
+ putExplicitProperty(retKafkaProps, PROPERTY_PARTITIONER_CLASS, partitionerClass, DEFAULT_PARTITIONR_CLASS);
+ // @formatter:on
+
+ return retKafkaProps;
}
/**
@@ -182,24 +184,26 @@ public class KafkaCarrierTechnologyParameters extends CarrierTechnologyParameter
* @return the kafka consumer properties
*/
public Properties getKafkaConsumerProperties() {
- final Properties returnKafkaProperties = new Properties();
+ final Properties retKafkaProps = new Properties();
// Add properties from the Kafka property array
if (kafkaProperties != null) {
for (int i = 0; i < kafkaProperties.length; i++) {
- returnKafkaProperties.put(kafkaProperties[i][0], kafkaProperties[i][1]);
+ retKafkaProps.setProperty(kafkaProperties[i][0], kafkaProperties[i][1]);
}
}
- returnKafkaProperties.put(PROPERTY_BOOTSTRAP_SERVERS, bootstrapServers);
- returnKafkaProperties.put(PROPERTY_GROUP_ID, groupId);
- returnKafkaProperties.put(PROPERTY_ENABLE_AUTO_COMMIT, enableAutoCommit);
- returnKafkaProperties.put(PROPERTY_AUTO_COMMIT_TIME, autoCommitTime);
- returnKafkaProperties.put(PROPERTY_SESSION_TIMEOUT, sessionTimeout);
- returnKafkaProperties.put(PROPERTY_KEY_DESERIALIZER, keyDeserializer);
- returnKafkaProperties.put(PROPERTY_VALUE_DESERIALIZER, valueDeserializer);
-
- return returnKafkaProperties;
+ // @formatter:off
+ putExplicitProperty(retKafkaProps, PROPERTY_BOOTSTRAP_SERVERS, bootstrapServers, DEFAULT_BOOT_SERVERS);
+ putExplicitProperty(retKafkaProps, PROPERTY_GROUP_ID, groupId, DEFAULT_GROUP_ID);
+ putExplicitProperty(retKafkaProps, PROPERTY_ENABLE_AUTO_COMMIT, enableAutoCommit, DEFAULT_ENABLE_AUTOCMIT);
+ putExplicitProperty(retKafkaProps, PROPERTY_AUTO_COMMIT_TIME, autoCommitTime, DEFAULT_AUTO_COMMIT_TIME);
+ putExplicitProperty(retKafkaProps, PROPERTY_SESSION_TIMEOUT, sessionTimeout, DEFAULT_SESSION_TIMEOUT);
+ putExplicitProperty(retKafkaProps, PROPERTY_KEY_DESERIALIZER, keyDeserializer, DEFAULT_STRING_DESZER);
+ putExplicitProperty(retKafkaProps, PROPERTY_VALUE_DESERIALIZER, valueDeserializer, DEFAULT_STRING_DESZER);
+ // @formatter:on
+
+ return retKafkaProps;
}
/**
@@ -285,4 +289,31 @@ public class KafkaCarrierTechnologyParameters extends CarrierTechnologyParameter
}
}
}
+
+ /**
+ * Put a property into the properties if it is not already defined and is not the default value.
+ *
+ * @param returnKafkaProperties the properties to set the value in
+ * @param property the property to put
+ * @param value the value of the property to put
+ * @param defaultValue the default value of the property to put
+ */
+ private void putExplicitProperty(final Properties returnKafkaProperties, final String property,
+ final Object value, final Object defaultValue) {
+
+ // Check if the property is already in the properties
+ if (!returnKafkaProperties.containsKey(property)) {
+ // Not found, so add it
+ returnKafkaProperties.setProperty(property, value.toString());
+ }
+ else {
+ // Found, only overwrite if the property does not have the default value
+ if (value == null) {
+ returnKafkaProperties.setProperty(property, defaultValue.toString());
+ }
+ else if (!value.toString().contentEquals(defaultValue.toString())) {
+ returnKafkaProperties.setProperty(property, value.toString());
+ }
+ }
+ }
}
diff --git a/plugins/plugins-event/plugins-event-carrier/plugins-event-carrier-kafka/src/site-docs/adoc/fragments/ct-kafka-io.adoc b/plugins/plugins-event/plugins-event-carrier/plugins-event-carrier-kafka/src/site-docs/adoc/fragments/ct-kafka-io.adoc
index 03e1139a0..94586e9bd 100644
--- a/plugins/plugins-event/plugins-event-carrier/plugins-event-carrier-kafka/src/site-docs/adoc/fragments/ct-kafka-io.adoc
+++ b/plugins/plugins-event/plugins-event-carrier/plugins-event-carrier-kafka/src/site-docs/adoc/fragments/ct-kafka-io.adoc
@@ -1,6 +1,7 @@
//
// ============LICENSE_START=======================================================
// Copyright (C) 2016-2018 Ericsson. All rights reserved.
+// Modifications Copyright (C) 2019 Nordix Foundation.
// ================================================================================
// This file is licensed under the CREATIVE COMMONS ATTRIBUTION 4.0 INTERNATIONAL LICENSE
// Full license text at https://creativecommons.org/licenses/by/4.0/legalcode
@@ -45,7 +46,8 @@ The input is uni-directional, an engine will only receive events from the input
"keyDeserializer" :
"org.apache.kafka.common.serialization.StringDeserializer", <9>
"valueDeserializer" :
- "org.apache.kafka.common.serialization.StringDeserializer" <10>
+ "org.apache.kafka.common.serialization.StringDeserializer", <10>
+ "kafkaProperties" : [] <11>
}
}
----
@@ -60,6 +62,8 @@ The input is uni-directional, an engine will only receive events from the input
<8> consumer topic list
<9> key for the Kafka de-serializer
<10> value for the Kafka de-serializer
+<11> an optional list of name value pairs of properties to be passed transparently to Kafka.
+This field need not be specified, can be set to null, or to an empty list as here.
=== Kafka Output
@@ -85,7 +89,11 @@ The output is uni-directional, an engine will send events to the output but not
"keySerializer" :
"org.apache.kafka.common.serialization.StringSerializer", <9>
"valueSerializer" :
- "org.apache.kafka.common.serialization.StringSerializer" <10>
+ "org.apache.kafka.common.serialization.StringSerializer", <10>
+ "kafkaProperties": [ <11>
+ ["message.max.bytes", 1000000],
+ ["compression.codec", "none"]
+ ]
}
}
----
@@ -100,3 +108,7 @@ The output is uni-directional, an engine will send events to the output but not
<8> producer topic
<9> key for the Kafka serializer
<10> value for the Kafka serializer
+<11> an optional list of name value pairs of properties to be passed transparently to Kafka. If a property appears in
+the _kafkaProperties_ field and is also explicitly specified to a non-default value (such as _lingerTime_
+and _linger.ms_) the explictly specified value of the property is used rather than the value specified in the
+_kafkaProperties_ list.
diff --git a/plugins/plugins-event/plugins-event-carrier/plugins-event-carrier-kafka/src/test/java/org/onap/policy/apex/plugins/event/carrier/kafka/KafkaCarrierTechnologyParametersTest.java b/plugins/plugins-event/plugins-event-carrier/plugins-event-carrier-kafka/src/test/java/org/onap/policy/apex/plugins/event/carrier/kafka/KafkaCarrierTechnologyParametersTest.java
index 2f5405ba8..5b6ec33c6 100644
--- a/plugins/plugins-event/plugins-event-carrier/plugins-event-carrier-kafka/src/test/java/org/onap/policy/apex/plugins/event/carrier/kafka/KafkaCarrierTechnologyParametersTest.java
+++ b/plugins/plugins-event/plugins-event-carrier/plugins-event-carrier-kafka/src/test/java/org/onap/policy/apex/plugins/event/carrier/kafka/KafkaCarrierTechnologyParametersTest.java
@@ -1,6 +1,7 @@
/*-
* ============LICENSE_START=======================================================
* Copyright (C) 2019 Samsung. All rights reserved.
+ * Modifications Copyright (C) 2019 Nordix Foundation.
* ================================================================================
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -45,7 +46,7 @@ public class KafkaCarrierTechnologyParametersTest {
Properties kafkaProducerProperties = kafkaCarrierTechnologyParameters.getKafkaProducerProperties();
assertNotNull(kafkaProducerProperties);
assertEquals("localhost:9092", kafkaProducerProperties.get("bootstrap.servers"));
- assertEquals(1, kafkaProducerProperties.get("linger.ms"));
+ assertEquals("1", kafkaProducerProperties.get("linger.ms"));
assertEquals(null, kafkaProducerProperties.get("group.id"));
assertEquals(null, kafkaProducerProperties.get("Property0"));
assertEquals(null, kafkaProducerProperties.get("Property1"));
@@ -66,7 +67,7 @@ public class KafkaCarrierTechnologyParametersTest {
kafkaProducerProperties = kafkaCarrierTechnologyParameters.getKafkaProducerProperties();
assertNotNull(kafkaProducerProperties);
assertEquals("localhost:9092", kafkaProducerProperties.get("bootstrap.servers"));
- assertEquals(1, kafkaProducerProperties.get("linger.ms"));
+ assertEquals("1", kafkaProducerProperties.get("linger.ms"));
assertEquals(null, kafkaProducerProperties.get("group.id"));
assertEquals("Value0", kafkaProducerProperties.get("Property0"));
assertEquals("Value1", kafkaProducerProperties.get("Property1"));
@@ -222,7 +223,7 @@ public class KafkaCarrierTechnologyParametersTest {
kafkaCarrierTechnologyParameters.setConsumerTopicList(origConsumerTopcList);
assertTrue(kafkaCarrierTechnologyParameters.validate().isValid());
- String[] blankStringList = {null, ""};
+ String[] blankStringList = { null, "" };
kafkaCarrierTechnologyParameters.setConsumerTopicList(blankStringList);
assertFalse(kafkaCarrierTechnologyParameters.validate().isValid());
kafkaCarrierTechnologyParameters.setConsumerTopicList(origConsumerTopcList);
@@ -239,7 +240,7 @@ public class KafkaCarrierTechnologyParametersTest {
kafkaCarrierTechnologyParameters.setKafkaProperties(origKafkaProperties);
assertTrue(kafkaCarrierTechnologyParameters.validate().isValid());
- // @formatter:off
+ // @formatter:offkafkaCarrierTechnologyParameters
String[][] kafkaProperties0 = {
{
null, "Value0"
@@ -290,6 +291,87 @@ public class KafkaCarrierTechnologyParametersTest {
assertFalse(kafkaCarrierTechnologyParameters.validate().isValid());
kafkaCarrierTechnologyParameters.setKafkaProperties(origKafkaProperties);
assertTrue(kafkaCarrierTechnologyParameters.validate().isValid());
+ }
+
+ @Test
+ public void testExplicitImplicit() {
+ KafkaCarrierTechnologyParameters kafkaCtp = new KafkaCarrierTechnologyParameters();
+ assertNotNull(kafkaCtp);
+
+ assertTrue(kafkaCtp.validate().isValid());
+
+ // @formatter:off
+ assertEquals("localhost:9092", kafkaCtp.getKafkaConsumerProperties().get("bootstrap.servers"));
+ assertEquals("all", kafkaCtp.getKafkaProducerProperties().get("acks"));
+ assertEquals("0", kafkaCtp.getKafkaProducerProperties().get("retries"));
+ assertEquals("16384", kafkaCtp.getKafkaProducerProperties().get("batch.size"));
+ assertEquals("1", kafkaCtp.getKafkaProducerProperties().get("linger.ms"));
+ assertEquals("33554432", kafkaCtp.getKafkaProducerProperties().get("buffer.memory"));
+ assertEquals("default-group-id", kafkaCtp.getKafkaConsumerProperties().get("group.id"));
+ assertEquals("true", kafkaCtp.getKafkaConsumerProperties().get("enable.auto.commit"));
+ assertEquals("1000", kafkaCtp.getKafkaConsumerProperties().get("auto.commit.interval.ms"));
+ assertEquals("30000", kafkaCtp.getKafkaConsumerProperties().get("session.timeout.ms"));
+ // @formatter:on
+
+ assertEquals("org.apache.kafka.common.serialization.StringSerializer",
+ kafkaCtp.getKafkaProducerProperties().get("key.serializer"));
+ assertEquals("org.apache.kafka.common.serialization.StringSerializer",
+ kafkaCtp.getKafkaProducerProperties().get("value.serializer"));
+ assertEquals("org.apache.kafka.common.serialization.StringDeserializer",
+ kafkaCtp.getKafkaConsumerProperties().get("key.deserializer"));
+ assertEquals("org.apache.kafka.common.serialization.StringDeserializer",
+ kafkaCtp.getKafkaConsumerProperties().get("value.deserializer"));
+ assertEquals("org.apache.kafka.clients.producer.internals.DefaultPartitioner",
+ kafkaCtp.getKafkaProducerProperties().get("partitioner.class"));
+
+ // @formatter:off
+ String[][] kafkaProperties0 = {
+ {
+ "bootstrap.servers", "localhost:9092"
+ }
+ };
+ // @formatter:on
+
+ kafkaCtp.setBootstrapServers(null);
+ kafkaCtp.setKafkaProperties(kafkaProperties0);
+ assertEquals("localhost:9092", kafkaCtp.getKafkaConsumerProperties().get("bootstrap.servers"));
+
+ // @formatter:off
+ String[][] kafkaProperties1 = {
+ {
+ "bootstrap.servers", "localhost:9999"
+ }
+ };
+ // @formatter:on
+
+ kafkaCtp = new KafkaCarrierTechnologyParameters();
+ kafkaCtp.setKafkaProperties(kafkaProperties1);
+ assertEquals("localhost:9999", kafkaCtp.getKafkaConsumerProperties().get("bootstrap.servers"));
+
+ // @formatter:off
+ String[][] kafkaProperties2 = {
+ {
+ "bootstrap.servers", "localhost:8888"
+ }
+ };
+ // @formatter:on
+
+ kafkaCtp = new KafkaCarrierTechnologyParameters();
+ kafkaCtp.setBootstrapServers("localhost:9092");
+ kafkaCtp.setKafkaProperties(kafkaProperties2);
+ assertEquals("localhost:8888", kafkaCtp.getKafkaConsumerProperties().get("bootstrap.servers"));
+
+ // @formatter:off
+ String[][] kafkaProperties3 = {
+ {
+ "bootstrap.servers", "localhost:5555"
+ }
+ };
+ // @formatter:on
+ kafkaCtp = new KafkaCarrierTechnologyParameters();
+ kafkaCtp.setBootstrapServers("localhost:7777");
+ kafkaCtp.setKafkaProperties(kafkaProperties3);
+ assertEquals("localhost:7777", kafkaCtp.getKafkaConsumerProperties().get("bootstrap.servers"));
}
}