aboutsummaryrefslogtreecommitdiffstats
path: root/src/onapsdk/utils/mixins.py
blob: 7a64a157392bd5dab0f68c7baa1da1602a38ec30 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
"""Mixins module."""
#   Copyright 2022 Orange, Deutsche Telekom AG
#
#   Licensed under the Apache License, Version 2.0 (the "License");
#   you may not use this file except in compliance with the License.
#   You may obtain a copy of the License at
#
#       http://www.apache.org/licenses/LICENSE-2.0
#
#   Unless required by applicable law or agreed to in writing, software
#   distributed under the License is distributed on an "AS IS" BASIS,
#   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
#   See the License for the specific language governing permissions and
#   limitations under the License.
from abc import ABC, abstractmethod
from ctypes import c_bool
from multiprocessing import Process, Value
from time import sleep


class WaitForFinishMixin(ABC):
    """Wait for finish mixin.

    Mixin with wait_for_finish method and two properties:
     - completed,
     - finished.

    Can be used to wait for result of asynchronous tasks.

    """

    WAIT_FOR_SLEEP_TIME = 10

    @property
    @abstractmethod
    def completed(self) -> bool:
        """Store an information if object task is completed or not.

        Returns:
            bool: True if object task is completed, False otherwise.

        """

    @property
    @abstractmethod
    def finished(self) -> bool:
        """Store an information if object task is finished or not.

        Returns:
            bool: True if object task is finished, False otherwise.

        """

    def _wait_for_finish(self, return_value: Value) -> bool:
        """Wait until object task is finished.

        Method called in another process.

        Args:
            return_value(Value): value shared with main process to pass there
                if object task was completed or not

        """
        while not self.finished:
            sleep(self.WAIT_FOR_SLEEP_TIME)
        self._logger.info(f"{self.__class__.__name__} task finished")
        return_value.value = self.completed

    def wait_for_finish(self, timeout: float = None) -> bool:
        """Wait until object task is finished.

        It uses time.sleep with WAIT_FOR_SLEEP_TIME value as a parameter to
            wait unitl request is finished (object's finished property is
            equal to True).

        It runs another process to control time of the function. If process timed out
            TimeoutError is going to be raised.

        Args:
            timeout(float, optional): positive number, wait at most timeout seconds

        Raises:
            TimeoutError: Raised when function timed out

        Returns:
            bool: True if object's task is successfully completed, False otherwise

        """
        self._logger.debug(f"Wait until {self.__class__.__name__} task is not finished")
        return_value: Value = Value(c_bool)
        wait_for_process: Process = Process(target=self._wait_for_finish, args=(return_value,))
        try:
            wait_for_process.start()
            wait_for_process.join(timeout)
            return return_value.value
        finally:
            if wait_for_process.is_alive():
                wait_for_process.terminate()
                raise TimeoutError