diff options
Diffstat (limited to 'components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/TopicConfigPollingService.java')
-rw-r--r-- | components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/TopicConfigPollingService.java | 15 |
1 files changed, 11 insertions, 4 deletions
diff --git a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/TopicConfigPollingService.java b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/TopicConfigPollingService.java index 58b27834..21e1a08f 100644 --- a/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/TopicConfigPollingService.java +++ b/components/datalake-handler/feeder/src/main/java/org/onap/datalake/feeder/service/TopicConfigPollingService.java @@ -37,7 +37,7 @@ import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Service; /** - * Service to check topic changes in Kafka and topic setting updates + * Service to check topic changes in Kafka and topic setting updates in DB * * @author Guobiao Mo * @@ -74,7 +74,7 @@ public class TopicConfigPollingService implements Runnable { } } - public boolean isActiveTopicsChanged(boolean update) { + public boolean isActiveTopicsChanged(boolean update) {//update=true means sync local version boolean changed = currentActiveTopicsVersion > activeTopicsVersionLocal.get(); log.debug("isActiveTopicsChanged={}, currentActiveTopicsVersion={} local={}", changed, currentActiveTopicsVersion, activeTopicsVersionLocal.get()); if (changed && update) { @@ -96,10 +96,13 @@ public class TopicConfigPollingService implements Runnable { public void run() { active = true; log.info("TopicConfigPollingService started."); - + while (active) { try { //sleep first since we already pool in init() Thread.sleep(config.getDmaapCheckNewTopicInterval()); + if(!active) { + break; + } } catch (InterruptedException e) { log.error("Thread.sleep(config.getDmaapCheckNewTopicInterval())", e); Thread.currentThread().interrupt(); @@ -131,7 +134,11 @@ public class TopicConfigPollingService implements Runnable { private List<String> poll() throws IOException { log.debug("poll(), use dmaapService to getActiveTopicConfigs..."); List<TopicConfig> activeTopicConfigs = dmaapService.getActiveTopicConfigs(); - activeTopicConfigs.stream().forEach(topicConfig -> effectiveTopicConfigMap.put(topicConfig.getName(), topicConfig)); + Map<String, TopicConfig> tempEffectiveTopicConfigMap = new HashMap<>(); + + activeTopicConfigs.stream().forEach(topicConfig -> tempEffectiveTopicConfigMap.put(topicConfig.getName(), topicConfig)); + effectiveTopicConfigMap = tempEffectiveTopicConfigMap; + log.debug("poll(), effectiveTopicConfigMap={}", effectiveTopicConfigMap); List<String> ret = new ArrayList<>(activeTopicConfigs.size()); activeTopicConfigs.stream().forEach(topicConfig -> ret.add(topicConfig.getName())); |