diff options
Diffstat (limited to 'azure/aria/aria-extension-cloudify/src/aria/aria/orchestrator/workflows/builtin/execute_operation.py')
-rw-r--r-- | azure/aria/aria-extension-cloudify/src/aria/aria/orchestrator/workflows/builtin/execute_operation.py | 101 |
1 files changed, 101 insertions, 0 deletions
diff --git a/azure/aria/aria-extension-cloudify/src/aria/aria/orchestrator/workflows/builtin/execute_operation.py b/azure/aria/aria-extension-cloudify/src/aria/aria/orchestrator/workflows/builtin/execute_operation.py new file mode 100644 index 0000000..949f864 --- /dev/null +++ b/azure/aria/aria-extension-cloudify/src/aria/aria/orchestrator/workflows/builtin/execute_operation.py @@ -0,0 +1,101 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +""" +Built-in operation execution Workflow. +""" + +from ... import workflow +from ..api import task + + +@workflow +def execute_operation( + ctx, + graph, + interface_name, + operation_name, + operation_kwargs, + run_by_dependency_order, + type_names, + node_template_ids, + node_ids, + **kwargs): + """ + Built-in operation execution Workflow. + + :param workflow_context: workflow context + :param graph: graph which will describe the workflow + :param operation: operation name to execute + :param operation_kwargs: + :param run_by_dependency_order: + :param type_names: + :param node_template_ids: + :param node_ids: + :param kwargs: + :return: + """ + subgraphs = {} + # filtering node instances + filtered_nodes = list(_filter_nodes( + context=ctx, + node_template_ids=node_template_ids, + node_ids=node_ids, + type_names=type_names)) + + if run_by_dependency_order: + filtered_node_ids = set(node_instance.id for node_instance in filtered_nodes) + for node in ctx.nodes: + if node.id not in filtered_node_ids: + subgraphs[node.id] = ctx.task_graph( + name='execute_operation_stub_{0}'.format(node.id)) + + # registering actual tasks to sequences + for node in filtered_nodes: + graph.add_tasks( + task.OperationTask( + node, + interface_name=interface_name, + operation_name=operation_name, + arguments=operation_kwargs + ) + ) + + for _, node_sub_workflow in subgraphs.items(): + graph.add_tasks(node_sub_workflow) + + # adding tasks dependencies if required + if run_by_dependency_order: + for node in ctx.nodes: + for relationship in node.relationships: + graph.add_dependency( + source_task=subgraphs[node.id], after=[subgraphs[relationship.target_id]]) + + +def _filter_nodes(context, node_template_ids=(), node_ids=(), type_names=()): + def _is_node_template_by_id(node_template_id): + return not node_template_ids or node_template_id in node_template_ids + + def _is_node_by_id(node_id): + return not node_ids or node_id in node_ids + + def _is_node_by_type(node_type): + return not node_type.name in type_names + + for node in context.nodes: + if all((_is_node_template_by_id(node.node_template.id), + _is_node_by_id(node.id), + _is_node_by_type(node.node_template.type))): + yield node |