path: root/catalog-be/src/main/java
author    david.mcweeney <david.mcweeney@est.tech>    2022-10-04 15:46:14 +0100
committer Michael Morris <michael.morris@est.tech>    2022-10-25 11:24:02 +0000
commit    47f96dd966663f7f46b719451c0752721a2940a3 (patch)
tree      9d875ce43f96cf3e570cc812d907fa2edd3b7945 /catalog-be/src/main/java
parent    0d2e96c125aab4e4edfc0a8b897353c0fabdd885 (diff)
[SDC] Add kafka native messaging
Change-Id: I5ab8f580947cbc264d94bec48a5e8b659dc44c08
Issue-ID: DMAAP-1787
Signed-off-by: david.mcweeney <david.mcweeney@est.tech>
Diffstat (limited to 'catalog-be/src/main/java')
-rw-r--r--  catalog-be/src/main/java/org/openecomp/sdc/be/components/distribution/engine/DistributionEngineInitTask.java     |  73
-rw-r--r--  catalog-be/src/main/java/org/openecomp/sdc/be/components/distribution/engine/DistributionEnginePollingTask.java  |  27
-rw-r--r--  catalog-be/src/main/java/org/openecomp/sdc/be/components/distribution/engine/DistributionNotificationSender.java |  23
-rw-r--r--  catalog-be/src/main/java/org/openecomp/sdc/be/components/kafka/KafkaHandler.java                                 | 138
-rw-r--r--  catalog-be/src/main/java/org/openecomp/sdc/be/components/kafka/SdcKafkaConsumer.java                             | 117
-rw-r--r--  catalog-be/src/main/java/org/openecomp/sdc/be/components/kafka/SdcKafkaProducer.java                             |  98
6 files changed, 431 insertions(+), 45 deletions(-)
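
The patch gates each existing Cambria/UEB flow behind a runtime toggle: KafkaHandler reads the USE_KAFKA environment variable and every caller branches on isKafkaActive(). A minimal caller-side sketch of that pattern (KafkaHandler and CambriaErrorResponse are introduced by this patch; notificationTopic and notificationData below are placeholders):

    // Sketch only: how callers consume the toggle introduced by this patch.
    KafkaHandler kafkaHandler = new KafkaHandler();
    if (Boolean.FALSE.equals(kafkaHandler.isKafkaActive())) {
        // legacy path: publish/poll via the existing CambriaHandler
    } else {
        // kafka native path added by this patch
        CambriaErrorResponse status = kafkaHandler.sendNotification(notificationTopic, notificationData);
    }
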
diff --git a/catalog-be/src/main/java/org/openecomp/sdc/be/components/distribution/engine/DistributionEngineInitTask.java b/catalog-be/src/main/java/org/openecomp/sdc/be/components/distribution/engine/DistributionEngineInitTask.java
index d61e15016a..00d3fedfc8 100644
--- a/catalog-be/src/main/java/org/openecomp/sdc/be/components/distribution/engine/DistributionEngineInitTask.java
+++ b/catalog-be/src/main/java/org/openecomp/sdc/be/components/distribution/engine/DistributionEngineInitTask.java
@@ -20,6 +20,7 @@
package org.openecomp.sdc.be.components.distribution.engine;
import fj.data.Either;
+import java.util.ArrayList;
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.Executors;
@@ -27,7 +28,7 @@ import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.stream.Collectors;
+import org.openecomp.sdc.be.components.kafka.KafkaHandler;
import org.openecomp.sdc.be.config.BeEcompErrorManager;
import org.openecomp.sdc.be.config.DistributionEngineConfiguration;
import org.openecomp.sdc.be.distribution.api.client.CambriaOperationStatus;
@@ -60,6 +61,7 @@ public class DistributionEngineInitTask implements Runnable {
private AtomicBoolean status = null;
private OperationalEnvironmentEntry environmentEntry;
private CambriaHandler cambriaHandler = new CambriaHandler();
+ private KafkaHandler kafkaHandler = new KafkaHandler();
private ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(1);
public DistributionEngineInitTask(Long delayBeforeStartFlow, DistributionEngineConfiguration deConfiguration, String envName,
@@ -136,9 +138,7 @@ public class DistributionEngineInitTask implements Runnable {
@Override
public void run() {
- boolean result = false;
- result = initFlow();
- if (result) {
+ if (initFlow()) {
this.stopTask();
this.status.set(true);
if (this.distributionEnginePollingTask != null) {
@@ -159,38 +159,45 @@ public class DistributionEngineInitTask implements Runnable {
* @return
*/
public boolean initFlow() {
- logger.trace("Start init flow for environment {}", this.envName);
- Set<String> topicsList = null;
- Either<Set<String>, CambriaErrorResponse> getTopicsRes = null;
- getTopicsRes = cambriaHandler.getTopics(environmentEntry.getDmaapUebAddress().stream().collect(Collectors.toList()));
- if (getTopicsRes.isRight()) {
- CambriaErrorResponse status = getTopicsRes.right().value();
- if (status.getOperationStatus() == CambriaOperationStatus.NOT_FOUND) {
- topicsList = new HashSet<>();
+ logger.info("Start init flow for environment {}", this.envName);
+ if (Boolean.FALSE.equals(kafkaHandler.isKafkaActive())) {
+ Set<String> topicsList;
+ Either<Set<String>, CambriaErrorResponse> getTopicsRes;
+ getTopicsRes = cambriaHandler.getTopics(new ArrayList<>(environmentEntry.getDmaapUebAddress()));
+ if (getTopicsRes.isRight()) {
+ CambriaErrorResponse cambriaErrorResponse = getTopicsRes.right().value();
+ if (cambriaErrorResponse.getOperationStatus() == CambriaOperationStatus.NOT_FOUND) {
+ topicsList = new HashSet<>();
+ } else {
+ BeEcompErrorManager.getInstance().logBeUebSystemError(INIT_DISTRIBUTION_ENGINE_FLOW,
+ "try retrieve list of topics from U-EB server");
+ return false;
+ }
} else {
- BeEcompErrorManager.getInstance().logBeUebSystemError(INIT_DISTRIBUTION_ENGINE_FLOW, "try retrieve list of topics from U-EB server");
+ topicsList = getTopicsRes.left().value();
+ }
+ String notificationTopic = buildTopicName(deConfiguration.getDistributionNotifTopicName(), this.envName);
+ logger.debug("Going to handle topic {}", notificationTopic);
+ if (!createNotificationTopicIfNotExists(topicsList, notificationTopic)) {
+ return false;
+ }
+ CambriaErrorResponse registerProducerStatus = registerToTopic(notificationTopic,
+ SubscriberTypeEnum.PRODUCER);
+ CambriaOperationStatus createStatus = registerProducerStatus.getOperationStatus();
+ if (createStatus != CambriaOperationStatus.OK) {
+ return false;
+ }
+ String statusTopic = buildTopicName(deConfiguration.getDistributionStatusTopicName(), this.envName);
+ logger.debug("Going to handle topic {}", statusTopic);
+ if (!createStatusTopicIfNotExists(topicsList, statusTopic)) {
return false;
}
+ CambriaErrorResponse registerConsumerStatus = registerToTopic(statusTopic, SubscriberTypeEnum.CONSUMER);
+ return registerConsumerStatus.getOperationStatus() == CambriaOperationStatus.OK;
} else {
- topicsList = getTopicsRes.left().value();
+ logger.info("Skipping DisributionEngineInitTask flow to use kafka native for distribution messaging");
+ return true;
}
- String notificationTopic = buildTopicName(deConfiguration.getDistributionNotifTopicName(), this.envName);
- logger.debug("Going to handle topic {}", notificationTopic);
- if (!createNotificationTopicIfNotExists(topicsList, notificationTopic)) {
- return false;
- }
- CambriaErrorResponse registerProducerStatus = registerToTopic(notificationTopic, SubscriberTypeEnum.PRODUCER);
- CambriaOperationStatus createStatus = registerProducerStatus.getOperationStatus();
- if (createStatus != CambriaOperationStatus.OK) {
- return false;
- }
- String statusTopic = buildTopicName(deConfiguration.getDistributionStatusTopicName(), this.envName);
- logger.debug("Going to handle topic {}", statusTopic);
- if (!createStatusTopicIfNotExists(topicsList, statusTopic)) {
- return false;
- }
- CambriaErrorResponse registerConcumerStatus = registerToTopic(statusTopic, SubscriberTypeEnum.CONSUMER);
- return registerConcumerStatus.getOperationStatus() == CambriaOperationStatus.OK;
}
private CambriaErrorResponse registerToTopic(String topicName, SubscriberTypeEnum subscriberType) {
@@ -281,4 +288,8 @@ public class DistributionEngineInitTask implements Runnable {
protected void setCambriaHandler(CambriaHandler cambriaHandler) {
this.cambriaHandler = cambriaHandler;
}
+
+ protected void setKafkaHandler(KafkaHandler kafkaHandler) {
+ this.kafkaHandler = kafkaHandler;
+ }
}
diff --git a/catalog-be/src/main/java/org/openecomp/sdc/be/components/distribution/engine/DistributionEnginePollingTask.java b/catalog-be/src/main/java/org/openecomp/sdc/be/components/distribution/engine/DistributionEnginePollingTask.java
index 124671086f..ab4400a5bf 100644
--- a/catalog-be/src/main/java/org/openecomp/sdc/be/components/distribution/engine/DistributionEnginePollingTask.java
+++ b/catalog-be/src/main/java/org/openecomp/sdc/be/components/distribution/engine/DistributionEnginePollingTask.java
@@ -29,6 +29,7 @@ import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import org.apache.commons.lang3.concurrent.BasicThreadFactory;
import org.openecomp.sdc.be.components.distribution.engine.report.DistributionCompleteReporter;
+import org.openecomp.sdc.be.components.kafka.KafkaHandler;
import org.openecomp.sdc.be.config.BeEcompErrorManager;
import org.openecomp.sdc.be.config.DistributionEngineConfiguration;
import org.openecomp.sdc.be.config.DistributionEngineConfiguration.DistributionStatusTopicConfig;
@@ -51,6 +52,7 @@ public class DistributionEnginePollingTask implements Runnable {
private String consumerId;
private String consumerGroup;
private CambriaHandler cambriaHandler = new CambriaHandler();
+ private final KafkaHandler kafkaHandler = new KafkaHandler();
private Gson gson = new GsonBuilder().setPrettyPrinting().create();
private DistributionCompleteReporter distributionCompleteReporter;
private ScheduledExecutorService scheduledPollingService = Executors
@@ -82,9 +84,12 @@ public class DistributionEnginePollingTask implements Runnable {
fetchTimeoutInSec = 15;
}
try {
- cambriaConsumer = cambriaHandler
- .createConsumer(environmentEntry.getDmaapUebAddress(), topicName, environmentEntry.getUebApikey(), environmentEntry.getUebSecretKey(),
- consumerId, consumerGroup, fetchTimeoutInSec * 1000);
+ if (Boolean.FALSE.equals(kafkaHandler.isKafkaActive())) {
+ cambriaConsumer = cambriaHandler
+ .createConsumer(environmentEntry.getDmaapUebAddress(), topicName, environmentEntry.getUebApikey(),
+ environmentEntry.getUebSecretKey(),
+ consumerId, consumerGroup, fetchTimeoutInSec * 1000);
+ }
if (scheduledPollingService != null) {
logger.debug("Start Distribution Engine polling task. polling interval {} seconds", pollingIntervalInSec);
scheduledFuture = scheduledPollingService.scheduleAtFixedRate(this, 0, pollingIntervalInSec, TimeUnit.SECONDS);
@@ -119,14 +124,20 @@ public class DistributionEnginePollingTask implements Runnable {
@Override
public void run() {
logger.trace("run() method. polling queue {}", topicName);
+ Either<Iterable<String>, CambriaErrorResponse> fetchResult;
try {
// init error
- if (cambriaConsumer == null) {
- BeEcompErrorManager.getInstance().logBeUebSystemError(DISTRIBUTION_STATUS_POLLING, "polling task was not initialized properly");
- stopTask();
- return;
+ if (Boolean.FALSE.equals(kafkaHandler.isKafkaActive())) {
+ if (cambriaConsumer == null) {
+ BeEcompErrorManager.getInstance()
+ .logBeUebSystemError(DISTRIBUTION_STATUS_POLLING, "polling task was not initialized properly");
+ stopTask();
+ return;
+ }
+ fetchResult = cambriaHandler.fetchFromTopic(cambriaConsumer);
+ } else {
+ fetchResult = kafkaHandler.fetchFromTopic(topicName);
}
- Either<Iterable<String>, CambriaErrorResponse> fetchResult = cambriaHandler.fetchFromTopic(cambriaConsumer);
// fetch error
if (fetchResult.isRight()) {
CambriaErrorResponse errorResponse = fetchResult.right().value();
diff --git a/catalog-be/src/main/java/org/openecomp/sdc/be/components/distribution/engine/DistributionNotificationSender.java b/catalog-be/src/main/java/org/openecomp/sdc/be/components/distribution/engine/DistributionNotificationSender.java
index 0098eac7d9..b93d485bdb 100644
--- a/catalog-be/src/main/java/org/openecomp/sdc/be/components/distribution/engine/DistributionNotificationSender.java
+++ b/catalog-be/src/main/java/org/openecomp/sdc/be/components/distribution/engine/DistributionNotificationSender.java
@@ -19,6 +19,7 @@
*/
package org.openecomp.sdc.be.components.distribution.engine;
+import org.openecomp.sdc.be.components.kafka.KafkaHandler;
import org.openecomp.sdc.be.config.ConfigurationManager;
import org.openecomp.sdc.be.config.DistributionEngineConfiguration;
import org.openecomp.sdc.be.dao.api.ActionStatus;
@@ -36,16 +37,26 @@ public class DistributionNotificationSender {
private static final Logger logger = Logger.getLogger(DistributionNotificationSender.class.getName());
@javax.annotation.Resource
protected ComponentsUtils componentUtils;
- private CambriaHandler cambriaHandler = new CambriaHandler();
- private DistributionEngineConfiguration deConfiguration = ConfigurationManager.getConfigurationManager().getDistributionEngineConfiguration();
+ private final CambriaHandler cambriaHandler = new CambriaHandler();
+
+ private final KafkaHandler kafkaHandler = new KafkaHandler();
+
+ private final DistributionEngineConfiguration deConfiguration = ConfigurationManager.getConfigurationManager().getDistributionEngineConfiguration();
public ActionStatus sendNotification(String topicName, String distributionId, EnvironmentMessageBusData messageBusData,
INotificationData notificationData, Service service, User modifier) {
long startTime = System.currentTimeMillis();
- CambriaErrorResponse status = cambriaHandler
- .sendNotificationAndClose(topicName, messageBusData.getUebPublicKey(), messageBusData.getUebPrivateKey(),
- messageBusData.getDmaaPuebEndpoints(), notificationData,
- deConfiguration.getDistributionNotificationTopic().getMaxWaitingAfterSendingSeconds());
+ CambriaErrorResponse status;
+ if (Boolean.FALSE.equals(kafkaHandler.isKafkaActive())) {
+ status = cambriaHandler
+ .sendNotificationAndClose(topicName, messageBusData.getUebPublicKey(), messageBusData.getUebPrivateKey(),
+ messageBusData.getDmaaPuebEndpoints(), notificationData,
+ deConfiguration.getDistributionNotificationTopic().getMaxWaitingAfterSendingSeconds());
+ } else {
+ status = kafkaHandler.sendNotification(topicName, notificationData);
+ }
logger.info("After publishing service {} of version {}. Status is {}", service.getName(), service.getVersion(), status.getHttpCode());
auditDistributionNotification(
new AuditDistributionNotificationBuilder().setTopicName(topicName).setDistributionId(distributionId).setStatus(status).setService(service)
diff --git a/catalog-be/src/main/java/org/openecomp/sdc/be/components/kafka/KafkaHandler.java b/catalog-be/src/main/java/org/openecomp/sdc/be/components/kafka/KafkaHandler.java
new file mode 100644
index 0000000000..2a5590e72d
--- /dev/null
+++ b/catalog-be/src/main/java/org/openecomp/sdc/be/components/kafka/KafkaHandler.java
@@ -0,0 +1,138 @@
+/*-
+ * ============LICENSE_START=======================================================
+ * SDC
+ * ================================================================================
+ * Copyright (C) 2022 Nordix Foundation. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+package org.openecomp.sdc.be.components.kafka;
+
+import com.google.gson.Gson;
+import com.google.gson.JsonSyntaxException;
+import fj.data.Either;
+import lombok.Getter;
+import lombok.Setter;
+import org.apache.http.HttpStatus;
+import org.apache.kafka.common.KafkaException;
+import org.openecomp.sdc.be.components.distribution.engine.CambriaErrorResponse;
+import org.openecomp.sdc.be.components.distribution.engine.INotificationData;
+import org.openecomp.sdc.be.config.BeEcompErrorManager;
+import org.openecomp.sdc.be.config.ConfigurationManager;
+import org.openecomp.sdc.be.config.DistributionEngineConfiguration;
+import org.openecomp.sdc.be.distribution.api.client.CambriaOperationStatus;
+import org.openecomp.sdc.common.log.wrappers.Logger;
+import org.springframework.stereotype.Component;
+
+/**
+ * Utility class that provides a handler for Kafka interactions
+ */
+@Component
+public class KafkaHandler {
+
+ private static final Logger log = Logger.getLogger(KafkaHandler.class.getName());
+ private final Gson gson = new Gson();
+
+ private SdcKafkaConsumer sdcKafkaConsumer;
+
+ private SdcKafkaProducer sdcKafkaProducer;
+
+ @Setter
+ private boolean isKafkaActive;
+
+ private DistributionEngineConfiguration deConfiguration;
+
+ public KafkaHandler(SdcKafkaConsumer sdcKafkaConsumer, SdcKafkaProducer sdcKafkaProducer, boolean isKafkaActive) {
+ this.sdcKafkaConsumer = sdcKafkaConsumer;
+ this.sdcKafkaProducer = sdcKafkaProducer;
+ this.isKafkaActive = isKafkaActive;
+ }
+
+ public KafkaHandler() {
+ isKafkaActive = Boolean.parseBoolean(System.getenv().getOrDefault("USE_KAFKA", "false"));
+ deConfiguration = ConfigurationManager.getConfigurationManager().getDistributionEngineConfiguration();
+ }
+
+ /**
+ * @return whether Kafka is enabled for this client, based on user configuration
+ */
+ public Boolean isKafkaActive() {
+ return isKafkaActive;
+ }
+
+ /**
+ * @param topicName The topic from which messages will be fetched
+ * @return Either A list of messages from a specific topic, or a specific error response
+ */
+ public Either<Iterable<String>, CambriaErrorResponse> fetchFromTopic(String topicName) {
+ try {
+ if(sdcKafkaConsumer == null){
+ sdcKafkaConsumer = new SdcKafkaConsumer(deConfiguration);
+ }
+ sdcKafkaConsumer.subscribe(topicName);
+ Iterable<String> messages = sdcKafkaConsumer.poll(topicName);
+ log.info("Returning messages from topic {}", topicName);
+ return Either.left(messages);
+ } catch (KafkaException e) {
+ BeEcompErrorManager.getInstance().logBeDistributionEngineSystemError("fetchFromTopic", e.getMessage());
+ log.error("Failed to fetch from kafka for topic: {}", topicName, e);
+ CambriaErrorResponse cambriaErrorResponse =
+ new CambriaErrorResponse(CambriaOperationStatus.INTERNAL_SERVER_ERROR,
+ HttpStatus.SC_INTERNAL_SERVER_ERROR);
+ return Either.right(cambriaErrorResponse);
+ }
+ }
+
+ /**
+ * Publish notification message to a given topic and flush
+ *
+ * @param topicName The topic to which the message should be published
+ * @param data The data to publish to the topic specified
+ * @return a CambriaErrorResponse indicating success or describing the error encountered
+ */
+ public CambriaErrorResponse sendNotification(String topicName, INotificationData data) {
+ CambriaErrorResponse response;
+ if(sdcKafkaProducer == null){
+ sdcKafkaProducer = new SdcKafkaProducer(deConfiguration);
+ }
+ try {
+ String json = gson.toJson(data);
+ log.info("Before sending to topic {}", topicName);
+ sdcKafkaProducer.send(json, topicName);
+ } catch (KafkaException e) {
+ BeEcompErrorManager.getInstance().logBeDistributionEngineSystemError("sendNotification", e.getMessage());
+ log.error("Failed to send message . Exception {}", e.getMessage());
+
+ return new CambriaErrorResponse(CambriaOperationStatus.INTERNAL_SERVER_ERROR, 500);
+ } catch (JsonSyntaxException e) {
+ BeEcompErrorManager.getInstance().logBeDistributionEngineSystemError("sendNotification", e.getMessage());
+ log.error("Failed to convert data to json: {}", data, e);
+
+ return new CambriaErrorResponse(CambriaOperationStatus.INTERNAL_SERVER_ERROR, 500);
+ } finally {
+ try {
+ sdcKafkaProducer.flush();
+ response = new CambriaErrorResponse(CambriaOperationStatus.OK, 200);
+ } catch (KafkaException | IllegalArgumentException e) {
+ BeEcompErrorManager.getInstance().logBeDistributionEngineSystemError("sendNotification", e.getMessage());
+ log.error("Failed to flush sdcKafkaProducer", e);
+
+ response = new CambriaErrorResponse(CambriaOperationStatus.INTERNAL_SERVER_ERROR, 500);
+ }
+ }
+
+ return response;
+ }
+}
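
A hedged usage sketch for the handler above; the topic name is a placeholder, and the Either handling mirrors how DistributionEnginePollingTask consumes fetchFromTopic():

    // Illustrative only; assumes the distribution engine configuration is loaded.
    KafkaHandler handler = new KafkaHandler();
    Either<Iterable<String>, CambriaErrorResponse> fetchResult = handler.fetchFromTopic("SDC-DISTR-STATUS-TOPIC"); // placeholder topic
    if (fetchResult.isLeft()) {
        for (String message : fetchResult.left().value()) {
            // process each distribution status message
        }
    } else {
        CambriaErrorResponse error = fetchResult.right().value();
        // error.getOperationStatus() / error.getHttpCode() describe the failure
    }
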
diff --git a/catalog-be/src/main/java/org/openecomp/sdc/be/components/kafka/SdcKafkaConsumer.java b/catalog-be/src/main/java/org/openecomp/sdc/be/components/kafka/SdcKafkaConsumer.java
new file mode 100644
index 0000000000..8879bf000e
--- /dev/null
+++ b/catalog-be/src/main/java/org/openecomp/sdc/be/components/kafka/SdcKafkaConsumer.java
@@ -0,0 +1,117 @@
+/*-
+ * ============LICENSE_START=======================================================
+ * SDC
+ * ================================================================================
+ * Copyright (C) 2022 Nordix Foundation. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+package org.openecomp.sdc.be.components.kafka;
+
+import com.google.common.annotations.VisibleForTesting;
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Properties;
+import java.util.UUID;
+import org.apache.kafka.clients.CommonClientConfigs;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.common.config.SaslConfigs;
+import org.openecomp.sdc.be.config.DistributionEngineConfiguration;
+import org.openecomp.sdc.common.log.wrappers.Logger;
+
+/**
+ * Utility class that provides a KafkaConsumer to communicate with a kafka cluster
+ */
+public class SdcKafkaConsumer {
+
+ private static final Logger log = Logger.getLogger(SdcKafkaConsumer.class.getName());
+ private final DistributionEngineConfiguration deConfiguration;
+ private KafkaConsumer<String, String> kafkaConsumer;
+
+ /**
+ * Constructor setting up the KafkaConsumer from a predefined set of configurations
+ */
+ public SdcKafkaConsumer(DistributionEngineConfiguration deConfiguration){
+ log.info("Create SdcKafkaConsumer via constructor");
+ Properties properties = new Properties();
+ this.deConfiguration = deConfiguration;
+
+ properties.put(ConsumerConfig.CLIENT_ID_CONFIG, deConfiguration.getDistributionStatusTopic().getConsumerId()+ "-consumer-" + UUID.randomUUID());
+ properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
+ properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
+ properties.put(ConsumerConfig.GROUP_ID_CONFIG, deConfiguration.getDistributionStatusTopic().getConsumerGroup());
+ properties.put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, deConfiguration.getKafkaBootStrapServers());
+ properties.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SASL_PLAINTEXT");
+ properties.put(ConsumerConfig.ALLOW_AUTO_CREATE_TOPICS_CONFIG, false);
+ properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");
+ properties.put(SaslConfigs.SASL_MECHANISM, "SCRAM-SHA-512");
+
+ properties.put(SaslConfigs.SASL_JAAS_CONFIG, getKafkaSaslJaasConfig());
+ kafkaConsumer = new KafkaConsumer<>(properties);
+ }
+
+ /**
+ *
+ * @param kafkaConsumer a kafkaConsumer to use within the class
+ * @param deConfiguration - Configuration to pass into the class
+ */
+ @VisibleForTesting
+ SdcKafkaConsumer(KafkaConsumer kafkaConsumer, DistributionEngineConfiguration deConfiguration){
+ this.deConfiguration = deConfiguration;
+ this.kafkaConsumer = kafkaConsumer;
+ }
+
+ /**
+ *
+ * @return the Sasl Jass Config
+ */
+ private String getKafkaSaslJaasConfig() {
+ String saslJaasConfFromEnv = System.getenv("SASL_JAAS_CONFIG");
+ if(saslJaasConfFromEnv != null) {
+ return saslJaasConfFromEnv;
+ } else {
+ throw new KafkaException("sasl.jaas.config not set for Kafka Consumer");
+ }
+ }
+
+ /**
+ *
+ * @param topic Topic in which to subscribe
+ */
+ public void subscribe(String topic) throws KafkaException {
+ if (!kafkaConsumer.subscription().contains(topic)) {
+ kafkaConsumer.subscribe(Collections.singleton(topic));
+ }
+ }
+
+ /**
+ *
+ * @return The list of messages for a specified topic, returned from the poll
+ */
+ public List<String> poll(String topicName) throws KafkaException {
+ log.info("SdcKafkaConsumer - polling for messages from Topic: {}", topicName);
+ List<String> msgs = new ArrayList<>();
+ ConsumerRecords<String, String> consumerRecordsForSpecificTopic = kafkaConsumer.poll(Duration.ofSeconds(deConfiguration.getDistributionStatusTopic().getPollingIntervalSec()));
+ for(ConsumerRecord<String, String> rec : consumerRecordsForSpecificTopic){
+ msgs.add(rec.value());
+ }
+ return msgs;
+ }
+}
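
A minimal sketch of driving the consumer directly, assuming SASL_JAAS_CONFIG is exported and the distribution engine configuration is loaded; the topic name is a placeholder:

    DistributionEngineConfiguration config = ConfigurationManager.getConfigurationManager().getDistributionEngineConfiguration();
    SdcKafkaConsumer consumer = new SdcKafkaConsumer(config);
    consumer.subscribe("SDC-DISTR-STATUS-TOPIC"); // placeholder topic
    List<String> messages = consumer.poll("SDC-DISTR-STATUS-TOPIC"); // blocks up to the configured polling interval
    for (String message : messages) {
        // each entry is the raw record value as a String
    }
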
diff --git a/catalog-be/src/main/java/org/openecomp/sdc/be/components/kafka/SdcKafkaProducer.java b/catalog-be/src/main/java/org/openecomp/sdc/be/components/kafka/SdcKafkaProducer.java
new file mode 100644
index 0000000000..bdc984d7b5
--- /dev/null
+++ b/catalog-be/src/main/java/org/openecomp/sdc/be/components/kafka/SdcKafkaProducer.java
@@ -0,0 +1,98 @@
+/*-
+ * ============LICENSE_START=======================================================
+ * SDC
+ * ================================================================================
+ * Copyright (C) 2022 Nordix Foundation. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+package org.openecomp.sdc.be.components.kafka;
+
+import com.google.common.annotations.VisibleForTesting;
+import java.util.Properties;
+import java.util.UUID;
+import org.apache.kafka.clients.CommonClientConfigs;
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.common.config.SaslConfigs;
+import org.openecomp.sdc.be.config.DistributionEngineConfiguration;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Utility class that provides a KafkaProducer to communicate with a kafka cluster
+ */
+public class SdcKafkaProducer {
+ private static final Logger log = LoggerFactory.getLogger(SdcKafkaProducer.class.getName());
+
+ private KafkaProducer<String, String> kafkaProducer;
+
+ /**
+ * Constructor setting up the KafkaProducer from a predefined set of configurations
+ */
+ public SdcKafkaProducer(DistributionEngineConfiguration deConfiguration) {
+ log.info("Create SdcKafkaProducer via constructor");
+ Properties properties = new Properties();
+
+ properties.put(ProducerConfig.CLIENT_ID_CONFIG, deConfiguration.getDistributionStatusTopic().getConsumerId() + "-producer-" + UUID.randomUUID());
+ properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
+ properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
+ properties.put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, deConfiguration.getKafkaBootStrapServers());
+ properties.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SASL_PLAINTEXT");
+ properties.put(SaslConfigs.SASL_JAAS_CONFIG, getKafkaSaslJaasConfig());
+ properties.put(SaslConfigs.SASL_MECHANISM, "SCRAM-SHA-512");
+ kafkaProducer = new KafkaProducer<>(properties);
+ }
+
+ /**
+ *
+ * @param kafkaProducer Setting a KafkaProducer to use within the sdcKafkaProducer class
+ */
+ @VisibleForTesting
+ SdcKafkaProducer(KafkaProducer kafkaProducer) {
+ this.kafkaProducer = kafkaProducer;
+ }
+
+ /**
+ * @return The Sasl Jaas Configuration
+ */
+ private static String getKafkaSaslJaasConfig() throws KafkaException {
+ String saslJaasConfFromEnv = System.getenv("SASL_JAAS_CONFIG");
+ if(saslJaasConfFromEnv != null) {
+ return saslJaasConfFromEnv;
+ } else {
+ throw new KafkaException("sasl.jaas.config not set for Kafka Producer");
+ }
+ }
+
+ /**
+ * @param message A message to send
+ * @param topicName The name of the topic to publish to
+ */
+ public void send(String message, String topicName) throws KafkaException {
+ ProducerRecord<String, String> kafkaMessagePayload = new ProducerRecord<>(topicName, "PartitionKey", message);
+ kafkaProducer.send(kafkaMessagePayload);
+ }
+
+ /**
+ * Kafka flush operation
+ */
+ public void flush() throws KafkaException {
+ log.info("SdcKafkaProducer - flush");
+ kafkaProducer.flush();
+ }
+}
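
And a matching producer sketch under the same assumptions; the JSON payload and topic are placeholders. send() only queues the record, so flush() is what forces delivery (as KafkaHandler does in its finally block):

    SdcKafkaProducer producer = new SdcKafkaProducer(config);
    try {
        producer.send("{\"distributionID\":\"example\"}", "SDC-DISTR-NOTIF-TOPIC"); // placeholder payload and topic
    } finally {
        producer.flush(); // blocks until buffered records are sent
    }
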