diff options
author | 2018-10-15 15:34:36 +0800 | |
---|---|---|
committer | 2018-10-15 16:18:39 +0800 | |
commit | 2ed9e555ceef54bffd5e8e7fe1575a8e59643742 (patch) | |
tree | 668d6493d32e242f3d43df41ece5e7bef2e0f85f | |
parent | 31b41e271ab2002be5cd0c99331771df219bdeaf (diff) |
add sync task in workflow
add sync task in workflow to support NS
Change-Id: I89e6a26551d47218714526c677d551fc55fe1922
Issue-ID: VFC-1041
Signed-off-by: maopengzhang <zhang.maopeng1@zte.com.cn>
-rw-r--r-- | lcm/workflows/graphflow/__init__.py | 25 | ||||
-rw-r--r-- | lcm/workflows/graphflow/task/__init__.py | 19 | ||||
-rw-r--r-- | lcm/workflows/graphflow/task/sync_rest_task.py | 42 | ||||
-rw-r--r-- | lcm/workflows/graphflow/task/sync_task.py | 30 | ||||
-rw-r--r-- | lcm/workflows/graphflow/task/task.py | 48 |
5 files changed, 164 insertions, 0 deletions
diff --git a/lcm/workflows/graphflow/__init__.py b/lcm/workflows/graphflow/__init__.py new file mode 100644 index 00000000..8e6d0ad0 --- /dev/null +++ b/lcm/workflows/graphflow/__init__.py @@ -0,0 +1,25 @@ +# Copyright 2018 ZTE Corporation. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +TASK_STAUS = (STARTED, PROCESSING, FINISHED, ERROR) = ("started", "processing", "finished", "error") +TIMEOUT_DEFAULT = 10 + +# from lcm.workflows.graphflow.flow.flow import GraphFlow +# from lcm.workflows.graphflow.task.task import Task +# from lcm.workflows.graphflow.task.sync_task import SyncTask +# from lcm.workflows.graphflow.task.sync_rest_task import SyncRestTask +# from lcm.workflows.graphflow.task.async_task import AsyncTask +# from lcm.workflows.graphflow.task.async_rest_task import ASyncRestTask +# from lcm.workflows.graphflow.task.lcm_async_rest_task import LcmASyncRestTask +# from lcm.workflows.graphflow.task.lcm_sync_rest_task import LcmSyncRestTask diff --git a/lcm/workflows/graphflow/task/__init__.py b/lcm/workflows/graphflow/task/__init__.py new file mode 100644 index 00000000..8f5f8aa7 --- /dev/null +++ b/lcm/workflows/graphflow/task/__init__.py @@ -0,0 +1,19 @@ +# Copyright 2018 ZTE Corporation. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +# from lcm.workflows.graphflow.task.async_rest_task import ASyncRestTask +# from lcm.workflows.graphflow.task.async_task import AsyncTask +# from lcm.workflows.graphflow.task.sync_rest_task import SyncRestTask +# from lcm.workflows.graphflow.task.sync_task import SyncTask +# from lcm.workflows.graphflow.task.task import Task diff --git a/lcm/workflows/graphflow/task/sync_rest_task.py b/lcm/workflows/graphflow/task/sync_rest_task.py new file mode 100644 index 00000000..0cb8747a --- /dev/null +++ b/lcm/workflows/graphflow/task/sync_rest_task.py @@ -0,0 +1,42 @@ +# Copyright 2018 ZTE Corporation. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from lcm.workflows.graphflow.task.sync_task import SyncTask +import logging + +logger = logging.getLogger(__name__) + + +class SyncRestTask(SyncTask): + + STATUS_OK = (HTTP_200_OK, HTTP_201_CREATED, HTTP_204_NO_CONTENT, HTTP_202_ACCEPTED) = ('200', '201', '204', '202') + HTTP_METHOD = (POST, GET, PUT, DELETE) = ("POST", "GET", "PUT", "DELETE") + + def __init__(self, *args): + super(SyncRestTask, self).__init__(*args) + self.url = self.input.get(self.URL, "") + self.method = self.input.get(self.METHOD, "") + self.content = self.input.get(self.CONTENT, "") + + def run(self): + status, output = self.call_rest(self.url, self.method, self.content) + if status in self.STATUS_OK: + status = self.FINISHED + else: + status = self.ERROR + logger.debug("SyncRestTask status %s, output %s" % (status, output)) + return status, output + + def call_rest(self, url, method, content): + pass diff --git a/lcm/workflows/graphflow/task/sync_task.py b/lcm/workflows/graphflow/task/sync_task.py new file mode 100644 index 00000000..9b7d1c85 --- /dev/null +++ b/lcm/workflows/graphflow/task/sync_task.py @@ -0,0 +1,30 @@ +# Copyright 2018 ZTE Corporation. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from lcm.workflows.graphflow.task.task import Task +import logging + +logger = logging.getLogger(__name__) + + +class SyncTask(Task): + + def execute(self): + logger.debug("start task: %s", self.key) + status, output = self.run() + logger.debug("SyncTask status %s, output %s" % (status, output)) + self.update_task(status, output) + + def run(self): + pass diff --git a/lcm/workflows/graphflow/task/task.py b/lcm/workflows/graphflow/task/task.py new file mode 100644 index 00000000..9210d06a --- /dev/null +++ b/lcm/workflows/graphflow/task/task.py @@ -0,0 +1,48 @@ +# Copyright 2018 ZTE Corporation. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import datetime +from lcm.workflows.graphflow import STARTED, PROCESSING, FINISHED, ERROR, TIMEOUT_DEFAULT +import logging + +logger = logging.getLogger(__name__) + + +class Task(object): + TASK_STATUS = (STARTED, PROCESSING, FINISHED, ERROR) = (STARTED, PROCESSING, FINISHED, ERROR) + TASK_ATTRIBUTES = (KEY, MANAGER, INPUT, TIMEOUT, ENDTIME, OUTPUT, STATUS) = ("key", "manager", "input", "timeout", "endtime", "output", "status") + INPUT_REST = (URL, METHOD, CONTENT) = ("url", "method", "content") + TIMEOUT_DEFAULT = TIMEOUT_DEFAULT + TIME_FORMAT = '%Y-%m-%d %H:%M:%S' + + def __init__(self, *args): + task = args[0] + self.key = task[self.KEY] + self.taskManager = task[self.MANAGER] + self.input = task[self.INPUT] + self.timeout = task[self.TIMEOUT] if self.TIMEOUT in task else self.TIMEOUT_DEFAULT + self.endtime = (datetime.datetime.now() + datetime.timedelta(seconds=self.timeout)).strftime(self.TIME_FORMAT) + self.status = STARTED + self.output = None + + def execute(self): + pass + + def update_task(self, status, output=None): + task = self.taskManager.get_task(self.key) + task.status = status + if output: + task.output = output + logger.debug("Update task %s status %s" % (task.key, task.status)) + self.taskManager.update_task(self.key, task) |