From decb69997badb5c8de17268ca0c9bccf93f937e3 Mon Sep 17 00:00:00 2001 From: MichaelMorris Date: Fri, 16 Jun 2023 14:38:28 +0100 Subject: Enable setting sasl.mechanism Signed-off-by: MichaelMorris Issue-ID: SDC-4540 Change-Id: Iaa88b41ca6a49427695b891544d6f4767196c75f --- .../sdc/be/components/kafka/SdcKafkaConsumer.java | 13 ++++++++++++- .../sdc/be/components/kafka/SdcKafkaProducer.java | 14 ++++++++++++-- 2 files changed, 24 insertions(+), 3 deletions(-) (limited to 'catalog-be/src/main') diff --git a/catalog-be/src/main/java/org/openecomp/sdc/be/components/kafka/SdcKafkaConsumer.java b/catalog-be/src/main/java/org/openecomp/sdc/be/components/kafka/SdcKafkaConsumer.java index 8879bf000e..04df4e1fbe 100644 --- a/catalog-be/src/main/java/org/openecomp/sdc/be/components/kafka/SdcKafkaConsumer.java +++ b/catalog-be/src/main/java/org/openecomp/sdc/be/components/kafka/SdcKafkaConsumer.java @@ -42,6 +42,8 @@ import org.openecomp.sdc.common.log.wrappers.Logger; public class SdcKafkaConsumer { private static final Logger log = Logger.getLogger(SdcKafkaConsumer.class.getName()); + private static final String DEFAULT_SASL_MECHANISM = "SCRAM-SHA-512"; + private final DistributionEngineConfiguration deConfiguration; private KafkaConsumer kafkaConsumer; @@ -61,7 +63,7 @@ public class SdcKafkaConsumer { properties.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SASL_PLAINTEXT"); properties.put(ConsumerConfig.ALLOW_AUTO_CREATE_TOPICS_CONFIG, false); properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest"); - properties.put(SaslConfigs.SASL_MECHANISM, "SCRAM-SHA-512"); + properties.put(SaslConfigs.SASL_MECHANISM, getKafkaSaslMechanism()); properties.put(SaslConfigs.SASL_JAAS_CONFIG, getKafkaSaslJaasConfig()); kafkaConsumer = new KafkaConsumer<>(properties); @@ -90,6 +92,15 @@ public class SdcKafkaConsumer { throw new KafkaException("sasl.jaas.config not set for Kafka Consumer"); } } + + private static String getKafkaSaslMechanism() throws KafkaException { + String saslMechanism = System.getenv("SASL_MECHANISM"); + if(saslMechanism != null) { + return saslMechanism; + } else { + return DEFAULT_SASL_MECHANISM; + } + } /** * diff --git a/catalog-be/src/main/java/org/openecomp/sdc/be/components/kafka/SdcKafkaProducer.java b/catalog-be/src/main/java/org/openecomp/sdc/be/components/kafka/SdcKafkaProducer.java index bdc984d7b5..7158357799 100644 --- a/catalog-be/src/main/java/org/openecomp/sdc/be/components/kafka/SdcKafkaProducer.java +++ b/catalog-be/src/main/java/org/openecomp/sdc/be/components/kafka/SdcKafkaProducer.java @@ -37,6 +37,7 @@ import org.slf4j.LoggerFactory; */ public class SdcKafkaProducer { private static final Logger log = LoggerFactory.getLogger(SdcKafkaProducer.class.getName()); + private static final String DEFAULT_SASL_MECHANISM = "SCRAM-SHA-512"; private KafkaProducer kafkaProducer; @@ -53,7 +54,7 @@ public class SdcKafkaProducer { properties.put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, deConfiguration.getKafkaBootStrapServers()); properties.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SASL_PLAINTEXT"); properties.put(SaslConfigs.SASL_JAAS_CONFIG, getKafkaSaslJaasConfig()); - properties.put(SaslConfigs.SASL_MECHANISM, "SCRAM-SHA-512"); + properties.put(SaslConfigs.SASL_MECHANISM, getKafkaSaslMechanism()); kafkaProducer = new KafkaProducer<>(properties); } @@ -77,7 +78,16 @@ public class SdcKafkaProducer { throw new KafkaException("sasl.jaas.config not set for Kafka Consumer"); } } - + + private static String getKafkaSaslMechanism() throws KafkaException { + String saslMechanism = System.getenv("SASL_MECHANISM"); + if(saslMechanism != null) { + return saslMechanism; + } else { + return DEFAULT_SASL_MECHANISM; + } + } + /** * @param message A message to Send * @param topicName The name of the topic to publish to -- cgit 1.2.3-korg