diff options
4 files changed, 162 insertions, 32 deletions
diff --git a/plugins/plugins-event/plugins-event-carrier/plugins-event-carrier-kafka/src/main/java/org/onap/policy/apex/plugins/event/carrier/kafka/KafkaCarrierTechnologyParameters.java b/plugins/plugins-event/plugins-event-carrier/plugins-event-carrier-kafka/src/main/java/org/onap/policy/apex/plugins/event/carrier/kafka/KafkaCarrierTechnologyParameters.java index 927d79ee1..9d9acf625 100644 --- a/plugins/plugins-event/plugins-event-carrier/plugins-event-carrier-kafka/src/main/java/org/onap/policy/apex/plugins/event/carrier/kafka/KafkaCarrierTechnologyParameters.java +++ b/plugins/plugins-event/plugins-event-carrier/plugins-event-carrier-kafka/src/main/java/org/onap/policy/apex/plugins/event/carrier/kafka/KafkaCarrierTechnologyParameters.java @@ -154,26 +154,28 @@ public class KafkaCarrierTechnologyParameters extends CarrierTechnologyParameter * @return the kafka producer properties */ public Properties getKafkaProducerProperties() { - final Properties returnKafkaProperties = new Properties(); + final Properties retKafkaProps = new Properties(); // Add properties from the Kafka property array if (kafkaProperties != null) { for (int i = 0; i < kafkaProperties.length; i++) { - returnKafkaProperties.put(kafkaProperties[i][0], kafkaProperties[i][1]); + retKafkaProps.setProperty(kafkaProperties[i][0], kafkaProperties[i][1]); } } - returnKafkaProperties.put(PROPERTY_BOOTSTRAP_SERVERS, bootstrapServers); - returnKafkaProperties.put(PROPERTY_ACKS, acks); - returnKafkaProperties.put(PROPERTY_RETRIES, retries); - returnKafkaProperties.put(PROPERTY_BATCH_SIZE, batchSize); - returnKafkaProperties.put(PROPERTY_LINGER_TIME, lingerTime); - returnKafkaProperties.put(PROPERTY_BUFFER_MEMORY, bufferMemory); - returnKafkaProperties.put(PROPERTY_KEY_SERIALIZER, keySerializer); - returnKafkaProperties.put(PROPERTY_VALUE_SERIALIZER, valueSerializer); - returnKafkaProperties.put(PROPERTY_PARTITIONER_CLASS, partitionerClass); - - return returnKafkaProperties; + // @formatter:off + putExplicitProperty(retKafkaProps, PROPERTY_BOOTSTRAP_SERVERS, bootstrapServers, DEFAULT_BOOT_SERVERS); + putExplicitProperty(retKafkaProps, PROPERTY_ACKS, acks, DEFAULT_ACKS); + putExplicitProperty(retKafkaProps, PROPERTY_RETRIES, retries, DEFAULT_RETRIES); + putExplicitProperty(retKafkaProps, PROPERTY_BATCH_SIZE, batchSize, DEFAULT_BATCH_SIZE); + putExplicitProperty(retKafkaProps, PROPERTY_LINGER_TIME, lingerTime, DEFAULT_LINGER_TIME); + putExplicitProperty(retKafkaProps, PROPERTY_BUFFER_MEMORY, bufferMemory, DEFAULT_BUFFER_MEMORY); + putExplicitProperty(retKafkaProps, PROPERTY_KEY_SERIALIZER, keySerializer, DEFAULT_STRING_SERZER); + putExplicitProperty(retKafkaProps, PROPERTY_VALUE_SERIALIZER, valueSerializer, DEFAULT_STRING_SERZER); + putExplicitProperty(retKafkaProps, PROPERTY_PARTITIONER_CLASS, partitionerClass, DEFAULT_PARTITIONR_CLASS); + // @formatter:on + + return retKafkaProps; } /** @@ -182,24 +184,26 @@ public class KafkaCarrierTechnologyParameters extends CarrierTechnologyParameter * @return the kafka consumer properties */ public Properties getKafkaConsumerProperties() { - final Properties returnKafkaProperties = new Properties(); + final Properties retKafkaProps = new Properties(); // Add properties from the Kafka property array if (kafkaProperties != null) { for (int i = 0; i < kafkaProperties.length; i++) { - returnKafkaProperties.put(kafkaProperties[i][0], kafkaProperties[i][1]); + retKafkaProps.setProperty(kafkaProperties[i][0], kafkaProperties[i][1]); } } - returnKafkaProperties.put(PROPERTY_BOOTSTRAP_SERVERS, bootstrapServers); - returnKafkaProperties.put(PROPERTY_GROUP_ID, groupId); - returnKafkaProperties.put(PROPERTY_ENABLE_AUTO_COMMIT, enableAutoCommit); - returnKafkaProperties.put(PROPERTY_AUTO_COMMIT_TIME, autoCommitTime); - returnKafkaProperties.put(PROPERTY_SESSION_TIMEOUT, sessionTimeout); - returnKafkaProperties.put(PROPERTY_KEY_DESERIALIZER, keyDeserializer); - returnKafkaProperties.put(PROPERTY_VALUE_DESERIALIZER, valueDeserializer); - - return returnKafkaProperties; + // @formatter:off + putExplicitProperty(retKafkaProps, PROPERTY_BOOTSTRAP_SERVERS, bootstrapServers, DEFAULT_BOOT_SERVERS); + putExplicitProperty(retKafkaProps, PROPERTY_GROUP_ID, groupId, DEFAULT_GROUP_ID); + putExplicitProperty(retKafkaProps, PROPERTY_ENABLE_AUTO_COMMIT, enableAutoCommit, DEFAULT_ENABLE_AUTOCMIT); + putExplicitProperty(retKafkaProps, PROPERTY_AUTO_COMMIT_TIME, autoCommitTime, DEFAULT_AUTO_COMMIT_TIME); + putExplicitProperty(retKafkaProps, PROPERTY_SESSION_TIMEOUT, sessionTimeout, DEFAULT_SESSION_TIMEOUT); + putExplicitProperty(retKafkaProps, PROPERTY_KEY_DESERIALIZER, keyDeserializer, DEFAULT_STRING_DESZER); + putExplicitProperty(retKafkaProps, PROPERTY_VALUE_DESERIALIZER, valueDeserializer, DEFAULT_STRING_DESZER); + // @formatter:on + + return retKafkaProps; } /** @@ -285,4 +289,31 @@ public class KafkaCarrierTechnologyParameters extends CarrierTechnologyParameter } } } + + /** + * Put a property into the properties if it is not already defined and is not the default value. + * + * @param returnKafkaProperties the properties to set the value in + * @param property the property to put + * @param value the value of the property to put + * @param defaultValue the default value of the property to put + */ + private void putExplicitProperty(final Properties returnKafkaProperties, final String property, + final Object value, final Object defaultValue) { + + // Check if the property is already in the properties + if (!returnKafkaProperties.containsKey(property)) { + // Not found, so add it + returnKafkaProperties.setProperty(property, value.toString()); + } + else { + // Found, only overwrite if the property does not have the default value + if (value == null) { + returnKafkaProperties.setProperty(property, defaultValue.toString()); + } + else if (!value.toString().contentEquals(defaultValue.toString())) { + returnKafkaProperties.setProperty(property, value.toString()); + } + } + } } diff --git a/plugins/plugins-event/plugins-event-carrier/plugins-event-carrier-kafka/src/site-docs/adoc/fragments/ct-kafka-io.adoc b/plugins/plugins-event/plugins-event-carrier/plugins-event-carrier-kafka/src/site-docs/adoc/fragments/ct-kafka-io.adoc index 03e1139a0..94586e9bd 100644 --- a/plugins/plugins-event/plugins-event-carrier/plugins-event-carrier-kafka/src/site-docs/adoc/fragments/ct-kafka-io.adoc +++ b/plugins/plugins-event/plugins-event-carrier/plugins-event-carrier-kafka/src/site-docs/adoc/fragments/ct-kafka-io.adoc @@ -1,6 +1,7 @@ // // ============LICENSE_START======================================================= // Copyright (C) 2016-2018 Ericsson. All rights reserved. +// Modifications Copyright (C) 2019 Nordix Foundation. // ================================================================================ // This file is licensed under the CREATIVE COMMONS ATTRIBUTION 4.0 INTERNATIONAL LICENSE // Full license text at https://creativecommons.org/licenses/by/4.0/legalcode @@ -45,7 +46,8 @@ The input is uni-directional, an engine will only receive events from the input "keyDeserializer" : "org.apache.kafka.common.serialization.StringDeserializer", <9> "valueDeserializer" : - "org.apache.kafka.common.serialization.StringDeserializer" <10> + "org.apache.kafka.common.serialization.StringDeserializer", <10> + "kafkaProperties" : [] <11> } } ---- @@ -60,6 +62,8 @@ The input is uni-directional, an engine will only receive events from the input <8> consumer topic list <9> key for the Kafka de-serializer <10> value for the Kafka de-serializer +<11> an optional list of name value pairs of properties to be passed transparently to Kafka. +This field need not be specified, can be set to null, or to an empty list as here. === Kafka Output @@ -85,7 +89,11 @@ The output is uni-directional, an engine will send events to the output but not "keySerializer" : "org.apache.kafka.common.serialization.StringSerializer", <9> "valueSerializer" : - "org.apache.kafka.common.serialization.StringSerializer" <10> + "org.apache.kafka.common.serialization.StringSerializer", <10> + "kafkaProperties": [ <11> + ["message.max.bytes", 1000000], + ["compression.codec", "none"] + ] } } ---- @@ -100,3 +108,7 @@ The output is uni-directional, an engine will send events to the output but not <8> producer topic <9> key for the Kafka serializer <10> value for the Kafka serializer +<11> an optional list of name value pairs of properties to be passed transparently to Kafka. If a property appears in +the _kafkaProperties_ field and is also explicitly specified to a non-default value (such as _lingerTime_ +and _linger.ms_) the explictly specified value of the property is used rather than the value specified in the +_kafkaProperties_ list. diff --git a/plugins/plugins-event/plugins-event-carrier/plugins-event-carrier-kafka/src/test/java/org/onap/policy/apex/plugins/event/carrier/kafka/KafkaCarrierTechnologyParametersTest.java b/plugins/plugins-event/plugins-event-carrier/plugins-event-carrier-kafka/src/test/java/org/onap/policy/apex/plugins/event/carrier/kafka/KafkaCarrierTechnologyParametersTest.java index 2f5405ba8..5b6ec33c6 100644 --- a/plugins/plugins-event/plugins-event-carrier/plugins-event-carrier-kafka/src/test/java/org/onap/policy/apex/plugins/event/carrier/kafka/KafkaCarrierTechnologyParametersTest.java +++ b/plugins/plugins-event/plugins-event-carrier/plugins-event-carrier-kafka/src/test/java/org/onap/policy/apex/plugins/event/carrier/kafka/KafkaCarrierTechnologyParametersTest.java @@ -1,6 +1,7 @@ /*- * ============LICENSE_START======================================================= * Copyright (C) 2019 Samsung. All rights reserved. + * Modifications Copyright (C) 2019 Nordix Foundation. * ================================================================================ * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -45,7 +46,7 @@ public class KafkaCarrierTechnologyParametersTest { Properties kafkaProducerProperties = kafkaCarrierTechnologyParameters.getKafkaProducerProperties(); assertNotNull(kafkaProducerProperties); assertEquals("localhost:9092", kafkaProducerProperties.get("bootstrap.servers")); - assertEquals(1, kafkaProducerProperties.get("linger.ms")); + assertEquals("1", kafkaProducerProperties.get("linger.ms")); assertEquals(null, kafkaProducerProperties.get("group.id")); assertEquals(null, kafkaProducerProperties.get("Property0")); assertEquals(null, kafkaProducerProperties.get("Property1")); @@ -66,7 +67,7 @@ public class KafkaCarrierTechnologyParametersTest { kafkaProducerProperties = kafkaCarrierTechnologyParameters.getKafkaProducerProperties(); assertNotNull(kafkaProducerProperties); assertEquals("localhost:9092", kafkaProducerProperties.get("bootstrap.servers")); - assertEquals(1, kafkaProducerProperties.get("linger.ms")); + assertEquals("1", kafkaProducerProperties.get("linger.ms")); assertEquals(null, kafkaProducerProperties.get("group.id")); assertEquals("Value0", kafkaProducerProperties.get("Property0")); assertEquals("Value1", kafkaProducerProperties.get("Property1")); @@ -222,7 +223,7 @@ public class KafkaCarrierTechnologyParametersTest { kafkaCarrierTechnologyParameters.setConsumerTopicList(origConsumerTopcList); assertTrue(kafkaCarrierTechnologyParameters.validate().isValid()); - String[] blankStringList = {null, ""}; + String[] blankStringList = { null, "" }; kafkaCarrierTechnologyParameters.setConsumerTopicList(blankStringList); assertFalse(kafkaCarrierTechnologyParameters.validate().isValid()); kafkaCarrierTechnologyParameters.setConsumerTopicList(origConsumerTopcList); @@ -239,7 +240,7 @@ public class KafkaCarrierTechnologyParametersTest { kafkaCarrierTechnologyParameters.setKafkaProperties(origKafkaProperties); assertTrue(kafkaCarrierTechnologyParameters.validate().isValid()); - // @formatter:off + // @formatter:offkafkaCarrierTechnologyParameters String[][] kafkaProperties0 = { { null, "Value0" @@ -290,6 +291,87 @@ public class KafkaCarrierTechnologyParametersTest { assertFalse(kafkaCarrierTechnologyParameters.validate().isValid()); kafkaCarrierTechnologyParameters.setKafkaProperties(origKafkaProperties); assertTrue(kafkaCarrierTechnologyParameters.validate().isValid()); + } + + @Test + public void testExplicitImplicit() { + KafkaCarrierTechnologyParameters kafkaCtp = new KafkaCarrierTechnologyParameters(); + assertNotNull(kafkaCtp); + + assertTrue(kafkaCtp.validate().isValid()); + + // @formatter:off + assertEquals("localhost:9092", kafkaCtp.getKafkaConsumerProperties().get("bootstrap.servers")); + assertEquals("all", kafkaCtp.getKafkaProducerProperties().get("acks")); + assertEquals("0", kafkaCtp.getKafkaProducerProperties().get("retries")); + assertEquals("16384", kafkaCtp.getKafkaProducerProperties().get("batch.size")); + assertEquals("1", kafkaCtp.getKafkaProducerProperties().get("linger.ms")); + assertEquals("33554432", kafkaCtp.getKafkaProducerProperties().get("buffer.memory")); + assertEquals("default-group-id", kafkaCtp.getKafkaConsumerProperties().get("group.id")); + assertEquals("true", kafkaCtp.getKafkaConsumerProperties().get("enable.auto.commit")); + assertEquals("1000", kafkaCtp.getKafkaConsumerProperties().get("auto.commit.interval.ms")); + assertEquals("30000", kafkaCtp.getKafkaConsumerProperties().get("session.timeout.ms")); + // @formatter:on + + assertEquals("org.apache.kafka.common.serialization.StringSerializer", + kafkaCtp.getKafkaProducerProperties().get("key.serializer")); + assertEquals("org.apache.kafka.common.serialization.StringSerializer", + kafkaCtp.getKafkaProducerProperties().get("value.serializer")); + assertEquals("org.apache.kafka.common.serialization.StringDeserializer", + kafkaCtp.getKafkaConsumerProperties().get("key.deserializer")); + assertEquals("org.apache.kafka.common.serialization.StringDeserializer", + kafkaCtp.getKafkaConsumerProperties().get("value.deserializer")); + assertEquals("org.apache.kafka.clients.producer.internals.DefaultPartitioner", + kafkaCtp.getKafkaProducerProperties().get("partitioner.class")); + + // @formatter:off + String[][] kafkaProperties0 = { + { + "bootstrap.servers", "localhost:9092" + } + }; + // @formatter:on + + kafkaCtp.setBootstrapServers(null); + kafkaCtp.setKafkaProperties(kafkaProperties0); + assertEquals("localhost:9092", kafkaCtp.getKafkaConsumerProperties().get("bootstrap.servers")); + + // @formatter:off + String[][] kafkaProperties1 = { + { + "bootstrap.servers", "localhost:9999" + } + }; + // @formatter:on + + kafkaCtp = new KafkaCarrierTechnologyParameters(); + kafkaCtp.setKafkaProperties(kafkaProperties1); + assertEquals("localhost:9999", kafkaCtp.getKafkaConsumerProperties().get("bootstrap.servers")); + + // @formatter:off + String[][] kafkaProperties2 = { + { + "bootstrap.servers", "localhost:8888" + } + }; + // @formatter:on + + kafkaCtp = new KafkaCarrierTechnologyParameters(); + kafkaCtp.setBootstrapServers("localhost:9092"); + kafkaCtp.setKafkaProperties(kafkaProperties2); + assertEquals("localhost:8888", kafkaCtp.getKafkaConsumerProperties().get("bootstrap.servers")); + + // @formatter:off + String[][] kafkaProperties3 = { + { + "bootstrap.servers", "localhost:5555" + } + }; + // @formatter:on + kafkaCtp = new KafkaCarrierTechnologyParameters(); + kafkaCtp.setBootstrapServers("localhost:7777"); + kafkaCtp.setKafkaProperties(kafkaProperties3); + assertEquals("localhost:7777", kafkaCtp.getKafkaConsumerProperties().get("bootstrap.servers")); } } diff --git a/testsuites/integration/integration-common/src/main/resources/examples/config/SampleDomain/Kafka2KafkaJsonEvent.json b/testsuites/integration/integration-common/src/main/resources/examples/config/SampleDomain/Kafka2KafkaJsonEvent.json index 6b13db7d4..5f1a9c75a 100644 --- a/testsuites/integration/integration-common/src/main/resources/examples/config/SampleDomain/Kafka2KafkaJsonEvent.json +++ b/testsuites/integration/integration-common/src/main/resources/examples/config/SampleDomain/Kafka2KafkaJsonEvent.json @@ -28,7 +28,8 @@ "bufferMemory": 33554432, "producerTopic": "apex-out-json", "keySerializer": "org.apache.kafka.common.serialization.StringSerializer", - "valueSerializer": "org.apache.kafka.common.serialization.StringSerializer" + "valueSerializer": "org.apache.kafka.common.serialization.StringSerializer", + "kafkaProperties": [] } }, "eventProtocolParameters": { @@ -52,7 +53,11 @@ "apex-in-json" ], "keyDeserializer": "org.apache.kafka.common.serialization.StringDeserializer", - "valueDeserializer": "org.apache.kafka.common.serialization.StringDeserializer" + "valueDeserializer": "org.apache.kafka.common.serialization.StringDeserializer", + "kafkaProperties": [ + ["message.max.bytes", 1000000], + ["compression.codec", "none"] + ] } }, "eventProtocolParameters": { |