diff options
Diffstat (limited to 'catalog-be')
-rw-r--r-- | catalog-be/src/main/java/org/openecomp/sdc/be/components/kafka/SdcKafkaConsumer.java | 13 | ||||
-rw-r--r-- | catalog-be/src/main/java/org/openecomp/sdc/be/components/kafka/SdcKafkaProducer.java | 14 |
2 files changed, 24 insertions, 3 deletions
diff --git a/catalog-be/src/main/java/org/openecomp/sdc/be/components/kafka/SdcKafkaConsumer.java b/catalog-be/src/main/java/org/openecomp/sdc/be/components/kafka/SdcKafkaConsumer.java index 8879bf000e..04df4e1fbe 100644 --- a/catalog-be/src/main/java/org/openecomp/sdc/be/components/kafka/SdcKafkaConsumer.java +++ b/catalog-be/src/main/java/org/openecomp/sdc/be/components/kafka/SdcKafkaConsumer.java @@ -42,6 +42,8 @@ import org.openecomp.sdc.common.log.wrappers.Logger; public class SdcKafkaConsumer { private static final Logger log = Logger.getLogger(SdcKafkaConsumer.class.getName()); + private static final String DEFAULT_SASL_MECHANISM = "SCRAM-SHA-512"; + private final DistributionEngineConfiguration deConfiguration; private KafkaConsumer<String, String> kafkaConsumer; @@ -61,7 +63,7 @@ public class SdcKafkaConsumer { properties.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SASL_PLAINTEXT"); properties.put(ConsumerConfig.ALLOW_AUTO_CREATE_TOPICS_CONFIG, false); properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest"); - properties.put(SaslConfigs.SASL_MECHANISM, "SCRAM-SHA-512"); + properties.put(SaslConfigs.SASL_MECHANISM, getKafkaSaslMechanism()); properties.put(SaslConfigs.SASL_JAAS_CONFIG, getKafkaSaslJaasConfig()); kafkaConsumer = new KafkaConsumer<>(properties); @@ -90,6 +92,15 @@ public class SdcKafkaConsumer { throw new KafkaException("sasl.jaas.config not set for Kafka Consumer"); } } + + private static String getKafkaSaslMechanism() throws KafkaException { + String saslMechanism = System.getenv("SASL_MECHANISM"); + if(saslMechanism != null) { + return saslMechanism; + } else { + return DEFAULT_SASL_MECHANISM; + } + } /** * diff --git a/catalog-be/src/main/java/org/openecomp/sdc/be/components/kafka/SdcKafkaProducer.java b/catalog-be/src/main/java/org/openecomp/sdc/be/components/kafka/SdcKafkaProducer.java index bdc984d7b5..7158357799 100644 --- a/catalog-be/src/main/java/org/openecomp/sdc/be/components/kafka/SdcKafkaProducer.java +++ b/catalog-be/src/main/java/org/openecomp/sdc/be/components/kafka/SdcKafkaProducer.java @@ -37,6 +37,7 @@ import org.slf4j.LoggerFactory; */ public class SdcKafkaProducer { private static final Logger log = LoggerFactory.getLogger(SdcKafkaProducer.class.getName()); + private static final String DEFAULT_SASL_MECHANISM = "SCRAM-SHA-512"; private KafkaProducer<String, String> kafkaProducer; @@ -53,7 +54,7 @@ public class SdcKafkaProducer { properties.put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, deConfiguration.getKafkaBootStrapServers()); properties.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SASL_PLAINTEXT"); properties.put(SaslConfigs.SASL_JAAS_CONFIG, getKafkaSaslJaasConfig()); - properties.put(SaslConfigs.SASL_MECHANISM, "SCRAM-SHA-512"); + properties.put(SaslConfigs.SASL_MECHANISM, getKafkaSaslMechanism()); kafkaProducer = new KafkaProducer<>(properties); } @@ -77,7 +78,16 @@ public class SdcKafkaProducer { throw new KafkaException("sasl.jaas.config not set for Kafka Consumer"); } } - + + private static String getKafkaSaslMechanism() throws KafkaException { + String saslMechanism = System.getenv("SASL_MECHANISM"); + if(saslMechanism != null) { + return saslMechanism; + } else { + return DEFAULT_SASL_MECHANISM; + } + } + /** * @param message A message to Send * @param topicName The name of the topic to publish to |