From 8882e23eedce9e9236e1d979b2056b62dd974d91 Mon Sep 17 00:00:00 2001 From: dhebeha Date: Tue, 8 Sep 2020 13:02:32 +0530 Subject: Add support to consume, process pm message from DB - Add support for analysing pm data - Add support to trigger closed loop - Add support for configDb Interface Implementation - Add support for Intelligent slicing Issue-ID: DCAEGEN2-2255 Signed-off-by: dhebeha Change-Id: I185dbb6da45ae6ee74f0a090e2d604914163588b --- .../onap/slice/analysis/ms/dmaap/DmaapClient.java | 45 ++++++++++++++-------- 1 file changed, 28 insertions(+), 17 deletions(-) (limited to 'components/slice-analysis-ms/src/main/java/org/onap/slice/analysis/ms/dmaap/DmaapClient.java') diff --git a/components/slice-analysis-ms/src/main/java/org/onap/slice/analysis/ms/dmaap/DmaapClient.java b/components/slice-analysis-ms/src/main/java/org/onap/slice/analysis/ms/dmaap/DmaapClient.java index 6e0ea401..08a89541 100644 --- a/components/slice-analysis-ms/src/main/java/org/onap/slice/analysis/ms/dmaap/DmaapClient.java +++ b/components/slice-analysis-ms/src/main/java/org/onap/slice/analysis/ms/dmaap/DmaapClient.java @@ -30,7 +30,7 @@ import java.util.concurrent.TimeUnit; import javax.annotation.PostConstruct; -import org.onap.slice.analysis.ms.beans.Configuration; +import org.onap.slice.analysis.ms.models.Configuration; import org.onap.slice.analysis.ms.utils.DmaapUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -69,23 +69,29 @@ public class DmaapClient { @SuppressWarnings("unchecked") public synchronized void startClient() { - Map streamSubscribes = Configuration.getInstance().getStreamsSubscribes(); + Map streamSubscribes = configuration.getStreamsSubscribes(); String pmTopicUrl = ((Map) ((Map) streamSubscribes .get("performance_management_topic")).get("dmaap_info")).get("topic_url"); String[] pmTopicSplit = pmTopicUrl.split("\\/"); String pmTopic = pmTopicSplit[pmTopicSplit.length - 1]; log.debug("pm topic : {}", pmTopic); + String policyResponseTopicUrl = ((Map) ((Map) streamSubscribes .get("dcae_cl_response_topic")).get("dmaap_info")).get("topic_url"); String[] policyResponseTopicUrlSplit = policyResponseTopicUrl.split("\\/"); String policyResponseTopic = policyResponseTopicUrlSplit[policyResponseTopicUrlSplit.length - 1]; log.debug("policyResponse Topic : {}", policyResponseTopic); - CambriaConsumer pmNotifCambriaConsumer = null; - CambriaConsumer policyResponseCambriaConsumer = null; - - pmNotifCambriaConsumer = dmaapUtils.buildConsumer(configuration, pmTopic); - policyResponseCambriaConsumer = dmaapUtils.buildConsumer(configuration, policyResponseTopic); + + String intelligentSlicingTopicUrl = ((Map) ((Map) streamSubscribes + .get("intelligent_slicing_topic")).get("dmaap_info")).get("topic_url"); + String[] intelligentSlicingTopicSplit = intelligentSlicingTopicUrl.split("\\/"); + String intelligentSlicingTopic = intelligentSlicingTopicSplit[intelligentSlicingTopicSplit.length - 1]; + log.debug("intelligent slicing topic : {}", pmTopic); + + CambriaConsumer pmNotifCambriaConsumer = dmaapUtils.buildConsumer(configuration, pmTopic); + CambriaConsumer policyResponseCambriaConsumer = dmaapUtils.buildConsumer(configuration, policyResponseTopic); + CambriaConsumer intelligentSlicingCambriaConsumer = dmaapUtils.buildConsumer(configuration, intelligentSlicingTopic); ScheduledExecutorService executorPool; @@ -97,16 +103,21 @@ public class DmaapClient { executorPool.scheduleAtFixedRate(pmNotificationConsumer, 0, configuration.getPollingInterval(), TimeUnit.SECONDS); - // create notification consumers for Policy - NotificationConsumer policyNotificationConsumer = new NotificationConsumer(policyResponseCambriaConsumer, - new PolicyNotificationCallback()); - // start policy notification consumer threads - executorPool = Executors.newScheduledThreadPool(10); - executorPool.scheduleAtFixedRate(policyNotificationConsumer, 0, configuration.getPollingInterval(), - TimeUnit.SECONDS); - - - + // create notification consumers for Policy + NotificationConsumer policyNotificationConsumer = new NotificationConsumer(policyResponseCambriaConsumer, + new PolicyNotificationCallback()); + // start policy notification consumer threads + executorPool = Executors.newScheduledThreadPool(10); + executorPool.scheduleAtFixedRate(policyNotificationConsumer, 0, configuration.getPollingInterval(), + TimeUnit.SECONDS); + + // create notification consumers for ML MS + NotificationConsumer intelligentSlicingConsumer = new NotificationConsumer(intelligentSlicingCambriaConsumer, + new IntelligentSlicingCallback()); + // start intelligent Slicing notification consumer threads + executorPool = Executors.newScheduledThreadPool(10); + executorPool.scheduleAtFixedRate(intelligentSlicingConsumer, 0, configuration.getPollingInterval(), + TimeUnit.SECONDS); } } -- cgit 1.2.3-korg