aboutsummaryrefslogtreecommitdiffstats
path: root/vnfsdk_pkgtools/packager/csar.py
diff options
context:
space:
mode:
Diffstat (limited to 'vnfsdk_pkgtools/packager/csar.py')
-rw-r--r--vnfsdk_pkgtools/packager/csar.py58
1 files changed, 51 insertions, 7 deletions
diff --git a/vnfsdk_pkgtools/packager/csar.py b/vnfsdk_pkgtools/packager/csar.py
index 162985f..a397f2e 100644
--- a/vnfsdk_pkgtools/packager/csar.py
+++ b/vnfsdk_pkgtools/packager/csar.py
@@ -23,6 +23,7 @@ import requests
from ruamel import yaml # @UnresolvedImport
from vnfsdk_pkgtools.packager import manifest
+from vnfsdk_pkgtools.packager import utils
LOG = logging.getLogger(__name__)
@@ -38,6 +39,7 @@ META_ENTRY_MANIFEST_FILE_KEY = 'Entry-Manifest'
META_ENTRY_HISTORY_FILE_KEY = 'Entry-Change-Log'
META_ENTRY_TESTS_DIR_KEY = 'Entry-Tests'
META_ENTRY_LICENSES_DIR_KEY = 'Entry-Licenses'
+META_ENTRY_CERT_FILE_KEY = 'Entry-Certificate'
BASE_METADATA = {
META_FILE_VERSION_KEY: META_FILE_VERSION_VALUE,
@@ -108,6 +110,19 @@ def write(source, entry, destination, args):
check_dir=False)
metadata[META_ENTRY_HISTORY_FILE_KEY] = args.history
+ if args.certificate:
+ check_file_dir(root=source,
+ entry=args.certificate,
+ msg='Please specify a valid certificate file.',
+ check_dir=False)
+ metadata[META_ENTRY_CERT_FILE_KEY] = args.certificate
+ if not args.privkey:
+ raise RuntimeError('Need private key file for signing')
+ check_file_dir(root='',
+ entry=args.privkey,
+ msg='Please specify a valid private key file.',
+ check_dir=False)
+
if(args.tests):
check_file_dir(root=source,
entry=args.tests,
@@ -144,8 +159,14 @@ def write(source, entry, destination, args):
f.write(dir_full_path + os.sep, dir_relative_path)
if manifest_file:
- if args.digest:
- LOG.debug('Update manifest file to temporary file')
+ LOG.debug('Update manifest file to temporary file')
+ manifest_file_full_path = manifest_file.update_to_file(True)
+ if args.certificate and args.privkey:
+ LOG.debug('calculate signature')
+ manifest_file.signature = utils.sign(msg_file=manifest_file_full_path,
+ cert_file=os.path.join(source, args.certificate),
+ key_file=args.privkey)
+ # write cms into it
manifest_file_full_path = manifest_file.update_to_file(True)
LOG.debug('Writing to archive: {0}'.format(args.manifest))
f.write(manifest_file_full_path, args.manifest)
@@ -156,7 +177,7 @@ def write(source, entry, destination, args):
class _CSARReader(object):
- def __init__(self, source, destination):
+ def __init__(self, source, destination, no_verify_cert=True):
if os.path.isdir(destination) and os.listdir(destination):
raise ValueError('{0} already exists and is not empty. '
'Please specify the location where the CSAR '
@@ -179,7 +200,7 @@ class _CSARReader(object):
raise ValueError('{0} is not a valid CSAR.'.format(self.source))
self._extract()
self._read_metadata()
- self._validate()
+ self._validate(no_verify_cert)
finally:
if downloaded_csar:
os.remove(self.source)
@@ -221,6 +242,10 @@ class _CSARReader(object):
def entry_licenses_dir(self):
return self.metadata.get(META_ENTRY_LICENSES_DIR_KEY)
+ @property
+ def entry_certificate_file(self):
+ return self.metadata.get(META_ENTRY_CERT_FILE_KEY)
+
def _extract(self):
LOG.debug('Extracting CSAR contents')
if not os.path.exists(self.destination):
@@ -239,7 +264,7 @@ class _CSARReader(object):
self.metadata.update(yaml.load(f))
LOG.debug('CSAR metadata:\n{0}'.format(pprint.pformat(self.metadata)))
- def _validate(self):
+ def _validate(self, no_verify_cert):
def validate_key(key, expected=None):
if not self.metadata.get(key):
raise ValueError('{0} is missing from the metadata file.'.format(key))
@@ -256,6 +281,7 @@ class _CSARReader(object):
LOG.debug('CSAR change history file: {0}'.format(self.entry_history_file))
LOG.debug('CSAR tests directory: {0}'.format(self.entry_tests_dir))
LOG.debug('CSAR licenses directory: {0}'.format(self.entry_licenses_dir))
+ LOG.debug('CSAR certificate file: {0}'.format(self.entry_certificate_file))
check_file_dir(self.destination,
self.entry_definitions,
@@ -294,6 +320,22 @@ class _CSARReader(object):
'file does not exist.'.format(self.entry_licenses_dir),
check_dir=True)
+ if(self.entry_certificate_file):
+ # check certificate
+ check_file_dir(self.destination,
+ self.entry_certificate_file,
+ 'The certificate file {0} referenced by the metadata '
+ 'file does not exist.'.format(self.entry_certificate_file),
+ check_dir=False)
+ tmp_manifest = self.manifest.save_to_temp_without_cms()
+ utils.verify(tmp_manifest,
+ os.path.join(self.destination, self.entry_certificate_file),
+ self.manifest.signature,
+ no_verify_cert)
+ os.unlink(tmp_manifest)
+
+
+
def _download(self, url, target):
response = requests.get(url, stream=True)
if response.status_code != 200:
@@ -306,5 +348,7 @@ class _CSARReader(object):
f.write(chunk)
-def read(source, destination):
- return _CSARReader(source=source, destination=destination)
+def read(source, destination, no_verify_cert=False):
+ return _CSARReader(source=source,
+ destination=destination,
+ no_verify_cert=no_verify_cert)