# Licensed to the Apache Software Foundation (ASF) under one or more # contributor license agreements. See the NOTICE file distributed with # this work for additional information regarding copyright ownership. # The ASF licenses this file to You under the Apache License, Version 2.0 # (the "License"); you may not use this file except in compliance with # the License. You may obtain a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. import os import pprint import tempfile import zipfile import requests from ruamel import yaml # @UnresolvedImport META_FILE = 'TOSCA-Metadata/TOSCA.meta' META_FILE_VERSION_KEY = 'TOSCA-Meta-File-Version' META_FILE_VERSION_VALUE = '1.0' META_CSAR_VERSION_KEY = 'CSAR-Version' META_CSAR_VERSION_VALUE = '1.1' META_CREATED_BY_KEY = 'Created-By' META_CREATED_BY_VALUE = 'ONAP' META_ENTRY_DEFINITIONS_KEY = 'Entry-Definitions' META_ENTRY_MANIFEST_FILE_KEY = 'Entry-Manifest' META_ENTRY_HISTORY_FILE_KEY = 'Entry-Change-Log' META_ENTRY_TESTS_DIR_KEY = 'Entry-Tests' META_ENTRY_LICENSES_DIR_KEY = 'Entry-Licenses' BASE_METADATA = { META_FILE_VERSION_KEY: META_FILE_VERSION_VALUE, META_CSAR_VERSION_KEY: META_CSAR_VERSION_VALUE, META_CREATED_BY_KEY: META_CREATED_BY_VALUE, } def check_file_dir(root, entry, msg, check_for_non=False, check_dir=False): path = os.path.join(root, entry) if check_for_non: ret = not os.path.exists(path) error_msg = '{0} already exists. ' + msg elif check_dir: ret = os.path.isdir(path) error_msg = '{0} is not an existing directory. ' + msg else: ret = os.path.isfile(path) error_msg = '{0} is not an existing file. ' + msg if not ret: raise ValueError(error_msg.format(path)) def write(source, entry, destination, logger, args): source = os.path.expanduser(source) destination = os.path.expanduser(destination) metadata = BASE_METADATA.copy() check_file_dir(root=source, entry='', msg='Please specify the service template directory.', check_dir=True) check_file_dir(root=source, entry=entry, msg='Please specify a valid entry point.', check_dir=False) metadata[META_ENTRY_DEFINITIONS_KEY] = entry check_file_dir(root='', entry=destination, msg='Please provide a path to where the CSAR should be created.', check_for_non=True) check_file_dir(root=source, entry=META_FILE, msg='This commands generates a meta file for you. Please ' 'remove the existing metafile.', check_for_non=True) if(args.manifest): check_file_dir(root=source, entry=args.manifest, msg='Please specify a valid manifest file.', check_dir=False) metadata[META_ENTRY_MANIFEST_FILE_KEY] = args.manifest if(args.history): check_file_dir(root=source, entry=args.history, msg='Please specify a valid change history file.', check_dir=False) metadata[META_ENTRY_HISTORY_FILE_KEY] = args.history if(args.tests): check_file_dir(root=source, entry=args.tests, msg='Please specify a valid test directory.', check_dir=True) metadata[META_ENTRY_TESTS_DIR_KEY] = args.tests if(args.licenses): check_file_dir(root=source, entry=args.licenses, msg='Please specify a valid license directory.', check_dir=True) metadata[META_ENTRY_LICENSES_DIR_KEY] = args.licenses logger.debug('Compressing root directory to ZIP') with zipfile.ZipFile(destination, 'w', zipfile.ZIP_DEFLATED) as f: for root, dirs, files in os.walk(source): for file in files: file_full_path = os.path.join(root, file) file_relative_path = os.path.relpath(file_full_path, source) logger.debug('Writing to archive: {0}'.format(file_relative_path)) f.write(file_full_path, file_relative_path) # add empty dir for dir in dirs: dir_full_path = os.path.join(root, dir) if len(os.listdir(dir_full_path)) == 0: dir_relative_path = os.path.relpath(dir_full_path, source) + os.sep logger.debug('Writing to archive: {0}'.format(dir_relative_path)) f.write(dir_full_path + os.sep, dir_relative_path) logger.debug('Writing new metadata file to {0}'.format(META_FILE)) f.writestr(META_FILE, yaml.dump(metadata, default_flow_style=False)) class _CSARReader(object): def __init__(self, source, destination, logger): self.logger = logger if os.path.isdir(destination) and os.listdir(destination): raise ValueError('{0} already exists and is not empty. ' 'Please specify the location where the CSAR ' 'should be extracted.'.format(destination)) downloaded_csar = '://' in source if downloaded_csar: file_descriptor, download_target = tempfile.mkstemp() os.close(file_descriptor) self._download(source, download_target) source = download_target self.source = os.path.expanduser(source) self.destination = os.path.expanduser(destination) self.metadata = {} try: if not os.path.exists(self.source): raise ValueError('{0} does not exists. Please specify a valid CSAR path.' .format(self.source)) if not zipfile.is_zipfile(self.source): raise ValueError('{0} is not a valid CSAR.'.format(self.source)) self._extract() self._read_metadata() self._validate() finally: if downloaded_csar: os.remove(self.source) @property def created_by(self): return self.metadata.get(META_CREATED_BY_KEY) @property def csar_version(self): return self.metadata.get(META_CSAR_VERSION_KEY) @property def meta_file_version(self): return self.metadata.get(META_FILE_VERSION_KEY) @property def entry_definitions(self): return self.metadata.get(META_ENTRY_DEFINITIONS_KEY) @property def entry_definitions_yaml(self): with open(os.path.join(self.destination, self.entry_definitions)) as f: return yaml.load(f) @property def entry_manifest_file(self): return self.metadata.get(META_ENTRY_MANIFEST_FILE_KEY) @property def entry_history_file(self): return self.metadata.get(META_ENTRY_HISTORY_FILE_KEY) @property def entry_tests_dir(self): return self.metadata.get(META_ENTRY_TESTS_DIR_KEY) @property def entry_licenses_dir(self): return self.metadata.get(META_ENTRY_LICENSES_DIR_KEY) def _extract(self): self.logger.debug('Extracting CSAR contents') if not os.path.exists(self.destination): os.mkdir(self.destination) with zipfile.ZipFile(self.source) as f: f.extractall(self.destination) self.logger.debug('CSAR contents successfully extracted') def _read_metadata(self): csar_metafile = os.path.join(self.destination, META_FILE) if not os.path.exists(csar_metafile): raise ValueError('Metadata file {0} is missing from the CSAR'.format(csar_metafile)) self.logger.debug('CSAR metadata file: {0}'.format(csar_metafile)) self.logger.debug('Attempting to parse CSAR metadata YAML') with open(csar_metafile) as f: self.metadata.update(yaml.load(f)) self.logger.debug('CSAR metadata:\n{0}'.format(pprint.pformat(self.metadata))) def _validate(self): def validate_key(key, expected=None): if not self.metadata.get(key): raise ValueError('{0} is missing from the metadata file.'.format(key)) actual = str(self.metadata[key]) if expected and actual != expected: raise ValueError('{0} is expected to be {1} in the metadata file while it is in ' 'fact {2}.'.format(key, expected, actual)) validate_key(META_FILE_VERSION_KEY, expected=META_FILE_VERSION_VALUE) validate_key(META_CSAR_VERSION_KEY, expected=META_CSAR_VERSION_VALUE) validate_key(META_CREATED_BY_KEY) validate_key(META_ENTRY_DEFINITIONS_KEY) self.logger.debug('CSAR entry definitions: {0}'.format(self.entry_definitions)) self.logger.debug('CSAR manifest file: {0}'.format(self.entry_manifest_file)) self.logger.debug('CSAR change history file: {0}'.format(self.entry_history_file)) self.logger.debug('CSAR tests directory: {0}'.format(self.entry_tests_dir)) self.logger.debug('CSAR licenses directory: {0}'.format(self.entry_licenses_dir)) check_file_dir(self.destination, self.entry_definitions, 'The entry definitions {0} referenced by the metadata ' 'file does not exist.'.format(self.entry_definitions), check_dir=False) if(self.entry_manifest_file): check_file_dir(self.destination, self.entry_manifest_file, 'The manifest file {0} referenced by the metadata ' 'file does not exist.'.format(self.entry_manifest_file), check_dir=False) if(self.entry_history_file): check_file_dir(self.destination, self.entry_history_file, 'The change history file {0} referenced by the metadata ' 'file does not exist.'.format(self.entry_history_file), check_dir=False) if(self.entry_tests_dir): check_file_dir(self.destination, self.entry_tests_dir, 'The test directory {0} referenced by the metadata ' 'file does not exist.'.format(self.entry_tests_dir), check_dir=True) if(self.entry_licenses_dir): check_file_dir(self.destination, self.entry_licenses_dir, 'The license directory {0} referenced by the metadata ' 'file does not exist.'.format(self.entry_licenses_dir), check_dir=True) def _download(self, url, target): response = requests.get(url, stream=True) if response.status_code != 200: raise ValueError('Server at {0} returned a {1} status code' .format(url, response.status_code)) self.logger.info('Downloading {0} to {1}'.format(url, target)) with open(target, 'wb') as f: for chunk in response.iter_content(chunk_size=8192): if chunk: f.write(chunk) def read(source, destination, logger): return _CSARReader(source=source, destination=destination, logger=logger)