blob: ed381ad8548c3787a926827c000702d18d4a583f (
plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
|
# http://www.apache.org/licenses/LICENSE-2.0
"""CDS onboard module."""
from abc import ABC
from pathlib import Path
from kubernetes import client, config
from onapsdk.cds import Blueprint, DataDictionarySet
from onapsdk.cds.blueprint_processor import Blueprintprocessor
from onapsdk.configuration import settings
from ..base import BaseStep
class CDSBaseStep(BaseStep, ABC):
"""Abstract CDS base step."""
@property
def component(self) -> str:
"""Component name."""
return "CDS"
class ExposeCDSBlueprintprocessorNodePortStep(CDSBaseStep):
"""Expose CDS blueprintsprocessor port."""
@property
def description(self) -> str:
"""Step description."""
return "Expose CDS blueprintsprocessor NodePort."
@BaseStep.store_state
def execute(self) -> None:
"""Expose CDS blueprintprocessor port using kubernetes client.
Use settings values:
- K8S_CONFIG,
- K8S_NAMESPACE.
"""
super().execute()
config.load_kube_config(settings.K8S_CONFIG)
k8s_client = client.CoreV1Api()
k8s_client.patch_namespaced_service(
"cds-blueprints-processor-http",
settings.K8S_NAMESPACE,
{"spec": {"ports": [{"port": 8080, "nodePort": 30449}], "type": "NodePort"}}
)
class BootstrapBlueprintprocessor(CDSBaseStep):
"""Bootstrap blueprintsprocessor."""
def __init__(self, cleanup: bool = False) -> None:
super().__init__(cleanup=cleanup)
self.add_step(ExposeCDSBlueprintprocessorNodePortStep(cleanup=cleanup))
@property
def description(self) -> str:
"""Step description."""
return "Bootstrap CDS blueprintprocessor"
@BaseStep.store_state
def execute(self) -> None:
"""Bootsrap CDS blueprintprocessor"""
super().execute()
Blueprintprocessor.bootstrap()
class DataDictionaryUploadStep(CDSBaseStep):
"""Upload data dictionaries to CDS step."""
def __init__(self, cleanup: bool = False) -> None:
"""Initialize data dictionary upload step."""
super().__init__(cleanup=cleanup)
self.add_step(BootstrapBlueprintprocessor(cleanup=cleanup))
@property
def description(self) -> str:
"""Step description."""
return "Upload data dictionaries to CDS."
@BaseStep.store_state
def execute(self) -> None:
"""Upload data dictionary to CDS.
Use settings values:
- CDS_DD_FILE.
"""
super().execute()
dd_set: DataDictionarySet = DataDictionarySet.load_from_file(settings.CDS_DD_FILE)
dd_set.upload()
class CbaEnrichStep(CDSBaseStep):
"""Enrich CBA file step."""
def __init__(self, cleanup=False) -> None:
"""Initialize CBA enrichment step."""
super().__init__(cleanup=cleanup)
self.add_step(DataDictionaryUploadStep(cleanup=cleanup))
@property
def description(self) -> str:
"""Step description."""
return "Enrich CBA file."
@BaseStep.store_state
def execute(self) -> None:
"""Enrich CBA file.
Use settings values:
- CDS_DD_FILE.
"""
super().execute()
blueprint: Blueprint = Blueprint.load_from_file(settings.CDS_CBA_UNENRICHED)
blueprint.enrich()
blueprint.save(settings.CDS_CBA_ENRICHED)
def cleanup(self) -> None:
"""Cleanup enrichment step.
Delete enriched CBA file.
"""
super().cleanup()
Path(settings.CDA_CBA_ENRICHED).unlink()
|