aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--catalog-be/src/main/java/org/openecomp/sdc/be/components/kafka/SdcKafkaConsumer.java13
-rw-r--r--catalog-be/src/main/java/org/openecomp/sdc/be/components/kafka/SdcKafkaProducer.java14
2 files changed, 24 insertions, 3 deletions
diff --git a/catalog-be/src/main/java/org/openecomp/sdc/be/components/kafka/SdcKafkaConsumer.java b/catalog-be/src/main/java/org/openecomp/sdc/be/components/kafka/SdcKafkaConsumer.java
index 8879bf000e..04df4e1fbe 100644
--- a/catalog-be/src/main/java/org/openecomp/sdc/be/components/kafka/SdcKafkaConsumer.java
+++ b/catalog-be/src/main/java/org/openecomp/sdc/be/components/kafka/SdcKafkaConsumer.java
@@ -42,6 +42,8 @@ import org.openecomp.sdc.common.log.wrappers.Logger;
public class SdcKafkaConsumer {
private static final Logger log = Logger.getLogger(SdcKafkaConsumer.class.getName());
+ private static final String DEFAULT_SASL_MECHANISM = "SCRAM-SHA-512";
+
private final DistributionEngineConfiguration deConfiguration;
private KafkaConsumer<String, String> kafkaConsumer;
@@ -61,7 +63,7 @@ public class SdcKafkaConsumer {
properties.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SASL_PLAINTEXT");
properties.put(ConsumerConfig.ALLOW_AUTO_CREATE_TOPICS_CONFIG, false);
properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");
- properties.put(SaslConfigs.SASL_MECHANISM, "SCRAM-SHA-512");
+ properties.put(SaslConfigs.SASL_MECHANISM, getKafkaSaslMechanism());
properties.put(SaslConfigs.SASL_JAAS_CONFIG, getKafkaSaslJaasConfig());
kafkaConsumer = new KafkaConsumer<>(properties);
@@ -90,6 +92,15 @@ public class SdcKafkaConsumer {
throw new KafkaException("sasl.jaas.config not set for Kafka Consumer");
}
}
+
+ private static String getKafkaSaslMechanism() throws KafkaException {
+ String saslMechanism = System.getenv("SASL_MECHANISM");
+ if(saslMechanism != null) {
+ return saslMechanism;
+ } else {
+ return DEFAULT_SASL_MECHANISM;
+ }
+ }
/**
*
diff --git a/catalog-be/src/main/java/org/openecomp/sdc/be/components/kafka/SdcKafkaProducer.java b/catalog-be/src/main/java/org/openecomp/sdc/be/components/kafka/SdcKafkaProducer.java
index bdc984d7b5..7158357799 100644
--- a/catalog-be/src/main/java/org/openecomp/sdc/be/components/kafka/SdcKafkaProducer.java
+++ b/catalog-be/src/main/java/org/openecomp/sdc/be/components/kafka/SdcKafkaProducer.java
@@ -37,6 +37,7 @@ import org.slf4j.LoggerFactory;
*/
public class SdcKafkaProducer {
private static final Logger log = LoggerFactory.getLogger(SdcKafkaProducer.class.getName());
+ private static final String DEFAULT_SASL_MECHANISM = "SCRAM-SHA-512";
private KafkaProducer<String, String> kafkaProducer;
@@ -53,7 +54,7 @@ public class SdcKafkaProducer {
properties.put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, deConfiguration.getKafkaBootStrapServers());
properties.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SASL_PLAINTEXT");
properties.put(SaslConfigs.SASL_JAAS_CONFIG, getKafkaSaslJaasConfig());
- properties.put(SaslConfigs.SASL_MECHANISM, "SCRAM-SHA-512");
+ properties.put(SaslConfigs.SASL_MECHANISM, getKafkaSaslMechanism());
kafkaProducer = new KafkaProducer<>(properties);
}
@@ -77,7 +78,16 @@ public class SdcKafkaProducer {
throw new KafkaException("sasl.jaas.config not set for Kafka Consumer");
}
}
-
+
+ private static String getKafkaSaslMechanism() throws KafkaException {
+ String saslMechanism = System.getenv("SASL_MECHANISM");
+ if(saslMechanism != null) {
+ return saslMechanism;
+ } else {
+ return DEFAULT_SASL_MECHANISM;
+ }
+ }
+
/**
* @param message A message to Send
* @param topicName The name of the topic to publish to