summaryrefslogtreecommitdiffstats
path: root/ANR-App/src/main/java/org/onap/ranapp/kafka/client/KafkaClientRanappTopic.java
diff options
context:
space:
mode:
Diffstat (limited to 'ANR-App/src/main/java/org/onap/ranapp/kafka/client/KafkaClientRanappTopic.java')
-rw-r--r--ANR-App/src/main/java/org/onap/ranapp/kafka/client/KafkaClientRanappTopic.java100
1 files changed, 100 insertions, 0 deletions
diff --git a/ANR-App/src/main/java/org/onap/ranapp/kafka/client/KafkaClientRanappTopic.java b/ANR-App/src/main/java/org/onap/ranapp/kafka/client/KafkaClientRanappTopic.java
new file mode 100644
index 0000000..e80b8e1
--- /dev/null
+++ b/ANR-App/src/main/java/org/onap/ranapp/kafka/client/KafkaClientRanappTopic.java
@@ -0,0 +1,100 @@
+package org.onap.ranapp.kafka.client;
+import java.util.Arrays;
+import java.util.Properties;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.common.serialization.StringDeserializer;
+import org.onap.ranapp.kafka.listener.RanAppEventConsumer;
+import org.onap.ranapp.kafka.model.appmodels.PolicytypetoTopicMapping;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.beans.factory.annotation.Value;
+import org.springframework.context.annotation.Bean;
+import org.springframework.http.ResponseEntity;
+import org.springframework.retry.backoff.FixedBackOffPolicy;
+import org.springframework.retry.policy.SimpleRetryPolicy;
+import org.springframework.retry.support.RetryTemplate;
+import org.springframework.stereotype.Component;
+import org.springframework.web.client.RestTemplate;
+import lombok.AccessLevel;
+import lombok.Getter;
+import reactor.core.Disposable;
+@Component
+public class KafkaClientRanappTopic {
+
+ private static final Logger logger = LoggerFactory.getLogger(KafkaClientRanappTopic.class);
+
+ @Value("${kafkadispatcher.url}")
+ private String kafkaclient_url;
+ @Value("${ranapp.policytype}")
+ private String policytype_name;
+
+ @Value(value = "${kafka.bootstrapAddress}")
+ private String bootstrapAddress;
+ @Value(value = "${ranapp.testing.topic.id}")
+ private String groupId;
+
+ public static String requestTopicName = "";
+
+ public static String responseTopicName = "";
+
+ @Getter(AccessLevel.PROTECTED)
+ private Disposable refreshTask = null;
+
+ @Autowired
+ RanAppEventConsumer ranappeventconsumer;
+
+ private RestTemplate restTemplate = new RestTemplate();
+
+ @Bean
+ public RetryTemplate retryTemplate() {
+ int maxAttempt = Integer.parseInt("10");
+ int retryTimeInterval = Integer.parseInt("5000");
+ SimpleRetryPolicy retryPolicy = new SimpleRetryPolicy();
+ retryPolicy.setMaxAttempts(maxAttempt);
+ FixedBackOffPolicy backOffPolicy = new FixedBackOffPolicy();
+ backOffPolicy.setBackOffPeriod(retryTimeInterval); // 1.5 seconds
+ RetryTemplate template = new RetryTemplate();
+ template.setRetryPolicy(retryPolicy);
+ template.setBackOffPolicy(backOffPolicy);
+ return template;
+ }
+
+ public void getRanApp_KafkaTopicnames() {
+ logger.info("Requesting for KafkaTopics");
+ stop();
+ refreshTask=retryTemplate().execute(context -> {
+ String url = kafkaclient_url + policytype_name ;
+ System.out.println("retrying Connection: "+url);
+
+
+ ResponseEntity<PolicytypetoTopicMapping> requestData = restTemplate.getForEntity(url, PolicytypetoTopicMapping.class);
+ PolicytypetoTopicMapping policytoTopicMapping=requestData.getBody();
+ requestTopicName=policytoTopicMapping.getRequest_topic();
+ responseTopicName=policytoTopicMapping.getResponse_topic();
+ logger.info("Request Topic for Policy "+ policytype_name +" "+ requestTopicName);
+ logger.info("Response Topic for Policy "+ policytype_name +" "+ responseTopicName);
+
+ Properties properties=new Properties();
+ properties.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,bootstrapAddress);
+ properties.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
+ properties.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,StringDeserializer.class.getName());
+ properties.setProperty(ConsumerConfig.GROUP_ID_CONFIG,groupId);
+ properties.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG,"earliest");
+ //creating consumer
+ KafkaConsumer<String,String> consumer= new KafkaConsumer<String,String>(properties);
+ //Subscribing
+ consumer.subscribe(Arrays.asList(requestTopicName));
+ ranappeventconsumer.consume(consumer,requestTopicName,responseTopicName);
+ logger.info("Response ..."+ requestData);
+ throw new IllegalStateException("Something went wrong");
+ });
+ }
+
+ public void stop() {
+ if (refreshTask != null) {
+ refreshTask.dispose();
+ }
+ }
+}