diff options
author | david.mcweeney <david.mcweeney@est.tech> | 2022-10-04 15:46:14 +0100 |
---|---|---|
committer | Michael Morris <michael.morris@est.tech> | 2022-10-25 11:24:02 +0000 |
commit | 47f96dd966663f7f46b719451c0752721a2940a3 (patch) | |
tree | 9d875ce43f96cf3e570cc812d907fa2edd3b7945 /catalog-be/src/main/java/org/openecomp/sdc/be/components/kafka/SdcKafkaConsumer.java | |
parent | 0d2e96c125aab4e4edfc0a8b897353c0fabdd885 (diff) |
[SDC] Add kafka native messaging
Change-Id: I5ab8f580947cbc264d94bec48a5e8b659dc44c08
Issue-ID: DMAAP-1787
Signed-off-by: david.mcweeney <david.mcweeney@est.tech>
Diffstat (limited to 'catalog-be/src/main/java/org/openecomp/sdc/be/components/kafka/SdcKafkaConsumer.java')
-rw-r--r-- | catalog-be/src/main/java/org/openecomp/sdc/be/components/kafka/SdcKafkaConsumer.java | 117 |
1 files changed, 117 insertions, 0 deletions
diff --git a/catalog-be/src/main/java/org/openecomp/sdc/be/components/kafka/SdcKafkaConsumer.java b/catalog-be/src/main/java/org/openecomp/sdc/be/components/kafka/SdcKafkaConsumer.java new file mode 100644 index 0000000000..8879bf000e --- /dev/null +++ b/catalog-be/src/main/java/org/openecomp/sdc/be/components/kafka/SdcKafkaConsumer.java @@ -0,0 +1,117 @@ +/*- + * ============LICENSE_START======================================================= + * SDC + * ================================================================================ + * Copyright (C) 2022 Nordix Foundation. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ +package org.openecomp.sdc.be.components.kafka; + +import com.google.common.annotations.VisibleForTesting; +import java.time.Duration; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.Properties; +import java.util.UUID; +import org.apache.kafka.clients.CommonClientConfigs; +import org.apache.kafka.clients.consumer.ConsumerConfig; +import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.apache.kafka.clients.consumer.ConsumerRecords; +import org.apache.kafka.clients.consumer.KafkaConsumer; +import org.apache.kafka.common.KafkaException; +import org.apache.kafka.common.config.SaslConfigs; +import org.openecomp.sdc.be.config.DistributionEngineConfiguration; +import org.openecomp.sdc.common.log.wrappers.Logger; + +/** + * Utility class that provides a KafkaConsumer to communicate with a kafka cluster + */ +public class SdcKafkaConsumer { + + private static final Logger log = Logger.getLogger(SdcKafkaConsumer.class.getName()); + private final DistributionEngineConfiguration deConfiguration; + private KafkaConsumer<String, String> kafkaConsumer; + + /** + * Constructor setting up the KafkaConsumer from a predefined set of configurations + */ + public SdcKafkaConsumer(DistributionEngineConfiguration deConfiguration){ + log.info("Create SdcKafkaConsumer via constructor"); + Properties properties = new Properties(); + this.deConfiguration = deConfiguration; + + properties.put(ConsumerConfig.CLIENT_ID_CONFIG, deConfiguration.getDistributionStatusTopic().getConsumerId()+ "-consumer-" + UUID.randomUUID()); + properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer"); + properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer"); + properties.put(ConsumerConfig.GROUP_ID_CONFIG, deConfiguration.getDistributionStatusTopic().getConsumerGroup()); + properties.put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, deConfiguration.getKafkaBootStrapServers()); + properties.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SASL_PLAINTEXT"); + properties.put(ConsumerConfig.ALLOW_AUTO_CREATE_TOPICS_CONFIG, false); + properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest"); + properties.put(SaslConfigs.SASL_MECHANISM, "SCRAM-SHA-512"); + + properties.put(SaslConfigs.SASL_JAAS_CONFIG, getKafkaSaslJaasConfig()); + kafkaConsumer = new KafkaConsumer<>(properties); + } + + /** + * + * @param kafkaConsumer a kafkaConsumer to use within the class + * @param deConfiguration - Configuration to pass into the class + */ + @VisibleForTesting + SdcKafkaConsumer(KafkaConsumer kafkaConsumer, DistributionEngineConfiguration deConfiguration){ + this.deConfiguration = deConfiguration; + this.kafkaConsumer = kafkaConsumer; + } + + /** + * + * @return the Sasl Jass Config + */ + private String getKafkaSaslJaasConfig() { + String saslJaasConfFromEnv = System.getenv("SASL_JAAS_CONFIG"); + if(saslJaasConfFromEnv != null) { + return saslJaasConfFromEnv; + } else { + throw new KafkaException("sasl.jaas.config not set for Kafka Consumer"); + } + } + + /** + * + * @param topic Topic in which to subscribe + */ + public void subscribe(String topic) throws KafkaException { + if (!kafkaConsumer.subscription().contains(topic)) { + kafkaConsumer.subscribe(Collections.singleton(topic)); + } + } + + /** + * + * @return The list of messages for a specified topic, returned from the poll + */ + public List<String> poll(String topicName) throws KafkaException { + log.info("SdcKafkaConsumer - polling for messages from Topic: {}", topicName); + List<String> msgs = new ArrayList<>(); + ConsumerRecords<String, String> consumerRecordsForSpecificTopic = kafkaConsumer.poll(Duration.ofSeconds(deConfiguration.getDistributionStatusTopic().getPollingIntervalSec())); + for(ConsumerRecord<String, String> rec : consumerRecordsForSpecificTopic){ + msgs.add(rec.value()); + } + return msgs; + } +} |