diff options
Diffstat (limited to 'components/slice-analysis-ms/src/main/java/org/onap')
10 files changed, 465 insertions, 159 deletions
diff --git a/components/slice-analysis-ms/src/main/java/org/onap/slice/analysis/ms/dmaap/AaiEventNotificationCallback.java b/components/slice-analysis-ms/src/main/java/org/onap/slice/analysis/ms/dmaap/AaiEventNotificationCallback.java index 9bff14a0..292cf6ae 100644 --- a/components/slice-analysis-ms/src/main/java/org/onap/slice/analysis/ms/dmaap/AaiEventNotificationCallback.java +++ b/components/slice-analysis-ms/src/main/java/org/onap/slice/analysis/ms/dmaap/AaiEventNotificationCallback.java @@ -95,7 +95,7 @@ public class AaiEventNotificationCallback implements NotificationCallback { private void handleMsgJsonObject(JsonObject jsonObject){ JsonObject header = jsonObject.get(EVENT_HEADER).getAsJsonObject(); - if (!header.has(ACTION) || !header.get(ACTION).getAsString().equals(aaiNotifTargetAction)){ + if (!header.has(ACTION) || !isValid(header, ACTION, aaiNotifTargetAction)){ return; } if (!header.has(ENTITY_TYPE) || !header.get(ENTITY_TYPE).getAsString().equals(aaiNotifTargetEntity)){ @@ -131,4 +131,15 @@ public class AaiEventNotificationCallback implements NotificationCallback { } return null; } + + // make sure dmaap mesg header is expected type and valid; + private boolean isValid(JsonObject header, String targetKey, String allowed){ + boolean valid = false; + String[] allowedArr = allowed.split("\\|"); + String targetVal= header.get(targetKey).getAsString(); + for (String al: allowedArr){ + valid |= targetVal.equals(al); + } + return valid; + } } diff --git a/components/slice-analysis-ms/src/main/java/org/onap/slice/analysis/ms/models/Configuration.java b/components/slice-analysis-ms/src/main/java/org/onap/slice/analysis/ms/models/Configuration.java index 2a509aa1..4b5fe2ed 100644 --- a/components/slice-analysis-ms/src/main/java/org/onap/slice/analysis/ms/models/Configuration.java +++ b/components/slice-analysis-ms/src/main/java/org/onap/slice/analysis/ms/models/Configuration.java @@ -81,6 +81,7 @@ public class Configuration { private String aaiNotifTargetEntity; private boolean ccvpnEvalPeriodicCheckOn; private boolean ccvpnEvalOnDemandCheckOn; + private String ccvpnEvalStrategy; /** * No args constructor @@ -168,6 +169,7 @@ public class Configuration { ccvpnEvalPrecision = jsonObject.get("sliceanalysisms.ccvpnEvalPrecision").getAsDouble(); ccvpnEvalPeriodicCheckOn = jsonObject.get("sliceanalysisms.ccvpnEvalPeriodicCheckOn").getAsBoolean(); ccvpnEvalOnDemandCheckOn = jsonObject.get("sliceanalysisms.ccvpnEvalOnDemandCheckOn").getAsBoolean(); + ccvpnEvalStrategy = jsonObject.get("sliceanalysisms.ccvpnEvalStrategy").getAsString(); if (Objects.isNull(jsonObject.get("aafUsername"))) { aafUsername = null; diff --git a/components/slice-analysis-ms/src/main/java/org/onap/slice/analysis/ms/service/PolicyService.java b/components/slice-analysis-ms/src/main/java/org/onap/slice/analysis/ms/service/PolicyService.java index 24aeea61..da55c0b4 100644 --- a/components/slice-analysis-ms/src/main/java/org/onap/slice/analysis/ms/service/PolicyService.java +++ b/components/slice-analysis-ms/src/main/java/org/onap/slice/analysis/ms/service/PolicyService.java @@ -51,9 +51,11 @@ import com.fasterxml.jackson.databind.ObjectMapper; */ @Component public class PolicyService { + private final static int SERVICE_RATE_INTERVAL = 5000; // in ms private PolicyDmaapClient policyDmaapClient; private static Logger log = LoggerFactory.getLogger(PolicyService.class); private ObjectMapper objectMapper = new ObjectMapper(); + private RateLimiter rateLimiter; /** * Initialization @@ -62,6 +64,7 @@ public class PolicyService { public void init() { Configuration configuration = Configuration.getInstance(); policyDmaapClient = new PolicyDmaapClient(configuration); + rateLimiter = new RateLimiter(1, SERVICE_RATE_INTERVAL); } protected <T> OnsetMessage formPolicyOnsetMessage(String snssai, AdditionalProperties<T> addProps, Map<String, String> serviceDetails) { @@ -189,6 +192,7 @@ public class PolicyService { String msg = ""; try { msg = objectMapper.writeValueAsString(onsetMessage); + rateLimiter.getToken(); log.info("Sending onset message to Onap/Policy for ControlLoop-CCVPN-CLL, the msg: {}", msg); policyDmaapClient.sendNotificationToPolicy(msg); } diff --git a/components/slice-analysis-ms/src/main/java/org/onap/slice/analysis/ms/service/RateLimiter.java b/components/slice-analysis-ms/src/main/java/org/onap/slice/analysis/ms/service/RateLimiter.java new file mode 100644 index 00000000..77ee61f2 --- /dev/null +++ b/components/slice-analysis-ms/src/main/java/org/onap/slice/analysis/ms/service/RateLimiter.java @@ -0,0 +1,59 @@ +/******************************************************************************* + * ============LICENSE_START======================================================= + * slice-analysis-ms + * ================================================================================ + * Copyright (C) 2022 Huawei Canada Limited. + * ============================================================================== + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + * + *******************************************************************************/ +package org.onap.slice.analysis.ms.service; + +/** + * A simple rate-limiter; make sure bandwidth adjustment requests don't swarm underlying network controller + */ +class RateLimiter { + private int MAX_TOKENS; + private long lastRequestTime = System.currentTimeMillis(); + private long possibleTokens = 0; + private long interval = 1000; + + /** + * Constructor for rate limiter (simple token bucket filter) + * @param maxTokens max number of token allowed + * @param interval interval(ms) between received new token + */ + public RateLimiter(int maxTokens, int interval){ + MAX_TOKENS = maxTokens; + this.interval = interval; + } + + /** + * Trying to get a new token for execution, if no token left, stall for interval ms. + * @throws InterruptedException + */ + synchronized public void getToken() throws InterruptedException { + possibleTokens += (System.currentTimeMillis() - lastRequestTime) / interval; + if (possibleTokens > MAX_TOKENS){ + possibleTokens = MAX_TOKENS; + } + if (possibleTokens == 0){ + Thread.sleep(interval); + } else { + possibleTokens--; + } + // granting token + lastRequestTime = System.currentTimeMillis(); + } +} diff --git a/components/slice-analysis-ms/src/main/java/org/onap/slice/analysis/ms/service/ccvpn/BandwidthEvaluator.java b/components/slice-analysis-ms/src/main/java/org/onap/slice/analysis/ms/service/ccvpn/BandwidthEvaluator.java index c66122c1..43391809 100644 --- a/components/slice-analysis-ms/src/main/java/org/onap/slice/analysis/ms/service/ccvpn/BandwidthEvaluator.java +++ b/components/slice-analysis-ms/src/main/java/org/onap/slice/analysis/ms/service/ccvpn/BandwidthEvaluator.java @@ -20,12 +20,10 @@ *******************************************************************************/ package org.onap.slice.analysis.ms.service.ccvpn; -import com.google.gson.JsonObject; import lombok.NonNull; import org.onap.slice.analysis.ms.aai.AaiService; import org.onap.slice.analysis.ms.models.Configuration; -import org.onap.slice.analysis.ms.service.PolicyService; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; @@ -33,23 +31,14 @@ import org.springframework.stereotype.Component; import javax.annotation.PostConstruct; import javax.annotation.PreDestroy; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.List; import java.util.Map; -import java.util.TreeMap; import java.util.concurrent.BlockingQueue; -import java.util.concurrent.ConcurrentMap; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.Future; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; -import java.util.stream.Collectors; - - -import static java.util.concurrent.Executors.newSingleThreadExecutor; /** * This class implements the CCVPN PM Closed-loop logical function. @@ -67,15 +56,14 @@ public class BandwidthEvaluator { CCVPNPmDatastore ccvpnPmDatastore; @Autowired - PolicyService policyService; + StrategyFactory strategyFactory; private Loop evaluationEventLoop; private Loop aaiEventLoop; private static final Event KILL_PILL = new SimpleEvent(null, 0); private static final int DEFAULT_EVAL_INTERVAL = 5; - private static final String SERVICE_INSTANCE_LOCATION_ID = "service-instance-location-id"; - private static final String BANDWIDTH_TOTAL = "bandwidth-total"; + private static final String DEFAULT_STRATEGY_NAME = "FixedUpperBoundStrategy"; /** * Interval of each round of evaluation, defined in config_all.json @@ -83,14 +71,10 @@ public class BandwidthEvaluator { private static int evaluationInterval; /** - * Percentage threshold of bandwidth adjustment. + * Bandwidth Evaluation and adjustment strategy. */ - private static double threshold; + private static String strategyName; - /** - * Precision of bandwidth evaluation and adjustment. - */ - private static double precision; // in Mbps; private final ScheduledExecutorService executorPool = Executors.newScheduledThreadPool(1); /** @@ -99,110 +83,17 @@ public class BandwidthEvaluator { @PostConstruct public void init() { loadConfig(); + strategyName = (strategyName != null)? strategyName : DEFAULT_STRATEGY_NAME; + evaluationInterval = (evaluationInterval == 0)? DEFAULT_EVAL_INTERVAL : evaluationInterval; + EvaluationStrategy strategy = strategyFactory.getStrategy(strategyName); + /** * Evalution main loop */ evaluationEventLoop = new Loop("EvaluationLoop"){ @Override public void process(Event event) { - if (event.type() == SimpleEvent.Type.PERIODIC_CHECK && isPeriodicCheckOn()){ - log.info("=== Processing new periodic check request: {} ===", event.time()); - Map<Endpointkey, CCVPNPmDatastore.EvictingQueue<Integer>> usedBwMap = ccvpnPmDatastore.getUsedBwMap(); - Map<String, Integer> candidate = new TreeMap<>(); - for(Map.Entry<Endpointkey, CCVPNPmDatastore.EvictingQueue<Integer>> entry: usedBwMap.entrySet()) { - String serviceId = entry.getKey().getCllId(); - Object[] usedBws = entry.getValue().tryReadToArray(); - - if (usedBws == null) { - // No enough data for evaluating - log.debug("CCVPN Evaluator Output: service {}, not enough data to evaluate", serviceId); - continue; - } - if (ccvpnPmDatastore.getMaxBwOfSvc(serviceId) == 0) { - // Max bandwidth not cached yet - log.debug("CCVPN Evaluator Output: service {}, max bandwidth not cached, wait for next round", serviceId); - post(new SimpleEvent(SimpleEvent.Type.AAI_BW_REQ, serviceId)); - continue; - } - double avg = Arrays.stream(usedBws) - .mapToInt(o -> (int) o) - .summaryStatistics() - .getAverage(); - if (needAdjust(serviceId, avg, ccvpnPmDatastore.getMaxBwOfSvc(serviceId))) { - log.debug("CCVPN Evaluator Output: service {}, need adjustment, putting into candidate list", serviceId); - int newBw = (int) (Math.ceil((avg / threshold) * 1.2 / precision) * precision); - candidate.put(serviceId, Math.max(candidate.getOrDefault(serviceId, 0), newBw)); - } - } - // check svc under maintenance - Map<String , ServiceState> svcUnderMaintenance = getServicesUnderMaintenance(); - for (Map.Entry<String, ServiceState> entry: svcUnderMaintenance.entrySet()){ - candidate.putIfAbsent(entry.getKey(), 0); - } - // fetch the maxbandwidth info if underMaintenance; otherwise send modification request - for(Map.Entry<String, Integer> entry: candidate.entrySet()) { - if (isServiceUnderMaintenance(entry.getKey())) { - if (entry.getValue() == 0){ - log.debug("CCVPN Evaluator Output: service {}," + - " are in maintenance state, fetching bandwidth info from AAI", entry.getKey()); - } else { - log.debug("CCVPN Evaluator Output: candidate {}," + - " need adjustment, but skipped due to maintenance state", entry.getKey()); - } - post(new SimpleEvent(SimpleEvent.Type.AAI_BW_REQ, entry.getKey())); - continue; - } - log.debug("CCVPN Evaluator Output: candidate {}," + - " need adjustment, sending request to policy", entry.getKey()); - ccvpnPmDatastore.updateSvcState(entry.getKey(), ServiceState.UNDER_MAINTENANCE); - sendModifyRequest(entry.getKey(), entry.getValue(), RequestOwner.DCAE); - } - log.info("=== Processing periodic check complete ==="); - - } else if (event.type() == SimpleEvent.Type.ONDEMAND_CHECK && isOnDemandCheckOn()) { - log.info("=== Processing new on-demand check request: {} ===", event.time()); - JsonObject payload = (JsonObject) event.subject(); - String serviceId = payload.get(SERVICE_INSTANCE_LOCATION_ID).getAsString(); - if (!isServiceUnderMaintenance(serviceId)){ - int newBandwidth = payload.get(BANDWIDTH_TOTAL).getAsInt(); - Map<String, Integer> maxBandwidthData = aaiService.fetchMaxBandwidthOfService(serviceId); - if (maxBandwidthData.get("maxBandwidth") != null - && maxBandwidthData.get("maxBandwidth") != newBandwidth){ - log.debug("CCVPN Evaluator Output: on-demand adjustment request for service: {} processed," + - " sending request to policy", serviceId); - ccvpnPmDatastore.updateSvcState(serviceId, ServiceState.UNDER_MAINTENANCE); - sendModifyRequest(serviceId, newBandwidth, RequestOwner.UUI); - } - } else { - log.debug("CCVPN Evaluator Output: service {}," + - " received on-demand request, but skipped due to maintenance state", serviceId); - } - log.info("=== Processing on-demand check complete ==="); - } - } - - private void sendModifyRequest(String cllId, Integer newBandwidth, RequestOwner owner) { - log.info("Sending modification request to policy. RequestOwner: {} - Service: {} change to bw: {}", - owner, cllId, newBandwidth); - policyService.sendOnsetMessageToPolicy( - policyService.formPolicyOnsetMessageForCCVPN(cllId, newBandwidth, owner) - ); - } - - private boolean needAdjust(String serivceId, double currentAverageUsage, int maxBandwidth){ - log.debug("CCVPN Service Usage Analysis: usage: {}, threshold: {}, maxbw {}", currentAverageUsage, threshold, maxBandwidth); - return currentAverageUsage > threshold * maxBandwidth; - } - - private boolean isServiceUnderMaintenance(String serivceId) { - return ccvpnPmDatastore.getStatusOfSvc(serivceId) == ServiceState.UNDER_MAINTENANCE; - } - - private Map<String, ServiceState> getServicesUnderMaintenance(){ - return ccvpnPmDatastore.getSvcStatusMap().entrySet() - .stream() - .filter(e -> e.getValue() == ServiceState.UNDER_MAINTENANCE) - .collect(Collectors.toMap(p -> p.getKey(), p -> p.getValue())); + strategy.execute(event); } }; @@ -213,22 +104,22 @@ public class BandwidthEvaluator { @Override public void process(Event event) { if (event.type() == SimpleEvent.Type.AAI_BW_REQ){ - log.info("=== Processing new AAI network policy query at: {} ===", event.time()); + log.debug("=== Processing new AAI network policy query at: {} ===", event.time()); String serviceId = (String) event.subject(); Map<String, Integer> maxBandwidthData = aaiService.fetchMaxBandwidthOfService(serviceId); if (maxBandwidthData.get("maxBandwidth") != null){ - log.debug("Successfully retrieved bandwidth info from AAI; service: {}, bandwidth: {}", + log.info("Successfully retrieved bandwidth info from AAI; service: {}, bandwidth: {}", serviceId, maxBandwidthData.get("maxBandwidth")); int bwValue = maxBandwidthData.get("maxBandwidth").intValue(); - if (ccvpnPmDatastore.getMaxBwOfSvc(serviceId) == 0){ - ccvpnPmDatastore.updateMaxBw(serviceId, bwValue, true); - } else if (ccvpnPmDatastore.getMaxBwOfSvc(serviceId) != bwValue) { - log.debug("Service modification complete; serviceId: {} with new bandwidth: {}", serviceId, bwValue); - ccvpnPmDatastore.updateMaxBw(serviceId, bwValue, true); + if (ccvpnPmDatastore.getProvBwOfSvc(serviceId) == 0){ + ccvpnPmDatastore.updateProvBw(serviceId, bwValue, true); + } else if (ccvpnPmDatastore.getProvBwOfSvc(serviceId) != bwValue) { + log.info("Service modification complete; serviceId: {} with new bandwidth: {}", serviceId, bwValue); + ccvpnPmDatastore.updateProvBw(serviceId, bwValue, true); ccvpnPmDatastore.updateSvcState(serviceId, ServiceState.RUNNING); } } - log.info("=== Processing AAI network policy query complete ==="); + log.debug("=== Processing AAI network policy query complete ==="); } } }; @@ -283,18 +174,7 @@ public class BandwidthEvaluator { private void loadConfig() { configuration = Configuration.getInstance(); evaluationInterval = configuration.getCcvpnEvalInterval(); - threshold = configuration.getCcvpnEvalThreshold(); - precision = configuration.getCcvpnEvalPrecision(); // in Mbps; - } - - private boolean isPeriodicCheckOn() { - configuration = Configuration.getInstance(); - return configuration.isCcvpnEvalPeriodicCheckOn(); - } - - private boolean isOnDemandCheckOn() { - configuration = Configuration.getInstance(); - return configuration.isCcvpnEvalOnDemandCheckOn(); + strategyName = configuration.getCcvpnEvalStrategy(); } /** diff --git a/components/slice-analysis-ms/src/main/java/org/onap/slice/analysis/ms/service/ccvpn/CCVPNPmDatastore.java b/components/slice-analysis-ms/src/main/java/org/onap/slice/analysis/ms/service/ccvpn/CCVPNPmDatastore.java index 9c86f6e7..6d9b9604 100644 --- a/components/slice-analysis-ms/src/main/java/org/onap/slice/analysis/ms/service/ccvpn/CCVPNPmDatastore.java +++ b/components/slice-analysis-ms/src/main/java/org/onap/slice/analysis/ms/service/ccvpn/CCVPNPmDatastore.java @@ -43,7 +43,11 @@ public class CCVPNPmDatastore { private static final Pattern pattern = Pattern.compile("([0-9.]+)\\s*(kb|Kb|mb|Mb|Gb|gb)*"); private static final int WINDOW_SIZE = 5; private final ConcurrentMap<String, ServiceState> svcStatus = new ConcurrentHashMap<>(); - private final ConcurrentMap<String, Integer> endpointToMaxBw = new ConcurrentHashMap<>(); + // Provisioned bandwidth of each endpoint + private final ConcurrentMap<String, Integer> endpointToProvBw = new ConcurrentHashMap<>(); + // Max bandwidth (upper-bound) of each endpoint + private final ConcurrentMap<String, Integer> upperBoundBw = new ConcurrentHashMap<>(); + // Current bandwidth usage data list from customers private final ConcurrentMap<Endpointkey, EvictingQueue<Integer>> endpointToUsedBw = new ConcurrentHashMap<>(); /** @@ -67,12 +71,12 @@ public class CCVPNPmDatastore { } /** - * Return max bandwidth of cll service. If max bandwidth is null or missing, return 0; + * Return provisioned bandwidth of cll service. If provisioned bandwidth is null or missing, return 0; * @param cllId target cll instance id * @return Integer bandwidth value */ - public Integer getMaxBwOfSvc(String cllId){ - return endpointToMaxBw.getOrDefault(cllId, 0); + public Integer getProvBwOfSvc(String cllId){ + return endpointToProvBw.getOrDefault(cllId, 0); } /** @@ -84,6 +88,10 @@ public class CCVPNPmDatastore { return svcStatus.getOrDefault(cllId, ServiceState.UNKNOWN); } + public Integer getUpperBoundBwOfSvc(String cllId){ + return upperBoundBw.getOrDefault(cllId, Integer.MAX_VALUE); + } + /** * return the complete map of cll service status * @return complete map of serviceStatusMap @@ -102,18 +110,27 @@ public class CCVPNPmDatastore { } /** - * Update max bandwidth value to given bandwidth string + * Update provisioned bandwidth value to given bandwidth string * @param cllId target cll instance id * @param bw new bandwidth */ - public void updateMaxBw(String cllId, String bw){ + public void updateProvBw(String cllId, String bw){ double bwvvaldb = Double.parseDouble(bw); int bwvval = (int) bwvvaldb; - updateMaxBw(cllId, bwvval, false); + updateProvBw(cllId, bwvval, false); + } + + /** + * Update upper bound bandwidth value to given bandwidth + * @param cllId target cll instance id + * @param bw new bandwidth + */ + public void updateUpperBoundBw(String cllId, int bw){ + upperBoundBw.put(cllId, bw); } /** - * Update max bandwidth to given bandwidth value; + * Update provisioned bandwidth to given bandwidth value; * if @param{override} is false, only write the bandwidth if it is absent. * Otherwise override the old value no matter if it exists or not * Also, when @param{override} is true, compare the provided value with the old value, if equals, return false; @@ -123,15 +140,15 @@ public class CCVPNPmDatastore { * @param override override old value or not * @return whether bandwidth value is changed or not. */ - public boolean updateMaxBw(String cllId, int bw, boolean override){ - ; - if ( endpointToMaxBw.putIfAbsent(cllId, bw) == null || !override){ + public boolean updateProvBw(String cllId, int bw, boolean override){ + if (!override && !endpointToProvBw.containsKey(cllId)){ + endpointToProvBw.put(cllId, bw); return true; } else { - if (endpointToMaxBw.get(cllId) == bw){ + if (endpointToProvBw.get(cllId) == bw){ return false; } else { - endpointToMaxBw.replace(cllId, bw); + endpointToProvBw.replace(cllId, bw); return true; } } @@ -166,13 +183,7 @@ public class CCVPNPmDatastore { log.warn("Illigal bw string: " + bw); } - EvictingQueue<Integer> dataq = new EvictingQueue<Integer>(WINDOW_SIZE); - dataq.offer(result); - EvictingQueue q = endpointToUsedBw.putIfAbsent(enk, dataq); - if (q != null) { - q.offer(result); - } - + endpointToUsedBw.computeIfAbsent(enk, k -> new EvictingQueue<Integer>(WINDOW_SIZE)).offer(result); } /** diff --git a/components/slice-analysis-ms/src/main/java/org/onap/slice/analysis/ms/service/ccvpn/EvaluationStrategy.java b/components/slice-analysis-ms/src/main/java/org/onap/slice/analysis/ms/service/ccvpn/EvaluationStrategy.java new file mode 100644 index 00000000..7d502894 --- /dev/null +++ b/components/slice-analysis-ms/src/main/java/org/onap/slice/analysis/ms/service/ccvpn/EvaluationStrategy.java @@ -0,0 +1,36 @@ +/******************************************************************************* + * ============LICENSE_START======================================================= + * slice-analysis-ms + * ================================================================================ + * Copyright (C) 2022 Huawei Canada Limited. + * ============================================================================== + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + * + *******************************************************************************/ +package org.onap.slice.analysis.ms.service.ccvpn; + +public interface EvaluationStrategy { + + /** + * Defined strategy logic to deal with different events + * @param event + */ + void execute(Event event); + + /** + * Return name of this strategy + * @return name + */ + String getName(); +} diff --git a/components/slice-analysis-ms/src/main/java/org/onap/slice/analysis/ms/service/ccvpn/FixedUpperBoundStrategy.java b/components/slice-analysis-ms/src/main/java/org/onap/slice/analysis/ms/service/ccvpn/FixedUpperBoundStrategy.java new file mode 100644 index 00000000..874e3271 --- /dev/null +++ b/components/slice-analysis-ms/src/main/java/org/onap/slice/analysis/ms/service/ccvpn/FixedUpperBoundStrategy.java @@ -0,0 +1,206 @@ +/******************************************************************************* + * ============LICENSE_START======================================================= + * slice-analysis-ms + * ================================================================================ + * Copyright (C) 2022 Huawei Canada Limited. + * ============================================================================== + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + * + *******************************************************************************/ +package org.onap.slice.analysis.ms.service.ccvpn; + +import com.google.gson.JsonObject; +import org.onap.slice.analysis.ms.models.Configuration; +import org.onap.slice.analysis.ms.service.PolicyService; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.stereotype.Component; + +import javax.annotation.PostConstruct; +import java.util.Arrays; +import java.util.Map; +import java.util.TreeMap; +import java.util.stream.Collectors; + +@Component +public class FixedUpperBoundStrategy implements EvaluationStrategy{ + private static Logger log = LoggerFactory.getLogger(FixedUpperBoundStrategy.class); + private Configuration configuration; + + private static final String TYPE_NAME = "FixedUpperBoundStrategy"; + private static final String SERVICE_INSTANCE_LOCATION_ID = "service-instance-location-id"; + private static final String BANDWIDTH_TOTAL = "bandwidth-total"; + + /** + * Percentage threshold of bandwidth adjustment. + */ + private static double threshold; + + /** + * Precision of bandwidth evaluation and adjustment. + */ + private static double precision; // in Mbps; + + @Autowired + BandwidthEvaluator bandwidthEvaluator; + + @Autowired + CCVPNPmDatastore ccvpnPmDatastore; + + @Autowired + PolicyService policyService; + + @PostConstruct + public void init() { + loadConfig(); + } + + @Override + public void execute(Event event){ + if (event.type() == SimpleEvent.Type.PERIODIC_CHECK && isPeriodicCheckOn()){ + log.debug("=== Processing new periodic check request: {} ===", event.time()); + Map<Endpointkey, CCVPNPmDatastore.EvictingQueue<Integer>> usedBwMap = ccvpnPmDatastore.getUsedBwMap(); + Map<String, Integer> candidate = new TreeMap<>(); + for(Map.Entry<Endpointkey, CCVPNPmDatastore.EvictingQueue<Integer>> entry: usedBwMap.entrySet()) { + String serviceId = entry.getKey().getCllId(); + Object[] usedBws = entry.getValue().tryReadToArray(); + + if (usedBws == null) { + // No enough data for evaluating + log.debug("CCVPN Evaluator Output: service {}, not enough data to evaluate", serviceId); + continue; + } + if (ccvpnPmDatastore.getProvBwOfSvc(serviceId) == 0) { + // Max bandwidth not cached yet + log.debug("CCVPN Evaluator Output: service {}, max bandwidth not cached, wait for next round", serviceId); + post(new SimpleEvent(SimpleEvent.Type.AAI_BW_REQ, serviceId)); + continue; + } + double avg = Arrays.stream(usedBws) + .mapToInt(o -> (int) o) + .summaryStatistics() + .getAverage(); + int provBw = ccvpnPmDatastore.getProvBwOfSvc(serviceId); + int upperBw = ccvpnPmDatastore.getUpperBoundBwOfSvc(serviceId); + if (needAdjust(serviceId, avg, provBw, upperBw)) { + int newBw = needAdjustTo(serviceId, avg, provBw, upperBw); + if(Math.abs(newBw - provBw) >= precision){ + log.info("CCVPN Evaluator Output: service {}, need adjustment, putting into candidate list", serviceId); + candidate.put(serviceId, newBw); + } + } + } + // check svc under maintenance + Map<String , ServiceState> svcUnderMaintenance = getServicesUnderMaintenance(); + for (Map.Entry<String, ServiceState> entry: svcUnderMaintenance.entrySet()){ + candidate.putIfAbsent(entry.getKey(), 0); + } + // fetch the provisioned bandwidth info if underMaintenance; otherwise send modification request + for(Map.Entry<String, Integer> entry: candidate.entrySet()) { + //still doing adjustment + if (isServiceUnderMaintenance(entry.getKey())) { + if (entry.getValue() == 0){ + log.debug("CCVPN Evaluator Output: service {}," + + " is in maintenance state, fetching bandwidth info from AAI", entry.getKey()); + } else { + log.debug("CCVPN Evaluator Output: candidate {}," + + " need an adjustment, but skipped due to in maintenance state", entry.getKey()); + } + post(new SimpleEvent(SimpleEvent.Type.AAI_BW_REQ, entry.getKey())); + continue; + } + //not in the mid of adjustment; we are free to adjust. + log.info("CCVPN Evaluator Output: candidate {}," + + " need an adjustment, sending request to policy", entry.getKey()); + ccvpnPmDatastore.updateSvcState(entry.getKey(), ServiceState.UNDER_MAINTENANCE); + sendModifyRequest(entry.getKey(), entry.getValue(), RequestOwner.DCAE); + } + log.debug("=== Processing periodic check complete ==="); + } + if (event.type() == SimpleEvent.Type.ONDEMAND_CHECK && isOnDemandCheckOn()) { + log.debug("=== Processing upperbound adjustment request: {} ===", event.time()); + JsonObject payload = (JsonObject) event.subject(); + String serviceId = payload.get(SERVICE_INSTANCE_LOCATION_ID).getAsString(); + int newBandwidth = payload.get(BANDWIDTH_TOTAL).getAsInt(); + log.info("Update service {} bandwidth upperbound to {} ", serviceId, newBandwidth); + ccvpnPmDatastore.updateUpperBoundBw(serviceId, newBandwidth); + log.debug("=== Processing upperbound adjustment complete ==="); + } + } + + @Override + public String getName() { + return TYPE_NAME; + } + + /** + * Post/broadcast event to the BandwidthEvaluator + * @param event event object + */ + private void post(Event event){ + bandwidthEvaluator.post(event); + } + + private void loadConfig() { + configuration = Configuration.getInstance(); + threshold = configuration.getCcvpnEvalThreshold(); + precision = configuration.getCcvpnEvalPrecision(); // in Mbps; + } + + private boolean isPeriodicCheckOn() { + configuration = Configuration.getInstance(); + return configuration.isCcvpnEvalPeriodicCheckOn(); + } + + private boolean isOnDemandCheckOn() { + configuration = Configuration.getInstance(); + return configuration.isCcvpnEvalOnDemandCheckOn(); + } + + // send modification requestion + private void sendModifyRequest(String cllId, Integer newBandwidth, RequestOwner owner) { + log.info("Sending modification request to policy. RequestOwner: {} - Service: {} change to bw: {}", + owner, cllId, newBandwidth); + policyService.sendOnsetMessageToPolicy( + policyService.formPolicyOnsetMessageForCCVPN(cllId, newBandwidth, owner) + ); + } + // check if an adjustment is necessary + private boolean needAdjust(String serivceId, double used, int provBandwidth, int upper){ + log.debug("CCVPN Service Usage Analysis: usage: {}, threshold: {}, currentProvisioned {}, upperbound {}", + used, threshold, provBandwidth, upper); + return provBandwidth > upper || used > threshold * provBandwidth; + } + + // calculate new bandwidth to accomodate customer + private int needAdjustTo(String serivceId, double used, int cur, int upper){ + if (cur >= upper){ + return upper; + } + int expected = (int) (Math.ceil((used / threshold) * 1.2 / precision) * precision); + return Math.min(expected, upper); + } + // check is service under maint + private boolean isServiceUnderMaintenance(String serivceId) { + return ccvpnPmDatastore.getStatusOfSvc(serivceId) == ServiceState.UNDER_MAINTENANCE; + } + // get a collection of service under maint + private Map<String, ServiceState> getServicesUnderMaintenance(){ + return ccvpnPmDatastore.getSvcStatusMap().entrySet() + .stream() + .filter(e -> e.getValue() == ServiceState.UNDER_MAINTENANCE) + .collect(Collectors.toMap(p -> p.getKey(), p -> p.getValue())); + } +} diff --git a/components/slice-analysis-ms/src/main/java/org/onap/slice/analysis/ms/service/ccvpn/FlexibleThresholdStrategy.java b/components/slice-analysis-ms/src/main/java/org/onap/slice/analysis/ms/service/ccvpn/FlexibleThresholdStrategy.java new file mode 100644 index 00000000..d60c3eae --- /dev/null +++ b/components/slice-analysis-ms/src/main/java/org/onap/slice/analysis/ms/service/ccvpn/FlexibleThresholdStrategy.java @@ -0,0 +1,37 @@ +/******************************************************************************* + * ============LICENSE_START======================================================= + * slice-analysis-ms + * ================================================================================ + * Copyright (C) 2022 Huawei Canada Limited. + * ============================================================================== + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + * + *******************************************************************************/ +package org.onap.slice.analysis.ms.service.ccvpn; + +import org.springframework.stereotype.Component; + +@Component +public class FlexibleThresholdStrategy implements EvaluationStrategy { + private static final String TYPE_NAME = "FlexibleThresholdStrategy"; + @Override + public void execute(Event event) { + return; + } + + @Override + public String getName() { + return TYPE_NAME; + } +} diff --git a/components/slice-analysis-ms/src/main/java/org/onap/slice/analysis/ms/service/ccvpn/StrategyFactory.java b/components/slice-analysis-ms/src/main/java/org/onap/slice/analysis/ms/service/ccvpn/StrategyFactory.java new file mode 100644 index 00000000..824731fc --- /dev/null +++ b/components/slice-analysis-ms/src/main/java/org/onap/slice/analysis/ms/service/ccvpn/StrategyFactory.java @@ -0,0 +1,60 @@ +/******************************************************************************* + * ============LICENSE_START======================================================= + * slice-analysis-ms + * ================================================================================ + * Copyright (C) 2022 Huawei Canada Limited. + * ============================================================================== + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ============LICENSE_END========================================================= + * + *******************************************************************************/ +package org.onap.slice.analysis.ms.service.ccvpn; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.stereotype.Service; + +import javax.annotation.Resource; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +@Service +public class StrategyFactory { + private static Logger log = LoggerFactory.getLogger(StrategyFactory.class); + + @Autowired + List<EvaluationStrategy> strategies; + + private StrategyFactory() {} + + /** + * Get evulation strategy by name + * @param name evaluationStrategy name + * @return EvaluationStrategy + */ + public EvaluationStrategy getStrategy(String name){ + if (null == name || name.isEmpty()){ + log.error("Empty strategy name provided in config file"); + throw new IllegalArgumentException("Unknown strategy name: " + name); + } + for(EvaluationStrategy s: strategies){ + if(s.getName().equals(name)){ + return s; + } + } + log.error("Unknown strategy name: {}", name); + throw new IllegalArgumentException("Unknown strategy name: " + name); + } +} |