diff options
Diffstat (limited to 'prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/configuration/KafkaConfig.java')
-rw-r--r-- | prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/configuration/KafkaConfig.java | 98 |
1 files changed, 54 insertions, 44 deletions
diff --git a/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/configuration/KafkaConfig.java b/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/configuration/KafkaConfig.java index 8affe281..baaf3b16 100644 --- a/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/configuration/KafkaConfig.java +++ b/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/configuration/KafkaConfig.java @@ -17,80 +17,90 @@ * limitations under the License. * ============LICENSE_END========================================================= */ - package org.onap.dcaegen2.services.prh.configuration; +import java.util.HashMap; +import java.util.Map; +import org.springframework.context.annotation.Profile; import org.apache.kafka.clients.consumer.ConsumerConfig; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; -import org.springframework.context.annotation.Profile; import org.springframework.kafka.annotation.EnableKafka; import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory; import org.springframework.kafka.core.ConsumerFactory; import org.springframework.kafka.core.DefaultKafkaConsumerFactory; import org.springframework.kafka.listener.ContainerProperties; -import java.util.HashMap; -import java.util.Map; - -/** - * @author <a href="mailto:pravin.kokane@t-systems.com">Pravin Kokane</a> on 3/13/23 - */ + /** + * * @author <a href="mailto:PRANIT.KAPDULE@t-systems.com">Pranit Kapdule</a> on + * * 24/08/23 + * */ @Profile("autoCommitDisabled") @EnableKafka @Configuration -public class KafkaConfig -{ - String kafkaBoostrapServerConfig = System.getenv("kafkaBoostrapServerConfig"); - - String groupIdConfig = System.getenv("groupIdConfig"); - - - String kafkaSecurityProtocol = System.getenv("kafkaSecurityProtocol"); - - String kafkaSaslMechanism = System.getenv("kafkaSaslMechanism"); - - String kafkaUsername = System.getenv("kafkaUsername"); - - String kafkaPassword = System.getenv("kafkaPassword"); - - String kafkaJaasConfig = System.getenv("JAAS_CONFIG"); - - String kafkaLoginModuleClassConfig = System.getenv("Login_Module_Class"); +public class KafkaConfig { + + CbsConfigurationForAutoCommitDisabledMode cbsConfigurationForAutoCommitDisabledMode; + + public String kafkaBoostrapServerConfig; + public String groupIdConfig; + public String kafkaSecurityProtocol; + public String kafkaSaslMechanism; + public String kafkaUsername; + public String kafkaPassword; + public String kafkaJaasConfigName; + public String kafkaLoginModuleClassConfig; + public String kafkaJaasConfig; + + public final String DEFAULT_KAFKA_SECURITY_PROTOCOL = "SASL_PLAINTEXT"; + public final String DEFAULT_KAFKA_SASL_MECHANISM = "SCRAM-SHA-512"; + + public KafkaConfig() { + + } @Bean - public ConsumerFactory<String, String> consumerFactory() - { - Map<String,Object> config = new HashMap<>(); - config.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,kafkaBoostrapServerConfig); - config.put(ConsumerConfig.GROUP_ID_CONFIG,groupIdConfig); - config.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer"); - config.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringDeserializer"); + public ConsumerFactory<String, String> consumerFactory(CbsConfigurationForAutoCommitDisabledMode cbsConfigurationForAutoCommitDisabledMode) { + this.cbsConfigurationForAutoCommitDisabledMode = cbsConfigurationForAutoCommitDisabledMode; + kafkaBoostrapServerConfig = cbsConfigurationForAutoCommitDisabledMode.getKafkaConfig() + .kafkaBoostrapServerConfig(); + groupIdConfig = cbsConfigurationForAutoCommitDisabledMode.getKafkaConfig().groupIdConfig(); + kafkaSecurityProtocol = cbsConfigurationForAutoCommitDisabledMode.getKafkaConfig().kafkaSecurityProtocol(); + kafkaSaslMechanism = cbsConfigurationForAutoCommitDisabledMode.getKafkaConfig().kafkaSaslMechanism(); + kafkaJaasConfig = cbsConfigurationForAutoCommitDisabledMode.getKafkaConfig().kafkaJaasConfig(); + + Map<String, Object> config = new HashMap<>(); + config.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaBoostrapServerConfig); + + config.put(ConsumerConfig.GROUP_ID_CONFIG, groupIdConfig); + config.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, + "org.apache.kafka.common.serialization.StringDeserializer"); + config.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, + "org.apache.kafka.common.serialization.StringDeserializer"); config.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false); - if(kafkaJaasConfig == null) { - kafkaJaasConfig = kafkaLoginModuleClassConfig + " required username=\"" - + kafkaUsername + "\" password=\"" + kafkaPassword + "\";"; - } - if(kafkaSecurityProtocol==null ) kafkaSecurityProtocol="SASL_PLAINTEXT"; + + if (kafkaSecurityProtocol == null) + kafkaSecurityProtocol = DEFAULT_KAFKA_SECURITY_PROTOCOL; config.put("security.protocol", kafkaSecurityProtocol); - if(kafkaSaslMechanism==null ) kafkaSaslMechanism="SCRAM-SHA-512"; + if (kafkaSaslMechanism == null) + kafkaSaslMechanism = DEFAULT_KAFKA_SASL_MECHANISM; config.put("sasl.mechanism", kafkaSaslMechanism); config.put("sasl.jaas.config", kafkaJaasConfig); return new DefaultKafkaConsumerFactory<>(config); + } @Bean - public ConcurrentKafkaListenerContainerFactory kafkaListenerContainerFactory() - { + public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory( + CbsConfigurationForAutoCommitDisabledMode cbsConfigurationForAutoCommitDisabledMode) { ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>(); - factory.setConsumerFactory(consumerFactory()); + factory.setConsumerFactory(consumerFactory(cbsConfigurationForAutoCommitDisabledMode)); factory.setBatchListener(true); factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL); return factory; } + } |