# Copyright (c) 2018 Intel Corp. All rights reserved. # # Licensed under the Apache License, Version 2.0 (the "License"); you may # not use this file except in compliance with the License. You may obtain # a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, WITHOUT # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the # License for the specific language governing permissions and limitations # under the License. # from collections import namedtuple import os import tempfile import six import udatetime from vnfsdk_pkgtools.packager import utils METADATA_KEYS = [ 'vnf_provider_id', 'vnf_product_name', 'vnf_release_data_time', 'vnf_package_version'] DIGEST_KEYS = [ 'Source', 'Algorithm', 'Hash' ] SUPPORTED_HASH_ALGO = ['SHA-256', 'SHA-512'] class ManifestException(Exception): pass class Manifest(object): ' Manifest file in CSAR package' def __init__(self, root_path, manifest_path): self.path = manifest_path self.root = root_path self.metadata = {} # digest dict # :key = source # :value = (algorithm, hash) self.digests = {} self.signature = None self.blocks = [ ] self._split_blocks() self._parse_blocks() @staticmethod def __split_line(s): remain=s try: (key, value)=s.split(':', 1) value = value.strip() remain = None except ValueError: key = None value = None return (key, value, remain) def _split_blocks(self): ''' Split manifest file into blocks, each block is seperated by a empty line or a line with only spaces and tabs. ''' block_content = [ ] with open(os.path.join(self.root, self.path), 'rU') as fp: for line in fp: line = line.strip(' \t\n') if line: block_content.append(line) else: if len(block_content): self.blocks.append(block_content) block_content = [] if len(block_content): self.blocks.append(block_content) def _parse_blocks(self): for block in self.blocks: (key, value, remain) = self.__split_line(block.pop(0)) if key == 'metadata': # metadata block for line in block: (key, value, remain) = self.__split_line(line) if key in METADATA_KEYS: self.metadata[key] = value else: raise ManifestException("Unrecognized metadata %s:" % line) #validate metadata keys missing_keys = set(METADATA_KEYS) - set(self.metadata.keys()) if missing_keys: raise ManifestException("Missing metadata keys: %s" % ','.join(missing_keys)) # validate vnf_release_data_time try: udatetime.from_string(self.metadata['vnf_release_data_time']) except ValueError: raise ManifestException("Non IETF RFC 3339 vnf_release_data_time: %s" % self.metadata['vnf_release_data_time']) elif key in DIGEST_KEYS: # file digest block desc = {} desc[key] = value for line in block: (key, value, remain) = self.__split_line(line) if key in DIGEST_KEYS: desc[key] = value else: raise ManifestException("Unrecognized file digest line %s:" % line) # validate file digest keys missing_keys = set(DIGEST_KEYS) - set(desc.keys()) if missing_keys: raise ManifestException("Missing file digest keys: %s" % ','.join(missing_keys)) # validate file digest algo desc['Algorithm'] = desc['Algorithm'].upper() if desc['Algorithm'] not in SUPPORTED_HASH_ALGO: raise ManifestException("Unsupported hash algorithm: %s" % desc['Algorithm']) # validate file digest hash hash = utils.cal_file_hash(self.root, desc['Source'], desc['Algorithm']) if hash != desc['Hash']: raise ManifestException("Mismatched hash for file %s" % desc['Source']) # nothing is wrong, let's store this self.digests[desc['Source']] = (desc['Algorithm'], desc['Hash']) elif key: raise ManifestException("Unknown key in line '%s:%s'" % (key, value)) elif '--BEGIN CMS--' in remain: if '--END CMS--' not in block[-1]: raise ManifestException("Can NOT find end of sigature block") self.signature = remain + '\n' + '\n'.join(block) else: raise ManifestException("Unknown content: '%s'" % remain) if not self.metadata: raise ManifestException("No metadata") def add_file(self, rel_path, algo='SHA256'): '''Add file to the manifest and calculate the digest ''' algo = algo.upper() if algo not in SUPPORTED_HASH_ALGO: raise ManifestException("Unsupported hash algorithm: %s" % algo) hash = utils.cal_file_hash(self.root, rel_path, algo) self.digests[rel_path] = (algo, hash) def return_as_string(self): '''Return the manifest file content as a string ''' ret = "" # metadata ret += "metadata:\n" ret += "vnf_product_name: %s\n" % (self.metadata['vnf_product_name']) ret += "vnf_provider_id: %s\n" % (self.metadata['vnf_provider_id']) ret += "vnf_package_version: %s\n" % (self.metadata['vnf_package_version']) ret += "vnf_release_data_time: %s\n" % (self.metadata['vnf_release_data_time']) # degist for (key, digest) in six.iteritems(self.digests): ret += "\n" ret += "Source: %s\n" % key ret += "Algorithm: %s\n" % digest[0] ret += "Hash: %s\n" % digest[1] if self.digests: # empty line between digest and signature section ret += "\n" # signature if self.signature: ret += self.signature return ret def update_to_file(self, temporary=False): content = self.return_as_string() if temporary: abs_path = tempfile.mktemp() else: abs_path = os.path.abspath(os.path.join(self.root, self.path)) with open(abs_path, 'w') as fp: fp.write(content) return abs_path def save_to_temp_without_cms(self): # we need to strip cms block with out changing the order of the # file digest content before we verify the signature skip = False lines = [] with open(os.path.join(self.root, self.path), 'rU') as fp: for line in fp: if '--BEGIN CMS--' in line: skip = True elif '--END CMS--' in line: skip = False elif not skip: lines.append(line) # strip trailing empty lines content = ''.join(lines) tmpfile = tempfile.NamedTemporaryFile(mode='w',delete=False) tmpfile.write(content) tmpfile.close() return tmpfile.name