summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authormaopengzhang <zhang.maopeng1@zte.com.cn>2018-10-15 16:37:30 +0800
committermaopengzhang <zhang.maopeng1@zte.com.cn>2018-10-15 16:37:30 +0800
commit1a33c98a346d9657acb600ea888297820d66a258 (patch)
treed7e715264a31582ff2ce329d20f201ed5a8d7a15
parent2ed9e555ceef54bffd5e8e7fe1575a8e59643742 (diff)
add async task in workflow
add async task in workflow to support NS Change-Id: I91d463c3461e7970c96c42c77bed471043fc76e8 Issue-ID: VFC-1041 Signed-off-by: maopengzhang <zhang.maopeng1@zte.com.cn>
-rw-r--r--lcm/workflows/graphflow/task/async_rest_task.py40
-rw-r--r--lcm/workflows/graphflow/task/async_task.py65
2 files changed, 105 insertions, 0 deletions
diff --git a/lcm/workflows/graphflow/task/async_rest_task.py b/lcm/workflows/graphflow/task/async_rest_task.py
new file mode 100644
index 00000000..ac76d7bf
--- /dev/null
+++ b/lcm/workflows/graphflow/task/async_rest_task.py
@@ -0,0 +1,40 @@
+# Copyright 2018 ZTE Corporation.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import logging
+from lcm.workflows.graphflow.task.async_task import AsyncTask
+
+logger = logging.getLogger(__name__)
+
+
+class ASyncRestTask(AsyncTask):
+ STATUS_OK = (HTTP_200_OK, HTTP_201_CREATED, HTTP_204_NO_CONTENT, HTTP_202_ACCEPTED) = ('200', '201', '204', '202')
+ HTTP_METHOD = (POST, GET, PUT, DELETE) = ("POST", "GET", "PUT", "DELETE")
+
+ def __init__(self, *args):
+ super(ASyncRestTask, self).__init__(*args)
+ self.url = self.input.get(self.URL, "")
+ self.method = self.input.get(self.METHOD, "")
+ self.content = self.input.get(self.CONTENT, "")
+
+ def run(self):
+ status, resp_content = self.call_rest(self.url, self.method, self.content)
+ if status not in self.STATUS_OK:
+ status = self.ERROR
+ else:
+ status = self.PROCESSING
+ return status, resp_content
+
+ def call_rest(self, url, method, content=None):
+ pass
diff --git a/lcm/workflows/graphflow/task/async_task.py b/lcm/workflows/graphflow/task/async_task.py
new file mode 100644
index 00000000..65a1274f
--- /dev/null
+++ b/lcm/workflows/graphflow/task/async_task.py
@@ -0,0 +1,65 @@
+# Copyright 2018 ZTE Corporation.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import logging
+import time
+import datetime
+from threading import Thread
+from lcm.workflows.graphflow.task.task import Task
+
+logger = logging.getLogger(__name__)
+
+
+class AsyncTask(Task):
+
+ def __init__(self, *args):
+ super(AsyncTask, self).__init__(*args)
+
+ def execute(self):
+ logger.debug("start task: %s", self.key)
+ status, output = self.run()
+ self.update_task(status, output)
+ if status == self.PROCESSING:
+ WatchTask(self).start()
+
+ def run(self):
+ pass
+
+ def get_task_status(self):
+ status = self.get_ext_status()
+ return status if status else self.status
+
+ def update_task_status(self, status):
+ self.status = status
+
+ def get_ext_status(self):
+ return None
+
+
+class WatchTask(Thread):
+
+ def __init__(self, task):
+ Thread.__init__(self)
+ self.task = task
+ self.timeout = task.timeout
+ self.endtime = (datetime.datetime.now() + datetime.timedelta(seconds=self.timeout)).strftime(self.task.TIME_FORMAT)
+
+ def run(self):
+ status = ""
+ while status not in [self.task.FINISHED, self.task.ERROR] and self.endtime >= datetime.datetime.now().strftime(self.task.TIME_FORMAT):
+ status = self.task.get_task_status()
+ logger.debug("task %s, status %s", self.task.key, status)
+ time.sleep(1)
+ status = self.task.ERROR if status != self.task.FINISHED else self.task.FINISHED
+ self.task.update_task_status(status)