summaryrefslogtreecommitdiffstats
path: root/prh-app-server/src/main/java
diff options
context:
space:
mode:
Diffstat (limited to 'prh-app-server/src/main/java')
-rw-r--r--prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/configuration/CbsConfiguration.java8
-rw-r--r--prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/controllers/ScheduleController.java1
-rw-r--r--prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/ScheduledTasksRunner.java13
3 files changed, 20 insertions, 2 deletions
diff --git a/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/configuration/CbsConfiguration.java b/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/configuration/CbsConfiguration.java
index 64fff9a7..22763e8b 100644
--- a/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/configuration/CbsConfiguration.java
+++ b/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/configuration/CbsConfiguration.java
@@ -58,7 +58,14 @@ public class CbsConfiguration implements Config {
messageRouterSubscriber = DmaapClientFactory
.createMessageRouterSubscriber(consulConfigurationParser.getMessageRouterSubscriberConfig());
+ String prevTopicUrl = null;
+ if(messageRouterCBSSubscribeRequest != null) {
+ prevTopicUrl = messageRouterCBSSubscribeRequest.sourceDefinition().topicUrl();
+ }
messageRouterCBSSubscribeRequest = consulConfigurationParser.getMessageRouterSubscribeRequest();
+ if(!messageRouterCBSSubscribeRequest.sourceDefinition().topicUrl().equals(prevTopicUrl)) {
+ messageRouterSubscriber.close();
+ }
}
@Override
@@ -95,5 +102,4 @@ public class CbsConfiguration implements Config {
.orElseThrow(() -> new RuntimeException(CBS_CONFIG_MISSING));
}
-
}
diff --git a/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/controllers/ScheduleController.java b/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/controllers/ScheduleController.java
index fcbd10a5..aafcd81a 100644
--- a/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/controllers/ScheduleController.java
+++ b/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/controllers/ScheduleController.java
@@ -65,6 +65,7 @@ public class ScheduleController {
public Mono<ResponseEntity<String>> stopTask() {
LOGGER.trace("Receiving stop scheduling worker request");
return Mono.defer(() -> {
+ scheduledTasksRunner.closeKafkaPublisherSubscriber();
scheduledTasksRunner.cancelTasks();
return Mono.just(new ResponseEntity<>("PRH Service has been stopped!", HttpStatus.OK));
});
diff --git a/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/ScheduledTasksRunner.java b/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/ScheduledTasksRunner.java
index e90b0271..5a5eb075 100644
--- a/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/ScheduledTasksRunner.java
+++ b/prh-app-server/src/main/java/org/onap/dcaegen2/services/prh/tasks/ScheduledTasksRunner.java
@@ -25,6 +25,7 @@ import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ScheduledFuture;
import javax.annotation.PreDestroy;
+import org.onap.dcaegen2.services.prh.configuration.CbsConfiguration;
import org.onap.dcaegen2.services.prh.configuration.PrhProperties;
import org.springframework.boot.context.event.ApplicationStartedEvent;
import org.springframework.context.annotation.Configuration;
@@ -46,11 +47,13 @@ public class ScheduledTasksRunner {
private final TaskScheduler taskScheduler;
private final ScheduledTasks scheduledTask;
private final PrhProperties prhProperties;
+ private final CbsConfiguration cbsConfiguration;
public ScheduledTasksRunner(TaskScheduler taskScheduler, ScheduledTasks scheduledTask,
- PrhProperties prhProperties) {
+ PrhProperties prhProperties, CbsConfiguration cbsConfiguration) {
this.taskScheduler = taskScheduler;
this.scheduledTask = scheduledTask;
this.prhProperties = prhProperties;
+ this.cbsConfiguration = cbsConfiguration;
}
@EventListener
@@ -82,4 +85,12 @@ public class ScheduledTasksRunner {
return false;
}
}
+
+ /**
+ * Function for cleaning resources for kafka subscriber and publisher.
+ */
+ public synchronized void closeKafkaPublisherSubscriber() {
+ cbsConfiguration.getMessageRouterSubscriber().close();
+ cbsConfiguration.getMessageRouterPublisher().close();
+ }
}