diff options
author | 2023-04-21 15:42:02 +0100 | |
---|---|---|
committer | 2023-04-25 19:30:59 +0000 | |
commit | 3405456c46937352863ce19c39266a51dd7760db (patch) | |
tree | 96e1240d6d4e768e864574dc37aa64609d882e49 /catalog-be/src/main/java/org/openecomp/sdc/be/components/kafka/SdcKafkaConsumer.java | |
parent | 5d3954987dd1d53d2e9623648b6d436592a4c195 (diff) |
[SDC-BE] Add kafka ssl config
Signed-off-by: efiacor <fiachra.corcoran@est.tech>
Change-Id: I9e7c0e44566c46bd6225397a680015bf1c0f1c0b
Issue-ID: SDC-4476
Diffstat (limited to 'catalog-be/src/main/java/org/openecomp/sdc/be/components/kafka/SdcKafkaConsumer.java')
-rw-r--r-- | catalog-be/src/main/java/org/openecomp/sdc/be/components/kafka/SdcKafkaConsumer.java | 36 |
1 files changed, 4 insertions, 32 deletions
diff --git a/catalog-be/src/main/java/org/openecomp/sdc/be/components/kafka/SdcKafkaConsumer.java b/catalog-be/src/main/java/org/openecomp/sdc/be/components/kafka/SdcKafkaConsumer.java index 8879bf000e..5350445ab1 100644 --- a/catalog-be/src/main/java/org/openecomp/sdc/be/components/kafka/SdcKafkaConsumer.java +++ b/catalog-be/src/main/java/org/openecomp/sdc/be/components/kafka/SdcKafkaConsumer.java @@ -2,7 +2,7 @@ * ============LICENSE_START======================================================= * SDC * ================================================================================ - * Copyright (C) 2022 Nordix Foundation. All rights reserved. + * Copyright (C) 2022-2023 Nordix Foundation. All rights reserved. * ================================================================================ * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -25,14 +25,10 @@ import java.util.ArrayList; import java.util.Collections; import java.util.List; import java.util.Properties; -import java.util.UUID; -import org.apache.kafka.clients.CommonClientConfigs; -import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.consumer.ConsumerRecords; import org.apache.kafka.clients.consumer.KafkaConsumer; import org.apache.kafka.common.KafkaException; -import org.apache.kafka.common.config.SaslConfigs; import org.openecomp.sdc.be.config.DistributionEngineConfiguration; import org.openecomp.sdc.common.log.wrappers.Logger; @@ -43,27 +39,16 @@ public class SdcKafkaConsumer { private static final Logger log = Logger.getLogger(SdcKafkaConsumer.class.getName()); private final DistributionEngineConfiguration deConfiguration; - private KafkaConsumer<String, String> kafkaConsumer; + private final KafkaConsumer<String, String> kafkaConsumer; /** * Constructor setting up the KafkaConsumer from a predefined set of configurations */ public SdcKafkaConsumer(DistributionEngineConfiguration deConfiguration){ log.info("Create SdcKafkaConsumer via constructor"); - Properties properties = new Properties(); + KafkaCommonConfig kafkaCommonConfig = new KafkaCommonConfig(deConfiguration); + Properties properties = kafkaCommonConfig.getConsumerProperties(); this.deConfiguration = deConfiguration; - - properties.put(ConsumerConfig.CLIENT_ID_CONFIG, deConfiguration.getDistributionStatusTopic().getConsumerId()+ "-consumer-" + UUID.randomUUID()); - properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer"); - properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer"); - properties.put(ConsumerConfig.GROUP_ID_CONFIG, deConfiguration.getDistributionStatusTopic().getConsumerGroup()); - properties.put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, deConfiguration.getKafkaBootStrapServers()); - properties.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SASL_PLAINTEXT"); - properties.put(ConsumerConfig.ALLOW_AUTO_CREATE_TOPICS_CONFIG, false); - properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest"); - properties.put(SaslConfigs.SASL_MECHANISM, "SCRAM-SHA-512"); - - properties.put(SaslConfigs.SASL_JAAS_CONFIG, getKafkaSaslJaasConfig()); kafkaConsumer = new KafkaConsumer<>(properties); } @@ -80,19 +65,6 @@ public class SdcKafkaConsumer { /** * - * @return the Sasl Jass Config - */ - private String getKafkaSaslJaasConfig() { - String saslJaasConfFromEnv = System.getenv("SASL_JAAS_CONFIG"); - if(saslJaasConfFromEnv != null) { - return saslJaasConfFromEnv; - } else { - throw new KafkaException("sasl.jaas.config not set for Kafka Consumer"); - } - } - - /** - * * @param topic Topic in which to subscribe */ public void subscribe(String topic) throws KafkaException { |