summaryrefslogtreecommitdiffstats
path: root/azure/aria/aria-extension-cloudify/src/aria/aria/cli/commands/executions.py
diff options
context:
space:
mode:
Diffstat (limited to 'azure/aria/aria-extension-cloudify/src/aria/aria/cli/commands/executions.py')
-rw-r--r--azure/aria/aria-extension-cloudify/src/aria/aria/cli/commands/executions.py246
1 files changed, 246 insertions, 0 deletions
diff --git a/azure/aria/aria-extension-cloudify/src/aria/aria/cli/commands/executions.py b/azure/aria/aria-extension-cloudify/src/aria/aria/cli/commands/executions.py
new file mode 100644
index 0000000..cecbbc5
--- /dev/null
+++ b/azure/aria/aria-extension-cloudify/src/aria/aria/cli/commands/executions.py
@@ -0,0 +1,246 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+"""
+CLI ``executions`` sub-commands.
+"""
+
+import os
+
+from .. import helptexts
+from .. import table
+from .. import utils
+from .. import logger as cli_logger
+from .. import execution_logging
+from ..core import aria
+from ...modeling.models import Execution
+from ...orchestrator.workflow_runner import WorkflowRunner
+from ...orchestrator.workflows.executor.dry import DryExecutor
+from ...utils import formatting
+from ...utils import threading
+
+EXECUTION_COLUMNS = ('id', 'workflow_name', 'status', 'service_name',
+ 'created_at', 'error')
+
+
+@aria.group(name='executions')
+@aria.options.verbose()
+def executions():
+ """
+ Manage executions
+ """
+ pass
+
+
+@executions.command(name='show',
+ short_help='Show information for an execution')
+@aria.argument('execution-id')
+@aria.options.verbose()
+@aria.pass_model_storage
+@aria.pass_logger
+def show(execution_id, model_storage, logger):
+ """
+ Show information for an execution
+
+ EXECUTION_ID is the unique ID of the execution.
+ """
+ logger.info('Showing execution {0}'.format(execution_id))
+ execution = model_storage.execution.get(execution_id)
+
+ table.print_data(EXECUTION_COLUMNS, execution, 'Execution:', col_max_width=50)
+
+ # print execution parameters
+ logger.info('Execution Inputs:')
+ if execution.inputs:
+ #TODO check this section, havent tested it
+ execution_inputs = [ei.to_dict() for ei in execution.inputs]
+ for input_name, input_value in formatting.decode_dict(
+ execution_inputs).iteritems():
+ logger.info('\t{0}: \t{1}'.format(input_name, input_value))
+ else:
+ logger.info('\tNo inputs')
+
+
+@executions.command(name='list',
+ short_help='List executions')
+@aria.options.service_name(required=False)
+@aria.options.sort_by()
+@aria.options.descending
+@aria.options.verbose()
+@aria.pass_model_storage
+@aria.pass_logger
+def list(service_name,
+ sort_by,
+ descending,
+ model_storage,
+ logger):
+ """
+ List executions
+
+ If SERVICE_NAME is provided, list executions on that service. Otherwise, list executions on all
+ services.
+ """
+ if service_name:
+ logger.info('Listing executions for service {0}...'.format(
+ service_name))
+ service = model_storage.service.get_by_name(service_name)
+ filters = dict(service=service)
+ else:
+ logger.info('Listing all executions...')
+ filters = {}
+
+ executions_list = model_storage.execution.list(
+ filters=filters,
+ sort=utils.storage_sort_param(sort_by, descending)).items
+
+ table.print_data(EXECUTION_COLUMNS, executions_list, 'Executions:')
+
+
+@executions.command(name='start',
+ short_help='Start a workflow on a service')
+@aria.argument('workflow-name')
+@aria.options.service_name(required=True)
+@aria.options.inputs(help=helptexts.EXECUTION_INPUTS)
+@aria.options.dry_execution
+@aria.options.task_max_attempts()
+@aria.options.task_retry_interval()
+@aria.options.mark_pattern()
+@aria.options.verbose()
+@aria.pass_model_storage
+@aria.pass_resource_storage
+@aria.pass_plugin_manager
+@aria.pass_logger
+def start(workflow_name,
+ service_name,
+ inputs,
+ dry,
+ task_max_attempts,
+ task_retry_interval,
+ mark_pattern,
+ model_storage,
+ resource_storage,
+ plugin_manager,
+ logger):
+ """
+ Start a workflow on a service
+
+ SERVICE_NAME is the unique name of the service.
+
+ WORKFLOW_NAME is the unique name of the workflow within the service (e.g. "uninstall").
+ """
+ service = model_storage.service.get_by_name(service_name)
+ executor = DryExecutor() if dry else None # use WorkflowRunner's default executor
+
+ workflow_runner = \
+ WorkflowRunner(
+ model_storage, resource_storage, plugin_manager,
+ service_id=service.id, workflow_name=workflow_name, inputs=inputs, executor=executor,
+ task_max_attempts=task_max_attempts, task_retry_interval=task_retry_interval
+ )
+ logger.info('Starting {0}execution. Press Ctrl+C cancel'.format('dry ' if dry else ''))
+
+ _run_execution(workflow_runner, logger, model_storage, dry, mark_pattern)
+
+
+@executions.command(name='resume',
+ short_help='Resume a stopped execution')
+@aria.argument('execution-id')
+@aria.options.dry_execution
+@aria.options.retry_failed_tasks
+@aria.options.mark_pattern()
+@aria.options.verbose()
+@aria.pass_model_storage
+@aria.pass_resource_storage
+@aria.pass_plugin_manager
+@aria.pass_logger
+def resume(execution_id,
+ retry_failed_tasks,
+ dry,
+ mark_pattern,
+ model_storage,
+ resource_storage,
+ plugin_manager,
+ logger):
+ """
+ Resume a stopped execution
+
+ EXECUTION_ID is the unique ID of the execution.
+ """
+ executor = DryExecutor() if dry else None # use WorkflowRunner's default executor
+
+ execution = model_storage.execution.get(execution_id)
+ if execution.status != execution.CANCELLED:
+ logger.info("Can't resume execution {execution.id} - "
+ "execution is in status {execution.status}. "
+ "Can only resume executions in status {valid_status}"
+ .format(execution=execution, valid_status=execution.CANCELLED))
+ return
+
+ workflow_runner = \
+ WorkflowRunner(
+ model_storage, resource_storage, plugin_manager,
+ execution_id=execution_id, retry_failed_tasks=retry_failed_tasks, executor=executor,
+ )
+
+ logger.info('Resuming {0}execution. Press Ctrl+C cancel'.format('dry ' if dry else ''))
+ _run_execution(workflow_runner, logger, model_storage, dry, mark_pattern)
+
+
+def _run_execution(workflow_runner, logger, model_storage, dry, mark_pattern):
+ execution_thread_name = '{0}_{1}'.format(workflow_runner.service.name,
+ workflow_runner.execution.workflow_name)
+ execution_thread = threading.ExceptionThread(target=workflow_runner.execute,
+ name=execution_thread_name)
+
+ execution_thread.start()
+
+ last_task_id = workflow_runner.execution.logs[-1].id if workflow_runner.execution.logs else 0
+ log_iterator = cli_logger.ModelLogIterator(model_storage,
+ workflow_runner.execution_id,
+ offset=last_task_id)
+ try:
+ while execution_thread.is_alive():
+ execution_logging.log_list(log_iterator, mark_pattern=mark_pattern)
+ execution_thread.join(1)
+
+ except KeyboardInterrupt:
+ _cancel_execution(workflow_runner, execution_thread, logger, log_iterator)
+
+ # It might be the case where some logs were written and the execution was terminated, thus we
+ # need to drain the remaining logs.
+ execution_logging.log_list(log_iterator, mark_pattern=mark_pattern)
+
+ # raise any errors from the execution thread (note these are not workflow execution errors)
+ execution_thread.raise_error_if_exists()
+
+ execution = workflow_runner.execution
+ logger.info('Execution has ended with "{0}" status'.format(execution.status))
+ if execution.status == Execution.FAILED and execution.error:
+ logger.info('Execution error:{0}{1}'.format(os.linesep, execution.error))
+
+ if dry:
+ # remove traces of the dry execution (including tasks, logs, inputs..)
+ model_storage.execution.delete(execution)
+
+
+def _cancel_execution(workflow_runner, execution_thread, logger, log_iterator):
+ logger.info('Cancelling execution. Press Ctrl+C again to force-cancel.')
+ workflow_runner.cancel()
+ while execution_thread.is_alive():
+ try:
+ execution_logging.log_list(log_iterator)
+ execution_thread.join(1)
+ except KeyboardInterrupt:
+ pass