# Copyright (c) 2018 Intel Corp. All rights reserved. # # Licensed under the Apache License, Version 2.0 (the "License"); you may # not use this file except in compliance with the License. You may obtain # a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, WITHOUT # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the # License for the specific language governing permissions and limitations # under the License. # import os import re import tempfile import six import udatetime from vnfsdk_pkgtools.packager import utils METADATA_KEYS = [ 'vnf_provider_id', 'vnf_product_name', 'vnf_release_data_time', 'vnf_package_version'] DIGEST_KEYS = [ 'Source', 'Algorithm', 'Hash' ] SUPPORTED_HASH_ALGO = ['SHA-256', 'SHA-512'] NON_MANO_ARTIFACT_RE = re.compile(r'^[0-9a-z_-]+(\.[0-9a-z_-]+)*:$') class ManifestException(Exception): pass class Manifest(object): ' Manifest file in CSAR package' def __init__(self, root_path, manifest_path): self.path = manifest_path self.root = root_path self.metadata = {} # digest dict # :key = source # :value = (algorithm, hash) self.digests = {} # signature string, in CMS format self.signature = None # non_mano_artifact dict # :key = set identifier # :value = list of files self.non_mano_artifacts = {} self.blocks = [ ] self._split_blocks() self._parse_all_blocks() @staticmethod def __split_line(s): remain=s try: (key, value)=s.split(':', 1) value = value.strip() remain = None except ValueError: key = None value = None return (key, value, remain) def _split_blocks(self): ''' Split manifest file into blocks, each block is seperated by a empty line or a line with only spaces and tabs. ''' block_content = [ ] with open(os.path.join(self.root, self.path), 'rU') as fp: for line in fp: line = line.strip(' \t\n') if line: block_content.append(line) else: if len(block_content): self.blocks.append(block_content) block_content = [] if len(block_content): self.blocks.append(block_content) def _parse_all_blocks(self): for block in self.blocks: if block[0] == 'metadata:': self.parse_metadata(block) elif block[0] == 'non_mano_artifact_sets:': self.parse_non_mano_artifacts(block) elif '--BEGIN CMS--' in block[0]: self.parse_cms(block) else: self.parse_digest(block) if not self.metadata: raise ManifestException("No metadata") def parse_metadata(self, lines): # Skip the first line for line in lines[1:]: (key, value, remain) = self.__split_line(line) if key in METADATA_KEYS: self.metadata[key] = value else: raise ManifestException("Unrecognized metadata %s:" % line) #validate metadata keys missing_keys = set(METADATA_KEYS) - set(self.metadata.keys()) if missing_keys: raise ManifestException("Missing metadata keys: %s" % ','.join(missing_keys)) # validate vnf_release_data_time try: udatetime.from_string(self.metadata['vnf_release_data_time']) except ValueError: raise ManifestException("Non IETF RFC 3339 vnf_release_data_time: %s" % self.metadata['vnf_release_data_time']) def parse_cms(self, lines): if '--END CMS--' not in lines[-1]: raise ManifestException("Can NOT find end of sigature block") self.signature = '\n'.join(lines) def parse_digest(self, lines): desc = {} for line in lines: (key, value, remain) = self.__split_line(line) if key in DIGEST_KEYS: desc[key] = value else: raise ManifestException("Unrecognized file digest line %s:" % line) if key == 'Source': self.digests[value] = (None, None) elif key == 'Algorithm': #validate algorithm desc['Algorithm'] = desc['Algorithm'].upper() if desc['Algorithm'] not in SUPPORTED_HASH_ALGO: raise ManifestException("Unsupported hash algorithm: %s" % desc['Algorithm']) #validate hash if desc.get('Algorithm') and desc.get('Hash') and desc.get('Source'): hash = utils.cal_file_hash(self.root, desc['Source'], desc['Algorithm']) if hash != desc['Hash']: raise ManifestException("Mismatched hash for file %s" % desc['Source']) # nothing is wrong, let's store this and start a new round self.digests[desc['Source']] = (desc['Algorithm'], desc['Hash']) desc = {} def parse_non_mano_artifacts(self, lines): # Skip the first line identifier = None for line in lines[1:]: if re.match(NON_MANO_ARTIFACT_RE, line): # new non mano artifact identifier identifier = line[:-1] self.non_mano_artifacts[identifier] = [] else: (key, value, remain) = self.__split_line(line) if key == 'Source' and value and not remain and identifier: # check for file existence utils.check_file_dir(self.root, value) self.non_mano_artifacts[identifier].append(value) else: raise ManifestException("Unrecogized non mano artifacts line %s:" % line) def add_file(self, rel_path, algo='SHA256'): '''Add file to the manifest and calculate the digest ''' if algo: algo = algo.upper() if algo not in SUPPORTED_HASH_ALGO: raise ManifestException("Unsupported hash algorithm: %s" % algo) hash = utils.cal_file_hash(self.root, rel_path, algo) else: hash = None self.digests[rel_path] = (algo, hash) def return_as_string(self): '''Return the manifest file content as a string ''' ret = "" # metadata ret += "metadata:\n" ret += "vnf_product_name: %s\n" % (self.metadata['vnf_product_name']) ret += "vnf_provider_id: %s\n" % (self.metadata['vnf_provider_id']) ret += "vnf_package_version: %s\n" % (self.metadata['vnf_package_version']) ret += "vnf_release_data_time: %s\n" % (self.metadata['vnf_release_data_time']) # non_mano_artifacts if self.non_mano_artifacts: ret += "\nnon_mano_artifact_sets:\n" for (key, sources) in six.iteritems(self.non_mano_artifacts): ret += key + ":\n" for s in sources: ret += "Source: %s\n" % s # degist for (key, digest) in six.iteritems(self.digests): ret += "\n" ret += "Source: %s\n" % key if digest[0]: ret += "Algorithm: %s\n" % digest[0] ret += "Hash: %s\n" % digest[1] if self.digests: # empty line between digest and signature section ret += "\n" # signature if self.signature: ret += self.signature return ret def update_to_file(self, temporary=False): content = self.return_as_string() if temporary: abs_path = tempfile.mktemp() else: abs_path = os.path.abspath(os.path.join(self.root, self.path)) with open(abs_path, 'w') as fp: fp.write(content) return abs_path def save_to_temp_without_cms(self): # we need to strip cms block with out changing the order of the # file digest content before we verify the signature skip = False lines = [] with open(os.path.join(self.root, self.path), 'rU') as fp: for line in fp: if '--BEGIN CMS--' in line: skip = True elif '--END CMS--' in line: skip = False elif not skip: lines.append(line) content = ''.join(lines) tmpfile = tempfile.NamedTemporaryFile(mode='w',delete=False) tmpfile.write(content) tmpfile.close() return tmpfile.name