summaryrefslogtreecommitdiffstats
path: root/components/slice-analysis-ms/src/main/java/org/onap/slice/analysis/ms/service
diff options
context:
space:
mode:
authordhebeha <dhebeha.mj71@wipro.com>2020-09-08 13:02:32 +0530
committerdhebeha <dhebeha.mj71@wipro.com>2020-09-15 22:11:14 +0530
commit8882e23eedce9e9236e1d979b2056b62dd974d91 (patch)
treeff7bce29e94a814c340df73607b9d0da59a1329a /components/slice-analysis-ms/src/main/java/org/onap/slice/analysis/ms/service
parent2c6ad52e8d0e29b8037776fff035f031558cc4b0 (diff)
Add support to consume, process pm message from DB
- Add support for analysing pm data - Add support to trigger closed loop - Add support for configDb Interface Implementation - Add support for Intelligent slicing Issue-ID: DCAEGEN2-2255 Signed-off-by: dhebeha <dhebeha.mj71@wipro.com> Change-Id: I185dbb6da45ae6ee74f0a090e2d604914163588b
Diffstat (limited to 'components/slice-analysis-ms/src/main/java/org/onap/slice/analysis/ms/service')
-rw-r--r--components/slice-analysis-ms/src/main/java/org/onap/slice/analysis/ms/service/AverageCalculator.java94
-rw-r--r--components/slice-analysis-ms/src/main/java/org/onap/slice/analysis/ms/service/ConsumerThread.java72
-rw-r--r--components/slice-analysis-ms/src/main/java/org/onap/slice/analysis/ms/service/IPmEventProcessor.java35
-rw-r--r--components/slice-analysis-ms/src/main/java/org/onap/slice/analysis/ms/service/MLMessageProcessor.java73
-rw-r--r--components/slice-analysis-ms/src/main/java/org/onap/slice/analysis/ms/service/PmDataQueue.java96
-rw-r--r--components/slice-analysis-ms/src/main/java/org/onap/slice/analysis/ms/service/PmEventProcessor.java105
-rw-r--r--components/slice-analysis-ms/src/main/java/org/onap/slice/analysis/ms/service/PmThread.java92
-rw-r--r--components/slice-analysis-ms/src/main/java/org/onap/slice/analysis/ms/service/PolicyService.java111
-rw-r--r--components/slice-analysis-ms/src/main/java/org/onap/slice/analysis/ms/service/SnssaiSamplesProcessor.java190
9 files changed, 868 insertions, 0 deletions
diff --git a/components/slice-analysis-ms/src/main/java/org/onap/slice/analysis/ms/service/AverageCalculator.java b/components/slice-analysis-ms/src/main/java/org/onap/slice/analysis/ms/service/AverageCalculator.java
new file mode 100644
index 00000000..a003e9c0
--- /dev/null
+++ b/components/slice-analysis-ms/src/main/java/org/onap/slice/analysis/ms/service/AverageCalculator.java
@@ -0,0 +1,94 @@
+/*******************************************************************************
+ * ============LICENSE_START=======================================================
+ * slice-analysis-ms
+ * ================================================================================
+ * Copyright (C) 2020 Wipro Limited.
+ * ==============================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ *
+ *******************************************************************************/
+
+package org.onap.slice.analysis.ms.service;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import javax.annotation.PostConstruct;
+
+import org.onap.slice.analysis.ms.models.MeasurementObject;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.stereotype.Component;
+
+/**
+ * This class has utility methods for calculating the average of samples
+ */
+@Component
+public class AverageCalculator {
+ private static Logger log = LoggerFactory.getLogger(AverageCalculator.class);
+ private List<String> pmNames;
+
+ @PostConstruct
+ public void init() {
+ pmNames = new ArrayList<>();
+ pmNames.add("PrbUsedDl");
+ pmNames.add("PrbUsedUl");
+ }
+
+ /**
+ * Find average of samples
+ */
+ public List<MeasurementObject> findAverageOfSamples(List<List<MeasurementObject>> samples) {
+ int numOfSamples = samples.size();
+ List<MeasurementObject> result = new ArrayList<>();
+ samples.forEach(sample ->
+ sample.forEach(cellMeasObj -> {
+ int index = result.indexOf(cellMeasObj);
+ if(index != -1) {
+ result.set(index, findSum(result.get(index), cellMeasObj));
+ }
+ else {
+ result.add(cellMeasObj);
+ }
+ })
+ );
+ return findAvg(result, numOfSamples);
+ }
+
+ /**
+ * Calculate the sum
+ */
+ public MeasurementObject findSum(MeasurementObject existing, MeasurementObject current) {
+ pmNames.forEach(pmName -> {
+ int newValue = current.getPmData().get(pmName) + existing.getPmData().get(pmName);
+ existing.getPmData().put(pmName, newValue);
+ });
+ return existing;
+ }
+
+ /**
+ * Calculate the average
+ */
+ public List<MeasurementObject> findAvg(List<MeasurementObject> result, int numOfSamples) {
+ result.forEach(cellMeasObj ->
+ pmNames.forEach(pmName -> {
+ int value = (cellMeasObj.getPmData().get(pmName))/numOfSamples;
+ cellMeasObj.getPmData().put(pmName, value);
+ })
+ );
+ log.debug("Average of measurement data samples {}",result);
+ return result;
+ }
+}
+
diff --git a/components/slice-analysis-ms/src/main/java/org/onap/slice/analysis/ms/service/ConsumerThread.java b/components/slice-analysis-ms/src/main/java/org/onap/slice/analysis/ms/service/ConsumerThread.java
new file mode 100644
index 00000000..39235cd5
--- /dev/null
+++ b/components/slice-analysis-ms/src/main/java/org/onap/slice/analysis/ms/service/ConsumerThread.java
@@ -0,0 +1,72 @@
+/*******************************************************************************
+ * ============LICENSE_START=======================================================
+ * slice-analysis-ms
+ * ================================================================================
+ * Copyright (C) 2020 Wipro Limited.
+ * ==============================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ *
+ *******************************************************************************/
+
+package org.onap.slice.analysis.ms.service;
+
+import org.onap.slice.analysis.ms.configdb.IConfigDbService;
+import org.onap.slice.analysis.ms.models.Configuration;
+import org.onap.slice.analysis.ms.utils.BeanUtil;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * This Thread class consumes message from pm data queue and sends onset message to policy
+ */
+public class ConsumerThread extends Thread {
+ private static Logger log = LoggerFactory.getLogger(PmThread.class);
+ private PmDataQueue pmDataQueue;
+ private IConfigDbService configDbService;
+ private SnssaiSamplesProcessor snssaiSamplesProcessor;
+ private long initialDelaySec;
+
+ /**
+ * Default constructor.
+ */
+ public ConsumerThread() {
+ super();
+ this.pmDataQueue = BeanUtil.getBean(PmDataQueue.class);
+ this.configDbService = BeanUtil.getBean(IConfigDbService.class);
+ this.snssaiSamplesProcessor = BeanUtil.getBean(SnssaiSamplesProcessor.class);
+ this.initialDelaySec = Configuration.getInstance().getInitialDelaySeconds();
+ }
+
+ /**
+ * Consumes data from PM data queue, process the data and sends onset message to policy if needed
+ */
+ @Override
+ public void run() {
+ boolean done = false;
+ String snssai = "";
+ while (!done) {
+ try {
+ Thread.sleep(initialDelaySec);
+ snssai = pmDataQueue.getSnnsaiFromQueue();
+ if (!snssai.equals("")) {
+ log.info("Consumer thread started for s-nssai {}",snssai);
+ snssaiSamplesProcessor.processSamplesOfSnnsai(snssai, configDbService.fetchNetworkFunctionsOfSnssai(snssai));
+ }
+ } catch (Exception e) {
+ log.error("Exception in Consumer Thread ", e);
+ done = true;
+ }
+ }
+ }
+}
diff --git a/components/slice-analysis-ms/src/main/java/org/onap/slice/analysis/ms/service/IPmEventProcessor.java b/components/slice-analysis-ms/src/main/java/org/onap/slice/analysis/ms/service/IPmEventProcessor.java
new file mode 100644
index 00000000..0a67df81
--- /dev/null
+++ b/components/slice-analysis-ms/src/main/java/org/onap/slice/analysis/ms/service/IPmEventProcessor.java
@@ -0,0 +1,35 @@
+/*******************************************************************************
+ * ============LICENSE_START=======================================================
+ * slice-analysis-ms
+ * ================================================================================
+ * Copyright (C) 2020 Wipro Limited.
+ * ==============================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ *
+ *******************************************************************************/
+
+package org.onap.slice.analysis.ms.service;
+
+import java.util.List;
+import java.util.Map;
+
+import org.onap.slice.analysis.ms.models.MeasurementObject;
+import org.onap.slice.analysis.ms.models.pmnotification.Event;
+
+/**
+ * Interface for pm event processor
+ */
+public interface IPmEventProcessor {
+ public Map<String, List<MeasurementObject>> processEvent(Event event);
+}
diff --git a/components/slice-analysis-ms/src/main/java/org/onap/slice/analysis/ms/service/MLMessageProcessor.java b/components/slice-analysis-ms/src/main/java/org/onap/slice/analysis/ms/service/MLMessageProcessor.java
new file mode 100644
index 00000000..bee7b30e
--- /dev/null
+++ b/components/slice-analysis-ms/src/main/java/org/onap/slice/analysis/ms/service/MLMessageProcessor.java
@@ -0,0 +1,73 @@
+/*******************************************************************************
+ * ============LICENSE_START=======================================================
+ * slice-analysis-ms
+ * ================================================================================
+ * Copyright (C) 2020 Wipro Limited.
+ * ==============================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ *
+ *******************************************************************************/
+
+package org.onap.slice.analysis.ms.service;
+
+import java.util.List;
+import java.util.Map;
+
+import javax.annotation.PostConstruct;
+
+import org.onap.slice.analysis.ms.configdb.IConfigDbService;
+import org.onap.slice.analysis.ms.models.CUModel;
+import org.onap.slice.analysis.ms.models.MLOutputModel;
+import org.onap.slice.analysis.ms.models.policy.AdditionalProperties;
+import org.onap.slice.analysis.ms.utils.BeanUtil;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.context.annotation.Scope;
+import org.springframework.stereotype.Component;
+
+/**
+ * Process the message sent by ML service and sends notification to policy
+ */
+@Component
+@Scope("Prototype")
+public class MLMessageProcessor {
+ private static Logger log = LoggerFactory.getLogger(MLMessageProcessor.class);
+
+ private IConfigDbService configDbService;
+ private PolicyService policyService;
+
+
+ @PostConstruct
+ public void init() {
+ configDbService = BeanUtil.getBean(IConfigDbService.class);
+ }
+
+ public void processMLMsg(MLOutputModel mlOutputMsg) {
+ String snssai = mlOutputMsg.getSnssai();
+ List<CUModel> cuData = mlOutputMsg.getData();
+ Map<String, List<String>> ricToCellMapping = configDbService.fetchRICsOfSnssai(snssai);
+ log.debug("RIC to cell mapping of S-NSSAI {} is {}",snssai,ricToCellMapping);
+ for(CUModel cuModel: cuData) {
+ String cellId = String.valueOf(cuModel.getCellCUList().get(0).getCellLocalId());
+ ricToCellMapping.forEach((ricId, cells) -> {
+ if(cells.contains(cellId)) {
+ cuModel.setNearRTRICId(ricId);
+ }
+ });
+ }
+ AdditionalProperties<MLOutputModel> addProps = new AdditionalProperties<>();
+ addProps.setResourceConfig(mlOutputMsg);
+ policyService.sendOnsetMessageToPolicy(snssai, addProps, configDbService.fetchServiceDetails(snssai));
+ }
+}
diff --git a/components/slice-analysis-ms/src/main/java/org/onap/slice/analysis/ms/service/PmDataQueue.java b/components/slice-analysis-ms/src/main/java/org/onap/slice/analysis/ms/service/PmDataQueue.java
new file mode 100644
index 00000000..d907bfed
--- /dev/null
+++ b/components/slice-analysis-ms/src/main/java/org/onap/slice/analysis/ms/service/PmDataQueue.java
@@ -0,0 +1,96 @@
+/*******************************************************************************
+ * ============LICENSE_START=======================================================
+ * slice-analysis-ms
+ * ================================================================================
+ * Copyright (C) 2020 Wipro Limited.
+ * ==============================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ *
+ *******************************************************************************/
+
+package org.onap.slice.analysis.ms.service;
+
+import java.util.Collections;
+import java.util.LinkedHashMap;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Queue;
+import java.util.concurrent.LinkedBlockingQueue;
+
+import org.onap.slice.analysis.ms.models.MeasurementObject;
+import org.onap.slice.analysis.ms.models.SubCounter;
+import org.springframework.stereotype.Component;
+
+/**
+ * This class represents the data structure for storing the pm events
+ */
+@Component
+public class PmDataQueue {
+ private Map<SubCounter, Queue<List<MeasurementObject>>> subCounterMap = Collections.synchronizedMap(new LinkedHashMap<SubCounter, Queue<List<MeasurementObject>>>());
+ private Queue<String> snssaiList = new LinkedBlockingQueue<>();
+
+ /**
+ * put the measurement data for (an S-NSSAI from a network function) in the queue
+ */
+ public void putDataToQueue(SubCounter subCounter, List<MeasurementObject> measurementObjectData) {
+ Queue<List<MeasurementObject>> measQueue;
+ if (subCounterMap.containsKey(subCounter)){
+ subCounterMap.get(subCounter).add(measurementObjectData);
+ }
+ else {
+ measQueue = new LinkedBlockingQueue<>();
+ measQueue.add(measurementObjectData);
+ subCounterMap.put(subCounter, measQueue);
+ }
+ }
+
+ /**
+ * get the measurement data for (an S-NSSAI from a network function) from the queue
+ * returns the specified number of samples
+ */
+ public List<List<MeasurementObject>> getSamplesFromQueue(SubCounter subCounter, int samples) {
+ List<List<MeasurementObject>> sampleList = new LinkedList<>();
+ if (subCounterMap.containsKey(subCounter)){
+ Queue<List<MeasurementObject>> measQueue = subCounterMap.get(subCounter);
+ while(samples > 0) {
+ sampleList.add(measQueue.remove());
+ samples --;
+ }
+ }
+ return sampleList;
+ }
+
+ /**
+ * put S-NSSAI to the queue
+ */
+ public void putSnssaiToQueue(String snssai) {
+ if (!snssaiList.contains(snssai))
+ snssaiList.add(snssai);
+ }
+
+ /**
+ * get S-NSSAI from the queue
+ */
+ public String getSnnsaiFromQueue() {
+ String snssai = "";
+ try {
+ snssai = snssaiList.remove();
+ }
+ catch(Exception e) {
+
+ }
+ return snssai;
+ }
+}
diff --git a/components/slice-analysis-ms/src/main/java/org/onap/slice/analysis/ms/service/PmEventProcessor.java b/components/slice-analysis-ms/src/main/java/org/onap/slice/analysis/ms/service/PmEventProcessor.java
new file mode 100644
index 00000000..99c24c8a
--- /dev/null
+++ b/components/slice-analysis-ms/src/main/java/org/onap/slice/analysis/ms/service/PmEventProcessor.java
@@ -0,0 +1,105 @@
+/*******************************************************************************
+ * ============LICENSE_START=======================================================
+ * slice-analysis-ms
+ * ================================================================================
+ * Copyright (C) 2020 Wipro Limited.
+ * ==============================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ *
+ *******************************************************************************/
+
+package org.onap.slice.analysis.ms.service;
+
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+
+import org.onap.slice.analysis.ms.models.MeasurementObject;
+import org.onap.slice.analysis.ms.models.pmnotification.Event;
+import org.onap.slice.analysis.ms.models.pmnotification.MeasInfoList;
+import org.onap.slice.analysis.ms.models.pmnotification.MeasResult;
+import org.onap.slice.analysis.ms.models.pmnotification.MeasValuesList;
+import org.springframework.context.annotation.Scope;
+import org.springframework.stereotype.Component;
+
+/**
+ * This class is responsible for processing PmEvent
+ */
+@Component
+@Scope("prototype")
+public class PmEventProcessor implements IPmEventProcessor{
+ protected Map<String, List<MeasurementObject>> instanceMap = new HashMap<>();
+
+
+ /**
+ * Process the PM event
+ */
+ public Map<String, List<MeasurementObject>> processEvent(Event event) {
+ List<MeasInfoList> measurements = event.getPerf3gppFields().getMeasDataCollection().getMeasInfoList();
+ measurements.forEach(measurement -> {
+ List<String> collectedSubCounters = measurement.getMeasTypes().getsMeasTypesList();
+ List<MeasValuesList> subCounterMeasurements = measurement.getMeasValuesList();
+ subCounterMeasurements.forEach(subCounterMeasurement -> processMeasurementObjectData(collectedSubCounters, subCounterMeasurement));
+ });
+ return instanceMap;
+ }
+
+ /**
+ * Process the measurement data from every measurement object. eg cell
+ */
+ public void processMeasurementObjectData(List<String> collectedSubCounters, MeasValuesList subCounterMeasurement) {
+ List<MeasResult> measResultList = subCounterMeasurement.getMeasResults();
+ String measObjId = subCounterMeasurement.getMeasObjInstId();
+ measResultList.forEach(measResult -> {
+ String pmName = collectedSubCounters.get(measResult.getP()-1);
+ Integer pmValue = Integer.valueOf(measResult.getsValue());
+ Map<String,String> pmMapping = getMapKey(pmName);
+ String snssai = pmMapping.get("snssai");
+ String pm = pmMapping.get("pm");
+ Map<String, Integer> pmData = new HashMap<>();
+ pmData.put(pm, pmValue);
+ if (instanceMap.containsKey(snssai)) {
+ int index = instanceMap.get(snssai).indexOf(new MeasurementObject(measObjId));
+ if (index == -1) {
+ instanceMap.get(snssai).add(new MeasurementObject(measObjId,pmData));
+ }
+ else {
+ instanceMap.get(snssai).get(index).getPmData().put(pmName, pmValue);
+ }
+ }
+ else {
+ List<MeasurementObject> l = new LinkedList<>();
+ l.add(new MeasurementObject(measObjId,pmData));
+ instanceMap.put(snssai, l);
+ }
+ });
+ }
+
+ /**
+ * Fetch pm name and S-NSSAI
+ */
+ public Map<String, String> getMapKey(String pmName) {
+ String [] pmNameArr = pmName.split("\\.");
+ String snssai = "";
+ String pm = pmNameArr[1];
+ Map<String, String> result = new HashMap<>();
+ result.put("pm", pm);
+ if ((pm.equalsIgnoreCase("PrbUsedDl")) || (pm.equalsIgnoreCase("PrbUsedUl"))){
+ snssai = pmNameArr[2];
+ }
+ result.put("snssai", snssai);
+ return result;
+ }
+} \ No newline at end of file
diff --git a/components/slice-analysis-ms/src/main/java/org/onap/slice/analysis/ms/service/PmThread.java b/components/slice-analysis-ms/src/main/java/org/onap/slice/analysis/ms/service/PmThread.java
new file mode 100644
index 00000000..d9091b3c
--- /dev/null
+++ b/components/slice-analysis-ms/src/main/java/org/onap/slice/analysis/ms/service/PmThread.java
@@ -0,0 +1,92 @@
+/*******************************************************************************
+ * ============LICENSE_START=======================================================
+ * slice-analysis-ms
+ * ================================================================================
+ * Copyright (C) 2020 Wipro Limited.
+ * ==============================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ *
+ *******************************************************************************/
+
+package org.onap.slice.analysis.ms.service;
+
+import java.util.List;
+import java.util.Map;
+
+import org.onap.slice.analysis.ms.data.repository.PerformanceNotificationsRepository;
+import org.onap.slice.analysis.ms.dmaap.NewPmNotification;
+import org.onap.slice.analysis.ms.models.MeasurementObject;
+import org.onap.slice.analysis.ms.models.SubCounter;
+import org.onap.slice.analysis.ms.models.pmnotification.PmNotification;
+import org.onap.slice.analysis.ms.utils.BeanUtil;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+
+/**
+ * This Thread class consumes pm message from database and puts it in the queue for further processing
+ */
+public class PmThread extends Thread {
+ private static Logger log = LoggerFactory.getLogger(PmThread.class);
+ private NewPmNotification newPmNotification;
+ private PerformanceNotificationsRepository performanceNotificationsRepository;
+ private IPmEventProcessor pmEventProcessor;
+ private PmDataQueue pmDataQueue;
+
+ /**
+ * parameterized constructor.
+ */
+ public PmThread(NewPmNotification newPmNotification) {
+ super();
+ this.newPmNotification = newPmNotification;
+ this.performanceNotificationsRepository = BeanUtil.getBean(PerformanceNotificationsRepository.class);
+ this.pmEventProcessor = BeanUtil.getBean(IPmEventProcessor.class);
+ this.pmDataQueue = BeanUtil.getBean(PmDataQueue.class);
+ }
+
+ /**
+ * check for new PM notification. Fetch notification from the database, process and put it in the pm data queue
+ */
+ @Override
+ public void run() {
+ log.info("PM thread starting ...");
+ boolean done = false;
+ PmNotification pmNotification;
+ Map<String, List<MeasurementObject>> processedData;
+ while (!done) {
+ try {
+ Thread.sleep(1000);
+ if (newPmNotification.getNewNotif()) {
+ log.info("New PM notification from Dmaap");
+ String pmNotificationString = performanceNotificationsRepository.getPerformanceNotificationFromQueue();
+ if(pmNotificationString != null) {
+ ObjectMapper mapper = new ObjectMapper();
+ pmNotification = mapper.readValue(pmNotificationString, PmNotification.class);
+ processedData = pmEventProcessor.processEvent(pmNotification.getEvent());
+ String networkFunction = pmNotification.getEvent().getPerf3gppFields().getMeasDataCollection().getMeasuredEntityDn();
+ processedData.forEach((key,value) -> {
+ SubCounter subCounter = new SubCounter(networkFunction, key);
+ pmDataQueue.putDataToQueue(subCounter, value);
+ pmDataQueue.putSnssaiToQueue(subCounter.getMeasuredObject());
+ });
+ }
+ }
+ } catch (Exception e) {
+ log.error("Exception in PM Thread ", e);
+ done = true;
+ }
+ }
+ }
+}
diff --git a/components/slice-analysis-ms/src/main/java/org/onap/slice/analysis/ms/service/PolicyService.java b/components/slice-analysis-ms/src/main/java/org/onap/slice/analysis/ms/service/PolicyService.java
new file mode 100644
index 00000000..80063398
--- /dev/null
+++ b/components/slice-analysis-ms/src/main/java/org/onap/slice/analysis/ms/service/PolicyService.java
@@ -0,0 +1,111 @@
+/*******************************************************************************
+ * ============LICENSE_START=======================================================
+ * slice-analysis-ms
+ * ================================================================================
+ * Copyright (C) 2020 Wipro Limited.
+ * ==============================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ *
+ *******************************************************************************/
+
+package org.onap.slice.analysis.ms.service;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+
+import javax.annotation.PostConstruct;
+
+import org.onap.slice.analysis.ms.dmaap.PolicyDmaapClient;
+import org.onap.slice.analysis.ms.models.Configuration;
+import org.onap.slice.analysis.ms.models.policy.AAI;
+import org.onap.slice.analysis.ms.models.policy.AdditionalProperties;
+import org.onap.slice.analysis.ms.models.policy.OnsetMessage;
+import org.onap.slice.analysis.ms.models.policy.Payload;
+import org.onap.slice.analysis.ms.utils.DmaapUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.stereotype.Component;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+
+@Component
+public class PolicyService {
+ private PolicyDmaapClient policyDmaapClient;
+ private static Logger log = LoggerFactory.getLogger(PolicyService.class);
+
+ @PostConstruct
+ public void init() {
+ Configuration configuration = Configuration.getInstance();
+ policyDmaapClient = new PolicyDmaapClient(new DmaapUtils(), configuration);
+ }
+
+ protected <T> OnsetMessage formPolicyOnsetMessage(String snssai, AdditionalProperties<T> addProps, Map<String, String> serviceDetails) {
+ OnsetMessage onsetmsg = new OnsetMessage();
+ Payload payload = new Payload();
+ payload.setGlobalSubscriberId(serviceDetails.get("globalSubscriberId"));
+ payload.setSubscriptionServiceType(serviceDetails.get("subscriptionServiceType"));
+ payload.setNetworkType("AN");
+ payload.setName(serviceDetails.get("ranNFNSSIId"));
+ payload.setServiceInstanceID(serviceDetails.get("ranNFNSSIId"));
+
+ addProps.setModifyAction("");
+ Map<String, String> nsiInfo = new HashMap<>();
+ nsiInfo.put("nsiId", UUID.randomUUID().toString());
+ nsiInfo.put("nsiName", "");
+ addProps.setNsiInfo(nsiInfo);
+ addProps.setScriptName("AN");
+ addProps.setSliceProfileId(serviceDetails.get("sliceProfileId"));
+ addProps.setModifyAction("reconfigure");
+ List<String> snssaiList = new ArrayList<>();
+ snssaiList.add(snssai);
+ addProps.setSnssaiList(snssaiList);
+
+ payload.setAdditionalProperties(addProps);
+ onsetmsg.setPayload(payload);
+
+ onsetmsg.setClosedLoopControlName("ControlLoop-Slicing-116d7b00-dbeb-4d03-8719-d0a658fa735b");
+ onsetmsg.setClosedLoopAlarmStart(System.currentTimeMillis());
+ onsetmsg.setClosedLoopEventClient("microservice.sliceAnalysisMS");
+ onsetmsg.setClosedLoopEventStatus("ONSET");
+ onsetmsg.setRequestID(UUID.randomUUID().toString());
+ onsetmsg.setTarget("vserver.vserver-name");
+ onsetmsg.setTargetType("VNF");
+ onsetmsg.setFrom("DCAE");
+ onsetmsg.setVersion("1.0.2");
+ AAI aai = new AAI();
+ aai.setVserverIsClosedLoopDisabled("false");
+ aai.setVserverProvStatus("ACTIVE");
+ aai.setVserverVserverName(serviceDetails.get("ranNFNSSIId"));
+ onsetmsg.setAai(aai);
+ return onsetmsg;
+ }
+
+ protected <T> void sendOnsetMessageToPolicy(String snssai, AdditionalProperties<T> addProps, Map<String, String> serviceDetails) {
+ OnsetMessage onsetMessage = formPolicyOnsetMessage(snssai, addProps, serviceDetails);
+ ObjectMapper obj = new ObjectMapper();
+ String msg = "";
+ try {
+ log.debug("Policy onset message for S-NSSAI: {} is {}", snssai, msg);
+ msg = obj.writeValueAsString(onsetMessage);
+ policyDmaapClient.sendNotificationToPolicy(msg);
+ }
+ catch (Exception e) {
+ log.error("Error sending notification to policy, {}",e.getMessage());
+ }
+ }
+
+}
diff --git a/components/slice-analysis-ms/src/main/java/org/onap/slice/analysis/ms/service/SnssaiSamplesProcessor.java b/components/slice-analysis-ms/src/main/java/org/onap/slice/analysis/ms/service/SnssaiSamplesProcessor.java
new file mode 100644
index 00000000..2e56190f
--- /dev/null
+++ b/components/slice-analysis-ms/src/main/java/org/onap/slice/analysis/ms/service/SnssaiSamplesProcessor.java
@@ -0,0 +1,190 @@
+/*******************************************************************************
+ * ============LICENSE_START=======================================================
+ * slice-analysis-ms
+ * ================================================================================
+ * Copyright (C) 2020 Wipro Limited.
+ * ==============================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ *
+ *******************************************************************************/
+package org.onap.slice.analysis.ms.service;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+
+import javax.annotation.PostConstruct;
+
+import org.onap.slice.analysis.ms.configdb.IConfigDbService;
+import org.onap.slice.analysis.ms.models.Configuration;
+import org.onap.slice.analysis.ms.models.MeasurementObject;
+import org.onap.slice.analysis.ms.models.SubCounter;
+import org.onap.slice.analysis.ms.models.policy.AdditionalProperties;
+import org.onap.slice.analysis.ms.utils.BeanUtil;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.context.annotation.Scope;
+import org.springframework.stereotype.Component;
+
+/**
+ * This class process the measurement data of an S-NSSAI
+ */
+@Component
+@Scope("Prototype")
+public class SnssaiSamplesProcessor {
+ private static Logger log = LoggerFactory.getLogger(SnssaiSamplesProcessor.class);
+
+ private PolicyService policyService;
+ private IConfigDbService configDbService;
+ private PmDataQueue pmDataQueue;
+ private AverageCalculator averageCalculator;
+ private List<MeasurementObject> snssaiMeasurementList = new ArrayList<>();
+ private Map<String, List<String>> ricToCellMapping = new HashMap<>();
+ private Map<String, Map<String, Integer>> ricToPrbsMapping = new HashMap<>();
+ private Map<String, Map<String, Integer>> ricToThroughputMapping = new HashMap<>();
+ private int samples;
+ private List<String> pmsToCompute;
+ private Map<String, String> prbThroughputMapping = new HashMap<>();
+ private int minPercentageChange;
+
+ @PostConstruct
+ public void init() {
+ Configuration configuration = Configuration.getInstance();
+ samples = configuration.getSamples();
+ pmsToCompute = new ArrayList<>();
+ pmsToCompute.add("PrbUsedDl");
+ pmsToCompute.add("PrbUsedUl");
+ prbThroughputMapping = new HashMap<>();
+ prbThroughputMapping.put("PrbUsedDl", "dLThptPerSlice");
+ prbThroughputMapping.put("PrbUsedUl", "uLThptPerSlice");
+ minPercentageChange = configuration.getMinPercentageChange();
+ policyService = BeanUtil.getBean(PolicyService.class);
+ configDbService = BeanUtil.getBean(IConfigDbService.class);
+ pmDataQueue = BeanUtil.getBean(PmDataQueue.class);
+ averageCalculator = BeanUtil.getBean(AverageCalculator.class);
+ }
+
+ /**
+ * process the measurement data of an S-NSSAI
+ */
+ public void processSamplesOfSnnsai(String snssai, List<String> networkFunctions) {
+ networkFunctions.forEach(nf -> {
+ log.debug("Average of samples for {}:", snssai);
+ addToMeasurementList(averageCalculator.findAverageOfSamples(pmDataQueue.getSamplesFromQueue(new SubCounter(nf, snssai), samples)));
+ });
+ ricToCellMapping = configDbService.fetchRICsOfSnssai(snssai);
+ log.debug("RIC to Cell Mapping for {} S-NSSAI: {}", snssai, ricToCellMapping);
+ Map<String, Map<String, Integer>> ricConfiguration = configDbService.fetchCurrentConfigurationOfRIC(snssai);
+ Map<String, Integer> sliceConfiguration = configDbService.fetchCurrentConfigurationOfSlice(snssai);
+ log.debug("RIC Configuration: {}", ricConfiguration);
+ log.debug("Slice Configuration: {}", sliceConfiguration);
+ pmsToCompute.forEach(pm -> {
+ sumOfPrbsAcrossCells(pm);
+ int sum = computeSum(pm);
+ computeThroughput(sliceConfiguration, sum, pm);
+ calculatePercentageChange(ricConfiguration, pm);
+ });
+ updateConfiguration();
+ if(ricToThroughputMapping.size() > 0) {
+ AdditionalProperties<Map<String, Map<String, Integer>>> addProps = new AdditionalProperties<>();
+ addProps.setResourceConfig(ricToThroughputMapping);
+ policyService.sendOnsetMessageToPolicy(snssai, addProps, configDbService.fetchServiceDetails(snssai));
+ }
+
+ }
+
+ /**
+ * process the measurement data of an S-NSSAI
+ */
+ protected void updateConfiguration() {
+ Iterator<Map.Entry<String, Map<String,Integer>>> it = ricToThroughputMapping.entrySet().iterator();
+ Map.Entry<String, Map<String,Integer>> entry = null;
+ while(it.hasNext()) {
+ entry = it.next();
+ if(entry.getValue().size() == 0) {
+ it.remove();
+ }
+ }
+ }
+
+ private void addToMeasurementList(List<MeasurementObject> sample) {
+ snssaiMeasurementList.addAll(sample);
+ }
+
+ /**
+ * Calculate the change in the configuration value and keep the configuration only if it is greater than a
+ * specific limit
+ */
+ protected void calculatePercentageChange(Map<String, Map<String, Integer>> ricConfiguration, String pm) {
+ Iterator<Map.Entry<String, Map<String,Integer>>> it = ricToThroughputMapping.entrySet().iterator();
+ Map.Entry<String, Map<String,Integer>> entry = null;
+ float existing = 0;
+ float change = 0;
+ while(it.hasNext()) {
+ entry = it.next();
+ existing = ricConfiguration.get(entry.getKey()).get(pm);
+ change = ((Math.abs(entry.getValue().get(pm) - existing))/existing)*100;
+ if (change <= minPercentageChange) {
+ ricToThroughputMapping.get(entry.getKey()).remove(pm);
+ }
+ }
+ }
+
+ protected void sumOfPrbsAcrossCells(String pmName) {
+ ricToCellMapping.forEach((ric,cells) -> {
+ int sumOfPrbs = 0;
+ for(String cell : cells) {
+ int index = snssaiMeasurementList.indexOf(new MeasurementObject(cell));
+ sumOfPrbs += snssaiMeasurementList.get(index).getPmData().get(pmName);
+ }
+ if(ricToPrbsMapping.containsKey(ric)) {
+ ricToPrbsMapping.get(ric).put(pmName, sumOfPrbs);
+ }
+ else {
+ Map<String, Integer> pmToPrbMapping = new HashMap<>();
+ pmToPrbMapping.put(pmName, sumOfPrbs);
+ ricToPrbsMapping.put(ric, pmToPrbMapping);
+ }
+ });
+ }
+
+ protected Integer computeSum(String pm) {
+ return ricToPrbsMapping.entrySet().stream().map(x -> x.getValue().get(pm)).reduce(0, Integer::sum);
+ }
+
+ protected void computeThroughput(Map<String, Integer> sliceConfiguration, int sum, String pm) {
+ Iterator<Map.Entry<String, Map<String,Integer>>> it = ricToPrbsMapping.entrySet().iterator();
+ Map.Entry<String, Map<String,Integer>> entry = null;
+ Map<String, Integer> throughtputMap = null;
+ String ric = "";
+ int value = 0;
+ while(it.hasNext()) {
+ entry = it.next();
+ ric = entry.getKey();
+ value = Math.round(((float)entry.getValue().get(pm)/sum)*(float)sliceConfiguration.get(prbThroughputMapping.get(pm)));
+ if(ricToThroughputMapping.containsKey(ric)) {
+ ricToThroughputMapping.get(ric).put(prbThroughputMapping.get(pm), value);
+ }
+ else {
+ throughtputMap = new HashMap<>();
+ throughtputMap.put(prbThroughputMapping.get(pm), value);
+ ricToThroughputMapping.put(ric, throughtputMap);
+ }
+ }
+
+ }
+
+}