summaryrefslogtreecommitdiffstats
path: root/ANR-App/src/main/java/org/onap/ranapp/kafka/client/KafkaClientRanappTopic.java
diff options
context:
space:
mode:
Diffstat (limited to 'ANR-App/src/main/java/org/onap/ranapp/kafka/client/KafkaClientRanappTopic.java')
-rw-r--r--ANR-App/src/main/java/org/onap/ranapp/kafka/client/KafkaClientRanappTopic.java29
1 files changed, 15 insertions, 14 deletions
diff --git a/ANR-App/src/main/java/org/onap/ranapp/kafka/client/KafkaClientRanappTopic.java b/ANR-App/src/main/java/org/onap/ranapp/kafka/client/KafkaClientRanappTopic.java
index e80b8e1..090fbfd 100644
--- a/ANR-App/src/main/java/org/onap/ranapp/kafka/client/KafkaClientRanappTopic.java
+++ b/ANR-App/src/main/java/org/onap/ranapp/kafka/client/KafkaClientRanappTopic.java
@@ -1,3 +1,18 @@
+/*
+ * Copyright (C) 2022 CAPGEMINI ENGINEERING.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at:
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
package org.onap.ranapp.kafka.client;
import java.util.Arrays;
import java.util.Properties;
@@ -22,31 +37,22 @@ import lombok.Getter;
import reactor.core.Disposable;
@Component
public class KafkaClientRanappTopic {
-
private static final Logger logger = LoggerFactory.getLogger(KafkaClientRanappTopic.class);
-
@Value("${kafkadispatcher.url}")
private String kafkaclient_url;
@Value("${ranapp.policytype}")
private String policytype_name;
-
@Value(value = "${kafka.bootstrapAddress}")
private String bootstrapAddress;
@Value(value = "${ranapp.testing.topic.id}")
private String groupId;
-
public static String requestTopicName = "";
-
public static String responseTopicName = "";
-
@Getter(AccessLevel.PROTECTED)
private Disposable refreshTask = null;
-
@Autowired
RanAppEventConsumer ranappeventconsumer;
-
private RestTemplate restTemplate = new RestTemplate();
-
@Bean
public RetryTemplate retryTemplate() {
int maxAttempt = Integer.parseInt("10");
@@ -60,22 +66,18 @@ public class KafkaClientRanappTopic {
template.setBackOffPolicy(backOffPolicy);
return template;
}
-
public void getRanApp_KafkaTopicnames() {
logger.info("Requesting for KafkaTopics");
stop();
refreshTask=retryTemplate().execute(context -> {
String url = kafkaclient_url + policytype_name ;
System.out.println("retrying Connection: "+url);
-
-
ResponseEntity<PolicytypetoTopicMapping> requestData = restTemplate.getForEntity(url, PolicytypetoTopicMapping.class);
PolicytypetoTopicMapping policytoTopicMapping=requestData.getBody();
requestTopicName=policytoTopicMapping.getRequest_topic();
responseTopicName=policytoTopicMapping.getResponse_topic();
logger.info("Request Topic for Policy "+ policytype_name +" "+ requestTopicName);
logger.info("Response Topic for Policy "+ policytype_name +" "+ responseTopicName);
-
Properties properties=new Properties();
properties.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,bootstrapAddress);
properties.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
@@ -91,7 +93,6 @@ public class KafkaClientRanappTopic {
throw new IllegalStateException("Something went wrong");
});
}
-
public void stop() {
if (refreshTask != null) {
refreshTask.dispose();