aboutsummaryrefslogtreecommitdiffstats
path: root/src/onaptests/steps/loop
diff options
context:
space:
mode:
Diffstat (limited to 'src/onaptests/steps/loop')
-rw-r--r--src/onaptests/steps/loop/clamp.py154
-rw-r--r--src/onaptests/steps/loop/instantiate_loop.py87
2 files changed, 0 insertions, 241 deletions
diff --git a/src/onaptests/steps/loop/clamp.py b/src/onaptests/steps/loop/clamp.py
deleted file mode 100644
index 1d71db2..0000000
--- a/src/onaptests/steps/loop/clamp.py
+++ /dev/null
@@ -1,154 +0,0 @@
-#!/usr/bin/python
-#
-# This program and the accompanying materials
-# are made available under the terms of the Apache License, Version 2.0
-# which accompanies this distribution, and is available at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-"""Clamp Scenario class."""
-import random
-import string
-import time
-
-from onapsdk.clamp.clamp_element import Clamp
-from onapsdk.configuration import settings
-from onapsdk.sdc.service import Service
-from yaml import SafeLoader, load
-
-import onaptests.utils.exceptions as onap_test_exceptions
-from onaptests.steps.base import YamlTemplateBaseStep
-from onaptests.steps.loop.instantiate_loop import InstantiateLoop
-from onaptests.steps.onboard.clamp import OnboardClampStep
-
-
-class ClampStep(YamlTemplateBaseStep):
- """class defining the different CLAMP scenarios."""
-
- count: int = 0
-
- def __init__(self):
- super().__init__(cleanup=settings.CLEANUP_FLAG)
- self._yaml_template: dict = None
- self.add_step(OnboardClampStep())
- Clamp()
- self.loop_instance = None
-
- @property
- def description(self) -> str:
- """Step description."""
- return "Retrieve TCA, Policy then create a loop"
-
- @property
- def component(self) -> str:
- """Component name."""
- return "CLAMP"
-
- @property
- def yaml_template(self) -> dict:
- """Step YAML template.
-
- Load from file if it's a root step, get from parent otherwise.
-
- Returns:
- dict: Step YAML template
-
- """
- if self.is_root:
- if not self._yaml_template:
- with open(settings.SERVICE_YAML_TEMPLATE, "r", encoding="utf-8") as yaml_template:
- self._yaml_template: dict = load(yaml_template, SafeLoader)
- return self._yaml_template
- return self.parent.yaml_template
-
- @property
- def model_yaml_template(self) -> dict:
- return {}
-
- def check(self, operational_policies: list, is_template: bool = False) -> str:
- """Check CLAMP requirements to create a loop."""
- self._logger.info("Check operational policy")
- for policy in operational_policies:
- exist = Clamp.check_policies(policy_name=policy["name"],
- req_policies=30) # 30 required policy
- self._logger.info("Operational policy found.")
- if not exist:
- raise ValueError("Couldn't load the policy %s" % policy)
- # retrieve the service..based on service name
- service: Service = Service(self.service_name)
- if is_template:
- loop_template = Clamp.check_loop_template(service=service)
- self._logger.info("Loop template checked.")
- return loop_template
- return None
-
- def instantiate_clamp(self, loop_template: str, loop_name: str, operational_policies: list):
- """Instantite a closed loopin CLAMP."""
- letters = string.ascii_letters
- loop_name_random = loop_name.join(
- random.choice(letters) for i in range(6))
- loop = InstantiateLoop(template=loop_template,
- loop_name=loop_name_random,
- operational_policies=operational_policies)
- return loop.instantiate_loop()
-
- def loop_counter(self, action: str) -> None:
- """ Count number of loop instances."""
- if action == "plus":
- self.count += 1
- if action == "minus":
- self.count -= 1
-
- @YamlTemplateBaseStep.store_state
- def execute(self):
- super().execute() # TODO work only the 1st time, not if already onboarded
-
- # Before instantiating, be sure that the service has been distributed
- service = Service(self.service_name)
- self._logger.info("******** Check Service Distribution *******")
- distribution_completed = False
- nb_try = 0
- nb_try_max = 10
- while distribution_completed is False and nb_try < nb_try_max:
- distribution_completed = service.distributed
- if distribution_completed is True:
- self._logger.info(
- "Service Distribution for %s is sucessfully finished",
- service.name)
- break
- self._logger.info(
- "Service Distribution for %s ongoing, Wait for %d s",
- service.name, settings.SERVICE_DISTRIBUTION_SLEEP_TIME)
- time.sleep(settings.SERVICE_DISTRIBUTION_SLEEP_TIME)
- nb_try += 1
-
- if distribution_completed is False:
- self._logger.error(
- "Service Distribution for %s failed !!", service.name)
- raise onap_test_exceptions.ServiceDistributionException
-
- # time to wait for template load in CLAMP
- self._logger.info("Wait a little bit to give a chance to the distribution")
- time.sleep(settings.CLAMP_DISTRIBUTION_TIMER)
- # Test 1
- operational_policies = settings.OPERATIONAL_POLICIES
- loop_template = self.check(operational_policies, True)
- # Test 2
- loop_name = "instance_" + self.service_name + str(self.count)
- self.loop_counter(action="plus")
- self.loop_instance = self.instantiate_clamp(
- loop_template=loop_template,
- loop_name=loop_name,
- operational_policies=operational_policies)
-
- @YamlTemplateBaseStep.store_state(cleanup=True)
- def cleanup(self) -> None:
- """Cleanup Service.
-
- Raises:
- Exception: Clamp cleaning failed
-
- """
- self.loop_counter(action="minus")
- self.loop_instance.undeploy_microservice_from_dcae()
- self.loop_instance.delete()
- super().cleanup()
diff --git a/src/onaptests/steps/loop/instantiate_loop.py b/src/onaptests/steps/loop/instantiate_loop.py
deleted file mode 100644
index 9aeefca..0000000
--- a/src/onaptests/steps/loop/instantiate_loop.py
+++ /dev/null
@@ -1,87 +0,0 @@
-#!/usr/bin/python
-# http://www.apache.org/licenses/LICENSE-2.0
-"""Instantiation class."""
-import logging
-import logging.config
-
-from onapsdk.clamp.loop_instance import LoopInstance
-from onapsdk.configuration import settings
-
-import onaptests.utils.exceptions as onap_test_exceptions
-
-
-# pylint: disable=protected-access
-class InstantiateLoop():
- """class instantiating a closed loop in clamp."""
-
- def __init__(self, template: str, loop_name: str, operational_policies: list):
- self.template = template
- self.loop_name = loop_name
- self.operational_policies = operational_policies
-
- self._logger: logging.Logger = logging.getLogger("")
- logging.config.dictConfig(settings.LOG_CONFIG)
-
- def add_policies(self, loop: LoopInstance) -> None:
- """add necessary wanted operational policies."""
- for policy in self.operational_policies:
- self._logger.info("******** ADD OPERATIONAL POLICY %s *******", policy["name"])
- added = loop.add_operational_policy(policy_type=policy["policy_type"],
- policy_version=policy["policy_version"])
- if not added:
- self._logger.error("an error occured while adding an operational policy")
- self._logger.info("ADD OPERATION SUCCESSFULY DONE")
-
- def configure_policies(self, loop: LoopInstance) -> None:
- """Configure all policies."""
- self._logger.info("******** UPDATE MICROSERVICE POLICY *******")
- loop._update_loop_details()
- loop.update_microservice_policy()
- self._logger.info("******** UPDATE OPERATIONAL POLICIES CONFIG *******")
- for policy in self.operational_policies:
- # loop.add_op_policy_config(loop.LoopInstance.__dict__[policy["config_function"]])
- # possible configurations for the moment
- if policy["name"] == "MinMax":
- loop.add_op_policy_config(loop.add_minmax_config)
- if policy["name"] == "Drools":
- loop.add_op_policy_config(loop.add_drools_conf)
- if policy["name"] == "FrequencyLimiter":
- loop.add_op_policy_config(loop.add_frequency_limiter)
- self._logger.info("Policies are well configured")
-
- def deploy(self, loop: LoopInstance) -> None:
- """Deploy closed loop."""
- self._logger.info("******** SUBMIT POLICIES TO PE *******")
- submit = loop.act_on_loop_policy(loop.submit)
- self._logger.info("******** CHECK POLICIES SUBMITION *******")
- if submit:
- self._logger.info("Policies successfully submited to PE")
-
- else:
- self._logger.error("An error occured while submitting the loop instance")
- raise onap_test_exceptions.PolicyException
- self._logger.info("******** DEPLOY LOOP INSTANCE *******")
- deploy = loop.deploy_microservice_to_dcae()
- if deploy:
- self._logger.info("Loop instance %s successfully deployed on DCAE !!", self.loop_name)
- else:
- self._logger.error("An error occured while deploying the loop instance")
- raise onap_test_exceptions.DcaeException
-
- def instantiate_loop(self):
- """Instantiate the control loop."""
- loop = LoopInstance(template=self.template,
- name=self.loop_name,
- details={})
- loop.create()
- if loop.details:
- self._logger.info("Loop instance %s successfully created !!", self.loop_name)
- else:
- self._logger.error("An error occured while creating the loop instance")
-
- self.add_policies(loop=loop)
- self.configure_policies(loop=loop)
- self.deploy(loop=loop)
-
- loop.details = loop._update_loop_details()
- return loop