summaryrefslogtreecommitdiffstats
path: root/azure/aria/aria-extension-cloudify/src/aria/aria/orchestrator/workflows/builtin/heal.py
diff options
context:
space:
mode:
Diffstat (limited to 'azure/aria/aria-extension-cloudify/src/aria/aria/orchestrator/workflows/builtin/heal.py')
-rw-r--r--azure/aria/aria-extension-cloudify/src/aria/aria/orchestrator/workflows/builtin/heal.py179
1 files changed, 179 insertions, 0 deletions
diff --git a/azure/aria/aria-extension-cloudify/src/aria/aria/orchestrator/workflows/builtin/heal.py b/azure/aria/aria-extension-cloudify/src/aria/aria/orchestrator/workflows/builtin/heal.py
new file mode 100644
index 0000000..07e27b1
--- /dev/null
+++ b/azure/aria/aria-extension-cloudify/src/aria/aria/orchestrator/workflows/builtin/heal.py
@@ -0,0 +1,179 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+# pylint: skip-file
+
+"""
+Built-in heal workflow.
+"""
+
+from aria import workflow
+
+from .workflows import (install_node, uninstall_node)
+from ..api import task
+
+
+@workflow
+def heal(ctx, graph, node_id):
+ """
+ Built-in heal workflow..
+
+ :param ctx: workflow context
+ :param graph: graph which will describe the workflow.
+ :param node_id: ID of the node to heal
+ :return:
+ """
+ failing_node = ctx.model.node.get(node_id)
+ host_node = ctx.model.node.get(failing_node.host.id)
+ failed_node_subgraph = _get_contained_subgraph(ctx, host_node)
+ failed_node_ids = list(n.id for n in failed_node_subgraph)
+
+ targeted_nodes = [node for node in ctx.nodes
+ if node.id not in failed_node_ids]
+
+ uninstall_subgraph = task.WorkflowTask(
+ heal_uninstall,
+ failing_nodes=failed_node_subgraph,
+ targeted_nodes=targeted_nodes
+ )
+
+ install_subgraph = task.WorkflowTask(
+ heal_install,
+ failing_nodes=failed_node_subgraph,
+ targeted_nodes=targeted_nodes)
+
+ graph.sequence(uninstall_subgraph, install_subgraph)
+
+
+@workflow(suffix_template='{failing_nodes}')
+def heal_uninstall(ctx, graph, failing_nodes, targeted_nodes):
+ """
+ Uninstall phase of the heal mechanism.
+
+ :param ctx: workflow context
+ :param graph: task graph to edit
+ :param failing_nodes: failing nodes to heal
+ :param targeted_nodes: targets of the relationships where the failing node are
+ """
+ node_sub_workflows = {}
+
+ # Create install stub workflow for each unaffected node
+ for node in targeted_nodes:
+ node_stub = task.StubTask()
+ node_sub_workflows[node.id] = node_stub
+ graph.add_tasks(node_stub)
+
+ # create install sub workflow for every node
+ for node in failing_nodes:
+ node_sub_workflow = task.WorkflowTask(uninstall_node,
+ node=node)
+ node_sub_workflows[node.id] = node_sub_workflow
+ graph.add_tasks(node_sub_workflow)
+
+ # create dependencies between the node sub workflow
+ for node in failing_nodes:
+ node_sub_workflow = node_sub_workflows[node.id]
+ for relationship in reversed(node.outbound_relationships):
+ graph.add_dependency(
+ node_sub_workflows[relationship.target_node.id],
+ node_sub_workflow)
+
+ # Add operations for intact nodes depending on a node belonging to nodes
+ for node in targeted_nodes:
+ node_sub_workflow = node_sub_workflows[node.id]
+
+ for relationship in reversed(node.outbound_relationships):
+
+ target_node = \
+ ctx.model.node.get(relationship.target_node.id)
+ target_node_subgraph = node_sub_workflows[target_node.id]
+ graph.add_dependency(target_node_subgraph, node_sub_workflow)
+
+ if target_node in failing_nodes:
+ dependency = task.create_relationship_tasks(
+ relationship=relationship,
+ operation_name='aria.interfaces.relationship_lifecycle.unlink')
+ graph.add_tasks(*dependency)
+ graph.add_dependency(node_sub_workflow, dependency)
+
+
+@workflow(suffix_template='{failing_nodes}')
+def heal_install(ctx, graph, failing_nodes, targeted_nodes):
+ """
+ Install phase of the heal mechanism.
+
+ :param ctx: workflow context
+ :param graph: task graph to edit.
+ :param failing_nodes: failing nodes to heal
+ :param targeted_nodes: targets of the relationships where the failing node are
+ """
+ node_sub_workflows = {}
+
+ # Create install sub workflow for each unaffected
+ for node in targeted_nodes:
+ node_stub = task.StubTask()
+ node_sub_workflows[node.id] = node_stub
+ graph.add_tasks(node_stub)
+
+ # create install sub workflow for every node
+ for node in failing_nodes:
+ node_sub_workflow = task.WorkflowTask(install_node,
+ node=node)
+ node_sub_workflows[node.id] = node_sub_workflow
+ graph.add_tasks(node_sub_workflow)
+
+ # create dependencies between the node sub workflow
+ for node in failing_nodes:
+ node_sub_workflow = node_sub_workflows[node.id]
+ if node.outbound_relationships:
+ dependencies = \
+ [node_sub_workflows[relationship.target_node.id]
+ for relationship in node.outbound_relationships]
+ graph.add_dependency(node_sub_workflow, dependencies)
+
+ # Add operations for intact nodes depending on a node
+ # belonging to nodes
+ for node in targeted_nodes:
+ node_sub_workflow = node_sub_workflows[node.id]
+
+ for relationship in node.outbound_relationships:
+ target_node = ctx.model.node.get(
+ relationship.target_node.id)
+ target_node_subworkflow = node_sub_workflows[target_node.id]
+ graph.add_dependency(node_sub_workflow, target_node_subworkflow)
+
+ if target_node in failing_nodes:
+ dependent = task.create_relationship_tasks(
+ relationship=relationship,
+ operation_name='aria.interfaces.relationship_lifecycle.establish')
+ graph.add_tasks(*dependent)
+ graph.add_dependency(dependent, node_sub_workflow)
+
+
+def _get_contained_subgraph(context, host_node):
+ contained_instances = [node
+ for node in context.nodes
+ if node.host_fk == host_node.id and
+ node.host_fk != node.id]
+ result = [host_node]
+
+ if not contained_instances:
+ return result
+
+ result.extend(contained_instances)
+ for node in contained_instances:
+ result.extend(_get_contained_subgraph(context, node))
+
+ return set(result)