diff options
author | sangeeta.bellara <sangeeta.bellara@t-systems.com> | 2023-03-09 22:13:03 +0530 |
---|---|---|
committer | Sangeeta Bellara <sangeeta.bellara@t-systems.com> | 2023-03-28 06:26:20 +0000 |
commit | e1e4154a3914f0878e26ea2e1683ebdfe507f5a9 (patch) | |
tree | 6dc21739b666f486daab905de63f2ca5e1aa72d9 /prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/configuration/KafkaConfig.java | |
parent | fb93a46a63c859434cecdbd07c0e08531c366855 (diff) |
PRH Code Additions for Early PNF registrations1.9.0
Issue-ID: DCAEGEN2-3312
Change-Id: Id9b1ca83390af3675e26fc61ccc8d12611ab8ddf
Signed-off-by: sangeeta.bellara <sangeeta.bellara@t-systems.com>
Signed-off-by: sangeeta.bellara <sangeeta.bellara@t-systems.com>
Change-Id: I9bc25bc1343c40aca5644de3fd30f7c2142c1a47
Signed-off-by: sangeeta.bellara <sangeeta.bellara@t-systems.com>
Diffstat (limited to 'prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/configuration/KafkaConfig.java')
-rw-r--r-- | prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/configuration/KafkaConfig.java | 96 |
1 files changed, 96 insertions, 0 deletions
diff --git a/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/configuration/KafkaConfig.java b/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/configuration/KafkaConfig.java new file mode 100644 index 00000000..8affe281 --- /dev/null +++ b/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/configuration/KafkaConfig.java @@ -0,0 +1,96 @@ +/* + * ============LICENSE_START======================================================= + * PNF-REGISTRATION-HANDLER + * ================================================================================ + * Copyright (C) 2023 Deutsche Telekom Intellectual Property. All rights reserved. + * ================================================================================ + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + */ + +package org.onap.dcaegen2.services.prh.configuration; + +import org.apache.kafka.clients.consumer.ConsumerConfig; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; +import org.springframework.context.annotation.Profile; +import org.springframework.kafka.annotation.EnableKafka; +import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory; +import org.springframework.kafka.core.ConsumerFactory; +import org.springframework.kafka.core.DefaultKafkaConsumerFactory; +import org.springframework.kafka.listener.ContainerProperties; + +import java.util.HashMap; +import java.util.Map; + +/** + * @author <a href="mailto:pravin.kokane@t-systems.com">Pravin Kokane</a> on 3/13/23 + */ + +@Profile("autoCommitDisabled") +@EnableKafka +@Configuration +public class KafkaConfig +{ + String kafkaBoostrapServerConfig = System.getenv("kafkaBoostrapServerConfig"); + + String groupIdConfig = System.getenv("groupIdConfig"); + + + String kafkaSecurityProtocol = System.getenv("kafkaSecurityProtocol"); + + String kafkaSaslMechanism = System.getenv("kafkaSaslMechanism"); + + String kafkaUsername = System.getenv("kafkaUsername"); + + String kafkaPassword = System.getenv("kafkaPassword"); + + String kafkaJaasConfig = System.getenv("JAAS_CONFIG"); + + String kafkaLoginModuleClassConfig = System.getenv("Login_Module_Class"); + + @Bean + public ConsumerFactory<String, String> consumerFactory() + { + Map<String,Object> config = new HashMap<>(); + config.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,kafkaBoostrapServerConfig); + config.put(ConsumerConfig.GROUP_ID_CONFIG,groupIdConfig); + config.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer"); + config.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringDeserializer"); + config.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false); + if(kafkaJaasConfig == null) { + kafkaJaasConfig = kafkaLoginModuleClassConfig + " required username=\"" + + kafkaUsername + "\" password=\"" + kafkaPassword + "\";"; + } + if(kafkaSecurityProtocol==null ) kafkaSecurityProtocol="SASL_PLAINTEXT"; + config.put("security.protocol", kafkaSecurityProtocol); + if(kafkaSaslMechanism==null ) kafkaSaslMechanism="SCRAM-SHA-512"; + config.put("sasl.mechanism", kafkaSaslMechanism); + + config.put("sasl.jaas.config", kafkaJaasConfig); + + return new DefaultKafkaConsumerFactory<>(config); + } + + @Bean + public ConcurrentKafkaListenerContainerFactory kafkaListenerContainerFactory() + { + ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>(); + factory.setConsumerFactory(consumerFactory()); + factory.setBatchListener(true); + factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL); + return factory; + } +} |