diff options
author | rl001m <ruilu@research.att.com> | 2017-12-17 09:18:41 -0500 |
---|---|---|
committer | rl001m <ruilu@research.att.com> | 2017-12-17 09:32:55 -0500 |
commit | c98713c2fa29dc93a4226eaa837ddb36f11350a6 (patch) | |
tree | fde8530ebe337b9ebf4be6c9af25113c5807e659 /conductor | |
parent | 22cff5d3b51d9aa2d4fd11f657264e41063add1c (diff) |
Added data directory to the repository
Added the HAS-Data module in ONAP
Change-Id: Iac9774f8d88bff672c35e5cbab29bd4397012c73
Issue-ID: OPTFRA-12
Signed-off-by: rl001m <ruilu@research.att.com>
Diffstat (limited to 'conductor')
12 files changed, 1937 insertions, 0 deletions
diff --git a/conductor/conductor/data/__init__.py b/conductor/conductor/data/__init__.py new file mode 100644 index 0000000..9c965aa --- /dev/null +++ b/conductor/conductor/data/__init__.py @@ -0,0 +1,20 @@ +# +# ------------------------------------------------------------------------- +# Copyright (c) 2015-2017 AT&T Intellectual Property +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +# ------------------------------------------------------------------------- +# + +from .service import DataServiceLauncher # noqa: F401 diff --git a/conductor/conductor/data/plugins/__init__.py b/conductor/conductor/data/plugins/__init__.py new file mode 100644 index 0000000..f2bbdfd --- /dev/null +++ b/conductor/conductor/data/plugins/__init__.py @@ -0,0 +1,19 @@ +# +# ------------------------------------------------------------------------- +# Copyright (c) 2015-2017 AT&T Intellectual Property +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +# ------------------------------------------------------------------------- +# + diff --git a/conductor/conductor/data/plugins/base.py b/conductor/conductor/data/plugins/base.py new file mode 100644 index 0000000..a124e29 --- /dev/null +++ b/conductor/conductor/data/plugins/base.py @@ -0,0 +1,30 @@ +# +# ------------------------------------------------------------------------- +# Copyright (c) 2015-2017 AT&T Intellectual Property +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +# ------------------------------------------------------------------------- +# + +import abc + +from oslo_log import log +import six + +LOG = log.getLogger(__name__) + + +@six.add_metaclass(abc.ABCMeta) +class DataPlugin(object): + """Base Data Plugin Class""" diff --git a/conductor/conductor/data/plugins/inventory_provider/__init__.py b/conductor/conductor/data/plugins/inventory_provider/__init__.py new file mode 100644 index 0000000..f2bbdfd --- /dev/null +++ b/conductor/conductor/data/plugins/inventory_provider/__init__.py @@ -0,0 +1,19 @@ +# +# ------------------------------------------------------------------------- +# Copyright (c) 2015-2017 AT&T Intellectual Property +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +# ------------------------------------------------------------------------- +# + diff --git a/conductor/conductor/data/plugins/inventory_provider/aai.py b/conductor/conductor/data/plugins/inventory_provider/aai.py new file mode 100644 index 0000000..35b4ba7 --- /dev/null +++ b/conductor/conductor/data/plugins/inventory_provider/aai.py @@ -0,0 +1,1070 @@ +# +# ------------------------------------------------------------------------- +# Copyright (c) 2015-2017 AT&T Intellectual Property +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +# ------------------------------------------------------------------------- +# + +import re +import time +import uuid + + +from oslo_config import cfg +from oslo_log import log + +from conductor.common import rest +from conductor.data.plugins.inventory_provider import base +from conductor.i18n import _LE, _LI + +LOG = log.getLogger(__name__) + +CONF = cfg.CONF + +AAI_OPTS = [ + cfg.IntOpt('cache_refresh_interval', + default=1440, + help='Interval with which to refresh the local cache, ' + 'in minutes.'), + cfg.IntOpt('complex_cache_refresh_interval', + default=1440, + help='Interval with which to refresh the local complex cache, ' + 'in minutes.'), + cfg.StrOpt('table_prefix', + default='aai', + help='Data Store table prefix.'), + cfg.StrOpt('server_url', + default='https://controller:8443/aai', + help='Base URL for A&AI, up to and not including ' + 'the version, and without a trailing slash.'), + cfg.StrOpt('server_url_version', + default='v10', + help='The version of A&AI in v# format.'), + cfg.StrOpt('certificate_file', + default='certificate.pem', + help='SSL/TLS certificate file in pem format. ' + 'This certificate must be registered with the A&AI ' + 'endpoint.'), + cfg.StrOpt('certificate_key_file', + default='certificate_key.pem', + help='Private Certificate Key file in pem format.'), + cfg.StrOpt('certificate_authority_bundle_file', + default='certificate_authority_bundle.pem', + help='Certificate Authority Bundle file in pem format. ' + 'Must contain the appropriate trust chain for the ' + 'Certificate file.'), +] + +CONF.register_opts(AAI_OPTS, group='aai') + + +class AAI(base.InventoryProviderBase): + """Active and Available Inventory Provider""" + + def __init__(self): + """Initializer""" + + # FIXME(jdandrea): Pass this in to init. + self.conf = CONF + + self.base = self.conf.aai.server_url.rstrip('/') + self.version = self.conf.aai.server_url_version.rstrip('/') + self.cert = self.conf.aai.certificate_file + self.key = self.conf.aai.certificate_key_file + self.verify = self.conf.aai.certificate_authority_bundle_file + self.cache_refresh_interval = self.conf.aai.cache_refresh_interval + self.last_refresh_time = None + self.complex_cache_refresh_interval = \ + self.conf.aai.complex_cache_refresh_interval + self.complex_last_refresh_time = None + + # TODO(jdandrea): Make these config options? + self.timeout = 30 + self.retries = 3 + + kwargs = { + "server_url": self.base, + "retries": self.retries, + "cert_file": self.cert, + "cert_key_file": self.key, + "ca_bundle_file": self.verify, + "log_debug": self.conf.debug, + } + self.rest = rest.REST(**kwargs) + + # Cache is initially empty + self._aai_cache = {} + self._aai_complex_cache = {} + + def initialize(self): + """Perform any late initialization.""" + + # Refresh the cache once for now + self._refresh_cache() + + # TODO(jdandrea): Make this periodic, and without a True condition! + # executor = futurist.ThreadPoolExecutor() + # while True: + # fut = executor.submit(self.refresh_cache) + # fut.result() + # + # # Now wait for the next time. + # # FIXME(jdandrea): Put inside refresh_cache()? + # refresh_interval = self.conf.aai.cache_refresh_interval + # time.sleep(refresh_interval) + # executor.shutdown() + + def name(self): + """Return human-readable name.""" + return "A&AI" + + def _get_version_from_string(self, string): + """Extract version number from string""" + return re.sub("[^0-9.]", "", string) + + def _aai_versioned_path(self, path): + """Return a URL path with the A&AI version prepended""" + return '/{}/{}'.format(self.version, path.lstrip('/')) + + def _request(self, method='get', path='/', data=None, + context=None, value=None): + """Performs HTTP request.""" + headers = { + 'X-FromAppId': 'CONDUCTOR', + 'X-TransactionId': str(uuid.uuid4()), + } + kwargs = { + "method": method, + "path": path, + "headers": headers, + "data": data, + } + + # TODO(jdandrea): Move timing/response logging into the rest helper? + start_time = time.time() + response = self.rest.request(**kwargs) + elapsed = time.time() - start_time + LOG.debug("Total time for A&AI request " + "({0:}: {1:}): {2:.3f} sec".format(context, value, elapsed)) + + if response is None: + LOG.error(_LE("No response from A&AI ({}: {})"). + format(context, value)) + elif response.status_code != 200: + LOG.error(_LE("A&AI request ({}: {}) returned HTTP " + "status {} {}, link: {}{}"). + format(context, value, + response.status_code, response.reason, + self.base, path)) + return response + + def _refresh_cache(self): + """Refresh the A&AI cache.""" + if not self.last_refresh_time or \ + (time.time() - self.last_refresh_time) > \ + self.cache_refresh_interval * 60: + # TODO(snarayanan): + # The cache is not persisted to Music currently. + # A general purpose ORM caching + # object likely needs to be made, with a key (hopefully we + # can use one that is not just a UUID), a value, and a + # timestamp. The other alternative is to not use the ORM + # layer and call the API directly, but that is + # also trading one set of todos for another ... + + # Get all A&AI sites + LOG.info(_LI("**** Refreshing A&AI cache *****")) + path = self._aai_versioned_path( + '/cloud-infrastructure/cloud-regions/?depth=0') + response = self._request( + path=path, context="cloud regions", value="all") + if response is None: + return + regions = {} + if response.status_code == 200: + body = response.json() + regions = body.get('cloud-region', {}) + if not regions: + # Nothing to update the cache with + LOG.error(_LE("A&AI returned no regions, link: {}{}"). + format(self.base, path)) + return + cache = { + 'cloud_region': {}, + 'service': {}, + } + for region in regions: + cloud_region_version = region.get('cloud-region-version') + cloud_region_id = region.get('cloud-region-id') + cloud_owner = region.get('cloud-owner') + if not (cloud_region_version and + cloud_region_id): + continue + rel_link_data_list = \ + self._get_aai_rel_link_data( + data=region, + related_to='complex', + search_key='complex.physical-location-id') + if len(rel_link_data_list) > 1: + LOG.error(_LE("Region {} has more than one complex"). + format(cloud_region_id)) + LOG.debug("Region {}: {}".format(cloud_region_id, region)) + continue + rel_link_data = rel_link_data_list[0] + complex_id = rel_link_data.get("d_value") + complex_link = rel_link_data.get("link") + if complex_id and complex_link: + complex_info = self._get_complex( + complex_link=complex_link, + complex_id=complex_id) + else: # no complex information + LOG.error(_LE("Region {} does not reference a complex"). + format(cloud_region_id)) + continue + if not complex_info: + LOG.error(_LE("Region {}, complex {} info not found, " + "link {}").format(cloud_region_id, + complex_id, complex_link)) + continue + + latitude = complex_info.get('latitude') + longitude = complex_info.get('longitude') + complex_name = complex_info.get('complex-name') + city = complex_info.get('city') + state = complex_info.get('state') + region = complex_info.get('region') + country = complex_info.get('country') + if not (complex_name and latitude and longitude + and city and region and country): + keys = ('latitude', 'longitude', 'city', + 'complex-name', 'region', 'country') + missing_keys = \ + list(set(keys).difference(complex_info.keys())) + LOG.error(_LE("Complex {} is missing {}, link: {}"). + format(complex_id, missing_keys, complex_link)) + LOG.debug("Complex {}: {}". + format(complex_id, complex_info)) + continue + cache['cloud_region'][cloud_region_id] = { + 'cloud_region_version': cloud_region_version, + 'cloud_owner': cloud_owner, + 'complex': { + 'complex_id': complex_id, + 'complex_name': complex_name, + 'latitude': latitude, + 'longitude': longitude, + 'city': city, + 'state': state, + 'region': region, + 'country': country, + } + } + self._aai_cache = cache + self.last_refresh_time = time.time() + LOG.info(_LI("**** A&AI cache refresh complete *****")) + + # Helper functions to parse the relationships that + # AAI uses to tie information together. This should ideally be + # handled with libraries built for graph databases. Needs more + # exploration for such libraries. + @staticmethod + def _get_aai_rel_link(data, related_to): + """Given an A&AI data structure, return the related-to link""" + rel_dict = data.get('relationship-list') + if rel_dict: + for key, rel_list in rel_dict.items(): + for rel in rel_list: + if related_to == rel.get('related-to'): + return rel.get('related-link') + + @staticmethod + def _get_aai_rel_link_data(data, related_to, search_key=None, + match_dict=None): + # some strings that we will encounter frequently + rel_lst = "relationship-list" + rkey = "relationship-key" + rval = "relationship-value" + rdata = "relationship-data" + response = list() + if match_dict: + m_key = match_dict.get('key') + m_value = match_dict.get('value') + else: + m_key = None + m_value = None + rel_dict = data.get(rel_lst) + if rel_dict: # check if data has relationship lists + for key, rel_list in rel_dict.items(): + for rel in rel_list: + if rel.get("related-to") == related_to: + dval = None + matched = False + link = rel.get("related-link") + r_data = rel.get(rdata, []) + if search_key: + for rd in r_data: + if rd.get(rkey) == search_key: + dval = rd.get(rval) + if not match_dict: # return first match + response.append( + {"link": link, "d_value": dval} + ) + break # go to next relation + if rd.get(rkey) == m_key \ + and rd.get(rval) == m_value: + matched = True + if match_dict and matched: # if matching required + response.append( + {"link": link, "d_value": dval} + ) + # matched, return search value corresponding + # to the matched r_data group + else: # no search key; just return the link + response.append( + {"link": link, "d_value": dval} + ) + if len(response) == 0: + response.append( + {"link": None, "d_value": None} + ) + return response + + def _get_complex(self, complex_link, complex_id=None): + if not self.complex_last_refresh_time or \ + (time.time() - self.complex_last_refresh_time) > \ + self.complex_cache_refresh_interval * 60: + self._aai_complex_cache.clear() + if complex_id and complex_id in self._aai_complex_cache: + return self._aai_complex_cache[complex_id] + else: + path = self._aai_versioned_path( + self._get_aai_path_from_link(complex_link)) + response = self._request( + path=path, context="complex", value=complex_id) + if response is None: + return + if response.status_code == 200: + complex_info = response.json() + if 'complex' in complex_info: + complex_info = complex_info.get('complex') + latitude = complex_info.get('latitude') + longitude = complex_info.get('longitude') + complex_name = complex_info.get('complex-name') + city = complex_info.get('city') + region = complex_info.get('region') + country = complex_info.get('country') + if not (complex_name and latitude and longitude + and city and region and country): + keys = ('latitude', 'longitude', 'city', + 'complex-name', 'region', 'country') + missing_keys = \ + list(set(keys).difference(complex_info.keys())) + LOG.error(_LE("Complex {} is missing {}, link: {}"). + format(complex_id, missing_keys, complex_link)) + LOG.debug("Complex {}: {}". + format(complex_id, complex_info)) + return + + if complex_id: # cache only if complex_id is given + self._aai_complex_cache[complex_id] = response.json() + self.complex_last_refresh_time = time.time() + + return complex_info + + def _get_regions(self): + self._refresh_cache() + regions = self._aai_cache.get('cloud_region', {}) + return regions + + def _get_aai_path_from_link(self, link): + path = link.split(self.version) + if not path or len(path) <= 1: + # TODO(shankar): Treat this as a critical error? + LOG.error(_LE("A&AI version {} not found in link {}"). + format(self.version, link)) + else: + return "{}?depth=0".format(path[1]) + + def check_network_roles(self, network_role_id=None): + # the network role query from A&AI is not using + # the version number in the query + network_role_uri = \ + '/network/l3-networks?network-role=' + network_role_id + path = self._aai_versioned_path(network_role_uri) + network_role_id = network_role_id + + # This UUID is usually reserved by A&AI for a Conductor-specific named query. + named_query_uid = "" + + data = { + "query-parameters": { + "named-query": { + "named-query-uuid": named_query_uid + } + }, + "instance-filters": { + "instance-filter": [ + { + "l3-network": { + "network-role": network_role_id + } + } + ] + } + } + region_ids = set() + response = self._request('get', path=path, data=data, + context="role", value=network_role_id) + if response is None: + return None + body = response.json() + + response_items = body.get('l3-network', []) + + for item in response_items: + cloud_region_instances = self._get_aai_rel_link_data( + data=item, + related_to='cloud-region', + search_key='cloud-region.cloud-region-id' + ) + + if len(cloud_region_instances) > 0: + for r_instance in cloud_region_instances: + region_id = r_instance.get('d_value') + if region_id is not None: + region_ids.add(region_id) + + # return region ids that fit the role + return region_ids + + def resolve_host_location(self, host_name): + path = self._aai_versioned_path('/query?format=id') + data = {"start": ["network/pnfs/pnf/" + host_name, + "cloud-infrastructure/pservers/pserver/" + host_name], + "query": "query/ucpe-instance" + } + response = self._request('put', path=path, data=data, + context="host name", value=host_name) + if response is None or response.status_code != 200: + return None + body = response.json() + results = body.get('results', []) + complex_link = None + for result in results: + if "resource-type" in result and \ + "resource-link" in result and \ + result["resource-type"] == "complex": + complex_link = result["resource-link"] + if not complex_link: + LOG.error(_LE("Unable to get a complex link for hostname {} " + " in response {}").format(host_name, response)) + return None + complex_info = self._get_complex( + complex_link=complex_link, + complex_id=None + ) + if complex_info: + lat = complex_info.get('latitude') + lon = complex_info.get('longitude') + if lat and lon: + location = {"latitude": lat, "longitude": lon} + return location + else: + LOG.error(_LE("Unable to get a latitude and longitude " + "information for hostname {} from complex " + " link {}").format(host_name, complex_link)) + return None + else: + LOG.error(_LE("Unable to get a complex information for " + " hostname {} from complex " + " link {}").format(host_name, complex_link)) + return None + + def resolve_clli_location(self, clli_name): + clli_uri = '/cloud-infrastructure/complexes/complex/' + clli_name + path = self._aai_versioned_path(clli_uri) + + response = self._request('get', path=path, data=None, + context="clli name", value=clli_name) + if response is None or response.status_code != 200: + return None + + body = response.json() + + if body: + lat = body.get('latitude') + lon = body.get('longitude') + if lat and lon: + location = {"latitude": lat, "longitude": lon} + return location + else: + LOG.error(_LE("Unable to get a latitude and longitude " + "information for CLLI code {} from complex"). + format(clli_name)) + return None + + def get_inventory_group_pairs(self, service_description): + pairs = list() + path = self._aai_versioned_path( + '/network/instance-groups/?description={}&depth=0'.format( + service_description)) + response = self._request(path=path, context="inventory group", + value=service_description) + if response is None or response.status_code != 200: + return + body = response.json() + if "instance-group" not in body: + LOG.error(_LE("Unable to get instance groups from inventory " + " in response {}").format(response)) + return + for instance_groups in body["instance-group"]: + s_instances = self._get_aai_rel_link_data( + data=instance_groups, + related_to='service-instance', + search_key='service-instance.service-instance-id' + ) + if s_instances and len(s_instances) == 2: + pair = list() + for s_inst in s_instances: + pair.append(s_inst.get('d_value')) + pairs.append(pair) + else: + LOG.error(_LE("Number of instance pairs not found to " + "be two: {}").format(instance_groups)) + return pairs + + def _log_multiple_item_error(self, name, service_type, + related_to, search_key='', + context=None, value=None): + """Helper method to log multiple-item errors + + Used by resolve_demands + """ + LOG.error(_LE("Demand {}, role {} has more than one {} ({})"). + format(name, service_type, related_to, search_key)) + if context and value: + LOG.debug("{} details: {}".format(context, value)) + + def check_sriov_automation(self, aic_version, demand_name, candidate_name): + + """Check if specific candidate has SRIOV automation available or not + + Used by resolve_demands + """ + + if aic_version: + LOG.debug(_LI("Demand {}, candidate {} has an AIC version " + "number {}").format(demand_name, candidate_name, + aic_version) + ) + if aic_version == "3.6": + return True + return False + + def check_orchestration_status(self, orchestration_status, demand_name, candidate_name): + + """Check if the orchestration-status of a candidate is activated + + Used by resolve_demands + """ + + if orchestration_status: + LOG.debug(_LI("Demand {}, candidate {} has an orchestration " + "status {}").format(demand_name, candidate_name, + orchestration_status)) + if orchestration_status.lower() == "activated": + return True + return False + + def match_candidate_attribute(self, candidate, attribute_name, + restricted_value, demand_name, + inventory_type): + """Check if specific candidate attribute matches the restricted value + + Used by resolve_demands + """ + if restricted_value and \ + restricted_value is not '' and \ + candidate[attribute_name] != restricted_value: + LOG.info(_LI("Demand: {} " + "Discarded {} candidate as " + "it doesn't match the " + "{} attribute " + "{} ").format(demand_name, + inventory_type, + attribute_name, + restricted_value + ) + ) + return True + return False + + def match_vserver_attribute(self, vserver_list): + + value = None + for i in range(0, len(vserver_list)): + if value and \ + value != vserver_list[i].get('d_value'): + return False + value = vserver_list[i].get('d_value') + return True + + def resolve_demands(self, demands): + """Resolve demands into inventory candidate lists""" + + resolved_demands = {} + for name, requirements in demands.items(): + resolved_demands[name] = [] + for requirement in requirements: + inventory_type = requirement.get('inventory_type').lower() + service_type = requirement.get('service_type') + # service_id = requirement.get('service_id') + customer_id = requirement.get('customer_id') + + # region_id is OPTIONAL. This will restrict the initial + # candidate set to come from the given region id + restricted_region_id = requirement.get('region') + restricted_complex_id = requirement.get('complex') + + # get required candidates from the demand + required_candidates = requirement.get("required_candidates") + if required_candidates: + resolved_demands['required_candidates'] = \ + required_candidates + + # get excluded candidate from the demand + excluded_candidates = requirement.get("excluded_candidates") + + # service_resource_id is OPTIONAL and is + # transparent to Conductor + service_resource_id = requirement.get('service_resource_id') \ + if requirement.get('service_resource_id') else '' + + # add all the candidates of cloud type + if inventory_type == 'cloud': + # load region candidates from cache + regions = self._get_regions() + + if not regions or len(regions) < 1: + LOG.debug("Region information is not " + "available in cache") + for region_id, region in regions.items(): + # Pick only candidates from the restricted_region + + candidate = dict() + candidate['inventory_provider'] = 'aai' + candidate['service_resource_id'] = service_resource_id + candidate['inventory_type'] = 'cloud' + candidate['candidate_id'] = region_id + candidate['location_id'] = region_id + candidate['location_type'] = 'att_aic' + candidate['cost'] = 0 + candidate['cloud_region_version'] = \ + self._get_version_from_string( + region['cloud_region_version']) + candidate['cloud_owner'] = \ + region['cloud_owner'] + candidate['physical_location_id'] = \ + region['complex']['complex_id'] + candidate['complex_name'] = \ + region['complex']['complex_name'] + candidate['latitude'] = \ + region['complex']['latitude'] + candidate['longitude'] = \ + region['complex']['longitude'] + candidate['city'] = \ + region['complex']['city'] + candidate['state'] = \ + region['complex']['state'] + candidate['region'] = \ + region['complex']['region'] + candidate['country'] = \ + region['complex']['country'] + + if self.check_sriov_automation( + candidate['cloud_region_version'], name, + candidate['candidate_id']): + candidate['sriov_automation'] = 'true' + else: + candidate['sriov_automation'] = 'false' + + if self.match_candidate_attribute( + candidate, "candidate_id", + restricted_region_id, name, + inventory_type) or \ + self.match_candidate_attribute( + candidate, "physical_location_id", + restricted_complex_id, name, + inventory_type): + continue + + # Pick only candidates not in the excluded list + # if excluded candidate list is provided + if excluded_candidates: + has_excluded_candidate = False + for excluded_candidate in excluded_candidates: + if excluded_candidate \ + and excluded_candidate.get('inventory_type') == \ + candidate.get('inventory_type') \ + and excluded_candidate.get('candidate_id') == \ + candidate.get('candidate_id'): + has_excluded_candidate = True + break + + if has_excluded_candidate: + continue + + # Pick only candidates in the required list + # if required candidate list is provided + if required_candidates: + has_required_candidate = False + for required_candidate in required_candidates: + if required_candidate \ + and required_candidate.get('inventory_type') \ + == candidate.get('inventory_type') \ + and required_candidate.get('candidate_id') \ + == candidate.get('candidate_id'): + has_required_candidate = True + break + + if not has_required_candidate: + continue + + # add candidate to demand candidates + resolved_demands[name].append(candidate) + + elif inventory_type == 'service' \ + and service_type and customer_id: + # First level query to get the list of generic vnfs + path = self._aai_versioned_path( + '/network/generic-vnfs/' + '?prov-status=PROV&equipment-role={}&depth=0'.format(service_type)) + response = self._request( + path=path, context="demand, GENERIC-VNF role", + value="{}, {}".format(name, service_type)) + if response is None or response.status_code != 200: + continue # move ahead with next requirement + body = response.json() + generic_vnf = body.get("generic-vnf", []) + for vnf in generic_vnf: + # create a default candidate + candidate = dict() + candidate['inventory_provider'] = 'aai' + candidate['service_resource_id'] = service_resource_id + candidate['inventory_type'] = 'service' + candidate['candidate_id'] = '' + candidate['location_id'] = '' + candidate['location_type'] = 'att_aic' + candidate['host_id'] = '' + candidate['cost'] = 0 + candidate['cloud_owner'] = '' + candidate['cloud_region_version'] = '' + + # start populating the candidate + candidate['host_id'] = vnf.get("vnf-name") + + # check orchestration-status attribute, only keep Activated candidate + if (not self.check_orchestration_status( + vnf.get("orchestration-status"), name, candidate['host_id'])): + continue + + related_to = "vserver" + search_key = "cloud-region.cloud-owner" + rl_data_list = self._get_aai_rel_link_data( + data=vnf, related_to=related_to, + search_key=search_key) + + if len(rl_data_list) > 1: + if not self.match_vserver_attribute(rl_data_list): + self._log_multiple_item_error( + name, service_type, related_to, search_key, + "GENERIC-VNF", vnf) + continue + rl_data = rl_data_list[0] + + vs_link_list = list() + for i in range(0, len(rl_data_list)): + vs_link_list.append(rl_data_list[i].get('link')) + + candidate['cloud_owner'] = rl_data.get('d_value') + + search_key = "cloud-region.cloud-region-id" + + rl_data_list = self._get_aai_rel_link_data( + data=vnf, + related_to=related_to, + search_key=search_key + ) + if len(rl_data_list) > 1: + if not self.match_vserver_attribute(rl_data_list): + self._log_multiple_item_error( + name, service_type, related_to, search_key, + "GENERIC-VNF", vnf) + continue + rl_data = rl_data_list[0] + cloud_region_id = rl_data.get('d_value') + candidate['location_id'] = cloud_region_id + + # get AIC version for service candidate + if cloud_region_id: + cloud_region_uri = '/cloud-infrastructure/cloud-regions' \ + '/?cloud-region-id=' \ + + cloud_region_id + path = self._aai_versioned_path(cloud_region_uri) + + response = self._request('get', + path=path, + data=None) + if response is None or response.status_code != 200: + return None + + body = response.json() + regions = body.get('cloud-region', []) + + for region in regions: + if "cloud-region-version" in region: + candidate['cloud_region_version'] = \ + self._get_version_from_string( + region["cloud-region-version"]) + + if self.check_sriov_automation( + candidate['cloud_region_version'], name, + candidate['host_id']): + candidate['sriov_automation'] = 'true' + else: + candidate['sriov_automation'] = 'false' + + related_to = "service-instance" + search_key = "customer.global-customer-id" + match_key = "customer.global-customer-id" + rl_data_list = self._get_aai_rel_link_data( + data=vnf, + related_to=related_to, + search_key=search_key, + match_dict={'key': match_key, + 'value': customer_id} + ) + if len(rl_data_list) > 1: + if not self.match_vserver_attribute(rl_data_list): + self._log_multiple_item_error( + name, service_type, related_to, search_key, + "GENERIC-VNF", vnf) + continue + rl_data = rl_data_list[0] + vs_cust_id = rl_data.get('d_value') + + search_key = "service-instance.service-instance-id" + match_key = "customer.global-customer-id" + rl_data_list = self._get_aai_rel_link_data( + data=vnf, + related_to=related_to, + search_key=search_key, + match_dict={'key': match_key, + 'value': customer_id} + ) + if len(rl_data_list) > 1: + if not self.match_vserver_attribute(rl_data_list): + self._log_multiple_item_error( + name, service_type, related_to, search_key, + "GENERIC-VNF", vnf) + continue + rl_data = rl_data_list[0] + vs_service_instance_id = rl_data.get('d_value') + + if vs_cust_id and vs_cust_id == customer_id: + candidate['candidate_id'] = \ + vs_service_instance_id + else: # vserver is for a different customer + continue + + # Second level query to get the pserver from vserver + complex_list = list() + + for vs_link in vs_link_list: + + if not vs_link: + LOG.error(_LE("{} VSERVER link information not " + "available from A&AI").format(name)) + LOG.debug("Related link data: {}".format(rl_data)) + continue # move ahead with the next vnf + + vs_path = self._get_aai_path_from_link(vs_link) + if not vs_path: + LOG.error(_LE("{} VSERVER path information not " + "available from A&AI - {}"). + format(name, vs_path)) + continue # move ahead with the next vnf + path = self._aai_versioned_path(vs_path) + response = self._request( + path=path, context="demand, VSERVER", + value="{}, {}".format(name, vs_path)) + if response is None or response.status_code != 200: + continue + body = response.json() + + related_to = "pserver" + rl_data_list = self._get_aai_rel_link_data( + data=body, + related_to=related_to, + search_key=None + ) + if len(rl_data_list) > 1: + self._log_multiple_item_error( + name, service_type, related_to, "item", + "VSERVER", body) + continue + rl_data = rl_data_list[0] + ps_link = rl_data.get('link') + + # Third level query to get cloud region from pserver + if not ps_link: + LOG.error(_LE("{} pserver related link " + "not found in A&AI: {}"). + format(name, rl_data)) + continue + ps_path = self._get_aai_path_from_link(ps_link) + if not ps_path: + LOG.error(_LE("{} pserver path information " + "not found in A&AI: {}"). + format(name, ps_link)) + continue # move ahead with the next vnf + path = self._aai_versioned_path(ps_path) + response = self._request( + path=path, context="PSERVER", value=ps_path) + if response is None or response.status_code != 200: + continue + body = response.json() + + related_to = "complex" + search_key = "complex.physical-location-id" + rl_data_list = self._get_aai_rel_link_data( + data=body, + related_to=related_to, + search_key=search_key + ) + if len(rl_data_list) > 1: + if not self.match_vserver_attribute(rl_data_list): + self._log_multiple_item_error( + name, service_type, related_to, search_key, + "PSERVER", body) + continue + rl_data = rl_data_list[0] + complex_list.append(rl_data) + + if not complex_list or \ + len(complex_list) < 1: + LOG.error("Complex information not " + "available from A&AI") + continue + + if len(complex_list) > 1: + if not self.match_vserver_attribute(complex_list): + self._log_multiple_item_error( + name, service_type, related_to, search_key, + "GENERIC-VNF", vnf) + continue + + rl_data = complex_list[0] + complex_link = rl_data.get('link') + complex_id = rl_data.get('d_value') + + # Final query for the complex information + if not (complex_link and complex_id): + LOG.debug("{} complex information not " + "available from A&AI - {}". + format(name, complex_link)) + continue # move ahead with the next vnf + else: + complex_info = self._get_complex( + complex_link=complex_link, + complex_id=complex_id + ) + if not complex_info: + LOG.debug("{} complex information not " + "available from A&AI - {}". + format(name, complex_link)) + continue # move ahead with the next vnf + candidate['physical_location_id'] = \ + complex_id + candidate['complex_name'] = \ + complex_info.get('complex-name') + candidate['latitude'] = \ + complex_info.get('latitude') + candidate['longitude'] = \ + complex_info.get('longitude') + candidate['state'] = \ + complex_info.get('state') + candidate['country'] = \ + complex_info.get('country') + candidate['city'] = \ + complex_info.get('city') + candidate['region'] = \ + complex_info.get('region') + + # Pick only candidates not in the excluded list + # if excluded candidate list is provided + if excluded_candidates: + has_excluded_candidate = False + for excluded_candidate in excluded_candidates: + if excluded_candidate \ + and excluded_candidate.get('inventory_type') == \ + candidate.get('inventory_type') \ + and excluded_candidate.get('candidate_id') == \ + candidate.get('candidate_id'): + has_excluded_candidate = True + break + + if has_excluded_candidate: + continue + + # Pick only candidates in the required list + # if required candidate list is provided + if required_candidates: + has_required_candidate = False + for required_candidate in required_candidates: + if required_candidate \ + and required_candidate.get('inventory_type') \ + == candidate.get('inventory_type') \ + and required_candidate.get('candidate_id') \ + == candidate.get('candidate_id'): + has_required_candidate = True + break + + if not has_required_candidate: + continue + + # add the candidate to the demand + # Pick only candidates from the restricted_region + # or restricted_complex + if self.match_candidate_attribute( + candidate, + "location_id", + restricted_region_id, + name, + inventory_type) or \ + self.match_candidate_attribute( + candidate, + "physical_location_id", + restricted_complex_id, + name, + inventory_type): + continue + else: + resolved_demands[name].append(candidate) + else: + LOG.error("Unknown inventory_type " + " {}".format(inventory_type)) + + return resolved_demands diff --git a/conductor/conductor/data/plugins/inventory_provider/base.py b/conductor/conductor/data/plugins/inventory_provider/base.py new file mode 100644 index 0000000..8afb090 --- /dev/null +++ b/conductor/conductor/data/plugins/inventory_provider/base.py @@ -0,0 +1,42 @@ +# +# ------------------------------------------------------------------------- +# Copyright (c) 2015-2017 AT&T Intellectual Property +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +# ------------------------------------------------------------------------- +# + +import abc + +from oslo_log import log +import six + +from conductor.data.plugins import base + +LOG = log.getLogger(__name__) + + +@six.add_metaclass(abc.ABCMeta) +class InventoryProviderBase(base.DataPlugin): + """Base class for Inventory Provider plugins""" + + @abc.abstractmethod + def name(self): + """Return human-readable name.""" + pass + + @abc.abstractmethod + def resolve_demands(self, demands): + """Resolve demands into inventory candidate lists""" + pass diff --git a/conductor/conductor/data/plugins/inventory_provider/extensions.py b/conductor/conductor/data/plugins/inventory_provider/extensions.py new file mode 100644 index 0000000..18f4c4b --- /dev/null +++ b/conductor/conductor/data/plugins/inventory_provider/extensions.py @@ -0,0 +1,45 @@ +# +# ------------------------------------------------------------------------- +# Copyright (c) 2015-2017 AT&T Intellectual Property +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +# ------------------------------------------------------------------------- +# + +from oslo_log import log +import stevedore + +from conductor.conf import inventory_provider +from conductor.i18n import _LI + +LOG = log.getLogger(__name__) + +inventory_provider.register_extension_manager_opts() + + +class Manager(stevedore.named.NamedExtensionManager): + """Manage Inventory Provider extensions.""" + + def __init__(self, conf, namespace): + super(Manager, self).__init__( + namespace, conf.inventory_provider.extensions, + invoke_on_load=True, name_order=True) + LOG.info(_LI("Loaded inventory provider extensions: %s"), self.names()) + + def initialize(self): + """Initialize enabled inventory provider extensions.""" + for extension in self.extensions: + LOG.info(_LI("Initializing inventory provider extension '%s'"), + extension.name) + extension.obj.initialize() diff --git a/conductor/conductor/data/plugins/service_controller/__init__.py b/conductor/conductor/data/plugins/service_controller/__init__.py new file mode 100644 index 0000000..f2bbdfd --- /dev/null +++ b/conductor/conductor/data/plugins/service_controller/__init__.py @@ -0,0 +1,19 @@ +# +# ------------------------------------------------------------------------- +# Copyright (c) 2015-2017 AT&T Intellectual Property +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +# ------------------------------------------------------------------------- +# + diff --git a/conductor/conductor/data/plugins/service_controller/base.py b/conductor/conductor/data/plugins/service_controller/base.py new file mode 100644 index 0000000..ad00c98 --- /dev/null +++ b/conductor/conductor/data/plugins/service_controller/base.py @@ -0,0 +1,42 @@ +# +# ------------------------------------------------------------------------- +# Copyright (c) 2015-2017 AT&T Intellectual Property +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +# ------------------------------------------------------------------------- +# + +import abc + +from oslo_log import log +import six + +from conductor.data.plugins import base + +LOG = log.getLogger(__name__) + + +@six.add_metaclass(abc.ABCMeta) +class ServiceControllerBase(base.DataPlugin): + """Base class for Service Controller plugins""" + + @abc.abstractmethod + def name(self): + """Return human-readable name.""" + pass + + @abc.abstractmethod + def filter_candidates(self, candidates): + """Reduce candidate list based on SDN-C intelligence""" + pass diff --git a/conductor/conductor/data/plugins/service_controller/extensions.py b/conductor/conductor/data/plugins/service_controller/extensions.py new file mode 100644 index 0000000..f309102 --- /dev/null +++ b/conductor/conductor/data/plugins/service_controller/extensions.py @@ -0,0 +1,45 @@ +# +# ------------------------------------------------------------------------- +# Copyright (c) 2015-2017 AT&T Intellectual Property +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +# ------------------------------------------------------------------------- +# + +from oslo_log import log +import stevedore + +from conductor.conf import service_controller +from conductor.i18n import _LI + +LOG = log.getLogger(__name__) + +service_controller.register_extension_manager_opts() + + +class Manager(stevedore.named.NamedExtensionManager): + """Manage Service Controller extensions.""" + + def __init__(self, conf, namespace): + super(Manager, self).__init__( + namespace, conf.service_controller.extensions, + invoke_on_load=True, name_order=True) + LOG.info(_LI("Loaded service controller extensions: %s"), self.names()) + + def initialize(self): + """Initialize enabled service controller extensions.""" + for extension in self.extensions: + LOG.info(_LI("Initializing service controller extension '%s'"), + extension.name) + extension.obj.initialize() diff --git a/conductor/conductor/data/plugins/service_controller/sdnc.py b/conductor/conductor/data/plugins/service_controller/sdnc.py new file mode 100644 index 0000000..23968f0 --- /dev/null +++ b/conductor/conductor/data/plugins/service_controller/sdnc.py @@ -0,0 +1,126 @@ +# +# ------------------------------------------------------------------------- +# Copyright (c) 2015-2017 AT&T Intellectual Property +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +# ------------------------------------------------------------------------- +# + +import time + +from oslo_config import cfg +from oslo_log import log + +from conductor.common import rest +from conductor.data.plugins.service_controller import base +from conductor.i18n import _LE + +LOG = log.getLogger(__name__) + +CONF = cfg.CONF + +SDNC_OPTS = [ + cfg.IntOpt('cache_refresh_interval', + default=1440, + help='Interval with which to refresh the local cache, ' + 'in minutes.'), + cfg.StrOpt('table_prefix', + default='sdnc', + help='Data Store table prefix.'), + cfg.StrOpt('server_url', + default='https://controller:8443/restconf/', + help='Base URL for SDN-C, up to and including the version.'), + cfg.StrOpt('username', + help='Basic Authentication Username'), + cfg.StrOpt('password', + help='Basic Authentication Password'), + cfg.StrOpt('sdnc_rest_timeout', + default=60, + help='Timeout for SDNC Rest Call'), + cfg.StrOpt('sdnc_retries', + default=3, + help='Retry Numbers for SDNC Rest Call'), +] + +CONF.register_opts(SDNC_OPTS, group='sdnc') + + +class SDNC(base.ServiceControllerBase): + """SDN Service Controller""" + + def __init__(self): + """Initializer""" + + # FIXME(jdandrea): Pass this in to init. + self.conf = CONF + + self.base = self.conf.sdnc.server_url.rstrip('/') + self.password = self.conf.sdnc.password + self.timeout = self.conf.sdnc.sdnc_rest_timeout + self.verify = False + self.retries = self.conf.sdnc.sdnc_retries + self.username = self.conf.sdnc.username + + kwargs = { + "server_url": self.base, + "retries": self.retries, + "username": self.username, + "password": self.password, + "log_debug": self.conf.debug, + } + self.rest = rest.REST(**kwargs) + + # Not sure what info from SDNC is cacheable + self._sdnc_cache = {} + + def initialize(self): + """Perform any late initialization.""" + pass + + def name(self): + """Return human-readable name.""" + return "SDN-C" + + def _request(self, method='get', path='/', data=None, + context=None, value=None): + """Performs HTTP request.""" + kwargs = { + "method": method, + "path": path, + "data": data, + } + + # TODO(jdandrea): Move timing/response logging into the rest helper? + start_time = time.time() + response = self.rest.request(**kwargs) + elapsed = time.time() - start_time + LOG.debug("Total time for SDN-C request " + "({0:}: {1:}): {2:.3f} sec".format(context, value, elapsed)) + + if response is None: + LOG.error(_LE("No response from SDN-C ({}: {})"). + format(context, value)) + elif response.status_code != 200: + LOG.error(_LE("SDN-C request ({}: {}) returned HTTP " + "status {} {}, link: {}{}"). + format(context, value, + response.status_code, response.reason, + self.base, path)) + return response + + def filter_candidates(self, request, candidate_list, + constraint_name, constraint_type): + """Reduce candidate list based on SDN-C intelligence""" + selected_candidates = candidate_list + return selected_candidates diff --git a/conductor/conductor/data/service.py b/conductor/conductor/data/service.py new file mode 100644 index 0000000..33d467f --- /dev/null +++ b/conductor/conductor/data/service.py @@ -0,0 +1,460 @@ +# +# ------------------------------------------------------------------------- +# Copyright (c) 2015-2017 AT&T Intellectual Property +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +# ------------------------------------------------------------------------- +# + +# import json +# import os + +import cotyledon +from oslo_config import cfg +from oslo_log import log +# from stevedore import driver + +# from conductor import __file__ as conductor_root +from conductor.common.music import messaging as music_messaging +from conductor.data.plugins.inventory_provider import extensions as ip_ext +from conductor.data.plugins.service_controller import extensions as sc_ext +from conductor.i18n import _LE, _LI, _LW +from conductor import messaging +# from conductor.solver.resource import region +# from conductor.solver.resource import service + +LOG = log.getLogger(__name__) + +CONF = cfg.CONF + +DATA_OPTS = [ + cfg.IntOpt('workers', + default=1, + min=1, + help='Number of workers for data service. ' + 'Default value is 1.'), + cfg.BoolOpt('concurrent', + default=False, + help='Set to True when data will run in active-active ' + 'mode. When set to False, data will flush any abandoned ' + 'messages at startup.'), +] + +CONF.register_opts(DATA_OPTS, group='data') + + +class DataServiceLauncher(object): + """Listener for the data service.""" + + def __init__(self, conf): + """Initializer.""" + self.conf = conf + self.init_extension_managers(conf) + + def init_extension_managers(self, conf): + """Initialize extension managers.""" + self.ip_ext_manager = ( + ip_ext.Manager(conf, 'conductor.inventory_provider.plugin')) + self.ip_ext_manager.initialize() + self.sc_ext_manager = ( + sc_ext.Manager(conf, 'conductor.service_controller.plugin')) + self.sc_ext_manager.initialize() + + def run(self): + transport = messaging.get_transport(self.conf) + if transport: + topic = "data" + target = music_messaging.Target(topic=topic) + endpoints = [DataEndpoint(self.ip_ext_manager, + self.sc_ext_manager), ] + flush = not self.conf.data.concurrent + kwargs = {'transport': transport, + 'target': target, + 'endpoints': endpoints, + 'flush': flush, } + svcmgr = cotyledon.ServiceManager() + svcmgr.add(music_messaging.RPCService, + workers=self.conf.data.workers, + args=(self.conf,), kwargs=kwargs) + svcmgr.run() + + +class DataEndpoint(object): + def __init__(self, ip_ext_manager, sc_ext_manager): + + self.ip_ext_manager = ip_ext_manager + self.sc_ext_manager = sc_ext_manager + self.plugin_cache = {} + + def get_candidate_location(self, ctx, arg): + # candidates should have lat long info already + error = False + location = None + candidate = arg["candidate"] + lat = candidate.get('latitude', None) + lon = candidate.get('longitude', None) + if lat and lon: + location = (float(lat), float(lon)) + else: + error = True + return {'response': location, 'error': error} + + def get_candidate_zone(self, ctx, arg): + candidate = arg["candidate"] + category = arg["category"] + zone = None + error = False + + if category == 'region': + zone = candidate['location_id'] + elif category == 'complex': + zone = candidate['complex_name'] + else: + error = True + + if error: + LOG.error(_LE("Unresolvable zone category {}").format(category)) + else: + LOG.info(_LI("Candidate zone is {}").format(zone)) + return {'response': zone, 'error': error} + + def get_candidates_from_service(self, ctx, arg): + candidate_list = arg["candidate_list"] + constraint_name = arg["constraint_name"] + constraint_type = arg["constraint_type"] + # inventory_type = arg["inventory_type"] + controller = arg["controller"] + request = arg["request"] + # cost = arg["cost"] + error = False + filtered_candidates = [] + # call service and fetch candidates + # TODO(jdandrea): Get rid of the SDN-C reference (outside of plugin!) + if controller == "SDN-C": + service_model = request.get("service_model") + results = self.sc_ext_manager.map_method( + 'filter_candidates', + request=request, + candidate_list=candidate_list, + constraint_name=constraint_name, + constraint_type=constraint_type + ) + if results and len(results) > 0: + filtered_candidates = results[0] + else: + LOG.warn( + _LW("No candidates returned by service " + "controller: {}; may be a new service " + "instantiation.").format(controller)) + else: + LOG.error(_LE("Unknown service controller: {}").format(controller)) + # if response from service controller is empty + if filtered_candidates is None: + LOG.error("No capacity found from SDN-GC for candidates: " + "{}".format(candidate_list)) + return {'response': [], 'error': error} + else: + LOG.debug("Filtered candidates: {}".format(filtered_candidates)) + candidate_list = [c for c in candidate_list + if c in filtered_candidates] + return {'response': candidate_list, 'error': error} + + def get_candidate_discard_set(self, value, candidate_list, value_attrib): + discard_set = set() + value_dict = value + value_condition = '' + if value_dict: + if "all" in value_dict: + value_list = value_dict.get("all") + value_condition = "all" + elif "any" in value_dict: + value_list = value_dict.get("any") + value_condition = "any" + + if not value_list: + return discard_set + + for candidate in candidate_list: + c_any = False + c_all = True + for value in value_list: + if candidate.get(value_attrib) == value: + c_any = True # include if any one is met + elif candidate.get(value_attrib) != value: + c_all = False # discard even if one is not met + if value_condition == 'any' and not c_any: + discard_set.add(candidate.get("candidate_id")) + elif value_condition == 'all' and not c_all: + discard_set.add(candidate.get("candidate_id")) + return discard_set + + def get_inventory_group_candidates(self, ctx, arg): + candidate_list = arg["candidate_list"] + resolved_candidate = arg["resolved_candidate"] + candidate_names = [] + error = False + service_description = 'DHV_VVIG_PAIR' + results = self.ip_ext_manager.map_method( + 'get_inventory_group_pairs', + service_description=service_description + ) + if not results or len(results) < 1: + LOG.error( + _LE("Empty inventory group response for service: {}").format( + service_description)) + error = True + else: + pairs = results[0] + if not pairs or len(pairs) < 1: + LOG.error( + _LE("No inventory group candidates found for service: {}, " + "inventory provider: {}").format( + service_description, self.ip_ext_manager.names()[0])) + error = True + else: + LOG.debug( + "Inventory group pairs: {}, service: {}, " + "inventory provider: {}".format( + pairs, service_description, + self.ip_ext_manager.names()[0])) + for pair in pairs: + if resolved_candidate.get("candidate_id") == pair[0]: + candidate_names.append(pair[1]) + elif resolved_candidate.get("candidate_id") == pair[1]: + candidate_names.append(pair[0]) + + candidate_list = [c for c in candidate_list + if c["candidate_id"] in candidate_names] + LOG.info( + _LI("Inventory group candidates: {}, service: {}, " + "inventory provider: {}").format( + candidate_list, service_description, + self.ip_ext_manager.names()[0])) + return {'response': candidate_list, 'error': error} + + def get_candidates_by_attributes(self, ctx, arg): + candidate_list = arg["candidate_list"] + # demand_name = arg["demand_name"] + properties = arg["properties"] + discard_set = set() + + attributes_to_evaluate = properties.get('evaluate') + for attrib, value in attributes_to_evaluate.items(): + if value == '': + continue + if attrib == 'network_roles': + role_candidates = dict() + role_list = [] + nrc_dict = value + role_condition = '' + if nrc_dict: + if "all" in nrc_dict: + role_list = nrc_dict.get("all") + role_condition = "all" + elif "any" in nrc_dict: + role_list = nrc_dict.get("any") + role_condition = "any" + + # if the role_list is empty do nothing + if not role_list or role_list == '': + LOG.error( + _LE("No roles available, " + "inventory provider: {}").format( + self.ip_ext_manager.names()[0])) + continue + for role in role_list: + # query inventory provider to check if + # the candidate is in role + results = self.ip_ext_manager.map_method( + 'check_network_roles', + network_role_id=role + ) + if not results or len(results) < 1: + LOG.error( + _LE("Empty response from inventory " + "provider {} for network role {}").format( + self.ip_ext_manager.names()[0], role)) + continue + region_ids = results[0] + if not region_ids: + LOG.error( + _LE("No candidates from inventory provider {} " + "for network role {}").format( + self.ip_ext_manager.names()[0], role)) + continue + LOG.debug( + "Network role candidates: {}, role: {}," + "inventory provider: {}".format( + region_ids, role, + self.ip_ext_manager.names()[0])) + role_candidates[role] = region_ids + + # find candidates that meet conditions + for candidate in candidate_list: + # perform this check only for cloud candidates + if candidate["inventory_type"] != "cloud": + continue + c_any = False + c_all = True + for role in role_list: + if role not in role_candidates: + c_all = False + continue + rc = role_candidates.get(role) + if rc and candidate.get("candidate_id") not in rc: + c_all = False + # discard even if one role is not met + elif rc and candidate.get("candidate_id") in rc: + c_any = True + # include if any one role is met + if role_condition == 'any' and not c_any: + discard_set.add(candidate.get("candidate_id")) + elif role_condition == 'all' and not c_all: + discard_set.add(candidate.get("candidate_id")) + + elif attrib == 'complex': + v_discard_set = \ + self.get_candidate_discard_set( + value=value, + candidate_list=candidate_list, + value_attrib="complex_name") + discard_set.update(v_discard_set) + elif attrib == "country": + v_discard_set = \ + self.get_candidate_discard_set( + value=value, + candidate_list=candidate_list, + value_attrib="country") + discard_set.update(v_discard_set) + elif attrib == "state": + v_discard_set = \ + self.get_candidate_discard_set( + value=value, + candidate_list=candidate_list, + value_attrib="state") + discard_set.update(v_discard_set) + elif attrib == "region": + v_discard_set = \ + self.get_candidate_discard_set( + value=value, + candidate_list=candidate_list, + value_attrib="region") + discard_set.update(v_discard_set) + + # return candidates not in discard set + candidate_list[:] = [c for c in candidate_list + if c['candidate_id'] not in discard_set] + LOG.info( + "Available candidates after attribute checks: {}, " + "inventory provider: {}".format( + candidate_list, self.ip_ext_manager.names()[0])) + return {'response': candidate_list, 'error': False} + + def resolve_demands(self, ctx, arg): + error = False + demands = arg.get('demands') + resolved_demands = None + results = self.ip_ext_manager.map_method( + 'resolve_demands', + demands + ) + if results and len(results) > 0: + resolved_demands = results[0] + else: + error = True + + return {'response': {'resolved_demands': resolved_demands}, + 'error': error} + + def resolve_location(self, ctx, arg): + + error = False + resolved_location = None + + host_name = arg.get('host_name') + clli_code = arg.get('clli_code') + + if host_name: + results = self.ip_ext_manager.map_method( + 'resolve_host_location', + host_name + ) + + elif clli_code: + results = self.ip_ext_manager.map_method( + 'resolve_clli_location', + clli_code + ) + else: + # unknown location response + LOG.error(_LE("Unknown location type from the input template." + "Expected location types are host_name" + " or clli_code.")) + + if results and len(results) > 0: + resolved_location = results[0] + else: + error = True + return {'response': {'resolved_location': resolved_location}, + 'error': error} + + def call_reservation_operation(self, ctx, arg): + result = True + reserved_candidates = None + method = arg["method"] + candidate_list = arg["candidate_list"] + reservation_name = arg["reservation_name"] + reservation_type = arg["reservation_type"] + controller = arg["controller"] + request = arg["request"] + + if controller == "SDN-C": + results = self.sc_ext_manager.map_method( + 'call_reservation_operation', + method=method, + candidate_list=candidate_list, + reservation_name=reservation_name, + reservation_type=reservation_type, + request=request + ) + if results and len(results) > 0: + reserved_candidates = results[0] + else: + LOG.error(_LE("Unknown service controller: {}").format(controller)) + if reserved_candidates is None or not reserved_candidates: + result = False + LOG.debug( + _LW("Unable to {} for " + "candidate {}.").format(method, reserved_candidates)) + return {'response': result, + 'error': not result} + else: + LOG.debug("{} for the candidate: " + "{}".format(method, reserved_candidates)) + return {'response': result, + 'error': not result} + + # def do_something(self, ctx, arg): + # """RPC endpoint for data messages + # + # When another service sends a notification over the message + # bus, this method receives it. + # """ + # LOG.debug("Got a message!") + # + # res = { + # 'note': 'do_something called!', + # 'arg': str(arg), + # } + # return {'response': res, 'error': False} |