diff options
20 files changed, 1232 insertions, 73 deletions
diff --git a/newton/newton/registration/views/registration.py b/newton/newton/registration/views/registration.py index 4d954cb1..88cdae4d 100644 --- a/newton/newton/registration/views/registration.py +++ b/newton/newton/registration/views/registration.py @@ -119,13 +119,13 @@ class Registry(newton_registration.Registry): if len(caps_dict) > 0: self._logger.debug("storage_capabilities_info: %s" % caps_dict) hpa_caps.append(caps_dict) - + # CPU instruction set extension capabilities caps_dict = self._get_instruction_set_capabilities(extra_specs) if len(caps_dict) > 0: self._logger.debug("instruction_set_capabilities_info: %s" % caps_dict) hpa_caps.append(caps_dict) - + # PCI passthrough capabilities caps_dict = self._get_pci_passthrough_capabilities(extra_specs) if len(caps_dict) > 0: @@ -151,10 +151,13 @@ class Registry(newton_registration.Registry): basic_capability['hpa-feature-attributes'] = [] basic_capability['hpa-feature-attributes'].append({'hpa-attribute-key': 'numVirtualCpu', - 'hpa-attribute-value':str({'value': str(flavor['vcpus']) })}) + 'hpa-attribute-value': + '{{\"value\":\"{0}\"}}'.format(flavor['vcpus']) + }) basic_capability['hpa-feature-attributes'].append({'hpa-attribute-key':'virtualMemSize', - 'hpa-attribute-value': str({'value':str(flavor['ram']), 'unit':'MB'})}) - + 'hpa-attribute-value': + '{{\"value\":\"{0}\",\"unit\":\"{1}\"}}'.format(flavor['ram'],"MB") + }) return basic_capability def _get_cpupinning_capabilities(self, extra_specs): @@ -170,10 +173,14 @@ class Registry(newton_registration.Registry): cpupining_capability['hpa-feature-attributes'] = [] if extra_specs.has_key('hw:cpu_thread_policy'): cpupining_capability['hpa-feature-attributes'].append({'hpa-attribute-key': 'logicalCpuThreadPinningPolicy', - 'hpa-attribute-value':str({'value': str(extra_specs['hw:cpu_thread_policy'])})}) + 'hpa-attribute-value': + '{{\"value\":\"{0}\"}}'.format(extra_specs['hw:cpu_thread_policy']) + }) if extra_specs.has_key('hw:cpu_policy'): cpupining_capability['hpa-feature-attributes'].append({'hpa-attribute-key':'logicalCpuPinningPolicy', - 'hpa-attribute-value': str({'value':str(extra_specs['hw:cpu_policy'])})}) + 'hpa-attribute-value': + '{{\"value\":\"{0}\"}}'.format(extra_specs['hw:cpu_policy']) + }) return cpupining_capability @@ -190,13 +197,19 @@ class Registry(newton_registration.Registry): cputopology_capability['hpa-feature-attributes'] = [] if extra_specs.has_key('hw:cpu_sockets'): cputopology_capability['hpa-feature-attributes'].append({'hpa-attribute-key': 'numCpuSockets', - 'hpa-attribute-value':str({'value': str(extra_specs['hw:cpu_sockets'])})}) + 'hpa-attribute-value': + '{{\"value\":\"{0}\"}}'.format(extra_specs['hw:cpu_sockets']) + }) if extra_specs.has_key('hw:cpu_cores'): cputopology_capability['hpa-feature-attributes'].append({'hpa-attribute-key': 'numCpuCores', - 'hpa-attribute-value':str({'value': str(extra_specs['hw:cpu_cores'])})}) + 'hpa-attribute-value': + '{{\"value\":\"{0}\"}}'.format(extra_specs['hw:cpu_cores']) + }) if extra_specs.has_key('hw:cpu_threads'): cputopology_capability['hpa-feature-attributes'].append({'hpa-attribute-key': 'numCpuThreads', - 'hpa-attribute-value':str({'value': str(extra_specs['hw:cpu_threads'])})}) + 'hpa-attribute-value': + '{{\"value\":\"{0}\"}}'.format(extra_specs['hw:cpu_threads']) + }) return cputopology_capability @@ -213,18 +226,21 @@ class Registry(newton_registration.Registry): hugepages_capability['hpa-feature-attributes'] = [] if extra_specs['hw:mem_page_size'] == 'large': hugepages_capability['hpa-feature-attributes'].append({'hpa-attribute-key': 'memoryPageSize', - 'hpa-attribute-value':str({'value': '2', - 'unit': 'MB'})}) + 'hpa-attribute-value': + '{{\"value\":\"{0}\",\"unit\":\"{1}\"}}'.format(2,"MB") + }) elif extra_specs['hw:mem_page_size'] == 'small': hugepages_capability['hpa-feature-attributes'].append({'hpa-attribute-key': 'memoryPageSize', - 'hpa-attribute-value':str({'value': '4', - 'unit': 'KB'})}) + 'hpa-attribute-value': + '{{\"value\":\"{0}\",\"unit\":\"{1}\"}}'.format(4,"KB") + }) elif extra_specs['hw:mem_page_size'] == 'any': self._logger.info("Currently HPA feature memoryPageSize did not support 'any' page!!") else : hugepages_capability['hpa-feature-attributes'].append({'hpa-attribute-key': 'memoryPageSize', - 'hpa-attribute-value':str({'value': str(extra_specs['hw:mem_page_size']), - 'unit': 'KB'})}) + 'hpa-attribute-value': + '{{\"value\":\"{0}\",\"unit\":\"{1}\"}}'.format(extra_specs['hw:mem_page_size'],"KB") + }) return hugepages_capability @@ -240,7 +256,9 @@ class Registry(newton_registration.Registry): numa_capability['hpa-feature-attributes'] = [] numa_capability['hpa-feature-attributes'].append({'hpa-attribute-key': 'numaNodes', - 'hpa-attribute-value':str({'value': str(extra_specs['hw:numa_nodes'])})}) + 'hpa-attribute-value': + '{{\"value\":\"{0}\"}}'.format(extra_specs['hw:numa_nodes'] or 0) + }) for num in range(0, int(extra_specs['hw:numa_nodes'])): numa_cpu_node = "hw:numa_cpus.%s" % num @@ -250,9 +268,13 @@ class Registry(newton_registration.Registry): if extra_specs.has_key(numa_cpu_node) and extra_specs.has_key(numa_mem_node): numa_capability['hpa-feature-attributes'].append({'hpa-attribute-key': numacpu_key, - 'hpa-attribute-value':str({'value': str(extra_specs[numa_cpu_node])})}) + 'hpa-attribute-value': + '{{\"value\":\"{0}\"}}'.format(extra_specs[numa_cpu_node]) + }) numa_capability['hpa-feature-attributes'].append({'hpa-attribute-key': numamem_key, - 'hpa-attribute-value':str({'value': str(extra_specs[numa_mem_node]),'unit':'MB'})}) + 'hpa-attribute-value': + '{{\"value\":\"{0}\",\"unit\":\"{1}\"}}'.format(extra_specs[numa_mem_node],"MB") + }) return numa_capability @@ -267,11 +289,17 @@ class Registry(newton_registration.Registry): storage_capability['hpa-feature-attributes'] = [] storage_capability['hpa-feature-attributes'].append({'hpa-attribute-key': 'diskSize', - 'hpa-attribute-value':str({'value': str(flavor['disk']), 'unit':'GB'})}) + 'hpa-attribute-value': + '{{\"value\":\"{0}\",\"unit\":\"{1}\"}}'.format(flavor['disk'] or 0,"GB") + }) storage_capability['hpa-feature-attributes'].append({'hpa-attribute-key': 'swapMemSize', - 'hpa-attribute-value':str({'value': str(flavor['swap']), 'unit':'MB'})}) + 'hpa-attribute-value': + '{{\"value\":\"{0}\",\"unit\":\"{1}\"}}'.format(flavor['swap'] or 0,"MB") + }) storage_capability['hpa-feature-attributes'].append({'hpa-attribute-key': 'ephemeralDiskSize', - 'hpa-attribute-value':str({'value': str(flavor['OS-FLV-EXT-DATA:ephemeral']), 'unit':'GB'})}) + 'hpa-attribute-value': + '{{\"value\":\"{0}\",\"unit\":\"{1}\"}}'.format(flavor['OS-FLV-EXT-DATA:ephemeral'] or 0,"GB") + }) return storage_capability def _get_instruction_set_capabilities(self, extra_specs): @@ -286,7 +314,9 @@ class Registry(newton_registration.Registry): instruction_capability['hpa-feature-attributes'] = [] instruction_capability['hpa-feature-attributes'].append({'hpa-attribute-key': 'instructionSetExtensions', - 'hpa-attribute-value':str({'value': str(extra_specs['hw:capabilities:cpu_info:features'])})}) + 'hpa-attribute-value': + '{{\"value\":\"{0}\"}}'.format(extra_specs['hw:capabilities:cpu_info:features']) + }) return instruction_capability def _get_pci_passthrough_capabilities(self, extra_specs): @@ -305,11 +335,14 @@ class Registry(newton_registration.Registry): instruction_capability['hpa-feature-attributes'] = [] instruction_capability['hpa-feature-attributes'].append({'hpa-attribute-key': 'pciCount', - 'hpa-attribute-value':str({'value': str(value1[1])})}) + 'hpa-attribute-value': + '{{\"value\":\"{0}\"}}'.format(value1[1]) + }) instruction_capability['hpa-feature-attributes'].append({'hpa-attribute-key': 'pciVendorId', - 'hpa-attribute-value':str({'value': str(value2[3])})}) + 'hpa-attribute-value': + '{{\"value\":\"{0}\"}}'.format(value2[3]) }) instruction_capability['hpa-feature-attributes'].append({'hpa-attribute-key': 'pciDeviceId', - 'hpa-attribute-value':str({'value': str(value2[4])})}) + 'hpa-attribute-value': '{{\"value\":\"{0}\"}}'.format(value2[4]) }) return instruction_capability @@ -328,5 +361,7 @@ class Registry(newton_registration.Registry): ovsdpdk_capability['hpa-feature-attributes'] = [] ovsdpdk_capability['hpa-feature-attributes'].append({'hpa-attribute-key': str(cloud_dpdk_info.get("libname")), - 'hpa-attribute-value':str({'value': str(cloud_dpdk_info.get("libversion"))})}) + 'hpa-attribute-value': + '{{\"value\":\"{0}\"}}'.format(cloud_dpdk_info.get("libversion")) + }) return ovsdpdk_capability diff --git a/ocata/ocata/registration/tests/test_registration.py b/ocata/ocata/registration/tests/test_registration.py index 0d1cfab0..9087a792 100644 --- a/ocata/ocata/registration/tests/test_registration.py +++ b/ocata/ocata/registration/tests/test_registration.py @@ -41,7 +41,7 @@ OCATA_MOCK_VIM_INFO = { "version": "v1", "arch": "Intel64", "libname":"dataProcessingAccelerationLibrary", - "libvalue":"v12.1", + "libversion":"v12.1", } }, 'insecure': 'True' diff --git a/ocata/ocata/registration/views/registration.py b/ocata/ocata/registration/views/registration.py index 4d954cb1..bb898995 100644 --- a/ocata/ocata/registration/views/registration.py +++ b/ocata/ocata/registration/views/registration.py @@ -318,6 +318,13 @@ class Registry(newton_registration.Registry): feature_uuid = uuid.uuid4() cloud_extra_info_str = viminfo.get('cloud_extra_info') + if not isinstance(cloud_extra_info_str, dict): + try: + cloud_extra_info_str = json.loads(cloud_extra_info_str) + except Exception as ex: + logger.error("Can not convert cloud extra info %s %s" % ( + str(ex), cloud_extra_info_str)) + return {} if cloud_extra_info_str : cloud_dpdk_info = cloud_extra_info_str.get("ovsDpdk") if cloud_dpdk_info : diff --git a/share/common/msapi/extsys.py b/share/common/msapi/extsys.py index 99824ef2..626a38fc 100644 --- a/share/common/msapi/extsys.py +++ b/share/common/msapi/extsys.py @@ -24,8 +24,9 @@ def get_vim_by_id(vim_id): cloud_owner,cloud_region_id = decode_vim_id(vim_id) if cloud_owner and cloud_region_id: + # get cloud region without depth retcode, content, status_code = \ - restcall.req_to_aai("/cloud-infrastructure/cloud-regions/cloud-region/%s/%s?depth=1" + restcall.req_to_aai("/cloud-infrastructure/cloud-regions/cloud-region/%s/%s" % (cloud_owner,cloud_region_id),"GET") if retcode != 0: logger.error("Status code is %s, detail is %s.", status_code, content) @@ -34,18 +35,20 @@ def get_vim_by_id(vim_id): status_code, content) tmp_viminfo = json.JSONDecoder().decode(content) - #assume esr-system-info-id is composed by {cloud-owner} _ {cloud-region-id} -# retcode2,content2,status_code2 = \ -# restcall.req_to_aai(("/cloud-infrastructure/cloud-regions/cloud-region/%(owner)s/%(region)s" -# "/esr-system-info-list/esr-system-info/%(owner)s_%(region)s" % { -# "owner": cloud_owner, "region": cloud_region_id}), "GET") -# if retcode2 != 0: -# logger.error("Status code is %s, detail is %s.", status_code, content) -# raise VimDriverNewtonException( -# "Failed to query ESR system with id (%s:%s,%s)." % (vim_id,cloud_owner,cloud_region_id), -# status_code2, content2) -# tmp_authinfo = json.JSONDecoder().decode(content2) - tmp_authinfo = tmp_viminfo['esr-system-info-list']['esr-system-info'][0] if tmp_viminfo else None + # get esr-system-info under this cloud region + retcode2, content2, status_code2 = \ + restcall.req_to_aai("/cloud-infrastructure/cloud-regions/cloud-region/%s/%s/esr-system-info-list" + % (cloud_owner,cloud_region_id),"GET") + if retcode2 != 0: + logger.error("Status code is %s, detail is %s.", status_code2, content2) + raise VimDriverNewtonException( + "Failed to query esr info for VIM with id (%s:%s,%s)." % (vim_id,cloud_owner,cloud_region_id), + status_code2, content2) + tmp_authinfo = json.JSONDecoder().decode(content2) + + # get the first auth info by default + tmp_authinfo = tmp_authinfo['esr-system-info'][0] if tmp_authinfo \ + and tmp_authinfo.get('esr-system-info', None) else None #convert vim information if tmp_viminfo and tmp_authinfo: diff --git a/share/newton_base/registration/registration.py b/share/newton_base/registration/registration.py index 726eb691..67004390 100644 --- a/share/newton_base/registration/registration.py +++ b/share/newton_base/registration/registration.py @@ -47,12 +47,9 @@ class Registry(APIView): self._logger.info("request returns with status %s" % resp.status_code) if resp.status_code == status.HTTP_200_OK: self._logger.debug("with content:%s" % resp.json()) - pass - content = resp.json() - - if resp.status_code != status.HTTP_200_OK: - return # failed to discover resources - return content.get(content_key) + content = resp.json() + return content.get(content_key) + return # failed to discover resources def _update_resoure(self, cloud_owner, cloud_region_id, resoure_id, resource_info, resource_type): diff --git a/windriver/docker/Dockerfile b/windriver/docker/Dockerfile index 82e669e0..745eba93 100644 --- a/windriver/docker/Dockerfile +++ b/windriver/docker/Dockerfile @@ -19,6 +19,7 @@ EXPOSE 9005 # COPY ./ /opt/windriver/ RUN apt-get update && \ apt-get install -y memcached && \ + apt-get install -y rabbitmq-server && \ apt-get install -y unzip && \ cd /opt/ && \ wget -O multicloud-openstack-windriver.zip "https://nexus.onap.org/service/local/artifact/maven/redirect?r=snapshots&g=org.onap.multicloud.openstack&a=multicloud-openstack-windriver&e=zip&v=LATEST" && \ diff --git a/windriver/requirements.txt b/windriver/requirements.txt index 3d769c15..cec3a5ef 100644 --- a/windriver/requirements.txt +++ b/windriver/requirements.txt @@ -22,3 +22,5 @@ unittest_xml_reporting==1.12.0 # for onap logging onappylog>=1.0.6 +# for background tasks +celery >= 4.0 diff --git a/windriver/run.sh b/windriver/run.sh index 4cc23a60..8729584e 100644 --- a/windriver/run.sh +++ b/windriver/run.sh @@ -15,6 +15,11 @@ memcached -d -m 2048 -u root -c 1024 -p 11211 -P /tmp/memcached1.pid export PYTHONPATH=lib/share + +service rabbitmq-server restart +# make sure only 1 worker due to missing the synchronization between workers now +nohup celery -A titanium_cloud worker --concurrency=1 --loglevel=debug & + #nohup python manage.py runserver 0.0.0.0:9005 2>&1 & nohup uwsgi --http :9005 --module titanium_cloud.wsgi --master --processes 4 & diff --git a/windriver/titanium_cloud/__init__.py b/windriver/titanium_cloud/__init__.py index afa702d3..e9fe5d03 100644 --- a/windriver/titanium_cloud/__init__.py +++ b/windriver/titanium_cloud/__init__.py @@ -12,3 +12,10 @@ # See the License for the specific language governing permissions and # limitations under the License. +from __future__ import absolute_import, unicode_literals + +# This will make sure the app is always imported when +# Django starts so that shared_task will use this app. +from .celery import app as celery_app + +__all__ = ['celery_app']
\ No newline at end of file diff --git a/windriver/titanium_cloud/celery.py b/windriver/titanium_cloud/celery.py new file mode 100644 index 00000000..368c5ddb --- /dev/null +++ b/windriver/titanium_cloud/celery.py @@ -0,0 +1,38 @@ +# Copyright (c) 2017-2018 Wind River Systems, Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from __future__ import absolute_import, unicode_literals +import os +from celery import Celery +import logging + +# set the default Django settings module for the 'celery' program. +os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'titanium_cloud.settings') + +app = Celery('titanium_cloud') + +# Using a string here means the worker doesn't have to serialize +# the configuration object to child processes. +# - namespace='CELERY' means all celery-related configuration keys +# should have a `CELERY_` prefix. +app.config_from_object('django.conf:settings', namespace='CELERY') + +# Load task modules from all registered Django app configs. +app.autodiscover_tasks() + +logger = logging.getLogger(__name__) + +@app.task(bind=True) +def debug_task(self): + logger.debug("self.request") diff --git a/windriver/titanium_cloud/registration/views/registration.py b/windriver/titanium_cloud/registration/views/registration.py index a597dcca..3f1e5790 100644 --- a/windriver/titanium_cloud/registration/views/registration.py +++ b/windriver/titanium_cloud/registration/views/registration.py @@ -150,9 +150,13 @@ class Registry(newton_registration.Registry): basic_capability['hpa-feature-attributes'] = [] basic_capability['hpa-feature-attributes'].append({'hpa-attribute-key': 'numVirtualCpu', - 'hpa-attribute-value': str({'value': str(flavor['vcpus']) })}) + 'hpa-attribute-value': + '{{\"value\":\"{0}\"}}'.format(flavor['vcpus']) + }) basic_capability['hpa-feature-attributes'].append({'hpa-attribute-key':'virtualMemSize', - 'hpa-attribute-value': str({'value':str(flavor['ram']), 'unit':'MB'})}) + 'hpa-attribute-value': + '{{\"value\":\"{0}\",\"unit\":\"{1}\"}}'.format(flavor['ram'],"MB") + }) return basic_capability @@ -162,17 +166,21 @@ class Registry(newton_registration.Registry): if extra_specs.has_key('hw:cpu_policy') or extra_specs.has_key('hw:cpu_thread_policy'): cpupining_capability['hpa-capability-id'] = str(feature_uuid) - cpupining_capability['hpa-feature'] = 'cpuPining' + cpupining_capability['hpa-feature'] = 'cpuPinning' cpupining_capability['architecture'] = 'generic' cpupining_capability['hpa-version'] = 'v1' cpupining_capability['hpa-feature-attributes'] = [] if extra_specs.has_key('hw:cpu_thread_policy'): cpupining_capability['hpa-feature-attributes'].append({'hpa-attribute-key': 'logicalCpuThreadPinningPolicy', - 'hpa-attribute-value': str({'value': str(extra_specs['hw:cpu_thread_policy'])})}) + 'hpa-attribute-value': + '{{\"value\":\"{0}\"}}'.format(extra_specs['hw:cpu_thread_policy']) + }) if extra_specs.has_key('hw:cpu_policy'): cpupining_capability['hpa-feature-attributes'].append({'hpa-attribute-key':'logicalCpuPinningPolicy', - 'hpa-attribute-value': str({'value':str(extra_specs['hw:cpu_policy'])})}) + 'hpa-attribute-value': + '{{\"value\":\"{0}\"}}'.format(extra_specs['hw:cpu_policy']) + }) return cpupining_capability @@ -189,13 +197,19 @@ class Registry(newton_registration.Registry): cputopology_capability['hpa-feature-attributes'] = [] if extra_specs.has_key('hw:cpu_sockets'): cputopology_capability['hpa-feature-attributes'].append({'hpa-attribute-key': 'numCpuSockets', - 'hpa-attribute-value': str({'value': str(extra_specs['hw:cpu_sockets'])})}) + 'hpa-attribute-value': + '{{\"value\":\"{0}\"}}'.format(extra_specs['hw:cpu_sockets']) + }) if extra_specs.has_key('hw:cpu_cores'): cputopology_capability['hpa-feature-attributes'].append({'hpa-attribute-key': 'numCpuCores', - 'hpa-attribute-value': str({'value': str(extra_specs['hw:cpu_cores'])})}) + 'hpa-attribute-value': + '{{\"value\":\"{0}\"}}'.format(extra_specs['hw:cpu_cores']) + }) if extra_specs.has_key('hw:cpu_threads'): cputopology_capability['hpa-feature-attributes'].append({'hpa-attribute-key': 'numCpuThreads', - 'hpa-attribute-value': str({'value': str(extra_specs['hw:cpu_threads'])})}) + 'hpa-attribute-value': + '{{\"value\":\"{0}\"}}'.format(extra_specs['hw:cpu_threads']) + }) return cputopology_capability @@ -212,18 +226,21 @@ class Registry(newton_registration.Registry): hugepages_capability['hpa-feature-attributes'] = [] if extra_specs['hw:mem_page_size'] == 'large': hugepages_capability['hpa-feature-attributes'].append({'hpa-attribute-key': 'memoryPageSize', - 'hpa-attribute-value': str({'value': '2', - 'unit': 'MB'})}) + 'hpa-attribute-value': + '{{\"value\":\"{0}\",\"unit\":\"{1}\"}}'.format(2,"MB") + }) elif extra_specs['hw:mem_page_size'] == 'small': hugepages_capability['hpa-feature-attributes'].append({'hpa-attribute-key': 'memoryPageSize', - 'hpa-attribute-value': str({'value': '4', - 'unit': 'KB'})}) + 'hpa-attribute-value': + '{{\"value\":\"{0}\",\"unit\":\"{1}\"}}'.format(4,"KB") + }) elif extra_specs['hw:mem_page_size'] == 'any': self._logger.info("Currently HPA feature memoryPageSize did not support 'any' page!!") else : hugepages_capability['hpa-feature-attributes'].append({'hpa-attribute-key': 'memoryPageSize', - 'hpa-attribute-value': str({'value': str(extra_specs['hw:mem_page_size']), - 'unit': 'KB'})}) + 'hpa-attribute-value': + '{{\"value\":\"{0}\",\"unit\":\"{1}\"}}'.format(extra_specs['hw:mem_page_size'],"KB") + }) return hugepages_capability def _get_numa_capabilities(self, extra_specs): @@ -238,7 +255,9 @@ class Registry(newton_registration.Registry): numa_capability['hpa-feature-attributes'] = [] numa_capability['hpa-feature-attributes'].append({'hpa-attribute-key': 'numaNodes', - 'hpa-attribute-value': str({'value': str(extra_specs['hw:numa_nodes'])})}) + 'hpa-attribute-value': + '{{\"value\":\"{0}\"}}'.format(extra_specs['hw:numa_nodes'] or 0) + }) for num in range(0, int(extra_specs['hw:numa_nodes'])): numa_cpu_node = "hw:numa_cpus.%s" % num @@ -248,9 +267,13 @@ class Registry(newton_registration.Registry): if extra_specs.has_key(numa_cpu_node) and extra_specs.has_key(numa_mem_node): numa_capability['hpa-feature-attributes'].append({'hpa-attribute-key': numacpu_key, - 'hpa-attribute-value': str({'value': str(extra_specs[numa_cpu_node])})}) + 'hpa-attribute-value': + '{{\"value\":\"{0}\"}}'.format(extra_specs[numa_cpu_node]) + }) numa_capability['hpa-feature-attributes'].append({'hpa-attribute-key': numamem_key, - 'hpa-attribute-value': str({'value': str(extra_specs[numa_mem_node]),'unit':'MB'})}) + 'hpa-attribute-value': + '{{\"value\":\"{0}\",\"unit\":\"{1}\"}}'.format(extra_specs[numa_mem_node],"MB") + }) return numa_capability @@ -265,11 +288,17 @@ class Registry(newton_registration.Registry): storage_capability['hpa-feature-attributes'] = [] storage_capability['hpa-feature-attributes'].append({'hpa-attribute-key': 'diskSize', - 'hpa-attribute-value': str({'value': str(flavor['disk']), 'unit':'GB'})}) + 'hpa-attribute-value': + '{{\"value\":\"{0}\",\"unit\":\"{1}\"}}'.format(flavor['disk'] or 0,"GB") + }) storage_capability['hpa-feature-attributes'].append({'hpa-attribute-key': 'swapMemSize', - 'hpa-attribute-value': str({'value': str(flavor['swap']), 'unit':'MB'})}) + 'hpa-attribute-value': + '{{\"value\":\"{0}\",\"unit\":\"{1}\"}}'.format(flavor['swap'] or 0,"MB") + }) storage_capability['hpa-feature-attributes'].append({'hpa-attribute-key': 'ephemeralDiskSize', - 'hpa-attribute-value': str({'value': str(flavor['OS-FLV-EXT-DATA:ephemeral']), 'unit':'GB'})}) + 'hpa-attribute-value': + '{{\"value\":\"{0}\",\"unit\":\"{1}\"}}'.format(flavor['OS-FLV-EXT-DATA:ephemeral'] or 0,"GB") + }) return storage_capability def _get_instruction_set_capabilities(self, extra_specs): @@ -284,7 +313,9 @@ class Registry(newton_registration.Registry): instruction_capability['hpa-feature-attributes'] = [] instruction_capability['hpa-feature-attributes'].append({'hpa-attribute-key': 'instructionSetExtensions', - 'hpa-attribute-value': str({'value': str(extra_specs['hw:capabilities:cpu_info:features'])})}) + 'hpa-attribute-value': + '{{\"value\":\"{0}\"}}'.format(extra_specs['hw:capabilities:cpu_info:features']) + }) return instruction_capability def _get_pci_passthrough_capabilities(self, extra_specs): @@ -303,11 +334,17 @@ class Registry(newton_registration.Registry): instruction_capability['hpa-feature-attributes'] = [] instruction_capability['hpa-feature-attributes'].append({'hpa-attribute-key': 'pciCount', - 'hpa-attribute-value': str({'value': str(value1[1])})}) + 'hpa-attribute-value': + '{{\"value\":\"{0}\"}}'.format(value1[1]) + }) instruction_capability['hpa-feature-attributes'].append({'hpa-attribute-key': 'pciVendorId', - 'hpa-attribute-value': str({'value': str(value2[3])})}) + 'hpa-attribute-value': + '{{\"value\":\"{0}\"}}'.format(value2[3]) + }) instruction_capability['hpa-feature-attributes'].append({'hpa-attribute-key': 'pciDeviceId', - 'hpa-attribute-value': str({'value': str(value2[4])})}) + 'hpa-attribute-value': + '{{\"value\":\"{0}\"}}'.format(value2[4]) + }) return instruction_capability @@ -322,5 +359,7 @@ class Registry(newton_registration.Registry): instruction_capability['hpa-feature-attributes'] = [] instruction_capability['hpa-feature-attributes'].append({'hpa-attribute-key': 'dataProcessingAccelerationLibrary', - 'hpa-attribute-value': str({'value': str('v17.02')})}) + 'hpa-attribute-value': + '{{\"value\":\"{0}\"}}'.format("v17.02") + }) return instruction_capability diff --git a/windriver/titanium_cloud/requests/urls.py b/windriver/titanium_cloud/requests/urls.py index 6b5dc462..c3fd5a6a 100644 --- a/windriver/titanium_cloud/requests/urls.py +++ b/windriver/titanium_cloud/requests/urls.py @@ -35,7 +35,7 @@ urlpatterns = [ url(r'^images(/(?P<imageid>[0-9a-zA-Z_-]+))?', image.Images.as_view()), # image-file - url(r'^api/multicloud-titanium_cloud/v0/(?P<vimid>[0-9a-zA-Z_-]+)/image-file(/(?P<imageid>[0-9a-zA-Z_-]+))?', + url(r'^image-file(/(?P<imageid>[0-9a-zA-Z_-]+))?', imagefile.ImageFile.as_view()), url(r'^volumes(/(?P<volumeid>[0-9a-zA-Z_-]+))?', volume.Volumes.as_view()), diff --git a/windriver/titanium_cloud/urls.py b/windriver/titanium_cloud/urls.py index a6fcdcfb..51bcdd7f 100644 --- a/windriver/titanium_cloud/urls.py +++ b/windriver/titanium_cloud/urls.py @@ -18,6 +18,7 @@ from titanium_cloud.registration.views import registration from newton_base.openoapi import tenants from titanium_cloud.resource.views import capacity from titanium_cloud.resource.views import events +from titanium_cloud.vesagent import vesagent_ctrl urlpatterns = [ url(r'^', include('titanium_cloud.swagger.urls')), @@ -40,6 +41,9 @@ urlpatterns = [ # events url(r'^api/multicloud-titanium_cloud/v0/(?P<vimid>[0-9a-zA-Z_-]+)/events_check/?$', events.EventsCheck.as_view()), + url(r'^api/multicloud-titanium_cloud/v0/(?P<vimid>[0-9a-zA-Z_-]+)/vesagent/?$', + vesagent_ctrl.VesAgentCtrl.as_view()), + ] diff --git a/windriver/titanium_cloud/vesagent/__init__.py b/windriver/titanium_cloud/vesagent/__init__.py new file mode 100644 index 00000000..e4fe7a00 --- /dev/null +++ b/windriver/titanium_cloud/vesagent/__init__.py @@ -0,0 +1,16 @@ +#!/usr/bin/env python +# -*- coding: utf-8 -*- +# Copyright (c) 2017-2018 Wind River Systems, Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + diff --git a/windriver/titanium_cloud/vesagent/event_domain/__init__.py b/windriver/titanium_cloud/vesagent/event_domain/__init__.py new file mode 100644 index 00000000..e4fe7a00 --- /dev/null +++ b/windriver/titanium_cloud/vesagent/event_domain/__init__.py @@ -0,0 +1,16 @@ +#!/usr/bin/env python +# -*- coding: utf-8 -*- +# Copyright (c) 2017-2018 Wind River Systems, Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + diff --git a/windriver/titanium_cloud/vesagent/event_domain/fault_vm.py b/windriver/titanium_cloud/vesagent/event_domain/fault_vm.py new file mode 100644 index 00000000..308ef24c --- /dev/null +++ b/windriver/titanium_cloud/vesagent/event_domain/fault_vm.py @@ -0,0 +1,284 @@ +#!/usr/bin/env python +# -*- coding: utf-8 -*- +# Copyright (c) 2017-2018 Wind River Systems, Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import logging +import json +import uuid +import time + +from django.conf import settings +from titanium_cloud.vesagent.vespublish import publishAnyEventToVES +from common.utils.restcall import _call_req + +logger = logging.getLogger(__name__) + +### build backlog with domain:"fault", type:"vm" + +def buildBacklog_fault_vm(vimid, backlog_input): + + logger.info("vimid: %s" % vimid) + logger.debug("with input: %s" % backlog_input) + + try: + + #must resolve the tenant id and server id while building the backlog + tenant_id = backlog_input.get("tenantid", None) + server_id = backlog_input.get("sourceid", None) + + # should resolve the name to id later + if tenant_id is None: + tenant_name = backlog_input["tenant"] + server_name = backlog_input["source"] + + if tenant_name is None or server_name is None: + logger.warn("tenant and source should be provided as backlog config") + return None + + # get token + # resolve tenant_name to tenant_id + auth_api_url_format = "/{f_vim_id}/identity/v2.0/tokens" + auth_api_url = auth_api_url_format.format(f_vim_id=vimid) + auth_api_data = { "auth":{"tenantName": tenant_name} } + base_url = settings.MULTICLOUD_PREFIX + extra_headers = '' + ret = _call_req(base_url, "", "", 0, auth_api_url, "POST", extra_headers, json.dumps(auth_api_data)) + if ret[0] > 0 or ret[1] is None: + logger.critical("call url %s failed with status %s" % (auth_api_url, ret[0])) + return None + + token_resp = json.JSONDecoder().decode(ret[1]) + token = token_resp["access"]["token"]["id"] + tenant_id = token_resp["access"]["token"]["tenant"]["id"] + + if server_id is None: + # resolve server_name to server_id + vserver_api_url_format \ + = "/{f_vim_id}/compute/v2.1/{f_tenant_id}/servers?name={f_server_name}" + vserver_api_url = vserver_api_url_format.format(f_vim_id=vimid, + f_tenant_id=tenant_id, + f_server_name=server_name) + base_url = settings.MULTICLOUD_PREFIX + extra_headers = {'X-Auth-Token': token} + ret = _call_req(base_url, "", "", 0, vserver_api_url, "GET", extra_headers, "") + if ret[0] > 0 or ret[1] is None: + logger.critical("call url %s failed with status %s" % (vserver_api_url, ret[0])) + return None + + server_resp = json.JSONDecoder().decode(ret[1]) + # find out the server wanted + for s in server_resp.get("servers", []): + if s["name"] == server_name: + server_id = s["id"] + break + if server_id is None: + logger.warn("source %s cannot be found under tenant id %s " + % (server_name, tenant_id)) + return None + + #m.c. proxied OpenStack API + api_url_fmt = "/{f_vim_id}/compute/v2.1/{f_tenant_id}/servers/{f_server_id}" + api_url = api_url_fmt.format( + f_vim_id=vimid, f_tenant_id=tenant_id, f_server_id=server_id) + + backlog = { + "backlog_uuid":str(uuid.uuid3(uuid.NAMESPACE_URL, + str("%s-%s-%s"%(vimid, tenant_id,server_id)))), + "tenant_id": tenant_id, + "server_id": server_id, + "api_method": "GET", + "api_link": api_url, + } + backlog.update(backlog_input) + except Exception as e: + logger.error("exception:%s" % str(e)) + return None + + logger.info("return") + logger.debug("with backlog: %s" % backlog) + return backlog + + +### process backlog with domain:"fault", type:"vm" + + + +def processBacklog_fault_vm(vesAgentConfig, vesAgentState, oneBacklog): + logger.debug("vesAgentConfig:%s, vesAgentState:%s, oneBacklog: %s" + % (vesAgentConfig, vesAgentState, oneBacklog)) + + try: + vimid = vesAgentConfig["vimid"] + tenant_name = oneBacklog["tenant"] + + # get token + auth_api_url_format = "/{f_vim_id}/identity/v2.0/tokens" + auth_api_url = auth_api_url_format.format(f_vim_id=vimid) + auth_api_data = { "auth":{"tenantName": tenant_name} } + base_url = settings.MULTICLOUD_PREFIX + extra_headers = '' + logger.debug("authenticate with url:%s" % auth_api_url) + ret = _call_req(base_url, "", "", 0, auth_api_url, "POST", extra_headers, json.dumps(auth_api_data)) + if ret[0] > 0 or ret[1] is None: + logger.critical("call url %s failed with status %s" % (auth_api_url, ret[0])) + + token_resp = json.JSONDecoder().decode(ret[1]) + logger.debug("authenticate resp: %s" % token_resp) + token = token_resp["access"]["token"]["id"] + + # collect data by issue API + api_link = oneBacklog["api_link"] + method = oneBacklog["api_method"] + base_url = settings.MULTICLOUD_PREFIX + data = '' + extra_headers = {'X-Auth-Token': token} + #which one is correct? extra_headers = {'HTTP_X_AUTH_TOKEN': token} + logger.debug("authenticate with url:%s, header:%s" % (auth_api_url,extra_headers)) + ret = _call_req(base_url, "", "", 0, api_link, method, extra_headers, data) + if ret[0] > 0 or ret[1] is None: + logger.critical("call url %s failed with status %s" % (api_link, ret[0])) + + server_resp = json.JSONDecoder().decode(ret[1]) + logger.debug("collected data: %s" % server_resp) + + # encode data + backlog_uuid = oneBacklog.get("backlog_uuid", None) + backlogState = vesAgentState.get("%s" % (backlog_uuid), None) + last_event = backlogState.get("last_event", None) + logger.debug("last event: %s" % last_event) + + this_event = data2event_fault_vm(oneBacklog, last_event, server_resp) + + if this_event is not None: + logger.debug("this event: %s" % this_event) + # report data to VES + ves_subscription = vesAgentConfig.get("subscription", None) + publishAnyEventToVES(ves_subscription, this_event) + # store the latest data into cache, never expire + backlogState["last_event"] = this_event + + except Exception as e: + logger.error("exception:%s" % str(e)) + return + + logger.info("return") + return + + +def data2event_fault_vm(oneBacklog, last_event, vm_data): + + VES_EVENT_VERSION = 3.0 + VES_EVENT_FAULT_VERSION = 2.0 + VES_EVENT_FAULT_DOMAIN = "fault" + + try: + + if vm_status_is_fault(vm_data["server"]["status"]): + if last_event is not None \ + and last_event['event']['commonEventHeader']['eventName'] == 'Fault_MultiCloud_VMFailure': + # asserted alarm already, so no need to assert it again + return None + + eventName = "Fault_MultiCloud_VMFailure" + priority = "High" + eventSeverity = "CRITICAL" + alarmCondition = "Guest_Os_Failure" + vfStatus = "Active" + specificProblem = "AlarmOn" + eventType = '' + reportingEntityId = '' + reportingEntityName = '' + sequence = 0 + + startEpochMicrosec = int(time.time()) + lastEpochMicrosec = int(time.time()) + + eventId = str(uuid.uuid4()) + pass + else: + if last_event is None \ + or last_event['event']['commonEventHeader']['eventName'] != 'Fault_MultiCloud_VMFailure': + # not assert alarm yet, so no need to clear it + return None + + + eventName = "Fault_MultiCloud_VMFailureCleared" + priority = "Normal" + eventSeverity = "NORMAL" + alarmCondition = "Vm_Restart" + vfStatus = "Active" + specificProblem = "AlarmOff" + eventType = '' + reportingEntityId = '' + reportingEntityName = '' + sequence = 0 + + startEpochMicrosec = last_event['event']['commonEventHeader']['startEpochMicrosec'] + lastEpochMicrosec = int(time.time()) + eventId = last_event['event']['commonEventHeader']['eventId'] + + pass + + # now populate the event structure + this_event = { + 'event': { + 'commonEventHeader': { + 'version': VES_EVENT_VERSION, + 'eventName': eventName, + 'domain': VES_EVENT_FAULT_DOMAIN, + 'eventId': eventId, + 'eventType': eventType, + 'sourceId': vm_data["server"]['id'], + 'sourceName': vm_data["server"]['name'], + 'reportingEntityId': reportingEntityId, + 'reportingEntityName': reportingEntityName, + 'priority': priority, + 'startEpochMicrosec': startEpochMicrosec, + 'lastEpochMicrosec': lastEpochMicrosec, + 'sequence': sequence + }, + 'faultFields': { + 'faultFieldsVersion': VES_EVENT_FAULT_VERSION, + 'eventSeverity': eventSeverity, + 'eventSourceType': 'virtualMachine', + 'alarmCondition': alarmCondition, + 'specificProblem': specificProblem, + 'vfStatus': 'Active' + } + + } + + } + + return this_event + + except Exception as e: + logger.error("exception:%s" % str(e)) + return None + + +def vm_status_is_fault(status): + ''' + report VM fault when status falls into one of following state + ['ERROR', 'DELETED', 'PAUSED', 'REBUILD', 'RESCUE', + 'RESIZE','REVERT_RESIZE', 'SHELVED', 'SHELVED_OFFLOADED', + 'SHUTOFF', 'SOFT_DELETED','SUSPENDED', 'UNKNOWN', 'VERIFY_RESIZE'] + :param status: + :return: + ''' + if status in ['BUILD', 'ACTIVE', 'HARD_REBOOT', 'REBOOT', 'MIGRATING', 'PASSWORD']: + return False + else: + return True diff --git a/windriver/titanium_cloud/vesagent/tasks.py b/windriver/titanium_cloud/vesagent/tasks.py new file mode 100644 index 00000000..ac760ece --- /dev/null +++ b/windriver/titanium_cloud/vesagent/tasks.py @@ -0,0 +1,196 @@ +# Copyright (c) 2017-2018 Wind River Systems, Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +### VES agent workers +from __future__ import absolute_import, unicode_literals +from titanium_cloud.celery import app +import os +import logging +import json +import time + +from django.core.cache import cache + +from titanium_cloud.vesagent.event_domain.fault_vm import processBacklog_fault_vm + +logger = logging.getLogger(__name__) + + +@app.task(bind=True) +def scheduleBacklogs(self, vimid): + # make sure only one task runs here + # cannot get vimid ? logger.info("schedule with vimid:%" % (vimid)) + + logger.debug("scheduleBacklogs starts") + backlog_count, next_time_slot = processBacklogs() + logger.debug("processBacklogs return with %s, %s" % (backlog_count, next_time_slot)) + + # sleep for next_time_slot + while backlog_count > 0: + time.sleep(next_time_slot) + backlog_count, next_time_slot = processBacklogs() + + logger.debug("scheduleBacklogs stops") + + +def processBacklogs(): + # find out count of valid backlog and the next time slot + backlog_count = 0 + next_time_slot = 10 + try: + #get the whole list of backlog + VesAgentBacklogsVimListStr = cache.get("VesAgentBacklogs.vimlist") + if VesAgentBacklogsVimListStr is None: + logger.warn("VesAgentBacklogs.vimlist cannot be found in cache") + return 0,next_time_slot + + logger.debug("VesAgentBacklogs.vimlist: %s" % (VesAgentBacklogsVimListStr)) + + backlogsAllVims = json.loads(VesAgentBacklogsVimListStr) + if backlogsAllVims is None: + logger.warn("VesAgentBacklogs.vimlist is empty") + return 0,next_time_slot + + for vimid in backlogsAllVims: + #iterate each backlogs + backlog_count_tmp,next_time_slot_tmp = processBacklogsOfOneVIM(vimid) + logger.debug("vimid:%s, backlog_count,next_time_slot:%s,%s" + %( vimid,backlog_count_tmp,next_time_slot_tmp )) + backlog_count += backlog_count_tmp + next_time_slot = next_time_slot_tmp if next_time_slot > next_time_slot_tmp else next_time_slot + pass + + except Exception as e: + logger.error("exception:%s" % str(e)) + + return backlog_count, next_time_slot + + pass + + +def processBacklogsOfOneVIM(vimid): + ''' + process all backlogs for a VIM, return count of valid backlogs + :param vimid: + :return: + ''' + backlog_count = 0 + next_time_slot = 10 + + try: + vesAgentConfigStr = cache.get("VesAgentBacklogs.config.%s" % (vimid)) + if vesAgentConfigStr is None: + logger.warn("VesAgentBacklogs.config.%s cannot be found in cache" % (vimid)) + return 0,next_time_slot + + logger.debug("VesAgentBacklogs.config.%s: %s" % (vimid, vesAgentConfigStr)) + + vesAgentConfig = json.loads(vesAgentConfigStr) + if vesAgentConfig is None: + logger.warn("VesAgentBacklogs.config.%s corrupts" % (vimid)) + return 0,next_time_slot + + + vesAgentStateStr = cache.get("VesAgentBacklogs.state.%s" % (vimid)) + vesAgentState = json.loads(vesAgentStateStr) if vesAgentStateStr is not None else {} + + ves_info = vesAgentConfig.get("subscription", None) + if ves_info is None: + logger.warn("VesAgentBacklogs.config.%s: ves subscription corrupts:%s" % (vimid, vesAgentConfigStr)) + return 0,next_time_slot + + poll_interval_default = vesAgentConfig.get("poll_interval_default", None) + if poll_interval_default is None: + logger.warn("VesAgentBacklogs.config.%s: poll_interval_default corrupts:%s" % (vimid, vesAgentConfigStr)) + return 0,next_time_slot + + if poll_interval_default == 0: + # invalid interval value + logger.warn("VesAgentBacklogs.config.%s: poll_interval_default invalid:%s" % (vimid, vesAgentConfigStr)) + return 0,next_time_slot + + backlogs_list = vesAgentConfig.get("backlogs", None) + if backlogs_list is None: + logger.warn("VesAgentBacklogs.config.%s: backlogs corrupts:%s" % (vimid, vesAgentConfigStr)) + return 0,next_time_slot + + for backlog in backlogs_list: + backlog_count_tmp, next_time_slot_tmp = processOneBacklog( + vesAgentConfig, vesAgentState, poll_interval_default, backlog) + logger.debug("processOneBacklog return with %s,%s" % (backlog_count_tmp, next_time_slot_tmp)) + backlog_count += backlog_count_tmp + next_time_slot = next_time_slot_tmp if next_time_slot > next_time_slot_tmp else next_time_slot + + pass + + # save back the updated backlogs state + vesAgentStateStr = json.dumps(vesAgentState) + cache.set("VesAgentBacklogs.state.%s" % vimid, vesAgentStateStr, None) + + except Exception as e: + logger.error("exception:%s" % str(e)) + + return backlog_count, next_time_slot + + +def processOneBacklog(vesAgentConfig, vesAgentState, poll_interval_default, oneBacklog): + logger.info("Process one backlog") + #logger.debug("vesAgentConfig:%s, vesAgentState:%s, poll_interval_default:%s, oneBacklog: %s" + # % (vesAgentConfig, vesAgentState, poll_interval_default, oneBacklog)) + + backlog_count = 1 + next_time_slot = 10 + try: + timestamp_now = int(time.time()) + backlog_uuid = oneBacklog.get("backlog_uuid", None) + if backlog_uuid is None: + # warning: uuid is None, omit this backlog + logger.warn("backlog without uuid: %s" % oneBacklog) + return 0, next_time_slot + + backlogState = vesAgentState.get("%s" % (backlog_uuid), None) + if backlogState is None: + initialBacklogState = { + "timestamp": timestamp_now + } + vesAgentState["%s" % (backlog_uuid)] = initialBacklogState + backlogState = initialBacklogState + + time_expiration = backlogState["timestamp"] \ + + oneBacklog.get("poll_interval", poll_interval_default) + # check if poll interval expires + if timestamp_now < time_expiration: + # not expired yet + logger.info("return without dispatching, not expired yet") + return backlog_count, next_time_slot + + logger.info("Dispatching backlog") + + # collect data in case of expiration + if oneBacklog["domain"] == "fault" and oneBacklog["type"] == "vm": + processBacklog_fault_vm(vesAgentConfig, vesAgentState, oneBacklog) + else: + logger.warn("Dispatching backlog fails due to unsupported backlog domain %s,type:%s" + % (oneBacklog["domain"], oneBacklog["type"])) + backlog_count = 0 + pass + + # update timestamp and internal state + backlogState["timestamp"] = timestamp_now + except Exception as e: + logger.error("exception:%s" % str(e)) + + logger.info("return") + return backlog_count, next_time_slot + diff --git a/windriver/titanium_cloud/vesagent/tests.py b/windriver/titanium_cloud/vesagent/tests.py new file mode 100644 index 00000000..7026d569 --- /dev/null +++ b/windriver/titanium_cloud/vesagent/tests.py @@ -0,0 +1,60 @@ +# Copyright (c) 2017-2018 Wind River Systems, Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import mock + +import unittest +import json +from django.test import Client +from rest_framework import status + +from django.core.cache import cache +from common.msapi import extsys + + + +MOCK_VIM_INFO = { + "createTime": "2017-04-01 02:22:27", + "domain": "Default", + "name": "TiS_R4", + "password": "admin", + "tenant": "admin", + "type": "openstack", + "url": "http://128.224.180.14:5000/v3", + "userName": "admin", + "vendor": "WindRiver", + "version": "newton", + "vimId": "windriver-hudson-dc_RegionOne", + 'cloud_owner': 'windriver-hudson-dc', + 'cloud_region_id': 'RegionOne', + 'cloud_extra_info': '{"vesagent_config":{"backlogs":[{"source":"onap-aaf","domain":"fault","type":"vm","tenant":"VIM"}],"poll_interval_default":10,"ves_subscription":{"username":"user","password":"password","endpoint":"http://127.0.0.1:9005/sample"}}}', + 'cloud_epa_caps': '', + 'insecure': 'True', +} + +class VesAgentCtrlTest(unittest.TestCase): + def setUp(self): + self.client = Client() + + def tearDown(self): + pass + + @mock.patch.object(cache, 'get') + @mock.patch.object(extsys, 'get_vim_by_id') + def test_get(self, mock_get_vim_by_id, mock_get): + mock_get_vim_by_id.return_value = MOCK_VIM_INFO + mock_get.return_value = '{"backlogs": [{"backlog_uuid": "2b8f6ff8-bc64-339b-a714-155909db937f", "server_id": "c4b575fa-ed85-4642-ab4b-335cb5744721", "tenant_id": "0e148b76ee8c42f78d37013bf6b7b1ae", "api_method": "GET", "source": "onap-aaf", "api_link": "/onaplab_RegionOne/compute/v2.1/0e148b76ee8c42f78d37013bf6b7b1ae/servers/c4b575fa-ed85-4642-ab4b-335cb5744721", "domain": "fault", "type": "vm", "tenant": "VIM"}], "poll_interval_default": 10, "vimid": "onaplab_RegionOne", "subscription": {"username": "user", "password": "password", "endpoint": "http://127.0.0.1:9005/sample"}}' + + response = self.client.get("/api/multicloud-titanium_cloud/v0/windriver-hudson-dc_RegionOne/vesagent") + self.assertEqual(status.HTTP_200_OK, response.status_code, response.content) diff --git a/windriver/titanium_cloud/vesagent/vesagent_ctrl.py b/windriver/titanium_cloud/vesagent/vesagent_ctrl.py new file mode 100644 index 00000000..fbdd93f8 --- /dev/null +++ b/windriver/titanium_cloud/vesagent/vesagent_ctrl.py @@ -0,0 +1,406 @@ +# Copyright (c) 2017-2018 Wind River Systems, Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import logging +import traceback +import json + +from rest_framework import status +from rest_framework.response import Response +from rest_framework.views import APIView + +from django.conf import settings +from common.msapi import extsys +from titanium_cloud.vesagent.tasks import scheduleBacklogs +from titanium_cloud.vesagent.event_domain.fault_vm import buildBacklog_fault_vm + +from django.core.cache import cache + +logger = logging.getLogger(__name__) + +class VesAgentCtrl(APIView): + ''' + control plane of VesAgent + Design tips: + 1, vesagent are multiple processing workers + 2, the runtime logic is simple: a vesagent worker polls the data source (vm/hypervisor/host/vim/etc.) + and then feeds the encoded data to VES. + 3, the vesagent workers can be distributed to different clouds while latency/throughput is concerned, + this distributed deployment usually comes along with the distributed VES deployment. + So it is very likely that the collected data from different VIM/Cloud instance will be fed into + different VES endpoint, however, assuming that there will be at most one VES endpoint serving + any single VIM/Cloud instance. + 4, According to VES specs, the collected data can be cataloged by domain: + domain : fault, heartbeat, measurementsForVfScaling, other, stateChange, syslog, thresholdCrossingAlert + As far as VIM/Cloud concerned, fault, heartbeat, measurementsForVfScaling, TCAalert are relevant. + 5, the source of the collected data can be cataloged by eventSourceType: + eventSourceType: VNF/VNFC/VM + As far as VIM/Cloud concerned, only VM is relevant. This eventSourceType should be extended to cover + the data source of hypervisor, VIM, Host,Controller, PIM, etc. + + 6, the source of collected data should be specified explicitly,so is the domain of the collected data. + To specify the source: eventSourceType, uuid or name of the source + To specify the domain: domain + the specifications above will be provisioned as a vesagent backlog entry to a VIM/Cloud instance + to tell a vesagent worker that : + with regarding to that VIM/Cloud instance, what kind of data to be collected from which source . + + 7,the VES endpoint will be also specified for a VIM/Cloud instance, so that all collected data + will be fed into this VES endpoint + + 8, the vesagent backlog are stored into the respective cloud_region's property "cloud-extra-info", + which implies that those specifications can be CRUD either by ESR portal or the RestAPIs in this view, e.g. + "cloud-extra-info": { + ..., + "vesagent_config": + { + "ves_subscription":{ + "endpoint":"http://{VES IP}:{VES port}/{URI}", + "username":"{VES username}", + "password":"{VES password}", + }, + "poll_interval_default" : "{default interval for polling}", + "backlogs":[ + { + "domain":"fault" + "type":"vm", + "tenant":"{tenant name1}", + "source":"{VM name1}", + "poll_interval" : "{optional, interval for polling}", + }, + { + "domain":"fault" + "type":"vm", + "tenant":"{tenant name2}", + "source":"{VM name2}", + "poll_interval" : "{optional, interval for polling}", + } + ] + } + } + + Idea: API dispatching to distributed M.C. service can be determined by Complex Object in AAI: + cloud-region has been assoicated to a Complex Object + M.C. plugin service instance should refer to the same Complex Object (by physical_locaton_id ?) + So the M.C. broker/API distributor/other approach will correlate the cloud-region with + corresponding M.C. plugin service instance. + + + Backlog built in cache: + + maintain backlog in cache and VES agent workers + cache objects: + "VesAgentBacklogs.vimlist": [ list of vimid] ### will not expire forever + "VesAgentBacklogs.state.{vimdid}": + ### will expire eventually to eliminate the garbage, expiration duration: 1hour? + { + "{backlog_uuid}": { + "timestamp": "{timestamp for last time of data collecting}", + "api_data": [list of data to populate the format string of the API link] + "last_event": {object, event reported to ves last time}" + } + } + "VesAgentBacklogs.config.{vimdid}": ### will not expire forever + { + "vimid": "{vim id}", + "subscription": { + "endpoint": "{ves endpoint, e.g. http://ves_ip:ves_port/eventListener/v5}", + "username": "{username}", + "password": "{password}" + } + "poll_interval_default" : "{default interval for polling}", + "backlogs":[ + { + "backlog_uuid": "{uuid to identify the backlog}" + "domain":"fault" + "type":"vm", + "tenant":"{tenant name1}", + "source":"{VM name1}", + "poll_interval" : "{optional, interval in second for polling}", + "api_method": "{GET/POST/PUT/etc.}", + "api_link":"{API link to collect data, could be format string}", + "tenant_id": tenant_id, + "server_id": server_id, + }, + { + "domain":"fault" + "type":"vm", + "tenant":"{tenant name2}", + "source":"{VM name2}", + "poll_interval" : "{optional, interval in second for polling}", + "api_method": "{GET/POST/PUT/etc.}", + "api_link":"{API link to collect data, could be format string}", + "tenant_id": tenant_id, + "server_id": server_id, + } + ] + } + ''' + + def __init__(self): + self._logger = logger + self.proxy_prefix = settings.MULTICLOUD_PREFIX + + + def get(self, request, vimid=""): + ''' + get blob of vesagent-config + :param request: + :param vimid: + :return: + ''' + self._logger.info("vimid: %s" % vimid) + self._logger.debug("with META: %s" % request.META) + try: + # get vesagent_config from cloud region + try: + viminfo = extsys.get_vim_by_id(vimid) + cloud_extra_info_str = viminfo.get('cloud_extra_info', '') + cloud_extra_info = json.loads(cloud_extra_info_str) if cloud_extra_info_str != '' else None + vesagent_config = cloud_extra_info.get("vesagent_config", None) if cloud_extra_info is not None else None + except Exception as e: + #ignore this error + self._logger.warn("cloud extra info is provided with data in bad format: %s" % cloud_extra_info_str) + pass + + vesagent_backlogs = self.getBacklogsOneVIM(vimid) + + except Exception as e: + self._logger.error("exception:%s" % str(e)) + return Response(data={'error': str(e)}, + status=status.HTTP_500_INTERNAL_SERVER_ERROR) + + self._logger.info("return with %s" % status.HTTP_200_OK) + return Response(data={"vesagent_config":vesagent_config, + "vesagent_backlogs": vesagent_backlogs}, + status=status.HTTP_200_OK) + + + def post(self, request, vimid=""): + ''' + update the blob of vesagent-config, rebuild the backlog for the vesagent workers, + and start the vesagent workers if not started yet + Implication: the request to this API endpoint will build the backlog locally, hence only local VES agent workers + will process these backlogs, which conforms to distributed deployment of M.C. services which includes VES agents + :param request:{"vesagent_config": + {"ves_subscription": + {"endpoint":"http://127.0.0.1:9005/sample", + "username":"user","password":"password"}, + "poll_interval_default":10, + "backlogs": + [ + {"domain":"fault","type":"vm","tenant":"VIM","source":"onap-aaf"} + ] + } + } + :param vimid: + :return: + ''' + self._logger.info("vimid: %s" % vimid) + self._logger.debug("with META: %s, with data: %s" % (request.META, request.data)) + try: + vesagent_config = None + if request.data is None or request.data.get("vesagent_config", None) is None: + #Try to load the vesagent_config out of cloud_region["cloud_extra_info"] + viminfo = extsys.get_vim_by_id(vimid) + cloud_extra_info_str = viminfo.get('cloud_extra_info', None) + cloud_extra_info = json.loads(cloud_extra_info_str) if cloud_extra_info_str is not None else None + vesagent_config = cloud_extra_info.get("vesagent_config", None) if cloud_extra_info is not None else None + else: + vesagent_config = request.data.get("vesagent_config", None) + + if vesagent_config is None: + return Response(data={'vesagent_config is not provided'}, + status=status.HTTP_400_BAD_REQUEST) + + vesagent_backlogs = self.buildBacklogsOneVIM(vimid, vesagent_config) + + # store back to cloud_extra_info + # tbd + + except Exception as e: + self._logger.error("exception:%s" % str(e)) + return Response(data={'error': str(e)}, + status=status.HTTP_500_INTERNAL_SERVER_ERROR) + + self._logger.info("return with %s" % status.HTTP_201_CREATED) + return Response(data={"vesagent_config":vesagent_config, + "vesagent_backlogs": vesagent_backlogs}, + status=status.HTTP_201_CREATED) + + def delete(self, request, vimid=""): + ''' + delete the blob of vesagent-config, remove it from backlog and stop the vesagent worker if no backlog + :param request: + :param vimid: + :return: + ''' + self._logger.info("vimid: %s" % vimid) + self._logger.debug("with META: %s" % request.META) + try: + # tbd + self.clearBacklogsOneVIM(vimid) + except Exception as e: + self._logger.error("exception:%s" % str(e)) + return Response(data={'error': str(e)}, + status=status.HTTP_500_INTERNAL_SERVER_ERROR) + + self._logger.info("return with %s" % status.HTTP_200_OK) + return Response(status=status.HTTP_200_OK) + + + def getBacklogsOneVIM(self, vimid): + ''' + remove the specified backlogs for a VIM + :param vimid: + :return: + ''' + self._logger.debug("vimid: %s" % vimid) + + vesAgentConfig = None + try: + # retrive the backlogs + vesAgentConfigStr = cache.get("VesAgentBacklogs.config.%s" % (vimid)) + if vesAgentConfigStr is None: + logger.warn("VesAgentBacklogs.config.%s cannot be found in cache" % (vimid)) + return None + + logger.debug("VesAgentBacklogs.config.%s: %s" % (vimid, vesAgentConfigStr)) + + vesAgentConfig = json.loads(vesAgentConfigStr) + if vesAgentConfig is None: + logger.warn("VesAgentBacklogs.config.%s corrupts" % (vimid)) + return None + + except Exception as e: + self._logger.error("exception:%s" % str(e)) + vesAgentConfig = {"error": "exception occurs"} + + self._logger.debug("return") + return vesAgentConfig + + def clearBacklogsOneVIM(self, vimid): + ''' + remove the specified backlogs for a VIM + :param vimid: + :param vesagent_config: + :return: + ''' + self._logger.debug("vimid: %s" % vimid) + + try: + # remove vimid from "VesAgentBacklogs.vimlist" + VesAgentBacklogsVimListStr = cache.get("VesAgentBacklogs.vimlist") + VesAgentBacklogsVimList = [] + if VesAgentBacklogsVimListStr is not None: + VesAgentBacklogsVimList = json.loads(VesAgentBacklogsVimListStr) + VesAgentBacklogsVimList = [v for v in VesAgentBacklogsVimList if v != vimid] + + logger.debug("VesAgentBacklogs.vimlist is %s" % VesAgentBacklogsVimList) + + # cache forever + cache.set("VesAgentBacklogs.vimlist", json.dumps(VesAgentBacklogsVimList), None) + + # retrieve the backlogs + vesAgentConfigStr = cache.get("VesAgentBacklogs.config.%s" % (vimid)) + if vesAgentConfigStr is None: + logger.warn("VesAgentBacklogs.config.%s cannot be found in cache" % (vimid)) + return 0 + + logger.debug("VesAgentBacklogs.config.%s: %s" % (vimid, vesAgentConfigStr)) + + vesAgentConfig = json.loads(vesAgentConfigStr) + if vesAgentConfig is None: + logger.warn("VesAgentBacklogs.config.%s corrupts" % (vimid)) + return 0 + + # iterate all backlog and remove the associate state! + # tbd + + # clear the whole backlogs for a VIM + cache.set("VesAgentBacklogs.config.%s" % vimid, "deleting the backlogs", 1) + + except Exception as e: + self._logger.error("exception:%s" % str(e)) + + self._logger.debug("return") + return 0 + + def buildBacklogsOneVIM(self, vimid, vesagent_config = None): + ''' + build and cache backlog for specific cloud region,spawn vesagent workers if needed + :param vimid: + :param vesagent_config: vesagent_config data in json object + :return: + ''' + self._logger.debug("vimid: %s" % vimid) + self._logger.debug("config data: %s" % vesagent_config) + + VesAgentBacklogsConfig = None + try: + if vesagent_config : + # now rebuild the backlog + VesAgentBacklogsConfig = { + "vimid": vimid, + "poll_interval_default": vesagent_config.get("poll_interval_default", 0), + "subscription": vesagent_config.get("ves_subscription", None), + "backlogs": [self.buildBacklog(vimid, b) for b in vesagent_config.get("backlogs", [])] + } + + + # add/update the backlog into cache + VesAgentBacklogsConfigStr = json.dumps(VesAgentBacklogsConfig) + # cache forever + cache.set("VesAgentBacklogs.config.%s" % vimid, VesAgentBacklogsConfigStr, None) + + # update list of vimid for vesagent + # get the whole list of backlog + VesAgentBacklogsVimListStr = cache.get("VesAgentBacklogs.vimlist") + VesAgentBacklogsVimList = [vimid] + if VesAgentBacklogsVimListStr is not None: + VesAgentBacklogsVimList = json.loads(VesAgentBacklogsVimListStr) + VesAgentBacklogsVimList = [v for v in VesAgentBacklogsVimList if v != vimid] + VesAgentBacklogsVimList.append(vimid) + + logger.debug("VesAgentBacklogs.vimlist is %s" % VesAgentBacklogsVimList) + + #cache forever + cache.set("VesAgentBacklogs.vimlist", json.dumps(VesAgentBacklogsVimList), None) + + # notify schduler + scheduleBacklogs.delay(vimid) + except Exception as e: + self._logger.error("exception:%s" % str(e)) + VesAgentBacklogsConfig = {"error":"exception occurs during build backlogs"} + + self._logger.debug("return") + return VesAgentBacklogsConfig + + def buildBacklog(self, vimid, backlog_input): + self._logger.debug("build backlog for: %s" % vimid) + self._logger.debug("with input: %s" % backlog_input) + + try: + if backlog_input["domain"] == "fault" and backlog_input["type"] == "vm": + return buildBacklog_fault_vm(vimid, backlog_input) + else: + self._logger.warn("return with failure: unsupported backlog domain:%s, type:%s" + % (backlog_input["domain"], backlog_input["type"] == "vm")) + return None + except Exception as e: + self._logger.error("exception:%s" % str(e)) + return None + + self._logger.debug("return without backlog") + return None diff --git a/windriver/titanium_cloud/vesagent/vespublish.py b/windriver/titanium_cloud/vesagent/vespublish.py new file mode 100644 index 00000000..ab0155c2 --- /dev/null +++ b/windriver/titanium_cloud/vesagent/vespublish.py @@ -0,0 +1,43 @@ +# Copyright (c) 2017-2018 Wind River Systems, Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from __future__ import absolute_import, unicode_literals + +import time +import logging +import json +import urllib2 + +logger = logging.getLogger(__name__) + +def publishAnyEventToVES(ves_subscription, event): + logger.info("Start to send single event to VES collector.") + endpoint = ves_subscription.get("endpoint", None) + username = ves_subscription.get("username", None) + password = ves_subscription.get("password", None) + + if endpoint: + try: + logger.info("publish event to VES: %s", endpoint) + headers = {'Content-Type': 'application/json'} + request = urllib2.Request(url=endpoint, headers=headers, data=json.dumps(event)) + time.sleep(1) + response = urllib2.urlopen(request) + logger.info("VES response is: %s", response.read()) + except urllib2.URLError, e: + logger.critical("Failed to publish to %s: %s", endpoint, e.reason) + except Exception as e: + logger.error("exception:%s" % str(e)) + else: + logger.info("Missing VES info.")
\ No newline at end of file |