aboutsummaryrefslogtreecommitdiffstats
path: root/src/onaptests/steps
diff options
context:
space:
mode:
authoramine ezziati <aymane.ezziati@gmail.com>2020-07-24 09:08:42 +0200
committermrichomme <morgan.richomme@orange.com>2020-11-06 17:35:33 +0100
commit18642f67fcd7268a914fb6632f0f755336e8ba24 (patch)
treef4d3bd61d3e8158ebf8bd337cdc07a46c6172eb6 /src/onaptests/steps
parent31dc9d07a9bd5c98304ae7d58e995321d3e1507f (diff)
Add CLAMP E2E tests
Issue-ID: TEST-240 Signed-off-by: amine ezziati <mohamedamine.ezziati@orange.com> Change-Id: Ib612f1670aca10d9ac9f15fadb9cab58a7d8e5a8 Signed-off-by: mrichomme <morgan.richomme@orange.com>
Diffstat (limited to 'src/onaptests/steps')
-rw-r--r--src/onaptests/steps/loop/clamp.py125
-rw-r--r--src/onaptests/steps/loop/instantiate_loop.py87
-rw-r--r--src/onaptests/steps/onboard/clamp.py91
3 files changed, 303 insertions, 0 deletions
diff --git a/src/onaptests/steps/loop/clamp.py b/src/onaptests/steps/loop/clamp.py
new file mode 100644
index 0000000..e601772
--- /dev/null
+++ b/src/onaptests/steps/loop/clamp.py
@@ -0,0 +1,125 @@
+#!/usr/bin/python
+#
+# This program and the accompanying materials
+# are made available under the terms of the Apache License, Version 2.0
+# which accompanies this distribution, and is available at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+"""Clamp Scenario class."""
+from yaml import load
+import time
+
+from onapsdk.clamp.clamp_element import Clamp
+from onapsdk.sdc.service import Service
+
+from onapsdk.configuration import settings
+from onaptests.steps.onboard.clamp import OnboardClampStep
+from onaptests.steps.loop.instantiate_loop import InstantiateLoop
+
+from ..base import YamlTemplateBaseStep
+
+
+class ClampStep(YamlTemplateBaseStep):
+ """class defining the different CLAMP scenarios."""
+
+ count: int = 0
+
+ def __init__(self, cleanup=False):
+ super().__init__(cleanup=cleanup)
+ self._yaml_template: dict = None
+ self.add_step(OnboardClampStep(cleanup=cleanup))
+ Clamp(cert=settings.CERT)
+ self.loop_instance = None
+
+ @property
+ def yaml_template(self) -> dict:
+ """Step YAML template.
+
+ Load from file if it's a root step, get from parent otherwise.
+
+ Returns:
+ dict: Step YAML template
+
+ """
+ if self.is_root:
+ if not self._yaml_template:
+ with open(settings.SERVICE_YAML_TEMPLATE, "r") as yaml_template:
+ self._yaml_template: dict = load(yaml_template)
+ return self._yaml_template
+ return self.parent.yaml_template
+
+ @property
+ def service_name(self) -> str:
+ """Service name.
+
+ Get from YAML template if it's a root step, get from parent otherwise.
+
+ Returns:
+ str: Service name
+
+ """
+ if self.is_root:
+ return next(iter(self.yaml_template.keys()))
+ else:
+ return self.parent.service_name
+
+
+ def check(self, operational_policies: list, is_template: bool = False) -> str:
+ """Check CLAMP requirements to create a loop."""
+ self._logger.info("Check operational policy")
+ for policy in operational_policies:
+ exist = Clamp.check_policies(policy_name=policy["name"],
+ req_policies=30)# 30 required policy
+ self._logger.info("Operational policy found.")
+ if not exist:
+ raise ValueError("Couldn't load the policy %s", policy)
+ # retrieve the service..based on service name
+ service: Service = Service(self.service_name)
+ if is_template:
+ loop_template = Clamp.check_loop_template(service=service)
+ self._logger.info("Loop template checked.")
+ return loop_template
+
+ def instantiate_clamp(self, loop_template: str, loop_name: str, operational_policies: list):
+ """Instantite a closed loopin CLAMP."""
+ loop = InstantiateLoop(template=loop_template,
+ loop_name=loop_name,
+ operational_policies=operational_policies,
+ cert=settings.CERT)
+ return loop.instantiate_loop()
+
+ def loop_counter(self, action: str) -> None:
+ """ Count number of loop instances."""
+ if action == "plus":
+ self.count += 1
+ if action == "minus":
+ self.count -= 1
+
+ @YamlTemplateBaseStep.store_state
+ def execute(self):
+ super().execute() # TODO work only the 1st time, not if already onboarded
+ # time to wait for template load in CLAMP
+ self._logger.info("Wait a little bit to give a chance to the distribution")
+ time.sleep(settings.CLAMP_DISTRIBUTION_TIMER)
+ # Test 1
+ operational_policies = settings.OPERATIONAL_POLICIES
+ loop_template = self.check(operational_policies, True)
+ # Test 2
+ loop_name = "instance_" + self.service_name + str(self.count)
+ self.loop_counter(action="plus")
+ self.loop_instance = self.instantiate_clamp(
+ loop_template=loop_template,
+ loop_name=loop_name,
+ operational_policies=operational_policies)
+
+ def cleanup(self) -> None:
+ """Cleanup Service.
+
+ Raises:
+ Exception: Clamp cleaning failed
+
+ """
+ self.loop_counter(action="minus")
+ self.loop_instance.undeploy_microservice_from_dcae()
+ self.loop_instance.delete()
+ super().cleanup()
diff --git a/src/onaptests/steps/loop/instantiate_loop.py b/src/onaptests/steps/loop/instantiate_loop.py
new file mode 100644
index 0000000..bd34a04
--- /dev/null
+++ b/src/onaptests/steps/loop/instantiate_loop.py
@@ -0,0 +1,87 @@
+#!/usr/bin/python
+# http://www.apache.org/licenses/LICENSE-2.0
+"""Instantiation class."""
+import logging
+import logging.config
+
+from onapsdk.clamp.loop_instance import LoopInstance
+from onapsdk.configuration import settings
+
+
+class InstantiateLoop():
+ """class instantiating a closed loop in clamp."""
+
+ def __init__(self, template: str, loop_name: str, operational_policies: list, cert: tuple):
+ self.template=template
+ self.loop_name=loop_name
+ self.operational_policies=operational_policies
+ self.cert=cert
+
+ self._logger: logging.Logger = logging.getLogger("")
+ logging.config.dictConfig(settings.LOG_CONFIG)
+
+ def add_policies(self, loop: LoopInstance) -> None:
+ """add necessary wanted operational policies."""
+ for policy in self.operational_policies:
+ self._logger.info("******** ADD OPERATIONAL POLICY %s *******", policy["name"])
+ added = loop.add_operational_policy(policy_type=policy["policy_type"],
+ policy_version=policy["policy_version"])
+ if not added:
+ self._logger.error("an error occured while adding an operational policy")
+ self._logger.info("ADD OPERATION SUCCESSFULY DONE")
+
+
+ def configure_policies(self, loop: LoopInstance) -> None:
+ """Configure all policies."""
+ self._logger.info("******** UPDATE MICROSERVICE POLICY *******")
+ loop._update_loop_details()
+ loop.update_microservice_policy()
+ self._logger.info("******** UPDATE OPERATIONAL POLICIES CONFIG *******")
+ for policy in self.operational_policies:
+ #loop.add_op_policy_config(loop.LoopInstance.__dict__[policy["config_function"]])
+ #possible configurations for the moment
+ if policy["name"] == "MinMax":
+ loop.add_op_policy_config(loop.add_minmax_config)
+ if policy["name"] == "Drools":
+ loop.add_op_policy_config(loop.add_drools_conf)
+ if policy["name"] == "FrequencyLimiter":
+ loop.add_op_policy_config(loop.add_frequency_limiter)
+ self._logger.info("Policies are well configured")
+
+ def deploy(self, loop: LoopInstance) -> None:
+ """Deploy closed loop."""
+ self._logger.info("******** SUBMIT POLICIES TO PE *******")
+ submit = loop.act_on_loop_policy(loop.submit)
+ self._logger.info("******** CHECK POLICIES SUBMITION *******")
+ if submit :
+ self._logger.info("Policies successfully submited to PE")
+
+ else:
+ self._logger.error("An error occured while submitting the loop instance")
+ exit(1)
+ self._logger.info("******** DEPLOY LOOP INSTANCE *******")
+ deploy = loop.deploy_microservice_to_dcae()
+ if deploy:
+ self._logger.info("Loop instance %s successfully deployed on DCAE !!", self.loop_name)
+ else:
+ self._logger.error("An error occured while deploying the loop instance")
+ exit(1)
+
+ def instantiate_loop(self):
+ """Instantiate the control loop."""
+ loop = LoopInstance(template=self.template,
+ name=self.loop_name,
+ details={},
+ cert=self.cert)
+ details = loop.create()
+ if details:
+ self._logger.info("Loop instance %s successfully created !!", self.loop_name)
+ else:
+ self._logger.error("An error occured while creating the loop instance")
+
+ self.add_policies(loop=loop)
+ self.configure_policies(loop=loop)
+ self.deploy(loop=loop)
+
+ loop.details = loop._update_loop_details()
+ return loop
diff --git a/src/onaptests/steps/onboard/clamp.py b/src/onaptests/steps/onboard/clamp.py
new file mode 100644
index 0000000..8f6b6bf
--- /dev/null
+++ b/src/onaptests/steps/onboard/clamp.py
@@ -0,0 +1,91 @@
+#!/usr/bin/python
+# http://www.apache.org/licenses/LICENSE-2.0
+"""Clamp Onboard service class."""
+from yaml import load
+from onapsdk.sdc.service import Service
+from onapsdk.sdc.vf import Vf
+
+from onapsdk.configuration import settings
+
+from ..base import YamlTemplateBaseStep
+from .service import YamlTemplateVfOnboardStep
+
+class OnboardClampStep(YamlTemplateBaseStep):
+ """Onboard class to create CLAMP templates."""
+
+ def __init__(self, cleanup=False):
+ """Initialize Clamp Onboard object."""
+ super().__init__(cleanup=cleanup)
+ self._yaml_template: dict = None
+ self.add_step(YamlTemplateVfOnboardStep(cleanup=cleanup))
+ # if "service_name" in kwargs:
+ # self.service_name = kwargs['service_name']
+ # else:
+ # raise ValueError("Service Name to define")
+ # self.vf_list = []
+ # self.vsp_list = []
+ # self.set_logger()
+
+ @property
+ def yaml_template(self) -> dict:
+ """Step YAML template.
+
+ Load from file if it's a root step, get from parent otherwise.
+
+ Returns:
+ dict: Step YAML template
+
+ """
+ if self.is_root:
+ if not self._yaml_template:
+ with open(settings.SERVICE_YAML_TEMPLATE, "r") as yaml_template:
+ self._yaml_template: dict = load(yaml_template)
+ return self._yaml_template
+ return self.parent.yaml_template
+
+ @property
+ def service_name(self) -> str:
+ """Service name.
+
+ Get from YAML template if it's a root step, get from parent otherwise.
+
+ Returns:
+ str: Service name
+
+ """
+ if self.is_root:
+ return next(iter(self.yaml_template.keys()))
+ else:
+ return self.parent.service_name
+
+ @YamlTemplateBaseStep.store_state
+ def execute(self):
+ """Onboard service."""
+ super().execute()
+ # retrieve the Vf
+ vf = None
+ for sdc_vf in Vf.get_all():
+ if sdc_vf.name == settings.VF_NAME:
+ vf = sdc_vf
+ self._logger.debug("Vf retrieved %s", vf)
+
+ service: Service = Service(name=self.service_name,
+ resources=[vf])
+ service.create()
+ self._logger.info(" Service %s created", service)
+
+ service.add_resource(vf)
+
+ # we add the artifact to the first VNF
+ self._logger.info("Try to add blueprint to %s", vf.name)
+ payload_file = open(settings.CONFIGURATION_PATH + 'clampnode.yaml', 'rb')
+ data = payload_file.read()
+ self._logger.info("DCAE INVENTORY BLUEPRINT file retrieved")
+ service.add_artifact_to_vf(vnf_name=vf.name,
+ artifact_type="DCAE_INVENTORY_BLUEPRINT",
+ artifact_name="clampnode.yaml",
+ artifact=data)
+ payload_file.close()
+ service.checkin()
+ service.onboard()
+ self._logger.info("DCAE INVENTORY BLUEPRINT ADDED")