summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorFu Jinhua <fu.jinhua@zte.com.cn>2018-10-15 09:00:52 +0000
committerGerrit Code Review <gerrit@onap.org>2018-10-15 09:00:52 +0000
commit5beebce488d545bdaf34342571ba1b15df4827e8 (patch)
treea453937d986459cef7fae4646ccf519307cc38a7
parent5d3d082b5db8028585fe2ef38054929c8ba06cd1 (diff)
parent1a33c98a346d9657acb600ea888297820d66a258 (diff)
Merge "add async task in workflow"
-rw-r--r--lcm/workflows/graphflow/task/async_rest_task.py40
-rw-r--r--lcm/workflows/graphflow/task/async_task.py65
2 files changed, 105 insertions, 0 deletions
diff --git a/lcm/workflows/graphflow/task/async_rest_task.py b/lcm/workflows/graphflow/task/async_rest_task.py
new file mode 100644
index 00000000..ac76d7bf
--- /dev/null
+++ b/lcm/workflows/graphflow/task/async_rest_task.py
@@ -0,0 +1,40 @@
+# Copyright 2018 ZTE Corporation.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import logging
+from lcm.workflows.graphflow.task.async_task import AsyncTask
+
+logger = logging.getLogger(__name__)
+
+
+class ASyncRestTask(AsyncTask):
+ STATUS_OK = (HTTP_200_OK, HTTP_201_CREATED, HTTP_204_NO_CONTENT, HTTP_202_ACCEPTED) = ('200', '201', '204', '202')
+ HTTP_METHOD = (POST, GET, PUT, DELETE) = ("POST", "GET", "PUT", "DELETE")
+
+ def __init__(self, *args):
+ super(ASyncRestTask, self).__init__(*args)
+ self.url = self.input.get(self.URL, "")
+ self.method = self.input.get(self.METHOD, "")
+ self.content = self.input.get(self.CONTENT, "")
+
+ def run(self):
+ status, resp_content = self.call_rest(self.url, self.method, self.content)
+ if status not in self.STATUS_OK:
+ status = self.ERROR
+ else:
+ status = self.PROCESSING
+ return status, resp_content
+
+ def call_rest(self, url, method, content=None):
+ pass
diff --git a/lcm/workflows/graphflow/task/async_task.py b/lcm/workflows/graphflow/task/async_task.py
new file mode 100644
index 00000000..65a1274f
--- /dev/null
+++ b/lcm/workflows/graphflow/task/async_task.py
@@ -0,0 +1,65 @@
+# Copyright 2018 ZTE Corporation.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import logging
+import time
+import datetime
+from threading import Thread
+from lcm.workflows.graphflow.task.task import Task
+
+logger = logging.getLogger(__name__)
+
+
+class AsyncTask(Task):
+
+ def __init__(self, *args):
+ super(AsyncTask, self).__init__(*args)
+
+ def execute(self):
+ logger.debug("start task: %s", self.key)
+ status, output = self.run()
+ self.update_task(status, output)
+ if status == self.PROCESSING:
+ WatchTask(self).start()
+
+ def run(self):
+ pass
+
+ def get_task_status(self):
+ status = self.get_ext_status()
+ return status if status else self.status
+
+ def update_task_status(self, status):
+ self.status = status
+
+ def get_ext_status(self):
+ return None
+
+
+class WatchTask(Thread):
+
+ def __init__(self, task):
+ Thread.__init__(self)
+ self.task = task
+ self.timeout = task.timeout
+ self.endtime = (datetime.datetime.now() + datetime.timedelta(seconds=self.timeout)).strftime(self.task.TIME_FORMAT)
+
+ def run(self):
+ status = ""
+ while status not in [self.task.FINISHED, self.task.ERROR] and self.endtime >= datetime.datetime.now().strftime(self.task.TIME_FORMAT):
+ status = self.task.get_task_status()
+ logger.debug("task %s, status %s", self.task.key, status)
+ time.sleep(1)
+ status = self.task.ERROR if status != self.task.FINISHED else self.task.FINISHED
+ self.task.update_task_status(status)