diff options
Diffstat (limited to 'components/slice-analysis-ms/src/main/java/org/onap/slice/analysis/ms/dmaap/DmaapClient.java')
-rw-r--r-- | components/slice-analysis-ms/src/main/java/org/onap/slice/analysis/ms/dmaap/DmaapClient.java | 45 |
1 files changed, 28 insertions, 17 deletions
diff --git a/components/slice-analysis-ms/src/main/java/org/onap/slice/analysis/ms/dmaap/DmaapClient.java b/components/slice-analysis-ms/src/main/java/org/onap/slice/analysis/ms/dmaap/DmaapClient.java index 6e0ea401..08a89541 100644 --- a/components/slice-analysis-ms/src/main/java/org/onap/slice/analysis/ms/dmaap/DmaapClient.java +++ b/components/slice-analysis-ms/src/main/java/org/onap/slice/analysis/ms/dmaap/DmaapClient.java @@ -30,7 +30,7 @@ import java.util.concurrent.TimeUnit; import javax.annotation.PostConstruct; -import org.onap.slice.analysis.ms.beans.Configuration; +import org.onap.slice.analysis.ms.models.Configuration; import org.onap.slice.analysis.ms.utils.DmaapUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -69,23 +69,29 @@ public class DmaapClient { @SuppressWarnings("unchecked") public synchronized void startClient() { - Map<String, Object> streamSubscribes = Configuration.getInstance().getStreamsSubscribes(); + Map<String, Object> streamSubscribes = configuration.getStreamsSubscribes(); String pmTopicUrl = ((Map<String, String>) ((Map<String, Object>) streamSubscribes .get("performance_management_topic")).get("dmaap_info")).get("topic_url"); String[] pmTopicSplit = pmTopicUrl.split("\\/"); String pmTopic = pmTopicSplit[pmTopicSplit.length - 1]; log.debug("pm topic : {}", pmTopic); + String policyResponseTopicUrl = ((Map<String, String>) ((Map<String, Object>) streamSubscribes .get("dcae_cl_response_topic")).get("dmaap_info")).get("topic_url"); String[] policyResponseTopicUrlSplit = policyResponseTopicUrl.split("\\/"); String policyResponseTopic = policyResponseTopicUrlSplit[policyResponseTopicUrlSplit.length - 1]; log.debug("policyResponse Topic : {}", policyResponseTopic); - CambriaConsumer pmNotifCambriaConsumer = null; - CambriaConsumer policyResponseCambriaConsumer = null; - - pmNotifCambriaConsumer = dmaapUtils.buildConsumer(configuration, pmTopic); - policyResponseCambriaConsumer = dmaapUtils.buildConsumer(configuration, policyResponseTopic); + + String intelligentSlicingTopicUrl = ((Map<String, String>) ((Map<String, Object>) streamSubscribes + .get("intelligent_slicing_topic")).get("dmaap_info")).get("topic_url"); + String[] intelligentSlicingTopicSplit = intelligentSlicingTopicUrl.split("\\/"); + String intelligentSlicingTopic = intelligentSlicingTopicSplit[intelligentSlicingTopicSplit.length - 1]; + log.debug("intelligent slicing topic : {}", pmTopic); + + CambriaConsumer pmNotifCambriaConsumer = dmaapUtils.buildConsumer(configuration, pmTopic); + CambriaConsumer policyResponseCambriaConsumer = dmaapUtils.buildConsumer(configuration, policyResponseTopic); + CambriaConsumer intelligentSlicingCambriaConsumer = dmaapUtils.buildConsumer(configuration, intelligentSlicingTopic); ScheduledExecutorService executorPool; @@ -97,16 +103,21 @@ public class DmaapClient { executorPool.scheduleAtFixedRate(pmNotificationConsumer, 0, configuration.getPollingInterval(), TimeUnit.SECONDS); - // create notification consumers for Policy - NotificationConsumer policyNotificationConsumer = new NotificationConsumer(policyResponseCambriaConsumer, - new PolicyNotificationCallback()); - // start policy notification consumer threads - executorPool = Executors.newScheduledThreadPool(10); - executorPool.scheduleAtFixedRate(policyNotificationConsumer, 0, configuration.getPollingInterval(), - TimeUnit.SECONDS); - - - + // create notification consumers for Policy + NotificationConsumer policyNotificationConsumer = new NotificationConsumer(policyResponseCambriaConsumer, + new PolicyNotificationCallback()); + // start policy notification consumer threads + executorPool = Executors.newScheduledThreadPool(10); + executorPool.scheduleAtFixedRate(policyNotificationConsumer, 0, configuration.getPollingInterval(), + TimeUnit.SECONDS); + + // create notification consumers for ML MS + NotificationConsumer intelligentSlicingConsumer = new NotificationConsumer(intelligentSlicingCambriaConsumer, + new IntelligentSlicingCallback()); + // start intelligent Slicing notification consumer threads + executorPool = Executors.newScheduledThreadPool(10); + executorPool.scheduleAtFixedRate(intelligentSlicingConsumer, 0, configuration.getPollingInterval(), + TimeUnit.SECONDS); } } |