aboutsummaryrefslogtreecommitdiffstats
path: root/catalog-be
diff options
context:
space:
mode:
authordavid.mcweeney <david.mcweeney@est.tech>2022-10-04 15:46:14 +0100
committerMichael Morris <michael.morris@est.tech>2022-10-25 11:24:02 +0000
commit47f96dd966663f7f46b719451c0752721a2940a3 (patch)
tree9d875ce43f96cf3e570cc812d907fa2edd3b7945 /catalog-be
parent0d2e96c125aab4e4edfc0a8b897353c0fabdd885 (diff)
[SDC] Add kafka native messaging
Change-Id: I5ab8f580947cbc264d94bec48a5e8b659dc44c08 Issue-ID: DMAAP-1787 Signed-off-by: david.mcweeney <david.mcweeney@est.tech>
Diffstat (limited to 'catalog-be')
-rw-r--r--catalog-be/pom.xml4
-rw-r--r--catalog-be/src/main/java/org/openecomp/sdc/be/components/distribution/engine/DistributionEngineInitTask.java73
-rw-r--r--catalog-be/src/main/java/org/openecomp/sdc/be/components/distribution/engine/DistributionEnginePollingTask.java27
-rw-r--r--catalog-be/src/main/java/org/openecomp/sdc/be/components/distribution/engine/DistributionNotificationSender.java23
-rw-r--r--catalog-be/src/main/java/org/openecomp/sdc/be/components/kafka/KafkaHandler.java138
-rw-r--r--catalog-be/src/main/java/org/openecomp/sdc/be/components/kafka/SdcKafkaConsumer.java117
-rw-r--r--catalog-be/src/main/java/org/openecomp/sdc/be/components/kafka/SdcKafkaProducer.java98
-rw-r--r--catalog-be/src/test/java/org/openecomp/sdc/be/components/distribution/engine/CambriaHandlerTest.java32
-rw-r--r--catalog-be/src/test/java/org/openecomp/sdc/be/components/distribution/engine/DistributionEngineInitTaskTest.java44
-rw-r--r--catalog-be/src/test/java/org/openecomp/sdc/be/components/kafka/KafkaHandlerTest.java138
-rw-r--r--catalog-be/src/test/java/org/openecomp/sdc/be/components/kafka/SdcKafkaConsumerTest.java143
-rw-r--r--catalog-be/src/test/java/org/openecomp/sdc/be/components/kafka/SdcKafkaProducerTest.java94
12 files changed, 857 insertions, 74 deletions
diff --git a/catalog-be/pom.xml b/catalog-be/pom.xml
index 7b060c9bfa..edd673c54c 100644
--- a/catalog-be/pom.xml
+++ b/catalog-be/pom.xml
@@ -1043,6 +1043,10 @@
</exclusion>
</exclusions>
</dependency>
+ <dependency>
+ <groupId>org.apache.kafka</groupId>
+ <artifactId>kafka-clients</artifactId>
+ </dependency>
</dependencies>
diff --git a/catalog-be/src/main/java/org/openecomp/sdc/be/components/distribution/engine/DistributionEngineInitTask.java b/catalog-be/src/main/java/org/openecomp/sdc/be/components/distribution/engine/DistributionEngineInitTask.java
index d61e15016a..00d3fedfc8 100644
--- a/catalog-be/src/main/java/org/openecomp/sdc/be/components/distribution/engine/DistributionEngineInitTask.java
+++ b/catalog-be/src/main/java/org/openecomp/sdc/be/components/distribution/engine/DistributionEngineInitTask.java
@@ -20,6 +20,7 @@
package org.openecomp.sdc.be.components.distribution.engine;
import fj.data.Either;
+import java.util.ArrayList;
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.Executors;
@@ -27,7 +28,7 @@ import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.stream.Collectors;
+import org.openecomp.sdc.be.components.kafka.KafkaHandler;
import org.openecomp.sdc.be.config.BeEcompErrorManager;
import org.openecomp.sdc.be.config.DistributionEngineConfiguration;
import org.openecomp.sdc.be.distribution.api.client.CambriaOperationStatus;
@@ -60,6 +61,7 @@ public class DistributionEngineInitTask implements Runnable {
private AtomicBoolean status = null;
private OperationalEnvironmentEntry environmentEntry;
private CambriaHandler cambriaHandler = new CambriaHandler();
+ private KafkaHandler kafkaHandler = new KafkaHandler();
private ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(1);
public DistributionEngineInitTask(Long delayBeforeStartFlow, DistributionEngineConfiguration deConfiguration, String envName,
@@ -136,9 +138,7 @@ public class DistributionEngineInitTask implements Runnable {
@Override
public void run() {
- boolean result = false;
- result = initFlow();
- if (result) {
+ if (initFlow()) {
this.stopTask();
this.status.set(true);
if (this.distributionEnginePollingTask != null) {
@@ -159,38 +159,45 @@ public class DistributionEngineInitTask implements Runnable {
* @return
*/
public boolean initFlow() {
- logger.trace("Start init flow for environment {}", this.envName);
- Set<String> topicsList = null;
- Either<Set<String>, CambriaErrorResponse> getTopicsRes = null;
- getTopicsRes = cambriaHandler.getTopics(environmentEntry.getDmaapUebAddress().stream().collect(Collectors.toList()));
- if (getTopicsRes.isRight()) {
- CambriaErrorResponse status = getTopicsRes.right().value();
- if (status.getOperationStatus() == CambriaOperationStatus.NOT_FOUND) {
- topicsList = new HashSet<>();
+ logger.info("Start init flow for environment {}", this.envName);
+ if (Boolean.FALSE.equals(kafkaHandler.isKafkaActive())) {
+ Set<String> topicsList;
+ Either<Set<String>, CambriaErrorResponse> getTopicsRes;
+ getTopicsRes = cambriaHandler.getTopics(new ArrayList<>(environmentEntry.getDmaapUebAddress()));
+ if (getTopicsRes.isRight()) {
+ CambriaErrorResponse cambriaErrorResponse = getTopicsRes.right().value();
+ if (cambriaErrorResponse.getOperationStatus() == CambriaOperationStatus.NOT_FOUND) {
+ topicsList = new HashSet<>();
+ } else {
+ BeEcompErrorManager.getInstance().logBeUebSystemError(INIT_DISTRIBUTION_ENGINE_FLOW,
+ "try retrieve list of topics from U-EB server");
+ return false;
+ }
} else {
- BeEcompErrorManager.getInstance().logBeUebSystemError(INIT_DISTRIBUTION_ENGINE_FLOW, "try retrieve list of topics from U-EB server");
+ topicsList = getTopicsRes.left().value();
+ }
+ String notificationTopic = buildTopicName(deConfiguration.getDistributionNotifTopicName(), this.envName);
+ logger.debug("Going to handle topic {}", notificationTopic);
+ if (!createNotificationTopicIfNotExists(topicsList, notificationTopic)) {
+ return false;
+ }
+ CambriaErrorResponse registerProducerStatus = registerToTopic(notificationTopic,
+ SubscriberTypeEnum.PRODUCER);
+ CambriaOperationStatus createStatus = registerProducerStatus.getOperationStatus();
+ if (createStatus != CambriaOperationStatus.OK) {
+ return false;
+ }
+ String statusTopic = buildTopicName(deConfiguration.getDistributionStatusTopicName(), this.envName);
+ logger.debug("Going to handle topic {}", statusTopic);
+ if (!createStatusTopicIfNotExists(topicsList, statusTopic)) {
return false;
}
+ CambriaErrorResponse registerConcumerStatus = registerToTopic(statusTopic, SubscriberTypeEnum.CONSUMER);
+ return registerConcumerStatus.getOperationStatus() == CambriaOperationStatus.OK;
} else {
- topicsList = getTopicsRes.left().value();
+ logger.info("Skipping DisributionEngineInitTask flow to use kafka native for distribution messaging");
+ return true;
}
- String notificationTopic = buildTopicName(deConfiguration.getDistributionNotifTopicName(), this.envName);
- logger.debug("Going to handle topic {}", notificationTopic);
- if (!createNotificationTopicIfNotExists(topicsList, notificationTopic)) {
- return false;
- }
- CambriaErrorResponse registerProducerStatus = registerToTopic(notificationTopic, SubscriberTypeEnum.PRODUCER);
- CambriaOperationStatus createStatus = registerProducerStatus.getOperationStatus();
- if (createStatus != CambriaOperationStatus.OK) {
- return false;
- }
- String statusTopic = buildTopicName(deConfiguration.getDistributionStatusTopicName(), this.envName);
- logger.debug("Going to handle topic {}", statusTopic);
- if (!createStatusTopicIfNotExists(topicsList, statusTopic)) {
- return false;
- }
- CambriaErrorResponse registerConcumerStatus = registerToTopic(statusTopic, SubscriberTypeEnum.CONSUMER);
- return registerConcumerStatus.getOperationStatus() == CambriaOperationStatus.OK;
}
private CambriaErrorResponse registerToTopic(String topicName, SubscriberTypeEnum subscriberType) {
@@ -281,4 +288,8 @@ public class DistributionEngineInitTask implements Runnable {
protected void setCambriaHandler(CambriaHandler cambriaHandler) {
this.cambriaHandler = cambriaHandler;
}
+
+ protected void setKafkaHandler(KafkaHandler kafkaHandler) {
+ this.kafkaHandler = kafkaHandler;
+ }
}
diff --git a/catalog-be/src/main/java/org/openecomp/sdc/be/components/distribution/engine/DistributionEnginePollingTask.java b/catalog-be/src/main/java/org/openecomp/sdc/be/components/distribution/engine/DistributionEnginePollingTask.java
index 124671086f..ab4400a5bf 100644
--- a/catalog-be/src/main/java/org/openecomp/sdc/be/components/distribution/engine/DistributionEnginePollingTask.java
+++ b/catalog-be/src/main/java/org/openecomp/sdc/be/components/distribution/engine/DistributionEnginePollingTask.java
@@ -29,6 +29,7 @@ import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import org.apache.commons.lang3.concurrent.BasicThreadFactory;
import org.openecomp.sdc.be.components.distribution.engine.report.DistributionCompleteReporter;
+import org.openecomp.sdc.be.components.kafka.KafkaHandler;
import org.openecomp.sdc.be.config.BeEcompErrorManager;
import org.openecomp.sdc.be.config.DistributionEngineConfiguration;
import org.openecomp.sdc.be.config.DistributionEngineConfiguration.DistributionStatusTopicConfig;
@@ -51,6 +52,7 @@ public class DistributionEnginePollingTask implements Runnable {
private String consumerId;
private String consumerGroup;
private CambriaHandler cambriaHandler = new CambriaHandler();
+ private final KafkaHandler kafkaHandler = new KafkaHandler();
private Gson gson = new GsonBuilder().setPrettyPrinting().create();
private DistributionCompleteReporter distributionCompleteReporter;
private ScheduledExecutorService scheduledPollingService = Executors
@@ -82,9 +84,12 @@ public class DistributionEnginePollingTask implements Runnable {
fetchTimeoutInSec = 15;
}
try {
- cambriaConsumer = cambriaHandler
- .createConsumer(environmentEntry.getDmaapUebAddress(), topicName, environmentEntry.getUebApikey(), environmentEntry.getUebSecretKey(),
- consumerId, consumerGroup, fetchTimeoutInSec * 1000);
+ if (Boolean.FALSE.equals(kafkaHandler.isKafkaActive())) {
+ cambriaConsumer = cambriaHandler
+ .createConsumer(environmentEntry.getDmaapUebAddress(), topicName, environmentEntry.getUebApikey(),
+ environmentEntry.getUebSecretKey(),
+ consumerId, consumerGroup, fetchTimeoutInSec * 1000);
+ }
if (scheduledPollingService != null) {
logger.debug("Start Distribution Engine polling task. polling interval {} seconds", pollingIntervalInSec);
scheduledFuture = scheduledPollingService.scheduleAtFixedRate(this, 0, pollingIntervalInSec, TimeUnit.SECONDS);
@@ -119,14 +124,20 @@ public class DistributionEnginePollingTask implements Runnable {
@Override
public void run() {
logger.trace("run() method. polling queue {}", topicName);
+ Either<Iterable<String>, CambriaErrorResponse> fetchResult;
try {
// init error
- if (cambriaConsumer == null) {
- BeEcompErrorManager.getInstance().logBeUebSystemError(DISTRIBUTION_STATUS_POLLING, "polling task was not initialized properly");
- stopTask();
- return;
+ if (Boolean.FALSE.equals(kafkaHandler.isKafkaActive())) {
+ if (cambriaConsumer == null) {
+ BeEcompErrorManager.getInstance()
+ .logBeUebSystemError(DISTRIBUTION_STATUS_POLLING, "polling task was not initialized properly");
+ stopTask();
+ return;
+ }
+ fetchResult = cambriaHandler.fetchFromTopic(cambriaConsumer);
+ } else {
+ fetchResult = kafkaHandler.fetchFromTopic(topicName);
}
- Either<Iterable<String>, CambriaErrorResponse> fetchResult = cambriaHandler.fetchFromTopic(cambriaConsumer);
// fetch error
if (fetchResult.isRight()) {
CambriaErrorResponse errorResponse = fetchResult.right().value();
diff --git a/catalog-be/src/main/java/org/openecomp/sdc/be/components/distribution/engine/DistributionNotificationSender.java b/catalog-be/src/main/java/org/openecomp/sdc/be/components/distribution/engine/DistributionNotificationSender.java
index 0098eac7d9..b93d485bdb 100644
--- a/catalog-be/src/main/java/org/openecomp/sdc/be/components/distribution/engine/DistributionNotificationSender.java
+++ b/catalog-be/src/main/java/org/openecomp/sdc/be/components/distribution/engine/DistributionNotificationSender.java
@@ -19,6 +19,7 @@
*/
package org.openecomp.sdc.be.components.distribution.engine;
+import org.openecomp.sdc.be.components.kafka.KafkaHandler;
import org.openecomp.sdc.be.config.ConfigurationManager;
import org.openecomp.sdc.be.config.DistributionEngineConfiguration;
import org.openecomp.sdc.be.dao.api.ActionStatus;
@@ -36,16 +37,26 @@ public class DistributionNotificationSender {
private static final Logger logger = Logger.getLogger(DistributionNotificationSender.class.getName());
@javax.annotation.Resource
protected ComponentsUtils componentUtils;
- private CambriaHandler cambriaHandler = new CambriaHandler();
- private DistributionEngineConfiguration deConfiguration = ConfigurationManager.getConfigurationManager().getDistributionEngineConfiguration();
+ private final CambriaHandler cambriaHandler = new CambriaHandler();
+
+ private final KafkaHandler kafkaHandler = new KafkaHandler();
+
+//
+ private final DistributionEngineConfiguration deConfiguration = ConfigurationManager.getConfigurationManager().getDistributionEngineConfiguration();
public ActionStatus sendNotification(String topicName, String distributionId, EnvironmentMessageBusData messageBusData,
INotificationData notificationData, Service service, User modifier) {
long startTime = System.currentTimeMillis();
- CambriaErrorResponse status = cambriaHandler
- .sendNotificationAndClose(topicName, messageBusData.getUebPublicKey(), messageBusData.getUebPrivateKey(),
- messageBusData.getDmaaPuebEndpoints(), notificationData,
- deConfiguration.getDistributionNotificationTopic().getMaxWaitingAfterSendingSeconds());
+ CambriaErrorResponse status;
+ if (Boolean.FALSE.equals(kafkaHandler.isKafkaActive())) {
+ status = cambriaHandler
+ .sendNotificationAndClose(topicName, messageBusData.getUebPublicKey(), messageBusData.getUebPrivateKey(),
+ messageBusData.getDmaaPuebEndpoints(), notificationData,
+ deConfiguration.getDistributionNotificationTopic().getMaxWaitingAfterSendingSeconds());
+ }
+ else{
+ status = kafkaHandler.sendNotification(topicName, notificationData);
+ }
logger.info("After publishing service {} of version {}. Status is {}", service.getName(), service.getVersion(), status.getHttpCode());
auditDistributionNotification(
new AuditDistributionNotificationBuilder().setTopicName(topicName).setDistributionId(distributionId).setStatus(status).setService(service)
diff --git a/catalog-be/src/main/java/org/openecomp/sdc/be/components/kafka/KafkaHandler.java b/catalog-be/src/main/java/org/openecomp/sdc/be/components/kafka/KafkaHandler.java
new file mode 100644
index 0000000000..2a5590e72d
--- /dev/null
+++ b/catalog-be/src/main/java/org/openecomp/sdc/be/components/kafka/KafkaHandler.java
@@ -0,0 +1,138 @@
+/*-
+ * ============LICENSE_START=======================================================
+ * SDC
+ * ================================================================================
+ * Copyright (C) 2022 Nordix Foundation. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+package org.openecomp.sdc.be.components.kafka;
+
+import com.google.gson.Gson;
+import com.google.gson.JsonSyntaxException;
+import fj.data.Either;
+import lombok.Getter;
+import lombok.Setter;
+import org.apache.http.HttpStatus;
+import org.apache.kafka.common.KafkaException;
+import org.openecomp.sdc.be.components.distribution.engine.CambriaErrorResponse;
+import org.openecomp.sdc.be.components.distribution.engine.INotificationData;
+import org.openecomp.sdc.be.config.BeEcompErrorManager;
+import org.openecomp.sdc.be.config.ConfigurationManager;
+import org.openecomp.sdc.be.config.DistributionEngineConfiguration;
+import org.openecomp.sdc.be.distribution.api.client.CambriaOperationStatus;
+import org.openecomp.sdc.common.log.wrappers.Logger;
+import org.springframework.stereotype.Component;
+
+/**
+ * Utility class that provides a handler for Kafka interactions
+ */
+@Component
+public class KafkaHandler {
+
+ private static final Logger log = Logger.getLogger(KafkaHandler.class.getName());
+ private final Gson gson = new Gson();
+
+ private SdcKafkaConsumer sdcKafkaConsumer;
+
+ private SdcKafkaProducer sdcKafkaProducer;
+
+ @Setter
+ private boolean isKafkaActive;
+
+ private DistributionEngineConfiguration deConfiguration;
+
+ public KafkaHandler(SdcKafkaConsumer sdcKafkaConsumer, SdcKafkaProducer sdcKafkaProducer, boolean isKafkaActive) {
+ this.sdcKafkaConsumer = sdcKafkaConsumer;
+ this.sdcKafkaProducer = sdcKafkaProducer;
+ this.isKafkaActive = isKafkaActive;
+ }
+
+ public KafkaHandler() {
+ isKafkaActive = Boolean.parseBoolean(System.getenv().getOrDefault("USE_KAFKA", "false"));
+ deConfiguration = ConfigurationManager.getConfigurationManager().getDistributionEngineConfiguration();
+ }
+
+ /**
+ * @return a user configuration whether Kafka is active for this client
+ */
+ public Boolean isKafkaActive() {
+ return isKafkaActive;
+ }
+
+ /**
+ * @param topicName The topic from which messages will be fetched
+ * @return Either A list of messages from a specific topic, or a specific error response
+ */
+ public Either<Iterable<String>, CambriaErrorResponse> fetchFromTopic(String topicName) {
+ try {
+ if(sdcKafkaConsumer == null){
+ sdcKafkaConsumer = new SdcKafkaConsumer(deConfiguration);
+ }
+ sdcKafkaConsumer.subscribe(topicName);
+ Iterable<String> messages = sdcKafkaConsumer.poll(topicName);
+ log.info("Returning messages from topic {}", topicName);
+ return Either.left(messages);
+ } catch (KafkaException e) {
+ BeEcompErrorManager.getInstance().logBeDistributionEngineSystemError("fetchFromTopic", e.getMessage());
+ log.error("Failed to fetch from kafka for topic: {}", topicName, e);
+ CambriaErrorResponse cambriaErrorResponse =
+ new CambriaErrorResponse(CambriaOperationStatus.INTERNAL_SERVER_ERROR,
+ HttpStatus.SC_INTERNAL_SERVER_ERROR);
+ return Either.right(cambriaErrorResponse);
+ }
+ }
+
+ /**
+ * Publish notification message to a given topic and flush
+ *
+ * @param topicName The topic to which the message should be published
+ * @param data The data to publish to the topic specified
+ * @return CambriaErrorResponse a status response on success or any errors thrown
+ */
+ public CambriaErrorResponse sendNotification(String topicName, INotificationData data) {
+ CambriaErrorResponse response;
+ if(sdcKafkaProducer == null){
+ sdcKafkaProducer = new SdcKafkaProducer(deConfiguration);
+ }
+ try {
+ String json = gson.toJson(data);
+ log.info("Before sending to topic {}", topicName);
+ sdcKafkaProducer.send(json, topicName);
+ }
+ catch(KafkaException e){
+ BeEcompErrorManager.getInstance().logBeDistributionEngineSystemError("sendNotification", e.getMessage());
+ log.error("Failed to send message . Exception {}", e.getMessage());
+
+ return new CambriaErrorResponse(CambriaOperationStatus.INTERNAL_SERVER_ERROR, 500);
+ } catch (JsonSyntaxException e) {
+ BeEcompErrorManager.getInstance().logBeDistributionEngineSystemError("sendNotification", e.getMessage());
+ log.error("Failed to convert data to json: {}", data, e);
+
+ return new CambriaErrorResponse(CambriaOperationStatus.INTERNAL_SERVER_ERROR, 500);
+ } finally {
+ try {
+ sdcKafkaProducer.flush();
+ response = new CambriaErrorResponse(CambriaOperationStatus.OK, 200);
+ } catch (KafkaException | IllegalArgumentException e) {
+ BeEcompErrorManager.getInstance().logBeDistributionEngineSystemError("sendNotification", e.getMessage());
+ log.error("Failed to flush sdcKafkaProducer", e);
+
+ response = new CambriaErrorResponse(CambriaOperationStatus.INTERNAL_SERVER_ERROR, 500);
+ }
+ }
+
+ return response;
+ }
+}
diff --git a/catalog-be/src/main/java/org/openecomp/sdc/be/components/kafka/SdcKafkaConsumer.java b/catalog-be/src/main/java/org/openecomp/sdc/be/components/kafka/SdcKafkaConsumer.java
new file mode 100644
index 0000000000..8879bf000e
--- /dev/null
+++ b/catalog-be/src/main/java/org/openecomp/sdc/be/components/kafka/SdcKafkaConsumer.java
@@ -0,0 +1,117 @@
+/*-
+ * ============LICENSE_START=======================================================
+ * SDC
+ * ================================================================================
+ * Copyright (C) 2022 Nordix Foundation. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+package org.openecomp.sdc.be.components.kafka;
+
+import com.google.common.annotations.VisibleForTesting;
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Properties;
+import java.util.UUID;
+import org.apache.kafka.clients.CommonClientConfigs;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.common.config.SaslConfigs;
+import org.openecomp.sdc.be.config.DistributionEngineConfiguration;
+import org.openecomp.sdc.common.log.wrappers.Logger;
+
+/**
+ * Utility class that provides a KafkaConsumer to communicate with a kafka cluster
+ */
+public class SdcKafkaConsumer {
+
+ private static final Logger log = Logger.getLogger(SdcKafkaConsumer.class.getName());
+ private final DistributionEngineConfiguration deConfiguration;
+ private KafkaConsumer<String, String> kafkaConsumer;
+
+ /**
+ * Constructor setting up the KafkaConsumer from a predefined set of configurations
+ */
+ public SdcKafkaConsumer(DistributionEngineConfiguration deConfiguration){
+ log.info("Create SdcKafkaConsumer via constructor");
+ Properties properties = new Properties();
+ this.deConfiguration = deConfiguration;
+
+ properties.put(ConsumerConfig.CLIENT_ID_CONFIG, deConfiguration.getDistributionStatusTopic().getConsumerId()+ "-consumer-" + UUID.randomUUID());
+ properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
+ properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
+ properties.put(ConsumerConfig.GROUP_ID_CONFIG, deConfiguration.getDistributionStatusTopic().getConsumerGroup());
+ properties.put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, deConfiguration.getKafkaBootStrapServers());
+ properties.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SASL_PLAINTEXT");
+ properties.put(ConsumerConfig.ALLOW_AUTO_CREATE_TOPICS_CONFIG, false);
+ properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");
+ properties.put(SaslConfigs.SASL_MECHANISM, "SCRAM-SHA-512");
+
+ properties.put(SaslConfigs.SASL_JAAS_CONFIG, getKafkaSaslJaasConfig());
+ kafkaConsumer = new KafkaConsumer<>(properties);
+ }
+
+ /**
+ *
+ * @param kafkaConsumer a kafkaConsumer to use within the class
+ * @param deConfiguration - Configuration to pass into the class
+ */
+ @VisibleForTesting
+ SdcKafkaConsumer(KafkaConsumer kafkaConsumer, DistributionEngineConfiguration deConfiguration){
+ this.deConfiguration = deConfiguration;
+ this.kafkaConsumer = kafkaConsumer;
+ }
+
+ /**
+ *
+ * @return the Sasl Jass Config
+ */
+ private String getKafkaSaslJaasConfig() {
+ String saslJaasConfFromEnv = System.getenv("SASL_JAAS_CONFIG");
+ if(saslJaasConfFromEnv != null) {
+ return saslJaasConfFromEnv;
+ } else {
+ throw new KafkaException("sasl.jaas.config not set for Kafka Consumer");
+ }
+ }
+
+ /**
+ *
+ * @param topic Topic in which to subscribe
+ */
+ public void subscribe(String topic) throws KafkaException {
+ if (!kafkaConsumer.subscription().contains(topic)) {
+ kafkaConsumer.subscribe(Collections.singleton(topic));
+ }
+ }
+
+ /**
+ *
+ * @return The list of messages for a specified topic, returned from the poll
+ */
+ public List<String> poll(String topicName) throws KafkaException {
+ log.info("SdcKafkaConsumer - polling for messages from Topic: {}", topicName);
+ List<String> msgs = new ArrayList<>();
+ ConsumerRecords<String, String> consumerRecordsForSpecificTopic = kafkaConsumer.poll(Duration.ofSeconds(deConfiguration.getDistributionStatusTopic().getPollingIntervalSec()));
+ for(ConsumerRecord<String, String> rec : consumerRecordsForSpecificTopic){
+ msgs.add(rec.value());
+ }
+ return msgs;
+ }
+}
diff --git a/catalog-be/src/main/java/org/openecomp/sdc/be/components/kafka/SdcKafkaProducer.java b/catalog-be/src/main/java/org/openecomp/sdc/be/components/kafka/SdcKafkaProducer.java
new file mode 100644
index 0000000000..bdc984d7b5
--- /dev/null
+++ b/catalog-be/src/main/java/org/openecomp/sdc/be/components/kafka/SdcKafkaProducer.java
@@ -0,0 +1,98 @@
+/*-
+ * ============LICENSE_START=======================================================
+ * SDC
+ * ================================================================================
+ * Copyright (C) 2022 Nordix Foundation. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+package org.openecomp.sdc.be.components.kafka;
+
+import com.google.common.annotations.VisibleForTesting;
+import java.util.Properties;
+import java.util.UUID;
+import org.apache.kafka.clients.CommonClientConfigs;
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.common.config.SaslConfigs;
+import org.openecomp.sdc.be.config.DistributionEngineConfiguration;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Utility class that provides a KafkaProducer to communicate with a kafka cluster
+ */
+public class SdcKafkaProducer {
+ private static final Logger log = LoggerFactory.getLogger(SdcKafkaProducer.class.getName());
+
+ private KafkaProducer<String, String> kafkaProducer;
+
+ /**
+ * Constructor setting up the KafkaProducer from a predefined set of configurations
+ */
+ public SdcKafkaProducer(DistributionEngineConfiguration deConfiguration) {
+ log.info("Create SdcKafkaProducer via constructor");
+ Properties properties = new Properties();
+
+ properties.put(ProducerConfig.CLIENT_ID_CONFIG, deConfiguration.getDistributionStatusTopic().getConsumerId() + "-producer-" + UUID.randomUUID());
+ properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
+ properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
+ properties.put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, deConfiguration.getKafkaBootStrapServers());
+ properties.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SASL_PLAINTEXT");
+ properties.put(SaslConfigs.SASL_JAAS_CONFIG, getKafkaSaslJaasConfig());
+ properties.put(SaslConfigs.SASL_MECHANISM, "SCRAM-SHA-512");
+ kafkaProducer = new KafkaProducer<>(properties);
+ }
+
+ /**
+ *
+ * @param kafkaProducer Setting a KafkaProducer to use within the sdcKafkaProducer class
+ */
+ @VisibleForTesting
+ SdcKafkaProducer(KafkaProducer kafkaProducer) {
+ this.kafkaProducer = kafkaProducer;
+ }
+
+ /**
+ * @return The Sasl Jaas Configuration
+ */
+ private static String getKafkaSaslJaasConfig() throws KafkaException {
+ String saslJaasConfFromEnv = System.getenv("SASL_JAAS_CONFIG");
+ if(saslJaasConfFromEnv != null) {
+ return saslJaasConfFromEnv;
+ } else {
+ throw new KafkaException("sasl.jaas.config not set for Kafka Consumer");
+ }
+ }
+
+ /**
+ * @param message A message to Send
+ * @param topicName The name of the topic to publish to
+ * @return The status of the send request
+ */
+ public void send(String message, String topicName) throws KafkaException {
+ ProducerRecord<String, String> kafkaMessagePayload = new ProducerRecord<>(topicName, "PartitionKey", message);
+ kafkaProducer.send(kafkaMessagePayload);
+ }
+
+ /**
+ * Kafka FLush operation
+ */
+ public void flush() throws KafkaException {
+ log.info("SdcKafkaProducer - flush");
+ kafkaProducer.flush();
+ }
+}
diff --git a/catalog-be/src/test/java/org/openecomp/sdc/be/components/distribution/engine/CambriaHandlerTest.java b/catalog-be/src/test/java/org/openecomp/sdc/be/components/distribution/engine/CambriaHandlerTest.java
index 980bb8369a..a91b246f40 100644
--- a/catalog-be/src/test/java/org/openecomp/sdc/be/components/distribution/engine/CambriaHandlerTest.java
+++ b/catalog-be/src/test/java/org/openecomp/sdc/be/components/distribution/engine/CambriaHandlerTest.java
@@ -7,9 +7,9 @@
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -20,6 +20,11 @@
package org.openecomp.sdc.be.components.distribution.engine;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.doReturn;
+
import com.att.nsa.apiClient.credentials.ApiCredential;
import com.att.nsa.apiClient.http.HttpException;
import com.att.nsa.cambria.client.CambriaClient;
@@ -29,6 +34,14 @@ import com.att.nsa.cambria.client.CambriaClientBuilders.TopicManagerBuilder;
import com.att.nsa.cambria.client.CambriaConsumer;
import com.att.nsa.cambria.client.CambriaIdentityManager;
import fj.data.Either;
+import java.io.IOException;
+import java.net.MalformedURLException;
+import java.security.GeneralSecurityException;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Set;
import mockit.Deencapsulation;
import org.junit.Before;
import org.junit.BeforeClass;
@@ -45,20 +58,6 @@ import org.openecomp.sdc.common.api.ConfigurationSource;
import org.openecomp.sdc.common.impl.ExternalConfiguration;
import org.openecomp.sdc.common.impl.FSConfigurationSource;
-import java.io.IOException;
-import java.net.MalformedURLException;
-import java.security.GeneralSecurityException;
-import java.util.Arrays;
-import java.util.Collection;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Set;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
-import static org.mockito.ArgumentMatchers.any;
-import static org.mockito.Mockito.doReturn;
-
@RunWith(MockitoJUnitRunner.class)
public class CambriaHandlerTest extends BeConfDependentTest {
@@ -141,6 +140,7 @@ public class CambriaHandlerTest extends BeConfDependentTest {
@Test
public void testGetTopics() throws Exception {
+
CambriaHandler testSubject;
List<String> hostSet = new LinkedList<>();
hostSet.add("mock");
diff --git a/catalog-be/src/test/java/org/openecomp/sdc/be/components/distribution/engine/DistributionEngineInitTaskTest.java b/catalog-be/src/test/java/org/openecomp/sdc/be/components/distribution/engine/DistributionEngineInitTaskTest.java
index d53476db71..9c1af39d9c 100644
--- a/catalog-be/src/test/java/org/openecomp/sdc/be/components/distribution/engine/DistributionEngineInitTaskTest.java
+++ b/catalog-be/src/test/java/org/openecomp/sdc/be/components/distribution/engine/DistributionEngineInitTaskTest.java
@@ -7,9 +7,9 @@
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -25,6 +25,7 @@ import org.apache.commons.collections.CollectionUtils;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.mockito.Mockito;
+import org.openecomp.sdc.be.components.kafka.KafkaHandler;
import org.openecomp.sdc.be.config.ConfigurationManager;
import org.openecomp.sdc.be.config.DistributionEngineConfiguration;
import org.openecomp.sdc.be.config.DistributionEngineConfiguration.CreateTopicConfig;
@@ -54,6 +55,8 @@ class DistributionEngineInitTaskTest {
private CambriaHandler cambriaHandler;
+ private KafkaHandler kafkaHandler;
+
@BeforeEach
public void setup() {
ExternalConfiguration.setAppName("catalog-be");
@@ -65,6 +68,7 @@ class DistributionEngineInitTaskTest {
componentsUtils = Mockito.mock(ComponentsUtils.class);
cambriaHandler = Mockito.mock(CambriaHandler.class);
+ kafkaHandler = Mockito.mock(KafkaHandler.class);
}
@Test
@@ -88,7 +92,7 @@ class DistributionEngineInitTaskTest {
assertEquals("check next retry interval reach max retry interval", initTask.getCurrentRetryInterval(), maxRetry);
}
-
+
@Test
void checkStartTask() {
@@ -100,10 +104,10 @@ class DistributionEngineInitTaskTest {
deConfiguration.setInitRetryIntervalSec(retry);
deConfiguration.setInitMaxIntervalSec(maxRetry);
DistributionEngineInitTask initTask = new DistributionEngineInitTask(0l, deConfiguration, envName, new AtomicBoolean(false), componentsUtils, null, readEnvFromConfig(deConfiguration));
-
+
initTask.startTask();
}
-
+
@Test
void checkRestartTask() {
@@ -115,10 +119,10 @@ class DistributionEngineInitTaskTest {
deConfiguration.setInitRetryIntervalSec(retry);
deConfiguration.setInitMaxIntervalSec(maxRetry);
DistributionEngineInitTask initTask = new DistributionEngineInitTask(0l, deConfiguration, envName, new AtomicBoolean(false), componentsUtils, null, readEnvFromConfig(deConfiguration));
-
+
initTask.restartTask();
}
-
+
@Test
void checkStopTask() {
@@ -130,12 +134,12 @@ class DistributionEngineInitTaskTest {
deConfiguration.setInitRetryIntervalSec(retry);
deConfiguration.setInitMaxIntervalSec(maxRetry);
DistributionEngineInitTask initTask = new DistributionEngineInitTask(0l, deConfiguration, envName, new AtomicBoolean(false), componentsUtils, null, readEnvFromConfig(deConfiguration));
-
+
initTask.stopTask();
initTask.startTask();
initTask.stopTask();
}
-
+
@Test
void checkDestroy() {
@@ -147,10 +151,10 @@ class DistributionEngineInitTaskTest {
deConfiguration.setInitRetryIntervalSec(retry);
deConfiguration.setInitMaxIntervalSec(maxRetry);
DistributionEngineInitTask initTask = new DistributionEngineInitTask(0l, deConfiguration, envName, new AtomicBoolean(false), componentsUtils, null, readEnvFromConfig(deConfiguration));
-
+
initTask.destroy();
}
-
+
@Test
void checkRun() {
@@ -193,10 +197,10 @@ class DistributionEngineInitTaskTest {
initTask.setCambriaHandler(cambriaHandler);
boolean initFlow = initTask.initFlow();
-
+
initTask.run();
}
-
+
@Test
void testInitFlowScenarioSuccess() {
@@ -244,6 +248,20 @@ class DistributionEngineInitTaskTest {
}
@Test
+ void testInitFlowSuccessKafkaEnabled(){
+ DistributionEngineConfiguration config = new DistributionEngineConfiguration();
+ config.setInitRetryIntervalSec(1);
+ config.setInitMaxIntervalSec(1);
+
+ when(kafkaHandler.isKafkaActive()).thenReturn(true);
+ DistributionEngineInitTask initTask = new DistributionEngineInitTask(0l, config, null, new AtomicBoolean(false), componentsUtils, null, null);
+ initTask.setKafkaHandler(kafkaHandler);
+
+ boolean initFlow = initTask.initFlow();
+ assertTrue("check init flow succeed", initFlow);
+ }
+
+ @Test
void testInitFlowScenarioSuccessTopicsAlreadyExists() {
String envName = "PrOD";
diff --git a/catalog-be/src/test/java/org/openecomp/sdc/be/components/kafka/KafkaHandlerTest.java b/catalog-be/src/test/java/org/openecomp/sdc/be/components/kafka/KafkaHandlerTest.java
new file mode 100644
index 0000000000..91ee0235ad
--- /dev/null
+++ b/catalog-be/src/test/java/org/openecomp/sdc/be/components/kafka/KafkaHandlerTest.java
@@ -0,0 +1,138 @@
+/*-
+ * ============LICENSE_START=======================================================
+ * SDC
+ * ================================================================================
+ * Copyright (C) 2022 Nordix Foundation. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+package org.openecomp.sdc.be.components.kafka;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.doThrow;
+import static org.mockito.Mockito.when;
+
+import com.google.gson.JsonSyntaxException;
+import org.apache.kafka.common.KafkaException;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.mockito.junit.jupiter.MockitoExtension;
+import org.junit.jupiter.api.Test;
+import org.mockito.Mock;
+
+import java.util.ArrayList;
+import fj.data.Either;
+import java.util.List;
+
+import org.openecomp.sdc.be.components.distribution.engine.CambriaErrorResponse;
+import org.openecomp.sdc.be.components.distribution.engine.NotificationDataImpl;
+import org.openecomp.sdc.be.components.distribution.engine.INotificationData;
+import org.openecomp.sdc.be.distribution.api.client.CambriaOperationStatus;
+
+
+@ExtendWith(MockitoExtension.class)
+public class KafkaHandlerTest {
+
+ @Mock
+ private SdcKafkaConsumer mockSdcKafkaConsumer;
+
+ @Mock
+ private SdcKafkaProducer mockSdcKafkaProducer;
+
+ private KafkaHandler kafkaHandler;
+
+ @Test
+ public void testIsKafkaActiveTrue(){
+ KafkaHandler kafkaHandler = new KafkaHandler(mockSdcKafkaConsumer, mockSdcKafkaProducer, true);
+ assertTrue(kafkaHandler.isKafkaActive());
+ }
+
+ @Test
+ public void testIsKafkaActiveFalse(){
+ KafkaHandler kafkaHandler = new KafkaHandler(mockSdcKafkaConsumer, mockSdcKafkaProducer, true);
+ kafkaHandler.setKafkaActive(false);
+ assertFalse(kafkaHandler.isKafkaActive());
+ }
+
+ @Test
+ public void testFetchFromTopicSuccess(){
+ String testTopic = "testTopic";
+ List<String> mockedReturnedMessages = new ArrayList<>();
+ mockedReturnedMessages.add("message1");
+ mockedReturnedMessages.add("message2");
+ KafkaHandler kafkaHandler = new KafkaHandler(mockSdcKafkaConsumer, mockSdcKafkaProducer, true);
+ when(mockSdcKafkaConsumer.poll(any())).thenReturn(mockedReturnedMessages);
+ Either<Iterable<String>, CambriaErrorResponse> response = kafkaHandler.fetchFromTopic(testTopic);
+ Iterable<String> actualReturnedMessages = response.left().value();
+ assertTrue(response.isLeft());
+ assertEquals(actualReturnedMessages, mockedReturnedMessages);
+ }
+
+ @Test
+ public void testFetchFromTopicFail(){
+ String testTopic = "testTopic";
+ KafkaHandler kafkaHandler = new KafkaHandler(mockSdcKafkaConsumer, mockSdcKafkaProducer, true);
+ when(mockSdcKafkaConsumer.poll(any())).thenThrow(new KafkaException());
+ Either<Iterable<String>, CambriaErrorResponse> response = kafkaHandler.fetchFromTopic(testTopic);
+ CambriaErrorResponse responseValue = response.right().value();
+ assertTrue(response.isRight());
+ assertEquals(responseValue.getOperationStatus(), CambriaOperationStatus.INTERNAL_SERVER_ERROR);
+ }
+
+ @Test
+ public void testSendNotificationSuccess(){
+ String testTopic = "testTopic";
+ KafkaHandler kafkaHandler = new KafkaHandler(mockSdcKafkaConsumer, mockSdcKafkaProducer, true);
+ INotificationData testData = new NotificationDataImpl();
+ CambriaErrorResponse response = kafkaHandler.sendNotification(testTopic, testData);
+ assertEquals(response.getOperationStatus(), CambriaOperationStatus.OK);
+ assertEquals(response.getHttpCode(), 200);
+ }
+
+ @Test
+ public void testSendNotificationKafkaException(){
+ String testTopic = "testTopic";
+ KafkaHandler kafkaHandler = new KafkaHandler(mockSdcKafkaConsumer, mockSdcKafkaProducer, true);
+ INotificationData testData = new NotificationDataImpl();
+ doThrow(KafkaException.class).when(mockSdcKafkaProducer).send(any(), any());
+ CambriaErrorResponse response = kafkaHandler.sendNotification(testTopic, testData);
+ assertEquals(response.getOperationStatus(), CambriaOperationStatus.INTERNAL_SERVER_ERROR);
+ assertEquals(response.getHttpCode(), 500);
+ }
+
+ @Test
+ public void testSendNotificationJsonSyntaxException(){
+ String testTopic = "testTopic";
+ KafkaHandler kafkaHandler = new KafkaHandler(mockSdcKafkaConsumer, mockSdcKafkaProducer, true);
+ INotificationData testData = new NotificationDataImpl();
+ doThrow(JsonSyntaxException.class).when(mockSdcKafkaProducer).send(any(), any());
+ CambriaErrorResponse response = kafkaHandler.sendNotification(testTopic, testData);
+ assertEquals(response.getOperationStatus(), CambriaOperationStatus.INTERNAL_SERVER_ERROR);
+ assertEquals(response.getHttpCode(), 500);
+ }
+
+ @Test
+ public void testSendNotificationFlushException(){
+ String testTopic = "testTopic";
+ KafkaHandler kafkaHandler = new KafkaHandler(mockSdcKafkaConsumer, mockSdcKafkaProducer, true);
+ INotificationData testData = new NotificationDataImpl();
+ doThrow(KafkaException.class).when(mockSdcKafkaProducer).flush();
+ CambriaErrorResponse response = kafkaHandler.sendNotification(testTopic, testData);
+ assertEquals(response.getOperationStatus(), CambriaOperationStatus.INTERNAL_SERVER_ERROR);
+ assertEquals(response.getHttpCode(), 500);
+ }
+}
diff --git a/catalog-be/src/test/java/org/openecomp/sdc/be/components/kafka/SdcKafkaConsumerTest.java b/catalog-be/src/test/java/org/openecomp/sdc/be/components/kafka/SdcKafkaConsumerTest.java
new file mode 100644
index 0000000000..0a4a834fa4
--- /dev/null
+++ b/catalog-be/src/test/java/org/openecomp/sdc/be/components/kafka/SdcKafkaConsumerTest.java
@@ -0,0 +1,143 @@
+/*-
+ * ============LICENSE_START=======================================================
+ * SDC
+ * ================================================================================
+ * Copyright (C) 2022 Nordix Foundation. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+package org.openecomp.sdc.be.components.kafka;
+
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+import org.apache.kafka.common.KafkaException;
+import org.junit.jupiter.api.Test;
+
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.when;
+import org.mockito.ArgumentCaptor;
+import org.mockito.Mockito;
+
+import java.util.Collections;
+import java.util.Collection;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.common.TopicPartition;
+import org.jetbrains.annotations.NotNull;
+
+import org.openecomp.sdc.be.config.DistributionEngineConfiguration;
+
+public class SdcKafkaConsumerTest {
+
+ @Test
+ public void TestSubscribeSuccess(){
+ KafkaConsumer<byte[], byte[]> mockKafkaConsumer = Mockito.mock(KafkaConsumer.class);
+ SdcKafkaConsumer sdcKafkaConsumer = new SdcKafkaConsumer(mockKafkaConsumer, null);
+ ArgumentCaptor<Collections> captor = ArgumentCaptor.forClass(Collections.class);
+
+ String testTopics = "testTopic";
+ sdcKafkaConsumer.subscribe(testTopics);
+ verify(mockKafkaConsumer).subscribe((Collection<String>) captor.capture());
+ }
+
+ @Test
+ public void TestSubscribeAlreadySubscribed(){
+ KafkaConsumer<byte[], byte[]> mockKafkaConsumer = Mockito.mock(KafkaConsumer.class);
+ SdcKafkaConsumer sdcKafkaConsumer = new SdcKafkaConsumer(mockKafkaConsumer, null);
+ ArgumentCaptor<Collections> captor = ArgumentCaptor.forClass(Collections.class);
+
+
+ String testTopics = "testTopic";
+ Set<String> currentSubs = new HashSet<String>();
+ currentSubs.add(testTopics);
+ when(mockKafkaConsumer.subscription()).thenReturn(currentSubs);
+ sdcKafkaConsumer.subscribe(testTopics);
+ verify(mockKafkaConsumer, never()).subscribe((Collection<String>) captor.capture());
+ }
+
+ @Test
+ public void TestPollForMessagesForSpecificTopicSuccess(){
+ KafkaConsumer<byte[], byte[]> mockKafkaConsumer = Mockito.mock(KafkaConsumer.class);
+
+
+ String testTopic = "testTopic";
+
+ ConsumerRecords mockedPollResult = getTestConsumerRecords(testTopic);
+
+ when(mockKafkaConsumer.poll(any())).thenReturn(mockedPollResult);
+
+ DistributionEngineConfiguration config = getMockDistributionEngineConfiguration();
+
+ SdcKafkaConsumer sdcKafkaConsumer = new SdcKafkaConsumer(mockKafkaConsumer, config);
+
+ List<String> returned = sdcKafkaConsumer.poll(testTopic);
+ assertTrue(returned.size()==1);
+ assertTrue(returned.contains("testTopicValue"));
+ }
+
+ @Test
+ public void testSaslJaasConfigNotFound(){
+ assertThrows(
+ KafkaException.class,
+ () -> new SdcKafkaConsumer(setTestDistributionEngineConfigs()),
+ "Sasl Jaas Config should not be found, so expected a KafkaException"
+ );
+ }
+
+ @NotNull
+ private DistributionEngineConfiguration getMockDistributionEngineConfiguration() {
+ DistributionEngineConfiguration config = new DistributionEngineConfiguration();
+ DistributionEngineConfiguration.DistributionStatusTopicConfig mockStatusTopic = new DistributionEngineConfiguration.DistributionStatusTopicConfig();
+ mockStatusTopic.setPollingIntervalSec(1);
+ config.setDistributionStatusTopic(mockStatusTopic);
+ return config;
+ }
+
+ @NotNull
+ private ConsumerRecords getTestConsumerRecords(String testTopics) {
+ Map map = new HashMap<Integer, ConsumerRecord>();
+
+ ConsumerRecord consumerRecord = new ConsumerRecord(testTopics, 0, 0, "", "testTopicValue");
+
+ List<ConsumerRecord> consumerRecordList = new ArrayList<>();
+ consumerRecordList.add(consumerRecord);
+ TopicPartition topicPartition = new TopicPartition(testTopics, 0);
+ map.put(topicPartition, consumerRecordList);
+
+ ConsumerRecords mockedPollResult = new ConsumerRecords(map);
+ return mockedPollResult;
+ }
+
+ private DistributionEngineConfiguration setTestDistributionEngineConfigs(){
+ DistributionEngineConfiguration.DistributionStatusTopicConfig dsTopic = new DistributionEngineConfiguration.DistributionStatusTopicConfig();
+ DistributionEngineConfiguration deConfiguration = new DistributionEngineConfiguration();
+ String testBootstrapServers = "TestBootstrapServer";
+ dsTopic.setConsumerGroup("consumerGroup");
+ dsTopic.setConsumerId("consumerId");
+
+ deConfiguration.setKafkaBootStrapServers(testBootstrapServers);
+ deConfiguration.setDistributionStatusTopic(dsTopic);
+ return deConfiguration;
+ }
+}
diff --git a/catalog-be/src/test/java/org/openecomp/sdc/be/components/kafka/SdcKafkaProducerTest.java b/catalog-be/src/test/java/org/openecomp/sdc/be/components/kafka/SdcKafkaProducerTest.java
new file mode 100644
index 0000000000..23322cce5a
--- /dev/null
+++ b/catalog-be/src/test/java/org/openecomp/sdc/be/components/kafka/SdcKafkaProducerTest.java
@@ -0,0 +1,94 @@
+/*-
+ * ============LICENSE_START=======================================================
+ * SDC
+ * ================================================================================
+ * Copyright (C) 2022 Nordix Foundation. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+package org.openecomp.sdc.be.components.kafka;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import org.junit.jupiter.api.Test;
+
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.doThrow;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+import org.mockito.ArgumentCaptor;
+import org.mockito.Mockito;
+
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.common.KafkaException;
+
+import org.openecomp.sdc.be.catalog.api.IStatus;
+import org.openecomp.sdc.be.config.DistributionEngineConfiguration;
+
+public class SdcKafkaProducerTest {
+
+ @Test
+ public void TestSendSuccess(){
+ KafkaProducer<byte[], byte[]> mockKafkaProducer = Mockito.mock(KafkaProducer.class);
+ SdcKafkaProducer sdcKafkaProducer = new SdcKafkaProducer(mockKafkaProducer);
+ ArgumentCaptor<ProducerRecord> captor = ArgumentCaptor.forClass(ProducerRecord.class);
+ sdcKafkaProducer.send("testMessage", "testTopic");
+
+
+ verify(mockKafkaProducer).send(captor.capture());
+ }
+
+ @Test
+ public void testFlushSuccess(){
+ KafkaProducer<byte[], byte[]> mockKafkaProducer = Mockito.mock(KafkaProducer.class);
+ SdcKafkaProducer sdcKafkaProducer = new SdcKafkaProducer(mockKafkaProducer);
+ sdcKafkaProducer.flush();
+
+ verify(mockKafkaProducer).flush();
+ }
+
+ @Test
+ public void testSendFail(){
+ KafkaProducer<byte[], byte[]> mockKafkaProducer = Mockito.mock(KafkaProducer.class);
+ SdcKafkaProducer sdcKafkaProducer = new SdcKafkaProducer(mockKafkaProducer);
+
+ when(mockKafkaProducer.send(any())).thenThrow(new KafkaException());
+
+ assertThrows(
+ KafkaException.class,
+ () -> sdcKafkaProducer.send("testMessage", "testTopic"),
+ "Expected a KafkaException thrown on KafkaProducer Send");
+ }
+
+ @Test
+ public void testSaslJaasConfigNotFound(){
+ assertThrows(
+ KafkaException.class,
+ () -> new SdcKafkaProducer(setTestDistributionEngineConfigs()),
+ "Sasl Jaas Config should not be found, so expected a KafkaException"
+ );
+ }
+
+ private DistributionEngineConfiguration setTestDistributionEngineConfigs(){
+ DistributionEngineConfiguration.DistributionStatusTopicConfig dStatusTopicConfig = new DistributionEngineConfiguration.DistributionStatusTopicConfig();
+ DistributionEngineConfiguration deConfiguration = new DistributionEngineConfiguration();
+ deConfiguration.setKafkaBootStrapServers("TestBootstrapServer");
+ dStatusTopicConfig.setConsumerId("consumerId");
+
+ deConfiguration.setDistributionStatusTopic(dStatusTopicConfig);
+ deConfiguration.getDistributionStatusTopic().getConsumerId();
+ return deConfiguration;
+ }
+}