From fccb576dfeb307bf30de6994fe462d1bfdd51fd5 Mon Sep 17 00:00:00 2001 From: MichaelMorris Date: Fri, 2 Jun 2023 12:46:04 +0000 Subject: Revert "[SDC-BE] Add kafka ssl config" This reverts commit 3405456c46937352863ce19c39266a51dd7760db. Reason for revert: deployment issues with TLS Change-Id: I58aa51f7d563cf74d3747a5ff59104906b294d18 Signed-off-by: MichaelMorris Issue-ID: SDC-4476 --- .../sdc/be/components/kafka/KafkaCommonConfig.java | 95 ---------------------- .../sdc/be/components/kafka/KafkaHandler.java | 1 + .../sdc/be/components/kafka/SdcKafkaConsumer.java | 36 +++++++- .../sdc/be/components/kafka/SdcKafkaProducer.java | 32 +++++++- 4 files changed, 61 insertions(+), 103 deletions(-) delete mode 100644 catalog-be/src/main/java/org/openecomp/sdc/be/components/kafka/KafkaCommonConfig.java (limited to 'catalog-be/src/main/java/org') diff --git a/catalog-be/src/main/java/org/openecomp/sdc/be/components/kafka/KafkaCommonConfig.java b/catalog-be/src/main/java/org/openecomp/sdc/be/components/kafka/KafkaCommonConfig.java deleted file mode 100644 index bf65c52874..0000000000 --- a/catalog-be/src/main/java/org/openecomp/sdc/be/components/kafka/KafkaCommonConfig.java +++ /dev/null @@ -1,95 +0,0 @@ -/*- - * ============LICENSE_START======================================================= - * SDC - * ================================================================================ - * Copyright (C) 2023 Nordix Foundation. All rights reserved. - * ================================================================================ - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * ============LICENSE_END========================================================= - */ -package org.openecomp.sdc.be.components.kafka; - -import java.util.Properties; -import java.util.UUID; -import org.apache.kafka.clients.CommonClientConfigs; -import org.apache.kafka.clients.consumer.ConsumerConfig; -import org.apache.kafka.clients.producer.ProducerConfig; -import org.apache.kafka.common.KafkaException; -import org.apache.kafka.common.config.SaslConfigs; -import org.apache.kafka.common.config.SslConfigs; -import org.openecomp.sdc.be.config.DistributionEngineConfiguration; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -public class KafkaCommonConfig { - - private static final Logger log = LoggerFactory.getLogger(KafkaCommonConfig.class.getName()); - - private final DistributionEngineConfiguration deConfiguration; - - public KafkaCommonConfig(DistributionEngineConfiguration config){ - this.deConfiguration = config; - } - - public Properties getConsumerProperties(){ - Properties props = new Properties(); - setCommonProperties(props); - props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer"); - props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer"); - props.put(ConsumerConfig.CLIENT_ID_CONFIG, deConfiguration.getDistributionStatusTopic().getConsumerId() + "-consumer-" + UUID.randomUUID()); - props.put(ConsumerConfig.GROUP_ID_CONFIG, deConfiguration.getDistributionStatusTopic().getConsumerGroup()); - props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest"); - return props; - } - - public Properties getProducerProperties(){ - Properties props = new Properties(); - setCommonProperties(props); - props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer"); - props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer"); - props.put(ProducerConfig.CLIENT_ID_CONFIG, deConfiguration.getDistributionStatusTopic().getConsumerId() + "-producer-" + UUID.randomUUID()); - - return props; - } - - private void setCommonProperties(Properties props) { - String securityProtocolConfig = System.getenv().getOrDefault("SECURITY_PROTOCOL", "SASL_PLAINTEXT"); - props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, securityProtocolConfig); - props.put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, deConfiguration.getKafkaBootStrapServers()); - - if("SSL".equals(securityProtocolConfig)) { - props.put(SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG, deConfiguration.getSSLConfig().getKeystorePath()); - props.put(SslConfigs.SSL_KEYSTORE_PASSWORD_CONFIG, deConfiguration.getSSLConfig().getKeystorePass()); - props.put(SslConfigs.SSL_KEY_PASSWORD_CONFIG, deConfiguration.getSSLConfig().getKeyManagerPassword()); - props.put(SslConfigs.SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_CONFIG, ""); - props.put(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG, deConfiguration.getSSLConfig().getTruststorePath()); - props.put(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG, deConfiguration.getSSLConfig().getTruststorePass()); - } else { - props.put(SaslConfigs.SASL_JAAS_CONFIG, getKafkaSaslJaasConfig()); - props.put(SaslConfigs.SASL_MECHANISM, "SCRAM-SHA-512"); - } - } - - /** - * @return The Sasl Jaas Configuration - */ - private String getKafkaSaslJaasConfig() throws KafkaException { - String saslJaasConfFromEnv = System.getenv("SASL_JAAS_CONFIG"); - if(saslJaasConfFromEnv != null) { - return saslJaasConfFromEnv; - } else { - throw new KafkaException("sasl.jaas.config not set for Kafka Consumer"); - } - } - -} diff --git a/catalog-be/src/main/java/org/openecomp/sdc/be/components/kafka/KafkaHandler.java b/catalog-be/src/main/java/org/openecomp/sdc/be/components/kafka/KafkaHandler.java index 5a3698055e..2a5590e72d 100644 --- a/catalog-be/src/main/java/org/openecomp/sdc/be/components/kafka/KafkaHandler.java +++ b/catalog-be/src/main/java/org/openecomp/sdc/be/components/kafka/KafkaHandler.java @@ -22,6 +22,7 @@ package org.openecomp.sdc.be.components.kafka; import com.google.gson.Gson; import com.google.gson.JsonSyntaxException; import fj.data.Either; +import lombok.Getter; import lombok.Setter; import org.apache.http.HttpStatus; import org.apache.kafka.common.KafkaException; diff --git a/catalog-be/src/main/java/org/openecomp/sdc/be/components/kafka/SdcKafkaConsumer.java b/catalog-be/src/main/java/org/openecomp/sdc/be/components/kafka/SdcKafkaConsumer.java index 5350445ab1..8879bf000e 100644 --- a/catalog-be/src/main/java/org/openecomp/sdc/be/components/kafka/SdcKafkaConsumer.java +++ b/catalog-be/src/main/java/org/openecomp/sdc/be/components/kafka/SdcKafkaConsumer.java @@ -2,7 +2,7 @@ * ============LICENSE_START======================================================= * SDC * ================================================================================ - * Copyright (C) 2022-2023 Nordix Foundation. All rights reserved. + * Copyright (C) 2022 Nordix Foundation. All rights reserved. * ================================================================================ * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -25,10 +25,14 @@ import java.util.ArrayList; import java.util.Collections; import java.util.List; import java.util.Properties; +import java.util.UUID; +import org.apache.kafka.clients.CommonClientConfigs; +import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.consumer.ConsumerRecords; import org.apache.kafka.clients.consumer.KafkaConsumer; import org.apache.kafka.common.KafkaException; +import org.apache.kafka.common.config.SaslConfigs; import org.openecomp.sdc.be.config.DistributionEngineConfiguration; import org.openecomp.sdc.common.log.wrappers.Logger; @@ -39,16 +43,27 @@ public class SdcKafkaConsumer { private static final Logger log = Logger.getLogger(SdcKafkaConsumer.class.getName()); private final DistributionEngineConfiguration deConfiguration; - private final KafkaConsumer kafkaConsumer; + private KafkaConsumer kafkaConsumer; /** * Constructor setting up the KafkaConsumer from a predefined set of configurations */ public SdcKafkaConsumer(DistributionEngineConfiguration deConfiguration){ log.info("Create SdcKafkaConsumer via constructor"); - KafkaCommonConfig kafkaCommonConfig = new KafkaCommonConfig(deConfiguration); - Properties properties = kafkaCommonConfig.getConsumerProperties(); + Properties properties = new Properties(); this.deConfiguration = deConfiguration; + + properties.put(ConsumerConfig.CLIENT_ID_CONFIG, deConfiguration.getDistributionStatusTopic().getConsumerId()+ "-consumer-" + UUID.randomUUID()); + properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer"); + properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer"); + properties.put(ConsumerConfig.GROUP_ID_CONFIG, deConfiguration.getDistributionStatusTopic().getConsumerGroup()); + properties.put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, deConfiguration.getKafkaBootStrapServers()); + properties.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SASL_PLAINTEXT"); + properties.put(ConsumerConfig.ALLOW_AUTO_CREATE_TOPICS_CONFIG, false); + properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest"); + properties.put(SaslConfigs.SASL_MECHANISM, "SCRAM-SHA-512"); + + properties.put(SaslConfigs.SASL_JAAS_CONFIG, getKafkaSaslJaasConfig()); kafkaConsumer = new KafkaConsumer<>(properties); } @@ -63,6 +78,19 @@ public class SdcKafkaConsumer { this.kafkaConsumer = kafkaConsumer; } + /** + * + * @return the Sasl Jass Config + */ + private String getKafkaSaslJaasConfig() { + String saslJaasConfFromEnv = System.getenv("SASL_JAAS_CONFIG"); + if(saslJaasConfFromEnv != null) { + return saslJaasConfFromEnv; + } else { + throw new KafkaException("sasl.jaas.config not set for Kafka Consumer"); + } + } + /** * * @param topic Topic in which to subscribe diff --git a/catalog-be/src/main/java/org/openecomp/sdc/be/components/kafka/SdcKafkaProducer.java b/catalog-be/src/main/java/org/openecomp/sdc/be/components/kafka/SdcKafkaProducer.java index 9e31da66b9..bdc984d7b5 100644 --- a/catalog-be/src/main/java/org/openecomp/sdc/be/components/kafka/SdcKafkaProducer.java +++ b/catalog-be/src/main/java/org/openecomp/sdc/be/components/kafka/SdcKafkaProducer.java @@ -2,7 +2,7 @@ * ============LICENSE_START======================================================= * SDC * ================================================================================ - * Copyright (C) 2022-2023 Nordix Foundation. All rights reserved. + * Copyright (C) 2022 Nordix Foundation. All rights reserved. * ================================================================================ * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -21,9 +21,13 @@ package org.openecomp.sdc.be.components.kafka; import com.google.common.annotations.VisibleForTesting; import java.util.Properties; +import java.util.UUID; +import org.apache.kafka.clients.CommonClientConfigs; import org.apache.kafka.clients.producer.KafkaProducer; +import org.apache.kafka.clients.producer.ProducerConfig; import org.apache.kafka.clients.producer.ProducerRecord; import org.apache.kafka.common.KafkaException; +import org.apache.kafka.common.config.SaslConfigs; import org.openecomp.sdc.be.config.DistributionEngineConfiguration; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -34,15 +38,22 @@ import org.slf4j.LoggerFactory; public class SdcKafkaProducer { private static final Logger log = LoggerFactory.getLogger(SdcKafkaProducer.class.getName()); - private final KafkaProducer kafkaProducer; + private KafkaProducer kafkaProducer; /** * Constructor setting up the KafkaProducer from a predefined set of configurations */ public SdcKafkaProducer(DistributionEngineConfiguration deConfiguration) { log.info("Create SdcKafkaProducer via constructor"); - KafkaCommonConfig kafkaCommonConfig = new KafkaCommonConfig(deConfiguration); - Properties properties = kafkaCommonConfig.getProducerProperties(); + Properties properties = new Properties(); + + properties.put(ProducerConfig.CLIENT_ID_CONFIG, deConfiguration.getDistributionStatusTopic().getConsumerId() + "-producer-" + UUID.randomUUID()); + properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer"); + properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer"); + properties.put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, deConfiguration.getKafkaBootStrapServers()); + properties.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SASL_PLAINTEXT"); + properties.put(SaslConfigs.SASL_JAAS_CONFIG, getKafkaSaslJaasConfig()); + properties.put(SaslConfigs.SASL_MECHANISM, "SCRAM-SHA-512"); kafkaProducer = new KafkaProducer<>(properties); } @@ -55,9 +66,22 @@ public class SdcKafkaProducer { this.kafkaProducer = kafkaProducer; } + /** + * @return The Sasl Jaas Configuration + */ + private static String getKafkaSaslJaasConfig() throws KafkaException { + String saslJaasConfFromEnv = System.getenv("SASL_JAAS_CONFIG"); + if(saslJaasConfFromEnv != null) { + return saslJaasConfFromEnv; + } else { + throw new KafkaException("sasl.jaas.config not set for Kafka Consumer"); + } + } + /** * @param message A message to Send * @param topicName The name of the topic to publish to + * @return The status of the send request */ public void send(String message, String topicName) throws KafkaException { ProducerRecord kafkaMessagePayload = new ProducerRecord<>(topicName, "PartitionKey", message); -- cgit 1.2.3-korg