aboutsummaryrefslogtreecommitdiffstats
path: root/vnfsdk_pkgtools/packager/csar.py
diff options
context:
space:
mode:
authorLianhao Lu <lianhao.lu@intel.com>2019-11-28 15:12:22 +0800
committerLianhao Lu <lianhao.lu@intel.com>2019-11-29 10:11:46 +0800
commitfdb7c576cf4238228a786aa7cdb808aad2cb72cb (patch)
tree0a704125fdcfebbb8cba9748daf45922a337ae11 /vnfsdk_pkgtools/packager/csar.py
parent6bdd924968ceccf33a86482aa720609d1c156741 (diff)
Adapt to TOSCA.meta chagnes in SOL004 v2.6.1
Adapted to changes made in SOL004 v2.6.1 about TOSCA.meta file content, while still keeps the backward ability to generate SOL004 v2.4.1 compatible csar file. Issue-ID: VNFSDK-420 Change-Id: I2ea8d001211ea15c8409ee2e2802798a0945f390 Signed-off-by: Lianhao Lu <lianhao.lu@intel.com>
Diffstat (limited to 'vnfsdk_pkgtools/packager/csar.py')
-rw-r--r--vnfsdk_pkgtools/packager/csar.py221
1 files changed, 48 insertions, 173 deletions
diff --git a/vnfsdk_pkgtools/packager/csar.py b/vnfsdk_pkgtools/packager/csar.py
index 1bf5c20..5fcbec7 100644
--- a/vnfsdk_pkgtools/packager/csar.py
+++ b/vnfsdk_pkgtools/packager/csar.py
@@ -20,83 +20,43 @@ import tempfile
import zipfile
import requests
-from ruamel import yaml # @UnresolvedImport
from vnfsdk_pkgtools.packager import manifest
+from vnfsdk_pkgtools.packager import toscameta
from vnfsdk_pkgtools.packager import utils
LOG = logging.getLogger(__name__)
-META_FILE = 'TOSCA-Metadata/TOSCA.meta'
-META_FILE_VERSION_KEY = 'TOSCA-Meta-File-Version'
-META_FILE_VERSION_VALUE = '1.0'
-META_CSAR_VERSION_KEY = 'CSAR-Version'
-META_CSAR_VERSION_VALUE = '1.1'
-META_CREATED_BY_KEY = 'Created-By'
-META_CREATED_BY_VALUE = 'ONAP'
-META_ENTRY_DEFINITIONS_KEY = 'Entry-Definitions'
-META_ENTRY_MANIFEST_FILE_KEY = 'Entry-Manifest'
-META_ENTRY_HISTORY_FILE_KEY = 'Entry-Change-Log'
-META_ENTRY_TESTS_DIR_KEY = 'Entry-Tests'
-META_ENTRY_LICENSES_DIR_KEY = 'Entry-Licenses'
-META_ENTRY_CERT_FILE_KEY = 'Entry-Certificate'
-
-BASE_METADATA = {
- META_FILE_VERSION_KEY: META_FILE_VERSION_VALUE,
- META_CSAR_VERSION_KEY: META_CSAR_VERSION_VALUE,
- META_CREATED_BY_KEY: META_CREATED_BY_VALUE,
-}
-
-
-def check_file_dir(root, entry, msg, check_for_non=False, check_dir=False):
- path = os.path.join(root, entry)
- if check_for_non:
- ret = not os.path.exists(path)
- error_msg = '{0} already exists. ' + msg
- elif check_dir:
- ret = os.path.isdir(path)
- error_msg = '{0} is not an existing directory. ' + msg
- else:
- ret = os.path.isfile(path)
- error_msg = '{0} is not an existing file. ' + msg
- if not ret:
- raise ValueError(error_msg.format(path))
-
def write(source, entry, destination, args):
source = os.path.expanduser(source)
destination = os.path.expanduser(destination)
- metadata = BASE_METADATA.copy()
-
- check_file_dir(root=source,
- entry='',
- msg='Please specify the service template directory.',
- check_dir=True)
-
- check_file_dir(root=source,
- entry=entry,
- msg='Please specify a valid entry point.',
- check_dir=False)
- metadata[META_ENTRY_DEFINITIONS_KEY] = entry
-
- check_file_dir(root='',
- entry=destination,
- msg='Please provide a path to where the CSAR should be created.',
- check_for_non=True)
-
- check_file_dir(root=source,
- entry=META_FILE,
- msg='This commands generates a meta file for you. Please '
- 'remove the existing metafile.',
- check_for_non=True)
-
- if(args.manifest):
- check_file_dir(root=source,
- entry=args.manifest,
- msg='Please specify a valid manifest file.',
- check_dir=False)
- metadata[META_ENTRY_MANIFEST_FILE_KEY] = args.manifest
- manifest_file = manifest.Manifest(source, args.manifest)
+
+ utils.check_file_dir(root=source,
+ entry='',
+ msg='Please specify the service template directory.',
+ check_dir=True)
+
+ utils.check_file_dir(root='',
+ entry=destination,
+ msg='Please provide a path to where the CSAR should be created.',
+ check_for_non=True)
+
+ utils.check_file_dir(root=source,
+ entry=toscameta.META_FILE,
+ msg='This commands generates a meta file for you.'
+ 'Please remove the existing metafile.',
+ check_for_non=True)
+ if args.sol241:
+ metadatacls = toscameta.ToscaMeta241
+ else:
+ metadatacls = toscameta.ToscaMeta261
+ metadata = metadatacls(source, args.entry, args.manifest,
+ args.history, args.licenses,
+ args.tests, args.certificate)
+
+ if args.manifest:
+ manifest_file = manifest.Manifest(source, args.manifest)
manifest_file_full_path = os.path.join(source, args.manifest)
elif args.certificate or args.digest:
raise ValueError("Must specify manifest file if certificate or digest is specified")
@@ -104,40 +64,13 @@ def write(source, entry, destination, args):
manifest_file = None
manifest_file_full_path = None
-
- if(args.history):
- check_file_dir(root=source,
- entry=args.history,
- msg='Please specify a valid change history file.',
- check_dir=False)
- metadata[META_ENTRY_HISTORY_FILE_KEY] = args.history
-
if args.certificate:
- check_file_dir(root=source,
- entry=args.certificate,
- msg='Please specify a valid certificate file.',
- check_dir=False)
- metadata[META_ENTRY_CERT_FILE_KEY] = args.certificate
if not args.privkey:
raise ValueError('Need private key file for signing')
- check_file_dir(root='',
- entry=args.privkey,
- msg='Please specify a valid private key file.',
- check_dir=False)
-
- if(args.tests):
- check_file_dir(root=source,
- entry=args.tests,
- msg='Please specify a valid test directory.',
- check_dir=True)
- metadata[META_ENTRY_TESTS_DIR_KEY] = args.tests
-
- if(args.licenses):
- check_file_dir(root=source,
- entry=args.licenses,
- msg='Please specify a valid license directory.',
- check_dir=True)
- metadata[META_ENTRY_LICENSES_DIR_KEY] = args.licenses
+ utils.check_file_dir(root='',
+ entry=args.privkey,
+ msg='Please specify a valid private key file.',
+ check_dir=False)
LOG.debug('Compressing root directory to ZIP')
with zipfile.ZipFile(destination, 'w', zipfile.ZIP_DEFLATED) as f:
@@ -152,14 +85,13 @@ def write(source, entry, destination, args):
for file in files:
file_full_path = os.path.join(root, file)
# skip manifest file here in case we need to generate digest
- if file_full_path!=manifest_file_full_path:
+ if file_full_path != manifest_file_full_path:
file_relative_path = os.path.relpath(file_full_path, source)
LOG.debug('Writing to archive: {0}'.format(file_relative_path))
f.write(file_full_path, file_relative_path)
if manifest_file and args.digest:
LOG.debug('Update file digest: {0}'.format(file_relative_path))
manifest_file.add_file(file_relative_path, args.digest)
-
if manifest_file:
LOG.debug('Update manifest file to temporary file')
manifest_file_full_path = manifest_file.update_to_file(True)
@@ -173,8 +105,8 @@ def write(source, entry, destination, args):
LOG.debug('Writing to archive: {0}'.format(args.manifest))
f.write(manifest_file_full_path, args.manifest)
- LOG.debug('Writing new metadata file to {0}'.format(META_FILE))
- f.writestr(META_FILE, yaml.dump(metadata, default_flow_style=False))
+ LOG.debug('Writing new metadata file to {0}'.format(toscameta.META_FILE))
+ f.writestr(toscameta.META_FILE, metadata.dump_as_string())
class _CSARReader(object):
@@ -192,7 +124,7 @@ class _CSARReader(object):
source = download_target
self.source = os.path.expanduser(source)
self.destination = os.path.expanduser(destination)
- self.metadata = {}
+ self.metadata = None
self.manifest = None
try:
if not os.path.exists(self.source):
@@ -209,19 +141,19 @@ class _CSARReader(object):
@property
def created_by(self):
- return self.metadata.get(META_CREATED_BY_KEY)
+ return self.metadata.created_by
@property
def csar_version(self):
- return self.metadata.get(META_CSAR_VERSION_KEY)
+ return self.metadata.csar_version
@property
def meta_file_version(self):
- return self.metadata.get(META_FILE_VERSION_KEY)
+ return self.metadata.meta_file_version
@property
def entry_definitions(self):
- return self.metadata.get(META_ENTRY_DEFINITIONS_KEY)
+ return self.metadata.entry_definitions
@property
def entry_definitions_yaml(self):
@@ -230,23 +162,23 @@ class _CSARReader(object):
@property
def entry_manifest_file(self):
- return self.metadata.get(META_ENTRY_MANIFEST_FILE_KEY)
+ return self.metadata.entry_manifest_file
@property
def entry_history_file(self):
- return self.metadata.get(META_ENTRY_HISTORY_FILE_KEY)
+ return self.metadata.entry_history_file
@property
def entry_tests_dir(self):
- return self.metadata.get(META_ENTRY_TESTS_DIR_KEY)
+ return self.metadata.entry_tests_dir
@property
def entry_licenses_dir(self):
- return self.metadata.get(META_ENTRY_LICENSES_DIR_KEY)
+ return self.metadata.entry_licenses_dir
@property
def entry_certificate_file(self):
- return self.metadata.get(META_ENTRY_CERT_FILE_KEY)
+ return self.metadata.entry_certificate_file
def _extract(self):
LOG.debug('Extracting CSAR contents')
@@ -257,27 +189,9 @@ class _CSARReader(object):
LOG.debug('CSAR contents successfully extracted')
def _read_metadata(self):
- csar_metafile = os.path.join(self.destination, META_FILE)
- if not os.path.exists(csar_metafile):
- raise ValueError('Metadata file {0} is missing from the CSAR'.format(csar_metafile))
- LOG.debug('CSAR metadata file: {0}'.format(csar_metafile))
- LOG.debug('Attempting to parse CSAR metadata YAML')
- with open(csar_metafile) as f:
- self.metadata.update(yaml.safe_load(f))
- LOG.debug('CSAR metadata:\n{0}'.format(pprint.pformat(self.metadata)))
+ self.metadata = toscameta.create_from_file(self.destination)
def _validate(self, no_verify_cert):
- def validate_key(key, expected=None):
- if not self.metadata.get(key):
- raise ValueError('{0} is missing from the metadata file.'.format(key))
- actual = str(self.metadata[key])
- if expected and actual != expected:
- raise ValueError('{0} is expected to be {1} in the metadata file while it is in '
- 'fact {2}.'.format(key, expected, actual))
- validate_key(META_FILE_VERSION_KEY, expected=META_FILE_VERSION_VALUE)
- validate_key(META_CSAR_VERSION_KEY, expected=META_CSAR_VERSION_VALUE)
- validate_key(META_CREATED_BY_KEY)
- validate_key(META_ENTRY_DEFINITIONS_KEY)
LOG.debug('CSAR entry definitions: {0}'.format(self.entry_definitions))
LOG.debug('CSAR manifest file: {0}'.format(self.entry_manifest_file))
LOG.debug('CSAR change history file: {0}'.format(self.entry_history_file))
@@ -285,50 +199,11 @@ class _CSARReader(object):
LOG.debug('CSAR licenses directory: {0}'.format(self.entry_licenses_dir))
LOG.debug('CSAR certificate file: {0}'.format(self.entry_certificate_file))
- check_file_dir(self.destination,
- self.entry_definitions,
- 'The entry definitions {0} referenced by the metadata '
- 'file does not exist.'.format(self.entry_definitions),
- check_dir=False)
-
- if(self.entry_manifest_file):
- check_file_dir(self.destination,
- self.entry_manifest_file,
- 'The manifest file {0} referenced by the metadata '
- 'file does not exist.'.format(self.entry_manifest_file),
- check_dir=False)
- self.manifest = manifest.Manifest(self.destination,
- self.entry_manifest_file)
-
-
- if(self.entry_history_file):
- check_file_dir(self.destination,
- self.entry_history_file,
- 'The change history file {0} referenced by the metadata '
- 'file does not exist.'.format(self.entry_history_file),
- check_dir=False)
-
- if(self.entry_tests_dir):
- check_file_dir(self.destination,
- self.entry_tests_dir,
- 'The test directory {0} referenced by the metadata '
- 'file does not exist.'.format(self.entry_tests_dir),
- check_dir=True)
-
- if(self.entry_licenses_dir):
- check_file_dir(self.destination,
- self.entry_licenses_dir,
- 'The license directory {0} referenced by the metadata '
- 'file does not exist.'.format(self.entry_licenses_dir),
- check_dir=True)
+ if self.entry_manifest_file:
+ self.manifest = manifest.Manifest(self.destination,
+ self.entry_manifest_file)
if(self.entry_certificate_file):
- # check certificate
- check_file_dir(self.destination,
- self.entry_certificate_file,
- 'The certificate file {0} referenced by the metadata '
- 'file does not exist.'.format(self.entry_certificate_file),
- check_dir=False)
tmp_manifest = self.manifest.save_to_temp_without_cms()
utils.verify(tmp_manifest,
os.path.join(self.destination, self.entry_certificate_file),