summaryrefslogtreecommitdiffstats
path: root/share
diff options
context:
space:
mode:
Diffstat (limited to 'share')
-rw-r--r--share/common/msapi/helper.py86
-rw-r--r--share/newton_base/registration/registration.py9
-rw-r--r--share/starlingx_base/registration/registration.py12
-rw-r--r--share/starlingx_base/resource/infra_workload.py98
4 files changed, 154 insertions, 51 deletions
diff --git a/share/common/msapi/helper.py b/share/common/msapi/helper.py
index bb07b097..bd8932f3 100644
--- a/share/common/msapi/helper.py
+++ b/share/common/msapi/helper.py
@@ -18,10 +18,13 @@ import threading
import datetime
import time
+import traceback
+
# from common.exceptions import VimDriverNewtonException
from common.utils import restcall
from rest_framework import status
+from django.core.cache import cache
logger = logging.getLogger(__name__)
@@ -64,8 +67,12 @@ class Helper(object):
# Helper of AAI resource access
class MultiCloudAAIHelper(object):
+ '''
+ Helper to register infrastructure resource into AAI
+ '''
def __init__(self, multicloud_prefix, aai_base_url):
+ logger.debug("MultiCloudAAIHelper __init__ traceback: %s" % traceback.format_exc())
self.proxy_prefix = multicloud_prefix
self.aai_base_url = aai_base_url
self._logger = logger
@@ -159,7 +166,7 @@ class MultiCloudAAIHelper(object):
# thread helper
class MultiCloudThreadHelper(object):
'''
- thread to register infrastructure resource into AAI
+ Helper to manage LCM of an offloading thread
'''
@staticmethod
@@ -172,7 +179,10 @@ class MultiCloudThreadHelper(object):
epoch_time_sec = time.mktime(now_time.timetuple())
return int(epoch_time_sec * 1e6 + now_time.microsecond)
- def __init__(self):
+ def __init__(self, name=""):
+ # debug: dump the callstack to determine the callstack, hence the lcm
+ # logger.debug("MultiCloudThreadHelper __init__: %s" % traceback.format_exc())
+
# format of a backlog item:
# {
# "id": unique string to identify this item in backlog,
@@ -189,7 +199,12 @@ class MultiCloudThreadHelper(object):
self.expired_backlog = {}
self.lock = threading.Lock()
self.state_ = 0 # 0: stopped, 1: started
- self.thread = None
+ self.cache_prefix = "bi_"+name+"_"
+ self.cache_expired_prefix = "biex_"+name+"_"
+
+ self.thread = MultiCloudThreadHelper.HelperThread(self)
+ self.thread.start()
+
def state(self):
return self.state_
@@ -198,8 +213,8 @@ class MultiCloudThreadHelper(object):
self.lock.acquire()
if 0 == self.state_:
self.state_ = 1
- self.thread = MultiCloudThreadHelper.HelperThread(self)
- self.thread.start()
+ # self.thread = MultiCloudThreadHelper.HelperThread(self)
+ # self.thread.start()
else:
pass
self.lock.release()
@@ -208,35 +223,67 @@ class MultiCloudThreadHelper(object):
self.state_ = 0
def add(self, backlog_item):
+ cache_for_query = None
if not hasattr(backlog_item, "worker"):
return None
if not hasattr(backlog_item, "id"):
backlog_item["id"] = str(uuid.uuid1())
+ else:
+ cache_for_query = {
+ "id": backlog_item["id"],
+ "status": backlog_item.get("status", None)
+ }
+
if not hasattr(backlog_item, "repeat"):
backlog_item["repeat"] = 0
backlog_item["timestamp"] = 0
# self.lock.acquire()
# make sure there is no identical backlog in expired backlog
+ if cache_for_query:
+ cache.set(self.cache_prefix + backlog_item["id"],
+ json.dumps(cache_for_query), 3600 * 24)
+
self.expired_backlog.pop(backlog_item["id"], None)
self.backlog.update(backlog_item["id"], backlog_item)
# self.lock.release()
return len(self.backlog)
def get(self, backlog_id):
- self.backlog.get(backlog_id, None) or self.expired_backlog.get(backlog_id, None)
+ item = self.backlog.get(backlog_id, None) or self.expired_backlog.get(backlog_id, None)
+
+ # check the cache
+ if not item:
+ cache_for_query_str = cache.get(self.cache_prefix + backlog_id)
+ if cache_for_query_str:
+ item = json.loads(cache_for_query_str)
+ else:
+ cache_for_query_str = cache.get(self.cache_expired_prefix + backlog_id)
+ if cache_for_query_str:
+ item = json.loads(cache_for_query_str)
+ return item
# check if the backlog item is in expired backlog
def expired(self, backlog_id):
if not self.backlog.get(backlog_id, None):
if self.expired_backlog.get(backlog_id, None):
return True
+
+ # check the cache
+ cache_for_query_str = cache.get(self.cache_prefix + backlog_id)
+ if not cache_for_query_str:
+ cache_for_query_str = cache.get(self.cache_expired_prefix + backlog_id)
+ if cache_for_query_str:
+ return True
+
return False
def remove(self, backlog_id):
# self.lock.acquire()
self.backlog.pop(backlog_id, None)
self.expired_backlog.pop(backlog_id, None)
+ cache.delete(self.cache_prefix + backlog_id)
+ cache.delete(self.cache_expired_prefix + backlog_id)
# self.lock.release()
def reset(self):
@@ -245,8 +292,8 @@ class MultiCloudThreadHelper(object):
self.expired_backlog.clear()
# self.lock.release()
- def count(self):
- return len(self.backlog)
+ #def count(self):
+ # return len(self.backlog)
class HelperThread(threading.Thread):
def __init__(self, owner):
@@ -254,11 +301,13 @@ class MultiCloudThreadHelper(object):
self.daemon = True
self.duration = 0
self.owner = owner
+ # debug: dump the callstack to determine the callstack, hence the lcm
+ logger.debug("HelperThread __init__ : %s" % traceback.format_exc())
def run(self):
logger.debug("Start processing backlogs")
nexttimer = 0
- while self.owner.state_ == 1 and self.owner.count() > 0:
+ while self.owner.state_ == 1: # and self.owner.count() > 0:
if nexttimer > 1000000:
# sleep in case of interval > 1 second
time.sleep(nexttimer // 1000000)
@@ -285,14 +334,33 @@ class MultiCloudThreadHelper(object):
item["status"] = worker(payload) or 0
except Exception as e:
item["status"] = e.message
+ cache_item_for_query = {
+ "id": item["id"],
+ "status": item["status"]
+ }
if item.get("repeat", 0) == 0:
self.owner.remove(backlog_id)
# keep only the id and status
self.owner.expired_backlog[backlog_id] = {"status": item["status"]}
+
+ #update cache
+ try:
+ cache.set(self.owner.cache_expired_prefix + cache_item_for_query["id"], cache_item_for_query, 3600*24)
+ cache.delete(self.owner.cache_prefix + cache_item_for_query["id"])
+ except Exception as e:
+ logger.error(e.message)
else:
item["timestamp"] = now
+ #update cache
+ try:
+ cache.set(self.owner.cache_prefix + cache_item_for_query["id"], cache_item_for_query, 3600*24)
+ except Exception as e:
+ logger.error(e.message)
pass
# end of loop
+ # while True:
+ # logger.debug("thread sleep for 5 seconds")
+ # time.sleep(5) # wait forever, testonly
logger.debug("stop processing backlogs")
self.owner.state_ = 0
# end of processing
diff --git a/share/newton_base/registration/registration.py b/share/newton_base/registration/registration.py
index 6e8f8809..8789c388 100644
--- a/share/newton_base/registration/registration.py
+++ b/share/newton_base/registration/registration.py
@@ -24,7 +24,8 @@ from rest_framework.views import APIView
from common.exceptions import VimDriverNewtonException
from common.msapi import extsys
-from common.msapi import helper
+from common.msapi.helper import MultiCloudThreadHelper
+from common.msapi.helper import MultiCloudAAIHelper
from common.utils import restcall
from newton_base.util import VimDriverUtils
@@ -34,12 +35,13 @@ logger = logging.getLogger(__name__)
class Registry(APIView):
def __init__(self):
+ # logger.debug("Registry __init__: %s" % traceback.format_exc())
if not hasattr(self, "_logger"):
self._logger = logger
if not hasattr(self, "register_thread"):
# dedicate thread to offload vim registration process
- self.register_thread = helper.MultiCloudThreadHelper()
+ self.register_thread = MultiCloudThreadHelper()
if not hasattr(self, "register_helper") or not self.register_helper:
if not hasattr(self, "proxy_prefix"):
@@ -131,12 +133,13 @@ class Registry(APIView):
-class RegistryHelper(helper.MultiCloudAAIHelper):
+class RegistryHelper(MultiCloudAAIHelper):
'''
Helper code to discover and register a cloud region's resource
'''
def __init__(self, multicloud_prefix, aai_base_url):
+ # logger.debug("RegistryHelper __init__: %s" % traceback.format_exc())
self.proxy_prefix = multicloud_prefix
self.aai_base_url = aai_base_url
self._logger = logger
diff --git a/share/starlingx_base/registration/registration.py b/share/starlingx_base/registration/registration.py
index 9d1846d7..21ab1948 100644
--- a/share/starlingx_base/registration/registration.py
+++ b/share/starlingx_base/registration/registration.py
@@ -32,13 +32,14 @@ from django.core.cache import cache
logger = logging.getLogger(__name__)
# global var: Audition thread
-gAZCapAuditThread = helper.MultiCloudThreadHelper()
+gAZCapAuditThread = helper.MultiCloudThreadHelper("azcap")
# DEBUG=True
# APIv0 handler upgrading: leverage APIv1 handler
class APIv0Registry(newton_registration.Registry):
def __init__(self):
+ # logger.error(traceback.format_exc())
self.register_helper = RegistryHelper(settings.MULTICLOUD_PREFIX, settings.AAI_BASE_URL)
super(APIv0Registry, self).__init__()
# self._logger = logger
@@ -52,10 +53,10 @@ class APIv0Registry(newton_registration.Registry):
settings.AAI_BASE_URL
)
backlog_item = {
- "id": vimid,
- "worker": worker_self.azcap_audit,
- "payload": (worker_self, vimid),
- "repeat": 5*1000000, # repeat every 5 seconds
+ "id": vimid,
+ "worker": worker_self.azcap_audit,
+ "payload": (worker_self, vimid),
+ "repeat": 10*1000000, # repeat every 10 seconds
}
gAZCapAuditThread.add(backlog_item)
if 0 == gAZCapAuditThread.state():
@@ -127,6 +128,7 @@ class RegistryHelper(newton_registration.RegistryHelper):
Helper code to discover and register a cloud region's resource
'''
def __init__(self, multicloud_prefix, aai_base_url):
+ # logger.error(traceback.format_exc())
super(RegistryHelper, self).__init__(multicloud_prefix, aai_base_url)
# self._logger = logger
diff --git a/share/starlingx_base/resource/infra_workload.py b/share/starlingx_base/resource/infra_workload.py
index 8fea68f3..7d5873df 100644
--- a/share/starlingx_base/resource/infra_workload.py
+++ b/share/starlingx_base/resource/infra_workload.py
@@ -20,6 +20,7 @@ from rest_framework import status
from rest_framework.response import Response
from common.msapi import extsys
from common.msapi.helper import Helper as helper
+from common.msapi.helper import MultiCloudThreadHelper
from newton_base.resource import infra_workload as newton_infra_workload
from newton_base.resource import infra_workload_helper as infra_workload_helper
@@ -29,7 +30,7 @@ logger = logging.getLogger(__name__)
# global var: Audition thread
# the id is the workloadid, which implies post to workloadid1 followed by delete workloadid1
# will replace the previous backlog item
-gInfraWorkloadThread = helper.MultiCloudThreadHelper()
+gInfraWorkloadThread = MultiCloudThreadHelper("infw")
class InfraWorkload(newton_infra_workload.InfraWorkload):
def __init__(self):
@@ -46,6 +47,7 @@ class InfraWorkload(newton_infra_workload.InfraWorkload):
"workload_status": "WORKLOAD_CREATE_FAIL",
"workload_status_reason": "Exception occurs"
}
+ status_code = status.HTTP_500_INTERNAL_SERVER_ERROR
try:
worker_self = InfraWorkloadHelper(
@@ -104,15 +106,20 @@ class InfraWorkload(newton_infra_workload.InfraWorkload):
(13, "WORKLOAD_DELETE_FAIL",
"Unexpected:status not found in backlog item")
)
- progress_code = progress[0]
- progress_status = progress[1]
- progress_msg = progress[2]
- resp_template["workload_status"] = progress_status
- resp_template["workload_status_reason"] = progress_msg
- return Response(data=resp_template,
- status=status.HTTP_200_ACCEPTED
- if progress_code == 0 else progress_code
- )
+
+ try:
+ progress_code = progress[0]
+ progress_status = progress[1]
+ progress_msg = progress[2]
+ resp_template["workload_status"] = progress_status
+ resp_template["workload_status_reason"] = progress_msg
+
+ status_code = status.HTTP_200_ACCEPTED\
+ if progress_code == 0 else progress_code
+ except Exception as e:
+ resp_template["workload_status_reason"] = progress
+
+ return Response(data=resp_template, status=status_code)
except Exception as e:
errmsg = e.message
self._logger.error(errmsg)
@@ -130,6 +137,7 @@ class InfraWorkload(newton_infra_workload.InfraWorkload):
"workload_status": "WORKLOAD_GET_FAIL",
"workload_status_reason": "Exception occurs"
}
+ status_code = status.HTTP_500_INTERNAL_SERVER_ERROR
try:
if workloadid == "":
@@ -140,7 +148,7 @@ class InfraWorkload(newton_infra_workload.InfraWorkload):
)
# now query the progress
- status_code = status.HTTP_200_OK
+
backlog_item = gInfraWorkloadThread.get(workloadid)
if not backlog_item:
# backlog item not found, so check the stack status
@@ -160,19 +168,20 @@ class InfraWorkload(newton_infra_workload.InfraWorkload):
(13, "WORKLOAD_DELETE_FAIL",
"Unexpected:status not found in backlog item")
)
- progress_code = progress[0]
- progress_status = progress[1]
- progress_msg = progress[2]
- # if gInfraWorkloadThread.expired(workloadid):
- # gInfraWorkloadThread.remove(workloadid)
- resp_template["workload_status"] = progress_status
- resp_template["workload_status_reason"] = progress_msg
- status_code = status.HTTP_200_OK\
- if progress_code == 0 else progress_code
+ try:
+ progress_code = progress[0]
+ progress_status = progress[1]
+ progress_msg = progress[2]
+ # if gInfraWorkloadThread.expired(workloadid):
+ # gInfraWorkloadThread.remove(workloadid)
+ resp_template["workload_status"] = progress_status
+ resp_template["workload_status_reason"] = progress_msg
+ status_code = status.HTTP_200_OK\
+ if progress_code == 0 else progress_code
+ except Exception as e:
+ resp_template["workload_status_reason"] = progress
- return Response(data=resp_template,
- status=status_code
- )
+ return Response(data=resp_template, status=status_code)
except Exception as e:
self._logger.error(e.message)
@@ -190,6 +199,7 @@ class InfraWorkload(newton_infra_workload.InfraWorkload):
"workload_status": "WORKLOAD_DELETE_FAIL",
"workload_status_reason": "Exception occurs"
}
+ status_code = status.HTTP_500_INTERNAL_SERVER_ERROR
try:
if workloadid == "":
@@ -240,18 +250,20 @@ class InfraWorkload(newton_infra_workload.InfraWorkload):
(13, "WORKLOAD_DELETE_FAIL",
"Unexpected:status not found in backlog item")
)
- progress_code = progress[0]
- progress_status = progress[1]
- progress_msg = progress[2]
- # if gInfraWorkloadThread.expired(workloadid):
- # gInfraWorkloadThread.remove(workloadid)
+ try:
+ progress_code = progress[0]
+ progress_status = progress[1]
+ progress_msg = progress[2]
+ # if gInfraWorkloadThread.expired(workloadid):
+ # gInfraWorkloadThread.remove(workloadid)
- resp_template["workload_status"] = progress_status
- resp_template["workload_status_reason"] = progress_msg
- return Response(data=resp_template,
- status=status.HTTP_204_NO_CONTENT
- if progress_code == 0 else progress_code
- )
+ resp_template["workload_status"] = progress_status
+ resp_template["workload_status_reason"] = progress_msg
+ status_code = status.HTTP_200_ACCEPTED \
+ if progress_code == 0 else progress_code
+ except Exception as e:
+ resp_template["workload_status_reason"] = progress
+ return Response(data=resp_template, status=status_code)
except Exception as e:
self._logger.error(e.message)
resp_template["workload_status_reason"] = e.message
@@ -296,9 +308,27 @@ class InfraWorkloadHelper(infra_workload_helper.InfraWorkloadHelper):
self._logger = logger
def param_update_user_directives(self, parameters, oof_directives):
+ for attr in oof_directives.get("attributes", []):
+ aname = attr.get("attribute_name", None)
+ avalue = attr.get("attribute_value", None)
+ if aname in parameters:
+ parameters[aname] = avalue
+ else:
+ self._logger.warn(
+ "There is no parameter exist: %s" % aname)
+
return parameters
def param_update_sdnc_directives(self, parameters, sdnc_directives):
+ for attr in sdnc_directives.get("attributes", []):
+ aname = attr.get("attribute_name", None)
+ avalue = attr.get("attribute_value", None)
+ if aname in parameters:
+ parameters[aname] = avalue
+ else:
+ self._logger.warn(
+ "There is no parameter exist: %s" % aname)
+
return parameters
def param_update_oof_directives(self, parameters, oof_directives):