diff options
Diffstat (limited to 'share/common')
-rw-r--r-- | share/common/msapi/helper.py | 86 |
1 files changed, 77 insertions, 9 deletions
diff --git a/share/common/msapi/helper.py b/share/common/msapi/helper.py index bb07b097..bd8932f3 100644 --- a/share/common/msapi/helper.py +++ b/share/common/msapi/helper.py @@ -18,10 +18,13 @@ import threading import datetime import time +import traceback + # from common.exceptions import VimDriverNewtonException from common.utils import restcall from rest_framework import status +from django.core.cache import cache logger = logging.getLogger(__name__) @@ -64,8 +67,12 @@ class Helper(object): # Helper of AAI resource access class MultiCloudAAIHelper(object): + ''' + Helper to register infrastructure resource into AAI + ''' def __init__(self, multicloud_prefix, aai_base_url): + logger.debug("MultiCloudAAIHelper __init__ traceback: %s" % traceback.format_exc()) self.proxy_prefix = multicloud_prefix self.aai_base_url = aai_base_url self._logger = logger @@ -159,7 +166,7 @@ class MultiCloudAAIHelper(object): # thread helper class MultiCloudThreadHelper(object): ''' - thread to register infrastructure resource into AAI + Helper to manage LCM of an offloading thread ''' @staticmethod @@ -172,7 +179,10 @@ class MultiCloudThreadHelper(object): epoch_time_sec = time.mktime(now_time.timetuple()) return int(epoch_time_sec * 1e6 + now_time.microsecond) - def __init__(self): + def __init__(self, name=""): + # debug: dump the callstack to determine the callstack, hence the lcm + # logger.debug("MultiCloudThreadHelper __init__: %s" % traceback.format_exc()) + # format of a backlog item: # { # "id": unique string to identify this item in backlog, @@ -189,7 +199,12 @@ class MultiCloudThreadHelper(object): self.expired_backlog = {} self.lock = threading.Lock() self.state_ = 0 # 0: stopped, 1: started - self.thread = None + self.cache_prefix = "bi_"+name+"_" + self.cache_expired_prefix = "biex_"+name+"_" + + self.thread = MultiCloudThreadHelper.HelperThread(self) + self.thread.start() + def state(self): return self.state_ @@ -198,8 +213,8 @@ class MultiCloudThreadHelper(object): self.lock.acquire() if 0 == self.state_: self.state_ = 1 - self.thread = MultiCloudThreadHelper.HelperThread(self) - self.thread.start() + # self.thread = MultiCloudThreadHelper.HelperThread(self) + # self.thread.start() else: pass self.lock.release() @@ -208,35 +223,67 @@ class MultiCloudThreadHelper(object): self.state_ = 0 def add(self, backlog_item): + cache_for_query = None if not hasattr(backlog_item, "worker"): return None if not hasattr(backlog_item, "id"): backlog_item["id"] = str(uuid.uuid1()) + else: + cache_for_query = { + "id": backlog_item["id"], + "status": backlog_item.get("status", None) + } + if not hasattr(backlog_item, "repeat"): backlog_item["repeat"] = 0 backlog_item["timestamp"] = 0 # self.lock.acquire() # make sure there is no identical backlog in expired backlog + if cache_for_query: + cache.set(self.cache_prefix + backlog_item["id"], + json.dumps(cache_for_query), 3600 * 24) + self.expired_backlog.pop(backlog_item["id"], None) self.backlog.update(backlog_item["id"], backlog_item) # self.lock.release() return len(self.backlog) def get(self, backlog_id): - self.backlog.get(backlog_id, None) or self.expired_backlog.get(backlog_id, None) + item = self.backlog.get(backlog_id, None) or self.expired_backlog.get(backlog_id, None) + + # check the cache + if not item: + cache_for_query_str = cache.get(self.cache_prefix + backlog_id) + if cache_for_query_str: + item = json.loads(cache_for_query_str) + else: + cache_for_query_str = cache.get(self.cache_expired_prefix + backlog_id) + if cache_for_query_str: + item = json.loads(cache_for_query_str) + return item # check if the backlog item is in expired backlog def expired(self, backlog_id): if not self.backlog.get(backlog_id, None): if self.expired_backlog.get(backlog_id, None): return True + + # check the cache + cache_for_query_str = cache.get(self.cache_prefix + backlog_id) + if not cache_for_query_str: + cache_for_query_str = cache.get(self.cache_expired_prefix + backlog_id) + if cache_for_query_str: + return True + return False def remove(self, backlog_id): # self.lock.acquire() self.backlog.pop(backlog_id, None) self.expired_backlog.pop(backlog_id, None) + cache.delete(self.cache_prefix + backlog_id) + cache.delete(self.cache_expired_prefix + backlog_id) # self.lock.release() def reset(self): @@ -245,8 +292,8 @@ class MultiCloudThreadHelper(object): self.expired_backlog.clear() # self.lock.release() - def count(self): - return len(self.backlog) + #def count(self): + # return len(self.backlog) class HelperThread(threading.Thread): def __init__(self, owner): @@ -254,11 +301,13 @@ class MultiCloudThreadHelper(object): self.daemon = True self.duration = 0 self.owner = owner + # debug: dump the callstack to determine the callstack, hence the lcm + logger.debug("HelperThread __init__ : %s" % traceback.format_exc()) def run(self): logger.debug("Start processing backlogs") nexttimer = 0 - while self.owner.state_ == 1 and self.owner.count() > 0: + while self.owner.state_ == 1: # and self.owner.count() > 0: if nexttimer > 1000000: # sleep in case of interval > 1 second time.sleep(nexttimer // 1000000) @@ -285,14 +334,33 @@ class MultiCloudThreadHelper(object): item["status"] = worker(payload) or 0 except Exception as e: item["status"] = e.message + cache_item_for_query = { + "id": item["id"], + "status": item["status"] + } if item.get("repeat", 0) == 0: self.owner.remove(backlog_id) # keep only the id and status self.owner.expired_backlog[backlog_id] = {"status": item["status"]} + + #update cache + try: + cache.set(self.owner.cache_expired_prefix + cache_item_for_query["id"], cache_item_for_query, 3600*24) + cache.delete(self.owner.cache_prefix + cache_item_for_query["id"]) + except Exception as e: + logger.error(e.message) else: item["timestamp"] = now + #update cache + try: + cache.set(self.owner.cache_prefix + cache_item_for_query["id"], cache_item_for_query, 3600*24) + except Exception as e: + logger.error(e.message) pass # end of loop + # while True: + # logger.debug("thread sleep for 5 seconds") + # time.sleep(5) # wait forever, testonly logger.debug("stop processing backlogs") self.owner.state_ = 0 # end of processing |