From 04387deeab820e75c6d37d2ddd5b514cb7bcfd9e Mon Sep 17 00:00:00 2001
From: Pavel Aharoni
Date: Sun, 11 Jun 2017 14:33:57 +0300
Subject: [SDC-32] separate Tosca Parser from DC

Change-Id: I7e7f31ff2bd92fec22031f75b7051d129a21d01b
Signed-off-by: Pavel Aharoni
---
 .../main/resources/Lib/site-packages/pip/index.py | 1106 --------------------
 1 file changed, 1106 deletions(-)
 delete mode 100644 jython-tosca-parser/src/main/resources/Lib/site-packages/pip/index.py

diff --git a/jython-tosca-parser/src/main/resources/Lib/site-packages/pip/index.py b/jython-tosca-parser/src/main/resources/Lib/site-packages/pip/index.py
deleted file mode 100644
index 5d2fb58..0000000
--- a/jython-tosca-parser/src/main/resources/Lib/site-packages/pip/index.py
+++ /dev/null
@@ -1,1106 +0,0 @@
-"""Routines related to PyPI, indexes"""
-
-import sys
-import os
-import re
-import mimetypes
-import posixpath
-
-from pip.log import logger
-from pip.util import Inf, normalize_name, splitext, is_prerelease
-from pip.exceptions import (
-    DistributionNotFound, BestVersionAlreadyInstalled, InvalidWheelFilename,
-    UnsupportedWheel,
-)
-from pip.backwardcompat import urlparse, url2pathname
-from pip.download import PipSession, url_to_path, path_to_url
-from pip.wheel import Wheel, wheel_ext
-from pip.pep425tags import supported_tags, supported_tags_noarch, get_platform
-from pip._vendor import html5lib, requests, pkg_resources
-from pip._vendor.requests.exceptions import SSLError
-
-
-__all__ = ['PackageFinder']
-
-
-INSECURE_SCHEMES = {
-    "http": ["https"],
-}
-
-
-class PackageFinder(object):
-    """This finds packages.
-
-    This is meant to match easy_install's technique for looking for
-    packages, by reading pages and looking for appropriate links
-    """
-
-    def __init__(self, find_links, index_urls,
-                 use_wheel=True, allow_external=[], allow_unverified=[],
-                 allow_all_external=False, allow_all_prereleases=False,
-                 session=None):
-        self.find_links = find_links
-        self.index_urls = index_urls
-        self.cache = PageCache()
-        # These are boring links that have already been logged somehow:
-        self.logged_links = set()
-
-        self.use_wheel = use_wheel
-
-        # Do we allow (safe and verifiable) externally hosted files?
-        self.allow_external = set(normalize_name(n) for n in allow_external)
-
-        # Which names are allowed to install insecure and unverifiable files?
-        self.allow_unverified = set(
-            normalize_name(n) for n in allow_unverified
-        )
-
-        # Anything that is allowed unverified is also allowed external
-        self.allow_external |= self.allow_unverified
-
-        # Do we allow all (safe and verifiable) externally hosted files?
-        self.allow_all_external = allow_all_external
-
-        # Stores if we ignored any external links so that we can instruct
-        # end users how to install them if no distributions are available
-        self.need_warn_external = False
-
-        # Stores if we ignored any unsafe links so that we can instruct
-        # end users how to install them if no distributions are available
-        self.need_warn_unverified = False
-
-        # Do we want to allow _all_ pre-releases?
-        self.allow_all_prereleases = allow_all_prereleases
-
-        # The Session we'll use to make requests
-        self.session = session or PipSession()
-
-    def _sort_locations(self, locations):
-        """
-        Sort locations into "files" (archives) and "urls", and return
-        a pair of lists (files,urls)
-        """
-        files = []
-        urls = []
-
-        # puts the url for the given file path into the appropriate list
-        def sort_path(path):
-            url = path_to_url(path)
-            if mimetypes.guess_type(url, strict=False)[0] == 'text/html':
-                urls.append(url)
-            else:
-                files.append(url)
-
-        for url in locations:
-
-            is_local_path = os.path.exists(url)
-            is_file_url = url.startswith('file:')
-            is_find_link = url in self.find_links
-
-            if is_local_path or is_file_url:
-                if is_local_path:
-                    path = url
-                else:
-                    path = url_to_path(url)
-                if is_find_link and os.path.isdir(path):
-                    path = os.path.realpath(path)
-                    for item in os.listdir(path):
-                        sort_path(os.path.join(path, item))
-                elif is_file_url and os.path.isdir(path):
-                    urls.append(url)
-                elif os.path.isfile(path):
-                    sort_path(path)
-            else:
-                urls.append(url)
-
-        return files, urls
-
-    def _link_sort_key(self, link_tuple):
-        """
-        Function used to generate link sort key for link tuples.
-        The greater the return value, the more preferred it is.
-        If not finding wheels, then sorted by version only.
-        If finding wheels, then the sort order is by version, then:
-          1. existing installs
-          2. wheels ordered via Wheel.support_index_min()
-          3. source archives
-        Note: it was considered to embed this logic into the Link
-              comparison operators, but then different sdist links
-              with the same version, would have to be considered equal
-        """
-        parsed_version, link, _ = link_tuple
-        if self.use_wheel:
-            support_num = len(supported_tags)
-            if link == INSTALLED_VERSION:
-                pri = 1
-            elif link.ext == wheel_ext:
-                wheel = Wheel(link.filename)  # can raise InvalidWheelFilename
-                if not wheel.supported():
-                    raise UnsupportedWheel(
-                        "%s is not a supported wheel for this platform. It "
-                        "can't be sorted." % wheel.filename
-                    )
-                pri = -(wheel.support_index_min())
-            else:  # sdist
-                pri = -(support_num)
-            return (parsed_version, pri)
-        else:
-            return parsed_version
-
-    def _sort_versions(self, applicable_versions):
-        """
-        Bring the latest version (and wheels) to the front, but maintain the
-        existing ordering as secondary. See the docstring for `_link_sort_key`
-        for details. This function is isolated for easier unit testing.
-        """
-        return sorted(
-            applicable_versions,
-            key=self._link_sort_key,
-            reverse=True
-        )
-
-    def find_requirement(self, req, upgrade):
-
-        def mkurl_pypi_url(url):
-            loc = posixpath.join(url, url_name)
-            # For maximum compatibility with easy_install, ensure the path
-            # ends in a trailing slash. Although this isn't in the spec
-            # (and PyPI can handle it without the slash) some other index
-            # implementations might break if they relied on easy_install's
-            # behavior.
-            if not loc.endswith('/'):
-                loc = loc + '/'
-            return loc
-
-        url_name = req.url_name
-        # Only check main index if index URL is given:
-        main_index_url = None
-        if self.index_urls:
-            # Check that we have the url_name correctly spelled:
-            main_index_url = Link(
-                mkurl_pypi_url(self.index_urls[0]),
-                trusted=True,
-            )
-
-            # This will also cache the page, so it's okay that we get it again
-            # later:
-            page = self._get_page(main_index_url, req)
-            if page is None:
-                url_name = self._find_url_name(
-                    Link(self.index_urls[0], trusted=True),
-                    url_name, req
-                ) or req.url_name
-
-        if url_name is not None:
-            locations = [
-                mkurl_pypi_url(url)
-                for url in self.index_urls] + self.find_links
-        else:
-            locations = list(self.find_links)
-        for version in req.absolute_versions:
-            if url_name is not None and main_index_url is not None:
-                locations = [
-                    posixpath.join(main_index_url.url, version)] + locations
-
-        file_locations, url_locations = self._sort_locations(locations)
-
-        # We trust every url that the user has given us whether it was given
-        # via --index-url or --find-links
-        locations = [Link(url, trusted=True) for url in url_locations]
-
-        logger.debug('URLs to search for versions for %s:' % req)
-        for location in locations:
-            logger.debug('* %s' % location)
-
-            # Determine if this url used a secure transport mechanism
-            parsed = urlparse.urlparse(str(location))
-            if parsed.scheme in INSECURE_SCHEMES:
-                secure_schemes = INSECURE_SCHEMES[parsed.scheme]
-
-                if len(secure_schemes) == 1:
-                    ctx = (location, parsed.scheme, secure_schemes[0],
-                           parsed.netloc)
-                    logger.warn("%s uses an insecure transport scheme (%s). "
-                                "Consider using %s if %s has it available" %
-                                ctx)
-                elif len(secure_schemes) > 1:
-                    ctx = (
-                        location,
-                        parsed.scheme,
-                        ", ".join(secure_schemes),
-                        parsed.netloc,
-                    )
-                    logger.warn("%s uses an insecure transport scheme (%s). "
-                                "Consider using one of %s if %s has any of "
-                                "them available" % ctx)
-                else:
-                    ctx = (location, parsed.scheme)
-                    logger.warn("%s uses an insecure transport scheme (%s)." %
-                                ctx)
-
-        found_versions = []
-        found_versions.extend(
-            self._package_versions(
-                # We trust every directly linked archive in find_links
-                [Link(url, '-f', trusted=True) for url in self.find_links],
-                req.name.lower()
-            )
-        )
-        page_versions = []
-        for page in self._get_pages(locations, req):
-            logger.debug('Analyzing links from page %s' % page.url)
-            logger.indent += 2
-            try:
-                page_versions.extend(
-                    self._package_versions(page.links, req.name.lower())
-                )
-            finally:
-                logger.indent -= 2
-        file_versions = list(
-            self._package_versions(
-                [Link(url) for url in file_locations],
-                req.name.lower()
-            )
-        )
-        if (not found_versions
-                and not page_versions
-                and not file_versions):
-            logger.fatal(
-                'Could not find any downloads that satisfy the requirement'
-                ' %s' % req
-            )
-
-            if self.need_warn_external:
-                logger.warn("Some externally hosted files were ignored (use "
-                            "--allow-external %s to allow)." % req.name)
-
-            if self.need_warn_unverified:
-                logger.warn("Some insecure and unverifiable files were ignored"
-                            " (use --allow-unverified %s to allow)." %
-                            req.name)
-
-            raise DistributionNotFound(
-                'No distributions at all found for %s' % req
-            )
-        installed_version = []
-        if req.satisfied_by is not None:
-            installed_version = [(
-                req.satisfied_by.parsed_version,
-                INSTALLED_VERSION,
-                req.satisfied_by.version,
-            )]
-        if file_versions:
-            file_versions.sort(reverse=True)
-            logger.info(
-                'Local files found: %s' %
-                ', '.join([
-                    url_to_path(link.url)
-                    for parsed, link, version in file_versions
-                ])
-            )
-        # this is an intentional priority ordering
-        all_versions = installed_version + file_versions + found_versions \
-            + page_versions
-        applicable_versions = []
-        for (parsed_version, link, version) in all_versions:
-            if version not in req.req:
-                logger.info(
-                    "Ignoring link %s, version %s doesn't match %s" %
-                    (
-                        link,
-                        version,
-                        ','.join([''.join(s) for s in req.req.specs])
-                    )
-                )
-                continue
-            elif (is_prerelease(version)
-                    and not (self.allow_all_prereleases or req.prereleases)):
-                # If this version isn't the already installed one, then
-                # ignore it if it's a pre-release.
-                if link is not INSTALLED_VERSION:
-                    logger.info(
-                        "Ignoring link %s, version %s is a pre-release (use "
-                        "--pre to allow)." % (link, version)
-                    )
-                    continue
-            applicable_versions.append((parsed_version, link, version))
-        applicable_versions = self._sort_versions(applicable_versions)
-        existing_applicable = bool([
-            link
-            for parsed_version, link, version in applicable_versions
-            if link is INSTALLED_VERSION
-        ])
-        if not upgrade and existing_applicable:
-            if applicable_versions[0][1] is INSTALLED_VERSION:
-                logger.info(
-                    'Existing installed version (%s) is most up-to-date and '
-                    'satisfies requirement' % req.satisfied_by.version
-                )
-            else:
-                logger.info(
-                    'Existing installed version (%s) satisfies requirement '
-                    '(most up-to-date version is %s)' %
-                    (req.satisfied_by.version, applicable_versions[0][2])
-                )
-            return None
-        if not applicable_versions:
-            logger.fatal(
-                'Could not find a version that satisfies the requirement %s '
-                '(from versions: %s)' %
-                (
-                    req,
-                    ', '.join([
-                        version
-                        for parsed_version, link, version in all_versions
-                    ])
-                )
-            )
-
-            if self.need_warn_external:
-                logger.warn("Some externally hosted files were ignored (use "
-                            "--allow-external to allow).")
-
-            if self.need_warn_unverified:
-                logger.warn("Some insecure and unverifiable files were ignored"
-                            " (use --allow-unverified %s to allow)." %
-                            req.name)
-
-            raise DistributionNotFound(
-                'No distributions matching the version for %s' % req
-            )
-        if applicable_versions[0][1] is INSTALLED_VERSION:
-            # We have an existing version, and its the best version
-            logger.info(
-                'Installed version (%s) is most up-to-date (past versions: '
-                '%s)' % (
-                    req.satisfied_by.version,
-                    ', '.join([
-                        version for parsed_version, link, version
-                        in applicable_versions[1:]
-                    ]) or 'none'))
-            raise BestVersionAlreadyInstalled
-        if len(applicable_versions) > 1:
-            logger.info(
-                'Using version %s (newest of versions: %s)' %
-                (
-                    applicable_versions[0][2],
-                    ', '.join([
-                        version for parsed_version, link, version
-                        in applicable_versions
-                    ])
-                )
-            )
-
-        selected_version = applicable_versions[0][1]
-
-        if (selected_version.internal is not None
-                and not selected_version.internal):
-            logger.warn("%s an externally hosted file and may be "
-                        "unreliable" % req.name)
-
-        if (selected_version.verifiable is not None
-                and not selected_version.verifiable):
-            logger.warn("%s is potentially insecure and "
-                        "unverifiable." % req.name)
-
-        if selected_version._deprecated_regex:
-            logger.deprecated(
-                "1.7",
-                "%s discovered using a deprecated method of parsing, "
-                "in the future it will no longer be discovered" % req.name
-            )
-
-        return selected_version
-
-    def _find_url_name(self, index_url, url_name, req):
-        """
-        Finds the true URL name of a package, when the given name isn't quite
-        correct.
-        This is usually used to implement case-insensitivity.
-        """
-        if not index_url.url.endswith('/'):
-            # Vaguely part of the PyPI API... weird but true.
-            # FIXME: bad to modify this?
-            index_url.url += '/'
-        page = self._get_page(index_url, req)
-        if page is None:
-            logger.fatal('Cannot fetch index base URL %s' % index_url)
-            return
-        norm_name = normalize_name(req.url_name)
-        for link in page.links:
-            base = posixpath.basename(link.path.rstrip('/'))
-            if norm_name == normalize_name(base):
-                logger.notify(
-                    'Real name of requirement %s is %s' % (url_name, base)
-                )
-                return base
-        return None
-
-    def _get_pages(self, locations, req):
-        """
-        Yields (page, page_url) from the given locations, skipping
-        locations that have errors, and adding download/homepage links
-        """
-        all_locations = list(locations)
-        seen = set()
-
-        while all_locations:
-            location = all_locations.pop(0)
-            if location in seen:
-                continue
-            seen.add(location)
-
-            page = self._get_page(location, req)
-            if page is None:
-                continue
-
-            yield page
-
-            for link in page.rel_links():
-                normalized = normalize_name(req.name).lower()
-
-                if (normalized not in self.allow_external
-                        and not self.allow_all_external):
-                    self.need_warn_external = True
-                    logger.debug("Not searching %s for files because external "
-                                 "urls are disallowed." % link)
-                    continue
-
-                if (link.trusted is not None
-                        and not link.trusted
-                        and normalized not in self.allow_unverified):
-                    logger.debug(
-                        "Not searching %s for urls, it is an "
-                        "untrusted link and cannot produce safe or "
-                        "verifiable files." % link
-                    )
-                    self.need_warn_unverified = True
-                    continue
-
-                all_locations.append(link)
-
-    _egg_fragment_re = re.compile(r'#egg=([^&]*)')
-    _egg_info_re = re.compile(r'([a-z0-9_.]+)-([a-z0-9_.-]+)', re.I)
-    _py_version_re = re.compile(r'-py([123]\.?[0-9]?)$')
-
-    def _sort_links(self, links):
-        """
-        Returns elements of links in order, non-egg links first, egg links
-        second, while eliminating duplicates
-        """
-        eggs, no_eggs = [], []
-        seen = set()
-        for link in links:
-            if link not in seen:
-                seen.add(link)
-                if link.egg_fragment:
-                    eggs.append(link)
-                else:
-                    no_eggs.append(link)
-        return no_eggs + eggs
-
-    def _package_versions(self, links, search_name):
-        for link in self._sort_links(links):
-            for v in self._link_package_versions(link, search_name):
-                yield v
-
-    def _known_extensions(self):
-        extensions = ('.tar.gz', '.tar.bz2', '.tar', '.tgz', '.zip')
-        if self.use_wheel:
-            return extensions + (wheel_ext,)
-        return extensions
-
-    def _link_package_versions(self, link, search_name):
-        """
-        Return an iterable of triples (pkg_resources_version_key,
-        link, python_version) that can be extracted from the given
-        link.
-
-        Meant to be overridden by subclasses, not called by clients.
- """ - platform = get_platform() - - version = None - if link.egg_fragment: - egg_info = link.egg_fragment - else: - egg_info, ext = link.splitext() - if not ext: - if link not in self.logged_links: - logger.debug('Skipping link %s; not a file' % link) - self.logged_links.add(link) - return [] - if egg_info.endswith('.tar'): - # Special double-extension case: - egg_info = egg_info[:-4] - ext = '.tar' + ext - if ext not in self._known_extensions(): - if link not in self.logged_links: - logger.debug( - 'Skipping link %s; unknown archive format: %s' % - (link, ext) - ) - self.logged_links.add(link) - return [] - if "macosx10" in link.path and ext == '.zip': - if link not in self.logged_links: - logger.debug('Skipping link %s; macosx10 one' % (link)) - self.logged_links.add(link) - return [] - if ext == wheel_ext: - try: - wheel = Wheel(link.filename) - except InvalidWheelFilename: - logger.debug( - 'Skipping %s because the wheel filename is invalid' % - link - ) - return [] - if wheel.name.lower() != search_name.lower(): - logger.debug( - 'Skipping link %s; wrong project name (not %s)' % - (link, search_name) - ) - return [] - if not wheel.supported(): - logger.debug( - 'Skipping %s because it is not compatible with this ' - 'Python' % link - ) - return [] - # This is a dirty hack to prevent installing Binary Wheels from - # PyPI unless it is a Windows or Mac Binary Wheel. This is - # paired with a change to PyPI disabling uploads for the - # same. Once we have a mechanism for enabling support for - # binary wheels on linux that deals with the inherent problems - # of binary distribution this can be removed. - comes_from = getattr(link, "comes_from", None) - if ( - ( - not platform.startswith('win') - and not platform.startswith('macosx') - ) - and comes_from is not None - and urlparse.urlparse( - comes_from.url - ).netloc.endswith("pypi.python.org")): - if not wheel.supported(tags=supported_tags_noarch): - logger.debug( - "Skipping %s because it is a pypi-hosted binary " - "Wheel on an unsupported platform" % link - ) - return [] - version = wheel.version - - if not version: - version = self._egg_info_matches(egg_info, search_name, link) - if version is None: - logger.debug( - 'Skipping link %s; wrong project name (not %s)' % - (link, search_name) - ) - return [] - - if (link.internal is not None - and not link.internal - and not normalize_name(search_name).lower() - in self.allow_external - and not self.allow_all_external): - # We have a link that we are sure is external, so we should skip - # it unless we are allowing externals - logger.debug("Skipping %s because it is externally hosted." % link) - self.need_warn_external = True - return [] - - if (link.verifiable is not None - and not link.verifiable - and not (normalize_name(search_name).lower() - in self.allow_unverified)): - # We have a link that we are sure we cannot verify its integrity, - # so we should skip it unless we are allowing unsafe installs - # for this requirement. - logger.debug("Skipping %s because it is an insecure and " - "unverifiable file." 
-                         "unverifiable file." % link)
-            self.need_warn_unverified = True
-            return []
-
-        match = self._py_version_re.search(version)
-        if match:
-            version = version[:match.start()]
-            py_version = match.group(1)
-            if py_version != sys.version[:3]:
-                logger.debug(
-                    'Skipping %s because Python version is incorrect' % link
-                )
-                return []
-        logger.debug('Found link %s, version: %s' % (link, version))
-        return [(
-            pkg_resources.parse_version(version),
-            link,
-            version,
-        )]
-
-    def _egg_info_matches(self, egg_info, search_name, link):
-        match = self._egg_info_re.search(egg_info)
-        if not match:
-            logger.debug('Could not parse version from link: %s' % link)
-            return None
-        name = match.group(0).lower()
-        # To match the "safe" name that pkg_resources creates:
-        name = name.replace('_', '-')
-        # project name and version must be separated by a dash
-        look_for = search_name.lower() + "-"
-        if name.startswith(look_for):
-            return match.group(0)[len(look_for):]
-        else:
-            return None
-
-    def _get_page(self, link, req):
-        return HTMLPage.get_page(
-            link, req,
-            cache=self.cache,
-            session=self.session,
-        )
-
-
-class PageCache(object):
-    """Cache of HTML pages"""
-
-    failure_limit = 3
-
-    def __init__(self):
-        self._failures = {}
-        self._pages = {}
-        self._archives = {}
-
-    def too_many_failures(self, url):
-        return self._failures.get(url, 0) >= self.failure_limit
-
-    def get_page(self, url):
-        return self._pages.get(url)
-
-    def is_archive(self, url):
-        return self._archives.get(url, False)
-
-    def set_is_archive(self, url, value=True):
-        self._archives[url] = value
-
-    def add_page_failure(self, url, level):
-        self._failures[url] = self._failures.get(url, 0) + level
-
-    def add_page(self, urls, page):
-        for url in urls:
-            self._pages[url] = page
-
-
-class HTMLPage(object):
-    """Represents one page, along with its URL"""
-
-    # FIXME: these regexes are horrible hacks:
-    _homepage_re = re.compile(r'<th>\s*home\s*page', re.I)
-    _download_re = re.compile(r'<th>\s*download\s+url', re.I)
-    _href_re = re.compile(
-        'href=(?:"([^"]*)"|\'([^\']*)\'|([^>\\s\\n]*))',
-        re.I | re.S
-    )
-
-    def __init__(self, content, url, headers=None, trusted=None):
-        self.content = content
-        self.parsed = html5lib.parse(self.content, namespaceHTMLElements=False)
-        self.url = url
-        self.headers = headers
-        self.trusted = trusted
-
-    def __str__(self):
-        return self.url
-
-    @classmethod
-    def get_page(cls, link, req, cache=None, skip_archives=True, session=None):
-        if session is None:
-            session = PipSession()
-
-        url = link.url
-        url = url.split('#', 1)[0]
-        if cache.too_many_failures(url):
-            return None
-
-        # Check for VCS schemes that do not support lookup as web pages.
-        from pip.vcs import VcsSupport
-        for scheme in VcsSupport.schemes:
-            if url.lower().startswith(scheme) and url[len(scheme)] in '+:':
-                logger.debug(
-                    'Cannot look at %(scheme)s URL %(link)s' % locals()
-                )
-                return None
-
-        if cache is not None:
-            inst = cache.get_page(url)
-            if inst is not None:
-                return inst
-        try:
-            if skip_archives:
-                if cache is not None:
-                    if cache.is_archive(url):
-                        return None
-                filename = link.filename
-                for bad_ext in ['.tar', '.tar.gz', '.tar.bz2', '.tgz', '.zip']:
-                    if filename.endswith(bad_ext):
-                        content_type = cls._get_content_type(
-                            url, session=session,
-                        )
-                        if content_type.lower().startswith('text/html'):
-                            break
-                        else:
-                            logger.debug(
-                                'Skipping page %s because of Content-Type: '
-                                '%s' % (link, content_type)
-                            )
-                            if cache is not None:
-                                cache.set_is_archive(url)
-                            return None
-            logger.debug('Getting page %s' % url)
-
-            # Tack index.html onto file:// URLs that point to directories
-            (scheme, netloc, path, params, query, fragment) = \
-                urlparse.urlparse(url)
-            if scheme == 'file' and os.path.isdir(url2pathname(path)):
-                # add trailing slash if not present so urljoin doesn't trim
-                # final segment
-                if not url.endswith('/'):
-                    url += '/'
-                url = urlparse.urljoin(url, 'index.html')
-                logger.debug(' file: URL is directory, getting %s' % url)
-
-            resp = session.get(url, headers={"Accept": "text/html"})
-            resp.raise_for_status()
-
-            # The check for archives above only works if the url ends with
-            # something that looks like an archive. However that is not a
-            # requirement of an url. Unless we issue a HEAD request on every
-            # url we cannot know ahead of time for sure if something is HTML
-            # or not. However we can check after we've downloaded it.
-            content_type = resp.headers.get('Content-Type', 'unknown')
-            if not content_type.lower().startswith("text/html"):
-                logger.debug(
-                    'Skipping page %s because of Content-Type: %s' %
-                    (link, content_type)
-                )
-                if cache is not None:
-                    cache.set_is_archive(url)
-                return None
-
-            inst = cls(resp.text, resp.url, resp.headers, trusted=link.trusted)
-        except requests.HTTPError as exc:
-            level = 2 if exc.response.status_code == 404 else 1
-            cls._handle_fail(req, link, exc, url, cache=cache, level=level)
-        except requests.ConnectionError as exc:
-            cls._handle_fail(
-                req, link, "connection error: %s" % exc, url,
-                cache=cache,
-            )
-        except requests.Timeout:
-            cls._handle_fail(req, link, "timed out", url, cache=cache)
-        except SSLError as exc:
-            reason = ("There was a problem confirming the ssl certificate: "
-                      "%s" % exc)
-            cls._handle_fail(
-                req, link, reason, url,
-                cache=cache,
-                level=2,
-                meth=logger.notify,
-            )
-        else:
-            if cache is not None:
-                cache.add_page([url, resp.url], inst)
-            return inst
-
-    @staticmethod
-    def _handle_fail(req, link, reason, url, cache=None, level=1, meth=None):
-        if meth is None:
-            meth = logger.info
-
-        meth("Could not fetch URL %s: %s", link, reason)
-        meth("Will skip URL %s when looking for download links for %s" %
-             (link.url, req))
-
-        if cache is not None:
-            cache.add_page_failure(url, level)
-
-    @staticmethod
-    def _get_content_type(url, session=None):
-        """Get the Content-Type of the given url, using a HEAD request"""
-        if session is None:
-            session = PipSession()
-
-        scheme, netloc, path, query, fragment = urlparse.urlsplit(url)
-        if scheme not in ('http', 'https', 'ftp', 'ftps'):
-            # FIXME: some warning or something?
-            # assertion error?
-            return ''
-
-        resp = session.head(url, allow_redirects=True)
-        resp.raise_for_status()
-
-        return resp.headers.get("Content-Type", "")
-
-    @property
-    def api_version(self):
-        if not hasattr(self, "_api_version"):
-            _api_version = None
-
-            metas = [
-                x for x in self.parsed.findall(".//meta")
-                if x.get("name", "").lower() == "api-version"
-            ]
-            if metas:
-                try:
-                    _api_version = int(metas[0].get("value", None))
-                except (TypeError, ValueError):
-                    _api_version = None
-            self._api_version = _api_version
-        return self._api_version
-
-    @property
-    def base_url(self):
-        if not hasattr(self, "_base_url"):
-            base = self.parsed.find(".//base")
-            if base is not None and base.get("href"):
-                self._base_url = base.get("href")
-            else:
-                self._base_url = self.url
-        return self._base_url
-
-    @property
-    def links(self):
-        """Yields all links in the page"""
-        for anchor in self.parsed.findall(".//a"):
-            if anchor.get("href"):
-                href = anchor.get("href")
-                url = self.clean_link(urlparse.urljoin(self.base_url, href))
-
-                # Determine if this link is internal. If that distinction
-                # doesn't make sense in this context, then we don't make
-                # any distinction.
-                internal = None
-                if self.api_version and self.api_version >= 2:
-                    # Only api_versions >= 2 have a distinction between
-                    # external and internal links
-                    internal = bool(
-                        anchor.get("rel")
-                        and "internal" in anchor.get("rel").split()
-                    )
-
-                yield Link(url, self, internal=internal)
-
-    def rel_links(self):
-        for url in self.explicit_rel_links():
-            yield url
-        for url in self.scraped_rel_links():
-            yield url
-
-    def explicit_rel_links(self, rels=('homepage', 'download')):
-        """Yields all links with the given relations"""
-        rels = set(rels)
-
-        for anchor in self.parsed.findall(".//a"):
-            if anchor.get("rel") and anchor.get("href"):
-                found_rels = set(anchor.get("rel").split())
-                # Determine the intersection between what rels were found and
-                # what rels were being looked for
-                if found_rels & rels:
-                    href = anchor.get("href")
-                    url = self.clean_link(
-                        urlparse.urljoin(self.base_url, href)
-                    )
-                    yield Link(url, self, trusted=False)
-
-    def scraped_rel_links(self):
-        # Can we get rid of this horrible horrible method?
-        for regex in (self._homepage_re, self._download_re):
-            match = regex.search(self.content)
-            if not match:
-                continue
-            href_match = self._href_re.search(self.content, pos=match.end())
-            if not href_match:
-                continue
-            url = (
-                href_match.group(1)
-                or href_match.group(2)
-                or href_match.group(3)
-            )
-            if not url:
-                continue
-            url = self.clean_link(urlparse.urljoin(self.base_url, url))
-            yield Link(url, self, trusted=False, _deprecated_regex=True)
-
-    _clean_re = re.compile(r'[^a-z0-9$&+,/:;=?@.#%_\\|-]', re.I)
-
-    def clean_link(self, url):
-        """Makes sure a link is fully encoded.  That is, if a ' ' shows up in
-        the link, it will be rewritten to %20 (while not over-quoting
-        % or other characters)."""
-        return self._clean_re.sub(
-            lambda match: '%%%2x' % ord(match.group(0)), url)
-
-
-class Link(object):
-
-    def __init__(self, url, comes_from=None, internal=None, trusted=None,
-                 _deprecated_regex=False):
-        self.url = url
-        self.comes_from = comes_from
-        self.internal = internal
-        self.trusted = trusted
-        self._deprecated_regex = _deprecated_regex
-
-    def __str__(self):
-        if self.comes_from:
-            return '%s (from %s)' % (self.url, self.comes_from)
-        else:
-            return str(self.url)
-
-    def __repr__(self):
-        return '<Link %s>' % self
-
-    def __eq__(self, other):
-        return self.url == other.url
-
-    def __ne__(self, other):
-        return self.url != other.url
-
-    def __lt__(self, other):
-        return self.url < other.url
-
-    def __le__(self, other):
-        return self.url <= other.url
-
-    def __gt__(self, other):
-        return self.url > other.url
-
-    def __ge__(self, other):
-        return self.url >= other.url
-
-    def __hash__(self):
-        return hash(self.url)
-
-    @property
-    def filename(self):
-        _, netloc, path, _, _ = urlparse.urlsplit(self.url)
-        name = posixpath.basename(path.rstrip('/')) or netloc
-        assert name, ('URL %r produced no filename' % self.url)
-        return name
-
-    @property
-    def scheme(self):
-        return urlparse.urlsplit(self.url)[0]
-
-    @property
-    def path(self):
-        return urlparse.urlsplit(self.url)[2]
-
-    def splitext(self):
-        return splitext(posixpath.basename(self.path.rstrip('/')))
-
-    @property
-    def ext(self):
-        return self.splitext()[1]
-
-    @property
-    def url_without_fragment(self):
-        scheme, netloc, path, query, fragment = urlparse.urlsplit(self.url)
-        return urlparse.urlunsplit((scheme, netloc, path, query, None))
-
-    _egg_fragment_re = re.compile(r'#egg=([^&]*)')
-
-    @property
-    def egg_fragment(self):
-        match = self._egg_fragment_re.search(self.url)
-        if not match:
-            return None
-        return match.group(1)
-
-    _hash_re = re.compile(
-        r'(sha1|sha224|sha384|sha256|sha512|md5)=([a-f0-9]+)'
-    )
-
-    @property
-    def hash(self):
-        match = self._hash_re.search(self.url)
-        if match:
-            return match.group(2)
-        return None
-
-    @property
-    def hash_name(self):
-        match = self._hash_re.search(self.url)
-        if match:
-            return match.group(1)
-        return None
-
-    @property
-    def show_url(self):
-        return posixpath.basename(self.url.split('#', 1)[0].split('?', 1)[0])
-
-    @property
-    def verifiable(self):
-        """
-        Returns True if this link can be verified after download, False if it
-        cannot, and None if we cannot determine.
-        """
-        trusted = self.trusted or getattr(self.comes_from, "trusted", None)
-        if trusted is not None and trusted:
-            # This link came from a trusted source. It *may* be verifiable but
-            # first we need to see if this page is operating under the new
-            # API version.
-            try:
-                api_version = getattr(self.comes_from, "api_version", None)
-                api_version = int(api_version)
-            except (ValueError, TypeError):
-                api_version = None
-
-            if api_version is None or api_version <= 1:
-                # This link is either trusted, or it came from a trusted,
-                # however it is not operating under the API version 2 so
-                # we can't make any claims about if it's safe or not
-                return
-
-            if self.hash:
-                # This link came from a trusted source and it has a hash, so we
-                # can consider it safe.
-                return True
-            else:
-                # This link came from a trusted source, using the new API
-                # version, and it does not have a hash. It is NOT verifiable
-                return False
-        elif trusted is not None:
-            # This link came from an untrusted source and we cannot trust it
-            return False
-
-
-# An object to represent the "link" for the installed version of a requirement.
-# Using Inf as the url makes it sort higher.
-INSTALLED_VERSION = Link(Inf)
-
-
-def get_requirement_from_url(url):
-    """Get a requirement from the URL, if possible. This looks for #egg
-    in the URL"""
-    link = Link(url)
-    egg_info = link.egg_fragment
-    if not egg_info:
-        egg_info = splitext(link.filename)[0]
-    return package_to_requirement(egg_info)
-
-
-def package_to_requirement(package_name):
-    """Translate a name like Foo-1.2 to Foo==1.3"""
-    match = re.search(r'^(.*?)-(dev|\d.*)', package_name)
-    if match:
-        name = match.group(1)
-        version = match.group(2)
-    else:
-        name = package_name
-        version = ''
-    if version:
-        return '%s==%s' % (name, version)
-    else:
-        return name
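
The two module-level helpers at the end of the deleted file are self-contained, so their behavior is easy to check in isolation. Below is a minimal, illustrative sketch (not part of the patch) mirroring the deleted package_to_requirement() helper; note that the original docstring's "Foo==1.3" example appears to be a long-standing typo in pip's source, since the code actually yields "Foo==1.2":

    # Illustrative sketch only; mirrors the deleted package_to_requirement()
    # without importing anything from pip.
    import re

    def package_to_requirement(package_name):
        # Split "Foo-1.2" into name "Foo" and version "1.2" ("dev" also
        # counts as a version segment, matching the original regex).
        match = re.search(r'^(.*?)-(dev|\d.*)', package_name)
        if match:
            name, version = match.group(1), match.group(2)
        else:
            name, version = package_name, ''
        return '%s==%s' % (name, version) if version else name

    assert package_to_requirement('Foo-1.2') == 'Foo==1.2'
    assert package_to_requirement('Foo-dev') == 'Foo==dev'
    assert package_to_requirement('Foo') == 'Foo'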