# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements.  See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License.  You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""
Enhancements and ARIA-specific conveniences for `Click <http://click.pocoo.org>`__.
"""

import os
import sys
import difflib
import traceback
import inspect
from functools import wraps

import click

from ..env import (
    env,
    logger
)
from .. import defaults
from .. import helptexts
from ..ascii_art import ARIA_ASCII_ART
from ..inputs import inputs_to_dict
from ... import __version__
from ...utils.exceptions import get_exception_as_string


# Shared Click context settings: ``-h`` is accepted as an alias for ``--help`` and every token
# is lower-cased before matching.
CLICK_CONTEXT_SETTINGS = dict(
    help_option_names=['-h', '--help'],
    token_normalize_func=lambda param: param.lower())


class MutuallyExclusiveOption(click.Option):
    """
    A ``click.Option`` that refuses to be used together with a given set of other parameters.

    Accepts three extra keyword arguments on top of those of ``click.Option``:

    * ``mutually_exclusive``: iterable of parameter names this option conflicts with.
    * ``mutuality_description``: human-readable description of the conflicting options; defaults
      to the comma-joined ``mutually_exclusive`` names.
    * ``mutuality_error``: explanation appended to the message; defaults to
      ``helptexts.DEFAULT_MUTUALITY_ERROR_MESSAGE``.
    """

    def __init__(self, *args, **kwargs):
        self.mutually_exclusive = set(kwargs.pop('mutually_exclusive', tuple()))
        self.mutuality_description = kwargs.pop('mutuality_description',
                                                ', '.join(self.mutually_exclusive))
        self.mutuality_error = kwargs.pop('mutuality_error',
                                          helptexts.DEFAULT_MUTUALITY_ERROR_MESSAGE)
        if self.mutually_exclusive:
            # ``self.opts`` is not set yet at this point (the base initializer has not run), so
            # the message embedded in the help text starts with the generic "Cannot".
            help_text = kwargs.get('help', '')
            kwargs['help'] = '{0}. {1}'.format(help_text, self._message)
        super(MutuallyExclusiveOption, self).__init__(*args, **kwargs)

    def handle_parse_result(self, ctx, opts, args):
        """
        Raise ``click.UsageError`` if this option and any of its mutually exclusive counterparts
        both appear in ``opts``; otherwise defer to ``click.Option``.
        """
        if (self.name in opts) and self.mutually_exclusive.intersection(opts):
            raise click.UsageError('Illegal usage: {0}'.format(self._message))
        return super(MutuallyExclusiveOption, self).handle_parse_result(ctx, opts, args)

    @property
    def _message(self):
        """
        The mutual exclusivity message, prefixed with this option's flags once they are known.
        """
        if hasattr(self, 'opts'):
            subject = '{0} cannot'.format(', '.join(self.opts))
        else:
            subject = 'Cannot'
        return '{0} be used together with {1} ({2}).'.format(
            subject,
            self.mutuality_description,
            self.mutuality_error)


def mutually_exclusive_option(*param_decls, **attrs):
    """
    Decorator for mutually exclusive options.

    This decorator works similarly to `click.option`, but supports an extra ``mutually_exclusive``
    argument, which is a list of argument names with which the option is mutually exclusive.

    You can optionally also supply ``mutuality_description`` and ``mutuality_error`` to override
    the default messages.

    NOTE: All mutually exclusive options must use this. It's not enough to use it in just one of
    the options.
    """

    # NOTE: This code is copied and slightly modified from click.decorators.option and
    # click.decorators._param_memo. Unfortunately, using click's ``cls`` parameter support does not
    # work as is with extra decorator arguments.

    def decorator(func):
        # Work on a copy so that applying the decorator never mutates the captured ``attrs``
        option_attrs = dict(attrs)
        if 'help' in option_attrs:
            option_attrs['help'] = inspect.cleandoc(option_attrs['help'])
        param = MutuallyExclusiveOption(param_decls, **option_attrs)
        if isinstance(func, click.Command):
            # Already a command: attach directly, as click's own ``_param_memo`` does, instead
            # of setting an attribute nobody will read
            func.params.append(param)
        else:
            if not hasattr(func, '__click_params__'):
                func.__click_params__ = []
            func.__click_params__.append(param)
        return func
    return decorator


def show_version(ctx, param, value):
    """
    Click callback: when the flag is set, log the ASCII art and the version, then exit.
    """
    if not value:
        return

    logger.info('{0} v{1}'.format(ARIA_ASCII_ART, __version__))
    ctx.exit()


def inputs_callback(ctx, param, value):
    """
    Allow to pass any inputs we provide to a command as processed inputs instead of having to call
    ``inputs_to_dict`` inside the command.

    ``@aria.options.inputs`` already calls this callback so that every time you use the option it
    returns the inputs as a dictionary.
    """
    if not value:
        return {}

    return inputs_to_dict(value)


def set_verbosity_level(ctx, param, value):
    """
    Click callback: store a truthy ``value`` as ``env.logging.verbosity_level``.
    """
    if not value:
        return

    env.logging.verbosity_level = value


def set_cli_except_hook():
    """
    Install a ``sys.excepthook`` tailored to the CLI.

    At a high verbosity level the error is logged together with its traceback. Otherwise the
    traceback is appended to ``env.logging.log_file`` and only the error message is printed. In
    both cases, if the exception has a ``possible_solutions`` attribute, its items are logged.
    """
    def recommend(possible_solutions):
        logger.info('Possible solutions:')
        for solution in possible_solutions:
            logger.info(' - {0}'.format(solution))

    def new_excepthook(tpe, value, trace):
        if env.logging.is_high_verbose_level():
            # log error including traceback
            logger.error(get_exception_as_string(tpe, value, trace))
        else:
            # write the full error to the log file
            with open(env.logging.log_file, 'a') as log_file:
                # Positional arguments on purpose: the ``etype`` keyword is not accepted on
                # Python >= 3.10, while the positional form works on every version
                traceback.print_exception(tpe, value, trace, file=log_file)
            # print only the error message (the call form works on both Python 2 and 3)
            print(value)

        if hasattr(value, 'possible_solutions'):
            recommend(getattr(value, 'possible_solutions'))

    sys.excepthook = new_excepthook


def pass_logger(func):
    """
    Simply passes the logger to a command.
    """
    # Wraps here makes sure the original docstring propagates to click
    @wraps(func)
    def wrapper(*args, **kwargs):
        return func(logger=logger, *args, **kwargs)

    return wrapper


def pass_plugin_manager(func):
    """
    Simply passes the plugin manager to a command.
    """
    # Wraps here makes sure the original docstring propagates to click
    @wraps(func)
    def wrapper(*args, **kwargs):
        return func(plugin_manager=env.plugin_manager, *args, **kwargs)

    return wrapper


def pass_model_storage(func):
    """
    Simply passes the model storage to a command.
    """
    # Wraps here makes sure the original docstring propagates to click
    @wraps(func)
    def wrapper(*args, **kwargs):
        return func(model_storage=env.model_storage, *args, **kwargs)

    return wrapper


def pass_resource_storage(func):
    """
    Simply passes the resource storage to a command.
    """
    # Wraps here makes sure the original docstring propagates to click
    @wraps(func)
    def wrapper(*args, **kwargs):
        return func(resource_storage=env.resource_storage, *args, **kwargs)

    return wrapper


def pass_context(func):
    """
    Make click context ARIA specific.

    This exists purely for aesthetic reasons, otherwise some decorators are called
    ``@click.something`` instead of ``@aria.something``.
    """
    return click.pass_context(func)


class AliasedGroup(click.Group):
    """
    A ``click.Group`` that resolves unambiguous command-name prefixes and appends
    *Did you mean ...* suggestions to usage errors raised for unresolved commands.

    Extra keyword arguments: ``max_suggestions`` (default 3) and ``cutoff`` (default 0.5), both
    forwarded to ``difflib.get_close_matches``.
    """

    def __init__(self, *args, **kwargs):
        self.max_suggestions = kwargs.pop("max_suggestions", 3)
        self.cutoff = kwargs.pop("cutoff", 0.5)
        super(AliasedGroup, self).__init__(*args, **kwargs)

    def get_command(self, ctx, cmd_name):
        """
        Return the command named ``cmd_name``, falling back to the single command whose name
        starts with ``cmd_name``. Returns ``None`` when nothing matches and fails the context
        when the prefix is ambiguous.
        """
        cmd = click.Group.get_command(self, ctx, cmd_name)
        if cmd is not None:
            return cmd
        matches = [x for x in self.list_commands(ctx) if x.startswith(cmd_name)]
        if not matches:
            return None
        elif len(matches) == 1:
            return click.Group.get_command(self, ctx, matches[0])
        ctx.fail('Too many matches: {0}'.format(', '.join(sorted(matches))))

    def resolve_command(self, ctx, args):
        """
        Override clicks ``resolve_command`` method and appends *Did you mean ...* suggestions to
        the raised exception message.
        """
        try:
            return super(AliasedGroup, self).resolve_command(ctx, args)
        except click.exceptions.UsageError as error:
            error_msg = str(error)
            original_cmd_name = click.utils.make_str(args[0])
            matches = difflib.get_close_matches(
                original_cmd_name,
                self.list_commands(ctx),
                self.max_suggestions,
                self.cutoff)
            if matches:
                error_msg += '{0}{0}Did you mean one of these?{0} {1}'.format(
                    os.linesep,
                    '{0} '.format(os.linesep).join(matches))
            raise click.exceptions.UsageError(error_msg, error.ctx)


def group(name):
    """
    Allow to create a group with a default click context and a class for Click's ``didyoumean``
    without having to repeat it for every group.
    """
    return click.group(
        name=name,
        context_settings=CLICK_CONTEXT_SETTINGS,
        cls=AliasedGroup)


def command(*args, **kwargs):
    """
    Make Click commands ARIA specific.

    This exists purely for aesthetic reasons, otherwise some decorators are called
    ``@click.something`` instead of ``@aria.something``.
    """
    return click.command(*args, **kwargs)


def argument(*args, **kwargs):
    """
    Make Click arguments specific to ARIA.

    This exists purely for aesthetic reasons, otherwise some decorators are called
    ``@click.something`` instead of ``@aria.something``
    """
    return click.argument(*args, **kwargs)


class Options(object):
    """
    Container for the reusable option decorators, exposed through the module-level ``options``
    instance.
    """

    def __init__(self):
        """
        The options API is nicer when you use each option by calling ``@aria.options.some_option``
        instead of ``@aria.some_option``.

        Note that some options are attributes and some are static methods. The reason for that is
        that we want to be explicit regarding how a developer sees an option. If it can receive
        arguments, it's a method - if not, it's an attribute.
        """
        self.version = click.option(
            '--version',
            is_flag=True,
            callback=show_version,
            expose_value=False,
            is_eager=True,
            help=helptexts.VERSION)

        self.json_output = click.option(
            '--json-output',
            is_flag=True,
            help=helptexts.JSON_OUTPUT)

        self.dry_execution = click.option(
            '--dry',
            is_flag=True,
            help=helptexts.DRY_EXECUTION)

        self.retry_failed_tasks = click.option(
            '--retry-failed-tasks',
            is_flag=True,
            help=helptexts.RETRY_FAILED_TASK
        )

        self.reset_config = click.option(
            '--reset-config',
            is_flag=True,
            help=helptexts.RESET_CONFIG)

        self.descending = click.option(
            '--descending',
            required=False,
            is_flag=True,
            default=defaults.SORT_DESCENDING,
            help=helptexts.DESCENDING)

        self.service_template_filename = click.option(
            '-n',
            '--service-template-filename',
            default=defaults.SERVICE_TEMPLATE_FILENAME,
            help=helptexts.SERVICE_TEMPLATE_FILENAME)

        self.service_template_mode_full = mutually_exclusive_option(
            '-f',
            '--full',
            'mode_full',
            mutually_exclusive=('mode_types',),
            is_flag=True,
            help=helptexts.SHOW_FULL,
            mutuality_description='-t, --types',
            mutuality_error=helptexts.MODE_MUTUALITY_ERROR_MESSAGE)

        self.service_mode_full = mutually_exclusive_option(
            '-f',
            '--full',
            'mode_full',
            mutually_exclusive=('mode_graph',),
            is_flag=True,
            help=helptexts.SHOW_FULL,
            mutuality_description='-g, --graph',
            mutuality_error=helptexts.MODE_MUTUALITY_ERROR_MESSAGE)

        self.mode_types = mutually_exclusive_option(
            '-t',
            '--types',
            'mode_types',
            mutually_exclusive=('mode_full',),
            is_flag=True,
            help=helptexts.SHOW_TYPES,
            mutuality_description='-f, --full',
            mutuality_error=helptexts.MODE_MUTUALITY_ERROR_MESSAGE)

        self.mode_graph = mutually_exclusive_option(
            '-g',
            '--graph',
            'mode_graph',
            mutually_exclusive=('mode_full',),
            is_flag=True,
            help=helptexts.SHOW_GRAPH,
            mutuality_description='-f, --full',
            mutuality_error=helptexts.MODE_MUTUALITY_ERROR_MESSAGE)

        self.format_json = mutually_exclusive_option(
            '-j',
            '--json',
            'format_json',
            mutually_exclusive=('format_yaml',),
            is_flag=True,
            help=helptexts.SHOW_JSON,
            mutuality_description='-y, --yaml',
            mutuality_error=helptexts.FORMAT_MUTUALITY_ERROR_MESSAGE)

        self.format_yaml = mutually_exclusive_option(
            '-y',
            '--yaml',
            'format_yaml',
            mutually_exclusive=('format_json',),
            is_flag=True,
            help=helptexts.SHOW_YAML,
            mutuality_description='-j, --json',
            mutuality_error=helptexts.FORMAT_MUTUALITY_ERROR_MESSAGE)

    @staticmethod
    def verbose(expose_value=False):
        """
        Counted ``-v``/``--verbose`` option; eagerly stores its value via ``set_verbosity_level``.
        """
        return click.option(
            '-v',
            '--verbose',
            count=True,
            callback=set_verbosity_level,
            expose_value=expose_value,
            is_eager=True,
            help=helptexts.VERBOSE)

    @staticmethod
    def inputs(help):
        """
        Repeatable ``-i``/``--inputs`` option whose value is processed by ``inputs_callback``.
        """
        return click.option(
            '-i',
            '--inputs',
            multiple=True,
            callback=inputs_callback,
            help=help)

    @staticmethod
    def force(help):
        """
        ``-f``/``--force`` flag with a caller-supplied help text.
        """
        return click.option(
            '-f',
            '--force',
            is_flag=True,
            help=help)

    @staticmethod
    def task_max_attempts(default=defaults.TASK_MAX_ATTEMPTS):
        """
        Integer ``--task-max-attempts`` option; the default is interpolated into the help text.
        """
        return click.option(
            '--task-max-attempts',
            type=int,
            default=default,
            help=helptexts.TASK_MAX_ATTEMPTS.format(default))

    @staticmethod
    def sort_by(default='created_at'):
        """
        Optional ``--sort-by`` option.
        """
        return click.option(
            '--sort-by',
            required=False,
            default=default,
            help=helptexts.SORT_BY)

    @staticmethod
    def task_retry_interval(default=defaults.TASK_RETRY_INTERVAL):
        """
        Integer ``--task-retry-interval`` option; the default is interpolated into the help text.
        """
        return click.option(
            '--task-retry-interval',
            type=int,
            default=default,
            help=helptexts.TASK_RETRY_INTERVAL.format(default))

    @staticmethod
    def service_id(required=False):
        """
        ``-s``/``--service-id`` option.
        """
        return click.option(
            '-s',
            '--service-id',
            required=required,
            help=helptexts.SERVICE_ID)

    @staticmethod
    def execution_id(required=False):
        """
        ``-e``/``--execution-id`` option.
        """
        return click.option(
            '-e',
            '--execution-id',
            required=required,
            help=helptexts.EXECUTION_ID)

    @staticmethod
    def service_template_id(required=False):
        """
        ``-t``/``--service-template-id`` option.
        """
        return click.option(
            '-t',
            '--service-template-id',
            required=required,
            help=helptexts.SERVICE_TEMPLATE_ID)

    @staticmethod
    def service_template_path(required=False):
        """
        ``-p``/``--service-template-path`` option; the given path must exist.
        """
        return click.option(
            '-p',
            '--service-template-path',
            required=required,
            type=click.Path(exists=True))

    @staticmethod
    def service_name(required=False):
        """
        ``-s``/``--service-name`` option.
        """
        # NOTE(review): reuses the service *ID* help text - confirm whether a dedicated help text
        # for names was intended
        return click.option(
            '-s',
            '--service-name',
            required=required,
            help=helptexts.SERVICE_ID)

    @staticmethod
    def service_template_name(required=False):
        """
        ``-t``/``--service-template-name`` option.
        """
        # NOTE(review): reuses the service *ID* help text - confirm whether a dedicated help text
        # for service template names was intended
        return click.option(
            '-t',
            '--service-template-name',
            required=required,
            help=helptexts.SERVICE_ID)

    @staticmethod
    def mark_pattern():
        """
        Optional string ``-m``/``--mark-pattern`` option.
        """
        return click.option(
            '-m',
            '--mark-pattern',
            help=helptexts.MARK_PATTERN,
            type=str,
            required=False
        )


options = Options()