summaryrefslogtreecommitdiffstats
path: root/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/TopicConfigPollingService.java
diff options
context:
space:
mode:
Diffstat (limited to 'components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/TopicConfigPollingService.java')
-rw-r--r--components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/TopicConfigPollingService.java15
1 files changed, 11 insertions, 4 deletions
diff --git a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/TopicConfigPollingService.java b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/TopicConfigPollingService.java
index 58b27834..21e1a08f 100644
--- a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/TopicConfigPollingService.java
+++ b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/TopicConfigPollingService.java
@@ -37,7 +37,7 @@ import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
/**
- * Service to check topic changes in Kafka and topic setting updates
+ * Service to check topic changes in Kafka and topic setting updates in DB
*
* @author Guobiao Mo
*
@@ -74,7 +74,7 @@ public class TopicConfigPollingService implements Runnable {
}
}
- public boolean isActiveTopicsChanged(boolean update) {
+ public boolean isActiveTopicsChanged(boolean update) {//update=true means sync local version
boolean changed = currentActiveTopicsVersion > activeTopicsVersionLocal.get();
log.debug("isActiveTopicsChanged={}, currentActiveTopicsVersion={} local={}", changed, currentActiveTopicsVersion, activeTopicsVersionLocal.get());
if (changed && update) {
@@ -96,10 +96,13 @@ public class TopicConfigPollingService implements Runnable {
public void run() {
active = true;
log.info("TopicConfigPollingService started.");
-
+
while (active) {
try { //sleep first since we already pool in init()
Thread.sleep(config.getDmaapCheckNewTopicInterval());
+ if(!active) {
+ break;
+ }
} catch (InterruptedException e) {
log.error("Thread.sleep(config.getDmaapCheckNewTopicInterval())", e);
Thread.currentThread().interrupt();
@@ -131,7 +134,11 @@ public class TopicConfigPollingService implements Runnable {
private List<String> poll() throws IOException {
log.debug("poll(), use dmaapService to getActiveTopicConfigs...");
List<TopicConfig> activeTopicConfigs = dmaapService.getActiveTopicConfigs();
- activeTopicConfigs.stream().forEach(topicConfig -> effectiveTopicConfigMap.put(topicConfig.getName(), topicConfig));
+ Map<String, TopicConfig> tempEffectiveTopicConfigMap = new HashMap<>();
+
+ activeTopicConfigs.stream().forEach(topicConfig -> tempEffectiveTopicConfigMap.put(topicConfig.getName(), topicConfig));
+ effectiveTopicConfigMap = tempEffectiveTopicConfigMap;
+ log.debug("poll(), effectiveTopicConfigMap={}", effectiveTopicConfigMap);
List<String> ret = new ArrayList<>(activeTopicConfigs.size());
activeTopicConfigs.stream().forEach(topicConfig -> ret.add(topicConfig.getName()));