summaryrefslogtreecommitdiffstats
path: root/share/common
diff options
context:
space:
mode:
Diffstat (limited to 'share/common')
-rw-r--r--share/common/msapi/helper.py86
1 files changed, 77 insertions, 9 deletions
diff --git a/share/common/msapi/helper.py b/share/common/msapi/helper.py
index bb07b097..bd8932f3 100644
--- a/share/common/msapi/helper.py
+++ b/share/common/msapi/helper.py
@@ -18,10 +18,13 @@ import threading
import datetime
import time
+import traceback
+
# from common.exceptions import VimDriverNewtonException
from common.utils import restcall
from rest_framework import status
+from django.core.cache import cache
logger = logging.getLogger(__name__)
@@ -64,8 +67,12 @@ class Helper(object):
# Helper of AAI resource access
class MultiCloudAAIHelper(object):
+ '''
+ Helper to register infrastructure resource into AAI
+ '''
def __init__(self, multicloud_prefix, aai_base_url):
+ logger.debug("MultiCloudAAIHelper __init__ traceback: %s" % traceback.format_exc())
self.proxy_prefix = multicloud_prefix
self.aai_base_url = aai_base_url
self._logger = logger
@@ -159,7 +166,7 @@ class MultiCloudAAIHelper(object):
# thread helper
class MultiCloudThreadHelper(object):
'''
- thread to register infrastructure resource into AAI
+ Helper to manage LCM of an offloading thread
'''
@staticmethod
@@ -172,7 +179,10 @@ class MultiCloudThreadHelper(object):
epoch_time_sec = time.mktime(now_time.timetuple())
return int(epoch_time_sec * 1e6 + now_time.microsecond)
- def __init__(self):
+ def __init__(self, name=""):
+ # debug: dump the callstack to determine the callstack, hence the lcm
+ # logger.debug("MultiCloudThreadHelper __init__: %s" % traceback.format_exc())
+
# format of a backlog item:
# {
# "id": unique string to identify this item in backlog,
@@ -189,7 +199,12 @@ class MultiCloudThreadHelper(object):
self.expired_backlog = {}
self.lock = threading.Lock()
self.state_ = 0 # 0: stopped, 1: started
- self.thread = None
+ self.cache_prefix = "bi_"+name+"_"
+ self.cache_expired_prefix = "biex_"+name+"_"
+
+ self.thread = MultiCloudThreadHelper.HelperThread(self)
+ self.thread.start()
+
def state(self):
return self.state_
@@ -198,8 +213,8 @@ class MultiCloudThreadHelper(object):
self.lock.acquire()
if 0 == self.state_:
self.state_ = 1
- self.thread = MultiCloudThreadHelper.HelperThread(self)
- self.thread.start()
+ # self.thread = MultiCloudThreadHelper.HelperThread(self)
+ # self.thread.start()
else:
pass
self.lock.release()
@@ -208,35 +223,67 @@ class MultiCloudThreadHelper(object):
self.state_ = 0
def add(self, backlog_item):
+ cache_for_query = None
if not hasattr(backlog_item, "worker"):
return None
if not hasattr(backlog_item, "id"):
backlog_item["id"] = str(uuid.uuid1())
+ else:
+ cache_for_query = {
+ "id": backlog_item["id"],
+ "status": backlog_item.get("status", None)
+ }
+
if not hasattr(backlog_item, "repeat"):
backlog_item["repeat"] = 0
backlog_item["timestamp"] = 0
# self.lock.acquire()
# make sure there is no identical backlog in expired backlog
+ if cache_for_query:
+ cache.set(self.cache_prefix + backlog_item["id"],
+ json.dumps(cache_for_query), 3600 * 24)
+
self.expired_backlog.pop(backlog_item["id"], None)
self.backlog.update(backlog_item["id"], backlog_item)
# self.lock.release()
return len(self.backlog)
def get(self, backlog_id):
- self.backlog.get(backlog_id, None) or self.expired_backlog.get(backlog_id, None)
+ item = self.backlog.get(backlog_id, None) or self.expired_backlog.get(backlog_id, None)
+
+ # check the cache
+ if not item:
+ cache_for_query_str = cache.get(self.cache_prefix + backlog_id)
+ if cache_for_query_str:
+ item = json.loads(cache_for_query_str)
+ else:
+ cache_for_query_str = cache.get(self.cache_expired_prefix + backlog_id)
+ if cache_for_query_str:
+ item = json.loads(cache_for_query_str)
+ return item
# check if the backlog item is in expired backlog
def expired(self, backlog_id):
if not self.backlog.get(backlog_id, None):
if self.expired_backlog.get(backlog_id, None):
return True
+
+ # check the cache
+ cache_for_query_str = cache.get(self.cache_prefix + backlog_id)
+ if not cache_for_query_str:
+ cache_for_query_str = cache.get(self.cache_expired_prefix + backlog_id)
+ if cache_for_query_str:
+ return True
+
return False
def remove(self, backlog_id):
# self.lock.acquire()
self.backlog.pop(backlog_id, None)
self.expired_backlog.pop(backlog_id, None)
+ cache.delete(self.cache_prefix + backlog_id)
+ cache.delete(self.cache_expired_prefix + backlog_id)
# self.lock.release()
def reset(self):
@@ -245,8 +292,8 @@ class MultiCloudThreadHelper(object):
self.expired_backlog.clear()
# self.lock.release()
- def count(self):
- return len(self.backlog)
+ #def count(self):
+ # return len(self.backlog)
class HelperThread(threading.Thread):
def __init__(self, owner):
@@ -254,11 +301,13 @@ class MultiCloudThreadHelper(object):
self.daemon = True
self.duration = 0
self.owner = owner
+ # debug: dump the callstack to determine the callstack, hence the lcm
+ logger.debug("HelperThread __init__ : %s" % traceback.format_exc())
def run(self):
logger.debug("Start processing backlogs")
nexttimer = 0
- while self.owner.state_ == 1 and self.owner.count() > 0:
+ while self.owner.state_ == 1: # and self.owner.count() > 0:
if nexttimer > 1000000:
# sleep in case of interval > 1 second
time.sleep(nexttimer // 1000000)
@@ -285,14 +334,33 @@ class MultiCloudThreadHelper(object):
item["status"] = worker(payload) or 0
except Exception as e:
item["status"] = e.message
+ cache_item_for_query = {
+ "id": item["id"],
+ "status": item["status"]
+ }
if item.get("repeat", 0) == 0:
self.owner.remove(backlog_id)
# keep only the id and status
self.owner.expired_backlog[backlog_id] = {"status": item["status"]}
+
+ #update cache
+ try:
+ cache.set(self.owner.cache_expired_prefix + cache_item_for_query["id"], cache_item_for_query, 3600*24)
+ cache.delete(self.owner.cache_prefix + cache_item_for_query["id"])
+ except Exception as e:
+ logger.error(e.message)
else:
item["timestamp"] = now
+ #update cache
+ try:
+ cache.set(self.owner.cache_prefix + cache_item_for_query["id"], cache_item_for_query, 3600*24)
+ except Exception as e:
+ logger.error(e.message)
pass
# end of loop
+ # while True:
+ # logger.debug("thread sleep for 5 seconds")
+ # time.sleep(5) # wait forever, testonly
logger.debug("stop processing backlogs")
self.owner.state_ = 0
# end of processing