summaryrefslogtreecommitdiffstats
path: root/components/slice-analysis-ms/src/main/java/org/onap/slice/analysis/ms/service
diff options
context:
space:
mode:
authordecheng zhang <decheng.zhang@huawei.com>2022-02-28 11:15:20 -0500
committerdecheng zhang <decheng.zhang@huawei.com>2022-03-18 08:55:51 -0400
commit7f2e4aa47f56085be6c95cb81b6a8bea8126d56d (patch)
tree74a5316fdad985051a5a824934c522089a76ddf3 /components/slice-analysis-ms/src/main/java/org/onap/slice/analysis/ms/service
parentf71b9f809b64f17ac2bedd02a1ed6cbdb7573517 (diff)
Jakarta changes in slice-analysis-ms for IBN Cloud leased line update and CCVPN closed-loop. This commit contains work in sub-task: 1) AAI monitor thread; 2) bandwidth evaluator; 3) policy notification code.1.1.0-slice-analysis-ms
Issue-ID: DCAEGEN2-3063 Signed-off-by: decheng zhang <decheng.zhang@huawei.com> Change-Id: I9029ffd7563e65be59f7fd76adc2a749ff624172 Signed-off-by: decheng zhang <decheng.zhang@huawei.com>
Diffstat (limited to 'components/slice-analysis-ms/src/main/java/org/onap/slice/analysis/ms/service')
-rw-r--r--components/slice-analysis-ms/src/main/java/org/onap/slice/analysis/ms/service/PolicyService.java216
-rw-r--r--components/slice-analysis-ms/src/main/java/org/onap/slice/analysis/ms/service/ccvpn/BandwidthEvaluator.java324
-rw-r--r--components/slice-analysis-ms/src/main/java/org/onap/slice/analysis/ms/service/ccvpn/CCVPNPmDatastore.java263
-rw-r--r--components/slice-analysis-ms/src/main/java/org/onap/slice/analysis/ms/service/ccvpn/Endpointkey.java75
-rw-r--r--components/slice-analysis-ms/src/main/java/org/onap/slice/analysis/ms/service/ccvpn/Event.java37
-rw-r--r--components/slice-analysis-ms/src/main/java/org/onap/slice/analysis/ms/service/ccvpn/RequestOwner.java30
-rw-r--r--components/slice-analysis-ms/src/main/java/org/onap/slice/analysis/ms/service/ccvpn/ServiceState.java32
-rw-r--r--components/slice-analysis-ms/src/main/java/org/onap/slice/analysis/ms/service/ccvpn/SimpleEvent.java98
8 files changed, 1005 insertions, 70 deletions
diff --git a/components/slice-analysis-ms/src/main/java/org/onap/slice/analysis/ms/service/PolicyService.java b/components/slice-analysis-ms/src/main/java/org/onap/slice/analysis/ms/service/PolicyService.java
index 2d3a2df0..297683b4 100644
--- a/components/slice-analysis-ms/src/main/java/org/onap/slice/analysis/ms/service/PolicyService.java
+++ b/components/slice-analysis-ms/src/main/java/org/onap/slice/analysis/ms/service/PolicyService.java
@@ -3,6 +3,7 @@
* slice-analysis-ms
* ================================================================================
* Copyright (C) 2020-2021 Wipro Limited.
+ * Copyright (C) 2022 Huawei Canada Limited.
* ==============================================================================
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -35,6 +36,9 @@ import org.onap.slice.analysis.ms.models.policy.AAI;
import org.onap.slice.analysis.ms.models.policy.AdditionalProperties;
import org.onap.slice.analysis.ms.models.policy.OnsetMessage;
import org.onap.slice.analysis.ms.models.policy.Payload;
+import org.onap.slice.analysis.ms.models.policy.Sla;
+import org.onap.slice.analysis.ms.models.policy.TransportNetwork;
+import org.onap.slice.analysis.ms.service.ccvpn.RequestOwner;
import org.onap.slice.analysis.ms.utils.DmaapUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -42,77 +46,149 @@ import org.springframework.stereotype.Component;
import com.fasterxml.jackson.databind.ObjectMapper;
+/**
+ * Serivce to generate and publish onsetMessage to ONAP/Policy
+ */
@Component
public class PolicyService {
- private PolicyDmaapClient policyDmaapClient;
- private static Logger log = LoggerFactory.getLogger(PolicyService.class);
- private ObjectMapper objectMapper = new ObjectMapper();
-
- /**
- * Initialization
- */
- @PostConstruct
- public void init() {
- Configuration configuration = Configuration.getInstance();
- policyDmaapClient = new PolicyDmaapClient(new DmaapUtils(), configuration);
- }
-
- protected <T> OnsetMessage formPolicyOnsetMessage(String snssai, AdditionalProperties<T> addProps, Map<String, String> serviceDetails) {
- OnsetMessage onsetmsg = new OnsetMessage();
- Payload payload = new Payload();
- payload.setGlobalSubscriberId(serviceDetails.get("globalSubscriberId"));
- payload.setSubscriptionServiceType(serviceDetails.get("subscriptionServiceType"));
- payload.setNetworkType("AN");
- payload.setName(serviceDetails.get("ranNFNSSIId"));
- payload.setServiceInstanceID(serviceDetails.get("ranNFNSSIId"));
-
- addProps.setModifyAction("");
- Map<String, String> nsiInfo = new HashMap<>();
- nsiInfo.put("nsiId", UUID.randomUUID().toString());
- nsiInfo.put("nsiName", "");
- addProps.setNsiInfo(nsiInfo);
- addProps.setScriptName("AN");
- addProps.setSliceProfileId(serviceDetails.get("sliceProfileId"));
- addProps.setModifyAction("reconfigure");
- List<String> snssaiList = new ArrayList<>();
- snssaiList.add(snssai);
- addProps.setSnssaiList(snssaiList);
-
- payload.setAdditionalProperties(addProps);
- try {
- onsetmsg.setPayload(objectMapper.writeValueAsString(payload));
- } catch (Exception e) {
- log.error("Error while mapping payload as string , {}",e.getMessage());
- }
-
- onsetmsg.setClosedLoopControlName("ControlLoop-Slicing-116d7b00-dbeb-4d03-8719-d0a658fa735b");
- onsetmsg.setClosedLoopAlarmStart(System.currentTimeMillis());
- onsetmsg.setClosedLoopEventClient("microservice.sliceAnalysisMS");
- onsetmsg.setClosedLoopEventStatus("ONSET");
- onsetmsg.setRequestID(UUID.randomUUID().toString());
- onsetmsg.setTarget("generic-vnf.vnf-id");
- onsetmsg.setTargetType("VNF");
- onsetmsg.setFrom("DCAE");
- onsetmsg.setVersion("1.0.2");
- AAI aai = new AAI();
- aai.setVserverIsClosedLoopDisabled("false");
- aai.setVserverProvStatus("ACTIVE");
- aai.setvServerVNFId(serviceDetails.get("ranNFNSSIId"));
- onsetmsg.setAai(aai);
- return onsetmsg;
- }
-
- protected <T> void sendOnsetMessageToPolicy(String snssai, AdditionalProperties<T> addProps, Map<String, String> serviceDetails) {
- OnsetMessage onsetMessage = formPolicyOnsetMessage(snssai, addProps, serviceDetails);
- String msg = "";
- try {
- msg = objectMapper.writeValueAsString(onsetMessage);
- log.info("Policy onset message for S-NSSAI: {} is {}", snssai, msg);
- policyDmaapClient.sendNotificationToPolicy(msg);
- }
- catch (Exception e) {
- log.error("Error sending notification to policy, {}",e.getMessage());
- }
- }
+ private PolicyDmaapClient policyDmaapClient;
+ private static Logger log = LoggerFactory.getLogger(PolicyService.class);
+ private ObjectMapper objectMapper = new ObjectMapper();
+ /**
+ * Initialization
+ */
+ @PostConstruct
+ public void init() {
+ Configuration configuration = Configuration.getInstance();
+ policyDmaapClient = new PolicyDmaapClient(new DmaapUtils(), configuration);
+ }
+
+ protected <T> OnsetMessage formPolicyOnsetMessage(String snssai, AdditionalProperties<T> addProps, Map<String, String> serviceDetails) {
+ OnsetMessage onsetmsg = new OnsetMessage();
+ Payload payload = new Payload();
+ payload.setGlobalSubscriberId(serviceDetails.get("globalSubscriberId"));
+ payload.setSubscriptionServiceType(serviceDetails.get("subscriptionServiceType"));
+ payload.setNetworkType("AN");
+ payload.setName(serviceDetails.get("ranNFNSSIId"));
+ payload.setServiceInstanceID(serviceDetails.get("ranNFNSSIId"));
+
+ addProps.setModifyAction("");
+ Map<String, String> nsiInfo = new HashMap<>();
+ nsiInfo.put("nsiId", UUID.randomUUID().toString());
+ nsiInfo.put("nsiName", "");
+ addProps.setNsiInfo(nsiInfo);
+ addProps.setScriptName("AN");
+ addProps.setSliceProfileId(serviceDetails.get("sliceProfileId"));
+ addProps.setModifyAction("reconfigure");
+ List<String> snssaiList = new ArrayList<>();
+ snssaiList.add(snssai);
+ addProps.setSnssaiList(snssaiList);
+
+ payload.setAdditionalProperties(addProps);
+ try {
+ onsetmsg.setPayload(objectMapper.writeValueAsString(payload));
+ } catch (Exception e) {
+ log.error("Error while mapping payload as string , {}",e.getMessage());
+ }
+
+ onsetmsg.setClosedLoopControlName("ControlLoop-Slicing-116d7b00-dbeb-4d03-8719-d0a658fa735b");
+ onsetmsg.setClosedLoopAlarmStart(System.currentTimeMillis());
+ onsetmsg.setClosedLoopEventClient("microservice.sliceAnalysisMS");
+ onsetmsg.setClosedLoopEventStatus("ONSET");
+ onsetmsg.setRequestID(UUID.randomUUID().toString());
+ onsetmsg.setTarget("generic-vnf.vnf-id");
+ onsetmsg.setTargetType("VNF");
+ onsetmsg.setFrom("DCAE");
+ onsetmsg.setVersion("1.0.2");
+ AAI aai = new AAI();
+ aai.setVserverIsClosedLoopDisabled("false");
+ aai.setVserverProvStatus("ACTIVE");
+ aai.setvServerVNFId(serviceDetails.get("ranNFNSSIId"));
+ onsetmsg.setAai(aai);
+ return onsetmsg;
+ }
+
+ protected <T> void sendOnsetMessageToPolicy(String snssai, AdditionalProperties<T> addProps, Map<String, String> serviceDetails) {
+ OnsetMessage onsetMessage = formPolicyOnsetMessage(snssai, addProps, serviceDetails);
+ String msg = "";
+ try {
+ msg = objectMapper.writeValueAsString(onsetMessage);
+ log.info("Policy onset message for S-NSSAI: {} is {}", snssai, msg);
+ policyDmaapClient.sendNotificationToPolicy(msg);
+ }
+ catch (Exception e) {
+ log.error("Error sending notification to policy, {}",e.getMessage());
+ }
+ }
+
+ /**
+ * Generate onsetMessage for ccvpn service update operation
+ * @param cllId cloud leased line Id (ethernet service id)
+ * @param newBw new bandwidth value for bandwidth adjustment
+ * @param <T> type for additionalPropert
+ * ies, can be omitted
+ * @return OnsetMessage result
+ */
+ public <T> OnsetMessage formPolicyOnsetMessageForCCVPN(String cllId, Integer newBw, RequestOwner owner) {
+ Sla sla = new Sla(2, newBw);
+ String transportNetworkId = cllId;
+ if (owner == RequestOwner.UUI) {
+ transportNetworkId += "-network-001";
+ } else if (owner == RequestOwner.DCAE) {
+ transportNetworkId += "-network-002";
+ }
+ TransportNetwork transportNetwork = new TransportNetwork(transportNetworkId, sla);
+ AdditionalProperties additionalProperties = new AdditionalProperties();
+ additionalProperties.setModifyAction("bandwidth");
+ additionalProperties.setEnableSdnc("true");
+ List<TransportNetwork> transportNetworks = new ArrayList();
+ transportNetworks.add(transportNetwork);
+ additionalProperties.setTransportNetworks(transportNetworks);
+
+ Payload payload = new Payload();
+ payload.setGlobalSubscriberId("IBNCustomer");
+ payload.setSubscriptionServiceType("IBN");
+ payload.setServiceType("CLL");
+ payload.setName("cloud-leased-line-101");
+ payload.setServiceInstanceID(cllId);
+ payload.setAdditionalProperties(additionalProperties);
+
+ OnsetMessage onsetmsg = new OnsetMessage();
+ try {
+ onsetmsg.setPayload(objectMapper.writeValueAsString(payload));
+ } catch (Exception e) {
+ log.error("Error while mapping payload as string , {}",e.getMessage());
+ }
+ onsetmsg.setClosedLoopControlName("ControlLoop-CCVPN-CLL-227e8b00-dbeb-4d03-8719-d0a658fb846c");
+ onsetmsg.setClosedLoopAlarmStart(System.currentTimeMillis());
+ onsetmsg.setClosedLoopEventClient("microservice.sliceAnalysisMS");
+ onsetmsg.setClosedLoopEventStatus("ONSET");
+ onsetmsg.setRequestID(UUID.randomUUID().toString());
+ onsetmsg.setTarget("generic-vnf.vnf-id");
+ onsetmsg.setTargetType("VNF");
+ onsetmsg.setFrom("DCAE");
+ onsetmsg.setVersion("1.0.2");
+ AAI aai = new AAI();
+ aai.setVserverIsClosedLoopDisabled("true");
+ onsetmsg.setAai(aai);
+ return onsetmsg;
+ }
+
+ /**
+ * Sending the onsetMessage to Onap-Policy through PolicyDmaapClient
+ * @param onsetMessage the onsetMessage about to send
+ * @param <T> type inherent from previous implementation can be omitted
+ */
+ public <T> void sendOnsetMessageToPolicy(OnsetMessage onsetMessage){
+ String msg = "";
+ try {
+ msg = objectMapper.writeValueAsString(onsetMessage);
+ log.info("Policy onset message for ControlLoop-CCVPN-CLL is {}", msg);
+ policyDmaapClient.sendNotificationToPolicy(msg);
+ }
+ catch (Exception e) {
+ log.error("Error sending notification to policy, {}",e.getMessage());
+ }
+ }
}
diff --git a/components/slice-analysis-ms/src/main/java/org/onap/slice/analysis/ms/service/ccvpn/BandwidthEvaluator.java b/components/slice-analysis-ms/src/main/java/org/onap/slice/analysis/ms/service/ccvpn/BandwidthEvaluator.java
new file mode 100644
index 00000000..a7847b0e
--- /dev/null
+++ b/components/slice-analysis-ms/src/main/java/org/onap/slice/analysis/ms/service/ccvpn/BandwidthEvaluator.java
@@ -0,0 +1,324 @@
+/*******************************************************************************
+ * ============LICENSE_START=======================================================
+ * slice-analysis-ms
+ * ================================================================================
+ * Copyright (C) 2022 Huawei Canada Limited.
+ * ==============================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ *
+ *******************************************************************************/
+package org.onap.slice.analysis.ms.service.ccvpn;
+
+import com.google.gson.JsonObject;
+import lombok.NonNull;
+import org.onap.slice.analysis.ms.aai.AaiService;
+
+import org.onap.slice.analysis.ms.models.Configuration;
+import org.onap.slice.analysis.ms.service.PolicyService;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.stereotype.Component;
+
+import javax.annotation.PostConstruct;
+import javax.annotation.PreDestroy;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+import java.util.TreeMap;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+
+
+import static java.util.concurrent.Executors.newSingleThreadExecutor;
+
+/**
+ * This class implements the CCVPN PM Closed-loop logical function.
+ * A simple actor model design is implemented here.
+ */
+@Component
+public class BandwidthEvaluator {
+ private static Logger log = LoggerFactory.getLogger(BandwidthEvaluator.class);
+ private Configuration configuration;
+
+ @Autowired
+ AaiService aaiService;
+
+ @Autowired
+ CCVPNPmDatastore ccvpnPmDatastore;
+
+ @Autowired
+ PolicyService policyService;
+
+ private Loop evaluationEventLoop;
+ private Loop aaiEventLoop;
+
+ private static final Event KILL_PILL = new SimpleEvent(null, 0);
+ private static final int DEFAULT_EVAL_INTERVAL = 5;
+ private static final String SERVICE_INSTANCE_LOCATION_ID = "service-instance-location-id";
+ private static final String BANDWIDTH_TOTAL = "bandwidth-total";
+
+ /**
+ * Interval of each round of evaluation, defined in config_all.json
+ */
+ private static int evaluationInterval;
+
+ /**
+ * Percentage threshold of bandwidth adjustment.
+ */
+ private static double threshold;
+
+ /**
+ * Precision of bandwidth evaluation and adjustment.
+ */
+ private static double precision; // in Mbps;
+ private final ScheduledExecutorService executorPool = Executors.newScheduledThreadPool(1);
+
+ /**
+ * Initialize and start the bandwidth evaluator process, schedule a periodic service bandwidth usage check
+ */
+ @PostConstruct
+ public void init() {
+ loadConfig();
+ /**
+ * Evalution main loop
+ */
+ evaluationEventLoop = new Loop("EvaluationLoop"){
+ @Override
+ public void process(Event event) {
+ if (event.type() == SimpleEvent.Type.PERIODIC_CHECK && isPeriodicCheckOn()){
+ log.info("Received new periodic check request: {}", event.time());
+ Map<Endpointkey, CCVPNPmDatastore.EvictingQueue<Integer>> usedBwMap = ccvpnPmDatastore.getUsedBwMap();
+ Map<String, Integer> candidate = new TreeMap<>();
+ for(Map.Entry<Endpointkey, CCVPNPmDatastore.EvictingQueue<Integer>> entry: usedBwMap.entrySet()) {
+ String serviceId = entry.getKey().getCllId();
+ Object[] usedBws = entry.getValue().tryReadToArray();
+
+ if (usedBws == null) {
+ // No enough data for evaluating
+ continue;
+ }
+ if (ccvpnPmDatastore.getMaxBwOfSvc(serviceId) == 0) {
+ // Max bandwidth not cached yet
+ post(new SimpleEvent(SimpleEvent.Type.AAI_BW_REQ, serviceId));
+ continue;
+ }
+ double avg = Arrays.stream(usedBws)
+ .mapToInt(o -> (int) o)
+ .summaryStatistics()
+ .getAverage();
+ if (needAdjust(serviceId, avg, ccvpnPmDatastore.getMaxBwOfSvc(serviceId))) {
+ int newBw = (int) (Math.ceil((avg / threshold) * 1.2 / precision) * precision);
+ candidate.put(serviceId, Math.max(candidate.getOrDefault(serviceId, 0), newBw));
+ }
+ }
+ for(Map.Entry<String, Integer> entry: candidate.entrySet()) {
+ if (isServiceUnderMaintenance(entry.getKey())) {
+ post(new SimpleEvent(SimpleEvent.Type.AAI_BW_REQ, entry.getKey()));
+ continue;
+ }
+ ccvpnPmDatastore.updateSvcState(entry.getKey(), ServiceState.UNDER_MAINTENANCE);
+ sendModifyRequest(entry.getKey(), entry.getValue(), RequestOwner.DCAE);
+ }
+
+ } else if (event.type() == SimpleEvent.Type.ONDEMAND_CHECK && isOnDemandCheckOn()) {
+ log.info("Received new on-demand check request: {}", event.time());
+ JsonObject payload = (JsonObject) event.subject();
+ String serviceId = payload.get(SERVICE_INSTANCE_LOCATION_ID).getAsString();
+ if (!isServiceUnderMaintenance(serviceId)){
+ int newBandwidth = payload.get(BANDWIDTH_TOTAL).getAsInt();
+ Map<String, Integer> maxBandwidthData = aaiService.fetchMaxBandwidthOfService(serviceId);
+ int oldBandwidth = maxBandwidthData.get("maxBandwidth");
+ if (newBandwidth != oldBandwidth) {
+ ccvpnPmDatastore.updateSvcState(serviceId, ServiceState.UNDER_MAINTENANCE);
+ sendModifyRequest(serviceId, newBandwidth, RequestOwner.UUI);
+ }
+ }
+ }
+ }
+
+ private void sendModifyRequest(String cllId, Integer newBandwidth, RequestOwner owner) {
+ policyService.sendOnsetMessageToPolicy(
+ policyService.formPolicyOnsetMessageForCCVPN(cllId, newBandwidth, owner)
+ );
+ }
+
+ private boolean needAdjust(String serivceId, double currentAverageUsage, int maxBandwidth){
+ return currentAverageUsage > threshold * maxBandwidth;
+ }
+
+ private boolean isServiceUnderMaintenance(String serivceId) {
+ return ccvpnPmDatastore.getStatusOfSvc(serivceId) == ServiceState.UNDER_MAINTENANCE;
+ }
+ };
+
+ /**
+ * AAI data consumer loop
+ */
+ aaiEventLoop = new Loop("AAIEventLoop"){
+ @Override
+ public void process(Event event) {
+ if (event.type() == SimpleEvent.Type.AAI_BW_REQ){
+ log.info("Received new AAI network policy query at: {}", event.time());
+ String serviceId = (String) event.subject();
+ Map<String, Integer> maxBandwidthData = aaiService.fetchMaxBandwidthOfService(serviceId);
+ int bwVal = maxBandwidthData.get("maxBandwidth");
+ if (maxBandwidthData != null){
+ if (ccvpnPmDatastore.getMaxBwOfSvc(serviceId) == 0){
+ ccvpnPmDatastore.updateMaxBw(serviceId, bwVal, true);
+ } else if (ccvpnPmDatastore.getMaxBwOfSvc(serviceId) != bwVal) {
+ ccvpnPmDatastore.updateMaxBw(serviceId, bwVal, true);
+ ccvpnPmDatastore.updateSvcState(serviceId, ServiceState.RUNNING);
+ }
+ }
+ }
+ }
+ };
+ scheduleEvaluation();
+ }
+
+ /**
+ * Stop the bandwidth evaluator process including two actors and periodic usage check
+ */
+ @PreDestroy
+ public void stop(){
+ stopScheduleEvaluation();
+ aaiEventLoop.stop();
+ evaluationEventLoop.stop();
+ }
+
+ /**
+ * Start to schedule periodic usage check at fixed rate
+ */
+ private void scheduleEvaluation(){
+ executorPool.scheduleAtFixedRate(new Runnable() {
+ @Override
+ public void run() {
+ post(new SimpleEvent(SimpleEvent.Type.PERIODIC_CHECK, 1));
+ }
+ }, 0, (evaluationInterval == 0? DEFAULT_EVAL_INTERVAL : evaluationInterval), TimeUnit.SECONDS);
+ }
+
+ /**
+ * Stop periodic bandwidth usage check
+ */
+ private void stopScheduleEvaluation(){
+ executorPool.shutdownNow();
+ }
+
+ /**
+ * Post/broadcast event between Loops
+ * @param event event object
+ */
+ public void post(@NonNull Event event){
+ if (event.type() == SimpleEvent.Type.AAI_BW_REQ) {
+ aaiEventLoop.add(event);
+ } else if (event.type() == SimpleEvent.Type.PERIODIC_CHECK) {
+ evaluationEventLoop.add(event);
+ } else if (event.type() == SimpleEvent.Type.ONDEMAND_CHECK) {
+ evaluationEventLoop.add(event);
+ }
+ }
+
+ private void loadConfig() {
+ configuration = Configuration.getInstance();
+ evaluationInterval = configuration.getCcvpnEvalInterval();
+ threshold = configuration.getCcvpnEvalThreshold();
+ precision = configuration.getCcvpnEvalPrecision(); // in Mbps;
+ }
+
+ private boolean isPeriodicCheckOn() {
+ configuration = Configuration.getInstance();
+ return configuration.isCcvpnEvalPeriodicCheckOn();
+ }
+
+ private boolean isOnDemandCheckOn() {
+ configuration = Configuration.getInstance();
+ return configuration.isCcvpnEvalOnDemandCheckOn();
+ }
+
+ /**
+ * Inner loop implementation. Each loop acts like an actor.
+ */
+ private abstract class Loop implements Runnable {
+ private final String name;
+ private volatile boolean running;
+ private final BlockingQueue<Event> eventsQueue;
+ private final ExecutorService executor;
+ private volatile Future<?> dispatchFuture;
+
+ /**
+ * Constructor that accepts a loop name
+ * @param name name of this loop
+ */
+ Loop(String name){
+ this.name = name;
+ executor = Executors.newSingleThreadExecutor();
+ eventsQueue = new LinkedBlockingQueue<>();
+ dispatchFuture = executor.submit(this);
+ }
+
+ /**
+ * Add new event to this loop
+ * @param evt Event
+ * @return true
+ */
+ public boolean add(Event evt) {
+ return eventsQueue.add(evt);
+ }
+
+ /**
+ * Running loop that process event accordingly
+ */
+ @Override
+ public void run(){
+ running = true;
+ log.info("BandwidthEvaluator -- {} initiated", this.name);
+ while (running){
+ try{
+ Event event = eventsQueue.take();
+ if (event == KILL_PILL){
+ break;
+ }
+ process(event);
+ } catch (InterruptedException e){
+ log.warn("Process loop interrupted");
+ } catch (Exception | Error e){
+ log.warn("Process loop hit an error {}", e.getMessage());
+ }
+ }
+ }
+
+ /**
+ * Operation defined by subclass for different event processing
+ * @param event incoming event
+ */
+ abstract public void process(Event event);
+
+ /**
+ * Stop this loop
+ */
+ public void stop(){
+ running = false;
+ add(KILL_PILL);
+ }
+ }
+}
diff --git a/components/slice-analysis-ms/src/main/java/org/onap/slice/analysis/ms/service/ccvpn/CCVPNPmDatastore.java b/components/slice-analysis-ms/src/main/java/org/onap/slice/analysis/ms/service/ccvpn/CCVPNPmDatastore.java
new file mode 100644
index 00000000..9c86f6e7
--- /dev/null
+++ b/components/slice-analysis-ms/src/main/java/org/onap/slice/analysis/ms/service/ccvpn/CCVPNPmDatastore.java
@@ -0,0 +1,263 @@
+/*******************************************************************************
+ * ============LICENSE_START=======================================================
+ * slice-analysis-ms
+ * ================================================================================
+ * Copyright (C) 2022 Huawei Canada Limited.
+ * ==============================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ *
+ *******************************************************************************/
+package org.onap.slice.analysis.ms.service.ccvpn;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.stereotype.Component;
+
+import java.util.ArrayDeque;
+import java.util.Map;
+import java.util.Queue;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+import java.util.stream.Collectors;
+
+/**
+ * This class represents the data structure for storing the CCVPN pm data;
+ */
+@Component
+public class CCVPNPmDatastore {
+
+ private static Logger log = LoggerFactory.getLogger(CCVPNPmDatastore.class);
+ private static final Pattern pattern = Pattern.compile("([0-9.]+)\\s*(kb|Kb|mb|Mb|Gb|gb)*");
+ private static final int WINDOW_SIZE = 5;
+ private final ConcurrentMap<String, ServiceState> svcStatus = new ConcurrentHashMap<>();
+ private final ConcurrentMap<String, Integer> endpointToMaxBw = new ConcurrentHashMap<>();
+ private final ConcurrentMap<Endpointkey, EvictingQueue<Integer>> endpointToUsedBw = new ConcurrentHashMap<>();
+
+ /**
+ * Given a cllId, return a map between Endpointkey and their corresponding UsedBw Queue.
+ * All Endpoints belongs to this same service
+ * @param cllId target cll instance id
+ * @return a filtered map contains used bandwidth data of endpointkeys whose cllId equals to the given one.
+ */
+ public Map<Endpointkey, EvictingQueue<Integer>> getUsedBwOfSvc(String cllId){
+ return endpointToUsedBw.entrySet().stream()
+ .filter(map -> map.getKey().getCllId() == cllId)
+ .collect(Collectors.toMap(map -> map.getKey(), map -> map.getValue()));
+ }
+
+ /**
+ * Return the complete used bandwidth map.
+ * @return a complete endpoint to bandwidth data map
+ */
+ public Map<Endpointkey, EvictingQueue<Integer>> getUsedBwMap(){
+ return endpointToUsedBw;
+ }
+
+ /**
+ * Return max bandwidth of cll service. If max bandwidth is null or missing, return 0;
+ * @param cllId target cll instance id
+ * @return Integer bandwidth value
+ */
+ public Integer getMaxBwOfSvc(String cllId){
+ return endpointToMaxBw.getOrDefault(cllId, 0);
+ }
+
+ /**
+ * Get Service status of this cll service
+ * @param cllId target cll instance id
+ * @return ServiceState of this cll
+ */
+ public ServiceState getStatusOfSvc(String cllId){
+ return svcStatus.getOrDefault(cllId, ServiceState.UNKNOWN);
+ }
+
+ /**
+ * return the complete map of cll service status
+ * @return complete map of serviceStatusMap
+ */
+ public ConcurrentMap<String, ServiceState> getSvcStatusMap(){
+ return svcStatus;
+ }
+
+ /**
+ * Override the service status to provided state
+ * @param cllId target cll instance id
+ * @param state new state
+ */
+ public void updateSvcState(String cllId, ServiceState state){
+ svcStatus.put(cllId, state);
+ }
+
+ /**
+ * Update max bandwidth value to given bandwidth string
+ * @param cllId target cll instance id
+ * @param bw new bandwidth
+ */
+ public void updateMaxBw(String cllId, String bw){
+ double bwvvaldb = Double.parseDouble(bw);
+ int bwvval = (int) bwvvaldb;
+ updateMaxBw(cllId, bwvval, false);
+ }
+
+ /**
+ * Update max bandwidth to given bandwidth value;
+ * if @param{override} is false, only write the bandwidth if it is absent.
+ * Otherwise override the old value no matter if it exists or not
+ * Also, when @param{override} is true, compare the provided value with the old value, if equals, return false;
+ * otherwise, return true;
+ * @param cllId target cll instance id
+ * @param bw new bandwidth int value in Mbps
+ * @param override override old value or not
+ * @return whether bandwidth value is changed or not.
+ */
+ public boolean updateMaxBw(String cllId, int bw, boolean override){
+ ;
+ if ( endpointToMaxBw.putIfAbsent(cllId, bw) == null || !override){
+ return true;
+ } else {
+ if (endpointToMaxBw.get(cllId) == bw){
+ return false;
+ } else {
+ endpointToMaxBw.replace(cllId, bw);
+ return true;
+ }
+ }
+ }
+
+ /**
+ * Append the latest bandwidth data to associated endpoint
+ * @param cllId target cll instance id
+ * @param uniId target uni id
+ * @param bw latest bandwidth usage data
+ */
+ public void addUsedBwToEndpoint(String cllId, String uniId, String bw){
+ Endpointkey enk = new Endpointkey(cllId, uniId);
+ Matcher matcher = pattern.matcher(bw.trim());
+ //Default input bw unit is kbps;
+ String unit = null;
+ // Bw in Mbps;
+ int result = 0;
+ if (matcher.find()) {
+ unit = matcher.group(2);
+ if (unit == null || unit.isEmpty() || unit.toLowerCase().equals("kb")) {
+ double val = Double.parseDouble(matcher.group(1));
+ result = (int) Math.ceil((double) val / (int) 1000 ) ;
+ } else if (unit.toLowerCase().equals("mb")){
+ double val = Double.parseDouble(matcher.group(1));
+ result = (int) val ;
+ } else if (unit.toLowerCase().equals("gb")){
+ double val = Double.parseDouble(matcher.group(1));
+ result = (int) val * (int) 1000;
+ }
+ } else {
+ log.warn("Illigal bw string: " + bw);
+ }
+
+ EvictingQueue<Integer> dataq = new EvictingQueue<Integer>(WINDOW_SIZE);
+ dataq.offer(result);
+ EvictingQueue q = endpointToUsedBw.putIfAbsent(enk, dataq);
+ if (q != null) {
+ q.offer(result);
+ }
+
+ }
+
+ /**
+ * Copy the used bandwidth queue of specified cllId:uniId to an array and return;
+ * @param cllId target cll id
+ * @param uniId target uni id
+ * @return Object[] contains all the used bandwidth data
+ */
+ public Object[] readToArray(String cllId, String uniId){
+ return endpointToUsedBw.get(new Endpointkey(cllId, uniId)).tryReadToArray();
+ }
+
+ /**
+ * Inner data structure is logically similar to circular buffer, thread-safe through blocking
+ * @param <E> Generic type of data
+ */
+ public class EvictingQueue<E> {
+ private final Queue<E> delegate;
+ final int maxSize;
+
+ /**
+ * Constructor accept a maxsize param
+ * @param maxSize max size
+ */
+ EvictingQueue(int maxSize){
+ if (maxSize < 0){
+ throw new IllegalArgumentException("Invalid maxsize for initializing EvictingQueue");
+ }
+ this.delegate = new ArrayDeque<>(maxSize);
+ this.maxSize = maxSize;
+ }
+
+ /**
+ * Adding new data to this queue
+ * @param e new data
+ * @return true
+ */
+ public synchronized boolean offer(E e){
+ return add(e);
+ }
+
+ /**
+ * Try copy data to an array and return, only if data has filled up the whole queue
+ * Otherwise, return null
+ * @return the data array
+ */
+ public synchronized Object[] tryReadToArray(){
+ if (remainingCapacity() > 0){
+ return null;
+ }
+ return toArray();
+ }
+
+ /**
+ * Return the size of this queue, and number of data added. It is no larger than the max capacity.
+ * @return int value of output
+ */
+ public int size(){
+ return delegate.size();
+ }
+
+ /**
+ * return the remaining capacity of this queue
+ * @return int value of output
+ */
+ public int remainingCapacity(){
+ return maxSize - size();
+ }
+
+ private Object[] toArray(){
+ return delegate.toArray();
+ }
+
+ private boolean add(E e){
+ if(null == e){
+ throw new IllegalArgumentException("Invalid new item in add method");
+ }
+ if (maxSize == 0){
+ return true;
+ }
+ if (size() == maxSize){
+ delegate.remove();
+ }
+ delegate.add(e);
+ return true;
+ }
+ }
+}
diff --git a/components/slice-analysis-ms/src/main/java/org/onap/slice/analysis/ms/service/ccvpn/Endpointkey.java b/components/slice-analysis-ms/src/main/java/org/onap/slice/analysis/ms/service/ccvpn/Endpointkey.java
new file mode 100644
index 00000000..f6c1a8c1
--- /dev/null
+++ b/components/slice-analysis-ms/src/main/java/org/onap/slice/analysis/ms/service/ccvpn/Endpointkey.java
@@ -0,0 +1,75 @@
+/*******************************************************************************
+ * ============LICENSE_START=======================================================
+ * slice-analysis-ms
+ * ================================================================================
+ * Copyright (C) 2022 Huawei Canada Limited.
+ * ==============================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ *
+ *******************************************************************************/
+package org.onap.slice.analysis.ms.service.ccvpn;
+
+import java.util.Objects;
+
+/**
+ * Endpoint key class represents each uni associated with each cll
+ */
+public class Endpointkey {
+
+ private final String cllId;
+ private final String uniId;
+
+ /**
+ * Constructor accpets cllId and uniId.
+ * @param cllId String cll instance id
+ * @param uniId String uni id
+ */
+ public Endpointkey(String cllId, String uniId){
+ this.cllId = cllId;
+ this.uniId = uniId;
+ }
+
+ /**
+ * Return cllId
+ * @return String cllId
+ */
+ public String getCllId() {
+ return cllId;
+ }
+
+ /**
+ * Return uniId
+ * @return String uni id
+ */
+ public String getUniId() {
+ return uniId;
+ }
+
+ @Override
+ public int hashCode() { return Objects.hash(cllId, uniId); }
+
+ @Override
+ public boolean equals(Object obj){
+ if (this == obj){
+ return true;
+ }
+ if (obj instanceof Endpointkey){
+ final Endpointkey other = (Endpointkey) obj;
+ return Objects.equals(this.cllId, other.cllId) &&
+ Objects.equals(this.uniId, other.uniId);
+ }
+ return false;
+ }
+
+}
diff --git a/components/slice-analysis-ms/src/main/java/org/onap/slice/analysis/ms/service/ccvpn/Event.java b/components/slice-analysis-ms/src/main/java/org/onap/slice/analysis/ms/service/ccvpn/Event.java
new file mode 100644
index 00000000..c36e4d86
--- /dev/null
+++ b/components/slice-analysis-ms/src/main/java/org/onap/slice/analysis/ms/service/ccvpn/Event.java
@@ -0,0 +1,37 @@
+/*******************************************************************************
+ * ============LICENSE_START=======================================================
+ * slice-analysis-ms
+ * ================================================================================
+ * Copyright (C) 2022 Huawei Canada Limited.
+ * ==============================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ *
+ *******************************************************************************/
+package org.onap.slice.analysis.ms.service.ccvpn;
+
+/**
+ * Event (CCVPN) interface;
+ * It is the message entity inside CCVPN Closed-loop.
+ * @param <T> message type
+ * @param <S> message paylaod
+ */
+public interface Event<T extends Enum, S> {
+
+ long time();
+
+ T type();
+
+ S subject();
+
+}
diff --git a/components/slice-analysis-ms/src/main/java/org/onap/slice/analysis/ms/service/ccvpn/RequestOwner.java b/components/slice-analysis-ms/src/main/java/org/onap/slice/analysis/ms/service/ccvpn/RequestOwner.java
new file mode 100644
index 00000000..319032ad
--- /dev/null
+++ b/components/slice-analysis-ms/src/main/java/org/onap/slice/analysis/ms/service/ccvpn/RequestOwner.java
@@ -0,0 +1,30 @@
+/*******************************************************************************
+ * ============LICENSE_START=======================================================
+ * slice-analysis-ms
+ * ================================================================================
+ * Copyright (C) 2022 Huawei Canada Limited.
+ * ==============================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ *
+ *******************************************************************************/
+
+package org.onap.slice.analysis.ms.service.ccvpn;
+
+/**
+ * Modification request owner
+ */
+public enum RequestOwner {
+ DCAE,
+ UUI
+}
diff --git a/components/slice-analysis-ms/src/main/java/org/onap/slice/analysis/ms/service/ccvpn/ServiceState.java b/components/slice-analysis-ms/src/main/java/org/onap/slice/analysis/ms/service/ccvpn/ServiceState.java
new file mode 100644
index 00000000..b4d358f5
--- /dev/null
+++ b/components/slice-analysis-ms/src/main/java/org/onap/slice/analysis/ms/service/ccvpn/ServiceState.java
@@ -0,0 +1,32 @@
+/*******************************************************************************
+ * ============LICENSE_START=======================================================
+ * slice-analysis-ms
+ * ================================================================================
+ * Copyright (C) 2022 Huawei Canada Limited.
+ * ==============================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ *
+ *******************************************************************************/
+package org.onap.slice.analysis.ms.service.ccvpn;
+
+/**
+ * Enum class represents possible state of a Ethernet service (designed for CCVPN CLL serivce)
+ */
+public enum ServiceState {
+ RUNNING,
+ UNDER_MAINTENANCE,
+ ERROR,
+ DELETED,
+ UNKNOWN
+}
diff --git a/components/slice-analysis-ms/src/main/java/org/onap/slice/analysis/ms/service/ccvpn/SimpleEvent.java b/components/slice-analysis-ms/src/main/java/org/onap/slice/analysis/ms/service/ccvpn/SimpleEvent.java
new file mode 100644
index 00000000..8277380d
--- /dev/null
+++ b/components/slice-analysis-ms/src/main/java/org/onap/slice/analysis/ms/service/ccvpn/SimpleEvent.java
@@ -0,0 +1,98 @@
+/*******************************************************************************
+ * ============LICENSE_START=======================================================
+ * slice-analysis-ms
+ * ================================================================================
+ * Copyright (C) 2022 Huawei Canada Limited.
+ * =============================================================================
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ============LICENSE_END=========================================================
+ *
+ *******************************************************************************/
+package org.onap.slice.analysis.ms.service.ccvpn;
+
+import java.time.Instant;
+import java.time.LocalDateTime;
+import java.time.ZoneId;
+
+/**
+ * SimpleEvent a generic class implements Event (CCVPN) interface;
+ * It is the message entity inside CCVPN Closed-loop
+ *
+ * @param <T> message type
+ * @param <S> message paylaod
+ */
+public class SimpleEvent<T extends Enum, S> implements Event<T, S> {
+ private final T type;
+ private final S subject;
+ private final long time;
+
+ /**
+ * All the event types
+ */
+ public enum Type {
+ PERIODIC_CHECK,
+ ONDEMAND_CHECK,
+ AAI_BW_REQ
+ }
+
+ /**
+ * Event contructor
+ * @param type event type
+ * @param subject event content
+ */
+ public SimpleEvent(T type, S subject) {
+ this.type = type;
+ this.subject = subject;
+ this.time = System.currentTimeMillis();
+ }
+
+ /**
+ * Return the epoch time of this event happened.
+ * @return long value of epoch time
+ */
+ @Override
+ public long time() {
+ return time;
+ }
+
+ /**
+ * Return the type of this event.
+ * @return event type
+ */
+ @Override
+ public T type() {
+ return type;
+ }
+
+ /**
+ * Return the subject of this event
+ * @return event content
+ */
+ @Override
+ public S subject() {
+ return subject;
+ }
+
+ /**
+ * toString method
+ * @return toString
+ */
+ @Override
+ public String toString() {
+ return new StringBuilder()
+ .append("time " + LocalDateTime.ofInstant(Instant.ofEpochMilli(time), ZoneId.systemDefault()))
+ .append("type " + type())
+ .append("subject " + subject())
+ .toString();
+ }
+}