From feec0bcd53f24b0383f9f2db3a7837fa08525947 Mon Sep 17 00:00:00 2001 From: Bin Yang Date: Tue, 2 Apr 2019 08:28:12 +0000 Subject: Refactor helper codes Move helper thread to common module Refactor the interface between helper thread and worker Change-Id: I0b61a2ed1a428f67cfbe3cc1411ace39e245932d Issue-ID: MULTICLOUD-554 Signed-off-by: Bin Yang --- share/common/msapi/helper.py | 228 ++++++++++++++++++++++++++++++++++++++++++- 1 file changed, 225 insertions(+), 3 deletions(-) (limited to 'share/common') diff --git a/share/common/msapi/helper.py b/share/common/msapi/helper.py index 3e10c0fd..979c7feb 100644 --- a/share/common/msapi/helper.py +++ b/share/common/msapi/helper.py @@ -11,9 +11,14 @@ import json import logging -import re +# import re +import uuid -from common.exceptions import VimDriverNewtonException +import threading +import datetime +import time + +# from common.exceptions import VimDriverNewtonException from common.utils import restcall from rest_framework import status @@ -21,6 +26,7 @@ from rest_framework import status logger = logging.getLogger(__name__) +# Helper of MultiCloud API invocation class Helper(object): @staticmethod @@ -53,4 +59,220 @@ class Helper(object): content = json.JSONDecoder().decode(ret[1]) ret[1] = content return ret - return [1, None, status.HTTP_404_NOT_FOUND] # return resource not found in case no type found \ No newline at end of file + return [1, None, status.HTTP_404_NOT_FOUND] # return resource not found in case no type found + + +# Helper of AAI resource access +class MultiCloudAAIHelper(object): + + def __init__(self, multicloud_prefix, aai_base_url): + self.proxy_prefix = multicloud_prefix + self.aai_base_url = aai_base_url + self._logger = logger + # super(MultiCloudAAIHelper, self).__init__() + + def _get_list_resources( + self, resource_url, service_type, session, viminfo, + vimid, content_key): + service = { + 'service_type': service_type, + 'interface': 'public' + } + + # identity service should not filtered by region since it is might be first call + # to figure out available region list + if service_type != 'identity': + service['region_name'] = viminfo['openstack_region_id']\ + if viminfo.get('openstack_region_id') else viminfo['cloud_region_id'] + + self._logger.debug("making request with URI:%s,%s" % (resource_url, service)) + resp = session.get(resource_url, endpoint_filter=service) + self._logger.debug("request returns with status %s" % resp.status_code) + if resp.status_code == status.HTTP_200_OK: + self._logger.debug("with content:%s" % resp.json()) + content = resp.json() + return content.get(content_key) + return None # failed to discover resources + + def _update_resoure(self, cloud_owner, cloud_region_id, + resoure_id, resource_info, resource_type): + if cloud_owner and cloud_region_id: + self._logger.debug( + ("_update_resoure,vimid:%(cloud_owner)s" + "_%(cloud_region_id)s req_to_aai: %(resoure_id)s, " + "%(resource_type)s, %(resource_info)s") + % { + "cloud_owner": cloud_owner, + "cloud_region_id": cloud_region_id, + "resoure_id": resoure_id, + "resource_type": resource_type, + "resource_info": resource_info, + }) + + # get the resource first + resource_url = ("/cloud-infrastructure/cloud-regions/" + "cloud-region/%(cloud_owner)s/%(cloud_region_id)s/" + "%(resource_type)ss/%(resource_type)s/%(resoure_id)s" + % { + "cloud_owner": cloud_owner, + "cloud_region_id": cloud_region_id, + "resoure_id": resoure_id, + "resource_type": resource_type, + }) + + # get cloud-region + retcode, content, status_code = \ + restcall.req_to_aai(resource_url, "GET") + + # add resource-version + if retcode == 0 and content: + content = json.JSONDecoder().decode(content) + #resource_info["resource-version"] = content["resource-version"] + content.update(resource_info) + resource_info = content + + #then update the resource + retcode, content, status_code = \ + restcall.req_to_aai(resource_url, "PUT", content=resource_info) + + self._logger.debug( + ("_update_resoure,vimid:%(cloud_owner)s" + "_%(cloud_region_id)s req_to_aai: %(resoure_id)s, " + "return %(retcode)s, %(content)s, %(status_code)s") + % { + "cloud_owner": cloud_owner, + "cloud_region_id": cloud_region_id, + "resoure_id": resoure_id, + "retcode": retcode, + "content": content, + "status_code": status_code, + }) + return retcode, content + # unknown cloud owner,region_id + return ( + 11, + "Unknown Cloud Region ID: %s ,%s" %(cloud_owner, cloud_region_id) + ) + pass + + +# thread helper +class MultiCloudThreadHelper(object): + ''' + thread to register infrastructure resource into AAI + ''' + + @staticmethod + def get_epoch_now_usecond(): + ''' + get epoch timestamp of this moment in usecond + :return: + ''' + now_time = datetime.datetime.now() + epoch_time_sec = time.mktime(now_time.timetuple()) + return int(epoch_time_sec * 1e6 + now_time.microsecond) + + def __init__(self): + # format of a backlog item: + # { + # "id": unique string to identify this item in backlog, + # "worker": pointer to helper method + # "payload": opaque object to pass to the worker for processing + # "repeat": interval in micro-seconds for repeating this worker, 0 for one time worker + # "timestamp": time stamp of last invocation of this worker, 0 for initial state + # } + # format of backlog: + # {"": , ...} + self.backlog = {} + # expired backlog items + self.expired_backlog = {} + # self.lock = threading.Lock() + self.state_ = 0 # 0: stopped, 1: started + self.thread = None + + def state(self): + return self.state_ + + def start(self): + if 0 == self.state_: + self.state_ = 1 + self.thread = MultiCloudThreadHelper.HelperThread(self) + self.thread.start() + else: + pass + + def stop(self): + self.state_ = 0 + + def add(self, backlog_item): + if not hasattr(backlog_item, "worker"): + return None + if not hasattr(backlog_item, "id"): + backlog_item["id"] = str(uuid.uuid1()) + if not hasattr(backlog_item, "repeat"): + backlog_item["repeat"] = 0 + backlog_item["timestamp"] = 0 + + # self.lock.acquire() + self.backlog.update(backlog_item["id"], backlog_item) + # self.lock.release() + return len(self.backlog) + + def get(self, backlog_id): + self.backlog.get(backlog_id, None) or self.expired_backlog.get(backlog_id, None) + + def remove(self, backlog_id): + # self.lock.acquire() + self.backlog.pop(backlog_id, None) + self.expired_backlog.pop(backlog_id, None) + # self.lock.release() + + def reset(self): + # self.lock.acquire() + self.backlog.clear() + self.expired_backlog.clear() + # self.lock.release() + + def count(self): + return len(self.backlog) + + class HelperThread(threading.Thread): + def __init__(self, owner): + threading.Thread.__init__(self) + self.daemon = True + self.duration = 0 + self.owner = owner + + def run(self): + logger.debug("Start processing backlogs") + while self.owner.state_ == 1 and self.owner.count() > 0: + for backlog_id, item in self.owner.backlog: + # check interval for repeatable backlog item + now = MultiCloudThreadHelper.get_epoch_now_usecond() + repeat_interval = item.get("repeat", 0) + if repeat_interval > 0: + timestamp = item.get("timestamp", 0) + # compare interval with elapsed time. + # workaround the case of timestamp turnaround + if repeat_interval > (now - timestamp + if now > timestamp + else repeat_interval): + # not time to run this backlog item yet + continue + + worker = item.get("worker", None) + payload = item.get("payload", None) + try: + item["status"] = worker(payload) or 0 + except Exception as e: + item["status"] = e.message + item["timestamp"] = now + if item.get("repeat", 0) == 0: + self.owner.remove(backlog_id) + self.owner.expired_backlog[backlog_id] = item + pass + pass + # end of loop + logger.debug("stop processing backlogs") + self.owner.state_ = 0 + # end of processing -- cgit 1.2.3-korg