aboutsummaryrefslogtreecommitdiffstats
path: root/src/onaptests/steps/loop/clamp.py
blob: e60177221443ce752a4e70d01aa76a21d6aed1b2 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
#!/usr/bin/python
#
# This program and the accompanying materials
# are made available under the terms of the Apache License, Version 2.0
# which accompanies this distribution, and is available at
#
# http://www.apache.org/licenses/LICENSE-2.0
"""Clamp Scenario class."""
import time
from typing import Optional

from yaml import SafeLoader, load

from onapsdk.clamp.clamp_element import Clamp
from onapsdk.sdc.service import Service

from onapsdk.configuration import settings
from onaptests.steps.onboard.clamp import OnboardClampStep
from onaptests.steps.loop.instantiate_loop import InstantiateLoop

from ..base import YamlTemplateBaseStep


class ClampStep(YamlTemplateBaseStep):
    """Step running the CLAMP scenario.

    The step onboards the CLAMP prerequisites through ``OnboardClampStep``
    (added as a sub-step), checks the operational policies and the loop
    template, then instantiates a loop.
    """

    # Number of loop instances; used as a suffix of the loop name.
    # NOTE: ``self.count += 1`` reads this class-level value the first time
    # and then stores the result on the instance, so the counter is
    # effectively per instance.
    count: int = 0

    def __init__(self, cleanup=False):
        """Initialize the step and register the onboarding sub-step.

        Args:
            cleanup (bool, optional): forwarded to the base step and to the
                onboarding sub-step. Defaults to False.

        """
        super().__init__(cleanup=cleanup)
        self._yaml_template: dict = None
        self.add_step(OnboardClampStep(cleanup=cleanup))
        # The created object is not kept; presumably this call configures the
        # certificate used by later ``Clamp`` class-level calls -- confirm
        # against onapsdk.
        Clamp(cert=settings.CERT)
        self.loop_instance = None

    @property
    def yaml_template(self) -> dict:
        """Step YAML template.

        Load from file if it's a root step, get from parent otherwise.

        Returns:
            dict: Step YAML template

        """
        if not self.is_root:
            return self.parent.yaml_template
        if not self._yaml_template:
            with open(settings.SERVICE_YAML_TEMPLATE, "r",
                      encoding="utf-8") as yaml_template:
                # An explicit Loader is mandatory with PyYAML >= 6, and
                # SafeLoader avoids building arbitrary Python objects from
                # the template file.
                self._yaml_template = load(yaml_template, Loader=SafeLoader)
        return self._yaml_template

    @property
    def service_name(self) -> str:
        """Service name.

        Get from YAML template if it's a root step, get from parent otherwise.

        Returns:
            str: Service name

        """
        if self.is_root:
            # The service name is the first key of the YAML template.
            return next(iter(self.yaml_template))
        return self.parent.service_name

    def check(self, operational_policies: list,
              is_template: bool = False) -> Optional[str]:
        """Check CLAMP requirements to create a loop.

        Args:
            operational_policies (list): policies to look up; each item must
                provide a ``"name"`` key.
            is_template (bool, optional): when True, also check the loop
                template of the service. Defaults to False.

        Returns:
            Optional[str]: the value returned by ``Clamp.check_loop_template``
                when ``is_template`` is True, None otherwise.

        Raises:
            ValueError: one of the operational policies was not found.

        """
        self._logger.info("Check operational policy")
        for policy in operational_policies:
            exist = Clamp.check_policies(policy_name=policy["name"],
                                         req_policies=30)  # 30 required policy
            if not exist:
                # Exceptions do not apply %-formatting to their arguments,
                # so the message has to be built before raising.
                raise ValueError(f"Couldn't load the policy {policy}")
            self._logger.info("Operational policy found.")
        if not is_template:
            return None
        # retrieve the service..based on service name
        service: Service = Service(self.service_name)
        loop_template = Clamp.check_loop_template(service=service)
        self._logger.info("Loop template checked.")
        return loop_template

    def instantiate_clamp(self, loop_template: str, loop_name: str,
                          operational_policies: list):
        """Instantiate a closed loop in CLAMP.

        Args:
            loop_template (str): template used to create the loop.
            loop_name (str): name given to the loop instance.
            operational_policies (list): policies forwarded to the loop.

        Returns:
            The value returned by ``InstantiateLoop.instantiate_loop()``.

        """
        loop = InstantiateLoop(template=loop_template,
                               loop_name=loop_name,
                               operational_policies=operational_policies,
                               cert=settings.CERT)
        return loop.instantiate_loop()

    def loop_counter(self, action: str) -> None:
        """Count number of loop instances.

        Args:
            action (str): ``"plus"`` increments the counter, ``"minus"``
                decrements it; any other value leaves it unchanged.

        """
        if action == "plus":
            self.count += 1
        elif action == "minus":
            self.count -= 1

    @YamlTemplateBaseStep.store_state
    def execute(self):
        """Run the sub-steps, check CLAMP requirements and create a loop.

        The created loop is stored in ``self.loop_instance`` so that
        ``cleanup`` can remove it afterwards.
        """
        super().execute()  # TODO work only the 1st time, not if already onboarded
        # time to wait for template load in CLAMP
        self._logger.info("Wait a little bit to give a chance to the distribution")
        time.sleep(settings.CLAMP_DISTRIBUTION_TIMER)
        # Test 1
        operational_policies = settings.OPERATIONAL_POLICIES
        loop_template = self.check(operational_policies, True)
        # Test 2
        # The name is built from the counter value before it is incremented.
        loop_name = f"instance_{self.service_name}{self.count}"
        self.loop_counter(action="plus")
        self.loop_instance = self.instantiate_clamp(
            loop_template=loop_template,
            loop_name=loop_name,
            operational_policies=operational_policies)

    def cleanup(self) -> None:
        """Cleanup Service.

        Undeploy and delete the loop instance when one was created, then run
        the base step cleanup.

        Raises:
            Exception: Clamp cleaning failed

        """
        self.loop_counter(action="minus")
        if self.loop_instance is None:
            # execute() did not get as far as creating a loop: there is
            # nothing to remove, but the base cleanup must still run.
            self._logger.warning("No loop instance to clean up.")
        else:
            self.loop_instance.undeploy_microservice_from_dcae()
            self.loop_instance.delete()
        super().cleanup()