diff options
author | fujinhua <fu.jinhua@zte.com.cn> | 2019-04-01 16:46:14 +0800 |
---|---|---|
committer | fujinhua <fu.jinhua@zte.com.cn> | 2019-04-01 17:06:46 +0800 |
commit | a9e5551743e34479d440a421abf6ffa43e36836c (patch) | |
tree | 369a5b7f4911c9fbce5afe264333dbead85c8dba /lcm | |
parent | b1a39e73549949f4b571f41a1ae030707e5af579 (diff) |
Add scale vnf biz logic
Change-Id: I427acdb2af08fa4253938304cc803da45900ddff
Issue-ID: VFC-1306
Signed-off-by: fujinhua <fu.jinhua@zte.com.cn>
Diffstat (limited to 'lcm')
-rw-r--r-- | lcm/lcm/nf/biz/scale_vnf.py | 275 | ||||
-rw-r--r-- | lcm/lcm/nf/const.py | 27 | ||||
-rw-r--r-- | lcm/lcm/nf/views/scale_vnf_view.py | 5 | ||||
-rw-r--r-- | lcm/lcm/pub/utils/notificationsutil.py | 175 |
4 files changed, 479 insertions, 3 deletions
diff --git a/lcm/lcm/nf/biz/scale_vnf.py b/lcm/lcm/nf/biz/scale_vnf.py new file mode 100644 index 00000000..935fcefa --- /dev/null +++ b/lcm/lcm/nf/biz/scale_vnf.py @@ -0,0 +1,275 @@ +# Copyright (C) 2018 Verizon. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +import json +import logging +import uuid +import traceback +from threading import Thread + +from lcm.nf.biz import common +from lcm.nf.biz.grant_vnf import grant_resource +from lcm.nf.const import VNF_STATUS, GRANT_TYPE, CHANGE_TYPE +from lcm.nf.const import RESOURCE_MAP, OPERATION_STATE_TYPE +from lcm.nf.const import INSTANTIATION_STATE +from lcm.pub.database.models import NfInstModel +from lcm.pub.database.models import VNFCInstModel, PortInstModel +from lcm.pub.database.models import VmInstModel +from lcm.pub.exceptions import NFLCMException +from lcm.pub.utils.notificationsutil import NotificationsUtil, prepare_notification +from lcm.pub.utils.values import ignore_case_get +from lcm.pub.utils.jobutil import JobUtil +from lcm.pub.utils.timeutil import now_time +from lcm.pub.vimapi import adaptor + +logger = logging.getLogger(__name__) + +DEFAULT_STEPS = 1 + + +class ScaleVnf(Thread): + def __init__(self, data, nf_inst_id, job_id): + super(ScaleVnf, self).__init__() + self.data = data + self.nf_inst_id = nf_inst_id + self.job_id = job_id + self.vnf_insts = NfInstModel.objects.filter(nfinstid=self.nf_inst_id) + + def run(self): + try: + self.heal_pre() + self.apply_grant() + self.do_operation() + JobUtil.add_job_status(self.job_id, + 100, + "Scale Vnf success.") + self.vnf_insts.update(status=INSTANTIATION_STATE.INSTANTIATED, + lastuptime=now_time()) + except NFLCMException as e: + logger.error(e.message) + self.vnf_scale_failed_handle(e.message) + except Exception as e: + logger.error(e.message) + logger.error(traceback.format_exc()) + self.vnf_scale_failed_handle(e.message) + + def scale_pre(self): + self.scale_type = self.data.get("type") + self.aspect_id = self.data.get("aspectId") + self.number_of_steps = int(self.data.get("numberOfSteps", DEFAULT_STEPS)) + self.additional_params = self.data.get("additionalParams", {}) + self.is_scale_in = (self.scale_type == GRANT_TYPE.SCALE_IN) + self.vnfd_info = json.loads(self.vnf_insts[0].vnfd_model) + self.step_delta = self.get_scale_step_delta() + self.target_vdu, self.step_inst_num = self.get_vdu_scale_aspect_deltas() + self.scale_inst_num = self.number_of_steps * self.step_inst_num + self.min_instance_num, self.max_instance_num = self.get_instance_range() + self.check_if_can_scale() + self.scale_out_resource = {} + + def apply_grant(self): + logger.debug("Start scale apply grant") + vdus = ignore_case_get(self.vnfd_info, "vdus") + scale_vdus = [vdu for vdu in vdus if vdu["vdu_id"] == self.target_vdu] + scale_vdus = scale_vdus * self.scale_inst_num + grant_result = grant_resource(data=self.data, + nf_inst_id=self.nf_inst_id, + job_id=self.job_id, + grant_type=self.scale_type, + vdus=scale_vdus) + logger.debug("Scale Grant result: %s", grant_result) + self.set_location(grant_result) + + def do_operation(self): + logger.debug("Start %s VNF resource", self.scale_type) + logger.debug('VnfdInfo = %s' % self.vnfd_info) + + if self.is_scale_in: + self.affected_vnfcs = [] + self.scale_in_resource = self.get_scale_in_resource(self.affected_vnfcs) + adaptor.delete_vim_res(self.scale_in_resource, self.do_notify_del_vim_res) + else: + self.scale_out_resource = { + 'volumn': [], + 'network': [], + 'subnet': [], + 'port': [], + 'flavor': [], + 'vm': [] + } + self.vnfd_info["volume_storages"] = [] + self.vnfd_info["vls"] = [] + self.vnfd_info["cps"] = self.vnfd_info["cps"] * self.scale_inst_num + for cp in self.vnfd_info["cps"]: + # TODO: how to set name for scale_out cp + cp["properties"]["name"] = cp["cp_id"] + str(uuid.uuid4()) + cp_inst = PortInstModel.objects.filter(name__startswith=cp["cp_id"]).first() + if cp_inst: + cp["networkId"] = cp_inst.networkid + cp["subnetId"] = cp_inst.subnetworkid + else: + raise NFLCMException("CP(%s) does not exist" % cp["cp_id"]) + self.vnfd_info["vdus"] = self.vnfd_info["vdus"] * self.scale_inst_num + for vdu in self.vnfd_info["vdus"]: + # TODO: how to set name for scale_out vdu + vdu["properties"]["name"] = vdu["properties"]["name"] + str(uuid.uuid4()) + + vim_cache = json.loads(self.vnf_insts[0].vimInfo) + res_cache = json.loads(self.vnf_insts[0].resInfo) + adaptor.create_vim_res(self.vnfd_info, + self.do_notify_create_vim_res, + vim_cache=vim_cache, + res_cache=res_cache) + self.vnf_insts.update(vimInfo=json.dumps(vim_cache), + resInfo=json.dumps(res_cache)) + logger.debug("%s VNF resource finish", self.scale_type) + + def send_notification(self): + data = prepare_notification(nfinstid=self.nf_inst_id, + jobid=self.job_id, + operation=self.op_type, + operation_state=OPERATION_STATE_TYPE.COMPLETED) + + # TODO: need set changedExtConnectivity for data + if self.is_scale_in: + data["affectedVnfcs"] = self.affected_vnfcs + else: + for vm in self.scale_out_resource["vm"]: + self.set_affected_vnfcs(data["affectedVnfcs"], vm["res_id"]) + + logger.debug('Notify request data = %s' % data) + NotificationsUtil().send_notification(data) + + def rollback_operation(self): + if self.is_scale_in: + # SCALE_IN operaion does not support rollback + return + adaptor.delete_vim_res(self.scale_out_resource, self.do_notify_del_vim_res) + + def set_location(self, grant_result): + vim_connections = ignore_case_get(grant_result, "vimConnections") + access_info = ignore_case_get(vim_connections[0], "accessInfo") + tenant = ignore_case_get(access_info, "tenant") + vimid = ignore_case_get(vim_connections[0], "vimId") + + for resource_type in ['vdus', 'vls', 'cps', 'volume_storages']: + for resource in ignore_case_get(self.vnfd_info, resource_type): + if "location_info" not in resource["properties"]: + resource["properties"]["location_info"] = {} + resource["properties"]["location_info"]["vimid"] = vimid + resource["properties"]["location_info"]["tenant"] = tenant + + def do_notify_create_vim_res(self, res_type, ret): + logger.debug('Scaling out [%s] resource' % res_type) + resource_save_method = getattr(common, res_type + '_save') + resource_save_method(self.job_id, self.nf_inst_id, ret) + self.scale_out_resource[res_type].append(self.gen_del_resource(ret)) + + def do_notify_del_vim_res(self, res_type, res_id): + logger.debug('Scaling in [%s] resource, resourceid [%s]', res_type, res_id) + resource_type = RESOURCE_MAP.keys()[RESOURCE_MAP.values().index(res_type)] + resource_table = globals().get(resource_type + 'InstModel') + resource_table.objects.filter(instid=self.nf_inst_id, resourceid=res_id).delete() + if res_type == "vm": + VNFCInstModel.objects.filter(instid=self.nf_inst_id, vmid=res_id).delete() + + def get_scale_in_resource(self, affected_vnfcs): + scale_in_resource = { + 'volumn': [], + 'network': [], + 'subnet': [], + 'port': [], + 'flavor': [], + 'vm': [] + } + scale_in_vms = VmInstModel.objects.filter(instid=self.nf_inst_id) + vms_count = scale_in_vms.count() + for index in range(self.scale_inst_num): + vm_index = vms_count - index - 1 + scale_in_resource["vm"].append(self.gen_del_resource(scale_in_vms[vm_index])) + self.set_affected_vnfcs(affected_vnfcs, scale_in_vms[vm_index].resourceid) + return scale_in_resource + + def gen_del_resource(self, res): + is_dict = isinstance(res, dict) + return { + "vim_id": res["vimId"] if is_dict else res.vimid, + "tenant_id": res["tenantId"] if is_dict else res.tenant, + "res_id": res["id"] if is_dict else res.resourceid, + "is_predefined": res["returnCode"] if is_dict else res.is_predefined + } + + def get_scale_step_delta(self): + for policy in self.vnfd_info.get("policies", []): + if policy.get("type") != "tosca.policies.nfv.ScalingAspects": + continue + aspects = policy["properties"]["aspects"] + if self.aspect_id in aspects: + return aspects.get(self.aspect_id).get("step_deltas")[0] + raise NFLCMException("Aspect(%s) does not exist" % self.aspect_id) + + def get_vdu_scale_aspect_deltas(self): + for policy in self.vnfd_info.get("policies", []): + if policy.get("type") != "tosca.policies.nfv.VduScalingAspectDeltas": + continue + target = policy.get("targets")[0] + deltas = policy["properties"]["deltas"] + if self.step_delta in deltas: + num = int(deltas.get(self.step_delta).get("number_of_instances")) + return target, num + raise NFLCMException("Aspect step delta(%s) does not exist" % self.step_delta) + + def get_instance_range(self): + for vdu in self.vnfd_info["vdus"]: + if vdu["vdu_id"] == self.target_vdu: + vdu_profile = vdu["properties"]["vdu_profile"] + min_inst_num = int(vdu_profile["min_number_of_instances"]) + max_inst_num = int(vdu_profile["max_number_of_instances"]) + return min_inst_num, max_inst_num + raise NFLCMException("VDU(%s) does not exist" % self.target_vdu) + + def check_if_can_scale(self): + cur_inst_num = VNFCInstModel.objects.filter(instid=self.nf_inst_id).count() + if self.is_scale_in: + if cur_inst_num - self.scale_inst_num < self.min_instance_num: + msg = "VNF(%s) cannot be scaled: less than min instance." + raise NFLCMException(msg % self.nf_inst_id) + else: + if cur_inst_num + self.scale_inst_num > self.max_instance_num: + msg = "VNF(%s) cannot be scaled: max instance exceeded." + raise NFLCMException(msg % self.nf_inst_id) + + def set_affected_vnfcs(self, affected_vnfcs, vm_id): + chgtype = CHANGE_TYPE.REMOVED if self.is_scale_in else CHANGE_TYPE.ADDED + vnfcs = VNFCInstModel.objects.filter(instid=self.nf_inst_id, vmid=vm_id) + vm = VmInstModel.objects.filter(instid=self.nf_inst_id, resourceid=vm_id) + vm_resource = {} + if vm: + vm_resource = { + 'vimConnectionId': vm[0].vimid, + 'resourceId': vm[0].resourceid, + 'vimLevelResourceType': 'vm' + } + if vnfcs: + affected_vnfcs.append({ + 'id': vnfcs[0].vnfcinstanceid, + 'vduId': vnfcs[0].vduid, + 'changeType': chgtype, + 'computeResource': vm_resource + }) + + def vnf_scale_failed_handle(self, error_msg): + logger.error('VNF scaling failed, detail message: %s', error_msg) + self.vnf_insts.update(status=VNF_STATUS.FAILED, + lastuptime=now_time()) + JobUtil.add_job_status(self.job_id, 255, error_msg) diff --git a/lcm/lcm/nf/const.py b/lcm/lcm/nf/const.py index c11c0996..f76a390f 100644 --- a/lcm/lcm/nf/const.py +++ b/lcm/lcm/nf/const.py @@ -13,6 +13,7 @@ # limitations under the License. import json +from lcm.pub.config import config from lcm.pub.utils.jobutil import enum HEAL_ACTION_TYPE = enum(START="vmCreate", RESTART="vmReset") @@ -34,6 +35,11 @@ OPERATION_TYPE = enum( MODIFY_INFO="MODIFY_INFO" ) +INSTANTIATION_STATE = enum( + NOT_INSTANTIATED="NOT_INSTANTIATED", + INSTANTIATED="INSTANTIATED" +) + LCM_NOTIFICATION_STATUS = enum(START="START", RESULT="RESULT") OPERATION_STATE_TYPE = enum( @@ -46,6 +52,25 @@ OPERATION_STATE_TYPE = enum( ROLLED_BACK="ROLLED_BACK" ) +RESULT_RANGE = ( + OPERATION_STATE_TYPE.COMPLETED, + OPERATION_STATE_TYPE.FAILED_TEMP, + OPERATION_STATE_TYPE.FAILED, + OPERATION_STATE_TYPE.ROLLED_BACK +) + +RUNNING_RANGE = ( + OPERATION_STATE_TYPE.STARTING, + OPERATION_STATE_TYPE.PROCESSING, + OPERATION_STATE_TYPE.ROLLING_BACK +) + +FINAL_STATE_RANGE = ( + OPERATION_STATE_TYPE.COMPLETED, + OPERATION_STATE_TYPE.FAILED, + OPERATION_STATE_TYPE.ROLLED_BACK +) + CHANGE_TYPE = enum( ADDED='ADDED', REMOVED='REMOVED', @@ -58,6 +83,8 @@ CHANGE_TYPE = enum( RESOURCE_MAP = {'Storage': 'volumn', 'Network': 'network', 'SubNetwork': 'subnet', 'Port': 'port', 'Flavour': 'flavor', 'Vm': 'vm'} +URL_PREFIX = "http://%s:%s/api/vnflcm/v1" % (config.MSB_SERVICE_IP, config.MSB_SERVICE_PORT) + ROOT_URI = "api/vnflcm/v1/subscriptions/" AUTH_TYPES = ["BASIC", "OAUTH2_CLIENT_CREDENTIALS", "TLS_CERT"] diff --git a/lcm/lcm/nf/views/scale_vnf_view.py b/lcm/lcm/nf/views/scale_vnf_view.py index 20ef95d0..f8778a7d 100644 --- a/lcm/lcm/nf/views/scale_vnf_view.py +++ b/lcm/lcm/nf/views/scale_vnf_view.py @@ -26,6 +26,7 @@ from lcm.pub.exceptions import NFLCMException, NFLCMExceptionNotFound, NFLCMExce from lcm.pub.utils.jobutil import JobUtil from lcm.pub.database.models import NfInstModel from lcm.nf.const import VNF_STATUS +from lcm.nf.biz.scale_vnf import ScaleVnf logger = logging.getLogger(__name__) @@ -50,7 +51,9 @@ class ScaleVnfView(APIView): job_id = JobUtil.create_job('NF', 'SCALE', instanceid) JobUtil.add_job_status(job_id, 0, "SCALE_VNF_READY") self.scale_pre_check(instanceid, job_id) - # TODO: do scale logic + + ScaleVnf(scale_vnf_request_serializer.data, instanceid, job_id).start() + response = Response(data={"jobId": job_id}, status=status.HTTP_202_ACCEPTED) return response except NFLCMExceptionNotFound as e: diff --git a/lcm/lcm/pub/utils/notificationsutil.py b/lcm/lcm/pub/utils/notificationsutil.py index d8e8b640..444e8071 100644 --- a/lcm/lcm/pub/utils/notificationsutil.py +++ b/lcm/lcm/pub/utils/notificationsutil.py @@ -14,16 +14,27 @@ import json import logging -import requests +import uuid -from rest_framework import status +import requests from requests.auth import HTTPBasicAuth +from rest_framework import status from lcm.nf import const from lcm.pub.database.models import SubscriptionModel +from lcm.pub.database.models import ( + VmInstModel, NetworkInstModel, + PortInstModel, StorageInstModel, VNFCInstModel +) +from lcm.pub.utils.timeutil import now_time +from lcm.pub.utils.enumutil import enum logger = logging.getLogger(__name__) +NOTIFY_TYPE = enum(lCM_OP_OCC="VnfLcmOperationOccurrenceNotification", + CREATION="VnfIdentifierCreationNotification", + DELETION="VnfIdentifierDeletionNotification") + class NotificationsUtil(object): def __init__(self): @@ -64,3 +75,163 @@ class NotificationsUtil(object): if resp.status_code != status.HTTP_204_NO_CONTENT: raise Exception("Unable to send the notification to %s, due to %s" % (callbackUri, resp.text)) return + + +def set_affected_vnfcs(affected_vnfcs, nfinstid, changetype): + vnfcs = VNFCInstModel.objects.filter(instid=nfinstid) + for vnfc in vnfcs: + vm_resource = {} + if vnfc.vmid: + vm = VmInstModel.objects.filter(vmid=vnfc.vmid) + if vm: + vm_resource = { + 'vimConnectionId': vm[0].vimid, + 'resourceId': vm[0].resourceid, + 'resourceProviderId': vm[0].vmname, # TODO: is resourceName mapped to resourceProviderId? + 'vimLevelResourceType': 'vm' + } + affected_vnfcs.append({ + 'id': vnfc.vnfcinstanceid, + 'vduId': vnfc.vduid, + 'changeType': changetype, + 'computeResource': vm_resource + }) + logger.debug("affected_vnfcs=%s", affected_vnfcs) + return affected_vnfcs + + +def set_affected_vls(affected_vls, nfinstid, changetype): + networks = NetworkInstModel.objects.filter(instid=nfinstid) + for network in networks: + network_resource = { + 'vimConnectionId': network.vimid, + 'resourceId': network.resourceid, + 'resourceProviderId': network.name, # TODO: is resourceName mapped to resourceProviderId? + 'vimLevelResourceType': 'network' + } + affected_vls.append({ + 'id': network.networkid, + 'virtualLinkDescId': network.nodeId, + 'changeType': changetype, + 'networkResource': network_resource + }) + logger.debug("affected_vls=%s", affected_vls) + + +def set_ext_connectivity(ext_connectivity, nfinstid): + ext_connectivity_map = {} + ports = PortInstModel.objects.filter(instid=nfinstid) + for port in ports: + if port.networkid not in ext_connectivity_map: + ext_connectivity_map[port.networkid] = [] + ext_connectivity_map[port.networkid].append({ + 'id': port.portid, # TODO: port.portid or port.nodeid? + 'resourceHandle': { + 'vimConnectionId': port.vimid, + 'resourceId': port.resourceid, + 'resourceProviderId': port.name, # TODO: is resourceName mapped to resourceProviderId? + 'vimLevelResourceType': 'port' + }, + 'cpInstanceId': port.portid # TODO: port.cpinstanceid is not initiated when create port resource. + }) + for network_id, ext_link_ports in ext_connectivity_map.items(): + networks = NetworkInstModel.objects.filter(networkid=network_id) + net_name = networks[0].name if networks else network_id + network_resource = { + 'vimConnectionId': ext_link_ports[0]['resourceHandle']['vimConnectionId'], + 'resourceId': network_id, + 'resourceProviderId': net_name, # TODO: is resourceName mapped to resourceProviderId? + 'vimLevelResourceType': 'network' + } + ext_connectivity.append({ + 'id': network_id, + 'resourceHandle': network_resource, + 'extLinkPorts': ext_link_ports + }) + logger.debug("ext_connectivity=%s", ext_connectivity) + + +def set_affected_vss(affected_vss, nfinstid, changetype): + vss = StorageInstModel.objects.filter(instid=nfinstid) + for vs in vss: + affected_vss.append({ + 'id': vs.storageid, + 'virtualStorageDescId': vs.nodeId, + 'changeType': changetype, + 'storageResource': { + 'vimConnectionId': vs.vimid, + 'resourceId': vs.resourceid, + 'resourceProviderId': vs.name, # TODO: is resourceName mapped to resourceProviderId? + 'vimLevelResourceType': 'volume' + } + }) + logger.debug("affected_vss=%s", affected_vss) + + +def get_notification_status(operation_state): + notification_status = const.LCM_NOTIFICATION_STATUS.START + if operation_state in const.RESULT_RANGE: + notification_status = const.LCM_NOTIFICATION_STATUS.RESULT + return notification_status + + +def prepare_notification(nfinstid, jobid, operation, operation_state): + logger.info('Start to prepare notification') + notification_content = { + 'id': str(uuid.uuid4()), # shall be the same if sent multiple times due to multiple subscriptions. + 'notificationType': NOTIFY_TYPE.lCM_OP_OCC, + # set 'subscriptionId' after filtering for subscribers + 'timeStamp': now_time(), + 'notificationStatus': get_notification_status(operation_state), + 'operationState': operation_state, + 'vnfInstanceId': nfinstid, + 'operation': operation, + 'isAutomaticInvocation': False, + 'vnfLcmOpOccId': jobid, + 'affectedVnfcs': [], + 'affectedVirtualLinks': [], + 'affectedVirtualStorages': [], + 'changedExtConnectivity': [], + 'error': '', + '_links': { + 'vnfInstance': { + 'href': '%s/vnf_instances/%s' % (const.URL_PREFIX, nfinstid) + }, + 'vnfLcmOpOcc': { + 'href': '%s/vnf_lcm_op_occs/%s' % (const.URL_PREFIX, jobid) + } + } + } + return notification_content + + +def prepare_notification_data(nfinstid, jobid, changetype, operation): + data = prepare_notification(nfinstid=nfinstid, + jobid=jobid, + operation=operation, + operation_state=const.OPERATION_STATE_TYPE.COMPLETED) + + set_affected_vnfcs(data['affectedVnfcs'], nfinstid, changetype) + set_affected_vls(data['affectedVirtualLinks'], nfinstid, changetype) + set_affected_vss(data['affectedVirtualStorages'], nfinstid, changetype) + set_ext_connectivity(data['changedExtConnectivity'], nfinstid) + + logger.debug('Notification content: %s' % data) + return data + + +def prepare_vnf_identifier_notification(notify_type, nfinstid): + data = { + "id": str(uuid.uuid4()), # shall be the same if sent multiple times due to multiple subscriptions. + "notificationType": notify_type, + "timeStamp": now_time(), + "vnfInstanceId": nfinstid, + "_links": { + "vnfInstance": { + 'href': '%s/vnf_instances/%s' % (const.URL_PREFIX, nfinstid) + } + } + } + + logger.debug('Vnf Identifier Notification: %s' % data) + return data |