diff options
author | david.mcweeney <david.mcweeney@est.tech> | 2022-10-04 15:46:14 +0100 |
---|---|---|
committer | Michael Morris <michael.morris@est.tech> | 2022-10-25 11:24:02 +0000 |
commit | 47f96dd966663f7f46b719451c0752721a2940a3 (patch) | |
tree | 9d875ce43f96cf3e570cc812d907fa2edd3b7945 /catalog-be | |
parent | 0d2e96c125aab4e4edfc0a8b897353c0fabdd885 (diff) |
[SDC] Add kafka native messaging
Change-Id: I5ab8f580947cbc264d94bec48a5e8b659dc44c08
Issue-ID: DMAAP-1787
Signed-off-by: david.mcweeney <david.mcweeney@est.tech>
Diffstat (limited to 'catalog-be')
12 files changed, 857 insertions, 74 deletions
diff --git a/catalog-be/pom.xml b/catalog-be/pom.xml index 7b060c9bfa..edd673c54c 100644 --- a/catalog-be/pom.xml +++ b/catalog-be/pom.xml @@ -1043,6 +1043,10 @@ </exclusion> </exclusions> </dependency> + <dependency> + <groupId>org.apache.kafka</groupId> + <artifactId>kafka-clients</artifactId> + </dependency> </dependencies> diff --git a/catalog-be/src/main/java/org/openecomp/sdc/be/components/distribution/engine/DistributionEngineInitTask.java b/catalog-be/src/main/java/org/openecomp/sdc/be/components/distribution/engine/DistributionEngineInitTask.java index d61e15016a..00d3fedfc8 100644 --- a/catalog-be/src/main/java/org/openecomp/sdc/be/components/distribution/engine/DistributionEngineInitTask.java +++ b/catalog-be/src/main/java/org/openecomp/sdc/be/components/distribution/engine/DistributionEngineInitTask.java @@ -20,6 +20,7 @@ package org.openecomp.sdc.be.components.distribution.engine; import fj.data.Either; +import java.util.ArrayList; import java.util.HashSet; import java.util.Set; import java.util.concurrent.Executors; @@ -27,7 +28,7 @@ import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ScheduledFuture; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; -import java.util.stream.Collectors; +import org.openecomp.sdc.be.components.kafka.KafkaHandler; import org.openecomp.sdc.be.config.BeEcompErrorManager; import org.openecomp.sdc.be.config.DistributionEngineConfiguration; import org.openecomp.sdc.be.distribution.api.client.CambriaOperationStatus; @@ -60,6 +61,7 @@ public class DistributionEngineInitTask implements Runnable { private AtomicBoolean status = null; private OperationalEnvironmentEntry environmentEntry; private CambriaHandler cambriaHandler = new CambriaHandler(); + private KafkaHandler kafkaHandler = new KafkaHandler(); private ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(1); public DistributionEngineInitTask(Long delayBeforeStartFlow, DistributionEngineConfiguration deConfiguration, String envName, @@ -136,9 +138,7 @@ public class DistributionEngineInitTask implements Runnable { @Override public void run() { - boolean result = false; - result = initFlow(); - if (result) { + if (initFlow()) { this.stopTask(); this.status.set(true); if (this.distributionEnginePollingTask != null) { @@ -159,38 +159,45 @@ public class DistributionEngineInitTask implements Runnable { * @return */ public boolean initFlow() { - logger.trace("Start init flow for environment {}", this.envName); - Set<String> topicsList = null; - Either<Set<String>, CambriaErrorResponse> getTopicsRes = null; - getTopicsRes = cambriaHandler.getTopics(environmentEntry.getDmaapUebAddress().stream().collect(Collectors.toList())); - if (getTopicsRes.isRight()) { - CambriaErrorResponse status = getTopicsRes.right().value(); - if (status.getOperationStatus() == CambriaOperationStatus.NOT_FOUND) { - topicsList = new HashSet<>(); + logger.info("Start init flow for environment {}", this.envName); + if (Boolean.FALSE.equals(kafkaHandler.isKafkaActive())) { + Set<String> topicsList; + Either<Set<String>, CambriaErrorResponse> getTopicsRes; + getTopicsRes = cambriaHandler.getTopics(new ArrayList<>(environmentEntry.getDmaapUebAddress())); + if (getTopicsRes.isRight()) { + CambriaErrorResponse cambriaErrorResponse = getTopicsRes.right().value(); + if (cambriaErrorResponse.getOperationStatus() == CambriaOperationStatus.NOT_FOUND) { + topicsList = new HashSet<>(); + } else { + BeEcompErrorManager.getInstance().logBeUebSystemError(INIT_DISTRIBUTION_ENGINE_FLOW, + "try retrieve list of topics from U-EB server"); + return false; + } } else { - BeEcompErrorManager.getInstance().logBeUebSystemError(INIT_DISTRIBUTION_ENGINE_FLOW, "try retrieve list of topics from U-EB server"); + topicsList = getTopicsRes.left().value(); + } + String notificationTopic = buildTopicName(deConfiguration.getDistributionNotifTopicName(), this.envName); + logger.debug("Going to handle topic {}", notificationTopic); + if (!createNotificationTopicIfNotExists(topicsList, notificationTopic)) { + return false; + } + CambriaErrorResponse registerProducerStatus = registerToTopic(notificationTopic, + SubscriberTypeEnum.PRODUCER); + CambriaOperationStatus createStatus = registerProducerStatus.getOperationStatus(); + if (createStatus != CambriaOperationStatus.OK) { + return false; + } + String statusTopic = buildTopicName(deConfiguration.getDistributionStatusTopicName(), this.envName); + logger.debug("Going to handle topic {}", statusTopic); + if (!createStatusTopicIfNotExists(topicsList, statusTopic)) { return false; } + CambriaErrorResponse registerConcumerStatus = registerToTopic(statusTopic, SubscriberTypeEnum.CONSUMER); + return registerConcumerStatus.getOperationStatus() == CambriaOperationStatus.OK; } else { - topicsList = getTopicsRes.left().value(); + logger.info("Skipping DisributionEngineInitTask flow to use kafka native for distribution messaging"); + return true; } - String notificationTopic = buildTopicName(deConfiguration.getDistributionNotifTopicName(), this.envName); - logger.debug("Going to handle topic {}", notificationTopic); - if (!createNotificationTopicIfNotExists(topicsList, notificationTopic)) { - return false; - } - CambriaErrorResponse registerProducerStatus = registerToTopic(notificationTopic, SubscriberTypeEnum.PRODUCER); - CambriaOperationStatus createStatus = registerProducerStatus.getOperationStatus(); - if (createStatus != CambriaOperationStatus.OK) { - return false; - } - String statusTopic = buildTopicName(deConfiguration.getDistributionStatusTopicName(), this.envName); - logger.debug("Going to handle topic {}", statusTopic); - if (!createStatusTopicIfNotExists(topicsList, statusTopic)) { - return false; - } - CambriaErrorResponse registerConcumerStatus = registerToTopic(statusTopic, SubscriberTypeEnum.CONSUMER); - return registerConcumerStatus.getOperationStatus() == CambriaOperationStatus.OK; } private CambriaErrorResponse registerToTopic(String topicName, SubscriberTypeEnum subscriberType) { @@ -281,4 +288,8 @@ public class DistributionEngineInitTask implements Runnable { protected void setCambriaHandler(CambriaHandler cambriaHandler) { this.cambriaHandler = cambriaHandler; } + + protected void setKafkaHandler(KafkaHandler kafkaHandler) { + this.kafkaHandler = kafkaHandler; + } } diff --git a/catalog-be/src/main/java/org/openecomp/sdc/be/components/distribution/engine/DistributionEnginePollingTask.java b/catalog-be/src/main/java/org/openecomp/sdc/be/components/distribution/engine/DistributionEnginePollingTask.java index 124671086f..ab4400a5bf 100644 --- a/catalog-be/src/main/java/org/openecomp/sdc/be/components/distribution/engine/DistributionEnginePollingTask.java +++ b/catalog-be/src/main/java/org/openecomp/sdc/be/components/distribution/engine/DistributionEnginePollingTask.java @@ -29,6 +29,7 @@ import java.util.concurrent.ScheduledFuture; import java.util.concurrent.TimeUnit; import org.apache.commons.lang3.concurrent.BasicThreadFactory; import org.openecomp.sdc.be.components.distribution.engine.report.DistributionCompleteReporter; +import org.openecomp.sdc.be.components.kafka.KafkaHandler; import org.openecomp.sdc.be.config.BeEcompErrorManager; import org.openecomp.sdc.be.config.DistributionEngineConfiguration; import org.openecomp.sdc.be.config.DistributionEngineConfiguration.DistributionStatusTopicConfig; @@ -51,6 +52,7 @@ public class DistributionEnginePollingTask implements Runnable { private String consumerId; private String consumerGroup; private CambriaHandler cambriaHandler = new CambriaHandler(); + private final KafkaHandler kafkaHandler = new KafkaHandler(); private Gson gson = new GsonBuilder().setPrettyPrinting().create(); private DistributionCompleteReporter distributionCompleteReporter; private ScheduledExecutorService scheduledPollingService = Executors @@ -82,9 +84,12 @@ public class DistributionEnginePollingTask implements Runnable { fetchTimeoutInSec = 15; } try { - cambriaConsumer = cambriaHandler - .createConsumer(environmentEntry.getDmaapUebAddress(), topicName, environmentEntry.getUebApikey(), environmentEntry.getUebSecretKey(), - consumerId, consumerGroup, fetchTimeoutInSec * 1000); + if (Boolean.FALSE.equals(kafkaHandler.isKafkaActive())) { + cambriaConsumer = cambriaHandler + .createConsumer(environmentEntry.getDmaapUebAddress(), topicName, environmentEntry.getUebApikey(), + environmentEntry.getUebSecretKey(), + consumerId, consumerGroup, fetchTimeoutInSec * 1000); + } if (scheduledPollingService != null) { logger.debug("Start Distribution Engine polling task. polling interval {} seconds", pollingIntervalInSec); scheduledFuture = scheduledPollingService.scheduleAtFixedRate(this, 0, pollingIntervalInSec, TimeUnit.SECONDS); @@ -119,14 +124,20 @@ public class DistributionEnginePollingTask implements Runnable { @Override public void run() { logger.trace("run() method. polling queue {}", topicName); + Either<Iterable<String>, CambriaErrorResponse> fetchResult; try { // init error - if (cambriaConsumer == null) { - BeEcompErrorManager.getInstance().logBeUebSystemError(DISTRIBUTION_STATUS_POLLING, "polling task was not initialized properly"); - stopTask(); - return; + if (Boolean.FALSE.equals(kafkaHandler.isKafkaActive())) { + if (cambriaConsumer == null) { + BeEcompErrorManager.getInstance() + .logBeUebSystemError(DISTRIBUTION_STATUS_POLLING, "polling task was not initialized properly"); + stopTask(); + return; + } + fetchResult = cambriaHandler.fetchFromTopic(cambriaConsumer); + } else { + fetchResult = kafkaHandler.fetchFromTopic(topicName); } - Either<Iterable<String>, CambriaErrorResponse> fetchResult = cambriaHandler.fetchFromTopic(cambriaConsumer); // fetch error if (fetchResult.isRight()) { CambriaErrorResponse errorResponse = fetchResult.right().value(); diff --git a/catalog-be/src/main/java/org/openecomp/sdc/be/components/distribution/engine/DistributionNotificationSender.java b/catalog-be/src/main/java/org/openecomp/sdc/be/components/distribution/engine/DistributionNotificationSender.java index 0098eac7d9..b93d485bdb 100644 --- a/catalog-be/src/main/java/org/openecomp/sdc/be/components/distribution/engine/DistributionNotificationSender.java +++ b/catalog-be/src/main/java/org/openecomp/sdc/be/components/distribution/engine/DistributionNotificationSender.java @@ -19,6 +19,7 @@ */ package org.openecomp.sdc.be.components.distribution.engine; +import org.openecomp.sdc.be.components.kafka.KafkaHandler; import org.openecomp.sdc.be.config.ConfigurationManager; import org.openecomp.sdc.be.config.DistributionEngineConfiguration; import org.openecomp.sdc.be.dao.api.ActionStatus; @@ -36,16 +37,26 @@ public class DistributionNotificationSender { private static final Logger logger = Logger.getLogger(DistributionNotificationSender.class.getName()); @javax.annotation.Resource protected ComponentsUtils componentUtils; - private CambriaHandler cambriaHandler = new CambriaHandler(); - private DistributionEngineConfiguration deConfiguration = ConfigurationManager.getConfigurationManager().getDistributionEngineConfiguration(); + private final CambriaHandler cambriaHandler = new CambriaHandler(); + + private final KafkaHandler kafkaHandler = new KafkaHandler(); + +// + private final DistributionEngineConfiguration deConfiguration = ConfigurationManager.getConfigurationManager().getDistributionEngineConfiguration(); public ActionStatus sendNotification(String topicName, String distributionId, EnvironmentMessageBusData messageBusData, INotificationData notificationData, Service service, User modifier) { long startTime = System.currentTimeMillis(); - CambriaErrorResponse status = cambriaHandler - .sendNotificationAndClose(topicName, messageBusData.getUebPublicKey(), messageBusData.getUebPrivateKey(), - messageBusData.getDmaaPuebEndpoints(), notificationData, - deConfiguration.getDistributionNotificationTopic().getMaxWaitingAfterSendingSeconds()); + CambriaErrorResponse status; + if (Boolean.FALSE.equals(kafkaHandler.isKafkaActive())) { + status = cambriaHandler + .sendNotificationAndClose(topicName, messageBusData.getUebPublicKey(), messageBusData.getUebPrivateKey(), + messageBusData.getDmaaPuebEndpoints(), notificationData, + deConfiguration.getDistributionNotificationTopic().getMaxWaitingAfterSendingSeconds()); + } + else{ + status = kafkaHandler.sendNotification(topicName, notificationData); + } logger.info("After publishing service {} of version {}. Status is {}", service.getName(), service.getVersion(), status.getHttpCode()); auditDistributionNotification( new AuditDistributionNotificationBuilder().setTopicName(topicName).setDistributionId(distributionId).setStatus(status).setService(service) diff --git a/catalog-be/src/main/java/org/openecomp/sdc/be/components/kafka/KafkaHandler.java b/catalog-be/src/main/java/org/openecomp/sdc/be/components/kafka/KafkaHandler.java new file mode 100644 index 0000000000..2a5590e72d --- /dev/null +++ b/catalog-be/src/main/java/org/openecomp/sdc/be/components/kafka/KafkaHandler.java @@ -0,0 +1,138 @@ +/*- + * ============LICENSE_START======================================================= + * SDC + * ================================================================================ + * Copyright (C) 2022 Nordix Foundation. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ +package org.openecomp.sdc.be.components.kafka; + +import com.google.gson.Gson; +import com.google.gson.JsonSyntaxException; +import fj.data.Either; +import lombok.Getter; +import lombok.Setter; +import org.apache.http.HttpStatus; +import org.apache.kafka.common.KafkaException; +import org.openecomp.sdc.be.components.distribution.engine.CambriaErrorResponse; +import org.openecomp.sdc.be.components.distribution.engine.INotificationData; +import org.openecomp.sdc.be.config.BeEcompErrorManager; +import org.openecomp.sdc.be.config.ConfigurationManager; +import org.openecomp.sdc.be.config.DistributionEngineConfiguration; +import org.openecomp.sdc.be.distribution.api.client.CambriaOperationStatus; +import org.openecomp.sdc.common.log.wrappers.Logger; +import org.springframework.stereotype.Component; + +/** + * Utility class that provides a handler for Kafka interactions + */ +@Component +public class KafkaHandler { + + private static final Logger log = Logger.getLogger(KafkaHandler.class.getName()); + private final Gson gson = new Gson(); + + private SdcKafkaConsumer sdcKafkaConsumer; + + private SdcKafkaProducer sdcKafkaProducer; + + @Setter + private boolean isKafkaActive; + + private DistributionEngineConfiguration deConfiguration; + + public KafkaHandler(SdcKafkaConsumer sdcKafkaConsumer, SdcKafkaProducer sdcKafkaProducer, boolean isKafkaActive) { + this.sdcKafkaConsumer = sdcKafkaConsumer; + this.sdcKafkaProducer = sdcKafkaProducer; + this.isKafkaActive = isKafkaActive; + } + + public KafkaHandler() { + isKafkaActive = Boolean.parseBoolean(System.getenv().getOrDefault("USE_KAFKA", "false")); + deConfiguration = ConfigurationManager.getConfigurationManager().getDistributionEngineConfiguration(); + } + + /** + * @return a user configuration whether Kafka is active for this client + */ + public Boolean isKafkaActive() { + return isKafkaActive; + } + + /** + * @param topicName The topic from which messages will be fetched + * @return Either A list of messages from a specific topic, or a specific error response + */ + public Either<Iterable<String>, CambriaErrorResponse> fetchFromTopic(String topicName) { + try { + if(sdcKafkaConsumer == null){ + sdcKafkaConsumer = new SdcKafkaConsumer(deConfiguration); + } + sdcKafkaConsumer.subscribe(topicName); + Iterable<String> messages = sdcKafkaConsumer.poll(topicName); + log.info("Returning messages from topic {}", topicName); + return Either.left(messages); + } catch (KafkaException e) { + BeEcompErrorManager.getInstance().logBeDistributionEngineSystemError("fetchFromTopic", e.getMessage()); + log.error("Failed to fetch from kafka for topic: {}", topicName, e); + CambriaErrorResponse cambriaErrorResponse = + new CambriaErrorResponse(CambriaOperationStatus.INTERNAL_SERVER_ERROR, + HttpStatus.SC_INTERNAL_SERVER_ERROR); + return Either.right(cambriaErrorResponse); + } + } + + /** + * Publish notification message to a given topic and flush + * + * @param topicName The topic to which the message should be published + * @param data The data to publish to the topic specified + * @return CambriaErrorResponse a status response on success or any errors thrown + */ + public CambriaErrorResponse sendNotification(String topicName, INotificationData data) { + CambriaErrorResponse response; + if(sdcKafkaProducer == null){ + sdcKafkaProducer = new SdcKafkaProducer(deConfiguration); + } + try { + String json = gson.toJson(data); + log.info("Before sending to topic {}", topicName); + sdcKafkaProducer.send(json, topicName); + } + catch(KafkaException e){ + BeEcompErrorManager.getInstance().logBeDistributionEngineSystemError("sendNotification", e.getMessage()); + log.error("Failed to send message . Exception {}", e.getMessage()); + + return new CambriaErrorResponse(CambriaOperationStatus.INTERNAL_SERVER_ERROR, 500); + } catch (JsonSyntaxException e) { + BeEcompErrorManager.getInstance().logBeDistributionEngineSystemError("sendNotification", e.getMessage()); + log.error("Failed to convert data to json: {}", data, e); + + return new CambriaErrorResponse(CambriaOperationStatus.INTERNAL_SERVER_ERROR, 500); + } finally { + try { + sdcKafkaProducer.flush(); + response = new CambriaErrorResponse(CambriaOperationStatus.OK, 200); + } catch (KafkaException | IllegalArgumentException e) { + BeEcompErrorManager.getInstance().logBeDistributionEngineSystemError("sendNotification", e.getMessage()); + log.error("Failed to flush sdcKafkaProducer", e); + + response = new CambriaErrorResponse(CambriaOperationStatus.INTERNAL_SERVER_ERROR, 500); + } + } + + return response; + } +} diff --git a/catalog-be/src/main/java/org/openecomp/sdc/be/components/kafka/SdcKafkaConsumer.java b/catalog-be/src/main/java/org/openecomp/sdc/be/components/kafka/SdcKafkaConsumer.java new file mode 100644 index 0000000000..8879bf000e --- /dev/null +++ b/catalog-be/src/main/java/org/openecomp/sdc/be/components/kafka/SdcKafkaConsumer.java @@ -0,0 +1,117 @@ +/*- + * ============LICENSE_START======================================================= + * SDC + * ================================================================================ + * Copyright (C) 2022 Nordix Foundation. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ +package org.openecomp.sdc.be.components.kafka; + +import com.google.common.annotations.VisibleForTesting; +import java.time.Duration; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.Properties; +import java.util.UUID; +import org.apache.kafka.clients.CommonClientConfigs; +import org.apache.kafka.clients.consumer.ConsumerConfig; +import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.apache.kafka.clients.consumer.ConsumerRecords; +import org.apache.kafka.clients.consumer.KafkaConsumer; +import org.apache.kafka.common.KafkaException; +import org.apache.kafka.common.config.SaslConfigs; +import org.openecomp.sdc.be.config.DistributionEngineConfiguration; +import org.openecomp.sdc.common.log.wrappers.Logger; + +/** + * Utility class that provides a KafkaConsumer to communicate with a kafka cluster + */ +public class SdcKafkaConsumer { + + private static final Logger log = Logger.getLogger(SdcKafkaConsumer.class.getName()); + private final DistributionEngineConfiguration deConfiguration; + private KafkaConsumer<String, String> kafkaConsumer; + + /** + * Constructor setting up the KafkaConsumer from a predefined set of configurations + */ + public SdcKafkaConsumer(DistributionEngineConfiguration deConfiguration){ + log.info("Create SdcKafkaConsumer via constructor"); + Properties properties = new Properties(); + this.deConfiguration = deConfiguration; + + properties.put(ConsumerConfig.CLIENT_ID_CONFIG, deConfiguration.getDistributionStatusTopic().getConsumerId()+ "-consumer-" + UUID.randomUUID()); + properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer"); + properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer"); + properties.put(ConsumerConfig.GROUP_ID_CONFIG, deConfiguration.getDistributionStatusTopic().getConsumerGroup()); + properties.put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, deConfiguration.getKafkaBootStrapServers()); + properties.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SASL_PLAINTEXT"); + properties.put(ConsumerConfig.ALLOW_AUTO_CREATE_TOPICS_CONFIG, false); + properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest"); + properties.put(SaslConfigs.SASL_MECHANISM, "SCRAM-SHA-512"); + + properties.put(SaslConfigs.SASL_JAAS_CONFIG, getKafkaSaslJaasConfig()); + kafkaConsumer = new KafkaConsumer<>(properties); + } + + /** + * + * @param kafkaConsumer a kafkaConsumer to use within the class + * @param deConfiguration - Configuration to pass into the class + */ + @VisibleForTesting + SdcKafkaConsumer(KafkaConsumer kafkaConsumer, DistributionEngineConfiguration deConfiguration){ + this.deConfiguration = deConfiguration; + this.kafkaConsumer = kafkaConsumer; + } + + /** + * + * @return the Sasl Jass Config + */ + private String getKafkaSaslJaasConfig() { + String saslJaasConfFromEnv = System.getenv("SASL_JAAS_CONFIG"); + if(saslJaasConfFromEnv != null) { + return saslJaasConfFromEnv; + } else { + throw new KafkaException("sasl.jaas.config not set for Kafka Consumer"); + } + } + + /** + * + * @param topic Topic in which to subscribe + */ + public void subscribe(String topic) throws KafkaException { + if (!kafkaConsumer.subscription().contains(topic)) { + kafkaConsumer.subscribe(Collections.singleton(topic)); + } + } + + /** + * + * @return The list of messages for a specified topic, returned from the poll + */ + public List<String> poll(String topicName) throws KafkaException { + log.info("SdcKafkaConsumer - polling for messages from Topic: {}", topicName); + List<String> msgs = new ArrayList<>(); + ConsumerRecords<String, String> consumerRecordsForSpecificTopic = kafkaConsumer.poll(Duration.ofSeconds(deConfiguration.getDistributionStatusTopic().getPollingIntervalSec())); + for(ConsumerRecord<String, String> rec : consumerRecordsForSpecificTopic){ + msgs.add(rec.value()); + } + return msgs; + } +} diff --git a/catalog-be/src/main/java/org/openecomp/sdc/be/components/kafka/SdcKafkaProducer.java b/catalog-be/src/main/java/org/openecomp/sdc/be/components/kafka/SdcKafkaProducer.java new file mode 100644 index 0000000000..bdc984d7b5 --- /dev/null +++ b/catalog-be/src/main/java/org/openecomp/sdc/be/components/kafka/SdcKafkaProducer.java @@ -0,0 +1,98 @@ +/*- + * ============LICENSE_START======================================================= + * SDC + * ================================================================================ + * Copyright (C) 2022 Nordix Foundation. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ +package org.openecomp.sdc.be.components.kafka; + +import com.google.common.annotations.VisibleForTesting; +import java.util.Properties; +import java.util.UUID; +import org.apache.kafka.clients.CommonClientConfigs; +import org.apache.kafka.clients.producer.KafkaProducer; +import org.apache.kafka.clients.producer.ProducerConfig; +import org.apache.kafka.clients.producer.ProducerRecord; +import org.apache.kafka.common.KafkaException; +import org.apache.kafka.common.config.SaslConfigs; +import org.openecomp.sdc.be.config.DistributionEngineConfiguration; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Utility class that provides a KafkaProducer to communicate with a kafka cluster + */ +public class SdcKafkaProducer { + private static final Logger log = LoggerFactory.getLogger(SdcKafkaProducer.class.getName()); + + private KafkaProducer<String, String> kafkaProducer; + + /** + * Constructor setting up the KafkaProducer from a predefined set of configurations + */ + public SdcKafkaProducer(DistributionEngineConfiguration deConfiguration) { + log.info("Create SdcKafkaProducer via constructor"); + Properties properties = new Properties(); + + properties.put(ProducerConfig.CLIENT_ID_CONFIG, deConfiguration.getDistributionStatusTopic().getConsumerId() + "-producer-" + UUID.randomUUID()); + properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer"); + properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer"); + properties.put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, deConfiguration.getKafkaBootStrapServers()); + properties.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SASL_PLAINTEXT"); + properties.put(SaslConfigs.SASL_JAAS_CONFIG, getKafkaSaslJaasConfig()); + properties.put(SaslConfigs.SASL_MECHANISM, "SCRAM-SHA-512"); + kafkaProducer = new KafkaProducer<>(properties); + } + + /** + * + * @param kafkaProducer Setting a KafkaProducer to use within the sdcKafkaProducer class + */ + @VisibleForTesting + SdcKafkaProducer(KafkaProducer kafkaProducer) { + this.kafkaProducer = kafkaProducer; + } + + /** + * @return The Sasl Jaas Configuration + */ + private static String getKafkaSaslJaasConfig() throws KafkaException { + String saslJaasConfFromEnv = System.getenv("SASL_JAAS_CONFIG"); + if(saslJaasConfFromEnv != null) { + return saslJaasConfFromEnv; + } else { + throw new KafkaException("sasl.jaas.config not set for Kafka Consumer"); + } + } + + /** + * @param message A message to Send + * @param topicName The name of the topic to publish to + * @return The status of the send request + */ + public void send(String message, String topicName) throws KafkaException { + ProducerRecord<String, String> kafkaMessagePayload = new ProducerRecord<>(topicName, "PartitionKey", message); + kafkaProducer.send(kafkaMessagePayload); + } + + /** + * Kafka FLush operation + */ + public void flush() throws KafkaException { + log.info("SdcKafkaProducer - flush"); + kafkaProducer.flush(); + } +} diff --git a/catalog-be/src/test/java/org/openecomp/sdc/be/components/distribution/engine/CambriaHandlerTest.java b/catalog-be/src/test/java/org/openecomp/sdc/be/components/distribution/engine/CambriaHandlerTest.java index 980bb8369a..a91b246f40 100644 --- a/catalog-be/src/test/java/org/openecomp/sdc/be/components/distribution/engine/CambriaHandlerTest.java +++ b/catalog-be/src/test/java/org/openecomp/sdc/be/components/distribution/engine/CambriaHandlerTest.java @@ -7,9 +7,9 @@ * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. @@ -20,6 +20,11 @@ package org.openecomp.sdc.be.components.distribution.engine; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.Mockito.doReturn; + import com.att.nsa.apiClient.credentials.ApiCredential; import com.att.nsa.apiClient.http.HttpException; import com.att.nsa.cambria.client.CambriaClient; @@ -29,6 +34,14 @@ import com.att.nsa.cambria.client.CambriaClientBuilders.TopicManagerBuilder; import com.att.nsa.cambria.client.CambriaConsumer; import com.att.nsa.cambria.client.CambriaIdentityManager; import fj.data.Either; +import java.io.IOException; +import java.net.MalformedURLException; +import java.security.GeneralSecurityException; +import java.util.Arrays; +import java.util.Collection; +import java.util.LinkedList; +import java.util.List; +import java.util.Set; import mockit.Deencapsulation; import org.junit.Before; import org.junit.BeforeClass; @@ -45,20 +58,6 @@ import org.openecomp.sdc.common.api.ConfigurationSource; import org.openecomp.sdc.common.impl.ExternalConfiguration; import org.openecomp.sdc.common.impl.FSConfigurationSource; -import java.io.IOException; -import java.net.MalformedURLException; -import java.security.GeneralSecurityException; -import java.util.Arrays; -import java.util.Collection; -import java.util.LinkedList; -import java.util.List; -import java.util.Set; - -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertTrue; -import static org.mockito.ArgumentMatchers.any; -import static org.mockito.Mockito.doReturn; - @RunWith(MockitoJUnitRunner.class) public class CambriaHandlerTest extends BeConfDependentTest { @@ -141,6 +140,7 @@ public class CambriaHandlerTest extends BeConfDependentTest { @Test public void testGetTopics() throws Exception { + CambriaHandler testSubject; List<String> hostSet = new LinkedList<>(); hostSet.add("mock"); diff --git a/catalog-be/src/test/java/org/openecomp/sdc/be/components/distribution/engine/DistributionEngineInitTaskTest.java b/catalog-be/src/test/java/org/openecomp/sdc/be/components/distribution/engine/DistributionEngineInitTaskTest.java index d53476db71..9c1af39d9c 100644 --- a/catalog-be/src/test/java/org/openecomp/sdc/be/components/distribution/engine/DistributionEngineInitTaskTest.java +++ b/catalog-be/src/test/java/org/openecomp/sdc/be/components/distribution/engine/DistributionEngineInitTaskTest.java @@ -7,9 +7,9 @@ * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. @@ -25,6 +25,7 @@ import org.apache.commons.collections.CollectionUtils; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import org.mockito.Mockito; +import org.openecomp.sdc.be.components.kafka.KafkaHandler; import org.openecomp.sdc.be.config.ConfigurationManager; import org.openecomp.sdc.be.config.DistributionEngineConfiguration; import org.openecomp.sdc.be.config.DistributionEngineConfiguration.CreateTopicConfig; @@ -54,6 +55,8 @@ class DistributionEngineInitTaskTest { private CambriaHandler cambriaHandler; + private KafkaHandler kafkaHandler; + @BeforeEach public void setup() { ExternalConfiguration.setAppName("catalog-be"); @@ -65,6 +68,7 @@ class DistributionEngineInitTaskTest { componentsUtils = Mockito.mock(ComponentsUtils.class); cambriaHandler = Mockito.mock(CambriaHandler.class); + kafkaHandler = Mockito.mock(KafkaHandler.class); } @Test @@ -88,7 +92,7 @@ class DistributionEngineInitTaskTest { assertEquals("check next retry interval reach max retry interval", initTask.getCurrentRetryInterval(), maxRetry); } - + @Test void checkStartTask() { @@ -100,10 +104,10 @@ class DistributionEngineInitTaskTest { deConfiguration.setInitRetryIntervalSec(retry); deConfiguration.setInitMaxIntervalSec(maxRetry); DistributionEngineInitTask initTask = new DistributionEngineInitTask(0l, deConfiguration, envName, new AtomicBoolean(false), componentsUtils, null, readEnvFromConfig(deConfiguration)); - + initTask.startTask(); } - + @Test void checkRestartTask() { @@ -115,10 +119,10 @@ class DistributionEngineInitTaskTest { deConfiguration.setInitRetryIntervalSec(retry); deConfiguration.setInitMaxIntervalSec(maxRetry); DistributionEngineInitTask initTask = new DistributionEngineInitTask(0l, deConfiguration, envName, new AtomicBoolean(false), componentsUtils, null, readEnvFromConfig(deConfiguration)); - + initTask.restartTask(); } - + @Test void checkStopTask() { @@ -130,12 +134,12 @@ class DistributionEngineInitTaskTest { deConfiguration.setInitRetryIntervalSec(retry); deConfiguration.setInitMaxIntervalSec(maxRetry); DistributionEngineInitTask initTask = new DistributionEngineInitTask(0l, deConfiguration, envName, new AtomicBoolean(false), componentsUtils, null, readEnvFromConfig(deConfiguration)); - + initTask.stopTask(); initTask.startTask(); initTask.stopTask(); } - + @Test void checkDestroy() { @@ -147,10 +151,10 @@ class DistributionEngineInitTaskTest { deConfiguration.setInitRetryIntervalSec(retry); deConfiguration.setInitMaxIntervalSec(maxRetry); DistributionEngineInitTask initTask = new DistributionEngineInitTask(0l, deConfiguration, envName, new AtomicBoolean(false), componentsUtils, null, readEnvFromConfig(deConfiguration)); - + initTask.destroy(); } - + @Test void checkRun() { @@ -193,10 +197,10 @@ class DistributionEngineInitTaskTest { initTask.setCambriaHandler(cambriaHandler); boolean initFlow = initTask.initFlow(); - + initTask.run(); } - + @Test void testInitFlowScenarioSuccess() { @@ -244,6 +248,20 @@ class DistributionEngineInitTaskTest { } @Test + void testInitFlowSuccessKafkaEnabled(){ + DistributionEngineConfiguration config = new DistributionEngineConfiguration(); + config.setInitRetryIntervalSec(1); + config.setInitMaxIntervalSec(1); + + when(kafkaHandler.isKafkaActive()).thenReturn(true); + DistributionEngineInitTask initTask = new DistributionEngineInitTask(0l, config, null, new AtomicBoolean(false), componentsUtils, null, null); + initTask.setKafkaHandler(kafkaHandler); + + boolean initFlow = initTask.initFlow(); + assertTrue("check init flow succeed", initFlow); + } + + @Test void testInitFlowScenarioSuccessTopicsAlreadyExists() { String envName = "PrOD"; diff --git a/catalog-be/src/test/java/org/openecomp/sdc/be/components/kafka/KafkaHandlerTest.java b/catalog-be/src/test/java/org/openecomp/sdc/be/components/kafka/KafkaHandlerTest.java new file mode 100644 index 0000000000..91ee0235ad --- /dev/null +++ b/catalog-be/src/test/java/org/openecomp/sdc/be/components/kafka/KafkaHandlerTest.java @@ -0,0 +1,138 @@ +/*- + * ============LICENSE_START======================================================= + * SDC + * ================================================================================ + * Copyright (C) 2022 Nordix Foundation. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ +package org.openecomp.sdc.be.components.kafka; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertNotNull; +import static org.junit.jupiter.api.Assertions.assertTrue; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.Mockito.doThrow; +import static org.mockito.Mockito.when; + +import com.google.gson.JsonSyntaxException; +import org.apache.kafka.common.KafkaException; +import org.junit.jupiter.api.extension.ExtendWith; +import org.mockito.junit.jupiter.MockitoExtension; +import org.junit.jupiter.api.Test; +import org.mockito.Mock; + +import java.util.ArrayList; +import fj.data.Either; +import java.util.List; + +import org.openecomp.sdc.be.components.distribution.engine.CambriaErrorResponse; +import org.openecomp.sdc.be.components.distribution.engine.NotificationDataImpl; +import org.openecomp.sdc.be.components.distribution.engine.INotificationData; +import org.openecomp.sdc.be.distribution.api.client.CambriaOperationStatus; + + +@ExtendWith(MockitoExtension.class) +public class KafkaHandlerTest { + + @Mock + private SdcKafkaConsumer mockSdcKafkaConsumer; + + @Mock + private SdcKafkaProducer mockSdcKafkaProducer; + + private KafkaHandler kafkaHandler; + + @Test + public void testIsKafkaActiveTrue(){ + KafkaHandler kafkaHandler = new KafkaHandler(mockSdcKafkaConsumer, mockSdcKafkaProducer, true); + assertTrue(kafkaHandler.isKafkaActive()); + } + + @Test + public void testIsKafkaActiveFalse(){ + KafkaHandler kafkaHandler = new KafkaHandler(mockSdcKafkaConsumer, mockSdcKafkaProducer, true); + kafkaHandler.setKafkaActive(false); + assertFalse(kafkaHandler.isKafkaActive()); + } + + @Test + public void testFetchFromTopicSuccess(){ + String testTopic = "testTopic"; + List<String> mockedReturnedMessages = new ArrayList<>(); + mockedReturnedMessages.add("message1"); + mockedReturnedMessages.add("message2"); + KafkaHandler kafkaHandler = new KafkaHandler(mockSdcKafkaConsumer, mockSdcKafkaProducer, true); + when(mockSdcKafkaConsumer.poll(any())).thenReturn(mockedReturnedMessages); + Either<Iterable<String>, CambriaErrorResponse> response = kafkaHandler.fetchFromTopic(testTopic); + Iterable<String> actualReturnedMessages = response.left().value(); + assertTrue(response.isLeft()); + assertEquals(actualReturnedMessages, mockedReturnedMessages); + } + + @Test + public void testFetchFromTopicFail(){ + String testTopic = "testTopic"; + KafkaHandler kafkaHandler = new KafkaHandler(mockSdcKafkaConsumer, mockSdcKafkaProducer, true); + when(mockSdcKafkaConsumer.poll(any())).thenThrow(new KafkaException()); + Either<Iterable<String>, CambriaErrorResponse> response = kafkaHandler.fetchFromTopic(testTopic); + CambriaErrorResponse responseValue = response.right().value(); + assertTrue(response.isRight()); + assertEquals(responseValue.getOperationStatus(), CambriaOperationStatus.INTERNAL_SERVER_ERROR); + } + + @Test + public void testSendNotificationSuccess(){ + String testTopic = "testTopic"; + KafkaHandler kafkaHandler = new KafkaHandler(mockSdcKafkaConsumer, mockSdcKafkaProducer, true); + INotificationData testData = new NotificationDataImpl(); + CambriaErrorResponse response = kafkaHandler.sendNotification(testTopic, testData); + assertEquals(response.getOperationStatus(), CambriaOperationStatus.OK); + assertEquals(response.getHttpCode(), 200); + } + + @Test + public void testSendNotificationKafkaException(){ + String testTopic = "testTopic"; + KafkaHandler kafkaHandler = new KafkaHandler(mockSdcKafkaConsumer, mockSdcKafkaProducer, true); + INotificationData testData = new NotificationDataImpl(); + doThrow(KafkaException.class).when(mockSdcKafkaProducer).send(any(), any()); + CambriaErrorResponse response = kafkaHandler.sendNotification(testTopic, testData); + assertEquals(response.getOperationStatus(), CambriaOperationStatus.INTERNAL_SERVER_ERROR); + assertEquals(response.getHttpCode(), 500); + } + + @Test + public void testSendNotificationJsonSyntaxException(){ + String testTopic = "testTopic"; + KafkaHandler kafkaHandler = new KafkaHandler(mockSdcKafkaConsumer, mockSdcKafkaProducer, true); + INotificationData testData = new NotificationDataImpl(); + doThrow(JsonSyntaxException.class).when(mockSdcKafkaProducer).send(any(), any()); + CambriaErrorResponse response = kafkaHandler.sendNotification(testTopic, testData); + assertEquals(response.getOperationStatus(), CambriaOperationStatus.INTERNAL_SERVER_ERROR); + assertEquals(response.getHttpCode(), 500); + } + + @Test + public void testSendNotificationFlushException(){ + String testTopic = "testTopic"; + KafkaHandler kafkaHandler = new KafkaHandler(mockSdcKafkaConsumer, mockSdcKafkaProducer, true); + INotificationData testData = new NotificationDataImpl(); + doThrow(KafkaException.class).when(mockSdcKafkaProducer).flush(); + CambriaErrorResponse response = kafkaHandler.sendNotification(testTopic, testData); + assertEquals(response.getOperationStatus(), CambriaOperationStatus.INTERNAL_SERVER_ERROR); + assertEquals(response.getHttpCode(), 500); + } +} diff --git a/catalog-be/src/test/java/org/openecomp/sdc/be/components/kafka/SdcKafkaConsumerTest.java b/catalog-be/src/test/java/org/openecomp/sdc/be/components/kafka/SdcKafkaConsumerTest.java new file mode 100644 index 0000000000..0a4a834fa4 --- /dev/null +++ b/catalog-be/src/test/java/org/openecomp/sdc/be/components/kafka/SdcKafkaConsumerTest.java @@ -0,0 +1,143 @@ +/*- + * ============LICENSE_START======================================================= + * SDC + * ================================================================================ + * Copyright (C) 2022 Nordix Foundation. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ +package org.openecomp.sdc.be.components.kafka; + +import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.junit.jupiter.api.Assertions.assertTrue; + +import org.apache.kafka.common.KafkaException; +import org.junit.jupiter.api.Test; + +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.never; +import static org.mockito.Mockito.when; +import org.mockito.ArgumentCaptor; +import org.mockito.Mockito; + +import java.util.Collections; +import java.util.Collection; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.apache.kafka.clients.consumer.ConsumerRecords; +import org.apache.kafka.clients.consumer.KafkaConsumer; +import org.apache.kafka.common.TopicPartition; +import org.jetbrains.annotations.NotNull; + +import org.openecomp.sdc.be.config.DistributionEngineConfiguration; + +public class SdcKafkaConsumerTest { + + @Test + public void TestSubscribeSuccess(){ + KafkaConsumer<byte[], byte[]> mockKafkaConsumer = Mockito.mock(KafkaConsumer.class); + SdcKafkaConsumer sdcKafkaConsumer = new SdcKafkaConsumer(mockKafkaConsumer, null); + ArgumentCaptor<Collections> captor = ArgumentCaptor.forClass(Collections.class); + + String testTopics = "testTopic"; + sdcKafkaConsumer.subscribe(testTopics); + verify(mockKafkaConsumer).subscribe((Collection<String>) captor.capture()); + } + + @Test + public void TestSubscribeAlreadySubscribed(){ + KafkaConsumer<byte[], byte[]> mockKafkaConsumer = Mockito.mock(KafkaConsumer.class); + SdcKafkaConsumer sdcKafkaConsumer = new SdcKafkaConsumer(mockKafkaConsumer, null); + ArgumentCaptor<Collections> captor = ArgumentCaptor.forClass(Collections.class); + + + String testTopics = "testTopic"; + Set<String> currentSubs = new HashSet<String>(); + currentSubs.add(testTopics); + when(mockKafkaConsumer.subscription()).thenReturn(currentSubs); + sdcKafkaConsumer.subscribe(testTopics); + verify(mockKafkaConsumer, never()).subscribe((Collection<String>) captor.capture()); + } + + @Test + public void TestPollForMessagesForSpecificTopicSuccess(){ + KafkaConsumer<byte[], byte[]> mockKafkaConsumer = Mockito.mock(KafkaConsumer.class); + + + String testTopic = "testTopic"; + + ConsumerRecords mockedPollResult = getTestConsumerRecords(testTopic); + + when(mockKafkaConsumer.poll(any())).thenReturn(mockedPollResult); + + DistributionEngineConfiguration config = getMockDistributionEngineConfiguration(); + + SdcKafkaConsumer sdcKafkaConsumer = new SdcKafkaConsumer(mockKafkaConsumer, config); + + List<String> returned = sdcKafkaConsumer.poll(testTopic); + assertTrue(returned.size()==1); + assertTrue(returned.contains("testTopicValue")); + } + + @Test + public void testSaslJaasConfigNotFound(){ + assertThrows( + KafkaException.class, + () -> new SdcKafkaConsumer(setTestDistributionEngineConfigs()), + "Sasl Jaas Config should not be found, so expected a KafkaException" + ); + } + + @NotNull + private DistributionEngineConfiguration getMockDistributionEngineConfiguration() { + DistributionEngineConfiguration config = new DistributionEngineConfiguration(); + DistributionEngineConfiguration.DistributionStatusTopicConfig mockStatusTopic = new DistributionEngineConfiguration.DistributionStatusTopicConfig(); + mockStatusTopic.setPollingIntervalSec(1); + config.setDistributionStatusTopic(mockStatusTopic); + return config; + } + + @NotNull + private ConsumerRecords getTestConsumerRecords(String testTopics) { + Map map = new HashMap<Integer, ConsumerRecord>(); + + ConsumerRecord consumerRecord = new ConsumerRecord(testTopics, 0, 0, "", "testTopicValue"); + + List<ConsumerRecord> consumerRecordList = new ArrayList<>(); + consumerRecordList.add(consumerRecord); + TopicPartition topicPartition = new TopicPartition(testTopics, 0); + map.put(topicPartition, consumerRecordList); + + ConsumerRecords mockedPollResult = new ConsumerRecords(map); + return mockedPollResult; + } + + private DistributionEngineConfiguration setTestDistributionEngineConfigs(){ + DistributionEngineConfiguration.DistributionStatusTopicConfig dsTopic = new DistributionEngineConfiguration.DistributionStatusTopicConfig(); + DistributionEngineConfiguration deConfiguration = new DistributionEngineConfiguration(); + String testBootstrapServers = "TestBootstrapServer"; + dsTopic.setConsumerGroup("consumerGroup"); + dsTopic.setConsumerId("consumerId"); + + deConfiguration.setKafkaBootStrapServers(testBootstrapServers); + deConfiguration.setDistributionStatusTopic(dsTopic); + return deConfiguration; + } +} diff --git a/catalog-be/src/test/java/org/openecomp/sdc/be/components/kafka/SdcKafkaProducerTest.java b/catalog-be/src/test/java/org/openecomp/sdc/be/components/kafka/SdcKafkaProducerTest.java new file mode 100644 index 0000000000..23322cce5a --- /dev/null +++ b/catalog-be/src/test/java/org/openecomp/sdc/be/components/kafka/SdcKafkaProducerTest.java @@ -0,0 +1,94 @@ +/*- + * ============LICENSE_START======================================================= + * SDC + * ================================================================================ + * Copyright (C) 2022 Nordix Foundation. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ +package org.openecomp.sdc.be.components.kafka; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertThrows; +import org.junit.jupiter.api.Test; + +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.Mockito.doThrow; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; +import org.mockito.ArgumentCaptor; +import org.mockito.Mockito; + +import org.apache.kafka.clients.producer.ProducerRecord; +import org.apache.kafka.clients.producer.KafkaProducer; +import org.apache.kafka.common.KafkaException; + +import org.openecomp.sdc.be.catalog.api.IStatus; +import org.openecomp.sdc.be.config.DistributionEngineConfiguration; + +public class SdcKafkaProducerTest { + + @Test + public void TestSendSuccess(){ + KafkaProducer<byte[], byte[]> mockKafkaProducer = Mockito.mock(KafkaProducer.class); + SdcKafkaProducer sdcKafkaProducer = new SdcKafkaProducer(mockKafkaProducer); + ArgumentCaptor<ProducerRecord> captor = ArgumentCaptor.forClass(ProducerRecord.class); + sdcKafkaProducer.send("testMessage", "testTopic"); + + + verify(mockKafkaProducer).send(captor.capture()); + } + + @Test + public void testFlushSuccess(){ + KafkaProducer<byte[], byte[]> mockKafkaProducer = Mockito.mock(KafkaProducer.class); + SdcKafkaProducer sdcKafkaProducer = new SdcKafkaProducer(mockKafkaProducer); + sdcKafkaProducer.flush(); + + verify(mockKafkaProducer).flush(); + } + + @Test + public void testSendFail(){ + KafkaProducer<byte[], byte[]> mockKafkaProducer = Mockito.mock(KafkaProducer.class); + SdcKafkaProducer sdcKafkaProducer = new SdcKafkaProducer(mockKafkaProducer); + + when(mockKafkaProducer.send(any())).thenThrow(new KafkaException()); + + assertThrows( + KafkaException.class, + () -> sdcKafkaProducer.send("testMessage", "testTopic"), + "Expected a KafkaException thrown on KafkaProducer Send"); + } + + @Test + public void testSaslJaasConfigNotFound(){ + assertThrows( + KafkaException.class, + () -> new SdcKafkaProducer(setTestDistributionEngineConfigs()), + "Sasl Jaas Config should not be found, so expected a KafkaException" + ); + } + + private DistributionEngineConfiguration setTestDistributionEngineConfigs(){ + DistributionEngineConfiguration.DistributionStatusTopicConfig dStatusTopicConfig = new DistributionEngineConfiguration.DistributionStatusTopicConfig(); + DistributionEngineConfiguration deConfiguration = new DistributionEngineConfiguration(); + deConfiguration.setKafkaBootStrapServers("TestBootstrapServer"); + dStatusTopicConfig.setConsumerId("consumerId"); + + deConfiguration.setDistributionStatusTopic(dStatusTopicConfig); + deConfiguration.getDistributionStatusTopic().getConsumerId(); + return deConfiguration; + } +} |