aboutsummaryrefslogtreecommitdiffstats
path: root/src/onapsdk/clamp/loop_instance.py
diff options
context:
space:
mode:
Diffstat (limited to 'src/onapsdk/clamp/loop_instance.py')
-rw-r--r--src/onapsdk/clamp/loop_instance.py349
1 files changed, 349 insertions, 0 deletions
diff --git a/src/onapsdk/clamp/loop_instance.py b/src/onapsdk/clamp/loop_instance.py
new file mode 100644
index 0000000..a72f9d1
--- /dev/null
+++ b/src/onapsdk/clamp/loop_instance.py
@@ -0,0 +1,349 @@
+"""Control Loop module."""
+# Copyright 2022 Orange, Deutsche Telekom AG
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+import json
+from pathlib import Path
+from jsonschema import validate, ValidationError
+
+from onapsdk.clamp.clamp_element import Clamp
+from onapsdk.utils.jinja import jinja_env
+from onapsdk.exceptions import ParameterError
+
+CLAMP_UPDDATE_REFRESH_TIMER = 60
+
+class LoopInstance(Clamp):
+ """Control Loop instantiation class."""
+
+ # class variable
+ _loop_schema = None
+ operational_policies = ""
+
+ def __init__(self, template: str, name: str, details: dict) -> None:
+ """
+ Initialize loop instance object.
+
+ Args:
+ template (str): template from which we build the loop
+ name (str) : loop creation name
+ details (dict) : dictionnary containing all loop details
+
+ """
+ super().__init__()
+ self.template = template
+ self.name = "LOOP_" + name
+ self._details = details
+
+ @property
+ def details(self) -> dict:
+ """Return and lazy load the details."""
+ if not self._details:
+ self._update_loop_details()
+ return self._details
+
+ @details.setter
+ def details(self, details: dict) -> None:
+ """Set value for details."""
+ self._details = details
+
+ def _update_loop_details(self) -> dict:
+ """
+ Update loop details.
+
+ Returns:
+ the dictionnary of loop details
+
+ """
+ url = f"{self.base_url()}/loop/{self.name}"
+ loop_details = self.send_message_json('GET',
+ 'Get loop details',
+ url)
+ return loop_details
+
+ def refresh_status(self) -> None:
+ """Reshresh loop status."""
+ url = f"{self.base_url()}/loop/getstatus/{self.name}"
+ loop_details = self.send_message_json('GET',
+ 'Get loop status',
+ url)
+
+ self.details = loop_details
+
+ @property
+ def loop_schema(self) -> dict:
+ """
+ Return and lazy load the details schema.
+
+ Returns:
+ schema to be respected to accede to loop details
+
+ """
+ if not self._loop_schema:
+ schema_file = Path.cwd() / 'src' / 'onapsdk' / 'clamp' / 'schema_details.json'
+ with open(schema_file, "rb") as plan:
+ self._loop_schema = json.load(plan)
+ return self._loop_schema
+
+ def validate_details(self) -> bool:
+ """
+ Validate Loop Instance details.
+
+ Returns:
+ schema validation status (True, False)
+
+ """
+ try:
+ validate(self.details, self.loop_schema)
+ except ValidationError as error:
+ self._logger.error(error)
+ self._logger.error("---------")
+ self._logger.error(error.absolute_path)
+ self._logger.error("---------")
+ self._logger.error(error.absolute_schema_path)
+ return False
+ return True
+
+ def create(self) -> None:
+ """Create instance and load loop details."""
+ url = f"{self.base_url()}/loop/create/{self.name}?templateName={self.template}"
+ instance_details = self.send_message_json('POST',
+ 'Create Loop Instance',
+ url)
+ self.details = instance_details
+
+ def add_operational_policy(self, policy_type: str, policy_version: str) -> None:
+ """
+ Add operational policy to the loop instance.
+
+ Args:
+ policy_type (str): the full policy model type
+ policy_version (str): policy version
+
+ Raises:
+ ParameterError : Corrupt response or a key in a dictionary not found.
+ It will also be raised when the response contains more operational
+ policies than there are currently.
+
+ """
+ url = (f"{self.base_url()}/loop/addOperationaPolicy/{self.name}/"
+ f"policyModel/{policy_type}/{policy_version}")
+ add_response = self.send_message_json('PUT',
+ 'Create Operational Policy',
+ url)
+
+ key = "operationalPolicies"
+
+ try:
+ if self.details[key] is None:
+ self.details[key] = []
+
+ response_policies = add_response[key]
+ current_policies = self.details[key]
+ except KeyError as exc:
+ msg = 'Corrupt response, current loop details. Key not found.'
+ raise ParameterError(msg) from exc
+
+ if len(response_policies) > len(current_policies):
+ self.details = add_response
+ else:
+ raise ParameterError("Couldn't add the operational policy.")
+
+ def remove_operational_policy(self, policy_type: str, policy_version: str) -> None:
+ """
+ Remove operational policy from the loop instance.
+
+ Args:
+ policy_type (str): the full policy model type
+ policy_version (str): policy version
+
+ """
+ url = (f"{self.base_url()}/loop/removeOperationaPolicy/"
+ f"{self.name}/policyModel/{policy_type}/{policy_version}")
+ self.details = self.send_message_json('PUT',
+ 'Remove Operational Policy',
+ url)
+
+ def update_microservice_policy(self) -> None:
+ """
+ Update microservice policy configuration.
+
+ Update microservice policy configuration using payload data.
+
+ """
+ url = f"{self.base_url()}/loop/updateMicroservicePolicy/{self.name}"
+ template = jinja_env().get_template("clamp_add_tca_config.json.j2")
+ microservice_name = self.details["globalPropertiesJson"]["dcaeDeployParameters"]\
+ ["uniqueBlueprintParameters"]["policy_id"]
+ data = template.render(name=microservice_name,
+ LOOP_name=self.name)
+
+ self.send_message('POST',
+ 'ADD TCA config',
+ url,
+ data=data)
+
+ def extract_operational_policy_name(self, policy_type: str) -> str:
+ """
+ Return operational policy name for a closed loop and a given policy.
+
+ Args:
+ policy_type (str): the policy acronym.
+
+ Raises:
+ ParameterError : Couldn't load the operational policy name.
+
+ Returns:
+ Operational policy name in the loop details after adding a policy.
+
+ """
+ for policy in filter(lambda x: x["policyModel"]["policyAcronym"] == policy_type,
+ self.details["operationalPolicies"]):
+ return policy["name"]
+
+ raise ParameterError("Couldn't load the operational policy name.")
+
+ def add_drools_conf(self) -> dict:
+ """Add drools configuration."""
+ self.validate_details()
+ vfmodule_dicts = self.details["modelService"]["resourceDetails"]["VFModule"]
+ entity_ids = {}
+ #Get the vf module details
+ for vfmodule in vfmodule_dicts.values():
+ entity_ids["resourceID"] = vfmodule["vfModuleModelName"]
+ entity_ids["modelInvariantId"] = vfmodule["vfModuleModelInvariantUUID"]
+ entity_ids["modelVersionId"] = vfmodule["vfModuleModelUUID"]
+ entity_ids["modelName"] = vfmodule["vfModuleModelName"]
+ entity_ids["modelVersion"] = vfmodule["vfModuleModelVersion"]
+ entity_ids["modelCustomizationId"] = vfmodule["vfModuleModelCustomizationUUID"]
+ template = jinja_env().get_template("clamp_add_drools_policy.json.j2")
+ data = template.render(name=self.extract_operational_policy_name("Drools"),
+ resourceID=entity_ids["resourceID"],
+ modelInvariantId=entity_ids["modelInvariantId"],
+ modelVersionId=entity_ids["modelVersionId"],
+ modelName=entity_ids["modelName"],
+ modelVersion=entity_ids["modelVersion"],
+ modelCustomizationId=entity_ids["modelCustomizationId"],
+ LOOP_name=self.name)
+ return data
+
+ def add_minmax_config(self) -> None:
+ """Add MinMax operational policy config."""
+ #must configure start/end time and min/max instances in json file
+ template = jinja_env().get_template("clamp_MinMax_config.json.j2")
+ return template.render(name=self.extract_operational_policy_name("MinMax"))
+
+ def add_frequency_limiter(self, limit: int = 1) -> None:
+ """Add frequency limiter config."""
+ template = jinja_env().get_template("clamp_add_frequency.json.j2")
+ return template.render(name=self.extract_operational_policy_name("FrequencyLimiter"),
+ LOOP_name=self.name,
+ limit=limit)
+
+ def add_op_policy_config(self, func, **kwargs) -> None:
+ """
+ Add operational policy configuration.
+
+ Add operation policy configuration using payload data.
+
+ Args:
+ func (function): policy configuration function in (add_drools_conf,
+ add_minmax_config,
+ add_frequency_limiter)
+
+ """
+ data = func(**kwargs)
+ if not data:
+ raise ParameterError("Payload data from configuration is None.")
+ if self.operational_policies:
+ self.operational_policies = self.operational_policies[:-1] + ","
+ data = data[1:]
+ self.operational_policies += data
+ url = f"{self.base_url()}/loop/updateOperationalPolicies/{self.name}"
+ self.send_message('POST',
+ 'ADD operational policy config',
+ url,
+ data=self.operational_policies)
+
+ self._logger.info(("Files for op policy config %s have been uploaded to loop's"
+ "Op policy"), self.name)
+
+ def submit(self):
+ """Submit policies to policy engine."""
+ state = self.details["components"]["POLICY"]["componentState"]["stateName"]
+ return state == "SENT_AND_DEPLOYED"
+
+ def stop(self):
+ """Undeploy Policies from policy engine."""
+ state = self.details["components"]["POLICY"]["componentState"]["stateName"]
+ return state == "SENT"
+
+ def restart(self):
+ """Redeploy policies to policy engine."""
+ state = self.details["components"]["POLICY"]["componentState"]["stateName"]
+ return state == "SENT_AND_DEPLOYED"
+
+ def act_on_loop_policy(self, func) -> bool:
+ """
+ Act on loop's policy.
+
+ Args:
+ func (function): function of action to be done (submit, stop, restart)
+
+ Returns:
+ action state : failed or done
+
+ """
+ url = f"{self.base_url()}/loop/{func.__name__}/{self.name}"
+ self.send_message('PUT',
+ f'{func.__name__} policy',
+ url)
+ self.refresh_status()
+ self.validate_details()
+ return func()
+
+ def deploy_microservice_to_dcae(self) -> bool:
+ """
+ Execute the deploy operation on the loop instance.
+
+ Returns:
+ loop deploy on DCAE status (True, False)
+
+ """
+ url = f"{self.base_url()}/loop/deploy/{self.name}"
+ self.send_message('PUT',
+ 'Deploy microservice to DCAE',
+ url)
+ self.validate_details()
+ state = self.details["components"]["DCAE"]["componentState"]["stateName"]
+ failure = "MICROSERVICE_INSTALLATION_FAILED"
+ success = "MICROSERVICE_INSTALLED_SUCCESSFULLY"
+ while state not in (success, failure):
+ self.refresh_status()
+ self.validate_details()
+ state = self.details["components"]["DCAE"]["componentState"]["stateName"]
+ return state == success
+
+ def undeploy_microservice_from_dcae(self) -> None:
+ """Stop the deploy operation."""
+ url = f"{self.base_url()}/loop/undeploy/{self.name}"
+ self.send_message('PUT',
+ 'Undeploy microservice from DCAE',
+ url)
+
+ def delete(self) -> None:
+ """Delete the loop instance."""
+ self._logger.debug("Delete %s loop instance", self.name)
+ url = "{}/loop/delete/{}".format(self.base_url(), self.name)
+ self.send_message('PUT',
+ 'Delete loop instance',
+ url)