diff options
Diffstat (limited to 'src/onapsdk/clamp/loop_instance.py')
-rw-r--r-- | src/onapsdk/clamp/loop_instance.py | 349 |
1 files changed, 349 insertions, 0 deletions
diff --git a/src/onapsdk/clamp/loop_instance.py b/src/onapsdk/clamp/loop_instance.py new file mode 100644 index 0000000..a72f9d1 --- /dev/null +++ b/src/onapsdk/clamp/loop_instance.py @@ -0,0 +1,349 @@ +"""Control Loop module.""" +# Copyright 2022 Orange, Deutsche Telekom AG +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +import json +from pathlib import Path +from jsonschema import validate, ValidationError + +from onapsdk.clamp.clamp_element import Clamp +from onapsdk.utils.jinja import jinja_env +from onapsdk.exceptions import ParameterError + +CLAMP_UPDDATE_REFRESH_TIMER = 60 + +class LoopInstance(Clamp): + """Control Loop instantiation class.""" + + # class variable + _loop_schema = None + operational_policies = "" + + def __init__(self, template: str, name: str, details: dict) -> None: + """ + Initialize loop instance object. + + Args: + template (str): template from which we build the loop + name (str) : loop creation name + details (dict) : dictionnary containing all loop details + + """ + super().__init__() + self.template = template + self.name = "LOOP_" + name + self._details = details + + @property + def details(self) -> dict: + """Return and lazy load the details.""" + if not self._details: + self._update_loop_details() + return self._details + + @details.setter + def details(self, details: dict) -> None: + """Set value for details.""" + self._details = details + + def _update_loop_details(self) -> dict: + """ + Update loop details. + + Returns: + the dictionnary of loop details + + """ + url = f"{self.base_url()}/loop/{self.name}" + loop_details = self.send_message_json('GET', + 'Get loop details', + url) + return loop_details + + def refresh_status(self) -> None: + """Reshresh loop status.""" + url = f"{self.base_url()}/loop/getstatus/{self.name}" + loop_details = self.send_message_json('GET', + 'Get loop status', + url) + + self.details = loop_details + + @property + def loop_schema(self) -> dict: + """ + Return and lazy load the details schema. + + Returns: + schema to be respected to accede to loop details + + """ + if not self._loop_schema: + schema_file = Path.cwd() / 'src' / 'onapsdk' / 'clamp' / 'schema_details.json' + with open(schema_file, "rb") as plan: + self._loop_schema = json.load(plan) + return self._loop_schema + + def validate_details(self) -> bool: + """ + Validate Loop Instance details. + + Returns: + schema validation status (True, False) + + """ + try: + validate(self.details, self.loop_schema) + except ValidationError as error: + self._logger.error(error) + self._logger.error("---------") + self._logger.error(error.absolute_path) + self._logger.error("---------") + self._logger.error(error.absolute_schema_path) + return False + return True + + def create(self) -> None: + """Create instance and load loop details.""" + url = f"{self.base_url()}/loop/create/{self.name}?templateName={self.template}" + instance_details = self.send_message_json('POST', + 'Create Loop Instance', + url) + self.details = instance_details + + def add_operational_policy(self, policy_type: str, policy_version: str) -> None: + """ + Add operational policy to the loop instance. + + Args: + policy_type (str): the full policy model type + policy_version (str): policy version + + Raises: + ParameterError : Corrupt response or a key in a dictionary not found. + It will also be raised when the response contains more operational + policies than there are currently. + + """ + url = (f"{self.base_url()}/loop/addOperationaPolicy/{self.name}/" + f"policyModel/{policy_type}/{policy_version}") + add_response = self.send_message_json('PUT', + 'Create Operational Policy', + url) + + key = "operationalPolicies" + + try: + if self.details[key] is None: + self.details[key] = [] + + response_policies = add_response[key] + current_policies = self.details[key] + except KeyError as exc: + msg = 'Corrupt response, current loop details. Key not found.' + raise ParameterError(msg) from exc + + if len(response_policies) > len(current_policies): + self.details = add_response + else: + raise ParameterError("Couldn't add the operational policy.") + + def remove_operational_policy(self, policy_type: str, policy_version: str) -> None: + """ + Remove operational policy from the loop instance. + + Args: + policy_type (str): the full policy model type + policy_version (str): policy version + + """ + url = (f"{self.base_url()}/loop/removeOperationaPolicy/" + f"{self.name}/policyModel/{policy_type}/{policy_version}") + self.details = self.send_message_json('PUT', + 'Remove Operational Policy', + url) + + def update_microservice_policy(self) -> None: + """ + Update microservice policy configuration. + + Update microservice policy configuration using payload data. + + """ + url = f"{self.base_url()}/loop/updateMicroservicePolicy/{self.name}" + template = jinja_env().get_template("clamp_add_tca_config.json.j2") + microservice_name = self.details["globalPropertiesJson"]["dcaeDeployParameters"]\ + ["uniqueBlueprintParameters"]["policy_id"] + data = template.render(name=microservice_name, + LOOP_name=self.name) + + self.send_message('POST', + 'ADD TCA config', + url, + data=data) + + def extract_operational_policy_name(self, policy_type: str) -> str: + """ + Return operational policy name for a closed loop and a given policy. + + Args: + policy_type (str): the policy acronym. + + Raises: + ParameterError : Couldn't load the operational policy name. + + Returns: + Operational policy name in the loop details after adding a policy. + + """ + for policy in filter(lambda x: x["policyModel"]["policyAcronym"] == policy_type, + self.details["operationalPolicies"]): + return policy["name"] + + raise ParameterError("Couldn't load the operational policy name.") + + def add_drools_conf(self) -> dict: + """Add drools configuration.""" + self.validate_details() + vfmodule_dicts = self.details["modelService"]["resourceDetails"]["VFModule"] + entity_ids = {} + #Get the vf module details + for vfmodule in vfmodule_dicts.values(): + entity_ids["resourceID"] = vfmodule["vfModuleModelName"] + entity_ids["modelInvariantId"] = vfmodule["vfModuleModelInvariantUUID"] + entity_ids["modelVersionId"] = vfmodule["vfModuleModelUUID"] + entity_ids["modelName"] = vfmodule["vfModuleModelName"] + entity_ids["modelVersion"] = vfmodule["vfModuleModelVersion"] + entity_ids["modelCustomizationId"] = vfmodule["vfModuleModelCustomizationUUID"] + template = jinja_env().get_template("clamp_add_drools_policy.json.j2") + data = template.render(name=self.extract_operational_policy_name("Drools"), + resourceID=entity_ids["resourceID"], + modelInvariantId=entity_ids["modelInvariantId"], + modelVersionId=entity_ids["modelVersionId"], + modelName=entity_ids["modelName"], + modelVersion=entity_ids["modelVersion"], + modelCustomizationId=entity_ids["modelCustomizationId"], + LOOP_name=self.name) + return data + + def add_minmax_config(self) -> None: + """Add MinMax operational policy config.""" + #must configure start/end time and min/max instances in json file + template = jinja_env().get_template("clamp_MinMax_config.json.j2") + return template.render(name=self.extract_operational_policy_name("MinMax")) + + def add_frequency_limiter(self, limit: int = 1) -> None: + """Add frequency limiter config.""" + template = jinja_env().get_template("clamp_add_frequency.json.j2") + return template.render(name=self.extract_operational_policy_name("FrequencyLimiter"), + LOOP_name=self.name, + limit=limit) + + def add_op_policy_config(self, func, **kwargs) -> None: + """ + Add operational policy configuration. + + Add operation policy configuration using payload data. + + Args: + func (function): policy configuration function in (add_drools_conf, + add_minmax_config, + add_frequency_limiter) + + """ + data = func(**kwargs) + if not data: + raise ParameterError("Payload data from configuration is None.") + if self.operational_policies: + self.operational_policies = self.operational_policies[:-1] + "," + data = data[1:] + self.operational_policies += data + url = f"{self.base_url()}/loop/updateOperationalPolicies/{self.name}" + self.send_message('POST', + 'ADD operational policy config', + url, + data=self.operational_policies) + + self._logger.info(("Files for op policy config %s have been uploaded to loop's" + "Op policy"), self.name) + + def submit(self): + """Submit policies to policy engine.""" + state = self.details["components"]["POLICY"]["componentState"]["stateName"] + return state == "SENT_AND_DEPLOYED" + + def stop(self): + """Undeploy Policies from policy engine.""" + state = self.details["components"]["POLICY"]["componentState"]["stateName"] + return state == "SENT" + + def restart(self): + """Redeploy policies to policy engine.""" + state = self.details["components"]["POLICY"]["componentState"]["stateName"] + return state == "SENT_AND_DEPLOYED" + + def act_on_loop_policy(self, func) -> bool: + """ + Act on loop's policy. + + Args: + func (function): function of action to be done (submit, stop, restart) + + Returns: + action state : failed or done + + """ + url = f"{self.base_url()}/loop/{func.__name__}/{self.name}" + self.send_message('PUT', + f'{func.__name__} policy', + url) + self.refresh_status() + self.validate_details() + return func() + + def deploy_microservice_to_dcae(self) -> bool: + """ + Execute the deploy operation on the loop instance. + + Returns: + loop deploy on DCAE status (True, False) + + """ + url = f"{self.base_url()}/loop/deploy/{self.name}" + self.send_message('PUT', + 'Deploy microservice to DCAE', + url) + self.validate_details() + state = self.details["components"]["DCAE"]["componentState"]["stateName"] + failure = "MICROSERVICE_INSTALLATION_FAILED" + success = "MICROSERVICE_INSTALLED_SUCCESSFULLY" + while state not in (success, failure): + self.refresh_status() + self.validate_details() + state = self.details["components"]["DCAE"]["componentState"]["stateName"] + return state == success + + def undeploy_microservice_from_dcae(self) -> None: + """Stop the deploy operation.""" + url = f"{self.base_url()}/loop/undeploy/{self.name}" + self.send_message('PUT', + 'Undeploy microservice from DCAE', + url) + + def delete(self) -> None: + """Delete the loop instance.""" + self._logger.debug("Delete %s loop instance", self.name) + url = "{}/loop/delete/{}".format(self.base_url(), self.name) + self.send_message('PUT', + 'Delete loop instance', + url) |