diff options
author | cluckenbaugh <cl5597@att.com> | 2021-02-11 18:26:30 -0500 |
---|---|---|
committer | cluckenbaugh <cl5597@att.com> | 2021-02-17 17:45:30 -0500 |
commit | fffe41c078fa427f2a62035ee2d6cc5cd407238c (patch) | |
tree | cf4c5e7e78c2a5f2a1a9f9e84e67deb2d24b5bbf /dcae-services-policy-sync/policysync/coroutines.py | |
parent | e8832b777811c0d154929dc10d6a60352cd37bd2 (diff) |
Seed policysync container code
For use by helm microservices to receive policy
Issue-ID: DCAEGEN2-2556
Change-Id: I2d9cb92ab480a90c63a9d8e6242848f7ca2df0f3
Signed-off-by: cluckenbaugh <cl5597@att.com>
Diffstat (limited to 'dcae-services-policy-sync/policysync/coroutines.py')
-rw-r--r-- | dcae-services-policy-sync/policysync/coroutines.py | 182 |
1 files changed, 182 insertions, 0 deletions
diff --git a/dcae-services-policy-sync/policysync/coroutines.py b/dcae-services-policy-sync/policysync/coroutines.py new file mode 100644 index 0000000..236d6f2 --- /dev/null +++ b/dcae-services-policy-sync/policysync/coroutines.py @@ -0,0 +1,182 @@ +# ============LICENSE_START======================================================= +# Copyright (c) 2021 AT&T Intellectual Property. All rights reserved. +# ================================================================================ +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# ============LICENSE_END========================================================= +""" +Asyncio coroutine setup for both periodic and real time notification tasks """ +import signal +import asyncio +from prometheus_client import start_http_server +from .inventory import Inventory +from .util import get_module_logger + +SLEEP_ON_ERROR = 10 +logger = get_module_logger(__name__) + + +async def notify_task(inventory, sleep): + """ + start the notification task + :param inventory: Inventory + :param sleep: how long to wait on error in seconds + """ + + logger.info("opening notificationhandler for policy...") + await inventory.client.notificationhandler( + inventory.check_and_update, + ids=inventory.policy_ids, + filters=inventory.policy_filters, + ) + logger.warning("websocket closed or errored...will attempt reconnection") + await asyncio.sleep(sleep) + + +async def periodic_task(inventory, sleep): + """ + start the periodic task + :param inventory: Inventory + :param sleep: how long to wait between periodic checks + """ + await asyncio.sleep(sleep) + logger.info("Executing periodic check of PDP policies") + await inventory.update() + + +async def task_runner(inventory, sleep, task, should_run): + """ + Runs a task in an event loop + :param inventory: Inventory + :param sleep: how long to wait between loop iterations + :param task: coroutine to run + :param should_run: function for should this task continue to run + """ + # pylint: disable=broad-except + while should_run(): + try: + await task(inventory, sleep) + except asyncio.CancelledError: + break + except Exception: + logger.exception("Received exception") + + +async def shutdown(loop, tasks, inventory): + """ + shutdown the event loop and cancel all tasks + :param loop: Asyncio eventloop + :param tasks: list of asyncio tasks + :param inventory: the inventory object + """ + + logger.info("caught signal") + # Stop the websocket routines + for task in tasks: + task.cancel() + await task + + # Close the client + await inventory.close() + loop.stop() + + +def _setup_coroutines( + loop, + inventory, + shutdown_handler, + task_r, + **kwargs +): + """ sets up the application coroutines""" + # Task runner takes a function for stop condition + # (for testing purposes) but should always run in practice + # pylint: disable=broad-except + def infinite_condition(): + return True + + logger.info("Starting gather of all policies...") + try: + loop.run_until_complete(inventory.gather()) + except Exception: + logger.exception('received exception on initial gather') + + # websocket and the periodic check of policies + tasks = [ + loop.create_task( + task_r( + inventory, + kwargs.get('check_period', 2400), + periodic_task, + infinite_condition + ) + ) + ] + + if inventory.client.supports_notifications(): + tasks.append( + loop.create_task( + task_r( + inventory, + SLEEP_ON_ERROR, + notify_task, + infinite_condition + ) + ) + ) + else: + logger.warning( + "Defaulting to polling... Provide a dmaap url to receive faster updates" + ) + + # Add shutdown handlers for sigint and sigterm + for signame in ("SIGINT", "SIGTERM"): + sig = getattr(signal, signame) + loop.add_signal_handler( + sig, + lambda: asyncio.ensure_future( + shutdown_handler(loop, tasks, inventory) + ), + ) + + # Start prometheus server daemonthread for metrics/healthchecking + if 'bind' in kwargs: + metrics_server = kwargs.get('metrics_server', start_http_server) + metrics_server(kwargs['bind'].port, addr=kwargs['bind'].hostname) + + +def start_event_loop(config): + """ + start the event loop that runs the application + :param config: Config object for the application + """ + loop = asyncio.get_event_loop() + inventory = Inventory( + config.filters, + config.ids, + config.out_file, + config.client + ) + + _setup_coroutines( + loop, + inventory, + shutdown, + task_runner, + metrics_server=start_http_server, + bind=config.bind, + check_period=config.check_period + ) + + loop.run_forever() + loop.close() + logger.info("shutdown complete") |