From f94d8ec01991c9cfc8e4c74de2e763e72d4fbbbf Mon Sep 17 00:00:00 2001 From: vvarvate Date: Wed, 14 Sep 2022 18:04:56 +0530 Subject: Create RAN App in RAN-Sim to support A1-based action for SON Use Case Issue-ID: INT-2129 Signed-off-by: vvarvate Change-Id: Iba1bd825a612ea93ea5611dda818330bb399642c --- .../kafka/client/KafkaClientRanappTopic.java | 100 +++++++++++++++++++++ 1 file changed, 100 insertions(+) create mode 100644 ANR-App/src/main/java/org/onap/ranapp/kafka/client/KafkaClientRanappTopic.java (limited to 'ANR-App/src/main/java/org/onap/ranapp/kafka/client') diff --git a/ANR-App/src/main/java/org/onap/ranapp/kafka/client/KafkaClientRanappTopic.java b/ANR-App/src/main/java/org/onap/ranapp/kafka/client/KafkaClientRanappTopic.java new file mode 100644 index 0000000..e80b8e1 --- /dev/null +++ b/ANR-App/src/main/java/org/onap/ranapp/kafka/client/KafkaClientRanappTopic.java @@ -0,0 +1,100 @@ +package org.onap.ranapp.kafka.client; +import java.util.Arrays; +import java.util.Properties; +import org.apache.kafka.clients.consumer.ConsumerConfig; +import org.apache.kafka.clients.consumer.KafkaConsumer; +import org.apache.kafka.common.serialization.StringDeserializer; +import org.onap.ranapp.kafka.listener.RanAppEventConsumer; +import org.onap.ranapp.kafka.model.appmodels.PolicytypetoTopicMapping; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.beans.factory.annotation.Value; +import org.springframework.context.annotation.Bean; +import org.springframework.http.ResponseEntity; +import org.springframework.retry.backoff.FixedBackOffPolicy; +import org.springframework.retry.policy.SimpleRetryPolicy; +import org.springframework.retry.support.RetryTemplate; +import org.springframework.stereotype.Component; +import org.springframework.web.client.RestTemplate; +import lombok.AccessLevel; +import lombok.Getter; +import reactor.core.Disposable; +@Component +public class KafkaClientRanappTopic { + + private static final Logger logger = LoggerFactory.getLogger(KafkaClientRanappTopic.class); + + @Value("${kafkadispatcher.url}") + private String kafkaclient_url; + @Value("${ranapp.policytype}") + private String policytype_name; + + @Value(value = "${kafka.bootstrapAddress}") + private String bootstrapAddress; + @Value(value = "${ranapp.testing.topic.id}") + private String groupId; + + public static String requestTopicName = ""; + + public static String responseTopicName = ""; + + @Getter(AccessLevel.PROTECTED) + private Disposable refreshTask = null; + + @Autowired + RanAppEventConsumer ranappeventconsumer; + + private RestTemplate restTemplate = new RestTemplate(); + + @Bean + public RetryTemplate retryTemplate() { + int maxAttempt = Integer.parseInt("10"); + int retryTimeInterval = Integer.parseInt("5000"); + SimpleRetryPolicy retryPolicy = new SimpleRetryPolicy(); + retryPolicy.setMaxAttempts(maxAttempt); + FixedBackOffPolicy backOffPolicy = new FixedBackOffPolicy(); + backOffPolicy.setBackOffPeriod(retryTimeInterval); // 1.5 seconds + RetryTemplate template = new RetryTemplate(); + template.setRetryPolicy(retryPolicy); + template.setBackOffPolicy(backOffPolicy); + return template; + } + + public void getRanApp_KafkaTopicnames() { + logger.info("Requesting for KafkaTopics"); + stop(); + refreshTask=retryTemplate().execute(context -> { + String url = kafkaclient_url + policytype_name ; + System.out.println("retrying Connection: "+url); + + + ResponseEntity requestData = restTemplate.getForEntity(url, PolicytypetoTopicMapping.class); + PolicytypetoTopicMapping policytoTopicMapping=requestData.getBody(); + requestTopicName=policytoTopicMapping.getRequest_topic(); + responseTopicName=policytoTopicMapping.getResponse_topic(); + logger.info("Request Topic for Policy "+ policytype_name +" "+ requestTopicName); + logger.info("Response Topic for Policy "+ policytype_name +" "+ responseTopicName); + + Properties properties=new Properties(); + properties.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,bootstrapAddress); + properties.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName()); + properties.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,StringDeserializer.class.getName()); + properties.setProperty(ConsumerConfig.GROUP_ID_CONFIG,groupId); + properties.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG,"earliest"); + //creating consumer + KafkaConsumer consumer= new KafkaConsumer(properties); + //Subscribing + consumer.subscribe(Arrays.asList(requestTopicName)); + ranappeventconsumer.consume(consumer,requestTopicName,responseTopicName); + logger.info("Response ..."+ requestData); + throw new IllegalStateException("Something went wrong"); + }); + } + + public void stop() { + if (refreshTask != null) { + refreshTask.dispose(); + } + } +} -- cgit 1.2.3-korg