"""CDS Blueprint module.""" # Copyright 2022 Orange, Deutsche Telekom AG # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. # You may obtain a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. import json import re from dataclasses import dataclass, field from datetime import datetime from io import BytesIO from typing import Any, Dict, Generator, Iterator, List, Optional from urllib.parse import urlencode from uuid import uuid4 from zipfile import ZipFile import oyaml as yaml from onapsdk.utils.jinja import jinja_env from onapsdk.exceptions import FileError, ParameterError, ValidationError from .cds_element import CdsElement from .data_dictionary import DataDictionary, DataDictionarySet @dataclass class CbaMetadata: """Class to hold CBA metadata values.""" tosca_meta_file_version: str csar_version: str created_by: str entry_definitions: str template_name: str template_version: str template_tags: str @dataclass class Mapping: """Blueprint's template mapping. Stores mapping data: - name, - type, - name of dictionary from which value should be get, - dictionary source of value. """ name: str mapping_type: str dictionary_name: str dictionary_sources: List[str] = field(default_factory=list) def __hash__(self) -> int: # noqa: D401 """Mapping object hash. Based on mapping name. Returns: int: Mapping hash """ return hash(self.name) def __eq__(self, mapping: "Mapping") -> bool: """Compare two mapping objects. Mappings are equal if have the same name. Args: mapping (Mapping): Mapping object to compare with. Returns: bool: True if objects have the same name, False otherwise. """ return self.name == mapping.name def merge(self, mapping: "Mapping") -> None: """Merge mapping objects. Merge objects dictionary sources. Args: mapping (Mapping): Mapping object to merge. """ self.dictionary_sources = list( set(self.dictionary_sources) | set(mapping.dictionary_sources) ) def generate_data_dictionary(self) -> dict: """Generate data dictionary for mapping. Data dictionary with required data sources, type and name for mapping will be created from Jinja2 template. Returns: dict: Data dictionary """ return json.loads( jinja_env().get_template("data_dictionary_base.json.j2").render(mapping=self) ) class MappingSet: """Set of mapping objects. Mapping objects will be stored in dictionary where mapping name is a key. No two mappings with the same name can be stored in this collection. """ def __init__(self) -> None: """Initialize mappings collection. Create dictionary to store mappings. """ self.mappings = {} def __len__(self) -> int: # noqa: D401 """Mapping set length. Returns: int: Number of stored mapping objects. """ return len(self.mappings) def __iter__(self) -> Iterator[Mapping]: """Iterate through mapping stored in set. Returns: Iterator[Mapping]: Stored mappings iterator. """ return iter(list(self.mappings.values())) def __getitem__(self, index: int) -> Mapping: """Get item stored on given index. Args: index (int): Index number. Returns: Mapping: Mapping stored on given index. """ return list(self.mappings.values())[index] def add(self, mapping: Mapping) -> None: """Add mapping to set. If there is already mapping object with the same name in collection they will be merged. Args: mapping (Mapping): Mapping to add to collection. """ if mapping.name not in self.mappings: self.mappings.update({mapping.name: mapping}) else: self.mappings[mapping.name].merge(mapping) def extend(self, iterable: Iterator[Mapping]) -> None: """Extend set with an iterator of mappings. Args: iterable (Iterator[Mapping]): Mappings iterator. """ for mapping in iterable: self.add(mapping) class Workflow(CdsElement): """Blueprint's workflow. Stores workflow steps, inputs, outputs. Executes workflow using CDS HTTP API. """ @dataclass class WorkflowStep: """Workflow step class. Stores step name, description, target and optional activities. """ name: str description: str target: str activities: List[Dict[str, str]] = field(default_factory=list) @dataclass class WorkflowInput: """Workflow input class. Stores input name, information if it's required, type, and optional description. """ name: str required: bool type: str description: str = "" @dataclass class WorkflowOutput: """Workflow output class. Stores output name, type na value. """ name: str type: str value: Dict[str, Any] def __init__(self, cba_workflow_name: str, cba_workflow_data: dict, blueprint: "Blueprint") -> None: """Workflow initialization. Args: cba_workflow_name (str): Workflow name. cba_workflow_data (dict): Workflow data. blueprint (Blueprint): Blueprint object which contains workflow. """ super().__init__() self.name: str = cba_workflow_name self.workflow_data: dict = cba_workflow_data self.blueprint: "Blueprint" = blueprint self._steps: List[self.WorkflowStep] = None self._inputs: List[self.WorkflowInput] = None self._outputs: List[self.WorkflowOutput] = None def __repr__(self) -> str: """Representation of object. Returns: str: Object's string representation """ return (f"Workflow(name='{self.name}', " f"blueprint_name='{self.blueprint.metadata.template_name})'") @property def steps(self) -> List["Workflow.WorkflowStep"]: """Workflow's steps property. Returns: List[Workflow.WorkflowStep]: List of workflow's steps. """ if self._steps is None: self._steps = [] for step_name, step_data in self.workflow_data.get("steps", {}).items(): self._steps.append( self.WorkflowStep( name=step_name, description=step_data.get("description"), target=step_data.get("target"), activities=step_data.get("activities", []), ) ) return self._steps @property def inputs(self) -> List["Workflow.WorkflowInput"]: """Workflow's inputs property. Returns: List[Workflow.WorkflowInput]: List of workflows's inputs. """ if self._inputs is None: self._inputs = [] for input_name, input_data in self.workflow_data.get("inputs", {}).items(): self._inputs.append( self.WorkflowInput( name=input_name, required=input_data.get("required"), type=input_data.get("type"), description=input_data.get("description"), ) ) return self._inputs @property def outputs(self) -> List["Workflow.WorkflowOutput"]: """Workflow's outputs property. Returns: List[Workflow.WorkflowOutput]: List of workflows's outputs. """ if self._outputs is None: self._outputs = [] for output_name, output_data in self.workflow_data.get("outputs", {}).items(): self._outputs.append( self.WorkflowOutput( name=output_name, type=output_data.get("type"), value=output_data.get("value"), ) ) return self._outputs @property def url(self) -> str: """Workflow execution url. Returns: str: Url to call warkflow execution. """ return f"{self._url}/api/v1/execution-service/process" def execute(self, inputs: dict) -> dict: """Execute workflow. Call CDS HTTP API to execute workflow. Args: inputs (dict): Inputs dictionary. Returns: dict: Response's payload. """ # There should be some flague to check if CDS UI API is used or blueprintprocessor. # For CDS UI API there is no endporint to execute workflow, so it has to be turned off. execution_service_input: dict = { "commonHeader": { "originatorId": "onapsdk", "requestId": str(uuid4()), "subRequestId": str(uuid4()), "timestamp": datetime.utcnow().strftime("%Y-%m-%dT%H:%M:%S.%fZ"), }, "actionIdentifiers": { "blueprintName": self.blueprint.metadata.template_name, "blueprintVersion": self.blueprint.metadata.template_version, "actionName": self.name, "mode": "SYNC", # Has to be SYNC for REST call }, "payload": {f"{self.name}-request": inputs}, } response: "requests.Response" = self.send_message_json( "POST", f"Execute {self.blueprint.metadata.template_name} blueprint {self.name} workflow", self.url, auth=self.auth, data=json.dumps(execution_service_input) ) return response["payload"] class ResolvedTemplate(CdsElement): """Resolved template class. Store and retrieve rendered template results. """ def __init__(self, blueprint: "Blueprint", # pylint: disable=too-many-arguments artifact_name: Optional[str] = None, resolution_key: Optional[str] = None, resource_id: Optional[str] = None, resource_type: Optional[str] = None, occurrence: Optional[str] = None, response_format: str = "application/json") -> None: """Init resolved template class instance. Args: blueprint (Blueprint): Blueprint object. artifact_name (Optional[str], optional): Artifact name for which to retrieve a resolved resource. Defaults to None. resolution_key (Optional[str], optional): Resolution Key associated with the resolution. Defaults to None. resource_id (Optional[str], optional): Resource Id associated with the resolution. Defaults to None. resource_type (Optional[str], optional): Resource Type associated with the resolution. Defaults to None. occurrence (Optional[str], optional): Occurrence of the template resolution (1-n). Defaults to None. response_format (str): Expected format of the template being retrieved. """ super().__init__() self.blueprint: "Blueprint" = blueprint self.artifact_name: Optional[str] = artifact_name self.resolution_key: Optional[str] = resolution_key self.resource_id: Optional[str] = resource_id self.resource_type: Optional[str] = resource_type self.occurrence: Optional[str] = occurrence self.response_format: str = response_format @property def url(self) -> str: """Url property. Returns: str: Url """ return f"{self._url}/api/v1/template" @property def resolved_template_url(self) -> str: """Url to retrieve resolved template. Filter None parameters. Returns: str: Retrieve resolved template url """ params_dict: Dict[str, str] = urlencode(dict(filter(lambda item: item[1] is not None, { "bpName": self.blueprint.metadata.template_name, "bpVersion": self.blueprint.metadata.template_version, "artifactName": self.artifact_name, "resolutionKey": self.resolution_key, "resourceType": self.resource_type, "resourceId": self.resource_id, "occurrence": self.occurrence, "format": self.response_format }.items()))) return f"{self.url}?{params_dict}" def get_resolved_template(self) -> Dict[str, str]: """Get resolved template. Returns: Dict[str, str]: Resolved template """ return self.send_message_json( "GET", f"Get resolved template {self.artifact_name} for " f"{self.blueprint.metadata.template_name} version " f"{self.blueprint.metadata.template_version}", self.resolved_template_url, auth=self.auth ) def store_resolved_template(self, resolved_template: str) -> None: """Store resolved template. Args: resolved_template (str): Template to store Raises: ParameterError: To store template it's needed to pass artifact name and: - resolution key, or - resource type and resource id. If not all needed parameters are given that exception will be raised. """ if self.artifact_name and self.resolution_key: return self.store_resolved_template_with_resolution_key(resolved_template) if self.artifact_name and self.resource_type and self.resource_id: return self.store_resolved_template_with_resource_type_and_id(resolved_template) raise ParameterError("To store template artifact name with resolution key or both " "resource type and id is needed") def store_resolved_template_with_resolution_key(self, resolved_template: str) -> None: """Store template using resolution key. Args: resolved_template (str): Template to store """ return self.send_message( "POST", f"Store resolved template {self.artifact_name} for " f"{self.blueprint.metadata.template_name} version " f"{self.blueprint.metadata.template_version}", f"{self.url}/{self.blueprint.metadata.template_name}/" f"{self.blueprint.metadata.template_version}/{self.artifact_name}/" f"{self.resolution_key}", auth=self.auth, data=resolved_template ) def store_resolved_template_with_resource_type_and_id(self, resolved_template: str) -> None: """Store template using resource type and resource ID. Args: resolved_template (str): Template to store """ return self.send_message( "POST", f"Store resolved template {self.artifact_name} for " f"{self.blueprint.metadata.template_name} version " f"{self.blueprint.metadata.template_version}", f"{self.url}/{self.blueprint.metadata.template_name}/" f"{self.blueprint.metadata.template_version}/{self.artifact_name}/" f"{self.resource_type}/{self.resource_id}", auth=self.auth, data=resolved_template ) class Blueprint(CdsElement): """CDS blueprint representation.""" TEMPLATES_RE = r"Templates\/.*json$" TOSCA_META = "TOSCA-Metadata/TOSCA.meta" def __init__(self, cba_file_bytes: bytes) -> None: """Blueprint initialization. Save blueprint zip file bytes. You can create that object using opened file or bytes: blueprint = Blueprint(open("path/to/CBA.zip", "rb")) or with open("path/to/CBA.zip", "rb") as cba: blueprint = Blueprint(cba.read()) It is even better to use second example due to CBA file will be correctly closed for sure. Args: cba_file_bytes (bytes): CBA ZIP file bytes """ super().__init__() self.cba_file_bytes: bytes = cba_file_bytes self._cba_metadata: CbaMetadata = None self._cba_mappings: MappingSet = None self._cba_workflows: List[Workflow] = None @property def url(self) -> str: """URL address to use for CDS API call. Returns: str: URL to CDS blueprintprocessor. """ return f"{self._url}/api/v1/blueprint-model" @property def metadata(self) -> CbaMetadata: """Blueprint metadata. Data from TOSCA.meta file. Returns: CbaMetadata: Blueprint metadata object. """ if not self._cba_metadata: with ZipFile(BytesIO(self.cba_file_bytes)) as cba_zip_file: self._cba_metadata = self.get_cba_metadata(cba_zip_file.read(self.TOSCA_META)) return self._cba_metadata @property def mappings(self) -> MappingSet: """Blueprint mappings collection. Returns: MappingSet: Mappings collection. """ if not self._cba_mappings: with ZipFile(BytesIO(self.cba_file_bytes)) as cba_zip_file: self._cba_mappings = self.get_mappings(cba_zip_file) return self._cba_mappings @property def workflows(self) -> List["Workflow"]: """Blueprint's workflows property. Returns: List[Workflow]: Blueprint's workflow list. """ if not self._cba_workflows: with ZipFile(BytesIO(self.cba_file_bytes)) as cba_zip_file: self._cba_workflows = list( self.get_workflows(cba_zip_file.read(self.metadata.entry_definitions)) ) return self._cba_workflows @classmethod def load_from_file(cls, cba_file_path: str) -> "Blueprint": """Load blueprint from file. Raises: FileError: File to load blueprint from doesn't exist. Returns: Blueprint: Blueprint object """ try: with open(cba_file_path, "rb") as cba_file: return Blueprint(cba_file.read()) except FileNotFoundError as exc: msg = "The requested file with a blueprint doesn't exist." raise FileError(msg) from exc def enrich(self) -> "Blueprint": """Call CDS API to get enriched blueprint file. Returns: Blueprint: Enriched blueprint object """ response: "requests.Response" = self.send_message( "POST", "Enrich CDS blueprint", f"{self.url}/enrich", files={"file": self.cba_file_bytes}, headers={}, # Leave headers empty to fill it correctly by `requests` library auth=self.auth ) return Blueprint(response.content) def publish(self) -> None: """Publish blueprint.""" self.send_message( "POST", "Publish CDS blueprint", f"{self.url}/publish", files={"file": self.cba_file_bytes}, headers={}, # Leave headers empty to fill it correctly by `requests` library auth=self.auth ) def deploy(self) -> None: """Deploy blueprint.""" self.send_message( "POST", "Deploy CDS blueprint", f"{self.url}", files={"file": self.cba_file_bytes}, headers={}, # Leave headers empty to fill it correctly by `requests` library auth=self.auth ) def save(self, dest_file_path: str) -> None: """Save blueprint to file. Args: dest_file_path (str): Path of file where blueprint is going to be saved """ with open(dest_file_path, "wb") as cba_file: cba_file.write(self.cba_file_bytes) def get_data_dictionaries(self) -> DataDictionarySet: """Get the generated data dictionaries required by blueprint. If mapping reqires other source than input it should be updated before upload to CDS. Returns: Generator[DataDictionary, None, None]: DataDictionary objects. """ dd_set: DataDictionarySet = DataDictionarySet() for mapping in self.mappings: dd_set.add(DataDictionary(mapping.generate_data_dictionary())) return dd_set @staticmethod def get_cba_metadata(cba_tosca_meta_bytes: bytes) -> CbaMetadata: """Parse CBA TOSCA.meta file and get values from it. Args: cba_tosca_meta_bytes (bytes): TOSCA.meta file bytes. Raises: ValidationError: TOSCA Meta file has invalid format. Returns: CbaMetadata: Dataclass with CBA metadata """ meta_dict: dict = yaml.safe_load(cba_tosca_meta_bytes) if not isinstance(meta_dict, dict): raise ValidationError("Invalid TOSCA Meta file") return CbaMetadata( tosca_meta_file_version=meta_dict.get("TOSCA-Meta-File-Version"), csar_version=meta_dict.get("CSAR-Version"), created_by=meta_dict.get("Created-By"), entry_definitions=meta_dict.get("Entry-Definitions"), template_name=meta_dict.get("Template-Name"), template_version=meta_dict.get("Template-Version"), template_tags=meta_dict.get("Template-Tags"), ) @staticmethod def get_mappings_from_mapping_file(cba_mapping_file_bytes: bytes ) -> Generator[Mapping, None, None]: """Read mapping file and create Mappping object for it. Args: cba_mapping_file_bytes (bytes): CBA mapping file bytes. Yields: Generator[Mapping, None, None]: Mapping object. """ mapping_file_json = json.loads(cba_mapping_file_bytes) for mapping in mapping_file_json: yield Mapping( name=mapping["name"], mapping_type=mapping["property"]["type"], dictionary_name=mapping["dictionary-name"], dictionary_sources=[mapping["dictionary-source"]], ) def get_mappings(self, cba_zip_file: ZipFile) -> MappingSet: """Read mappings from CBA file. Search mappings in CBA file and create Mapping object for each of them. Args: cba_zip_file (ZipFile): CBA file to get mappings from. Returns: MappingSet: Mappings set object. """ mapping_set = MappingSet() for info in cba_zip_file.infolist(): if re.match(self.TEMPLATES_RE, info.filename): mapping_set.extend( self.get_mappings_from_mapping_file(cba_zip_file.read(info.filename)) ) return mapping_set def get_workflows(self, cba_entry_definitions_file_bytes: bytes) -> Generator[Workflow, None, None]: """Get worfklows from entry_definitions file. Parse entry_definitions file and create Workflow objects for workflows stored in. Args: cba_entry_definitions_file_bytes (bytes): entry_definition file. Yields: Generator[Workflow, None, None]: Workflow object. """ entry_definitions_json: dict = json.loads(cba_entry_definitions_file_bytes) workflows: dict = entry_definitions_json.get("topology_template", {}).get("workflows", {}) for workflow_name, workflow_data in workflows.items(): yield Workflow(workflow_name, workflow_data, self) def get_workflow_by_name(self, workflow_name: str) -> Workflow: """Get workflow by name. If there is no workflow with given name `ParameterError` is going to be raised. Args: workflow_name (str): Name of the workflow Returns: Workflow: Workflow with given name """ try: return next(filter(lambda workflow: workflow.name == workflow_name, self.workflows)) except StopIteration: raise ParameterError("Workflow with given name does not exist") def get_resolved_template(self, # pylint: disable=too-many-arguments artifact_name: str, resolution_key: Optional[str] = None, resource_type: Optional[str] = None, resource_id: Optional[str] = None, occurrence: Optional[str] = None) -> Dict[str, str]: """Get resolved template for Blueprint. Args: artifact_name (str): Resolved template's artifact name resolution_key (Optional[str], optional): Resolved template's resolution key. Defaults to None. resource_type (Optional[str], optional): Resolved template's resource type. Defaults to None. resource_id (Optional[str], optional): Resolved template's resource ID. Defaults to None. occurrence: (Optional[str], optional): Resolved template's occurrence value. Defaults to None. Returns: Dict[str, str]: Resolved template """ return ResolvedTemplate(blueprint=self, artifact_name=artifact_name, resolution_key=resolution_key, resource_type=resource_type, resource_id=resource_id, occurrence=occurrence).get_resolved_template() def store_resolved_template(self, # pylint: disable=too-many-arguments artifact_name: str, data: str, resolution_key: Optional[str] = None, resource_type: Optional[str] = None, resource_id: Optional[str] = None) -> None: """Store resolved template for Blueprint. Args: artifact_name (str): Resolved template's artifact name data (str): Resolved template resolution_key (Optional[str], optional): Resolved template's resolution key. Defaults to None. resource_type (Optional[str], optional): Resolved template's resource type. Defaults to None. resource_id (Optional[str], optional): Resolved template's resource ID. Defaults to None. """ ResolvedTemplate(blueprint=self, artifact_name=artifact_name, resolution_key=resolution_key, resource_type=resource_type, resource_id=resource_id).store_resolved_template(data)