From 62841e53e41fd8e12b75641806f76c11878260ee Mon Sep 17 00:00:00 2001 From: sushant53 Date: Fri, 27 Oct 2023 16:44:30 +0530 Subject: [DCAEGEN2] Remove DMaaP dependency in PRH Removed DMaaP dependency in PRH by using new sdk library, which uses Kafka API directly. Issue-ID: DCAEGEN2-3402 Change-Id: I5456ce432a9fd4a58826275a17c603379b0c18ee Signed-off-by: sushant53 --- .../services/prh/configuration/CbsConfiguration.java | 8 +++++++- .../services/prh/controllers/ScheduleController.java | 1 + .../dcaegen2/services/prh/tasks/ScheduledTasksRunner.java | 13 ++++++++++++- 3 files changed, 20 insertions(+), 2 deletions(-) (limited to 'prh-app-server/src/main') diff --git a/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/configuration/CbsConfiguration.java b/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/configuration/CbsConfiguration.java index 64fff9a7..22763e8b 100644 --- a/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/configuration/CbsConfiguration.java +++ b/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/configuration/CbsConfiguration.java @@ -58,7 +58,14 @@ public class CbsConfiguration implements Config { messageRouterSubscriber = DmaapClientFactory .createMessageRouterSubscriber(consulConfigurationParser.getMessageRouterSubscriberConfig()); + String prevTopicUrl = null; + if(messageRouterCBSSubscribeRequest != null) { + prevTopicUrl = messageRouterCBSSubscribeRequest.sourceDefinition().topicUrl(); + } messageRouterCBSSubscribeRequest = consulConfigurationParser.getMessageRouterSubscribeRequest(); + if(!messageRouterCBSSubscribeRequest.sourceDefinition().topicUrl().equals(prevTopicUrl)) { + messageRouterSubscriber.close(); + } } @Override @@ -95,5 +102,4 @@ public class CbsConfiguration implements Config { .orElseThrow(() -> new RuntimeException(CBS_CONFIG_MISSING)); } - } diff --git a/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/controllers/ScheduleController.java b/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/controllers/ScheduleController.java index fcbd10a5..aafcd81a 100644 --- a/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/controllers/ScheduleController.java +++ b/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/controllers/ScheduleController.java @@ -65,6 +65,7 @@ public class ScheduleController { public Mono> stopTask() { LOGGER.trace("Receiving stop scheduling worker request"); return Mono.defer(() -> { + scheduledTasksRunner.closeKafkaPublisherSubscriber(); scheduledTasksRunner.cancelTasks(); return Mono.just(new ResponseEntity<>("PRH Service has been stopped!", HttpStatus.OK)); }); diff --git a/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/ScheduledTasksRunner.java b/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/ScheduledTasksRunner.java index e90b0271..5a5eb075 100644 --- a/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/ScheduledTasksRunner.java +++ b/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/ScheduledTasksRunner.java @@ -25,6 +25,7 @@ import java.util.ArrayList; import java.util.List; import java.util.concurrent.ScheduledFuture; import javax.annotation.PreDestroy; +import org.onap.dcaegen2.services.prh.configuration.CbsConfiguration; import org.onap.dcaegen2.services.prh.configuration.PrhProperties; import org.springframework.boot.context.event.ApplicationStartedEvent; import org.springframework.context.annotation.Configuration; @@ -46,11 +47,13 @@ public class ScheduledTasksRunner { private final TaskScheduler taskScheduler; private final ScheduledTasks scheduledTask; private final PrhProperties prhProperties; + private final CbsConfiguration cbsConfiguration; public ScheduledTasksRunner(TaskScheduler taskScheduler, ScheduledTasks scheduledTask, - PrhProperties prhProperties) { + PrhProperties prhProperties, CbsConfiguration cbsConfiguration) { this.taskScheduler = taskScheduler; this.scheduledTask = scheduledTask; this.prhProperties = prhProperties; + this.cbsConfiguration = cbsConfiguration; } @EventListener @@ -82,4 +85,12 @@ public class ScheduledTasksRunner { return false; } } + + /** + * Function for cleaning resources for kafka subscriber and publisher. + */ + public synchronized void closeKafkaPublisherSubscriber() { + cbsConfiguration.getMessageRouterSubscriber().close(); + cbsConfiguration.getMessageRouterPublisher().close(); + } } -- cgit 1.2.3-korg