diff options
Diffstat (limited to 'vnfsdk_pkgtools/packager/csar.py')
-rw-r--r-- | vnfsdk_pkgtools/packager/csar.py | 285 |
1 files changed, 285 insertions, 0 deletions
diff --git a/vnfsdk_pkgtools/packager/csar.py b/vnfsdk_pkgtools/packager/csar.py new file mode 100644 index 0000000..b4bee29 --- /dev/null +++ b/vnfsdk_pkgtools/packager/csar.py @@ -0,0 +1,285 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import os +import pprint +import tempfile +import zipfile + +import requests +from ruamel import yaml # @UnresolvedImport + + +META_FILE = 'TOSCA-Metadata/TOSCA.meta' +META_FILE_VERSION_KEY = 'TOSCA-Meta-File-Version' +META_FILE_VERSION_VALUE = '1.0' +META_CSAR_VERSION_KEY = 'CSAR-Version' +META_CSAR_VERSION_VALUE = '1.1' +META_CREATED_BY_KEY = 'Created-By' +META_CREATED_BY_VALUE = 'ONAP' +META_ENTRY_DEFINITIONS_KEY = 'Entry-Definitions' +META_ENTRY_MANIFEST_FILE_KEY = 'Entry-Manifest' +META_ENTRY_HISTORY_FILE_KEY = 'Entry-Change-Log' +META_ENTRY_TESTS_DIR_KEY = 'Entry-Tests' +META_ENTRY_LICENSES_DIR_KEY = 'Entry-Licenses' + +BASE_METADATA = { + META_FILE_VERSION_KEY: META_FILE_VERSION_VALUE, + META_CSAR_VERSION_KEY: META_CSAR_VERSION_VALUE, + META_CREATED_BY_KEY: META_CREATED_BY_VALUE, +} + + +def check_file_dir(root, entry, msg, check_for_non=False, check_dir=False): + path = os.path.join(root, entry) + if check_for_non: + ret = not os.path.exists(path) + error_msg = '{0} already exists. ' + msg + elif check_dir: + ret = os.path.isdir(path) + error_msg = '{0} is not an existing directory. ' + msg + else: + ret = os.path.isfile(path) + error_msg = '{0} is not an existing file. ' + msg + if not ret: + raise ValueError(error_msg.format(path)) + + +def write(source, entry, destination, logger, args): + source = os.path.expanduser(source) + destination = os.path.expanduser(destination) + metadata = BASE_METADATA.copy() + + check_file_dir(root=source, + entry='', + msg='Please specify the service template directory.', + check_dir=True) + + check_file_dir(root=source, + entry=entry, + msg='Please specify a valid entry point.', + check_dir=False) + metadata[META_ENTRY_DEFINITIONS_KEY] = entry + + check_file_dir(root='', + entry=destination, + msg='Please provide a path to where the CSAR should be created.', + check_for_non=True) + + check_file_dir(root=source, + entry=META_FILE, + msg='This commands generates a meta file for you. Please ' + 'remove the existing metafile.', + check_for_non=True) + + if(args.manifest): + check_file_dir(root=source, + entry=args.manifest, + msg='Please specify a valid manifest file.', + check_dir=False) + metadata[META_ENTRY_MANIFEST_FILE_KEY] = args.manifest + + if(args.history): + check_file_dir(root=source, + entry=args.history, + msg='Please specify a valid change history file.', + check_dir=False) + metadata[META_ENTRY_HISTORY_FILE_KEY] = args.history + + if(args.tests): + check_file_dir(root=source, + entry=args.tests, + msg='Please specify a valid test directory.', + check_dir=True) + metadata[META_ENTRY_TESTS_DIR_KEY] = args.tests + + if(args.licenses): + check_file_dir(root=source, + entry=args.licenses, + msg='Please specify a valid license directory.', + check_dir=True) + metadata[META_ENTRY_LICENSES_DIR_KEY] = args.licenses + + logger.debug('Compressing root directory to ZIP') + with zipfile.ZipFile(destination, 'w', zipfile.ZIP_DEFLATED) as f: + for root, dirs, files in os.walk(source): + for file in files: + file_full_path = os.path.join(root, file) + file_relative_path = os.path.relpath(file_full_path, source) + logger.debug('Writing to archive: {0}'.format(file_relative_path)) + f.write(file_full_path, file_relative_path) + # add empty dir + for dir in dirs: + dir_full_path = os.path.join(root, dir) + if len(os.listdir(dir_full_path)) == 0: + dir_relative_path = os.path.relpath(dir_full_path, source) + os.sep + logger.debug('Writing to archive: {0}'.format(dir_relative_path)) + f.write(dir_full_path + os.sep, dir_relative_path) + + logger.debug('Writing new metadata file to {0}'.format(META_FILE)) + f.writestr(META_FILE, yaml.dump(metadata, default_flow_style=False)) + + +class _CSARReader(object): + + def __init__(self, source, destination, logger): + self.logger = logger + if os.path.isdir(destination) and os.listdir(destination): + raise ValueError('{0} already exists and is not empty. ' + 'Please specify the location where the CSAR ' + 'should be extracted.'.format(destination)) + downloaded_csar = '://' in source + if downloaded_csar: + file_descriptor, download_target = tempfile.mkstemp() + os.close(file_descriptor) + self._download(source, download_target) + source = download_target + self.source = os.path.expanduser(source) + self.destination = os.path.expanduser(destination) + self.metadata = {} + try: + if not os.path.exists(self.source): + raise ValueError('{0} does not exists. Please specify a valid CSAR path.' + .format(self.source)) + if not zipfile.is_zipfile(self.source): + raise ValueError('{0} is not a valid CSAR.'.format(self.source)) + self._extract() + self._read_metadata() + self._validate() + finally: + if downloaded_csar: + os.remove(self.source) + + @property + def created_by(self): + return self.metadata.get(META_CREATED_BY_KEY) + + @property + def csar_version(self): + return self.metadata.get(META_CSAR_VERSION_KEY) + + @property + def meta_file_version(self): + return self.metadata.get(META_FILE_VERSION_KEY) + + @property + def entry_definitions(self): + return self.metadata.get(META_ENTRY_DEFINITIONS_KEY) + + @property + def entry_definitions_yaml(self): + with open(os.path.join(self.destination, self.entry_definitions)) as f: + return yaml.load(f) + + @property + def entry_manifest_file(self): + return self.metadata.get(META_ENTRY_MANIFEST_FILE_KEY) + + @property + def entry_history_file(self): + return self.metadata.get(META_ENTRY_HISTORY_FILE_KEY) + + @property + def entry_tests_dir(self): + return self.metadata.get(META_ENTRY_TESTS_DIR_KEY) + + @property + def entry_licenses_dir(self): + return self.metadata.get(META_ENTRY_LICENSES_DIR_KEY) + + def _extract(self): + self.logger.debug('Extracting CSAR contents') + if not os.path.exists(self.destination): + os.mkdir(self.destination) + with zipfile.ZipFile(self.source) as f: + f.extractall(self.destination) + self.logger.debug('CSAR contents successfully extracted') + + def _read_metadata(self): + csar_metafile = os.path.join(self.destination, META_FILE) + if not os.path.exists(csar_metafile): + raise ValueError('Metadata file {0} is missing from the CSAR'.format(csar_metafile)) + self.logger.debug('CSAR metadata file: {0}'.format(csar_metafile)) + self.logger.debug('Attempting to parse CSAR metadata YAML') + with open(csar_metafile) as f: + self.metadata.update(yaml.load(f)) + self.logger.debug('CSAR metadata:\n{0}'.format(pprint.pformat(self.metadata))) + + def _validate(self): + def validate_key(key, expected=None): + if not self.metadata.get(key): + raise ValueError('{0} is missing from the metadata file.'.format(key)) + actual = str(self.metadata[key]) + if expected and actual != expected: + raise ValueError('{0} is expected to be {1} in the metadata file while it is in ' + 'fact {2}.'.format(key, expected, actual)) + validate_key(META_FILE_VERSION_KEY, expected=META_FILE_VERSION_VALUE) + validate_key(META_CSAR_VERSION_KEY, expected=META_CSAR_VERSION_VALUE) + validate_key(META_CREATED_BY_KEY) + validate_key(META_ENTRY_DEFINITIONS_KEY) + self.logger.debug('CSAR entry definitions: {0}'.format(self.entry_definitions)) + self.logger.debug('CSAR manifest file: {0}'.format(self.entry_manifest_file)) + self.logger.debug('CSAR change history file: {0}'.format(self.entry_history_file)) + self.logger.debug('CSAR tests directory: {0}'.format(self.entry_tests_dir)) + self.logger.debug('CSAR licenses directory: {0}'.format(self.entry_licenses_dir)) + + check_file_dir(self.destination, + self.entry_definitions, + 'The entry definitions {0} referenced by the metadata ' + 'file does not exist.'.format(self.entry_definitions), + check_dir=False) + + if(self.entry_manifest_file): + check_file_dir(self.destination, + self.entry_manifest_file, + 'The manifest file {0} referenced by the metadata ' + 'file does not exist.'.format(self.entry_manifest_file), + check_dir=False) + + if(self.entry_history_file): + check_file_dir(self.destination, + self.entry_history_file, + 'The change history file {0} referenced by the metadata ' + 'file does not exist.'.format(self.entry_history_file), + check_dir=False) + + if(self.entry_tests_dir): + check_file_dir(self.destination, + self.entry_tests_dir, + 'The test directory {0} referenced by the metadata ' + 'file does not exist.'.format(self.entry_tests_dir), + check_dir=True) + + if(self.entry_licenses_dir): + check_file_dir(self.destination, + self.entry_licenses_dir, + 'The license directory {0} referenced by the metadata ' + 'file does not exist.'.format(self.entry_licenses_dir), + check_dir=True) + + def _download(self, url, target): + response = requests.get(url, stream=True) + if response.status_code != 200: + raise ValueError('Server at {0} returned a {1} status code' + .format(url, response.status_code)) + self.logger.info('Downloading {0} to {1}'.format(url, target)) + with open(target, 'wb') as f: + for chunk in response.iter_content(chunk_size=8192): + if chunk: + f.write(chunk) + + +def read(source, destination, logger): + return _CSARReader(source=source, destination=destination, logger=logger) |