From cad7cff7dc945eefaf27815742461d6db6ab8eac Mon Sep 17 00:00:00 2001 From: liamfallon Date: Fri, 5 Jul 2019 13:41:03 +0000 Subject: Add duplicate check, examples for kafka Properties Added checks for dealing with duplication of specification of properties explicitly and in kafkaPropertes Added examples for kafkaProperties Added documentation for kafkaProperties Issue-ID: POLICY-1818 Change-Id: Icbe01d6f1d25d4570dcc85cc3db28588743b9c41 Signed-off-by: liamfallon --- .../KafkaCarrierTechnologyParametersTest.java | 90 +++++++++++++++++++++- 1 file changed, 86 insertions(+), 4 deletions(-) (limited to 'plugins/plugins-event/plugins-event-carrier/plugins-event-carrier-kafka/src/test') diff --git a/plugins/plugins-event/plugins-event-carrier/plugins-event-carrier-kafka/src/test/java/org/onap/policy/apex/plugins/event/carrier/kafka/KafkaCarrierTechnologyParametersTest.java b/plugins/plugins-event/plugins-event-carrier/plugins-event-carrier-kafka/src/test/java/org/onap/policy/apex/plugins/event/carrier/kafka/KafkaCarrierTechnologyParametersTest.java index 2f5405ba8..5b6ec33c6 100644 --- a/plugins/plugins-event/plugins-event-carrier/plugins-event-carrier-kafka/src/test/java/org/onap/policy/apex/plugins/event/carrier/kafka/KafkaCarrierTechnologyParametersTest.java +++ b/plugins/plugins-event/plugins-event-carrier/plugins-event-carrier-kafka/src/test/java/org/onap/policy/apex/plugins/event/carrier/kafka/KafkaCarrierTechnologyParametersTest.java @@ -1,6 +1,7 @@ /*- * ============LICENSE_START======================================================= * Copyright (C) 2019 Samsung. All rights reserved. + * Modifications Copyright (C) 2019 Nordix Foundation. * ================================================================================ * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -45,7 +46,7 @@ public class KafkaCarrierTechnologyParametersTest { Properties kafkaProducerProperties = kafkaCarrierTechnologyParameters.getKafkaProducerProperties(); assertNotNull(kafkaProducerProperties); assertEquals("localhost:9092", kafkaProducerProperties.get("bootstrap.servers")); - assertEquals(1, kafkaProducerProperties.get("linger.ms")); + assertEquals("1", kafkaProducerProperties.get("linger.ms")); assertEquals(null, kafkaProducerProperties.get("group.id")); assertEquals(null, kafkaProducerProperties.get("Property0")); assertEquals(null, kafkaProducerProperties.get("Property1")); @@ -66,7 +67,7 @@ public class KafkaCarrierTechnologyParametersTest { kafkaProducerProperties = kafkaCarrierTechnologyParameters.getKafkaProducerProperties(); assertNotNull(kafkaProducerProperties); assertEquals("localhost:9092", kafkaProducerProperties.get("bootstrap.servers")); - assertEquals(1, kafkaProducerProperties.get("linger.ms")); + assertEquals("1", kafkaProducerProperties.get("linger.ms")); assertEquals(null, kafkaProducerProperties.get("group.id")); assertEquals("Value0", kafkaProducerProperties.get("Property0")); assertEquals("Value1", kafkaProducerProperties.get("Property1")); @@ -222,7 +223,7 @@ public class KafkaCarrierTechnologyParametersTest { kafkaCarrierTechnologyParameters.setConsumerTopicList(origConsumerTopcList); assertTrue(kafkaCarrierTechnologyParameters.validate().isValid()); - String[] blankStringList = {null, ""}; + String[] blankStringList = { null, "" }; kafkaCarrierTechnologyParameters.setConsumerTopicList(blankStringList); assertFalse(kafkaCarrierTechnologyParameters.validate().isValid()); kafkaCarrierTechnologyParameters.setConsumerTopicList(origConsumerTopcList); @@ -239,7 +240,7 @@ public class KafkaCarrierTechnologyParametersTest { kafkaCarrierTechnologyParameters.setKafkaProperties(origKafkaProperties); assertTrue(kafkaCarrierTechnologyParameters.validate().isValid()); - // @formatter:off + // @formatter:offkafkaCarrierTechnologyParameters String[][] kafkaProperties0 = { { null, "Value0" @@ -290,6 +291,87 @@ public class KafkaCarrierTechnologyParametersTest { assertFalse(kafkaCarrierTechnologyParameters.validate().isValid()); kafkaCarrierTechnologyParameters.setKafkaProperties(origKafkaProperties); assertTrue(kafkaCarrierTechnologyParameters.validate().isValid()); + } + + @Test + public void testExplicitImplicit() { + KafkaCarrierTechnologyParameters kafkaCtp = new KafkaCarrierTechnologyParameters(); + assertNotNull(kafkaCtp); + + assertTrue(kafkaCtp.validate().isValid()); + + // @formatter:off + assertEquals("localhost:9092", kafkaCtp.getKafkaConsumerProperties().get("bootstrap.servers")); + assertEquals("all", kafkaCtp.getKafkaProducerProperties().get("acks")); + assertEquals("0", kafkaCtp.getKafkaProducerProperties().get("retries")); + assertEquals("16384", kafkaCtp.getKafkaProducerProperties().get("batch.size")); + assertEquals("1", kafkaCtp.getKafkaProducerProperties().get("linger.ms")); + assertEquals("33554432", kafkaCtp.getKafkaProducerProperties().get("buffer.memory")); + assertEquals("default-group-id", kafkaCtp.getKafkaConsumerProperties().get("group.id")); + assertEquals("true", kafkaCtp.getKafkaConsumerProperties().get("enable.auto.commit")); + assertEquals("1000", kafkaCtp.getKafkaConsumerProperties().get("auto.commit.interval.ms")); + assertEquals("30000", kafkaCtp.getKafkaConsumerProperties().get("session.timeout.ms")); + // @formatter:on + + assertEquals("org.apache.kafka.common.serialization.StringSerializer", + kafkaCtp.getKafkaProducerProperties().get("key.serializer")); + assertEquals("org.apache.kafka.common.serialization.StringSerializer", + kafkaCtp.getKafkaProducerProperties().get("value.serializer")); + assertEquals("org.apache.kafka.common.serialization.StringDeserializer", + kafkaCtp.getKafkaConsumerProperties().get("key.deserializer")); + assertEquals("org.apache.kafka.common.serialization.StringDeserializer", + kafkaCtp.getKafkaConsumerProperties().get("value.deserializer")); + assertEquals("org.apache.kafka.clients.producer.internals.DefaultPartitioner", + kafkaCtp.getKafkaProducerProperties().get("partitioner.class")); + + // @formatter:off + String[][] kafkaProperties0 = { + { + "bootstrap.servers", "localhost:9092" + } + }; + // @formatter:on + + kafkaCtp.setBootstrapServers(null); + kafkaCtp.setKafkaProperties(kafkaProperties0); + assertEquals("localhost:9092", kafkaCtp.getKafkaConsumerProperties().get("bootstrap.servers")); + + // @formatter:off + String[][] kafkaProperties1 = { + { + "bootstrap.servers", "localhost:9999" + } + }; + // @formatter:on + + kafkaCtp = new KafkaCarrierTechnologyParameters(); + kafkaCtp.setKafkaProperties(kafkaProperties1); + assertEquals("localhost:9999", kafkaCtp.getKafkaConsumerProperties().get("bootstrap.servers")); + + // @formatter:off + String[][] kafkaProperties2 = { + { + "bootstrap.servers", "localhost:8888" + } + }; + // @formatter:on + + kafkaCtp = new KafkaCarrierTechnologyParameters(); + kafkaCtp.setBootstrapServers("localhost:9092"); + kafkaCtp.setKafkaProperties(kafkaProperties2); + assertEquals("localhost:8888", kafkaCtp.getKafkaConsumerProperties().get("bootstrap.servers")); + + // @formatter:off + String[][] kafkaProperties3 = { + { + "bootstrap.servers", "localhost:5555" + } + }; + // @formatter:on + kafkaCtp = new KafkaCarrierTechnologyParameters(); + kafkaCtp.setBootstrapServers("localhost:7777"); + kafkaCtp.setKafkaProperties(kafkaProperties3); + assertEquals("localhost:7777", kafkaCtp.getKafkaConsumerProperties().get("bootstrap.servers")); } } -- cgit 1.2.3-korg