From 2ed9e555ceef54bffd5e8e7fe1575a8e59643742 Mon Sep 17 00:00:00 2001 From: maopengzhang Date: Mon, 15 Oct 2018 15:34:36 +0800 Subject: add sync task in workflow add sync task in workflow to support NS Change-Id: I89e6a26551d47218714526c677d551fc55fe1922 Issue-ID: VFC-1041 Signed-off-by: maopengzhang --- lcm/workflows/graphflow/__init__.py | 25 ++++++++++++++ lcm/workflows/graphflow/task/__init__.py | 19 ++++++++++ lcm/workflows/graphflow/task/sync_rest_task.py | 42 ++++++++++++++++++++++ lcm/workflows/graphflow/task/sync_task.py | 30 ++++++++++++++++ lcm/workflows/graphflow/task/task.py | 48 ++++++++++++++++++++++++++ 5 files changed, 164 insertions(+) create mode 100644 lcm/workflows/graphflow/__init__.py create mode 100644 lcm/workflows/graphflow/task/__init__.py create mode 100644 lcm/workflows/graphflow/task/sync_rest_task.py create mode 100644 lcm/workflows/graphflow/task/sync_task.py create mode 100644 lcm/workflows/graphflow/task/task.py diff --git a/lcm/workflows/graphflow/__init__.py b/lcm/workflows/graphflow/__init__.py new file mode 100644 index 00000000..8e6d0ad0 --- /dev/null +++ b/lcm/workflows/graphflow/__init__.py @@ -0,0 +1,25 @@ +# Copyright 2018 ZTE Corporation. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +TASK_STAUS = (STARTED, PROCESSING, FINISHED, ERROR) = ("started", "processing", "finished", "error") +TIMEOUT_DEFAULT = 10 + +# from lcm.workflows.graphflow.flow.flow import GraphFlow +# from lcm.workflows.graphflow.task.task import Task +# from lcm.workflows.graphflow.task.sync_task import SyncTask +# from lcm.workflows.graphflow.task.sync_rest_task import SyncRestTask +# from lcm.workflows.graphflow.task.async_task import AsyncTask +# from lcm.workflows.graphflow.task.async_rest_task import ASyncRestTask +# from lcm.workflows.graphflow.task.lcm_async_rest_task import LcmASyncRestTask +# from lcm.workflows.graphflow.task.lcm_sync_rest_task import LcmSyncRestTask diff --git a/lcm/workflows/graphflow/task/__init__.py b/lcm/workflows/graphflow/task/__init__.py new file mode 100644 index 00000000..8f5f8aa7 --- /dev/null +++ b/lcm/workflows/graphflow/task/__init__.py @@ -0,0 +1,19 @@ +# Copyright 2018 ZTE Corporation. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +# from lcm.workflows.graphflow.task.async_rest_task import ASyncRestTask +# from lcm.workflows.graphflow.task.async_task import AsyncTask +# from lcm.workflows.graphflow.task.sync_rest_task import SyncRestTask +# from lcm.workflows.graphflow.task.sync_task import SyncTask +# from lcm.workflows.graphflow.task.task import Task diff --git a/lcm/workflows/graphflow/task/sync_rest_task.py b/lcm/workflows/graphflow/task/sync_rest_task.py new file mode 100644 index 00000000..0cb8747a --- /dev/null +++ b/lcm/workflows/graphflow/task/sync_rest_task.py @@ -0,0 +1,42 @@ +# Copyright 2018 ZTE Corporation. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from lcm.workflows.graphflow.task.sync_task import SyncTask +import logging + +logger = logging.getLogger(__name__) + + +class SyncRestTask(SyncTask): + + STATUS_OK = (HTTP_200_OK, HTTP_201_CREATED, HTTP_204_NO_CONTENT, HTTP_202_ACCEPTED) = ('200', '201', '204', '202') + HTTP_METHOD = (POST, GET, PUT, DELETE) = ("POST", "GET", "PUT", "DELETE") + + def __init__(self, *args): + super(SyncRestTask, self).__init__(*args) + self.url = self.input.get(self.URL, "") + self.method = self.input.get(self.METHOD, "") + self.content = self.input.get(self.CONTENT, "") + + def run(self): + status, output = self.call_rest(self.url, self.method, self.content) + if status in self.STATUS_OK: + status = self.FINISHED + else: + status = self.ERROR + logger.debug("SyncRestTask status %s, output %s" % (status, output)) + return status, output + + def call_rest(self, url, method, content): + pass diff --git a/lcm/workflows/graphflow/task/sync_task.py b/lcm/workflows/graphflow/task/sync_task.py new file mode 100644 index 00000000..9b7d1c85 --- /dev/null +++ b/lcm/workflows/graphflow/task/sync_task.py @@ -0,0 +1,30 @@ +# Copyright 2018 ZTE Corporation. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from lcm.workflows.graphflow.task.task import Task +import logging + +logger = logging.getLogger(__name__) + + +class SyncTask(Task): + + def execute(self): + logger.debug("start task: %s", self.key) + status, output = self.run() + logger.debug("SyncTask status %s, output %s" % (status, output)) + self.update_task(status, output) + + def run(self): + pass diff --git a/lcm/workflows/graphflow/task/task.py b/lcm/workflows/graphflow/task/task.py new file mode 100644 index 00000000..9210d06a --- /dev/null +++ b/lcm/workflows/graphflow/task/task.py @@ -0,0 +1,48 @@ +# Copyright 2018 ZTE Corporation. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import datetime +from lcm.workflows.graphflow import STARTED, PROCESSING, FINISHED, ERROR, TIMEOUT_DEFAULT +import logging + +logger = logging.getLogger(__name__) + + +class Task(object): + TASK_STATUS = (STARTED, PROCESSING, FINISHED, ERROR) = (STARTED, PROCESSING, FINISHED, ERROR) + TASK_ATTRIBUTES = (KEY, MANAGER, INPUT, TIMEOUT, ENDTIME, OUTPUT, STATUS) = ("key", "manager", "input", "timeout", "endtime", "output", "status") + INPUT_REST = (URL, METHOD, CONTENT) = ("url", "method", "content") + TIMEOUT_DEFAULT = TIMEOUT_DEFAULT + TIME_FORMAT = '%Y-%m-%d %H:%M:%S' + + def __init__(self, *args): + task = args[0] + self.key = task[self.KEY] + self.taskManager = task[self.MANAGER] + self.input = task[self.INPUT] + self.timeout = task[self.TIMEOUT] if self.TIMEOUT in task else self.TIMEOUT_DEFAULT + self.endtime = (datetime.datetime.now() + datetime.timedelta(seconds=self.timeout)).strftime(self.TIME_FORMAT) + self.status = STARTED + self.output = None + + def execute(self): + pass + + def update_task(self, status, output=None): + task = self.taskManager.get_task(self.key) + task.status = status + if output: + task.output = output + logger.debug("Update task %s status %s" % (task.key, task.status)) + self.taskManager.update_task(self.key, task) -- cgit 1.2.3-korg