aboutsummaryrefslogtreecommitdiffstats
path: root/src/onaptests/steps/loop/clamp.py
diff options
context:
space:
mode:
Diffstat (limited to 'src/onaptests/steps/loop/clamp.py')
-rw-r--r--src/onaptests/steps/loop/clamp.py125
1 files changed, 125 insertions, 0 deletions
diff --git a/src/onaptests/steps/loop/clamp.py b/src/onaptests/steps/loop/clamp.py
new file mode 100644
index 0000000..e601772
--- /dev/null
+++ b/src/onaptests/steps/loop/clamp.py
@@ -0,0 +1,125 @@
+#!/usr/bin/python
+#
+# This program and the accompanying materials
+# are made available under the terms of the Apache License, Version 2.0
+# which accompanies this distribution, and is available at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+"""Clamp Scenario class."""
+from yaml import load
+import time
+
+from onapsdk.clamp.clamp_element import Clamp
+from onapsdk.sdc.service import Service
+
+from onapsdk.configuration import settings
+from onaptests.steps.onboard.clamp import OnboardClampStep
+from onaptests.steps.loop.instantiate_loop import InstantiateLoop
+
+from ..base import YamlTemplateBaseStep
+
+
+class ClampStep(YamlTemplateBaseStep):
+ """class defining the different CLAMP scenarios."""
+
+ count: int = 0
+
+ def __init__(self, cleanup=False):
+ super().__init__(cleanup=cleanup)
+ self._yaml_template: dict = None
+ self.add_step(OnboardClampStep(cleanup=cleanup))
+ Clamp(cert=settings.CERT)
+ self.loop_instance = None
+
+ @property
+ def yaml_template(self) -> dict:
+ """Step YAML template.
+
+ Load from file if it's a root step, get from parent otherwise.
+
+ Returns:
+ dict: Step YAML template
+
+ """
+ if self.is_root:
+ if not self._yaml_template:
+ with open(settings.SERVICE_YAML_TEMPLATE, "r") as yaml_template:
+ self._yaml_template: dict = load(yaml_template)
+ return self._yaml_template
+ return self.parent.yaml_template
+
+ @property
+ def service_name(self) -> str:
+ """Service name.
+
+ Get from YAML template if it's a root step, get from parent otherwise.
+
+ Returns:
+ str: Service name
+
+ """
+ if self.is_root:
+ return next(iter(self.yaml_template.keys()))
+ else:
+ return self.parent.service_name
+
+
+ def check(self, operational_policies: list, is_template: bool = False) -> str:
+ """Check CLAMP requirements to create a loop."""
+ self._logger.info("Check operational policy")
+ for policy in operational_policies:
+ exist = Clamp.check_policies(policy_name=policy["name"],
+ req_policies=30)# 30 required policy
+ self._logger.info("Operational policy found.")
+ if not exist:
+ raise ValueError("Couldn't load the policy %s", policy)
+ # retrieve the service..based on service name
+ service: Service = Service(self.service_name)
+ if is_template:
+ loop_template = Clamp.check_loop_template(service=service)
+ self._logger.info("Loop template checked.")
+ return loop_template
+
+ def instantiate_clamp(self, loop_template: str, loop_name: str, operational_policies: list):
+ """Instantite a closed loopin CLAMP."""
+ loop = InstantiateLoop(template=loop_template,
+ loop_name=loop_name,
+ operational_policies=operational_policies,
+ cert=settings.CERT)
+ return loop.instantiate_loop()
+
+ def loop_counter(self, action: str) -> None:
+ """ Count number of loop instances."""
+ if action == "plus":
+ self.count += 1
+ if action == "minus":
+ self.count -= 1
+
+ @YamlTemplateBaseStep.store_state
+ def execute(self):
+ super().execute() # TODO work only the 1st time, not if already onboarded
+ # time to wait for template load in CLAMP
+ self._logger.info("Wait a little bit to give a chance to the distribution")
+ time.sleep(settings.CLAMP_DISTRIBUTION_TIMER)
+ # Test 1
+ operational_policies = settings.OPERATIONAL_POLICIES
+ loop_template = self.check(operational_policies, True)
+ # Test 2
+ loop_name = "instance_" + self.service_name + str(self.count)
+ self.loop_counter(action="plus")
+ self.loop_instance = self.instantiate_clamp(
+ loop_template=loop_template,
+ loop_name=loop_name,
+ operational_policies=operational_policies)
+
+ def cleanup(self) -> None:
+ """Cleanup Service.
+
+ Raises:
+ Exception: Clamp cleaning failed
+
+ """
+ self.loop_counter(action="minus")
+ self.loop_instance.undeploy_microservice_from_dcae()
+ self.loop_instance.delete()
+ super().cleanup()