diff options
Diffstat (limited to 'share')
-rw-r--r-- | share/common/msapi/helper.py | 86 | ||||
-rw-r--r-- | share/newton_base/registration/registration.py | 9 | ||||
-rw-r--r-- | share/starlingx_base/registration/registration.py | 12 | ||||
-rw-r--r-- | share/starlingx_base/resource/infra_workload.py | 98 |
4 files changed, 154 insertions, 51 deletions
diff --git a/share/common/msapi/helper.py b/share/common/msapi/helper.py index bb07b097..bd8932f3 100644 --- a/share/common/msapi/helper.py +++ b/share/common/msapi/helper.py @@ -18,10 +18,13 @@ import threading import datetime import time +import traceback + # from common.exceptions import VimDriverNewtonException from common.utils import restcall from rest_framework import status +from django.core.cache import cache logger = logging.getLogger(__name__) @@ -64,8 +67,12 @@ class Helper(object): # Helper of AAI resource access class MultiCloudAAIHelper(object): + ''' + Helper to register infrastructure resource into AAI + ''' def __init__(self, multicloud_prefix, aai_base_url): + logger.debug("MultiCloudAAIHelper __init__ traceback: %s" % traceback.format_exc()) self.proxy_prefix = multicloud_prefix self.aai_base_url = aai_base_url self._logger = logger @@ -159,7 +166,7 @@ class MultiCloudAAIHelper(object): # thread helper class MultiCloudThreadHelper(object): ''' - thread to register infrastructure resource into AAI + Helper to manage LCM of an offloading thread ''' @staticmethod @@ -172,7 +179,10 @@ class MultiCloudThreadHelper(object): epoch_time_sec = time.mktime(now_time.timetuple()) return int(epoch_time_sec * 1e6 + now_time.microsecond) - def __init__(self): + def __init__(self, name=""): + # debug: dump the callstack to determine the callstack, hence the lcm + # logger.debug("MultiCloudThreadHelper __init__: %s" % traceback.format_exc()) + # format of a backlog item: # { # "id": unique string to identify this item in backlog, @@ -189,7 +199,12 @@ class MultiCloudThreadHelper(object): self.expired_backlog = {} self.lock = threading.Lock() self.state_ = 0 # 0: stopped, 1: started - self.thread = None + self.cache_prefix = "bi_"+name+"_" + self.cache_expired_prefix = "biex_"+name+"_" + + self.thread = MultiCloudThreadHelper.HelperThread(self) + self.thread.start() + def state(self): return self.state_ @@ -198,8 +213,8 @@ class MultiCloudThreadHelper(object): self.lock.acquire() if 0 == self.state_: self.state_ = 1 - self.thread = MultiCloudThreadHelper.HelperThread(self) - self.thread.start() + # self.thread = MultiCloudThreadHelper.HelperThread(self) + # self.thread.start() else: pass self.lock.release() @@ -208,35 +223,67 @@ class MultiCloudThreadHelper(object): self.state_ = 0 def add(self, backlog_item): + cache_for_query = None if not hasattr(backlog_item, "worker"): return None if not hasattr(backlog_item, "id"): backlog_item["id"] = str(uuid.uuid1()) + else: + cache_for_query = { + "id": backlog_item["id"], + "status": backlog_item.get("status", None) + } + if not hasattr(backlog_item, "repeat"): backlog_item["repeat"] = 0 backlog_item["timestamp"] = 0 # self.lock.acquire() # make sure there is no identical backlog in expired backlog + if cache_for_query: + cache.set(self.cache_prefix + backlog_item["id"], + json.dumps(cache_for_query), 3600 * 24) + self.expired_backlog.pop(backlog_item["id"], None) self.backlog.update(backlog_item["id"], backlog_item) # self.lock.release() return len(self.backlog) def get(self, backlog_id): - self.backlog.get(backlog_id, None) or self.expired_backlog.get(backlog_id, None) + item = self.backlog.get(backlog_id, None) or self.expired_backlog.get(backlog_id, None) + + # check the cache + if not item: + cache_for_query_str = cache.get(self.cache_prefix + backlog_id) + if cache_for_query_str: + item = json.loads(cache_for_query_str) + else: + cache_for_query_str = cache.get(self.cache_expired_prefix + backlog_id) + if cache_for_query_str: + item = json.loads(cache_for_query_str) + return item # check if the backlog item is in expired backlog def expired(self, backlog_id): if not self.backlog.get(backlog_id, None): if self.expired_backlog.get(backlog_id, None): return True + + # check the cache + cache_for_query_str = cache.get(self.cache_prefix + backlog_id) + if not cache_for_query_str: + cache_for_query_str = cache.get(self.cache_expired_prefix + backlog_id) + if cache_for_query_str: + return True + return False def remove(self, backlog_id): # self.lock.acquire() self.backlog.pop(backlog_id, None) self.expired_backlog.pop(backlog_id, None) + cache.delete(self.cache_prefix + backlog_id) + cache.delete(self.cache_expired_prefix + backlog_id) # self.lock.release() def reset(self): @@ -245,8 +292,8 @@ class MultiCloudThreadHelper(object): self.expired_backlog.clear() # self.lock.release() - def count(self): - return len(self.backlog) + #def count(self): + # return len(self.backlog) class HelperThread(threading.Thread): def __init__(self, owner): @@ -254,11 +301,13 @@ class MultiCloudThreadHelper(object): self.daemon = True self.duration = 0 self.owner = owner + # debug: dump the callstack to determine the callstack, hence the lcm + logger.debug("HelperThread __init__ : %s" % traceback.format_exc()) def run(self): logger.debug("Start processing backlogs") nexttimer = 0 - while self.owner.state_ == 1 and self.owner.count() > 0: + while self.owner.state_ == 1: # and self.owner.count() > 0: if nexttimer > 1000000: # sleep in case of interval > 1 second time.sleep(nexttimer // 1000000) @@ -285,14 +334,33 @@ class MultiCloudThreadHelper(object): item["status"] = worker(payload) or 0 except Exception as e: item["status"] = e.message + cache_item_for_query = { + "id": item["id"], + "status": item["status"] + } if item.get("repeat", 0) == 0: self.owner.remove(backlog_id) # keep only the id and status self.owner.expired_backlog[backlog_id] = {"status": item["status"]} + + #update cache + try: + cache.set(self.owner.cache_expired_prefix + cache_item_for_query["id"], cache_item_for_query, 3600*24) + cache.delete(self.owner.cache_prefix + cache_item_for_query["id"]) + except Exception as e: + logger.error(e.message) else: item["timestamp"] = now + #update cache + try: + cache.set(self.owner.cache_prefix + cache_item_for_query["id"], cache_item_for_query, 3600*24) + except Exception as e: + logger.error(e.message) pass # end of loop + # while True: + # logger.debug("thread sleep for 5 seconds") + # time.sleep(5) # wait forever, testonly logger.debug("stop processing backlogs") self.owner.state_ = 0 # end of processing diff --git a/share/newton_base/registration/registration.py b/share/newton_base/registration/registration.py index 6e8f8809..8789c388 100644 --- a/share/newton_base/registration/registration.py +++ b/share/newton_base/registration/registration.py @@ -24,7 +24,8 @@ from rest_framework.views import APIView from common.exceptions import VimDriverNewtonException from common.msapi import extsys -from common.msapi import helper +from common.msapi.helper import MultiCloudThreadHelper +from common.msapi.helper import MultiCloudAAIHelper from common.utils import restcall from newton_base.util import VimDriverUtils @@ -34,12 +35,13 @@ logger = logging.getLogger(__name__) class Registry(APIView): def __init__(self): + # logger.debug("Registry __init__: %s" % traceback.format_exc()) if not hasattr(self, "_logger"): self._logger = logger if not hasattr(self, "register_thread"): # dedicate thread to offload vim registration process - self.register_thread = helper.MultiCloudThreadHelper() + self.register_thread = MultiCloudThreadHelper() if not hasattr(self, "register_helper") or not self.register_helper: if not hasattr(self, "proxy_prefix"): @@ -131,12 +133,13 @@ class Registry(APIView): -class RegistryHelper(helper.MultiCloudAAIHelper): +class RegistryHelper(MultiCloudAAIHelper): ''' Helper code to discover and register a cloud region's resource ''' def __init__(self, multicloud_prefix, aai_base_url): + # logger.debug("RegistryHelper __init__: %s" % traceback.format_exc()) self.proxy_prefix = multicloud_prefix self.aai_base_url = aai_base_url self._logger = logger diff --git a/share/starlingx_base/registration/registration.py b/share/starlingx_base/registration/registration.py index 9d1846d7..21ab1948 100644 --- a/share/starlingx_base/registration/registration.py +++ b/share/starlingx_base/registration/registration.py @@ -32,13 +32,14 @@ from django.core.cache import cache logger = logging.getLogger(__name__) # global var: Audition thread -gAZCapAuditThread = helper.MultiCloudThreadHelper() +gAZCapAuditThread = helper.MultiCloudThreadHelper("azcap") # DEBUG=True # APIv0 handler upgrading: leverage APIv1 handler class APIv0Registry(newton_registration.Registry): def __init__(self): + # logger.error(traceback.format_exc()) self.register_helper = RegistryHelper(settings.MULTICLOUD_PREFIX, settings.AAI_BASE_URL) super(APIv0Registry, self).__init__() # self._logger = logger @@ -52,10 +53,10 @@ class APIv0Registry(newton_registration.Registry): settings.AAI_BASE_URL ) backlog_item = { - "id": vimid, - "worker": worker_self.azcap_audit, - "payload": (worker_self, vimid), - "repeat": 5*1000000, # repeat every 5 seconds + "id": vimid, + "worker": worker_self.azcap_audit, + "payload": (worker_self, vimid), + "repeat": 10*1000000, # repeat every 10 seconds } gAZCapAuditThread.add(backlog_item) if 0 == gAZCapAuditThread.state(): @@ -127,6 +128,7 @@ class RegistryHelper(newton_registration.RegistryHelper): Helper code to discover and register a cloud region's resource ''' def __init__(self, multicloud_prefix, aai_base_url): + # logger.error(traceback.format_exc()) super(RegistryHelper, self).__init__(multicloud_prefix, aai_base_url) # self._logger = logger diff --git a/share/starlingx_base/resource/infra_workload.py b/share/starlingx_base/resource/infra_workload.py index 8fea68f3..7d5873df 100644 --- a/share/starlingx_base/resource/infra_workload.py +++ b/share/starlingx_base/resource/infra_workload.py @@ -20,6 +20,7 @@ from rest_framework import status from rest_framework.response import Response from common.msapi import extsys from common.msapi.helper import Helper as helper +from common.msapi.helper import MultiCloudThreadHelper from newton_base.resource import infra_workload as newton_infra_workload from newton_base.resource import infra_workload_helper as infra_workload_helper @@ -29,7 +30,7 @@ logger = logging.getLogger(__name__) # global var: Audition thread # the id is the workloadid, which implies post to workloadid1 followed by delete workloadid1 # will replace the previous backlog item -gInfraWorkloadThread = helper.MultiCloudThreadHelper() +gInfraWorkloadThread = MultiCloudThreadHelper("infw") class InfraWorkload(newton_infra_workload.InfraWorkload): def __init__(self): @@ -46,6 +47,7 @@ class InfraWorkload(newton_infra_workload.InfraWorkload): "workload_status": "WORKLOAD_CREATE_FAIL", "workload_status_reason": "Exception occurs" } + status_code = status.HTTP_500_INTERNAL_SERVER_ERROR try: worker_self = InfraWorkloadHelper( @@ -104,15 +106,20 @@ class InfraWorkload(newton_infra_workload.InfraWorkload): (13, "WORKLOAD_DELETE_FAIL", "Unexpected:status not found in backlog item") ) - progress_code = progress[0] - progress_status = progress[1] - progress_msg = progress[2] - resp_template["workload_status"] = progress_status - resp_template["workload_status_reason"] = progress_msg - return Response(data=resp_template, - status=status.HTTP_200_ACCEPTED - if progress_code == 0 else progress_code - ) + + try: + progress_code = progress[0] + progress_status = progress[1] + progress_msg = progress[2] + resp_template["workload_status"] = progress_status + resp_template["workload_status_reason"] = progress_msg + + status_code = status.HTTP_200_ACCEPTED\ + if progress_code == 0 else progress_code + except Exception as e: + resp_template["workload_status_reason"] = progress + + return Response(data=resp_template, status=status_code) except Exception as e: errmsg = e.message self._logger.error(errmsg) @@ -130,6 +137,7 @@ class InfraWorkload(newton_infra_workload.InfraWorkload): "workload_status": "WORKLOAD_GET_FAIL", "workload_status_reason": "Exception occurs" } + status_code = status.HTTP_500_INTERNAL_SERVER_ERROR try: if workloadid == "": @@ -140,7 +148,7 @@ class InfraWorkload(newton_infra_workload.InfraWorkload): ) # now query the progress - status_code = status.HTTP_200_OK + backlog_item = gInfraWorkloadThread.get(workloadid) if not backlog_item: # backlog item not found, so check the stack status @@ -160,19 +168,20 @@ class InfraWorkload(newton_infra_workload.InfraWorkload): (13, "WORKLOAD_DELETE_FAIL", "Unexpected:status not found in backlog item") ) - progress_code = progress[0] - progress_status = progress[1] - progress_msg = progress[2] - # if gInfraWorkloadThread.expired(workloadid): - # gInfraWorkloadThread.remove(workloadid) - resp_template["workload_status"] = progress_status - resp_template["workload_status_reason"] = progress_msg - status_code = status.HTTP_200_OK\ - if progress_code == 0 else progress_code + try: + progress_code = progress[0] + progress_status = progress[1] + progress_msg = progress[2] + # if gInfraWorkloadThread.expired(workloadid): + # gInfraWorkloadThread.remove(workloadid) + resp_template["workload_status"] = progress_status + resp_template["workload_status_reason"] = progress_msg + status_code = status.HTTP_200_OK\ + if progress_code == 0 else progress_code + except Exception as e: + resp_template["workload_status_reason"] = progress - return Response(data=resp_template, - status=status_code - ) + return Response(data=resp_template, status=status_code) except Exception as e: self._logger.error(e.message) @@ -190,6 +199,7 @@ class InfraWorkload(newton_infra_workload.InfraWorkload): "workload_status": "WORKLOAD_DELETE_FAIL", "workload_status_reason": "Exception occurs" } + status_code = status.HTTP_500_INTERNAL_SERVER_ERROR try: if workloadid == "": @@ -240,18 +250,20 @@ class InfraWorkload(newton_infra_workload.InfraWorkload): (13, "WORKLOAD_DELETE_FAIL", "Unexpected:status not found in backlog item") ) - progress_code = progress[0] - progress_status = progress[1] - progress_msg = progress[2] - # if gInfraWorkloadThread.expired(workloadid): - # gInfraWorkloadThread.remove(workloadid) + try: + progress_code = progress[0] + progress_status = progress[1] + progress_msg = progress[2] + # if gInfraWorkloadThread.expired(workloadid): + # gInfraWorkloadThread.remove(workloadid) - resp_template["workload_status"] = progress_status - resp_template["workload_status_reason"] = progress_msg - return Response(data=resp_template, - status=status.HTTP_204_NO_CONTENT - if progress_code == 0 else progress_code - ) + resp_template["workload_status"] = progress_status + resp_template["workload_status_reason"] = progress_msg + status_code = status.HTTP_200_ACCEPTED \ + if progress_code == 0 else progress_code + except Exception as e: + resp_template["workload_status_reason"] = progress + return Response(data=resp_template, status=status_code) except Exception as e: self._logger.error(e.message) resp_template["workload_status_reason"] = e.message @@ -296,9 +308,27 @@ class InfraWorkloadHelper(infra_workload_helper.InfraWorkloadHelper): self._logger = logger def param_update_user_directives(self, parameters, oof_directives): + for attr in oof_directives.get("attributes", []): + aname = attr.get("attribute_name", None) + avalue = attr.get("attribute_value", None) + if aname in parameters: + parameters[aname] = avalue + else: + self._logger.warn( + "There is no parameter exist: %s" % aname) + return parameters def param_update_sdnc_directives(self, parameters, sdnc_directives): + for attr in sdnc_directives.get("attributes", []): + aname = attr.get("attribute_name", None) + avalue = attr.get("attribute_value", None) + if aname in parameters: + parameters[aname] = avalue + else: + self._logger.warn( + "There is no parameter exist: %s" % aname) + return parameters def param_update_oof_directives(self, parameters, oof_directives): |