diff options
author | Krzysztof Kuzmicki <krzysztof.kuzmicki@nokia.com> | 2020-11-06 20:44:28 +0000 |
---|---|---|
committer | Gerrit Code Review <gerrit@onap.org> | 2020-11-06 20:44:28 +0000 |
commit | 882e2b34db0ea53de3195303267c17d8809fca15 (patch) | |
tree | e75baa7f93f042fa43e4ee684adbf336b227b80c /src/onaptests/steps | |
parent | eeeb7190de7185c9994e460cc0472e8817ab68aa (diff) | |
parent | 18642f67fcd7268a914fb6632f0f755336e8ba24 (diff) |
Merge "Add CLAMP E2E tests"
Diffstat (limited to 'src/onaptests/steps')
-rw-r--r-- | src/onaptests/steps/loop/clamp.py | 125 | ||||
-rw-r--r-- | src/onaptests/steps/loop/instantiate_loop.py | 87 | ||||
-rw-r--r-- | src/onaptests/steps/onboard/clamp.py | 91 |
3 files changed, 303 insertions, 0 deletions
diff --git a/src/onaptests/steps/loop/clamp.py b/src/onaptests/steps/loop/clamp.py new file mode 100644 index 0000000..e601772 --- /dev/null +++ b/src/onaptests/steps/loop/clamp.py @@ -0,0 +1,125 @@ +#!/usr/bin/python +# +# This program and the accompanying materials +# are made available under the terms of the Apache License, Version 2.0 +# which accompanies this distribution, and is available at +# +# http://www.apache.org/licenses/LICENSE-2.0 +"""Clamp Scenario class.""" +from yaml import load +import time + +from onapsdk.clamp.clamp_element import Clamp +from onapsdk.sdc.service import Service + +from onapsdk.configuration import settings +from onaptests.steps.onboard.clamp import OnboardClampStep +from onaptests.steps.loop.instantiate_loop import InstantiateLoop + +from ..base import YamlTemplateBaseStep + + +class ClampStep(YamlTemplateBaseStep): + """class defining the different CLAMP scenarios.""" + + count: int = 0 + + def __init__(self, cleanup=False): + super().__init__(cleanup=cleanup) + self._yaml_template: dict = None + self.add_step(OnboardClampStep(cleanup=cleanup)) + Clamp(cert=settings.CERT) + self.loop_instance = None + + @property + def yaml_template(self) -> dict: + """Step YAML template. + + Load from file if it's a root step, get from parent otherwise. + + Returns: + dict: Step YAML template + + """ + if self.is_root: + if not self._yaml_template: + with open(settings.SERVICE_YAML_TEMPLATE, "r") as yaml_template: + self._yaml_template: dict = load(yaml_template) + return self._yaml_template + return self.parent.yaml_template + + @property + def service_name(self) -> str: + """Service name. + + Get from YAML template if it's a root step, get from parent otherwise. + + Returns: + str: Service name + + """ + if self.is_root: + return next(iter(self.yaml_template.keys())) + else: + return self.parent.service_name + + + def check(self, operational_policies: list, is_template: bool = False) -> str: + """Check CLAMP requirements to create a loop.""" + self._logger.info("Check operational policy") + for policy in operational_policies: + exist = Clamp.check_policies(policy_name=policy["name"], + req_policies=30)# 30 required policy + self._logger.info("Operational policy found.") + if not exist: + raise ValueError("Couldn't load the policy %s", policy) + # retrieve the service..based on service name + service: Service = Service(self.service_name) + if is_template: + loop_template = Clamp.check_loop_template(service=service) + self._logger.info("Loop template checked.") + return loop_template + + def instantiate_clamp(self, loop_template: str, loop_name: str, operational_policies: list): + """Instantite a closed loopin CLAMP.""" + loop = InstantiateLoop(template=loop_template, + loop_name=loop_name, + operational_policies=operational_policies, + cert=settings.CERT) + return loop.instantiate_loop() + + def loop_counter(self, action: str) -> None: + """ Count number of loop instances.""" + if action == "plus": + self.count += 1 + if action == "minus": + self.count -= 1 + + @YamlTemplateBaseStep.store_state + def execute(self): + super().execute() # TODO work only the 1st time, not if already onboarded + # time to wait for template load in CLAMP + self._logger.info("Wait a little bit to give a chance to the distribution") + time.sleep(settings.CLAMP_DISTRIBUTION_TIMER) + # Test 1 + operational_policies = settings.OPERATIONAL_POLICIES + loop_template = self.check(operational_policies, True) + # Test 2 + loop_name = "instance_" + self.service_name + str(self.count) + self.loop_counter(action="plus") + self.loop_instance = self.instantiate_clamp( + loop_template=loop_template, + loop_name=loop_name, + operational_policies=operational_policies) + + def cleanup(self) -> None: + """Cleanup Service. + + Raises: + Exception: Clamp cleaning failed + + """ + self.loop_counter(action="minus") + self.loop_instance.undeploy_microservice_from_dcae() + self.loop_instance.delete() + super().cleanup() diff --git a/src/onaptests/steps/loop/instantiate_loop.py b/src/onaptests/steps/loop/instantiate_loop.py new file mode 100644 index 0000000..bd34a04 --- /dev/null +++ b/src/onaptests/steps/loop/instantiate_loop.py @@ -0,0 +1,87 @@ +#!/usr/bin/python +# http://www.apache.org/licenses/LICENSE-2.0 +"""Instantiation class.""" +import logging +import logging.config + +from onapsdk.clamp.loop_instance import LoopInstance +from onapsdk.configuration import settings + + +class InstantiateLoop(): + """class instantiating a closed loop in clamp.""" + + def __init__(self, template: str, loop_name: str, operational_policies: list, cert: tuple): + self.template=template + self.loop_name=loop_name + self.operational_policies=operational_policies + self.cert=cert + + self._logger: logging.Logger = logging.getLogger("") + logging.config.dictConfig(settings.LOG_CONFIG) + + def add_policies(self, loop: LoopInstance) -> None: + """add necessary wanted operational policies.""" + for policy in self.operational_policies: + self._logger.info("******** ADD OPERATIONAL POLICY %s *******", policy["name"]) + added = loop.add_operational_policy(policy_type=policy["policy_type"], + policy_version=policy["policy_version"]) + if not added: + self._logger.error("an error occured while adding an operational policy") + self._logger.info("ADD OPERATION SUCCESSFULY DONE") + + + def configure_policies(self, loop: LoopInstance) -> None: + """Configure all policies.""" + self._logger.info("******** UPDATE MICROSERVICE POLICY *******") + loop._update_loop_details() + loop.update_microservice_policy() + self._logger.info("******** UPDATE OPERATIONAL POLICIES CONFIG *******") + for policy in self.operational_policies: + #loop.add_op_policy_config(loop.LoopInstance.__dict__[policy["config_function"]]) + #possible configurations for the moment + if policy["name"] == "MinMax": + loop.add_op_policy_config(loop.add_minmax_config) + if policy["name"] == "Drools": + loop.add_op_policy_config(loop.add_drools_conf) + if policy["name"] == "FrequencyLimiter": + loop.add_op_policy_config(loop.add_frequency_limiter) + self._logger.info("Policies are well configured") + + def deploy(self, loop: LoopInstance) -> None: + """Deploy closed loop.""" + self._logger.info("******** SUBMIT POLICIES TO PE *******") + submit = loop.act_on_loop_policy(loop.submit) + self._logger.info("******** CHECK POLICIES SUBMITION *******") + if submit : + self._logger.info("Policies successfully submited to PE") + + else: + self._logger.error("An error occured while submitting the loop instance") + exit(1) + self._logger.info("******** DEPLOY LOOP INSTANCE *******") + deploy = loop.deploy_microservice_to_dcae() + if deploy: + self._logger.info("Loop instance %s successfully deployed on DCAE !!", self.loop_name) + else: + self._logger.error("An error occured while deploying the loop instance") + exit(1) + + def instantiate_loop(self): + """Instantiate the control loop.""" + loop = LoopInstance(template=self.template, + name=self.loop_name, + details={}, + cert=self.cert) + details = loop.create() + if details: + self._logger.info("Loop instance %s successfully created !!", self.loop_name) + else: + self._logger.error("An error occured while creating the loop instance") + + self.add_policies(loop=loop) + self.configure_policies(loop=loop) + self.deploy(loop=loop) + + loop.details = loop._update_loop_details() + return loop diff --git a/src/onaptests/steps/onboard/clamp.py b/src/onaptests/steps/onboard/clamp.py new file mode 100644 index 0000000..8f6b6bf --- /dev/null +++ b/src/onaptests/steps/onboard/clamp.py @@ -0,0 +1,91 @@ +#!/usr/bin/python +# http://www.apache.org/licenses/LICENSE-2.0 +"""Clamp Onboard service class.""" +from yaml import load +from onapsdk.sdc.service import Service +from onapsdk.sdc.vf import Vf + +from onapsdk.configuration import settings + +from ..base import YamlTemplateBaseStep +from .service import YamlTemplateVfOnboardStep + +class OnboardClampStep(YamlTemplateBaseStep): + """Onboard class to create CLAMP templates.""" + + def __init__(self, cleanup=False): + """Initialize Clamp Onboard object.""" + super().__init__(cleanup=cleanup) + self._yaml_template: dict = None + self.add_step(YamlTemplateVfOnboardStep(cleanup=cleanup)) + # if "service_name" in kwargs: + # self.service_name = kwargs['service_name'] + # else: + # raise ValueError("Service Name to define") + # self.vf_list = [] + # self.vsp_list = [] + # self.set_logger() + + @property + def yaml_template(self) -> dict: + """Step YAML template. + + Load from file if it's a root step, get from parent otherwise. + + Returns: + dict: Step YAML template + + """ + if self.is_root: + if not self._yaml_template: + with open(settings.SERVICE_YAML_TEMPLATE, "r") as yaml_template: + self._yaml_template: dict = load(yaml_template) + return self._yaml_template + return self.parent.yaml_template + + @property + def service_name(self) -> str: + """Service name. + + Get from YAML template if it's a root step, get from parent otherwise. + + Returns: + str: Service name + + """ + if self.is_root: + return next(iter(self.yaml_template.keys())) + else: + return self.parent.service_name + + @YamlTemplateBaseStep.store_state + def execute(self): + """Onboard service.""" + super().execute() + # retrieve the Vf + vf = None + for sdc_vf in Vf.get_all(): + if sdc_vf.name == settings.VF_NAME: + vf = sdc_vf + self._logger.debug("Vf retrieved %s", vf) + + service: Service = Service(name=self.service_name, + resources=[vf]) + service.create() + self._logger.info(" Service %s created", service) + + service.add_resource(vf) + + # we add the artifact to the first VNF + self._logger.info("Try to add blueprint to %s", vf.name) + payload_file = open(settings.CONFIGURATION_PATH + 'clampnode.yaml', 'rb') + data = payload_file.read() + self._logger.info("DCAE INVENTORY BLUEPRINT file retrieved") + service.add_artifact_to_vf(vnf_name=vf.name, + artifact_type="DCAE_INVENTORY_BLUEPRINT", + artifact_name="clampnode.yaml", + artifact=data) + payload_file.close() + service.checkin() + service.onboard() + self._logger.info("DCAE INVENTORY BLUEPRINT ADDED") |