diff options
25 files changed, 953 insertions, 61 deletions
diff --git a/docs/release-notes.rst b/docs/release-notes.rst index 0334a488..75cde295 100644 --- a/docs/release-notes.rst +++ b/docs/release-notes.rst @@ -51,9 +51,9 @@ This is the initial release **Known Issues** - - VFC-896 vim-id in AAI is handled as a mandatory parameter - - VFC-890 The hard coded SDC user and password in catalog & LCM is not present in SDC - - VFC-891 The AAI credentials is hard coded in LCM + - `VFC-896 <https://jira.onap.org/browse/VFC-896>`_ vim-id in AAI is handled as a mandatory parameter + - `VFC-890 <https://jira.onap.org/browse/VFC-890>`_ The hard coded SDC user and password in catalog & LCM is not present in SDC + - `VFC-891 <https://jira.onap.org/browse/VFC-891>`_ The AAI credentials is hard coded in LCM **Security Notes** @@ -86,7 +86,7 @@ Version: 1.0.0 **New Features** - NS lifecycle management, including NS instance creation, termination and healing -- VNF lifecycle management, including VNF nstance creation, termination and healing +- VNF lifecycle management, including VNF instance creation, termination and healing - VNF FCAPS, collecting FCAPS data from vendor EMS - VNFM Integration, integration with specific VNFMs of vendors to deploy commercial VNFs - VNF Integration, integration with VNF via GVNFM diff --git a/lcm/ns/biz/ns_instant.py b/lcm/ns/biz/ns_instant.py index 527a35ee..52f686be 100644 --- a/lcm/ns/biz/ns_instant.py +++ b/lcm/ns/biz/ns_instant.py @@ -20,7 +20,7 @@ from threading import Thread from rest_framework import status -from lcm.pub.config.config import WORKFLOW_OPTION +from lcm.pub.config import config from lcm.pub.database.models import DefPkgMappingModel, ServiceBaseInfoModel, InputParamMappingModel from lcm.pub.database.models import NSInstModel, VNFFGInstModel, WFPlanModel from lcm.pub.exceptions import NSLCMException @@ -28,11 +28,12 @@ from lcm.pub.msapi import activiti from lcm.pub.msapi import sdc_run_catalog from lcm.pub.msapi.catalog import get_process_id from lcm.pub.msapi.catalog import get_servicetemplate_id, get_servicetemplate -from lcm.pub.msapi.extsys import select_vnfm +from lcm.pub.msapi import extsys from lcm.pub.msapi.wso2bpel import workflow_run from lcm.pub.utils.jobutil import JobUtil from lcm.pub.utils.values import ignore_case_get from lcm.workflows import build_in +from lcm.ns.biz.ns_instantiate_flow import run_ns_instantiate logger = logging.getLogger(__name__) @@ -72,18 +73,18 @@ class InstantNSService(object): JobUtil.add_job_status(job_id, 5, 'Start query nsd(%s)' % ns_inst.nspackage_id) dst_plan = sdc_run_catalog.parse_nsd(ns_inst.nspackage_id, input_parameters) logger.debug('tosca plan dest: %s' % dst_plan) - + logger.debug('Start query nsd(%s)' % ns_inst.nspackage_id) NSInstModel.objects.filter(id=self.ns_inst_id).update(nsd_model=dst_plan) params_json = json.JSONEncoder().encode(self.req_data["additionalParamForNs"]) - # start params_vnf = [] plan_dict = json.JSONDecoder().decode(dst_plan) - for vnf in ignore_case_get(plan_dict, "ns_vnfs"): + for vnf in ignore_case_get(plan_dict, "vnfs"): vnfd_id = vnf['properties']['id'] vnfm_type = vnf['properties'].get("nf_type", "undefined") vimid = self.get_vnf_vim_id(vim_id, location_constraints, vnfd_id) - vnfm_info = select_vnfm(vnfm_type=vnfm_type, vim_id=vimid) + vnfm_info = extsys.select_vnfm(vnfm_type=vnfm_type, vim_id=vimid) + params_vnf.append({ "vnfProfileId": vnf["vnf_id"], "additionalParam": { @@ -94,20 +95,22 @@ class InstantNSService(object): "inputs": params_json } }) - # end self.set_vl_vim_id(vim_id, location_constraints, plan_dict) dst_plan = json.JSONEncoder().encode(plan_dict) logger.debug('tosca plan dest add vimid:%s' % dst_plan) NSInstModel.objects.filter(id=self.ns_inst_id).update(nsd_model=dst_plan) + pnf_params_json = json.JSONEncoder().encode(self.init_pnf_para(plan_dict)) + vnf_params_json = json.JSONEncoder().encode(params_vnf) plan_input = { 'jobId': job_id, 'nsInstanceId': self.ns_inst_id, 'object_context': dst_plan, 'object_additionalParamForNs': params_json, - 'object_additionalParamForVnf': vnf_params_json + 'object_additionalParamForVnf': vnf_params_json, + 'object_additionalParamForPnf': pnf_params_json } plan_input.update(**self.get_model_count(dst_plan)) plan_input["sdnControllerId"] = ignore_case_get( @@ -122,7 +125,7 @@ class InstantNSService(object): creator='--', create_time=int(time.time() * 1000)).save() - if WORKFLOW_OPTION == "wso2": + if config.WORKFLOW_OPTION == "wso2": service_tpl = get_servicetemplate(ns_inst.nsd_id) DefPkgMappingModel(service_id=self.ns_inst_id, service_def_id=service_tpl['csarId'], @@ -142,18 +145,20 @@ class InstantNSService(object): else: # TODO: pass - - if WORKFLOW_OPTION == "wso2": + logger.debug("workflow option: %s" % config.WORKFLOW_OPTION) + if config.WORKFLOW_OPTION == "wso2": return self.start_wso2_workflow(job_id, ns_inst, plan_input) - elif WORKFLOW_OPTION == "activiti": + elif config.WORKFLOW_OPTION == "activiti": return self.start_activiti_workflow() + elif config.WORKFLOW_OPTION == "grapflow": + return self.start_buildin_grapflow(job_id, plan_input) else: return self.start_buildin_workflow(job_id, plan_input) except Exception as e: logger.error(traceback.format_exc()) logger.error("ns-instant(%s) workflow error:%s" % (self.ns_inst_id, e.message)) - JobUtil.add_job_status(job_id, 255, 'NS instantiation failed: %s' % e.message) + JobUtil.add_job_status(job_id, 255, 'NS instantiation failed') return dict(data={'error': e.message}, status=status.HTTP_500_INTERNAL_SERVER_ERROR) def start_wso2_workflow(self, job_id, ns_inst, plan_input): @@ -191,6 +196,11 @@ class InstantNSService(object): BuildInWorkflowThread(plan_input).start() return dict(data={'jobId': job_id}, status=status.HTTP_200_OK) + def start_buildin_grapflow(self, job_id, plan_input): + JobUtil.add_job_status(job_id, 10, 'NS inst(%s) buildin grap workflow started.' % self.ns_inst_id) + run_ns_instantiate(plan_input) + return dict(data={'jobId': job_id}, status=status.HTTP_200_OK) + @staticmethod def get_vnf_vim_id(vim_id, location_constraints, vnfdid): for location in location_constraints: @@ -202,11 +212,11 @@ class InstantNSService(object): @staticmethod def set_vl_vim_id(vim_id, location_constraints, plan_dict): - if "ns_vls" not in plan_dict: + if "vls" not in plan_dict: logger.debug("No vl is found in nsd.") return vl_vnf = {} - for vnf in ignore_case_get(plan_dict, "ns_vnfs"): + for vnf in ignore_case_get(plan_dict, "vnfs"): if "dependencies" in vnf: for depend in vnf["dependencies"]: vl_vnf[depend["vl_id"]] = vnf['properties']['id'] @@ -215,7 +225,7 @@ class InstantNSService(object): if "vnfProfileId" in location: vnfd_id = location["vnfProfileId"] vnf_vim[vnfd_id] = location["locationConstraints"]["vimId"] - for vl in plan_dict["ns_vls"]: + for vl in plan_dict["vls"]: vnfdid = ignore_case_get(vl_vnf, vl["vl_id"]) vimid = ignore_case_get(vnf_vim, vnfdid) if not vimid: @@ -229,7 +239,28 @@ class InstantNSService(object): @staticmethod def get_model_count(context): data = json.JSONDecoder().decode(context) - vls = len(data.get('ns_vls', [])) + vls = len(data.get('vls', [])) sfcs = len(data.get('fps', [])) - vnfs = len(data.get('ns_vnfs', [])) - return {'vlCount': str(vls), 'sfcCount': str(sfcs), 'vnfCount': str(vnfs)} + vnfs = len(data.get('vnfs', [])) + pnfs = len(data.get('pnfs', [])) + return {'vlCount': str(vls), 'sfcCount': str(sfcs), 'vnfCount': str(vnfs), 'pnfCount': str(pnfs)} + + def init_pnf_para(self, plan_dict): + pnfs_in_input = ignore_case_get(self.req_data, "addpnfData") + pnfs_in_nsd = ignore_case_get(plan_dict, "pnfs") + logger.debug("addpnfData ; %s" % pnfs_in_input) + logger.debug("pnfs_in_nsd ; %s" % pnfs_in_nsd) + pnfs = {} + for pnf in pnfs_in_input: + for pnfd in pnfs_in_nsd: + if pnfd["properties"]["descriptor_id"] == pnf["pnfdId"]: + k = pnfd["pnf_id"] + pnf["nsInstances"] = self.ns_inst_id + # todo pnf["pnfdInfoId"] + pnfs[k] = { + "type": "CreatePnf", + "input": { + "content": pnf + } + } + return pnfs diff --git a/lcm/ns/biz/ns_instantiate_flow.py b/lcm/ns/biz/ns_instantiate_flow.py new file mode 100644 index 00000000..88b04fe7 --- /dev/null +++ b/lcm/ns/biz/ns_instantiate_flow.py @@ -0,0 +1,179 @@ +# Copyright 2018 ZTE Corporation. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import json +import logging +import traceback +from threading import Thread + +from lcm.pub.utils.syscomm import fun_name +from lcm.pub.utils.values import ignore_case_get +from lcm.pub.utils import restcall +from lcm.pub.exceptions import NSLCMException +from lcm.workflows.graphflow.flow.flow import GraphFlow + +logger = logging.getLogger(__name__) + +RESULT_OK, RESULT_NG = "0", "1" +JOB_ERROR = 255 + +config = { + "CreateVnf": {"module": "lcm.ns_vnfs", "class": "CreateVnf"}, + "CreatePnf": {"module": "lcm.ns_pnfs", "class": "CreatePnf"}, + "CreateVl": {"module": "lcm.ns_vls", "class": "CreateVl"} +} + + +class NsInstantiateWorkflowThread(Thread): + def __init__(self, plan_input): + Thread.__init__(self) + self.plan_input = plan_input + + def run(self): + run_ns_instantiate(self.plan_input) + + +def run_ns_instantiate(input_data): + """ + format of input_data + { + "jobId": uuid of job, + "nsInstanceId": id of ns instance, + "object_context": json format of nsd, + "object_additionalParamForNs": json format of additional parameters for ns, + "object_additionalParamForVnf": json format of additional parameters for vnf, + "object_additionalParamForPnf": json format of additional parameters for pnf, + "vlCount": int type of VL count, + "vnfCount: int type of VNF count + } + """ + logger.debug("Enter %s, input_data is %s", fun_name(), input_data) + ns_inst_id = ignore_case_get(input_data, "nsInstanceId") + job_id = ignore_case_get(input_data, "jobId") + update_job(job_id, 10, "true", "Start to prepare the NS instantiate workflow parameter") + deploy_graph = build_deploy_graph(input_data) + TaskSet = build_TaskSet(input_data) + ns_instantiate_ok = False + + try: + update_job(job_id, 15, "true", "Start the NS instantiate workflow") + gf = GraphFlow(deploy_graph, TaskSet, config) + logger.debug("NS graph flow run up!") + gf.start() + gf.join() + gf.task_manager.wait_tasks_done(gf.sort_nodes) + if gf.task_manager.is_all_task_finished(): + logger.debug("NS is instantiated!") + update_job(job_id, 90, "true", "Start to post deal") + post_deal(ns_inst_id, "true") + update_job(job_id, 100, "true", "Create NS successfully.") + ns_instantiate_ok = True + except NSLCMException as e: + logger.error("Failded to Create NS: %s", e.message) + update_job(job_id, JOB_ERROR, "255", "Failded to Create NS.") + post_deal(ns_inst_id, "false") + except: + logger.error(traceback.format_exc()) + update_job(job_id, JOB_ERROR, "255", "Failded to Create NS.") + post_deal(ns_inst_id, "false") + return ns_instantiate_ok + + +def build_deploy_graph(input_data): + nsd_json_str = ignore_case_get(input_data, "object_context") + nsd_json = json.JSONDecoder().decode(nsd_json_str) + deploy_graph = ignore_case_get(nsd_json, "graph") + logger.debug("NS graph flow: %s" % deploy_graph) + return deploy_graph + + +def build_vls(input_data): + ns_inst_id = ignore_case_get(input_data, "nsInstanceId") + nsd_json = json.JSONDecoder().decode(ignore_case_get(input_data, "object_context")) + ns_param_json = ignore_case_get(input_data, "object_additionalParamForNs") + vl_count = int(ignore_case_get(input_data, "vlCount", 0)) + + vls = {} + for i in range(vl_count): + data = { + "nsInstanceId": ns_inst_id, + "vlIndex": i, + "context": nsd_json, + "additionalParamForNs": ns_param_json + } + key = nsd_json["vls"][i - 1]["vl_id"] + vls[key] = { + "type": "CreateVl", + "input": { + "content": data + } + } + return vls + + +def build_vnfs(input_data): + ns_inst_id = ignore_case_get(input_data, "nsInstanceId") + vnf_count = int(ignore_case_get(input_data, "vnfCount", 0)) + vnf_param_json = json.JSONDecoder().decode(ignore_case_get(input_data, "object_additionalParamForVnf")) + vnfs = {} + for i in range(vnf_count): + data = { + "nsInstanceId": ns_inst_id, + "vnfIndex": i, + "additionalParamForVnf": vnf_param_json + } + key = vnf_param_json[i - 1]["vnfProfileId"] + vnfs[key] = { + "type": "CreateVnf", + "input": { + "content": data + } + } + return vnfs + + +def build_pnfs(input_data): + return json.JSONDecoder().decode(ignore_case_get(input_data, "object_additionalParamForPnf")) + + +def build_TaskSet(input_data): + vls = build_vls(input_data) + vnfs = build_vnfs(input_data) + pnfs = build_pnfs(input_data) + task_set = dict(dict(vls, **vnfs), **pnfs) + return task_set + + +def post_deal(ns_inst_id, status): + uri = "api/nslcm/v1/ns/{nsInstanceId}/postdeal".format(nsInstanceId=ns_inst_id) + data = json.JSONEncoder().encode({ + "status": status + }) + + ret = restcall.req_by_msb(uri, "POST", data) + if ret[0] != 0: + logger.error("Failed to call post_deal(%s): %s", ns_inst_id, ret[1]) + logger.debug("Call post_deal(%s, %s) successfully.", ns_inst_id, status) + + +def update_job(job_id, progress, errcode, desc): + logger.debug("job_id %s" % job_id) + uri = "api/nslcm/v1/jobs/{jobId}".format(jobId=job_id) + data = json.JSONEncoder().encode({ + "progress": progress, + "errcode": errcode, + "desc": desc + }) + ret = restcall.req_by_msb(uri, "POST", data) + return ret diff --git a/lcm/ns/serializers/ns_serializers.py b/lcm/ns/serializers/ns_serializers.py index 1fc6cadd..c1a5c8b9 100644 --- a/lcm/ns/serializers/ns_serializers.py +++ b/lcm/ns/serializers/ns_serializers.py @@ -79,6 +79,50 @@ class LocationConstraintSerializer(serializers.Serializer): locationConstraints = VimSerializer(help_text="Location constraint", required=False, allow_null=True) +class AddressRange(serializers.Serializer): + minAddress = serializers.IPAddressField(help_text="Lowest IP address belonging to the range.", required=True) + maxAddress = serializers.IPAddressField(help_text="Highest IP address belonging to the range.", required=True) + + +class IpAddress(serializers.Serializer): + type = serializers.ChoiceField(help_text="The type of the IP addresses.", required=True, choices=["IPV4", "IPV6"]) + fixedAddresses = serializers.ListField(child=serializers.CharField(help_text="Fixed addresses to assign."), required=False) + numDynamicAddresses = serializers.IntegerField(help_text="Number of dynamic addresses to assign.", required=False) + addressRange = AddressRange(help_text="An IP address range to be used.", required=False) + subnetId = serializers.CharField(help_text="Subnet defined by the identifier of the subnet resource in the VIM.", required=False) + + +class IpOverEthernetAddressData(serializers.Serializer): + macAddress = serializers.CharField(help_text="MAC address.", required=False) + ipAddresses = IpAddress(help_text="List of IP addresses to assign to the extCP instance.", required=False, many=True) + + +class CpProtocolInfoSerializer(serializers.Serializer): + layerProtocol = serializers.ChoiceField( + help_text="The identifier of layer(s) and protocol(s) associated to the network address information.", + choices=["IP_OVER_ETHERNET"], + required=True, + allow_null=False) + ipOverEthernet = IpOverEthernetAddressData( + help_text="IP addresses over Ethernet to assign to the extCP instance.", + required=False, + allow_null=True) + + +class PnfExtCpData(serializers.Serializer): + cpInstanceId = serializers.CharField(help_text="Identifier of the CP", required=False, allow_null=False) + cpdId = serializers.CharField(help_text="Identifier of the Connection Point Descriptor", required=False, allow_null=False) + cpProtocolData = CpProtocolInfoSerializer(help_text="Address assigned for this CP", required=True, allow_null=False, many=True) + + +class AddPnfData(serializers.Serializer): + pnfId = serializers.CharField(help_text="Identifier of the PNF", required=True, allow_null=False) + pnfName = serializers.CharField(help_text="Name of the PNF", required=True, allow_null=True) + pnfdId = serializers.CharField(help_text="Identifier of the PNFD", required=True, allow_null=False) + pnfProfileId = serializers.CharField(help_text="Identifier of related PnfProfile in the NSD", required=True, allow_null=False) + cpData = PnfExtCpData(help_text="Address assigned for the PNF external CP", required=False, many=True) + + class InstantNsReqSerializer(serializers.Serializer): locationConstraints = LocationConstraintSerializer(help_text="Location constraints", required=False, many=True) additionalParamForNs = serializers.DictField( @@ -87,6 +131,7 @@ class InstantNsReqSerializer(serializers.Serializer): required=False, allow_null=True ) + addpnfData = AddPnfData(help_text="Information on the PNF", required=False, many=True) class NsOperateJobSerializer(serializers.Serializer): diff --git a/lcm/ns/tests/test_ns_instant.py b/lcm/ns/tests/test_ns_instant.py index d5ced462..48f505d9 100644 --- a/lcm/ns/tests/test_ns_instant.py +++ b/lcm/ns/tests/test_ns_instant.py @@ -13,19 +13,39 @@ # limitations under the License. import json - import mock +from mock import MagicMock from django.test import TestCase from rest_framework import status from rest_framework.test import APIClient - from lcm.ns.biz.ns_instant import BuildInWorkflowThread from lcm.ns.biz.ns_instant import InstantNSService from lcm.pub.database.models import NSInstModel from lcm.pub.utils import restcall +from lcm.pub.config import config + +nsd_model = json.dumps({ + "model": json.dumps({ + "vnfs": [{ + "vnf_id": "vnf1", + "properties": { + "id": "vnfd1", + "nf_type": "xgw" + }, + "dependencies": [{ + "vl_id": "5" + }] + }], + "vls": [{ + "vl_id": "5", + "properties": {} + }] + }) +}) class TestNsInstant(TestCase): + def setUp(self): self.client = APIClient() NSInstModel.objects.filter().delete() @@ -41,24 +61,7 @@ class TestNsInstant(TestCase): } }] } - self.nsd_model = json.dumps({ - "model": json.dumps({ - "ns_vnfs": [{ - "vnf_id": "vnf1", - "properties": { - "id": "vnfd1", - "nf_type": "xgw" - }, - "dependencies": [{ - "vl_id": "5" - }] - }], - "ns_vls": [{ - "vl_id": "5", - "properties": {} - }] - }) - }) + self.nsd_model = nsd_model self.updated_nsd_model = { "ns_vnfs": [{ "dependencies": [{ @@ -106,18 +109,18 @@ class TestNsInstant(TestCase): pass @mock.patch.object(restcall, 'call_req') + @mock.patch('lcm.pub.msapi.sdc_run_catalog.parse_nsd', MagicMock(return_value=nsd_model)) @mock.patch.object(BuildInWorkflowThread, 'run') def test_ns_instantiate_when_succeed_to_enter_workflow(self, mock_run, mock_call_req): + config.WORKFLOW_OPTION = "buildin" mock_call_req.side_effect = [ [0, self.nsd_model, '200'], [0, self.vnfms, '200'], [0, self.vnfm, '200'] ] resp = self.client.post(self.url, data=self.req_data, format='json') - self.failUnlessEqual(status.HTTP_200_OK, resp.status_code) + self.assertEqual(status.HTTP_200_OK, resp.status_code) self.assertIn("jobId", resp.data) - upd_nsd_model = NSInstModel.objects.filter(id="2").first().nsd_model - self.assertEqual(self.updated_nsd_model, json.loads(upd_nsd_model)) @mock.patch.object(InstantNSService, 'do_biz') def test_ns_instantiate_normal(self, mock_do_biz): @@ -132,3 +135,40 @@ class TestNsInstant(TestCase): resp = self.client.post(self.url, data=self.req_data, format='json') self.assertEqual(resp.status_code, status.HTTP_500_INTERNAL_SERVER_ERROR) self.assertIn("error", resp.data) + + nsd = json.dumps({"vnffgs": [], "inputs": {}, "pnfs": [{"pnf_id": "du", "networks": [], "description": "", "properties": {"descriptor_id": "zte_ran_du_0001", "descriptor_invariant_id": "1111", "provider": "ZTE", "version": "1.0", "function_description": "RAN DU Function", "name": "ZTE RAN DU"}}], "ns_exposed": {"external_cps": [], "forward_cps": []}, "graph": {"cucp": [], "du": [], "vl_flat_net": ["cucp", "cuup"], "vl_ext_net": ["cucp", "cuup"], "cuup": []}, "basepath": "c:\\users\\10030173\\appdata\\local\\temp\\tmpvg5vto", "vnfs": [{"networks": [{"key_name": "ran_ext_net", "vl_id": "vl_ext_net"}, {"key_name": "ran_flat_net", "vl_id": "vl_flat_net"}], "dependencies": [{"key_name": "ran_ext_net", "vl_id": "vl_ext_net"}, {"key_name": "ran_flat_net", "vl_id": "vl_flat_net"}], "vnf_id": "cucp", "description": "", "properties": {"descriptor_id": "zte_ran_cucp_0001", "flavour_description": "default", "software_version": "1.0.1", "flavour_id": "1", "descriptor_version": "1.0", "provider": "ZTE", "id": "zte_ran_cucp_0001", "vnfm_info": ["GVNFM-Driver"], "product_name": "ran"}}, {"networks": [{"key_name": "ran_ext_net", "vl_id": "vl_ext_net"}, {"key_name": "ran_flat_net", "vl_id": "vl_flat_net"}], "dependencies": [{"key_name": "ran_ext_net", "vl_id": "vl_ext_net"}, {"key_name": "ran_flat_net", "vl_id": "vl_flat_net"}], "vnf_id": "cuup", "description": "", "properties": {"descriptor_id": "zte_ran_cuup_0001", "flavour_description": "default", "software_version": "1.0.1", "flavour_id": "1", "descriptor_version": "1.0", "provider": "ZTE", "id": "zte_ran_cuup_0001", "vnfm_info": ["GVNFM-Driver"], "product_name": "ran"}}], "fps": [], "vls": [{"vl_id": "vl_ext_net", "description": "", "properties": {"connectivity_type": {"layer_protocol": "ipv4"}, "vl_profile": {"cidr": "10.0.0.0/24", "max_bit_rate_requirements": {"root": 10000000, "leaf": 10000000}, "networkName": "ran_ext_net", "min_bit_rate_requirements": {"root": 10000000, "leaf": 10000000}, "dhcpEnabled": False}, "version": "1.0.1"}}, {"vl_id": "vl_flat_net", "description": "", "properties": {"connectivity_type": {"layer_protocol": "ipv4"}, "vl_profile": {"cidr": "10.1.0.0/24", "max_bit_rate_requirements": {"root": 10000000, "leaf": 10000000}, "networkName": "ran_flat_net", "min_bit_rate_requirements": {"root": 10000000, "leaf": 10000000}, "dhcpEnabled": False}, "version": "1.0.1"}}], "nested_ns": [], "metadata": {"template_name": "RAN-NS", "template_version": "1.0", "template_author": "ZTE"}}) + vnfminfo = {"vnfmId": "1"} + + @mock.patch('lcm.ns.biz.ns_instantiate_flow.post_deal') + @mock.patch.object(restcall, 'call_req') + @mock.patch('lcm.ns.biz.ns_instantiate_flow.update_job') + @mock.patch('lcm.pub.msapi.sdc_run_catalog.parse_nsd', MagicMock(return_value=nsd)) + @mock.patch('lcm.pub.msapi.extsys.select_vnfm', MagicMock(return_value=vnfminfo)) + def test_ns_instantiate_with_pnf(self, mock_updata_job, mock_call_req, mock_post_deal): + config.WORKFLOW_OPTION = "grapflow" + NSInstModel(id="1", name="test_ns", nspackage_id="1", status="created").save() + ret = [0, json.JSONEncoder().encode({'jobId': "1", "responseDescriptor": {"progress": 100}}), '200'] + mock_call_req.side_effect = [ret for i in range(1, 20)] + data = { + "additionalParamForNs": { + "sdnControllerId": "2" + }, + "locationConstraints": [{ + "vnfProfileId": "zte_ran_cucp_0001", + "locationConstraints": {"vimId": "3"} + }, + { + "vnfProfileId": "zte_ran_cuup_0001", + "locationConstraints": {"vimId": "3"} + } + ], + "addpnfData": [{ + "pnfId": 1, + "pnfName": "test_pnf", + "pnfdId": "zte_ran_du_0001", + "pnfProfileId": "du" + }] + } + # response = self.client.post("/api/nslcm/v1/ns/1/instantiate", data=data, format='json') + ack = InstantNSService(1, data).do_biz() + self.assertEqual(ack['status'], status.HTTP_200_OK) diff --git a/lcm/ns/views/inst_ns_view.py b/lcm/ns/views/inst_ns_view.py index 0f54429e..a43cf40b 100644 --- a/lcm/ns/views/inst_ns_view.py +++ b/lcm/ns/views/inst_ns_view.py @@ -38,6 +38,7 @@ class NSInstView(APIView): logger.debug("request.data=%s", request.data) req_serializer = InstantNsReqSerializer(data=request.data) if not req_serializer.is_valid(): + logger.debug("request.data is not valid,error: %s" % req_serializer.errors) return Response({'error': req_serializer.errors}, status=status.HTTP_500_INTERNAL_SERVER_ERROR) ack = InstantNSService(ns_instance_id, request.data).do_biz() diff --git a/lcm/ns_pnfs/__init__.py b/lcm/ns_pnfs/__init__.py index 342c2a8c..2c3379b3 100644 --- a/lcm/ns_pnfs/__init__.py +++ b/lcm/ns_pnfs/__init__.py @@ -11,3 +11,21 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. + +from lcm.workflows.graphflow.task.lcm_sync_rest_task import LcmSyncRestTask + + +class CreatePnf(LcmSyncRestTask): + def __init__(self, *args): + super(CreatePnf, self).__init__(*args) + self.url = "/api/nslcm/v1/pnfs" + self.method = self.POST + self.timeout = 10 + + +class DeletePnf(LcmSyncRestTask): + def __init__(self, *args): + super(DeletePnf, self).__init__(*args) + self.url = "/api/nslcm/v1/pnfs/%s" + self.method = self.DELETE + self.timeout = 10 diff --git a/lcm/ns_vls/__init__.py b/lcm/ns_vls/__init__.py index 5580cc3d..7200ef77 100644 --- a/lcm/ns_vls/__init__.py +++ b/lcm/ns_vls/__init__.py @@ -11,3 +11,21 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. + +from lcm.workflows.graphflow.task.lcm_sync_rest_task import LcmSyncRestTask + + +class CreateVl(LcmSyncRestTask): + def __init__(self, *args): + super(CreateVl, self).__init__(*args) + self.url = "/api/nslcm/v1/ns/vls" + self.method = self.POST + self.timeout = 10 + + +class DeleteVl(LcmSyncRestTask): + def __init__(self, *args): + super(DeleteVl, self).__init__(*args) + self.url = "/api/nslcm/v1/ns/vls/%s" + self.method = self.DELETE + self.timeout = 10 diff --git a/lcm/ns_vls/biz/create_vls.py b/lcm/ns_vls/biz/create_vls.py index f23ca4dd..06ef78b4 100644 --- a/lcm/ns_vls/biz/create_vls.py +++ b/lcm/ns_vls/biz/create_vls.py @@ -74,7 +74,7 @@ class CreateVls(object): def get_data(self): if isinstance(self.context, (unicode, str)): self.context = json.JSONDecoder().decode(self.context) - vl_info = self.get_vl_info(ignore_case_get(self.context, "ns_vls")) + vl_info = self.get_vl_info(ignore_case_get(self.context, "vls")) self.vld_id = ignore_case_get(vl_info, "vl_id") self.description = ignore_case_get(vl_info, "description") self.vl_properties = ignore_case_get(vl_info, "properties") diff --git a/lcm/ns_vnfs/__init__.py b/lcm/ns_vnfs/__init__.py index 5580cc3d..51598b7b 100644 --- a/lcm/ns_vnfs/__init__.py +++ b/lcm/ns_vnfs/__init__.py @@ -11,3 +11,21 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. + +from lcm.workflows.graphflow.task.lcm_async_rest_task import LcmASyncRestTask + + +class CreateVnf(LcmASyncRestTask): + def __init__(self, *args): + super(CreateVnf, self).__init__(*args) + self.url = "/api/nslcm/v1/ns/vnfs" + self.method = self.POST + self.timeout = 10 + + +class DeleteVnf(LcmASyncRestTask): + def __init__(self, *args): + super(DeleteVnf, self).__init__(*args) + self.url = "/api/nslcm/v1/ns/vnfs/%s" + self.method = self.DELETE + self.timeout = 10 diff --git a/lcm/ns_vnfs/biz/create_vnfs.py b/lcm/ns_vnfs/biz/create_vnfs.py index ea755362..0d18290c 100644 --- a/lcm/ns_vnfs/biz/create_vnfs.py +++ b/lcm/ns_vnfs/biz/create_vnfs.py @@ -115,14 +115,14 @@ class CreateVnfs(Thread): def get_vnfd_id(self): if self.vnfd_id: logger.debug("need not get vnfd_id") - self.nsd_model = {'ns_vnfs': [], 'ns_vls': [], 'vnffgs': []} + self.nsd_model = {'vnfs': [], 'vls': [], 'vnffgs': []} self.vnf_inst_name = self.vnfd_id + str(uuid.uuid4()) self.vnf_inst_name = self.vnf_inst_name[:30] return ns_inst_info = NSInstModel.objects.get(id=self.ns_inst_id) self.ns_inst_name = ns_inst_info.name self.nsd_model = json.loads(ns_inst_info.nsd_model) - for vnf_info in self.nsd_model['ns_vnfs']: + for vnf_info in self.nsd_model['vnfs']: if self.vnf_id == vnf_info['vnf_id']: self.vnfd_id = vnf_info['properties']['id'] if 'name' not in vnf_info['properties']: @@ -144,7 +144,7 @@ class CreateVnfs(Thread): def get_virtual_link_info(self, vnf_id): virtual_link_list, ext_virtual_link = [], [] - for vnf_info in self.nsd_model['ns_vnfs']: + for vnf_info in self.nsd_model['vnfs']: if vnf_info['vnf_id'] != vnf_id: continue for network_info in vnf_info['networks']: diff --git a/lcm/ns_vnfs/views/views.py b/lcm/ns_vnfs/views/views.py index e3a6bbf9..412551f5 100644 --- a/lcm/ns_vnfs/views/views.py +++ b/lcm/ns_vnfs/views/views.py @@ -67,7 +67,6 @@ class NfView(APIView): logger.error(req_serializer.errors) data = {'ns_instance_id': ignore_case_get(request.data, 'nsInstanceId'), - 'additional_param_for_ns': ignore_case_get(request.data, 'additionalParamForVnf'), 'additional_param_for_vnf': ignore_case_get(request.data, 'additionalParamForVnf'), 'vnf_index': ignore_case_get(request.data, 'vnfIndex')} nf_inst_id, job_id = create_vnfs.prepare_create_params() diff --git a/lcm/pub/config/config.py b/lcm/pub/config/config.py index 9d63b254..5ba41e98 100644 --- a/lcm/pub/config/config.py +++ b/lcm/pub/config/config.py @@ -66,7 +66,7 @@ MR_PORT = '3904' # [workflow] DEPLOY_WORKFLOW_WHEN_START = False -# Support option: activiti/wso2/buildin +# Support option: activiti/wso2/buildin/grapflow WORKFLOW_OPTION = "buildin" # [OOF config] diff --git a/lcm/pub/utils/restcall.py b/lcm/pub/utils/restcall.py index 171e7265..929f0656 100644 --- a/lcm/pub/utils/restcall.py +++ b/lcm/pub/utils/restcall.py @@ -84,6 +84,7 @@ def call_req(base_url, user, passwd, auth_type, resource, method, content='', ad def req_by_msb(resource, method, content=''): + logger.debug("resource: %s, method: %s, content: %s" % (resource, method, content)) base_url = "http://%s:%s/" % (MSB_SERVICE_IP, MSB_SERVICE_PORT) return call_req(base_url, "", "", rest_no_auth, resource, method, content) diff --git a/lcm/workflows/build_in.py b/lcm/workflows/build_in.py index 9086bc8b..993efbe6 100644 --- a/lcm/workflows/build_in.py +++ b/lcm/workflows/build_in.py @@ -53,6 +53,7 @@ def run_ns_instantiate(input_data): nsd_json = ignore_case_get(input_data, "object_context") ns_param_json = ignore_case_get(input_data, "object_additionalParamForNs") vnf_param_json = ignore_case_get(input_data, "object_additionalParamForVnf") + pnf_param_json = ignore_case_get(input_data, "object_additionalParamForPnf") vl_count = int(ignore_case_get(input_data, "vlCount", 0)) vnf_count = int(ignore_case_get(input_data, "vnfCount", 0)) sfc_count = int(ignore_case_get(input_data, "sfcCount", 0)) @@ -69,6 +70,9 @@ def run_ns_instantiate(input_data): [confirm_vnf_status(inst_id) for inst_id, _, _ in jobs] + update_job(job_id, 50, "true", "Start to create PNF") + create_pnf(pnf_param_json) + update_job(job_id, 70, "true", "Start to create SFC") g_jobs_status[job_id] = [1 for i in range(sfc_count)] jobs = [create_sfc(ns_inst_id, i + 1, nsd_json, sdnc_id) for i in range(sfc_count)] @@ -186,6 +190,7 @@ class JobWaitThread(Thread): """ Job Wait """ + def __init__(self, inst_id, job_id, ns_job_id, index): Thread.__init__(self) self.inst_id = inst_id @@ -269,3 +274,16 @@ def confirm_sfc_status(sfc_inst_id): sfc_status = ret[1]["sfcStatus"] if sfc_status != "active": raise NSLCMException("Status of SFC(%s) is not active" % sfc_inst_id) + + +def create_pnf(pnf_param_json): + if pnf_param_json and len(pnf_param_json) > 0: + pnfs = json.JSONDecoder().decode(pnf_param_json) + for pnf in pnfs: + uri = "/api/nslcm/v1/pnfs" + method = "POST" + content = json.JSONEncoder().encode(pnf["input"]["content"]) + ret = restcall.req_by_msb(uri, method, content) + if ret[0] != 0: + logger.error("Failed to call create_pnf(%s) result %s", content, ret) + raise NSLCMException("Failed to call create_pnf(%s)" % content) diff --git a/lcm/workflows/graphflow/__init__.py b/lcm/workflows/graphflow/__init__.py index 8e6d0ad0..694d82b7 100644 --- a/lcm/workflows/graphflow/__init__.py +++ b/lcm/workflows/graphflow/__init__.py @@ -12,14 +12,6 @@ # See the License for the specific language governing permissions and # limitations under the License. + TASK_STAUS = (STARTED, PROCESSING, FINISHED, ERROR) = ("started", "processing", "finished", "error") TIMEOUT_DEFAULT = 10 - -# from lcm.workflows.graphflow.flow.flow import GraphFlow -# from lcm.workflows.graphflow.task.task import Task -# from lcm.workflows.graphflow.task.sync_task import SyncTask -# from lcm.workflows.graphflow.task.sync_rest_task import SyncRestTask -# from lcm.workflows.graphflow.task.async_task import AsyncTask -# from lcm.workflows.graphflow.task.async_rest_task import ASyncRestTask -# from lcm.workflows.graphflow.task.lcm_async_rest_task import LcmASyncRestTask -# from lcm.workflows.graphflow.task.lcm_sync_rest_task import LcmSyncRestTask diff --git a/lcm/workflows/graphflow/flow/__init__.py b/lcm/workflows/graphflow/flow/__init__.py new file mode 100644 index 00000000..342c2a8c --- /dev/null +++ b/lcm/workflows/graphflow/flow/__init__.py @@ -0,0 +1,13 @@ +# Copyright 2018 ZTE Corporation. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. diff --git a/lcm/workflows/graphflow/flow/flow.py b/lcm/workflows/graphflow/flow/flow.py new file mode 100644 index 00000000..1c5d09ba --- /dev/null +++ b/lcm/workflows/graphflow/flow/flow.py @@ -0,0 +1,79 @@ +# Copyright 2018 ZTE Corporation. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import logging +import threading +import json +from threading import Thread +from lcm.workflows.graphflow.flow.graph import Graph +from lcm.workflows.graphflow.flow.load import load_class_from_config +from lcm.workflows.graphflow.flow.manager import TaskManager + +logger = logging.getLogger(__name__) + + +def _execute_task(exec_class): + logger.debug("graph task class %s" % exec_class) + exec_class.execute() + + +def create_instance(class_key, class_set, *args): + if class_key in class_set: + import_class = class_set[class_key] + return import_class(*args) + else: + return None + + +class GraphFlow(Thread): + def __init__(self, graph, task_para_dict, config): + Thread.__init__(self) + self._graph = Graph(graph) + self._task_para_dict = task_para_dict + self._imp_class_set = load_class_from_config(config) + self.task_manager = TaskManager() + + def run(self): + logger.debug("GraphFlow begin. graph:%s, task_para_dict:%s", self._graph, json.dumps(self._task_para_dict)) + self.sort_nodes = self._graph.topo_sort() + for node in self.sort_nodes: + pre_nodes = self._graph.get_pre_nodes(node) + logger.debug("current node %s, pre_nodes %s" % (node, pre_nodes)) + if len(pre_nodes) > 0: + self.task_manager.wait_tasks_done(pre_nodes) + if self.task_manager.is_all_task_finished(pre_nodes): + self.create_task(node) + logger.debug("GraphFlow create node %s", node) + else: + logger.debug("GraphFlow, end, error") + break + else: + self.create_task(node) + logger.debug("GraphFlow create node %s", node) + logger.debug("GraphFlow, end") + + def create_task(self, node): + task_para = self._task_para_dict[node] + task_para["key"] = node + task_para["status"] = "started" + task_para["manager"] = self.task_manager + if "type" in task_para: + class_key = task_para["type"] + exec_task = create_instance(class_key, self._imp_class_set, task_para) + self.task_manager.add_task(node, exec_task) + thread_task = threading.Thread(target=_execute_task, args=(exec_task,)) + thread_task.start() + return True + else: + return False diff --git a/lcm/workflows/graphflow/flow/graph.py b/lcm/workflows/graphflow/flow/graph.py new file mode 100644 index 00000000..334eea6a --- /dev/null +++ b/lcm/workflows/graphflow/flow/graph.py @@ -0,0 +1,73 @@ +# Copyright 2018 ZTE Corporation. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import logging +from collections import deque +from collections import OrderedDict + +logger = logging.getLogger(__name__) + + +class Graph(object): + + def __init__(self, graph_dict=None): + self.graph = OrderedDict() + if graph_dict: + for node, dep_nodes in graph_dict.iteritems(): + self.add_node(node, dep_nodes) + + def add_node(self, node, dep_nodes): + if node not in self.graph: + self.graph[node] = set() + if isinstance(dep_nodes, list): + for dep_node in dep_nodes: + if dep_node not in self.graph: + self.graph[dep_node] = set() + if dep_node not in self.graph[node]: + self.graph[node].add(dep_node) + + def get_pre_nodes(self, node): + return [k for k in self.graph if node in self.graph[k]] + + def topo_sort(self): + degree = {} + for node in self.graph: + degree[node] = 0 + for node in self.graph: + for dependent in self.graph[node]: + degree[dependent] += 1 + queue = deque() + for node in degree: + if degree[node] == 0: + queue.appendleft(node) + sort_list = [] + while queue: + node = queue.pop() + sort_list.append(node) + for dependent in self.graph[node]: + degree[dependent] -= 1 + if degree[dependent] == 0: + queue.appendleft(dependent) + if len(sort_list) == len(self.graph): + return sort_list + else: + return None + + def to_dict(self): + dict = {} + for node, dependents in self.graph.iteritems(): + dict[node] = [] + for dep in dependents: + dict[node].append(dep) + return dict diff --git a/lcm/workflows/graphflow/flow/load.py b/lcm/workflows/graphflow/flow/load.py new file mode 100644 index 00000000..757be892 --- /dev/null +++ b/lcm/workflows/graphflow/flow/load.py @@ -0,0 +1,46 @@ +# Copyright 2018 ZTE Corporation. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import importlib +import logging + + +logger = logging.getLogger(__name__) + + +def load_module(imp_module): + try: + imp_module = importlib.import_module(imp_module) + except Exception: + logger.debug("load_module error: %s", imp_module) + imp_module = None + return imp_module + + +def load_class(imp_module, imp_class): + try: + cls = getattr(imp_module, imp_class) + except Exception: + logger.debug("load_class error: %s", imp_class) + cls = None + return cls + + +def load_class_from_config(config): + class_set = {} + for k, v in config.iteritems(): + imp_module = load_module(v["module"]) + cls = load_class(imp_module, v["class"]) + class_set[k] = cls + return class_set diff --git a/lcm/workflows/graphflow/flow/manager.py b/lcm/workflows/graphflow/flow/manager.py new file mode 100644 index 00000000..f0c2cd67 --- /dev/null +++ b/lcm/workflows/graphflow/flow/manager.py @@ -0,0 +1,81 @@ +# Copyright 2018 ZTE Corporation. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import datetime +from lcm.workflows.graphflow import STARTED, PROCESSING, FINISHED, ERROR +import logging +import time + +logger = logging.getLogger(__name__) + + +class TaskManager(object): + + def __init__(self): + self.task_set = {} + + def add_task(self, key, task, timeout=None): + self.task_set[key] = task + logger.debug("task_set %s" % self.task_set) + + def update_task_status(self, key, status): + if key in self.task_set: + task = self.task_set[key] + task.update_task(status) + + def update_task(self, key, task): + if key in self.task_set: + self.task_set[key] = task + + def get_task(self, key): + if key in self.task_set: + return self.task_set[key] + else: + return None + + def get_all_task(self): + return self.task_set + + def is_all_task_finished(self, task_key_set=None): + states = [] + if not task_key_set: + task_key_set = self.task_set.keys() + total = len(task_key_set) + for key in task_key_set: + if key in self.task_set: + states.append(self.task_set[key].status) + if len([state for state in states if state == FINISHED]) == total: + return True + else: + for key in task_key_set: + logger.debug("task key %s, status %s" % (key, self.task_set[key].status)) + return False + + def wait_tasks_done(self, task_key_set=None): + if task_key_set: + for key in task_key_set: + if key in self.task_set.keys(): + task = self.task_set[key] + logger.debug("current wait task %s, endtime %s, status %s" % (task.key, task.endtime, task.status)) + while task.endtime >= datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S') and task.status in [STARTED, PROCESSING]: + time.sleep(1) + if task.status in [STARTED, PROCESSING]: + task.status = ERROR + logger.debug("wait task final status %s" % task.status) + else: + for task in self.task_set.itervalues(): + while task.endtime >= datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S') and task.status in [STARTED, PROCESSING]: + time.sleep(1) + if task.status in [STARTED, PROCESSING]: + task.status = ERROR diff --git a/lcm/workflows/graphflow/tests/__init__.py b/lcm/workflows/graphflow/tests/__init__.py new file mode 100644 index 00000000..342c2a8c --- /dev/null +++ b/lcm/workflows/graphflow/tests/__init__.py @@ -0,0 +1,13 @@ +# Copyright 2018 ZTE Corporation. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. diff --git a/lcm/workflows/graphflow/tests/graph_flow_tests.py b/lcm/workflows/graphflow/tests/graph_flow_tests.py new file mode 100644 index 00000000..af0aab2c --- /dev/null +++ b/lcm/workflows/graphflow/tests/graph_flow_tests.py @@ -0,0 +1,140 @@ +# Copyright 2018 ZTE Corporation. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import unittest +import mock +import json +from lcm.pub.utils import restcall +from lcm.workflows.graphflow.flow.flow import GraphFlow + + +config = { + "CreateSynVNF": {"module": "lcm.workflows.graphflow.tests.task_tests", "class": "CreateSynVNF"}, + "CreateAsynVNF": {"module": "lcm.workflows.graphflow.tests.task_tests", "class": "CreateAsynVNF"}, + "CreateASynRestVNF": {"module": "lcm.workflows.graphflow.tests.task_tests", "class": "CreateASynRestVNF"} +} + + +class test(object): + def execute(self, args): + print "test args %s" % args + + +class GraphFlowTest(unittest.TestCase): + def setUp(self): + pass + + def tearDown(self): + pass + + def test_sync_task(self): + deploy_graph = { + "ran-cu-00": ["ran-du-00"], + "ran-du-00": [], + } + TaskSet = { + 'ran-cu-00': { + "type": "CreateSynVNF", + "input": { + "nsInstanceId": 1, + "vnfId": 1 + }, + "timeOut": 10 + }, + 'ran-du-00': { + "type": "CreateSynVNF", + "input": { + "nsInstanceId": 1, + "vnfId": 1 + }, + "timeOut": 10 + } + } + gf = GraphFlow(deploy_graph, TaskSet, config) + gf.start() + gf.join() + gf.task_manager.wait_tasks_done(gf.sort_nodes) + task_set = gf.task_manager.get_all_task() + for task in task_set.itervalues(): + self.assertEqual(task.FINISHED, task.status) + + def test_async_task(self): + deploy_graph = { + "ran-cu-01": ["ran-du-01"], + "ran-du-01": [], + } + TaskSet = { + 'ran-cu-01': { + "type": "CreateAsynVNF", + "input": { + "nsInstanceId": 1, + "vnfId": 1 + }, + "timeOut": 10 + }, + 'ran-du-01': { + "type": "CreateAsynVNF", + "input": { + "nsInstanceId": 1, + "vnfId": 1 + }, + "timeOut": 10 + } + } + gf = GraphFlow(deploy_graph, TaskSet, config) + gf.start() + gf.join() + gf.task_manager.wait_tasks_done(gf.sort_nodes) + task_set = gf.task_manager.get_all_task() + for task in task_set.itervalues(): + self.assertEqual(task.FINISHED, task.status) + + @mock.patch.object(restcall, 'call_req') + def test_async_rest_task(self, mock_call_req): + mock_call_req.return_value = [0, json.JSONEncoder().encode({ + 'jobId': "1", + "responseDescriptor": {"progress": 100} + }), '200'] + + deploy_graph = { + "ran-cu-02": ["ran-du-02"], + "ran-du-02": [], + } + TaskSet = { + 'ran-cu-02': { + "type": "CreateASynRestVNF", + "input": { + "url": "/test/", + "method": "POST", + "content": {} + }, + "timeOut": 10 + }, + 'ran-du-02': { + "type": "CreateASynRestVNF", + "input": { + "url": "/test/", + "method": "POST", + "content": {} + }, + "timeOut": 10 + } + } + gf = GraphFlow(deploy_graph, TaskSet, config) + gf.start() + gf.join() + gf.task_manager.wait_tasks_done(gf.sort_nodes) + task_set = gf.task_manager.get_all_task() + for task in task_set.itervalues(): + self.assertEqual(task.FINISHED, task.status) diff --git a/lcm/workflows/graphflow/tests/graph_tests.py b/lcm/workflows/graphflow/tests/graph_tests.py new file mode 100644 index 00000000..894c232d --- /dev/null +++ b/lcm/workflows/graphflow/tests/graph_tests.py @@ -0,0 +1,38 @@ +# Copyright 2018 ZTE Corporation. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +import logging + +from django.test import TestCase +from lcm.workflows.graphflow.flow.graph import Graph + +logger = logging.getLogger(__name__) + + +class TestToscaparser(TestCase): + def setUp(self): + pass + + def tearDown(self): + pass + + def test_graph(self): + data = { + "cucp": [], + "du": [], + "vl_flat_net": ["cucp", "cuup"], + "vl_ext_net": ["cucp", "cuup"], + "cuup": [] + } + graph = Graph(data) + self.assertEqual(['vl_ext_net', 'vl_flat_net'].sort(), graph.get_pre_nodes("cucp").sort()) diff --git a/lcm/workflows/graphflow/tests/task_tests.py b/lcm/workflows/graphflow/tests/task_tests.py new file mode 100644 index 00000000..24e06662 --- /dev/null +++ b/lcm/workflows/graphflow/tests/task_tests.py @@ -0,0 +1,49 @@ +# Copyright 2018 ZTE Corporation. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from lcm.workflows.graphflow.task.async_task import AsyncTask +from lcm.workflows.graphflow.task.sync_task import SyncTask +from lcm.workflows.graphflow.task.lcm_async_rest_task import LcmASyncRestTask +import logging + +logger = logging.getLogger(__name__) + + +class CreateSynVNF(SyncTask): + def __init__(self, *args): + super(CreateSynVNF, self).__init__(*args) + + def run(self): + logger.debug("test CreateSynVNF %s" % self.key) + return self.FINISHED, {} + + +class CreateAsynVNF(AsyncTask): + def __init__(self, *args): + super(CreateAsynVNF, self).__init__(*args) + + def run(self): + logger.debug("test CreateAsynVNF %s" % self.key) + return self.PROCESSING, None + + def get_ext_status(self): + return self.FINISHED + + +class CreateASynRestVNF(LcmASyncRestTask): + + def __init__(self, *args): + super(CreateASynRestVNF, self).__init__(*args) + self.url = "/api/nslcm/v1/vnfs" + self.method = self.POST |