aboutsummaryrefslogtreecommitdiffstats
path: root/plugins/plugins-event/plugins-event-carrier/plugins-event-carrier-kafka
diff options
context:
space:
mode:
Diffstat (limited to 'plugins/plugins-event/plugins-event-carrier/plugins-event-carrier-kafka')
-rw-r--r--plugins/plugins-event/plugins-event-carrier/plugins-event-carrier-kafka/src/test/java/org/onap/policy/apex/plugins/event/carrier/kafka/ApexKafkaConsumerTest.java67
-rw-r--r--plugins/plugins-event/plugins-event-carrier/plugins-event-carrier-kafka/src/test/java/org/onap/policy/apex/plugins/event/carrier/kafka/ApexKafkaProducerTest.java37
-rw-r--r--plugins/plugins-event/plugins-event-carrier/plugins-event-carrier-kafka/src/test/java/org/onap/policy/apex/plugins/event/carrier/kafka/KafkaCarrierTechnologyParametersTest.java421
3 files changed, 275 insertions, 250 deletions
diff --git a/plugins/plugins-event/plugins-event-carrier/plugins-event-carrier-kafka/src/test/java/org/onap/policy/apex/plugins/event/carrier/kafka/ApexKafkaConsumerTest.java b/plugins/plugins-event/plugins-event-carrier/plugins-event-carrier-kafka/src/test/java/org/onap/policy/apex/plugins/event/carrier/kafka/ApexKafkaConsumerTest.java
index 1ef3550e4..f1e420bf0 100644
--- a/plugins/plugins-event/plugins-event-carrier/plugins-event-carrier-kafka/src/test/java/org/onap/policy/apex/plugins/event/carrier/kafka/ApexKafkaConsumerTest.java
+++ b/plugins/plugins-event/plugins-event-carrier/plugins-event-carrier-kafka/src/test/java/org/onap/policy/apex/plugins/event/carrier/kafka/ApexKafkaConsumerTest.java
@@ -1,7 +1,7 @@
/*-
* ============LICENSE_START=======================================================
* Copyright (C) 2019 Samsung. All rights reserved.
- * Modifications Copyright (C) 2020 Nordix Foundation
+ * Modifications Copyright (C) 2020, 2024 Nordix Foundation
* Modifications Copyright (C) 2022 Bell Canada. All rights reserved.
* ================================================================================
* Licensed under the Apache License, Version 2.0 (the "License");
@@ -23,12 +23,13 @@
package org.onap.policy.apex.plugins.event.carrier.kafka;
import static org.assertj.core.api.Assertions.assertThatCode;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertNull;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertNull;
+import static org.junit.jupiter.api.Assertions.assertThrows;
-import org.junit.Before;
-import org.junit.Test;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
import org.onap.policy.apex.service.engine.event.ApexEventException;
import org.onap.policy.apex.service.engine.event.ApexEventProducer;
import org.onap.policy.apex.service.engine.event.ApexEventReceiver;
@@ -37,7 +38,7 @@ import org.onap.policy.apex.service.parameters.carriertechnology.CarrierTechnolo
import org.onap.policy.apex.service.parameters.eventhandler.EventHandlerParameters;
import org.onap.policy.apex.service.parameters.eventhandler.EventHandlerPeeredMode;
-public class ApexKafkaConsumerTest {
+class ApexKafkaConsumerTest {
ApexKafkaConsumer apexKafkaConsumer = null;
ApexKafkaConsumer apexKafkaConsumer2 = null;
EventHandlerParameters consumerParameters = null;
@@ -51,13 +52,14 @@ public class ApexKafkaConsumerTest {
*
* @throws ApexEventException on test set up errors.
*/
- @Before
- public void setUp() throws ApexEventException {
+ @BeforeEach
+ void setUp() throws ApexEventException {
apexKafkaConsumer = new ApexKafkaConsumer();
consumerParameters = new EventHandlerParameters();
apexKafkaProducer = new ApexKafkaProducer();
consumerParameters
- .setCarrierTechnologyParameters(new KafkaCarrierTechnologyParameters() {});
+ .setCarrierTechnologyParameters(new KafkaCarrierTechnologyParameters() {
+ });
apexKafkaConsumer.init("TestApexKafkaConsumer", consumerParameters, incomingEventReceiver);
apexKafkaConsumer2 = new ApexKafkaConsumer();
@@ -65,63 +67,66 @@ public class ApexKafkaConsumerTest {
kafkaParameters = new KafkaCarrierTechnologyParameters();
String[][] kafkaProperties = {
{"value.deserializer", "io.confluent.kafka.serializers.KafkaAvroDeserializer"},
- {"schema.registry.url", "[http://test-registory:8080]"}
+ {"schema.registry.url", "[https://test-registory:8080]"}
};
kafkaParameters.setKafkaProperties(kafkaProperties);
consumerParameters2
- .setCarrierTechnologyParameters(kafkaParameters);
+ .setCarrierTechnologyParameters(kafkaParameters);
apexKafkaConsumer2.init("TestApexKafkaConsumer2", consumerParameters2, incomingEventReceiver);
}
@Test
- public void testStart() {
+ void testStart() {
assertThatCode(apexKafkaConsumer::start).doesNotThrowAnyException();
assertThatCode(apexKafkaConsumer2::start).doesNotThrowAnyException();
}
@Test
- public void testGetName() {
+ void testGetName() {
assertEquals("TestApexKafkaConsumer", apexKafkaConsumer.getName());
assertEquals("TestApexKafkaConsumer2", apexKafkaConsumer2.getName());
}
@Test
- public void testGetPeeredReference() {
+ void testGetPeeredReference() {
assertNull(apexKafkaConsumer.getPeeredReference(EventHandlerPeeredMode.REQUESTOR));
assertNull(apexKafkaConsumer2.getPeeredReference(EventHandlerPeeredMode.REQUESTOR));
}
@Test
- public void testSetPeeredReference() {
+ void testSetPeeredReference() {
PeeredReference peeredReference = new PeeredReference(EventHandlerPeeredMode.REQUESTOR,
- apexKafkaConsumer, apexKafkaProducer);
+ apexKafkaConsumer, apexKafkaProducer);
apexKafkaConsumer.setPeeredReference(EventHandlerPeeredMode.REQUESTOR, peeredReference);
assertNotNull(apexKafkaConsumer.getPeeredReference(EventHandlerPeeredMode.REQUESTOR));
PeeredReference peeredReference2 = new PeeredReference(EventHandlerPeeredMode.REQUESTOR,
- apexKafkaConsumer2, apexKafkaProducer);
+ apexKafkaConsumer2, apexKafkaProducer);
apexKafkaConsumer2.setPeeredReference(EventHandlerPeeredMode.REQUESTOR, peeredReference2);
assertNotNull(apexKafkaConsumer2.getPeeredReference(EventHandlerPeeredMode.REQUESTOR));
}
- @Test(expected = java.lang.NullPointerException.class)
- public void testRun() {
- apexKafkaConsumer.run();
- apexKafkaConsumer2.run();
+ @Test
+ void testRun() {
+ assertThrows(NullPointerException.class, () -> apexKafkaConsumer.run());
+ assertThrows(NullPointerException.class, () -> apexKafkaConsumer2.run());
}
- @Test(expected = java.lang.NullPointerException.class)
- public void testStop() {
- apexKafkaConsumer.stop();
- apexKafkaConsumer2.stop();
+ @Test
+ void testStop() {
+ assertThrows(NullPointerException.class, () -> apexKafkaConsumer.stop());
+ assertThrows(NullPointerException.class, () -> apexKafkaConsumer2.stop());
}
- @Test(expected = ApexEventException.class)
- public void testInitWithNonKafkaCarrierTechnologyParameters() throws ApexEventException {
- consumerParameters.setCarrierTechnologyParameters(new CarrierTechnologyParameters() {});
- apexKafkaConsumer.init("TestApexKafkaConsumer", consumerParameters, incomingEventReceiver);
- apexKafkaConsumer2.init("TestApexKafkaConsumer2", consumerParameters, incomingEventReceiver);
+ @Test
+ void testInitWithNonKafkaCarrierTechnologyParameters() {
+ consumerParameters.setCarrierTechnologyParameters(new CarrierTechnologyParameters() {
+ });
+ assertThrows(ApexEventException.class, () ->
+ apexKafkaConsumer.init("TestApexKafkaConsumer", consumerParameters, incomingEventReceiver));
+ assertThrows(ApexEventException.class, () ->
+ apexKafkaConsumer2.init("TestApexKafkaConsumer2", consumerParameters, incomingEventReceiver));
}
}
diff --git a/plugins/plugins-event/plugins-event-carrier/plugins-event-carrier-kafka/src/test/java/org/onap/policy/apex/plugins/event/carrier/kafka/ApexKafkaProducerTest.java b/plugins/plugins-event/plugins-event-carrier/plugins-event-carrier-kafka/src/test/java/org/onap/policy/apex/plugins/event/carrier/kafka/ApexKafkaProducerTest.java
index 7300474c7..f5606eb57 100644
--- a/plugins/plugins-event/plugins-event-carrier/plugins-event-carrier-kafka/src/test/java/org/onap/policy/apex/plugins/event/carrier/kafka/ApexKafkaProducerTest.java
+++ b/plugins/plugins-event/plugins-event-carrier/plugins-event-carrier-kafka/src/test/java/org/onap/policy/apex/plugins/event/carrier/kafka/ApexKafkaProducerTest.java
@@ -1,6 +1,7 @@
/*-
* ============LICENSE_START=======================================================
* Copyright (C) 2019 Samsung. All rights reserved.
+ * Modifications Copyright (C) 2024 Nordix Foundation.
* ================================================================================
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -20,19 +21,20 @@
package org.onap.policy.apex.plugins.event.carrier.kafka;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertNull;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertNull;
+import static org.junit.jupiter.api.Assertions.assertThrows;
-import org.junit.Before;
-import org.junit.Test;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
import org.onap.policy.apex.service.engine.event.ApexEventException;
import org.onap.policy.apex.service.engine.event.PeeredReference;
import org.onap.policy.apex.service.engine.event.SynchronousEventCache;
import org.onap.policy.apex.service.parameters.eventhandler.EventHandlerParameters;
import org.onap.policy.apex.service.parameters.eventhandler.EventHandlerPeeredMode;
-public class ApexKafkaProducerTest {
+class ApexKafkaProducerTest {
ApexKafkaProducer apexKafkaProducer = null;
ApexKafkaConsumer apexKafkaConsumer = null;
EventHandlerParameters producerParameters = null;
@@ -43,37 +45,38 @@ public class ApexKafkaProducerTest {
/**
* Set up testing.
*/
- @Before
- public void setUp() throws Exception {
+ @BeforeEach
+ void setUp() {
apexKafkaProducer = new ApexKafkaProducer();
apexKafkaConsumer = new ApexKafkaConsumer();
producerParameters = new EventHandlerParameters();
}
- @Test(expected = ApexEventException.class)
- public void testInit() throws ApexEventException {
- apexKafkaProducer.init("TestApexKafkaProducer", producerParameters);
+ @Test
+ void testInit() {
+ assertThrows(ApexEventException.class,
+ () -> apexKafkaProducer.init("TestApexKafkaProducer", producerParameters));
}
@Test
- public void testGetName() {
+ void testGetName() {
assertNull(apexKafkaProducer.getName());
}
@Test
- public void testGetPeeredReference() {
+ void testGetPeeredReference() {
assertNull(apexKafkaProducer.getPeeredReference(EventHandlerPeeredMode.SYNCHRONOUS));
}
@Test
- public void testWithProperValues() throws ApexEventException {
+ void testWithProperValues() throws ApexEventException {
producerParameters
- .setCarrierTechnologyParameters(new KafkaCarrierTechnologyParameters() {});
+ .setCarrierTechnologyParameters(new KafkaCarrierTechnologyParameters() { });
synchronousEventCache = new SynchronousEventCache(EventHandlerPeeredMode.SYNCHRONOUS,
- apexKafkaConsumer, apexKafkaProducer, DEFAULT_SYNCHRONOUS_EVENT_TIMEOUT);
+ apexKafkaConsumer, apexKafkaProducer, DEFAULT_SYNCHRONOUS_EVENT_TIMEOUT);
apexKafkaProducer.setPeeredReference(EventHandlerPeeredMode.SYNCHRONOUS,
- synchronousEventCache);
+ synchronousEventCache);
apexKafkaProducer.init("TestApexKafkaProducer", producerParameters);
assertEquals("TestApexKafkaProducer", apexKafkaProducer.getName());
assertNotNull(apexKafkaProducer.getPeeredReference(EventHandlerPeeredMode.SYNCHRONOUS));
diff --git a/plugins/plugins-event/plugins-event-carrier/plugins-event-carrier-kafka/src/test/java/org/onap/policy/apex/plugins/event/carrier/kafka/KafkaCarrierTechnologyParametersTest.java b/plugins/plugins-event/plugins-event-carrier/plugins-event-carrier-kafka/src/test/java/org/onap/policy/apex/plugins/event/carrier/kafka/KafkaCarrierTechnologyParametersTest.java
index 6b0f7d920..11bafc9b1 100644
--- a/plugins/plugins-event/plugins-event-carrier/plugins-event-carrier-kafka/src/test/java/org/onap/policy/apex/plugins/event/carrier/kafka/KafkaCarrierTechnologyParametersTest.java
+++ b/plugins/plugins-event/plugins-event-carrier/plugins-event-carrier-kafka/src/test/java/org/onap/policy/apex/plugins/event/carrier/kafka/KafkaCarrierTechnologyParametersTest.java
@@ -1,7 +1,7 @@
/*-
* ============LICENSE_START=======================================================
* Copyright (C) 2019 Samsung. All rights reserved.
- * Modifications Copyright (C) 2019,2023 Nordix Foundation.
+ * Modifications Copyright (C) 2019, 2023-2024 Nordix Foundation.
* Modifications Copyright (C) 2021 Bell Canada. All rights reserved.
* ================================================================================
* Licensed under the Apache License, Version 2.0 (the "License");
@@ -22,17 +22,18 @@
package org.onap.policy.apex.plugins.event.carrier.kafka;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertTrue;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertNull;
+import static org.junit.jupiter.api.Assertions.assertTrue;
import java.util.Properties;
-import org.junit.Test;
+import org.junit.jupiter.api.Test;
-public class KafkaCarrierTechnologyParametersTest {
+class KafkaCarrierTechnologyParametersTest {
@Test
- public void testKafkaCarrierTechnologyParameters() {
+ void testKafkaCarrierTechnologyParameters() {
KafkaCarrierTechnologyParameters kafkaCarrierTechnologyParameters = new KafkaCarrierTechnologyParameters();
assertNotNull(kafkaCarrierTechnologyParameters);
@@ -40,17 +41,17 @@ public class KafkaCarrierTechnologyParametersTest {
}
@Test
- public void testGetKafkaProducerProperties() {
+ void testGetKafkaProducerProperties() {
KafkaCarrierTechnologyParameters kafkaCarrierTechnologyParameters = new KafkaCarrierTechnologyParameters();
Properties kafkaProducerProperties = kafkaCarrierTechnologyParameters.getKafkaProducerProperties();
assertNotNull(kafkaProducerProperties);
assertEquals("localhost:9092", kafkaProducerProperties.get("bootstrap.servers"));
assertEquals("1", kafkaProducerProperties.get("linger.ms"));
- assertEquals(null, kafkaProducerProperties.get("group.id"));
- assertEquals(null, kafkaProducerProperties.get("Property0"));
- assertEquals(null, kafkaProducerProperties.get("Property1"));
- assertEquals(null, kafkaProducerProperties.get("Property2"));
+ assertNull(kafkaProducerProperties.get("group.id"));
+ assertNull(kafkaProducerProperties.get("Property0"));
+ assertNull(kafkaProducerProperties.get("Property1"));
+ assertNull(kafkaProducerProperties.get("Property2"));
// @formatter:off
String[][] kafkaProperties = {
@@ -68,24 +69,24 @@ public class KafkaCarrierTechnologyParametersTest {
assertNotNull(kafkaProducerProperties);
assertEquals("localhost:9092", kafkaProducerProperties.get("bootstrap.servers"));
assertEquals("1", kafkaProducerProperties.get("linger.ms"));
- assertEquals(null, kafkaProducerProperties.get("group.id"));
+ assertNull(kafkaProducerProperties.get("group.id"));
assertEquals("Value0", kafkaProducerProperties.get("Property0"));
assertEquals("Value1", kafkaProducerProperties.get("Property1"));
- assertEquals(null, kafkaProducerProperties.get("Property2"));
+ assertNull(kafkaProducerProperties.get("Property2"));
}
@Test
- public void testGetKafkaConsumerProperties() {
+ void testGetKafkaConsumerProperties() {
KafkaCarrierTechnologyParameters kafkaCarrierTechnologyParameters = new KafkaCarrierTechnologyParameters();
Properties kafkaConsumerProperties = kafkaCarrierTechnologyParameters.getKafkaConsumerProperties();
assertNotNull(kafkaConsumerProperties);
assertEquals("localhost:9092", kafkaConsumerProperties.get("bootstrap.servers"));
assertEquals("default-group-id", kafkaConsumerProperties.get("group.id"));
- assertEquals(null, kafkaConsumerProperties.get("linger.ms"));
- assertEquals(null, kafkaConsumerProperties.get("Property0"));
- assertEquals(null, kafkaConsumerProperties.get("Property1"));
- assertEquals(null, kafkaConsumerProperties.get("Property2"));
+ assertNull(kafkaConsumerProperties.get("linger.ms"));
+ assertNull(kafkaConsumerProperties.get("Property0"));
+ assertNull(kafkaConsumerProperties.get("Property1"));
+ assertNull(kafkaConsumerProperties.get("Property2"));
// @formatter:off
String[][] kafkaProperties = {
@@ -103,204 +104,30 @@ public class KafkaCarrierTechnologyParametersTest {
assertNotNull(kafkaConsumerProperties);
assertEquals("localhost:9092", kafkaConsumerProperties.get("bootstrap.servers"));
assertEquals("default-group-id", kafkaConsumerProperties.get("group.id"));
- assertEquals(null, kafkaConsumerProperties.get("linger.ms"));
+ assertNull(kafkaConsumerProperties.get("linger.ms"));
assertEquals("Value0", kafkaConsumerProperties.get("Property0"));
assertEquals("Value1", kafkaConsumerProperties.get("Property1"));
- assertEquals(null, kafkaConsumerProperties.get("Property2"));
+ assertNull(kafkaConsumerProperties.get("Property2"));
}
@Test
- public void testValidate() {
+ void testValidate() {
KafkaCarrierTechnologyParameters kafkaCarrierTechnologyParameters = new KafkaCarrierTechnologyParameters();
assertNotNull(kafkaCarrierTechnologyParameters);
assertTrue(kafkaCarrierTechnologyParameters.validate().isValid());
- String origStringValue = kafkaCarrierTechnologyParameters.getBootstrapServers();
- kafkaCarrierTechnologyParameters.setBootstrapServers(" ");
- assertFalse(kafkaCarrierTechnologyParameters.validate().isValid());
- kafkaCarrierTechnologyParameters.setBootstrapServers(origStringValue);
- assertTrue(kafkaCarrierTechnologyParameters.validate().isValid());
-
- origStringValue = kafkaCarrierTechnologyParameters.getAcks();
- kafkaCarrierTechnologyParameters.setAcks(" ");
- assertFalse(kafkaCarrierTechnologyParameters.validate().isValid());
- kafkaCarrierTechnologyParameters.setAcks(origStringValue);
- assertTrue(kafkaCarrierTechnologyParameters.validate().isValid());
-
- origStringValue = kafkaCarrierTechnologyParameters.getGroupId();
- kafkaCarrierTechnologyParameters.setGroupId(" ");
- assertFalse(kafkaCarrierTechnologyParameters.validate().isValid());
- kafkaCarrierTechnologyParameters.setGroupId(origStringValue);
- assertTrue(kafkaCarrierTechnologyParameters.validate().isValid());
-
- origStringValue = kafkaCarrierTechnologyParameters.getProducerTopic();
- kafkaCarrierTechnologyParameters.setProducerTopic(" ");
- assertFalse(kafkaCarrierTechnologyParameters.validate().isValid());
- kafkaCarrierTechnologyParameters.setProducerTopic(origStringValue);
- assertTrue(kafkaCarrierTechnologyParameters.validate().isValid());
-
- int origIntValue = kafkaCarrierTechnologyParameters.getRetries();
- kafkaCarrierTechnologyParameters.setRetries(-1);
- assertFalse(kafkaCarrierTechnologyParameters.validate().isValid());
- kafkaCarrierTechnologyParameters.setRetries(origIntValue);
- assertTrue(kafkaCarrierTechnologyParameters.validate().isValid());
-
- origIntValue = kafkaCarrierTechnologyParameters.getBatchSize();
- kafkaCarrierTechnologyParameters.setBatchSize(-1);
- assertFalse(kafkaCarrierTechnologyParameters.validate().isValid());
- kafkaCarrierTechnologyParameters.setBatchSize(origIntValue);
- assertTrue(kafkaCarrierTechnologyParameters.validate().isValid());
-
- origIntValue = kafkaCarrierTechnologyParameters.getLingerTime();
- kafkaCarrierTechnologyParameters.setLingerTime(-1);
- assertFalse(kafkaCarrierTechnologyParameters.validate().isValid());
- kafkaCarrierTechnologyParameters.setLingerTime(origIntValue);
- assertTrue(kafkaCarrierTechnologyParameters.validate().isValid());
-
- long origLongValue = kafkaCarrierTechnologyParameters.getBufferMemory();
- kafkaCarrierTechnologyParameters.setBufferMemory(-1);
- assertFalse(kafkaCarrierTechnologyParameters.validate().isValid());
- kafkaCarrierTechnologyParameters.setBufferMemory(origLongValue);
- assertTrue(kafkaCarrierTechnologyParameters.validate().isValid());
-
- origIntValue = kafkaCarrierTechnologyParameters.getAutoCommitTime();
- kafkaCarrierTechnologyParameters.setAutoCommitTime(-1);
- assertFalse(kafkaCarrierTechnologyParameters.validate().isValid());
- kafkaCarrierTechnologyParameters.setAutoCommitTime(origIntValue);
- assertTrue(kafkaCarrierTechnologyParameters.validate().isValid());
-
- origIntValue = kafkaCarrierTechnologyParameters.getSessionTimeout();
- kafkaCarrierTechnologyParameters.setSessionTimeout(-1);
- assertFalse(kafkaCarrierTechnologyParameters.validate().isValid());
- kafkaCarrierTechnologyParameters.setSessionTimeout(origIntValue);
- assertTrue(kafkaCarrierTechnologyParameters.validate().isValid());
-
- origIntValue = kafkaCarrierTechnologyParameters.getConsumerPollTime();
- kafkaCarrierTechnologyParameters.setConsumerPollTime(-1);
- assertFalse(kafkaCarrierTechnologyParameters.validate().isValid());
- kafkaCarrierTechnologyParameters.setConsumerPollTime(origIntValue);
- assertTrue(kafkaCarrierTechnologyParameters.validate().isValid());
-
- origStringValue = kafkaCarrierTechnologyParameters.getKeySerializer();
- kafkaCarrierTechnologyParameters.setKeySerializer(" ");
- assertFalse(kafkaCarrierTechnologyParameters.validate().isValid());
- kafkaCarrierTechnologyParameters.setKeySerializer(origStringValue);
- assertTrue(kafkaCarrierTechnologyParameters.validate().isValid());
-
- origStringValue = kafkaCarrierTechnologyParameters.getValueSerializer();
- kafkaCarrierTechnologyParameters.setValueSerializer(" ");
- assertFalse(kafkaCarrierTechnologyParameters.validate().isValid());
- kafkaCarrierTechnologyParameters.setValueSerializer(origStringValue);
- assertTrue(kafkaCarrierTechnologyParameters.validate().isValid());
-
- origStringValue = kafkaCarrierTechnologyParameters.getKeyDeserializer();
- kafkaCarrierTechnologyParameters.setKeyDeserializer(" ");
- assertFalse(kafkaCarrierTechnologyParameters.validate().isValid());
- kafkaCarrierTechnologyParameters.setKeyDeserializer(origStringValue);
- assertTrue(kafkaCarrierTechnologyParameters.validate().isValid());
-
- origStringValue = kafkaCarrierTechnologyParameters.getValueDeserializer();
- kafkaCarrierTechnologyParameters.setValueDeserializer(" ");
- assertFalse(kafkaCarrierTechnologyParameters.validate().isValid());
- kafkaCarrierTechnologyParameters.setValueDeserializer(origStringValue);
- assertTrue(kafkaCarrierTechnologyParameters.validate().isValid());
-
- String[] origConsumerTopcList = kafkaCarrierTechnologyParameters.getConsumerTopicList();
- kafkaCarrierTechnologyParameters.setConsumerTopicList(null);
- assertFalse(kafkaCarrierTechnologyParameters.validate().isValid());
- kafkaCarrierTechnologyParameters.setConsumerTopicList(origConsumerTopcList);
- assertTrue(kafkaCarrierTechnologyParameters.validate().isValid());
-
- kafkaCarrierTechnologyParameters.setConsumerTopicList(new String[0]);
- assertFalse(kafkaCarrierTechnologyParameters.validate().isValid());
- kafkaCarrierTechnologyParameters.setConsumerTopicList(origConsumerTopcList);
- assertTrue(kafkaCarrierTechnologyParameters.validate().isValid());
-
- String[] blankStringList = { null, "" };
- kafkaCarrierTechnologyParameters.setConsumerTopicList(blankStringList);
- assertFalse(kafkaCarrierTechnologyParameters.validate().isValid());
- kafkaCarrierTechnologyParameters.setConsumerTopicList(origConsumerTopcList);
- assertTrue(kafkaCarrierTechnologyParameters.validate().isValid());
-
- String[][] origKafkaProperties = kafkaCarrierTechnologyParameters.getKafkaProperties();
- kafkaCarrierTechnologyParameters.setKafkaProperties(null);
- assertTrue(kafkaCarrierTechnologyParameters.validate().isValid());
- kafkaCarrierTechnologyParameters.setKafkaProperties(origKafkaProperties);
- assertTrue(kafkaCarrierTechnologyParameters.validate().isValid());
+ assertValidateStringProperties(kafkaCarrierTechnologyParameters);
- kafkaCarrierTechnologyParameters.setKafkaProperties(new String[0][0]);
- assertTrue(kafkaCarrierTechnologyParameters.validate().isValid());
- kafkaCarrierTechnologyParameters.setKafkaProperties(origKafkaProperties);
- assertTrue(kafkaCarrierTechnologyParameters.validate().isValid());
+ assertValidateNumberProperties(kafkaCarrierTechnologyParameters);
- // @formatter:offkafkaCarrierTechnologyParameters
- String[][] kafkaProperties0 = {
- {
- null, "Value0"
- }
- };
- // @formatter:on
+ assertValidateTopicList(kafkaCarrierTechnologyParameters);
- kafkaCarrierTechnologyParameters.setKafkaProperties(kafkaProperties0);
- assertFalse(kafkaCarrierTechnologyParameters.validate().isValid());
- kafkaCarrierTechnologyParameters.setKafkaProperties(origKafkaProperties);
- assertTrue(kafkaCarrierTechnologyParameters.validate().isValid());
-
- // @formatter:off
- String[][] kafkaProperties1 = {
- {
- "Property1", null
- }
- };
- // @formatter:on
-
- kafkaCarrierTechnologyParameters.setKafkaProperties(kafkaProperties1);
- assertFalse(kafkaCarrierTechnologyParameters.validate().isValid());
- kafkaCarrierTechnologyParameters.setKafkaProperties(origKafkaProperties);
- assertTrue(kafkaCarrierTechnologyParameters.validate().isValid());
-
- // @formatter:off
- String[][] kafkaProperties2 = {
- {
- "Property1", null
- }
- };
- // @formatter:on
-
- kafkaCarrierTechnologyParameters.setKafkaProperties(kafkaProperties2);
- assertFalse(kafkaCarrierTechnologyParameters.validate().isValid());
-
- // @formatter:off
- String[][] kafkaPropertiesWithEmptyValue = {
- {
- "Property1", ""
- }
- };
- // @formatter:on
-
- kafkaCarrierTechnologyParameters.setKafkaProperties(kafkaPropertiesWithEmptyValue);
- assertTrue(kafkaCarrierTechnologyParameters.validate().isValid());
-
- kafkaCarrierTechnologyParameters.setKafkaProperties(origKafkaProperties);
- assertTrue(kafkaCarrierTechnologyParameters.validate().isValid());
-
- // @formatter:off
- String[][] kafkaProperties3 = {
- {
- "Property1", "Value0", "Value1"
- }
- };
- // @formatter:on
-
- kafkaCarrierTechnologyParameters.setKafkaProperties(kafkaProperties3);
- assertFalse(kafkaCarrierTechnologyParameters.validate().isValid());
- kafkaCarrierTechnologyParameters.setKafkaProperties(origKafkaProperties);
- assertTrue(kafkaCarrierTechnologyParameters.validate().isValid());
+ assertValidateKafkaProperties(kafkaCarrierTechnologyParameters);
}
@Test
- public void testExplicitImplicit() {
+ void testExplicitImplicit() {
KafkaCarrierTechnologyParameters kafkaCtp = new KafkaCarrierTechnologyParameters();
assertNotNull(kafkaCtp);
@@ -378,4 +205,194 @@ public class KafkaCarrierTechnologyParametersTest {
kafkaCtp.setKafkaProperties(kafkaProperties3);
assertEquals("localhost:7777", kafkaCtp.getKafkaConsumerProperties().get("bootstrap.servers"));
}
+
+ private static void assertValidateStringProperties(KafkaCarrierTechnologyParameters kafkaParameters) {
+ String origStringValue = kafkaParameters.getBootstrapServers();
+ kafkaParameters.setBootstrapServers(" ");
+ assertFalse(kafkaParameters.validate().isValid());
+ kafkaParameters.setBootstrapServers(origStringValue);
+ assertTrue(kafkaParameters.validate().isValid());
+
+ origStringValue = kafkaParameters.getAcks();
+ kafkaParameters.setAcks(" ");
+ assertFalse(kafkaParameters.validate().isValid());
+ kafkaParameters.setAcks(origStringValue);
+ assertTrue(kafkaParameters.validate().isValid());
+
+ origStringValue = kafkaParameters.getGroupId();
+ kafkaParameters.setGroupId(" ");
+ assertFalse(kafkaParameters.validate().isValid());
+ kafkaParameters.setGroupId(origStringValue);
+ assertTrue(kafkaParameters.validate().isValid());
+
+ origStringValue = kafkaParameters.getProducerTopic();
+ kafkaParameters.setProducerTopic(" ");
+ assertFalse(kafkaParameters.validate().isValid());
+ kafkaParameters.setProducerTopic(origStringValue);
+ assertTrue(kafkaParameters.validate().isValid());
+
+ origStringValue = kafkaParameters.getKeySerializer();
+ kafkaParameters.setKeySerializer(" ");
+ assertFalse(kafkaParameters.validate().isValid());
+ kafkaParameters.setKeySerializer(origStringValue);
+ assertTrue(kafkaParameters.validate().isValid());
+
+ origStringValue = kafkaParameters.getValueSerializer();
+ kafkaParameters.setValueSerializer(" ");
+ assertFalse(kafkaParameters.validate().isValid());
+ kafkaParameters.setValueSerializer(origStringValue);
+ assertTrue(kafkaParameters.validate().isValid());
+
+ origStringValue = kafkaParameters.getKeyDeserializer();
+ kafkaParameters.setKeyDeserializer(" ");
+ assertFalse(kafkaParameters.validate().isValid());
+ kafkaParameters.setKeyDeserializer(origStringValue);
+ assertTrue(kafkaParameters.validate().isValid());
+
+ origStringValue = kafkaParameters.getValueDeserializer();
+ kafkaParameters.setValueDeserializer(" ");
+ assertFalse(kafkaParameters.validate().isValid());
+ kafkaParameters.setValueDeserializer(origStringValue);
+ assertTrue(kafkaParameters.validate().isValid());
+ }
+
+ private static void assertValidateTopicList(KafkaCarrierTechnologyParameters kafkaParameters) {
+ String[] origConsumerTopicList = kafkaParameters.getConsumerTopicList();
+ kafkaParameters.setConsumerTopicList(null);
+ assertFalse(kafkaParameters.validate().isValid());
+ kafkaParameters.setConsumerTopicList(origConsumerTopicList);
+ assertTrue(kafkaParameters.validate().isValid());
+
+ kafkaParameters.setConsumerTopicList(new String[0]);
+ assertFalse(kafkaParameters.validate().isValid());
+ kafkaParameters.setConsumerTopicList(origConsumerTopicList);
+ assertTrue(kafkaParameters.validate().isValid());
+
+ String[] blankStringList = { null, "" };
+ kafkaParameters.setConsumerTopicList(blankStringList);
+ assertFalse(kafkaParameters.validate().isValid());
+ kafkaParameters.setConsumerTopicList(origConsumerTopicList);
+ assertTrue(kafkaParameters.validate().isValid());
+ }
+
+ private static void assertValidateNumberProperties(KafkaCarrierTechnologyParameters kafkaParameters) {
+ int origIntValue = kafkaParameters.getRetries();
+ kafkaParameters.setRetries(-1);
+ assertFalse(kafkaParameters.validate().isValid());
+ kafkaParameters.setRetries(origIntValue);
+ assertTrue(kafkaParameters.validate().isValid());
+
+ origIntValue = kafkaParameters.getBatchSize();
+ kafkaParameters.setBatchSize(-1);
+ assertFalse(kafkaParameters.validate().isValid());
+ kafkaParameters.setBatchSize(origIntValue);
+ assertTrue(kafkaParameters.validate().isValid());
+
+ origIntValue = kafkaParameters.getLingerTime();
+ kafkaParameters.setLingerTime(-1);
+ assertFalse(kafkaParameters.validate().isValid());
+ kafkaParameters.setLingerTime(origIntValue);
+ assertTrue(kafkaParameters.validate().isValid());
+
+ long origLongValue = kafkaParameters.getBufferMemory();
+ kafkaParameters.setBufferMemory(-1);
+ assertFalse(kafkaParameters.validate().isValid());
+ kafkaParameters.setBufferMemory(origLongValue);
+ assertTrue(kafkaParameters.validate().isValid());
+
+ origIntValue = kafkaParameters.getAutoCommitTime();
+ kafkaParameters.setAutoCommitTime(-1);
+ assertFalse(kafkaParameters.validate().isValid());
+ kafkaParameters.setAutoCommitTime(origIntValue);
+ assertTrue(kafkaParameters.validate().isValid());
+
+ origIntValue = kafkaParameters.getSessionTimeout();
+ kafkaParameters.setSessionTimeout(-1);
+ assertFalse(kafkaParameters.validate().isValid());
+ kafkaParameters.setSessionTimeout(origIntValue);
+ assertTrue(kafkaParameters.validate().isValid());
+
+ origIntValue = kafkaParameters.getConsumerPollTime();
+ kafkaParameters.setConsumerPollTime(-1);
+ assertFalse(kafkaParameters.validate().isValid());
+ kafkaParameters.setConsumerPollTime(origIntValue);
+ assertTrue(kafkaParameters.validate().isValid());
+ }
+
+ private static void assertValidateKafkaProperties(KafkaCarrierTechnologyParameters kafkaParameters) {
+ String[][] origKafkaProperties = kafkaParameters.getKafkaProperties();
+ kafkaParameters.setKafkaProperties(null);
+ assertTrue(kafkaParameters.validate().isValid());
+ kafkaParameters.setKafkaProperties(origKafkaProperties);
+ assertTrue(kafkaParameters.validate().isValid());
+
+ kafkaParameters.setKafkaProperties(new String[0][0]);
+ assertTrue(kafkaParameters.validate().isValid());
+ kafkaParameters.setKafkaProperties(origKafkaProperties);
+ assertTrue(kafkaParameters.validate().isValid());
+
+ // @formatter:off
+ String[][] kafkaProperties0 = {
+ {
+ null, "Value0"
+ }
+ };
+ // @formatter:on
+
+ kafkaParameters.setKafkaProperties(kafkaProperties0);
+ assertFalse(kafkaParameters.validate().isValid());
+ kafkaParameters.setKafkaProperties(origKafkaProperties);
+ assertTrue(kafkaParameters.validate().isValid());
+
+ // @formatter:off
+ String[][] kafkaProperties1 = {
+ {
+ "Property1", null
+ }
+ };
+ // @formatter:on
+
+ kafkaParameters.setKafkaProperties(kafkaProperties1);
+ assertFalse(kafkaParameters.validate().isValid());
+ kafkaParameters.setKafkaProperties(origKafkaProperties);
+ assertTrue(kafkaParameters.validate().isValid());
+
+ // @formatter:off
+ String[][] kafkaProperties2 = {
+ {
+ "Property1", null
+ }
+ };
+ // @formatter:on
+
+ kafkaParameters.setKafkaProperties(kafkaProperties2);
+ assertFalse(kafkaParameters.validate().isValid());
+
+ // @formatter:off
+ String[][] kafkaPropertiesWithEmptyValue = {
+ {
+ "Property1", ""
+ }
+ };
+ // @formatter:on
+
+ kafkaParameters.setKafkaProperties(kafkaPropertiesWithEmptyValue);
+ assertTrue(kafkaParameters.validate().isValid());
+
+ kafkaParameters.setKafkaProperties(origKafkaProperties);
+ assertTrue(kafkaParameters.validate().isValid());
+
+ // @formatter:off
+ String[][] kafkaProperties3 = {
+ {
+ "Property1", "Value0", "Value1"
+ }
+ };
+ // @formatter:on
+
+ kafkaParameters.setKafkaProperties(kafkaProperties3);
+ assertFalse(kafkaParameters.validate().isValid());
+ kafkaParameters.setKafkaProperties(origKafkaProperties);
+ assertTrue(kafkaParameters.validate().isValid());
+ }
}