diff options
author | Fu Jinhua <fu.jinhua@zte.com.cn> | 2018-10-15 11:50:48 +0000 |
---|---|---|
committer | Gerrit Code Review <gerrit@onap.org> | 2018-10-15 11:50:48 +0000 |
commit | ff49f1daab1c7677ba8401832ea53ae47de90ecc (patch) | |
tree | 37ca0a2510828d391c3c5079291fcfb9078b610b | |
parent | e36ba50b4665a716aab3051c59e7569356e1ae20 (diff) | |
parent | 25942fa48a6b232b242a4a2493485bc6280dde5f (diff) |
Merge "add graph flow"
-rw-r--r-- | lcm/workflows/graphflow/flow/__init__.py | 13 | ||||
-rw-r--r-- | lcm/workflows/graphflow/flow/flow.py | 79 | ||||
-rw-r--r-- | lcm/workflows/graphflow/flow/graph.py | 73 | ||||
-rw-r--r-- | lcm/workflows/graphflow/flow/load.py | 46 | ||||
-rw-r--r-- | lcm/workflows/graphflow/flow/manager.py | 81 |
5 files changed, 292 insertions, 0 deletions
diff --git a/lcm/workflows/graphflow/flow/__init__.py b/lcm/workflows/graphflow/flow/__init__.py new file mode 100644 index 00000000..342c2a8c --- /dev/null +++ b/lcm/workflows/graphflow/flow/__init__.py @@ -0,0 +1,13 @@ +# Copyright 2018 ZTE Corporation. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. diff --git a/lcm/workflows/graphflow/flow/flow.py b/lcm/workflows/graphflow/flow/flow.py new file mode 100644 index 00000000..1c5d09ba --- /dev/null +++ b/lcm/workflows/graphflow/flow/flow.py @@ -0,0 +1,79 @@ +# Copyright 2018 ZTE Corporation. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import logging +import threading +import json +from threading import Thread +from lcm.workflows.graphflow.flow.graph import Graph +from lcm.workflows.graphflow.flow.load import load_class_from_config +from lcm.workflows.graphflow.flow.manager import TaskManager + +logger = logging.getLogger(__name__) + + +def _execute_task(exec_class): + logger.debug("graph task class %s" % exec_class) + exec_class.execute() + + +def create_instance(class_key, class_set, *args): + if class_key in class_set: + import_class = class_set[class_key] + return import_class(*args) + else: + return None + + +class GraphFlow(Thread): + def __init__(self, graph, task_para_dict, config): + Thread.__init__(self) + self._graph = Graph(graph) + self._task_para_dict = task_para_dict + self._imp_class_set = load_class_from_config(config) + self.task_manager = TaskManager() + + def run(self): + logger.debug("GraphFlow begin. graph:%s, task_para_dict:%s", self._graph, json.dumps(self._task_para_dict)) + self.sort_nodes = self._graph.topo_sort() + for node in self.sort_nodes: + pre_nodes = self._graph.get_pre_nodes(node) + logger.debug("current node %s, pre_nodes %s" % (node, pre_nodes)) + if len(pre_nodes) > 0: + self.task_manager.wait_tasks_done(pre_nodes) + if self.task_manager.is_all_task_finished(pre_nodes): + self.create_task(node) + logger.debug("GraphFlow create node %s", node) + else: + logger.debug("GraphFlow, end, error") + break + else: + self.create_task(node) + logger.debug("GraphFlow create node %s", node) + logger.debug("GraphFlow, end") + + def create_task(self, node): + task_para = self._task_para_dict[node] + task_para["key"] = node + task_para["status"] = "started" + task_para["manager"] = self.task_manager + if "type" in task_para: + class_key = task_para["type"] + exec_task = create_instance(class_key, self._imp_class_set, task_para) + self.task_manager.add_task(node, exec_task) + thread_task = threading.Thread(target=_execute_task, args=(exec_task,)) + thread_task.start() + return True + else: + return False diff --git a/lcm/workflows/graphflow/flow/graph.py b/lcm/workflows/graphflow/flow/graph.py new file mode 100644 index 00000000..334eea6a --- /dev/null +++ b/lcm/workflows/graphflow/flow/graph.py @@ -0,0 +1,73 @@ +# Copyright 2018 ZTE Corporation. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import logging +from collections import deque +from collections import OrderedDict + +logger = logging.getLogger(__name__) + + +class Graph(object): + + def __init__(self, graph_dict=None): + self.graph = OrderedDict() + if graph_dict: + for node, dep_nodes in graph_dict.iteritems(): + self.add_node(node, dep_nodes) + + def add_node(self, node, dep_nodes): + if node not in self.graph: + self.graph[node] = set() + if isinstance(dep_nodes, list): + for dep_node in dep_nodes: + if dep_node not in self.graph: + self.graph[dep_node] = set() + if dep_node not in self.graph[node]: + self.graph[node].add(dep_node) + + def get_pre_nodes(self, node): + return [k for k in self.graph if node in self.graph[k]] + + def topo_sort(self): + degree = {} + for node in self.graph: + degree[node] = 0 + for node in self.graph: + for dependent in self.graph[node]: + degree[dependent] += 1 + queue = deque() + for node in degree: + if degree[node] == 0: + queue.appendleft(node) + sort_list = [] + while queue: + node = queue.pop() + sort_list.append(node) + for dependent in self.graph[node]: + degree[dependent] -= 1 + if degree[dependent] == 0: + queue.appendleft(dependent) + if len(sort_list) == len(self.graph): + return sort_list + else: + return None + + def to_dict(self): + dict = {} + for node, dependents in self.graph.iteritems(): + dict[node] = [] + for dep in dependents: + dict[node].append(dep) + return dict diff --git a/lcm/workflows/graphflow/flow/load.py b/lcm/workflows/graphflow/flow/load.py new file mode 100644 index 00000000..757be892 --- /dev/null +++ b/lcm/workflows/graphflow/flow/load.py @@ -0,0 +1,46 @@ +# Copyright 2018 ZTE Corporation. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import importlib +import logging + + +logger = logging.getLogger(__name__) + + +def load_module(imp_module): + try: + imp_module = importlib.import_module(imp_module) + except Exception: + logger.debug("load_module error: %s", imp_module) + imp_module = None + return imp_module + + +def load_class(imp_module, imp_class): + try: + cls = getattr(imp_module, imp_class) + except Exception: + logger.debug("load_class error: %s", imp_class) + cls = None + return cls + + +def load_class_from_config(config): + class_set = {} + for k, v in config.iteritems(): + imp_module = load_module(v["module"]) + cls = load_class(imp_module, v["class"]) + class_set[k] = cls + return class_set diff --git a/lcm/workflows/graphflow/flow/manager.py b/lcm/workflows/graphflow/flow/manager.py new file mode 100644 index 00000000..f0c2cd67 --- /dev/null +++ b/lcm/workflows/graphflow/flow/manager.py @@ -0,0 +1,81 @@ +# Copyright 2018 ZTE Corporation. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import datetime +from lcm.workflows.graphflow import STARTED, PROCESSING, FINISHED, ERROR +import logging +import time + +logger = logging.getLogger(__name__) + + +class TaskManager(object): + + def __init__(self): + self.task_set = {} + + def add_task(self, key, task, timeout=None): + self.task_set[key] = task + logger.debug("task_set %s" % self.task_set) + + def update_task_status(self, key, status): + if key in self.task_set: + task = self.task_set[key] + task.update_task(status) + + def update_task(self, key, task): + if key in self.task_set: + self.task_set[key] = task + + def get_task(self, key): + if key in self.task_set: + return self.task_set[key] + else: + return None + + def get_all_task(self): + return self.task_set + + def is_all_task_finished(self, task_key_set=None): + states = [] + if not task_key_set: + task_key_set = self.task_set.keys() + total = len(task_key_set) + for key in task_key_set: + if key in self.task_set: + states.append(self.task_set[key].status) + if len([state for state in states if state == FINISHED]) == total: + return True + else: + for key in task_key_set: + logger.debug("task key %s, status %s" % (key, self.task_set[key].status)) + return False + + def wait_tasks_done(self, task_key_set=None): + if task_key_set: + for key in task_key_set: + if key in self.task_set.keys(): + task = self.task_set[key] + logger.debug("current wait task %s, endtime %s, status %s" % (task.key, task.endtime, task.status)) + while task.endtime >= datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S') and task.status in [STARTED, PROCESSING]: + time.sleep(1) + if task.status in [STARTED, PROCESSING]: + task.status = ERROR + logger.debug("wait task final status %s" % task.status) + else: + for task in self.task_set.itervalues(): + while task.endtime >= datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S') and task.status in [STARTED, PROCESSING]: + time.sleep(1) + if task.status in [STARTED, PROCESSING]: + task.status = ERROR |