summaryrefslogtreecommitdiffstats
path: root/catalog-be/src/test
diff options
context:
space:
mode:
authordavid.mcweeney <david.mcweeney@est.tech>2022-10-04 15:46:14 +0100
committerMichael Morris <michael.morris@est.tech>2022-10-25 11:24:02 +0000
commit47f96dd966663f7f46b719451c0752721a2940a3 (patch)
tree9d875ce43f96cf3e570cc812d907fa2edd3b7945 /catalog-be/src/test
parent0d2e96c125aab4e4edfc0a8b897353c0fabdd885 (diff)
[SDC] Add kafka native messaging
Change-Id: I5ab8f580947cbc264d94bec48a5e8b659dc44c08 Issue-ID: DMAAP-1787 Signed-off-by: david.mcweeney <david.mcweeney@est.tech>
Diffstat (limited to 'catalog-be/src/test')
-rw-r--r--catalog-be/src/test/java/org/openecomp/sdc/be/components/distribution/engine/CambriaHandlerTest.java32
-rw-r--r--catalog-be/src/test/java/org/openecomp/sdc/be/components/distribution/engine/DistributionEngineInitTaskTest.java44
-rw-r--r--catalog-be/src/test/java/org/openecomp/sdc/be/components/kafka/KafkaHandlerTest.java138
-rw-r--r--catalog-be/src/test/java/org/openecomp/sdc/be/components/kafka/SdcKafkaConsumerTest.java143
-rw-r--r--catalog-be/src/test/java/org/openecomp/sdc/be/components/kafka/SdcKafkaProducerTest.java94
5 files changed, 422 insertions, 29 deletions
diff --git a/catalog-be/src/test/java/org/openecomp/sdc/be/components/distribution/engine/CambriaHandlerTest.java b/catalog-be/src/test/java/org/openecomp/sdc/be/components/distribution/engine/CambriaHandlerTest.java
index 980bb8369a..a91b246f40 100644
--- a/catalog-be/src/test/java/org/openecomp/sdc/be/components/distribution/engine/CambriaHandlerTest.java
+++ b/catalog-be/src/test/java/org/openecomp/sdc/be/components/distribution/engine/CambriaHandlerTest.java
@@ -7,9 +7,9 @@
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -20,6 +20,11 @@
package org.openecomp.sdc.be.components.distribution.engine;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.doReturn;
+
import com.att.nsa.apiClient.credentials.ApiCredential;
import com.att.nsa.apiClient.http.HttpException;
import com.att.nsa.cambria.client.CambriaClient;
@@ -29,6 +34,14 @@ import com.att.nsa.cambria.client.CambriaClientBuilders.TopicManagerBuilder;
import com.att.nsa.cambria.client.CambriaConsumer;
import com.att.nsa.cambria.client.CambriaIdentityManager;
import fj.data.Either;
+import java.io.IOException;
+import java.net.MalformedURLException;
+import java.security.GeneralSecurityException;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Set;
import mockit.Deencapsulation;
import org.junit.Before;
import org.junit.BeforeClass;
@@ -45,20 +58,6 @@ import org.openecomp.sdc.common.api.ConfigurationSource;
import org.openecomp.sdc.common.impl.ExternalConfiguration;
import org.openecomp.sdc.common.impl.FSConfigurationSource;
-import java.io.IOException;
-import java.net.MalformedURLException;
-import java.security.GeneralSecurityException;
-import java.util.Arrays;
-import java.util.Collection;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Set;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
-import static org.mockito.ArgumentMatchers.any;
-import static org.mockito.Mockito.doReturn;
-
@RunWith(MockitoJUnitRunner.class)
public class CambriaHandlerTest extends BeConfDependentTest {
@@ -141,6 +140,7 @@ public class CambriaHandlerTest extends BeConfDependentTest {
@Test
public void testGetTopics() throws Exception {
+
CambriaHandler testSubject;
List<String> hostSet = new LinkedList<>();
hostSet.add("mock");
diff --git a/catalog-be/src/test/java/org/openecomp/sdc/be/components/distribution/engine/DistributionEngineInitTaskTest.java b/catalog-be/src/test/java/org/openecomp/sdc/be/components/distribution/engine/DistributionEngineInitTaskTest.java
index d53476db71..9c1af39d9c 100644
--- a/catalog-be/src/test/java/org/openecomp/sdc/be/components/distribution/engine/DistributionEngineInitTaskTest.java
+++ b/catalog-be/src/test/java/org/openecomp/sdc/be/components/distribution/engine/DistributionEngineInitTaskTest.java
@@ -7,9 +7,9 @@
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -25,6 +25,7 @@ import org.apache.commons.collections.CollectionUtils;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.mockito.Mockito;
+import org.openecomp.sdc.be.components.kafka.KafkaHandler;
import org.openecomp.sdc.be.config.ConfigurationManager;
import org.openecomp.sdc.be.config.DistributionEngineConfiguration;
import org.openecomp.sdc.be.config.DistributionEngineConfiguration.CreateTopicConfig;
@@ -54,6 +55,8 @@ class DistributionEngineInitTaskTest {
private CambriaHandler cambriaHandler;
+ private KafkaHandler kafkaHandler;
+
@BeforeEach
public void setup() {
ExternalConfiguration.setAppName("catalog-be");
@@ -65,6 +68,7 @@ class DistributionEngineInitTaskTest {
componentsUtils = Mockito.mock(ComponentsUtils.class);
cambriaHandler = Mockito.mock(CambriaHandler.class);
+ kafkaHandler = Mockito.mock(KafkaHandler.class);
}
@Test
@@ -88,7 +92,7 @@ class DistributionEngineInitTaskTest {
assertEquals("check next retry interval reach max retry interval", initTask.getCurrentRetryInterval(), maxRetry);
}
-
+
@Test
void checkStartTask() {
@@ -100,10 +104,10 @@ class DistributionEngineInitTaskTest {
deConfiguration.setInitRetryIntervalSec(retry);
deConfiguration.setInitMaxIntervalSec(maxRetry);
DistributionEngineInitTask initTask = new DistributionEngineInitTask(0l, deConfiguration, envName, new AtomicBoolean(false), componentsUtils, null, readEnvFromConfig(deConfiguration));
-
+
initTask.startTask();
}
-
+
@Test
void checkRestartTask() {
@@ -115,10 +119,10 @@ class DistributionEngineInitTaskTest {
deConfiguration.setInitRetryIntervalSec(retry);
deConfiguration.setInitMaxIntervalSec(maxRetry);
DistributionEngineInitTask initTask = new DistributionEngineInitTask(0l, deConfiguration, envName, new AtomicBoolean(false), componentsUtils, null, readEnvFromConfig(deConfiguration));
-
+
initTask.restartTask();
}
-
+
@Test
void checkStopTask() {
@@ -130,12 +134,12 @@ class DistributionEngineInitTaskTest {
deConfiguration.setInitRetryIntervalSec(retry);
deConfiguration.setInitMaxIntervalSec(maxRetry);
DistributionEngineInitTask initTask = new DistributionEngineInitTask(0l, deConfiguration, envName, new AtomicBoolean(false), componentsUtils, null, readEnvFromConfig(deConfiguration));
-
+
initTask.stopTask();
initTask.startTask();
initTask.stopTask();
}
-
+
@Test
void checkDestroy() {
@@ -147,10 +151,10 @@ class DistributionEngineInitTaskTest {
deConfiguration.setInitRetryIntervalSec(retry);
deConfiguration.setInitMaxIntervalSec(maxRetry);
DistributionEngineInitTask initTask = new DistributionEngineInitTask(0l, deConfiguration, envName, new AtomicBoolean(false), componentsUtils, null, readEnvFromConfig(deConfiguration));
-
+
initTask.destroy();
}
-
+
@Test
void checkRun() {
@@ -193,10 +197,10 @@ class DistributionEngineInitTaskTest {
initTask.setCambriaHandler(cambriaHandler);
boolean initFlow = initTask.initFlow();
-
+
initTask.run();
}
-
+
@Test
void testInitFlowScenarioSuccess() {
@@ -244,6 +248,20 @@ class DistributionEngineInitTaskTest {
}
@Test
+ void testInitFlowSuccessKafkaEnabled(){
+ DistributionEngineConfiguration config = new DistributionEngineConfiguration();
+ config.setInitRetryIntervalSec(1);
+ config.setInitMaxIntervalSec(1);
+
+ when(kafkaHandler.isKafkaActive()).thenReturn(true);
+ DistributionEngineInitTask initTask = new DistributionEngineInitTask(0l, config, null, new AtomicBoolean(false), componentsUtils, null, null);
+ initTask.setKafkaHandler(kafkaHandler);
+
+ boolean initFlow = initTask.initFlow();
+ assertTrue("check init flow succeed", initFlow);
+ }
+
+ @Test
void testInitFlowScenarioSuccessTopicsAlreadyExists() {
String envName = "PrOD";
diff --git a/catalog-be/src/test/java/org/openecomp/sdc/be/components/kafka/KafkaHandlerTest.java b/catalog-be/src/test/java/org/openecomp/sdc/be/components/kafka/KafkaHandlerTest.java
new file mode 100644
index 0000000000..91ee0235ad
--- /dev/null
+++ b/catalog-be/src/test/java/org/openecomp/sdc/be/components/kafka/KafkaHandlerTest.java
@@ -0,0 +1,138 @@
+/*-
+ * ============LICENSE_START=======================================================
+ * SDC
+ * ================================================================================
+ * Copyright (C) 2022 Nordix Foundation. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+package org.openecomp.sdc.be.components.kafka;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.doThrow;
+import static org.mockito.Mockito.when;
+
+import com.google.gson.JsonSyntaxException;
+import org.apache.kafka.common.KafkaException;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.mockito.junit.jupiter.MockitoExtension;
+import org.junit.jupiter.api.Test;
+import org.mockito.Mock;
+
+import java.util.ArrayList;
+import fj.data.Either;
+import java.util.List;
+
+import org.openecomp.sdc.be.components.distribution.engine.CambriaErrorResponse;
+import org.openecomp.sdc.be.components.distribution.engine.NotificationDataImpl;
+import org.openecomp.sdc.be.components.distribution.engine.INotificationData;
+import org.openecomp.sdc.be.distribution.api.client.CambriaOperationStatus;
+
+
+@ExtendWith(MockitoExtension.class)
+public class KafkaHandlerTest {
+
+ @Mock
+ private SdcKafkaConsumer mockSdcKafkaConsumer;
+
+ @Mock
+ private SdcKafkaProducer mockSdcKafkaProducer;
+
+ private KafkaHandler kafkaHandler;
+
+ @Test
+ public void testIsKafkaActiveTrue(){
+ KafkaHandler kafkaHandler = new KafkaHandler(mockSdcKafkaConsumer, mockSdcKafkaProducer, true);
+ assertTrue(kafkaHandler.isKafkaActive());
+ }
+
+ @Test
+ public void testIsKafkaActiveFalse(){
+ KafkaHandler kafkaHandler = new KafkaHandler(mockSdcKafkaConsumer, mockSdcKafkaProducer, true);
+ kafkaHandler.setKafkaActive(false);
+ assertFalse(kafkaHandler.isKafkaActive());
+ }
+
+ @Test
+ public void testFetchFromTopicSuccess(){
+ String testTopic = "testTopic";
+ List<String> mockedReturnedMessages = new ArrayList<>();
+ mockedReturnedMessages.add("message1");
+ mockedReturnedMessages.add("message2");
+ KafkaHandler kafkaHandler = new KafkaHandler(mockSdcKafkaConsumer, mockSdcKafkaProducer, true);
+ when(mockSdcKafkaConsumer.poll(any())).thenReturn(mockedReturnedMessages);
+ Either<Iterable<String>, CambriaErrorResponse> response = kafkaHandler.fetchFromTopic(testTopic);
+ Iterable<String> actualReturnedMessages = response.left().value();
+ assertTrue(response.isLeft());
+ assertEquals(actualReturnedMessages, mockedReturnedMessages);
+ }
+
+ @Test
+ public void testFetchFromTopicFail(){
+ String testTopic = "testTopic";
+ KafkaHandler kafkaHandler = new KafkaHandler(mockSdcKafkaConsumer, mockSdcKafkaProducer, true);
+ when(mockSdcKafkaConsumer.poll(any())).thenThrow(new KafkaException());
+ Either<Iterable<String>, CambriaErrorResponse> response = kafkaHandler.fetchFromTopic(testTopic);
+ CambriaErrorResponse responseValue = response.right().value();
+ assertTrue(response.isRight());
+ assertEquals(responseValue.getOperationStatus(), CambriaOperationStatus.INTERNAL_SERVER_ERROR);
+ }
+
+ @Test
+ public void testSendNotificationSuccess(){
+ String testTopic = "testTopic";
+ KafkaHandler kafkaHandler = new KafkaHandler(mockSdcKafkaConsumer, mockSdcKafkaProducer, true);
+ INotificationData testData = new NotificationDataImpl();
+ CambriaErrorResponse response = kafkaHandler.sendNotification(testTopic, testData);
+ assertEquals(response.getOperationStatus(), CambriaOperationStatus.OK);
+ assertEquals(response.getHttpCode(), 200);
+ }
+
+ @Test
+ public void testSendNotificationKafkaException(){
+ String testTopic = "testTopic";
+ KafkaHandler kafkaHandler = new KafkaHandler(mockSdcKafkaConsumer, mockSdcKafkaProducer, true);
+ INotificationData testData = new NotificationDataImpl();
+ doThrow(KafkaException.class).when(mockSdcKafkaProducer).send(any(), any());
+ CambriaErrorResponse response = kafkaHandler.sendNotification(testTopic, testData);
+ assertEquals(response.getOperationStatus(), CambriaOperationStatus.INTERNAL_SERVER_ERROR);
+ assertEquals(response.getHttpCode(), 500);
+ }
+
+ @Test
+ public void testSendNotificationJsonSyntaxException(){
+ String testTopic = "testTopic";
+ KafkaHandler kafkaHandler = new KafkaHandler(mockSdcKafkaConsumer, mockSdcKafkaProducer, true);
+ INotificationData testData = new NotificationDataImpl();
+ doThrow(JsonSyntaxException.class).when(mockSdcKafkaProducer).send(any(), any());
+ CambriaErrorResponse response = kafkaHandler.sendNotification(testTopic, testData);
+ assertEquals(response.getOperationStatus(), CambriaOperationStatus.INTERNAL_SERVER_ERROR);
+ assertEquals(response.getHttpCode(), 500);
+ }
+
+ @Test
+ public void testSendNotificationFlushException(){
+ String testTopic = "testTopic";
+ KafkaHandler kafkaHandler = new KafkaHandler(mockSdcKafkaConsumer, mockSdcKafkaProducer, true);
+ INotificationData testData = new NotificationDataImpl();
+ doThrow(KafkaException.class).when(mockSdcKafkaProducer).flush();
+ CambriaErrorResponse response = kafkaHandler.sendNotification(testTopic, testData);
+ assertEquals(response.getOperationStatus(), CambriaOperationStatus.INTERNAL_SERVER_ERROR);
+ assertEquals(response.getHttpCode(), 500);
+ }
+}
diff --git a/catalog-be/src/test/java/org/openecomp/sdc/be/components/kafka/SdcKafkaConsumerTest.java b/catalog-be/src/test/java/org/openecomp/sdc/be/components/kafka/SdcKafkaConsumerTest.java
new file mode 100644
index 0000000000..0a4a834fa4
--- /dev/null
+++ b/catalog-be/src/test/java/org/openecomp/sdc/be/components/kafka/SdcKafkaConsumerTest.java
@@ -0,0 +1,143 @@
+/*-
+ * ============LICENSE_START=======================================================
+ * SDC
+ * ================================================================================
+ * Copyright (C) 2022 Nordix Foundation. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+package org.openecomp.sdc.be.components.kafka;
+
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+import org.apache.kafka.common.KafkaException;
+import org.junit.jupiter.api.Test;
+
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.when;
+import org.mockito.ArgumentCaptor;
+import org.mockito.Mockito;
+
+import java.util.Collections;
+import java.util.Collection;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.common.TopicPartition;
+import org.jetbrains.annotations.NotNull;
+
+import org.openecomp.sdc.be.config.DistributionEngineConfiguration;
+
+public class SdcKafkaConsumerTest {
+
+ @Test
+ public void TestSubscribeSuccess(){
+ KafkaConsumer<byte[], byte[]> mockKafkaConsumer = Mockito.mock(KafkaConsumer.class);
+ SdcKafkaConsumer sdcKafkaConsumer = new SdcKafkaConsumer(mockKafkaConsumer, null);
+ ArgumentCaptor<Collections> captor = ArgumentCaptor.forClass(Collections.class);
+
+ String testTopics = "testTopic";
+ sdcKafkaConsumer.subscribe(testTopics);
+ verify(mockKafkaConsumer).subscribe((Collection<String>) captor.capture());
+ }
+
+ @Test
+ public void TestSubscribeAlreadySubscribed(){
+ KafkaConsumer<byte[], byte[]> mockKafkaConsumer = Mockito.mock(KafkaConsumer.class);
+ SdcKafkaConsumer sdcKafkaConsumer = new SdcKafkaConsumer(mockKafkaConsumer, null);
+ ArgumentCaptor<Collections> captor = ArgumentCaptor.forClass(Collections.class);
+
+
+ String testTopics = "testTopic";
+ Set<String> currentSubs = new HashSet<String>();
+ currentSubs.add(testTopics);
+ when(mockKafkaConsumer.subscription()).thenReturn(currentSubs);
+ sdcKafkaConsumer.subscribe(testTopics);
+ verify(mockKafkaConsumer, never()).subscribe((Collection<String>) captor.capture());
+ }
+
+ @Test
+ public void TestPollForMessagesForSpecificTopicSuccess(){
+ KafkaConsumer<byte[], byte[]> mockKafkaConsumer = Mockito.mock(KafkaConsumer.class);
+
+
+ String testTopic = "testTopic";
+
+ ConsumerRecords mockedPollResult = getTestConsumerRecords(testTopic);
+
+ when(mockKafkaConsumer.poll(any())).thenReturn(mockedPollResult);
+
+ DistributionEngineConfiguration config = getMockDistributionEngineConfiguration();
+
+ SdcKafkaConsumer sdcKafkaConsumer = new SdcKafkaConsumer(mockKafkaConsumer, config);
+
+ List<String> returned = sdcKafkaConsumer.poll(testTopic);
+ assertTrue(returned.size()==1);
+ assertTrue(returned.contains("testTopicValue"));
+ }
+
+ @Test
+ public void testSaslJaasConfigNotFound(){
+ assertThrows(
+ KafkaException.class,
+ () -> new SdcKafkaConsumer(setTestDistributionEngineConfigs()),
+ "Sasl Jaas Config should not be found, so expected a KafkaException"
+ );
+ }
+
+ @NotNull
+ private DistributionEngineConfiguration getMockDistributionEngineConfiguration() {
+ DistributionEngineConfiguration config = new DistributionEngineConfiguration();
+ DistributionEngineConfiguration.DistributionStatusTopicConfig mockStatusTopic = new DistributionEngineConfiguration.DistributionStatusTopicConfig();
+ mockStatusTopic.setPollingIntervalSec(1);
+ config.setDistributionStatusTopic(mockStatusTopic);
+ return config;
+ }
+
+ @NotNull
+ private ConsumerRecords getTestConsumerRecords(String testTopics) {
+ Map map = new HashMap<Integer, ConsumerRecord>();
+
+ ConsumerRecord consumerRecord = new ConsumerRecord(testTopics, 0, 0, "", "testTopicValue");
+
+ List<ConsumerRecord> consumerRecordList = new ArrayList<>();
+ consumerRecordList.add(consumerRecord);
+ TopicPartition topicPartition = new TopicPartition(testTopics, 0);
+ map.put(topicPartition, consumerRecordList);
+
+ ConsumerRecords mockedPollResult = new ConsumerRecords(map);
+ return mockedPollResult;
+ }
+
+ private DistributionEngineConfiguration setTestDistributionEngineConfigs(){
+ DistributionEngineConfiguration.DistributionStatusTopicConfig dsTopic = new DistributionEngineConfiguration.DistributionStatusTopicConfig();
+ DistributionEngineConfiguration deConfiguration = new DistributionEngineConfiguration();
+ String testBootstrapServers = "TestBootstrapServer";
+ dsTopic.setConsumerGroup("consumerGroup");
+ dsTopic.setConsumerId("consumerId");
+
+ deConfiguration.setKafkaBootStrapServers(testBootstrapServers);
+ deConfiguration.setDistributionStatusTopic(dsTopic);
+ return deConfiguration;
+ }
+}
diff --git a/catalog-be/src/test/java/org/openecomp/sdc/be/components/kafka/SdcKafkaProducerTest.java b/catalog-be/src/test/java/org/openecomp/sdc/be/components/kafka/SdcKafkaProducerTest.java
new file mode 100644
index 0000000000..23322cce5a
--- /dev/null
+++ b/catalog-be/src/test/java/org/openecomp/sdc/be/components/kafka/SdcKafkaProducerTest.java
@@ -0,0 +1,94 @@
+/*-
+ * ============LICENSE_START=======================================================
+ * SDC
+ * ================================================================================
+ * Copyright (C) 2022 Nordix Foundation. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+package org.openecomp.sdc.be.components.kafka;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import org.junit.jupiter.api.Test;
+
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.doThrow;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+import org.mockito.ArgumentCaptor;
+import org.mockito.Mockito;
+
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.common.KafkaException;
+
+import org.openecomp.sdc.be.catalog.api.IStatus;
+import org.openecomp.sdc.be.config.DistributionEngineConfiguration;
+
+public class SdcKafkaProducerTest {
+
+ @Test
+ public void TestSendSuccess(){
+ KafkaProducer<byte[], byte[]> mockKafkaProducer = Mockito.mock(KafkaProducer.class);
+ SdcKafkaProducer sdcKafkaProducer = new SdcKafkaProducer(mockKafkaProducer);
+ ArgumentCaptor<ProducerRecord> captor = ArgumentCaptor.forClass(ProducerRecord.class);
+ sdcKafkaProducer.send("testMessage", "testTopic");
+
+
+ verify(mockKafkaProducer).send(captor.capture());
+ }
+
+ @Test
+ public void testFlushSuccess(){
+ KafkaProducer<byte[], byte[]> mockKafkaProducer = Mockito.mock(KafkaProducer.class);
+ SdcKafkaProducer sdcKafkaProducer = new SdcKafkaProducer(mockKafkaProducer);
+ sdcKafkaProducer.flush();
+
+ verify(mockKafkaProducer).flush();
+ }
+
+ @Test
+ public void testSendFail(){
+ KafkaProducer<byte[], byte[]> mockKafkaProducer = Mockito.mock(KafkaProducer.class);
+ SdcKafkaProducer sdcKafkaProducer = new SdcKafkaProducer(mockKafkaProducer);
+
+ when(mockKafkaProducer.send(any())).thenThrow(new KafkaException());
+
+ assertThrows(
+ KafkaException.class,
+ () -> sdcKafkaProducer.send("testMessage", "testTopic"),
+ "Expected a KafkaException thrown on KafkaProducer Send");
+ }
+
+ @Test
+ public void testSaslJaasConfigNotFound(){
+ assertThrows(
+ KafkaException.class,
+ () -> new SdcKafkaProducer(setTestDistributionEngineConfigs()),
+ "Sasl Jaas Config should not be found, so expected a KafkaException"
+ );
+ }
+
+ private DistributionEngineConfiguration setTestDistributionEngineConfigs(){
+ DistributionEngineConfiguration.DistributionStatusTopicConfig dStatusTopicConfig = new DistributionEngineConfiguration.DistributionStatusTopicConfig();
+ DistributionEngineConfiguration deConfiguration = new DistributionEngineConfiguration();
+ deConfiguration.setKafkaBootStrapServers("TestBootstrapServer");
+ dStatusTopicConfig.setConsumerId("consumerId");
+
+ deConfiguration.setDistributionStatusTopic(dStatusTopicConfig);
+ deConfiguration.getDistributionStatusTopic().getConsumerId();
+ return deConfiguration;
+ }
+}