aboutsummaryrefslogtreecommitdiffstats
path: root/src/onaptests/steps/loop
diff options
context:
space:
mode:
Diffstat (limited to 'src/onaptests/steps/loop')
-rw-r--r--src/onaptests/steps/loop/clamp.py125
-rw-r--r--src/onaptests/steps/loop/instantiate_loop.py87
2 files changed, 212 insertions, 0 deletions
diff --git a/src/onaptests/steps/loop/clamp.py b/src/onaptests/steps/loop/clamp.py
new file mode 100644
index 0000000..e601772
--- /dev/null
+++ b/src/onaptests/steps/loop/clamp.py
@@ -0,0 +1,125 @@
+#!/usr/bin/python
+#
+# This program and the accompanying materials
+# are made available under the terms of the Apache License, Version 2.0
+# which accompanies this distribution, and is available at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+"""Clamp Scenario class."""
+from yaml import load
+import time
+
+from onapsdk.clamp.clamp_element import Clamp
+from onapsdk.sdc.service import Service
+
+from onapsdk.configuration import settings
+from onaptests.steps.onboard.clamp import OnboardClampStep
+from onaptests.steps.loop.instantiate_loop import InstantiateLoop
+
+from ..base import YamlTemplateBaseStep
+
+
+class ClampStep(YamlTemplateBaseStep):
+ """class defining the different CLAMP scenarios."""
+
+ count: int = 0
+
+ def __init__(self, cleanup=False):
+ super().__init__(cleanup=cleanup)
+ self._yaml_template: dict = None
+ self.add_step(OnboardClampStep(cleanup=cleanup))
+ Clamp(cert=settings.CERT)
+ self.loop_instance = None
+
+ @property
+ def yaml_template(self) -> dict:
+ """Step YAML template.
+
+ Load from file if it's a root step, get from parent otherwise.
+
+ Returns:
+ dict: Step YAML template
+
+ """
+ if self.is_root:
+ if not self._yaml_template:
+ with open(settings.SERVICE_YAML_TEMPLATE, "r") as yaml_template:
+ self._yaml_template: dict = load(yaml_template)
+ return self._yaml_template
+ return self.parent.yaml_template
+
+ @property
+ def service_name(self) -> str:
+ """Service name.
+
+ Get from YAML template if it's a root step, get from parent otherwise.
+
+ Returns:
+ str: Service name
+
+ """
+ if self.is_root:
+ return next(iter(self.yaml_template.keys()))
+ else:
+ return self.parent.service_name
+
+
+ def check(self, operational_policies: list, is_template: bool = False) -> str:
+ """Check CLAMP requirements to create a loop."""
+ self._logger.info("Check operational policy")
+ for policy in operational_policies:
+ exist = Clamp.check_policies(policy_name=policy["name"],
+ req_policies=30)# 30 required policy
+ self._logger.info("Operational policy found.")
+ if not exist:
+ raise ValueError("Couldn't load the policy %s", policy)
+ # retrieve the service..based on service name
+ service: Service = Service(self.service_name)
+ if is_template:
+ loop_template = Clamp.check_loop_template(service=service)
+ self._logger.info("Loop template checked.")
+ return loop_template
+
+ def instantiate_clamp(self, loop_template: str, loop_name: str, operational_policies: list):
+ """Instantite a closed loopin CLAMP."""
+ loop = InstantiateLoop(template=loop_template,
+ loop_name=loop_name,
+ operational_policies=operational_policies,
+ cert=settings.CERT)
+ return loop.instantiate_loop()
+
+ def loop_counter(self, action: str) -> None:
+ """ Count number of loop instances."""
+ if action == "plus":
+ self.count += 1
+ if action == "minus":
+ self.count -= 1
+
+ @YamlTemplateBaseStep.store_state
+ def execute(self):
+ super().execute() # TODO work only the 1st time, not if already onboarded
+ # time to wait for template load in CLAMP
+ self._logger.info("Wait a little bit to give a chance to the distribution")
+ time.sleep(settings.CLAMP_DISTRIBUTION_TIMER)
+ # Test 1
+ operational_policies = settings.OPERATIONAL_POLICIES
+ loop_template = self.check(operational_policies, True)
+ # Test 2
+ loop_name = "instance_" + self.service_name + str(self.count)
+ self.loop_counter(action="plus")
+ self.loop_instance = self.instantiate_clamp(
+ loop_template=loop_template,
+ loop_name=loop_name,
+ operational_policies=operational_policies)
+
+ def cleanup(self) -> None:
+ """Cleanup Service.
+
+ Raises:
+ Exception: Clamp cleaning failed
+
+ """
+ self.loop_counter(action="minus")
+ self.loop_instance.undeploy_microservice_from_dcae()
+ self.loop_instance.delete()
+ super().cleanup()
diff --git a/src/onaptests/steps/loop/instantiate_loop.py b/src/onaptests/steps/loop/instantiate_loop.py
new file mode 100644
index 0000000..bd34a04
--- /dev/null
+++ b/src/onaptests/steps/loop/instantiate_loop.py
@@ -0,0 +1,87 @@
+#!/usr/bin/python
+# http://www.apache.org/licenses/LICENSE-2.0
+"""Instantiation class."""
+import logging
+import logging.config
+
+from onapsdk.clamp.loop_instance import LoopInstance
+from onapsdk.configuration import settings
+
+
+class InstantiateLoop():
+ """class instantiating a closed loop in clamp."""
+
+ def __init__(self, template: str, loop_name: str, operational_policies: list, cert: tuple):
+ self.template=template
+ self.loop_name=loop_name
+ self.operational_policies=operational_policies
+ self.cert=cert
+
+ self._logger: logging.Logger = logging.getLogger("")
+ logging.config.dictConfig(settings.LOG_CONFIG)
+
+ def add_policies(self, loop: LoopInstance) -> None:
+ """add necessary wanted operational policies."""
+ for policy in self.operational_policies:
+ self._logger.info("******** ADD OPERATIONAL POLICY %s *******", policy["name"])
+ added = loop.add_operational_policy(policy_type=policy["policy_type"],
+ policy_version=policy["policy_version"])
+ if not added:
+ self._logger.error("an error occured while adding an operational policy")
+ self._logger.info("ADD OPERATION SUCCESSFULY DONE")
+
+
+ def configure_policies(self, loop: LoopInstance) -> None:
+ """Configure all policies."""
+ self._logger.info("******** UPDATE MICROSERVICE POLICY *******")
+ loop._update_loop_details()
+ loop.update_microservice_policy()
+ self._logger.info("******** UPDATE OPERATIONAL POLICIES CONFIG *******")
+ for policy in self.operational_policies:
+ #loop.add_op_policy_config(loop.LoopInstance.__dict__[policy["config_function"]])
+ #possible configurations for the moment
+ if policy["name"] == "MinMax":
+ loop.add_op_policy_config(loop.add_minmax_config)
+ if policy["name"] == "Drools":
+ loop.add_op_policy_config(loop.add_drools_conf)
+ if policy["name"] == "FrequencyLimiter":
+ loop.add_op_policy_config(loop.add_frequency_limiter)
+ self._logger.info("Policies are well configured")
+
+ def deploy(self, loop: LoopInstance) -> None:
+ """Deploy closed loop."""
+ self._logger.info("******** SUBMIT POLICIES TO PE *******")
+ submit = loop.act_on_loop_policy(loop.submit)
+ self._logger.info("******** CHECK POLICIES SUBMITION *******")
+ if submit :
+ self._logger.info("Policies successfully submited to PE")
+
+ else:
+ self._logger.error("An error occured while submitting the loop instance")
+ exit(1)
+ self._logger.info("******** DEPLOY LOOP INSTANCE *******")
+ deploy = loop.deploy_microservice_to_dcae()
+ if deploy:
+ self._logger.info("Loop instance %s successfully deployed on DCAE !!", self.loop_name)
+ else:
+ self._logger.error("An error occured while deploying the loop instance")
+ exit(1)
+
+ def instantiate_loop(self):
+ """Instantiate the control loop."""
+ loop = LoopInstance(template=self.template,
+ name=self.loop_name,
+ details={},
+ cert=self.cert)
+ details = loop.create()
+ if details:
+ self._logger.info("Loop instance %s successfully created !!", self.loop_name)
+ else:
+ self._logger.error("An error occured while creating the loop instance")
+
+ self.add_policies(loop=loop)
+ self.configure_policies(loop=loop)
+ self.deploy(loop=loop)
+
+ loop.details = loop._update_loop_details()
+ return loop