summaryrefslogtreecommitdiffstats
path: root/share/common
diff options
context:
space:
mode:
authorXiaohua Zhang <xiaohua.zhang@windriver.com>2019-04-04 08:08:35 +0000
committerXiaohua Zhang <xiaohua.zhang@windriver.com>2019-04-04 08:34:07 +0000
commit54700feaf9055b2192612e4c3958c6ac5bbd6a56 (patch)
tree788f1e23b372c57bc3942e639c7562458c4119a4 /share/common
parent0491327f8dcd808402669c3d8f32a5a0f879db45 (diff)
Fix thread helper bug
the uwsgi disable threads by default. Enable it with additonal option. The post and get API request might routed to different uwsgi process, Leverage memcached to sync backlog items between uwsgi process Change-Id: Iaac6b66061f2c396cd24825ab020f0a937dfb505 Issue-ID: MULTICLOUD-561 Signed-off-by: Xiaohua Zhang <xiaohua.zhang@windriver.com>
Diffstat (limited to 'share/common')
-rw-r--r--share/common/msapi/helper.py86
1 files changed, 77 insertions, 9 deletions
diff --git a/share/common/msapi/helper.py b/share/common/msapi/helper.py
index bb07b097..bd8932f3 100644
--- a/share/common/msapi/helper.py
+++ b/share/common/msapi/helper.py
@@ -18,10 +18,13 @@ import threading
import datetime
import time
+import traceback
+
# from common.exceptions import VimDriverNewtonException
from common.utils import restcall
from rest_framework import status
+from django.core.cache import cache
logger = logging.getLogger(__name__)
@@ -64,8 +67,12 @@ class Helper(object):
# Helper of AAI resource access
class MultiCloudAAIHelper(object):
+ '''
+ Helper to register infrastructure resource into AAI
+ '''
def __init__(self, multicloud_prefix, aai_base_url):
+ logger.debug("MultiCloudAAIHelper __init__ traceback: %s" % traceback.format_exc())
self.proxy_prefix = multicloud_prefix
self.aai_base_url = aai_base_url
self._logger = logger
@@ -159,7 +166,7 @@ class MultiCloudAAIHelper(object):
# thread helper
class MultiCloudThreadHelper(object):
'''
- thread to register infrastructure resource into AAI
+ Helper to manage LCM of an offloading thread
'''
@staticmethod
@@ -172,7 +179,10 @@ class MultiCloudThreadHelper(object):
epoch_time_sec = time.mktime(now_time.timetuple())
return int(epoch_time_sec * 1e6 + now_time.microsecond)
- def __init__(self):
+ def __init__(self, name=""):
+ # debug: dump the callstack to determine the callstack, hence the lcm
+ # logger.debug("MultiCloudThreadHelper __init__: %s" % traceback.format_exc())
+
# format of a backlog item:
# {
# "id": unique string to identify this item in backlog,
@@ -189,7 +199,12 @@ class MultiCloudThreadHelper(object):
self.expired_backlog = {}
self.lock = threading.Lock()
self.state_ = 0 # 0: stopped, 1: started
- self.thread = None
+ self.cache_prefix = "bi_"+name+"_"
+ self.cache_expired_prefix = "biex_"+name+"_"
+
+ self.thread = MultiCloudThreadHelper.HelperThread(self)
+ self.thread.start()
+
def state(self):
return self.state_
@@ -198,8 +213,8 @@ class MultiCloudThreadHelper(object):
self.lock.acquire()
if 0 == self.state_:
self.state_ = 1
- self.thread = MultiCloudThreadHelper.HelperThread(self)
- self.thread.start()
+ # self.thread = MultiCloudThreadHelper.HelperThread(self)
+ # self.thread.start()
else:
pass
self.lock.release()
@@ -208,35 +223,67 @@ class MultiCloudThreadHelper(object):
self.state_ = 0
def add(self, backlog_item):
+ cache_for_query = None
if not hasattr(backlog_item, "worker"):
return None
if not hasattr(backlog_item, "id"):
backlog_item["id"] = str(uuid.uuid1())
+ else:
+ cache_for_query = {
+ "id": backlog_item["id"],
+ "status": backlog_item.get("status", None)
+ }
+
if not hasattr(backlog_item, "repeat"):
backlog_item["repeat"] = 0
backlog_item["timestamp"] = 0
# self.lock.acquire()
# make sure there is no identical backlog in expired backlog
+ if cache_for_query:
+ cache.set(self.cache_prefix + backlog_item["id"],
+ json.dumps(cache_for_query), 3600 * 24)
+
self.expired_backlog.pop(backlog_item["id"], None)
self.backlog.update(backlog_item["id"], backlog_item)
# self.lock.release()
return len(self.backlog)
def get(self, backlog_id):
- self.backlog.get(backlog_id, None) or self.expired_backlog.get(backlog_id, None)
+ item = self.backlog.get(backlog_id, None) or self.expired_backlog.get(backlog_id, None)
+
+ # check the cache
+ if not item:
+ cache_for_query_str = cache.get(self.cache_prefix + backlog_id)
+ if cache_for_query_str:
+ item = json.loads(cache_for_query_str)
+ else:
+ cache_for_query_str = cache.get(self.cache_expired_prefix + backlog_id)
+ if cache_for_query_str:
+ item = json.loads(cache_for_query_str)
+ return item
# check if the backlog item is in expired backlog
def expired(self, backlog_id):
if not self.backlog.get(backlog_id, None):
if self.expired_backlog.get(backlog_id, None):
return True
+
+ # check the cache
+ cache_for_query_str = cache.get(self.cache_prefix + backlog_id)
+ if not cache_for_query_str:
+ cache_for_query_str = cache.get(self.cache_expired_prefix + backlog_id)
+ if cache_for_query_str:
+ return True
+
return False
def remove(self, backlog_id):
# self.lock.acquire()
self.backlog.pop(backlog_id, None)
self.expired_backlog.pop(backlog_id, None)
+ cache.delete(self.cache_prefix + backlog_id)
+ cache.delete(self.cache_expired_prefix + backlog_id)
# self.lock.release()
def reset(self):
@@ -245,8 +292,8 @@ class MultiCloudThreadHelper(object):
self.expired_backlog.clear()
# self.lock.release()
- def count(self):
- return len(self.backlog)
+ #def count(self):
+ # return len(self.backlog)
class HelperThread(threading.Thread):
def __init__(self, owner):
@@ -254,11 +301,13 @@ class MultiCloudThreadHelper(object):
self.daemon = True
self.duration = 0
self.owner = owner
+ # debug: dump the callstack to determine the callstack, hence the lcm
+ logger.debug("HelperThread __init__ : %s" % traceback.format_exc())
def run(self):
logger.debug("Start processing backlogs")
nexttimer = 0
- while self.owner.state_ == 1 and self.owner.count() > 0:
+ while self.owner.state_ == 1: # and self.owner.count() > 0:
if nexttimer > 1000000:
# sleep in case of interval > 1 second
time.sleep(nexttimer // 1000000)
@@ -285,14 +334,33 @@ class MultiCloudThreadHelper(object):
item["status"] = worker(payload) or 0
except Exception as e:
item["status"] = e.message
+ cache_item_for_query = {
+ "id": item["id"],
+ "status": item["status"]
+ }
if item.get("repeat", 0) == 0:
self.owner.remove(backlog_id)
# keep only the id and status
self.owner.expired_backlog[backlog_id] = {"status": item["status"]}
+
+ #update cache
+ try:
+ cache.set(self.owner.cache_expired_prefix + cache_item_for_query["id"], cache_item_for_query, 3600*24)
+ cache.delete(self.owner.cache_prefix + cache_item_for_query["id"])
+ except Exception as e:
+ logger.error(e.message)
else:
item["timestamp"] = now
+ #update cache
+ try:
+ cache.set(self.owner.cache_prefix + cache_item_for_query["id"], cache_item_for_query, 3600*24)
+ except Exception as e:
+ logger.error(e.message)
pass
# end of loop
+ # while True:
+ # logger.debug("thread sleep for 5 seconds")
+ # time.sleep(5) # wait forever, testonly
logger.debug("stop processing backlogs")
self.owner.state_ = 0
# end of processing