summaryrefslogtreecommitdiffstats
path: root/ANR-App/src/main/java/org/onap/ranapp/kafka/client
diff options
context:
space:
mode:
authorvvarvate <vv00489503@techmahindra.com>2022-09-14 18:04:56 +0530
committervvarvate <vv00489503@techmahindra.com>2022-10-28 12:02:19 +0530
commitf94d8ec01991c9cfc8e4c74de2e763e72d4fbbbf (patch)
treed950078ceff5d702e263d965257643ce9ecd5eca /ANR-App/src/main/java/org/onap/ranapp/kafka/client
parent746cc0a4aa1ada72d98ed161322fb2bd1e359637 (diff)
Create RAN App in RAN-Sim to support A1-based action for SON Use Case
Issue-ID: INT-2129 Signed-off-by: vvarvate <vv00489503@techmahindra.com> Change-Id: Iba1bd825a612ea93ea5611dda818330bb399642c
Diffstat (limited to 'ANR-App/src/main/java/org/onap/ranapp/kafka/client')
-rw-r--r--ANR-App/src/main/java/org/onap/ranapp/kafka/client/KafkaClientRanappTopic.java100
1 files changed, 100 insertions, 0 deletions
diff --git a/ANR-App/src/main/java/org/onap/ranapp/kafka/client/KafkaClientRanappTopic.java b/ANR-App/src/main/java/org/onap/ranapp/kafka/client/KafkaClientRanappTopic.java
new file mode 100644
index 0000000..e80b8e1
--- /dev/null
+++ b/ANR-App/src/main/java/org/onap/ranapp/kafka/client/KafkaClientRanappTopic.java
@@ -0,0 +1,100 @@
+package org.onap.ranapp.kafka.client;
+import java.util.Arrays;
+import java.util.Properties;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.common.serialization.StringDeserializer;
+import org.onap.ranapp.kafka.listener.RanAppEventConsumer;
+import org.onap.ranapp.kafka.model.appmodels.PolicytypetoTopicMapping;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.beans.factory.annotation.Value;
+import org.springframework.context.annotation.Bean;
+import org.springframework.http.ResponseEntity;
+import org.springframework.retry.backoff.FixedBackOffPolicy;
+import org.springframework.retry.policy.SimpleRetryPolicy;
+import org.springframework.retry.support.RetryTemplate;
+import org.springframework.stereotype.Component;
+import org.springframework.web.client.RestTemplate;
+import lombok.AccessLevel;
+import lombok.Getter;
+import reactor.core.Disposable;
+@Component
+public class KafkaClientRanappTopic {
+
+ private static final Logger logger = LoggerFactory.getLogger(KafkaClientRanappTopic.class);
+
+ @Value("${kafkadispatcher.url}")
+ private String kafkaclient_url;
+ @Value("${ranapp.policytype}")
+ private String policytype_name;
+
+ @Value(value = "${kafka.bootstrapAddress}")
+ private String bootstrapAddress;
+ @Value(value = "${ranapp.testing.topic.id}")
+ private String groupId;
+
+ public static String requestTopicName = "";
+
+ public static String responseTopicName = "";
+
+ @Getter(AccessLevel.PROTECTED)
+ private Disposable refreshTask = null;
+
+ @Autowired
+ RanAppEventConsumer ranappeventconsumer;
+
+ private RestTemplate restTemplate = new RestTemplate();
+
+ @Bean
+ public RetryTemplate retryTemplate() {
+ int maxAttempt = Integer.parseInt("10");
+ int retryTimeInterval = Integer.parseInt("5000");
+ SimpleRetryPolicy retryPolicy = new SimpleRetryPolicy();
+ retryPolicy.setMaxAttempts(maxAttempt);
+ FixedBackOffPolicy backOffPolicy = new FixedBackOffPolicy();
+ backOffPolicy.setBackOffPeriod(retryTimeInterval); // 1.5 seconds
+ RetryTemplate template = new RetryTemplate();
+ template.setRetryPolicy(retryPolicy);
+ template.setBackOffPolicy(backOffPolicy);
+ return template;
+ }
+
+ public void getRanApp_KafkaTopicnames() {
+ logger.info("Requesting for KafkaTopics");
+ stop();
+ refreshTask=retryTemplate().execute(context -> {
+ String url = kafkaclient_url + policytype_name ;
+ System.out.println("retrying Connection: "+url);
+
+
+ ResponseEntity<PolicytypetoTopicMapping> requestData = restTemplate.getForEntity(url, PolicytypetoTopicMapping.class);
+ PolicytypetoTopicMapping policytoTopicMapping=requestData.getBody();
+ requestTopicName=policytoTopicMapping.getRequest_topic();
+ responseTopicName=policytoTopicMapping.getResponse_topic();
+ logger.info("Request Topic for Policy "+ policytype_name +" "+ requestTopicName);
+ logger.info("Response Topic for Policy "+ policytype_name +" "+ responseTopicName);
+
+ Properties properties=new Properties();
+ properties.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,bootstrapAddress);
+ properties.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
+ properties.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,StringDeserializer.class.getName());
+ properties.setProperty(ConsumerConfig.GROUP_ID_CONFIG,groupId);
+ properties.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG,"earliest");
+ //creating consumer
+ KafkaConsumer<String,String> consumer= new KafkaConsumer<String,String>(properties);
+ //Subscribing
+ consumer.subscribe(Arrays.asList(requestTopicName));
+ ranappeventconsumer.consume(consumer,requestTopicName,responseTopicName);
+ logger.info("Response ..."+ requestData);
+ throw new IllegalStateException("Something went wrong");
+ });
+ }
+
+ public void stop() {
+ if (refreshTask != null) {
+ refreshTask.dispose();
+ }
+ }
+}