aboutsummaryrefslogtreecommitdiffstats
path: root/ice_validator/preload
diff options
context:
space:
mode:
Diffstat (limited to 'ice_validator/preload')
-rw-r--r--ice_validator/preload/__init__.py36
-rw-r--r--ice_validator/preload/environment.py267
-rw-r--r--ice_validator/preload/generator.py242
-rw-r--r--ice_validator/preload/model.py437
4 files changed, 982 insertions, 0 deletions
diff --git a/ice_validator/preload/__init__.py b/ice_validator/preload/__init__.py
new file mode 100644
index 0000000..70f9ecb
--- /dev/null
+++ b/ice_validator/preload/__init__.py
@@ -0,0 +1,36 @@
+# -*- coding: utf8 -*-
+# ============LICENSE_START====================================================
+# org.onap.vvp/validation-scripts
+# ===================================================================
+# Copyright © 2019 AT&T Intellectual Property. All rights reserved.
+# ===================================================================
+#
+# Unless otherwise specified, all software contained herein is licensed
+# under the Apache License, Version 2.0 (the "License");
+# you may not use this software except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+#
+#
+# Unless otherwise specified, all documentation contained herein is licensed
+# under the Creative Commons License, Attribution 4.0 Intl. (the "License");
+# you may not use this documentation except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# https://creativecommons.org/licenses/by/4.0/
+#
+# Unless required by applicable law or agreed to in writing, documentation
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+# ============LICENSE_END============================================
diff --git a/ice_validator/preload/environment.py b/ice_validator/preload/environment.py
new file mode 100644
index 0000000..c0f357a
--- /dev/null
+++ b/ice_validator/preload/environment.py
@@ -0,0 +1,267 @@
+import re
+import tempfile
+from pathlib import Path
+
+from cached_property import cached_property
+
+from tests.helpers import check, first, unzip, load_yaml
+
+SERVICE_TEMPLATE_PATTERN = re.compile(r".*service-.*?-template.yml")
+RESOURCE_TEMPLATE_PATTERN = re.compile(r".*resource-(.*?)-template.yml")
+
+
+def yaml_files(path):
+ """
+ Return files that are YAML (end with .yml or .yaml)
+
+ :param path: Directory path object
+ :return: list of paths to YAML files
+ """
+ return [
+ p
+ for p in path.iterdir()
+ if p.is_file() and p.suffix.lower() in (".yml", ".yaml")
+ ]
+
+
+class CloudServiceArchive:
+ """
+ Wrapper to extract information from a CSAR file.
+ """
+
+ def __init__(self, csar_path):
+ self.csar_path = Path(csar_path)
+ with tempfile.TemporaryDirectory() as csar_dir:
+ csar_dir = Path(csar_dir)
+ unzip(self.csar_path, csar_dir)
+ self._service = self._get_service_template(csar_dir)
+ self._resources = self._get_vf_module_resource_templates(csar_dir)
+
+ def get_vf_module(self, vf_module):
+ """
+ Retrieve the VF Module definition from the CSAR for the given heat
+ module name (should not include the file extension - ex: base)
+
+ :param vf_module: name of Heat module (no path or file extension)
+ :return: The definition of the module as a dict or None if not found
+ """
+ groups = self._service.get("topology_template", {}).get("groups", {})
+ for props in groups.values():
+ module_label = props.get("properties", {}).get("vf_module_label", "")
+ if module_label.lower() == vf_module.lower():
+ return props
+ return None
+
+ def get_vf_module_model_name(self, vf_module):
+ """
+ Retrieves the vfModuleModelName of the module or None if vf_module is not
+ found (see get_vf_module)
+
+ :param vf_module: name of Heat module (no path or file extension)
+ :return: The value if vfModuleModelName as string or None if not found
+ """
+ module = self.get_vf_module(vf_module)
+ return module.get("metadata", {}).get("vfModuleModelName") if module else None
+
+ @property
+ def topology_template(self):
+ """
+ Return dict representing the topology_template node of the service
+ template
+ """
+ return self._service.get("topology_template") or {}
+
+ @property
+ def groups(self):
+ """
+ Return dict representing the groups node of the service
+ template
+ """
+ return self.topology_template.get("groups") or {}
+
+ @property
+ def vf_modules(self):
+ """
+ Returns mapping of group ID to VfModule present in the service template
+ """
+ return {
+ group_id: props
+ for group_id, props in self.groups.items()
+ if props.get("type") == "org.openecomp.groups.VfModule"
+ }
+
+ @property
+ def vf_module_resource_names(self):
+ """
+ Returns the resource names for all VfModules (these can be used
+ to find the resource templates as they will be part of the filename)
+ """
+ names = (
+ module.get("metadata", {}).get("vfModuleModelName")
+ for module in self.vf_modules.values()
+ )
+ return [name.split(".")[0] for name in names if name]
+
+ def get_vf_module_resource_name(self, vf_module):
+ """
+ Retrieves the resource name of the module or None if vf_module is not
+ found (see get_vf_module)
+
+ :param vf_module: name of Heat module (no path or file extension)
+ :return: The value if resource nae as string or None if not found
+ """
+ vf_model_name = self.get_vf_module_model_name(vf_module)
+ if not vf_model_name:
+ return None
+ resource_name = vf_model_name.split(".")[0]
+ resource = self._resources.get(resource_name, {})
+ return resource.get("metadata", {}).get("name")
+
+ @staticmethod
+ def _get_definition_files(csar_dir):
+ """
+ Returns a list of all files in the CSAR's Definitions directory
+ """
+ def_dir = csar_dir / "Definitions"
+ check(
+ def_dir.exists(),
+ f"CSAR is invalid. {csar_dir.as_posix()} does not contain a "
+ f"Definitions directory.",
+ )
+ return yaml_files(def_dir)
+
+ def _get_service_template(self, csar_dir):
+ """
+ Returns the service template as a dict. Assumes there is only one.
+ """
+ files = map(str, self._get_definition_files(csar_dir))
+ service_template = first(files, SERVICE_TEMPLATE_PATTERN.match)
+ return load_yaml(service_template) if service_template else {}
+
+ def _get_vf_module_resource_templates(self, csar_dir):
+ """
+ Returns a mapping of resource name to resource definition (as a dict)
+ (Only loads resource templates that correspond to VF Modules
+ """
+ def_dir = csar_dir / "Definitions"
+ mapping = (
+ (name, def_dir / "resource-{}-template.yml".format(name))
+ for name in self.vf_module_resource_names
+ )
+ return {name: load_yaml(path) for name, path in mapping if path.exists()}
+
+ @property
+ def service_name(self):
+ """
+ Name of the service (extracted from the service template
+ """
+ return self._service.get("metadata", {}).get("name")
+
+ def __repr__(self):
+ return f"CSAR (path={self.csar_path.name}, name={self.service_name})"
+
+ def __str__(self):
+ return repr(self)
+
+
+class PreloadEnvironment:
+ """
+ A
+ """
+
+ def __init__(self, env_dir, parent=None):
+ self.base_dir = Path(env_dir)
+ self.parent = parent
+ self._modules = self._load_modules()
+ self._sub_env = self._load_envs()
+ self._defaults = self._load_defaults()
+
+ def _load_defaults(self):
+ defaults = self.base_dir / "defaults.yaml"
+ return load_yaml(defaults) if defaults.exists() else {}
+
+ def _load_modules(self):
+ files = [
+ p
+ for p in self.base_dir.iterdir()
+ if p.is_file() and p.suffix.lower().endswith(".env")
+ ]
+ return {f.name.lower(): load_yaml(f).get("parameters", {}) for f in files}
+
+ def _load_envs(self):
+ env_dirs = [
+ p for p in self.base_dir.iterdir() if p.is_dir() and p.name != "preloads"
+ ]
+ return {d.name: PreloadEnvironment(d, self) for d in env_dirs}
+
+ @cached_property
+ def csar(self):
+ csar_path = first(self.base_dir.iterdir(), lambda p: p.suffix == ".csar")
+ if csar_path:
+ return CloudServiceArchive(csar_path)
+ else:
+ return self.parent.csar if self.parent else None
+
+ @property
+ def defaults(self):
+ result = {}
+ if self.parent:
+ result.update(self.parent.defaults)
+ result.update(self._defaults)
+ return result
+
+ @property
+ def environments(self):
+ all_envs = [self]
+ for env in self._sub_env.values():
+ all_envs.append(env)
+ all_envs.extend(env.environments)
+ return [e for e in all_envs if e.is_leaf]
+
+ def get_module(self, name):
+ name = name if name.lower().endswith(".env") else f"{name}.env".lower()
+ if name not in self.module_names:
+ return {}
+ result = {}
+ parent_module = self.parent.get_module(name) if self.parent else None
+ module = self._modules.get(name)
+ for m in (parent_module, self.defaults, module):
+ if m:
+ result.update(m)
+ return result
+
+ @property
+ def module_names(self):
+ parent_modules = self.parent.module_names if self.parent else set()
+ result = set()
+ result.update(self._modules.keys())
+ result.update(parent_modules)
+ return result
+
+ @property
+ def modules(self):
+ return {name: self.get_module(name) for name in self.module_names}
+
+ def get_environment(self, env_name):
+ for name, env in self._sub_env.items():
+ if name == env_name:
+ return env
+ result = env.get_environment(env_name)
+ if result:
+ return result
+ return None
+
+ @property
+ def is_base(self):
+ return self.parent is None
+
+ @property
+ def is_leaf(self):
+ return not self._sub_env
+
+ @property
+ def name(self):
+ return self.base_dir.name
+
+ def __repr__(self):
+ return f"PreloadEnvironment(name={self.name})"
diff --git a/ice_validator/preload/generator.py b/ice_validator/preload/generator.py
new file mode 100644
index 0000000..38a051d
--- /dev/null
+++ b/ice_validator/preload/generator.py
@@ -0,0 +1,242 @@
+# -*- coding: utf8 -*-
+# ============LICENSE_START====================================================
+# org.onap.vvp/validation-scripts
+# ===================================================================
+# Copyright © 2019 AT&T Intellectual Property. All rights reserved.
+# ===================================================================
+#
+# Unless otherwise specified, all software contained herein is licensed
+# under the Apache License, Version 2.0 (the "License");
+# you may not use this software except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+#
+#
+# Unless otherwise specified, all documentation contained herein is licensed
+# under the Creative Commons License, Attribution 4.0 Intl. (the "License");
+# you may not use this documentation except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# https://creativecommons.org/licenses/by/4.0/
+#
+# Unless required by applicable law or agreed to in writing, documentation
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+# ============LICENSE_END============================================
+
+import json
+import os
+from abc import ABC, abstractmethod
+
+import yaml
+
+
+def get_json_template(template_dir, template_name):
+ template_name = template_name + ".json"
+ with open(os.path.join(template_dir, template_name)) as f:
+ return json.loads(f.read())
+
+
+def get_or_create_template(template_dir, key, value, sequence, template_name):
+ """
+ Search a sequence of dicts where a given key matches value. If
+ found, then it returns that item. If not, then it loads the
+ template identified by template_name, adds it ot the sequence, and
+ returns the template
+ """
+ for item in sequence:
+ if item[key] == value:
+ return item
+ new_template = get_json_template(template_dir, template_name)
+ sequence.append(new_template)
+ return new_template
+
+
+def yield_by_count(sequence):
+ """
+ Iterates through sequence and yields each item according to its __count__
+ attribute. If an item has a __count__ of it will be returned 3 times
+ before advancing to the next item in the sequence.
+
+ :param sequence: sequence of dicts (must contain __count__)
+ :returns: generator of tuple key, value pairs
+ """
+ for key, value in sequence.items():
+ for i in range(value["__count__"]):
+ yield (key, value)
+
+
+def replace(param):
+ """
+ Optionally used by the preload generator to wrap items in the preload
+ that need to be replaced by end users
+ :param param: p
+ """
+ return "VALUE FOR: {}".format(param) if param else ""
+
+
+class AbstractPreloadGenerator(ABC):
+ """
+ All preload generators must inherit from this class and implement the
+ abstract methods.
+
+ Preload generators are automatically discovered at runtime via a plugin
+ architecture. The system path is scanned looking for modules with the name
+ preload_*, then all non-abstract classes that inherit from AbstractPreloadGenerator
+ are registered as preload plugins
+
+ Attributes:
+ :param vnf: Instance of Vnf that contains the preload data
+ :param base_output_dir: Base directory to house the preloads. All preloads
+ must be written to a subdirectory under this directory
+ """
+
+ def __init__(self, vnf, base_output_dir, preload_env):
+ self.preload_env = preload_env
+ self.vnf = vnf
+ self.current_module = None
+ self.current_module_env = {}
+ self.base_output_dir = base_output_dir
+ self.env_cache = {}
+
+ @classmethod
+ @abstractmethod
+ def format_name(cls):
+ """
+ String name to identify the format (ex: VN-API, GR-API)
+ """
+ raise NotImplementedError()
+
+ @classmethod
+ @abstractmethod
+ def output_sub_dir(cls):
+ """
+ String sub-directory name that will appear under ``base_output_dir``
+ """
+ raise NotImplementedError()
+
+ @classmethod
+ @abstractmethod
+ def supports_output_passing(cls):
+ """
+ Some preload methods allow automatically mapping output parameters in the
+ base module to the input parameter of other modules. This means these
+ that the incremental modules do not need these base module outputs in their
+ preloads.
+
+ At this time, VNF-API does not support output parameter passing, but
+ GR-API does.
+
+ If this is true, then the generator will call Vnf#filter_output_params
+ after the preload module for the base module has been created
+ """
+ raise NotImplementedError()
+
+ @abstractmethod
+ def generate_module(self, module, output_dir):
+ """
+ Create the preloads and write them to ``output_dir``. This
+ method is responsible for generating the content of the preload and
+ writing the file to disk.
+ """
+ raise NotImplementedError()
+
+ def generate(self):
+ # handle the base module first
+ print("\nGenerating {} preloads".format(self.format_name()))
+ self.generate_environments(self.vnf.base_module)
+ if self.supports_output_passing():
+ self.vnf.filter_base_outputs()
+ for mod in self.vnf.incremental_modules:
+ self.generate_environments(mod)
+
+ def replace(self, param_name, alt_message=None, single=False):
+ value = self.get_param(param_name, single)
+ if value:
+ return value
+ return alt_message or replace(param_name)
+
+ def generate_environments(self, module):
+ """
+ Generate a preload for the given module in all available environments
+ in the ``self.preload_env``. This will invoke the abstract
+ generate_module once for each available environment **and** an
+ empty environment to create a blank template.
+
+ :param module: module to generate for
+ """
+ print("\nGenerating Preloads for {}".format(module))
+ print("-" * 50)
+ print("... generating blank template")
+ self.current_module = module
+ self.current_module_env = {}
+ self.env_cache = {}
+ blank_preload_dir = self.make_preload_dir(self.base_output_dir)
+ self.generate_module(module, blank_preload_dir)
+ self.generate_preload_env(module, blank_preload_dir)
+ if self.preload_env:
+ for env in self.preload_env.environments:
+ output_dir = self.make_preload_dir(env.base_dir / "preloads")
+ print(
+ "... generating preload for env ({}) to {}".format(
+ env.name, output_dir
+ )
+ )
+ self.env_cache = {}
+ self.current_module = module
+ self.current_module_env = env.get_module(module.label)
+ self.generate_module(module, output_dir)
+ self.current_module = None
+ self.current_module_env = None
+
+ def make_preload_dir(self, base_dir):
+ path = os.path.join(base_dir, self.output_sub_dir())
+ if not os.path.exists(path):
+ os.makedirs(path, exist_ok=True)
+ return path
+
+ def generate_preload_env(self, module, blank_preload_dir):
+ """
+ Create a .env template suitable for completing and using for
+ preload generation from env files.
+ """
+ output_dir = os.path.join(blank_preload_dir, "preload_env")
+ output_file = os.path.join(output_dir, "{}.env".format(module.vnf_name))
+ if not os.path.exists(output_dir):
+ os.makedirs(output_dir, exist_ok=True)
+ with open(output_file, "w") as f:
+ yaml.dump(module.env_template, f)
+
+ def get_param(self, param_name, single):
+ """
+ Retrieves the value for the given param if it exists. If requesting a
+ single item, and the parameter is tied to a list then only one item from
+ the list will be returned. For each subsequent call with the same parameter
+ it will iterate/rotate through the values in that list. If single is False
+ then the full list will be returned.
+
+ :param param_name: name of the parameter
+ :param single: If True returns single value from lists otherwises the full
+ list. This has no effect on non-list values
+ """
+ value = self.env_cache.get(param_name)
+ if not value:
+ value = self.current_module_env.get(param_name)
+ if isinstance(value, list):
+ value.reverse()
+ self.env_cache[param_name] = value
+ if value and single and isinstance(value, list):
+ return value.pop()
+ else:
+ return value
diff --git a/ice_validator/preload/model.py b/ice_validator/preload/model.py
new file mode 100644
index 0000000..e37c914
--- /dev/null
+++ b/ice_validator/preload/model.py
@@ -0,0 +1,437 @@
+# -*- coding: utf8 -*-
+# ============LICENSE_START====================================================
+# org.onap.vvp/validation-scripts
+# ===================================================================
+# Copyright © 2019 AT&T Intellectual Property. All rights reserved.
+# ===================================================================
+#
+# Unless otherwise specified, all software contained herein is licensed
+# under the Apache License, Version 2.0 (the "License");
+# you may not use this software except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+#
+#
+# Unless otherwise specified, all documentation contained herein is licensed
+# under the Creative Commons License, Attribution 4.0 Intl. (the "License");
+# you may not use this documentation except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# https://creativecommons.org/licenses/by/4.0/
+#
+# Unless required by applicable law or agreed to in writing, documentation
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+# ============LICENSE_END============================================
+import os
+import shutil
+from abc import ABC, abstractmethod
+
+from preload.generator import yield_by_count
+from preload.environment import PreloadEnvironment
+from tests.helpers import (
+ get_param,
+ get_environment_pair,
+ prop_iterator,
+ get_output_dir,
+ is_base_module,
+ remove,
+)
+from tests.parametrizers import parametrize_heat_templates
+from tests.structures import NeutronPortProcessor, Heat
+from tests.test_environment_file_parameters import get_preload_excluded_parameters
+from tests.utils import nested_dict
+from tests.utils.vm_types import get_vm_type_for_nova_server
+from config import Config, get_generator_plugins
+
+CHANGE = "CHANGEME"
+
+
+# This is only used to fake out parametrizers
+class DummyMetafunc:
+ def __init__(self, config):
+ self.inputs = {}
+ self.config = config
+
+ def parametrize(self, name, file_list):
+ self.inputs[name] = file_list
+
+
+def get_heat_templates(config):
+ """
+ Returns the Heat template paths discovered by the pytest parameterizers
+ :param config: pytest config
+ :return: list of heat template paths
+ """
+ meta = DummyMetafunc(config)
+ parametrize_heat_templates(meta)
+ heat_templates = meta.inputs.get("heat_templates", [])
+ if isinstance(heat_templates, list) and len(heat_templates) > 0:
+ heat_templates = heat_templates[0]
+ else:
+ return
+ return heat_templates
+
+
+class FilterBaseOutputs(ABC):
+ """
+ Invoked to remove parameters in an object that appear in the base module.
+ Base output parameters can be passed to incremental modules
+ so they do not need to be defined in a preload. This method can be
+ invoked on a module to pre-filter the parameters before a preload is
+ created.
+
+ The method should remove the parameters that exist in the base module from
+ both itself and any sub-objects.
+ """
+
+ @abstractmethod
+ def filter_output_params(self, base_outputs):
+ raise NotImplementedError()
+
+
+class IpParam:
+ def __init__(self, ip_addr_param, port):
+ self.param = ip_addr_param or ""
+ self.port = port
+
+ @property
+ def ip_version(self):
+ return 6 if "_v6_" in self.param else 4
+
+ def __hash__(self):
+ return hash(self.param)
+
+ def __eq__(self, other):
+ return hash(self) == hash(other)
+
+ def __str__(self):
+ return "{}(v{})".format(self.param, self.ip_version)
+
+ def __repr(self):
+ return str(self)
+
+
+class Network(FilterBaseOutputs):
+ def __init__(self, role, name_param):
+ self.network_role = role
+ self.name_param = name_param
+ self.subnet_params = set()
+
+ def filter_output_params(self, base_outputs):
+ self.subnet_params = remove(self.subnet_params, base_outputs)
+
+ def __hash__(self):
+ return hash(self.network_role)
+
+ def __eq__(self, other):
+ return hash(self) == hash(other)
+
+
+class Port(FilterBaseOutputs):
+ def __init__(self, vm, network):
+ self.vm = vm
+ self.network = network
+ self.fixed_ips = []
+ self.floating_ips = []
+ self.uses_dhcp = True
+
+ def add_ips(self, props):
+ props = props.get("properties") or props
+ for fixed_ip in props.get("fixed_ips") or []:
+ if not isinstance(fixed_ip, dict):
+ continue
+ ip_address = get_param(fixed_ip.get("ip_address"))
+ subnet = get_param(fixed_ip.get("subnet") or fixed_ip.get("subnet_id"))
+ if ip_address:
+ self.uses_dhcp = False
+ self.fixed_ips.append(IpParam(ip_address, self))
+ if subnet:
+ self.network.subnet_params.add(subnet)
+ for ip in prop_iterator(props, "allowed_address_pairs", "ip_address"):
+ self.uses_dhcp = False
+ param = get_param(ip) if ip else ""
+ if param:
+ self.floating_ips.append(IpParam(param, self))
+
+ def filter_output_params(self, base_outputs):
+ self.fixed_ips = remove(self.fixed_ips, base_outputs, key=lambda ip: ip.param)
+ self.floating_ips = remove(
+ self.floating_ips, base_outputs, key=lambda ip: ip.param
+ )
+
+
+class VirtualMachineType(FilterBaseOutputs):
+ def __init__(self, vm_type, vnf_module):
+ self.vm_type = vm_type
+ self.names = []
+ self.ports = []
+ self.vm_count = 0
+ self.vnf_module = vnf_module
+
+ def filter_output_params(self, base_outputs):
+ self.names = remove(self.names, base_outputs)
+ for port in self.ports:
+ port.filter_output_params(base_outputs)
+
+ @property
+ def networks(self):
+ return {port.network for port in self.ports}
+
+ @property
+ def floating_ips(self):
+ for port in self.ports:
+ for ip in port.floating_ips:
+ yield ip
+
+ @property
+ def fixed_ips(self):
+ for port in self.ports:
+ for ip in port.fixed_ips:
+ yield ip
+
+ def update_ports(self, network, props):
+ port = self.get_or_create_port(network)
+ port.add_ips(props)
+
+ def get_or_create_port(self, network):
+ for port in self.ports:
+ if port.network == network:
+ return port
+ port = Port(self, network)
+ self.ports.append(port)
+ return port
+
+
+class Vnf:
+ def __init__(self, templates):
+ self.modules = [VnfModule(t, self) for t in templates]
+ self.uses_contrail = self._uses_contrail()
+ self.base_module = next(
+ (mod for mod in self.modules if mod.is_base_module), None
+ )
+ self.incremental_modules = [m for m in self.modules if not m.is_base_module]
+
+ def _uses_contrail(self):
+ for mod in self.modules:
+ resources = mod.heat.get_all_resources()
+ types = (r.get("type", "") for r in resources.values())
+ if any(t.startswith("OS::ContrailV2") for t in types):
+ return True
+ return False
+
+ @property
+ def base_output_params(self):
+ return self.base_module.heat.outputs
+
+ def filter_base_outputs(self):
+ non_base_modules = (m for m in self.modules if not m.is_base_module)
+ for mod in non_base_modules:
+ mod.filter_output_params(self.base_output_params)
+
+
+def env_path(heat_path):
+ """
+ Create the path to the env file for the give heat path.
+ :param heat_path: path to heat file
+ :return: path to env file (assumes it is present and named correctly)
+ """
+ base_path = os.path.splitext(heat_path)[0]
+ return "{}.env".format(base_path)
+
+
+class VnfModule(FilterBaseOutputs):
+ def __init__(self, template_file, vnf):
+ self.vnf = vnf
+ self.vnf_name = os.path.splitext(os.path.basename(template_file))[0]
+ self.template_file = template_file
+ self.heat = Heat(filepath=template_file, envpath=env_path(template_file))
+ env_pair = get_environment_pair(self.template_file)
+ env_yaml = env_pair.get("eyml") if env_pair else {}
+ self.parameters = env_yaml.get("parameters") or {}
+ self.networks = []
+ self.virtual_machine_types = self._create_vm_types()
+ self._add_networks()
+ self.outputs_filtered = False
+
+ def filter_output_params(self, base_outputs):
+ for vm in self.virtual_machine_types:
+ vm.filter_output_params(base_outputs)
+ for network in self.networks:
+ network.filter_output_params(base_outputs)
+ self.parameters = {
+ k: v for k, v in self.parameters.items() if k not in base_outputs
+ }
+ self.networks = [
+ network
+ for network in self.networks
+ if network.name_param not in base_outputs or network.subnet_params
+ ]
+ self.outputs_filtered = True
+
+ def _create_vm_types(self):
+ servers = self.heat.get_resource_by_type("OS::Nova::Server", all_resources=True)
+ vm_types = {}
+ for _, props in yield_by_count(servers):
+ vm_type = get_vm_type_for_nova_server(props)
+ vm = vm_types.setdefault(vm_type, VirtualMachineType(vm_type, self))
+ vm.vm_count += 1
+ name = nested_dict.get(props, "properties", "name", default={})
+ vm_name = get_param(name) if name else ""
+ vm.names.append(vm_name)
+ return list(vm_types.values())
+
+ def _add_networks(self):
+ ports = self.heat.get_resource_by_type("OS::Neutron::Port", all_resources=True)
+ for rid, props in yield_by_count(ports):
+ resource_type, port_match = NeutronPortProcessor.get_rid_match_tuple(rid)
+ if resource_type != "external":
+ continue
+ network_role = port_match.group("network_role")
+ vm = self._get_vm_type(port_match.group("vm_type"))
+ network = self._get_network(network_role, props)
+ vm.update_ports(network, props)
+
+ @property
+ def is_base_module(self):
+ return is_base_module(self.template_file)
+
+ @property
+ def availability_zones(self):
+ """Returns a list of all availability zone parameters found in the template"""
+ return sorted(
+ p for p in self.heat.parameters if p.startswith("availability_zone")
+ )
+
+ @property
+ def label(self):
+ """
+ Label for the VF module that will appear in the CSAR
+ """
+ return self.vnf_name
+
+ @property
+ def env_specs(self):
+ """Return available Environment Spec definitions"""
+ return Config().env_specs
+
+ @property
+ def env_template(self):
+ """
+ Returns a a template .env file that can be completed to enable
+ preload generation.
+ """
+ params = {}
+ params["vnf-name"] = CHANGE
+ params["vnf-type"] = CHANGE
+ params["vf-module-model-name"] = CHANGE
+ params["vf_module_name"] = CHANGE
+ for network in self.networks:
+ params[network.name_param] = CHANGE
+ for param in set(network.subnet_params):
+ params[param] = CHANGE
+ for vm in self.virtual_machine_types:
+ for name in set(vm.names):
+ params[name] = CHANGE
+ for ip in vm.floating_ips:
+ params[ip.param] = CHANGE
+ for ip in vm.fixed_ips:
+ params[ip.param] = CHANGE
+ excluded = get_preload_excluded_parameters(
+ self.template_file, persistent_only=True
+ )
+ for name, value in self.parameters.items():
+ if name in excluded:
+ continue
+ params[name] = value
+ return {"parameters": params}
+
+ @property
+ def preload_parameters(self):
+ """
+ Subset of parameters from the env file that can be overridden in
+ tag values. Per VNF Heat Guidelines, specific parameters such as
+ flavor, image, etc. must not be overridden so they are excluded.
+
+ :return: dict of parameters suitable for the preload
+ """
+ excluded = get_preload_excluded_parameters(self.template_file)
+ return {k: v for k, v in self.parameters.items() if k not in excluded}
+
+ def _get_vm_type(self, vm_type):
+ for vm in self.virtual_machine_types:
+ if vm_type.lower() == vm.vm_type.lower():
+ return vm
+ raise RuntimeError("Encountered unknown VM type: {}".format(vm_type))
+
+ def _get_network(self, network_role, props):
+ network_prop = nested_dict.get(props, "properties", "network") or {}
+ name_param = get_param(network_prop) if network_prop else ""
+ for network in self.networks:
+ if network.network_role.lower() == network_role.lower():
+ return network
+ new_network = Network(network_role, name_param)
+ self.networks.append(new_network)
+ return new_network
+
+ def __str__(self):
+ return "VNF Module ({})".format(os.path.basename(self.template_file))
+
+ def __repr__(self):
+ return str(self)
+
+ def __hash__(self):
+ return hash(self.vnf_name)
+
+ def __eq__(self, other):
+ return hash(self) == hash(other)
+
+
+def create_preloads(config, exitstatus):
+ """
+ Create preloads in every format that can be discovered by get_generator_plugins
+ """
+ if config.getoption("self_test"):
+ return
+ print("+===================================================================+")
+ print("| Preload Template Generation |")
+ print("+===================================================================+")
+
+ preload_dir = os.path.join(get_output_dir(config), "preloads")
+ if os.path.exists(preload_dir):
+ shutil.rmtree(preload_dir)
+ env_directory = config.getoption("env_dir")
+ preload_env = PreloadEnvironment(env_directory) if env_directory else None
+ plugins = get_generator_plugins()
+ available_formats = [p.format_name() for p in plugins]
+ selected_formats = config.getoption("preload_formats") or available_formats
+ heat_templates = get_heat_templates(config)
+ vnf = None
+ for plugin_class in plugins:
+ if plugin_class.format_name() not in selected_formats:
+ continue
+ vnf = Vnf(heat_templates)
+ generator = plugin_class(vnf, preload_dir, preload_env)
+ generator.generate()
+ if vnf and vnf.uses_contrail:
+ print(
+ "\nWARNING: Preload template generation does not support Contrail\n"
+ "at this time, but Contrail resources were detected. The preload \n"
+ "template may be incomplete."
+ )
+ if exitstatus != 0:
+ print(
+ "\nWARNING: Heat violations detected. Preload templates may be\n"
+ "incomplete."
+ )