aboutsummaryrefslogtreecommitdiffstats
path: root/apps/slice_selection/optimizers/conductor
diff options
context:
space:
mode:
authordhebeha <dhebeha.mj71@wipro.com>2020-09-05 20:16:48 +0530
committerVikas Varma <vikas.varma@att.com>2020-09-18 19:34:01 +0000
commitedf98746a52408386efab26143778198b0efd3c5 (patch)
treeadc101beb879a57547cc828283803bfe7c5fd89b /apps/slice_selection/optimizers/conductor
parentf9b3575cae2b521ba8c6b6b30b15c89bd8a1cb48 (diff)
Add support to process NSI selection request
Issue-ID: OPTFRA-802 Signed-off-by: dhebeha <dhebeha.mj71@wipro.com> Signed-off-by: krishnaa96 <krishna.moorthy6@wipro.com> Change-Id: I85d951061abc697714425bd223b89102d4f2ede9
Diffstat (limited to 'apps/slice_selection/optimizers/conductor')
-rw-r--r--apps/slice_selection/optimizers/conductor/remote_opt_processor.py176
-rw-r--r--apps/slice_selection/optimizers/conductor/response_processor.py227
2 files changed, 181 insertions, 222 deletions
diff --git a/apps/slice_selection/optimizers/conductor/remote_opt_processor.py b/apps/slice_selection/optimizers/conductor/remote_opt_processor.py
index 40638fc..c1c6980 100644
--- a/apps/slice_selection/optimizers/conductor/remote_opt_processor.py
+++ b/apps/slice_selection/optimizers/conductor/remote_opt_processor.py
@@ -20,92 +20,106 @@
Module for processing slice selection request
"""
-import json
-import traceback
from requests import RequestException
+from threading import Thread
+import traceback
-from apps.slice_selection.optimizers.conductor.response_processor \
- import conductor_response_processor, conductor_error_response_processor, solution_with_only_slice_profile, get_nsi_selection_response
+from apps.slice_selection.optimizers.conductor.response_processor import ResponseProcessor
from osdf.adapters.conductor import conductor
from osdf.adapters.policy.interface import get_policies
-from osdf.adapters.policy.utils import group_policies_gen
-from osdf.logging.osdf_logging import error_log, debug_log
+from osdf.logging.osdf_logging import debug_log
+from osdf.logging.osdf_logging import error_log
+from osdf.utils.interfaces import get_rest_client
from osdf.utils.mdc_utils import mdc_from_json
-def process_nsi_selection_opt(request_json, osdf_config):
- """Process the nsi selection request from API layer
- :param request_json: api request
- :param policies: flattened policies corresponding to this request
- :param osdf_config: configuration specific to OSDF app
- :return: response as a dictionary
- """
- req_info = request_json['requestInfo']
- try:
- mdc_from_json(request_json)
-
- overall_recommendations = dict()
- nst_info_map = dict()
- new_nsi_solutions = list()
- for nst_info in request_json["NSTInfoList"]:
- nst_name = nst_info["modelName"]
- nst_info_map[nst_name] = {"NSTName": nst_name,
- "UUID": nst_info["modelVersionId"],
- "invariantUUID": nst_info["modelInvariantId"]}
-
- if request_json["serviceProfile"]["resourceSharingLevel"] == "non-shared":
- new_nsi_solution = solution_with_only_slice_profile(request_json['serviceProfile'], nst_info_map.get(nst_name))
- new_nsi_solutions.append(new_nsi_solution)
+class SliceSelectionOptimizer(Thread):
+ def __init__(self, osdf_config, slice_config, request_json, model_type):
+ self.osdf_config = osdf_config
+ self.slice_config = slice_config
+ self.request_json = request_json
+ self.model_type = model_type
+ self.response_processor = ResponseProcessor(request_json['requestInfo'], slice_config)
+
+ def run(self):
+ self.process_slice_selection_opt()
+
+ def process_slice_selection_opt(self):
+ """Process the slice selection request from the API layer"""
+ req_info = self.request_json['requestInfo']
+ rc = get_rest_client(self.request_json, service='so')
+
+ try:
+ if self.model_type == 'NSSI' \
+ and self.request_json['sliceProfile'].get('resourceSharingLevel', "") == 'not-shared':
+ final_response = self.response_processor.get_slice_selection_response([])
+
else:
- policy_request_json = request_json.copy()
- policy_request_json['serviceInfo']['serviceName'] = nst_name
- policies = get_policies(policy_request_json, "slice_selection")
-
- demands = get_slice_demands(nst_name, policies, osdf_config.core)
-
- request_parameters = request_json.get('serviceProfile',{})
- service_info = {}
- req_info['numSolutions'] = 'all'
- try:
- resp = conductor.request(req_info, demands, request_parameters, service_info, False,
- osdf_config, policies)
- except RequestException as e:
- resp = e.response.json()
- error = resp['plans'][0]['message']
- error_log.error('Error from conductor {}'.format(error))
- debug_log.debug("Response from conductor {}".format(str(resp)))
- overall_recommendations[nst_name] = resp["plans"][0].get("recommendations")
-
- if request_json["serviceProfile"]["resourceSharingLevel"] == "non-shared":
- solutions = dict()
- solutions['newNSISolutions'] = new_nsi_solutions
- solutions['sharedNSISolutions'] = []
- return get_nsi_selection_response(req_info, solutions)
- else:
- return conductor_response_processor(overall_recommendations, nst_info_map, req_info, request_json["serviceProfile"])
- except Exception as ex:
- error_log.error("Error for {} {}".format(req_info.get('requestId'),
- traceback.format_exc()))
- error_message = str(ex)
- return conductor_error_response_processor(req_info, error_message)
-
-
-def get_slice_demands(model_name, policies, config):
- """
- :param model_name: model name of the slice
- :param policies: flattened polcies corresponding to the request
- :param config: configuration specific to OSDF app
- :return: list of demands for the request
- """
- group_policies = group_policies_gen(policies, config)
- subscriber_policy_list = group_policies["onap.policies.optimization.service.SubscriberPolicy"]
- slice_demands = list()
- for subscriber_policy in subscriber_policy_list:
- policy_properties = subscriber_policy[list(subscriber_policy.keys())[0]]['properties']
- if model_name in policy_properties["services"]:
- for subnet in policy_properties["properties"]["subscriberName"]:
- slice_demand = dict()
- slice_demand["resourceModuleName"] = subnet
- slice_demand['resourceModelInfo'] = {}
- slice_demands.append(slice_demand)
- return slice_demands
+ final_response = self.do_slice_selection()
+
+ except Exception as ex:
+ error_log.error("Error for {} {}".format(req_info.get('requestId'),
+ traceback.format_exc()))
+ error_message = str(ex)
+ final_response = self.response_processor.process_error_response(error_message)
+
+ try:
+ rc.request(json=final_response, noresponse=True)
+ except RequestException:
+ error_log.error("Error sending asynchronous notification for {} {}".format(req_info['request_id'],
+ traceback.format_exc()))
+
+ def do_slice_selection(self):
+ req_info = self.request_json['requestInfo']
+ app_info = self.slice_config['app_info'][self.model_type]
+ mdc_from_json(self.request_json)
+ requirements = self.request_json.get(app_info['requirements_field'], {})
+ model_info = self.request_json.get(app_info['model_info'])
+ model_name = model_info['name']
+ policies = self.get_app_policies(model_name, app_info['app_name'])
+ request_parameters = self.get_request_parameters(requirements)
+
+ demands = [
+ {
+ "resourceModuleName": model_name,
+ "resourceModelInfo": {}
+ }
+ ]
+
+ try:
+ template_fields = {
+ 'location_enabled': False,
+ 'version': '2020-08-13'
+ }
+ resp = conductor.request(req_info, demands, request_parameters, {}, template_fields,
+ self.osdf_config, policies)
+ except RequestException as e:
+ resp = e.response.json()
+ error = resp['plans'][0]['message']
+ error_log.error('Error from conductor {}'.format(error))
+ return self.response_processor.process_error_response(error)
+
+ debug_log.debug("Response from conductor {}".format(str(resp)))
+ recommendations = resp["plans"][0].get("recommendations")
+ subnets = [subnet['domainType'] for subnet in self.request_json['subnetCapabilities']] \
+ if self.request_json.get('subnetCapabilities') else []
+ return self.response_processor.process_response(recommendations, model_info, subnets)
+
+ def get_request_parameters(self, requirements):
+ camel_to_snake = self.slice_config['attribute_mapping']['camel_to_snake']
+ request_params = {camel_to_snake[key]: value for key, value in requirements.items()}
+ subnet_capabilities = self.request_json.get('subnetCapabilities')
+ if subnet_capabilities:
+ for subnet_capability in subnet_capabilities:
+ domain_type = f"{subnet_capability['domainType'].lower().replace('-', '_')}_"
+ capability_details = subnet_capability['capabilityDetails']
+ for key, value in capability_details.items():
+ request_params[f"{domain_type}{camel_to_snake[key]}"] = value
+ return request_params
+
+ def get_app_policies(self, model_name, app_name):
+ policy_request_json = self.request_json.copy()
+ policy_request_json['serviceInfo'] = {'serviceName': model_name}
+ if 'preferReuse' in self.request_json:
+ policy_request_json['preferReuse'] = "reuse" if self.request_json['preferReuse'] else "create_new"
+ return get_policies(policy_request_json, app_name)
diff --git a/apps/slice_selection/optimizers/conductor/response_processor.py b/apps/slice_selection/optimizers/conductor/response_processor.py
index a9bdad0..71a350f 100644
--- a/apps/slice_selection/optimizers/conductor/response_processor.py
+++ b/apps/slice_selection/optimizers/conductor/response_processor.py
@@ -20,144 +20,89 @@
Module for processing response from conductor for slice selection
"""
-from osdf.logging.osdf_logging import debug_log
-
-
-SLICE_PROFILE_FIELDS = {"latency":"latency", "max_number_of_ues":"maxNumberOfUEs", "coverage_area_ta_list": "coverageAreaTAList",
- "ue_mobility_level":"uEMobilityLevel", "resource_sharing_level":"resourceSharingLevel", "exp_data_rate_ul": "expDataRateUL",
- "exp_data_rate_dl":"expDataRateDL", "area_traffic_cap_ul":"areaTrafficCapUL", "area_traffic_cap_dl": "areaTrafficCapDL",
- "activity_factor":"activityFactor", "e2e_latency":"e2eLatency", "jitter":"jitter", "survival_time": "survivalTime",
- "exp_data_rate":"expDataRate", "payload_size":"payloadSize", "traffic_density":"trafficDensity", "conn_density":"connDensity",
- "reliability":"reliability", "service_area_dimension":"serviceAreaDimension", "cs_availability": "csAvailability"}
-
-
-def conductor_response_processor(overall_recommendations, nst_info_map, request_info, service_profile):
- """Process conductor response to form the response for the API request
- :param overall_recommendations: recommendations from conductor
- :param nst_info_map: NST info from the request
- :param request_info: request info
- :return: response json as a dictionary
- """
- shared_nsi_solutions = list()
- new_nsi_solutions = list()
- for nst_name, recommendations in overall_recommendations.items():
- if not (recommendations):
- new_nsi_solution = solution_with_only_slice_profile(service_profile, nst_info_map.get(nst_name))
- new_nsi_solutions.append(new_nsi_solution)
- continue
-
- for recommendation in recommendations:
- nsi_set = set(values['candidate']['nsi_id'] for key, values in recommendation.items())
- if len(nsi_set) == 1:
- nsi_id = nsi_set.pop()
- candidate = list(recommendation.values())[0]['candidate']
- debug_log.debug("The NSSIs in the solution belongs to the same NSI {}"
- .format(nsi_id))
- shared_nsi_solution = dict()
- shared_nsi_solution["NSIId"] = nsi_id
- shared_nsi_solution["NSIName"] = candidate.get('nsi_name')
- shared_nsi_solution["UUID"] = candidate.get('nsi_model_version_id')
- shared_nsi_solution["invariantUUID"] = candidate.get('nsi_model_invariant_id')
-
- nssi_info_list = get_nssi_solutions(recommendation)
- nssis = list()
- for nssi_info in nssi_info_list:
- nssi = dict()
- nssi["NSSIId"] = nssi_info.get("NSSISolution").get("NSSIId")
- nssi["NSSIName"] = nssi_info.get("NSSISolution").get("NSSIName")
- nssi["UUID"] = ""
- nssi["invariantUUID"] = ""
- nssi_info.get("sliceProfile").update({"domainType":"cn"})
- nssi["sliceProfile"] = [nssi_info.get("sliceProfile")]
- nssis.append(nssi)
-
- shared_nsi_solution["NSSIs"] = nssis
- shared_nsi_solutions.append(shared_nsi_solution)
- else:
- nssi_solutions = get_nssi_solutions(recommendation)
- new_nsi_solution = dict()
- new_nsi_solution['matchLevel'] = ""
- new_nsi_solution['NSTInfo'] = nst_info_map.get(nst_name)
- new_nsi_solution['NSSISolutions'] = nssi_solutions
- new_nsi_solutions.append(new_nsi_solution)
-
- solutions = dict()
- solutions['sharedNSISolutions'] = shared_nsi_solutions
- solutions['newNSISolutions'] = new_nsi_solutions
- return get_nsi_selection_response(request_info, solutions)
-
-
-def solution_with_only_slice_profile(service_profile, nst_info):
- nssi_solutions = get_slice_profile_from_service_profile(service_profile)
- new_nsi_solution = dict()
- new_nsi_solution['matchLevel'] = ""
- new_nsi_solution['NSTInfo'] = nst_info
- new_nsi_solution['NSSISolutions'] = nssi_solutions
- return new_nsi_solution
-
-def conductor_error_response_processor(request_info, error_message):
- """Form response message from the error message
- :param request_info: request info
- :param error_message: error message while processing the request
- :return: response json as dictionary
- """
- return {'requestId': request_info['requestId'],
- 'transactionId': request_info['transactionId'],
- 'requestStatus': 'error',
- 'statusMessage': error_message}
-
-
-def get_slice_profile_from_service_profile(service_profile):
- nssi_solutions = list()
- service_profile["domainType"] = "cn"
- nssi_solution = {"sliceProfile": service_profile}
- nssi_solutions.append(nssi_solution)
- return nssi_solutions
-
-
-def get_nssi_solutions(recommendation):
- """Get nssi solutions from recommendation
- :param recommendation: recommendation from conductor
- :return: new nssi solutions list
- """
- nssi_solutions = list()
-
- for nsst_name, nsst_rec in recommendation.items():
- candidate = nsst_rec['candidate']
- nssi_info, slice_profile = get_solution_from_candidate(candidate)
- nsst_info = {"NSSTName": nsst_name}
- nssi_solution = {"sliceProfile": slice_profile,
- "NSSTInfo": nsst_info,
- "NSSISolution": nssi_info}
- nssi_solutions.append(nssi_solution)
- return nssi_solutions
-
-
-def get_solution_from_candidate(candidate):
- """Get nssi info from candidate
- :param candidate: Candidate from the recommendation
- :return: nssi_info and slice profile derived from candidate
- """
- slice_profile = dict()
- nssi_info = {"NSSIName": candidate['instance_name'],
- "NSSIId": candidate['candidate_id']}
-
- for field in SLICE_PROFILE_FIELDS:
- if candidate[field]:
- slice_profile[SLICE_PROFILE_FIELDS[field]] = candidate[field]
-
- return nssi_info, slice_profile
-
-
-def get_nsi_selection_response(request_info, solutions):
- """Get NSI selection response from final solution
- :param request_info: request info
- :param solutions: final solutions
- :return: NSI selection response to send back as dictionary
- """
- return {'requestId': request_info['requestId'],
- 'transactionId': request_info['transactionId'],
- 'requestStatus': 'completed',
- 'statusMessage': '',
- 'solutions': solutions}
-
+import re
+
+
+class ResponseProcessor(object):
+ def __init__(self, request_info, slice_config):
+ self.request_info = request_info
+ self.slice_config = slice_config
+
+ def process_response(self, recommendations, model_info, subnets):
+ """Process conductor response to form the response for the API request
+
+ :param recommendations: recommendations from conductor
+ :param model_info: model info from the request
+ :param subnets: list of subnets
+ :return: response json as a dictionary
+ """
+ if not recommendations:
+ return self.get_slice_selection_response([])
+ model_name = model_info['name']
+ solutions = [self.get_solution_from_candidate(rec[model_name]['candidate'], model_info, subnets)
+ for rec in recommendations]
+ return self.get_slice_selection_response(solutions)
+
+ def get_solution_from_candidate(self, candidate, model_info, subnets):
+ if candidate['inventory_type'] == 'nssi':
+ return {
+ 'UUID': model_info['UUID'],
+ 'invariantUUID': model_info['invariantUUID'],
+ 'NSSIName': candidate['instance_name'],
+ 'NSSIId': candidate['instance_id']
+ }
+
+ elif candidate['inventory_type'] == 'nsi':
+ return {
+ 'existingNSI': True,
+ 'sharedNSISolution': {
+ 'UUID': model_info['UUID'],
+ 'invariantUUID': model_info['invariantUUID'],
+ 'NSIName': candidate['instance_name'],
+ 'NSIId': candidate['instance_id']
+ }
+ }
+
+ elif candidate['inventory_type'] == 'slice_profiles':
+ return {
+ 'existingNSI': False,
+ 'newNSISolution': {
+ 'slice_profiles': self.get_slice_profiles_from_candidate(candidate, subnets)
+ }
+ }
+
+ def get_slice_profiles_from_candidate(self, candidate, subnets):
+ slice_profiles = []
+ for subnet in subnets:
+ slice_profile = {self.get_profile_attribute(k, subnet): v for k, v in candidate.items()
+ if k.startswith(subnet)}
+ slice_profile['domainType'] = subnet
+ slice_profiles.append(slice_profile)
+ return slice_profiles
+
+ def get_profile_attribute(self, attribute, subnet):
+ snake_to_camel = self.slice_config['attribute_mapping']['snake_to_camel']
+ return snake_to_camel[re.sub(f'^{subnet}_', '', attribute)]
+
+ def process_error_response(self, error_message):
+ """Form response message from the error message
+
+ :param error_message: error message while processing the request
+ :return: response json as dictionary
+ """
+ return {'requestId': self.request_info['requestId'],
+ 'transactionId': self.request_info['transactionId'],
+ 'requestStatus': 'error',
+ 'statusMessage': error_message}
+
+ def get_slice_selection_response(self, solutions):
+ """Get NSI selection response from final solution
+
+ :param solutions: final solutions
+ :return: NSI selection response to send back as dictionary
+ """
+ return {'requestId': self.request_info['requestId'],
+ 'transactionId': self.request_info['transactionId'],
+ 'requestStatus': 'completed',
+ 'statusMessage': '',
+ 'solutions': solutions}