summaryrefslogtreecommitdiffstats
path: root/catalog-be/src/main/java/org/openecomp/sdc/be/components/kafka/SdcKafkaConsumer.java
diff options
context:
space:
mode:
authordavid.mcweeney <david.mcweeney@est.tech>2022-10-04 15:46:14 +0100
committerMichael Morris <michael.morris@est.tech>2022-10-25 11:24:02 +0000
commit47f96dd966663f7f46b719451c0752721a2940a3 (patch)
tree9d875ce43f96cf3e570cc812d907fa2edd3b7945 /catalog-be/src/main/java/org/openecomp/sdc/be/components/kafka/SdcKafkaConsumer.java
parent0d2e96c125aab4e4edfc0a8b897353c0fabdd885 (diff)
[SDC] Add kafka native messaging
Change-Id: I5ab8f580947cbc264d94bec48a5e8b659dc44c08 Issue-ID: DMAAP-1787 Signed-off-by: david.mcweeney <david.mcweeney@est.tech>
Diffstat (limited to 'catalog-be/src/main/java/org/openecomp/sdc/be/components/kafka/SdcKafkaConsumer.java')
-rw-r--r--catalog-be/src/main/java/org/openecomp/sdc/be/components/kafka/SdcKafkaConsumer.java117
1 files changed, 117 insertions, 0 deletions
diff --git a/catalog-be/src/main/java/org/openecomp/sdc/be/components/kafka/SdcKafkaConsumer.java b/catalog-be/src/main/java/org/openecomp/sdc/be/components/kafka/SdcKafkaConsumer.java
new file mode 100644
index 0000000000..8879bf000e
--- /dev/null
+++ b/catalog-be/src/main/java/org/openecomp/sdc/be/components/kafka/SdcKafkaConsumer.java
@@ -0,0 +1,117 @@
+/*-
+ * ============LICENSE_START=======================================================
+ * SDC
+ * ================================================================================
+ * Copyright (C) 2022 Nordix Foundation. All rights reserved.
+ * ================================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ */
+package org.openecomp.sdc.be.components.kafka;
+
+import com.google.common.annotations.VisibleForTesting;
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Properties;
+import java.util.UUID;
+import org.apache.kafka.clients.CommonClientConfigs;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.common.config.SaslConfigs;
+import org.openecomp.sdc.be.config.DistributionEngineConfiguration;
+import org.openecomp.sdc.common.log.wrappers.Logger;
+
+/**
+ * Utility class that provides a KafkaConsumer to communicate with a kafka cluster
+ */
+public class SdcKafkaConsumer {
+
+ private static final Logger log = Logger.getLogger(SdcKafkaConsumer.class.getName());
+ private final DistributionEngineConfiguration deConfiguration;
+ private KafkaConsumer<String, String> kafkaConsumer;
+
+ /**
+ * Constructor setting up the KafkaConsumer from a predefined set of configurations
+ */
+ public SdcKafkaConsumer(DistributionEngineConfiguration deConfiguration){
+ log.info("Create SdcKafkaConsumer via constructor");
+ Properties properties = new Properties();
+ this.deConfiguration = deConfiguration;
+
+ properties.put(ConsumerConfig.CLIENT_ID_CONFIG, deConfiguration.getDistributionStatusTopic().getConsumerId()+ "-consumer-" + UUID.randomUUID());
+ properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
+ properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
+ properties.put(ConsumerConfig.GROUP_ID_CONFIG, deConfiguration.getDistributionStatusTopic().getConsumerGroup());
+ properties.put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, deConfiguration.getKafkaBootStrapServers());
+ properties.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SASL_PLAINTEXT");
+ properties.put(ConsumerConfig.ALLOW_AUTO_CREATE_TOPICS_CONFIG, false);
+ properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");
+ properties.put(SaslConfigs.SASL_MECHANISM, "SCRAM-SHA-512");
+
+ properties.put(SaslConfigs.SASL_JAAS_CONFIG, getKafkaSaslJaasConfig());
+ kafkaConsumer = new KafkaConsumer<>(properties);
+ }
+
+ /**
+ *
+ * @param kafkaConsumer a kafkaConsumer to use within the class
+ * @param deConfiguration - Configuration to pass into the class
+ */
+ @VisibleForTesting
+ SdcKafkaConsumer(KafkaConsumer kafkaConsumer, DistributionEngineConfiguration deConfiguration){
+ this.deConfiguration = deConfiguration;
+ this.kafkaConsumer = kafkaConsumer;
+ }
+
+ /**
+ *
+ * @return the Sasl Jass Config
+ */
+ private String getKafkaSaslJaasConfig() {
+ String saslJaasConfFromEnv = System.getenv("SASL_JAAS_CONFIG");
+ if(saslJaasConfFromEnv != null) {
+ return saslJaasConfFromEnv;
+ } else {
+ throw new KafkaException("sasl.jaas.config not set for Kafka Consumer");
+ }
+ }
+
+ /**
+ *
+ * @param topic Topic in which to subscribe
+ */
+ public void subscribe(String topic) throws KafkaException {
+ if (!kafkaConsumer.subscription().contains(topic)) {
+ kafkaConsumer.subscribe(Collections.singleton(topic));
+ }
+ }
+
+ /**
+ *
+ * @return The list of messages for a specified topic, returned from the poll
+ */
+ public List<String> poll(String topicName) throws KafkaException {
+ log.info("SdcKafkaConsumer - polling for messages from Topic: {}", topicName);
+ List<String> msgs = new ArrayList<>();
+ ConsumerRecords<String, String> consumerRecordsForSpecificTopic = kafkaConsumer.poll(Duration.ofSeconds(deConfiguration.getDistributionStatusTopic().getPollingIntervalSec()));
+ for(ConsumerRecord<String, String> rec : consumerRecordsForSpecificTopic){
+ msgs.add(rec.value());
+ }
+ return msgs;
+ }
+}