summaryrefslogtreecommitdiffstats
path: root/share
diff options
context:
space:
mode:
authorBin Yang <bin.yang@windriver.com>2019-04-02 08:28:12 +0000
committerBin Yang <bin.yang@windriver.com>2019-04-02 08:28:12 +0000
commitfeec0bcd53f24b0383f9f2db3a7837fa08525947 (patch)
tree3d8bd59c4cad6c2926735198e73634969b9aee2b /share
parent3176e57da097f2c238b134f18b1a4af00e105546 (diff)
Refactor helper codes
Move helper thread to common module Refactor the interface between helper thread and worker Change-Id: I0b61a2ed1a428f67cfbe3cc1411ace39e245932d Issue-ID: MULTICLOUD-554 Signed-off-by: Bin Yang <bin.yang@windriver.com>
Diffstat (limited to 'share')
-rw-r--r--share/common/msapi/helper.py228
-rw-r--r--share/newton_base/registration/registration.py479
-rw-r--r--share/starlingx_base/registration/registration.py106
3 files changed, 496 insertions, 317 deletions
diff --git a/share/common/msapi/helper.py b/share/common/msapi/helper.py
index 3e10c0fd..979c7feb 100644
--- a/share/common/msapi/helper.py
+++ b/share/common/msapi/helper.py
@@ -11,9 +11,14 @@
import json
import logging
-import re
+# import re
+import uuid
-from common.exceptions import VimDriverNewtonException
+import threading
+import datetime
+import time
+
+# from common.exceptions import VimDriverNewtonException
from common.utils import restcall
from rest_framework import status
@@ -21,6 +26,7 @@ from rest_framework import status
logger = logging.getLogger(__name__)
+# Helper of MultiCloud API invocation
class Helper(object):
@staticmethod
@@ -53,4 +59,220 @@ class Helper(object):
content = json.JSONDecoder().decode(ret[1])
ret[1] = content
return ret
- return [1, None, status.HTTP_404_NOT_FOUND] # return resource not found in case no type found \ No newline at end of file
+ return [1, None, status.HTTP_404_NOT_FOUND] # return resource not found in case no type found
+
+
+# Helper of AAI resource access
+class MultiCloudAAIHelper(object):
+
+ def __init__(self, multicloud_prefix, aai_base_url):
+ self.proxy_prefix = multicloud_prefix
+ self.aai_base_url = aai_base_url
+ self._logger = logger
+ # super(MultiCloudAAIHelper, self).__init__()
+
+ def _get_list_resources(
+ self, resource_url, service_type, session, viminfo,
+ vimid, content_key):
+ service = {
+ 'service_type': service_type,
+ 'interface': 'public'
+ }
+
+ # identity service should not filtered by region since it is might be first call
+ # to figure out available region list
+ if service_type != 'identity':
+ service['region_name'] = viminfo['openstack_region_id']\
+ if viminfo.get('openstack_region_id') else viminfo['cloud_region_id']
+
+ self._logger.debug("making request with URI:%s,%s" % (resource_url, service))
+ resp = session.get(resource_url, endpoint_filter=service)
+ self._logger.debug("request returns with status %s" % resp.status_code)
+ if resp.status_code == status.HTTP_200_OK:
+ self._logger.debug("with content:%s" % resp.json())
+ content = resp.json()
+ return content.get(content_key)
+ return None # failed to discover resources
+
+ def _update_resoure(self, cloud_owner, cloud_region_id,
+ resoure_id, resource_info, resource_type):
+ if cloud_owner and cloud_region_id:
+ self._logger.debug(
+ ("_update_resoure,vimid:%(cloud_owner)s"
+ "_%(cloud_region_id)s req_to_aai: %(resoure_id)s, "
+ "%(resource_type)s, %(resource_info)s")
+ % {
+ "cloud_owner": cloud_owner,
+ "cloud_region_id": cloud_region_id,
+ "resoure_id": resoure_id,
+ "resource_type": resource_type,
+ "resource_info": resource_info,
+ })
+
+ # get the resource first
+ resource_url = ("/cloud-infrastructure/cloud-regions/"
+ "cloud-region/%(cloud_owner)s/%(cloud_region_id)s/"
+ "%(resource_type)ss/%(resource_type)s/%(resoure_id)s"
+ % {
+ "cloud_owner": cloud_owner,
+ "cloud_region_id": cloud_region_id,
+ "resoure_id": resoure_id,
+ "resource_type": resource_type,
+ })
+
+ # get cloud-region
+ retcode, content, status_code = \
+ restcall.req_to_aai(resource_url, "GET")
+
+ # add resource-version
+ if retcode == 0 and content:
+ content = json.JSONDecoder().decode(content)
+ #resource_info["resource-version"] = content["resource-version"]
+ content.update(resource_info)
+ resource_info = content
+
+ #then update the resource
+ retcode, content, status_code = \
+ restcall.req_to_aai(resource_url, "PUT", content=resource_info)
+
+ self._logger.debug(
+ ("_update_resoure,vimid:%(cloud_owner)s"
+ "_%(cloud_region_id)s req_to_aai: %(resoure_id)s, "
+ "return %(retcode)s, %(content)s, %(status_code)s")
+ % {
+ "cloud_owner": cloud_owner,
+ "cloud_region_id": cloud_region_id,
+ "resoure_id": resoure_id,
+ "retcode": retcode,
+ "content": content,
+ "status_code": status_code,
+ })
+ return retcode, content
+ # unknown cloud owner,region_id
+ return (
+ 11,
+ "Unknown Cloud Region ID: %s ,%s" %(cloud_owner, cloud_region_id)
+ )
+ pass
+
+
+# thread helper
+class MultiCloudThreadHelper(object):
+ '''
+ thread to register infrastructure resource into AAI
+ '''
+
+ @staticmethod
+ def get_epoch_now_usecond():
+ '''
+ get epoch timestamp of this moment in usecond
+ :return:
+ '''
+ now_time = datetime.datetime.now()
+ epoch_time_sec = time.mktime(now_time.timetuple())
+ return int(epoch_time_sec * 1e6 + now_time.microsecond)
+
+ def __init__(self):
+ # format of a backlog item:
+ # {
+ # "id": unique string to identify this item in backlog,
+ # "worker": pointer to helper method
+ # "payload": opaque object to pass to the worker for processing
+ # "repeat": interval in micro-seconds for repeating this worker, 0 for one time worker
+ # "timestamp": time stamp of last invocation of this worker, 0 for initial state
+ # }
+ # format of backlog:
+ # {"<id value of backlog item>": <backlog item>, ...}
+ self.backlog = {}
+ # expired backlog items
+ self.expired_backlog = {}
+ # self.lock = threading.Lock()
+ self.state_ = 0 # 0: stopped, 1: started
+ self.thread = None
+
+ def state(self):
+ return self.state_
+
+ def start(self):
+ if 0 == self.state_:
+ self.state_ = 1
+ self.thread = MultiCloudThreadHelper.HelperThread(self)
+ self.thread.start()
+ else:
+ pass
+
+ def stop(self):
+ self.state_ = 0
+
+ def add(self, backlog_item):
+ if not hasattr(backlog_item, "worker"):
+ return None
+ if not hasattr(backlog_item, "id"):
+ backlog_item["id"] = str(uuid.uuid1())
+ if not hasattr(backlog_item, "repeat"):
+ backlog_item["repeat"] = 0
+ backlog_item["timestamp"] = 0
+
+ # self.lock.acquire()
+ self.backlog.update(backlog_item["id"], backlog_item)
+ # self.lock.release()
+ return len(self.backlog)
+
+ def get(self, backlog_id):
+ self.backlog.get(backlog_id, None) or self.expired_backlog.get(backlog_id, None)
+
+ def remove(self, backlog_id):
+ # self.lock.acquire()
+ self.backlog.pop(backlog_id, None)
+ self.expired_backlog.pop(backlog_id, None)
+ # self.lock.release()
+
+ def reset(self):
+ # self.lock.acquire()
+ self.backlog.clear()
+ self.expired_backlog.clear()
+ # self.lock.release()
+
+ def count(self):
+ return len(self.backlog)
+
+ class HelperThread(threading.Thread):
+ def __init__(self, owner):
+ threading.Thread.__init__(self)
+ self.daemon = True
+ self.duration = 0
+ self.owner = owner
+
+ def run(self):
+ logger.debug("Start processing backlogs")
+ while self.owner.state_ == 1 and self.owner.count() > 0:
+ for backlog_id, item in self.owner.backlog:
+ # check interval for repeatable backlog item
+ now = MultiCloudThreadHelper.get_epoch_now_usecond()
+ repeat_interval = item.get("repeat", 0)
+ if repeat_interval > 0:
+ timestamp = item.get("timestamp", 0)
+ # compare interval with elapsed time.
+ # workaround the case of timestamp turnaround
+ if repeat_interval > (now - timestamp
+ if now > timestamp
+ else repeat_interval):
+ # not time to run this backlog item yet
+ continue
+
+ worker = item.get("worker", None)
+ payload = item.get("payload", None)
+ try:
+ item["status"] = worker(payload) or 0
+ except Exception as e:
+ item["status"] = e.message
+ item["timestamp"] = now
+ if item.get("repeat", 0) == 0:
+ self.owner.remove(backlog_id)
+ self.owner.expired_backlog[backlog_id] = item
+ pass
+ pass
+ # end of loop
+ logger.debug("stop processing backlogs")
+ self.owner.state_ = 0
+ # end of processing
diff --git a/share/newton_base/registration/registration.py b/share/newton_base/registration/registration.py
index c7636ef2..6e8f8809 100644
--- a/share/newton_base/registration/registration.py
+++ b/share/newton_base/registration/registration.py
@@ -16,7 +16,6 @@ import logging
import json
import uuid
import traceback
-import threading
from keystoneauth1.exceptions import HttpError
from rest_framework import status
@@ -25,6 +24,7 @@ from rest_framework.views import APIView
from common.exceptions import VimDriverNewtonException
from common.msapi import extsys
+from common.msapi import helper
from common.utils import restcall
from newton_base.util import VimDriverUtils
@@ -37,6 +37,10 @@ class Registry(APIView):
if not hasattr(self, "_logger"):
self._logger = logger
+ if not hasattr(self, "register_thread"):
+ # dedicate thread to offload vim registration process
+ self.register_thread = helper.MultiCloudThreadHelper()
+
if not hasattr(self, "register_helper") or not self.register_helper:
if not hasattr(self, "proxy_prefix"):
self.proxy_prefix = "multicloud"
@@ -49,11 +53,17 @@ class Registry(APIView):
self._logger.debug("with data: %s" % request.data)
try:
-
- thread1 = RegisterHelperThread(self.register_helper.registry)
- thread1.addv0(vimid)
- if 0 == thread1.state():
- thread1.start()
+ # compose the one time backlog item
+ backlog_item = {
+ "id": vimid,
+ "worker": self.register_helper.registryV0,
+ "payload": (self.register_helper, vimid),
+ "repeat": 0,
+ "status": (1, "The registration process waits to be scheduled to run")
+ }
+ self.register_thread.add(backlog_item)
+ if 0 == self.register_thread.state():
+ self.register_thread.start()
return Response(status=status.HTTP_202_ACCEPTED)
@@ -68,15 +78,47 @@ class Registry(APIView):
data={'error': str(e)},
status=status.HTTP_500_INTERNAL_SERVER_ERROR)
+ def get(self, request, vimid):
+ try:
+ backlog_item = self.register_thread.get(vimid)
+ if backlog_item:
+ return Response(
+ data={'status': backlog_item.get("status", "Status not available, vimid: %s" % vimid)},
+ status=status.HTTP_200_OK)
+ else:
+ return Response(
+ data={
+ 'error': "Registration process for "
+ "Cloud Region not found: %s"
+ % vimid
+ },
+ status=status.HTTP_404_NOT_FOUND)
+ except Exception as e:
+ self._logger.error(traceback.format_exc())
+ return Response(
+ data={'error': str(e)},
+ status=status.HTTP_500_INTERNAL_SERVER_ERROR)
+
def delete(self, request, vimid=""):
self._logger.debug("Registration--delete::data> %s" % request.data)
self._logger.debug("Registration--delete::vimid > %s"% vimid)
try:
- retcode = RegistryHelper.unregistry(vimid)
+ # compose the one time backlog item
+ backlog_item = {
+ "id": vimid,
+ "worker": self.register_helper.unregistryV0,
+ "payload": (self.register_helper, vimid),
+ "repeat": 0,
+ "status": (1, "The registration process waits to be scheduled to run")
+ }
+ self.register_thread.add(backlog_item)
+ if 0 == self.register_thread.state():
+ self.register_thread.start()
- #ret_code = VimDriverUtils.delete_vim_info(vimid)
- return Response(status=status.HTTP_204_NO_CONTENT if retcode==0 else status.HTTP_500_INTERNAL_SERVER_ERROR)
+ return Response(
+ status=status.HTTP_204_NO_CONTENT
+ )
except VimDriverNewtonException as e:
return Response(data={'error': e.content}, status=e.status_code)
except HttpError as e:
@@ -88,7 +130,8 @@ class Registry(APIView):
status=status.HTTP_500_INTERNAL_SERVER_ERROR)
-class RegistryHelper(object):
+
+class RegistryHelper(helper.MultiCloudAAIHelper):
'''
Helper code to discover and register a cloud region's resource
'''
@@ -97,8 +140,15 @@ class RegistryHelper(object):
self.proxy_prefix = multicloud_prefix
self.aai_base_url = aai_base_url
self._logger = logger
+ super(RegistryHelper, self).__init__(multicloud_prefix, aai_base_url)
- def registry(self, vimid=""):
+ def registryV1(self, cloud_owner, cloud_region_id):
+ # cloud_owner = payload.get("cloud-owner", None)
+ # cloud_region_id = payload.get("cloud-region-id", None)
+ vimid = extsys.encode_vim_id(cloud_owner, cloud_region_id)
+ return self.registryV0(vimid)
+
+ def registryV0(self, vimid):
# populate proxy identity url
self._update_proxy_identity_endpoint(vimid)
@@ -106,48 +156,85 @@ class RegistryHelper(object):
# get token:
viminfo = VimDriverUtils.get_vim_info(vimid)
if not viminfo:
- raise VimDriverNewtonException(
- "There is no cloud-region with {cloud-owner}_{cloud-region-id}=%s in AAI" % vimid)
+ return (
+ 10,
+ "Cloud Region not found in AAI: %s" % vimid
+ )
# set the default tenant since there is no tenant info in the VIM yet
sess = VimDriverUtils.get_session(
viminfo, tenant_name=viminfo['tenant'])
# step 1. discover all projects and populate into AAI
- self._discover_tenants(vimid, sess, viminfo)
+ retcode, status = self._discover_tenants(vimid, sess, viminfo)
+ # if 0 != retcode:
+ # return (
+ # retcode, status
+ # )
# discover all flavors
- self._discover_flavors(vimid, sess, viminfo)
+ retcode, status = self._discover_flavors(vimid, sess, viminfo)
+ # if 0 != retcode:
+ # return (
+ # retcode, status
+ # )
# discover all images
- self._discover_images(vimid, sess, viminfo)
+ retcode, status = self._discover_images(vimid, sess, viminfo)
+ # if 0 != retcode:
+ # return (
+ # retcode, status
+ # )
# discover all az
- self._discover_availability_zones(vimid, sess, viminfo)
+ retcode, status = self._discover_availability_zones(vimid, sess, viminfo)
+ # if 0 != retcode:
+ # return (
+ # retcode, status
+ # )
# discover all vg
#self._discover_volumegroups(vimid, sess, viminfo)
+ # if 0 != retcode:
+ # return (
+ # retcode, status
+ # )
# discover all snapshots
#self._discover_snapshots(vimid, sess, viminfo)
+ # if 0 != retcode:
+ # return retcode, status
# discover all server groups
#self.discover_servergroups(request, vimid, sess, viminfo)
+ # if 0 != retcode:
+ # return retcode, status
# discover all pservers
#self._discover_pservers(vimid, sess, viminfo)
+ # if 0 != retcode:
+ # return retcode, status
- return 0
-
+ return (
+ 0,
+ "Registration finished for Cloud Region: %s" % vimid
+ )
- def unregistry(self, vimid=""):
+ def unregistryV1(self, cloud_owner, cloud_region_id):
+ # cloud_owner = payload.get("cloud-owner", None)
+ # cloud_region_id = payload.get("cloud-region-id", None)
+ vimid = extsys.encode_vim_id(cloud_owner, cloud_region_id)
+ return self.unregistryV0(vimid)
+ def unregistryV0(self, vimid):
# prepare request resource to vim instance
# get token:
viminfo = VimDriverUtils.get_vim_info(vimid)
if not viminfo:
- raise VimDriverNewtonException(
- "There is no cloud-region with {cloud-owner}_{cloud-region-id}=%s in AAI" % vimid)
+ return (
+ 10,
+ "Cloud Region not found:" % vimid
+ )
cloud_owner, cloud_region_id = extsys.decode_vim_id(vimid)
@@ -164,8 +251,14 @@ class RegistryHelper(object):
restcall.req_to_aai(resource_url, "GET")
# add resource-version
+ cloudregiondata = {}
if retcode == 0 and content:
cloudregiondata = json.JSONDecoder().decode(content)
+ else:
+ return (
+ 10,
+ "Cloud Region not found: %s, %s" % (cloud_owner, cloud_region_id)
+ )
# step 1. remove all tenants
tenants = cloudregiondata.get("tenants", None)
@@ -314,87 +407,7 @@ class RegistryHelper(object):
retcode, content, status_code = \
restcall.req_to_aai(resource_url, "DELETE")
- return retcode, content, status_code
-
-
- def _get_list_resources(
- self, resource_url, service_type, session, viminfo,
- vimid, content_key):
- service = {
- 'service_type': service_type,
- 'interface': 'public'
- }
-
- # identity service should not filtered by region since it is might be first call
- # to figure out available region list
- if service_type != 'identity':
- service['region_name'] = viminfo['openstack_region_id']\
- if viminfo.get('openstack_region_id') else viminfo['cloud_region_id']
-
- self._logger.info("making request with URI:%s,%s" % (resource_url,service))
- resp = session.get(resource_url, endpoint_filter=service)
- self._logger.info("request returns with status %s" % resp.status_code)
- if resp.status_code == status.HTTP_200_OK:
- self._logger.debug("with content:%s" % resp.json())
- content = resp.json()
- return content.get(content_key)
- return # failed to discover resources
-
- def _update_resoure(self, cloud_owner, cloud_region_id,
- resoure_id, resource_info, resource_type):
- if cloud_owner and cloud_region_id:
- self._logger.debug(
- ("_update_resoure,vimid:%(cloud_owner)s"
- "_%(cloud_region_id)s req_to_aai: %(resoure_id)s, "
- "%(resource_type)s, %(resource_info)s")
- % {
- "cloud_owner": cloud_owner,
- "cloud_region_id": cloud_region_id,
- "resoure_id": resoure_id,
- "resource_type": resource_type,
- "resource_info": resource_info,
- })
-
- #get the resource first
- resource_url = ("/cloud-infrastructure/cloud-regions/"
- "cloud-region/%(cloud_owner)s/%(cloud_region_id)s/"
- "%(resource_type)ss/%(resource_type)s/%(resoure_id)s"
- % {
- "cloud_owner": cloud_owner,
- "cloud_region_id": cloud_region_id,
- "resoure_id": resoure_id,
- "resource_type": resource_type,
- })
-
- # get cloud-region
- retcode, content, status_code = \
- restcall.req_to_aai(resource_url, "GET")
-
- # add resource-version
- if retcode == 0 and content:
- content = json.JSONDecoder().decode(content)
- #resource_info["resource-version"] = content["resource-version"]
- content.update(resource_info)
- resource_info = content
-
- #then update the resource
- retcode, content, status_code = \
- restcall.req_to_aai(resource_url, "PUT", content=resource_info)
-
- self._logger.debug(
- ("_update_resoure,vimid:%(cloud_owner)s"
- "_%(cloud_region_id)s req_to_aai: %(resoure_id)s, "
- "return %(retcode)s, %(content)s, %(status_code)s")
- % {
- "cloud_owner": cloud_owner,
- "cloud_region_id": cloud_region_id,
- "resoure_id": resoure_id,
- "retcode": retcode,
- "content": content,
- "status_code": status_code,
- })
- return retcode
- return 1 # unknown cloud owner,region_id
+ return retcode, content
def _discover_tenants(self, vimid="", session=None, viminfo=None):
try:
@@ -410,10 +423,12 @@ class RegistryHelper(object):
self._update_resoure(
cloud_owner, cloud_region_id, tenant['id'],
tenant_info, "tenant")
-
+ return (0, "succeed")
except VimDriverNewtonException as e:
self._logger.error("VimDriverNewtonException: status:%s, response:%s" % (e.http_status, e.content))
- return
+ return (
+ e.http_status, e.content
+ )
except HttpError as e:
if e.http_status == status.HTTP_403_FORBIDDEN:
### get the tenant information from the token response
@@ -434,12 +449,21 @@ class RegistryHelper(object):
except Exception as ex:
self._logger.error(traceback.format_exc())
+ return (
+ 11,
+ ex.message
+ )
else:
self._logger.error("HttpError: status:%s, response:%s" % (e.http_status, e.response.json()))
- return
+ return (
+ e.http_status, e.response.json()
+ )
except Exception as e:
self._logger.error(traceback.format_exc())
- return
+ return (
+ 11,
+ e.message
+ )
def _discover_flavors(self, vimid="", session=None, viminfo=None):
try:
@@ -472,19 +496,26 @@ class RegistryHelper(object):
hpa_capabilities = self._get_hpa_capabilities(flavor, extraResp, viminfo)
flavor_info['hpa-capabilities'] = {'hpa-capability': hpa_capabilities}
- self._update_resoure(
+ retcode, content = self._update_resoure(
cloud_owner, cloud_region_id, flavor['id'],
flavor_info, "flavor")
+ return (0, "succeed")
except VimDriverNewtonException as e:
self._logger.error("VimDriverNewtonException: status:%s, response:%s" % (e.http_status, e.content))
- return
+ return (
+ e.http_status, e.content
+ )
except HttpError as e:
self._logger.error("HttpError: status:%s, response:%s" % (e.http_status, e.response.json()))
- return
+ return (
+ e.http_status, e.response.json()
+ )
except Exception as e:
self._logger.error(traceback.format_exc())
- return
+ return (
+ 11, e.message
+ )
def _get_hpa_capabilities(self, flavor, extra_specs, viminfo):
hpa_caps = []
@@ -571,8 +602,11 @@ class RegistryHelper(object):
'hpa-attribute-value':
'{{\"value\":\"{0}\",\"unit\":\"{1}\"}}'.format(flavor['ram'],"MB")
})
- except Exception:
+ except Exception as e:
self._logger.error(traceback.format_exc())
+ return (
+ 11, e.message
+ )
return basic_capability
@@ -927,16 +961,22 @@ class RegistryHelper(object):
# parse the schema? TBD
# self.update_image(cloud_owner, cloud_region_id, image_info)
#metadata_info = {}
-
+ return (0, "succeed")
except VimDriverNewtonException as e:
self._logger.error("VimDriverNewtonException: status:%s, response:%s" % (e.http_status, e.content))
- return
+ return (
+ e.http_status, e.content
+ )
except HttpError as e:
self._logger.error("HttpError: status:%s, response:%s" % (e.http_status, e.response.json()))
- return
+ return (
+ e.http_status, e.response.json()
+ )
except Exception as e:
self._logger.error(traceback.format_exc())
- return
+ return (
+ 11, e.message
+ )
def _discover_availability_zones(self, vimid="", session=None,
viminfo=None):
@@ -968,37 +1008,18 @@ class RegistryHelper(object):
az_info['hypervisor-type'] = 'QEMU' # default for OpenStack
- # if az.get('hosts'):
- # for (k, v) in az['hosts'].items():
- # req_resource = "/os-hypervisors/detail?hypervisor_hostname_pattern=%s" % k
- # service = {'service_type': "compute",
- # 'interface': 'public',
- # 'region_name': viminfo['openstack_region_id']
- # if viminfo.get('openstack_region_id')
- # else viminfo['cloud_region_id']
- # }
- #
- # self._logger.info("making request with URI:%s" % req_resource)
- # resp = session.get(req_resource, endpoint_filter=service)
- # self._logger.info("request returns with status %s" % resp.status_code)
- # if resp.status_code == status.HTTP_200_OK:
- # self._logger.debug("with content:%s" % resp.json())
- # pass
- # content = resp.json()
- # if resp.status_code != status.HTTP_200_OK and not content[0]:
- # continue
- # az_info['hypervisor-type'] = content['hypervisors'][0]['hypervisor_type']\
- # if len(content.get('hypervisors')) else ''
- #
- # break
- ret = self._update_resoure(
+ ret, content = self._update_resoure(
cloud_owner, cloud_region_id, az['zoneName'], az_info,
"availability-zone")
if ret != 0:
# failed to update image
self._logger.debug("failed to populate az info into AAI: %s, az name: %s, ret:%s"
% (vimid, az_info['availability-zone-name'], ret))
- return None
+ # return (
+ # ret,
+ # "fail to popluate az info into AAI:%s" % content
+ # )
+ continue
# populate pservers:
for hostname in pservers_info:
@@ -1023,17 +1044,22 @@ class RegistryHelper(object):
self._update_pserver_relation_az(cloud_owner, cloud_region_id, pinfo, azName)
self._update_pserver_relation_cloudregion(cloud_owner, cloud_region_id, pinfo)
- return az_pserver_info
-
+ return (0, az_pserver_info)
except VimDriverNewtonException as e:
self._logger.error("VimDriverNewtonException: status:%s, response:%s" % (e.http_status, e.content))
- return None
+ return (
+ e.http_status, e.content
+ )
except HttpError as e:
self._logger.error("HttpError: status:%s, response:%s" % (e.http_status, e.response.json()))
- return None
+ return (
+ e.http_status, e.response.json()
+ )
except Exception as e:
self._logger.error(traceback.format_exc())
- return None
+ return (
+ 11, e.message
+ )
# def _discover_volumegroups(self, vimid="", session=None, viminfo=None):
# cloud_owner, cloud_region_id = extsys.decode_vim_id(vimid)
@@ -1076,23 +1102,33 @@ class RegistryHelper(object):
snapshot_info['snapshot-selflink'] = ss['metadata'].get('selflink')
snapshot_info['prev-snapshot-id'] = ss['metadata'].get('prev-snapshot-id')
- ret = self._update_resoure(
+ ret, content = self._update_resoure(
cloud_owner, cloud_region_id, ss['id'], snapshot_info,
"snapshot")
if ret != 0:
# failed to update image
self._logger.debug("failed to populate snapshot info into AAI: %s, snapshot-id: %s, ret:%s"
% (vimid, snapshot_info['snapshot-id'], ret))
-
+ return (
+ ret,
+ "fail to populate snapshot into AAI:%s" % content
+ )
+ return 0, "Succeed"
except VimDriverNewtonException as e:
self._logger.error("VimDriverNewtonException: status:%s, response:%s" % (e.http_status, e.content))
- return
+ return (
+ e.http_status, e.content
+ )
except HttpError as e:
self._logger.error("HttpError: status:%s, response:%s" % (e.http_status, e.response.json()))
- return
+ return (
+ e.http_status, e.response.json()
+ )
except Exception as e:
self._logger.error(traceback.format_exc())
- return
+ return (
+ 11, e.message
+ )
# def _discover_servergroups(self, vimid="", session=None, viminfo=None):
# for sg in self._get_list_resources(
@@ -1151,9 +1187,17 @@ class RegistryHelper(object):
cloud_owner, cloud_region_id,
pserverinfo['hostname'], retcode, content,
status_code))
-
-
- def _update_pserver_relation_cloudregion(self, cloud_owner, cloud_region_id, pserverinfo):
+ return (
+ 0,
+ "succeed"
+ )
+
+ def _update_pserver_relation_cloudregion(
+ self,
+ cloud_owner,
+ cloud_region_id,
+ pserverinfo
+ ):
related_link = ("%s/cloud-infrastructure/cloud-regions/"
"cloud-region/%s/%s"% (
self.aai_base_url, cloud_owner,
@@ -1194,6 +1238,10 @@ class RegistryHelper(object):
% (cloud_owner, cloud_region_id, cloud_owner, cloud_region_id,
pserverinfo['hostname'], retcode, content,
status_code))
+ return (
+ 0,
+ "succeed"
+ )
def _update_pserver(self, cloud_owner, cloud_region_id, pserverinfo):
'''
@@ -1254,8 +1302,13 @@ class RegistryHelper(object):
self._logger.debug("update_snapshot,vimid:%s_%s req_to_aai: %s, return %s, %s, %s"
% (cloud_owner,cloud_region_id, pserverinfo['hostname'], retcode, content, status_code))
- return retcode
- return 1 # unknown cloud owner,region_id
+ return retcode, content
+ else:
+ # unknown cloud owner,region_id
+ return (
+ 10,
+ "Cloud Region not found: %s,%s" % (cloud_owner, cloud_region_id)
+ )
def _discover_pservers(self, vimid="", session=None, viminfo=None):
try:
@@ -1287,22 +1340,30 @@ class RegistryHelper(object):
n_cpus = cputopo['cores'] * cputopo['threads'] * cputopo['sockets']
hypervisor_info['number-of-cpus'] = n_cpus
- ret = self._update_pserver(cloud_owner, cloud_region_id,
+ ret, content = self._update_pserver(cloud_owner, cloud_region_id,
hypervisor_info)
if ret != 0:
# failed to update image
self._logger.debug("failed to populate pserver info into AAI: %s, hostname: %s, ret:%s"
% (vimid, hypervisor_info['hostname'], ret))
+ return ret, "fail to update pserver to AAI:%s" % content
+ return 0, "succeed"
except VimDriverNewtonException as e:
self._logger.error("VimDriverNewtonException: status:%s, response:%s" % (e.http_status, e.content))
- return
+ return (
+ e.http_status, e.content
+ )
except HttpError as e:
self._logger.error("HttpError: status:%s, response:%s" % (e.http_status, e.response.json()))
- return
+ return (
+ e.http_status, e.response.json()
+ )
except Exception as e:
self._logger.error(traceback.format_exc())
- return
+ return (
+ 11, e.message
+ )
def _update_proxy_identity_endpoint(self, vimid):
'''
@@ -1335,96 +1396,30 @@ class RegistryHelper(object):
self._logger.debug("update_proxy_identity_endpoint,vimid:%s req_to_aai: %s, return %s, %s, %s"
% (vimid, viminfo['identity-url'], retcode, content, status_code))
+ return 0, "succeed"
else:
self._logger.debug("failure: update_proxy_identity_endpoint,vimid:%s req_to_aai: return %s, %s, %s"
% (vimid, retcode, content, status_code))
+ return retcode, content
+ else:
+ return (
+ 10,
+ "Cloud Region not found: %s" % vimid
+ )
except VimDriverNewtonException as e:
self._logger.error("VimDriverNewtonException: status:%s, response:%s" % (e.http_status, e.content))
- return
+ return (
+ e.http_status, e.content
+ )
except HttpError as e:
self._logger.error("HttpError: status:%s, response:%s" % (e.http_status, e.response.json()))
- return
+ return (
+ e.http_status, e.response.json()
+ )
except Exception as e:
self._logger.error(traceback.format_exc())
- return
-
-
-class RegisterHelperThread(threading.Thread):
- '''
- thread to register infrastructure resource into AAI
- '''
-
- def __init__(self, registry_helper):
- threading.Thread.__init__(self)
- self.daemon = True
- self.duration = 0
- self.helper = registry_helper
-
- # The queue of IDs of cloud regions, format:
- # v0: "owner1_regionid1"
- self.queuev0 = []
-
- # v1: {"cloud-owner": "owner1", "cloud-region-id": "regionid1"},
- self.queuev1 = []
- self.lock = threading.Lock()
-
- self.state_ = 0 # 0: stopped, 1: started
-
- def addv0(self, vimid):
- self.lock.acquire()
- self.queuev0.append(vimid)
- self.lock.release()
- return len(self.queuev0)
-
- def removev0(self, vimid):
- '''
- remove cloud region from list
- '''
- self.queuev0 = [x for x in self.queuev0 if x != vimid]
-
- def resetv0(self):
- self.queuev0 = []
-
- def countv0(self):
- return len(self.queuev0)
-
- def addv1(self, cloud_owner, cloud_region_id):
- self.lock.acquire()
- self.queuev1.append({"cloud-owner": cloud_owner, "cloud-region-id": cloud_region_id})
- self.lock.release()
- return len(self.queuev1)
-
- def removev1(self, cloud_owner, cloud_region_id):
- '''
- remove cloud region from list
- '''
- self.queuev1 = [x for x in self.queuev1 if x["cloud-owner"] != cloud_owner or x["cloud-region-id"] != cloud_region_id]
-
- def resetv1(self):
- self.queuev1 = []
-
- def countv1(self):
- return len(self.queuev1)
-
- def state(self):
- return self.state_
-
- def run(self):
- logger.debug("Starting registration thread")
- self.state_ = 1
- while self.helper and len(self.queuev0) > 0 and len(self.queuev1) > 0:
- self.lock.acquire()
- vimidv1 = self.queuev1.pop()
- self.lock.release()
- vimid = extsys.encode_vim_id(vimidv1["cloud-owner"], vimidv1["cloud-region-id"])
- self.helper(vimid)
-
- self.lock.acquire()
- vimidv0 = self.queuev0.pop()
- self.lock.release()
- self.helper(vimidv0)
-
- self.state_ = 0
- # end of processing
+ return (
+ 11, e.message
+ )
diff --git a/share/starlingx_base/registration/registration.py b/share/starlingx_base/registration/registration.py
index 59074568..fa79e5b3 100644
--- a/share/starlingx_base/registration/registration.py
+++ b/share/starlingx_base/registration/registration.py
@@ -25,12 +25,17 @@ from newton_base.registration import registration as newton_registration
from rest_framework import status
from rest_framework.response import Response
from common.msapi import extsys
+from common.msapi import helper
from keystoneauth1.exceptions import HttpError
from newton_base.util import VimDriverUtils
from common.utils import restcall
from django.core.cache import cache
logger = logging.getLogger(__name__)
+
+# global var: Audition thread
+gAZCapAuditThread = helper.MultiCloudThreadHelper()
+
# DEBUG=True
# APIv0 handler upgrading: leverage APIv1 handler
@@ -44,14 +49,24 @@ class APIv0Registry(newton_registration.Registry):
self._logger.info("registration with : %s" % vimid)
# vim registration will trigger the start the audit of AZ capacity
- gAZCapAuditThread.addv0(vimid)
+ worker_self = InfraResourceAuditor(
+ settings.MULTICLOUD_API_V1_PREFIX,
+ settings.AAI_BASE_URL
+ )
+ backlog_item = {
+ "id": vimid,
+ "worker": worker_self.azcap_audit,
+ "payload": (worker_self, vimid),
+ "repeat": 5*1000000, # repeat every 5 seconds
+ }
+ gAZCapAuditThread.add(backlog_item)
if 0 == gAZCapAuditThread.state():
gAZCapAuditThread.start()
return super(APIv0Registry, self).post(request, vimid)
def delete(self, request, vimid=""):
self._logger.debug("unregister cloud region: %s" % vimid)
- gAZCapAuditThread.removev0(vimid)
+ gAZCapAuditThread.remove(vimid)
return super(APIv0Registry, self).delete(request, vimid)
@@ -62,8 +77,8 @@ class Registry(APIv0Registry):
class APIv1Registry(newton_registration.Registry):
def __init__(self):
- super(APIv1Registry, self).__init__()
self.register_helper = RegistryHelper(settings.MULTICLOUD_API_V1_PREFIX, settings.AAI_BASE_URL)
+ super(APIv1Registry, self).__init__()
# self._logger = logger
def post(self, request, cloud_owner="", cloud_region_id=""):
@@ -74,7 +89,17 @@ class APIv1Registry(newton_registration.Registry):
vimid = extsys.encode_vim_id(cloud_owner, cloud_region_id)
# vim registration will trigger the start the audit of AZ capacity
- gAZCapAuditThread.addv0(vimid)
+ worker_self = InfraResourceAuditor(
+ settings.MULTICLOUD_API_V1_PREFIX,
+ settings.AAI_BASE_URL
+ )
+ backlog_item = {
+ "id": vimid,
+ "worker": worker_self.azcap_audit,
+ "payload": (worker_self, vimid),
+ "repeat": 5 * 1000000, # repeat every 5 seconds
+ }
+ gAZCapAuditThread.add(backlog_item)
if 0 == gAZCapAuditThread.state():
gAZCapAuditThread.start()
@@ -95,7 +120,7 @@ class APIv1Registry(newton_registration.Registry):
% (cloud_owner, cloud_region_id))
vimid = extsys.encode_vim_id(cloud_owner, cloud_region_id)
- gAZCapAuditThread.removev0(vimid)
+ gAZCapAuditThread.remove(vimid)
return super(APIv1Registry, self).delete(request, vimid)
@@ -107,7 +132,7 @@ class RegistryHelper(newton_registration.RegistryHelper):
super(RegistryHelper, self).__init__(multicloud_prefix, aai_base_url)
# self._logger = logger
- def registry(self, vimid=""):
+ def registryV0(self, vimid=""):
'''
extend base method
'''
@@ -132,7 +157,7 @@ class RegistryHelper(newton_registration.RegistryHelper):
viminfo, tenant_name=viminfo['tenant'])
# discover the regions, expect it always returns a list (even empty list)
- # cloud_owner, cloud_region_id = extsys.decode_vim_id(vimid)
+ cloud_owner, cloud_region_id = extsys.decode_vim_id(vimid)
# region_ids = self._discover_regions(cloud_owner, cloud_region_id, sess, viminfo)
region_ids = self._discover_regions(vimid, sess, viminfo)
@@ -298,8 +323,7 @@ class RegistryHelper(newton_registration.RegistryHelper):
return 1 # unknown cloud owner,region_id
# def _discover_regions(self, cloud_owner="", cloud_region_id="",
- def _discover_regions(self, vimid="",
- session=None, viminfo=None):
+ def _discover_regions(self, vimid, session=None, viminfo=None):
try:
regions = []
# vimid = extsys.encode_vim_id(cloud_owner, cloud_region_id)
@@ -343,7 +367,7 @@ class InfraResourceAuditor(newton_registration.RegistryHelper):
self._logger = logger
# super(InfraResourceAuditor, self).__init__();
- def azcap_audit(self, vimid=""):
+ def azcap_audit(self, vimid):
viminfo = VimDriverUtils.get_vim_info(vimid)
if not viminfo:
self._logger.warn("azcap_audit no valid vimid: %s" % vimid)
@@ -398,8 +422,6 @@ class InfraResourceAuditor(newton_registration.RegistryHelper):
# get list of host names
pservers_info = [k for (k, v) in az['hosts'].items()]
- # set the association between az and pservers
- #az_pserver_info[azName] = az['hosts']
# Get current cap info of azName
azCapCacheKey = "cap_" + vimid + "_" + azName
@@ -463,63 +485,3 @@ class InfraResourceAuditor(newton_registration.RegistryHelper):
except Exception as e:
self._logger.error("azcap_audit raise exception: %s" % e)
pass
-
-
-class AuditorHelperThread(threading.Thread):
- '''
- thread to register infrastructure resource into AAI
- '''
-
- def __init__(self, audit_helper):
- threading.Thread.__init__(self)
- self.daemon = True
- self.duration = 0
- self.helper = audit_helper
-
- # The set of IDs of cloud regions, format:
- # v0: "owner1_regionid1"
- self.queuev0 = set()
- self.lock = threading.Lock()
- self.state_ = 0 # 0: stopped, 1: started
-
- def addv0(self, vimid=""):
- self.lock.acquire()
- self.queuev0.add(vimid)
- self.lock.release()
- return len(self.queuev0)
-
- def removev0(self, vimid):
- '''
- discard cloud region from list without raise any exception
- '''
- self.queuev0.discard(vimid)
-
- def resetv0(self):
- self.queuev0.clear()
-
- def countv0(self):
- return len(self.queuev0)
-
- def state(self):
- return self.state_
-
- def run(self):
- logger.debug("Start the Audition thread")
- self.state_ = 1
- while self.helper and self.countv0() > 0:
- for vimidv0 in self.queuev0:
- self.helper(vimidv0)
- # sleep for a while in seconds
- time.sleep(5)
-
- self.state_ = 0
- logger.debug("Stop the Audition thread")
- # end of processing
-
-# global Audition thread
-gAZCapAuditThread = AuditorHelperThread(
- InfraResourceAuditor(
- settings.MULTICLOUD_API_V1_PREFIX,
- settings.AAI_BASE_URL).azcap_audit
-)
-