diff options
Diffstat (limited to 'src/onaptests/steps/loop')
-rw-r--r-- | src/onaptests/steps/loop/clamp.py | 125 | ||||
-rw-r--r-- | src/onaptests/steps/loop/instantiate_loop.py | 87 |
2 files changed, 212 insertions, 0 deletions
diff --git a/src/onaptests/steps/loop/clamp.py b/src/onaptests/steps/loop/clamp.py new file mode 100644 index 0000000..e601772 --- /dev/null +++ b/src/onaptests/steps/loop/clamp.py @@ -0,0 +1,125 @@ +#!/usr/bin/python +# +# This program and the accompanying materials +# are made available under the terms of the Apache License, Version 2.0 +# which accompanies this distribution, and is available at +# +# http://www.apache.org/licenses/LICENSE-2.0 +"""Clamp Scenario class.""" +from yaml import load +import time + +from onapsdk.clamp.clamp_element import Clamp +from onapsdk.sdc.service import Service + +from onapsdk.configuration import settings +from onaptests.steps.onboard.clamp import OnboardClampStep +from onaptests.steps.loop.instantiate_loop import InstantiateLoop + +from ..base import YamlTemplateBaseStep + + +class ClampStep(YamlTemplateBaseStep): + """class defining the different CLAMP scenarios.""" + + count: int = 0 + + def __init__(self, cleanup=False): + super().__init__(cleanup=cleanup) + self._yaml_template: dict = None + self.add_step(OnboardClampStep(cleanup=cleanup)) + Clamp(cert=settings.CERT) + self.loop_instance = None + + @property + def yaml_template(self) -> dict: + """Step YAML template. + + Load from file if it's a root step, get from parent otherwise. + + Returns: + dict: Step YAML template + + """ + if self.is_root: + if not self._yaml_template: + with open(settings.SERVICE_YAML_TEMPLATE, "r") as yaml_template: + self._yaml_template: dict = load(yaml_template) + return self._yaml_template + return self.parent.yaml_template + + @property + def service_name(self) -> str: + """Service name. + + Get from YAML template if it's a root step, get from parent otherwise. + + Returns: + str: Service name + + """ + if self.is_root: + return next(iter(self.yaml_template.keys())) + else: + return self.parent.service_name + + + def check(self, operational_policies: list, is_template: bool = False) -> str: + """Check CLAMP requirements to create a loop.""" + self._logger.info("Check operational policy") + for policy in operational_policies: + exist = Clamp.check_policies(policy_name=policy["name"], + req_policies=30)# 30 required policy + self._logger.info("Operational policy found.") + if not exist: + raise ValueError("Couldn't load the policy %s", policy) + # retrieve the service..based on service name + service: Service = Service(self.service_name) + if is_template: + loop_template = Clamp.check_loop_template(service=service) + self._logger.info("Loop template checked.") + return loop_template + + def instantiate_clamp(self, loop_template: str, loop_name: str, operational_policies: list): + """Instantite a closed loopin CLAMP.""" + loop = InstantiateLoop(template=loop_template, + loop_name=loop_name, + operational_policies=operational_policies, + cert=settings.CERT) + return loop.instantiate_loop() + + def loop_counter(self, action: str) -> None: + """ Count number of loop instances.""" + if action == "plus": + self.count += 1 + if action == "minus": + self.count -= 1 + + @YamlTemplateBaseStep.store_state + def execute(self): + super().execute() # TODO work only the 1st time, not if already onboarded + # time to wait for template load in CLAMP + self._logger.info("Wait a little bit to give a chance to the distribution") + time.sleep(settings.CLAMP_DISTRIBUTION_TIMER) + # Test 1 + operational_policies = settings.OPERATIONAL_POLICIES + loop_template = self.check(operational_policies, True) + # Test 2 + loop_name = "instance_" + self.service_name + str(self.count) + self.loop_counter(action="plus") + self.loop_instance = self.instantiate_clamp( + loop_template=loop_template, + loop_name=loop_name, + operational_policies=operational_policies) + + def cleanup(self) -> None: + """Cleanup Service. + + Raises: + Exception: Clamp cleaning failed + + """ + self.loop_counter(action="minus") + self.loop_instance.undeploy_microservice_from_dcae() + self.loop_instance.delete() + super().cleanup() diff --git a/src/onaptests/steps/loop/instantiate_loop.py b/src/onaptests/steps/loop/instantiate_loop.py new file mode 100644 index 0000000..bd34a04 --- /dev/null +++ b/src/onaptests/steps/loop/instantiate_loop.py @@ -0,0 +1,87 @@ +#!/usr/bin/python +# http://www.apache.org/licenses/LICENSE-2.0 +"""Instantiation class.""" +import logging +import logging.config + +from onapsdk.clamp.loop_instance import LoopInstance +from onapsdk.configuration import settings + + +class InstantiateLoop(): + """class instantiating a closed loop in clamp.""" + + def __init__(self, template: str, loop_name: str, operational_policies: list, cert: tuple): + self.template=template + self.loop_name=loop_name + self.operational_policies=operational_policies + self.cert=cert + + self._logger: logging.Logger = logging.getLogger("") + logging.config.dictConfig(settings.LOG_CONFIG) + + def add_policies(self, loop: LoopInstance) -> None: + """add necessary wanted operational policies.""" + for policy in self.operational_policies: + self._logger.info("******** ADD OPERATIONAL POLICY %s *******", policy["name"]) + added = loop.add_operational_policy(policy_type=policy["policy_type"], + policy_version=policy["policy_version"]) + if not added: + self._logger.error("an error occured while adding an operational policy") + self._logger.info("ADD OPERATION SUCCESSFULY DONE") + + + def configure_policies(self, loop: LoopInstance) -> None: + """Configure all policies.""" + self._logger.info("******** UPDATE MICROSERVICE POLICY *******") + loop._update_loop_details() + loop.update_microservice_policy() + self._logger.info("******** UPDATE OPERATIONAL POLICIES CONFIG *******") + for policy in self.operational_policies: + #loop.add_op_policy_config(loop.LoopInstance.__dict__[policy["config_function"]]) + #possible configurations for the moment + if policy["name"] == "MinMax": + loop.add_op_policy_config(loop.add_minmax_config) + if policy["name"] == "Drools": + loop.add_op_policy_config(loop.add_drools_conf) + if policy["name"] == "FrequencyLimiter": + loop.add_op_policy_config(loop.add_frequency_limiter) + self._logger.info("Policies are well configured") + + def deploy(self, loop: LoopInstance) -> None: + """Deploy closed loop.""" + self._logger.info("******** SUBMIT POLICIES TO PE *******") + submit = loop.act_on_loop_policy(loop.submit) + self._logger.info("******** CHECK POLICIES SUBMITION *******") + if submit : + self._logger.info("Policies successfully submited to PE") + + else: + self._logger.error("An error occured while submitting the loop instance") + exit(1) + self._logger.info("******** DEPLOY LOOP INSTANCE *******") + deploy = loop.deploy_microservice_to_dcae() + if deploy: + self._logger.info("Loop instance %s successfully deployed on DCAE !!", self.loop_name) + else: + self._logger.error("An error occured while deploying the loop instance") + exit(1) + + def instantiate_loop(self): + """Instantiate the control loop.""" + loop = LoopInstance(template=self.template, + name=self.loop_name, + details={}, + cert=self.cert) + details = loop.create() + if details: + self._logger.info("Loop instance %s successfully created !!", self.loop_name) + else: + self._logger.error("An error occured while creating the loop instance") + + self.add_policies(loop=loop) + self.configure_policies(loop=loop) + self.deploy(loop=loop) + + loop.details = loop._update_loop_details() + return loop |