From edf98746a52408386efab26143778198b0efd3c5 Mon Sep 17 00:00:00 2001 From: dhebeha Date: Sat, 5 Sep 2020 20:16:48 +0530 Subject: Add support to process NSI selection request Issue-ID: OPTFRA-802 Signed-off-by: dhebeha Signed-off-by: krishnaa96 Change-Id: I85d951061abc697714425bd223b89102d4f2ede9 --- .../optimizers/conductor/remote_opt_processor.py | 176 ++++++++-------- .../optimizers/conductor/response_processor.py | 227 ++++++++------------- 2 files changed, 181 insertions(+), 222 deletions(-) (limited to 'apps/slice_selection/optimizers') diff --git a/apps/slice_selection/optimizers/conductor/remote_opt_processor.py b/apps/slice_selection/optimizers/conductor/remote_opt_processor.py index 40638fc..c1c6980 100644 --- a/apps/slice_selection/optimizers/conductor/remote_opt_processor.py +++ b/apps/slice_selection/optimizers/conductor/remote_opt_processor.py @@ -20,92 +20,106 @@ Module for processing slice selection request """ -import json -import traceback from requests import RequestException +from threading import Thread +import traceback -from apps.slice_selection.optimizers.conductor.response_processor \ - import conductor_response_processor, conductor_error_response_processor, solution_with_only_slice_profile, get_nsi_selection_response +from apps.slice_selection.optimizers.conductor.response_processor import ResponseProcessor from osdf.adapters.conductor import conductor from osdf.adapters.policy.interface import get_policies -from osdf.adapters.policy.utils import group_policies_gen -from osdf.logging.osdf_logging import error_log, debug_log +from osdf.logging.osdf_logging import debug_log +from osdf.logging.osdf_logging import error_log +from osdf.utils.interfaces import get_rest_client from osdf.utils.mdc_utils import mdc_from_json -def process_nsi_selection_opt(request_json, osdf_config): - """Process the nsi selection request from API layer - :param request_json: api request - :param policies: flattened policies corresponding to this request - :param osdf_config: configuration specific to OSDF app - :return: response as a dictionary - """ - req_info = request_json['requestInfo'] - try: - mdc_from_json(request_json) - - overall_recommendations = dict() - nst_info_map = dict() - new_nsi_solutions = list() - for nst_info in request_json["NSTInfoList"]: - nst_name = nst_info["modelName"] - nst_info_map[nst_name] = {"NSTName": nst_name, - "UUID": nst_info["modelVersionId"], - "invariantUUID": nst_info["modelInvariantId"]} - - if request_json["serviceProfile"]["resourceSharingLevel"] == "non-shared": - new_nsi_solution = solution_with_only_slice_profile(request_json['serviceProfile'], nst_info_map.get(nst_name)) - new_nsi_solutions.append(new_nsi_solution) +class SliceSelectionOptimizer(Thread): + def __init__(self, osdf_config, slice_config, request_json, model_type): + self.osdf_config = osdf_config + self.slice_config = slice_config + self.request_json = request_json + self.model_type = model_type + self.response_processor = ResponseProcessor(request_json['requestInfo'], slice_config) + + def run(self): + self.process_slice_selection_opt() + + def process_slice_selection_opt(self): + """Process the slice selection request from the API layer""" + req_info = self.request_json['requestInfo'] + rc = get_rest_client(self.request_json, service='so') + + try: + if self.model_type == 'NSSI' \ + and self.request_json['sliceProfile'].get('resourceSharingLevel', "") == 'not-shared': + final_response = self.response_processor.get_slice_selection_response([]) + else: - policy_request_json = request_json.copy() - policy_request_json['serviceInfo']['serviceName'] = nst_name - policies = get_policies(policy_request_json, "slice_selection") - - demands = get_slice_demands(nst_name, policies, osdf_config.core) - - request_parameters = request_json.get('serviceProfile',{}) - service_info = {} - req_info['numSolutions'] = 'all' - try: - resp = conductor.request(req_info, demands, request_parameters, service_info, False, - osdf_config, policies) - except RequestException as e: - resp = e.response.json() - error = resp['plans'][0]['message'] - error_log.error('Error from conductor {}'.format(error)) - debug_log.debug("Response from conductor {}".format(str(resp))) - overall_recommendations[nst_name] = resp["plans"][0].get("recommendations") - - if request_json["serviceProfile"]["resourceSharingLevel"] == "non-shared": - solutions = dict() - solutions['newNSISolutions'] = new_nsi_solutions - solutions['sharedNSISolutions'] = [] - return get_nsi_selection_response(req_info, solutions) - else: - return conductor_response_processor(overall_recommendations, nst_info_map, req_info, request_json["serviceProfile"]) - except Exception as ex: - error_log.error("Error for {} {}".format(req_info.get('requestId'), - traceback.format_exc())) - error_message = str(ex) - return conductor_error_response_processor(req_info, error_message) - - -def get_slice_demands(model_name, policies, config): - """ - :param model_name: model name of the slice - :param policies: flattened polcies corresponding to the request - :param config: configuration specific to OSDF app - :return: list of demands for the request - """ - group_policies = group_policies_gen(policies, config) - subscriber_policy_list = group_policies["onap.policies.optimization.service.SubscriberPolicy"] - slice_demands = list() - for subscriber_policy in subscriber_policy_list: - policy_properties = subscriber_policy[list(subscriber_policy.keys())[0]]['properties'] - if model_name in policy_properties["services"]: - for subnet in policy_properties["properties"]["subscriberName"]: - slice_demand = dict() - slice_demand["resourceModuleName"] = subnet - slice_demand['resourceModelInfo'] = {} - slice_demands.append(slice_demand) - return slice_demands + final_response = self.do_slice_selection() + + except Exception as ex: + error_log.error("Error for {} {}".format(req_info.get('requestId'), + traceback.format_exc())) + error_message = str(ex) + final_response = self.response_processor.process_error_response(error_message) + + try: + rc.request(json=final_response, noresponse=True) + except RequestException: + error_log.error("Error sending asynchronous notification for {} {}".format(req_info['request_id'], + traceback.format_exc())) + + def do_slice_selection(self): + req_info = self.request_json['requestInfo'] + app_info = self.slice_config['app_info'][self.model_type] + mdc_from_json(self.request_json) + requirements = self.request_json.get(app_info['requirements_field'], {}) + model_info = self.request_json.get(app_info['model_info']) + model_name = model_info['name'] + policies = self.get_app_policies(model_name, app_info['app_name']) + request_parameters = self.get_request_parameters(requirements) + + demands = [ + { + "resourceModuleName": model_name, + "resourceModelInfo": {} + } + ] + + try: + template_fields = { + 'location_enabled': False, + 'version': '2020-08-13' + } + resp = conductor.request(req_info, demands, request_parameters, {}, template_fields, + self.osdf_config, policies) + except RequestException as e: + resp = e.response.json() + error = resp['plans'][0]['message'] + error_log.error('Error from conductor {}'.format(error)) + return self.response_processor.process_error_response(error) + + debug_log.debug("Response from conductor {}".format(str(resp))) + recommendations = resp["plans"][0].get("recommendations") + subnets = [subnet['domainType'] for subnet in self.request_json['subnetCapabilities']] \ + if self.request_json.get('subnetCapabilities') else [] + return self.response_processor.process_response(recommendations, model_info, subnets) + + def get_request_parameters(self, requirements): + camel_to_snake = self.slice_config['attribute_mapping']['camel_to_snake'] + request_params = {camel_to_snake[key]: value for key, value in requirements.items()} + subnet_capabilities = self.request_json.get('subnetCapabilities') + if subnet_capabilities: + for subnet_capability in subnet_capabilities: + domain_type = f"{subnet_capability['domainType'].lower().replace('-', '_')}_" + capability_details = subnet_capability['capabilityDetails'] + for key, value in capability_details.items(): + request_params[f"{domain_type}{camel_to_snake[key]}"] = value + return request_params + + def get_app_policies(self, model_name, app_name): + policy_request_json = self.request_json.copy() + policy_request_json['serviceInfo'] = {'serviceName': model_name} + if 'preferReuse' in self.request_json: + policy_request_json['preferReuse'] = "reuse" if self.request_json['preferReuse'] else "create_new" + return get_policies(policy_request_json, app_name) diff --git a/apps/slice_selection/optimizers/conductor/response_processor.py b/apps/slice_selection/optimizers/conductor/response_processor.py index a9bdad0..71a350f 100644 --- a/apps/slice_selection/optimizers/conductor/response_processor.py +++ b/apps/slice_selection/optimizers/conductor/response_processor.py @@ -20,144 +20,89 @@ Module for processing response from conductor for slice selection """ -from osdf.logging.osdf_logging import debug_log - - -SLICE_PROFILE_FIELDS = {"latency":"latency", "max_number_of_ues":"maxNumberOfUEs", "coverage_area_ta_list": "coverageAreaTAList", - "ue_mobility_level":"uEMobilityLevel", "resource_sharing_level":"resourceSharingLevel", "exp_data_rate_ul": "expDataRateUL", - "exp_data_rate_dl":"expDataRateDL", "area_traffic_cap_ul":"areaTrafficCapUL", "area_traffic_cap_dl": "areaTrafficCapDL", - "activity_factor":"activityFactor", "e2e_latency":"e2eLatency", "jitter":"jitter", "survival_time": "survivalTime", - "exp_data_rate":"expDataRate", "payload_size":"payloadSize", "traffic_density":"trafficDensity", "conn_density":"connDensity", - "reliability":"reliability", "service_area_dimension":"serviceAreaDimension", "cs_availability": "csAvailability"} - - -def conductor_response_processor(overall_recommendations, nst_info_map, request_info, service_profile): - """Process conductor response to form the response for the API request - :param overall_recommendations: recommendations from conductor - :param nst_info_map: NST info from the request - :param request_info: request info - :return: response json as a dictionary - """ - shared_nsi_solutions = list() - new_nsi_solutions = list() - for nst_name, recommendations in overall_recommendations.items(): - if not (recommendations): - new_nsi_solution = solution_with_only_slice_profile(service_profile, nst_info_map.get(nst_name)) - new_nsi_solutions.append(new_nsi_solution) - continue - - for recommendation in recommendations: - nsi_set = set(values['candidate']['nsi_id'] for key, values in recommendation.items()) - if len(nsi_set) == 1: - nsi_id = nsi_set.pop() - candidate = list(recommendation.values())[0]['candidate'] - debug_log.debug("The NSSIs in the solution belongs to the same NSI {}" - .format(nsi_id)) - shared_nsi_solution = dict() - shared_nsi_solution["NSIId"] = nsi_id - shared_nsi_solution["NSIName"] = candidate.get('nsi_name') - shared_nsi_solution["UUID"] = candidate.get('nsi_model_version_id') - shared_nsi_solution["invariantUUID"] = candidate.get('nsi_model_invariant_id') - - nssi_info_list = get_nssi_solutions(recommendation) - nssis = list() - for nssi_info in nssi_info_list: - nssi = dict() - nssi["NSSIId"] = nssi_info.get("NSSISolution").get("NSSIId") - nssi["NSSIName"] = nssi_info.get("NSSISolution").get("NSSIName") - nssi["UUID"] = "" - nssi["invariantUUID"] = "" - nssi_info.get("sliceProfile").update({"domainType":"cn"}) - nssi["sliceProfile"] = [nssi_info.get("sliceProfile")] - nssis.append(nssi) - - shared_nsi_solution["NSSIs"] = nssis - shared_nsi_solutions.append(shared_nsi_solution) - else: - nssi_solutions = get_nssi_solutions(recommendation) - new_nsi_solution = dict() - new_nsi_solution['matchLevel'] = "" - new_nsi_solution['NSTInfo'] = nst_info_map.get(nst_name) - new_nsi_solution['NSSISolutions'] = nssi_solutions - new_nsi_solutions.append(new_nsi_solution) - - solutions = dict() - solutions['sharedNSISolutions'] = shared_nsi_solutions - solutions['newNSISolutions'] = new_nsi_solutions - return get_nsi_selection_response(request_info, solutions) - - -def solution_with_only_slice_profile(service_profile, nst_info): - nssi_solutions = get_slice_profile_from_service_profile(service_profile) - new_nsi_solution = dict() - new_nsi_solution['matchLevel'] = "" - new_nsi_solution['NSTInfo'] = nst_info - new_nsi_solution['NSSISolutions'] = nssi_solutions - return new_nsi_solution - -def conductor_error_response_processor(request_info, error_message): - """Form response message from the error message - :param request_info: request info - :param error_message: error message while processing the request - :return: response json as dictionary - """ - return {'requestId': request_info['requestId'], - 'transactionId': request_info['transactionId'], - 'requestStatus': 'error', - 'statusMessage': error_message} - - -def get_slice_profile_from_service_profile(service_profile): - nssi_solutions = list() - service_profile["domainType"] = "cn" - nssi_solution = {"sliceProfile": service_profile} - nssi_solutions.append(nssi_solution) - return nssi_solutions - - -def get_nssi_solutions(recommendation): - """Get nssi solutions from recommendation - :param recommendation: recommendation from conductor - :return: new nssi solutions list - """ - nssi_solutions = list() - - for nsst_name, nsst_rec in recommendation.items(): - candidate = nsst_rec['candidate'] - nssi_info, slice_profile = get_solution_from_candidate(candidate) - nsst_info = {"NSSTName": nsst_name} - nssi_solution = {"sliceProfile": slice_profile, - "NSSTInfo": nsst_info, - "NSSISolution": nssi_info} - nssi_solutions.append(nssi_solution) - return nssi_solutions - - -def get_solution_from_candidate(candidate): - """Get nssi info from candidate - :param candidate: Candidate from the recommendation - :return: nssi_info and slice profile derived from candidate - """ - slice_profile = dict() - nssi_info = {"NSSIName": candidate['instance_name'], - "NSSIId": candidate['candidate_id']} - - for field in SLICE_PROFILE_FIELDS: - if candidate[field]: - slice_profile[SLICE_PROFILE_FIELDS[field]] = candidate[field] - - return nssi_info, slice_profile - - -def get_nsi_selection_response(request_info, solutions): - """Get NSI selection response from final solution - :param request_info: request info - :param solutions: final solutions - :return: NSI selection response to send back as dictionary - """ - return {'requestId': request_info['requestId'], - 'transactionId': request_info['transactionId'], - 'requestStatus': 'completed', - 'statusMessage': '', - 'solutions': solutions} - +import re + + +class ResponseProcessor(object): + def __init__(self, request_info, slice_config): + self.request_info = request_info + self.slice_config = slice_config + + def process_response(self, recommendations, model_info, subnets): + """Process conductor response to form the response for the API request + + :param recommendations: recommendations from conductor + :param model_info: model info from the request + :param subnets: list of subnets + :return: response json as a dictionary + """ + if not recommendations: + return self.get_slice_selection_response([]) + model_name = model_info['name'] + solutions = [self.get_solution_from_candidate(rec[model_name]['candidate'], model_info, subnets) + for rec in recommendations] + return self.get_slice_selection_response(solutions) + + def get_solution_from_candidate(self, candidate, model_info, subnets): + if candidate['inventory_type'] == 'nssi': + return { + 'UUID': model_info['UUID'], + 'invariantUUID': model_info['invariantUUID'], + 'NSSIName': candidate['instance_name'], + 'NSSIId': candidate['instance_id'] + } + + elif candidate['inventory_type'] == 'nsi': + return { + 'existingNSI': True, + 'sharedNSISolution': { + 'UUID': model_info['UUID'], + 'invariantUUID': model_info['invariantUUID'], + 'NSIName': candidate['instance_name'], + 'NSIId': candidate['instance_id'] + } + } + + elif candidate['inventory_type'] == 'slice_profiles': + return { + 'existingNSI': False, + 'newNSISolution': { + 'slice_profiles': self.get_slice_profiles_from_candidate(candidate, subnets) + } + } + + def get_slice_profiles_from_candidate(self, candidate, subnets): + slice_profiles = [] + for subnet in subnets: + slice_profile = {self.get_profile_attribute(k, subnet): v for k, v in candidate.items() + if k.startswith(subnet)} + slice_profile['domainType'] = subnet + slice_profiles.append(slice_profile) + return slice_profiles + + def get_profile_attribute(self, attribute, subnet): + snake_to_camel = self.slice_config['attribute_mapping']['snake_to_camel'] + return snake_to_camel[re.sub(f'^{subnet}_', '', attribute)] + + def process_error_response(self, error_message): + """Form response message from the error message + + :param error_message: error message while processing the request + :return: response json as dictionary + """ + return {'requestId': self.request_info['requestId'], + 'transactionId': self.request_info['transactionId'], + 'requestStatus': 'error', + 'statusMessage': error_message} + + def get_slice_selection_response(self, solutions): + """Get NSI selection response from final solution + + :param solutions: final solutions + :return: NSI selection response to send back as dictionary + """ + return {'requestId': self.request_info['requestId'], + 'transactionId': self.request_info['transactionId'], + 'requestStatus': 'completed', + 'statusMessage': '', + 'solutions': solutions} -- cgit 1.2.3-korg