diff options
Diffstat (limited to 'osdf/adapters')
-rw-r--r-- | osdf/adapters/aaf/aaf_authentication.py | 46 | ||||
-rw-r--r-- | osdf/adapters/aaf/sms.py | 116 | ||||
-rw-r--r-- | osdf/adapters/aai/_init_.py | 0 | ||||
-rw-r--r-- | osdf/adapters/aai/fetch_aai_data.py | 90 | ||||
-rw-r--r-- | osdf/adapters/conductor/__init__.py | 17 | ||||
-rw-r--r-- | osdf/adapters/conductor/api_builder.py | 123 | ||||
-rw-r--r-- | osdf/adapters/conductor/conductor.py | 120 | ||||
-rwxr-xr-x | osdf/adapters/conductor/templates/conductor_interface.json | 41 | ||||
-rw-r--r-- | osdf/adapters/conductor/translation.py | 376 | ||||
-rw-r--r-- | osdf/adapters/dcae/des.py | 47 | ||||
-rwxr-xr-x | osdf/adapters/dcae/message_router.py | 37 | ||||
-rw-r--r-- | osdf/adapters/local_data/local_policies.py | 3 | ||||
-rw-r--r-- | osdf/adapters/policy/interface.py | 156 | ||||
-rw-r--r-- | osdf/adapters/policy/utils.py | 12 |
14 files changed, 1053 insertions, 131 deletions
diff --git a/osdf/adapters/aaf/aaf_authentication.py b/osdf/adapters/aaf/aaf_authentication.py index 26eac29..b9aa510 100644 --- a/osdf/adapters/aaf/aaf_authentication.py +++ b/osdf/adapters/aaf/aaf_authentication.py @@ -17,12 +17,14 @@ # import base64 -import re -from datetime import datetime, timedelta +from datetime import datetime +from datetime import timedelta from flask import request +import re from osdf.config.base import osdf_config -from osdf.logging.osdf_logging import error_log, debug_log +from osdf.logging.osdf_logging import debug_log +from osdf.logging.osdf_logging import error_log from osdf.utils.interfaces import RestClient AUTHZ_PERMS_USER = '{}/authz/perms/user/{}' @@ -43,7 +45,6 @@ def authenticate(uid, passwd): return has_valid_role(perms) except Exception as exp: error_log.error("Error Authenticating the user {} : {}: ".format(uid, exp)) - pass return False @@ -57,27 +58,38 @@ else return false def has_valid_role(perms): aaf_user_roles = deploy_config['aaf_user_roles'] + aaf_roles = get_role_list(perms) + for roles in aaf_user_roles: path_perm = roles.split(':') uri = path_perm[0] - role = path_perm[1].split('|')[0] - if re.search(uri, request.path) and perms: - roles = perms.get('roles') - if roles: - perm_list = roles.get('perm') - for p in perm_list: - if role == p['type']: - return True + perm = path_perm[1].split('|') + p = (perm[0], perm[1], perm[2].split()[0]) + if re.search(uri, request.path) and p in aaf_roles: + return True return False + """ -Make the remote aaf api call if user is not in the cache. +Build a list of roles tuples from the AAF response. -Return the perms """ + + +def get_role_list(perms): + role_list = [] + if perms: + roles = perms.get('roles') + if roles: + perm = roles.get('perm', []) + for p in perm: + role_list.append((p['type'], p['instance'], p['action'])) + return role_list + + def get_aaf_permissions(uid, passwd): key = base64.b64encode(bytes("{}_{}".format(uid, passwd), "ascii")) - time_delta = timedelta(hours=deploy_config.get('aaf_cache_expiry_hrs', 3)) + time_delta = timedelta(minutes=deploy_config.get('aaf_cache_expiry_mins', 5)) perms = perm_cache.get(key) @@ -91,8 +103,8 @@ def get_aaf_permissions(uid, passwd): def remote_api(passwd, uid): - headers = {"Accept": "application/Users+xml;q=1.0;charset=utf-8;version=2.0,text/xml;q=1.0;version=2.0", - "Accept": "application/Users+json;q=1.0;charset=utf-8;version=2.0,application/json;q=1.0;version=2.0,*/*;q=1.0"} + headers = {"Accept": "application/Users+json;q=1.0;charset=utf-8;version=2.0,application/json;q=1.0;version=2.0," + "*/*;q=1.0"} url = AUTHZ_PERMS_USER.format(deploy_config['aaf_url'], uid) rc = RestClient(userid=uid, passwd=passwd, headers=headers, url=url, log_func=debug_log.debug, req_id='aaf_user_id') diff --git a/osdf/adapters/aaf/sms.py b/osdf/adapters/aaf/sms.py index 9c7af51..031fee4 100644 --- a/osdf/adapters/aaf/sms.py +++ b/osdf/adapters/aaf/sms.py @@ -1,6 +1,7 @@ # # ------------------------------------------------------------------------- # Copyright (c) 2018 Intel Corporation Intellectual Property +# Copyright (C) 2020 Wipro Limited. # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. @@ -21,9 +22,12 @@ from onapsmsclient import Client -import osdf.config.loader as config_loader +import osdf.config.base as cfg_base from osdf.config.base import osdf_config +import osdf.config.credentials as creds +import osdf.config.loader as config_loader from osdf.logging.osdf_logging import debug_log +from osdf.utils import cipherUtils config_spec = { "preload_secrets": "config/preload_secrets.yaml" @@ -31,9 +35,12 @@ config_spec = { def preload_secrets(): - """ This is intended to load the secrets required for testing Application - Actual deployment will have a preload script. Make sure the config is - in sync""" + """preload_secrets() + + This is intended to load the secrets required for testing Application + Actual deployment will have a preload script. Make sure the config is + in sync + """ preload_config = config_loader.load_config_file( config_spec.get("preload_secrets")) domain = preload_config.get("domain") @@ -41,6 +48,9 @@ def preload_secrets(): sms_url = config["aaf_sms_url"] timeout = config["aaf_sms_timeout"] cacert = config["aaf_ca_certs"] + if not sms_url: + debug_log.debug("SMS Disabled") + return sms_client = Client(url=sms_url, timeout=timeout, cacert=cacert) domain_uuid = sms_client.createDomain(domain) debug_log.debug( @@ -60,58 +70,80 @@ def retrieve_secrets(): timeout = config["aaf_sms_timeout"] cacert = config["aaf_ca_certs"] domain = config["secret_domain"] - sms_client = Client(url=sms_url, timeout=timeout, cacert=cacert) - secrets = sms_client.getSecretNames(domain) - for secret in secrets: - values = sms_client.getSecret(domain, secret) - secret_dict[secret] = values - debug_log.debug("Secret Dictionary Retrieval Success") + if sms_url: + sms_client = Client(url=sms_url, timeout=timeout, cacert=cacert) + secrets = sms_client.getSecretNames(domain) + for secret in secrets: + values = sms_client.getSecret(domain, secret) + secret_dict[secret] = values + debug_log.debug("Secret Dictionary Retrieval Success") + else: + debug_log.debug("SMS Disabled. Secrets not loaded") return secret_dict def load_secrets(): config = osdf_config.deployment secret_dict = retrieve_secrets() - config['soUsername'] = secret_dict['so']['UserName'] - config['soPassword'] = secret_dict['so']['Password'] - config['conductorUsername'] = secret_dict['conductor']['UserName'] - config['conductorPassword'] = secret_dict['conductor']['Password'] - config['policyPlatformUsername'] = secret_dict['policyPlatform']['UserName'] - config['policyPlatformPassword'] = secret_dict['policyPlatform']['Password'] - config['policyClientUsername'] = secret_dict['policyClient']['UserName'] - config['policyClientPassword'] = secret_dict['policyClient']['Password'] - config['messageReaderAafUserId'] = secret_dict['dmaap']['UserName'] - config['messageReaderAafPassword'] = secret_dict['dmaap']['Password'] - config['sdcUsername'] = secret_dict['sdc']['UserName'] - config['sdcPassword'] = secret_dict['sdc']['Password'] - config['osdfPlacementUsername'] = secret_dict['osdfPlacement']['UserName'] - config['osdfPlacementPassword'] = secret_dict['osdfPlacement']['Password'] - config['osdfPlacementSOUsername'] = secret_dict['osdfPlacementSO']['UserName'] - config['osdfPlacementSOPassword'] = secret_dict['osdfPlacementSO']['Password'] - config['osdfPlacementVFCUsername'] = secret_dict['osdfPlacementVFC']['UserName'] - config['osdfPlacementVFCPassword'] = secret_dict['osdfPlacementVFC']['Password'] - config['osdfCMSchedulerUsername'] = secret_dict['osdfCMScheduler']['UserName'] - config['osdfCMSchedulerPassword'] = secret_dict['osdfCMScheduler']['Password'] - config['configDbUserName'] = secret_dict['configDb']['UserName'] - config['configDbPassword'] = secret_dict['configDb']['Password'] - config['pciHMSUsername'] = secret_dict['pciHMS']['UserName'] - config['pciHMSPassword'] = secret_dict['pciHMS']['Password'] - config['osdfPCIOptUsername'] = secret_dict['osdfPCIOpt']['UserName'] - config['osdfPCIOptPassword'] = secret_dict['osdfPCIOpt']['Password'] + if secret_dict: + config['soUsername'] = secret_dict['so']['UserName'] + config['soPassword'] = decrypt_pass(secret_dict['so']['Password']) + config['conductorUsername'] = secret_dict['conductor']['UserName'] + config['conductorPassword'] = decrypt_pass(secret_dict['conductor']['Password']) + config['policyPlatformUsername'] = secret_dict['policyPlatform']['UserName'] + config['policyPlatformPassword'] = decrypt_pass(secret_dict['policyPlatform']['Password']) + config['policyClientUsername'] = secret_dict['policyPlatform']['UserName'] + config['policyClientPassword'] = decrypt_pass(secret_dict['policyPlatform']['Password']) + config['messageReaderAafUserId'] = secret_dict['dmaap']['UserName'] + config['messageReaderAafPassword'] = decrypt_pass(secret_dict['dmaap']['Password']) + config['sdcUsername'] = secret_dict['sdc']['UserName'] + config['sdcPassword'] = decrypt_pass(secret_dict['sdc']['Password']) + config['osdfPlacementUsername'] = secret_dict['osdfPlacement']['UserName'] + config['osdfPlacementPassword'] = decrypt_pass(secret_dict['osdfPlacement']['Password']) + config['osdfPlacementSOUsername'] = secret_dict['osdfPlacementSO']['UserName'] + config['osdfPlacementSOPassword'] = decrypt_pass(secret_dict['osdfPlacementSO']['Password']) + config['osdfPlacementVFCUsername'] = secret_dict['osdfPlacementVFC']['UserName'] + config['osdfPlacementVFCPassword'] = decrypt_pass(secret_dict['osdfPlacementVFC']['Password']) + config['osdfCMSchedulerUsername'] = secret_dict['osdfCMScheduler']['UserName'] + config['osdfCMSchedulerPassword'] = decrypt_pass(secret_dict['osdfCMScheduler']['Password']) + config['configDbUserName'] = secret_dict['configDb']['UserName'] + config['configDbPassword'] = decrypt_pass(secret_dict['configDb']['Password']) + config['pciHMSUsername'] = secret_dict['pciHMS']['UserName'] + config['pciHMSPassword'] = decrypt_pass(secret_dict['pciHMS']['Password']) + config['osdfPCIOptUsername'] = secret_dict['osdfPCIOpt']['UserName'] + config['osdfPCIOptPassword'] = decrypt_pass(secret_dict['osdfPCIOpt']['Password']) + config['osdfOptEngineUsername'] = secret_dict['osdfOptEngine']['UserName'] + config['osdfOptEnginePassword'] = decrypt_pass(secret_dict['osdfOptEngine']['Password']) + cfg_base.http_basic_auth_credentials = creds.load_credentials(osdf_config) + cfg_base.dmaap_creds = creds.dmaap_creds() + + +def decrypt_pass(passwd): + config = osdf_config.deployment + if not config.get('appkey') or passwd == '' or passwd == 'NA': + return passwd + else: + return cipherUtils.AESCipher.get_instance().decrypt(passwd) def delete_secrets(): - """ This is intended to delete the secrets for a clean initialization for - testing Application. Actual deployment will have a preload script. - Make sure the config is in sync""" + """delete_secrets() + + This is intended to delete the secrets for a clean initialization for + testing Application. Actual deployment will have a preload script. + Make sure the config is in sync + """ config = osdf_config.deployment sms_url = config["aaf_sms_url"] timeout = config["aaf_sms_timeout"] cacert = config["aaf_ca_certs"] domain = config["secret_domain"] - sms_client = Client(url=sms_url, timeout=timeout, cacert=cacert) - ret_val = sms_client.deleteDomain(domain) - debug_log.debug("Clean up complete") + if sms_url: + sms_client = Client(url=sms_url, timeout=timeout, cacert=cacert) + ret_val = sms_client.deleteDomain(domain) + debug_log.debug("Clean up complete") + else: + debug_log.debug("SMS Disabled. Secrets delete skipped") return ret_val diff --git a/osdf/adapters/aai/_init_.py b/osdf/adapters/aai/_init_.py new file mode 100644 index 0000000..e69de29 --- /dev/null +++ b/osdf/adapters/aai/_init_.py diff --git a/osdf/adapters/aai/fetch_aai_data.py b/osdf/adapters/aai/fetch_aai_data.py new file mode 100644 index 0000000..170d5e5 --- /dev/null +++ b/osdf/adapters/aai/fetch_aai_data.py @@ -0,0 +1,90 @@ +# ------------------------------------------------------------------------- +# Copyright (C) 2020 Wipro Limited. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +# ------------------------------------------------------------------------- +# + +import json +import requests +from requests.auth import HTTPBasicAuth +from requests import RequestException + +from osdf.logging.osdf_logging import debug_log + +AAI_HEADERS = { + "X-TransactionId": "9999", + "X-FromAppId": "OOF", + "Accept": "application/json", + "Content-Type": "application/json", +} + +AUTH = HTTPBasicAuth("AAI", "AAI") + + +class AAIException(Exception): + pass + + +def get_aai_data(request_json, osdf_config): + + """Get the response from AAI + + :param request_json: requestjson + :param osdf_config: configuration specific to OSDF app + :return:response body from AAI + """ + + nxi_id = request_json["NxIId"] + config = osdf_config.deployment + aai_url = config["aaiUrl"] + aai_req_url = aai_url + config["aaiServiceInstanceUrl"] + nxi_id + "?depth=2" + + try: + debug_log.debug("aai request: {}".format(aai_req_url)) + response = requests.get(aai_req_url, headers=AAI_HEADERS, auth=AUTH, verify=False) + debug_log.debug("aai response: {}".format(response.json())) + except RequestException as e: + raise AAIException("Request exception was encountered {}".format(e)) + + if response.status_code == 200: + return response.json() + else: + raise AAIException("Error response recieved from AAI for the request {}".format(aai_req_url)) + + +def execute_dsl_query(query, format, osdf_config): + """Get the response from AAI + + :param query: dsl query to be executed + :param format format of the output + :param osdf_config: configuration specific to OSDF app + :return:response body from AAI + """ + config = osdf_config.deployment + dsl_url = config["aaiUrl"] + config["dslQueryPath"] + format + + data = json.dumps({'dsl': query}) + debug_log.debug("aai dsl request: {}".format(data)) + try: + response = requests.put(dsl_url, data=data, headers=AAI_HEADERS, auth=AUTH, verify=False) + debug_log.debug("aai dsl response: {}".format(response)) + except RequestException as ex: + raise AAIException("Request exception was encountered {}".format(ex)) + + if response.status_code == 200: + return response.json() + else: + raise AAIException("Response code other than 200 from AAI: {} {}".format(response.status_code, + response.content)) diff --git a/osdf/adapters/conductor/__init__.py b/osdf/adapters/conductor/__init__.py new file mode 100644 index 0000000..6156206 --- /dev/null +++ b/osdf/adapters/conductor/__init__.py @@ -0,0 +1,17 @@ +# ------------------------------------------------------------------------- +# Copyright (c) 2017-2018 AT&T Intellectual Property +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +# ------------------------------------------------------------------------- +#
\ No newline at end of file diff --git a/osdf/adapters/conductor/api_builder.py b/osdf/adapters/conductor/api_builder.py new file mode 100644 index 0000000..f3b0798 --- /dev/null +++ b/osdf/adapters/conductor/api_builder.py @@ -0,0 +1,123 @@ +# ------------------------------------------------------------------------- +# Copyright (c) 2015-2017 AT&T Intellectual Property +# Copyright (C) 2020 Wipro Limited. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +# ------------------------------------------------------------------------- +# + +from jinja2 import Template +import json + +import osdf.adapters.conductor.translation as tr +from osdf.adapters.policy.utils import group_policies_gen +from osdf.utils.programming_utils import list_flatten + + +def _build_parameters(group_policies, service_info, request_parameters): + """Function prepares parameters section for has request + + :param group_policies: filtered policies + :param service_info: service info + :param request_parameters: request parameters + :return: + """ + initial_params = tr.get_opt_query_data(request_parameters, + group_policies['onap.policies.optimization.service.QueryPolicy']) + params = dict() + params.update({"REQUIRED_MEM": initial_params.pop("requiredMemory", "")}) + params.update({"REQUIRED_DISK": initial_params.pop("requiredDisk", "")}) + params.update({"customer_lat": initial_params.pop("customerLatitude", 0.0)}) + params.update({"customer_long": initial_params.pop("customerLongitude", 0.0)}) + params.update({"service_name": service_info.get('serviceName', "")}) + params.update({"service_id": service_info.get('serviceInstanceId', "")}) + + for key, val in initial_params.items(): + if val and val != "": + params.update({key: val}) + return params + + +def conductor_api_builder(req_info, demands, request_parameters, service_info, + template_fields, flat_policies: list, local_config, + template="osdf/adapters/conductor/templates/conductor_interface.json"): + """Build an OSDF southbound API call for HAS-Conductor/Placement optimization + + :param req_info: parameter data received from a client + :param demands: list of demands + :param request_parameters: request parameters + :param service_info: service info object + :param template_fields: Fields that has to be passed to the template to render + :param flat_policies: policy data received from the policy platform (flat policies) + :param template: template to generate southbound API call to conductor + :param local_config: local configuration file with pointers for + the service specific information + :return: json to be sent to Conductor/placement optimization + """ + + templ = Template(open(template).read()) + gp = group_policies_gen(flat_policies, local_config) + demand_name_list = [] + for demand in demands: + demand_name_list.append(demand['resourceModuleName'].lower()) + demand_list = tr.gen_demands(demands, gp['onap.policies.optimization.resource.VnfPolicy']) + attribute_policy_list = tr.gen_attribute_policy( + demand_name_list, gp['onap.policies.optimization.resource.AttributePolicy']) + + distance_to_location_policy_list = tr.gen_distance_to_location_policy( + demand_name_list, gp['onap.policies.optimization.resource.DistancePolicy']) + inventory_policy_list = tr.gen_inventory_group_policy( + demand_name_list, gp['onap.policies.optimization.resource.InventoryGroupPolicy']) + resource_instance_policy_list = tr.gen_resource_instance_policy( + demand_name_list, gp['onap.policies.optimization.resource.ResourceInstancePolicy']) + resource_region_policy_list = tr.gen_resource_region_policy( + demand_name_list, gp['onap.policies.optimization.resource.ResourceRegionPolicy']) + zone_policy_list = tr.gen_zone_policy( + demand_name_list, gp['onap.policies.optimization.resource.AffinityPolicy']) + optimization_policy_list = tr.gen_optimization_policy( + demand_name_list, gp['onap.policies.optimization.resource.OptimizationPolicy']) + reservation_policy_list = tr.gen_reservation_policy( + demand_name_list, gp['onap.policies.optimization.resource.InstanceReservationPolicy']) + capacity_policy_list = tr.gen_capacity_policy( + demand_name_list, gp['onap.policies.optimization.resource.Vim_fit']) + hpa_policy_list = tr.gen_hpa_policy( + demand_name_list, gp['onap.policies.optimization.resource.HpaPolicy']) + threshold_policy_list = tr.gen_threshold_policy(demand_name_list, + gp['onap.policies.optimization.resource.' + 'ThresholdPolicy']) + aggregation_policy_list = tr.gen_aggregation_policy(demand_name_list, + gp['onap.policies.optimization.resource.' + 'AggregationPolicy']) + req_params_dict = _build_parameters(gp, service_info, request_parameters) + conductor_policies = [attribute_policy_list, distance_to_location_policy_list, + inventory_policy_list, resource_instance_policy_list, + resource_region_policy_list, zone_policy_list, reservation_policy_list, + capacity_policy_list, hpa_policy_list, threshold_policy_list, aggregation_policy_list] + filtered_policies = [x for x in conductor_policies if len(x) > 0] + policy_groups = list_flatten(filtered_policies) + request_type = req_info.get('requestType', None) + rendered_req = templ.render( + requestType=request_type, + demand_list=demand_list, + policy_groups=policy_groups, + optimization_policies=optimization_policy_list, + name=req_info['requestId'], + timeout=req_info['timeout'], + limit=req_info['numSolutions'], + request_params=req_params_dict, + location_enabled=template_fields.get('location_enabled'), + version=template_fields.get('version'), + json=json) + json_payload = json.dumps(json.loads(rendered_req)) # need this because template's JSON is ugly! + return json_payload diff --git a/osdf/adapters/conductor/conductor.py b/osdf/adapters/conductor/conductor.py new file mode 100644 index 0000000..49c123d --- /dev/null +++ b/osdf/adapters/conductor/conductor.py @@ -0,0 +1,120 @@ +# ------------------------------------------------------------------------- +# Copyright (c) 2015-2017 AT&T Intellectual Property +# Copyright (C) 2020 Wipro Limited. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +# ------------------------------------------------------------------------- +# + +import json +from requests import RequestException +import time + +from osdf.adapters.conductor.api_builder import conductor_api_builder +from osdf.logging.osdf_logging import debug_log +from osdf.operation.exceptions import BusinessException +from osdf.utils.interfaces import RestClient + + +def request(req_info, demands, request_parameters, service_info, template_fields, + osdf_config, flat_policies): + config = osdf_config.deployment + local_config = osdf_config.core + uid, passwd = config['conductorUsername'], config['conductorPassword'] + conductor_url = config['conductorUrl'] + req_id = req_info["requestId"] + transaction_id = req_info['transactionId'] + headers = dict(transaction_id=transaction_id) + placement_ver_enabled = config.get('placementVersioningEnabled', False) + + if placement_ver_enabled: + cond_minor_version = config.get('conductorMinorVersion', None) + if cond_minor_version is not None: + x_minor_version = str(cond_minor_version) + headers.update({'X-MinorVersion': x_minor_version}) + debug_log.debug("Versions set in HTTP header to " + "conductor: X-MinorVersion: {} ".format(x_minor_version)) + + max_retries = config.get('conductorMaxRetries', 30) + ping_wait_time = config.get('conductorPingWaitTime', 60) + + rc = RestClient(userid=uid, passwd=passwd, method="GET", log_func=debug_log.debug, + headers=headers) + conductor_req_json_str = conductor_api_builder(req_info, demands, request_parameters, + service_info, template_fields, flat_policies, + local_config) + conductor_req_json = json.loads(conductor_req_json_str) + + debug_log.debug("Sending first Conductor request for request_id {}".format(req_id)) + + resp, raw_resp = initial_request_to_conductor(rc, conductor_url, conductor_req_json) + # Very crude way of keeping track of time. + # We are not counting initial request time, first call back, or time for HTTP request + total_time, ctr = 0, 2 + client_timeout = req_info['timeout'] + configured_timeout = max_retries * ping_wait_time + max_timeout = min(client_timeout, configured_timeout) + + while True: # keep requesting conductor till we get a result or we run out of time + if resp is not None: + if resp["plans"][0].get("status") in ["error"]: + raise RequestException(response=raw_resp, request=raw_resp.request) + + if resp["plans"][0].get("status") in ["done", "not found", "solved"]: + return resp + new_url = resp['plans'][0]['links'][0][0]['href'] # TODO(krishna): check why a list of lists + + if total_time >= max_timeout: + raise BusinessException("Conductor could not provide a solution within {} seconds," + "this transaction is timing out".format(max_timeout)) + time.sleep(ping_wait_time) + ctr += 1 + debug_log.debug("Attempt number {} url {}; prior status={}" + .format(ctr, new_url, resp['plans'][0]['status'])) + total_time += ping_wait_time + + try: + raw_resp = rc.request(new_url, raw_response=True) + resp = raw_resp.json() + except RequestException as e: + debug_log.debug("Conductor attempt {} for request_id {} has failed because {}" + .format(ctr, req_id, str(e))) + + +def initial_request_to_conductor(rc, conductor_url, conductor_req_json): + """First steps in the request-redirect chain in making a call to Conductor + + :param rc: REST client object for calling conductor + :param conductor_url: conductor's base URL to submit a placement request + :param conductor_req_json: request json object to send to Conductor + :return: URL to check for follow up (similar to redirects); + we keep checking these till we get a result/error + """ + debug_log.debug("Payload to Conductor: {}".format(json.dumps(conductor_req_json))) + raw_resp = rc.request(url=conductor_url, raw_response=True, method="POST", + json=conductor_req_json) + resp = raw_resp.json() + if resp["status"] != "template": + raise RequestException(response=raw_resp, request=raw_resp.request) + time.sleep(10) # 10 seconds wait time to avoid being too quick! + plan_url = resp["links"][0][0]["href"] + debug_log.debug("Attempting to read the plan from " + "the conductor provided url {}".format(plan_url)) + raw_resp = rc.request(raw_response=True, + url=plan_url) + resp = raw_resp.json() + + if resp["plans"][0]["status"] in ["error"]: + raise RequestException(response=raw_resp, request=raw_resp.request) + return resp, raw_resp # now the caller of this will handle further follow-ups diff --git a/osdf/adapters/conductor/templates/conductor_interface.json b/osdf/adapters/conductor/templates/conductor_interface.json new file mode 100755 index 0000000..c0e28dc --- /dev/null +++ b/osdf/adapters/conductor/templates/conductor_interface.json @@ -0,0 +1,41 @@ +{ + "name": "{{ name }}", + "files": {}, + "timeout": {{ timeout }}, + "num_solution": "{{ limit }}", + "template": { + "homing_template_version": "{{ version }}", + "parameters": { + {% set comma=joiner(",") %} + {% for key, value in request_params.items() %} {{ comma() }} + "{{key}}": {{ json.dumps(value) }} + {% endfor %} + }, + {% if location_enabled %} + "locations": { + "customer_loc": { + "latitude": { "get_param": "customer_lat" }, + "longitude": { "get_param": "customer_long" } + } + }, + {% endif %} + "demands": {{ json.dumps(demand_list) }}, + {% set comma_main = joiner(",") %} + "constraints": { + {% set comma=joiner(",") %} + {% for elem in policy_groups %} {{ comma() }} + {% for key, value in elem.items() %} + "{{key}}": {{ json.dumps(value) }} + {% endfor %} + {% endfor %} + }, + "optimization": { + {% set comma=joiner(",") %} + {% for elem in optimization_policies %} {{ comma() }} + {% for key, value in elem.items() %} + "{{key}}": {{ json.dumps(value) }} {{ ", " if not loop.last }} + {% endfor %} + {% endfor %} + } + } +} diff --git a/osdf/adapters/conductor/translation.py b/osdf/adapters/conductor/translation.py new file mode 100644 index 0000000..f44f27f --- /dev/null +++ b/osdf/adapters/conductor/translation.py @@ -0,0 +1,376 @@ +# ------------------------------------------------------------------------- +# Copyright (c) 2015-2017 AT&T Intellectual Property +# Copyright (C) 2020 Wipro Limited. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +# ------------------------------------------------------------------------- +# +import copy +import json +import re +import yaml + +from osdf.utils.programming_utils import dot_notation + +policy_config_mapping = yaml.safe_load(open('config/has_config.yaml')).get('policy_config_mapping') + +CONSTRAINT_TYPE_MAP = {"onap.policies.optimization.resource.AttributePolicy": "attribute", + "onap.policies.optimization.resource.DistancePolicy": "distance_to_location", + "onap.policies.optimization.resource.InventoryGroupPolicy": "inventory_group", + "onap.policies.optimization.resource.ResourceInstancePolicy": "instance_fit", + "onap.policies.optimization.resource.ResourceRegionPolicy": "region_fit", + "onap.policies.optimization.resource.AffinityPolicy": "zone", + "onap.policies.optimization.resource.InstanceReservationPolicy": + "instance_reservation", + "onap.policies.optimization.resource.Vim_fit": "vim_fit", + "onap.policies.optimization.resource.HpaPolicy": "hpa", + "onap.policies.optimization.resource.ThresholdPolicy": "threshold", + "onap.policies.optimization.resource.AggregationPolicy": "aggregation" + } + + +def get_opt_query_data(request_parameters, policies): + """Fetch service and order specific details from the requestParameters field of a request. + + :param request_parameters: A list of request parameters + :param policies: A set of policies + :return: A dictionary with service and order-specific attributes. + """ + req_param_dict = {} + if request_parameters: + for policy in policies: + for queryProp in policy[list(policy.keys())[0]]['properties']['queryProperties']: + attr_val = queryProp['value'] if 'value' in queryProp and queryProp['value'] != "" \ + else dot_notation(request_parameters, queryProp['attribute_location']) + if attr_val is not None: + req_param_dict.update({queryProp['attribute']: attr_val}) + return req_param_dict + + +def gen_optimization_policy(vnf_list, optimization_policy): + """Generate optimization policy details to pass to Conductor + + :param vnf_list: List of vnf's to used in placement request + :param optimization_policy: optimization objective policy information provided in the incoming request + :return: List of optimization objective policies in a format required by Conductor + """ + if len(optimization_policy) == 1: + policy = optimization_policy[0] + policy_content = policy[list(policy.keys())[0]] + if policy_content['type_version'] == '2.0.0': + properties = policy_content['properties'] + objective = {'goal': properties['goal'], + 'operation_function': properties['operation_function']} + return [objective] + + optimization_policy_list = [] + for policy in optimization_policy: + content = policy[list(policy.keys())[0]]['properties'] + parameter_list = [] + parameters = ["cloud_version", "hpa_score"] + + for attr in content['objectiveParameter']['parameterAttributes']: + parameter = attr['parameter'] if attr['parameter'] in parameters else attr['parameter'] + "_between" + default, vnfs = get_matching_vnfs(attr['resources'], vnf_list) + for vnf in vnfs: + value = [vnf] if attr['parameter'] in parameters else [attr['customerLocationInfo'], vnf] + parameter_list.append({ + attr['operator']: [attr['weight'], {parameter: value}] + }) + + optimization_policy_list.append({ + content['objective']: {content['objectiveParameter']['operator']: parameter_list} + }) + return optimization_policy_list + + +def get_matching_vnfs(resources, vnf_list, match_type="intersection"): + """Get a list of matching VNFs from the list of resources + + :param resources: + :param vnf_list: List of vnfs to used in placement request + :param match_type: "intersection" or "all" or "any" (any => send all_vnfs if there is any intersection) + :return: List of matching VNFs + """ + # Check if it is a default policy + default = True if resources == [] else False + resources_lcase = [x.lower() for x in resources] if not default else [x.lower() for x in vnf_list] + if match_type == "all": # don't bother with any comparisons + return default, resources if set(resources_lcase) <= set(vnf_list) else None + common_vnfs = set(vnf_list) & set(resources_lcase) if not default else set(vnf_list) + common_resources = [x for x in resources if x.lower() in common_vnfs] if not default else list(common_vnfs) + if match_type == "intersection": # specifically requested intersection + return default, list(common_resources) + return default, resources if common_vnfs else None # "any" match => all resources to be returned + + +def gen_policy_instance(vnf_list, resource_policy, match_type="intersection", rtype=None): + """Generate a list of policies + + :param vnf_list: List of vnf's to used in placement request + :param resource_policy: policy for this specific resource + :param match_type: How to match the vnf_names with the vnf_list (intersection or "any") + intersection => return intersection; "any" implies return all vnf_names if intersection is not null + :param rtype: resource type (e.g. resourceRegionProperty or resourceInstanceProperty) + None => no controller information added to the policy specification to Conductor + :return: resource policy list in a format required by Conductor + """ + resource_policy_list = [] + related_policies = [] + for policy in resource_policy: + pc = policy[list(policy.keys())[0]] + default, demands = get_matching_vnfs(pc['properties']['resources'], vnf_list, match_type=match_type) + resource = {pc['properties']['identity']: {'type': CONSTRAINT_TYPE_MAP.get(pc['type']), 'demands': demands}} + + if rtype: + resource[pc['properties']['identity']]['properties'] = {'controller': pc[rtype]['controller'], + 'request': json.loads(pc[rtype]['request'])} + if demands and len(demands) != 0: + # The default policy shall not override the specific policy that already appended + if default: + for d in demands: + resource_repeated = True \ + if {pc['properties']['identity']: {'type': CONSTRAINT_TYPE_MAP.get(pc['type']), + 'demands': d}} in resource_policy_list else False + if resource_repeated: + continue + else: + resource_policy_list.append( + {pc['properties']['identity']: {'type': CONSTRAINT_TYPE_MAP.get(pc['type']), + 'demands': d}}) + policy[list(policy.keys())[0]]['properties']['resources'] = d + related_policies.append(policy) + # Need to override the default policies, here delete the outdated policy stored in the db + if resource in resource_policy_list: + for pc in related_policies: + if pc[list(pc.keys()[0])]['properties']['resources'] == resource: + related_policies.remove(pc) + resource_policy_list.remove(resource) + related_policies.append(policy) + resource_policy_list.append(resource) + + return resource_policy_list, related_policies + + +def gen_resource_instance_policy(vnf_list, resource_instance_policy): + """Get policies governing resource instances in order to populate the Conductor API call""" + cur_policies, _ = gen_policy_instance(vnf_list, resource_instance_policy, rtype='resourceInstanceProperty') + return cur_policies + + +def gen_resource_region_policy(vnf_list, resource_region_policy): + """Get policies governing resource region in order to populate the Conductor API call""" + cur_policies, _ = gen_policy_instance(vnf_list, resource_region_policy, rtype='resourceRegionProperty') + return cur_policies + + +def gen_inventory_group_policy(vnf_list, inventory_group_policy): + """Get policies governing inventory group in order to populate the Conductor API call""" + cur_policies, _ = gen_policy_instance(vnf_list, inventory_group_policy, rtype=None) + return cur_policies + + +def gen_reservation_policy(vnf_list, reservation_policy): + """Get policies governing resource instances in order to populate the Conductor API call""" + cur_policies, _ = gen_policy_instance(vnf_list, reservation_policy, rtype='instanceReservationProperty') + return cur_policies + + +def gen_distance_to_location_policy(vnf_list, distance_to_location_policy): + """Get policies governing distance-to-location for VNFs in order to populate the Conductor API call""" + cur_policies, related_policies = gen_policy_instance(vnf_list, distance_to_location_policy, rtype=None) + for p_new, p_main in zip(cur_policies, related_policies): # add additional fields to each policy + properties = p_main[list(p_main.keys())[0]]['properties']['distanceProperties'] + pcp_d = properties['distance'] + p_new[p_main[list(p_main.keys())[0]]['properties']['identity']]['properties'] = { + 'distance': pcp_d['operator'] + " " + pcp_d['value'].lower() + " " + pcp_d['unit'].lower(), + 'location': properties['locationInfo'] + } + return cur_policies + + +def gen_attribute_policy(vnf_list, attribute_policy): + """Get policies governing attributes of VNFs in order to populate the Conductor API call""" + cur_policies, related_policies = gen_policy_instance(vnf_list, attribute_policy, rtype=None) + for p_new, p_main in zip(cur_policies, related_policies): # add additional fields to each policy + properties = p_main[list(p_main.keys())[0]]['properties']['attributeProperties'] + attribute_mapping = policy_config_mapping['filtering_attributes'] # wanted attributes and mapping + p_new[p_main[list(p_main.keys())[0]]['properties']['identity']]['properties'] = { + 'evaluate': dict((attribute_mapping[k], properties.get(k) + if k != "cloudRegion" else gen_cloud_region(properties)) + for k in attribute_mapping.keys()) + } + return cur_policies # cur_policies gets updated in place... + + +def gen_zone_policy(vnf_list, zone_policy): + """Get zone policies in order to populate the Conductor API call""" + cur_policies, related_policies = gen_policy_instance(vnf_list, zone_policy, match_type="all", rtype=None) + for p_new, p_main in zip(cur_policies, related_policies): # add additional fields to each policy + pmz = p_main[list(p_main.keys())[0]]['properties']['affinityProperties'] + p_new[p_main[list(p_main.keys())[0]]['properties']['identity']]['properties'] = \ + {'category': pmz['category'], 'qualifier': pmz['qualifier']} + return cur_policies + + +def gen_capacity_policy(vnf_list, capacity_policy): + """Get zone policies in order to populate the Conductor API call""" + cur_policies, related_policies = gen_policy_instance(vnf_list, capacity_policy, rtype=None) + for p_new, p_main in zip(cur_policies, related_policies): # add additional fields to each policy + pmz = p_main[list(p_main.keys())[0]]['properties']['capacityProperty'] + p_new[p_main[list(p_main.keys())[0]]['properties']['identity']]['properties'] = \ + {"controller": pmz['controller'], 'request': json.loads(pmz['request'])} + return cur_policies + + +def gen_hpa_policy(vnf_list, hpa_policy): + """Get zone policies in order to populate the Conductor API call""" + cur_policies, related_policies = gen_policy_instance(vnf_list, hpa_policy, rtype=None) + for p_new, p_main in zip(cur_policies, related_policies): # add additional fields to each policy + p_new[p_main[list(p_main.keys())[0]]['properties']['identity']]['properties'] = \ + {'evaluate': p_main[list(p_main.keys())[0]]['properties']['flavorFeatures']} + return cur_policies + + +def gen_threshold_policy(vnf_list, threshold_policy): + cur_policies, related_policies = gen_policy_instance(vnf_list, threshold_policy, rtype=None) + for p_new, p_main in zip(cur_policies, related_policies): + pmz = p_main[list(p_main.keys())[0]]['properties']['thresholdProperties'] + p_new[p_main[list(p_main.keys())[0]]['properties']['identity']]['properties'] = {'evaluate': pmz} + return cur_policies + + +def gen_aggregation_policy(vnf_list, cross_policy): + cur_policies, related_policies = gen_policy_instance(vnf_list, cross_policy, rtype=None) + for p_new, p_main in zip(cur_policies, related_policies): + pmz = p_main[list(p_main.keys())[0]]['properties']['aggregationProperties'] + p_new[p_main[list(p_main.keys())[0]]['properties']['identity']]['properties'] = {'evaluate': pmz} + return cur_policies + + +def get_augmented_policy_attributes(policy_property, demand): + """Get policy attributes and augment them using policy_config_mapping and demand information""" + attributes = copy.copy(policy_property['attributes']) + remapping = policy_config_mapping['remapping'] + extra = dict((x, demand['resourceModelInfo'][remapping[x]]) for x in attributes if x in remapping) + attributes.update(extra) + return attributes + + +def get_candidates_demands(demand): + """Get demands related to candidates; e.g. excluded/required""" + res = {} + for k, v in policy_config_mapping['candidates'].items(): + if k not in demand: + continue + res[v] = [{'inventory_type': x['identifierType'], 'candidate_id': x['identifiers']} for x in demand[k]] + return res + + +def get_policy_properties(demand, policies): + """Get policy_properties for cases where there is a match with the demand""" + for policy in policies: + policy_demands = set([x.lower() for x in policy[list(policy.keys())[0]]['properties']['resources']]) + if policy_demands and demand['resourceModuleName'].lower() not in policy_demands: + continue # no match for this policy + elif policy_demands == set(): # Append resource name for default policy + policy[list(policy.keys())[0]]['properties'].update(resources=list(demand.get('resourceModuleName'))) + for policy_property in policy[list(policy.keys())[0]]['properties']['vnfProperties']: + yield policy_property + + +def get_demand_properties(demand, policies): + """Get list demand properties objects (named tuples) from policy""" + demand_properties = [] + for policy_property in get_policy_properties(demand, policies): + prop = dict(inventory_provider=policy_property['inventoryProvider'], + inventory_type=policy_property['inventoryType'], + service_type=demand.get('serviceResourceId', ''), + service_resource_id=demand.get('serviceResourceId', '')) + policy_property_mapping = {'filtering_attributes': 'attributes', + 'passthrough_attributes': 'passthroughAttributes', + 'default_attributes': 'defaultAttributes'} + + prop.update({'unique': policy_property['unique']} if 'unique' in policy_property and + policy_property['unique'] else {}) + prop['filtering_attributes'] = dict() + for key, value in policy_property_mapping.items(): + get_demand_attributes(prop, policy_property, key, value) + + prop['filtering_attributes'].update({'global-customer-id': policy_property['customerId']} + if 'customerId' in policy_property and policy_property['customerId'] + else {}) + prop['filtering_attributes'].update({'model-invariant-id': demand['resourceModelInfo']['modelInvariantId']} + if 'modelInvariantId' in demand['resourceModelInfo'] + and demand['resourceModelInfo']['modelInvariantId'] else {}) + prop['filtering_attributes'].update({'model-version-id': demand['resourceModelInfo']['modelVersionId']} + if 'modelVersionId' in demand['resourceModelInfo'] + and demand['resourceModelInfo']['modelVersionId'] else {}) + prop['filtering_attributes'].update({'equipment-role': policy_property['equipmentRole']} + if 'equipmentRole' in policy_property and policy_property['equipmentRole'] + else {}) + prop.update(get_candidates_demands(demand)) + demand_properties.append(prop) + return demand_properties + + +def get_demand_attributes(prop, policy_property, attribute_type, key): + if policy_property.get(key): + prop[attribute_type] = dict() + for attr_key, attr_val in policy_property[key].items(): + update_converted_attribute(attr_key, attr_val, prop, attribute_type) + + +def update_converted_attribute(attr_key, attr_val, properties, attribute_type): + """Updates dictonary of attributes with one specified in the arguments. + + Automatically translates key namr from camelCase to hyphens + :param attribute_type: attribute section name + :param attr_key: key of the attribute + :param attr_val: value of the attribute + :param properties: dictionary with attributes to update + :return: + """ + if attr_val: + remapping = policy_config_mapping[attribute_type] + if remapping.get(attr_key): + key_value = remapping.get(attr_key) + else: + key_value = re.sub('(.)([A-Z][a-z]+)', r'\1-\2', attr_key) + key_value = re.sub('([a-z0-9])([A-Z])', r'\1-\2', key_value).lower() + properties[attribute_type].update({key_value: attr_val}) + + +def gen_demands(demands, vnf_policies): + """Generate list of demands based on request and VNF policies + + :param demands: A List of demands + :param vnf_policies: Policies associated with demand resources + (e.g. from grouped_policies['vnfPolicy']) + :return: list of demand parameters to populate the Conductor API call + """ + demand_dictionary = {} + for demand in demands: + prop = get_demand_properties(demand, vnf_policies) + if len(prop) > 0: + demand_dictionary.update({demand['resourceModuleName']: prop}) + return demand_dictionary + + +def gen_cloud_region(property): + prop = {"cloud_region_attributes": dict()} + if 'cloudRegion' in property: + for k, v in property['cloudRegion'].items(): + update_converted_attribute(k, v, prop, 'cloud_region_attributes') + return prop["cloud_region_attributes"] diff --git a/osdf/adapters/dcae/des.py b/osdf/adapters/dcae/des.py new file mode 100644 index 0000000..57cb128 --- /dev/null +++ b/osdf/adapters/dcae/des.py @@ -0,0 +1,47 @@ +# ------------------------------------------------------------------------- +# Copyright (C) 2020 Wipro Limited. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +# ------------------------------------------------------------------------- +# + +import requests + +from osdf.config.base import osdf_config +from osdf.utils.interfaces import RestClient + + +class DESException(Exception): + pass + + +def extract_data(service_id, request_data): + """Extracts data from the data lake via DES. + + param: service_id: kpi data identifier + param: request_data: data to send + param: osdf_config: osdf config to retrieve api info + """ + + config = osdf_config.deployment + user, password = config['desUsername'], config['desPassword'] + headers = config["desHeaders"] + req_url = config["desUrl"] + config["desApiPath"] + service_id + rc = RestClient(userid=user, passwd=password, url=req_url, headers=headers, method="POST") + + try: + response_json = rc.request(data=request_data) + return response_json.get("result") + except requests.RequestException as e: + raise DESException("Request exception was encountered {}".format(e)) diff --git a/osdf/adapters/dcae/message_router.py b/osdf/adapters/dcae/message_router.py index caf04a4..0968812 100755 --- a/osdf/adapters/dcae/message_router.py +++ b/osdf/adapters/dcae/message_router.py @@ -17,8 +17,10 @@ # import requests -from osdf.utils.data_types import list_like + from osdf.operation.exceptions import MessageBusConfigurationException +from osdf.utils.data_types import list_like +from osdf.utils.interfaces import RestClient class MessageRouterClient(object): @@ -27,7 +29,8 @@ class MessageRouterClient(object): consumer_group_id=':', timeout_ms=15000, fetch_limit=1000, userid_passwd=':'): - """ + """Class initializer + :param dmaap_url: protocol, host and port; can also be a list of URLs (e.g. https://dmaap-host.onapdemo.onap.org:3905/events/org.onap.dmaap.MULTICLOUD.URGENT), can also be a list of such URLs @@ -44,14 +47,14 @@ class MessageRouterClient(object): self.topic_urls = [dmaap_url] if not list_like(dmaap_url) else dmaap_url self.timeout_ms = timeout_ms self.fetch_limit = fetch_limit - userid, passwd = userid_passwd.split(':') - self.auth = (userid, passwd) if userid and passwd else None + self.userid, self.passwd = userid_passwd.split(':') consumer_group, consumer_id = consumer_group_id.split(':') self.consumer_group = consumer_group self.consumer_id = consumer_id def get(self, outputjson=True): """Fetch messages from message router (DMaaP or UEB) + :param outputjson: (optional, specifies if response is expected to be in json format), ignored for "POST" :return: response as a json object (if outputjson is True) or as a string """ @@ -61,7 +64,7 @@ class MessageRouterClient(object): for url in urls[:-1]: try: return self.http_request(method='GET', url=url, outputjson=outputjson) - except: + except Exception: pass return self.http_request(method='GET', url=urls[-1], outputjson=outputjson) @@ -69,13 +72,13 @@ class MessageRouterClient(object): for url in self.topic_urls[:-1]: try: return self.http_request(method='POST', url=url, inputjson=inputjson, msg=msg) - except: + except Exception: pass return self.http_request(method='POST', url=self.topic_urls[-1], inputjson=inputjson, msg=msg) def http_request(self, url, method, inputjson=True, outputjson=True, msg=None, **kwargs): - """ - Perform the actual URL request (GET or POST), and do error handling + """Perform the actual URL request (GET or POST), and do error handling + :param url: full URL (including topic, limit, timeout, etc.) :param method: GET or POST :param inputjson: Specify whether input is in json format (valid only for POST) @@ -83,9 +86,15 @@ class MessageRouterClient(object): :param msg: content to be posted (valid only for POST) :return: response as a json object (if outputjson or POST) or as a string; None if error """ - res = requests.request(url=url, method=method, auth=self.auth, **kwargs) - if res.status_code == requests.codes.ok: - return res.json() if outputjson or method == "POST" else res.content - else: - raise Exception("HTTP Response Error: code {}; headers:{}, content: {}".format( - res.status_code, res.headers, res.content)) + + rc = RestClient(userid=self.userid, passwd=self.passwd, url=url, method=method) + try: + res = rc.request(raw_response=True, data=msg, **kwargs) + if res.status_code == requests.codes.ok: + return res.json() if outputjson or method == "POST" else res.content + else: + raise Exception("HTTP Response Error: code {}; headers:{}, content: {}".format( + res.status_code, res.headers, res.content)) + + except requests.RequestException as ex: + raise Exception("Request Exception occurred {}".format(str(ex))) diff --git a/osdf/adapters/local_data/local_policies.py b/osdf/adapters/local_data/local_policies.py index 6e49388..dc6837a 100644 --- a/osdf/adapters/local_data/local_policies.py +++ b/osdf/adapters/local_data/local_policies.py @@ -19,7 +19,7 @@ import json import os import re - +from osdf.logging.osdf_logging import debug_log def get_local_policies(local_policy_folder, local_policy_list, policy_id_list=None): """ @@ -32,6 +32,7 @@ def get_local_policies(local_policy_folder, local_policy_list, policy_id_list=No :param policy_id_list: list of policies to get (if unspecified or None, get all) :return: get policies """ + debug_log.debug("Policy folder: {}, local_list {}, policy id list {}".format(local_policy_folder, local_policy_list, policy_id_list)) policies = [] if policy_id_list: for policy_id in policy_id_list: diff --git a/osdf/adapters/policy/interface.py b/osdf/adapters/policy/interface.py index 95bfacc..6799c6b 100644 --- a/osdf/adapters/policy/interface.py +++ b/osdf/adapters/policy/interface.py @@ -1,4 +1,4 @@ - # ------------------------------------------------------------------------- +# ------------------------------------------------------------------------- # Copyright (c) 2015-2017 AT&T Intellectual Property # # Licensed under the Apache License, Version 2.0 (the "License"); @@ -17,18 +17,23 @@ # import base64 -import itertools import json - - +import os from requests import RequestException -from osdf.operation.exceptions import BusinessException +import uuid +import yaml + from osdf.adapters.local_data.local_policies import get_local_policies -from osdf.adapters.policy.utils import policy_name_as_regex, retrieve_node -from osdf.utils.programming_utils import list_flatten, dot_notation +from osdf.adapters.policy.utils import policy_name_as_regex from osdf.config.base import osdf_config -from osdf.logging.osdf_logging import audit_log, MH, metrics_log, debug_log +from osdf.logging.osdf_logging import audit_log +from osdf.logging.osdf_logging import debug_log +from osdf.logging.osdf_logging import metrics_log +from osdf.logging.osdf_logging import MH +from osdf.operation.exceptions import BusinessException from osdf.utils.interfaces import RestClient +from osdf.utils.programming_utils import dot_notation +from osdf.utils.programming_utils import list_flatten def get_by_name(rest_client, policy_name_list, wildcards=True): @@ -46,7 +51,8 @@ def get_by_name(rest_client, policy_name_list, wildcards=True): def get_by_scope(rest_client, req, config_local, type_service): - """ Get policies by scopes as defined in the configuration file. + """Get policies by scopes as defined in the configuration file. + :param rest_client: a rest client object to make a call. :param req: an optimization request. :param config_local: application configuration file. @@ -55,28 +61,39 @@ def get_by_scope(rest_client, req, config_local, type_service): """ scope_policies = [] references = config_local.get('references', {}) - pscope = config_local.get('policy_info', {}).get(type_service, {}).get('policy_scope', {}) - service_name = dot_notation(req, references.get('service_name', {}).get('value', None)) - primary_scope = pscope['{}_scope'.format(service_name.lower() if pscope.get(service_name + "_scope", None) - else "default")] - for sec_scope in pscope.get('secondary_scopes', []): - policies, scope_fields = [], [] - for field in sec_scope: - scope_fields.extend([get_scope_fields(field, references, req, list_flatten(scope_policies)) - if 'get_param' in field else field]) - scope_fields = set(list_flatten(scope_fields)) - scope_fields = set([x.lower() for x in scope_fields]) - for scope in scope_fields: - policies.extend(policy_api_call(rest_client, primary_scope, scope)) - scope_policies.append([policy for policy in policies - if scope_fields <= set(json.loads(policy['config'])['content']['policyScope'])]) + pscope = config_local.get('policy_info', {}).get(type_service, {}).get('policy_scope', []) + scope_fields = {} + policies = {} + for scopes in pscope: + for key in scopes.keys(): + for field in scopes[key]: + scope_fields[key] = list_flatten([get_scope_fields(field, references, req, policies) + if 'get_param' in field else field]) + if scope_fields.get('resources') and len(scope_fields['resources']) > 1: + for s in scope_fields['resources']: + scope_fields['resources'] = [s] + policies.update(policy_api_call(rest_client, scope_fields).get('policies', {})) + else: + policies.update(policy_api_call(rest_client, scope_fields).get('policies', {})) + for policyName in policies.keys(): + keys = scope_fields.keys() & policies[policyName]['properties'].keys() + policy = {} + policy[policyName] = policies[policyName] + for k in keys: + if set(policies.get(policyName, {}).get('properties', {}).get(k)) >= set(scope_fields[k]) \ + and policy not in scope_policies: + scope_policies.append(policy) + return scope_policies -def get_scope_fields(field, references, req, policy_info): - """ Retrieve the values for scope fields from a request and policies as per the configuration - and references defined in a configuration file. If the value of a scope field missing in a request or +def get_scope_fields(field, references, req, policies): + """Retrieve the values for scope fields from a request and policies + + They are derived as per the configuration and references defined in a + configuration file. If the value of a scope field missing in a request or policies, throw an exception since correct policies cannot be retrieved. + :param field: details on a scope field from a configuration file. :param references: references defined in a configuration file. :param req: an optimization request. @@ -92,9 +109,9 @@ def get_scope_fields(field, references, req, policy_info): raise BusinessException("Field {} is missing a value in a request".format(ref_value.split('.')[-1])) else: scope_fields = [] - for policy in policy_info: - policy_content = json.loads(policy.get('config', "{}")) - if policy_content.get('content', {}).get('policyType', "invalid_policy") == ref_source: + for policyName in policies.keys(): + policy_content = policies.get(policyName) + if policy_content.get('type', "invalid_policy") == ref_source: scope_fields.append(dot_notation(policy_content, ref_value)) scope_values = list_flatten(scope_fields) if len(scope_values) > 0: @@ -103,31 +120,35 @@ def get_scope_fields(field, references, req, policy_info): ref_value.split('.')[-1], ref_source)) -def policy_api_call(rest_client, primary_scope, scope_field): - """ Makes a getConfig API call to the policy system to retrieve policies matching a scope. - :param rest_client: rest client object to make a call - :param primary_scope: the primary scope of policies, which is a folder in the policy system - where policies are stored. - :param scope_field: the secondary scope of policies, which is a collection of domain values. - :return: a list of policies matching both primary and secondary scopes. +def policy_api_call(rest_client, scope_fields): + """Makes the api call to policy and return the response if policies are present + + :param rest_client: rest client to make a call + :param scope_fields: a collection of scopes to be used for filtering + :return: a list of policies matching all filters """ - api_call_body = {"policyName": "{}.*".format(primary_scope), - "configAttributes": {"policyScope": "{}".format(scope_field)}} - return rest_client.request(json=api_call_body) + api_call_body = {"ONAPName": "OOF", + "ONAPComponent": "OOF_Component", + "ONAPInstance": "OOF_Component_Instance", + "action": "optimize", + "resource": scope_fields} + response = rest_client.request(json=api_call_body) + if not response.get("policies"): + raise Exception("Policies not found for the scope {}".format(scope_fields)) + return response def remote_api(req_json, osdf_config, service_type="placement"): """Make a request to policy and return response -- it accounts for multiple requests that be needed + :param req_json: policy request object (can have multiple policy names) :param osdf_config: main config that will have credential information :param service_type: the type of service to call: "placement", "scheduling" :return: all related policies and provStatus retrieved from Subscriber policy """ config = osdf_config.deployment + headers = {"Content-type": "application/json"} uid, passwd = config['policyPlatformUsername'], config['policyPlatformPassword'] - pcuid, pcpasswd = config['policyClientUsername'], config['policyClientPassword'] - headers = {"ClientAuth": base64.b64encode(bytes("{}:{}".format(pcuid, pcpasswd), "ascii"))} - headers.update({'Environment': config['policyPlatformEnv']}) url = config['policyPlatformUrl'] rc = RestClient(userid=uid, passwd=passwd, headers=headers, url=url, log_func=debug_log.debug) @@ -139,17 +160,17 @@ def remote_api(req_json, osdf_config, service_type="placement"): policies = get_by_scope(rc, req_json, osdf_config.core, service_type) formatted_policies = [] - for x in itertools.chain(*policies): - if x['config'] is None: - raise BusinessException("Config not found for policy with name %s" % x['policyName']) + for x in policies: + if x[list(x.keys())[0]].get('properties') is None: + raise BusinessException("Properties not found for policy with name %s" % x[list(x.keys()[0])]) else: - formatted_policies.append(json.loads(x['config'])) + formatted_policies.append(x) return formatted_policies def local_policies_location(req_json, osdf_config, service_type): - """ - Get folder and list of policy_files if "local policies" option is enabled + """Get folder and list of policy_files if "local policies" option is enabled + :param service_type: placement supported for now, but can be any other service :return: a tuple (folder, file_list) or None """ @@ -157,17 +178,20 @@ def local_policies_location(req_json, osdf_config, service_type): if lp.get('global_disabled'): return None # short-circuit to disable all local policies if lp.get('local_{}_policies_enabled'.format(service_type)): + debug_log.debug('Loading local policies for service type: {}'.format(service_type)) if service_type == "scheduling": return lp.get('{}_policy_dir'.format(service_type)), lp.get('{}_policy_files'.format(service_type)) else: - service_name = req_json['serviceInfo']['serviceName'] # TODO: data_mapping.get_service_type(model_name) + service_name = req_json['serviceInfo']['serviceName'] + debug_log.debug('Loading local policies for service name: {}'.format(service_name)) return lp.get('{}_policy_dir_{}'.format(service_type, service_name.lower())), \ - lp.get('{}_policy_files_{}'.format(service_type, service_name.lower())) + lp.get('{}_policy_files_{}'.format(service_type, service_name.lower())) return None def get_policies(request_json, service_type): """Validate the request and get relevant policies + :param request_json: Request object :param service_type: the type of service to call: "placement", "scheduling" :return: policies associated with this request and provStatus retrieved from Subscriber policy @@ -178,6 +202,8 @@ def get_policies(request_json, service_type): local_info = local_policies_location(request_json, osdf_config, service_type) if local_info: # tuple containing location and list of files + if local_info[0] is None or local_info[1] is None: + raise ValueError("Error fetching local policy info") to_filter = None if osdf_config.core['policy_info'][service_type]['policy_fetch'] == "by_name": to_filter = request_json[service_type + "Info"]['policyId'] @@ -186,3 +212,31 @@ def get_policies(request_json, service_type): policies = remote_api(request_json, osdf_config, service_type) return policies + + +def upload_policy_models(): + """Upload all the policy models reside in the folder""" + requestId = uuid.uuid4() + config = osdf_config.deployment + model_path = config['pathPolicyModelUpload'] + uid, passwd = config['policyPlatformUsername'], config['policyPlatformPassword'] + pcuid, pcpasswd = config['policyClientUsername'], config['policyClientPassword'] + headers = {"ClientAuth": base64.b64encode(bytes("{}:{}".format(pcuid, pcpasswd), "ascii"))} + headers.update({'Environment': config['policyPlatformEnv']}) + headers.update({'X-ONAP-RequestID': requestId}) + url = config['policyPlatformUrlModelUpload'] + rc = RestClient(userid=uid, passwd=passwd, headers=headers, url=url, log_func=debug_log.debug) + + for file in os.listdir(model_path): + if not file.endswith(".yaml"): + continue + with open(file) as f: + file_converted = json.dumps(yaml.load(f)) + response = rc.request(json=file_converted, ok_codes=(200)) + if not response: + success = False + audit_log.warn("Policy model %s uploading failed!" % file) + if not success: + return "Policy model uploading success!" + else: + return "Policy model uploading not success!" diff --git a/osdf/adapters/policy/utils.py b/osdf/adapters/policy/utils.py index 2f873af..79047eb 100644 --- a/osdf/adapters/policy/utils.py +++ b/osdf/adapters/policy/utils.py @@ -33,11 +33,11 @@ def group_policies_gen(flat_policies, config): """ filtered_policies = defaultdict(list) policy_name = [] - policies = [x for x in flat_policies if x['content'].get('policyType')] # drop ones without 'policy_type' + policies = [x for x in flat_policies if x[list(x.keys())[0]]["type"]] # drop ones without 'type' priority = config.get('policy_info', {}).get('prioritization_attributes', {}) aggregated_policies = dict() for plc in policies: - attrs = [dot_notation(plc, dot_path) for key in priority.keys() for dot_path in priority[key]] + attrs = [dot_notation(plc[list(plc.keys())[0]], dot_path) for key in priority.keys() for dot_path in priority[key]] attrs_list = [x if isinstance(x, list) else [x] for x in attrs] attributes = [list_flatten(x) if isinstance(x, list) else x for x in attrs_list] for y in itertools.product(*attributes): @@ -45,12 +45,12 @@ def group_policies_gen(flat_policies, config): aggregated_policies[y].append(plc) for key in aggregated_policies.keys(): - aggregated_policies[key].sort(key=lambda x: x['priority'], reverse=True) + #aggregated_policies[key].sort(key=lambda x: x['priority'], reverse=True) prioritized_policy = aggregated_policies[key][0] - if prioritized_policy['policyName'] not in policy_name: + if list(prioritized_policy.keys())[0] not in policy_name: # TODO: Check logic here... should policy appear only once across all groups? - filtered_policies[prioritized_policy['content']['policyType']].append(prioritized_policy) - policy_name.append(prioritized_policy['policyName']) + filtered_policies[prioritized_policy[list(prioritized_policy.keys())[0]]['type']].append(prioritized_policy) + policy_name.append(list(prioritized_policy.keys())[0]) return filtered_policies |