summaryrefslogtreecommitdiffstats
path: root/components/slice-analysis-ms/src/main/java/org/onap/slice/analysis/ms/dmaap/DmaapClient.java
diff options
context:
space:
mode:
Diffstat (limited to 'components/slice-analysis-ms/src/main/java/org/onap/slice/analysis/ms/dmaap/DmaapClient.java')
-rw-r--r--components/slice-analysis-ms/src/main/java/org/onap/slice/analysis/ms/dmaap/DmaapClient.java45
1 files changed, 28 insertions, 17 deletions
diff --git a/components/slice-analysis-ms/src/main/java/org/onap/slice/analysis/ms/dmaap/DmaapClient.java b/components/slice-analysis-ms/src/main/java/org/onap/slice/analysis/ms/dmaap/DmaapClient.java
index 6e0ea401..08a89541 100644
--- a/components/slice-analysis-ms/src/main/java/org/onap/slice/analysis/ms/dmaap/DmaapClient.java
+++ b/components/slice-analysis-ms/src/main/java/org/onap/slice/analysis/ms/dmaap/DmaapClient.java
@@ -30,7 +30,7 @@ import java.util.concurrent.TimeUnit;
import javax.annotation.PostConstruct;
-import org.onap.slice.analysis.ms.beans.Configuration;
+import org.onap.slice.analysis.ms.models.Configuration;
import org.onap.slice.analysis.ms.utils.DmaapUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -69,23 +69,29 @@ public class DmaapClient {
@SuppressWarnings("unchecked")
public synchronized void startClient() {
- Map<String, Object> streamSubscribes = Configuration.getInstance().getStreamsSubscribes();
+ Map<String, Object> streamSubscribes = configuration.getStreamsSubscribes();
String pmTopicUrl = ((Map<String, String>) ((Map<String, Object>) streamSubscribes
.get("performance_management_topic")).get("dmaap_info")).get("topic_url");
String[] pmTopicSplit = pmTopicUrl.split("\\/");
String pmTopic = pmTopicSplit[pmTopicSplit.length - 1];
log.debug("pm topic : {}", pmTopic);
+
String policyResponseTopicUrl = ((Map<String, String>) ((Map<String, Object>) streamSubscribes
.get("dcae_cl_response_topic")).get("dmaap_info")).get("topic_url");
String[] policyResponseTopicUrlSplit = policyResponseTopicUrl.split("\\/");
String policyResponseTopic = policyResponseTopicUrlSplit[policyResponseTopicUrlSplit.length - 1];
log.debug("policyResponse Topic : {}", policyResponseTopic);
- CambriaConsumer pmNotifCambriaConsumer = null;
- CambriaConsumer policyResponseCambriaConsumer = null;
-
- pmNotifCambriaConsumer = dmaapUtils.buildConsumer(configuration, pmTopic);
- policyResponseCambriaConsumer = dmaapUtils.buildConsumer(configuration, policyResponseTopic);
+
+ String intelligentSlicingTopicUrl = ((Map<String, String>) ((Map<String, Object>) streamSubscribes
+ .get("intelligent_slicing_topic")).get("dmaap_info")).get("topic_url");
+ String[] intelligentSlicingTopicSplit = intelligentSlicingTopicUrl.split("\\/");
+ String intelligentSlicingTopic = intelligentSlicingTopicSplit[intelligentSlicingTopicSplit.length - 1];
+ log.debug("intelligent slicing topic : {}", pmTopic);
+
+ CambriaConsumer pmNotifCambriaConsumer = dmaapUtils.buildConsumer(configuration, pmTopic);
+ CambriaConsumer policyResponseCambriaConsumer = dmaapUtils.buildConsumer(configuration, policyResponseTopic);
+ CambriaConsumer intelligentSlicingCambriaConsumer = dmaapUtils.buildConsumer(configuration, intelligentSlicingTopic);
ScheduledExecutorService executorPool;
@@ -97,16 +103,21 @@ public class DmaapClient {
executorPool.scheduleAtFixedRate(pmNotificationConsumer, 0, configuration.getPollingInterval(),
TimeUnit.SECONDS);
- // create notification consumers for Policy
- NotificationConsumer policyNotificationConsumer = new NotificationConsumer(policyResponseCambriaConsumer,
- new PolicyNotificationCallback());
- // start policy notification consumer threads
- executorPool = Executors.newScheduledThreadPool(10);
- executorPool.scheduleAtFixedRate(policyNotificationConsumer, 0, configuration.getPollingInterval(),
- TimeUnit.SECONDS);
-
-
-
+ // create notification consumers for Policy
+ NotificationConsumer policyNotificationConsumer = new NotificationConsumer(policyResponseCambriaConsumer,
+ new PolicyNotificationCallback());
+ // start policy notification consumer threads
+ executorPool = Executors.newScheduledThreadPool(10);
+ executorPool.scheduleAtFixedRate(policyNotificationConsumer, 0, configuration.getPollingInterval(),
+ TimeUnit.SECONDS);
+
+ // create notification consumers for ML MS
+ NotificationConsumer intelligentSlicingConsumer = new NotificationConsumer(intelligentSlicingCambriaConsumer,
+ new IntelligentSlicingCallback());
+ // start intelligent Slicing notification consumer threads
+ executorPool = Executors.newScheduledThreadPool(10);
+ executorPool.scheduleAtFixedRate(intelligentSlicingConsumer, 0, configuration.getPollingInterval(),
+ TimeUnit.SECONDS);
}
}