diff options
Diffstat (limited to 'src/onapsdk/utils/mixins.py')
-rw-r--r-- | src/onapsdk/utils/mixins.py | 99 |
1 files changed, 99 insertions, 0 deletions
diff --git a/src/onapsdk/utils/mixins.py b/src/onapsdk/utils/mixins.py new file mode 100644 index 0000000..7a64a15 --- /dev/null +++ b/src/onapsdk/utils/mixins.py @@ -0,0 +1,99 @@ +"""Mixins module.""" +# Copyright 2022 Orange, Deutsche Telekom AG +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +from abc import ABC, abstractmethod +from ctypes import c_bool +from multiprocessing import Process, Value +from time import sleep + + +class WaitForFinishMixin(ABC): + """Wait for finish mixin. + + Mixin with wait_for_finish method and two properties: + - completed, + - finished. + + Can be used to wait for result of asynchronous tasks. + + """ + + WAIT_FOR_SLEEP_TIME = 10 + + @property + @abstractmethod + def completed(self) -> bool: + """Store an information if object task is completed or not. + + Returns: + bool: True if object task is completed, False otherwise. + + """ + + @property + @abstractmethod + def finished(self) -> bool: + """Store an information if object task is finished or not. + + Returns: + bool: True if object task is finished, False otherwise. + + """ + + def _wait_for_finish(self, return_value: Value) -> bool: + """Wait until object task is finished. + + Method called in another process. + + Args: + return_value(Value): value shared with main process to pass there + if object task was completed or not + + """ + while not self.finished: + sleep(self.WAIT_FOR_SLEEP_TIME) + self._logger.info(f"{self.__class__.__name__} task finished") + return_value.value = self.completed + + def wait_for_finish(self, timeout: float = None) -> bool: + """Wait until object task is finished. + + It uses time.sleep with WAIT_FOR_SLEEP_TIME value as a parameter to + wait unitl request is finished (object's finished property is + equal to True). + + It runs another process to control time of the function. If process timed out + TimeoutError is going to be raised. + + Args: + timeout(float, optional): positive number, wait at most timeout seconds + + Raises: + TimeoutError: Raised when function timed out + + Returns: + bool: True if object's task is successfully completed, False otherwise + + """ + self._logger.debug(f"Wait until {self.__class__.__name__} task is not finished") + return_value: Value = Value(c_bool) + wait_for_process: Process = Process(target=self._wait_for_finish, args=(return_value,)) + try: + wait_for_process.start() + wait_for_process.join(timeout) + return return_value.value + finally: + if wait_for_process.is_alive(): + wait_for_process.terminate() + raise TimeoutError |