summaryrefslogtreecommitdiffstats
path: root/share/common
diff options
context:
space:
mode:
authorBin Yang <bin.yang@windriver.com>2019-04-02 08:28:12 +0000
committerBin Yang <bin.yang@windriver.com>2019-04-02 08:28:12 +0000
commitfeec0bcd53f24b0383f9f2db3a7837fa08525947 (patch)
tree3d8bd59c4cad6c2926735198e73634969b9aee2b /share/common
parent3176e57da097f2c238b134f18b1a4af00e105546 (diff)
Refactor helper codes
Move helper thread to common module Refactor the interface between helper thread and worker Change-Id: I0b61a2ed1a428f67cfbe3cc1411ace39e245932d Issue-ID: MULTICLOUD-554 Signed-off-by: Bin Yang <bin.yang@windriver.com>
Diffstat (limited to 'share/common')
-rw-r--r--share/common/msapi/helper.py228
1 files changed, 225 insertions, 3 deletions
diff --git a/share/common/msapi/helper.py b/share/common/msapi/helper.py
index 3e10c0fd..979c7feb 100644
--- a/share/common/msapi/helper.py
+++ b/share/common/msapi/helper.py
@@ -11,9 +11,14 @@
import json
import logging
-import re
+# import re
+import uuid
-from common.exceptions import VimDriverNewtonException
+import threading
+import datetime
+import time
+
+# from common.exceptions import VimDriverNewtonException
from common.utils import restcall
from rest_framework import status
@@ -21,6 +26,7 @@ from rest_framework import status
logger = logging.getLogger(__name__)
+# Helper of MultiCloud API invocation
class Helper(object):
@staticmethod
@@ -53,4 +59,220 @@ class Helper(object):
content = json.JSONDecoder().decode(ret[1])
ret[1] = content
return ret
- return [1, None, status.HTTP_404_NOT_FOUND] # return resource not found in case no type found \ No newline at end of file
+ return [1, None, status.HTTP_404_NOT_FOUND] # return resource not found in case no type found
+
+
+# Helper of AAI resource access
+class MultiCloudAAIHelper(object):
+
+ def __init__(self, multicloud_prefix, aai_base_url):
+ self.proxy_prefix = multicloud_prefix
+ self.aai_base_url = aai_base_url
+ self._logger = logger
+ # super(MultiCloudAAIHelper, self).__init__()
+
+ def _get_list_resources(
+ self, resource_url, service_type, session, viminfo,
+ vimid, content_key):
+ service = {
+ 'service_type': service_type,
+ 'interface': 'public'
+ }
+
+ # identity service should not filtered by region since it is might be first call
+ # to figure out available region list
+ if service_type != 'identity':
+ service['region_name'] = viminfo['openstack_region_id']\
+ if viminfo.get('openstack_region_id') else viminfo['cloud_region_id']
+
+ self._logger.debug("making request with URI:%s,%s" % (resource_url, service))
+ resp = session.get(resource_url, endpoint_filter=service)
+ self._logger.debug("request returns with status %s" % resp.status_code)
+ if resp.status_code == status.HTTP_200_OK:
+ self._logger.debug("with content:%s" % resp.json())
+ content = resp.json()
+ return content.get(content_key)
+ return None # failed to discover resources
+
+ def _update_resoure(self, cloud_owner, cloud_region_id,
+ resoure_id, resource_info, resource_type):
+ if cloud_owner and cloud_region_id:
+ self._logger.debug(
+ ("_update_resoure,vimid:%(cloud_owner)s"
+ "_%(cloud_region_id)s req_to_aai: %(resoure_id)s, "
+ "%(resource_type)s, %(resource_info)s")
+ % {
+ "cloud_owner": cloud_owner,
+ "cloud_region_id": cloud_region_id,
+ "resoure_id": resoure_id,
+ "resource_type": resource_type,
+ "resource_info": resource_info,
+ })
+
+ # get the resource first
+ resource_url = ("/cloud-infrastructure/cloud-regions/"
+ "cloud-region/%(cloud_owner)s/%(cloud_region_id)s/"
+ "%(resource_type)ss/%(resource_type)s/%(resoure_id)s"
+ % {
+ "cloud_owner": cloud_owner,
+ "cloud_region_id": cloud_region_id,
+ "resoure_id": resoure_id,
+ "resource_type": resource_type,
+ })
+
+ # get cloud-region
+ retcode, content, status_code = \
+ restcall.req_to_aai(resource_url, "GET")
+
+ # add resource-version
+ if retcode == 0 and content:
+ content = json.JSONDecoder().decode(content)
+ #resource_info["resource-version"] = content["resource-version"]
+ content.update(resource_info)
+ resource_info = content
+
+ #then update the resource
+ retcode, content, status_code = \
+ restcall.req_to_aai(resource_url, "PUT", content=resource_info)
+
+ self._logger.debug(
+ ("_update_resoure,vimid:%(cloud_owner)s"
+ "_%(cloud_region_id)s req_to_aai: %(resoure_id)s, "
+ "return %(retcode)s, %(content)s, %(status_code)s")
+ % {
+ "cloud_owner": cloud_owner,
+ "cloud_region_id": cloud_region_id,
+ "resoure_id": resoure_id,
+ "retcode": retcode,
+ "content": content,
+ "status_code": status_code,
+ })
+ return retcode, content
+ # unknown cloud owner,region_id
+ return (
+ 11,
+ "Unknown Cloud Region ID: %s ,%s" %(cloud_owner, cloud_region_id)
+ )
+ pass
+
+
+# thread helper
+class MultiCloudThreadHelper(object):
+ '''
+ thread to register infrastructure resource into AAI
+ '''
+
+ @staticmethod
+ def get_epoch_now_usecond():
+ '''
+ get epoch timestamp of this moment in usecond
+ :return:
+ '''
+ now_time = datetime.datetime.now()
+ epoch_time_sec = time.mktime(now_time.timetuple())
+ return int(epoch_time_sec * 1e6 + now_time.microsecond)
+
+ def __init__(self):
+ # format of a backlog item:
+ # {
+ # "id": unique string to identify this item in backlog,
+ # "worker": pointer to helper method
+ # "payload": opaque object to pass to the worker for processing
+ # "repeat": interval in micro-seconds for repeating this worker, 0 for one time worker
+ # "timestamp": time stamp of last invocation of this worker, 0 for initial state
+ # }
+ # format of backlog:
+ # {"<id value of backlog item>": <backlog item>, ...}
+ self.backlog = {}
+ # expired backlog items
+ self.expired_backlog = {}
+ # self.lock = threading.Lock()
+ self.state_ = 0 # 0: stopped, 1: started
+ self.thread = None
+
+ def state(self):
+ return self.state_
+
+ def start(self):
+ if 0 == self.state_:
+ self.state_ = 1
+ self.thread = MultiCloudThreadHelper.HelperThread(self)
+ self.thread.start()
+ else:
+ pass
+
+ def stop(self):
+ self.state_ = 0
+
+ def add(self, backlog_item):
+ if not hasattr(backlog_item, "worker"):
+ return None
+ if not hasattr(backlog_item, "id"):
+ backlog_item["id"] = str(uuid.uuid1())
+ if not hasattr(backlog_item, "repeat"):
+ backlog_item["repeat"] = 0
+ backlog_item["timestamp"] = 0
+
+ # self.lock.acquire()
+ self.backlog.update(backlog_item["id"], backlog_item)
+ # self.lock.release()
+ return len(self.backlog)
+
+ def get(self, backlog_id):
+ self.backlog.get(backlog_id, None) or self.expired_backlog.get(backlog_id, None)
+
+ def remove(self, backlog_id):
+ # self.lock.acquire()
+ self.backlog.pop(backlog_id, None)
+ self.expired_backlog.pop(backlog_id, None)
+ # self.lock.release()
+
+ def reset(self):
+ # self.lock.acquire()
+ self.backlog.clear()
+ self.expired_backlog.clear()
+ # self.lock.release()
+
+ def count(self):
+ return len(self.backlog)
+
+ class HelperThread(threading.Thread):
+ def __init__(self, owner):
+ threading.Thread.__init__(self)
+ self.daemon = True
+ self.duration = 0
+ self.owner = owner
+
+ def run(self):
+ logger.debug("Start processing backlogs")
+ while self.owner.state_ == 1 and self.owner.count() > 0:
+ for backlog_id, item in self.owner.backlog:
+ # check interval for repeatable backlog item
+ now = MultiCloudThreadHelper.get_epoch_now_usecond()
+ repeat_interval = item.get("repeat", 0)
+ if repeat_interval > 0:
+ timestamp = item.get("timestamp", 0)
+ # compare interval with elapsed time.
+ # workaround the case of timestamp turnaround
+ if repeat_interval > (now - timestamp
+ if now > timestamp
+ else repeat_interval):
+ # not time to run this backlog item yet
+ continue
+
+ worker = item.get("worker", None)
+ payload = item.get("payload", None)
+ try:
+ item["status"] = worker(payload) or 0
+ except Exception as e:
+ item["status"] = e.message
+ item["timestamp"] = now
+ if item.get("repeat", 0) == 0:
+ self.owner.remove(backlog_id)
+ self.owner.expired_backlog[backlog_id] = item
+ pass
+ pass
+ # end of loop
+ logger.debug("stop processing backlogs")
+ self.owner.state_ = 0
+ # end of processing