summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorfujinhua <fu.jinhua@zte.com.cn>2017-08-23 17:34:41 +0800
committerfujinhua <fu.jinhua@zte.com.cn>2017-08-23 17:34:41 +0800
commit410c12fddf993c1704830a76a712d4833b88c7fa (patch)
tree18c84876da743b3b529112c97a9993044c2cf45e
parent2f0b58cf0e2fbe212e8912e04f00124bb39acec8 (diff)
Add create vnf of build-in workflow
Change-Id: Idd15d70ec322ee00f27d6e233ba81f18570ad9bf Issue-Id: VFC-132 Signed-off-by: fujinhua <fu.jinhua@zte.com.cn>
-rw-r--r--lcm/workflows/build_in.py130
1 files changed, 109 insertions, 21 deletions
diff --git a/lcm/workflows/build_in.py b/lcm/workflows/build_in.py
index b24c7af8..1e4b292f 100644
--- a/lcm/workflows/build_in.py
+++ b/lcm/workflows/build_in.py
@@ -13,6 +13,8 @@
# limitations under the License.
import logging
import traceback
+from threading import Thread
+import time
from lcm.pub.utils.syscomm import fun_name
from lcm.pub.utils.values import ignore_case_get
@@ -22,6 +24,9 @@ from lcm.pub.exceptions import NSLCMException
logger = logging.getLogger(__name__)
RESULT_OK, RESULT_NG = "0", "1"
+JOB_ERROR = 255
+
+g_jobs_status = {}
"""
format of input_data
@@ -48,25 +53,34 @@ def run_ns_instantiate(input_data):
vnf_count = ignore_case_get(input_data, "vnfCount")
sfc_count = ignore_case_get(input_data, "sfcCount")
sdnc_id = ignore_case_get(input_data, "sdnControllerId")
-
- update_job(job_id, 10, "0", "Start to create VL")
- for i in range(vl_count):
- create_vl(ns_inst_id, i + 1, nsd_json, ns_param_json)
+ g_jobs_status[job_id] = [1 for i in range(vnf_count)]
+ try:
+ update_job(job_id, 10, "0", "Start to create VL")
+ for i in range(vl_count):
+ create_vl(ns_inst_id, i + 1, nsd_json, ns_param_json)
+
+ update_job(job_id, 30, "0", "Start to create VNF")
+ jobs = [create_vnf(ns_inst_id, i + 1, vnf_param_json) for i in range(vnf_count)]
+ wait_until_jobs_done(job_id, jobs)
- update_job(job_id, 30, "0", "Start to create VNF")
- for i in range(vnf_count):
- create_vnf()
- wait_until_job_done()
+ update_job(job_id, 70, "0", "Start to create SFC")
+ for i in range(sfc_count):
+ create_sfc()
- update_job(job_id, 70, "0", "Start to create SFC")
- for i in range(sfc_count):
- create_sfc()
- wait_until_job_done()
+ update_job(job_id, 90, "0", "Start to post deal")
+ post_deal()
+
+ update_job(job_id, 100, "0", "Create NS successfully.")
+ except NSLCMException as e:
+ logger.error("Failded to Create NS: %s", e.message)
+ update_job(job_id, JOB_ERROR, "255", "Failded to Create NS.")
+ except:
+ logger.error(traceback.format_exc())
+ update_job(job_id, JOB_ERROR, "255", "Failded to Create NS.")
+ finally:
+ g_jobs_status.pop(job_id)
- update_job(job_id, 90, "0", "Start to post deal")
- post_deal()
- update_job(job_id, 100, "0", "Create NS successfully.")
def create_vl(ns_inst_id, vl_index, nsd, ns_param):
uri = "api/nslcm/v1/ns/vls"
@@ -91,9 +105,23 @@ def create_vl(ns_inst_id, vl_index, nsd, ns_param):
logger.debug("Create VL(%s) successfully.", vl_id)
-def create_vnf():
- # TODO:
- pass
+def create_vnf(ns_inst_id, vnf_index, nf_param):
+ uri = "/ns/vnfs"
+ data = json.JSONEncoder().encode({
+ "nsInstanceId": ns_inst_id,
+ "vnfIndex": vnf_index,
+ "additionalParamForVnf": nf_param
+ })
+
+ ret = restcall.req_by_msb(uri, "POST", data)
+ if ret[0] != 0:
+ logger.error("Failed to call create_vnf(%s): %s", vnf_index, ret[1])
+ raise NSLCMException("Failed to call create_vnf(index is %s)" % vnf_index)
+
+ vnf_inst_id = ret[1]["vnfInstId"]
+ job_id = ret[1]["jobId"]
+ logger.debug("Create VNF(%s) started.", vnf_inst_id)
+ return vnf_inst_id, job_id, vnf_index - 1
def create_sfc():
# TODO:
@@ -112,6 +140,66 @@ def update_job(job_id, progress, errcode, desc):
})
restcall.req_by_msb(uri, "POST", data)
-def wait_until_job_done():
- # TODO:
- pass
+class JobWaitThread(Thread):
+ """
+ Job Wait
+ """
+
+ def __init__(self, inst_id, job_id, index):
+ Thread.__init__(self)
+ self.inst_id = inst_id
+ self.job_id = job_id
+ self.index = index
+ self.retry_count = 60
+ self.interval_second = 3
+
+ def run(self):
+ count = 0
+ response_id, new_response_id = 0, 0
+ job_end_normal, job_timeout = False, True
+ while count < self.retry_count:
+ count = count + 1
+ time.sleep(self.interval_second)
+ uri = "/api/nslcm/v1/jobs/%s?responseId=%s" % (self.job_id, response_id)
+ ret = restcall.req_by_msb(uri, "GET")
+ if ret[0] != 0:
+ logger.error("Failed to query job: %s:%s", ret[2], ret[1])
+ continue
+ job_result = json.JSONDecoder().decode(ret[1])
+ if "responseDescriptor" not in job_result:
+ logger.error("Job(%s) does not exist.", self.job_id)
+ continue
+ progress = job_result["responseDescriptor"]["progress"]
+ new_response_id = job_result["responseDescriptor"]["responseId"]
+ job_desc = job_result["responseDescriptor"]["statusDescription"]
+ if new_response_id != response_id:
+ logger.debug("%s:%s:%s", progress, new_response_id, job_desc)
+ response_id = new_response_id
+ count = 0
+ if progress == JOB_ERROR:
+ job_timeout = False
+ logger.error("Job(%s) failed: %s", self.job_id, job_desc)
+ break
+ elif progress == 100:
+ job_end_normal, job_timeout = True, False
+ logger.info("Job(%s) ended normally", self.job_id)
+ break
+ if job_timeout:
+ logger.error("Job(%s) timeout", self.job_id)
+ if self.job_id in g_jobs_status:
+ if job_end_normal:
+ g_jobs_status[self.job_id][self.index] = 0
+
+def wait_until_jobs_done(g_job_id, jobs):
+ job_threads = []
+ for inst_id, job_id, index in jobs:
+ job_threads.append(JobWaitThread(inst_id, job_id, index))
+ for t in job_threads:
+ t.start()
+ for t in job_threads:
+ t.join()
+ if g_job_id in g_jobs_status:
+ if sum(g_jobs_status[g_job_id]) > 0:
+ raise NSLCMException("Some jobs failed!")
+
+