aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorFu Jinhua <fu.jinhua@zte.com.cn>2018-10-15 11:50:48 +0000
committerGerrit Code Review <gerrit@onap.org>2018-10-15 11:50:48 +0000
commitff49f1daab1c7677ba8401832ea53ae47de90ecc (patch)
tree37ca0a2510828d391c3c5079291fcfb9078b610b
parente36ba50b4665a716aab3051c59e7569356e1ae20 (diff)
parent25942fa48a6b232b242a4a2493485bc6280dde5f (diff)
Merge "add graph flow"
-rw-r--r--lcm/workflows/graphflow/flow/__init__.py13
-rw-r--r--lcm/workflows/graphflow/flow/flow.py79
-rw-r--r--lcm/workflows/graphflow/flow/graph.py73
-rw-r--r--lcm/workflows/graphflow/flow/load.py46
-rw-r--r--lcm/workflows/graphflow/flow/manager.py81
5 files changed, 292 insertions, 0 deletions
diff --git a/lcm/workflows/graphflow/flow/__init__.py b/lcm/workflows/graphflow/flow/__init__.py
new file mode 100644
index 00000000..342c2a8c
--- /dev/null
+++ b/lcm/workflows/graphflow/flow/__init__.py
@@ -0,0 +1,13 @@
+# Copyright 2018 ZTE Corporation.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
diff --git a/lcm/workflows/graphflow/flow/flow.py b/lcm/workflows/graphflow/flow/flow.py
new file mode 100644
index 00000000..1c5d09ba
--- /dev/null
+++ b/lcm/workflows/graphflow/flow/flow.py
@@ -0,0 +1,79 @@
+# Copyright 2018 ZTE Corporation.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import logging
+import threading
+import json
+from threading import Thread
+from lcm.workflows.graphflow.flow.graph import Graph
+from lcm.workflows.graphflow.flow.load import load_class_from_config
+from lcm.workflows.graphflow.flow.manager import TaskManager
+
+logger = logging.getLogger(__name__)
+
+
+def _execute_task(exec_class):
+ logger.debug("graph task class %s" % exec_class)
+ exec_class.execute()
+
+
+def create_instance(class_key, class_set, *args):
+ if class_key in class_set:
+ import_class = class_set[class_key]
+ return import_class(*args)
+ else:
+ return None
+
+
+class GraphFlow(Thread):
+ def __init__(self, graph, task_para_dict, config):
+ Thread.__init__(self)
+ self._graph = Graph(graph)
+ self._task_para_dict = task_para_dict
+ self._imp_class_set = load_class_from_config(config)
+ self.task_manager = TaskManager()
+
+ def run(self):
+ logger.debug("GraphFlow begin. graph:%s, task_para_dict:%s", self._graph, json.dumps(self._task_para_dict))
+ self.sort_nodes = self._graph.topo_sort()
+ for node in self.sort_nodes:
+ pre_nodes = self._graph.get_pre_nodes(node)
+ logger.debug("current node %s, pre_nodes %s" % (node, pre_nodes))
+ if len(pre_nodes) > 0:
+ self.task_manager.wait_tasks_done(pre_nodes)
+ if self.task_manager.is_all_task_finished(pre_nodes):
+ self.create_task(node)
+ logger.debug("GraphFlow create node %s", node)
+ else:
+ logger.debug("GraphFlow, end, error")
+ break
+ else:
+ self.create_task(node)
+ logger.debug("GraphFlow create node %s", node)
+ logger.debug("GraphFlow, end")
+
+ def create_task(self, node):
+ task_para = self._task_para_dict[node]
+ task_para["key"] = node
+ task_para["status"] = "started"
+ task_para["manager"] = self.task_manager
+ if "type" in task_para:
+ class_key = task_para["type"]
+ exec_task = create_instance(class_key, self._imp_class_set, task_para)
+ self.task_manager.add_task(node, exec_task)
+ thread_task = threading.Thread(target=_execute_task, args=(exec_task,))
+ thread_task.start()
+ return True
+ else:
+ return False
diff --git a/lcm/workflows/graphflow/flow/graph.py b/lcm/workflows/graphflow/flow/graph.py
new file mode 100644
index 00000000..334eea6a
--- /dev/null
+++ b/lcm/workflows/graphflow/flow/graph.py
@@ -0,0 +1,73 @@
+# Copyright 2018 ZTE Corporation.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import logging
+from collections import deque
+from collections import OrderedDict
+
+logger = logging.getLogger(__name__)
+
+
+class Graph(object):
+
+ def __init__(self, graph_dict=None):
+ self.graph = OrderedDict()
+ if graph_dict:
+ for node, dep_nodes in graph_dict.iteritems():
+ self.add_node(node, dep_nodes)
+
+ def add_node(self, node, dep_nodes):
+ if node not in self.graph:
+ self.graph[node] = set()
+ if isinstance(dep_nodes, list):
+ for dep_node in dep_nodes:
+ if dep_node not in self.graph:
+ self.graph[dep_node] = set()
+ if dep_node not in self.graph[node]:
+ self.graph[node].add(dep_node)
+
+ def get_pre_nodes(self, node):
+ return [k for k in self.graph if node in self.graph[k]]
+
+ def topo_sort(self):
+ degree = {}
+ for node in self.graph:
+ degree[node] = 0
+ for node in self.graph:
+ for dependent in self.graph[node]:
+ degree[dependent] += 1
+ queue = deque()
+ for node in degree:
+ if degree[node] == 0:
+ queue.appendleft(node)
+ sort_list = []
+ while queue:
+ node = queue.pop()
+ sort_list.append(node)
+ for dependent in self.graph[node]:
+ degree[dependent] -= 1
+ if degree[dependent] == 0:
+ queue.appendleft(dependent)
+ if len(sort_list) == len(self.graph):
+ return sort_list
+ else:
+ return None
+
+ def to_dict(self):
+ dict = {}
+ for node, dependents in self.graph.iteritems():
+ dict[node] = []
+ for dep in dependents:
+ dict[node].append(dep)
+ return dict
diff --git a/lcm/workflows/graphflow/flow/load.py b/lcm/workflows/graphflow/flow/load.py
new file mode 100644
index 00000000..757be892
--- /dev/null
+++ b/lcm/workflows/graphflow/flow/load.py
@@ -0,0 +1,46 @@
+# Copyright 2018 ZTE Corporation.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import importlib
+import logging
+
+
+logger = logging.getLogger(__name__)
+
+
+def load_module(imp_module):
+ try:
+ imp_module = importlib.import_module(imp_module)
+ except Exception:
+ logger.debug("load_module error: %s", imp_module)
+ imp_module = None
+ return imp_module
+
+
+def load_class(imp_module, imp_class):
+ try:
+ cls = getattr(imp_module, imp_class)
+ except Exception:
+ logger.debug("load_class error: %s", imp_class)
+ cls = None
+ return cls
+
+
+def load_class_from_config(config):
+ class_set = {}
+ for k, v in config.iteritems():
+ imp_module = load_module(v["module"])
+ cls = load_class(imp_module, v["class"])
+ class_set[k] = cls
+ return class_set
diff --git a/lcm/workflows/graphflow/flow/manager.py b/lcm/workflows/graphflow/flow/manager.py
new file mode 100644
index 00000000..f0c2cd67
--- /dev/null
+++ b/lcm/workflows/graphflow/flow/manager.py
@@ -0,0 +1,81 @@
+# Copyright 2018 ZTE Corporation.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import datetime
+from lcm.workflows.graphflow import STARTED, PROCESSING, FINISHED, ERROR
+import logging
+import time
+
+logger = logging.getLogger(__name__)
+
+
+class TaskManager(object):
+
+ def __init__(self):
+ self.task_set = {}
+
+ def add_task(self, key, task, timeout=None):
+ self.task_set[key] = task
+ logger.debug("task_set %s" % self.task_set)
+
+ def update_task_status(self, key, status):
+ if key in self.task_set:
+ task = self.task_set[key]
+ task.update_task(status)
+
+ def update_task(self, key, task):
+ if key in self.task_set:
+ self.task_set[key] = task
+
+ def get_task(self, key):
+ if key in self.task_set:
+ return self.task_set[key]
+ else:
+ return None
+
+ def get_all_task(self):
+ return self.task_set
+
+ def is_all_task_finished(self, task_key_set=None):
+ states = []
+ if not task_key_set:
+ task_key_set = self.task_set.keys()
+ total = len(task_key_set)
+ for key in task_key_set:
+ if key in self.task_set:
+ states.append(self.task_set[key].status)
+ if len([state for state in states if state == FINISHED]) == total:
+ return True
+ else:
+ for key in task_key_set:
+ logger.debug("task key %s, status %s" % (key, self.task_set[key].status))
+ return False
+
+ def wait_tasks_done(self, task_key_set=None):
+ if task_key_set:
+ for key in task_key_set:
+ if key in self.task_set.keys():
+ task = self.task_set[key]
+ logger.debug("current wait task %s, endtime %s, status %s" % (task.key, task.endtime, task.status))
+ while task.endtime >= datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S') and task.status in [STARTED, PROCESSING]:
+ time.sleep(1)
+ if task.status in [STARTED, PROCESSING]:
+ task.status = ERROR
+ logger.debug("wait task final status %s" % task.status)
+ else:
+ for task in self.task_set.itervalues():
+ while task.endtime >= datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S') and task.status in [STARTED, PROCESSING]:
+ time.sleep(1)
+ if task.status in [STARTED, PROCESSING]:
+ task.status = ERROR