summaryrefslogtreecommitdiffstats
path: root/dcae-services-policy-sync/policysync
diff options
context:
space:
mode:
authorcluckenbaugh <cl5597@att.com>2021-02-11 18:26:30 -0500
committercluckenbaugh <cl5597@att.com>2021-02-17 17:45:30 -0500
commitfffe41c078fa427f2a62035ee2d6cc5cd407238c (patch)
treecf4c5e7e78c2a5f2a1a9f9e84e67deb2d24b5bbf /dcae-services-policy-sync/policysync
parente8832b777811c0d154929dc10d6a60352cd37bd2 (diff)
Seed policysync container code
For use by helm microservices to receive policy Issue-ID: DCAEGEN2-2556 Change-Id: I2d9cb92ab480a90c63a9d8e6242848f7ca2df0f3 Signed-off-by: cluckenbaugh <cl5597@att.com>
Diffstat (limited to 'dcae-services-policy-sync/policysync')
-rw-r--r--dcae-services-policy-sync/policysync/__init__.py16
-rw-r--r--dcae-services-policy-sync/policysync/clients.py512
-rw-r--r--dcae-services-policy-sync/policysync/cmd.py234
-rw-r--r--dcae-services-policy-sync/policysync/coroutines.py182
-rw-r--r--dcae-services-policy-sync/policysync/inventory.py169
-rw-r--r--dcae-services-policy-sync/policysync/metrics.py38
-rw-r--r--dcae-services-policy-sync/policysync/util.py10
7 files changed, 1161 insertions, 0 deletions
diff --git a/dcae-services-policy-sync/policysync/__init__.py b/dcae-services-policy-sync/policysync/__init__.py
new file mode 100644
index 0000000..7c04dfd
--- /dev/null
+++ b/dcae-services-policy-sync/policysync/__init__.py
@@ -0,0 +1,16 @@
+# ============LICENSE_START=======================================================
+# Copyright (c) 2021 AT&T Intellectual Property. All rights reserved.
+# ================================================================================
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+# ============LICENSE_END=========================================================
+""" nothing here """
diff --git a/dcae-services-policy-sync/policysync/clients.py b/dcae-services-policy-sync/policysync/clients.py
new file mode 100644
index 0000000..698bc86
--- /dev/null
+++ b/dcae-services-policy-sync/policysync/clients.py
@@ -0,0 +1,512 @@
+# ============LICENSE_START=======================================================
+# Copyright (c) 2021 AT&T Intellectual Property. All rights reserved.
+# ================================================================================
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+# ============LICENSE_END=========================================================
+"""Clients for communicating with both the post dublin and pre dublin APIs"""
+import json
+import re
+import base64
+import uuid
+import asyncio
+import aiohttp
+import policysync.metrics as metrics
+from .util import get_module_logger
+
+logger = get_module_logger(__name__)
+
+# Websocket config
+WS_HEARTBEAT = 60
+WS_NOTIFICATIONS_ENDPOINT = "pdp/notifications"
+# REST config
+V1_DECISION_ENDPOINT = "policy/pdpx/v1/decision"
+V0_DECISION_ENDPOINT = "pdp/api"
+
+APPLICATION_JSON = "application/json"
+
+
+def get_single_regex(filters, ids):
+ """given a list of filters and ids returns a single regex for matching"""
+ filters = [] if filters is None else filters
+ ids = [] if ids is None else ["{}[.][0-9]+[.]xml".format(x) for x in ids]
+ return "|".join(filters + ids) if filters is not None else ""
+
+
+class BasePolicyClient:
+ """ Base policy client that is pluggable into inventory """
+ def __init__(self, pdp_url, headers=None):
+ self.headers = {} if headers is None else headers
+ self.session = None
+ self.pdp_url = pdp_url
+
+ def _init_rest_session(self):
+ """
+ initialize an aiohttp rest session
+ :returns: an aiohttp rest session
+ """
+ if self.session is None:
+ self.session = aiohttp.ClientSession(
+ headers=self.headers, raise_for_status=True
+ )
+
+ return self.session
+
+ async def _run_request(self, endpoint, request_data):
+ """
+ execute a particular REST request
+ :param endpoint: str rest endpoint to query
+ :param request_data: dictionary request data
+ :returns: dictionary response data
+ """
+ session = self._init_rest_session()
+ async with session.post(
+ "{0}/{1}".format(self.pdp_url, endpoint), json=request_data
+ ) as resp:
+ data = await resp.read()
+ return json.loads(data)
+
+ def supports_notifications(self):
+ """
+ does this particular client support real time notifictions
+ :returns: True
+ """
+ # in derived classes we may use self
+ # pylint: disable=no-self-use
+ return True
+
+ async def list_policies(self, filters=None, ids=None):
+ """
+ used to get a list of policies matching a particular ID
+ :param filters: list of regex filter strings for matching
+ :param ids: list of id strings for matching
+ :returns: List of policies matching filters or ids
+ """
+ raise NotImplementedError
+
+ async def get_config(self, filters=None, ids=None):
+ """
+ used to get a list of policies matching a particular ID
+ :returns: List of policies matching filters or ids
+ """
+ raise NotImplementedError
+
+ async def notificationhandler(self, callback, ids=None, filters=None):
+ """
+ Clients should implement this to support real time notifications
+ :param callback: func to execute when a matching notification is found
+ :param ids: list of id strings for matching
+ """
+ raise NotImplementedError
+
+ async def close(self):
+ """ close the policy client """
+ logger.info("closing websocket clients...")
+ if self.session:
+ await self.session.close()
+
+
+class PolicyClientV0(BasePolicyClient):
+ """
+ Supports the legacy v0 policy API use prior to ONAP Dublin
+ """
+ async def close(self):
+ """ close the policy client """
+ await super().close()
+ if self.ws_session is not None:
+ await self.ws_session.close()
+
+ def __init__(
+ self,
+ headers,
+ pdp_url,
+ decision_endpoint=V0_DECISION_ENDPOINT,
+ ws_endpoint=WS_NOTIFICATIONS_ENDPOINT
+ ):
+ """
+ Initialize a v0 policy client
+ :param headers: Headers to use for policy rest api
+ :param pdp_url: URL of the PDP
+ :param decision_endpoint: root for the decison API
+ :param websocket_endpoint: root of the websocket endpoint
+ """
+ super().__init__(pdp_url, headers=headers)
+ self.ws_session = None
+ self.session = None
+ self.decision_endpoint = decision_endpoint
+ self.ws_endpoint = ws_endpoint
+ self._ws = None
+
+ def _init_ws_session(self):
+ """initialize a websocket session for notifications"""
+ if self.ws_session is None:
+ self.ws_session = aiohttp.ClientSession()
+
+ return self.ws_session
+
+ @metrics.list_policy_exceptions.count_exceptions()
+ async def list_policies(self, filters=None, ids=None):
+ """
+ used to get a list of policies matching a particular ID
+ :param filters: list of regex filter strings for matching
+ :param ids: list of id strings for matching
+ :returns: List of policies matching filters or ids
+ """
+ request_data = self._prepare_request(filters, ids)
+ policies = await self._run_request(
+ f"{self.decision_endpoint}/listPolicy", request_data
+ )
+ return set(policies)
+
+ @classmethod
+ def _prepare_request(cls, filters, ids):
+ """prepare the request body for the v0 api"""
+ regex = get_single_regex(filters, ids)
+ return {"policyName": regex}
+
+ @metrics.get_config_exceptions.count_exceptions()
+ async def get_config(self, filters=None, ids=None):
+ """
+ Used to get the actual policy configuration from PDP
+ :return: the policy objects that are currently active
+ for the given set of filters
+ """
+ request_data = self._prepare_request(filters, ids)
+ policies = await self._run_request(
+ f"{self.decision_endpoint}/getConfig", request_data)
+
+ for policy in policies:
+ try:
+ policy["config"] = json.loads(policy["config"])
+ except json.JSONDecodeError:
+ pass
+
+ return policies
+
+ @classmethod
+ def _needs_update(cls, update, ids=None, filters=None):
+ """
+ Expect something like this
+ {
+ "removedPolicies": [{
+ "policyName": "xyz.45.xml",
+ "versionNo": "45"
+ }],
+ "loadedPolicies": [{
+ "policyName": "xyz.46.xml",
+ "versionNo": "46",
+ "matches": {
+ "ONAPName": "DCAE",
+ "ConfigName": "DCAE_HighlandPark_AgingConfig",
+ "service": "DCAE_HighlandPark_AgingConfig",
+ "guard": "false",
+ "location": " Edge",
+ "TTLDate": "NA",
+ "uuid": "TestUUID",
+ "RiskLevel": "5",
+ "RiskType": "default"
+ },
+ "updateType": "UPDATE"
+ }],
+ "notificationType": "BOTH"
+ }
+ """
+ for policy in update.get("removedPolicies", []) + update.get(
+ "loadedPolicies", []
+ ):
+ if (
+ re.match(get_single_regex(filters, ids), policy["policyName"])
+ is not None
+ ):
+ return True
+
+ return False
+
+ async def notificationhandler(self, callback, ids=None, filters=None):
+ """
+ websocket based notification handler for
+ :param callback: function to execute when
+ a matching notification is found
+ :param ids: list of id strings for matching
+ """
+
+ url = self.pdp_url.replace("https", "wss")
+
+ # The websocket we start here will periodically
+ # send heartbeat (ping frames) to policy
+ # this ensures that we are never left hanging
+ # with our communication with policy.
+ session = self._init_ws_session()
+ try:
+ websocket = await session.ws_connect(
+ "{0}/{1}".format(url, self.ws_endpoint), heartbeat=WS_HEARTBEAT
+ )
+ logger.info("websock with policy established")
+ async for msg in websocket:
+ # check for websocket errors
+ # break out of this async for loop. to attempt reconnection
+ if msg.type in (aiohttp.WSMsgType.CLOSED, aiohttp.WSMsgType.ERROR):
+ break
+
+ if msg.type is (aiohttp.WSMsgType.TEXT):
+ if self._needs_update(
+ json.loads(msg.data),
+ ids=ids,
+ filters=filters
+ ):
+ logger.debug(
+ "notification received from pdp websocket -> %s", msg
+ )
+ await callback()
+ else:
+ logger.warning(
+ "unexpected websocket message type received %s", msg.type
+ )
+ except aiohttp.ClientError:
+ logger.exception("Received connection error with websocket")
+
+
+class PolicyClientV1(BasePolicyClient):
+ """
+ Supports the v1 policy API introduced in ONAP's dublin release
+ """
+
+ async def close(self):
+ """ close the policy client """
+ await super().close()
+ if self.dmaap_session is not None:
+ await self.dmaap_session.close()
+
+ def _init_dmaap_session(self):
+ """ initialize a dmaap session for notifications """
+ if self.dmaap_session is None:
+ self.dmaap_session = aiohttp.ClientSession(
+ headers=self.dmaap_headers,
+ raise_for_status=True
+ )
+
+ return self.dmaap_session
+
+ def __init__(
+ self,
+ headers,
+ pdp_url,
+ **kwargs,
+ ):
+ super().__init__(pdp_url, headers=headers)
+ self._ws = None
+ self.audit_uuid = str(uuid.uuid4())
+ self.dmaap_url = kwargs.get('dmaap_url')
+ self.dmaap_timeout = 15000
+ self.dmaap_session = None
+ self.dmaap_headers = kwargs.get('dmaap_headers', {})
+ self.decision = kwargs.get('v1_decision', V1_DECISION_ENDPOINT)
+
+ async def list_policies(self, filters=None, ids=None):
+ """
+ ONAP has no real equivalent to this.
+ :returns: None
+ """
+ # in derived classes we may use self
+ # pylint: disable=no-self-use
+ return None
+
+ @classmethod
+ def convert_to_policy(cls, policy_body):
+ """
+ Convert raw policy to format expected by microservices
+ :param policy_body: raw dictionary output from pdp
+ :returns: data in proper formatting
+ """
+ pdp_metadata = policy_body.get("metadata", {})
+ policy_id = pdp_metadata.get("policy-id")
+ policy_version = policy_body.get("version")
+ if not policy_id or policy_version is None:
+ logger.warning("Malformed policy is missing policy-id and version")
+ return None
+
+ policy_body["policyName"] = "{}.{}.xml".format(
+ policy_id, str(policy_version.replace(".", "-"))
+ )
+ policy_body["policyVersion"] = str(policy_version)
+ if "properties" in policy_body:
+ policy_body["config"] = policy_body["properties"]
+ del policy_body["properties"]
+
+ return policy_body
+
+ @metrics.get_config_exceptions.count_exceptions()
+ async def get_config(self, filters=None, ids=None):
+ """
+ Used to get the actual policy configuration from PDP
+ :returns: the policy objects that are currently active
+ for the given set of filters
+ """
+ if ids is None:
+ ids = []
+
+ request_data = {
+ "ONAPName": "DCAE",
+ "ONAPComponent": "policy-sync",
+ "ONAPInstance": self.audit_uuid,
+ "action": "configure",
+ "resource": {"policy-id": ids}
+ }
+
+ data = await self._run_request(self.decision, request_data)
+ out = []
+ for policy_body in data["policies"].values():
+ policy = self.convert_to_policy(policy_body)
+ if policy is not None:
+ out.append(policy)
+
+ return out
+
+ def supports_notifications(self):
+ """
+ Does this policy client support real time notifications
+ :returns: True if the dmaap url is set else return false
+ """
+ return self.dmaap_url is not None
+
+ @classmethod
+ def _needs_update(cls, update, ids):
+ """
+ expect something like this
+ {
+ "deployed-policies": [
+ {
+ "policy-type": "onap.policies.monitoring.tcagen2",
+ "policy-type-version": "1.0.0",
+ "policy-id": "onap.scaleout.tca",
+ "policy-version": "2.0.0",
+ "success-count": 3,
+ "failure-count": 0
+ }
+ ],
+ "undeployed-policies": [
+ {
+ "policy-type": "onap.policies.monitoring.tcagen2",
+ "policy-type-version": "1.0.0",
+ "policy-id": "onap.firewall.tca",
+ "policy-version": "6.0.0",
+ "success-count": 3,
+ "failure-count": 0
+ }
+ ]
+ }
+ """
+ for policy in update.get("deployed-policies", []) + update.get(
+ "undeployed-policies", []
+ ):
+ if policy.get("policy-id") in ids:
+ return True
+
+ return False
+
+ async def poll_dmaap(self, callback, ids=None):
+ """
+ one GET request to dmaap
+ :param callback: function to execute when a
+ matching notification is found
+ :param ids: list of id strings for matching
+ """
+ query = f"?timeout={self.dmaap_timeout}"
+ url = f"{self.dmaap_url}/{self.audit_uuid}/0{query}"
+ logger.info("polling topic: %s", url)
+ session = self._init_dmaap_session()
+ try:
+ async with session.get(url) as response:
+ messages = await response.read()
+
+ for msg in json.loads(messages):
+ if self._needs_update(json.loads(msg), ids):
+ logger.info(
+ "notification received from dmaap -> %s", msg
+ )
+ await callback()
+ except aiohttp.ClientError:
+ logger.exception('received connection error from dmaap topic')
+ # wait some time
+ await asyncio.sleep(30)
+
+ async def notificationhandler(self, callback, ids=None, filters=None):
+ """
+ dmaap based notification handler for
+ :param callback: function to execute when a
+ matching notification is found
+ :param ids: list of id strings for matching
+ """
+ if filters is not None:
+ logger.warning("filters are not supported with pdp v1..ignoring")
+ while True:
+ await self.poll_dmaap(callback, ids=ids)
+
+
+def get_client(
+ pdp_url,
+ use_v0=False,
+ **kwargs
+):
+ """
+ get a particular policy client
+ :param use_v0: whether this should be a v0 client or
+ :return: A policy client
+ """
+ if pdp_url is None:
+ raise ValueError("POLICY_SYNC_PDP_URL set or --pdp flag not set")
+
+ pdp_headers = {
+ "Accept": APPLICATION_JSON,
+ "Content-Type": APPLICATION_JSON
+ }
+
+ if 'pdp_user' in kwargs and 'pdp_password' in kwargs:
+ auth = base64.b64encode(
+ "{}:{}".format(
+ kwargs.get('pdp_user'),
+ kwargs.get('pdp_password')
+ ).encode("utf-8")
+ )
+ pdp_headers["Authorization"] = "Basic {}".format(auth.decode("utf-8"))
+
+ dmaap_headers = {
+ "Accept": APPLICATION_JSON,
+ "Content-Type": APPLICATION_JSON
+ }
+
+ logger.info(kwargs.get('dmaap_password'))
+ if 'dmaap_user' in kwargs and 'dmaap_password' in kwargs:
+ auth = base64.b64encode(
+ "{}:{}".format(
+ kwargs.get('dmaap_user'),
+ kwargs.get('dmaap_password')
+ ).encode("utf-8")
+ ).decode("utf-8")
+ dmaap_headers["Authorization"] = f"Basic {auth}"
+
+ # Create client (either v0 or v1) based on arguments)
+ if use_v0:
+ return PolicyClientV0(
+ pdp_headers,
+ pdp_url,
+ decision_endpoint=kwargs.get('v0_decision'),
+ ws_endpoint=kwargs.get('v0_notifications')
+ )
+
+ return PolicyClientV1(
+ pdp_headers,
+ pdp_url,
+ v1_decision=kwargs.get('v1_decision'),
+ dmaap_url=kwargs.get('dmaap_url'),
+ dmaap_headers=dmaap_headers
+ )
diff --git a/dcae-services-policy-sync/policysync/cmd.py b/dcae-services-policy-sync/policysync/cmd.py
new file mode 100644
index 0000000..9055674
--- /dev/null
+++ b/dcae-services-policy-sync/policysync/cmd.py
@@ -0,0 +1,234 @@
+# ============LICENSE_START=======================================================
+# Copyright (c) 2021 AT&T Intellectual Property. All rights reserved.
+# ================================================================================
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+# ============LICENSE_END=========================================================
+"""
+CLI parsing for the sync utility.
+convert flags/env variables to configuration
+"""
+import argparse
+import collections
+import os
+import sys
+import logging
+import logging.config
+from urllib.parse import urlsplit
+import yaml
+import policysync.clients as clients
+import policysync.coroutines
+from .util import get_module_logger
+
+
+logger = get_module_logger(__name__)
+
+APPLICATION_JSON = "application/json"
+
+
+Config = collections.namedtuple(
+ 'Config', ['out_file', 'check_period', 'filters', 'ids', 'client', 'bind'])
+
+
+def parsecmd(args):
+ """
+ Parse the command into a config object
+ :param args: arguments list for parsing
+ :returns: Config for the policy sync
+ """
+ parser = argparse.ArgumentParser(
+ description="Keeps a file updated with policies matching a filter.",
+ formatter_class=argparse.ArgumentDefaultsHelpFormatter,
+ )
+
+ parser.add_argument(
+ "--out",
+ type=str,
+ default=os.environ.get("POLICY_SYNC_OUTFILE", "policies.json"),
+ help="Output file to dump to",
+ )
+
+ parser.add_argument(
+ "--duration",
+ type=int,
+ default=os.environ.get("POLICY_SYNC_DURATION", 1200),
+ help="frequency (in seconds) to conduct periodic check",
+ )
+
+ parser.add_argument(
+ "--filters",
+ type=str,
+ default=os.environ.get("POLICY_SYNC_FILTER", "[]"),
+ help="Regex of policies that you are interested in.",
+ )
+ parser.add_argument(
+ "--ids",
+ type=str,
+ default=os.environ.get("POLICY_SYNC_ID", "[]"),
+ help="Specific names of policies you are interested in.",
+ )
+
+ parser.add_argument(
+ "--pdp-user",
+ type=str,
+ default=os.environ.get("POLICY_SYNC_PDP_USER", None),
+ help="PDP basic auth username",
+ )
+ parser.add_argument(
+ "--pdp-pass",
+ type=str,
+ default=os.environ.get("POLICY_SYNC_PDP_PASS", None),
+ help="PDP basic auth password",
+ )
+
+ parser.add_argument(
+ "--pdp-url",
+ type=str,
+ default=os.environ.get("POLICY_SYNC_PDP_URL", None),
+ help="PDP to connect to",
+ )
+
+ parser.add_argument(
+ "--http-bind",
+ type=str,
+ default=os.environ.get("POLICY_SYNC_HTTP_BIND", "localhost:8000"),
+ help="The bind address for container metrics",
+ )
+
+ parser.add_argument(
+ "--http-metrics",
+ type=bool,
+ default=os.environ.get("POLICY_SYNC_HTTP_METRICS", True),
+ help="turn on or off the prometheus metrics",
+ )
+
+ parser.add_argument(
+ "--use-v0",
+ type=bool,
+ default=os.environ.get("POLICY_SYNC_V0_ENABLE", False),
+ help="Turn on usage of the legacy v0 policy API",
+ )
+
+ parser.add_argument(
+ "--logging-config",
+ type=str,
+ default=os.environ.get("POLICY_SYNC_LOGGING_CONFIG", None),
+ help="Python formatted logging configuration file",
+ )
+
+ # V0 API specific configuration
+ parser.add_argument(
+ "--v0-notify-endpoint",
+ type=str,
+ default=os.environ.get(
+ "POLICY_SYNC_V0_NOTIFIY_ENDPOINT", "pdp/notifications"
+ ),
+ help="Path of the v0 websocket notification",
+ )
+
+ parser.add_argument(
+ "--v0-decision-endpoint",
+ type=str,
+ default=os.environ.get("POLICY_SYNC_V0_DECISION_ENDPOINT", "pdp/api"),
+ help="path of the v0 decision endpoint",
+ )
+
+ # V1 API specific configuration
+ parser.add_argument(
+ "--v1-dmaap-topic",
+ type=str,
+ default=os.environ.get("POLICY_SYNC_V1_DMAAP_URL", None),
+ help="URL of the dmaap topic used in v1 api for notifications",
+ )
+
+ parser.add_argument(
+ "--v1-dmaap-user",
+ type=str,
+ default=os.environ.get("POLICY_SYNC_V1_DMAAP_USER", None),
+ help="User to use with with the dmaap topic"
+ )
+
+ parser.add_argument(
+ "--v1-dmaap-pass",
+ type=str,
+ default=os.environ.get("POLICY_SYNC_V1_DMAAP_PASS", None),
+ help="Password to use with the dmaap topic"
+ )
+
+ parser.add_argument(
+ "--v1-decision-endpoint",
+ type=str,
+ default=os.environ.get(
+ "POLICY_SYNC_V1_PDP_DECISION_ENDPOINT",
+ "policy/pdpx/v1/decision"
+ ),
+ help="Decision endpoint used in the v1 api for notifications",
+ )
+
+ args = parser.parse_args(args)
+
+ if args.logging_config:
+ logging.config.fileConfig(
+ args.logging_config,
+ disable_existing_loggers=False
+ )
+ else:
+ handler = logging.StreamHandler()
+ formatter = logging.Formatter(
+ "[%(asctime)s][%(levelname)-5s]%(message)s"
+ )
+ root = logging.getLogger()
+ handler.setFormatter(formatter)
+ root.addHandler(handler)
+ root.setLevel(logging.INFO)
+
+ bind = args.http_bind if args.http_metrics else None
+
+ client = clients.get_client(
+ args.pdp_url,
+ pdp_user=args.pdp_user,
+ pdp_password=args.pdp_pass,
+ use_v0=args.use_v0,
+ v0_decision=args.v0_decision_endpoint,
+ v0_notifications=args.v0_notify_endpoint,
+ v1_decision=args.v1_decision_endpoint,
+ dmaap_url=args.v1_dmaap_topic,
+ dmaap_user=args.v1_dmaap_user,
+ dmaap_password=args.v1_dmaap_pass
+ )
+
+ if bind is not None:
+ bind = urlsplit("//" + bind)
+
+ return Config(
+ out_file=args.out,
+ check_period=args.duration,
+ filters=yaml.safe_load(args.filters),
+ ids=yaml.safe_load(args.ids),
+ client=client,
+ bind=bind,
+ )
+
+
+def main():
+ """
+ Parse the arguments passed in via the command line and start the app
+ """
+ try:
+ config = parsecmd(sys.argv[1:])
+ except ValueError:
+ logger.error(
+ "There was no POLICY_SYNC_PDP_URL set or --pdp flag set"
+ )
+ return -1
+ policysync.coroutines.start_event_loop(config)
+ return 0
diff --git a/dcae-services-policy-sync/policysync/coroutines.py b/dcae-services-policy-sync/policysync/coroutines.py
new file mode 100644
index 0000000..236d6f2
--- /dev/null
+++ b/dcae-services-policy-sync/policysync/coroutines.py
@@ -0,0 +1,182 @@
+# ============LICENSE_START=======================================================
+# Copyright (c) 2021 AT&T Intellectual Property. All rights reserved.
+# ================================================================================
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+# ============LICENSE_END=========================================================
+"""
+Asyncio coroutine setup for both periodic and real time notification tasks """
+import signal
+import asyncio
+from prometheus_client import start_http_server
+from .inventory import Inventory
+from .util import get_module_logger
+
+SLEEP_ON_ERROR = 10
+logger = get_module_logger(__name__)
+
+
+async def notify_task(inventory, sleep):
+ """
+ start the notification task
+ :param inventory: Inventory
+ :param sleep: how long to wait on error in seconds
+ """
+
+ logger.info("opening notificationhandler for policy...")
+ await inventory.client.notificationhandler(
+ inventory.check_and_update,
+ ids=inventory.policy_ids,
+ filters=inventory.policy_filters,
+ )
+ logger.warning("websocket closed or errored...will attempt reconnection")
+ await asyncio.sleep(sleep)
+
+
+async def periodic_task(inventory, sleep):
+ """
+ start the periodic task
+ :param inventory: Inventory
+ :param sleep: how long to wait between periodic checks
+ """
+ await asyncio.sleep(sleep)
+ logger.info("Executing periodic check of PDP policies")
+ await inventory.update()
+
+
+async def task_runner(inventory, sleep, task, should_run):
+ """
+ Runs a task in an event loop
+ :param inventory: Inventory
+ :param sleep: how long to wait between loop iterations
+ :param task: coroutine to run
+ :param should_run: function for should this task continue to run
+ """
+ # pylint: disable=broad-except
+ while should_run():
+ try:
+ await task(inventory, sleep)
+ except asyncio.CancelledError:
+ break
+ except Exception:
+ logger.exception("Received exception")
+
+
+async def shutdown(loop, tasks, inventory):
+ """
+ shutdown the event loop and cancel all tasks
+ :param loop: Asyncio eventloop
+ :param tasks: list of asyncio tasks
+ :param inventory: the inventory object
+ """
+
+ logger.info("caught signal")
+ # Stop the websocket routines
+ for task in tasks:
+ task.cancel()
+ await task
+
+ # Close the client
+ await inventory.close()
+ loop.stop()
+
+
+def _setup_coroutines(
+ loop,
+ inventory,
+ shutdown_handler,
+ task_r,
+ **kwargs
+):
+ """ sets up the application coroutines"""
+ # Task runner takes a function for stop condition
+ # (for testing purposes) but should always run in practice
+ # pylint: disable=broad-except
+ def infinite_condition():
+ return True
+
+ logger.info("Starting gather of all policies...")
+ try:
+ loop.run_until_complete(inventory.gather())
+ except Exception:
+ logger.exception('received exception on initial gather')
+
+ # websocket and the periodic check of policies
+ tasks = [
+ loop.create_task(
+ task_r(
+ inventory,
+ kwargs.get('check_period', 2400),
+ periodic_task,
+ infinite_condition
+ )
+ )
+ ]
+
+ if inventory.client.supports_notifications():
+ tasks.append(
+ loop.create_task(
+ task_r(
+ inventory,
+ SLEEP_ON_ERROR,
+ notify_task,
+ infinite_condition
+ )
+ )
+ )
+ else:
+ logger.warning(
+ "Defaulting to polling... Provide a dmaap url to receive faster updates"
+ )
+
+ # Add shutdown handlers for sigint and sigterm
+ for signame in ("SIGINT", "SIGTERM"):
+ sig = getattr(signal, signame)
+ loop.add_signal_handler(
+ sig,
+ lambda: asyncio.ensure_future(
+ shutdown_handler(loop, tasks, inventory)
+ ),
+ )
+
+ # Start prometheus server daemonthread for metrics/healthchecking
+ if 'bind' in kwargs:
+ metrics_server = kwargs.get('metrics_server', start_http_server)
+ metrics_server(kwargs['bind'].port, addr=kwargs['bind'].hostname)
+
+
+def start_event_loop(config):
+ """
+ start the event loop that runs the application
+ :param config: Config object for the application
+ """
+ loop = asyncio.get_event_loop()
+ inventory = Inventory(
+ config.filters,
+ config.ids,
+ config.out_file,
+ config.client
+ )
+
+ _setup_coroutines(
+ loop,
+ inventory,
+ shutdown,
+ task_runner,
+ metrics_server=start_http_server,
+ bind=config.bind,
+ check_period=config.check_period
+ )
+
+ loop.run_forever()
+ loop.close()
+ logger.info("shutdown complete")
diff --git a/dcae-services-policy-sync/policysync/inventory.py b/dcae-services-policy-sync/policysync/inventory.py
new file mode 100644
index 0000000..0eb91b5
--- /dev/null
+++ b/dcae-services-policy-sync/policysync/inventory.py
@@ -0,0 +1,169 @@
+# ============LICENSE_START=======================================================
+# Copyright (c) 2021 AT&T Intellectual Property. All rights reserved.
+# ================================================================================
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+# ============LICENSE_END=========================================================
+""" In memory data store for policies which are currently used by a mS """
+import asyncio
+import json
+import uuid
+import os
+import tempfile
+import aiohttp
+from datetime import datetime
+from .util import get_module_logger
+
+logger = get_module_logger(__name__)
+
+ACTION_GATHERED = "gathered"
+ACTION_UPDATED = "updated"
+OUTFILE_INDENT = 4
+
+
+class Inventory:
+ """ In memory data store for policies which are currently used by a mS """
+ def __init__(self, filters, ids, outfile, client):
+ self.policy_filters = filters
+ self.policy_ids = ids
+ self.hp_active_inventory = set()
+ self.get_lock = asyncio.Lock()
+ self.file = outfile
+ self.queue = asyncio.Queue()
+ self.client = client
+
+ async def gather(self):
+ """
+ Run at startup to gather an initial inventory of policies
+ """
+ return await self._sync_inventory(ACTION_GATHERED)
+
+ async def update(self):
+ """
+ Run to update an inventory of policies on the fly
+ """
+ return await self._sync_inventory(ACTION_UPDATED)
+
+ async def check_and_update(self):
+ """ check and update the policy inventory """
+ return await self.update()
+
+ async def close(self):
+ """ close the policy inventory and its associated client """
+ await self.client.close()
+
+ def _atomic_dump(self, data):
+ """ atomically dump the policy content to a file by rename """
+ try:
+ temp_file = tempfile.NamedTemporaryFile(
+ delete=False,
+ dir=os.path.dirname(self.file),
+ prefix=os.path.basename(self.file),
+ mode="w",
+ )
+ try:
+ temp_file.write(data)
+ finally:
+ # fsync the file so its on disk
+ temp_file.flush()
+ os.fsync(temp_file.fileno())
+ finally:
+ temp_file.close()
+
+ os.rename(temp_file.name, os.path.abspath(self.file))
+
+ async def get_policy_content(self, action=ACTION_UPDATED):
+ """
+ get the policy content off the PDP
+ :param action: what action to present
+ :returns: True/False depending on if update was successful
+ """
+ logger.info("Starting policy update process...")
+ try:
+ policy_bodies = await self.client.get_config(
+ filters=self.policy_filters, ids=self.policy_ids
+ )
+ except aiohttp.ClientError:
+ logger.exception('Conncection Error while connecting to PDP')
+ return False
+
+ # match the format a bit of the Config Binding Service
+ out = {
+ "policies": {"items": policy_bodies},
+ "event": {
+ "action": action,
+ "timestamp": (datetime.utcnow().isoformat()[:-3] + "Z"),
+ "update_id": str(uuid.uuid4()),
+ "policies_count": len(policy_bodies),
+ },
+ }
+
+ # Atomically dump the file to disk
+ tmp = {
+ x.get("policyName") for x in policy_bodies if "policyName" in x
+ }
+
+ if tmp != self.hp_active_inventory:
+ data = json.dumps(out)
+ loop = asyncio.get_event_loop()
+ await loop.run_in_executor(None, self._atomic_dump, data)
+ logger.info(
+ "Update complete. Policies dumped to: %s", self.file
+ )
+ self.hp_active_inventory = tmp
+ return True
+ else:
+ logger.info("No updates needed for now")
+ return False
+
+ async def _sync_inventory(self, action):
+ """
+ Pull an inventory of policies. Commit changes if there is a change.
+ return: boolean to represent whether changes were commited
+ """
+ try:
+ pdp_inventory = await self.client.list_policies(
+ filters=self.policy_filters, ids=self.policy_ids
+ )
+ except aiohttp.ClientError:
+ logger.exception("Inventory sync failed due to a connection error")
+ return False
+
+ logger.debug("pdp_inventory -> %s", pdp_inventory)
+
+ # Below needs to be under a lock because of
+ # the call to getConfig being awaited.
+ async with self.get_lock:
+ if self.hp_active_inventory != pdp_inventory or \
+ pdp_inventory is None:
+
+ # Log a delta of what has changed related to this policy update
+ if pdp_inventory is not None and \
+ self.hp_active_inventory is not None:
+ msg = {
+ "removed": list(
+ self.hp_active_inventory - pdp_inventory
+ ),
+ "added": list(
+ pdp_inventory - self.hp_active_inventory
+ ),
+ }
+ logger.info(
+ "PDP indicates the following changes: %s ", msg
+ )
+
+ return await self.get_policy_content(action)
+
+ logger.info(
+ "local matches pdp. no update required for now"
+ )
+ return False
diff --git a/dcae-services-policy-sync/policysync/metrics.py b/dcae-services-policy-sync/policysync/metrics.py
new file mode 100644
index 0000000..7f825fc
--- /dev/null
+++ b/dcae-services-policy-sync/policysync/metrics.py
@@ -0,0 +1,38 @@
+# ============LICENSE_START=======================================================
+# Copyright (c) 2021 AT&T Intellectual Property. All rights reserved.
+# ================================================================================
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+# ============LICENSE_END=========================================================
+""" counters and gaugages for various metrics """
+from prometheus_client import Counter, Gauge
+
+
+policy_updates_counter = Counter(
+ "policy_updates", "Number of total policy updates commited"
+)
+websock_closures = Counter(
+ "websocket_errors_and_closures", "Number of websocket closures or errors"
+)
+list_policy_exceptions = Counter(
+ "list_policy_exception",
+ "Exceptions that have occured as a result of calling listPolicy",
+)
+get_config_exceptions = Counter(
+ "get_config_exception",
+ "Exceptions that have occured as a result of calling getConfig",
+)
+
+active_policies_gauge = Gauge(
+ "active_policies",
+ "Number of policies that have been retrieved off the PDP"
+)
diff --git a/dcae-services-policy-sync/policysync/util.py b/dcae-services-policy-sync/policysync/util.py
new file mode 100644
index 0000000..1bbac5a
--- /dev/null
+++ b/dcae-services-policy-sync/policysync/util.py
@@ -0,0 +1,10 @@
+""" utility functions (currenlty just for logging) """
+import logging
+
+
+def get_module_logger(mod_name):
+ """
+ To use this, do logger = get_module_logger(__name__)
+ """
+ logger = logging.getLogger(mod_name)
+ return logger