summaryrefslogtreecommitdiffstats
path: root/nfvparser/toscaparser/imports.py
diff options
context:
space:
mode:
Diffstat (limited to 'nfvparser/toscaparser/imports.py')
-rw-r--r--nfvparser/toscaparser/imports.py290
1 files changed, 290 insertions, 0 deletions
diff --git a/nfvparser/toscaparser/imports.py b/nfvparser/toscaparser/imports.py
new file mode 100644
index 0000000..429a396
--- /dev/null
+++ b/nfvparser/toscaparser/imports.py
@@ -0,0 +1,290 @@
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+import logging
+import os
+
+from toscaparser.common.exception import ExceptionCollector
+from toscaparser.common.exception import InvalidPropertyValueError
+from toscaparser.common.exception import MissingRequiredFieldError
+from toscaparser.common.exception import UnknownFieldError
+from toscaparser.common.exception import ValidationError
+from toscaparser.elements.tosca_type_validation import TypeValidation
+from toscaparser.utils.gettextutils import _
+import toscaparser.utils.urlutils
+import toscaparser.utils.yamlparser
+
+YAML_LOADER = toscaparser.utils.yamlparser.load_yaml
+log = logging.getLogger("tosca")
+
+
+class ImportsLoader(object):
+
+ IMPORTS_SECTION = (FILE, REPOSITORY, NAMESPACE_URI, NAMESPACE_PREFIX) = \
+ ('file', 'repository', 'namespace_uri',
+ 'namespace_prefix')
+
+ def __init__(self, importslist, path, type_definition_list=None,
+ tpl=None):
+ self.importslist = importslist
+ self.custom_defs = {}
+ self.nested_tosca_tpls = []
+ if not path and not tpl:
+ msg = _('Input tosca template is not provided.')
+ log.warning(msg)
+ ExceptionCollector.appendException(ValidationError(message=msg))
+ self.path = path
+ self.repositories = {}
+ if tpl and tpl.get('repositories'):
+ self.repositories = tpl.get('repositories')
+ self.type_definition_list = []
+ if type_definition_list:
+ if isinstance(type_definition_list, list):
+ self.type_definition_list = type_definition_list
+ else:
+ self.type_definition_list.append(type_definition_list)
+ self._validate_and_load_imports()
+
+ def get_custom_defs(self):
+ return self.custom_defs
+
+ def get_nested_tosca_tpls(self):
+ return self.nested_tosca_tpls
+
+ def _validate_and_load_imports(self):
+ imports_names = set()
+
+ if not self.importslist:
+ msg = _('"imports" keyname is defined without including '
+ 'templates.')
+ log.error(msg)
+ ExceptionCollector.appendException(ValidationError(message=msg))
+ return
+
+ for import_def in self.importslist:
+ if isinstance(import_def, dict):
+ for import_name, import_uri in import_def.items():
+ if import_name in imports_names:
+ msg = (_('Duplicate import name "%s" was found.') %
+ import_name)
+ log.error(msg)
+ ExceptionCollector.appendException(
+ ValidationError(message=msg))
+ imports_names.add(import_name)
+
+ full_file_name, custom_type = self._load_import_template(
+ import_name, import_uri)
+ namespace_prefix = None
+ if isinstance(import_uri, dict):
+ namespace_prefix = import_uri.get(
+ self.NAMESPACE_PREFIX)
+ if custom_type:
+ TypeValidation(custom_type, import_def)
+ self._update_custom_def(custom_type, namespace_prefix)
+ else: # old style of imports
+ full_file_name, custom_type = self._load_import_template(
+ None, import_def)
+ if custom_type:
+ TypeValidation(
+ custom_type, import_def)
+ self._update_custom_def(custom_type, None)
+
+ self._update_nested_tosca_tpls(full_file_name, custom_type)
+
+ def _update_custom_def(self, custom_type, namespace_prefix):
+ outer_custom_types = {}
+ for type_def in self.type_definition_list:
+ outer_custom_types = custom_type.get(type_def)
+ if outer_custom_types:
+ if type_def == "imports":
+ for i in self.custom_defs.get('imports', []):
+ if i not in outer_custom_types:
+ outer_custom_types.append(i)
+ self.custom_defs.update({'imports': outer_custom_types})
+ else:
+ if namespace_prefix:
+ prefix_custom_types = {}
+ for type_def_key in outer_custom_types.keys():
+ namespace_prefix_to_key = (namespace_prefix +
+ "." + type_def_key)
+ prefix_custom_types[namespace_prefix_to_key] = \
+ outer_custom_types[type_def_key]
+ self.custom_defs.update(prefix_custom_types)
+ else:
+ self.custom_defs.update(outer_custom_types)
+
+ def _update_nested_tosca_tpls(self, full_file_name, custom_tpl):
+ if full_file_name and custom_tpl:
+ topo_tpl = {full_file_name: custom_tpl}
+ self.nested_tosca_tpls.append(topo_tpl)
+
+ def _validate_import_keys(self, import_name, import_uri_def):
+ if self.FILE not in import_uri_def.keys():
+ log.warning(_('Missing keyname "file" in import "%(name)s".')
+ % {'name': import_name})
+ ExceptionCollector.appendException(
+ MissingRequiredFieldError(
+ what='Import of template "%s"' % import_name,
+ required=self.FILE))
+ for key in import_uri_def.keys():
+ if key not in self.IMPORTS_SECTION:
+ log.warning(_('Unknown keyname "%(key)s" error in '
+ 'imported definition "%(def)s".')
+ % {'key': key, 'def': import_name})
+ ExceptionCollector.appendException(
+ UnknownFieldError(
+ what='Import of template "%s"' % import_name,
+ field=key))
+
+ def _load_import_template(self, import_name, import_uri_def):
+ """Handle custom types defined in imported template files
+
+ This method loads the custom type definitions referenced in "imports"
+ section of the TOSCA YAML template by determining whether each import
+ is specified via a file reference (by relative or absolute path) or a
+ URL reference.
+
+ Possibilities:
+ +----------+--------+------------------------------+
+ | template | import | comment |
+ +----------+--------+------------------------------+
+ | file | file | OK |
+ | file | URL | OK |
+ | preparsed| file | file must be a full path |
+ | preparsed| URL | OK |
+ | URL | file | file must be a relative path |
+ | URL | URL | OK |
+ +----------+--------+------------------------------+
+ """
+ short_import_notation = False
+ if isinstance(import_uri_def, dict):
+ self._validate_import_keys(import_name, import_uri_def)
+ file_name = import_uri_def.get(self.FILE)
+ repository = import_uri_def.get(self.REPOSITORY)
+ repos = self.repositories.keys()
+ if repository is not None:
+ if repository not in repos:
+ ExceptionCollector.appendException(
+ InvalidPropertyValueError(
+ what=_('Repository is not found in "%s"') % repos))
+ else:
+ file_name = import_uri_def
+ repository = None
+ short_import_notation = True
+
+ if not file_name:
+ msg = (_('A template file name is not provided with import '
+ 'definition "%(import_name)s".')
+ % {'import_name': import_name})
+ log.error(msg)
+ ExceptionCollector.appendException(ValidationError(message=msg))
+ return None, None
+
+ if toscaparser.utils.urlutils.UrlUtils.validate_url(file_name):
+ return file_name, YAML_LOADER(file_name, False)
+ elif not repository:
+ import_template = None
+ if self.path:
+ if toscaparser.utils.urlutils.UrlUtils.validate_url(self.path):
+ if os.path.isabs(file_name):
+ msg = (_('Absolute file name "%(name)s" cannot be '
+ 'used in a URL-based input template '
+ '"%(template)s".')
+ % {'name': file_name, 'template': self.path})
+ log.error(msg)
+ ExceptionCollector.appendException(ImportError(msg))
+ return None, None
+ import_template = toscaparser.utils.urlutils.UrlUtils.\
+ join_url(self.path, file_name)
+ a_file = False
+ else:
+ a_file = True
+ main_a_file = os.path.isfile(self.path)
+
+ if main_a_file:
+ if os.path.isfile(file_name):
+ import_template = file_name
+ else:
+ full_path = os.path.join(
+ os.path.dirname(os.path.abspath(self.path)),
+ file_name)
+ if os.path.isfile(full_path):
+ import_template = full_path
+ else:
+ file_path = file_name.rpartition("/")
+ dir_path = os.path.dirname(os.path.abspath(
+ self.path))
+ if file_path[0] != '' and dir_path.endswith(
+ file_path[0]):
+ import_template = dir_path + "/" +\
+ file_path[2]
+ if not os.path.isfile(import_template):
+ msg = (_('"%(import_template)s" is'
+ 'not a valid file')
+ % {'import_template':
+ import_template})
+ log.error(msg)
+ ExceptionCollector.appendException
+ (ValueError(msg))
+ else: # template is pre-parsed
+ if os.path.isabs(file_name) and os.path.isfile(file_name):
+ a_file = True
+ import_template = file_name
+ else:
+ msg = (_('Relative file name "%(name)s" cannot be used '
+ 'in a pre-parsed input template.')
+ % {'name': file_name})
+ log.error(msg)
+ ExceptionCollector.appendException(ImportError(msg))
+ return None, None
+
+ if not import_template:
+ log.error(_('Import "%(name)s" is not valid.') %
+ {'name': import_uri_def})
+ ExceptionCollector.appendException(
+ ImportError(_('Import "%s" is not valid.') %
+ import_uri_def))
+ return None, None
+ return import_template, YAML_LOADER(import_template, a_file)
+
+ if short_import_notation:
+ log.error(_('Import "%(name)s" is not valid.') % import_uri_def)
+ ExceptionCollector.appendException(
+ ImportError(_('Import "%s" is not valid.') % import_uri_def))
+ return None, None
+
+ full_url = ""
+ if repository:
+ if self.repositories:
+ for repo_name, repo_def in self.repositories.items():
+ if repo_name == repository:
+ # Remove leading, ending spaces and strip
+ # the last character if "/"
+ repo_url = ((repo_def['url']).strip()).rstrip("//")
+ full_url = repo_url + "/" + file_name
+
+ if not full_url:
+ msg = (_('referenced repository "%(n_uri)s" in import '
+ 'definition "%(tpl)s" not found.')
+ % {'n_uri': repository, 'tpl': import_name})
+ log.error(msg)
+ ExceptionCollector.appendException(ImportError(msg))
+ return None, None
+
+ if toscaparser.utils.urlutils.UrlUtils.validate_url(full_url):
+ return full_url, YAML_LOADER(full_url, False)
+ else:
+ msg = (_('repository url "%(n_uri)s" is not valid in import '
+ 'definition "%(tpl)s".')
+ % {'n_uri': repo_url, 'tpl': import_name})
+ log.error(msg)
+ ExceptionCollector.appendException(ImportError(msg))