diff options
author | david.mcweeney <david.mcweeney@est.tech> | 2022-10-04 15:46:14 +0100 |
---|---|---|
committer | Michael Morris <michael.morris@est.tech> | 2022-10-25 11:24:02 +0000 |
commit | 47f96dd966663f7f46b719451c0752721a2940a3 (patch) | |
tree | 9d875ce43f96cf3e570cc812d907fa2edd3b7945 /catalog-be/src/main/java/org | |
parent | 0d2e96c125aab4e4edfc0a8b897353c0fabdd885 (diff) |
[SDC] Add kafka native messaging
Change-Id: I5ab8f580947cbc264d94bec48a5e8b659dc44c08
Issue-ID: DMAAP-1787
Signed-off-by: david.mcweeney <david.mcweeney@est.tech>
Diffstat (limited to 'catalog-be/src/main/java/org')
6 files changed, 431 insertions, 45 deletions
diff --git a/catalog-be/src/main/java/org/openecomp/sdc/be/components/distribution/engine/DistributionEngineInitTask.java b/catalog-be/src/main/java/org/openecomp/sdc/be/components/distribution/engine/DistributionEngineInitTask.java index d61e15016a..00d3fedfc8 100644 --- a/catalog-be/src/main/java/org/openecomp/sdc/be/components/distribution/engine/DistributionEngineInitTask.java +++ b/catalog-be/src/main/java/org/openecomp/sdc/be/components/distribution/engine/DistributionEngineInitTask.java @@ -20,6 +20,7 @@ package org.openecomp.sdc.be.components.distribution.engine; import fj.data.Either; +import java.util.ArrayList; import java.util.HashSet; import java.util.Set; import java.util.concurrent.Executors; @@ -27,7 +28,7 @@ import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ScheduledFuture; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; -import java.util.stream.Collectors; +import org.openecomp.sdc.be.components.kafka.KafkaHandler; import org.openecomp.sdc.be.config.BeEcompErrorManager; import org.openecomp.sdc.be.config.DistributionEngineConfiguration; import org.openecomp.sdc.be.distribution.api.client.CambriaOperationStatus; @@ -60,6 +61,7 @@ public class DistributionEngineInitTask implements Runnable { private AtomicBoolean status = null; private OperationalEnvironmentEntry environmentEntry; private CambriaHandler cambriaHandler = new CambriaHandler(); + private KafkaHandler kafkaHandler = new KafkaHandler(); private ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(1); public DistributionEngineInitTask(Long delayBeforeStartFlow, DistributionEngineConfiguration deConfiguration, String envName, @@ -136,9 +138,7 @@ public class DistributionEngineInitTask implements Runnable { @Override public void run() { - boolean result = false; - result = initFlow(); - if (result) { + if (initFlow()) { this.stopTask(); this.status.set(true); if (this.distributionEnginePollingTask != null) { @@ -159,38 +159,45 @@ public class DistributionEngineInitTask implements Runnable { * @return */ public boolean initFlow() { - logger.trace("Start init flow for environment {}", this.envName); - Set<String> topicsList = null; - Either<Set<String>, CambriaErrorResponse> getTopicsRes = null; - getTopicsRes = cambriaHandler.getTopics(environmentEntry.getDmaapUebAddress().stream().collect(Collectors.toList())); - if (getTopicsRes.isRight()) { - CambriaErrorResponse status = getTopicsRes.right().value(); - if (status.getOperationStatus() == CambriaOperationStatus.NOT_FOUND) { - topicsList = new HashSet<>(); + logger.info("Start init flow for environment {}", this.envName); + if (Boolean.FALSE.equals(kafkaHandler.isKafkaActive())) { + Set<String> topicsList; + Either<Set<String>, CambriaErrorResponse> getTopicsRes; + getTopicsRes = cambriaHandler.getTopics(new ArrayList<>(environmentEntry.getDmaapUebAddress())); + if (getTopicsRes.isRight()) { + CambriaErrorResponse cambriaErrorResponse = getTopicsRes.right().value(); + if (cambriaErrorResponse.getOperationStatus() == CambriaOperationStatus.NOT_FOUND) { + topicsList = new HashSet<>(); + } else { + BeEcompErrorManager.getInstance().logBeUebSystemError(INIT_DISTRIBUTION_ENGINE_FLOW, + "try retrieve list of topics from U-EB server"); + return false; + } } else { - BeEcompErrorManager.getInstance().logBeUebSystemError(INIT_DISTRIBUTION_ENGINE_FLOW, "try retrieve list of topics from U-EB server"); + topicsList = getTopicsRes.left().value(); + } + String notificationTopic = buildTopicName(deConfiguration.getDistributionNotifTopicName(), this.envName); + logger.debug("Going to handle topic {}", notificationTopic); + if (!createNotificationTopicIfNotExists(topicsList, notificationTopic)) { + return false; + } + CambriaErrorResponse registerProducerStatus = registerToTopic(notificationTopic, + SubscriberTypeEnum.PRODUCER); + CambriaOperationStatus createStatus = registerProducerStatus.getOperationStatus(); + if (createStatus != CambriaOperationStatus.OK) { + return false; + } + String statusTopic = buildTopicName(deConfiguration.getDistributionStatusTopicName(), this.envName); + logger.debug("Going to handle topic {}", statusTopic); + if (!createStatusTopicIfNotExists(topicsList, statusTopic)) { return false; } + CambriaErrorResponse registerConcumerStatus = registerToTopic(statusTopic, SubscriberTypeEnum.CONSUMER); + return registerConcumerStatus.getOperationStatus() == CambriaOperationStatus.OK; } else { - topicsList = getTopicsRes.left().value(); + logger.info("Skipping DisributionEngineInitTask flow to use kafka native for distribution messaging"); + return true; } - String notificationTopic = buildTopicName(deConfiguration.getDistributionNotifTopicName(), this.envName); - logger.debug("Going to handle topic {}", notificationTopic); - if (!createNotificationTopicIfNotExists(topicsList, notificationTopic)) { - return false; - } - CambriaErrorResponse registerProducerStatus = registerToTopic(notificationTopic, SubscriberTypeEnum.PRODUCER); - CambriaOperationStatus createStatus = registerProducerStatus.getOperationStatus(); - if (createStatus != CambriaOperationStatus.OK) { - return false; - } - String statusTopic = buildTopicName(deConfiguration.getDistributionStatusTopicName(), this.envName); - logger.debug("Going to handle topic {}", statusTopic); - if (!createStatusTopicIfNotExists(topicsList, statusTopic)) { - return false; - } - CambriaErrorResponse registerConcumerStatus = registerToTopic(statusTopic, SubscriberTypeEnum.CONSUMER); - return registerConcumerStatus.getOperationStatus() == CambriaOperationStatus.OK; } private CambriaErrorResponse registerToTopic(String topicName, SubscriberTypeEnum subscriberType) { @@ -281,4 +288,8 @@ public class DistributionEngineInitTask implements Runnable { protected void setCambriaHandler(CambriaHandler cambriaHandler) { this.cambriaHandler = cambriaHandler; } + + protected void setKafkaHandler(KafkaHandler kafkaHandler) { + this.kafkaHandler = kafkaHandler; + } } diff --git a/catalog-be/src/main/java/org/openecomp/sdc/be/components/distribution/engine/DistributionEnginePollingTask.java b/catalog-be/src/main/java/org/openecomp/sdc/be/components/distribution/engine/DistributionEnginePollingTask.java index 124671086f..ab4400a5bf 100644 --- a/catalog-be/src/main/java/org/openecomp/sdc/be/components/distribution/engine/DistributionEnginePollingTask.java +++ b/catalog-be/src/main/java/org/openecomp/sdc/be/components/distribution/engine/DistributionEnginePollingTask.java @@ -29,6 +29,7 @@ import java.util.concurrent.ScheduledFuture; import java.util.concurrent.TimeUnit; import org.apache.commons.lang3.concurrent.BasicThreadFactory; import org.openecomp.sdc.be.components.distribution.engine.report.DistributionCompleteReporter; +import org.openecomp.sdc.be.components.kafka.KafkaHandler; import org.openecomp.sdc.be.config.BeEcompErrorManager; import org.openecomp.sdc.be.config.DistributionEngineConfiguration; import org.openecomp.sdc.be.config.DistributionEngineConfiguration.DistributionStatusTopicConfig; @@ -51,6 +52,7 @@ public class DistributionEnginePollingTask implements Runnable { private String consumerId; private String consumerGroup; private CambriaHandler cambriaHandler = new CambriaHandler(); + private final KafkaHandler kafkaHandler = new KafkaHandler(); private Gson gson = new GsonBuilder().setPrettyPrinting().create(); private DistributionCompleteReporter distributionCompleteReporter; private ScheduledExecutorService scheduledPollingService = Executors @@ -82,9 +84,12 @@ public class DistributionEnginePollingTask implements Runnable { fetchTimeoutInSec = 15; } try { - cambriaConsumer = cambriaHandler - .createConsumer(environmentEntry.getDmaapUebAddress(), topicName, environmentEntry.getUebApikey(), environmentEntry.getUebSecretKey(), - consumerId, consumerGroup, fetchTimeoutInSec * 1000); + if (Boolean.FALSE.equals(kafkaHandler.isKafkaActive())) { + cambriaConsumer = cambriaHandler + .createConsumer(environmentEntry.getDmaapUebAddress(), topicName, environmentEntry.getUebApikey(), + environmentEntry.getUebSecretKey(), + consumerId, consumerGroup, fetchTimeoutInSec * 1000); + } if (scheduledPollingService != null) { logger.debug("Start Distribution Engine polling task. polling interval {} seconds", pollingIntervalInSec); scheduledFuture = scheduledPollingService.scheduleAtFixedRate(this, 0, pollingIntervalInSec, TimeUnit.SECONDS); @@ -119,14 +124,20 @@ public class DistributionEnginePollingTask implements Runnable { @Override public void run() { logger.trace("run() method. polling queue {}", topicName); + Either<Iterable<String>, CambriaErrorResponse> fetchResult; try { // init error - if (cambriaConsumer == null) { - BeEcompErrorManager.getInstance().logBeUebSystemError(DISTRIBUTION_STATUS_POLLING, "polling task was not initialized properly"); - stopTask(); - return; + if (Boolean.FALSE.equals(kafkaHandler.isKafkaActive())) { + if (cambriaConsumer == null) { + BeEcompErrorManager.getInstance() + .logBeUebSystemError(DISTRIBUTION_STATUS_POLLING, "polling task was not initialized properly"); + stopTask(); + return; + } + fetchResult = cambriaHandler.fetchFromTopic(cambriaConsumer); + } else { + fetchResult = kafkaHandler.fetchFromTopic(topicName); } - Either<Iterable<String>, CambriaErrorResponse> fetchResult = cambriaHandler.fetchFromTopic(cambriaConsumer); // fetch error if (fetchResult.isRight()) { CambriaErrorResponse errorResponse = fetchResult.right().value(); diff --git a/catalog-be/src/main/java/org/openecomp/sdc/be/components/distribution/engine/DistributionNotificationSender.java b/catalog-be/src/main/java/org/openecomp/sdc/be/components/distribution/engine/DistributionNotificationSender.java index 0098eac7d9..b93d485bdb 100644 --- a/catalog-be/src/main/java/org/openecomp/sdc/be/components/distribution/engine/DistributionNotificationSender.java +++ b/catalog-be/src/main/java/org/openecomp/sdc/be/components/distribution/engine/DistributionNotificationSender.java @@ -19,6 +19,7 @@ */ package org.openecomp.sdc.be.components.distribution.engine; +import org.openecomp.sdc.be.components.kafka.KafkaHandler; import org.openecomp.sdc.be.config.ConfigurationManager; import org.openecomp.sdc.be.config.DistributionEngineConfiguration; import org.openecomp.sdc.be.dao.api.ActionStatus; @@ -36,16 +37,26 @@ public class DistributionNotificationSender { private static final Logger logger = Logger.getLogger(DistributionNotificationSender.class.getName()); @javax.annotation.Resource protected ComponentsUtils componentUtils; - private CambriaHandler cambriaHandler = new CambriaHandler(); - private DistributionEngineConfiguration deConfiguration = ConfigurationManager.getConfigurationManager().getDistributionEngineConfiguration(); + private final CambriaHandler cambriaHandler = new CambriaHandler(); + + private final KafkaHandler kafkaHandler = new KafkaHandler(); + +// + private final DistributionEngineConfiguration deConfiguration = ConfigurationManager.getConfigurationManager().getDistributionEngineConfiguration(); public ActionStatus sendNotification(String topicName, String distributionId, EnvironmentMessageBusData messageBusData, INotificationData notificationData, Service service, User modifier) { long startTime = System.currentTimeMillis(); - CambriaErrorResponse status = cambriaHandler - .sendNotificationAndClose(topicName, messageBusData.getUebPublicKey(), messageBusData.getUebPrivateKey(), - messageBusData.getDmaaPuebEndpoints(), notificationData, - deConfiguration.getDistributionNotificationTopic().getMaxWaitingAfterSendingSeconds()); + CambriaErrorResponse status; + if (Boolean.FALSE.equals(kafkaHandler.isKafkaActive())) { + status = cambriaHandler + .sendNotificationAndClose(topicName, messageBusData.getUebPublicKey(), messageBusData.getUebPrivateKey(), + messageBusData.getDmaaPuebEndpoints(), notificationData, + deConfiguration.getDistributionNotificationTopic().getMaxWaitingAfterSendingSeconds()); + } + else{ + status = kafkaHandler.sendNotification(topicName, notificationData); + } logger.info("After publishing service {} of version {}. Status is {}", service.getName(), service.getVersion(), status.getHttpCode()); auditDistributionNotification( new AuditDistributionNotificationBuilder().setTopicName(topicName).setDistributionId(distributionId).setStatus(status).setService(service) diff --git a/catalog-be/src/main/java/org/openecomp/sdc/be/components/kafka/KafkaHandler.java b/catalog-be/src/main/java/org/openecomp/sdc/be/components/kafka/KafkaHandler.java new file mode 100644 index 0000000000..2a5590e72d --- /dev/null +++ b/catalog-be/src/main/java/org/openecomp/sdc/be/components/kafka/KafkaHandler.java @@ -0,0 +1,138 @@ +/*- + * ============LICENSE_START======================================================= + * SDC + * ================================================================================ + * Copyright (C) 2022 Nordix Foundation. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ +package org.openecomp.sdc.be.components.kafka; + +import com.google.gson.Gson; +import com.google.gson.JsonSyntaxException; +import fj.data.Either; +import lombok.Getter; +import lombok.Setter; +import org.apache.http.HttpStatus; +import org.apache.kafka.common.KafkaException; +import org.openecomp.sdc.be.components.distribution.engine.CambriaErrorResponse; +import org.openecomp.sdc.be.components.distribution.engine.INotificationData; +import org.openecomp.sdc.be.config.BeEcompErrorManager; +import org.openecomp.sdc.be.config.ConfigurationManager; +import org.openecomp.sdc.be.config.DistributionEngineConfiguration; +import org.openecomp.sdc.be.distribution.api.client.CambriaOperationStatus; +import org.openecomp.sdc.common.log.wrappers.Logger; +import org.springframework.stereotype.Component; + +/** + * Utility class that provides a handler for Kafka interactions + */ +@Component +public class KafkaHandler { + + private static final Logger log = Logger.getLogger(KafkaHandler.class.getName()); + private final Gson gson = new Gson(); + + private SdcKafkaConsumer sdcKafkaConsumer; + + private SdcKafkaProducer sdcKafkaProducer; + + @Setter + private boolean isKafkaActive; + + private DistributionEngineConfiguration deConfiguration; + + public KafkaHandler(SdcKafkaConsumer sdcKafkaConsumer, SdcKafkaProducer sdcKafkaProducer, boolean isKafkaActive) { + this.sdcKafkaConsumer = sdcKafkaConsumer; + this.sdcKafkaProducer = sdcKafkaProducer; + this.isKafkaActive = isKafkaActive; + } + + public KafkaHandler() { + isKafkaActive = Boolean.parseBoolean(System.getenv().getOrDefault("USE_KAFKA", "false")); + deConfiguration = ConfigurationManager.getConfigurationManager().getDistributionEngineConfiguration(); + } + + /** + * @return a user configuration whether Kafka is active for this client + */ + public Boolean isKafkaActive() { + return isKafkaActive; + } + + /** + * @param topicName The topic from which messages will be fetched + * @return Either A list of messages from a specific topic, or a specific error response + */ + public Either<Iterable<String>, CambriaErrorResponse> fetchFromTopic(String topicName) { + try { + if(sdcKafkaConsumer == null){ + sdcKafkaConsumer = new SdcKafkaConsumer(deConfiguration); + } + sdcKafkaConsumer.subscribe(topicName); + Iterable<String> messages = sdcKafkaConsumer.poll(topicName); + log.info("Returning messages from topic {}", topicName); + return Either.left(messages); + } catch (KafkaException e) { + BeEcompErrorManager.getInstance().logBeDistributionEngineSystemError("fetchFromTopic", e.getMessage()); + log.error("Failed to fetch from kafka for topic: {}", topicName, e); + CambriaErrorResponse cambriaErrorResponse = + new CambriaErrorResponse(CambriaOperationStatus.INTERNAL_SERVER_ERROR, + HttpStatus.SC_INTERNAL_SERVER_ERROR); + return Either.right(cambriaErrorResponse); + } + } + + /** + * Publish notification message to a given topic and flush + * + * @param topicName The topic to which the message should be published + * @param data The data to publish to the topic specified + * @return CambriaErrorResponse a status response on success or any errors thrown + */ + public CambriaErrorResponse sendNotification(String topicName, INotificationData data) { + CambriaErrorResponse response; + if(sdcKafkaProducer == null){ + sdcKafkaProducer = new SdcKafkaProducer(deConfiguration); + } + try { + String json = gson.toJson(data); + log.info("Before sending to topic {}", topicName); + sdcKafkaProducer.send(json, topicName); + } + catch(KafkaException e){ + BeEcompErrorManager.getInstance().logBeDistributionEngineSystemError("sendNotification", e.getMessage()); + log.error("Failed to send message . Exception {}", e.getMessage()); + + return new CambriaErrorResponse(CambriaOperationStatus.INTERNAL_SERVER_ERROR, 500); + } catch (JsonSyntaxException e) { + BeEcompErrorManager.getInstance().logBeDistributionEngineSystemError("sendNotification", e.getMessage()); + log.error("Failed to convert data to json: {}", data, e); + + return new CambriaErrorResponse(CambriaOperationStatus.INTERNAL_SERVER_ERROR, 500); + } finally { + try { + sdcKafkaProducer.flush(); + response = new CambriaErrorResponse(CambriaOperationStatus.OK, 200); + } catch (KafkaException | IllegalArgumentException e) { + BeEcompErrorManager.getInstance().logBeDistributionEngineSystemError("sendNotification", e.getMessage()); + log.error("Failed to flush sdcKafkaProducer", e); + + response = new CambriaErrorResponse(CambriaOperationStatus.INTERNAL_SERVER_ERROR, 500); + } + } + + return response; + } +} diff --git a/catalog-be/src/main/java/org/openecomp/sdc/be/components/kafka/SdcKafkaConsumer.java b/catalog-be/src/main/java/org/openecomp/sdc/be/components/kafka/SdcKafkaConsumer.java new file mode 100644 index 0000000000..8879bf000e --- /dev/null +++ b/catalog-be/src/main/java/org/openecomp/sdc/be/components/kafka/SdcKafkaConsumer.java @@ -0,0 +1,117 @@ +/*- + * ============LICENSE_START======================================================= + * SDC + * ================================================================================ + * Copyright (C) 2022 Nordix Foundation. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ +package org.openecomp.sdc.be.components.kafka; + +import com.google.common.annotations.VisibleForTesting; +import java.time.Duration; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.Properties; +import java.util.UUID; +import org.apache.kafka.clients.CommonClientConfigs; +import org.apache.kafka.clients.consumer.ConsumerConfig; +import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.apache.kafka.clients.consumer.ConsumerRecords; +import org.apache.kafka.clients.consumer.KafkaConsumer; +import org.apache.kafka.common.KafkaException; +import org.apache.kafka.common.config.SaslConfigs; +import org.openecomp.sdc.be.config.DistributionEngineConfiguration; +import org.openecomp.sdc.common.log.wrappers.Logger; + +/** + * Utility class that provides a KafkaConsumer to communicate with a kafka cluster + */ +public class SdcKafkaConsumer { + + private static final Logger log = Logger.getLogger(SdcKafkaConsumer.class.getName()); + private final DistributionEngineConfiguration deConfiguration; + private KafkaConsumer<String, String> kafkaConsumer; + + /** + * Constructor setting up the KafkaConsumer from a predefined set of configurations + */ + public SdcKafkaConsumer(DistributionEngineConfiguration deConfiguration){ + log.info("Create SdcKafkaConsumer via constructor"); + Properties properties = new Properties(); + this.deConfiguration = deConfiguration; + + properties.put(ConsumerConfig.CLIENT_ID_CONFIG, deConfiguration.getDistributionStatusTopic().getConsumerId()+ "-consumer-" + UUID.randomUUID()); + properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer"); + properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer"); + properties.put(ConsumerConfig.GROUP_ID_CONFIG, deConfiguration.getDistributionStatusTopic().getConsumerGroup()); + properties.put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, deConfiguration.getKafkaBootStrapServers()); + properties.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SASL_PLAINTEXT"); + properties.put(ConsumerConfig.ALLOW_AUTO_CREATE_TOPICS_CONFIG, false); + properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest"); + properties.put(SaslConfigs.SASL_MECHANISM, "SCRAM-SHA-512"); + + properties.put(SaslConfigs.SASL_JAAS_CONFIG, getKafkaSaslJaasConfig()); + kafkaConsumer = new KafkaConsumer<>(properties); + } + + /** + * + * @param kafkaConsumer a kafkaConsumer to use within the class + * @param deConfiguration - Configuration to pass into the class + */ + @VisibleForTesting + SdcKafkaConsumer(KafkaConsumer kafkaConsumer, DistributionEngineConfiguration deConfiguration){ + this.deConfiguration = deConfiguration; + this.kafkaConsumer = kafkaConsumer; + } + + /** + * + * @return the Sasl Jass Config + */ + private String getKafkaSaslJaasConfig() { + String saslJaasConfFromEnv = System.getenv("SASL_JAAS_CONFIG"); + if(saslJaasConfFromEnv != null) { + return saslJaasConfFromEnv; + } else { + throw new KafkaException("sasl.jaas.config not set for Kafka Consumer"); + } + } + + /** + * + * @param topic Topic in which to subscribe + */ + public void subscribe(String topic) throws KafkaException { + if (!kafkaConsumer.subscription().contains(topic)) { + kafkaConsumer.subscribe(Collections.singleton(topic)); + } + } + + /** + * + * @return The list of messages for a specified topic, returned from the poll + */ + public List<String> poll(String topicName) throws KafkaException { + log.info("SdcKafkaConsumer - polling for messages from Topic: {}", topicName); + List<String> msgs = new ArrayList<>(); + ConsumerRecords<String, String> consumerRecordsForSpecificTopic = kafkaConsumer.poll(Duration.ofSeconds(deConfiguration.getDistributionStatusTopic().getPollingIntervalSec())); + for(ConsumerRecord<String, String> rec : consumerRecordsForSpecificTopic){ + msgs.add(rec.value()); + } + return msgs; + } +} diff --git a/catalog-be/src/main/java/org/openecomp/sdc/be/components/kafka/SdcKafkaProducer.java b/catalog-be/src/main/java/org/openecomp/sdc/be/components/kafka/SdcKafkaProducer.java new file mode 100644 index 0000000000..bdc984d7b5 --- /dev/null +++ b/catalog-be/src/main/java/org/openecomp/sdc/be/components/kafka/SdcKafkaProducer.java @@ -0,0 +1,98 @@ +/*- + * ============LICENSE_START======================================================= + * SDC + * ================================================================================ + * Copyright (C) 2022 Nordix Foundation. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ +package org.openecomp.sdc.be.components.kafka; + +import com.google.common.annotations.VisibleForTesting; +import java.util.Properties; +import java.util.UUID; +import org.apache.kafka.clients.CommonClientConfigs; +import org.apache.kafka.clients.producer.KafkaProducer; +import org.apache.kafka.clients.producer.ProducerConfig; +import org.apache.kafka.clients.producer.ProducerRecord; +import org.apache.kafka.common.KafkaException; +import org.apache.kafka.common.config.SaslConfigs; +import org.openecomp.sdc.be.config.DistributionEngineConfiguration; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Utility class that provides a KafkaProducer to communicate with a kafka cluster + */ +public class SdcKafkaProducer { + private static final Logger log = LoggerFactory.getLogger(SdcKafkaProducer.class.getName()); + + private KafkaProducer<String, String> kafkaProducer; + + /** + * Constructor setting up the KafkaProducer from a predefined set of configurations + */ + public SdcKafkaProducer(DistributionEngineConfiguration deConfiguration) { + log.info("Create SdcKafkaProducer via constructor"); + Properties properties = new Properties(); + + properties.put(ProducerConfig.CLIENT_ID_CONFIG, deConfiguration.getDistributionStatusTopic().getConsumerId() + "-producer-" + UUID.randomUUID()); + properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer"); + properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer"); + properties.put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, deConfiguration.getKafkaBootStrapServers()); + properties.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SASL_PLAINTEXT"); + properties.put(SaslConfigs.SASL_JAAS_CONFIG, getKafkaSaslJaasConfig()); + properties.put(SaslConfigs.SASL_MECHANISM, "SCRAM-SHA-512"); + kafkaProducer = new KafkaProducer<>(properties); + } + + /** + * + * @param kafkaProducer Setting a KafkaProducer to use within the sdcKafkaProducer class + */ + @VisibleForTesting + SdcKafkaProducer(KafkaProducer kafkaProducer) { + this.kafkaProducer = kafkaProducer; + } + + /** + * @return The Sasl Jaas Configuration + */ + private static String getKafkaSaslJaasConfig() throws KafkaException { + String saslJaasConfFromEnv = System.getenv("SASL_JAAS_CONFIG"); + if(saslJaasConfFromEnv != null) { + return saslJaasConfFromEnv; + } else { + throw new KafkaException("sasl.jaas.config not set for Kafka Consumer"); + } + } + + /** + * @param message A message to Send + * @param topicName The name of the topic to publish to + * @return The status of the send request + */ + public void send(String message, String topicName) throws KafkaException { + ProducerRecord<String, String> kafkaMessagePayload = new ProducerRecord<>(topicName, "PartitionKey", message); + kafkaProducer.send(kafkaMessagePayload); + } + + /** + * Kafka FLush operation + */ + public void flush() throws KafkaException { + log.info("SdcKafkaProducer - flush"); + kafkaProducer.flush(); + } +} |