aboutsummaryrefslogtreecommitdiffstats
path: root/src/onaptests/steps/onboard/cds.py
blob: ed381ad8548c3787a926827c000702d18d4a583f (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
# http://www.apache.org/licenses/LICENSE-2.0
"""CDS onboard module."""

from abc import ABC
from pathlib import Path

from kubernetes import client, config
from onapsdk.cds import Blueprint, DataDictionarySet
from onapsdk.cds.blueprint_processor import Blueprintprocessor
from onapsdk.configuration import settings

from ..base import BaseStep


class CDSBaseStep(BaseStep, ABC):
    """Abstract CDS base step."""

    @property
    def component(self) -> str:
        """Component name."""
        return "CDS"


class ExposeCDSBlueprintprocessorNodePortStep(CDSBaseStep):
    """Expose CDS blueprintsprocessor port."""

    @property
    def description(self) -> str:
        """Step description."""
        return "Expose CDS blueprintsprocessor NodePort."

    @BaseStep.store_state
    def execute(self) -> None:
        """Expose CDS blueprintprocessor port using kubernetes client.

        Use settings values:
         - K8S_CONFIG,
         - K8S_NAMESPACE.

        """
        super().execute()
        config.load_kube_config(settings.K8S_CONFIG)
        k8s_client = client.CoreV1Api()
        k8s_client.patch_namespaced_service(
            "cds-blueprints-processor-http",
            settings.K8S_NAMESPACE,
            {"spec": {"ports": [{"port": 8080, "nodePort": 30449}], "type": "NodePort"}}
        )


class BootstrapBlueprintprocessor(CDSBaseStep):
    """Bootstrap blueprintsprocessor."""

    def __init__(self, cleanup: bool = False) -> None:
        super().__init__(cleanup=cleanup)
        self.add_step(ExposeCDSBlueprintprocessorNodePortStep(cleanup=cleanup))

    @property
    def description(self) -> str:
        """Step description."""
        return "Bootstrap CDS blueprintprocessor"

    @BaseStep.store_state
    def execute(self) -> None:
        """Bootsrap CDS blueprintprocessor"""
        super().execute()
        Blueprintprocessor.bootstrap()


class DataDictionaryUploadStep(CDSBaseStep):
    """Upload data dictionaries to CDS step."""

    def __init__(self, cleanup: bool = False) -> None:
        """Initialize data dictionary upload step."""
        super().__init__(cleanup=cleanup)
        self.add_step(BootstrapBlueprintprocessor(cleanup=cleanup))

    @property
    def description(self) -> str:
        """Step description."""
        return "Upload data dictionaries to CDS."

    @BaseStep.store_state
    def execute(self) -> None:
        """Upload data dictionary to CDS.

        Use settings values:
         - CDS_DD_FILE.

        """
        super().execute()
        dd_set: DataDictionarySet = DataDictionarySet.load_from_file(settings.CDS_DD_FILE)
        dd_set.upload()


class CbaEnrichStep(CDSBaseStep):
    """Enrich CBA file step."""

    def __init__(self, cleanup=False) -> None:
        """Initialize CBA enrichment step."""
        super().__init__(cleanup=cleanup)
        self.add_step(DataDictionaryUploadStep(cleanup=cleanup))

    @property
    def description(self) -> str:
        """Step description."""
        return "Enrich CBA file."

    @BaseStep.store_state
    def execute(self) -> None:
        """Enrich CBA file.

        Use settings values:
         - CDS_DD_FILE.

        """
        super().execute()
        blueprint: Blueprint = Blueprint.load_from_file(settings.CDS_CBA_UNENRICHED)
        blueprint.enrich()
        blueprint.save(settings.CDS_CBA_ENRICHED)

    def cleanup(self) -> None:
        """Cleanup enrichment step.

        Delete enriched CBA file.

        """
        super().cleanup()
        Path(settings.CDA_CBA_ENRICHED).unlink()