summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--docs/release-notes.rst8
-rw-r--r--lcm/ns/biz/ns_instant.py69
-rw-r--r--lcm/ns/biz/ns_instantiate_flow.py179
-rw-r--r--lcm/ns/serializers/ns_serializers.py45
-rw-r--r--lcm/ns/tests/test_ns_instant.py86
-rw-r--r--lcm/ns/views/inst_ns_view.py1
-rw-r--r--lcm/ns_pnfs/__init__.py18
-rw-r--r--lcm/ns_vls/__init__.py18
-rw-r--r--lcm/ns_vls/biz/create_vls.py2
-rw-r--r--lcm/ns_vnfs/__init__.py18
-rw-r--r--lcm/ns_vnfs/biz/create_vnfs.py6
-rw-r--r--lcm/ns_vnfs/views/views.py1
-rw-r--r--lcm/pub/config/config.py2
-rw-r--r--lcm/pub/utils/restcall.py1
-rw-r--r--lcm/workflows/build_in.py18
-rw-r--r--lcm/workflows/graphflow/__init__.py10
-rw-r--r--lcm/workflows/graphflow/flow/__init__.py13
-rw-r--r--lcm/workflows/graphflow/flow/flow.py79
-rw-r--r--lcm/workflows/graphflow/flow/graph.py73
-rw-r--r--lcm/workflows/graphflow/flow/load.py46
-rw-r--r--lcm/workflows/graphflow/flow/manager.py81
-rw-r--r--lcm/workflows/graphflow/tests/__init__.py13
-rw-r--r--lcm/workflows/graphflow/tests/graph_flow_tests.py140
-rw-r--r--lcm/workflows/graphflow/tests/graph_tests.py38
-rw-r--r--lcm/workflows/graphflow/tests/task_tests.py49
25 files changed, 953 insertions, 61 deletions
diff --git a/docs/release-notes.rst b/docs/release-notes.rst
index 0334a488..75cde295 100644
--- a/docs/release-notes.rst
+++ b/docs/release-notes.rst
@@ -51,9 +51,9 @@ This is the initial release
**Known Issues**
- - VFC-896 vim-id in AAI is handled as a mandatory parameter
- - VFC-890 The hard coded SDC user and password in catalog & LCM is not present in SDC
- - VFC-891 The AAI credentials is hard coded in LCM
+ - `VFC-896 <https://jira.onap.org/browse/VFC-896>`_ vim-id in AAI is handled as a mandatory parameter
+ - `VFC-890 <https://jira.onap.org/browse/VFC-890>`_ The hard coded SDC user and password in catalog & LCM is not present in SDC
+ - `VFC-891 <https://jira.onap.org/browse/VFC-891>`_ The AAI credentials is hard coded in LCM
**Security Notes**
@@ -86,7 +86,7 @@ Version: 1.0.0
**New Features**
- NS lifecycle management, including NS instance creation, termination and healing
-- VNF lifecycle management, including VNF nstance creation, termination and healing
+- VNF lifecycle management, including VNF instance creation, termination and healing
- VNF FCAPS, collecting FCAPS data from vendor EMS
- VNFM Integration, integration with specific VNFMs of vendors to deploy commercial VNFs
- VNF Integration, integration with VNF via GVNFM
diff --git a/lcm/ns/biz/ns_instant.py b/lcm/ns/biz/ns_instant.py
index 527a35ee..52f686be 100644
--- a/lcm/ns/biz/ns_instant.py
+++ b/lcm/ns/biz/ns_instant.py
@@ -20,7 +20,7 @@ from threading import Thread
from rest_framework import status
-from lcm.pub.config.config import WORKFLOW_OPTION
+from lcm.pub.config import config
from lcm.pub.database.models import DefPkgMappingModel, ServiceBaseInfoModel, InputParamMappingModel
from lcm.pub.database.models import NSInstModel, VNFFGInstModel, WFPlanModel
from lcm.pub.exceptions import NSLCMException
@@ -28,11 +28,12 @@ from lcm.pub.msapi import activiti
from lcm.pub.msapi import sdc_run_catalog
from lcm.pub.msapi.catalog import get_process_id
from lcm.pub.msapi.catalog import get_servicetemplate_id, get_servicetemplate
-from lcm.pub.msapi.extsys import select_vnfm
+from lcm.pub.msapi import extsys
from lcm.pub.msapi.wso2bpel import workflow_run
from lcm.pub.utils.jobutil import JobUtil
from lcm.pub.utils.values import ignore_case_get
from lcm.workflows import build_in
+from lcm.ns.biz.ns_instantiate_flow import run_ns_instantiate
logger = logging.getLogger(__name__)
@@ -72,18 +73,18 @@ class InstantNSService(object):
JobUtil.add_job_status(job_id, 5, 'Start query nsd(%s)' % ns_inst.nspackage_id)
dst_plan = sdc_run_catalog.parse_nsd(ns_inst.nspackage_id, input_parameters)
logger.debug('tosca plan dest: %s' % dst_plan)
-
+ logger.debug('Start query nsd(%s)' % ns_inst.nspackage_id)
NSInstModel.objects.filter(id=self.ns_inst_id).update(nsd_model=dst_plan)
params_json = json.JSONEncoder().encode(self.req_data["additionalParamForNs"])
- # start
params_vnf = []
plan_dict = json.JSONDecoder().decode(dst_plan)
- for vnf in ignore_case_get(plan_dict, "ns_vnfs"):
+ for vnf in ignore_case_get(plan_dict, "vnfs"):
vnfd_id = vnf['properties']['id']
vnfm_type = vnf['properties'].get("nf_type", "undefined")
vimid = self.get_vnf_vim_id(vim_id, location_constraints, vnfd_id)
- vnfm_info = select_vnfm(vnfm_type=vnfm_type, vim_id=vimid)
+ vnfm_info = extsys.select_vnfm(vnfm_type=vnfm_type, vim_id=vimid)
+
params_vnf.append({
"vnfProfileId": vnf["vnf_id"],
"additionalParam": {
@@ -94,20 +95,22 @@ class InstantNSService(object):
"inputs": params_json
}
})
- # end
self.set_vl_vim_id(vim_id, location_constraints, plan_dict)
dst_plan = json.JSONEncoder().encode(plan_dict)
logger.debug('tosca plan dest add vimid:%s' % dst_plan)
NSInstModel.objects.filter(id=self.ns_inst_id).update(nsd_model=dst_plan)
+ pnf_params_json = json.JSONEncoder().encode(self.init_pnf_para(plan_dict))
+
vnf_params_json = json.JSONEncoder().encode(params_vnf)
plan_input = {
'jobId': job_id,
'nsInstanceId': self.ns_inst_id,
'object_context': dst_plan,
'object_additionalParamForNs': params_json,
- 'object_additionalParamForVnf': vnf_params_json
+ 'object_additionalParamForVnf': vnf_params_json,
+ 'object_additionalParamForPnf': pnf_params_json
}
plan_input.update(**self.get_model_count(dst_plan))
plan_input["sdnControllerId"] = ignore_case_get(
@@ -122,7 +125,7 @@ class InstantNSService(object):
creator='--',
create_time=int(time.time() * 1000)).save()
- if WORKFLOW_OPTION == "wso2":
+ if config.WORKFLOW_OPTION == "wso2":
service_tpl = get_servicetemplate(ns_inst.nsd_id)
DefPkgMappingModel(service_id=self.ns_inst_id,
service_def_id=service_tpl['csarId'],
@@ -142,18 +145,20 @@ class InstantNSService(object):
else:
# TODO:
pass
-
- if WORKFLOW_OPTION == "wso2":
+ logger.debug("workflow option: %s" % config.WORKFLOW_OPTION)
+ if config.WORKFLOW_OPTION == "wso2":
return self.start_wso2_workflow(job_id, ns_inst, plan_input)
- elif WORKFLOW_OPTION == "activiti":
+ elif config.WORKFLOW_OPTION == "activiti":
return self.start_activiti_workflow()
+ elif config.WORKFLOW_OPTION == "grapflow":
+ return self.start_buildin_grapflow(job_id, plan_input)
else:
return self.start_buildin_workflow(job_id, plan_input)
except Exception as e:
logger.error(traceback.format_exc())
logger.error("ns-instant(%s) workflow error:%s" % (self.ns_inst_id, e.message))
- JobUtil.add_job_status(job_id, 255, 'NS instantiation failed: %s' % e.message)
+ JobUtil.add_job_status(job_id, 255, 'NS instantiation failed')
return dict(data={'error': e.message}, status=status.HTTP_500_INTERNAL_SERVER_ERROR)
def start_wso2_workflow(self, job_id, ns_inst, plan_input):
@@ -191,6 +196,11 @@ class InstantNSService(object):
BuildInWorkflowThread(plan_input).start()
return dict(data={'jobId': job_id}, status=status.HTTP_200_OK)
+ def start_buildin_grapflow(self, job_id, plan_input):
+ JobUtil.add_job_status(job_id, 10, 'NS inst(%s) buildin grap workflow started.' % self.ns_inst_id)
+ run_ns_instantiate(plan_input)
+ return dict(data={'jobId': job_id}, status=status.HTTP_200_OK)
+
@staticmethod
def get_vnf_vim_id(vim_id, location_constraints, vnfdid):
for location in location_constraints:
@@ -202,11 +212,11 @@ class InstantNSService(object):
@staticmethod
def set_vl_vim_id(vim_id, location_constraints, plan_dict):
- if "ns_vls" not in plan_dict:
+ if "vls" not in plan_dict:
logger.debug("No vl is found in nsd.")
return
vl_vnf = {}
- for vnf in ignore_case_get(plan_dict, "ns_vnfs"):
+ for vnf in ignore_case_get(plan_dict, "vnfs"):
if "dependencies" in vnf:
for depend in vnf["dependencies"]:
vl_vnf[depend["vl_id"]] = vnf['properties']['id']
@@ -215,7 +225,7 @@ class InstantNSService(object):
if "vnfProfileId" in location:
vnfd_id = location["vnfProfileId"]
vnf_vim[vnfd_id] = location["locationConstraints"]["vimId"]
- for vl in plan_dict["ns_vls"]:
+ for vl in plan_dict["vls"]:
vnfdid = ignore_case_get(vl_vnf, vl["vl_id"])
vimid = ignore_case_get(vnf_vim, vnfdid)
if not vimid:
@@ -229,7 +239,28 @@ class InstantNSService(object):
@staticmethod
def get_model_count(context):
data = json.JSONDecoder().decode(context)
- vls = len(data.get('ns_vls', []))
+ vls = len(data.get('vls', []))
sfcs = len(data.get('fps', []))
- vnfs = len(data.get('ns_vnfs', []))
- return {'vlCount': str(vls), 'sfcCount': str(sfcs), 'vnfCount': str(vnfs)}
+ vnfs = len(data.get('vnfs', []))
+ pnfs = len(data.get('pnfs', []))
+ return {'vlCount': str(vls), 'sfcCount': str(sfcs), 'vnfCount': str(vnfs), 'pnfCount': str(pnfs)}
+
+ def init_pnf_para(self, plan_dict):
+ pnfs_in_input = ignore_case_get(self.req_data, "addpnfData")
+ pnfs_in_nsd = ignore_case_get(plan_dict, "pnfs")
+ logger.debug("addpnfData ; %s" % pnfs_in_input)
+ logger.debug("pnfs_in_nsd ; %s" % pnfs_in_nsd)
+ pnfs = {}
+ for pnf in pnfs_in_input:
+ for pnfd in pnfs_in_nsd:
+ if pnfd["properties"]["descriptor_id"] == pnf["pnfdId"]:
+ k = pnfd["pnf_id"]
+ pnf["nsInstances"] = self.ns_inst_id
+ # todo pnf["pnfdInfoId"]
+ pnfs[k] = {
+ "type": "CreatePnf",
+ "input": {
+ "content": pnf
+ }
+ }
+ return pnfs
diff --git a/lcm/ns/biz/ns_instantiate_flow.py b/lcm/ns/biz/ns_instantiate_flow.py
new file mode 100644
index 00000000..88b04fe7
--- /dev/null
+++ b/lcm/ns/biz/ns_instantiate_flow.py
@@ -0,0 +1,179 @@
+# Copyright 2018 ZTE Corporation.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import json
+import logging
+import traceback
+from threading import Thread
+
+from lcm.pub.utils.syscomm import fun_name
+from lcm.pub.utils.values import ignore_case_get
+from lcm.pub.utils import restcall
+from lcm.pub.exceptions import NSLCMException
+from lcm.workflows.graphflow.flow.flow import GraphFlow
+
+logger = logging.getLogger(__name__)
+
+RESULT_OK, RESULT_NG = "0", "1"
+JOB_ERROR = 255
+
+config = {
+ "CreateVnf": {"module": "lcm.ns_vnfs", "class": "CreateVnf"},
+ "CreatePnf": {"module": "lcm.ns_pnfs", "class": "CreatePnf"},
+ "CreateVl": {"module": "lcm.ns_vls", "class": "CreateVl"}
+}
+
+
+class NsInstantiateWorkflowThread(Thread):
+ def __init__(self, plan_input):
+ Thread.__init__(self)
+ self.plan_input = plan_input
+
+ def run(self):
+ run_ns_instantiate(self.plan_input)
+
+
+def run_ns_instantiate(input_data):
+ """
+ format of input_data
+ {
+ "jobId": uuid of job,
+ "nsInstanceId": id of ns instance,
+ "object_context": json format of nsd,
+ "object_additionalParamForNs": json format of additional parameters for ns,
+ "object_additionalParamForVnf": json format of additional parameters for vnf,
+ "object_additionalParamForPnf": json format of additional parameters for pnf,
+ "vlCount": int type of VL count,
+ "vnfCount: int type of VNF count
+ }
+ """
+ logger.debug("Enter %s, input_data is %s", fun_name(), input_data)
+ ns_inst_id = ignore_case_get(input_data, "nsInstanceId")
+ job_id = ignore_case_get(input_data, "jobId")
+ update_job(job_id, 10, "true", "Start to prepare the NS instantiate workflow parameter")
+ deploy_graph = build_deploy_graph(input_data)
+ TaskSet = build_TaskSet(input_data)
+ ns_instantiate_ok = False
+
+ try:
+ update_job(job_id, 15, "true", "Start the NS instantiate workflow")
+ gf = GraphFlow(deploy_graph, TaskSet, config)
+ logger.debug("NS graph flow run up!")
+ gf.start()
+ gf.join()
+ gf.task_manager.wait_tasks_done(gf.sort_nodes)
+ if gf.task_manager.is_all_task_finished():
+ logger.debug("NS is instantiated!")
+ update_job(job_id, 90, "true", "Start to post deal")
+ post_deal(ns_inst_id, "true")
+ update_job(job_id, 100, "true", "Create NS successfully.")
+ ns_instantiate_ok = True
+ except NSLCMException as e:
+ logger.error("Failded to Create NS: %s", e.message)
+ update_job(job_id, JOB_ERROR, "255", "Failded to Create NS.")
+ post_deal(ns_inst_id, "false")
+ except:
+ logger.error(traceback.format_exc())
+ update_job(job_id, JOB_ERROR, "255", "Failded to Create NS.")
+ post_deal(ns_inst_id, "false")
+ return ns_instantiate_ok
+
+
+def build_deploy_graph(input_data):
+ nsd_json_str = ignore_case_get(input_data, "object_context")
+ nsd_json = json.JSONDecoder().decode(nsd_json_str)
+ deploy_graph = ignore_case_get(nsd_json, "graph")
+ logger.debug("NS graph flow: %s" % deploy_graph)
+ return deploy_graph
+
+
+def build_vls(input_data):
+ ns_inst_id = ignore_case_get(input_data, "nsInstanceId")
+ nsd_json = json.JSONDecoder().decode(ignore_case_get(input_data, "object_context"))
+ ns_param_json = ignore_case_get(input_data, "object_additionalParamForNs")
+ vl_count = int(ignore_case_get(input_data, "vlCount", 0))
+
+ vls = {}
+ for i in range(vl_count):
+ data = {
+ "nsInstanceId": ns_inst_id,
+ "vlIndex": i,
+ "context": nsd_json,
+ "additionalParamForNs": ns_param_json
+ }
+ key = nsd_json["vls"][i - 1]["vl_id"]
+ vls[key] = {
+ "type": "CreateVl",
+ "input": {
+ "content": data
+ }
+ }
+ return vls
+
+
+def build_vnfs(input_data):
+ ns_inst_id = ignore_case_get(input_data, "nsInstanceId")
+ vnf_count = int(ignore_case_get(input_data, "vnfCount", 0))
+ vnf_param_json = json.JSONDecoder().decode(ignore_case_get(input_data, "object_additionalParamForVnf"))
+ vnfs = {}
+ for i in range(vnf_count):
+ data = {
+ "nsInstanceId": ns_inst_id,
+ "vnfIndex": i,
+ "additionalParamForVnf": vnf_param_json
+ }
+ key = vnf_param_json[i - 1]["vnfProfileId"]
+ vnfs[key] = {
+ "type": "CreateVnf",
+ "input": {
+ "content": data
+ }
+ }
+ return vnfs
+
+
+def build_pnfs(input_data):
+ return json.JSONDecoder().decode(ignore_case_get(input_data, "object_additionalParamForPnf"))
+
+
+def build_TaskSet(input_data):
+ vls = build_vls(input_data)
+ vnfs = build_vnfs(input_data)
+ pnfs = build_pnfs(input_data)
+ task_set = dict(dict(vls, **vnfs), **pnfs)
+ return task_set
+
+
+def post_deal(ns_inst_id, status):
+ uri = "api/nslcm/v1/ns/{nsInstanceId}/postdeal".format(nsInstanceId=ns_inst_id)
+ data = json.JSONEncoder().encode({
+ "status": status
+ })
+
+ ret = restcall.req_by_msb(uri, "POST", data)
+ if ret[0] != 0:
+ logger.error("Failed to call post_deal(%s): %s", ns_inst_id, ret[1])
+ logger.debug("Call post_deal(%s, %s) successfully.", ns_inst_id, status)
+
+
+def update_job(job_id, progress, errcode, desc):
+ logger.debug("job_id %s" % job_id)
+ uri = "api/nslcm/v1/jobs/{jobId}".format(jobId=job_id)
+ data = json.JSONEncoder().encode({
+ "progress": progress,
+ "errcode": errcode,
+ "desc": desc
+ })
+ ret = restcall.req_by_msb(uri, "POST", data)
+ return ret
diff --git a/lcm/ns/serializers/ns_serializers.py b/lcm/ns/serializers/ns_serializers.py
index 1fc6cadd..c1a5c8b9 100644
--- a/lcm/ns/serializers/ns_serializers.py
+++ b/lcm/ns/serializers/ns_serializers.py
@@ -79,6 +79,50 @@ class LocationConstraintSerializer(serializers.Serializer):
locationConstraints = VimSerializer(help_text="Location constraint", required=False, allow_null=True)
+class AddressRange(serializers.Serializer):
+ minAddress = serializers.IPAddressField(help_text="Lowest IP address belonging to the range.", required=True)
+ maxAddress = serializers.IPAddressField(help_text="Highest IP address belonging to the range.", required=True)
+
+
+class IpAddress(serializers.Serializer):
+ type = serializers.ChoiceField(help_text="The type of the IP addresses.", required=True, choices=["IPV4", "IPV6"])
+ fixedAddresses = serializers.ListField(child=serializers.CharField(help_text="Fixed addresses to assign."), required=False)
+ numDynamicAddresses = serializers.IntegerField(help_text="Number of dynamic addresses to assign.", required=False)
+ addressRange = AddressRange(help_text="An IP address range to be used.", required=False)
+ subnetId = serializers.CharField(help_text="Subnet defined by the identifier of the subnet resource in the VIM.", required=False)
+
+
+class IpOverEthernetAddressData(serializers.Serializer):
+ macAddress = serializers.CharField(help_text="MAC address.", required=False)
+ ipAddresses = IpAddress(help_text="List of IP addresses to assign to the extCP instance.", required=False, many=True)
+
+
+class CpProtocolInfoSerializer(serializers.Serializer):
+ layerProtocol = serializers.ChoiceField(
+ help_text="The identifier of layer(s) and protocol(s) associated to the network address information.",
+ choices=["IP_OVER_ETHERNET"],
+ required=True,
+ allow_null=False)
+ ipOverEthernet = IpOverEthernetAddressData(
+ help_text="IP addresses over Ethernet to assign to the extCP instance.",
+ required=False,
+ allow_null=True)
+
+
+class PnfExtCpData(serializers.Serializer):
+ cpInstanceId = serializers.CharField(help_text="Identifier of the CP", required=False, allow_null=False)
+ cpdId = serializers.CharField(help_text="Identifier of the Connection Point Descriptor", required=False, allow_null=False)
+ cpProtocolData = CpProtocolInfoSerializer(help_text="Address assigned for this CP", required=True, allow_null=False, many=True)
+
+
+class AddPnfData(serializers.Serializer):
+ pnfId = serializers.CharField(help_text="Identifier of the PNF", required=True, allow_null=False)
+ pnfName = serializers.CharField(help_text="Name of the PNF", required=True, allow_null=True)
+ pnfdId = serializers.CharField(help_text="Identifier of the PNFD", required=True, allow_null=False)
+ pnfProfileId = serializers.CharField(help_text="Identifier of related PnfProfile in the NSD", required=True, allow_null=False)
+ cpData = PnfExtCpData(help_text="Address assigned for the PNF external CP", required=False, many=True)
+
+
class InstantNsReqSerializer(serializers.Serializer):
locationConstraints = LocationConstraintSerializer(help_text="Location constraints", required=False, many=True)
additionalParamForNs = serializers.DictField(
@@ -87,6 +131,7 @@ class InstantNsReqSerializer(serializers.Serializer):
required=False,
allow_null=True
)
+ addpnfData = AddPnfData(help_text="Information on the PNF", required=False, many=True)
class NsOperateJobSerializer(serializers.Serializer):
diff --git a/lcm/ns/tests/test_ns_instant.py b/lcm/ns/tests/test_ns_instant.py
index d5ced462..48f505d9 100644
--- a/lcm/ns/tests/test_ns_instant.py
+++ b/lcm/ns/tests/test_ns_instant.py
@@ -13,19 +13,39 @@
# limitations under the License.
import json
-
import mock
+from mock import MagicMock
from django.test import TestCase
from rest_framework import status
from rest_framework.test import APIClient
-
from lcm.ns.biz.ns_instant import BuildInWorkflowThread
from lcm.ns.biz.ns_instant import InstantNSService
from lcm.pub.database.models import NSInstModel
from lcm.pub.utils import restcall
+from lcm.pub.config import config
+
+nsd_model = json.dumps({
+ "model": json.dumps({
+ "vnfs": [{
+ "vnf_id": "vnf1",
+ "properties": {
+ "id": "vnfd1",
+ "nf_type": "xgw"
+ },
+ "dependencies": [{
+ "vl_id": "5"
+ }]
+ }],
+ "vls": [{
+ "vl_id": "5",
+ "properties": {}
+ }]
+ })
+})
class TestNsInstant(TestCase):
+
def setUp(self):
self.client = APIClient()
NSInstModel.objects.filter().delete()
@@ -41,24 +61,7 @@ class TestNsInstant(TestCase):
}
}]
}
- self.nsd_model = json.dumps({
- "model": json.dumps({
- "ns_vnfs": [{
- "vnf_id": "vnf1",
- "properties": {
- "id": "vnfd1",
- "nf_type": "xgw"
- },
- "dependencies": [{
- "vl_id": "5"
- }]
- }],
- "ns_vls": [{
- "vl_id": "5",
- "properties": {}
- }]
- })
- })
+ self.nsd_model = nsd_model
self.updated_nsd_model = {
"ns_vnfs": [{
"dependencies": [{
@@ -106,18 +109,18 @@ class TestNsInstant(TestCase):
pass
@mock.patch.object(restcall, 'call_req')
+ @mock.patch('lcm.pub.msapi.sdc_run_catalog.parse_nsd', MagicMock(return_value=nsd_model))
@mock.patch.object(BuildInWorkflowThread, 'run')
def test_ns_instantiate_when_succeed_to_enter_workflow(self, mock_run, mock_call_req):
+ config.WORKFLOW_OPTION = "buildin"
mock_call_req.side_effect = [
[0, self.nsd_model, '200'],
[0, self.vnfms, '200'],
[0, self.vnfm, '200']
]
resp = self.client.post(self.url, data=self.req_data, format='json')
- self.failUnlessEqual(status.HTTP_200_OK, resp.status_code)
+ self.assertEqual(status.HTTP_200_OK, resp.status_code)
self.assertIn("jobId", resp.data)
- upd_nsd_model = NSInstModel.objects.filter(id="2").first().nsd_model
- self.assertEqual(self.updated_nsd_model, json.loads(upd_nsd_model))
@mock.patch.object(InstantNSService, 'do_biz')
def test_ns_instantiate_normal(self, mock_do_biz):
@@ -132,3 +135,40 @@ class TestNsInstant(TestCase):
resp = self.client.post(self.url, data=self.req_data, format='json')
self.assertEqual(resp.status_code, status.HTTP_500_INTERNAL_SERVER_ERROR)
self.assertIn("error", resp.data)
+
+ nsd = json.dumps({"vnffgs": [], "inputs": {}, "pnfs": [{"pnf_id": "du", "networks": [], "description": "", "properties": {"descriptor_id": "zte_ran_du_0001", "descriptor_invariant_id": "1111", "provider": "ZTE", "version": "1.0", "function_description": "RAN DU Function", "name": "ZTE RAN DU"}}], "ns_exposed": {"external_cps": [], "forward_cps": []}, "graph": {"cucp": [], "du": [], "vl_flat_net": ["cucp", "cuup"], "vl_ext_net": ["cucp", "cuup"], "cuup": []}, "basepath": "c:\\users\\10030173\\appdata\\local\\temp\\tmpvg5vto", "vnfs": [{"networks": [{"key_name": "ran_ext_net", "vl_id": "vl_ext_net"}, {"key_name": "ran_flat_net", "vl_id": "vl_flat_net"}], "dependencies": [{"key_name": "ran_ext_net", "vl_id": "vl_ext_net"}, {"key_name": "ran_flat_net", "vl_id": "vl_flat_net"}], "vnf_id": "cucp", "description": "", "properties": {"descriptor_id": "zte_ran_cucp_0001", "flavour_description": "default", "software_version": "1.0.1", "flavour_id": "1", "descriptor_version": "1.0", "provider": "ZTE", "id": "zte_ran_cucp_0001", "vnfm_info": ["GVNFM-Driver"], "product_name": "ran"}}, {"networks": [{"key_name": "ran_ext_net", "vl_id": "vl_ext_net"}, {"key_name": "ran_flat_net", "vl_id": "vl_flat_net"}], "dependencies": [{"key_name": "ran_ext_net", "vl_id": "vl_ext_net"}, {"key_name": "ran_flat_net", "vl_id": "vl_flat_net"}], "vnf_id": "cuup", "description": "", "properties": {"descriptor_id": "zte_ran_cuup_0001", "flavour_description": "default", "software_version": "1.0.1", "flavour_id": "1", "descriptor_version": "1.0", "provider": "ZTE", "id": "zte_ran_cuup_0001", "vnfm_info": ["GVNFM-Driver"], "product_name": "ran"}}], "fps": [], "vls": [{"vl_id": "vl_ext_net", "description": "", "properties": {"connectivity_type": {"layer_protocol": "ipv4"}, "vl_profile": {"cidr": "10.0.0.0/24", "max_bit_rate_requirements": {"root": 10000000, "leaf": 10000000}, "networkName": "ran_ext_net", "min_bit_rate_requirements": {"root": 10000000, "leaf": 10000000}, "dhcpEnabled": False}, "version": "1.0.1"}}, {"vl_id": "vl_flat_net", "description": "", "properties": {"connectivity_type": {"layer_protocol": "ipv4"}, "vl_profile": {"cidr": "10.1.0.0/24", "max_bit_rate_requirements": {"root": 10000000, "leaf": 10000000}, "networkName": "ran_flat_net", "min_bit_rate_requirements": {"root": 10000000, "leaf": 10000000}, "dhcpEnabled": False}, "version": "1.0.1"}}], "nested_ns": [], "metadata": {"template_name": "RAN-NS", "template_version": "1.0", "template_author": "ZTE"}})
+ vnfminfo = {"vnfmId": "1"}
+
+ @mock.patch('lcm.ns.biz.ns_instantiate_flow.post_deal')
+ @mock.patch.object(restcall, 'call_req')
+ @mock.patch('lcm.ns.biz.ns_instantiate_flow.update_job')
+ @mock.patch('lcm.pub.msapi.sdc_run_catalog.parse_nsd', MagicMock(return_value=nsd))
+ @mock.patch('lcm.pub.msapi.extsys.select_vnfm', MagicMock(return_value=vnfminfo))
+ def test_ns_instantiate_with_pnf(self, mock_updata_job, mock_call_req, mock_post_deal):
+ config.WORKFLOW_OPTION = "grapflow"
+ NSInstModel(id="1", name="test_ns", nspackage_id="1", status="created").save()
+ ret = [0, json.JSONEncoder().encode({'jobId': "1", "responseDescriptor": {"progress": 100}}), '200']
+ mock_call_req.side_effect = [ret for i in range(1, 20)]
+ data = {
+ "additionalParamForNs": {
+ "sdnControllerId": "2"
+ },
+ "locationConstraints": [{
+ "vnfProfileId": "zte_ran_cucp_0001",
+ "locationConstraints": {"vimId": "3"}
+ },
+ {
+ "vnfProfileId": "zte_ran_cuup_0001",
+ "locationConstraints": {"vimId": "3"}
+ }
+ ],
+ "addpnfData": [{
+ "pnfId": 1,
+ "pnfName": "test_pnf",
+ "pnfdId": "zte_ran_du_0001",
+ "pnfProfileId": "du"
+ }]
+ }
+ # response = self.client.post("/api/nslcm/v1/ns/1/instantiate", data=data, format='json')
+ ack = InstantNSService(1, data).do_biz()
+ self.assertEqual(ack['status'], status.HTTP_200_OK)
diff --git a/lcm/ns/views/inst_ns_view.py b/lcm/ns/views/inst_ns_view.py
index 0f54429e..a43cf40b 100644
--- a/lcm/ns/views/inst_ns_view.py
+++ b/lcm/ns/views/inst_ns_view.py
@@ -38,6 +38,7 @@ class NSInstView(APIView):
logger.debug("request.data=%s", request.data)
req_serializer = InstantNsReqSerializer(data=request.data)
if not req_serializer.is_valid():
+ logger.debug("request.data is not valid,error: %s" % req_serializer.errors)
return Response({'error': req_serializer.errors},
status=status.HTTP_500_INTERNAL_SERVER_ERROR)
ack = InstantNSService(ns_instance_id, request.data).do_biz()
diff --git a/lcm/ns_pnfs/__init__.py b/lcm/ns_pnfs/__init__.py
index 342c2a8c..2c3379b3 100644
--- a/lcm/ns_pnfs/__init__.py
+++ b/lcm/ns_pnfs/__init__.py
@@ -11,3 +11,21 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
+
+from lcm.workflows.graphflow.task.lcm_sync_rest_task import LcmSyncRestTask
+
+
+class CreatePnf(LcmSyncRestTask):
+ def __init__(self, *args):
+ super(CreatePnf, self).__init__(*args)
+ self.url = "/api/nslcm/v1/pnfs"
+ self.method = self.POST
+ self.timeout = 10
+
+
+class DeletePnf(LcmSyncRestTask):
+ def __init__(self, *args):
+ super(DeletePnf, self).__init__(*args)
+ self.url = "/api/nslcm/v1/pnfs/%s"
+ self.method = self.DELETE
+ self.timeout = 10
diff --git a/lcm/ns_vls/__init__.py b/lcm/ns_vls/__init__.py
index 5580cc3d..7200ef77 100644
--- a/lcm/ns_vls/__init__.py
+++ b/lcm/ns_vls/__init__.py
@@ -11,3 +11,21 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
+
+from lcm.workflows.graphflow.task.lcm_sync_rest_task import LcmSyncRestTask
+
+
+class CreateVl(LcmSyncRestTask):
+ def __init__(self, *args):
+ super(CreateVl, self).__init__(*args)
+ self.url = "/api/nslcm/v1/ns/vls"
+ self.method = self.POST
+ self.timeout = 10
+
+
+class DeleteVl(LcmSyncRestTask):
+ def __init__(self, *args):
+ super(DeleteVl, self).__init__(*args)
+ self.url = "/api/nslcm/v1/ns/vls/%s"
+ self.method = self.DELETE
+ self.timeout = 10
diff --git a/lcm/ns_vls/biz/create_vls.py b/lcm/ns_vls/biz/create_vls.py
index f23ca4dd..06ef78b4 100644
--- a/lcm/ns_vls/biz/create_vls.py
+++ b/lcm/ns_vls/biz/create_vls.py
@@ -74,7 +74,7 @@ class CreateVls(object):
def get_data(self):
if isinstance(self.context, (unicode, str)):
self.context = json.JSONDecoder().decode(self.context)
- vl_info = self.get_vl_info(ignore_case_get(self.context, "ns_vls"))
+ vl_info = self.get_vl_info(ignore_case_get(self.context, "vls"))
self.vld_id = ignore_case_get(vl_info, "vl_id")
self.description = ignore_case_get(vl_info, "description")
self.vl_properties = ignore_case_get(vl_info, "properties")
diff --git a/lcm/ns_vnfs/__init__.py b/lcm/ns_vnfs/__init__.py
index 5580cc3d..51598b7b 100644
--- a/lcm/ns_vnfs/__init__.py
+++ b/lcm/ns_vnfs/__init__.py
@@ -11,3 +11,21 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
+
+from lcm.workflows.graphflow.task.lcm_async_rest_task import LcmASyncRestTask
+
+
+class CreateVnf(LcmASyncRestTask):
+ def __init__(self, *args):
+ super(CreateVnf, self).__init__(*args)
+ self.url = "/api/nslcm/v1/ns/vnfs"
+ self.method = self.POST
+ self.timeout = 10
+
+
+class DeleteVnf(LcmASyncRestTask):
+ def __init__(self, *args):
+ super(DeleteVnf, self).__init__(*args)
+ self.url = "/api/nslcm/v1/ns/vnfs/%s"
+ self.method = self.DELETE
+ self.timeout = 10
diff --git a/lcm/ns_vnfs/biz/create_vnfs.py b/lcm/ns_vnfs/biz/create_vnfs.py
index ea755362..0d18290c 100644
--- a/lcm/ns_vnfs/biz/create_vnfs.py
+++ b/lcm/ns_vnfs/biz/create_vnfs.py
@@ -115,14 +115,14 @@ class CreateVnfs(Thread):
def get_vnfd_id(self):
if self.vnfd_id:
logger.debug("need not get vnfd_id")
- self.nsd_model = {'ns_vnfs': [], 'ns_vls': [], 'vnffgs': []}
+ self.nsd_model = {'vnfs': [], 'vls': [], 'vnffgs': []}
self.vnf_inst_name = self.vnfd_id + str(uuid.uuid4())
self.vnf_inst_name = self.vnf_inst_name[:30]
return
ns_inst_info = NSInstModel.objects.get(id=self.ns_inst_id)
self.ns_inst_name = ns_inst_info.name
self.nsd_model = json.loads(ns_inst_info.nsd_model)
- for vnf_info in self.nsd_model['ns_vnfs']:
+ for vnf_info in self.nsd_model['vnfs']:
if self.vnf_id == vnf_info['vnf_id']:
self.vnfd_id = vnf_info['properties']['id']
if 'name' not in vnf_info['properties']:
@@ -144,7 +144,7 @@ class CreateVnfs(Thread):
def get_virtual_link_info(self, vnf_id):
virtual_link_list, ext_virtual_link = [], []
- for vnf_info in self.nsd_model['ns_vnfs']:
+ for vnf_info in self.nsd_model['vnfs']:
if vnf_info['vnf_id'] != vnf_id:
continue
for network_info in vnf_info['networks']:
diff --git a/lcm/ns_vnfs/views/views.py b/lcm/ns_vnfs/views/views.py
index e3a6bbf9..412551f5 100644
--- a/lcm/ns_vnfs/views/views.py
+++ b/lcm/ns_vnfs/views/views.py
@@ -67,7 +67,6 @@ class NfView(APIView):
logger.error(req_serializer.errors)
data = {'ns_instance_id': ignore_case_get(request.data, 'nsInstanceId'),
- 'additional_param_for_ns': ignore_case_get(request.data, 'additionalParamForVnf'),
'additional_param_for_vnf': ignore_case_get(request.data, 'additionalParamForVnf'),
'vnf_index': ignore_case_get(request.data, 'vnfIndex')}
nf_inst_id, job_id = create_vnfs.prepare_create_params()
diff --git a/lcm/pub/config/config.py b/lcm/pub/config/config.py
index 9d63b254..5ba41e98 100644
--- a/lcm/pub/config/config.py
+++ b/lcm/pub/config/config.py
@@ -66,7 +66,7 @@ MR_PORT = '3904'
# [workflow]
DEPLOY_WORKFLOW_WHEN_START = False
-# Support option: activiti/wso2/buildin
+# Support option: activiti/wso2/buildin/grapflow
WORKFLOW_OPTION = "buildin"
# [OOF config]
diff --git a/lcm/pub/utils/restcall.py b/lcm/pub/utils/restcall.py
index 171e7265..929f0656 100644
--- a/lcm/pub/utils/restcall.py
+++ b/lcm/pub/utils/restcall.py
@@ -84,6 +84,7 @@ def call_req(base_url, user, passwd, auth_type, resource, method, content='', ad
def req_by_msb(resource, method, content=''):
+ logger.debug("resource: %s, method: %s, content: %s" % (resource, method, content))
base_url = "http://%s:%s/" % (MSB_SERVICE_IP, MSB_SERVICE_PORT)
return call_req(base_url, "", "", rest_no_auth, resource, method, content)
diff --git a/lcm/workflows/build_in.py b/lcm/workflows/build_in.py
index 9086bc8b..993efbe6 100644
--- a/lcm/workflows/build_in.py
+++ b/lcm/workflows/build_in.py
@@ -53,6 +53,7 @@ def run_ns_instantiate(input_data):
nsd_json = ignore_case_get(input_data, "object_context")
ns_param_json = ignore_case_get(input_data, "object_additionalParamForNs")
vnf_param_json = ignore_case_get(input_data, "object_additionalParamForVnf")
+ pnf_param_json = ignore_case_get(input_data, "object_additionalParamForPnf")
vl_count = int(ignore_case_get(input_data, "vlCount", 0))
vnf_count = int(ignore_case_get(input_data, "vnfCount", 0))
sfc_count = int(ignore_case_get(input_data, "sfcCount", 0))
@@ -69,6 +70,9 @@ def run_ns_instantiate(input_data):
[confirm_vnf_status(inst_id) for inst_id, _, _ in jobs]
+ update_job(job_id, 50, "true", "Start to create PNF")
+ create_pnf(pnf_param_json)
+
update_job(job_id, 70, "true", "Start to create SFC")
g_jobs_status[job_id] = [1 for i in range(sfc_count)]
jobs = [create_sfc(ns_inst_id, i + 1, nsd_json, sdnc_id) for i in range(sfc_count)]
@@ -186,6 +190,7 @@ class JobWaitThread(Thread):
"""
Job Wait
"""
+
def __init__(self, inst_id, job_id, ns_job_id, index):
Thread.__init__(self)
self.inst_id = inst_id
@@ -269,3 +274,16 @@ def confirm_sfc_status(sfc_inst_id):
sfc_status = ret[1]["sfcStatus"]
if sfc_status != "active":
raise NSLCMException("Status of SFC(%s) is not active" % sfc_inst_id)
+
+
+def create_pnf(pnf_param_json):
+ if pnf_param_json and len(pnf_param_json) > 0:
+ pnfs = json.JSONDecoder().decode(pnf_param_json)
+ for pnf in pnfs:
+ uri = "/api/nslcm/v1/pnfs"
+ method = "POST"
+ content = json.JSONEncoder().encode(pnf["input"]["content"])
+ ret = restcall.req_by_msb(uri, method, content)
+ if ret[0] != 0:
+ logger.error("Failed to call create_pnf(%s) result %s", content, ret)
+ raise NSLCMException("Failed to call create_pnf(%s)" % content)
diff --git a/lcm/workflows/graphflow/__init__.py b/lcm/workflows/graphflow/__init__.py
index 8e6d0ad0..694d82b7 100644
--- a/lcm/workflows/graphflow/__init__.py
+++ b/lcm/workflows/graphflow/__init__.py
@@ -12,14 +12,6 @@
# See the License for the specific language governing permissions and
# limitations under the License.
+
TASK_STAUS = (STARTED, PROCESSING, FINISHED, ERROR) = ("started", "processing", "finished", "error")
TIMEOUT_DEFAULT = 10
-
-# from lcm.workflows.graphflow.flow.flow import GraphFlow
-# from lcm.workflows.graphflow.task.task import Task
-# from lcm.workflows.graphflow.task.sync_task import SyncTask
-# from lcm.workflows.graphflow.task.sync_rest_task import SyncRestTask
-# from lcm.workflows.graphflow.task.async_task import AsyncTask
-# from lcm.workflows.graphflow.task.async_rest_task import ASyncRestTask
-# from lcm.workflows.graphflow.task.lcm_async_rest_task import LcmASyncRestTask
-# from lcm.workflows.graphflow.task.lcm_sync_rest_task import LcmSyncRestTask
diff --git a/lcm/workflows/graphflow/flow/__init__.py b/lcm/workflows/graphflow/flow/__init__.py
new file mode 100644
index 00000000..342c2a8c
--- /dev/null
+++ b/lcm/workflows/graphflow/flow/__init__.py
@@ -0,0 +1,13 @@
+# Copyright 2018 ZTE Corporation.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
diff --git a/lcm/workflows/graphflow/flow/flow.py b/lcm/workflows/graphflow/flow/flow.py
new file mode 100644
index 00000000..1c5d09ba
--- /dev/null
+++ b/lcm/workflows/graphflow/flow/flow.py
@@ -0,0 +1,79 @@
+# Copyright 2018 ZTE Corporation.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import logging
+import threading
+import json
+from threading import Thread
+from lcm.workflows.graphflow.flow.graph import Graph
+from lcm.workflows.graphflow.flow.load import load_class_from_config
+from lcm.workflows.graphflow.flow.manager import TaskManager
+
+logger = logging.getLogger(__name__)
+
+
+def _execute_task(exec_class):
+ logger.debug("graph task class %s" % exec_class)
+ exec_class.execute()
+
+
+def create_instance(class_key, class_set, *args):
+ if class_key in class_set:
+ import_class = class_set[class_key]
+ return import_class(*args)
+ else:
+ return None
+
+
+class GraphFlow(Thread):
+ def __init__(self, graph, task_para_dict, config):
+ Thread.__init__(self)
+ self._graph = Graph(graph)
+ self._task_para_dict = task_para_dict
+ self._imp_class_set = load_class_from_config(config)
+ self.task_manager = TaskManager()
+
+ def run(self):
+ logger.debug("GraphFlow begin. graph:%s, task_para_dict:%s", self._graph, json.dumps(self._task_para_dict))
+ self.sort_nodes = self._graph.topo_sort()
+ for node in self.sort_nodes:
+ pre_nodes = self._graph.get_pre_nodes(node)
+ logger.debug("current node %s, pre_nodes %s" % (node, pre_nodes))
+ if len(pre_nodes) > 0:
+ self.task_manager.wait_tasks_done(pre_nodes)
+ if self.task_manager.is_all_task_finished(pre_nodes):
+ self.create_task(node)
+ logger.debug("GraphFlow create node %s", node)
+ else:
+ logger.debug("GraphFlow, end, error")
+ break
+ else:
+ self.create_task(node)
+ logger.debug("GraphFlow create node %s", node)
+ logger.debug("GraphFlow, end")
+
+ def create_task(self, node):
+ task_para = self._task_para_dict[node]
+ task_para["key"] = node
+ task_para["status"] = "started"
+ task_para["manager"] = self.task_manager
+ if "type" in task_para:
+ class_key = task_para["type"]
+ exec_task = create_instance(class_key, self._imp_class_set, task_para)
+ self.task_manager.add_task(node, exec_task)
+ thread_task = threading.Thread(target=_execute_task, args=(exec_task,))
+ thread_task.start()
+ return True
+ else:
+ return False
diff --git a/lcm/workflows/graphflow/flow/graph.py b/lcm/workflows/graphflow/flow/graph.py
new file mode 100644
index 00000000..334eea6a
--- /dev/null
+++ b/lcm/workflows/graphflow/flow/graph.py
@@ -0,0 +1,73 @@
+# Copyright 2018 ZTE Corporation.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import logging
+from collections import deque
+from collections import OrderedDict
+
+logger = logging.getLogger(__name__)
+
+
+class Graph(object):
+
+ def __init__(self, graph_dict=None):
+ self.graph = OrderedDict()
+ if graph_dict:
+ for node, dep_nodes in graph_dict.iteritems():
+ self.add_node(node, dep_nodes)
+
+ def add_node(self, node, dep_nodes):
+ if node not in self.graph:
+ self.graph[node] = set()
+ if isinstance(dep_nodes, list):
+ for dep_node in dep_nodes:
+ if dep_node not in self.graph:
+ self.graph[dep_node] = set()
+ if dep_node not in self.graph[node]:
+ self.graph[node].add(dep_node)
+
+ def get_pre_nodes(self, node):
+ return [k for k in self.graph if node in self.graph[k]]
+
+ def topo_sort(self):
+ degree = {}
+ for node in self.graph:
+ degree[node] = 0
+ for node in self.graph:
+ for dependent in self.graph[node]:
+ degree[dependent] += 1
+ queue = deque()
+ for node in degree:
+ if degree[node] == 0:
+ queue.appendleft(node)
+ sort_list = []
+ while queue:
+ node = queue.pop()
+ sort_list.append(node)
+ for dependent in self.graph[node]:
+ degree[dependent] -= 1
+ if degree[dependent] == 0:
+ queue.appendleft(dependent)
+ if len(sort_list) == len(self.graph):
+ return sort_list
+ else:
+ return None
+
+ def to_dict(self):
+ dict = {}
+ for node, dependents in self.graph.iteritems():
+ dict[node] = []
+ for dep in dependents:
+ dict[node].append(dep)
+ return dict
diff --git a/lcm/workflows/graphflow/flow/load.py b/lcm/workflows/graphflow/flow/load.py
new file mode 100644
index 00000000..757be892
--- /dev/null
+++ b/lcm/workflows/graphflow/flow/load.py
@@ -0,0 +1,46 @@
+# Copyright 2018 ZTE Corporation.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import importlib
+import logging
+
+
+logger = logging.getLogger(__name__)
+
+
+def load_module(imp_module):
+ try:
+ imp_module = importlib.import_module(imp_module)
+ except Exception:
+ logger.debug("load_module error: %s", imp_module)
+ imp_module = None
+ return imp_module
+
+
+def load_class(imp_module, imp_class):
+ try:
+ cls = getattr(imp_module, imp_class)
+ except Exception:
+ logger.debug("load_class error: %s", imp_class)
+ cls = None
+ return cls
+
+
+def load_class_from_config(config):
+ class_set = {}
+ for k, v in config.iteritems():
+ imp_module = load_module(v["module"])
+ cls = load_class(imp_module, v["class"])
+ class_set[k] = cls
+ return class_set
diff --git a/lcm/workflows/graphflow/flow/manager.py b/lcm/workflows/graphflow/flow/manager.py
new file mode 100644
index 00000000..f0c2cd67
--- /dev/null
+++ b/lcm/workflows/graphflow/flow/manager.py
@@ -0,0 +1,81 @@
+# Copyright 2018 ZTE Corporation.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import datetime
+from lcm.workflows.graphflow import STARTED, PROCESSING, FINISHED, ERROR
+import logging
+import time
+
+logger = logging.getLogger(__name__)
+
+
+class TaskManager(object):
+
+ def __init__(self):
+ self.task_set = {}
+
+ def add_task(self, key, task, timeout=None):
+ self.task_set[key] = task
+ logger.debug("task_set %s" % self.task_set)
+
+ def update_task_status(self, key, status):
+ if key in self.task_set:
+ task = self.task_set[key]
+ task.update_task(status)
+
+ def update_task(self, key, task):
+ if key in self.task_set:
+ self.task_set[key] = task
+
+ def get_task(self, key):
+ if key in self.task_set:
+ return self.task_set[key]
+ else:
+ return None
+
+ def get_all_task(self):
+ return self.task_set
+
+ def is_all_task_finished(self, task_key_set=None):
+ states = []
+ if not task_key_set:
+ task_key_set = self.task_set.keys()
+ total = len(task_key_set)
+ for key in task_key_set:
+ if key in self.task_set:
+ states.append(self.task_set[key].status)
+ if len([state for state in states if state == FINISHED]) == total:
+ return True
+ else:
+ for key in task_key_set:
+ logger.debug("task key %s, status %s" % (key, self.task_set[key].status))
+ return False
+
+ def wait_tasks_done(self, task_key_set=None):
+ if task_key_set:
+ for key in task_key_set:
+ if key in self.task_set.keys():
+ task = self.task_set[key]
+ logger.debug("current wait task %s, endtime %s, status %s" % (task.key, task.endtime, task.status))
+ while task.endtime >= datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S') and task.status in [STARTED, PROCESSING]:
+ time.sleep(1)
+ if task.status in [STARTED, PROCESSING]:
+ task.status = ERROR
+ logger.debug("wait task final status %s" % task.status)
+ else:
+ for task in self.task_set.itervalues():
+ while task.endtime >= datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S') and task.status in [STARTED, PROCESSING]:
+ time.sleep(1)
+ if task.status in [STARTED, PROCESSING]:
+ task.status = ERROR
diff --git a/lcm/workflows/graphflow/tests/__init__.py b/lcm/workflows/graphflow/tests/__init__.py
new file mode 100644
index 00000000..342c2a8c
--- /dev/null
+++ b/lcm/workflows/graphflow/tests/__init__.py
@@ -0,0 +1,13 @@
+# Copyright 2018 ZTE Corporation.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
diff --git a/lcm/workflows/graphflow/tests/graph_flow_tests.py b/lcm/workflows/graphflow/tests/graph_flow_tests.py
new file mode 100644
index 00000000..af0aab2c
--- /dev/null
+++ b/lcm/workflows/graphflow/tests/graph_flow_tests.py
@@ -0,0 +1,140 @@
+# Copyright 2018 ZTE Corporation.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import unittest
+import mock
+import json
+from lcm.pub.utils import restcall
+from lcm.workflows.graphflow.flow.flow import GraphFlow
+
+
+config = {
+ "CreateSynVNF": {"module": "lcm.workflows.graphflow.tests.task_tests", "class": "CreateSynVNF"},
+ "CreateAsynVNF": {"module": "lcm.workflows.graphflow.tests.task_tests", "class": "CreateAsynVNF"},
+ "CreateASynRestVNF": {"module": "lcm.workflows.graphflow.tests.task_tests", "class": "CreateASynRestVNF"}
+}
+
+
+class test(object):
+ def execute(self, args):
+ print "test args %s" % args
+
+
+class GraphFlowTest(unittest.TestCase):
+ def setUp(self):
+ pass
+
+ def tearDown(self):
+ pass
+
+ def test_sync_task(self):
+ deploy_graph = {
+ "ran-cu-00": ["ran-du-00"],
+ "ran-du-00": [],
+ }
+ TaskSet = {
+ 'ran-cu-00': {
+ "type": "CreateSynVNF",
+ "input": {
+ "nsInstanceId": 1,
+ "vnfId": 1
+ },
+ "timeOut": 10
+ },
+ 'ran-du-00': {
+ "type": "CreateSynVNF",
+ "input": {
+ "nsInstanceId": 1,
+ "vnfId": 1
+ },
+ "timeOut": 10
+ }
+ }
+ gf = GraphFlow(deploy_graph, TaskSet, config)
+ gf.start()
+ gf.join()
+ gf.task_manager.wait_tasks_done(gf.sort_nodes)
+ task_set = gf.task_manager.get_all_task()
+ for task in task_set.itervalues():
+ self.assertEqual(task.FINISHED, task.status)
+
+ def test_async_task(self):
+ deploy_graph = {
+ "ran-cu-01": ["ran-du-01"],
+ "ran-du-01": [],
+ }
+ TaskSet = {
+ 'ran-cu-01': {
+ "type": "CreateAsynVNF",
+ "input": {
+ "nsInstanceId": 1,
+ "vnfId": 1
+ },
+ "timeOut": 10
+ },
+ 'ran-du-01': {
+ "type": "CreateAsynVNF",
+ "input": {
+ "nsInstanceId": 1,
+ "vnfId": 1
+ },
+ "timeOut": 10
+ }
+ }
+ gf = GraphFlow(deploy_graph, TaskSet, config)
+ gf.start()
+ gf.join()
+ gf.task_manager.wait_tasks_done(gf.sort_nodes)
+ task_set = gf.task_manager.get_all_task()
+ for task in task_set.itervalues():
+ self.assertEqual(task.FINISHED, task.status)
+
+ @mock.patch.object(restcall, 'call_req')
+ def test_async_rest_task(self, mock_call_req):
+ mock_call_req.return_value = [0, json.JSONEncoder().encode({
+ 'jobId': "1",
+ "responseDescriptor": {"progress": 100}
+ }), '200']
+
+ deploy_graph = {
+ "ran-cu-02": ["ran-du-02"],
+ "ran-du-02": [],
+ }
+ TaskSet = {
+ 'ran-cu-02': {
+ "type": "CreateASynRestVNF",
+ "input": {
+ "url": "/test/",
+ "method": "POST",
+ "content": {}
+ },
+ "timeOut": 10
+ },
+ 'ran-du-02': {
+ "type": "CreateASynRestVNF",
+ "input": {
+ "url": "/test/",
+ "method": "POST",
+ "content": {}
+ },
+ "timeOut": 10
+ }
+ }
+ gf = GraphFlow(deploy_graph, TaskSet, config)
+ gf.start()
+ gf.join()
+ gf.task_manager.wait_tasks_done(gf.sort_nodes)
+ task_set = gf.task_manager.get_all_task()
+ for task in task_set.itervalues():
+ self.assertEqual(task.FINISHED, task.status)
diff --git a/lcm/workflows/graphflow/tests/graph_tests.py b/lcm/workflows/graphflow/tests/graph_tests.py
new file mode 100644
index 00000000..894c232d
--- /dev/null
+++ b/lcm/workflows/graphflow/tests/graph_tests.py
@@ -0,0 +1,38 @@
+# Copyright 2018 ZTE Corporation.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+import logging
+
+from django.test import TestCase
+from lcm.workflows.graphflow.flow.graph import Graph
+
+logger = logging.getLogger(__name__)
+
+
+class TestToscaparser(TestCase):
+ def setUp(self):
+ pass
+
+ def tearDown(self):
+ pass
+
+ def test_graph(self):
+ data = {
+ "cucp": [],
+ "du": [],
+ "vl_flat_net": ["cucp", "cuup"],
+ "vl_ext_net": ["cucp", "cuup"],
+ "cuup": []
+ }
+ graph = Graph(data)
+ self.assertEqual(['vl_ext_net', 'vl_flat_net'].sort(), graph.get_pre_nodes("cucp").sort())
diff --git a/lcm/workflows/graphflow/tests/task_tests.py b/lcm/workflows/graphflow/tests/task_tests.py
new file mode 100644
index 00000000..24e06662
--- /dev/null
+++ b/lcm/workflows/graphflow/tests/task_tests.py
@@ -0,0 +1,49 @@
+# Copyright 2018 ZTE Corporation.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from lcm.workflows.graphflow.task.async_task import AsyncTask
+from lcm.workflows.graphflow.task.sync_task import SyncTask
+from lcm.workflows.graphflow.task.lcm_async_rest_task import LcmASyncRestTask
+import logging
+
+logger = logging.getLogger(__name__)
+
+
+class CreateSynVNF(SyncTask):
+ def __init__(self, *args):
+ super(CreateSynVNF, self).__init__(*args)
+
+ def run(self):
+ logger.debug("test CreateSynVNF %s" % self.key)
+ return self.FINISHED, {}
+
+
+class CreateAsynVNF(AsyncTask):
+ def __init__(self, *args):
+ super(CreateAsynVNF, self).__init__(*args)
+
+ def run(self):
+ logger.debug("test CreateAsynVNF %s" % self.key)
+ return self.PROCESSING, None
+
+ def get_ext_status(self):
+ return self.FINISHED
+
+
+class CreateASynRestVNF(LcmASyncRestTask):
+
+ def __init__(self, *args):
+ super(CreateASynRestVNF, self).__init__(*args)
+ self.url = "/api/nslcm/v1/vnfs"
+ self.method = self.POST