diff options
Diffstat (limited to 'dcae-services-policy-sync/policysync')
-rw-r--r-- | dcae-services-policy-sync/policysync/__init__.py | 16 | ||||
-rw-r--r-- | dcae-services-policy-sync/policysync/clients.py | 512 | ||||
-rw-r--r-- | dcae-services-policy-sync/policysync/cmd.py | 234 | ||||
-rw-r--r-- | dcae-services-policy-sync/policysync/coroutines.py | 182 | ||||
-rw-r--r-- | dcae-services-policy-sync/policysync/inventory.py | 169 | ||||
-rw-r--r-- | dcae-services-policy-sync/policysync/metrics.py | 38 | ||||
-rw-r--r-- | dcae-services-policy-sync/policysync/util.py | 10 |
7 files changed, 1161 insertions, 0 deletions
diff --git a/dcae-services-policy-sync/policysync/__init__.py b/dcae-services-policy-sync/policysync/__init__.py new file mode 100644 index 0000000..7c04dfd --- /dev/null +++ b/dcae-services-policy-sync/policysync/__init__.py @@ -0,0 +1,16 @@ +# ============LICENSE_START======================================================= +# Copyright (c) 2021 AT&T Intellectual Property. All rights reserved. +# ================================================================================ +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# ============LICENSE_END========================================================= +""" nothing here """ diff --git a/dcae-services-policy-sync/policysync/clients.py b/dcae-services-policy-sync/policysync/clients.py new file mode 100644 index 0000000..698bc86 --- /dev/null +++ b/dcae-services-policy-sync/policysync/clients.py @@ -0,0 +1,512 @@ +# ============LICENSE_START======================================================= +# Copyright (c) 2021 AT&T Intellectual Property. All rights reserved. +# ================================================================================ +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# ============LICENSE_END========================================================= +"""Clients for communicating with both the post dublin and pre dublin APIs""" +import json +import re +import base64 +import uuid +import asyncio +import aiohttp +import policysync.metrics as metrics +from .util import get_module_logger + +logger = get_module_logger(__name__) + +# Websocket config +WS_HEARTBEAT = 60 +WS_NOTIFICATIONS_ENDPOINT = "pdp/notifications" +# REST config +V1_DECISION_ENDPOINT = "policy/pdpx/v1/decision" +V0_DECISION_ENDPOINT = "pdp/api" + +APPLICATION_JSON = "application/json" + + +def get_single_regex(filters, ids): + """given a list of filters and ids returns a single regex for matching""" + filters = [] if filters is None else filters + ids = [] if ids is None else ["{}[.][0-9]+[.]xml".format(x) for x in ids] + return "|".join(filters + ids) if filters is not None else "" + + +class BasePolicyClient: + """ Base policy client that is pluggable into inventory """ + def __init__(self, pdp_url, headers=None): + self.headers = {} if headers is None else headers + self.session = None + self.pdp_url = pdp_url + + def _init_rest_session(self): + """ + initialize an aiohttp rest session + :returns: an aiohttp rest session + """ + if self.session is None: + self.session = aiohttp.ClientSession( + headers=self.headers, raise_for_status=True + ) + + return self.session + + async def _run_request(self, endpoint, request_data): + """ + execute a particular REST request + :param endpoint: str rest endpoint to query + :param request_data: dictionary request data + :returns: dictionary response data + """ + session = self._init_rest_session() + async with session.post( + "{0}/{1}".format(self.pdp_url, endpoint), json=request_data + ) as resp: + data = await resp.read() + return json.loads(data) + + def supports_notifications(self): + """ + does this particular client support real time notifictions + :returns: True + """ + # in derived classes we may use self + # pylint: disable=no-self-use + return True + + async def list_policies(self, filters=None, ids=None): + """ + used to get a list of policies matching a particular ID + :param filters: list of regex filter strings for matching + :param ids: list of id strings for matching + :returns: List of policies matching filters or ids + """ + raise NotImplementedError + + async def get_config(self, filters=None, ids=None): + """ + used to get a list of policies matching a particular ID + :returns: List of policies matching filters or ids + """ + raise NotImplementedError + + async def notificationhandler(self, callback, ids=None, filters=None): + """ + Clients should implement this to support real time notifications + :param callback: func to execute when a matching notification is found + :param ids: list of id strings for matching + """ + raise NotImplementedError + + async def close(self): + """ close the policy client """ + logger.info("closing websocket clients...") + if self.session: + await self.session.close() + + +class PolicyClientV0(BasePolicyClient): + """ + Supports the legacy v0 policy API use prior to ONAP Dublin + """ + async def close(self): + """ close the policy client """ + await super().close() + if self.ws_session is not None: + await self.ws_session.close() + + def __init__( + self, + headers, + pdp_url, + decision_endpoint=V0_DECISION_ENDPOINT, + ws_endpoint=WS_NOTIFICATIONS_ENDPOINT + ): + """ + Initialize a v0 policy client + :param headers: Headers to use for policy rest api + :param pdp_url: URL of the PDP + :param decision_endpoint: root for the decison API + :param websocket_endpoint: root of the websocket endpoint + """ + super().__init__(pdp_url, headers=headers) + self.ws_session = None + self.session = None + self.decision_endpoint = decision_endpoint + self.ws_endpoint = ws_endpoint + self._ws = None + + def _init_ws_session(self): + """initialize a websocket session for notifications""" + if self.ws_session is None: + self.ws_session = aiohttp.ClientSession() + + return self.ws_session + + @metrics.list_policy_exceptions.count_exceptions() + async def list_policies(self, filters=None, ids=None): + """ + used to get a list of policies matching a particular ID + :param filters: list of regex filter strings for matching + :param ids: list of id strings for matching + :returns: List of policies matching filters or ids + """ + request_data = self._prepare_request(filters, ids) + policies = await self._run_request( + f"{self.decision_endpoint}/listPolicy", request_data + ) + return set(policies) + + @classmethod + def _prepare_request(cls, filters, ids): + """prepare the request body for the v0 api""" + regex = get_single_regex(filters, ids) + return {"policyName": regex} + + @metrics.get_config_exceptions.count_exceptions() + async def get_config(self, filters=None, ids=None): + """ + Used to get the actual policy configuration from PDP + :return: the policy objects that are currently active + for the given set of filters + """ + request_data = self._prepare_request(filters, ids) + policies = await self._run_request( + f"{self.decision_endpoint}/getConfig", request_data) + + for policy in policies: + try: + policy["config"] = json.loads(policy["config"]) + except json.JSONDecodeError: + pass + + return policies + + @classmethod + def _needs_update(cls, update, ids=None, filters=None): + """ + Expect something like this + { + "removedPolicies": [{ + "policyName": "xyz.45.xml", + "versionNo": "45" + }], + "loadedPolicies": [{ + "policyName": "xyz.46.xml", + "versionNo": "46", + "matches": { + "ONAPName": "DCAE", + "ConfigName": "DCAE_HighlandPark_AgingConfig", + "service": "DCAE_HighlandPark_AgingConfig", + "guard": "false", + "location": " Edge", + "TTLDate": "NA", + "uuid": "TestUUID", + "RiskLevel": "5", + "RiskType": "default" + }, + "updateType": "UPDATE" + }], + "notificationType": "BOTH" + } + """ + for policy in update.get("removedPolicies", []) + update.get( + "loadedPolicies", [] + ): + if ( + re.match(get_single_regex(filters, ids), policy["policyName"]) + is not None + ): + return True + + return False + + async def notificationhandler(self, callback, ids=None, filters=None): + """ + websocket based notification handler for + :param callback: function to execute when + a matching notification is found + :param ids: list of id strings for matching + """ + + url = self.pdp_url.replace("https", "wss") + + # The websocket we start here will periodically + # send heartbeat (ping frames) to policy + # this ensures that we are never left hanging + # with our communication with policy. + session = self._init_ws_session() + try: + websocket = await session.ws_connect( + "{0}/{1}".format(url, self.ws_endpoint), heartbeat=WS_HEARTBEAT + ) + logger.info("websock with policy established") + async for msg in websocket: + # check for websocket errors + # break out of this async for loop. to attempt reconnection + if msg.type in (aiohttp.WSMsgType.CLOSED, aiohttp.WSMsgType.ERROR): + break + + if msg.type is (aiohttp.WSMsgType.TEXT): + if self._needs_update( + json.loads(msg.data), + ids=ids, + filters=filters + ): + logger.debug( + "notification received from pdp websocket -> %s", msg + ) + await callback() + else: + logger.warning( + "unexpected websocket message type received %s", msg.type + ) + except aiohttp.ClientError: + logger.exception("Received connection error with websocket") + + +class PolicyClientV1(BasePolicyClient): + """ + Supports the v1 policy API introduced in ONAP's dublin release + """ + + async def close(self): + """ close the policy client """ + await super().close() + if self.dmaap_session is not None: + await self.dmaap_session.close() + + def _init_dmaap_session(self): + """ initialize a dmaap session for notifications """ + if self.dmaap_session is None: + self.dmaap_session = aiohttp.ClientSession( + headers=self.dmaap_headers, + raise_for_status=True + ) + + return self.dmaap_session + + def __init__( + self, + headers, + pdp_url, + **kwargs, + ): + super().__init__(pdp_url, headers=headers) + self._ws = None + self.audit_uuid = str(uuid.uuid4()) + self.dmaap_url = kwargs.get('dmaap_url') + self.dmaap_timeout = 15000 + self.dmaap_session = None + self.dmaap_headers = kwargs.get('dmaap_headers', {}) + self.decision = kwargs.get('v1_decision', V1_DECISION_ENDPOINT) + + async def list_policies(self, filters=None, ids=None): + """ + ONAP has no real equivalent to this. + :returns: None + """ + # in derived classes we may use self + # pylint: disable=no-self-use + return None + + @classmethod + def convert_to_policy(cls, policy_body): + """ + Convert raw policy to format expected by microservices + :param policy_body: raw dictionary output from pdp + :returns: data in proper formatting + """ + pdp_metadata = policy_body.get("metadata", {}) + policy_id = pdp_metadata.get("policy-id") + policy_version = policy_body.get("version") + if not policy_id or policy_version is None: + logger.warning("Malformed policy is missing policy-id and version") + return None + + policy_body["policyName"] = "{}.{}.xml".format( + policy_id, str(policy_version.replace(".", "-")) + ) + policy_body["policyVersion"] = str(policy_version) + if "properties" in policy_body: + policy_body["config"] = policy_body["properties"] + del policy_body["properties"] + + return policy_body + + @metrics.get_config_exceptions.count_exceptions() + async def get_config(self, filters=None, ids=None): + """ + Used to get the actual policy configuration from PDP + :returns: the policy objects that are currently active + for the given set of filters + """ + if ids is None: + ids = [] + + request_data = { + "ONAPName": "DCAE", + "ONAPComponent": "policy-sync", + "ONAPInstance": self.audit_uuid, + "action": "configure", + "resource": {"policy-id": ids} + } + + data = await self._run_request(self.decision, request_data) + out = [] + for policy_body in data["policies"].values(): + policy = self.convert_to_policy(policy_body) + if policy is not None: + out.append(policy) + + return out + + def supports_notifications(self): + """ + Does this policy client support real time notifications + :returns: True if the dmaap url is set else return false + """ + return self.dmaap_url is not None + + @classmethod + def _needs_update(cls, update, ids): + """ + expect something like this + { + "deployed-policies": [ + { + "policy-type": "onap.policies.monitoring.tcagen2", + "policy-type-version": "1.0.0", + "policy-id": "onap.scaleout.tca", + "policy-version": "2.0.0", + "success-count": 3, + "failure-count": 0 + } + ], + "undeployed-policies": [ + { + "policy-type": "onap.policies.monitoring.tcagen2", + "policy-type-version": "1.0.0", + "policy-id": "onap.firewall.tca", + "policy-version": "6.0.0", + "success-count": 3, + "failure-count": 0 + } + ] + } + """ + for policy in update.get("deployed-policies", []) + update.get( + "undeployed-policies", [] + ): + if policy.get("policy-id") in ids: + return True + + return False + + async def poll_dmaap(self, callback, ids=None): + """ + one GET request to dmaap + :param callback: function to execute when a + matching notification is found + :param ids: list of id strings for matching + """ + query = f"?timeout={self.dmaap_timeout}" + url = f"{self.dmaap_url}/{self.audit_uuid}/0{query}" + logger.info("polling topic: %s", url) + session = self._init_dmaap_session() + try: + async with session.get(url) as response: + messages = await response.read() + + for msg in json.loads(messages): + if self._needs_update(json.loads(msg), ids): + logger.info( + "notification received from dmaap -> %s", msg + ) + await callback() + except aiohttp.ClientError: + logger.exception('received connection error from dmaap topic') + # wait some time + await asyncio.sleep(30) + + async def notificationhandler(self, callback, ids=None, filters=None): + """ + dmaap based notification handler for + :param callback: function to execute when a + matching notification is found + :param ids: list of id strings for matching + """ + if filters is not None: + logger.warning("filters are not supported with pdp v1..ignoring") + while True: + await self.poll_dmaap(callback, ids=ids) + + +def get_client( + pdp_url, + use_v0=False, + **kwargs +): + """ + get a particular policy client + :param use_v0: whether this should be a v0 client or + :return: A policy client + """ + if pdp_url is None: + raise ValueError("POLICY_SYNC_PDP_URL set or --pdp flag not set") + + pdp_headers = { + "Accept": APPLICATION_JSON, + "Content-Type": APPLICATION_JSON + } + + if 'pdp_user' in kwargs and 'pdp_password' in kwargs: + auth = base64.b64encode( + "{}:{}".format( + kwargs.get('pdp_user'), + kwargs.get('pdp_password') + ).encode("utf-8") + ) + pdp_headers["Authorization"] = "Basic {}".format(auth.decode("utf-8")) + + dmaap_headers = { + "Accept": APPLICATION_JSON, + "Content-Type": APPLICATION_JSON + } + + logger.info(kwargs.get('dmaap_password')) + if 'dmaap_user' in kwargs and 'dmaap_password' in kwargs: + auth = base64.b64encode( + "{}:{}".format( + kwargs.get('dmaap_user'), + kwargs.get('dmaap_password') + ).encode("utf-8") + ).decode("utf-8") + dmaap_headers["Authorization"] = f"Basic {auth}" + + # Create client (either v0 or v1) based on arguments) + if use_v0: + return PolicyClientV0( + pdp_headers, + pdp_url, + decision_endpoint=kwargs.get('v0_decision'), + ws_endpoint=kwargs.get('v0_notifications') + ) + + return PolicyClientV1( + pdp_headers, + pdp_url, + v1_decision=kwargs.get('v1_decision'), + dmaap_url=kwargs.get('dmaap_url'), + dmaap_headers=dmaap_headers + ) diff --git a/dcae-services-policy-sync/policysync/cmd.py b/dcae-services-policy-sync/policysync/cmd.py new file mode 100644 index 0000000..9055674 --- /dev/null +++ b/dcae-services-policy-sync/policysync/cmd.py @@ -0,0 +1,234 @@ +# ============LICENSE_START======================================================= +# Copyright (c) 2021 AT&T Intellectual Property. All rights reserved. +# ================================================================================ +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# ============LICENSE_END========================================================= +""" +CLI parsing for the sync utility. +convert flags/env variables to configuration +""" +import argparse +import collections +import os +import sys +import logging +import logging.config +from urllib.parse import urlsplit +import yaml +import policysync.clients as clients +import policysync.coroutines +from .util import get_module_logger + + +logger = get_module_logger(__name__) + +APPLICATION_JSON = "application/json" + + +Config = collections.namedtuple( + 'Config', ['out_file', 'check_period', 'filters', 'ids', 'client', 'bind']) + + +def parsecmd(args): + """ + Parse the command into a config object + :param args: arguments list for parsing + :returns: Config for the policy sync + """ + parser = argparse.ArgumentParser( + description="Keeps a file updated with policies matching a filter.", + formatter_class=argparse.ArgumentDefaultsHelpFormatter, + ) + + parser.add_argument( + "--out", + type=str, + default=os.environ.get("POLICY_SYNC_OUTFILE", "policies.json"), + help="Output file to dump to", + ) + + parser.add_argument( + "--duration", + type=int, + default=os.environ.get("POLICY_SYNC_DURATION", 1200), + help="frequency (in seconds) to conduct periodic check", + ) + + parser.add_argument( + "--filters", + type=str, + default=os.environ.get("POLICY_SYNC_FILTER", "[]"), + help="Regex of policies that you are interested in.", + ) + parser.add_argument( + "--ids", + type=str, + default=os.environ.get("POLICY_SYNC_ID", "[]"), + help="Specific names of policies you are interested in.", + ) + + parser.add_argument( + "--pdp-user", + type=str, + default=os.environ.get("POLICY_SYNC_PDP_USER", None), + help="PDP basic auth username", + ) + parser.add_argument( + "--pdp-pass", + type=str, + default=os.environ.get("POLICY_SYNC_PDP_PASS", None), + help="PDP basic auth password", + ) + + parser.add_argument( + "--pdp-url", + type=str, + default=os.environ.get("POLICY_SYNC_PDP_URL", None), + help="PDP to connect to", + ) + + parser.add_argument( + "--http-bind", + type=str, + default=os.environ.get("POLICY_SYNC_HTTP_BIND", "localhost:8000"), + help="The bind address for container metrics", + ) + + parser.add_argument( + "--http-metrics", + type=bool, + default=os.environ.get("POLICY_SYNC_HTTP_METRICS", True), + help="turn on or off the prometheus metrics", + ) + + parser.add_argument( + "--use-v0", + type=bool, + default=os.environ.get("POLICY_SYNC_V0_ENABLE", False), + help="Turn on usage of the legacy v0 policy API", + ) + + parser.add_argument( + "--logging-config", + type=str, + default=os.environ.get("POLICY_SYNC_LOGGING_CONFIG", None), + help="Python formatted logging configuration file", + ) + + # V0 API specific configuration + parser.add_argument( + "--v0-notify-endpoint", + type=str, + default=os.environ.get( + "POLICY_SYNC_V0_NOTIFIY_ENDPOINT", "pdp/notifications" + ), + help="Path of the v0 websocket notification", + ) + + parser.add_argument( + "--v0-decision-endpoint", + type=str, + default=os.environ.get("POLICY_SYNC_V0_DECISION_ENDPOINT", "pdp/api"), + help="path of the v0 decision endpoint", + ) + + # V1 API specific configuration + parser.add_argument( + "--v1-dmaap-topic", + type=str, + default=os.environ.get("POLICY_SYNC_V1_DMAAP_URL", None), + help="URL of the dmaap topic used in v1 api for notifications", + ) + + parser.add_argument( + "--v1-dmaap-user", + type=str, + default=os.environ.get("POLICY_SYNC_V1_DMAAP_USER", None), + help="User to use with with the dmaap topic" + ) + + parser.add_argument( + "--v1-dmaap-pass", + type=str, + default=os.environ.get("POLICY_SYNC_V1_DMAAP_PASS", None), + help="Password to use with the dmaap topic" + ) + + parser.add_argument( + "--v1-decision-endpoint", + type=str, + default=os.environ.get( + "POLICY_SYNC_V1_PDP_DECISION_ENDPOINT", + "policy/pdpx/v1/decision" + ), + help="Decision endpoint used in the v1 api for notifications", + ) + + args = parser.parse_args(args) + + if args.logging_config: + logging.config.fileConfig( + args.logging_config, + disable_existing_loggers=False + ) + else: + handler = logging.StreamHandler() + formatter = logging.Formatter( + "[%(asctime)s][%(levelname)-5s]%(message)s" + ) + root = logging.getLogger() + handler.setFormatter(formatter) + root.addHandler(handler) + root.setLevel(logging.INFO) + + bind = args.http_bind if args.http_metrics else None + + client = clients.get_client( + args.pdp_url, + pdp_user=args.pdp_user, + pdp_password=args.pdp_pass, + use_v0=args.use_v0, + v0_decision=args.v0_decision_endpoint, + v0_notifications=args.v0_notify_endpoint, + v1_decision=args.v1_decision_endpoint, + dmaap_url=args.v1_dmaap_topic, + dmaap_user=args.v1_dmaap_user, + dmaap_password=args.v1_dmaap_pass + ) + + if bind is not None: + bind = urlsplit("//" + bind) + + return Config( + out_file=args.out, + check_period=args.duration, + filters=yaml.safe_load(args.filters), + ids=yaml.safe_load(args.ids), + client=client, + bind=bind, + ) + + +def main(): + """ + Parse the arguments passed in via the command line and start the app + """ + try: + config = parsecmd(sys.argv[1:]) + except ValueError: + logger.error( + "There was no POLICY_SYNC_PDP_URL set or --pdp flag set" + ) + return -1 + policysync.coroutines.start_event_loop(config) + return 0 diff --git a/dcae-services-policy-sync/policysync/coroutines.py b/dcae-services-policy-sync/policysync/coroutines.py new file mode 100644 index 0000000..236d6f2 --- /dev/null +++ b/dcae-services-policy-sync/policysync/coroutines.py @@ -0,0 +1,182 @@ +# ============LICENSE_START======================================================= +# Copyright (c) 2021 AT&T Intellectual Property. All rights reserved. +# ================================================================================ +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# ============LICENSE_END========================================================= +""" +Asyncio coroutine setup for both periodic and real time notification tasks """ +import signal +import asyncio +from prometheus_client import start_http_server +from .inventory import Inventory +from .util import get_module_logger + +SLEEP_ON_ERROR = 10 +logger = get_module_logger(__name__) + + +async def notify_task(inventory, sleep): + """ + start the notification task + :param inventory: Inventory + :param sleep: how long to wait on error in seconds + """ + + logger.info("opening notificationhandler for policy...") + await inventory.client.notificationhandler( + inventory.check_and_update, + ids=inventory.policy_ids, + filters=inventory.policy_filters, + ) + logger.warning("websocket closed or errored...will attempt reconnection") + await asyncio.sleep(sleep) + + +async def periodic_task(inventory, sleep): + """ + start the periodic task + :param inventory: Inventory + :param sleep: how long to wait between periodic checks + """ + await asyncio.sleep(sleep) + logger.info("Executing periodic check of PDP policies") + await inventory.update() + + +async def task_runner(inventory, sleep, task, should_run): + """ + Runs a task in an event loop + :param inventory: Inventory + :param sleep: how long to wait between loop iterations + :param task: coroutine to run + :param should_run: function for should this task continue to run + """ + # pylint: disable=broad-except + while should_run(): + try: + await task(inventory, sleep) + except asyncio.CancelledError: + break + except Exception: + logger.exception("Received exception") + + +async def shutdown(loop, tasks, inventory): + """ + shutdown the event loop and cancel all tasks + :param loop: Asyncio eventloop + :param tasks: list of asyncio tasks + :param inventory: the inventory object + """ + + logger.info("caught signal") + # Stop the websocket routines + for task in tasks: + task.cancel() + await task + + # Close the client + await inventory.close() + loop.stop() + + +def _setup_coroutines( + loop, + inventory, + shutdown_handler, + task_r, + **kwargs +): + """ sets up the application coroutines""" + # Task runner takes a function for stop condition + # (for testing purposes) but should always run in practice + # pylint: disable=broad-except + def infinite_condition(): + return True + + logger.info("Starting gather of all policies...") + try: + loop.run_until_complete(inventory.gather()) + except Exception: + logger.exception('received exception on initial gather') + + # websocket and the periodic check of policies + tasks = [ + loop.create_task( + task_r( + inventory, + kwargs.get('check_period', 2400), + periodic_task, + infinite_condition + ) + ) + ] + + if inventory.client.supports_notifications(): + tasks.append( + loop.create_task( + task_r( + inventory, + SLEEP_ON_ERROR, + notify_task, + infinite_condition + ) + ) + ) + else: + logger.warning( + "Defaulting to polling... Provide a dmaap url to receive faster updates" + ) + + # Add shutdown handlers for sigint and sigterm + for signame in ("SIGINT", "SIGTERM"): + sig = getattr(signal, signame) + loop.add_signal_handler( + sig, + lambda: asyncio.ensure_future( + shutdown_handler(loop, tasks, inventory) + ), + ) + + # Start prometheus server daemonthread for metrics/healthchecking + if 'bind' in kwargs: + metrics_server = kwargs.get('metrics_server', start_http_server) + metrics_server(kwargs['bind'].port, addr=kwargs['bind'].hostname) + + +def start_event_loop(config): + """ + start the event loop that runs the application + :param config: Config object for the application + """ + loop = asyncio.get_event_loop() + inventory = Inventory( + config.filters, + config.ids, + config.out_file, + config.client + ) + + _setup_coroutines( + loop, + inventory, + shutdown, + task_runner, + metrics_server=start_http_server, + bind=config.bind, + check_period=config.check_period + ) + + loop.run_forever() + loop.close() + logger.info("shutdown complete") diff --git a/dcae-services-policy-sync/policysync/inventory.py b/dcae-services-policy-sync/policysync/inventory.py new file mode 100644 index 0000000..0eb91b5 --- /dev/null +++ b/dcae-services-policy-sync/policysync/inventory.py @@ -0,0 +1,169 @@ +# ============LICENSE_START======================================================= +# Copyright (c) 2021 AT&T Intellectual Property. All rights reserved. +# ================================================================================ +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# ============LICENSE_END========================================================= +""" In memory data store for policies which are currently used by a mS """ +import asyncio +import json +import uuid +import os +import tempfile +import aiohttp +from datetime import datetime +from .util import get_module_logger + +logger = get_module_logger(__name__) + +ACTION_GATHERED = "gathered" +ACTION_UPDATED = "updated" +OUTFILE_INDENT = 4 + + +class Inventory: + """ In memory data store for policies which are currently used by a mS """ + def __init__(self, filters, ids, outfile, client): + self.policy_filters = filters + self.policy_ids = ids + self.hp_active_inventory = set() + self.get_lock = asyncio.Lock() + self.file = outfile + self.queue = asyncio.Queue() + self.client = client + + async def gather(self): + """ + Run at startup to gather an initial inventory of policies + """ + return await self._sync_inventory(ACTION_GATHERED) + + async def update(self): + """ + Run to update an inventory of policies on the fly + """ + return await self._sync_inventory(ACTION_UPDATED) + + async def check_and_update(self): + """ check and update the policy inventory """ + return await self.update() + + async def close(self): + """ close the policy inventory and its associated client """ + await self.client.close() + + def _atomic_dump(self, data): + """ atomically dump the policy content to a file by rename """ + try: + temp_file = tempfile.NamedTemporaryFile( + delete=False, + dir=os.path.dirname(self.file), + prefix=os.path.basename(self.file), + mode="w", + ) + try: + temp_file.write(data) + finally: + # fsync the file so its on disk + temp_file.flush() + os.fsync(temp_file.fileno()) + finally: + temp_file.close() + + os.rename(temp_file.name, os.path.abspath(self.file)) + + async def get_policy_content(self, action=ACTION_UPDATED): + """ + get the policy content off the PDP + :param action: what action to present + :returns: True/False depending on if update was successful + """ + logger.info("Starting policy update process...") + try: + policy_bodies = await self.client.get_config( + filters=self.policy_filters, ids=self.policy_ids + ) + except aiohttp.ClientError: + logger.exception('Conncection Error while connecting to PDP') + return False + + # match the format a bit of the Config Binding Service + out = { + "policies": {"items": policy_bodies}, + "event": { + "action": action, + "timestamp": (datetime.utcnow().isoformat()[:-3] + "Z"), + "update_id": str(uuid.uuid4()), + "policies_count": len(policy_bodies), + }, + } + + # Atomically dump the file to disk + tmp = { + x.get("policyName") for x in policy_bodies if "policyName" in x + } + + if tmp != self.hp_active_inventory: + data = json.dumps(out) + loop = asyncio.get_event_loop() + await loop.run_in_executor(None, self._atomic_dump, data) + logger.info( + "Update complete. Policies dumped to: %s", self.file + ) + self.hp_active_inventory = tmp + return True + else: + logger.info("No updates needed for now") + return False + + async def _sync_inventory(self, action): + """ + Pull an inventory of policies. Commit changes if there is a change. + return: boolean to represent whether changes were commited + """ + try: + pdp_inventory = await self.client.list_policies( + filters=self.policy_filters, ids=self.policy_ids + ) + except aiohttp.ClientError: + logger.exception("Inventory sync failed due to a connection error") + return False + + logger.debug("pdp_inventory -> %s", pdp_inventory) + + # Below needs to be under a lock because of + # the call to getConfig being awaited. + async with self.get_lock: + if self.hp_active_inventory != pdp_inventory or \ + pdp_inventory is None: + + # Log a delta of what has changed related to this policy update + if pdp_inventory is not None and \ + self.hp_active_inventory is not None: + msg = { + "removed": list( + self.hp_active_inventory - pdp_inventory + ), + "added": list( + pdp_inventory - self.hp_active_inventory + ), + } + logger.info( + "PDP indicates the following changes: %s ", msg + ) + + return await self.get_policy_content(action) + + logger.info( + "local matches pdp. no update required for now" + ) + return False diff --git a/dcae-services-policy-sync/policysync/metrics.py b/dcae-services-policy-sync/policysync/metrics.py new file mode 100644 index 0000000..7f825fc --- /dev/null +++ b/dcae-services-policy-sync/policysync/metrics.py @@ -0,0 +1,38 @@ +# ============LICENSE_START======================================================= +# Copyright (c) 2021 AT&T Intellectual Property. All rights reserved. +# ================================================================================ +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# ============LICENSE_END========================================================= +""" counters and gaugages for various metrics """ +from prometheus_client import Counter, Gauge + + +policy_updates_counter = Counter( + "policy_updates", "Number of total policy updates commited" +) +websock_closures = Counter( + "websocket_errors_and_closures", "Number of websocket closures or errors" +) +list_policy_exceptions = Counter( + "list_policy_exception", + "Exceptions that have occured as a result of calling listPolicy", +) +get_config_exceptions = Counter( + "get_config_exception", + "Exceptions that have occured as a result of calling getConfig", +) + +active_policies_gauge = Gauge( + "active_policies", + "Number of policies that have been retrieved off the PDP" +) diff --git a/dcae-services-policy-sync/policysync/util.py b/dcae-services-policy-sync/policysync/util.py new file mode 100644 index 0000000..1bbac5a --- /dev/null +++ b/dcae-services-policy-sync/policysync/util.py @@ -0,0 +1,10 @@ +""" utility functions (currenlty just for logging) """ +import logging + + +def get_module_logger(mod_name): + """ + To use this, do logger = get_module_logger(__name__) + """ + logger = logging.getLogger(mod_name) + return logger |