summaryrefslogtreecommitdiffstats
path: root/azure/aria/aria-extension-cloudify/src/aria/aria/orchestrator/workflows/builtin/execute_operation.py
blob: 949f864ef2a9e6b5e6860cfb38b8c1135540f1fe (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements.  See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License.  You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""
Built-in operation execution Workflow.
"""

from ... import workflow
from ..api import task


@workflow
def execute_operation(
        ctx,
        graph,
        interface_name,
        operation_name,
        operation_kwargs,
        run_by_dependency_order,
        type_names,
        node_template_ids,
        node_ids,
        **kwargs):
    """
    Built-in operation execution Workflow.

    :param workflow_context: workflow context
    :param graph: graph which will describe the workflow
    :param operation: operation name to execute
    :param operation_kwargs:
    :param run_by_dependency_order:
    :param type_names:
    :param node_template_ids:
    :param node_ids:
    :param kwargs:
    :return:
    """
    subgraphs = {}
    # filtering node instances
    filtered_nodes = list(_filter_nodes(
        context=ctx,
        node_template_ids=node_template_ids,
        node_ids=node_ids,
        type_names=type_names))

    if run_by_dependency_order:
        filtered_node_ids = set(node_instance.id for node_instance in filtered_nodes)
        for node in ctx.nodes:
            if node.id not in filtered_node_ids:
                subgraphs[node.id] = ctx.task_graph(
                    name='execute_operation_stub_{0}'.format(node.id))

    # registering actual tasks to sequences
    for node in filtered_nodes:
        graph.add_tasks(
            task.OperationTask(
                node,
                interface_name=interface_name,
                operation_name=operation_name,
                arguments=operation_kwargs
            )
        )

    for _, node_sub_workflow in subgraphs.items():
        graph.add_tasks(node_sub_workflow)

    # adding tasks dependencies if required
    if run_by_dependency_order:
        for node in ctx.nodes:
            for relationship in node.relationships:
                graph.add_dependency(
                    source_task=subgraphs[node.id], after=[subgraphs[relationship.target_id]])


def _filter_nodes(context, node_template_ids=(), node_ids=(), type_names=()):
    def _is_node_template_by_id(node_template_id):
        return not node_template_ids or node_template_id in node_template_ids

    def _is_node_by_id(node_id):
        return not node_ids or node_id in node_ids

    def _is_node_by_type(node_type):
        return not node_type.name in type_names

    for node in context.nodes:
        if all((_is_node_template_by_id(node.node_template.id),
                _is_node_by_id(node.id),
                _is_node_by_type(node.node_template.type))):
            yield node