From 59ec623b673959cb44937ba601abbe15c9b4501e Mon Sep 17 00:00:00 2001 From: vvarvate Date: Tue, 29 Nov 2022 00:30:51 +0530 Subject: INT-2142 RAN App Integration with A1 Terminator RAN App will interact with A1 Terminator to start receiving A1 Policies Added license Anotations for all files Issue-ID: INT-2129 Signed-off-by: vvarvate Change-Id: I61c90086b87766119ba7939a06e93bd2e7d22bad --- .../kafka/client/KafkaClientRanappTopic.java | 29 +++++++++++----------- 1 file changed, 15 insertions(+), 14 deletions(-) (limited to 'ANR-App/src/main/java/org/onap/ranapp/kafka/client') diff --git a/ANR-App/src/main/java/org/onap/ranapp/kafka/client/KafkaClientRanappTopic.java b/ANR-App/src/main/java/org/onap/ranapp/kafka/client/KafkaClientRanappTopic.java index e80b8e1..090fbfd 100644 --- a/ANR-App/src/main/java/org/onap/ranapp/kafka/client/KafkaClientRanappTopic.java +++ b/ANR-App/src/main/java/org/onap/ranapp/kafka/client/KafkaClientRanappTopic.java @@ -1,3 +1,18 @@ +/* + * Copyright (C) 2022 CAPGEMINI ENGINEERING. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ package org.onap.ranapp.kafka.client; import java.util.Arrays; import java.util.Properties; @@ -22,31 +37,22 @@ import lombok.Getter; import reactor.core.Disposable; @Component public class KafkaClientRanappTopic { - private static final Logger logger = LoggerFactory.getLogger(KafkaClientRanappTopic.class); - @Value("${kafkadispatcher.url}") private String kafkaclient_url; @Value("${ranapp.policytype}") private String policytype_name; - @Value(value = "${kafka.bootstrapAddress}") private String bootstrapAddress; @Value(value = "${ranapp.testing.topic.id}") private String groupId; - public static String requestTopicName = ""; - public static String responseTopicName = ""; - @Getter(AccessLevel.PROTECTED) private Disposable refreshTask = null; - @Autowired RanAppEventConsumer ranappeventconsumer; - private RestTemplate restTemplate = new RestTemplate(); - @Bean public RetryTemplate retryTemplate() { int maxAttempt = Integer.parseInt("10"); @@ -60,22 +66,18 @@ public class KafkaClientRanappTopic { template.setBackOffPolicy(backOffPolicy); return template; } - public void getRanApp_KafkaTopicnames() { logger.info("Requesting for KafkaTopics"); stop(); refreshTask=retryTemplate().execute(context -> { String url = kafkaclient_url + policytype_name ; System.out.println("retrying Connection: "+url); - - ResponseEntity requestData = restTemplate.getForEntity(url, PolicytypetoTopicMapping.class); PolicytypetoTopicMapping policytoTopicMapping=requestData.getBody(); requestTopicName=policytoTopicMapping.getRequest_topic(); responseTopicName=policytoTopicMapping.getResponse_topic(); logger.info("Request Topic for Policy "+ policytype_name +" "+ requestTopicName); logger.info("Response Topic for Policy "+ policytype_name +" "+ responseTopicName); - Properties properties=new Properties(); properties.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,bootstrapAddress); properties.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName()); @@ -91,7 +93,6 @@ public class KafkaClientRanappTopic { throw new IllegalStateException("Something went wrong"); }); } - public void stop() { if (refreshTask != null) { refreshTask.dispose(); -- cgit 1.2.3-korg