summaryrefslogtreecommitdiffstats
path: root/components/slice-analysis-ms/src/main/java/org/onap/slice/analysis/ms/dmaap
diff options
context:
space:
mode:
Diffstat (limited to 'components/slice-analysis-ms/src/main/java/org/onap/slice/analysis/ms/dmaap')
-rw-r--r--components/slice-analysis-ms/src/main/java/org/onap/slice/analysis/ms/dmaap/DmaapClient.java45
-rw-r--r--components/slice-analysis-ms/src/main/java/org/onap/slice/analysis/ms/dmaap/IntelligentSlicingCallback.java69
-rw-r--r--components/slice-analysis-ms/src/main/java/org/onap/slice/analysis/ms/dmaap/NewPmNotification.java8
-rw-r--r--components/slice-analysis-ms/src/main/java/org/onap/slice/analysis/ms/dmaap/NotificationCallback.java2
-rw-r--r--components/slice-analysis-ms/src/main/java/org/onap/slice/analysis/ms/dmaap/PmNotificationCallback.java2
-rw-r--r--components/slice-analysis-ms/src/main/java/org/onap/slice/analysis/ms/dmaap/PolicyDmaapClient.java2
-rw-r--r--components/slice-analysis-ms/src/main/java/org/onap/slice/analysis/ms/dmaap/PolicyNotificationCallback.java6
7 files changed, 107 insertions, 27 deletions
diff --git a/components/slice-analysis-ms/src/main/java/org/onap/slice/analysis/ms/dmaap/DmaapClient.java b/components/slice-analysis-ms/src/main/java/org/onap/slice/analysis/ms/dmaap/DmaapClient.java
index 6e0ea401..08a89541 100644
--- a/components/slice-analysis-ms/src/main/java/org/onap/slice/analysis/ms/dmaap/DmaapClient.java
+++ b/components/slice-analysis-ms/src/main/java/org/onap/slice/analysis/ms/dmaap/DmaapClient.java
@@ -30,7 +30,7 @@ import java.util.concurrent.TimeUnit;
import javax.annotation.PostConstruct;
-import org.onap.slice.analysis.ms.beans.Configuration;
+import org.onap.slice.analysis.ms.models.Configuration;
import org.onap.slice.analysis.ms.utils.DmaapUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -69,23 +69,29 @@ public class DmaapClient {
@SuppressWarnings("unchecked")
public synchronized void startClient() {
- Map<String, Object> streamSubscribes = Configuration.getInstance().getStreamsSubscribes();
+ Map<String, Object> streamSubscribes = configuration.getStreamsSubscribes();
String pmTopicUrl = ((Map<String, String>) ((Map<String, Object>) streamSubscribes
.get("performance_management_topic")).get("dmaap_info")).get("topic_url");
String[] pmTopicSplit = pmTopicUrl.split("\\/");
String pmTopic = pmTopicSplit[pmTopicSplit.length - 1];
log.debug("pm topic : {}", pmTopic);
+
String policyResponseTopicUrl = ((Map<String, String>) ((Map<String, Object>) streamSubscribes
.get("dcae_cl_response_topic")).get("dmaap_info")).get("topic_url");
String[] policyResponseTopicUrlSplit = policyResponseTopicUrl.split("\\/");
String policyResponseTopic = policyResponseTopicUrlSplit[policyResponseTopicUrlSplit.length - 1];
log.debug("policyResponse Topic : {}", policyResponseTopic);
- CambriaConsumer pmNotifCambriaConsumer = null;
- CambriaConsumer policyResponseCambriaConsumer = null;
-
- pmNotifCambriaConsumer = dmaapUtils.buildConsumer(configuration, pmTopic);
- policyResponseCambriaConsumer = dmaapUtils.buildConsumer(configuration, policyResponseTopic);
+
+ String intelligentSlicingTopicUrl = ((Map<String, String>) ((Map<String, Object>) streamSubscribes
+ .get("intelligent_slicing_topic")).get("dmaap_info")).get("topic_url");
+ String[] intelligentSlicingTopicSplit = intelligentSlicingTopicUrl.split("\\/");
+ String intelligentSlicingTopic = intelligentSlicingTopicSplit[intelligentSlicingTopicSplit.length - 1];
+ log.debug("intelligent slicing topic : {}", pmTopic);
+
+ CambriaConsumer pmNotifCambriaConsumer = dmaapUtils.buildConsumer(configuration, pmTopic);
+ CambriaConsumer policyResponseCambriaConsumer = dmaapUtils.buildConsumer(configuration, policyResponseTopic);
+ CambriaConsumer intelligentSlicingCambriaConsumer = dmaapUtils.buildConsumer(configuration, intelligentSlicingTopic);
ScheduledExecutorService executorPool;
@@ -97,16 +103,21 @@ public class DmaapClient {
executorPool.scheduleAtFixedRate(pmNotificationConsumer, 0, configuration.getPollingInterval(),
TimeUnit.SECONDS);
- // create notification consumers for Policy
- NotificationConsumer policyNotificationConsumer = new NotificationConsumer(policyResponseCambriaConsumer,
- new PolicyNotificationCallback());
- // start policy notification consumer threads
- executorPool = Executors.newScheduledThreadPool(10);
- executorPool.scheduleAtFixedRate(policyNotificationConsumer, 0, configuration.getPollingInterval(),
- TimeUnit.SECONDS);
-
-
-
+ // create notification consumers for Policy
+ NotificationConsumer policyNotificationConsumer = new NotificationConsumer(policyResponseCambriaConsumer,
+ new PolicyNotificationCallback());
+ // start policy notification consumer threads
+ executorPool = Executors.newScheduledThreadPool(10);
+ executorPool.scheduleAtFixedRate(policyNotificationConsumer, 0, configuration.getPollingInterval(),
+ TimeUnit.SECONDS);
+
+ // create notification consumers for ML MS
+ NotificationConsumer intelligentSlicingConsumer = new NotificationConsumer(intelligentSlicingCambriaConsumer,
+ new IntelligentSlicingCallback());
+ // start intelligent Slicing notification consumer threads
+ executorPool = Executors.newScheduledThreadPool(10);
+ executorPool.scheduleAtFixedRate(intelligentSlicingConsumer, 0, configuration.getPollingInterval(),
+ TimeUnit.SECONDS);
}
}
diff --git a/components/slice-analysis-ms/src/main/java/org/onap/slice/analysis/ms/dmaap/IntelligentSlicingCallback.java b/components/slice-analysis-ms/src/main/java/org/onap/slice/analysis/ms/dmaap/IntelligentSlicingCallback.java
new file mode 100644
index 00000000..dd6760ba
--- /dev/null
+++ b/components/slice-analysis-ms/src/main/java/org/onap/slice/analysis/ms/dmaap/IntelligentSlicingCallback.java
@@ -0,0 +1,69 @@
+/*******************************************************************************
+ * ============LICENSE_START=======================================================
+ * slice-analysis-ms
+ * ================================================================================
+ * Copyright (C) 2020 Wipro Limited.
+ * ==============================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ *
+ *******************************************************************************/
+
+package org.onap.slice.analysis.ms.dmaap;
+
+import java.io.IOException;
+
+import org.onap.slice.analysis.ms.models.MLOutputModel;
+import org.onap.slice.analysis.ms.service.MLMessageProcessor;
+import org.onap.slice.analysis.ms.utils.BeanUtil;
+import org.slf4j.Logger;
+
+import com.fasterxml.jackson.core.type.TypeReference;
+import com.fasterxml.jackson.databind.ObjectMapper;
+
+/**
+ * Handles Notification on dmaap for ML ms events
+ */
+public class IntelligentSlicingCallback implements NotificationCallback {
+ private static final Logger log = org.slf4j.LoggerFactory.getLogger(IntelligentSlicingCallback.class);
+ private MLMessageProcessor mlMsMessageProcessor;
+
+ public IntelligentSlicingCallback() {
+ mlMsMessageProcessor = BeanUtil.getBean(MLMessageProcessor.class);
+ }
+
+ /**
+ * Trigger on Notification from ML ms
+ */
+ @Override
+ public void activateCallBack(String msg) {
+ handlePolicyNotification(msg);
+ }
+
+ /**
+ * Parse and take actions on reception of Notification from ML ms
+ * @param msg
+ */
+ private void handlePolicyNotification(String msg) {
+ log.info("Message received from ML ms: {}" ,msg);
+ ObjectMapper obj = new ObjectMapper();
+ MLOutputModel output = null;
+ try {
+ output = obj.readValue(msg, new TypeReference<MLOutputModel>(){});
+ mlMsMessageProcessor.processMLMsg(output);
+ }
+ catch (IOException e) {
+ log.error("Error converting ML msg to object, {}",e.getMessage());
+ }
+ }
+}
diff --git a/components/slice-analysis-ms/src/main/java/org/onap/slice/analysis/ms/dmaap/NewPmNotification.java b/components/slice-analysis-ms/src/main/java/org/onap/slice/analysis/ms/dmaap/NewPmNotification.java
index 5c1f496b..66c3f706 100644
--- a/components/slice-analysis-ms/src/main/java/org/onap/slice/analysis/ms/dmaap/NewPmNotification.java
+++ b/components/slice-analysis-ms/src/main/java/org/onap/slice/analysis/ms/dmaap/NewPmNotification.java
@@ -32,7 +32,7 @@ import org.springframework.stereotype.Component;
@Component
public class NewPmNotification {
- private Boolean newNotif;
+ private boolean newNotif;
/**
* Initialize new pm Notification flag
@@ -42,15 +42,15 @@ public class NewPmNotification {
newNotif = false;
}
- public Boolean getNewNotif() {
+ public boolean getNewNotif() {
return newNotif;
}
- public void setNewNotif(Boolean newNotif) {
+ public void setNewNotif(boolean newNotif) {
this.newNotif = newNotif;
}
- public NewPmNotification(Boolean newNotif) {
+ public NewPmNotification(boolean newNotif) {
super();
this.newNotif = newNotif;
}
diff --git a/components/slice-analysis-ms/src/main/java/org/onap/slice/analysis/ms/dmaap/NotificationCallback.java b/components/slice-analysis-ms/src/main/java/org/onap/slice/analysis/ms/dmaap/NotificationCallback.java
index 427b4048..ce1ebd6f 100644
--- a/components/slice-analysis-ms/src/main/java/org/onap/slice/analysis/ms/dmaap/NotificationCallback.java
+++ b/components/slice-analysis-ms/src/main/java/org/onap/slice/analysis/ms/dmaap/NotificationCallback.java
@@ -21,7 +21,7 @@
package org.onap.slice.analysis.ms.dmaap;
-public abstract class NotificationCallback {
+public interface NotificationCallback {
public abstract void activateCallBack(String msg);
diff --git a/components/slice-analysis-ms/src/main/java/org/onap/slice/analysis/ms/dmaap/PmNotificationCallback.java b/components/slice-analysis-ms/src/main/java/org/onap/slice/analysis/ms/dmaap/PmNotificationCallback.java
index 17e50aca..963165d2 100644
--- a/components/slice-analysis-ms/src/main/java/org/onap/slice/analysis/ms/dmaap/PmNotificationCallback.java
+++ b/components/slice-analysis-ms/src/main/java/org/onap/slice/analysis/ms/dmaap/PmNotificationCallback.java
@@ -30,7 +30,7 @@ import org.slf4j.LoggerFactory;
/**
* Handles Notification on dmaap for Performance events
*/
-public class PmNotificationCallback extends NotificationCallback {
+public class PmNotificationCallback implements NotificationCallback {
private static Logger log = LoggerFactory.getLogger(PmNotificationCallback.class);
diff --git a/components/slice-analysis-ms/src/main/java/org/onap/slice/analysis/ms/dmaap/PolicyDmaapClient.java b/components/slice-analysis-ms/src/main/java/org/onap/slice/analysis/ms/dmaap/PolicyDmaapClient.java
index 81ca9ef1..06604040 100644
--- a/components/slice-analysis-ms/src/main/java/org/onap/slice/analysis/ms/dmaap/PolicyDmaapClient.java
+++ b/components/slice-analysis-ms/src/main/java/org/onap/slice/analysis/ms/dmaap/PolicyDmaapClient.java
@@ -25,7 +25,7 @@ import com.att.nsa.cambria.client.CambriaBatchingPublisher;
import java.io.IOException;
import java.util.Map;
-import org.onap.slice.analysis.ms.beans.Configuration;
+import org.onap.slice.analysis.ms.models.Configuration;
import org.onap.slice.analysis.ms.utils.DmaapUtils;
/**
diff --git a/components/slice-analysis-ms/src/main/java/org/onap/slice/analysis/ms/dmaap/PolicyNotificationCallback.java b/components/slice-analysis-ms/src/main/java/org/onap/slice/analysis/ms/dmaap/PolicyNotificationCallback.java
index 57aadd18..146b60a9 100644
--- a/components/slice-analysis-ms/src/main/java/org/onap/slice/analysis/ms/dmaap/PolicyNotificationCallback.java
+++ b/components/slice-analysis-ms/src/main/java/org/onap/slice/analysis/ms/dmaap/PolicyNotificationCallback.java
@@ -26,7 +26,7 @@ import org.slf4j.Logger;
/**
* Handles Notification on dmaap for Policy events
*/
-public class PolicyNotificationCallback extends NotificationCallback {
+public class PolicyNotificationCallback implements NotificationCallback {
private static final Logger log = org.slf4j.LoggerFactory.getLogger(PolicyNotificationCallback.class);
@@ -43,7 +43,7 @@ public class PolicyNotificationCallback extends NotificationCallback {
* @param msg
*/
private void handlePolicyNotification(String msg) {
- log.info("Message received from policy: " +msg);
- //TBD - actions to perform on reception of notification from policy
+ log.info("Message received from policy: " +msg);
+ //TBD - actions to perform on reception of notification from policy
}
}