From 410c12fddf993c1704830a76a712d4833b88c7fa Mon Sep 17 00:00:00 2001 From: fujinhua Date: Wed, 23 Aug 2017 17:34:41 +0800 Subject: Add create vnf of build-in workflow Change-Id: Idd15d70ec322ee00f27d6e233ba81f18570ad9bf Issue-Id: VFC-132 Signed-off-by: fujinhua --- lcm/workflows/build_in.py | 130 ++++++++++++++++++++++++++++++++++++++-------- 1 file changed, 109 insertions(+), 21 deletions(-) diff --git a/lcm/workflows/build_in.py b/lcm/workflows/build_in.py index b24c7af8..1e4b292f 100644 --- a/lcm/workflows/build_in.py +++ b/lcm/workflows/build_in.py @@ -13,6 +13,8 @@ # limitations under the License. import logging import traceback +from threading import Thread +import time from lcm.pub.utils.syscomm import fun_name from lcm.pub.utils.values import ignore_case_get @@ -22,6 +24,9 @@ from lcm.pub.exceptions import NSLCMException logger = logging.getLogger(__name__) RESULT_OK, RESULT_NG = "0", "1" +JOB_ERROR = 255 + +g_jobs_status = {} """ format of input_data @@ -48,25 +53,34 @@ def run_ns_instantiate(input_data): vnf_count = ignore_case_get(input_data, "vnfCount") sfc_count = ignore_case_get(input_data, "sfcCount") sdnc_id = ignore_case_get(input_data, "sdnControllerId") - - update_job(job_id, 10, "0", "Start to create VL") - for i in range(vl_count): - create_vl(ns_inst_id, i + 1, nsd_json, ns_param_json) + g_jobs_status[job_id] = [1 for i in range(vnf_count)] + try: + update_job(job_id, 10, "0", "Start to create VL") + for i in range(vl_count): + create_vl(ns_inst_id, i + 1, nsd_json, ns_param_json) + + update_job(job_id, 30, "0", "Start to create VNF") + jobs = [create_vnf(ns_inst_id, i + 1, vnf_param_json) for i in range(vnf_count)] + wait_until_jobs_done(job_id, jobs) - update_job(job_id, 30, "0", "Start to create VNF") - for i in range(vnf_count): - create_vnf() - wait_until_job_done() + update_job(job_id, 70, "0", "Start to create SFC") + for i in range(sfc_count): + create_sfc() - update_job(job_id, 70, "0", "Start to create SFC") - for i in range(sfc_count): - create_sfc() - wait_until_job_done() + update_job(job_id, 90, "0", "Start to post deal") + post_deal() + + update_job(job_id, 100, "0", "Create NS successfully.") + except NSLCMException as e: + logger.error("Failded to Create NS: %s", e.message) + update_job(job_id, JOB_ERROR, "255", "Failded to Create NS.") + except: + logger.error(traceback.format_exc()) + update_job(job_id, JOB_ERROR, "255", "Failded to Create NS.") + finally: + g_jobs_status.pop(job_id) - update_job(job_id, 90, "0", "Start to post deal") - post_deal() - update_job(job_id, 100, "0", "Create NS successfully.") def create_vl(ns_inst_id, vl_index, nsd, ns_param): uri = "api/nslcm/v1/ns/vls" @@ -91,9 +105,23 @@ def create_vl(ns_inst_id, vl_index, nsd, ns_param): logger.debug("Create VL(%s) successfully.", vl_id) -def create_vnf(): - # TODO: - pass +def create_vnf(ns_inst_id, vnf_index, nf_param): + uri = "/ns/vnfs" + data = json.JSONEncoder().encode({ + "nsInstanceId": ns_inst_id, + "vnfIndex": vnf_index, + "additionalParamForVnf": nf_param + }) + + ret = restcall.req_by_msb(uri, "POST", data) + if ret[0] != 0: + logger.error("Failed to call create_vnf(%s): %s", vnf_index, ret[1]) + raise NSLCMException("Failed to call create_vnf(index is %s)" % vnf_index) + + vnf_inst_id = ret[1]["vnfInstId"] + job_id = ret[1]["jobId"] + logger.debug("Create VNF(%s) started.", vnf_inst_id) + return vnf_inst_id, job_id, vnf_index - 1 def create_sfc(): # TODO: @@ -112,6 +140,66 @@ def update_job(job_id, progress, errcode, desc): }) restcall.req_by_msb(uri, "POST", data) -def wait_until_job_done(): - # TODO: - pass +class JobWaitThread(Thread): + """ + Job Wait + """ + + def __init__(self, inst_id, job_id, index): + Thread.__init__(self) + self.inst_id = inst_id + self.job_id = job_id + self.index = index + self.retry_count = 60 + self.interval_second = 3 + + def run(self): + count = 0 + response_id, new_response_id = 0, 0 + job_end_normal, job_timeout = False, True + while count < self.retry_count: + count = count + 1 + time.sleep(self.interval_second) + uri = "/api/nslcm/v1/jobs/%s?responseId=%s" % (self.job_id, response_id) + ret = restcall.req_by_msb(uri, "GET") + if ret[0] != 0: + logger.error("Failed to query job: %s:%s", ret[2], ret[1]) + continue + job_result = json.JSONDecoder().decode(ret[1]) + if "responseDescriptor" not in job_result: + logger.error("Job(%s) does not exist.", self.job_id) + continue + progress = job_result["responseDescriptor"]["progress"] + new_response_id = job_result["responseDescriptor"]["responseId"] + job_desc = job_result["responseDescriptor"]["statusDescription"] + if new_response_id != response_id: + logger.debug("%s:%s:%s", progress, new_response_id, job_desc) + response_id = new_response_id + count = 0 + if progress == JOB_ERROR: + job_timeout = False + logger.error("Job(%s) failed: %s", self.job_id, job_desc) + break + elif progress == 100: + job_end_normal, job_timeout = True, False + logger.info("Job(%s) ended normally", self.job_id) + break + if job_timeout: + logger.error("Job(%s) timeout", self.job_id) + if self.job_id in g_jobs_status: + if job_end_normal: + g_jobs_status[self.job_id][self.index] = 0 + +def wait_until_jobs_done(g_job_id, jobs): + job_threads = [] + for inst_id, job_id, index in jobs: + job_threads.append(JobWaitThread(inst_id, job_id, index)) + for t in job_threads: + t.start() + for t in job_threads: + t.join() + if g_job_id in g_jobs_status: + if sum(g_jobs_status[g_job_id]) > 0: + raise NSLCMException("Some jobs failed!") + + -- cgit 1.2.3-korg