diff options
Diffstat (limited to 'plugins/plugins-event/plugins-event-carrier/plugins-event-carrier-kafka')
3 files changed, 275 insertions, 250 deletions
diff --git a/plugins/plugins-event/plugins-event-carrier/plugins-event-carrier-kafka/src/test/java/org/onap/policy/apex/plugins/event/carrier/kafka/ApexKafkaConsumerTest.java b/plugins/plugins-event/plugins-event-carrier/plugins-event-carrier-kafka/src/test/java/org/onap/policy/apex/plugins/event/carrier/kafka/ApexKafkaConsumerTest.java index 1ef3550e4..f1e420bf0 100644 --- a/plugins/plugins-event/plugins-event-carrier/plugins-event-carrier-kafka/src/test/java/org/onap/policy/apex/plugins/event/carrier/kafka/ApexKafkaConsumerTest.java +++ b/plugins/plugins-event/plugins-event-carrier/plugins-event-carrier-kafka/src/test/java/org/onap/policy/apex/plugins/event/carrier/kafka/ApexKafkaConsumerTest.java @@ -1,7 +1,7 @@ /*- * ============LICENSE_START======================================================= * Copyright (C) 2019 Samsung. All rights reserved. - * Modifications Copyright (C) 2020 Nordix Foundation + * Modifications Copyright (C) 2020, 2024 Nordix Foundation * Modifications Copyright (C) 2022 Bell Canada. All rights reserved. * ================================================================================ * Licensed under the Apache License, Version 2.0 (the "License"); @@ -23,12 +23,13 @@ package org.onap.policy.apex.plugins.event.carrier.kafka; import static org.assertj.core.api.Assertions.assertThatCode; -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertNotNull; -import static org.junit.Assert.assertNull; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertNotNull; +import static org.junit.jupiter.api.Assertions.assertNull; +import static org.junit.jupiter.api.Assertions.assertThrows; -import org.junit.Before; -import org.junit.Test; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; import org.onap.policy.apex.service.engine.event.ApexEventException; import org.onap.policy.apex.service.engine.event.ApexEventProducer; import org.onap.policy.apex.service.engine.event.ApexEventReceiver; @@ -37,7 +38,7 @@ import org.onap.policy.apex.service.parameters.carriertechnology.CarrierTechnolo import org.onap.policy.apex.service.parameters.eventhandler.EventHandlerParameters; import org.onap.policy.apex.service.parameters.eventhandler.EventHandlerPeeredMode; -public class ApexKafkaConsumerTest { +class ApexKafkaConsumerTest { ApexKafkaConsumer apexKafkaConsumer = null; ApexKafkaConsumer apexKafkaConsumer2 = null; EventHandlerParameters consumerParameters = null; @@ -51,13 +52,14 @@ public class ApexKafkaConsumerTest { * * @throws ApexEventException on test set up errors. */ - @Before - public void setUp() throws ApexEventException { + @BeforeEach + void setUp() throws ApexEventException { apexKafkaConsumer = new ApexKafkaConsumer(); consumerParameters = new EventHandlerParameters(); apexKafkaProducer = new ApexKafkaProducer(); consumerParameters - .setCarrierTechnologyParameters(new KafkaCarrierTechnologyParameters() {}); + .setCarrierTechnologyParameters(new KafkaCarrierTechnologyParameters() { + }); apexKafkaConsumer.init("TestApexKafkaConsumer", consumerParameters, incomingEventReceiver); apexKafkaConsumer2 = new ApexKafkaConsumer(); @@ -65,63 +67,66 @@ public class ApexKafkaConsumerTest { kafkaParameters = new KafkaCarrierTechnologyParameters(); String[][] kafkaProperties = { {"value.deserializer", "io.confluent.kafka.serializers.KafkaAvroDeserializer"}, - {"schema.registry.url", "[http://test-registory:8080]"} + {"schema.registry.url", "[https://test-registory:8080]"} }; kafkaParameters.setKafkaProperties(kafkaProperties); consumerParameters2 - .setCarrierTechnologyParameters(kafkaParameters); + .setCarrierTechnologyParameters(kafkaParameters); apexKafkaConsumer2.init("TestApexKafkaConsumer2", consumerParameters2, incomingEventReceiver); } @Test - public void testStart() { + void testStart() { assertThatCode(apexKafkaConsumer::start).doesNotThrowAnyException(); assertThatCode(apexKafkaConsumer2::start).doesNotThrowAnyException(); } @Test - public void testGetName() { + void testGetName() { assertEquals("TestApexKafkaConsumer", apexKafkaConsumer.getName()); assertEquals("TestApexKafkaConsumer2", apexKafkaConsumer2.getName()); } @Test - public void testGetPeeredReference() { + void testGetPeeredReference() { assertNull(apexKafkaConsumer.getPeeredReference(EventHandlerPeeredMode.REQUESTOR)); assertNull(apexKafkaConsumer2.getPeeredReference(EventHandlerPeeredMode.REQUESTOR)); } @Test - public void testSetPeeredReference() { + void testSetPeeredReference() { PeeredReference peeredReference = new PeeredReference(EventHandlerPeeredMode.REQUESTOR, - apexKafkaConsumer, apexKafkaProducer); + apexKafkaConsumer, apexKafkaProducer); apexKafkaConsumer.setPeeredReference(EventHandlerPeeredMode.REQUESTOR, peeredReference); assertNotNull(apexKafkaConsumer.getPeeredReference(EventHandlerPeeredMode.REQUESTOR)); PeeredReference peeredReference2 = new PeeredReference(EventHandlerPeeredMode.REQUESTOR, - apexKafkaConsumer2, apexKafkaProducer); + apexKafkaConsumer2, apexKafkaProducer); apexKafkaConsumer2.setPeeredReference(EventHandlerPeeredMode.REQUESTOR, peeredReference2); assertNotNull(apexKafkaConsumer2.getPeeredReference(EventHandlerPeeredMode.REQUESTOR)); } - @Test(expected = java.lang.NullPointerException.class) - public void testRun() { - apexKafkaConsumer.run(); - apexKafkaConsumer2.run(); + @Test + void testRun() { + assertThrows(NullPointerException.class, () -> apexKafkaConsumer.run()); + assertThrows(NullPointerException.class, () -> apexKafkaConsumer2.run()); } - @Test(expected = java.lang.NullPointerException.class) - public void testStop() { - apexKafkaConsumer.stop(); - apexKafkaConsumer2.stop(); + @Test + void testStop() { + assertThrows(NullPointerException.class, () -> apexKafkaConsumer.stop()); + assertThrows(NullPointerException.class, () -> apexKafkaConsumer2.stop()); } - @Test(expected = ApexEventException.class) - public void testInitWithNonKafkaCarrierTechnologyParameters() throws ApexEventException { - consumerParameters.setCarrierTechnologyParameters(new CarrierTechnologyParameters() {}); - apexKafkaConsumer.init("TestApexKafkaConsumer", consumerParameters, incomingEventReceiver); - apexKafkaConsumer2.init("TestApexKafkaConsumer2", consumerParameters, incomingEventReceiver); + @Test + void testInitWithNonKafkaCarrierTechnologyParameters() { + consumerParameters.setCarrierTechnologyParameters(new CarrierTechnologyParameters() { + }); + assertThrows(ApexEventException.class, () -> + apexKafkaConsumer.init("TestApexKafkaConsumer", consumerParameters, incomingEventReceiver)); + assertThrows(ApexEventException.class, () -> + apexKafkaConsumer2.init("TestApexKafkaConsumer2", consumerParameters, incomingEventReceiver)); } } diff --git a/plugins/plugins-event/plugins-event-carrier/plugins-event-carrier-kafka/src/test/java/org/onap/policy/apex/plugins/event/carrier/kafka/ApexKafkaProducerTest.java b/plugins/plugins-event/plugins-event-carrier/plugins-event-carrier-kafka/src/test/java/org/onap/policy/apex/plugins/event/carrier/kafka/ApexKafkaProducerTest.java index 7300474c7..f5606eb57 100644 --- a/plugins/plugins-event/plugins-event-carrier/plugins-event-carrier-kafka/src/test/java/org/onap/policy/apex/plugins/event/carrier/kafka/ApexKafkaProducerTest.java +++ b/plugins/plugins-event/plugins-event-carrier/plugins-event-carrier-kafka/src/test/java/org/onap/policy/apex/plugins/event/carrier/kafka/ApexKafkaProducerTest.java @@ -1,6 +1,7 @@ /*- * ============LICENSE_START======================================================= * Copyright (C) 2019 Samsung. All rights reserved. + * Modifications Copyright (C) 2024 Nordix Foundation. * ================================================================================ * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -20,19 +21,20 @@ package org.onap.policy.apex.plugins.event.carrier.kafka; -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertNotNull; -import static org.junit.Assert.assertNull; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertNotNull; +import static org.junit.jupiter.api.Assertions.assertNull; +import static org.junit.jupiter.api.Assertions.assertThrows; -import org.junit.Before; -import org.junit.Test; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; import org.onap.policy.apex.service.engine.event.ApexEventException; import org.onap.policy.apex.service.engine.event.PeeredReference; import org.onap.policy.apex.service.engine.event.SynchronousEventCache; import org.onap.policy.apex.service.parameters.eventhandler.EventHandlerParameters; import org.onap.policy.apex.service.parameters.eventhandler.EventHandlerPeeredMode; -public class ApexKafkaProducerTest { +class ApexKafkaProducerTest { ApexKafkaProducer apexKafkaProducer = null; ApexKafkaConsumer apexKafkaConsumer = null; EventHandlerParameters producerParameters = null; @@ -43,37 +45,38 @@ public class ApexKafkaProducerTest { /** * Set up testing. */ - @Before - public void setUp() throws Exception { + @BeforeEach + void setUp() { apexKafkaProducer = new ApexKafkaProducer(); apexKafkaConsumer = new ApexKafkaConsumer(); producerParameters = new EventHandlerParameters(); } - @Test(expected = ApexEventException.class) - public void testInit() throws ApexEventException { - apexKafkaProducer.init("TestApexKafkaProducer", producerParameters); + @Test + void testInit() { + assertThrows(ApexEventException.class, + () -> apexKafkaProducer.init("TestApexKafkaProducer", producerParameters)); } @Test - public void testGetName() { + void testGetName() { assertNull(apexKafkaProducer.getName()); } @Test - public void testGetPeeredReference() { + void testGetPeeredReference() { assertNull(apexKafkaProducer.getPeeredReference(EventHandlerPeeredMode.SYNCHRONOUS)); } @Test - public void testWithProperValues() throws ApexEventException { + void testWithProperValues() throws ApexEventException { producerParameters - .setCarrierTechnologyParameters(new KafkaCarrierTechnologyParameters() {}); + .setCarrierTechnologyParameters(new KafkaCarrierTechnologyParameters() { }); synchronousEventCache = new SynchronousEventCache(EventHandlerPeeredMode.SYNCHRONOUS, - apexKafkaConsumer, apexKafkaProducer, DEFAULT_SYNCHRONOUS_EVENT_TIMEOUT); + apexKafkaConsumer, apexKafkaProducer, DEFAULT_SYNCHRONOUS_EVENT_TIMEOUT); apexKafkaProducer.setPeeredReference(EventHandlerPeeredMode.SYNCHRONOUS, - synchronousEventCache); + synchronousEventCache); apexKafkaProducer.init("TestApexKafkaProducer", producerParameters); assertEquals("TestApexKafkaProducer", apexKafkaProducer.getName()); assertNotNull(apexKafkaProducer.getPeeredReference(EventHandlerPeeredMode.SYNCHRONOUS)); diff --git a/plugins/plugins-event/plugins-event-carrier/plugins-event-carrier-kafka/src/test/java/org/onap/policy/apex/plugins/event/carrier/kafka/KafkaCarrierTechnologyParametersTest.java b/plugins/plugins-event/plugins-event-carrier/plugins-event-carrier-kafka/src/test/java/org/onap/policy/apex/plugins/event/carrier/kafka/KafkaCarrierTechnologyParametersTest.java index 6b0f7d920..11bafc9b1 100644 --- a/plugins/plugins-event/plugins-event-carrier/plugins-event-carrier-kafka/src/test/java/org/onap/policy/apex/plugins/event/carrier/kafka/KafkaCarrierTechnologyParametersTest.java +++ b/plugins/plugins-event/plugins-event-carrier/plugins-event-carrier-kafka/src/test/java/org/onap/policy/apex/plugins/event/carrier/kafka/KafkaCarrierTechnologyParametersTest.java @@ -1,7 +1,7 @@ /*- * ============LICENSE_START======================================================= * Copyright (C) 2019 Samsung. All rights reserved. - * Modifications Copyright (C) 2019,2023 Nordix Foundation. + * Modifications Copyright (C) 2019, 2023-2024 Nordix Foundation. * Modifications Copyright (C) 2021 Bell Canada. All rights reserved. * ================================================================================ * Licensed under the Apache License, Version 2.0 (the "License"); @@ -22,17 +22,18 @@ package org.onap.policy.apex.plugins.event.carrier.kafka; -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertFalse; -import static org.junit.Assert.assertNotNull; -import static org.junit.Assert.assertTrue; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertNotNull; +import static org.junit.jupiter.api.Assertions.assertNull; +import static org.junit.jupiter.api.Assertions.assertTrue; import java.util.Properties; -import org.junit.Test; +import org.junit.jupiter.api.Test; -public class KafkaCarrierTechnologyParametersTest { +class KafkaCarrierTechnologyParametersTest { @Test - public void testKafkaCarrierTechnologyParameters() { + void testKafkaCarrierTechnologyParameters() { KafkaCarrierTechnologyParameters kafkaCarrierTechnologyParameters = new KafkaCarrierTechnologyParameters(); assertNotNull(kafkaCarrierTechnologyParameters); @@ -40,17 +41,17 @@ public class KafkaCarrierTechnologyParametersTest { } @Test - public void testGetKafkaProducerProperties() { + void testGetKafkaProducerProperties() { KafkaCarrierTechnologyParameters kafkaCarrierTechnologyParameters = new KafkaCarrierTechnologyParameters(); Properties kafkaProducerProperties = kafkaCarrierTechnologyParameters.getKafkaProducerProperties(); assertNotNull(kafkaProducerProperties); assertEquals("localhost:9092", kafkaProducerProperties.get("bootstrap.servers")); assertEquals("1", kafkaProducerProperties.get("linger.ms")); - assertEquals(null, kafkaProducerProperties.get("group.id")); - assertEquals(null, kafkaProducerProperties.get("Property0")); - assertEquals(null, kafkaProducerProperties.get("Property1")); - assertEquals(null, kafkaProducerProperties.get("Property2")); + assertNull(kafkaProducerProperties.get("group.id")); + assertNull(kafkaProducerProperties.get("Property0")); + assertNull(kafkaProducerProperties.get("Property1")); + assertNull(kafkaProducerProperties.get("Property2")); // @formatter:off String[][] kafkaProperties = { @@ -68,24 +69,24 @@ public class KafkaCarrierTechnologyParametersTest { assertNotNull(kafkaProducerProperties); assertEquals("localhost:9092", kafkaProducerProperties.get("bootstrap.servers")); assertEquals("1", kafkaProducerProperties.get("linger.ms")); - assertEquals(null, kafkaProducerProperties.get("group.id")); + assertNull(kafkaProducerProperties.get("group.id")); assertEquals("Value0", kafkaProducerProperties.get("Property0")); assertEquals("Value1", kafkaProducerProperties.get("Property1")); - assertEquals(null, kafkaProducerProperties.get("Property2")); + assertNull(kafkaProducerProperties.get("Property2")); } @Test - public void testGetKafkaConsumerProperties() { + void testGetKafkaConsumerProperties() { KafkaCarrierTechnologyParameters kafkaCarrierTechnologyParameters = new KafkaCarrierTechnologyParameters(); Properties kafkaConsumerProperties = kafkaCarrierTechnologyParameters.getKafkaConsumerProperties(); assertNotNull(kafkaConsumerProperties); assertEquals("localhost:9092", kafkaConsumerProperties.get("bootstrap.servers")); assertEquals("default-group-id", kafkaConsumerProperties.get("group.id")); - assertEquals(null, kafkaConsumerProperties.get("linger.ms")); - assertEquals(null, kafkaConsumerProperties.get("Property0")); - assertEquals(null, kafkaConsumerProperties.get("Property1")); - assertEquals(null, kafkaConsumerProperties.get("Property2")); + assertNull(kafkaConsumerProperties.get("linger.ms")); + assertNull(kafkaConsumerProperties.get("Property0")); + assertNull(kafkaConsumerProperties.get("Property1")); + assertNull(kafkaConsumerProperties.get("Property2")); // @formatter:off String[][] kafkaProperties = { @@ -103,204 +104,30 @@ public class KafkaCarrierTechnologyParametersTest { assertNotNull(kafkaConsumerProperties); assertEquals("localhost:9092", kafkaConsumerProperties.get("bootstrap.servers")); assertEquals("default-group-id", kafkaConsumerProperties.get("group.id")); - assertEquals(null, kafkaConsumerProperties.get("linger.ms")); + assertNull(kafkaConsumerProperties.get("linger.ms")); assertEquals("Value0", kafkaConsumerProperties.get("Property0")); assertEquals("Value1", kafkaConsumerProperties.get("Property1")); - assertEquals(null, kafkaConsumerProperties.get("Property2")); + assertNull(kafkaConsumerProperties.get("Property2")); } @Test - public void testValidate() { + void testValidate() { KafkaCarrierTechnologyParameters kafkaCarrierTechnologyParameters = new KafkaCarrierTechnologyParameters(); assertNotNull(kafkaCarrierTechnologyParameters); assertTrue(kafkaCarrierTechnologyParameters.validate().isValid()); - String origStringValue = kafkaCarrierTechnologyParameters.getBootstrapServers(); - kafkaCarrierTechnologyParameters.setBootstrapServers(" "); - assertFalse(kafkaCarrierTechnologyParameters.validate().isValid()); - kafkaCarrierTechnologyParameters.setBootstrapServers(origStringValue); - assertTrue(kafkaCarrierTechnologyParameters.validate().isValid()); - - origStringValue = kafkaCarrierTechnologyParameters.getAcks(); - kafkaCarrierTechnologyParameters.setAcks(" "); - assertFalse(kafkaCarrierTechnologyParameters.validate().isValid()); - kafkaCarrierTechnologyParameters.setAcks(origStringValue); - assertTrue(kafkaCarrierTechnologyParameters.validate().isValid()); - - origStringValue = kafkaCarrierTechnologyParameters.getGroupId(); - kafkaCarrierTechnologyParameters.setGroupId(" "); - assertFalse(kafkaCarrierTechnologyParameters.validate().isValid()); - kafkaCarrierTechnologyParameters.setGroupId(origStringValue); - assertTrue(kafkaCarrierTechnologyParameters.validate().isValid()); - - origStringValue = kafkaCarrierTechnologyParameters.getProducerTopic(); - kafkaCarrierTechnologyParameters.setProducerTopic(" "); - assertFalse(kafkaCarrierTechnologyParameters.validate().isValid()); - kafkaCarrierTechnologyParameters.setProducerTopic(origStringValue); - assertTrue(kafkaCarrierTechnologyParameters.validate().isValid()); - - int origIntValue = kafkaCarrierTechnologyParameters.getRetries(); - kafkaCarrierTechnologyParameters.setRetries(-1); - assertFalse(kafkaCarrierTechnologyParameters.validate().isValid()); - kafkaCarrierTechnologyParameters.setRetries(origIntValue); - assertTrue(kafkaCarrierTechnologyParameters.validate().isValid()); - - origIntValue = kafkaCarrierTechnologyParameters.getBatchSize(); - kafkaCarrierTechnologyParameters.setBatchSize(-1); - assertFalse(kafkaCarrierTechnologyParameters.validate().isValid()); - kafkaCarrierTechnologyParameters.setBatchSize(origIntValue); - assertTrue(kafkaCarrierTechnologyParameters.validate().isValid()); - - origIntValue = kafkaCarrierTechnologyParameters.getLingerTime(); - kafkaCarrierTechnologyParameters.setLingerTime(-1); - assertFalse(kafkaCarrierTechnologyParameters.validate().isValid()); - kafkaCarrierTechnologyParameters.setLingerTime(origIntValue); - assertTrue(kafkaCarrierTechnologyParameters.validate().isValid()); - - long origLongValue = kafkaCarrierTechnologyParameters.getBufferMemory(); - kafkaCarrierTechnologyParameters.setBufferMemory(-1); - assertFalse(kafkaCarrierTechnologyParameters.validate().isValid()); - kafkaCarrierTechnologyParameters.setBufferMemory(origLongValue); - assertTrue(kafkaCarrierTechnologyParameters.validate().isValid()); - - origIntValue = kafkaCarrierTechnologyParameters.getAutoCommitTime(); - kafkaCarrierTechnologyParameters.setAutoCommitTime(-1); - assertFalse(kafkaCarrierTechnologyParameters.validate().isValid()); - kafkaCarrierTechnologyParameters.setAutoCommitTime(origIntValue); - assertTrue(kafkaCarrierTechnologyParameters.validate().isValid()); - - origIntValue = kafkaCarrierTechnologyParameters.getSessionTimeout(); - kafkaCarrierTechnologyParameters.setSessionTimeout(-1); - assertFalse(kafkaCarrierTechnologyParameters.validate().isValid()); - kafkaCarrierTechnologyParameters.setSessionTimeout(origIntValue); - assertTrue(kafkaCarrierTechnologyParameters.validate().isValid()); - - origIntValue = kafkaCarrierTechnologyParameters.getConsumerPollTime(); - kafkaCarrierTechnologyParameters.setConsumerPollTime(-1); - assertFalse(kafkaCarrierTechnologyParameters.validate().isValid()); - kafkaCarrierTechnologyParameters.setConsumerPollTime(origIntValue); - assertTrue(kafkaCarrierTechnologyParameters.validate().isValid()); - - origStringValue = kafkaCarrierTechnologyParameters.getKeySerializer(); - kafkaCarrierTechnologyParameters.setKeySerializer(" "); - assertFalse(kafkaCarrierTechnologyParameters.validate().isValid()); - kafkaCarrierTechnologyParameters.setKeySerializer(origStringValue); - assertTrue(kafkaCarrierTechnologyParameters.validate().isValid()); - - origStringValue = kafkaCarrierTechnologyParameters.getValueSerializer(); - kafkaCarrierTechnologyParameters.setValueSerializer(" "); - assertFalse(kafkaCarrierTechnologyParameters.validate().isValid()); - kafkaCarrierTechnologyParameters.setValueSerializer(origStringValue); - assertTrue(kafkaCarrierTechnologyParameters.validate().isValid()); - - origStringValue = kafkaCarrierTechnologyParameters.getKeyDeserializer(); - kafkaCarrierTechnologyParameters.setKeyDeserializer(" "); - assertFalse(kafkaCarrierTechnologyParameters.validate().isValid()); - kafkaCarrierTechnologyParameters.setKeyDeserializer(origStringValue); - assertTrue(kafkaCarrierTechnologyParameters.validate().isValid()); - - origStringValue = kafkaCarrierTechnologyParameters.getValueDeserializer(); - kafkaCarrierTechnologyParameters.setValueDeserializer(" "); - assertFalse(kafkaCarrierTechnologyParameters.validate().isValid()); - kafkaCarrierTechnologyParameters.setValueDeserializer(origStringValue); - assertTrue(kafkaCarrierTechnologyParameters.validate().isValid()); - - String[] origConsumerTopcList = kafkaCarrierTechnologyParameters.getConsumerTopicList(); - kafkaCarrierTechnologyParameters.setConsumerTopicList(null); - assertFalse(kafkaCarrierTechnologyParameters.validate().isValid()); - kafkaCarrierTechnologyParameters.setConsumerTopicList(origConsumerTopcList); - assertTrue(kafkaCarrierTechnologyParameters.validate().isValid()); - - kafkaCarrierTechnologyParameters.setConsumerTopicList(new String[0]); - assertFalse(kafkaCarrierTechnologyParameters.validate().isValid()); - kafkaCarrierTechnologyParameters.setConsumerTopicList(origConsumerTopcList); - assertTrue(kafkaCarrierTechnologyParameters.validate().isValid()); - - String[] blankStringList = { null, "" }; - kafkaCarrierTechnologyParameters.setConsumerTopicList(blankStringList); - assertFalse(kafkaCarrierTechnologyParameters.validate().isValid()); - kafkaCarrierTechnologyParameters.setConsumerTopicList(origConsumerTopcList); - assertTrue(kafkaCarrierTechnologyParameters.validate().isValid()); - - String[][] origKafkaProperties = kafkaCarrierTechnologyParameters.getKafkaProperties(); - kafkaCarrierTechnologyParameters.setKafkaProperties(null); - assertTrue(kafkaCarrierTechnologyParameters.validate().isValid()); - kafkaCarrierTechnologyParameters.setKafkaProperties(origKafkaProperties); - assertTrue(kafkaCarrierTechnologyParameters.validate().isValid()); + assertValidateStringProperties(kafkaCarrierTechnologyParameters); - kafkaCarrierTechnologyParameters.setKafkaProperties(new String[0][0]); - assertTrue(kafkaCarrierTechnologyParameters.validate().isValid()); - kafkaCarrierTechnologyParameters.setKafkaProperties(origKafkaProperties); - assertTrue(kafkaCarrierTechnologyParameters.validate().isValid()); + assertValidateNumberProperties(kafkaCarrierTechnologyParameters); - // @formatter:offkafkaCarrierTechnologyParameters - String[][] kafkaProperties0 = { - { - null, "Value0" - } - }; - // @formatter:on + assertValidateTopicList(kafkaCarrierTechnologyParameters); - kafkaCarrierTechnologyParameters.setKafkaProperties(kafkaProperties0); - assertFalse(kafkaCarrierTechnologyParameters.validate().isValid()); - kafkaCarrierTechnologyParameters.setKafkaProperties(origKafkaProperties); - assertTrue(kafkaCarrierTechnologyParameters.validate().isValid()); - - // @formatter:off - String[][] kafkaProperties1 = { - { - "Property1", null - } - }; - // @formatter:on - - kafkaCarrierTechnologyParameters.setKafkaProperties(kafkaProperties1); - assertFalse(kafkaCarrierTechnologyParameters.validate().isValid()); - kafkaCarrierTechnologyParameters.setKafkaProperties(origKafkaProperties); - assertTrue(kafkaCarrierTechnologyParameters.validate().isValid()); - - // @formatter:off - String[][] kafkaProperties2 = { - { - "Property1", null - } - }; - // @formatter:on - - kafkaCarrierTechnologyParameters.setKafkaProperties(kafkaProperties2); - assertFalse(kafkaCarrierTechnologyParameters.validate().isValid()); - - // @formatter:off - String[][] kafkaPropertiesWithEmptyValue = { - { - "Property1", "" - } - }; - // @formatter:on - - kafkaCarrierTechnologyParameters.setKafkaProperties(kafkaPropertiesWithEmptyValue); - assertTrue(kafkaCarrierTechnologyParameters.validate().isValid()); - - kafkaCarrierTechnologyParameters.setKafkaProperties(origKafkaProperties); - assertTrue(kafkaCarrierTechnologyParameters.validate().isValid()); - - // @formatter:off - String[][] kafkaProperties3 = { - { - "Property1", "Value0", "Value1" - } - }; - // @formatter:on - - kafkaCarrierTechnologyParameters.setKafkaProperties(kafkaProperties3); - assertFalse(kafkaCarrierTechnologyParameters.validate().isValid()); - kafkaCarrierTechnologyParameters.setKafkaProperties(origKafkaProperties); - assertTrue(kafkaCarrierTechnologyParameters.validate().isValid()); + assertValidateKafkaProperties(kafkaCarrierTechnologyParameters); } @Test - public void testExplicitImplicit() { + void testExplicitImplicit() { KafkaCarrierTechnologyParameters kafkaCtp = new KafkaCarrierTechnologyParameters(); assertNotNull(kafkaCtp); @@ -378,4 +205,194 @@ public class KafkaCarrierTechnologyParametersTest { kafkaCtp.setKafkaProperties(kafkaProperties3); assertEquals("localhost:7777", kafkaCtp.getKafkaConsumerProperties().get("bootstrap.servers")); } + + private static void assertValidateStringProperties(KafkaCarrierTechnologyParameters kafkaParameters) { + String origStringValue = kafkaParameters.getBootstrapServers(); + kafkaParameters.setBootstrapServers(" "); + assertFalse(kafkaParameters.validate().isValid()); + kafkaParameters.setBootstrapServers(origStringValue); + assertTrue(kafkaParameters.validate().isValid()); + + origStringValue = kafkaParameters.getAcks(); + kafkaParameters.setAcks(" "); + assertFalse(kafkaParameters.validate().isValid()); + kafkaParameters.setAcks(origStringValue); + assertTrue(kafkaParameters.validate().isValid()); + + origStringValue = kafkaParameters.getGroupId(); + kafkaParameters.setGroupId(" "); + assertFalse(kafkaParameters.validate().isValid()); + kafkaParameters.setGroupId(origStringValue); + assertTrue(kafkaParameters.validate().isValid()); + + origStringValue = kafkaParameters.getProducerTopic(); + kafkaParameters.setProducerTopic(" "); + assertFalse(kafkaParameters.validate().isValid()); + kafkaParameters.setProducerTopic(origStringValue); + assertTrue(kafkaParameters.validate().isValid()); + + origStringValue = kafkaParameters.getKeySerializer(); + kafkaParameters.setKeySerializer(" "); + assertFalse(kafkaParameters.validate().isValid()); + kafkaParameters.setKeySerializer(origStringValue); + assertTrue(kafkaParameters.validate().isValid()); + + origStringValue = kafkaParameters.getValueSerializer(); + kafkaParameters.setValueSerializer(" "); + assertFalse(kafkaParameters.validate().isValid()); + kafkaParameters.setValueSerializer(origStringValue); + assertTrue(kafkaParameters.validate().isValid()); + + origStringValue = kafkaParameters.getKeyDeserializer(); + kafkaParameters.setKeyDeserializer(" "); + assertFalse(kafkaParameters.validate().isValid()); + kafkaParameters.setKeyDeserializer(origStringValue); + assertTrue(kafkaParameters.validate().isValid()); + + origStringValue = kafkaParameters.getValueDeserializer(); + kafkaParameters.setValueDeserializer(" "); + assertFalse(kafkaParameters.validate().isValid()); + kafkaParameters.setValueDeserializer(origStringValue); + assertTrue(kafkaParameters.validate().isValid()); + } + + private static void assertValidateTopicList(KafkaCarrierTechnologyParameters kafkaParameters) { + String[] origConsumerTopicList = kafkaParameters.getConsumerTopicList(); + kafkaParameters.setConsumerTopicList(null); + assertFalse(kafkaParameters.validate().isValid()); + kafkaParameters.setConsumerTopicList(origConsumerTopicList); + assertTrue(kafkaParameters.validate().isValid()); + + kafkaParameters.setConsumerTopicList(new String[0]); + assertFalse(kafkaParameters.validate().isValid()); + kafkaParameters.setConsumerTopicList(origConsumerTopicList); + assertTrue(kafkaParameters.validate().isValid()); + + String[] blankStringList = { null, "" }; + kafkaParameters.setConsumerTopicList(blankStringList); + assertFalse(kafkaParameters.validate().isValid()); + kafkaParameters.setConsumerTopicList(origConsumerTopicList); + assertTrue(kafkaParameters.validate().isValid()); + } + + private static void assertValidateNumberProperties(KafkaCarrierTechnologyParameters kafkaParameters) { + int origIntValue = kafkaParameters.getRetries(); + kafkaParameters.setRetries(-1); + assertFalse(kafkaParameters.validate().isValid()); + kafkaParameters.setRetries(origIntValue); + assertTrue(kafkaParameters.validate().isValid()); + + origIntValue = kafkaParameters.getBatchSize(); + kafkaParameters.setBatchSize(-1); + assertFalse(kafkaParameters.validate().isValid()); + kafkaParameters.setBatchSize(origIntValue); + assertTrue(kafkaParameters.validate().isValid()); + + origIntValue = kafkaParameters.getLingerTime(); + kafkaParameters.setLingerTime(-1); + assertFalse(kafkaParameters.validate().isValid()); + kafkaParameters.setLingerTime(origIntValue); + assertTrue(kafkaParameters.validate().isValid()); + + long origLongValue = kafkaParameters.getBufferMemory(); + kafkaParameters.setBufferMemory(-1); + assertFalse(kafkaParameters.validate().isValid()); + kafkaParameters.setBufferMemory(origLongValue); + assertTrue(kafkaParameters.validate().isValid()); + + origIntValue = kafkaParameters.getAutoCommitTime(); + kafkaParameters.setAutoCommitTime(-1); + assertFalse(kafkaParameters.validate().isValid()); + kafkaParameters.setAutoCommitTime(origIntValue); + assertTrue(kafkaParameters.validate().isValid()); + + origIntValue = kafkaParameters.getSessionTimeout(); + kafkaParameters.setSessionTimeout(-1); + assertFalse(kafkaParameters.validate().isValid()); + kafkaParameters.setSessionTimeout(origIntValue); + assertTrue(kafkaParameters.validate().isValid()); + + origIntValue = kafkaParameters.getConsumerPollTime(); + kafkaParameters.setConsumerPollTime(-1); + assertFalse(kafkaParameters.validate().isValid()); + kafkaParameters.setConsumerPollTime(origIntValue); + assertTrue(kafkaParameters.validate().isValid()); + } + + private static void assertValidateKafkaProperties(KafkaCarrierTechnologyParameters kafkaParameters) { + String[][] origKafkaProperties = kafkaParameters.getKafkaProperties(); + kafkaParameters.setKafkaProperties(null); + assertTrue(kafkaParameters.validate().isValid()); + kafkaParameters.setKafkaProperties(origKafkaProperties); + assertTrue(kafkaParameters.validate().isValid()); + + kafkaParameters.setKafkaProperties(new String[0][0]); + assertTrue(kafkaParameters.validate().isValid()); + kafkaParameters.setKafkaProperties(origKafkaProperties); + assertTrue(kafkaParameters.validate().isValid()); + + // @formatter:off + String[][] kafkaProperties0 = { + { + null, "Value0" + } + }; + // @formatter:on + + kafkaParameters.setKafkaProperties(kafkaProperties0); + assertFalse(kafkaParameters.validate().isValid()); + kafkaParameters.setKafkaProperties(origKafkaProperties); + assertTrue(kafkaParameters.validate().isValid()); + + // @formatter:off + String[][] kafkaProperties1 = { + { + "Property1", null + } + }; + // @formatter:on + + kafkaParameters.setKafkaProperties(kafkaProperties1); + assertFalse(kafkaParameters.validate().isValid()); + kafkaParameters.setKafkaProperties(origKafkaProperties); + assertTrue(kafkaParameters.validate().isValid()); + + // @formatter:off + String[][] kafkaProperties2 = { + { + "Property1", null + } + }; + // @formatter:on + + kafkaParameters.setKafkaProperties(kafkaProperties2); + assertFalse(kafkaParameters.validate().isValid()); + + // @formatter:off + String[][] kafkaPropertiesWithEmptyValue = { + { + "Property1", "" + } + }; + // @formatter:on + + kafkaParameters.setKafkaProperties(kafkaPropertiesWithEmptyValue); + assertTrue(kafkaParameters.validate().isValid()); + + kafkaParameters.setKafkaProperties(origKafkaProperties); + assertTrue(kafkaParameters.validate().isValid()); + + // @formatter:off + String[][] kafkaProperties3 = { + { + "Property1", "Value0", "Value1" + } + }; + // @formatter:on + + kafkaParameters.setKafkaProperties(kafkaProperties3); + assertFalse(kafkaParameters.validate().isValid()); + kafkaParameters.setKafkaProperties(origKafkaProperties); + assertTrue(kafkaParameters.validate().isValid()); + } } |