summaryrefslogtreecommitdiffstats
path: root/dcae-services-policy-sync/policysync/coroutines.py
blob: 236d6f2d485559ef5a5f2aa9bd19ae0348eb9605 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
# ============LICENSE_START=======================================================
# Copyright (c) 2021 AT&T Intellectual Property. All rights reserved.
# ================================================================================
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#      http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# ============LICENSE_END=========================================================
"""
Asyncio coroutine setup for both periodic and real time notification tasks """
import signal
import asyncio
from prometheus_client import start_http_server
from .inventory import Inventory
from .util import get_module_logger

SLEEP_ON_ERROR = 10
logger = get_module_logger(__name__)


async def notify_task(inventory, sleep):
    """
    start the notification task
    :param inventory: Inventory
    :param sleep: how long to wait on error in seconds
    """

    logger.info("opening notificationhandler for policy...")
    await inventory.client.notificationhandler(
        inventory.check_and_update,
        ids=inventory.policy_ids,
        filters=inventory.policy_filters,
    )
    logger.warning("websocket closed or errored...will attempt reconnection")
    await asyncio.sleep(sleep)


async def periodic_task(inventory, sleep):
    """
    start the periodic task
    :param inventory: Inventory
    :param sleep: how long to wait between periodic checks
    """
    await asyncio.sleep(sleep)
    logger.info("Executing periodic check of PDP policies")
    await inventory.update()


async def task_runner(inventory, sleep, task, should_run):
    """
    Runs a task in an event loop
    :param inventory: Inventory
    :param sleep: how long to wait between loop iterations
    :param task: coroutine to run
    :param should_run: function for should this task continue to run
    """
    # pylint: disable=broad-except
    while should_run():
        try:
            await task(inventory, sleep)
        except asyncio.CancelledError:
            break
        except Exception:
            logger.exception("Received exception")


async def shutdown(loop, tasks, inventory):
    """
    shutdown the event loop and cancel all tasks
    :param loop: Asyncio eventloop
    :param tasks: list of asyncio tasks
    :param inventory: the inventory object
    """

    logger.info("caught signal")
    # Stop the websocket routines
    for task in tasks:
        task.cancel()
        await task

    # Close the client
    await inventory.close()
    loop.stop()


def _setup_coroutines(
    loop,
    inventory,
    shutdown_handler,
    task_r,
    **kwargs
):
    """ sets up the application coroutines"""
    # Task runner takes a function for stop condition
    # (for testing purposes) but should always run in practice
    # pylint: disable=broad-except
    def infinite_condition():
        return True

    logger.info("Starting gather of all policies...")
    try:
        loop.run_until_complete(inventory.gather())
    except Exception:
        logger.exception('received exception on initial gather')

    # websocket and the periodic check of policies
    tasks = [
        loop.create_task(
            task_r(
                inventory,
                kwargs.get('check_period', 2400),
                periodic_task,
                infinite_condition
            )
        )
    ]

    if inventory.client.supports_notifications():
        tasks.append(
            loop.create_task(
                task_r(
                    inventory,
                    SLEEP_ON_ERROR,
                    notify_task,
                    infinite_condition
                )
            )
        )
    else:
        logger.warning(
            "Defaulting to polling... Provide a dmaap url to receive faster updates"
        )

    # Add shutdown handlers for sigint and sigterm
    for signame in ("SIGINT", "SIGTERM"):
        sig = getattr(signal, signame)
        loop.add_signal_handler(
            sig,
            lambda: asyncio.ensure_future(
                shutdown_handler(loop, tasks, inventory)
            ),
        )

    # Start prometheus server daemonthread for metrics/healthchecking
    if 'bind' in kwargs:
        metrics_server = kwargs.get('metrics_server', start_http_server)
        metrics_server(kwargs['bind'].port, addr=kwargs['bind'].hostname)


def start_event_loop(config):
    """
    start the event loop that runs the application
    :param config: Config object for the application
    """
    loop = asyncio.get_event_loop()
    inventory = Inventory(
        config.filters,
        config.ids,
        config.out_file,
        config.client
    )

    _setup_coroutines(
        loop,
        inventory,
        shutdown,
        task_runner,
        metrics_server=start_http_server,
        bind=config.bind,
        check_period=config.check_period
    )

    loop.run_forever()
    loop.close()
    logger.info("shutdown complete")