summaryrefslogtreecommitdiffstats
path: root/nfvparser/src/main/python/toscaparser/prereq/csar.py
diff options
context:
space:
mode:
Diffstat (limited to 'nfvparser/src/main/python/toscaparser/prereq/csar.py')
-rw-r--r--nfvparser/src/main/python/toscaparser/prereq/csar.py286
1 files changed, 286 insertions, 0 deletions
diff --git a/nfvparser/src/main/python/toscaparser/prereq/csar.py b/nfvparser/src/main/python/toscaparser/prereq/csar.py
new file mode 100644
index 0000000..1cba5c4
--- /dev/null
+++ b/nfvparser/src/main/python/toscaparser/prereq/csar.py
@@ -0,0 +1,286 @@
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+import os.path
+import requests
+import shutil
+import six
+import tempfile
+import yaml
+import zipfile
+
+from toscaparser.common.exception import ExceptionCollector
+from toscaparser.common.exception import URLException
+from toscaparser.common.exception import ValidationError
+from toscaparser.imports import ImportsLoader
+from toscaparser.utils.gettextutils import _
+from toscaparser.utils.urlutils import UrlUtils
+
+try: # Python 2.x
+ from BytesIO import BytesIO
+except ImportError: # Python 3.x
+ from io import BytesIO
+
+
+class CSAR(object):
+
+ def __init__(self, csar_file, a_file=True):
+ self.path = csar_file
+ self.a_file = a_file
+ self.is_validated = False
+ self.error_caught = False
+ self.csar = None
+ self.temp_dir = None
+
+ def validate(self):
+ """Validate the provided CSAR file."""
+
+ self.is_validated = True
+
+ # validate that the file or URL exists
+ missing_err_msg = (_('"%s" does not exist.') % self.path)
+ if self.a_file:
+ if not os.path.isfile(self.path):
+ ExceptionCollector.appendException(
+ ValidationError(message=missing_err_msg))
+ return False
+ else:
+ self.csar = self.path
+ else: # a URL
+ if not UrlUtils.validate_url(self.path):
+ ExceptionCollector.appendException(
+ ValidationError(message=missing_err_msg))
+ return False
+ else:
+ response = requests.get(self.path)
+ self.csar = BytesIO(response.content)
+
+ # validate that it is a valid zip file
+ if not zipfile.is_zipfile(self.csar):
+ err_msg = (_('"%s" is not a valid zip file.') % self.path)
+ ExceptionCollector.appendException(
+ ValidationError(message=err_msg))
+ return False
+
+ # validate that it contains the metadata file in the correct location
+ self.zfile = zipfile.ZipFile(self.csar, 'r')
+ filelist = self.zfile.namelist()
+ if 'TOSCA-Metadata/TOSCA.meta' not in filelist:
+ err_msg = (_('"%s" is not a valid CSAR as it does not contain the '
+ 'required file "TOSCA.meta" in the folder '
+ '"TOSCA-Metadata".') % self.path)
+ ExceptionCollector.appendException(
+ ValidationError(message=err_msg))
+ return False
+
+ # validate that 'Entry-Definitions' property exists in TOSCA.meta
+ data = self.zfile.read('TOSCA-Metadata/TOSCA.meta')
+ invalid_yaml_err_msg = (_('The file "TOSCA-Metadata/TOSCA.meta" in '
+ 'the CSAR "%s" does not contain valid YAML '
+ 'content.') % self.path)
+ try:
+ meta = yaml.load(data)
+ if type(meta) is dict:
+ self.metadata = meta
+ else:
+ ExceptionCollector.appendException(
+ ValidationError(message=invalid_yaml_err_msg))
+ return False
+ except yaml.YAMLError:
+ ExceptionCollector.appendException(
+ ValidationError(message=invalid_yaml_err_msg))
+ return False
+
+ if 'Entry-Definitions' not in self.metadata:
+ err_msg = (_('The CSAR "%s" is missing the required metadata '
+ '"Entry-Definitions" in '
+ '"TOSCA-Metadata/TOSCA.meta".')
+ % self.path)
+ ExceptionCollector.appendException(
+ ValidationError(message=err_msg))
+ return False
+
+ # validate that 'Entry-Definitions' metadata value points to an
+ # existing file in the CSAR
+ entry = self.metadata.get('Entry-Definitions')
+ if entry and entry not in filelist:
+ err_msg = (_('The "Entry-Definitions" file defined in the '
+ 'CSAR "%s" does not exist.') % self.path)
+ ExceptionCollector.appendException(
+ ValidationError(message=err_msg))
+ return False
+
+ # validate that external references in the main template actually
+ # exist and are accessible
+ self._validate_external_references()
+ return not self.error_caught
+
+ def get_metadata(self):
+ """Return the metadata dictionary."""
+
+ # validate the csar if not already validated
+ if not self.is_validated:
+ self.validate()
+
+ # return a copy to avoid changes overwrite the original
+ return dict(self.metadata) if self.metadata else None
+
+ def _get_metadata(self, key):
+ if not self.is_validated:
+ self.validate()
+ return self.metadata.get(key)
+
+ def get_author(self):
+ return self._get_metadata('Created-By')
+
+ def get_version(self):
+ return self._get_metadata('CSAR-Version')
+
+ def get_main_template(self):
+ entry_def = self._get_metadata('Entry-Definitions')
+ if entry_def in self.zfile.namelist():
+ return entry_def
+
+ def get_main_template_yaml(self):
+ main_template = self.get_main_template()
+ if main_template:
+ data = self.zfile.read(main_template)
+ invalid_tosca_yaml_err_msg = (
+ _('The file "%(template)s" in the CSAR "%(csar)s" does not '
+ 'contain valid TOSCA YAML content.') %
+ {'template': main_template, 'csar': self.path})
+ try:
+ tosca_yaml = yaml.load(data)
+ if type(tosca_yaml) is not dict:
+ ExceptionCollector.appendException(
+ ValidationError(message=invalid_tosca_yaml_err_msg))
+ return tosca_yaml
+ except Exception:
+ ExceptionCollector.appendException(
+ ValidationError(message=invalid_tosca_yaml_err_msg))
+
+ def get_description(self):
+ desc = self._get_metadata('Description')
+ if desc is not None:
+ return desc
+
+ self.metadata['Description'] = \
+ self.get_main_template_yaml().get('description')
+ return self.metadata['Description']
+
+ def decompress(self):
+ if not self.is_validated:
+ self.validate()
+ self.temp_dir = tempfile.NamedTemporaryFile().name
+ with zipfile.ZipFile(self.csar, "r") as zf:
+ zf.extractall(self.temp_dir)
+
+ def _validate_external_references(self):
+ """Extracts files referenced in the main template
+
+ These references are currently supported:
+ * imports
+ * interface implementations
+ * artifacts
+ """
+ try:
+ self.decompress()
+ main_tpl_file = self.get_main_template()
+ if not main_tpl_file:
+ return
+ main_tpl = self.get_main_template_yaml()
+
+ if 'imports' in main_tpl:
+ ImportsLoader(main_tpl['imports'],
+ os.path.join(self.temp_dir, main_tpl_file))
+
+ if 'topology_template' in main_tpl:
+ topology_template = main_tpl['topology_template']
+
+ if 'node_templates' in topology_template:
+ node_templates = topology_template['node_templates']
+
+ for node_template_key in node_templates:
+ node_template = node_templates[node_template_key]
+ if 'artifacts' in node_template:
+ artifacts = node_template['artifacts']
+ for artifact_key in artifacts:
+ artifact = artifacts[artifact_key]
+ if isinstance(artifact, six.string_types):
+ self._validate_external_reference(
+ main_tpl_file,
+ artifact)
+ elif isinstance(artifact, dict):
+ if 'file' in artifact:
+ self._validate_external_reference(
+ main_tpl_file,
+ artifact['file'])
+ else:
+ ExceptionCollector.appendException(
+ ValueError(_('Unexpected artifact '
+ 'definition for "%s".')
+ % artifact_key))
+ self.error_caught = True
+ if 'interfaces' in node_template:
+ interfaces = node_template['interfaces']
+ for interface_key in interfaces:
+ interface = interfaces[interface_key]
+ for opertation_key in interface:
+ operation = interface[opertation_key]
+ if isinstance(operation, six.string_types):
+ self._validate_external_reference(
+ main_tpl_file,
+ operation,
+ False)
+ elif isinstance(operation, dict):
+ if 'implementation' in operation:
+ self._validate_external_reference(
+ main_tpl_file,
+ operation['implementation'])
+ finally:
+ if self.temp_dir:
+ shutil.rmtree(self.temp_dir)
+
+ def _validate_external_reference(self, tpl_file, resource_file,
+ raise_exc=True):
+ """Verify that the external resource exists
+
+ If resource_file is a URL verify that the URL is valid.
+ If resource_file is a relative path verify that the path is valid
+ considering base folder (self.temp_dir) and tpl_file.
+ Note that in a CSAR resource_file cannot be an absolute path.
+ """
+ if UrlUtils.validate_url(resource_file):
+ msg = (_('The resource at "%s" cannot be accessed.') %
+ resource_file)
+ try:
+ if UrlUtils.url_accessible(resource_file):
+ return
+ else:
+ ExceptionCollector.appendException(
+ URLException(what=msg))
+ self.error_caught = True
+ except Exception:
+ ExceptionCollector.appendException(
+ URLException(what=msg))
+ self.error_caught = True
+
+ if os.path.isfile(os.path.join(self.temp_dir,
+ os.path.dirname(tpl_file),
+ resource_file)):
+ return
+
+ if raise_exc:
+ ExceptionCollector.appendException(
+ ValueError(_('The resource "%s" does not exist.')
+ % resource_file))
+ self.error_caught = True