diff options
Diffstat (limited to 'vnfsdk_pkgtools/packager/csar.py')
-rw-r--r-- | vnfsdk_pkgtools/packager/csar.py | 221 |
1 files changed, 48 insertions, 173 deletions
diff --git a/vnfsdk_pkgtools/packager/csar.py b/vnfsdk_pkgtools/packager/csar.py index 1bf5c20..5fcbec7 100644 --- a/vnfsdk_pkgtools/packager/csar.py +++ b/vnfsdk_pkgtools/packager/csar.py @@ -20,83 +20,43 @@ import tempfile import zipfile import requests -from ruamel import yaml # @UnresolvedImport from vnfsdk_pkgtools.packager import manifest +from vnfsdk_pkgtools.packager import toscameta from vnfsdk_pkgtools.packager import utils LOG = logging.getLogger(__name__) -META_FILE = 'TOSCA-Metadata/TOSCA.meta' -META_FILE_VERSION_KEY = 'TOSCA-Meta-File-Version' -META_FILE_VERSION_VALUE = '1.0' -META_CSAR_VERSION_KEY = 'CSAR-Version' -META_CSAR_VERSION_VALUE = '1.1' -META_CREATED_BY_KEY = 'Created-By' -META_CREATED_BY_VALUE = 'ONAP' -META_ENTRY_DEFINITIONS_KEY = 'Entry-Definitions' -META_ENTRY_MANIFEST_FILE_KEY = 'Entry-Manifest' -META_ENTRY_HISTORY_FILE_KEY = 'Entry-Change-Log' -META_ENTRY_TESTS_DIR_KEY = 'Entry-Tests' -META_ENTRY_LICENSES_DIR_KEY = 'Entry-Licenses' -META_ENTRY_CERT_FILE_KEY = 'Entry-Certificate' - -BASE_METADATA = { - META_FILE_VERSION_KEY: META_FILE_VERSION_VALUE, - META_CSAR_VERSION_KEY: META_CSAR_VERSION_VALUE, - META_CREATED_BY_KEY: META_CREATED_BY_VALUE, -} - - -def check_file_dir(root, entry, msg, check_for_non=False, check_dir=False): - path = os.path.join(root, entry) - if check_for_non: - ret = not os.path.exists(path) - error_msg = '{0} already exists. ' + msg - elif check_dir: - ret = os.path.isdir(path) - error_msg = '{0} is not an existing directory. ' + msg - else: - ret = os.path.isfile(path) - error_msg = '{0} is not an existing file. ' + msg - if not ret: - raise ValueError(error_msg.format(path)) - def write(source, entry, destination, args): source = os.path.expanduser(source) destination = os.path.expanduser(destination) - metadata = BASE_METADATA.copy() - - check_file_dir(root=source, - entry='', - msg='Please specify the service template directory.', - check_dir=True) - - check_file_dir(root=source, - entry=entry, - msg='Please specify a valid entry point.', - check_dir=False) - metadata[META_ENTRY_DEFINITIONS_KEY] = entry - - check_file_dir(root='', - entry=destination, - msg='Please provide a path to where the CSAR should be created.', - check_for_non=True) - - check_file_dir(root=source, - entry=META_FILE, - msg='This commands generates a meta file for you. Please ' - 'remove the existing metafile.', - check_for_non=True) - - if(args.manifest): - check_file_dir(root=source, - entry=args.manifest, - msg='Please specify a valid manifest file.', - check_dir=False) - metadata[META_ENTRY_MANIFEST_FILE_KEY] = args.manifest - manifest_file = manifest.Manifest(source, args.manifest) + + utils.check_file_dir(root=source, + entry='', + msg='Please specify the service template directory.', + check_dir=True) + + utils.check_file_dir(root='', + entry=destination, + msg='Please provide a path to where the CSAR should be created.', + check_for_non=True) + + utils.check_file_dir(root=source, + entry=toscameta.META_FILE, + msg='This commands generates a meta file for you.' + 'Please remove the existing metafile.', + check_for_non=True) + if args.sol241: + metadatacls = toscameta.ToscaMeta241 + else: + metadatacls = toscameta.ToscaMeta261 + metadata = metadatacls(source, args.entry, args.manifest, + args.history, args.licenses, + args.tests, args.certificate) + + if args.manifest: + manifest_file = manifest.Manifest(source, args.manifest) manifest_file_full_path = os.path.join(source, args.manifest) elif args.certificate or args.digest: raise ValueError("Must specify manifest file if certificate or digest is specified") @@ -104,40 +64,13 @@ def write(source, entry, destination, args): manifest_file = None manifest_file_full_path = None - - if(args.history): - check_file_dir(root=source, - entry=args.history, - msg='Please specify a valid change history file.', - check_dir=False) - metadata[META_ENTRY_HISTORY_FILE_KEY] = args.history - if args.certificate: - check_file_dir(root=source, - entry=args.certificate, - msg='Please specify a valid certificate file.', - check_dir=False) - metadata[META_ENTRY_CERT_FILE_KEY] = args.certificate if not args.privkey: raise ValueError('Need private key file for signing') - check_file_dir(root='', - entry=args.privkey, - msg='Please specify a valid private key file.', - check_dir=False) - - if(args.tests): - check_file_dir(root=source, - entry=args.tests, - msg='Please specify a valid test directory.', - check_dir=True) - metadata[META_ENTRY_TESTS_DIR_KEY] = args.tests - - if(args.licenses): - check_file_dir(root=source, - entry=args.licenses, - msg='Please specify a valid license directory.', - check_dir=True) - metadata[META_ENTRY_LICENSES_DIR_KEY] = args.licenses + utils.check_file_dir(root='', + entry=args.privkey, + msg='Please specify a valid private key file.', + check_dir=False) LOG.debug('Compressing root directory to ZIP') with zipfile.ZipFile(destination, 'w', zipfile.ZIP_DEFLATED) as f: @@ -152,14 +85,13 @@ def write(source, entry, destination, args): for file in files: file_full_path = os.path.join(root, file) # skip manifest file here in case we need to generate digest - if file_full_path!=manifest_file_full_path: + if file_full_path != manifest_file_full_path: file_relative_path = os.path.relpath(file_full_path, source) LOG.debug('Writing to archive: {0}'.format(file_relative_path)) f.write(file_full_path, file_relative_path) if manifest_file and args.digest: LOG.debug('Update file digest: {0}'.format(file_relative_path)) manifest_file.add_file(file_relative_path, args.digest) - if manifest_file: LOG.debug('Update manifest file to temporary file') manifest_file_full_path = manifest_file.update_to_file(True) @@ -173,8 +105,8 @@ def write(source, entry, destination, args): LOG.debug('Writing to archive: {0}'.format(args.manifest)) f.write(manifest_file_full_path, args.manifest) - LOG.debug('Writing new metadata file to {0}'.format(META_FILE)) - f.writestr(META_FILE, yaml.dump(metadata, default_flow_style=False)) + LOG.debug('Writing new metadata file to {0}'.format(toscameta.META_FILE)) + f.writestr(toscameta.META_FILE, metadata.dump_as_string()) class _CSARReader(object): @@ -192,7 +124,7 @@ class _CSARReader(object): source = download_target self.source = os.path.expanduser(source) self.destination = os.path.expanduser(destination) - self.metadata = {} + self.metadata = None self.manifest = None try: if not os.path.exists(self.source): @@ -209,19 +141,19 @@ class _CSARReader(object): @property def created_by(self): - return self.metadata.get(META_CREATED_BY_KEY) + return self.metadata.created_by @property def csar_version(self): - return self.metadata.get(META_CSAR_VERSION_KEY) + return self.metadata.csar_version @property def meta_file_version(self): - return self.metadata.get(META_FILE_VERSION_KEY) + return self.metadata.meta_file_version @property def entry_definitions(self): - return self.metadata.get(META_ENTRY_DEFINITIONS_KEY) + return self.metadata.entry_definitions @property def entry_definitions_yaml(self): @@ -230,23 +162,23 @@ class _CSARReader(object): @property def entry_manifest_file(self): - return self.metadata.get(META_ENTRY_MANIFEST_FILE_KEY) + return self.metadata.entry_manifest_file @property def entry_history_file(self): - return self.metadata.get(META_ENTRY_HISTORY_FILE_KEY) + return self.metadata.entry_history_file @property def entry_tests_dir(self): - return self.metadata.get(META_ENTRY_TESTS_DIR_KEY) + return self.metadata.entry_tests_dir @property def entry_licenses_dir(self): - return self.metadata.get(META_ENTRY_LICENSES_DIR_KEY) + return self.metadata.entry_licenses_dir @property def entry_certificate_file(self): - return self.metadata.get(META_ENTRY_CERT_FILE_KEY) + return self.metadata.entry_certificate_file def _extract(self): LOG.debug('Extracting CSAR contents') @@ -257,27 +189,9 @@ class _CSARReader(object): LOG.debug('CSAR contents successfully extracted') def _read_metadata(self): - csar_metafile = os.path.join(self.destination, META_FILE) - if not os.path.exists(csar_metafile): - raise ValueError('Metadata file {0} is missing from the CSAR'.format(csar_metafile)) - LOG.debug('CSAR metadata file: {0}'.format(csar_metafile)) - LOG.debug('Attempting to parse CSAR metadata YAML') - with open(csar_metafile) as f: - self.metadata.update(yaml.safe_load(f)) - LOG.debug('CSAR metadata:\n{0}'.format(pprint.pformat(self.metadata))) + self.metadata = toscameta.create_from_file(self.destination) def _validate(self, no_verify_cert): - def validate_key(key, expected=None): - if not self.metadata.get(key): - raise ValueError('{0} is missing from the metadata file.'.format(key)) - actual = str(self.metadata[key]) - if expected and actual != expected: - raise ValueError('{0} is expected to be {1} in the metadata file while it is in ' - 'fact {2}.'.format(key, expected, actual)) - validate_key(META_FILE_VERSION_KEY, expected=META_FILE_VERSION_VALUE) - validate_key(META_CSAR_VERSION_KEY, expected=META_CSAR_VERSION_VALUE) - validate_key(META_CREATED_BY_KEY) - validate_key(META_ENTRY_DEFINITIONS_KEY) LOG.debug('CSAR entry definitions: {0}'.format(self.entry_definitions)) LOG.debug('CSAR manifest file: {0}'.format(self.entry_manifest_file)) LOG.debug('CSAR change history file: {0}'.format(self.entry_history_file)) @@ -285,50 +199,11 @@ class _CSARReader(object): LOG.debug('CSAR licenses directory: {0}'.format(self.entry_licenses_dir)) LOG.debug('CSAR certificate file: {0}'.format(self.entry_certificate_file)) - check_file_dir(self.destination, - self.entry_definitions, - 'The entry definitions {0} referenced by the metadata ' - 'file does not exist.'.format(self.entry_definitions), - check_dir=False) - - if(self.entry_manifest_file): - check_file_dir(self.destination, - self.entry_manifest_file, - 'The manifest file {0} referenced by the metadata ' - 'file does not exist.'.format(self.entry_manifest_file), - check_dir=False) - self.manifest = manifest.Manifest(self.destination, - self.entry_manifest_file) - - - if(self.entry_history_file): - check_file_dir(self.destination, - self.entry_history_file, - 'The change history file {0} referenced by the metadata ' - 'file does not exist.'.format(self.entry_history_file), - check_dir=False) - - if(self.entry_tests_dir): - check_file_dir(self.destination, - self.entry_tests_dir, - 'The test directory {0} referenced by the metadata ' - 'file does not exist.'.format(self.entry_tests_dir), - check_dir=True) - - if(self.entry_licenses_dir): - check_file_dir(self.destination, - self.entry_licenses_dir, - 'The license directory {0} referenced by the metadata ' - 'file does not exist.'.format(self.entry_licenses_dir), - check_dir=True) + if self.entry_manifest_file: + self.manifest = manifest.Manifest(self.destination, + self.entry_manifest_file) if(self.entry_certificate_file): - # check certificate - check_file_dir(self.destination, - self.entry_certificate_file, - 'The certificate file {0} referenced by the metadata ' - 'file does not exist.'.format(self.entry_certificate_file), - check_dir=False) tmp_manifest = self.manifest.save_to_temp_without_cms() utils.verify(tmp_manifest, os.path.join(self.destination, self.entry_certificate_file), |